Skip to content

Commit 597ffc6

Browse files
committed
runner: enhance monitoring handler for airflow
1 parent b618b50 commit 597ffc6

File tree

3 files changed

+120
-36
lines changed

3 files changed

+120
-36
lines changed

dashboard/ui/src/pages/dashboard.js

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ function Dashboard() {
5252

5353
const statusPriority = {
5454
DOWN: 0,
55-
MONITORED: 1,
56-
UP: 2,
55+
CHECKING: 1,
56+
MONITORED: 2,
57+
UP: 3,
5758
};
5859

5960
serviceArray.sort((a, b) => {
@@ -121,15 +122,28 @@ function Dashboard() {
121122
backgroundColor:
122123
status === 'UP' ? '#28a745' :
123124
status === 'DOWN' ? '#dc3545' :
124-
'#ffc107',
125+
status === 'MONITORED' ? '#ffc107' :
126+
'#b4e83a',
125127
whiteSpace: 'nowrap',
126128
animation: `pulse ${randomDuration.toFixed(2)}s infinite`,
127129
flexShrink: 0,
128130
};
129131
};
130-
131-
132-
132+
133+
const getMessageDetailsStyle = (service) => {
134+
return {
135+
fontSize: '14px',
136+
color:
137+
service.status === 'UP' ? '#28a745' :
138+
service.status === 'DOWN' ? '#dc3545' :
139+
service.status === 'MONITORED' ? '#ffc107' :
140+
'#b4e83a',
141+
marginBottom: '10px',
142+
wordWrap: 'break-word',
143+
overflowWrap: 'break-word',
144+
whiteSpace: 'pre-line',
145+
};
146+
};
133147

134148
const getTagsStyle = () => ({
135149
display: 'flex',
@@ -277,6 +291,8 @@ function Dashboard() {
277291
? 'Not OK'
278292
: service.status === 'MONITORED'
279293
? 'Monitored'
294+
: service.status == 'CHECKING'
295+
? 'Checking'
280296
: service.status}
281297
</span>
282298
</div>
@@ -352,14 +368,7 @@ function Dashboard() {
352368
)
353369
:
354370
(
355-
<div style={{
356-
fontSize: '14px',
357-
color: '#dc3545',
358-
marginBottom: '10px',
359-
wordWrap: 'break-word',
360-
overflowWrap: 'break-word',
361-
whiteSpace: 'pre-line',
362-
}}>
371+
<div style={getMessageDetailsStyle(service)}>
363372
{service.messageDetails}
364373
</div>
365374
)

runner/runner.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,29 @@ func healthCheck(n string, s tob.Service, t *time.Ticker, waiter tob.Waiter) {
227227
resp := s.Ping()
228228
respStr := string(resp)
229229

230+
// Airflow Monitoring
231+
if s.Name() == string(tob.Airflow) {
232+
for _, notificator := range s.GetNotificators() {
233+
if !util.IsNilish(notificator) {
234+
if notificator.IsEnabled() && notificator.Provider() == "webhook" {
235+
notificatorMessage := fmt.Sprintf("%s is DOWN", n)
236+
if s.GetMessage() != "" {
237+
notificatorMessage = fmt.Sprintf("%s is CHECKING | %s", n, s.GetMessage())
238+
if respStr == tob.NotOk {
239+
notificatorMessage = fmt.Sprintf("%s is DOWN | %s", n, s.GetMessage())
240+
}
241+
}
242+
if notificator.IsEnabled() && s.Name() != string(tob.SSLStatus) {
243+
err := notificator.Send(notificatorMessage)
244+
if err != nil {
245+
tob.Logger.Printf("notificator %s error: %s", notificator.Provider(), err.Error())
246+
}
247+
}
248+
}
249+
}
250+
}
251+
}
252+
230253
// SSL Monitoring
231254
if s.Name() == string(tob.SSLStatus) {
232255
for _, notificator := range s.GetNotificators() {
@@ -275,7 +298,7 @@ func healthCheck(n string, s tob.Service, t *time.Ticker, waiter tob.Waiter) {
275298

276299
notificatorMessage := fmt.Sprintf("%s is UP. It was down for %s", n, s.GetDownTimeDiff())
277300
if s.GetMessage() != "" {
278-
notificatorMessage = fmt.Sprintf("%s %s", n, s.GetMessage())
301+
notificatorMessage = fmt.Sprintf("%s is UP | %s", n, s.GetMessage())
279302
}
280303

281304
for _, notificator := range s.GetNotificators() {

services/airflow/airflow.go

Lines changed: 73 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,19 @@ import (
1717

1818
// Airflow service
1919
type Airflow struct {
20-
url string
21-
recovered bool
22-
lastDownTime string
23-
schedulerStatus string
24-
metadatabaseStatus string
25-
enabled bool
26-
verbose bool
27-
logger *log.Logger
28-
checkInterval int
29-
stopChan chan bool
30-
message string
31-
notificatorConfig config.Config
20+
url string
21+
recovered bool
22+
lastDownTime string
23+
schedulerStatus string
24+
latestSchedulerHeartbeat string
25+
metadatabaseStatus string
26+
enabled bool
27+
verbose bool
28+
logger *log.Logger
29+
checkInterval int
30+
stopChan chan bool
31+
message string
32+
notificatorConfig config.Config
3233
}
3334

3435
// Airflow's constructor
@@ -72,35 +73,86 @@ func (a *Airflow) checkClusterStatus(resp *http.Response) error {
7273
return err
7374
}
7475

75-
schedulerStatus, ok := data["scheduler"].(map[string]interface{})["status"].(string)
76+
schedulerRaw, ok := data["scheduler"].(map[string]interface{})
7677
if !ok {
7778
if a.verbose {
78-
a.logger.Printf("cannot read scheduler status: %v\n", err)
79+
a.logger.Println("cannot read scheduler block (not a map)")
7980
}
81+
return errors.New("invalid scheduler block")
82+
}
8083

81-
return err
84+
schedulerStatus, ok := schedulerRaw["status"].(string)
85+
if !ok {
86+
if a.verbose {
87+
a.logger.Println("cannot read scheduler status (not a string)")
88+
}
89+
90+
return errors.New("invalid scheduler status")
91+
}
92+
93+
latestSchedulerHeartbeat, ok := schedulerRaw["latest_scheduler_heartbeat"].(string)
94+
if !ok {
95+
if a.verbose {
96+
a.logger.Println("cannot read scheduler latest_scheduler_heartbeat (not a string)")
97+
}
98+
99+
return errors.New("invalid latest_scheduler_heartbeat")
100+
}
101+
102+
utcTime, err := time.Parse(time.RFC3339Nano, latestSchedulerHeartbeat)
103+
if err != nil {
104+
if a.verbose {
105+
a.logger.Printf("failed to parse time: %v\n", err)
106+
}
107+
108+
return errors.New("invalid latest_scheduler_heartbeat")
82109
}
110+
111+
timezoneJakarta, err := time.LoadLocation("Asia/Jakarta")
112+
if err != nil {
113+
if a.verbose {
114+
a.logger.Printf("failed to load WIB location: %v\n", err)
115+
}
116+
117+
return errors.New("invalid timezone")
118+
}
119+
120+
wibTime := utcTime.In(timezoneJakarta)
121+
a.latestSchedulerHeartbeat = wibTime.String()
122+
83123
a.schedulerStatus = schedulerStatus
84124

85-
metadatabaseStatus, ok := data["metadatabase"].(map[string]interface{})["status"].(string)
125+
metadatabaseRaw, ok := data["metadatabase"].(map[string]interface{})
86126
if !ok {
87127
if a.verbose {
88-
a.logger.Printf("cannot read metadatabase status: %v\n", err)
128+
a.logger.Println("cannot read metadatabase block (not a map)")
89129
}
130+
return errors.New("invalid metadatabase block")
131+
}
90132

91-
return err
133+
metadatabaseStatus, ok := metadatabaseRaw["status"].(string)
134+
if !ok {
135+
if a.verbose {
136+
a.logger.Println("cannot read metadatabase status")
137+
}
138+
139+
return errors.New("invalid metadatabase status")
92140
}
141+
93142
a.metadatabaseStatus = metadatabaseStatus
94143

144+
message := fmt.Sprintf("Airflow Scheduler Status: %s\nLatest Scheduler Heartbeat: %s\n\n\n Airflow Metadatabase: %s",
145+
a.schedulerStatus, a.latestSchedulerHeartbeat, a.metadatabaseStatus)
95146
if a.schedulerStatus != "healthy" || a.metadatabaseStatus != "healthy" {
96-
errMsg := fmt.Sprintf("airflow is unhealthy: scheduler (%s), metadatabase (%s)", a.schedulerStatus, a.metadatabaseStatus)
97147
if a.verbose {
98-
a.logger.Println(errMsg)
148+
a.logger.Println(message)
99149
}
100150

101-
return errors.New(errMsg)
151+
return errors.New(message)
102152
}
103153

154+
a.SetMessage(message)
155+
104156
return nil
105157
}
106158

0 commit comments

Comments
 (0)