7
7
*******************************************************************************/
8
8
package org .phoebus .applications .alarm .talk ;
9
9
10
+ import static java .lang .Thread .sleep ;
10
11
import static org .phoebus .applications .alarm .AlarmSystem .logger ;
11
12
13
+ import java .time .Duration ;
14
+ import java .time .Instant ;
12
15
import java .util .Collections ;
13
16
import java .util .List ;
14
17
import java .util .Objects ;
18
+ import java .util .Optional ;
15
19
import java .util .concurrent .CopyOnWriteArrayList ;
16
20
import java .util .concurrent .atomic .AtomicBoolean ;
21
+ import java .util .concurrent .atomic .AtomicReference ;
17
22
import java .util .logging .Level ;
18
23
19
24
import org .apache .kafka .clients .consumer .Consumer ;
@@ -41,7 +46,10 @@ public class TalkClient
41
46
private final CopyOnWriteArrayList <TalkClientListener > listeners = new CopyOnWriteArrayList <>();
42
47
private final AtomicBoolean running = new AtomicBoolean (true );
43
48
private final Consumer <String , String > consumer ;
49
+ private final Consumer <String , String > heartbeatConsumer ;
44
50
private final Thread thread ;
51
+ private final Thread updateHeartbeatTimestampThread ;
52
+ private final Thread annunciateDisconnectionThread ;
45
53
46
54
/** @param server - Kafka Server host:port
47
55
* @param config_name - Name of kafka config topic that the talk topic accompanies.
@@ -56,6 +64,16 @@ public TalkClient(final String server, final String config_name)
56
64
57
65
thread = new Thread (this ::run , "TalkClient" );
58
66
thread .setDaemon (true );
67
+
68
+ {
69
+ heartbeatConsumer = KafkaHelper .connectConsumer (server , List .of (config_name ), Collections .emptyList (), AlarmSystem .kafka_properties );
70
+ updateHeartbeatTimestampThread = new Thread (() -> updateHeartbeatTimestampLoop (), "UpdateHeartbeatTimestampThread" );
71
+ updateHeartbeatTimestampThread .setDaemon (false );
72
+ updateHeartbeatTimestampThread .start ();
73
+ annunciateDisconnectionThread = new Thread (() -> annunciateDisconnectionLoop (), "AnnunciateDisconnectionThread" );
74
+ annunciateDisconnectionThread .setDaemon (false );
75
+ annunciateDisconnectionThread .start ();
76
+ }
59
77
}
60
78
61
79
/** @param listener - Listener to add */
@@ -144,11 +162,57 @@ private void checkUpdates()
144
162
}
145
163
}
146
164
165
+ private Optional <Instant > nextDisconnectedAnnunciation = Optional .empty (); // When alarm server is disconnected: point in time for next annunciation of disconnection.
166
+ private final Duration disconnectionAnnunciationPeriod = Duration .ofMillis (AlarmSystem .nag_period_ms );
167
+ private final Duration idleTimeoutDuration = Duration .ofMillis (AlarmSystem .idle_timeout_ms ).multipliedBy (3 );
168
+ private AtomicReference <Instant > lastReceivedUpdateFromAlarmServer = new AtomicReference <>(Instant .now ());
169
+
170
+ private void updateHeartbeatTimestampLoop () {
171
+ while (running .get ()) {
172
+ final ConsumerRecords <String , String > records = heartbeatConsumer .poll (100 );
173
+ if (!records .isEmpty ()) {
174
+ lastReceivedUpdateFromAlarmServer .set (Instant .now ());
175
+ }
176
+ try {
177
+ sleep (1000 );
178
+ } catch (InterruptedException e ) {
179
+ logger .log (Level .WARNING , "updateHeartbeatTimestampLoop() was interrupted when sleeping." );
180
+ }
181
+ }
182
+ }
183
+
184
+ /** Background thread loop that detects and annunciates disconnections. */
185
+ private void annunciateDisconnectionLoop () {
186
+ while (running .get ()) {
187
+ Instant now = Instant .now ();
188
+ if (Duration .between (lastReceivedUpdateFromAlarmServer .get (), now ).compareTo (idleTimeoutDuration ) > 0 ) {
189
+ if (nextDisconnectedAnnunciation .isEmpty () || nextDisconnectedAnnunciation .get ().isBefore (now )) {
190
+ try {
191
+ for (final TalkClientListener listener : listeners ) {
192
+ listener .messageReceived (SeverityLevel .UNDEFINED , true , "Alarm Server Disconnected" );
193
+ }
194
+ } catch (final Exception ex ) {
195
+ logger .log (Level .WARNING , "Talk error for " + SeverityLevel .UNDEFINED + ", " + "Alarm Server Disconnected" , ex );
196
+ }
197
+ nextDisconnectedAnnunciation = Optional .of (now .plus (disconnectionAnnunciationPeriod ));
198
+ }
199
+ } else {
200
+ nextDisconnectedAnnunciation = Optional .empty (); // Connection to the Alarm Server exists.
201
+ }
202
+ try {
203
+ sleep (1000 );
204
+ } catch (InterruptedException e ) {
205
+ logger .log (Level .WARNING , "annunciateDisconnectionLoop() was interrupted when sleeping." );
206
+ }
207
+ }
208
+ }
209
+
147
210
/** Stop client */
148
211
public void shutdown ()
149
212
{
150
213
running .set (false );
151
214
consumer .wakeup ();
215
+ heartbeatConsumer .wakeup ();
152
216
try
153
217
{
154
218
thread .join (2000 );
@@ -157,6 +221,22 @@ public void shutdown()
157
221
{
158
222
logger .log (Level .WARNING , "Talk client thread doesn't shut down" , ex );
159
223
}
224
+ try
225
+ {
226
+ annunciateDisconnectionThread .join (2000 );
227
+ }
228
+ catch (final InterruptedException ex )
229
+ {
230
+ logger .log (Level .WARNING , "Annunciate Disconnection from Alarm Server thread doesn't shut down" , ex );
231
+ }
232
+ try
233
+ {
234
+ updateHeartbeatTimestampThread .join (2000 );
235
+ }
236
+ catch (final InterruptedException ex )
237
+ {
238
+ logger .log (Level .WARNING , "Update Alarm Server Heartbeat thread doesn't shut down" , ex );
239
+ }
160
240
logger .info (thread .getName () + " shut down" );
161
241
}
162
242
}
0 commit comments