From 654cf15833a1e38852a887861112fd6de3e841aa Mon Sep 17 00:00:00 2001 From: insumity Date: Tue, 12 Aug 2025 11:55:50 +0200 Subject: [PATCH 01/18] prevent sending a total of values that exceed the max response size --- code/crates/engine/src/sync.rs | 27 ++++++++++++++++++++++++ code/crates/starknet/test/src/lib.rs | 1 + code/crates/sync/src/handle.rs | 26 +++++++++++++++++++++-- code/crates/test/framework/src/params.rs | 3 +++ 4 files changed, 55 insertions(+), 2 deletions(-) diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index f17937e7f..6ead08a7f 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -283,8 +283,20 @@ where } Effect::GetDecidedValues(request_id, range, r) => { + // For simplicity, to avoid asking the application for the exact size of addresses, signatures, etc. + // we assume some maximum byte sizes for an address and a signature in order to calculate + // an **approximate** total size per value, so that we only send at most `self.sync_config.max_response_size` + // to another peer. 
+ const MAX_BYTES_PER_ADDRESS: usize = 32; + const MAX_BYTES_PER_SIGNATURE: usize = 100; + + // a `CommitSignature` consists of an address and a signature + const MAX_BYTES_PER_COMMIT_SIGNATURE: usize = MAX_BYTES_PER_ADDRESS + MAX_BYTES_PER_SIGNATURE; + let mut values = Vec::new(); let mut height = *range.start(); + + let mut response_size_bytes = 0; while height <= *range.end() { let value = self .host @@ -296,6 +308,21 @@ where .success_or(eyre!("Failed to get decided value for height {height}"))?; if let Some(value) = value { + let value_size_bytes = value.value_bytes.len(); + + let num_commit_signature = value.certificate.commit_signatures.len(); + let certificate_size_estimate = num_commit_signature * MAX_BYTES_PER_COMMIT_SIGNATURE; + + let total_value_size_bytes = value_size_bytes + certificate_size_estimate; + + // check if adding this value would exceed the max-response limit + if response_size_bytes + total_value_size_bytes > self.sync_config.max_response_size { + warn!("Maximum byte size limit ({} bytes) would be exceeded (current: {}, value + certificate estimate: {}), stopping at height {}", + self.sync_config.max_response_size, response_size_bytes, total_value_size_bytes, height); + break; + } + + response_size_bytes += total_value_size_bytes; values.push(value); } else { warn!("Decided value not found for height {height}"); diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index e7948cd68..53b7d424a 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -233,6 +233,7 @@ fn apply_params(config: &mut Config, params: &TestParams) { config.value_sync.enabled = params.enable_value_sync; config.value_sync.parallel_requests = params.parallel_requests; config.value_sync.batch_size = params.batch_size; + config.value_sync.max_response_size = params.max_response_size; config.consensus.p2p.protocol = params.protocol; config.test.max_block_size = params.block_size; 
config.test.txs_per_part = params.txs_per_part; diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index c09a83b5a..51e2e1466 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -311,9 +311,31 @@ where "Received response from different peer than expected" ); } + let range_len = requested_range.end().as_u64() - requested_range.start().as_u64() + 1; if (response.values.len() as u64) < range_len { - re_request_values_from_peer_except(co, state, metrics, request_id, None).await?; + // **NOTE** We cannot simply call `re_request_values_from_peer_except` here. + // Although we received some values from the peer, these values have not yet been processed + // by the consensus engine. If we called `re_request_values_from_peer_except`, we would + // end up re-requesting the entire original range (including values we already received), + // causing the syncing peer to repeatedly send multiple requests until the already-received + // values are fully processed. + // To tackle this, we first update the current pending request with the range of values + // it provides we received, and then issue a new request with the remaining values. 
+ let new_start = requested_range.start().increment_by(response.values.len() as u64); + let end = *requested_range.end(); + + if response.values.is_empty() { + error!(%request_id, %peer_id, "Received response contains no values"); + } else { + // the response of this request only provided `response.values.len()` values, so update the pending request accordingly + let updated_range = *requested_range.start()..=new_start.decrement().unwrap(); + state.pending_requests.insert(request_id.clone(), (updated_range, peer_id)); + } + + // issue a new request to the same peer for the remaining values + let new_range = new_start..=end; + request_values_from_peer(&co, state, metrics, new_range, peer_id).await?; } } @@ -360,7 +382,7 @@ where // Validate response from host let batch_size = end.as_u64() - start.as_u64() + 1; if batch_size != values.len() as u64 { - error!( + warn!( "Received {} values from host, expected {batch_size}", values.len() ) diff --git a/code/crates/test/framework/src/params.rs b/code/crates/test/framework/src/params.rs index d317d78b6..19e847fe8 100644 --- a/code/crates/test/framework/src/params.rs +++ b/code/crates/test/framework/src/params.rs @@ -16,6 +16,7 @@ pub struct TestParams { pub value_payload: ValuePayload, pub max_retain_blocks: usize, pub stable_block_times: bool, + pub max_response_size: ByteSize, } impl Default for TestParams { @@ -32,6 +33,7 @@ impl Default for TestParams { value_payload: ValuePayload::ProposalAndParts, max_retain_blocks: 50, stable_block_times: true, + max_response_size: ByteSize::mib(1), } } } @@ -41,6 +43,7 @@ impl TestParams { config.value_sync.enabled = self.enable_value_sync; config.value_sync.parallel_requests = self.parallel_requests; config.value_sync.batch_size = self.batch_size; + config.value_sync.max_response_size = self.max_response_size; config.consensus.p2p.protocol = self.protocol; config.consensus.value_payload = self.value_payload; config.test.max_block_size = self.block_size; From 
f4be612b9fb642cba22c32e57697be5f59d25741 Mon Sep 17 00:00:00 2001 From: insumity Date: Tue, 12 Aug 2025 12:12:55 +0200 Subject: [PATCH 02/18] fix formatting issues --- code/crates/engine/src/sync.rs | 12 ++++++++---- code/crates/sync/src/handle.rs | 8 ++++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index 6ead08a7f..911ad2a64 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -291,12 +291,13 @@ where const MAX_BYTES_PER_SIGNATURE: usize = 100; // a `CommitSignature` consists of an address and a signature - const MAX_BYTES_PER_COMMIT_SIGNATURE: usize = MAX_BYTES_PER_ADDRESS + MAX_BYTES_PER_SIGNATURE; + const MAX_BYTES_PER_COMMIT_SIGNATURE: usize = + MAX_BYTES_PER_ADDRESS + MAX_BYTES_PER_SIGNATURE; let mut values = Vec::new(); let mut height = *range.start(); - let mut response_size_bytes = 0; + let mut response_size_bytes = 0; while height <= *range.end() { let value = self .host @@ -311,12 +312,15 @@ where let value_size_bytes = value.value_bytes.len(); let num_commit_signature = value.certificate.commit_signatures.len(); - let certificate_size_estimate = num_commit_signature * MAX_BYTES_PER_COMMIT_SIGNATURE; + let certificate_size_estimate = + num_commit_signature * MAX_BYTES_PER_COMMIT_SIGNATURE; let total_value_size_bytes = value_size_bytes + certificate_size_estimate; // check if adding this value would exceed the max-response limit - if response_size_bytes + total_value_size_bytes > self.sync_config.max_response_size { + if response_size_bytes + total_value_size_bytes + > self.sync_config.max_response_size + { warn!("Maximum byte size limit ({} bytes) would be exceeded (current: {}, value + certificate estimate: {}), stopping at height {}", self.sync_config.max_response_size, response_size_bytes, total_value_size_bytes, height); break; diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index 51e2e1466..385e384b4 
100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -322,7 +322,9 @@ where // values are fully processed. // To tackle this, we first update the current pending request with the range of values // it provides we received, and then issue a new request with the remaining values. - let new_start = requested_range.start().increment_by(response.values.len() as u64); + let new_start = requested_range + .start() + .increment_by(response.values.len() as u64); let end = *requested_range.end(); if response.values.is_empty() { @@ -330,7 +332,9 @@ where } else { // the response of this request only provided `response.values.len()` values, so update the pending request accordingly let updated_range = *requested_range.start()..=new_start.decrement().unwrap(); - state.pending_requests.insert(request_id.clone(), (updated_range, peer_id)); + state + .pending_requests + .insert(request_id.clone(), (updated_range, peer_id)); } // issue a new request to the same peer for the remaining values From 1359cf478f757c497a7ee2647a2ff48c49da49a2 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 21 Aug 2025 08:02:49 +0000 Subject: [PATCH 03/18] Cleanup --- code/crates/sync/src/handle.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index 385e384b4..f975e5b43 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -313,8 +313,9 @@ where } let range_len = requested_range.end().as_u64() - requested_range.start().as_u64() + 1; - if (response.values.len() as u64) < range_len { - // **NOTE** We cannot simply call `re_request_values_from_peer_except` here. + + if response.values.len() < range_len as usize { + // NOTE: We cannot simply call `re_request_values_from_peer_except` here. // Although we received some values from the peer, these values have not yet been processed // by the consensus engine. 
If we called `re_request_values_from_peer_except`, we would // end up re-requesting the entire original range (including values we already received), From 5bd7c264664ba4eb6c0bdf46923fdb55254802ef Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 21 Aug 2025 08:03:27 +0000 Subject: [PATCH 04/18] Small refactor using new `update_request` state method --- code/crates/sync/src/handle.rs | 8 ++++---- code/crates/sync/src/state.rs | 9 +++++++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index f975e5b43..8fe37e538 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -326,16 +326,16 @@ where let new_start = requested_range .start() .increment_by(response.values.len() as u64); + let end = *requested_range.end(); if response.values.is_empty() { error!(%request_id, %peer_id, "Received response contains no values"); } else { - // the response of this request only provided `response.values.len()` values, so update the pending request accordingly + // The response of this request only provided `response.values.len()` values, + // so update the pending request accordingly let updated_range = *requested_range.start()..=new_start.decrement().unwrap(); - state - .pending_requests - .insert(request_id.clone(), (updated_range, peer_id)); + state.update_request(request_id, peer_id, updated_range); } // issue a new request to the same peer for the remaining values diff --git a/code/crates/sync/src/state.rs b/code/crates/sync/src/state.rs index 3bd2590a5..ccadfb065 100644 --- a/code/crates/sync/src/state.rs +++ b/code/crates/sync/src/state.rs @@ -67,6 +67,15 @@ where self.peers.insert(status.peer_id, status); } + pub fn update_request( + &mut self, + request_id: OutboundRequestId, + peer_id: PeerId, + range: RangeInclusive, + ) { + self.pending_requests.insert(request_id, (range, peer_id)); + } + /// Filter peers to only include those that can provide the given range 
of values, or at least a prefix of the range. /// /// If there is no peer with all requested values, select a peer that has a tip at or above the start of the range. From 3db4ca5f3795d8e48daf4f388289a0b8bec8b3e8 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 21 Aug 2025 08:03:57 +0000 Subject: [PATCH 05/18] Add `max_parallel_requests` state method --- code/crates/sync/src/handle.rs | 2 +- code/crates/sync/src/state.rs | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index 8fe37e538..244718b17 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -510,7 +510,7 @@ async fn request_values( where Ctx: Context, { - let max_parallel_requests = max(1, state.config.parallel_requests); + let max_parallel_requests = state.max_parallel_requests(); if state.pending_requests.len() as u64 >= max_parallel_requests { info!( diff --git a/code/crates/sync/src/state.rs b/code/crates/sync/src/state.rs index ccadfb065..5f7278bcb 100644 --- a/code/crates/sync/src/state.rs +++ b/code/crates/sync/src/state.rs @@ -63,6 +63,12 @@ where } } + /// The maximum number of parallel requests that can be made to peers. + /// If the configuration is set to 0, it defaults to 1. 
+ pub fn max_parallel_requests(&self) -> u64 { + max(1, self.config.parallel_requests) + } + pub fn update_status(&mut self, status: Status) { self.peers.insert(status.peer_id, status); } From bc304c5315e616a815940c246f4997adff463020 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 21 Aug 2025 08:04:09 +0000 Subject: [PATCH 06/18] Re-request range from any peer, not necessarily the same one --- code/crates/sync/src/handle.rs | 39 ++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index 244718b17..2e09cd934 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -338,9 +338,9 @@ where state.update_request(request_id, peer_id, updated_range); } - // issue a new request to the same peer for the remaining values + // Issue a new request to any peer, not necessarily the same one, for the remaining values let new_range = new_start..=end; - request_values_from_peer(&co, state, metrics, new_range, peer_id).await?; + request_values_range(co, state, metrics, new_range).await?; } } @@ -542,6 +542,41 @@ where Ok(()) } +/// Request multiple batches of values in parallel. +async fn request_values_range( + co: Co, + state: &mut State, + metrics: &Metrics, + range: RangeInclusive, +) -> Result<(), Error> +where + Ctx: Context, +{ + let max_parallel_requests = state.max_parallel_requests(); + + if state.pending_requests.len() as u64 >= max_parallel_requests { + info!( + %max_parallel_requests, + pending_requests = %state.pending_requests.len(), + range = %DisplayRange::(&range), + "Maximum number of parallel requests reached, skipping request for values" + ); + + return Ok(()); + }; + + // Get a random peer that can provide the values in the range. + let Some((peer, range)) = state.random_peer_with(&range) else { + // No connected peer reached this height yet, we can stop syncing here. 
+ debug!(range = %DisplayRange::(&range), "No peer to request sync from"); + return Ok(()); + }; + + request_values_from_peer(&co, state, metrics, range, peer).await?; + + Ok(()) +} + async fn request_values_from_peer( co: &Co, state: &mut State, From 3dadbb1c7e93377fac14312da9414dc545bc3966 Mon Sep 17 00:00:00 2001 From: insumity Date: Wed, 27 Aug 2025 09:03:18 +0200 Subject: [PATCH 07/18] introduce warnings when pushing to a bounded queue --- code/crates/core-consensus/src/util/bounded_queue.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/code/crates/core-consensus/src/util/bounded_queue.rs b/code/crates/core-consensus/src/util/bounded_queue.rs index 46b089f96..3732ce275 100644 --- a/code/crates/core-consensus/src/util/bounded_queue.rs +++ b/code/crates/core-consensus/src/util/bounded_queue.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use tracing::{warn}; /// A data structure that maintains a queue of values associated with monotonically increasing indices. /// @@ -46,6 +47,8 @@ where if &index < max_index { let max_index = max_index.clone(); + warn!("Bounded queue is full, dropping value"); + // Remove the highest index self.queue.remove(&max_index); @@ -56,6 +59,7 @@ where } } + warn!("Bounded queue is full, no value is inserted"); false } From 37a1d4a7bf5a739f5b320d338027793512e14933 Mon Sep 17 00:00:00 2001 From: insumity Date: Wed, 27 Aug 2025 09:09:53 +0200 Subject: [PATCH 08/18] get the value size from the network codec --- code/crates/engine/src/network.rs | 21 ++++++++++++++--- code/crates/engine/src/sync.rs | 38 +++++++++++++++---------------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/code/crates/engine/src/network.rs b/code/crates/engine/src/network.rs index 254bd3d7d..5b83c40d4 100644 --- a/code/crates/engine/src/network.rs +++ b/code/crates/engine/src/network.rs @@ -10,9 +10,7 @@ use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; use tokio::task::JoinHandle; use tracing::{error, trace}; -use 
malachitebft_sync::{ - self as sync, InboundRequestId, OutboundRequestId, RawMessage, Request, Response, -}; +use malachitebft_sync::{self as sync, InboundRequestId, OutboundRequestId, RawMessage, Request, Response}; use malachitebft_codec as codec; use malachitebft_core_consensus::{LivenessMsg, SignedConsensusMsg}; @@ -167,6 +165,9 @@ pub enum Msg { /// Send a response for a request to a peer OutgoingResponse(InboundRequestId, Response), + /// Request the total number of bytes that are to be transmitted for this response + GetResponseSize(Response, RpcReplyPort), + /// Request for number of peers from gossip GetState { reply: RpcReplyPort }, @@ -328,6 +329,20 @@ where }; } + Msg::GetResponseSize(response, reply_to) => { + let encoded = self.codec.encode(&response); + + match encoded { + Ok(data) => { + reply_to.send(data.len())?; + } + Err(e) => { + error!(?response, "Failed to encode response message: {e:?}"); + return Ok(()); + } + }; + } + Msg::NewEvent(Event::Listening(addr)) => { listen_addrs.push(addr.clone()); output_port.send(NetworkEvent::Listening(addr)); diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index 911ad2a64..aa01d5a54 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -19,7 +19,7 @@ use malachitebft_sync::{ self as sync, HeightStartType, InboundRequestId, OutboundRequestId, RawDecidedValue, Request, Response, Resumable, }; - +use malachitebft_sync::Response::ValueResponse; use crate::host::{HostMsg, HostRef}; use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status}; use crate::util::ticker::ticker; @@ -283,17 +283,6 @@ where } Effect::GetDecidedValues(request_id, range, r) => { - // For simplicity, to avoid asking the application for the exact size of addresses, signatures, etc. 
- // we assume some maximum byte sizes for an address and a signature in order to calculate - // an **approximate** total size per value, so that we only send at most `self.sync_config.max_response_size` - // to another peer. - const MAX_BYTES_PER_ADDRESS: usize = 32; - const MAX_BYTES_PER_SIGNATURE: usize = 100; - - // a `CommitSignature` consists of an address and a signature - const MAX_BYTES_PER_COMMIT_SIGNATURE: usize = - MAX_BYTES_PER_ADDRESS + MAX_BYTES_PER_SIGNATURE; - let mut values = Vec::new(); let mut height = *range.start(); @@ -309,19 +298,27 @@ where .success_or(eyre!("Failed to get decided value for height {height}"))?; if let Some(value) = value { - let value_size_bytes = value.value_bytes.len(); - - let num_commit_signature = value.certificate.commit_signatures.len(); - let certificate_size_estimate = - num_commit_signature * MAX_BYTES_PER_COMMIT_SIGNATURE; - - let total_value_size_bytes = value_size_bytes + certificate_size_estimate; + let value_response = ValueResponse(sync::ValueResponse::new(*range.start(), vec![value.clone()])); + + let result = ractor::call!(self.gossip, move |reply_to| { + NetworkMsg::GetResponseSize(value_response.clone(), reply_to) + }); + + let total_value_size_bytes = match result { + Ok(value_in_bytes) => { + value_in_bytes + } + Err(e) => { + error!("Failed to get response size for value, stopping at for height {}: {:?}", height, e); + break; + } + }; // check if adding this value would exceed the max-response limit if response_size_bytes + total_value_size_bytes > self.sync_config.max_response_size { - warn!("Maximum byte size limit ({} bytes) would be exceeded (current: {}, value + certificate estimate: {}), stopping at height {}", + warn!("Maximum byte size limit ({} bytes) would be exceeded (current: {} + upcoming value: {}), stopping at height {}", self.sync_config.max_response_size, response_size_bytes, total_value_size_bytes, height); break; } @@ -336,6 +333,7 @@ where height = height.increment(); } + 
myself.cast(Msg::::GotDecidedValues(request_id, range, values))?; Ok(r.resume_with(())) From ef5d2af0a825a1e2dc931330b9cbc41d471f6bb7 Mon Sep 17 00:00:00 2001 From: insumity Date: Thu, 28 Aug 2025 08:34:56 +0200 Subject: [PATCH 09/18] remove parallel-requests check and add test --- code/crates/sync/src/handle.rs | 18 +++----- code/crates/test/tests/it/value_sync.rs | 56 +++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 16 deletions(-) diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index 2e09cd934..594f199e4 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -542,7 +542,8 @@ where Ok(()) } -/// Request multiple batches of values in parallel. +/// Request values for this specific range from a peer. +/// Should only be used when re-requesting a partial range of values from a peer. async fn request_values_range( co: Co, state: &mut State, @@ -552,18 +553,9 @@ async fn request_values_range( where Ctx: Context, { - let max_parallel_requests = state.max_parallel_requests(); - - if state.pending_requests.len() as u64 >= max_parallel_requests { - info!( - %max_parallel_requests, - pending_requests = %state.pending_requests.len(), - range = %DisplayRange::(&range), - "Maximum number of parallel requests reached, skipping request for values" - ); - - return Ok(()); - }; + // NOTE: We do not perform a `max_parallel_requests` check here in contrast to what is done, for + // example in `request_values`. This is because `request_values_range` is only called for retrieving + // partial responses, which means the original request is not on the wire anymore. // Get a random peer that can provide the values in the range. 
let Some((peer, range)) = state.random_peer_with(&range) else { diff --git a/code/crates/test/tests/it/value_sync.rs b/code/crates/test/tests/it/value_sync.rs index b41e96576..601289f19 100644 --- a/code/crates/test/tests/it/value_sync.rs +++ b/code/crates/test/tests/it/value_sync.rs @@ -1,13 +1,13 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; - use crate::{TestBuilder, TestParams}; +use bytesize::ByteSize; use eyre::bail; use informalsystems_malachitebft_test::middleware::{Middleware, RotateEpochValidators}; use informalsystems_malachitebft_test::TestContext; use malachitebft_config::ValuePayload; use malachitebft_core_consensus::ProposedValue; use malachitebft_core_types::CommitCertificate; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; pub async fn crash_restart_from_start(params: TestParams) { const HEIGHT: u64 = 6; @@ -416,3 +416,53 @@ pub async fn reset_height() { ) .await } + +#[tokio::test] +pub async fn response_size_limit_exceeded() { + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + + // Node 3 starts with 5 voting power, in parallel with node 1 and 2. + test.add_node() + .with_voting_power(5) + .start() + // Wait until the node reaches height 2... 
+ .wait_until(2) + // ...and then kills it + .crash() + // Reset the database so that the node has to do Sync from height 1 + .reset_db() + // After that, it waits 5 seconds before restarting the node + .restart_after(Duration::from_secs(5)) + // Wait until the node reached the expected height + .wait_until(HEIGHT) + // Record a successful test for this node + .success(); + + test.build() + .run_with_params( + Duration::from_secs(60), + TestParams { + enable_value_sync: true, + // Values are around ~900 bytes, so this `max_response_size` in combination + // with a `batch_size` of 2 leads to having a syncing peer sending partial responses. + max_response_size: ByteSize::b(1000), + batch_size: 2, + parallel_requests: 1, + ..Default::default() + }, + ).await +} \ No newline at end of file From 3bf42dd3c7bd1c702db535dbd96a84657aa8e6db Mon Sep 17 00:00:00 2001 From: insumity Date: Thu, 28 Aug 2025 09:05:35 +0200 Subject: [PATCH 10/18] fix formatting issues --- .../core-consensus/src/util/bounded_queue.rs | 2 +- code/crates/engine/src/network.rs | 4 +++- code/crates/engine/src/sync.rs | 20 +++++++++---------- code/crates/test/tests/it/value_sync.rs | 5 +++-- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/code/crates/core-consensus/src/util/bounded_queue.rs b/code/crates/core-consensus/src/util/bounded_queue.rs index 3732ce275..581ef0402 100644 --- a/code/crates/core-consensus/src/util/bounded_queue.rs +++ b/code/crates/core-consensus/src/util/bounded_queue.rs @@ -1,5 +1,5 @@ use std::collections::BTreeMap; -use tracing::{warn}; +use tracing::warn; /// A data structure that maintains a queue of values associated with monotonically increasing indices. 
/// diff --git a/code/crates/engine/src/network.rs b/code/crates/engine/src/network.rs index 5b83c40d4..2895fdc3a 100644 --- a/code/crates/engine/src/network.rs +++ b/code/crates/engine/src/network.rs @@ -10,7 +10,9 @@ use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; use tokio::task::JoinHandle; use tracing::{error, trace}; -use malachitebft_sync::{self as sync, InboundRequestId, OutboundRequestId, RawMessage, Request, Response}; +use malachitebft_sync::{ + self as sync, InboundRequestId, OutboundRequestId, RawMessage, Request, Response, +}; use malachitebft_codec as codec; use malachitebft_core_consensus::{LivenessMsg, SignedConsensusMsg}; diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index aa01d5a54..880a75592 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -12,18 +12,18 @@ use rand::SeedableRng; use tokio::task::JoinHandle; use tracing::{debug, error, info, warn, Instrument}; +use crate::host::{HostMsg, HostRef}; +use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status}; +use crate::util::ticker::ticker; +use crate::util::timers::{TimeoutElapsed, TimerScheduler}; use malachitebft_codec as codec; use malachitebft_core_consensus::PeerId; use malachitebft_core_types::{CommitCertificate, Context, Height}; +use malachitebft_sync::Response::ValueResponse; use malachitebft_sync::{ self as sync, HeightStartType, InboundRequestId, OutboundRequestId, RawDecidedValue, Request, Response, Resumable, }; -use malachitebft_sync::Response::ValueResponse; -use crate::host::{HostMsg, HostRef}; -use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status}; -use crate::util::ticker::ticker; -use crate::util::timers::{TimeoutElapsed, TimerScheduler}; /// Codec for sync protocol messages /// @@ -298,16 +298,17 @@ where .success_or(eyre!("Failed to get decided value for height {height}"))?; if let Some(value) = value { - let value_response = 
ValueResponse(sync::ValueResponse::new(*range.start(), vec![value.clone()])); + let value_response = ValueResponse(sync::ValueResponse::new( + *range.start(), + vec![value.clone()], + )); let result = ractor::call!(self.gossip, move |reply_to| { NetworkMsg::GetResponseSize(value_response.clone(), reply_to) }); let total_value_size_bytes = match result { - Ok(value_in_bytes) => { - value_in_bytes - } + Ok(value_in_bytes) => value_in_bytes, Err(e) => { error!("Failed to get response size for value, stopping at for height {}: {:?}", height, e); break; @@ -333,7 +334,6 @@ where height = height.increment(); } - myself.cast(Msg::::GotDecidedValues(request_id, range, values))?; Ok(r.resume_with(())) diff --git a/code/crates/test/tests/it/value_sync.rs b/code/crates/test/tests/it/value_sync.rs index 601289f19..f181c536f 100644 --- a/code/crates/test/tests/it/value_sync.rs +++ b/code/crates/test/tests/it/value_sync.rs @@ -464,5 +464,6 @@ pub async fn response_size_limit_exceeded() { parallel_requests: 1, ..Default::default() }, - ).await -} \ No newline at end of file + ) + .await +} From 4b9bddc4e65fbd0f3628e760aac4ced7162b6e88 Mon Sep 17 00:00:00 2001 From: insumity Date: Thu, 28 Aug 2025 11:12:30 +0200 Subject: [PATCH 11/18] add a defensive check if peer ids do not match --- code/crates/sync/src/handle.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index 594f199e4..f5ced9d97 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -306,10 +306,13 @@ where // If the response contains a prefix of the requested values, re-request the remaining values. if let Some((requested_range, stored_peer_id)) = state.pending_requests.get(&request_id) { if stored_peer_id != &peer_id { - warn!( + // Defensive check: This should never happen because this check is already performed in + // the handler of `Input::ValueResponse`. 
+ error!( %request_id, peer.actual = %peer_id, peer.expected = %stored_peer_id, "Received response from different peer than expected" ); + return on_invalid_value_response(co, state, metrics, request_id, peer_id).await; } let range_len = requested_range.end().as_u64() - requested_range.start().as_u64() + 1; From 912a27d835e1b65ba4eb053800bdf6fc1a94f78a Mon Sep 17 00:00:00 2001 From: insumity Date: Thu, 28 Aug 2025 15:58:01 +0200 Subject: [PATCH 12/18] fix issue with potentially multiple parallel requests --- code/crates/engine/src/consensus.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 87add4925..f0a9bda4b 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -506,8 +506,20 @@ where // Process values sequentially starting from the lowest height let mut height = start_height; for value in values.iter() { - self.process_sync_response(&myself, state, peer, height, value) - .await?; + if let Err(e) = self + .process_sync_response(&myself, state, peer, height, value) + .await + { + // At this point, `process_sync_response` has already sent a message + // about an invalid value, etc. to the sync actor. The sync actor + // will then, re-request this range again from some peer. + // Because of this, in case of failing to process the response, we need + // to exit early this loop to avoid issuing multiple parallel requests + // for the same range of values. There's also no benefit in processing + // the rest of the values. 
+ error!(%start_height, %height, %request_id, "Failed to process sync response:{e:?}"); + break; + } height = height.increment(); } @@ -663,6 +675,7 @@ where .await { error!(%height, "Error when processing received synced block: {e}"); + return Err(e.into()); } Ok(()) From 6f06ea43e5d3d59a3b73e664b25ae1fa7de3d8b2 Mon Sep 17 00:00:00 2001 From: insumity Date: Wed, 3 Sep 2025 11:01:11 +0200 Subject: [PATCH 13/18] take into account breaking-early comment --- code/crates/engine/src/sync.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index 880a75592..1ae897a0a 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -326,6 +326,15 @@ where response_size_bytes += total_value_size_bytes; values.push(value); + + if response_size_bytes == self.sync_config.max_response_size { + info!( + "Reached maximum byte size limit ({} bytes) exactly at height {}", + self.sync_config.max_response_size, height + ); + + break; + } } else { warn!("Decided value not found for height {height}"); break; From 49fddfde299b89e9f99c0155772ed2a1c6d1474b Mon Sep 17 00:00:00 2001 From: insumity Date: Mon, 8 Sep 2025 13:25:52 +0200 Subject: [PATCH 14/18] add rpc_max_size config in tests --- code/crates/test/framework/src/params.rs | 3 +++ code/crates/test/tests/it/value_sync.rs | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/code/crates/test/framework/src/params.rs b/code/crates/test/framework/src/params.rs index 19e847fe8..e61ac0eae 100644 --- a/code/crates/test/framework/src/params.rs +++ b/code/crates/test/framework/src/params.rs @@ -9,6 +9,7 @@ pub struct TestParams { pub parallel_requests: usize, pub batch_size: usize, pub protocol: PubSubProtocol, + pub rpc_max_size: ByteSize, pub block_size: ByteSize, pub tx_size: ByteSize, pub txs_per_part: usize, @@ -26,6 +27,7 @@ impl Default for TestParams { parallel_requests: 1, batch_size: 1, protocol: PubSubProtocol::default(), + rpc_max_size: 
ByteSize::mib(2), block_size: ByteSize::mib(1), tx_size: ByteSize::kib(1), txs_per_part: 256, @@ -45,6 +47,7 @@ impl TestParams { config.value_sync.batch_size = self.batch_size; config.value_sync.max_response_size = self.max_response_size; config.consensus.p2p.protocol = self.protocol; + config.consensus.p2p.rpc_max_size = self.rpc_max_size; config.consensus.value_payload = self.value_payload; config.test.max_block_size = self.block_size; config.test.txs_per_part = self.txs_per_part; diff --git a/code/crates/test/tests/it/value_sync.rs b/code/crates/test/tests/it/value_sync.rs index f181c536f..bdf00a1a8 100644 --- a/code/crates/test/tests/it/value_sync.rs +++ b/code/crates/test/tests/it/value_sync.rs @@ -460,6 +460,10 @@ pub async fn response_size_limit_exceeded() { // Values are around ~900 bytes, so this `max_response_size` in combination // with a `batch_size` of 2 leads to having a syncing peer sending partial responses. max_response_size: ByteSize::b(1000), + // Values are ~900 bytes, so with this `rpc_max_size` a single response cannot carry more than one value. + // In other words, if `max_response_size` were not respected, node 3 would not be + // able to sync in this test.
+ rpc_max_size: ByteSize::b(1000), batch_size: 2, parallel_requests: 1, ..Default::default() From cb40f73ef7b115793ac6099c46c6f9157086e8c9 Mon Sep 17 00:00:00 2001 From: insumity Date: Thu, 11 Sep 2025 15:20:25 +0200 Subject: [PATCH 15/18] do not encode the data to compute the encoded length --- code/crates/app-channel/src/run.rs | 9 +++++---- code/crates/app-channel/src/spawn.rs | 17 +++++++++-------- code/crates/app/src/spawn.rs | 7 ++++--- code/crates/app/src/types.rs | 1 + code/crates/codec/src/lib.rs | 5 +++++ code/crates/engine/src/network.rs | 15 ++++++++------- code/crates/proto/src/lib.rs | 5 +++++ code/crates/starknet/host/src/codec.rs | 11 ++++++++++- code/crates/test/Cargo.toml | 1 + code/crates/test/src/codec/json/mod.rs | 14 +++++++++++++- code/crates/test/src/codec/proto/mod.rs | 14 +++++++++++++- 11 files changed, 74 insertions(+), 25 deletions(-) diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs index 5380e36dc..6ec35b2d3 100644 --- a/code/crates/app-channel/src/run.rs +++ b/code/crates/app-channel/src/run.rs @@ -3,10 +3,6 @@ use eyre::Result; -use malachitebft_engine::consensus::{ConsensusMsg, ConsensusRef}; -use malachitebft_engine::util::events::TxEvent; -use tokio::sync::mpsc::Receiver; - use crate::app::metrics::{Metrics, SharedRegistry}; use crate::app::node::{self, EngineHandle, NodeConfig}; use crate::app::spawn::{ @@ -17,6 +13,10 @@ use crate::app::types::core::Context; use crate::msgs::ConsensusRequest; use crate::spawn::{spawn_host_actor, spawn_network_actor}; use crate::Channels; +use malachitebft_app::types::sync; +use malachitebft_engine::consensus::{ConsensusMsg, ConsensusRef}; +use malachitebft_engine::util::events::TxEvent; +use tokio::sync::mpsc::Receiver; pub async fn start_engine( ctx: Ctx, @@ -33,6 +33,7 @@ where WalCodec: codec::WalCodec + Clone, NetCodec: codec::ConsensusCodec, NetCodec: codec::SyncCodec, + NetCodec: codec::HasEncodedLen>, { let start_height = 
start_height.unwrap_or_default(); diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs index 9d30213ac..4d59e692a 100644 --- a/code/crates/app-channel/src/spawn.rs +++ b/code/crates/app-channel/src/spawn.rs @@ -1,13 +1,5 @@ //! Utility functions for spawning the actor system and connecting it to the application. -use eyre::Result; -use tokio::sync::mpsc; - -use malachitebft_engine::consensus::ConsensusCodec; -use malachitebft_engine::host::HostRef; -use malachitebft_engine::network::NetworkRef; -use malachitebft_engine::sync::SyncCodec; - use crate::app; use crate::app::config::ConsensusConfig; use crate::app::metrics::Metrics; @@ -16,6 +8,14 @@ use crate::app::types::core::Context; use crate::app::types::Keypair; use crate::connector::Connector; use crate::{AppMsg, NetworkMsg}; +use eyre::Result; +use malachitebft_app::types::codec::HasEncodedLen; +use malachitebft_app::types::sync; +use malachitebft_engine::consensus::ConsensusCodec; +use malachitebft_engine::host::HostRef; +use malachitebft_engine::network::NetworkRef; +use malachitebft_engine::sync::SyncCodec; +use tokio::sync::mpsc; pub async fn spawn_host_actor( metrics: Metrics, @@ -38,6 +38,7 @@ where Ctx: Context, Codec: ConsensusCodec, Codec: SyncCodec, + Codec: HasEncodedLen>, { let (tx, mut rx) = mpsc::channel::>(1); diff --git a/code/crates/app/src/spawn.rs b/code/crates/app/src/spawn.rs index 4f62a36da..667bd0764 100644 --- a/code/crates/app/src/spawn.rs +++ b/code/crates/app/src/spawn.rs @@ -4,9 +4,7 @@ use std::path::Path; use std::time::Duration; use eyre::Result; -use tokio::task::JoinHandle; -use tracing::Span; - +use malachitebft_codec::HasEncodedLen; use malachitebft_engine::consensus::{Consensus, ConsensusCodec, ConsensusParams, ConsensusRef}; use malachitebft_engine::host::HostRef; use malachitebft_engine::network::{Network, NetworkRef}; @@ -18,6 +16,8 @@ use malachitebft_network::{ ChannelNames, Config as NetworkConfig, DiscoveryConfig, GossipSubConfig, 
Keypair, }; use malachitebft_sync as sync; +use tokio::task::JoinHandle; +use tracing::Span; use crate::config::{ConsensusConfig, PubSubProtocol, ValueSyncConfig}; use crate::metrics::{Metrics, SharedRegistry}; @@ -60,6 +60,7 @@ where Ctx: Context, Codec: ConsensusCodec, Codec: SyncCodec, + Codec: HasEncodedLen>, { let config = make_gossip_config(cfg); diff --git a/code/crates/app/src/types.rs b/code/crates/app/src/types.rs index 4fff74cd7..5a91264b2 100644 --- a/code/crates/app/src/types.rs +++ b/code/crates/app/src/types.rs @@ -22,6 +22,7 @@ pub mod sync { pub mod codec { pub use malachitebft_codec::Codec; + pub use malachitebft_codec::HasEncodedLen; pub use malachitebft_engine::consensus::ConsensusCodec; pub use malachitebft_engine::sync::SyncCodec; pub use malachitebft_engine::wal::WalCodec; diff --git a/code/crates/codec/src/lib.rs b/code/crates/codec/src/lib.rs index 68f1c6fb3..a3e5770ac 100644 --- a/code/crates/codec/src/lib.rs +++ b/code/crates/codec/src/lib.rs @@ -8,3 +8,8 @@ pub trait Codec: Send + Sync + 'static { fn decode(&self, bytes: Bytes) -> Result; fn encode(&self, msg: &T) -> Result; } + +/// Codec extension trait for types that can also compute the length of the encoded data. 
+pub trait HasEncodedLen: Codec { + fn encoded_len(&self, msg: &T) -> Result>::Error>; +} diff --git a/code/crates/engine/src/network.rs b/code/crates/engine/src/network.rs index 2895fdc3a..f75dbfe20 100644 --- a/code/crates/engine/src/network.rs +++ b/code/crates/engine/src/network.rs @@ -1,12 +1,11 @@ -use std::collections::{BTreeSet, HashMap}; -use std::marker::PhantomData; - use async_trait::async_trait; use derive_where::derive_where; use eyre::eyre; use libp2p::identity::Keypair; use libp2p::request_response; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; +use std::collections::{BTreeSet, HashMap}; +use std::marker::PhantomData; use tokio::task::JoinHandle; use tracing::{error, trace}; @@ -71,6 +70,7 @@ where Ctx: Context, Codec: ConsensusCodec, Codec: SyncCodec, + Codec: codec::HasEncodedLen>, { pub async fn spawn( keypair: Keypair, @@ -190,6 +190,7 @@ where Codec: codec::Codec>, Codec: codec::Codec>, Codec: codec::Codec>, + Codec: codec::HasEncodedLen>, { type Msg = Msg; type State = State; @@ -332,11 +333,11 @@ where } Msg::GetResponseSize(response, reply_to) => { - let encoded = self.codec.encode(&response); + let encoded_len = self.codec.encoded_len(&response); - match encoded { - Ok(data) => { - reply_to.send(data.len())?; + match encoded_len { + Ok(size) => { + reply_to.send(size)?; } Err(e) => { error!(?response, "Failed to encode response message: {e:?}"); diff --git a/code/crates/proto/src/lib.rs b/code/crates/proto/src/lib.rs index 6f1a1bf5c..fe33b2a61 100644 --- a/code/crates/proto/src/lib.rs +++ b/code/crates/proto/src/lib.rs @@ -75,6 +75,11 @@ pub trait Protobuf: Sized { Ok(Bytes::from(proto.encode_to_vec())) } + fn encoded_len(&self) -> Result { + let proto = self.to_proto()?; + Ok(proto.encoded_len()) + } + fn from_any(any: &Any) -> Result { Self::from_proto(any.to_msg::()?) 
} diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index 7741a2677..af678b5f9 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use prost::Message; -use malachitebft_codec::Codec; +use malachitebft_codec::{Codec, HasEncodedLen}; use malachitebft_core_consensus::{LivenessMsg, PeerId, ProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{ CommitCertificate, CommitSignature, NilOrVal, PolkaCertificate, PolkaSignature, Round, @@ -222,6 +222,15 @@ impl Codec> for ProtobufCodec { } } +impl HasEncodedLen> for ProtobufCodec { + fn encoded_len( + &self, + response: &sync::Response, + ) -> Result>>::Error> { + Ok(encode_sync_response(response)?.encoded_len()) + } +} + pub fn decode_sync_response( proto_response: proto::sync::SyncResponse, ) -> Result, ProtoError> { diff --git a/code/crates/test/Cargo.toml b/code/crates/test/Cargo.toml index 7ae7d241a..01d1c691a 100644 --- a/code/crates/test/Cargo.toml +++ b/code/crates/test/Cargo.toml @@ -31,6 +31,7 @@ hex = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } rand = { workspace = true } +tracing = { workspace = true } serde = { workspace = true, features = ["derive", "rc"] } serde_json = { workspace = true } sha3 = { workspace = true } diff --git a/code/crates/test/src/codec/json/mod.rs b/code/crates/test/src/codec/json/mod.rs index 096ea9c78..bfe02f719 100644 --- a/code/crates/test/src/codec/json/mod.rs +++ b/code/crates/test/src/codec/json/mod.rs @@ -1,7 +1,8 @@ pub mod raw; use bytes::Bytes; -use malachitebft_codec::Codec; +use malachitebft_codec::{Codec, HasEncodedLen}; +use tracing::warn; use malachitebft_core_consensus::{LivenessMsg, SignedConsensusMsg}; use malachitebft_engine::util::streaming::StreamMessage; @@ -100,6 +101,17 @@ impl Codec> for JsonCodec { } } +impl HasEncodedLen> for JsonCodec { + fn encoded_len( + &self, + msg: &Response, + ) -> 
Result>>::Error> { + warn!("encoded_len serializes the data to compute the length; consider using ProtobufCodec if \ + you want to compute the length of the encoded data without encoding them"); + serde_json::to_vec(&RawResponse::from(msg.clone())).map(|b| b.len()) + } +} + impl Codec> for JsonCodec { type Error = serde_json::Error; diff --git a/code/crates/test/src/codec/proto/mod.rs b/code/crates/test/src/codec/proto/mod.rs index 4d7ebc68d..2bcf67038 100644 --- a/code/crates/test/src/codec/proto/mod.rs +++ b/code/crates/test/src/codec/proto/mod.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use prost::Message; use malachitebft_app::engine::util::streaming::{StreamContent, StreamId, StreamMessage}; -use malachitebft_codec::Codec; +use malachitebft_codec::{Codec, HasEncodedLen}; use malachitebft_core_consensus::{LivenessMsg, ProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{ CommitCertificate, CommitSignature, NilOrVal, PolkaCertificate, PolkaSignature, Round, @@ -394,6 +394,18 @@ impl Codec> for ProtobufCodec { } } +impl HasEncodedLen> for ProtobufCodec { + fn encoded_len( + &self, + response: &sync::Response, + ) -> Result>>::Error> { + // The main cost of encoding "big" responses stems from calling `encode_to_vec` (see `encode` method) + // on `proto::SyncResponse`. We do NOT do this here. We simply call `encoded_len` on the `proto::SyncResponse` + // to quickly retrieve the length of the encoded data without first encoding the response. 
+ Ok(encode_sync_response(response)?.encoded_len()) + } +} + pub fn decode_sync_response( proto_response: proto::SyncResponse, ) -> Result, ProtoError> { From 73734b5d5fffc49e22e7f6175dfa1b72ee3bb5a6 Mon Sep 17 00:00:00 2001 From: insumity Date: Thu, 11 Sep 2025 15:34:21 +0200 Subject: [PATCH 16/18] clean up process_sync_response based on Hernan's comment --- code/crates/engine/src/consensus.rs | 31 +++++++++++++---------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index f0a9bda4b..afdd8fbea 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -662,23 +662,20 @@ where where Ctx: Context, { - if let Err(e) = self - .process_input( - myself, - state, - ConsensusInput::SyncValueResponse(CoreValueResponse::new( - peer, - value.value_bytes.clone(), - value.certificate.clone(), - )), - ) - .await - { - error!(%height, "Error when processing received synced block: {e}"); - return Err(e.into()); - } - - Ok(()) + self.process_input( + myself, + state, + ConsensusInput::SyncValueResponse(CoreValueResponse::new( + peer, + value.value_bytes.clone(), + value.certificate.clone(), + )), + ) + .await + .map_err(|e| { + error!(%height, error = ?e, "Error when processing received synced block"); + e.into() + }) } async fn timeout_elapsed( From d0f460d037840e1d3d3193eb10223a0663b5b9f7 Mon Sep 17 00:00:00 2001 From: insumity Date: Thu, 11 Sep 2025 15:58:19 +0200 Subject: [PATCH 17/18] remove unused method --- code/crates/proto/src/lib.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/code/crates/proto/src/lib.rs b/code/crates/proto/src/lib.rs index fe33b2a61..6f1a1bf5c 100644 --- a/code/crates/proto/src/lib.rs +++ b/code/crates/proto/src/lib.rs @@ -75,11 +75,6 @@ pub trait Protobuf: Sized { Ok(Bytes::from(proto.encode_to_vec())) } - fn encoded_len(&self) -> Result { - let proto = self.to_proto()?; - Ok(proto.encoded_len()) - } - fn 
from_any(any: &Any) -> Result { Self::from_proto(any.to_msg::()?) } From 225536e3cf1e0b946e1e9f214be513a2cf51828d Mon Sep 17 00:00:00 2001 From: insumity Date: Fri, 12 Sep 2025 10:38:48 +0200 Subject: [PATCH 18/18] log in case of re-requesting partial ranges --- code/crates/sync/src/handle.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index f5ced9d97..151487f56 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -556,9 +556,19 @@ async fn request_values_range( where Ctx: Context, { - // NOTE: We do not perform a `max_parallel_requests` check here in contrast to what is done, for - // example in `request_values`. This is because `request_values_range` is only called for retrieving - // partial responses, which means the original request is not on the wire anymore. + // NOTE: We do not perform a `max_parallel_requests` check and return here in contrast to what is done, for + // example, in `request_values`. This is because `request_values_range` is only called for retrieving + // partial responses, which means the original request is not on the wire anymore. Nevertheless, + // we log here because seeing this log frequently implies that we keep getting partial responses + // from peers, which hints at a potential need for reconfiguration. + let max_parallel_requests = state.max_parallel_requests(); + if state.pending_requests.len() as u64 >= max_parallel_requests { + info!( + %max_parallel_requests, + pending_requests = %state.pending_requests.len(), + "Maximum number of pending requests reached when re-requesting a partial range of values" + ); + }; // Get a random peer that can provide the values in the range. let Some((peer, range)) = state.random_peer_with(&range) else {