diff --git a/crates/hotshot/hotshot/src/lib.rs b/crates/hotshot/hotshot/src/lib.rs index 278946bc31..39e6c3a1af 100644 --- a/crates/hotshot/hotshot/src/lib.rs +++ b/crates/hotshot/hotshot/src/lib.rs @@ -1218,17 +1218,21 @@ async fn load_start_epoch_info( for epoch_info in start_epoch_info { if let Some(block_header) = &epoch_info.block_header { tracing::info!("Calling add_epoch_root for epoch {:?}", epoch_info.epoch); - let write_callback = { - let membership_reader = membership.read().await; - membership_reader - .add_epoch_root(epoch_info.epoch, block_header.clone()) - .await - }; - if let Ok(Some(write_callback)) = write_callback { - let mut membership_writer = membership.write().await; - write_callback(&mut *membership_writer); - } + Membership::add_epoch_root( + Arc::clone(membership), + epoch_info.epoch, + block_header.clone(), + ) + .await + .unwrap_or_else(|err| { + // REVIEW NOTE: Should we panic here? a failure here seems like it should be fatal + tracing::error!( + "Failed to add epoch root for epoch {:?}: {}", + epoch_info.epoch, + err + ); + }); } } diff --git a/crates/hotshot/hotshot/src/traits/election/dummy_catchup_membership.rs b/crates/hotshot/hotshot/src/traits/election/dummy_catchup_membership.rs index 25ff98edb1..16f439cd7c 100644 --- a/crates/hotshot/hotshot/src/traits/election/dummy_catchup_membership.rs +++ b/crates/hotshot/hotshot/src/traits/election/dummy_catchup_membership.rs @@ -199,16 +199,15 @@ where self.inner.set_first_epoch(epoch, initial_drb_result); } - #[allow(refining_impl_trait)] async fn add_epoch_root( - &self, + membership: Arc>, epoch: TYPES::Epoch, _block_header: TYPES::BlockHeader, - ) -> anyhow::Result>> { - Ok(Some(Box::new(move |mem: &mut Self| { - tracing::error!("Adding epoch root for {epoch}"); - mem.epochs.insert(epoch); - }))) + ) -> anyhow::Result<()> { + let mut membership_writer = membership.write().await; + tracing::error!("Adding epoch root for {epoch}"); + membership_writer.epochs.insert(epoch); + Ok(()) } fn first_epoch(&self) -> Option { diff --git a/crates/hotshot/task-impls/src/helpers.rs b/crates/hotshot/task-impls/src/helpers.rs index 24b3289dd4..7b2e7b1e0f 100644 --- a/crates/hotshot/task-impls/src/helpers.rs +++ b/crates/hotshot/task-impls/src/helpers.rs @@ -31,7 +31,7 @@ use hotshot_types::{ stake_table::StakeTableEntries, traits::{ block_contents::BlockHeader, - election::Membership, + election::{membership_spawn_add_epoch_root, Membership}, node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions}, signature_key::{SignatureKey, StakeTableEntryType, StateSignatureKey}, storage::{load_drb_progress_fn, store_drb_progress_fn, Storage}, @@ -222,18 +222,13 @@ async fn decide_epoch_root>( tracing::info!("Time taken to store epoch root: {:?}", start.elapsed()); start = Instant::now(); - let write_callback = { - tracing::debug!("Calling add_epoch_root for epoch {next_epoch_number}"); - let membership_reader = membership.read().await; - membership_reader - .add_epoch_root(next_epoch_number, decided_leaf.block_header().clone()) - .await - }; - if let Ok(Some(write_callback)) = write_callback { - let mut membership_writer = membership.write().await; - write_callback(&mut *membership_writer); - } - tracing::info!("Time taken to add epoch root: {:?}", start.elapsed()); + + membership_spawn_add_epoch_root( + Arc::clone(membership), + next_epoch_number, + decided_leaf.block_header().clone(), + ); + tracing::info!("Time taken to spawn add epoch root: {:?}", start.elapsed()); let mut consensus_writer = consensus.write().await; consensus_writer diff --git a/crates/hotshot/types/src/epoch_membership.rs b/crates/hotshot/types/src/epoch_membership.rs index 9c569adf9f..e738dba4eb 100644 --- a/crates/hotshot/types/src/epoch_membership.rs +++ b/crates/hotshot/types/src/epoch_membership.rs @@ -379,17 +379,19 @@ where )); }; - let add_epoch_root_updater = { - let membership_read = self.membership.read().await; - membership_read - .add_epoch_root(epoch, root_leaf.block_header().clone()) - .await - }; - - if let Ok(Some(updater)) = add_epoch_root_updater { - let mut membership_write = self.membership.write().await; - updater(&mut *(membership_write)); - }; + Membership::add_epoch_root( + Arc::clone(&self.membership), + epoch, + root_leaf.block_header().clone(), + ) + .await + .map_err(|e| { + anytrace::error!( + "Failed to add epoch root for epoch {:?} to membership: {}", + epoch, + e + ) + })?; Ok(root_leaf) } diff --git a/crates/hotshot/types/src/traits/election.rs b/crates/hotshot/types/src/traits/election.rs index 6d7b4b65f1..24254ac202 100644 --- a/crates/hotshot/types/src/traits/election.rs +++ b/crates/hotshot/types/src/traits/election.rs @@ -162,19 +162,13 @@ pub trait Membership: Debug + Send + Sync { async move { anyhow::bail!("Not implemented") } } - #[allow(clippy::type_complexity)] - /// Handles notifications that a new epoch root has been created - /// Is called under a read lock to the Membership. Return a callback - /// with Some to have that callback invoked under a write lock. - /// - /// #3967 REVIEW NOTE: this is only called if epoch is Some. Is there any reason to do otherwise? + /// Handles notifications that a new epoch root has been created. fn add_epoch_root( - &self, + _membership: Arc>, _epoch: TYPES::Epoch, _block_header: TYPES::BlockHeader, - ) -> impl std::future::Future>>> - + Send { - async { Ok(None) } + ) -> impl std::future::Future> + Send { + async { Ok(()) } } /// Called to notify the Membership when a new DRB result has been calculated. @@ -192,3 +186,15 @@ pub trait Membership: Debug + Send + Sync { None } } + +pub fn membership_spawn_add_epoch_root( + membership: Arc + 'static>>, + epoch: TYPES::Epoch, + block_header: TYPES::BlockHeader, +) { + tokio::spawn(async move { + if let Err(e) = Membership::::add_epoch_root(membership, epoch, block_header).await { + tracing::error!("Failed to add epoch root for epoch {}: {}", epoch, e); + } + }); +} diff --git a/types/src/v0/impls/stake_table.rs b/types/src/v0/impls/stake_table.rs index 76c535b1de..e94a3e7377 100644 --- a/types/src/v0/impls/stake_table.rs +++ b/types/src/v0/impls/stake_table.rs @@ -1503,24 +1503,27 @@ impl Membership for EpochCommittees { max(higher_threshold, normal_threshold) } - #[allow(refining_impl_trait)] async fn add_epoch_root( - &self, + membership: Arc>, epoch: Epoch, block_header: Header, - ) -> anyhow::Result>> { - if self.state.contains_key(&epoch) { + ) -> anyhow::Result<()> { + let membership_reader = membership.read().await; + if membership_reader.state.contains_key(&epoch) { tracing::info!( "We already have the stake table for epoch {}. Skipping L1 fetching.", epoch ); - return Ok(None); + return Ok(()); } + let fetcher = Arc::clone(&membership_reader.fetcher); + drop(membership_reader); + + let stake_tables = fetcher.fetch(epoch, block_header).await?; - let stake_tables = self.fetcher.fetch(epoch, block_header).await?; // Store stake table in persistence { - let persistence_lock = self.fetcher.persistence.lock().await; + let persistence_lock = fetcher.persistence.lock().await; if let Err(e) = persistence_lock .store_stake(epoch, stake_tables.clone()) .await @@ -1529,9 +1532,9 @@ impl Membership for EpochCommittees { } } - Ok(Some(Box::new(move |committee: &mut Self| { - committee.update_stake_table(epoch, stake_tables); - }))) + let mut membership_writer = membership.write().await; + membership_writer.update_stake_table(epoch, stake_tables); + Ok(()) } fn has_stake_table(&self, epoch: Epoch) -> bool {