Changes from all commits (38 commits)
490b627
Penalize if invalid EL block
pawanjay176 Aug 13, 2025
836f9c6
Prioritize status v2
pawanjay176 Aug 13, 2025
156449c
Increase columns_by_root quota
pawanjay176 Aug 14, 2025
6bd8944
Reduce backfill buffer size
pawanjay176 Aug 15, 2025
9455153
Without retries
pawanjay176 Aug 18, 2025
5337e46
Add a function to retry column requests that could not be made
pawanjay176 Aug 19, 2025
ca9cfd5
Small fixes
pawanjay176 Aug 19, 2025
68cce37
Try to avoid chains failing for rpc errors
pawanjay176 Aug 20, 2025
6da924b
Fix bug in initialization code
pawanjay176 Aug 20, 2025
1a0df30
Also penalize all batch peers for availability check errors
pawanjay176 Aug 20, 2025
17c4e34
Avoid root requests for backfill sync
pawanjay176 Aug 20, 2025
fdce537
Implement responsible peer tracking
pawanjay176 Aug 21, 2025
4540195
Request columns from global peer pool
pawanjay176 Aug 14, 2025
521778b
Random logs
pawanjay176 Aug 21, 2025
da27441
Merge branch 'unstable' into blocks-then-columns
pawanjay176 Aug 21, 2025
52762b9
Handle 0 blobs per epoch case
pawanjay176 Aug 22, 2025
7c214f5
Merge branch 'unstable' into blocks-then-columns
pawanjay176 Aug 25, 2025
90d319f
Merge branch 'unstable' into blocks-then-columns
pawanjay176 Aug 26, 2025
27d0b36
Remove debug statements
pawanjay176 Aug 26, 2025
a97cf88
Add docs
pawanjay176 Aug 27, 2025
05adb71
Fix bug with partial column responses before all column requests sent
pawanjay176 Aug 27, 2025
b4bc7fe
Remove more debug logs
pawanjay176 Aug 27, 2025
8386bd9
Merge branch 'unstable' into blocks-then-columns
pawanjay176 Aug 28, 2025
7331323
AwaitingValidation state only needs block peer
pawanjay176 Aug 28, 2025
da1aaba
Revise error tolerance
pawanjay176 Aug 28, 2025
8e1337d
Merge branch 'unstable' into blocks-then-columns
pawanjay176 Aug 29, 2025
19b0a5c
Merge branch 'unstable' into blocks-then-columns
pawanjay176 Aug 29, 2025
b07bc6d
Force requests if batch buffer is full under certain conditions
pawanjay176 Aug 29, 2025
4f60e86
Add logs to debug stuck range sync
pawanjay176 Aug 31, 2025
7a6d0d9
Force processing_target request
pawanjay176 Sep 1, 2025
8458df6
Attempt sending awaitingDownload batches when restarting sync
pawanjay176 Sep 1, 2025
29c2f83
Cleanup SyncingChain
pawanjay176 Sep 2, 2025
7e91eeb
Merge branch 'unstable' into blocks-then-columns
pawanjay176 Sep 5, 2025
e0d8f04
Tests compile
pawanjay176 Sep 5, 2025
6a2a33d
Fix some issues from review
pawanjay176 Sep 5, 2025
e259ecd
More renamings
pawanjay176 Sep 5, 2025
4f62a9c
Merge branch 'unstable' into blocks-then-columns
pawanjay176 Sep 5, 2025
04398ad
Fix some more issues from review
pawanjay176 Sep 8, 2025
30 changes: 30 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -327,6 +327,36 @@ impl<E: EthSpec> PeerDB<E> {
.map(|(peer_id, _)| peer_id)
}

/// Returns an iterator of all good gossipsub peers that are supposed to be custodying
/// the given subnet id and whose status messages indicate they have reached the given epoch.
pub fn good_custody_subnet_peer_range_sync(
&self,
subnet: DataColumnSubnetId,
epoch: Epoch,
) -> impl Iterator<Item = &PeerId> {
self.peers
.iter()
.filter(move |(_, info)| {
// The custody_subnets hashset can be populated via ENR or metadata
let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);

info.is_connected()
&& is_custody_subnet_peer
&& match info.sync_status() {
SyncStatus::Synced { info } => {
info.has_slot(epoch.end_slot(E::slots_per_epoch()))
}
SyncStatus::Advanced { info } => {
info.has_slot(epoch.end_slot(E::slots_per_epoch()))
}
SyncStatus::IrrelevantPeer
| SyncStatus::Behind { .. }
| SyncStatus::Unknown => false,
}
})
.map(|(peer_id, _)| peer_id)
}

/// Checks if there is at least one good peer for each specified custody subnet for the given epoch.
/// A "good" peer is one that is both connected and synced (or advanced) for the specified epoch.
pub fn has_good_custody_range_sync_peer(
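For context, a minimal sketch of how a range-sync caller might consume the new iterator to pick one good peer per custody column subnet. The `peer_db`, `subnets`, and `epoch` bindings plus the `request_columns_from` / `mark_subnet_without_peers` helpers are illustrative assumptions, not part of this diff:

use rand::seq::IteratorRandom;

// Pick a random good peer for each custody subnet needed for this epoch.
// PeerId is Copy (see the Copy derives on the request-id types below),
// so the selected id can be dereferenced out of the iterator.
let mut rng = rand::thread_rng();
for subnet in subnets {
    match peer_db
        .good_custody_subnet_peer_range_sync(subnet, epoch)
        .choose(&mut rng)
    {
        Some(peer_id) => request_columns_from(*peer_id, subnet),
        None => mark_subnet_without_peers(subnet),
    }
}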
29 changes: 29 additions & 0 deletions beacon_node/lighthouse_network/src/service/api_types.rs
@@ -38,6 +38,7 @@ pub enum SyncRequestId {
pub struct DataColumnsByRootRequestId {
pub id: Id,
pub requester: DataColumnsByRootRequester,
pub peer: PeerId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
@@ -46,6 +47,20 @@ pub struct BlocksByRangeRequestId {
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
/// The peer that we made this request to.
pub peer_id: PeerId,
}

impl BlocksByRangeRequestId {
pub fn batch_id(&self) -> Epoch {
match self.parent_request_id.requester {
RangeRequestId::BackfillSync { batch_id } => batch_id,
RangeRequestId::RangeSync {
chain_id: _,
batch_id,
} => batch_id,
}
}
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
@@ -86,12 +101,24 @@ pub enum RangeRequestId {
RangeSync { chain_id: Id, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}
impl RangeRequestId {
pub fn batch_id(&self) -> Epoch {
match &self {
RangeRequestId::BackfillSync { batch_id } => *batch_id,
RangeRequestId::RangeSync {
chain_id: _,
batch_id,
} => *batch_id,
}
}
}

// TODO(das) refactor in a separate PR. We might be able to remove this and replace
// [`DataColumnsByRootRequestId`] with a [`SingleLookupReqId`].
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum DataColumnsByRootRequester {
Custody(CustodyId),
RangeSync { parent: ComponentsByRangeRequestId },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
Expand Down Expand Up @@ -222,6 +249,7 @@ impl Display for DataColumnsByRootRequester {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Custody(id) => write!(f, "Custody/{id}"),
Self::RangeSync { parent } => write!(f, "Range/{parent}"),
}
}
}
@@ -255,6 +283,7 @@ mod tests {
lookup_id: 101,
}),
}),
peer: PeerId::random(),
};
assert_eq!(format!("{id}"), "123/Custody/121/Lookup/101");
}
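Taken together, the new `peer` / `peer_id` fields and the two `batch_id()` helpers let a response handler attribute any by-range response (or error) to a peer and a batch from the request id alone. A hedged sketch; `on_blocks_by_range_response` is an illustrative name, not an API in this diff:

fn on_blocks_by_range_response(id: BlocksByRangeRequestId) {
    // batch_id() delegates through the parent ComponentsByRangeRequestId,
    // covering both RangeSync and BackfillSync requesters.
    let batch: Epoch = id.batch_id();
    // The peer is recorded on the request id itself, so no side table is
    // needed to find who to penalize on a bad response.
    let peer: PeerId = id.peer_id;
    tracing::debug!(%peer, %batch, "blocks_by_range response");
}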
@@ -1,6 +1,7 @@
use crate::metrics::{self, register_process_result_metrics};
use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProcessor};
use crate::sync::BatchProcessResult;
use crate::sync::manager::FaultyComponent;
use crate::sync::{
ChainId,
manager::{BlockProcessType, SyncMessage},
@@ -10,7 +11,8 @@ use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::data_availability_checker::MaybeAvailableBlock;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult,
HistoricalBlockError, NotifyExecutionLayer, validator_monitor::get_slot_delay_ms,
ExecutionPayloadError, HistoricalBlockError, NotifyExecutionLayer,
validator_monitor::get_slot_delay_ms,
};
use beacon_processor::{
AsyncFn, BlockingFn, DuplicateCache,
@@ -45,6 +47,8 @@ struct ChainSegmentFailed {
message: String,
/// Used to penalize peers.
peer_action: Option<PeerAction>,
/// Used to identify the faulty component.
faulty_component: Option<FaultyComponent>,
}

impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
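`FaultyComponent` is defined in `sync::manager` elsewhere in this PR. Inferred from its uses in this file (`Blocks`, `Blobs`, `Columns(vec![...])`), its shape is roughly the following sketch, not the verbatim definition:

// Identifies which part of a batch failed validation, so sync can penalize
// only the peers that served that component.
pub enum FaultyComponent {
    /// The blocks themselves were invalid.
    Blocks,
    /// The accompanying blobs were invalid.
    Blobs,
    /// The data columns at these indices were invalid.
    Columns(Vec<ColumnIndex>),
}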
@@ -492,6 +496,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks,
penalty,
faulty_component: e.faulty_component,
},
None => BatchProcessResult::NonFaultyFailure,
}
@@ -543,6 +548,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks: 0,
penalty,
faulty_component: e.faulty_component,
},
None => BatchProcessResult::NonFaultyFailure,
}
@@ -614,15 +620,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Err(ChainSegmentFailed {
peer_action: None,
message: "Failed to check block availability".into(),
faulty_component: None,
}),
);
}

e => {
return (
0,
Err(ChainSegmentFailed {
peer_action: Some(PeerAction::LowToleranceError),
message: format!("Failed to check block availability : {:?}", e),
faulty_component: None, // Todo(pawan): replicate behaviour in forward sync once its proven
}),
);
}
@@ -639,6 +648,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
(total_blocks - available_blocks.len()),
total_blocks
),
faulty_component: Some(FaultyComponent::Blocks),
}),
);
}
@@ -654,7 +664,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_FAILED_TOTAL,
);
let peer_action = match &e {
let (peer_action, faulty_component) = match &e {
HistoricalBlockError::MismatchedBlockRoot {
block_root,
expected_block_root,
@@ -666,7 +676,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Backfill batch processing error"
);
// The peer is faulty if they send blocks with bad roots.
Some(PeerAction::LowToleranceError)
(
Some(PeerAction::LowToleranceError),
Some(FaultyComponent::Blocks),
)
}
HistoricalBlockError::InvalidSignature
| HistoricalBlockError::SignatureSet(_) => {
@@ -675,28 +688,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Backfill batch processing error"
);
// The peer is faulty if they send bad signatures.
Some(PeerAction::LowToleranceError)
(
Some(PeerAction::LowToleranceError),
Some(FaultyComponent::Blocks),
)
}
HistoricalBlockError::ValidatorPubkeyCacheTimeout => {
warn!(
error = "pubkey_cache_timeout",
"Backfill batch processing error"
);
// This is an internal error, do not penalize the peer.
None
(None, None)
}
HistoricalBlockError::IndexOutOfBounds => {
error!(
error = ?e,
"Backfill batch OOB error"
);
// This should never occur, don't penalize the peer.
None
(None, None)
}
HistoricalBlockError::StoreError(e) => {
warn!(error = ?e, "Backfill batch processing error");
// This is an internal error, don't penalize the peer.
None
(None, None)
}
// Do not use a fallback match, handle all errors explicitly
};
@@ -707,6 +723,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message: format!("{:?}", err_str),
// Whether to penalize the peer was decided above, per error variant.
peer_action,
faulty_component,
}),
)
}
@@ -721,7 +738,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Err(ChainSegmentFailed {
message: format!("Block has an unknown parent: {}", parent_root),
// Peers are faulty if they send non-sequential blocks.
peer_action: Some(PeerAction::LowToleranceError),
peer_action: Some(PeerAction::LowToleranceError), // TODO(pawan): revise this
faulty_component: Some(FaultyComponent::Blocks),
})
}
BlockError::DuplicateFullyImported(_)
@@ -760,6 +778,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
),
// Peers are faulty if they send blocks from the future.
peer_action: Some(PeerAction::LowToleranceError),
faulty_component: Some(FaultyComponent::Blocks),
})
}
BlockError::WouldRevertFinalizedSlot { .. } => {
@@ -776,6 +795,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_parent_root
),
peer_action: Some(PeerAction::Fatal),
faulty_component: Some(FaultyComponent::Blocks),
})
}
BlockError::GenesisBlock => {
@@ -793,10 +813,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message: format!("Internal error whilst processing block: {:?}", e),
// Do not penalize peers for internal errors.
peer_action: None,
faulty_component: None,
})
}
ref err @ BlockError::ExecutionPayloadError(ref epe) => {
if !epe.penalize_peer() {
if matches!(epe, ExecutionPayloadError::RejectedByExecutionEngine { .. }) {
debug!(
error = ?err,
"Invalid execution payload rejected by EE"
);
Err(ChainSegmentFailed {
message: format!(
"Peer sent a block containing invalid execution payload. Reason: {:?}",
err
),
peer_action: Some(PeerAction::LowToleranceError),
faulty_component: Some(FaultyComponent::Blocks), // TODO(pawan): recheck this
})
} else if !epe.penalize_peer() {
// These errors indicate an issue with the EL and not the `ChainSegment`.
// Pause the syncing while the EL recovers
debug!(
Expand All @@ -808,6 +842,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message: format!("Execution layer offline. Reason: {:?}", err),
// Do not penalize peers for internal errors.
peer_action: None,
faulty_component: None,
})
} else {
debug!(
Expand All @@ -820,6 +855,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
err
),
peer_action: Some(PeerAction::LowToleranceError),
faulty_component: Some(FaultyComponent::Blocks),
})
}
}
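The new branch above gives execution-payload errors a three-way triage. An illustrative distillation under the same variants (`payload_error_outcome` is a hypothetical helper, not code from this diff):

fn payload_error_outcome(
    epe: &ExecutionPayloadError,
) -> (Option<PeerAction>, Option<FaultyComponent>) {
    if matches!(epe, ExecutionPayloadError::RejectedByExecutionEngine { .. }) {
        // The EE explicitly rejected the payload: the block sender is at fault.
        (Some(PeerAction::LowToleranceError), Some(FaultyComponent::Blocks))
    } else if !epe.penalize_peer() {
        // EL offline or internal trouble: pause syncing, penalize nobody.
        (None, None)
    } else {
        // Remaining payload errors also fall on the block sender.
        (Some(PeerAction::LowToleranceError), Some(FaultyComponent::Blocks))
    }
}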
@@ -835,6 +871,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// of a faulty EL it will usually require manual intervention to fix anyway, so
// it's not too bad if we drop most of our peers.
peer_action: Some(PeerAction::LowToleranceError),
faulty_component: Some(FaultyComponent::Blocks),
})
}
// Penalise peers for sending us banned blocks.
@@ -843,8 +880,49 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Err(ChainSegmentFailed {
message: format!("Banned block: {block_root:?}"),
peer_action: Some(PeerAction::Fatal),
faulty_component: Some(FaultyComponent::Blocks),
})
}
ref err @ BlockError::AvailabilityCheck(ref e) => {
match &e {
AvailabilityCheckError::InvalidBlobs(_)
| AvailabilityCheckError::BlobIndexInvalid(_) => Err(ChainSegmentFailed {
message: format!("Peer sent invalid blobs. Reason: {:?}", err),
// Penalize the peer for sending invalid blobs.
peer_action: Some(PeerAction::LowToleranceError),
faulty_component: Some(FaultyComponent::Blobs),
}),
AvailabilityCheckError::InvalidColumn((column_opt, _)) => {
let (peer_action, faulty_component) = if let Some(column) = column_opt {
(
Some(PeerAction::LowToleranceError),
Some(FaultyComponent::Columns(vec![*column])),
)
} else {
(None, None)
};
Err(ChainSegmentFailed {
message: format!("Peer sent invalid columns. Reason: {:?}", err),
peer_action,
faulty_component,
})
}
AvailabilityCheckError::DataColumnIndexInvalid(column) => {
Err(ChainSegmentFailed {
message: format!("Peer sent invalid columns. Reason: {:?}", err),
// Penalize the peer for sending an invalid column index.
peer_action: Some(PeerAction::LowToleranceError),
faulty_component: Some(FaultyComponent::Columns(vec![*column])),
})
}
_ => Err(ChainSegmentFailed {
message: format!("Peer sent invalid block. Reason: {:?}", err),
// Do not penalize peers for internal errors.
peer_action: None,
faulty_component: None,
}),
}
}
other => {
debug!(
msg = "peer sent invalid block",
Expand All @@ -856,6 +934,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message: format!("Peer sent invalid block. Reason: {:?}", other),
// Do not penalize peers for internal errors.
peer_action: None,
faulty_component: None,
})
}
}
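Downstream, the sync layer can use the `faulty_component` carried by `BatchProcessResult::FaultyFailure` to penalize only the peers responsible for the bad component. A hedged sketch of that consumption; `penalize`, `block_peer`, and `column_peer` are illustrative stand-ins for the real sync-manager logic:

fn on_faulty_batch(penalty: PeerAction, faulty: Option<FaultyComponent>) {
    match faulty {
        // Blocks and blobs are attributed to the peer that served the blocks.
        Some(FaultyComponent::Blocks) | Some(FaultyComponent::Blobs) => {
            penalize(block_peer(), penalty);
        }
        // Invalid columns are attributed per index to the serving peers,
        // leaving the other column peers in the batch unpunished.
        Some(FaultyComponent::Columns(indices)) => {
            for index in indices {
                penalize(column_peer(index), penalty);
            }
        }
        // No component identified: fall back to the generic scoring path.
        None => {}
    }
}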