From c7a5d9ae6c9096432d276c585090ce74753ef492 Mon Sep 17 00:00:00 2001 From: pls148 <184445976+pls148@users.noreply.github.com> Date: Fri, 23 May 2025 15:52:33 -0700 Subject: [PATCH] Split up add_epoch_root to allow stake table storage to happen outside of read lock --- crates/hotshot/hotshot/src/lib.rs | 16 ++++--- .../election/dummy_catchup_membership.rs | 26 +++++++--- crates/hotshot/task-impls/src/helpers.rs | 18 ++++--- crates/hotshot/types/src/epoch_membership.rs | 18 +++---- crates/hotshot/types/src/traits/election.rs | 39 ++++++++++++--- types/src/v0/impls/stake_table.rs | 47 ++++++++++++------- 6 files changed, 113 insertions(+), 51 deletions(-) diff --git a/crates/hotshot/hotshot/src/lib.rs b/crates/hotshot/hotshot/src/lib.rs index ab7433c799..8b7ed69eb5 100644 --- a/crates/hotshot/hotshot/src/lib.rs +++ b/crates/hotshot/hotshot/src/lib.rs @@ -1211,16 +1211,18 @@ async fn load_start_epoch_info( for epoch_info in start_epoch_info { if let Some(block_header) = &epoch_info.block_header { tracing::info!("Calling add_epoch_root for epoch {:?}", epoch_info.epoch); - let write_callback = { + let add_epoch_root_worker = { let membership_reader = membership.read().await; - membership_reader - .add_epoch_root(epoch_info.epoch, block_header.clone()) - .await + membership_reader.add_epoch_root(epoch_info.epoch, block_header.clone()) }; - if let Some(write_callback) = write_callback { - let mut membership_writer = membership.write().await; - write_callback(&mut *membership_writer); + if let Some(worker) = add_epoch_root_worker { + let add_epoch_root_updater = worker().await; + + if let Some(updater) = add_epoch_root_updater { + let mut membership_writer = membership.write().await; + updater(&mut *membership_writer); + } } } } diff --git a/crates/hotshot/hotshot/src/traits/election/dummy_catchup_membership.rs b/crates/hotshot/hotshot/src/traits/election/dummy_catchup_membership.rs index 09473962e0..3441f902bb 100644 --- a/crates/hotshot/hotshot/src/traits/election/dummy_catchup_membership.rs +++ b/crates/hotshot/hotshot/src/traits/election/dummy_catchup_membership.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; use alloy::primitives::U256; use anyhow::Ok; @@ -199,15 +199,27 @@ where self.inner.set_first_epoch(epoch, initial_drb_result); } - #[allow(refining_impl_trait)] - async fn add_epoch_root( + fn add_epoch_root( &self, epoch: TYPES::Epoch, _block_header: TYPES::BlockHeader, - ) -> Option> { - Some(Box::new(move |mem: &mut Self| { - tracing::error!("Adding epoch root for {epoch}"); - mem.epochs.insert(epoch); + ) -> Option< + Box< + dyn FnOnce() -> Pin< + Box< + dyn std::future::Future>> + + Send, + >, + > + Send, + >, + > { + Some(Box::new(move || { + Box::pin(async move { + Some(Box::new(move |mem: &mut Self| { + tracing::error!("Adding epoch root for {epoch}"); + mem.epochs.insert(epoch); + }) as Box) + }) })) } diff --git a/crates/hotshot/task-impls/src/helpers.rs b/crates/hotshot/task-impls/src/helpers.rs index 34a1ff721d..607a79a5f8 100644 --- a/crates/hotshot/task-impls/src/helpers.rs +++ b/crates/hotshot/task-impls/src/helpers.rs @@ -222,16 +222,20 @@ async fn decide_epoch_root>( tracing::info!("Time taken to store epoch root: {:?}", start.elapsed()); start = Instant::now(); - let write_callback = { + + let add_epoch_root_worker = { tracing::debug!("Calling add_epoch_root for epoch {next_epoch_number}"); let membership_reader = membership.read().await; - membership_reader - .add_epoch_root(next_epoch_number, decided_leaf.block_header().clone()) - .await + membership_reader.add_epoch_root(next_epoch_number, decided_leaf.block_header().clone()) }; - if let Some(write_callback) = write_callback { - let mut membership_writer = membership.write().await; - write_callback(&mut *membership_writer); + + if let Some(worker) = add_epoch_root_worker { + let add_epoch_root_updater = worker().await; + + if let Some(updater) = add_epoch_root_updater { + let mut membership_writer = membership.write().await; + updater(&mut *membership_writer); + } } tracing::info!("Time taken to add epoch root: {:?}", start.elapsed()); diff --git a/crates/hotshot/types/src/epoch_membership.rs b/crates/hotshot/types/src/epoch_membership.rs index 63ffadc241..fc08248b9d 100644 --- a/crates/hotshot/types/src/epoch_membership.rs +++ b/crates/hotshot/types/src/epoch_membership.rs @@ -373,16 +373,18 @@ where )); }; - let add_epoch_root_updater = { - let membership_read = self.membership.read().await; - membership_read - .add_epoch_root(epoch, root_leaf.block_header().clone()) - .await + let add_epoch_root_worker = { + let membership_reader = self.membership.read().await; + membership_reader.add_epoch_root(epoch, root_leaf.block_header().clone()) }; - if let Some(updater) = add_epoch_root_updater { - let mut membership_write = self.membership.write().await; - updater(&mut *(membership_write)); + if let Some(worker) = add_epoch_root_worker { + let add_epoch_root_updater = worker().await; + + if let Some(updater) = add_epoch_root_updater { + let mut membership_writer = self.membership.write().await; + updater(&mut *membership_writer); + }; }; Ok(root_leaf) diff --git a/crates/hotshot/types/src/traits/election.rs b/crates/hotshot/types/src/traits/election.rs index 6993cd602c..c66966fe11 100644 --- a/crates/hotshot/types/src/traits/election.rs +++ b/crates/hotshot/types/src/traits/election.rs @@ -5,7 +5,7 @@ // along with the HotShot repository. If not, see . //! The election trait, used to decide which node is the leader and determine if a vote is valid. -use std::{collections::BTreeSet, fmt::Debug, sync::Arc}; +use std::{collections::BTreeSet, fmt::Debug, pin::Pin, sync::Arc}; use alloy::primitives::U256; use async_lock::RwLock; @@ -164,17 +164,44 @@ pub trait Membership: Debug + Send + Sync { #[allow(clippy::type_complexity)] /// Handles notifications that a new epoch root has been created - /// Is called under a read lock to the Membership. Return a callback - /// with Some to have that callback invoked under a write lock. + /// This function results in a chain of callbacks being returned. It is invoked + /// synchronously under a read lock. Return an Option-wrapped callback to run an + /// asynchronous handler to execute. This handler returns another Option-wrapped + /// callback, which will be executed synchronously under a write lock to save + /// results back to the membership. /// - /// #3967 REVIEW NOTE: this is only called if epoch is Some. Is there any reason to do otherwise? + /// Example usage: + /// ``` + /// // Do some synchronous work first inside of the read lock + /// Some(Box::new(move || { + /// Box::pin(async move { + /// // We're now in the async context between the two locks. + /// // Do some async work here, like storing the stake table + /// + /// // Return a callback to be executed under the write lock + /// Some(Box::new(move |committee: &mut Self| { + /// // Do some synchronous work here, updating the membership + /// }) as Box) + /// }) + /// })) + /// ``` fn add_epoch_root( &self, _epoch: TYPES::Epoch, _block_header: TYPES::BlockHeader, - ) -> impl std::future::Future>> + Send { - async { None } + ) -> Option< + Box< + dyn FnOnce() -> Pin< + Box< + dyn std::future::Future>> + + Send, + >, + > + Send, + >, + > { + None } + //) -> impl std::future::Future>> + Send { /// Called to notify the Membership when a new DRB result has been calculated. /// Observes the same semantics as add_epoch_root diff --git a/types/src/v0/impls/stake_table.rs b/types/src/v0/impls/stake_table.rs index 2ab12ada8e..9c6b7c1f94 100644 --- a/types/src/v0/impls/stake_table.rs +++ b/types/src/v0/impls/stake_table.rs @@ -2,6 +2,7 @@ use std::{ cmp::{max, min}, collections::{BTreeMap, BTreeSet, HashMap, HashSet}, future::Future, + pin::Pin, sync::Arc, }; @@ -1221,12 +1222,20 @@ impl Membership for EpochCommittees { max(higher_threshold, normal_threshold) } - #[allow(refining_impl_trait)] - async fn add_epoch_root( + fn add_epoch_root( &self, epoch: Epoch, block_header: Header, - ) -> Option> { + ) -> Option< + Box< + dyn FnOnce() -> Pin< + Box< + dyn std::future::Future>> + + Send, + >, + > + Send, + >, + > { if self.state.contains_key(&epoch) { tracing::info!( "We already have a the stake table for epoch {}. Skipping L1 fetching.", @@ -1235,20 +1244,26 @@ impl Membership for EpochCommittees { return None; } - let stake_tables = self.fetcher.fetch(epoch, block_header).await?; - // Store stake table in persistence - { - let persistence_lock = self.fetcher.persistence.lock().await; - if let Err(e) = persistence_lock - .store_stake(epoch, stake_tables.clone()) - .await - { - tracing::error!(?e, "`add_epoch_root`, error storing stake table"); - } - } + let fetcher = Arc::clone(&self.fetcher); + + Some(Box::new(move || { + Box::pin(async move { + let stake_tables = fetcher.fetch(epoch, block_header).await?; + // Store stake table in persistence + { + let persistence_lock = fetcher.persistence.lock().await; + if let Err(e) = persistence_lock + .store_stake(epoch, stake_tables.clone()) + .await + { + tracing::error!(?e, "`add_epoch_root`, error storing stake table"); + } + } - Some(Box::new(move |committee: &mut Self| { - committee.update_stake_table(epoch, stake_tables); + Some(Box::new(move |committee: &mut Self| { + committee.update_stake_table(epoch, stake_tables); + }) as Box) + }) })) }