@@ -350,7 +350,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
350350 return Ok ( KeepChain ) ;
351351 }
352352 BatchState :: Poisoned => unreachable ! ( "Poisoned batch" ) ,
353- BatchState :: Processing ( _) | BatchState :: AwaitingDownload | BatchState :: Failed => {
353+ // Batches can be in `AwaitingDownload` state if there weren't good data column subnet
354+ // peers to send the request to.
355+ BatchState :: AwaitingDownload => return Ok ( KeepChain ) ,
356+ BatchState :: Processing ( _) | BatchState :: Failed => {
354357 // these are all inconsistent states:
355358 // - Processing -> `self.current_processing_batch` is None
356359 // - Failed -> non recoverable batch. For an optimistic batch, it should
@@ -384,7 +387,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
384387 // Batch is not ready, nothing to process
385388 }
386389 BatchState :: Poisoned => unreachable ! ( "Poisoned batch" ) ,
387- BatchState :: Failed | BatchState :: AwaitingDownload | BatchState :: Processing ( _) => {
390+ // Batches can be in `AwaitingDownload` state if there weren't good data column subnet
391+ // peers to send the request to.
392+ BatchState :: AwaitingDownload => return Ok ( KeepChain ) ,
393+ BatchState :: Failed | BatchState :: Processing ( _) => {
388394 // these are all inconsistent states:
389395 // - Failed -> non recoverable batch. Chain should have been removed
390396 // - AwaitingDownload -> A recoverable failed batch should have been
@@ -582,8 +588,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
582588 BatchProcessResult :: NonFaultyFailure => {
583589 batch. processing_completed ( BatchProcessingResult :: NonFaultyFailure ) ?;
584590
585- // Simply re-download the batch .
586- self . send_batch ( network, batch_id )
591+ // Simply re-download all batches in `AwaitingDownload` state .
592+ self . attempt_send_awaiting_download_batches ( network, "non-faulty-failure" )
587593 }
588594 }
589595 }
@@ -717,6 +723,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
717723 previous_start = %old_start,
718724 new_start = %self . start_epoch,
719725 processing_target = %self . processing_target,
726+ id=%self . id,
720727 "Chain advanced"
721728 ) ;
722729 }
@@ -753,7 +760,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
753760 }
754761 // this is our robust `processing_target`. All previous batches must be awaiting
755762 // validation
756- let mut redownload_queue = Vec :: new ( ) ;
757763
758764 for ( id, batch) in self . batches . range_mut ( ..batch_id) {
759765 if let BatchOperationOutcome :: Failed { blacklist } = batch. validation_failed ( ) ? {
@@ -763,18 +769,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
763769 failing_batch : * id,
764770 } ) ;
765771 }
766- redownload_queue. push ( * id) ;
767772 }
768773
769774 // no batch maxed out it process attempts, so now the chain's volatile progress must be
770775 // reset
771776 self . processing_target = self . start_epoch ;
772777
773- for id in redownload_queue {
774- self . send_batch ( network, id) ?;
775- }
776- // finally, re-request the failed batch.
777- self . send_batch ( network, batch_id)
778+ // finally, re-request the failed batch and all other batches in `AwaitingDownload` state.
779+ self . attempt_send_awaiting_download_batches ( network, "handle_invalid_batch" )
778780 }
779781
780782 pub fn stop_syncing ( & mut self ) {
@@ -810,6 +812,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
810812
811813 // advance the chain to the new validating epoch
812814 self . advance_chain ( network, validating_epoch) ;
815+ // attempt to download any batches stuck in the `AwaitingDownload` state because of
816+ // a lack of peers earlier
817+ self . attempt_send_awaiting_download_batches ( network, "start_syncing" ) ?;
813818 if self . optimistic_start . is_none ( )
814819 && optimistic_epoch > self . processing_target
815820 && !self . attempted_optimistic_starts . contains ( & optimistic_epoch)
@@ -939,6 +944,41 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
939944 }
940945 }
941946
947+ /// Attempts to send all batches that are in `AwaitingDownload` state.
948+ ///
949+ /// Batches might get stuck in `AwaitingDownload` post peerdas because of lack of peers
950+ /// in required subnets. We need to progress them if peers are available at a later point.
951+ pub fn attempt_send_awaiting_download_batches (
952+ & mut self ,
953+ network : & mut SyncNetworkContext < T > ,
954+ src : & str ,
955+ ) -> ProcessingResult {
956+ // Collect all batches in AwaitingDownload state and see if they can be sent
957+ let awaiting_downloads: Vec < _ > = self
958+ . batches
959+ . iter ( )
960+ . filter ( |( _, batch) | matches ! ( batch. state( ) , BatchState :: AwaitingDownload ) )
961+ . map ( |( batch_id, _) | batch_id)
962+ . copied ( )
963+ . collect ( ) ;
964+ debug ! (
965+ ?awaiting_downloads,
966+ src, "Attempting to send batches awaiting downlaod"
967+ ) ;
968+
969+ for batch_id in awaiting_downloads {
970+ if self . good_peers_on_sampling_subnets ( batch_id, network) {
971+ self . send_batch ( network, batch_id) ?;
972+ } else {
973+ debug ! (
974+ src = "attempt_send_awaiting_download_batches" ,
975+ "Waiting for peers to be available on sampling column subnets"
976+ ) ;
977+ }
978+ }
979+ Ok ( KeepChain )
980+ }
981+
942982 /// Requests the batch assigned to the given id from a given peer.
943983 pub fn send_batch (
944984 & mut self ,
@@ -1089,14 +1129,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
10891129 if !matches ! ( self . state, ChainSyncingState :: Syncing ) {
10901130 return Ok ( KeepChain ) ;
10911131 }
1092-
10931132 // find the next pending batch and request it from the peer
10941133
10951134 // check if we have the batch for our optimistic start. If not, request it first.
10961135 // We wait for this batch before requesting any other batches.
10971136 if let Some ( epoch) = self . optimistic_start {
10981137 if !self . good_peers_on_sampling_subnets ( epoch, network) {
1099- debug ! ( "Waiting for peers to be available on sampling column subnets" ) ;
1138+ debug ! (
1139+ src = "request_batches_optimistic" ,
1140+ "Waiting for peers to be available on sampling column subnets"
1141+ ) ;
11001142 return Ok ( KeepChain ) ;
11011143 }
11021144
@@ -1105,6 +1147,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
11051147 let optimistic_batch = BatchInfo :: new ( & epoch, EPOCHS_PER_BATCH , batch_type) ;
11061148 entry. insert ( optimistic_batch) ;
11071149 self . send_batch ( network, epoch) ?;
1150+ } else {
1151+ self . attempt_send_awaiting_download_batches ( network, "request_batches_optimistic" ) ?;
11081152 }
11091153 return Ok ( KeepChain ) ;
11101154 }
@@ -1179,7 +1223,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
11791223 // block and data column requests are currently coupled. This can be removed once we find a
11801224 // way to decouple the requests and do retries individually, see issue #6258.
11811225 if !self . good_peers_on_sampling_subnets ( self . to_be_downloaded , network) {
1182- debug ! ( "Waiting for peers to be available on custody column subnets" ) ;
1226+ debug ! (
1227+ src = "include_next_batch" ,
1228+ "Waiting for peers to be available on custody column subnets"
1229+ ) ;
11831230 return None ;
11841231 }
11851232
0 commit comments