17
17
import contextlib
18
18
import ccf .ledger
19
19
from reconfiguration import test_ledger_invariants
20
+ import subprocess
20
21
21
22
from loguru import logger as LOG
22
23
@@ -711,6 +712,150 @@ def wait_for_new_view(node, original_view, timeout_multiplier):
711
712
return network
712
713
713
714
715
+ @reqs .supports_methods ("/app/log/public" )
716
+ def test_recovery_elections (orig_network , args ):
717
+ # Ensure we have 3 nodes
718
+ original_size = orig_network .resize (3 , args )
719
+
720
+ old_primary , _ = orig_network .find_nodes ()
721
+ with old_primary .client ("user0" ) as c :
722
+ LOG .warning ("Writing some initial state" )
723
+ for _ in range (300 ):
724
+ r = c .post (
725
+ "/app/log/public" ,
726
+ {
727
+ "id" : 42 ,
728
+ "msg" : "Uninteresting recoverable transactions" ,
729
+ },
730
+ )
731
+ assert r .status_code == 200 , r
732
+
733
+ r = c .get ("/node/network" )
734
+ assert r .status_code == 200 , r
735
+ previous_identity = orig_network .save_service_identity (args )
736
+ c .wait_for_commit (
737
+ orig_network .consortium .set_recovery_threshold (old_primary , 1 )
738
+ )
739
+ orig_network .stop_all_nodes (skip_verification = True )
740
+ current_ledger_dir , committed_ledger_dirs = old_primary .get_ledger ()
741
+
742
+ # Create a recovery network, where we will manually take the recovery steps (transition to open and submit share)
743
+ network = infra .network .Network (
744
+ args .nodes ,
745
+ args .binary_dir ,
746
+ args .debug_nodes ,
747
+ args .perf_nodes ,
748
+ existing_network = orig_network ,
749
+ )
750
+ network .start_in_recovery (
751
+ args ,
752
+ ledger_dir = current_ledger_dir ,
753
+ committed_ledger_dirs = committed_ledger_dirs ,
754
+ )
755
+ new_primary , new_backups = network .find_nodes ()
756
+ network .consortium .transition_service_to_open (
757
+ new_primary , previous_service_identity = previous_identity
758
+ )
759
+
760
+ with new_primary .client ("user0" ) as c :
761
+ previous_identity = network .save_service_identity (args )
762
+
763
+ member = network .consortium .get_active_recovery_members ()[0 ]
764
+
765
+ # We need to delay a backup's private recovery process until:
766
+ # - The primary has completed its private recovery, and fully opened the network
767
+ # - The backup has called and won an election
768
+ # So that the backup node _is primary_ at the point it completes private recovery.
769
+ # We force the delay by injecting a delay into the file operations of the backup,
770
+ # and force an election (after the primary has completed its recovery) by killing
771
+ # the original primary node.
772
+ backup = new_backups [0 ]
773
+ LOG .info (f"Using strace to inject delays in file IO of { backup } " )
774
+ assert not backup .remote .check_done ()
775
+
776
+ strace_command = [
777
+ "strace" ,
778
+ f"--attach={ backup .remote .remote .proc .pid } " ,
779
+ "--inject=lseek:delay_exit=10s" ,
780
+ "-tt" ,
781
+ "--trace=lseek,read,open,openat" ,
782
+ "--output=strace_output.txt" ,
783
+ ]
784
+ LOG .warning (f"About to run strace: { strace_command } " )
785
+ strace_process = subprocess .Popen (
786
+ strace_command ,
787
+ stdout = subprocess .PIPE ,
788
+ stderr = subprocess .PIPE ,
789
+ )
790
+
791
+ member .get_and_submit_recovery_share (new_primary )
792
+ network .recovery_count += 1
793
+
794
+ LOG .info ("Confirming that primary completes private recovery" )
795
+ network .wait_for_state (
796
+ new_primary ,
797
+ infra .node .State .PART_OF_NETWORK .value ,
798
+ timeout = 30 ,
799
+ )
800
+
801
+ election_s = args .election_timeout_ms / 1000
802
+ LOG .info (
803
+ f"Holding backup stalled via strace for { election_s } , to trigger an election"
804
+ )
805
+ time .sleep (election_s )
806
+
807
+ # If strace failed to stall the node, the rest of the test is meaningless.
808
+ try :
809
+ strace_process .communicate (timeout = 1 )
810
+ except subprocess .TimeoutExpired :
811
+ assert strace_process .returncode is None , strace_process .returncode
812
+ else :
813
+ assert (
814
+ False
815
+ ), f"strace must not have been completed yet (retcode: { strace_process .returncode } )"
816
+
817
+ LOG .info ("Ending strace, and terminating primary node" )
818
+ strace_process .terminate ()
819
+ strace_process .communicate ()
820
+
821
+ new_primary .stop ()
822
+
823
+ LOG .info (
824
+ f"Give { backup } time to finish its recovery (including becoming primary), and confirm that it dies in the process"
825
+ )
826
+ time .sleep (election_s )
827
+ # The result of all of that is that this node, which had become primary while it
828
+ # completed its private recovery, crashed at the end of recovery (rather than)
829
+ # producing an invalid ledger)
830
+ assert backup .remote .check_done ()
831
+
832
+ network .ignore_errors_on_shutdown ()
833
+ network .stop_all_nodes (skip_verification = True )
834
+ current_ledger_dir , committed_ledger_dirs = backup .get_ledger ()
835
+
836
+ LOG .info (
837
+ "Trying a further recovery, to confirm that the ledger is in a recoverable state"
838
+ )
839
+ recovery_network = infra .network .Network (
840
+ args .nodes ,
841
+ args .binary_dir ,
842
+ args .debug_nodes ,
843
+ args .perf_nodes ,
844
+ existing_network = network ,
845
+ )
846
+ recovery_network .start_in_recovery (
847
+ args ,
848
+ ledger_dir = current_ledger_dir ,
849
+ committed_ledger_dirs = committed_ledger_dirs ,
850
+ )
851
+ recovery_network .recover (args )
852
+
853
+ # Restore original network size
854
+ recovery_network .resize (original_size , args )
855
+
856
+ return recovery_network
857
+
858
+
714
859
def run (args ):
715
860
txs = app .LoggingTxs ("user0" )
716
861
@@ -737,6 +882,7 @@ def run(args):
737
882
# HTTP2 doesn't support forwarding
738
883
if not args .http2 :
739
884
test_session_consistency (network , args )
885
+ network = test_recovery_elections (network , args )
740
886
test_ledger_invariants (network , args )
741
887
742
888
0 commit comments