Changes from all commits (54 commits)
d91b688
Add historical data column storage functionality
eserilev Jul 21, 2025
f3b9f24
some progress on a manager -> sync service architecture
eserilev Aug 2, 2025
cbdb44d
Adding custody sync service
eserilev Aug 5, 2025
8c1fafa
resolve merge conflicts
eserilev Aug 5, 2025
a5670c8
minor cleanup
eserilev Aug 6, 2025
e182af7
enable batch request/response handling
eserilev Aug 13, 2025
3cff919
refactor
eserilev Aug 13, 2025
2fcd512
update
eserilev Aug 14, 2025
6ae8174
resolve merge conflicts
eserilev Aug 14, 2025
8c074ec
Remove some TODOs
eserilev Aug 14, 2025
725dabd
update
eserilev Aug 15, 2025
82c9160
Adding a bunch of logs and fixing some edge cases
eserilev Aug 18, 2025
83bba63
some progress
eserilev Aug 18, 2025
27f4438
Merge unstable
eserilev Aug 19, 2025
1bf46a7
fix logs
eserilev Aug 19, 2025
f415360
some cleanup
eserilev Aug 19, 2025
faf389c
more progress
eserilev Aug 19, 2025
fb83add
some comments
eserilev Aug 19, 2025
a05ce72
update
eserilev Aug 20, 2025
aae5dd9
linting fixes
eserilev Aug 20, 2025
f4f21c6
linting
eserilev Aug 20, 2025
107b131
Linting
eserilev Aug 20, 2025
7293cfe
some fixes
eserilev Aug 21, 2025
d4d5dba
Add KZG verification logic
eserilev Aug 25, 2025
2c8f7f9
Handle skipped slots
eserilev Aug 25, 2025
86beef5
Resume pending custody backfill when necessary
eserilev Aug 26, 2025
2f5f7b4
Fix some TODOs
eserilev Aug 26, 2025
0b1b2e6
Resolve some TODOs
eserilev Aug 26, 2025
d7323e0
Cleanup
eserilev Aug 26, 2025
b88f53c
Fix a bug where custody sync cant be restarted
eserilev Aug 27, 2025
acb8b30
remove TODOs
eserilev Aug 27, 2025
103de67
Need to fix speedo
eserilev Aug 27, 2025
890683e
update
eserilev Aug 27, 2025
f8ba916
resolve merge conflicts
eserilev Aug 27, 2025
79748a1
fix test
eserilev Aug 27, 2025
46a41d6
fix test
eserilev Aug 27, 2025
ecb1980
fmt
eserilev Aug 27, 2025
370a5d4
fix test
eserilev Aug 27, 2025
a001eb0
fix test
eserilev Aug 28, 2025
9205a91
merge conflicts
eserilev Sep 2, 2025
0113b00
fmt
eserilev Sep 2, 2025
9c67997
some cleanup
eserilev Sep 2, 2025
9ac4a7d
Merge branch 'unstable' of https://github.com/sigp/lighthouse into cu…
eserilev Sep 3, 2025
9e4fd45
Fix some comments
eserilev Sep 3, 2025
76b5f30
Merge branch 'unstable' of https://github.com/sigp/lighthouse into cu…
eserilev Sep 4, 2025
8579c03
Await finalization before triggering custody backfill sync
eserilev Sep 4, 2025
3fceed3
fix finalization delay
eserilev Sep 9, 2025
6705dd8
fix
eserilev Sep 9, 2025
2644404
fix tests
eserilev Sep 9, 2025
93d5786
Fix test
eserilev Sep 9, 2025
eb2b075
FMT
eserilev Sep 9, 2025
316583a
fix test
eserilev Sep 9, 2025
95e3e6d
Resolve conflicts
eserilev Sep 11, 2025
8b757ad
Notifier fixes
eserilev Sep 12, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions beacon_node/Cargo.toml
@@ -40,6 +40,7 @@ slasher = { workspace = true }
store = { workspace = true }
strum = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }

22 changes: 22 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -28,6 +28,7 @@ use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataC
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::events::ServerSentEventHandler;
use crate::events::SyncServiceMessage;
use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload};
use crate::fetch_blobs::EngineGetBlobsOutput;
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
@@ -125,6 +126,7 @@ use store::{
KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn};
use tree_hash::TreeHash;
@@ -490,6 +492,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub kzg: Arc<Kzg>,
/// RNG instance used by the chain. Currently used for shuffling column sidecars in block publishing.
pub rng: Arc<Mutex<Box<dyn RngCore + Send>>>,
/// The sending channel for the beacon chain to send messages to sync.
pub sync_service_send: mpsc::UnboundedSender<SyncServiceMessage>,
}

pub enum BeaconBlockResponseWrapper<E: EthSpec> {
@@ -6898,6 +6902,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}

/// The DA boundary for custodying columns. This is the data availability boundary, except that it never precedes the Fulu fork epoch (and is `None` if no Fulu fork epoch is scheduled).
pub fn get_column_da_boundary(&self) -> Option<Epoch> {
match self.data_availability_boundary() {
Some(da_boundary_epoch) => {
if let Some(fulu_fork_epoch) = self.spec.fulu_fork_epoch {
if da_boundary_epoch < fulu_fork_epoch {
Some(fulu_fork_epoch)
} else {
Some(da_boundary_epoch)
}
} else {
None
}
}
None => None, // If no DA boundary is set, don't try to custody backfill
Review comment (Member Author): remove comment

}
}

/// This method serves to get a sense of the current chain health. It is used in block proposal
/// to determine whether we should outsource payload production duties.
///
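For orientation, here is a minimal self-contained sketch of the rule `get_column_da_boundary` implements above, using plain `u64` epochs instead of Lighthouse's `Epoch` type; the helper name `column_da_boundary` is hypothetical. The custody boundary is the DA boundary clamped so it never precedes the Fulu fork epoch; without a scheduled Fulu fork (or without a DA boundary yet) custody backfill is skipped.

// Hedged sketch of the boundary rule above, with plain u64 epochs (not Lighthouse's Epoch type).
fn column_da_boundary(da_boundary: Option<u64>, fulu_fork_epoch: Option<u64>) -> Option<u64> {
    match (da_boundary, fulu_fork_epoch) {
        // Custody backfill never reaches behind the Fulu fork epoch.
        (Some(da), Some(fulu)) => Some(da.max(fulu)),
        // No Fulu fork scheduled, or no DA boundary yet: don't custody backfill.
        _ => None,
    }
}

fn main() {
    // DA boundary still before the Fulu fork: clamp to the fork epoch.
    assert_eq!(column_da_boundary(Some(100), Some(120)), Some(120));
    // DA boundary past the fork: use it directly.
    assert_eq!(column_da_boundary(Some(150), Some(120)), Some(150));
    // No Fulu fork epoch configured: no custody backfill.
    assert_eq!(column_da_boundary(Some(150), None), None);
}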
20 changes: 20 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
@@ -5,6 +5,7 @@ use crate::beacon_chain::{
};
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::data_availability_checker::DataAvailabilityChecker;
use crate::events::SyncServiceMessage;
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin};
@@ -38,6 +39,7 @@ use std::sync::Arc;
use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc;
use tracing::{debug, error, info};
use types::{
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec,
@@ -102,6 +104,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
validator_monitor_config: Option<ValidatorMonitorConfig>,
import_all_data_columns: bool,
rng: Option<Box<dyn RngCore + Send>>,
sync_service_send: Option<mpsc::UnboundedSender<SyncServiceMessage>>,
}

impl<TSlotClock, E, THotStore, TColdStore>
@@ -141,6 +144,7 @@ where
validator_monitor_config: None,
import_all_data_columns: false,
rng: None,
sync_service_send: None,
}
}

@@ -662,6 +666,14 @@ where
self
}

pub fn sync_service_send(
mut self,
sync_service_send: Option<mpsc::UnboundedSender<SyncServiceMessage>>,
) -> Self {
self.sync_service_send = sync_service_send;
self
}

/// Fetch a reference to the slot clock.
///
/// Can be used for mutation during testing due to `SlotClock`'s internal mutability.
@@ -941,6 +953,10 @@ where
};
debug!(?custody_context, "Loading persisted custody context");

let Some(sync_service_send) = self.sync_service_send else {
return Err("Must supply a sync service send channel".to_string());
};

let beacon_chain = BeaconChain {
spec: self.spec.clone(),
config: self.chain_config,
@@ -1023,6 +1039,7 @@ where
),
kzg: self.kzg.clone(),
rng: Arc::new(Mutex::new(rng)),
sync_service_send,
};

let head = beacon_chain.head_snapshot();
@@ -1244,6 +1261,8 @@ mod test {

let kzg = get_kzg(&spec);

let (sync_service_send, _) = mpsc::unbounded_channel::<SyncServiceMessage>();

let chain = Builder::new(MinimalEthSpec, kzg)
.store(Arc::new(store))
.task_executor(runtime.task_executor.clone())
@@ -1252,6 +1271,7 @@
.testing_slot_clock(Duration::from_secs(1))
.expect("should configure testing slot clock")
.shutdown_sender(shutdown_tx)
.sync_service_send(Some(sync_service_send))
.rng(Box::new(StdRng::seed_from_u64(42)))
.build()
.expect("should build");
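The builder changes above follow a simple pattern: the builder stores an `Option` of the sender and `build()` refuses to proceed without it. Below is a minimal stand-alone sketch of that pattern (hypothetical `ChainBuilder`/`Chain` types, not the real `BeaconChainBuilder`; assumes the `tokio` crate with the `sync` feature).

use tokio::sync::mpsc;

#[derive(Debug)]
enum SyncServiceMessage {
    EarliestCustodyEpochFinalized,
}

struct ChainBuilder {
    sync_service_send: Option<mpsc::UnboundedSender<SyncServiceMessage>>,
}

struct Chain {
    sync_service_send: mpsc::UnboundedSender<SyncServiceMessage>,
}

impl ChainBuilder {
    fn new() -> Self {
        Self { sync_service_send: None }
    }

    // Mirrors the optional setter added to the builder above.
    fn sync_service_send(mut self, tx: Option<mpsc::UnboundedSender<SyncServiceMessage>>) -> Self {
        self.sync_service_send = tx;
        self
    }

    // Mirrors the build-time check: the channel is mandatory.
    fn build(self) -> Result<Chain, String> {
        let Some(sync_service_send) = self.sync_service_send else {
            return Err("Must supply a sync service send channel".to_string());
        };
        Ok(Chain { sync_service_send })
    }
}

fn main() {
    // Without the channel, build() fails.
    assert!(ChainBuilder::new().build().is_err());

    // With it, the built chain can signal the sync service.
    let (tx, mut rx) = mpsc::unbounded_channel();
    let chain = ChainBuilder::new()
        .sync_service_send(Some(tx))
        .build()
        .expect("channel supplied");
    chain
        .sync_service_send
        .send(SyncServiceMessage::EarliestCustodyEpochFinalized)
        .expect("receiver alive");
    assert!(matches!(
        rx.try_recv(),
        Ok(SyncServiceMessage::EarliestCustodyEpochFinalized)
    ));
}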
38 changes: 38 additions & 0 deletions beacon_node/beacon_chain/src/canonical_head.rs
@@ -31,6 +31,7 @@
//! the head block root. This is unacceptable for fast-responding functions like the networking
//! stack.

use crate::events::SyncServiceMessage;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::shuffling_cache::BlockShufflingIds;
use crate::{
@@ -991,6 +992,49 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.process_prune_blobs(data_availability_boundary);
}

if self.should_trigger_custody_backfill_sync(new_view.finalized_checkpoint.epoch)
&& let Err(e) = self
.sync_service_send
.send(SyncServiceMessage::EarliestCustodyEpochFinalized)
{
error!(
finalized_epoch=?new_view.finalized_checkpoint.epoch,
error=?e,
"Unable to trigger custody backfill sync at finalized epoch"
);
}

// Take a write-lock on the canonical head and signal for it to prune.
self.canonical_head.fork_choice_write_lock().prune()?;

Ok(())
}

/// Checks if we should trigger custody backfill sync.
/// Returns true if the epoch of the earliest custodied data column is greater than or equal
/// to the most recently finalized epoch and still lies after the column DA boundary, i.e.
/// there are older epochs within the boundary left to backfill.
fn should_trigger_custody_backfill_sync(&self, finalized_epoch: Epoch) -> bool {
let Some(earliest_data_column_slot) = self
.store
.get_data_column_custody_info()
.ok()
.flatten()
.and_then(|custody_info| custody_info.earliest_data_column_slot)
else {
return false;
};

let Some(column_da_boundary) = self.get_column_da_boundary() else {
return false;
};

let earliest_data_column_epoch =
earliest_data_column_slot.epoch(T::EthSpec::slots_per_epoch());

earliest_data_column_epoch >= finalized_epoch
&& earliest_data_column_epoch > column_da_boundary
}

/// Persist fork choice to disk, writing immediately.
pub fn persist_fork_choice(&self) -> Result<(), Error> {
let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);
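To make the trigger predicate above concrete, here is a small sketch with plain `u64` epochs (the function name `should_trigger` and the numbers are illustrative only): the finalization hook fires while the earliest custodied column epoch has not yet fallen behind finality and still lies after the column DA boundary, meaning there are older epochs left to backfill.

// Hedged sketch of should_trigger_custody_backfill_sync's condition, with plain u64 epochs.
fn should_trigger(earliest_custody_epoch: u64, finalized_epoch: u64, column_da_boundary: u64) -> bool {
    earliest_custody_epoch >= finalized_epoch && earliest_custody_epoch > column_da_boundary
}

fn main() {
    // Columns custodied back to epoch 300, finality at 290, boundary at 250: backfill continues.
    assert!(should_trigger(300, 290, 250));
    // Already backfilled all the way to the boundary: nothing left to do.
    assert!(!should_trigger(250, 290, 250));
    // Earliest custodied epoch already older than the finalized epoch: no trigger.
    assert!(!should_trigger(280, 290, 250));
}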
10 changes: 9 additions & 1 deletion beacon_node/beacon_chain/src/events.rs
@@ -1,8 +1,10 @@
use std::collections::HashSet;

pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender, error::SendError};
use tracing::trace;
use types::EthSpec;
use types::{ColumnIndex, EthSpec};

const DEFAULT_CHANNEL_CAPACITY: usize = 16;

Expand Down Expand Up @@ -312,3 +314,9 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.block_gossip_tx.receiver_count() > 0
}
}

#[derive(Debug)]
pub enum SyncServiceMessage {
CustodyCountChanged { columns: HashSet<ColumnIndex> },
EarliestCustodyEpochFinalized,
}
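
A minimal sketch of how a channel carrying this enum is typically wired (assumes the `tokio` crate with the `sync`, `rt`, and `macros` features; the service loop is hypothetical, since the receiving side is not shown in this section, and the enum is copied locally so the sketch is self-contained):

use std::collections::HashSet;
use tokio::sync::mpsc;

// Local copy of the enum added above.
#[derive(Debug)]
enum SyncServiceMessage {
    CustodyCountChanged { columns: HashSet<u64> },
    EarliestCustodyEpochFinalized,
}

#[tokio::main]
async fn main() {
    // The beacon chain holds the unbounded sender; a sync service owns the receiver.
    let (tx, mut rx) = mpsc::unbounded_channel::<SyncServiceMessage>();

    // Producer side, e.g. when the custody column set changes or finality advances.
    tx.send(SyncServiceMessage::CustodyCountChanged {
        columns: HashSet::from([1, 7, 42]),
    })
    .expect("receiver alive");
    tx.send(SyncServiceMessage::EarliestCustodyEpochFinalized)
        .expect("receiver alive");

    // Consumer side: an event loop reacting to each message.
    while let Some(msg) = rx.recv().await {
        match msg {
            SyncServiceMessage::CustodyCountChanged { columns } => {
                println!("custody set changed: {} columns", columns.len());
            }
            SyncServiceMessage::EarliestCustodyEpochFinalized => {
                println!("trigger custody backfill sync");
                break;
            }
        }
    }
}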
145 changes: 145 additions & 0 deletions beacon_node/beacon_chain/src/historical_data_columns.rs
@@ -0,0 +1,145 @@
use std::collections::{HashMap, HashSet};

use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes,
data_column_verification::verify_kzg_for_data_column_list,
};
use store::{Error as StoreError, KeyValueStore};
use types::{ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Hash256, Slot};

#[derive(Debug)]
pub enum HistoricalDataColumnError {
/// The provided data column sidecar pertains to a block that doesn't exist in the database.
NoBlockFound {
data_column_block_root: Hash256,
},

/// Logic error: should never occur.
IndexOutOfBounds,

/// The provided data column sidecar list doesn't contain columns for the full range of slots for the given epoch.
MissingDataColumns {
missing_slots_and_data_columns: Vec<(Slot, ColumnIndex)>,
},

/// The provided data column sidecar list contains at least one column with an invalid kzg commitment.
InvalidKzg,

/// Internal store error
StoreError(StoreError),

/// Internal beacon chain error
BeaconChainError(Box<BeaconChainError>),
}

impl From<StoreError> for HistoricalDataColumnError {
fn from(e: StoreError) -> Self {
Self::StoreError(e)
}
}

impl<T: BeaconChainTypes> BeaconChain<T> {
/// Store a batch of historical data columns in the database.
///
/// Each data column's block root is checked against the corresponding block already stored
/// in the DB, and KZG proofs are verified for the whole batch before it is committed.
///
/// This function requires that the data column sidecar list contains columns for a full epoch.
///
/// Return the number of `data_columns` successfully imported.
pub fn import_historical_data_column_batch(
&self,
epoch: Epoch,
historical_data_column_sidecar_list: DataColumnSidecarList<T::EthSpec>,
) -> Result<usize, HistoricalDataColumnError> {
let mut total_imported = 0;
let mut ops = vec![];

let unique_column_indices = historical_data_column_sidecar_list
.iter()
.map(|item| item.index)
.collect::<HashSet<_>>();

let mut slot_and_column_index_to_data_columns = historical_data_column_sidecar_list
.iter()
.map(|data_column| ((data_column.slot(), data_column.index), data_column))
.collect::<HashMap<_, _>>();

if historical_data_column_sidecar_list.is_empty() {
return Ok(total_imported);
}

let forward_blocks_iter = self
.forwards_iter_block_roots_until(
epoch.start_slot(T::EthSpec::slots_per_epoch()),
epoch.end_slot(T::EthSpec::slots_per_epoch()),
)
.map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;

for block_iter_result in forward_blocks_iter {
let (block_root, slot) = block_iter_result
.map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;

for column_index in unique_column_indices.clone() {
if let Some(data_column) =
slot_and_column_index_to_data_columns.remove(&(slot, column_index))
{
if self
.store
.get_data_column(&block_root, &data_column.index)?
.is_some()
{
tracing::debug!(
block_root = ?block_root,
column_index = data_column.index,
"Skipping data column import as identical data column exists"
);
continue;
}
if block_root != data_column.block_root() {
return Err(HistoricalDataColumnError::NoBlockFound {
data_column_block_root: data_column.block_root(),
});
}
self.store.data_column_as_kv_store_ops(
&block_root,
data_column.clone(),
&mut ops,
);
total_imported += 1;
}
}
}

verify_kzg_for_data_column_list(historical_data_column_sidecar_list.iter(), &self.kzg)
.map_err(|_| HistoricalDataColumnError::InvalidKzg)?;

self.store.blobs_db.do_atomically(ops)?;

if slot_and_column_index_to_data_columns.is_empty() {
self.store.put_data_column_custody_info(Some(
epoch.start_slot(T::EthSpec::slots_per_epoch()),
))?;
} else {
tracing::warn!(
?epoch,
missing_slots = ?slot_and_column_index_to_data_columns.keys().map(|(slot, _)| slot),
"Some data columns are missing from the batch"
);
return Err(HistoricalDataColumnError::MissingDataColumns {
missing_slots_and_data_columns: slot_and_column_index_to_data_columns
.keys()
.cloned()
.collect::<Vec<_>>(),
});
}

self.data_availability_checker
.custody_context()
.backfill_custody_count_at_epoch(epoch);

tracing::debug!(total_imported, "Imported historical data columns");

Ok(total_imported)
}
}
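
Distilling the bookkeeping of `import_historical_data_column_batch` above into a stand-alone sketch (plain `u64` slots and column indices; `import_epoch_batch` is illustrative, not the PR's API): the batch is keyed by `(slot, column_index)`, entries are consumed while walking the canonical block slots of the epoch, and anything left unmatched triggers the missing-columns error path.

use std::collections::HashSet;

type Slot = u64;
type ColumnIndex = u64;

// Hedged sketch of the batch bookkeeping above; the real function also verifies KZG
// proofs, checks block roots, and writes to the store atomically.
fn import_epoch_batch(
    canonical_slots: &[Slot],
    batch: &[(Slot, ColumnIndex)],
) -> Result<usize, Vec<(Slot, ColumnIndex)>> {
    // Unique column indices present in the batch.
    let indices: HashSet<ColumnIndex> = batch.iter().map(|&(_, index)| index).collect();
    // Everything the batch provides, keyed by (slot, column_index).
    let mut remaining: HashSet<(Slot, ColumnIndex)> = batch.iter().copied().collect();
    let mut imported = 0;

    for &slot in canonical_slots {
        for &index in &indices {
            if remaining.remove(&(slot, index)) {
                imported += 1; // the real function queues a store write here
            }
        }
    }

    // Leftovers are batch entries that didn't line up with any canonical slot in the epoch.
    if remaining.is_empty() {
        Ok(imported)
    } else {
        Err(remaining.into_iter().collect())
    }
}

fn main() {
    // All three columns match canonical slots (slot 12 was skipped and has no column).
    assert_eq!(
        import_epoch_batch(&[10, 11, 13], &[(10, 0), (11, 0), (13, 0)]),
        Ok(3)
    );
    // A column claiming the skipped slot 12 can never be matched and is reported.
    assert_eq!(
        import_epoch_batch(&[10, 11, 13], &[(10, 0), (12, 0)]),
        Err(vec![(12, 0)])
    );
}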
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
@@ -28,6 +28,7 @@ pub mod fork_choice_signal;
pub mod fork_revert;
pub mod graffiti_calculator;
pub mod historical_blocks;
pub mod historical_data_columns;
pub mod kzg_utils;
pub mod light_client_finality_update_verification;
pub mod light_client_optimistic_update_verification;