Skip to content

feat(e2e tests): invalid block penalizes peer #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/engine/src/future/result.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;

#[derive(Debug)]
/// A type that represents the result of the engine driver future.
Comment on lines +3 to 4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we do comments then derive

pub(crate) enum EngineDriverFutureResult {
BlockImport(
Expand Down
7 changes: 6 additions & 1 deletion crates/manager/src/manager/command.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use super::{RollupManagerEvent, RollupManagerStatus};

use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tokio_util::EventStream;
use scroll_network::ScrollNetworkHandle;
use tokio::sync::oneshot;

/// The commands that can be sent to the rollup manager.
#[derive(Debug)]
pub enum RollupManagerCommand {
pub enum RollupManagerCommand<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> {
/// Command to build a new block.
BuildBlock,
/// Returns an event stream for rollup manager events.
EventListener(oneshot::Sender<EventStream<RollupManagerEvent>>),
/// Report the current status of the manager via the oneshot channel.
Status(oneshot::Sender<RollupManagerStatus>),
/// Returns the network handle.
NetworkHandle(oneshot::Sender<ScrollNetworkHandle<N>>),
}
22 changes: 17 additions & 5 deletions crates/manager/src/manager/handle.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use super::{RollupManagerCommand, RollupManagerEvent};
use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tokio_util::EventStream;
use scroll_network::ScrollNetworkHandle;
use tokio::sync::{mpsc, oneshot};

/// The handle used to send commands to the rollup manager.
#[derive(Debug, Clone)]
pub struct RollupManagerHandle {
pub struct RollupManagerHandle<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> {
/// The channel used to send commands to the rollup manager.
to_manager_tx: mpsc::Sender<RollupManagerCommand>,
to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>,
}

impl RollupManagerHandle {
impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> RollupManagerHandle<N> {
/// Create a new rollup manager handle.
pub const fn new(to_manager_tx: mpsc::Sender<RollupManagerCommand>) -> Self {
pub const fn new(to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>) -> Self {
Self { to_manager_tx }
}

/// Sends a command to the rollup manager.
pub async fn send_command(&self, command: RollupManagerCommand) {
pub async fn send_command(&self, command: RollupManagerCommand<N>) {
let _ = self.to_manager_tx.send(command).await;
}

Expand All @@ -25,6 +28,15 @@ impl RollupManagerHandle {
self.send_command(RollupManagerCommand::BuildBlock).await;
}

/// Sends a command to the rollup manager to get the network handle.
pub async fn get_network_handle(
&self,
) -> Result<ScrollNetworkHandle<N>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
self.send_command(RollupManagerCommand::NetworkHandle(tx)).await;
rx.await
}

/// Sends a command to the rollup manager to fetch an event listener for the rollup node
/// manager.
pub async fn get_event_listener(
Expand Down
9 changes: 7 additions & 2 deletions crates/manager/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct RollupNodeManager<
CS,
> {
/// The handle receiver used to receive commands.
handle_rx: Receiver<RollupManagerCommand>,
handle_rx: Receiver<RollupManagerCommand<N>>,
/// The chain spec used by the rollup node.
chain_spec: Arc<CS>,
/// The network manager that manages the scroll p2p network.
Expand Down Expand Up @@ -164,7 +164,7 @@ where
sequencer: Option<Sequencer<L1MP>>,
signer: Option<SignerHandle>,
block_time: Option<u64>,
) -> (Self, RollupManagerHandle) {
) -> (Self, RollupManagerHandle<N>) {
let (handle_tx, handle_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
let indexer = Indexer::new(database.clone(), chain_spec.clone());
let derivation_pipeline = DerivationPipeline::new(l1_provider, database);
Expand Down Expand Up @@ -450,6 +450,11 @@ where
RollupManagerCommand::Status(tx) => {
tx.send(this.status()).expect("Failed to send status to handle");
}
RollupManagerCommand::NetworkHandle(tx) => {
let network_handle = this.network.handle();
tx.send(network_handle.clone())
.expect("Failed to send network handle to handle");
}
}
}

Expand Down
15 changes: 11 additions & 4 deletions crates/node/src/add_ons/handle.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
use reth_network_api::FullNetwork;
use reth_node_api::FullNodeComponents;
use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider};
use reth_rpc_eth_api::EthApiTypes;
use reth_scroll_node::ScrollNetworkPrimitives;
use rollup_node_manager::RollupManagerHandle;
#[cfg(feature = "test-utils")]
use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};

/// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
#[derive(Debug, Clone)]
pub struct ScrollAddOnsHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
pub struct ScrollAddOnsHandle<
Node: FullNodeComponents<Network: FullNetwork<Primitives = ScrollNetworkPrimitives>>,
EthApi: EthApiTypes,
> {
/// The handle used to send commands to the rollup manager.
pub rollup_manager_handle: RollupManagerHandle,
pub rollup_manager_handle: RollupManagerHandle<Node::Network>,
/// The handle used to send commands to the RPC server.
pub rpc_handle: RpcHandle<Node, EthApi>,
/// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`.
#[cfg(feature = "test-utils")]
pub l1_watcher_tx: Option<Sender<Arc<L1Notification>>>,
}

impl<Node: FullNodeComponents, EthApi: EthApiTypes> RpcHandleProvider<Node, EthApi>
for ScrollAddOnsHandle<Node, EthApi>
impl<
Node: FullNodeComponents<Network: FullNetwork<Primitives = ScrollNetworkPrimitives>>,
EthApi: EthApiTypes,
> RpcHandleProvider<Node, EthApi> for ScrollAddOnsHandle<Node, EthApi>
{
fn rpc_handle(&self) -> &RpcHandle<Node, EthApi> {
&self.rpc_handle
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/add_ons/rollup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl RollupManagerAddOn {
self,
ctx: AddOnsContext<'_, N>,
rpc: RpcHandle<N, EthApi>,
) -> eyre::Result<(RollupManagerHandle, Option<Sender<Arc<L1Notification>>>)>
) -> eyre::Result<(RollupManagerHandle<N::Network>, Option<Sender<Arc<L1Notification>>>)>
where
<<N as FullNodeTypes>::Types as NodeTypes>::ChainSpec: ScrollHardforks + IsDevChain,
N::Network: NetworkProtocols + FullNetwork<Primitives = ScrollNetworkPrimitives>,
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl ScrollRollupNodeConfig {
impl L1MessageProvider,
impl ScrollHardforks + EthChainSpec<Header: BlockHeader> + IsDevChain + Clone + 'static,
>,
RollupManagerHandle,
RollupManagerHandle<N>,
Option<Sender<Arc<L1Notification>>>,
)> {
// Instantiate the network manager
Expand Down
101 changes: 96 additions & 5 deletions crates/node/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256};
use futures::StreamExt;
use reth_network::{NetworkConfigBuilder, PeersInfo};
use reth_network::{NetworkConfigBuilder, Peers, PeersInfo};
use reth_rpc_api::EthApiServer;
use reth_scroll_chainspec::SCROLL_DEV;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_scroll_primitives::ScrollBlock;
use reth_tokio_util::EventStream;
use rollup_node::{
test_utils::{
Expand All @@ -16,16 +17,19 @@ use rollup_node::{
BeaconProviderArgs, DatabaseArgs, EngineDriverArgs, GasPriceOracleArgs, L1ProviderArgs,
NetworkArgs as ScrollNetworkArgs, ScrollRollupNodeConfig, SequencerArgs,
};
use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent, RollupManagerHandle};
use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent};
use rollup_node_primitives::BatchCommitData;
use rollup_node_providers::BlobSource;
use rollup_node_sequencer::L1MessageInclusionMode;
use rollup_node_watcher::L1Notification;
use scroll_alloy_consensus::TxL1Message;
use scroll_network::{NewBlockWithPeer, SCROLL_MAINNET};
use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::{oneshot, Mutex};
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::{
sync::{oneshot, Mutex},
time,
};
use tracing::trace;

#[tokio::test]
Expand Down Expand Up @@ -57,7 +61,7 @@ async fn can_bridge_l1_messages() -> eyre::Result<()> {
let (mut nodes, _tasks, _wallet) = setup_engine(node_args, 1, chain_spec, false, false).await?;
let node = nodes.pop().unwrap();

let rnm_handle: RollupManagerHandle = node.inner.add_ons_handle.rollup_manager_handle.clone();
let rnm_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
let mut rnm_events = rnm_handle.get_event_listener().await?;
let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();

Expand Down Expand Up @@ -164,6 +168,93 @@ async fn can_sequence_and_gossip_blocks() {
}
}

#[tokio::test]
async fn can_penalize_peer_for_invalid_block() {
reth_tracing::init_test_tracing();

// create 2 nodes
let chain_spec = (*SCROLL_DEV).clone();
let rollup_manager_args = ScrollRollupNodeConfig {
test: true,
network_args: ScrollNetworkArgs {
enable_eth_scroll_wire_bridge: true,
enable_scroll_wire: true,
sequencer_url: None,
},
database_args: DatabaseArgs { path: Some(PathBuf::from("sqlite::memory:")) },
l1_provider_args: L1ProviderArgs::default(),
engine_driver_args: EngineDriverArgs::default(),
sequencer_args: SequencerArgs {
sequencer_enabled: true,
block_time: 0,
max_l1_messages_per_block: 4,
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
payload_building_duration: 1000,
..SequencerArgs::default()
},
beacon_provider_args: BeaconProviderArgs {
blob_source: BlobSource::Mock,
..Default::default()
},
signer_args: Default::default(),
gas_price_oracle_args: GasPriceOracleArgs::default(),
};

let (nodes, _tasks, _) =
setup_engine(rollup_manager_args, 2, chain_spec, false, false).await.unwrap();

let node0_rmn_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone();
let node0_network_handle = node0_rmn_handle.get_network_handle().await.unwrap();
let node0_id = node0_network_handle.inner().peer_id();

let node1_rnm_handle = nodes[1].inner.add_ons_handle.rollup_manager_handle.clone();
let node1_network_handle = node1_rnm_handle.get_network_handle().await.unwrap();
Comment on lines +206 to +211
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be mistaking but isn't it possible to use nodes[i].inner.network?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checked and we could just use nodes[i].inner.network, just need to update the announce_block method to:

    let block = NewBlock {
        block: ScrollBlock {
            header: Header {
                extra_data: Signature::new(U256::ONE, U256::ONE, false).as_bytes().into(),
                ..Default::default()
            },
            ..Default::default()
        },
        ..Default::default()
    };

Copy link
Collaborator

@frisitano frisitano Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pattern that @greged93 described will only gossip over the eth-wire protocol not the scroll-wire protocol.


// get initial reputation of node0 from pov of node1
let initial_reputation =
node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap();
assert_eq!(initial_reputation, 0);

// create invalid block
let block = ScrollBlock::default();

// send invalid block from node0 to node1. We don't care about the signature here since we use a
// NoopConsensus in the test.
node0_network_handle.announce_block(block, Signature::new(U256::from(1), U256::from(1), false));

eventually(
Duration::from_secs(100),
Duration::from_millis(10000),
"Peer0 reputation should be lower after sending invalid block",
|| async {
// check that the node0 is penalized on node1
let slashed_reputation =
node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap();
slashed_reputation < initial_reputation
},
)
.await;
}

/// Helper function to wait until a predicate is true or a timeout occurs.
pub async fn eventually<F, Fut>(timeout: Duration, tick: Duration, message: &str, mut predicate: F)
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = bool>,
{
let mut interval = time::interval(tick);
let start = time::Instant::now();
loop {
if predicate().await {
return;
}

assert!(start.elapsed() <= timeout, "Timeout while waiting for condition: {message}");

interval.tick().await;
}
}

#[allow(clippy::large_stack_frames)]
#[tokio::test]
async fn can_sequence_and_gossip_transactions() {
Expand Down