from cryptography.hazmat.primitives import serialization
from ccf.cose import validate_cose_sign1
from pycose.messages import Sign1Message  # type: ignore
-
+import random
from loguru import logger as LOG


@@ -107,10 +107,77 @@ def verify_endorsements_chain(primary, endorsements, pubkey):
        pubkey = serialization.load_der_public_key(next_key_bytes, default_backend())


+def recover_with_primary_dying(args, recovered_network):
+    # Minimal copy of network.recover(), but with the primary killed part-way through.
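+    # Activate recovery members and check that the service reports it is recovering.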
+    recovered_network.consortium.activate(recovered_network.find_random_node())
+    recovered_network.consortium.check_for_service(
+        recovered_network.find_random_node(),
+        status=infra.network.ServiceStatus.RECOVERING,
+    )
+    recovered_network.wait_for_all_nodes_to_be_trusted(
+        recovered_network.find_random_node()
+    )
+
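+    # Load the previous service identity (if provided) so the new service can be opened against it.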
+    prev_service_identity = None
+    if args.previous_service_identity_file:
+        prev_service_identity = slurp_file(args.previous_service_identity_file)
+        LOG.info(f"Prev identity: {prev_service_identity}")
+
+    recovered_network.consortium.transition_service_to_open(
+        recovered_network.find_random_node(),
+        previous_service_identity=prev_service_identity,
+    )
+
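+    # Submit members' recovery shares so nodes can decrypt and replay the private ledger.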
+    recovered_network.consortium.recover_with_shares(
+        recovered_network.find_random_node()
+    )
+    for node in recovered_network.get_joined_nodes():
+        recovered_network.wait_for_state(
+            node,
+            infra.node.State.READING_PRIVATE_LEDGER.value,
+            timeout=args.ledger_recovery_timeout,
+        )
+
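+    # Kill the current primary while nodes are still reading the private ledger,
+    # so that a new primary has to be elected mid-recovery.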
+    retired_primary, _ = recovered_network.find_primary()
+    retired_id = retired_primary.node_id
+
+    LOG.info(f"Force-kill primary {retired_id}")
+    retired_primary.sigkill()
+    recovered_network.nodes.remove(retired_primary)
+
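+    # Poll until a different node reports itself as primary.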
+    primary, _ = recovered_network.find_primary()
+    while not primary or primary.node_id == retired_id:
+        LOG.info("Keep looking for new primary")
+        time.sleep(0.1)
+        primary, _ = recovered_network.find_primary()
+
+    # Ensure a new primary has been elected while all nodes are still reading private entries.
+    for node in recovered_network.get_joined_nodes():
+        LOG.info(f"Check state for node id {node.node_id}")
+        with node.client(connection_timeout=1) as c:
+            assert (
+                infra.node.State.READING_PRIVATE_LEDGER.value
+                == c.get("/node/state").body.json()["state"]
+            )
+
+    # Wait for recovery to complete.
+    for node in recovered_network.get_joined_nodes():
+        recovered_network.wait_for_state(
+            node,
+            infra.node.State.PART_OF_NETWORK.value,
+            timeout=args.ledger_recovery_timeout,
+        )
+
+
@reqs.description("Recover a service")
@reqs.recover(number_txs=2)
def test_recover_service(
-    network, args, from_snapshot=True, no_ledger=False, via_recovery_owner=False
+    network,
+    args,
+    from_snapshot=True,
+    no_ledger=False,
+    via_recovery_owner=False,
+    force_election=False,
):
    network.save_service_identity(args)
    old_primary, _ = network.find_primary()
@@ -127,6 +194,16 @@ def test_recover_service(
    if from_snapshot:
        snapshots_dir = network.get_committed_snapshots(old_primary)

+    if force_election:
+        # Issue enough private transactions that recovering the private ledger takes
+        # long enough for an election to complete after the primary is killed.
+        # These entries are verified post-recovery via the logging app's verify_tx().
+        network.txs.issue(
+            network,
+            number_txs=10000,
+            send_public=False,
+            msg=str(bytes(random.getrandbits(8) for _ in range(512))),
+        )
+
    # Start health watcher and stop nodes one by one until a recovery has to be staged
    watcher = infra.health_watcher.NetworkHealthWatcher(network, args, verbose=True)
    watcher.start()
@@ -202,7 +279,10 @@ def test_recover_service(
        r = c.get("/node/ready/app")
        assert r.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE.value, r

-    recovered_network.recover(args, via_recovery_owner=via_recovery_owner)
+    if force_election:
+        recover_with_primary_dying(args, recovered_network)
+    else:
+        recovered_network.recover(args, via_recovery_owner=via_recovery_owner)

    LOG.info("Check that new service view is as expected")
    new_primary, _ = recovered_network.find_primary()
@@ -216,6 +296,16 @@ def test_recover_service(
        r = c.get("/node/ready/gov")
        assert r.status_code == http.HTTPStatus.NO_CONTENT.value, r
        r = c.get("/node/ready/app")
+
+        # Service opening may be slightly delayed by the forced election (if that option
+        # is enabled), so retry the app readiness check for a short while.
+        app_ready_attempts = 10 if force_election else 0
+        while (
+            r.status_code != http.HTTPStatus.NO_CONTENT.value and app_ready_attempts > 0
+        ):
+            time.sleep(0.1)
+            app_ready_attempts -= 1
+            r = c.get("/node/ready/app")
+
        assert r.status_code == http.HTTPStatus.NO_CONTENT.value, r

    return recovered_network
@@ -999,6 +1089,27 @@ def run_recover_snapshot_alone(args):
    return network

+def run_recovery_with_election(args):
+    """
+    Recover a service, but force an election during recovery by killing the primary.
+    """
+    if not args.with_election:
+        return
+
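+    # Attach a logging-app workload so the test can issue and later verify private entries.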
+    txs = app.LoggingTxs("user0")
+    with infra.network.network(
+        args.nodes,
+        args.binary_dir,
+        args.debug_nodes,
+        args.perf_nodes,
+        pdb=args.pdb,
+        txs=txs,
+    ) as network:
+        network.start_and_open(args)
+        test_recover_service(network, args, force_election=True)
+    return network
+
+
def run_recover_via_initial_recovery_owner(args):
    """
    Recover a service using the recovery owner added as part of service creation, without requiring any other recovery members to participate.
@@ -1082,6 +1193,12 @@ def add(parser):
            action="store_true",
            default=False,
        )
+        parser.add_argument(
+            "--with-election",
+            help="If set, the primary gets killed mid-recovery to force an election",
+            action="store_true",
+            default=False,
+        )

    cr = ConcurrentRunner(add)
@@ -1130,4 +1247,13 @@ def add(parser):
        nodes=infra.e2e_args.min_nodes(cr.args, f=0),  # 1 node suffices for recovery
    )

+    cr.add(
+        "recovery_with_election",
+        run_recovery_with_election,
+        package="samples/apps/logging/liblogging",
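+        # f=1 keeps enough nodes around for a new primary to be elected after the kill.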
+        nodes=infra.e2e_args.min_nodes(cr.args, f=1),
+        ledger_chunk_bytes="50KB",
+        snapshot_tx_interval=30,
+    )
+
    cr.run()