diff --git a/crates/engine/src/future/result.rs b/crates/engine/src/future/result.rs index ffd113ba..b156ae76 100644 --- a/crates/engine/src/future/result.rs +++ b/crates/engine/src/future/result.rs @@ -1,6 +1,7 @@ use super::*; /// A type that represents the result of the engine driver future. +#[derive(Debug)] pub(crate) enum EngineDriverFutureResult { BlockImport( Result< diff --git a/crates/manager/src/manager/command.rs b/crates/manager/src/manager/command.rs index 2b42fc8c..353107fe 100644 --- a/crates/manager/src/manager/command.rs +++ b/crates/manager/src/manager/command.rs @@ -1,15 +1,20 @@ use super::{RollupManagerEvent, RollupManagerStatus}; +use reth_network_api::FullNetwork; +use reth_scroll_node::ScrollNetworkPrimitives; use reth_tokio_util::EventStream; +use scroll_network::ScrollNetworkHandle; use tokio::sync::oneshot; /// The commands that can be sent to the rollup manager. #[derive(Debug)] -pub enum RollupManagerCommand { +pub enum RollupManagerCommand> { /// Command to build a new block. BuildBlock, /// Returns an event stream for rollup manager events. EventListener(oneshot::Sender>), /// Report the current status of the manager via the oneshot channel. Status(oneshot::Sender), + /// Returns the network handle. + NetworkHandle(oneshot::Sender>), } diff --git a/crates/manager/src/manager/handle.rs b/crates/manager/src/manager/handle.rs index 0651add0..bc578c04 100644 --- a/crates/manager/src/manager/handle.rs +++ b/crates/manager/src/manager/handle.rs @@ -1,22 +1,25 @@ use super::{RollupManagerCommand, RollupManagerEvent}; +use reth_network_api::FullNetwork; +use reth_scroll_node::ScrollNetworkPrimitives; use reth_tokio_util::EventStream; +use scroll_network::ScrollNetworkHandle; use tokio::sync::{mpsc, oneshot}; /// The handle used to send commands to the rollup manager. #[derive(Debug, Clone)] -pub struct RollupManagerHandle { +pub struct RollupManagerHandle> { /// The channel used to send commands to the rollup manager. - to_manager_tx: mpsc::Sender, + to_manager_tx: mpsc::Sender>, } -impl RollupManagerHandle { +impl> RollupManagerHandle { /// Create a new rollup manager handle. - pub const fn new(to_manager_tx: mpsc::Sender) -> Self { + pub const fn new(to_manager_tx: mpsc::Sender>) -> Self { Self { to_manager_tx } } /// Sends a command to the rollup manager. - pub async fn send_command(&self, command: RollupManagerCommand) { + pub async fn send_command(&self, command: RollupManagerCommand) { let _ = self.to_manager_tx.send(command).await; } @@ -25,6 +28,15 @@ impl RollupManagerHandle { self.send_command(RollupManagerCommand::BuildBlock).await; } + /// Sends a command to the rollup manager to get the network handle. + pub async fn get_network_handle( + &self, + ) -> Result, oneshot::error::RecvError> { + let (tx, rx) = oneshot::channel(); + self.send_command(RollupManagerCommand::NetworkHandle(tx)).await; + rx.await + } + /// Sends a command to the rollup manager to fetch an event listener for the rollup node /// manager. pub async fn get_event_listener( diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index d4c9d3a9..4557ac7a 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -78,7 +78,7 @@ pub struct RollupNodeManager< CS, > { /// The handle receiver used to receive commands. - handle_rx: Receiver, + handle_rx: Receiver>, /// The chain spec used by the rollup node. chain_spec: Arc, /// The network manager that manages the scroll p2p network. @@ -164,7 +164,7 @@ where sequencer: Option>, signer: Option, block_time: Option, - ) -> (Self, RollupManagerHandle) { + ) -> (Self, RollupManagerHandle) { let (handle_tx, handle_rx) = mpsc::channel(EVENT_CHANNEL_SIZE); let indexer = Indexer::new(database.clone(), chain_spec.clone()); let derivation_pipeline = DerivationPipeline::new(l1_provider, database); @@ -450,6 +450,11 @@ where RollupManagerCommand::Status(tx) => { tx.send(this.status()).expect("Failed to send status to handle"); } + RollupManagerCommand::NetworkHandle(tx) => { + let network_handle = this.network.handle(); + tx.send(network_handle.clone()) + .expect("Failed to send network handle to handle"); + } } } diff --git a/crates/node/src/add_ons/handle.rs b/crates/node/src/add_ons/handle.rs index e5a37480..757e7931 100644 --- a/crates/node/src/add_ons/handle.rs +++ b/crates/node/src/add_ons/handle.rs @@ -1,15 +1,20 @@ +use reth_network_api::FullNetwork; use reth_node_api::FullNodeComponents; use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider}; use reth_rpc_eth_api::EthApiTypes; +use reth_scroll_node::ScrollNetworkPrimitives; use rollup_node_manager::RollupManagerHandle; #[cfg(feature = "test-utils")] use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender}; /// A handle for scroll addons, which includes handles for the rollup manager and RPC server. #[derive(Debug, Clone)] -pub struct ScrollAddOnsHandle { +pub struct ScrollAddOnsHandle< + Node: FullNodeComponents>, + EthApi: EthApiTypes, +> { /// The handle used to send commands to the rollup manager. - pub rollup_manager_handle: RollupManagerHandle, + pub rollup_manager_handle: RollupManagerHandle, /// The handle used to send commands to the RPC server. pub rpc_handle: RpcHandle, /// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`. @@ -17,8 +22,10 @@ pub struct ScrollAddOnsHandle { pub l1_watcher_tx: Option>>, } -impl RpcHandleProvider - for ScrollAddOnsHandle +impl< + Node: FullNodeComponents>, + EthApi: EthApiTypes, + > RpcHandleProvider for ScrollAddOnsHandle { fn rpc_handle(&self) -> &RpcHandle { &self.rpc_handle diff --git a/crates/node/src/add_ons/rollup.rs b/crates/node/src/add_ons/rollup.rs index aed820ee..0c49654c 100644 --- a/crates/node/src/add_ons/rollup.rs +++ b/crates/node/src/add_ons/rollup.rs @@ -50,7 +50,7 @@ impl RollupManagerAddOn { self, ctx: AddOnsContext<'_, N>, rpc: RpcHandle, - ) -> eyre::Result<(RollupManagerHandle, Option>>)> + ) -> eyre::Result<(RollupManagerHandle, Option>>)> where <::Types as NodeTypes>::ChainSpec: ScrollHardforks + IsDevChain, N::Network: NetworkProtocols + FullNetwork, diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 719193e3..826669d4 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -134,7 +134,7 @@ impl ScrollRollupNodeConfig { impl L1MessageProvider, impl ScrollHardforks + EthChainSpec + IsDevChain + Clone + 'static, >, - RollupManagerHandle, + RollupManagerHandle, Option>>, )> { tracing::info!(target: "rollup_node::args", diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 8c7a8fe2..d64b2d91 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -6,11 +6,12 @@ use alloy_signer::Signer; use alloy_signer_local::PrivateKeySigner; use futures::StreamExt; use reth_chainspec::EthChainSpec; -use reth_network::{NetworkConfigBuilder, PeersInfo}; +use reth_network::{NetworkConfigBuilder, Peers, PeersInfo}; use reth_network_api::block::EthWireProvider; use reth_rpc_api::EthApiServer; use reth_scroll_chainspec::SCROLL_DEV; use reth_scroll_node::ScrollNetworkPrimitives; +use reth_scroll_primitives::ScrollBlock; use reth_tokio_util::EventStream; use rollup_node::{ test_utils::{ @@ -21,7 +22,7 @@ use rollup_node::{ GasPriceOracleArgs, L1ProviderArgs, NetworkArgs as ScrollNetworkArgs, ScrollRollupNodeConfig, SequencerArgs, }; -use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent, RollupManagerHandle}; +use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent}; use rollup_node_primitives::{sig_encode_hash, BatchCommitData, ConsensusUpdate}; use rollup_node_providers::BlobSource; use rollup_node_sequencer::L1MessageInclusionMode; @@ -29,8 +30,11 @@ use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; use scroll_network::{NewBlockWithPeer, SCROLL_MAINNET}; use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler}; -use std::{path::PathBuf, sync::Arc}; -use tokio::sync::{oneshot, Mutex}; +use std::{path::PathBuf, sync::Arc, time::Duration}; +use tokio::{ + sync::{oneshot, Mutex}, + time, +}; use tracing::trace; #[tokio::test] @@ -63,7 +67,7 @@ async fn can_bridge_l1_messages() -> eyre::Result<()> { let (mut nodes, _tasks, _wallet) = setup_engine(node_args, 1, chain_spec, false, false).await?; let node = nodes.pop().unwrap(); - let rnm_handle: RollupManagerHandle = node.inner.add_ons_handle.rollup_manager_handle.clone(); + let rnm_handle = node.inner.add_ons_handle.rollup_manager_handle.clone(); let mut rnm_events = rnm_handle.get_event_listener().await?; let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); @@ -168,6 +172,94 @@ async fn can_sequence_and_gossip_blocks() { .await; } +#[tokio::test] +async fn can_penalize_peer_for_invalid_block() { + reth_tracing::init_test_tracing(); + + // create 2 nodes + let chain_spec = (*SCROLL_DEV).clone(); + let rollup_manager_args = ScrollRollupNodeConfig { + test: true, + network_args: ScrollNetworkArgs { + enable_eth_scroll_wire_bridge: true, + enable_scroll_wire: true, + sequencer_url: None, + }, + database_args: DatabaseArgs { path: Some(PathBuf::from("sqlite::memory:")) }, + l1_provider_args: L1ProviderArgs::default(), + engine_driver_args: EngineDriverArgs::default(), + sequencer_args: SequencerArgs { + sequencer_enabled: true, + block_time: 0, + max_l1_messages_per_block: 4, + l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0), + payload_building_duration: 1000, + ..SequencerArgs::default() + }, + beacon_provider_args: BeaconProviderArgs { + blob_source: BlobSource::Mock, + ..Default::default() + }, + signer_args: Default::default(), + gas_price_oracle_args: GasPriceOracleArgs::default(), + consensus_args: ConsensusArgs::noop(), + }; + + let (nodes, _tasks, _) = + setup_engine(rollup_manager_args, 2, chain_spec, false, false).await.unwrap(); + + let node0_rmn_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); + let node0_network_handle = node0_rmn_handle.get_network_handle().await.unwrap(); + let node0_id = node0_network_handle.inner().peer_id(); + + let node1_rnm_handle = nodes[1].inner.add_ons_handle.rollup_manager_handle.clone(); + let node1_network_handle = node1_rnm_handle.get_network_handle().await.unwrap(); + + // get initial reputation of node0 from pov of node1 + let initial_reputation = + node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap(); + assert_eq!(initial_reputation, 0); + + // create invalid block + let block = ScrollBlock::default(); + + // send invalid block from node0 to node1. We don't care about the signature here since we use a + // NoopConsensus in the test. + node0_network_handle.announce_block(block, Signature::new(U256::from(1), U256::from(1), false)); + + eventually( + Duration::from_secs(5), + Duration::from_millis(10), + "Peer0 reputation should be lower after sending invalid block", + || async { + // check that the node0 is penalized on node1 + let slashed_reputation = + node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap(); + slashed_reputation < initial_reputation + }, + ) + .await; +} + +/// Helper function to wait until a predicate is true or a timeout occurs. +pub async fn eventually(timeout: Duration, tick: Duration, message: &str, mut predicate: F) +where + F: FnMut() -> Fut, + Fut: std::future::Future, +{ + let mut interval = time::interval(tick); + let start = time::Instant::now(); + loop { + if predicate().await { + return; + } + + assert!(start.elapsed() <= timeout, "Timeout while waiting for condition: {message}"); + + interval.tick().await; + } +} + #[allow(clippy::large_stack_frames)] #[tokio::test] async fn can_sequence_and_gossip_transactions() {