@@ -5,6 +5,8 @@ use slog::{error, warn, Logger};
5
5
use mithril_common:: { logging:: LoggerExtensions , StdResult } ;
6
6
use tokio:: { select, sync:: watch:: Receiver } ;
7
7
8
+ use crate :: MetricsService ;
9
+
8
10
use super :: { CertifierService , SignatureConsumer } ;
9
11
10
12
/// A signature processor which receives signature and processes them.
@@ -24,6 +26,7 @@ pub struct SequentialSignatureProcessor {
24
26
certifier : Arc < dyn CertifierService > ,
25
27
stop_rx : Receiver < ( ) > ,
26
28
logger : Logger ,
29
+ metrics_service : Arc < MetricsService > ,
27
30
}
28
31
29
32
impl SequentialSignatureProcessor {
@@ -33,12 +36,14 @@ impl SequentialSignatureProcessor {
33
36
certifier : Arc < dyn CertifierService > ,
34
37
stop_rx : Receiver < ( ) > ,
35
38
logger : Logger ,
39
+ metrics_service : Arc < MetricsService > ,
36
40
) -> Self {
37
41
Self {
38
42
consumer,
39
43
certifier,
40
44
stop_rx,
41
45
logger : logger. new_with_component_name :: < Self > ( ) ,
46
+ metrics_service,
42
47
}
43
48
}
44
49
}
@@ -55,6 +60,11 @@ impl SignatureProcessor for SequentialSignatureProcessor {
55
60
. await
56
61
{
57
62
error ! ( self . logger, "Error dispatching single signature" ; "error" => ?e) ;
63
+ } else {
64
+ let origin_network = self . consumer . get_origin_tag ( ) ;
65
+ self . metrics_service
66
+ . get_signature_registration_total_received_since_startup ( )
67
+ . increment ( & [ & origin_network] ) ;
58
68
}
59
69
}
60
70
}
@@ -107,24 +117,29 @@ mod tests {
107
117
#[ tokio:: test]
108
118
async fn processor_process_signatures_succeeds ( ) {
109
119
let logger = TestLogger :: stdout ( ) ;
120
+ let single_signatures = vec ! [
121
+ (
122
+ fake_data:: single_signature( vec![ 1 , 2 , 3 ] ) ,
123
+ SignedEntityType :: MithrilStakeDistribution ( Epoch ( 1 ) ) ,
124
+ ) ,
125
+ (
126
+ fake_data:: single_signature( vec![ 4 , 5 , 6 ] ) ,
127
+ SignedEntityType :: MithrilStakeDistribution ( Epoch ( 2 ) ) ,
128
+ ) ,
129
+ ] ;
130
+ let single_signatures_length = single_signatures. len ( ) ;
131
+ let network_origin = "test_network" ;
110
132
let mock_consumer = {
111
133
let mut mock_consumer = MockSignatureConsumer :: new ( ) ;
112
134
mock_consumer
113
135
. expect_get_signatures ( )
114
- . returning ( || {
115
- Ok ( vec ! [
116
- (
117
- fake_data:: single_signature( vec![ 1 , 2 , 3 ] ) ,
118
- SignedEntityType :: MithrilStakeDistribution ( Epoch ( 1 ) ) ,
119
- ) ,
120
- (
121
- fake_data:: single_signature( vec![ 4 , 5 , 6 ] ) ,
122
- SignedEntityType :: MithrilStakeDistribution ( Epoch ( 2 ) ) ,
123
- ) ,
124
- ] )
125
- } )
136
+ . returning ( move || Ok ( single_signatures. clone ( ) ) )
126
137
. times ( 1 ) ;
127
138
mock_consumer
139
+ . expect_get_origin_tag ( )
140
+ . returning ( || network_origin. to_string ( ) )
141
+ . times ( single_signatures_length) ;
142
+ mock_consumer
128
143
} ;
129
144
let mock_certifier = {
130
145
let mut mock_certifier = MockCertifierService :: new ( ) ;
@@ -144,21 +159,33 @@ mod tests {
144
159
)
145
160
. returning ( |_, _| Ok ( SignatureRegistrationStatus :: Registered ) )
146
161
. times ( 1 ) ;
147
-
148
162
mock_certifier
149
163
} ;
150
164
let ( _stop_tx, stop_rx) = channel ( ( ) ) ;
165
+ let metrics_service = MetricsService :: new ( TestLogger :: stdout ( ) ) . unwrap ( ) ;
166
+ let initial_counter_value = metrics_service
167
+ . get_signature_registration_total_received_since_startup ( )
168
+ . get ( & [ network_origin] ) ;
169
+ let metrics_service = Arc :: new ( metrics_service) ;
151
170
let processor = SequentialSignatureProcessor :: new (
152
171
Arc :: new ( mock_consumer) ,
153
172
Arc :: new ( mock_certifier) ,
154
173
stop_rx,
155
174
logger,
175
+ metrics_service. clone ( ) ,
156
176
) ;
157
177
158
178
processor
159
179
. process_signatures ( )
160
180
. await
161
181
. expect ( "Failed to process signatures" ) ;
182
+
183
+ assert_eq ! (
184
+ initial_counter_value + single_signatures_length as u32 ,
185
+ metrics_service
186
+ . get_signature_registration_total_received_since_startup( )
187
+ . get( & [ network_origin] )
188
+ )
162
189
}
163
190
164
191
#[ tokio:: test]
@@ -185,11 +212,13 @@ mod tests {
185
212
mock_certifier
186
213
} ;
187
214
let ( stop_tx, stop_rx) = channel ( ( ) ) ;
215
+ let metrics_service = MetricsService :: new ( TestLogger :: stdout ( ) ) . unwrap ( ) ;
188
216
let processor = SequentialSignatureProcessor :: new (
189
217
Arc :: new ( fake_consumer) ,
190
218
Arc :: new ( mock_certifier) ,
191
219
stop_rx,
192
220
logger,
221
+ Arc :: new ( metrics_service) ,
193
222
) ;
194
223
195
224
tokio:: select!(
0 commit comments