Skip to content

Commit a348af9

Browse files
committed
main loop listen also to block notify from zmq
1 parent f1d8844 commit a348af9

File tree

5 files changed

+38
-17
lines changed

5 files changed

+38
-17
lines changed

src/bin/electrs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom {
4141
}
4242

4343
fn run_server(config: Arc<Config>) -> Result<()> {
44-
let signal = Waiter::start();
44+
let (block_hash_notify, signal) = Waiter::start();
4545
let metrics = Metrics::new(config.monitoring_addr);
4646
metrics.start();
4747

4848
if let Some(zmq_addr) = config.zmq_addr.as_ref() {
49-
zmq::start(&format!("tcp://{zmq_addr}"), None);
49+
zmq::start(&format!("tcp://{zmq_addr}"), Some(block_hash_notify));
5050
}
5151

5252
let daemon = Arc::new(Daemon::new(

src/bin/tx-fingerprint-stats.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ fn main() {
2121
util::has_prevout,
2222
};
2323

24-
let signal = Waiter::start();
24+
let signal = Waiter::start().1;
2525
let config = Config::from_args();
2626
let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config));
2727

src/new_index/zmq.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use std::sync::mpsc::Sender;
2-
31
use bitcoin::{hashes::Hash, BlockHash};
2+
use crossbeam_channel::Sender;
43

54
use crate::util::spawn_thread;
65

src/signal.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use crossbeam_channel::{self as channel, after, select};
1+
use bitcoin::BlockHash;
2+
use crossbeam_channel::{self as channel, after, select, Sender};
23
use std::thread;
34
use std::time::{Duration, Instant};
45

@@ -9,6 +10,7 @@ use crate::errors::*;
910
#[derive(Clone)] // so multiple threads could wait on signals
1011
pub struct Waiter {
1112
receiver: channel::Receiver<i32>,
13+
zmq_receiver: channel::Receiver<BlockHash>,
1214
}
1315

1416
fn notify(signals: &[i32]) -> channel::Receiver<i32> {
@@ -25,34 +27,54 @@ fn notify(signals: &[i32]) -> channel::Receiver<i32> {
2527
}
2628

2729
impl Waiter {
28-
pub fn start() -> Waiter {
29-
Waiter {
30-
receiver: notify(&[
31-
SIGINT, SIGTERM,
32-
SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`)
33-
]),
34-
}
30+
pub fn start() -> (Sender<BlockHash>, Waiter) {
31+
let (block_hash_notify, block_hash_receive) = channel::bounded(1);
32+
33+
(
34+
block_hash_notify,
35+
Waiter {
36+
receiver: notify(&[
37+
SIGINT, SIGTERM,
38+
SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`)
39+
]),
40+
zmq_receiver: block_hash_receive,
41+
},
42+
)
3543
}
3644

37-
pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<()> {
45+
pub fn wait(&self, duration: Duration, accept_block_notification: bool) -> Result<()> {
3846
let start = Instant::now();
3947
select! {
4048
recv(self.receiver) -> msg => {
4149
match msg {
4250
Ok(sig) if sig == SIGUSR1 => {
4351
trace!("notified via SIGUSR1");
44-
if accept_sigusr {
52+
if accept_block_notification {
4553
Ok(())
4654
} else {
4755
let wait_more = duration.saturating_sub(start.elapsed());
48-
self.wait(wait_more, accept_sigusr)
56+
self.wait(wait_more, accept_block_notification)
4957
}
5058
}
5159
Ok(sig) => bail!(ErrorKind::Interrupt(sig)),
5260
Err(_) => bail!("signal hook channel disconnected"),
5361
}
5462
},
63+
recv(self.zmq_receiver) -> msg => {
64+
match msg {
65+
Ok(_) => {
66+
if accept_block_notification {
67+
Ok(())
68+
} else {
69+
let wait_more = duration.saturating_sub(start.elapsed());
70+
self.wait(wait_more, accept_block_notification)
71+
}
72+
}
73+
Err(_) => bail!("signal hook channel disconnected"),
74+
}
75+
},
5576
recv(after(duration)) -> _ => Ok(()),
77+
5678
}
5779
}
5880
}

tests/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl TestRunner {
123123
//tor_proxy: Option<std::net::SocketAddr>,
124124
});
125125

126-
let signal = Waiter::start();
126+
let signal = Waiter::start().1;
127127
let metrics = Metrics::new(rand_available_addr());
128128
metrics.start();
129129

0 commit comments

Comments
 (0)