Skip to content

Split up add_epoch_root to allow stake table storage to happen outside the lock #3312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions crates/hotshot/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1219,17 +1219,21 @@ async fn load_start_epoch_info<TYPES: NodeType>(
for epoch_info in start_epoch_info {
if let Some(block_header) = &epoch_info.block_header {
tracing::info!("Calling add_epoch_root for epoch {:?}", epoch_info.epoch);
let write_callback = {
let membership_reader = membership.read().await;
membership_reader
.add_epoch_root(epoch_info.epoch, block_header.clone())
.await
};

if let Ok(Some(write_callback)) = write_callback {
let mut membership_writer = membership.write().await;
write_callback(&mut *membership_writer);
}
Membership::add_epoch_root(
Arc::clone(membership),
epoch_info.epoch,
block_header.clone(),
)
.await
.unwrap_or_else(|err| {
// REVIEW NOTE: Should we panic here? a failure here seems like it should be fatal
tracing::error!(
"Failed to add epoch root for epoch {:?}: {}",
epoch_info.epoch,
err
);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,15 @@ where
self.inner.set_first_epoch(epoch, initial_drb_result);
}

#[allow(refining_impl_trait)]
async fn add_epoch_root(
&self,
membership: Arc<RwLock<Self>>,
epoch: TYPES::Epoch,
_block_header: TYPES::BlockHeader,
) -> anyhow::Result<Option<Box<dyn FnOnce(&mut Self) + Send>>> {
Ok(Some(Box::new(move |mem: &mut Self| {
tracing::error!("Adding epoch root for {epoch}");
mem.epochs.insert(epoch);
})))
) -> anyhow::Result<()> {
let mut membership_writer = membership.write().await;
tracing::error!("Adding epoch root for {epoch}");
membership_writer.epochs.insert(epoch);
Ok(())
}

fn first_epoch(&self) -> Option<TYPES::Epoch> {
Expand Down
141 changes: 77 additions & 64 deletions crates/hotshot/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use hotshot_task::dependency::{Dependency, EventDependency};
use hotshot_types::{
consensus::OuterConsensus,
data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
drb::{DrbInput, DrbResult, DrbSeedInput},
drb::{DrbInput, DrbResult},
epoch_membership::EpochMembershipCoordinator,
event::{Event, EventType, LeafInfo},
message::{Proposal, UpgradeLock},
Expand Down Expand Up @@ -167,38 +167,7 @@ pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(

membership.write().await.add_drb_result(epoch, drb_result)
}
/// Start the DRB computation task for the next epoch.
async fn start_drb_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
seed: DrbSeedInput,
epoch: TYPES::Epoch,
membership: &Arc<RwLock<TYPES::Membership>>,
storage: &I::Storage,
consensus: &OuterConsensus<TYPES>,
) {
let membership = membership.clone();
let storage = storage.clone();
let store_drb_progress_fn = store_drb_progress_fn(storage.clone());
let load_drb_progress_fn = load_drb_progress_fn(storage.clone());
let consensus = consensus.clone();
let difficulty_level = consensus.read().await.drb_difficulty;
let drb_input = DrbInput {
epoch: *epoch,
iteration: 0,
value: seed,
difficulty_level,
};
tokio::spawn(async move {
let drb_result = hotshot_types::drb::compute_drb_result(
drb_input,
store_drb_progress_fn,
load_drb_progress_fn,
)
.await;

handle_drb_result::<TYPES, I>(&membership, epoch, &storage, &consensus, drb_result).await;
drb_result
});
}
/// Handles calling add_epoch_root and sync_l1 on Membership if necessary.
async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
decided_leaf: &Leaf2<TYPES>,
Expand All @@ -214,7 +183,7 @@ async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
let next_epoch_number =
TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);

let mut start = Instant::now();
let start = Instant::now();
if let Err(e) = storage
.store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
.await
Expand All @@ -223,44 +192,88 @@ async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
}
tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());

start = Instant::now();
let write_callback = {
tracing::debug!("Calling add_epoch_root for epoch {next_epoch_number}");
let membership_reader = membership.read().await;
membership_reader
.add_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
.await
};
if let Ok(Some(write_callback)) = write_callback {
let mut membership_writer = membership.write().await;
write_callback(&mut *membership_writer);
}
tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());

let mut consensus_writer = consensus.write().await;
consensus_writer
.drb_results
.garbage_collect(next_epoch_number);
drop(consensus_writer);

let Ok(drb_seed_input_vec) = bincode::serialize(&decided_leaf.justify_qc().signatures)
else {
tracing::error!("Failed to serialize the QC signature.");
return;
};

let mut drb_seed_input = [0u8; 32];
let len = drb_seed_input_vec.len().min(32);
drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
let membership = membership.clone();
let decided_block_header = decided_leaf.block_header().clone();
let storage = storage.clone();
let store_drb_progress_fn = store_drb_progress_fn(storage.clone());
let load_drb_progress_fn = load_drb_progress_fn(storage.clone());
let consensus = consensus.clone();
let difficulty_level = consensus.read().await.drb_difficulty;

tokio::spawn(async move {
let membership_clone = membership.clone();
let epoch_root_future = tokio::spawn(async move {
let start = Instant::now();
if let Err(e) = Membership::add_epoch_root(
Arc::clone(&membership_clone),
next_epoch_number,
decided_block_header,
)
.await
{
tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
}
tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
});

let mut consensus_writer = consensus.write().await;
consensus_writer
.drb_results
.garbage_collect(next_epoch_number);
drop(consensus_writer);

let drb_result_future = tokio::spawn(async move {
let start = Instant::now();
let mut drb_seed_input = [0u8; 32];
let len = drb_seed_input_vec.len().min(32);
drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);

let drb_input = DrbInput {
epoch: *next_epoch_number,
iteration: 0,
value: drb_seed_input,
difficulty_level,
};

start_drb_task::<TYPES, I>(
drb_seed_input,
next_epoch_number,
membership,
storage,
consensus,
)
.await;
let drb_result = hotshot_types::drb::compute_drb_result(
drb_input,
store_drb_progress_fn,
load_drb_progress_fn,
)
.await;

tracing::info!("Time taken to calculate drb result: {:?}", start.elapsed());

drb_result
});

let (_, drb_result) = tokio::join!(epoch_root_future, drb_result_future);

let drb_result = match drb_result {
Ok(result) => result,
Err(e) => {
tracing::error!("Failed to compute DRB result: {e}");
return;
},
};

let start = Instant::now();
handle_drb_result::<TYPES, I>(
&membership,
next_epoch_number,
&storage,
&consensus,
drb_result,
)
.await;
tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
});
}
}

Expand Down
24 changes: 13 additions & 11 deletions crates/hotshot/types/src/epoch_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,17 +385,19 @@ where
));
};

let add_epoch_root_updater = {
let membership_read = self.membership.read().await;
membership_read
.add_epoch_root(epoch, root_leaf.block_header().clone())
.await
};

if let Ok(Some(updater)) = add_epoch_root_updater {
let mut membership_write = self.membership.write().await;
updater(&mut *(membership_write));
};
Membership::add_epoch_root(
Arc::clone(&self.membership),
epoch,
root_leaf.block_header().clone(),
)
.await
.map_err(|e| {
anytrace::error!(
"Failed to add epoch root for epoch {:?} to membership: {}",
epoch,
e
)
})?;

Ok(root_leaf)
}
Expand Down
26 changes: 16 additions & 10 deletions crates/hotshot/types/src/traits/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,19 +162,13 @@ pub trait Membership<TYPES: NodeType>: Debug + Send + Sync {
async move { anyhow::bail!("Not implemented") }
}

#[allow(clippy::type_complexity)]
/// Handles notifications that a new epoch root has been created
/// Is called under a read lock to the Membership. Return a callback
/// with Some to have that callback invoked under a write lock.
///
/// #3967 REVIEW NOTE: this is only called if epoch is Some. Is there any reason to do otherwise?
/// Handles notifications that a new epoch root has been created.
fn add_epoch_root(
&self,
_membership: Arc<RwLock<Self>>,
_epoch: TYPES::Epoch,
_block_header: TYPES::BlockHeader,
) -> impl std::future::Future<Output = anyhow::Result<Option<Box<dyn FnOnce(&mut Self) + Send>>>>
+ Send {
async { Ok(None) }
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
async { Ok(()) }
}

/// Called to notify the Membership when a new DRB result has been calculated.
Expand All @@ -192,3 +186,15 @@ pub trait Membership<TYPES: NodeType>: Debug + Send + Sync {
None
}
}

pub fn membership_spawn_add_epoch_root<TYPES: NodeType>(
membership: Arc<RwLock<impl Membership<TYPES> + 'static>>,
epoch: TYPES::Epoch,
block_header: TYPES::BlockHeader,
) {
tokio::spawn(async move {
if let Err(e) = Membership::<TYPES>::add_epoch_root(membership, epoch, block_header).await {
tracing::error!("Failed to add epoch root for epoch {}: {}", epoch, e);
}
});
}
23 changes: 13 additions & 10 deletions types/src/v0/impls/stake_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1503,24 +1503,27 @@ impl Membership<SeqTypes> for EpochCommittees {
max(higher_threshold, normal_threshold)
}

#[allow(refining_impl_trait)]
async fn add_epoch_root(
&self,
membership: Arc<RwLock<Self>>,
epoch: Epoch,
block_header: Header,
) -> anyhow::Result<Option<Box<dyn FnOnce(&mut Self) + Send>>> {
if self.state.contains_key(&epoch) {
) -> anyhow::Result<()> {
let membership_reader = membership.read().await;
if membership_reader.state.contains_key(&epoch) {
tracing::info!(
"We already have the stake table for epoch {}. Skipping L1 fetching.",
epoch
);
return Ok(None);
return Ok(());
}
let fetcher = Arc::clone(&membership_reader.fetcher);
drop(membership_reader);

let stake_tables = fetcher.fetch(epoch, block_header).await?;

let stake_tables = self.fetcher.fetch(epoch, block_header).await?;
// Store stake table in persistence
{
let persistence_lock = self.fetcher.persistence.lock().await;
let persistence_lock = fetcher.persistence.lock().await;
if let Err(e) = persistence_lock
.store_stake(epoch, stake_tables.clone())
.await
Expand All @@ -1529,9 +1532,9 @@ impl Membership<SeqTypes> for EpochCommittees {
}
}

Ok(Some(Box::new(move |committee: &mut Self| {
committee.update_stake_table(epoch, stake_tables);
})))
let mut membership_writer = membership.write().await;
membership_writer.update_stake_table(epoch, stake_tables);
Ok(())
}

fn has_stake_table(&self, epoch: Epoch) -> bool {
Expand Down