Skip to content

Commit 73a70a4

Browse files
committed
main loop listen also to block notify from zmq
1 parent 1c651dc commit 73a70a4

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

src/bin/electrs.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ extern crate electrs;
66

77
use error_chain::ChainedError;
88
use std::process;
9+
use std::sync::mpsc::channel;
910
use std::sync::{Arc, RwLock};
1011
use std::time::Duration;
1112

@@ -45,8 +46,9 @@ fn run_server(config: Arc<Config>) -> Result<()> {
4546
let metrics = Metrics::new(config.monitoring_addr);
4647
metrics.start();
4748

49+
let (block_hash_notify, block_hash_receive) = channel();
4850
if let Some(zmq_addr) = config.zmq_addr.as_ref() {
49-
zmq::start(&format!("tcp://{zmq_addr}"), None);
51+
zmq::start(&format!("tcp://{zmq_addr}"), Some(block_hash_notify));
5052
}
5153

5254
let daemon = Arc::new(Daemon::new(
@@ -123,14 +125,33 @@ fn run_server(config: Arc<Config>) -> Result<()> {
123125
"count of iterations of electrs main loop each 5 seconds or after interrupts",
124126
));
125127

126-
loop {
128+
'outer: loop {
127129
main_loop_count.inc();
128130

129-
if let Err(err) = signal.wait(Duration::from_secs(5), true) {
130-
info!("stopping server: {}", err);
131-
rest_server.stop();
132-
// the electrum server is stopped when dropped
133-
break;
131+
// In the next for loop:
132+
// We are going to wait 5 secs (50*100ms) if nothings happens.
133+
// We are stopping if we receive a TERM or INT signal.
134+
// We are interrupting the wait if we receive a block hash notification (if zmq enabled) or receiving a USR1 signal.
135+
for _ in 0..50 {
136+
match signal.wait(Duration::from_millis(100), true) {
137+
Ok(is_sigusr) if is_sigusr => break,
138+
Ok(_) => (),
139+
Err(err) => {
140+
info!("stopping server: {}", err);
141+
rest_server.stop();
142+
// the electrum server is stopped when dropped
143+
break 'outer;
144+
}
145+
}
146+
147+
if let Ok(block_hash) = block_hash_receive.try_recv() {
148+
debug!("Main loop notified of a new block {block_hash}");
149+
while let Ok(block_hash) = block_hash_receive.try_recv() {
150+
// let's deplete the queue in the case there is a block burst
151+
debug!("Main loop notified of a new block {block_hash}");
152+
}
153+
break;
154+
}
134155
}
135156

136157
// Index new blocks

src/signal.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,24 +35,24 @@ impl Waiter {
3535
}
3636
}
3737

38-
pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<()> {
38+
pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<bool> {
3939
// Determine the deadline time based on the duration, so that it doesn't
4040
// get pushed back when wait_deadline() recurses
4141
self.wait_deadline(Instant::now() + duration, accept_sigusr)
4242
}
4343

44-
fn wait_deadline(&self, deadline: Instant, accept_sigusr: bool) -> Result<()> {
44+
fn wait_deadline(&self, deadline: Instant, accept_sigusr: bool) -> Result<bool> {
4545
match self.receiver.recv_deadline(deadline) {
4646
Ok(sig) if sig == SIGUSR1 => {
4747
trace!("notified via SIGUSR1");
4848
if accept_sigusr {
49-
Ok(())
49+
Ok(true)
5050
} else {
5151
self.wait_deadline(deadline, accept_sigusr)
5252
}
5353
}
5454
Ok(sig) => bail!(ErrorKind::Interrupt(sig)),
55-
Err(RecvTimeoutError::Timeout) => Ok(()),
55+
Err(RecvTimeoutError::Timeout) => Ok(false),
5656
Err(RecvTimeoutError::Disconnected) => bail!("signal hook channel disconnected"),
5757
}
5858
}

0 commit comments

Comments
 (0)