Skip to content

Commit 21857b1

Browse files
committed
WIP: start implementing origin in the signature trait and processor
1 parent 6818667 commit 21857b1

File tree

5 files changed

+61
-12
lines changed

5 files changed

+61
-12
lines changed

mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ impl DependenciesBuilder {
8787
self.get_certifier_service().await?,
8888
stop_rx,
8989
self.root_logger(),
90+
self.get_metrics_service().await?,
9091
);
9192

9293
Ok(Arc::new(signature_processor))

mithril-aggregator/src/services/signature_consumer/fake.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ impl SignatureConsumer for FakeSignatureConsumer {
3737
}
3838
}
3939
}
40+
fn get_origin_network(&self) -> String {
41+
"FAKE".to_string()
42+
}
4043
}
4144

4245
#[cfg(test)]

mithril-aggregator/src/services/signature_consumer/interface.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ use mithril_common::{
99
pub trait SignatureConsumer: Sync + Send {
1010
/// Returns signatures when available
1111
async fn get_signatures(&self) -> StdResult<Vec<(SingleSignature, SignedEntityType)>>;
12+
13+
/// Returns the origin metric of the consumer (for example HTTP or DMQ)
14+
fn get_origin_network(&self) -> String;
1215
}

mithril-aggregator/src/services/signature_consumer/noop.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ impl SignatureConsumer for SignatureConsumerNoop {
1919
> {
2020
future::pending().await
2121
}
22+
23+
fn get_origin_network(&self) -> String {
24+
"NOOP".to_string()
25+
}
2226
}
2327

2428
#[cfg(test)]

mithril-aggregator/src/services/signature_processor.rs

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use slog::{error, warn, Logger};
55
use mithril_common::{logging::LoggerExtensions, StdResult};
66
use tokio::{select, sync::watch::Receiver};
77

8+
use crate::MetricsService;
9+
810
use super::{CertifierService, SignatureConsumer};
911

1012
/// A signature processor which receives signature and processes them.
@@ -24,6 +26,7 @@ pub struct SequentialSignatureProcessor {
2426
certifier: Arc<dyn CertifierService>,
2527
stop_rx: Receiver<()>,
2628
logger: Logger,
29+
metrics_service: Arc<MetricsService>,
2730
}
2831

2932
impl SequentialSignatureProcessor {
@@ -33,12 +36,14 @@ impl SequentialSignatureProcessor {
3336
certifier: Arc<dyn CertifierService>,
3437
stop_rx: Receiver<()>,
3538
logger: Logger,
39+
metrics_service: Arc<MetricsService>,
3640
) -> Self {
3741
Self {
3842
consumer,
3943
certifier,
4044
stop_rx,
4145
logger: logger.new_with_component_name::<Self>(),
46+
metrics_service,
4247
}
4348
}
4449
}
@@ -55,6 +60,11 @@ impl SignatureProcessor for SequentialSignatureProcessor {
5560
.await
5661
{
5762
error!(self.logger, "Error dispatching single signature"; "error" => ?e);
63+
} else {
64+
let origin_network = self.consumer.get_origin_network();
65+
self.metrics_service
66+
.get_signature_registration_total_received_since_startup()
67+
.increment(&[&origin_network]);
5868
}
5969
}
6070
}
@@ -107,24 +117,33 @@ mod tests {
107117
#[tokio::test]
108118
async fn processor_process_signatures_succeeds() {
109119
let logger = TestLogger::stdout();
120+
121+
let single_signatures = vec![
122+
(
123+
fake_data::single_signature(vec![1, 2, 3]),
124+
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
125+
),
126+
(
127+
fake_data::single_signature(vec![4, 5, 6]),
128+
SignedEntityType::MithrilStakeDistribution(Epoch(2)),
129+
),
130+
];
131+
132+
let singles_signatures_length = single_signatures.len();
133+
134+
let network_origin = "test_network";
135+
110136
let mock_consumer = {
111137
let mut mock_consumer = MockSignatureConsumer::new();
112138
mock_consumer
113139
.expect_get_signatures()
114-
.returning(|| {
115-
Ok(vec![
116-
(
117-
fake_data::single_signature(vec![1, 2, 3]),
118-
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
119-
),
120-
(
121-
fake_data::single_signature(vec![4, 5, 6]),
122-
SignedEntityType::MithrilStakeDistribution(Epoch(2)),
123-
),
124-
])
125-
})
140+
.returning(move || Ok(single_signatures.clone()))
126141
.times(1);
127142
mock_consumer
143+
.expect_get_origin_network()
144+
.returning(|| network_origin.to_string())
145+
.times(singles_signatures_length);
146+
mock_consumer
128147
};
129148
let mock_certifier = {
130149
let mut mock_certifier = MockCertifierService::new();
@@ -148,17 +167,33 @@ mod tests {
148167
mock_certifier
149168
};
150169
let (_stop_tx, stop_rx) = channel(());
170+
let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
171+
172+
let initial_counter_value = metrics_service
173+
.get_signature_registration_total_received_since_startup()
174+
.get(&[network_origin]);
175+
176+
let metrics_service = Arc::new(metrics_service);
177+
151178
let processor = SequentialSignatureProcessor::new(
152179
Arc::new(mock_consumer),
153180
Arc::new(mock_certifier),
154181
stop_rx,
155182
logger,
183+
metrics_service.clone(),
156184
);
157185

158186
processor
159187
.process_signatures()
160188
.await
161189
.expect("Failed to process signatures");
190+
191+
assert_eq!(
192+
initial_counter_value + singles_signatures_length as u32,
193+
metrics_service
194+
.get_signature_registration_total_received_since_startup()
195+
.get(&[network_origin])
196+
)
162197
}
163198

164199
#[tokio::test]
@@ -185,11 +220,14 @@ mod tests {
185220
mock_certifier
186221
};
187222
let (stop_tx, stop_rx) = channel(());
223+
let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
224+
188225
let processor = SequentialSignatureProcessor::new(
189226
Arc::new(fake_consumer),
190227
Arc::new(mock_certifier),
191228
stop_rx,
192229
logger,
230+
Arc::new(metrics_service),
193231
);
194232

195233
tokio::select!(

0 commit comments

Comments
 (0)