diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs
index 5380e36dc..6ec35b2d3 100644
--- a/code/crates/app-channel/src/run.rs
+++ b/code/crates/app-channel/src/run.rs
@@ -3,10 +3,6 @@
 use eyre::Result;
 
-use malachitebft_engine::consensus::{ConsensusMsg, ConsensusRef};
-use malachitebft_engine::util::events::TxEvent;
-use tokio::sync::mpsc::Receiver;
-
 use crate::app::metrics::{Metrics, SharedRegistry};
 use crate::app::node::{self, EngineHandle, NodeConfig};
 use crate::app::spawn::{
@@ -17,6 +13,10 @@ use crate::app::types::core::Context;
 use crate::msgs::ConsensusRequest;
 use crate::spawn::{spawn_host_actor, spawn_network_actor};
 use crate::Channels;
+use malachitebft_app::types::sync;
+use malachitebft_engine::consensus::{ConsensusMsg, ConsensusRef};
+use malachitebft_engine::util::events::TxEvent;
+use tokio::sync::mpsc::Receiver;
 
 pub async fn start_engine<Node, Ctx, NetCodec, WalCodec>(
     ctx: Ctx,
@@ -33,6 +33,7 @@ where
     WalCodec: codec::WalCodec<Ctx> + Clone,
     NetCodec: codec::ConsensusCodec<Ctx>,
     NetCodec: codec::SyncCodec<Ctx>,
+    NetCodec: codec::HasEncodedLen<sync::Response<Ctx>>,
 {
     let start_height = start_height.unwrap_or_default();
diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs
index 9d30213ac..4d59e692a 100644
--- a/code/crates/app-channel/src/spawn.rs
+++ b/code/crates/app-channel/src/spawn.rs
@@ -1,13 +1,5 @@
 //! Utility functions for spawning the actor system and connecting it to the application.
 
-use eyre::Result;
-use tokio::sync::mpsc;
-
-use malachitebft_engine::consensus::ConsensusCodec;
-use malachitebft_engine::host::HostRef;
-use malachitebft_engine::network::NetworkRef;
-use malachitebft_engine::sync::SyncCodec;
-
 use crate::app;
 use crate::app::config::ConsensusConfig;
 use crate::app::metrics::Metrics;
@@ -16,6 +8,14 @@ use crate::app::types::core::Context;
 use crate::app::types::Keypair;
 use crate::connector::Connector;
 use crate::{AppMsg, NetworkMsg};
+use eyre::Result;
+use malachitebft_app::types::codec::HasEncodedLen;
+use malachitebft_app::types::sync;
+use malachitebft_engine::consensus::ConsensusCodec;
+use malachitebft_engine::host::HostRef;
+use malachitebft_engine::network::NetworkRef;
+use malachitebft_engine::sync::SyncCodec;
+use tokio::sync::mpsc;
 
 pub async fn spawn_host_actor<Ctx>(
     metrics: Metrics,
@@ -38,6 +38,7 @@ where
     Ctx: Context,
     Codec: ConsensusCodec<Ctx>,
     Codec: SyncCodec<Ctx>,
+    Codec: HasEncodedLen<sync::Response<Ctx>>,
 {
     let (tx, mut rx) = mpsc::channel::<NetworkMsg<Ctx>>(1);
diff --git a/code/crates/app/src/spawn.rs b/code/crates/app/src/spawn.rs
index 4f62a36da..667bd0764 100644
--- a/code/crates/app/src/spawn.rs
+++ b/code/crates/app/src/spawn.rs
@@ -4,9 +4,7 @@ use std::path::Path;
 use std::time::Duration;
 
 use eyre::Result;
-use tokio::task::JoinHandle;
-use tracing::Span;
-
+use malachitebft_codec::HasEncodedLen;
 use malachitebft_engine::consensus::{Consensus, ConsensusCodec, ConsensusParams, ConsensusRef};
 use malachitebft_engine::host::HostRef;
 use malachitebft_engine::network::{Network, NetworkRef};
@@ -18,6 +16,8 @@ use malachitebft_network::{
     ChannelNames, Config as NetworkConfig, DiscoveryConfig, GossipSubConfig, Keypair,
 };
 use malachitebft_sync as sync;
+use tokio::task::JoinHandle;
+use tracing::Span;
 
 use crate::config::{ConsensusConfig, PubSubProtocol, ValueSyncConfig};
 use crate::metrics::{Metrics, SharedRegistry};
@@ -60,6 +60,7 @@ where
     Ctx: Context,
     Codec: ConsensusCodec<Ctx>,
     Codec: SyncCodec<Ctx>,
+    Codec: HasEncodedLen<sync::Response<Ctx>>,
 {
     let config = make_gossip_config(cfg);
diff --git a/code/crates/app/src/types.rs b/code/crates/app/src/types.rs
index 4fff74cd7..5a91264b2 100644
--- a/code/crates/app/src/types.rs
+++ b/code/crates/app/src/types.rs
@@ -22,6 +22,7 @@ pub mod sync {
 
 pub mod codec {
     pub use malachitebft_codec::Codec;
+    pub use malachitebft_codec::HasEncodedLen;
     pub use malachitebft_engine::consensus::ConsensusCodec;
     pub use malachitebft_engine::sync::SyncCodec;
     pub use malachitebft_engine::wal::WalCodec;
diff --git a/code/crates/codec/src/lib.rs b/code/crates/codec/src/lib.rs
index 68f1c6fb3..a3e5770ac 100644
--- a/code/crates/codec/src/lib.rs
+++ b/code/crates/codec/src/lib.rs
@@ -8,3 +8,8 @@ pub trait Codec<T>: Send + Sync + 'static {
     fn decode(&self, bytes: Bytes) -> Result<T, Self::Error>;
     fn encode(&self, msg: &T) -> Result<Bytes, Self::Error>;
 }
+
+/// Codec extension trait for types that can also compute the length of the encoded data.
+pub trait HasEncodedLen<T>: Codec<T> {
+    fn encoded_len(&self, msg: &T) -> Result<usize, <Self as Codec<T>>::Error>;
+}
diff --git a/code/crates/core-consensus/src/util/bounded_queue.rs b/code/crates/core-consensus/src/util/bounded_queue.rs
index 46b089f96..581ef0402 100644
--- a/code/crates/core-consensus/src/util/bounded_queue.rs
+++ b/code/crates/core-consensus/src/util/bounded_queue.rs
@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use tracing::warn;
 
 /// A data structure that maintains a queue of values associated with monotonically increasing indices.
 ///
@@ -46,6 +47,8 @@ where
         if &index < max_index {
            let max_index = max_index.clone();
 
+            warn!("Bounded queue is full, dropping the value at the highest index");
+
             // Remove the highest index
             self.queue.remove(&max_index);
 
@@ -56,6 +59,7 @@ where
             }
         }
 
+        warn!("Bounded queue is full, value was not inserted");
         false
     }
diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs
index 87add4925..afdd8fbea 100644
--- a/code/crates/engine/src/consensus.rs
+++ b/code/crates/engine/src/consensus.rs
@@ -506,8 +506,20 @@ where
         // Process values sequentially starting from the lowest height
         let mut height = start_height;
         for value in values.iter() {
-            self.process_sync_response(&myself, state, peer, height, value)
-                .await?;
+            if let Err(e) = self
+                .process_sync_response(&myself, state, peer, height, value)
+                .await
+            {
+                // At this point, `process_sync_response` has already sent a message about
+                // the invalid value to the sync actor. The sync actor will then
+                // re-request this range from some peer.
+                // Because of this, when processing a response fails, we exit this loop
+                // early to avoid issuing multiple parallel requests for the same range
+                // of values. There is also no benefit in processing the rest of the
+                // values.
+                error!(%start_height, %height, %request_id, "Failed to process sync response: {e:?}");
+                break;
+            }
             height = height.increment();
         }
 
@@ -650,22 +662,20 @@ where
         Ctx: Context,
     {
-        if let Err(e) = self
-            .process_input(
-                myself,
-                state,
-                ConsensusInput::SyncValueResponse(CoreValueResponse::new(
-                    peer,
-                    value.value_bytes.clone(),
-                    value.certificate.clone(),
-                )),
-            )
-            .await
-        {
-            error!(%height, "Error when processing received synced block: {e}");
-        }
-
-        Ok(())
+        self.process_input(
+            myself,
+            state,
+            ConsensusInput::SyncValueResponse(CoreValueResponse::new(
+                peer,
+                value.value_bytes.clone(),
+                value.certificate.clone(),
+            )),
+        )
+        .await
+        .map_err(|e| {
+            error!(%height, error = ?e, "Error when processing received synced block");
+            e.into()
+        })
     }
 
     async fn timeout_elapsed(
diff --git a/code/crates/engine/src/network.rs b/code/crates/engine/src/network.rs
index 254bd3d7d..f75dbfe20 100644
--- a/code/crates/engine/src/network.rs
+++ b/code/crates/engine/src/network.rs
@@ -1,12 +1,11 @@
-use std::collections::{BTreeSet, HashMap};
-use std::marker::PhantomData;
-
 use async_trait::async_trait;
 use derive_where::derive_where;
 use eyre::eyre;
 use libp2p::identity::Keypair;
 use libp2p::request_response;
 use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
+use std::collections::{BTreeSet, HashMap};
+use std::marker::PhantomData;
 use tokio::task::JoinHandle;
 use tracing::{error, trace};
 
@@ -71,6 +70,7 @@ where
     Ctx: Context,
     Codec: ConsensusCodec<Ctx>,
     Codec: SyncCodec<Ctx>,
+    Codec: codec::HasEncodedLen<sync::Response<Ctx>>,
 {
     pub async fn spawn(
         keypair: Keypair,
@@ -167,6 +167,9 @@ pub enum Msg<Ctx: Context> {
     /// Send a response for a request to a peer
     OutgoingResponse(InboundRequestId, Response<Ctx>),
 
+    /// Request the total number of bytes that would be transmitted for this response
+    GetResponseSize(Response<Ctx>, RpcReplyPort<usize>),
+
     /// Request for number of peers from gossip
     GetState { reply: RpcReplyPort<usize> },
 
@@ -187,6 +190,7 @@ where
     Codec: codec::Codec<sync::Status<Ctx>>,
     Codec: codec::Codec<sync::Request<Ctx>>,
     Codec: codec::Codec<sync::Response<Ctx>>,
+    Codec: codec::HasEncodedLen<sync::Response<Ctx>>,
 {
     type Msg = Msg<Ctx>;
     type State = State<Ctx>;
@@ -328,6 +332,20 @@ where
                 };
             }
 
+            Msg::GetResponseSize(response, reply_to) => {
+                let encoded_len = self.codec.encoded_len(&response);
+
+                match encoded_len {
+                    Ok(size) => {
+                        reply_to.send(size)?;
+                    }
+                    Err(e) => {
+                        error!(?response, "Failed to compute the encoded length of the response: {e:?}");
+                        return Ok(());
+                    }
+                };
+            }
+
             Msg::NewEvent(Event::Listening(addr)) => {
                 listen_addrs.push(addr.clone());
                 output_port.send(NetworkEvent::Listening(addr));
diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs
index f17937e7f..1ae897a0a 100644
--- a/code/crates/engine/src/sync.rs
+++ b/code/crates/engine/src/sync.rs
@@ -12,19 +12,19 @@ use rand::SeedableRng;
 use tokio::task::JoinHandle;
 use tracing::{debug, error, info, warn, Instrument};
 
+use crate::host::{HostMsg, HostRef};
+use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status};
+use crate::util::ticker::ticker;
+use crate::util::timers::{TimeoutElapsed, TimerScheduler};
 use malachitebft_codec as codec;
 use malachitebft_core_consensus::PeerId;
 use malachitebft_core_types::{CommitCertificate, Context, Height};
+use malachitebft_sync::Response::ValueResponse;
 use malachitebft_sync::{
     self as sync, HeightStartType, InboundRequestId, OutboundRequestId, RawDecidedValue, Request,
     Response, Resumable,
 };
 
-use crate::host::{HostMsg, HostRef};
-use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status};
-use crate::util::ticker::ticker;
-use crate::util::timers::{TimeoutElapsed, TimerScheduler};
-
 /// Codec for sync protocol messages
 ///
 /// This trait is automatically implemented for any type that implements:
@@ -285,6 +285,8 @@ where
             Effect::GetDecidedValues(request_id, range, r) => {
                 let mut values = Vec::new();
                 let mut height = *range.start();
+
+                let mut response_size_bytes = 0;
                 while height <= *range.end() {
                     let value = self
                         .host
@@ -296,7 +298,43 @@ where
                        .success_or(eyre!("Failed to get decided value for height {height}"))?;
 
                     if let Some(value) = value {
+                        let value_response = ValueResponse(sync::ValueResponse::new(
+                            *range.start(),
+                            vec![value.clone()],
+                        ));
+
+                        let result = ractor::call!(self.gossip, move |reply_to| {
+                            NetworkMsg::GetResponseSize(value_response.clone(), reply_to)
+                        });
+
+                        let total_value_size_bytes = match result {
+                            Ok(value_in_bytes) => value_in_bytes,
+                            Err(e) => {
+                                error!("Failed to get response size for value, stopping at height {}: {:?}", height, e);
+                                break;
+                            }
+                        };
+
+                        // Check whether adding this value would exceed the maximum response size
+                        if response_size_bytes + total_value_size_bytes
+                            > self.sync_config.max_response_size
+                        {
+                            warn!("Maximum byte size limit ({} bytes) would be exceeded (current: {} + upcoming value: {}), stopping at height {}",
+                                self.sync_config.max_response_size, response_size_bytes, total_value_size_bytes, height);
+                            break;
+                        }
+
+                        response_size_bytes += total_value_size_bytes;
                         values.push(value);
+
+                        if response_size_bytes == self.sync_config.max_response_size {
+                            info!(
+                                "Reached the maximum byte size limit ({} bytes) exactly at height {}",
+                                self.sync_config.max_response_size, height
+                            );
+
+                            break;
+                        }
                     } else {
                         warn!("Decided value not found for height {height}");
                         break;
diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs
index 7741a2677..af678b5f9 100644
--- a/code/crates/starknet/host/src/codec.rs
+++ b/code/crates/starknet/host/src/codec.rs
@@ -1,7 +1,7 @@
 use bytes::Bytes;
 use prost::Message;
 
-use malachitebft_codec::Codec;
+use malachitebft_codec::{Codec, HasEncodedLen};
 use malachitebft_core_consensus::{LivenessMsg, PeerId, ProposedValue, SignedConsensusMsg};
 use malachitebft_core_types::{
     CommitCertificate, CommitSignature, NilOrVal, PolkaCertificate, PolkaSignature, Round,
@@ -222,6 +222,15 @@ impl Codec<sync::Response<MockContext>> for ProtobufCodec {
     }
 }
 
+impl HasEncodedLen<sync::Response<MockContext>> for ProtobufCodec {
+    fn encoded_len(
+        &self,
+        response: &sync::Response<MockContext>,
+    ) -> Result<usize, <Self as Codec<sync::Response<MockContext>>>::Error> {
+        Ok(encode_sync_response(response)?.encoded_len())
+    }
+}
+
 pub fn decode_sync_response(
     proto_response: proto::sync::SyncResponse,
 ) -> Result<sync::Response<MockContext>, ProtoError> {
diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs
index e7948cd68..53b7d424a 100644
--- a/code/crates/starknet/test/src/lib.rs
+++ b/code/crates/starknet/test/src/lib.rs
@@ -233,6 +233,7 @@ fn apply_params(config: &mut Config, params: &TestParams) {
     config.value_sync.enabled = params.enable_value_sync;
     config.value_sync.parallel_requests = params.parallel_requests;
     config.value_sync.batch_size = params.batch_size;
+    config.value_sync.max_response_size = params.max_response_size;
     config.consensus.p2p.protocol = params.protocol;
     config.test.max_block_size = params.block_size;
     config.test.txs_per_part = params.txs_per_part;
diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs
index c09a83b5a..151487f56 100644
--- a/code/crates/sync/src/handle.rs
+++ b/code/crates/sync/src/handle.rs
@@ -306,14 +306,44 @@ where
     // If the response contains a prefix of the requested values, re-request the remaining values.
     if let Some((requested_range, stored_peer_id)) = state.pending_requests.get(&request_id) {
         if stored_peer_id != &peer_id {
-            warn!(
+            // Defensive check: this should never happen, because the same check is already
+            // performed in the handler of `Input::ValueResponse`.
+            error!(
                 %request_id,
                 peer.actual = %peer_id,
                 peer.expected = %stored_peer_id,
                 "Received response from different peer than expected"
             );
+
+            return on_invalid_value_response(co, state, metrics, request_id, peer_id).await;
         }
+
         let range_len = requested_range.end().as_u64() - requested_range.start().as_u64() + 1;
-        if (response.values.len() as u64) < range_len {
-            re_request_values_from_peer_except(co, state, metrics, request_id, None).await?;
+
+        if response.values.len() < range_len as usize {
+            // NOTE: We cannot simply call `re_request_values_from_peer_except` here.
+            // Although we received some values from the peer, those values have not yet been
+            // processed by the consensus engine. If we called `re_request_values_from_peer_except`,
+            // we would end up re-requesting the entire original range (including the values we
+            // already received), causing the syncing node to repeatedly send requests until the
+            // already-received values are fully processed.
+            // To tackle this, we first update the current pending request to cover only the range
+            // of values the response actually provided, and then issue a new request for the
+            // remaining values.
+            let new_start = requested_range
+                .start()
+                .increment_by(response.values.len() as u64);
+
+            let end = *requested_range.end();
+
+            if response.values.is_empty() {
+                error!(%request_id, %peer_id, "Received response contains no values");
+            } else {
+                // The response to this request only provided `response.values.len()` values,
+                // so update the pending request accordingly
+                let updated_range = *requested_range.start()..=new_start.decrement().unwrap();
+                state.update_request(request_id, peer_id, updated_range);
+            }
+
+            // Issue a new request for the remaining values to any peer, not necessarily the same one
+            let new_range = new_start..=end;
+            request_values_range(co, state, metrics, new_range).await?;
         }
     }
 
@@ -360,7 +390,7 @@ where
     // Validate response from host
     let batch_size = end.as_u64() - start.as_u64() + 1;
     if batch_size != values.len() as u64 {
-        error!(
+        warn!(
            "Received {} values from host, expected {batch_size}",
            values.len()
        )
@@ -483,7 +513,7 @@ async fn request_values<Ctx>(
 where
     Ctx: Context,
 {
-    let max_parallel_requests = max(1, state.config.parallel_requests);
+    let max_parallel_requests = state.max_parallel_requests();
 
     if state.pending_requests.len() as u64 >= max_parallel_requests {
         info!(
@@ -515,6 +545,43 @@ where
     Ok(())
 }
 
+/// Request values for this specific range from a peer.
+/// Should only be used when re-requesting a partial range of values from a peer.
+async fn request_values_range<Ctx>(
+    co: Co<Ctx>,
+    state: &mut State<Ctx>,
+    metrics: &Metrics,
+    range: RangeInclusive<Ctx::Height>,
+) -> Result<(), Error<Ctx>>
+where
+    Ctx: Context,
+{
+    // NOTE: In contrast to `request_values`, we do not return early when the `max_parallel_requests`
+    // limit is reached. This is because `request_values_range` is only called to retrieve the
+    // remainder of a partial response, which means the original request is no longer on the wire.
+    // Nevertheless, we log here, because seeing this log frequently implies that we keep getting
+    // partial responses from peers and hints at a potential need for reconfiguration.
+    let max_parallel_requests = state.max_parallel_requests();
+    if state.pending_requests.len() as u64 >= max_parallel_requests {
+        info!(
+            %max_parallel_requests,
+            pending_requests = %state.pending_requests.len(),
+            "Maximum number of pending requests reached when re-requesting a partial range of values"
+        );
+    };
+
+    // Get a random peer that can provide the values in the range.
+    let Some((peer, range)) = state.random_peer_with(&range) else {
+        // No connected peer reached this height yet, we can stop syncing here.
+        debug!(range = %DisplayRange::<Ctx>(&range), "No peer to request sync from");
+        return Ok(());
+    };
+
+    request_values_from_peer(&co, state, metrics, range, peer).await?;
+
+    Ok(())
+}
+
 async fn request_values_from_peer<Ctx>(
     co: &Co<Ctx>,
     state: &mut State<Ctx>,
     metrics: &Metrics,
diff --git a/code/crates/sync/src/state.rs b/code/crates/sync/src/state.rs
index 3bd2590a5..5f7278bcb 100644
--- a/code/crates/sync/src/state.rs
+++ b/code/crates/sync/src/state.rs
@@ -63,10 +63,25 @@ where
         }
     }
 
+    /// The maximum number of parallel requests that can be made to peers.
+    /// If the configuration is set to 0, it defaults to 1.
+    pub fn max_parallel_requests(&self) -> u64 {
+        max(1, self.config.parallel_requests)
+    }
+
     pub fn update_status(&mut self, status: Status<Ctx>) {
         self.peers.insert(status.peer_id, status);
     }
 
+    pub fn update_request(
+        &mut self,
+        request_id: OutboundRequestId,
+        peer_id: PeerId,
+        range: RangeInclusive<Ctx::Height>,
+    ) {
+        self.pending_requests.insert(request_id, (range, peer_id));
+    }
+
     /// Filter peers to only include those that can provide the given range of values, or at least a prefix of the range.
     ///
     /// If there is no peer with all requested values, select a peer that has a tip at or above the start of the range.
diff --git a/code/crates/test/Cargo.toml b/code/crates/test/Cargo.toml
index 7ae7d241a..01d1c691a 100644
--- a/code/crates/test/Cargo.toml
+++ b/code/crates/test/Cargo.toml
@@ -31,6 +31,7 @@ hex = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
 rand = { workspace = true }
+tracing = { workspace = true }
 serde = { workspace = true, features = ["derive", "rc"] }
 serde_json = { workspace = true }
 sha3 = { workspace = true }
diff --git a/code/crates/test/framework/src/params.rs b/code/crates/test/framework/src/params.rs
index d317d78b6..e61ac0eae 100644
--- a/code/crates/test/framework/src/params.rs
+++ b/code/crates/test/framework/src/params.rs
@@ -9,6 +9,7 @@ pub struct TestParams {
     pub parallel_requests: usize,
     pub batch_size: usize,
     pub protocol: PubSubProtocol,
+    pub rpc_max_size: ByteSize,
     pub block_size: ByteSize,
     pub tx_size: ByteSize,
     pub txs_per_part: usize,
@@ -16,6 +17,7 @@ pub struct TestParams {
     pub value_payload: ValuePayload,
     pub max_retain_blocks: usize,
     pub stable_block_times: bool,
+    pub max_response_size: ByteSize,
 }
 
 impl Default for TestParams {
@@ -25,6 +27,7 @@ impl Default for TestParams {
             parallel_requests: 1,
             batch_size: 1,
             protocol: PubSubProtocol::default(),
+            rpc_max_size: ByteSize::mib(2),
             block_size: ByteSize::mib(1),
             tx_size: ByteSize::kib(1),
             txs_per_part: 256,
@@ -32,6 +35,7 @@ impl Default for TestParams {
             value_payload: ValuePayload::ProposalAndParts,
             max_retain_blocks: 50,
             stable_block_times: true,
+            max_response_size: ByteSize::mib(1),
         }
     }
 }
@@ -41,7 +45,9 @@ impl TestParams {
         config.value_sync.enabled = self.enable_value_sync;
         config.value_sync.parallel_requests = self.parallel_requests;
         config.value_sync.batch_size = self.batch_size;
+        config.value_sync.max_response_size = self.max_response_size;
         config.consensus.p2p.protocol = self.protocol;
+        config.consensus.p2p.rpc_max_size = self.rpc_max_size;
         config.consensus.value_payload = self.value_payload;
         config.test.max_block_size = self.block_size;
         config.test.txs_per_part = self.txs_per_part;
diff --git a/code/crates/test/src/codec/json/mod.rs b/code/crates/test/src/codec/json/mod.rs
index 096ea9c78..bfe02f719 100644
--- a/code/crates/test/src/codec/json/mod.rs
+++ b/code/crates/test/src/codec/json/mod.rs
@@ -1,7 +1,8 @@
 pub mod raw;
 
 use bytes::Bytes;
-use malachitebft_codec::Codec;
+use malachitebft_codec::{Codec, HasEncodedLen};
+use tracing::warn;
 
 use malachitebft_core_consensus::{LivenessMsg, SignedConsensusMsg};
 use malachitebft_engine::util::streaming::StreamMessage;
@@ -100,6 +101,17 @@ impl Codec<Response<TestContext>> for JsonCodec {
     }
 }
 
+impl HasEncodedLen<Response<TestContext>> for JsonCodec {
+    fn encoded_len(
+        &self,
+        msg: &Response<TestContext>,
+    ) -> Result<usize, <Self as Codec<Response<TestContext>>>::Error> {
+        warn!("encoded_len serializes the data in order to compute its length; consider using \
+            ProtobufCodec if you want to compute the encoded length without encoding the data");
+        serde_json::to_vec(&RawResponse::from(msg.clone())).map(|b| b.len())
+    }
+}
+
 impl Codec> for JsonCodec {
     type Error = serde_json::Error;
 
diff --git a/code/crates/test/src/codec/proto/mod.rs b/code/crates/test/src/codec/proto/mod.rs
index 4d7ebc68d..2bcf67038 100644
--- a/code/crates/test/src/codec/proto/mod.rs
+++ b/code/crates/test/src/codec/proto/mod.rs
@@ -2,7 +2,7 @@ use bytes::Bytes;
 use prost::Message;
 
 use malachitebft_app::engine::util::streaming::{StreamContent, StreamId, StreamMessage};
-use malachitebft_codec::Codec;
+use malachitebft_codec::{Codec, HasEncodedLen};
 use malachitebft_core_consensus::{LivenessMsg, ProposedValue, SignedConsensusMsg};
 use malachitebft_core_types::{
     CommitCertificate, CommitSignature, NilOrVal, PolkaCertificate, PolkaSignature, Round,
@@ -394,6 +394,18 @@ impl Codec<sync::Response<TestContext>> for ProtobufCodec {
     }
 }
 
+impl HasEncodedLen<sync::Response<TestContext>> for ProtobufCodec {
+    fn encoded_len(
+        &self,
+        response: &sync::Response<TestContext>,
+    ) -> Result<usize, <Self as Codec<sync::Response<TestContext>>>::Error> {
+        // The main cost of encoding "big" responses comes from calling `encode_to_vec` on the
+        // `proto::SyncResponse` (see the `encode` method). We do NOT do that here: we only call
+        // `encoded_len` on the `proto::SyncResponse`, which returns the length of the encoded
+        // data without actually encoding the response.
+        Ok(encode_sync_response(response)?.encoded_len())
+    }
+}
+
 pub fn decode_sync_response(
     proto_response: proto::SyncResponse,
 ) -> Result<sync::Response<TestContext>, ProtoError> {
diff --git a/code/crates/test/tests/it/value_sync.rs b/code/crates/test/tests/it/value_sync.rs
index b41e96576..bdf00a1a8 100644
--- a/code/crates/test/tests/it/value_sync.rs
+++ b/code/crates/test/tests/it/value_sync.rs
@@ -1,13 +1,13 @@
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::time::Duration;
-
 use crate::{TestBuilder, TestParams};
+use bytesize::ByteSize;
 use eyre::bail;
 use informalsystems_malachitebft_test::middleware::{Middleware, RotateEpochValidators};
 use informalsystems_malachitebft_test::TestContext;
 use malachitebft_config::ValuePayload;
 use malachitebft_core_consensus::ProposedValue;
 use malachitebft_core_types::CommitCertificate;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
 
 pub async fn crash_restart_from_start(params: TestParams) {
     const HEIGHT: u64 = 6;
@@ -416,3 +416,58 @@ pub async fn reset_height() {
     )
     .await
 }
+
+#[tokio::test]
+pub async fn response_size_limit_exceeded() {
+    const HEIGHT: u64 = 5;
+
+    let mut test = TestBuilder::<()>::new();
+
+    test.add_node()
+        .with_voting_power(10)
+        .start()
+        .wait_until(HEIGHT)
+        .success();
+
+    test.add_node()
+        .with_voting_power(10)
+        .start()
+        .wait_until(HEIGHT)
+        .success();
+
+    // Node 3 starts with 5 voting power, in parallel with nodes 1 and 2.
+    test.add_node()
+        .with_voting_power(5)
+        .start()
+        // Wait until the node reaches height 2...
+        .wait_until(2)
+        // ...and then kill it
+        .crash()
+        // Reset the database so that the node has to sync from height 1
+        .reset_db()
+        // After that, wait 5 seconds before restarting the node
+        .restart_after(Duration::from_secs(5))
+        // Wait until the node reaches the expected height
+        .wait_until(HEIGHT)
+        // Record a successful test for this node
+        .success();
+
+    test.build()
+        .run_with_params(
+            Duration::from_secs(60),
+            TestParams {
+                enable_value_sync: true,
+                // Values are around ~900 bytes, so this `max_response_size` in combination
+                // with a `batch_size` of 2 leads to peers sending partial responses.
+                max_response_size: ByteSize::b(1000),
+                // Values are around ~900 bytes, so we cannot fit more than one value in a response.
+                // In other words, if `max_response_size` were not respected, node 3 would not be
+                // able to sync in this test.
+                rpc_max_size: ByteSize::b(1000),
+                batch_size: 2,
+                parallel_requests: 1,
+                ..Default::default()
+            },
+        )
+        .await
+}