Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions code/crates/engine/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,21 @@ where
}

Effect::GetDecidedValues(request_id, range, r) => {
// For simplicity, to avoid asking the application for the exact size of addresses, signatures, etc.
// we assume some maximum byte sizes for an address and a signature in order to calculate
// an **approximate** total size per value, so that we only send at most `self.sync_config.max_response_size`
// to another peer.
const MAX_BYTES_PER_ADDRESS: usize = 32;
const MAX_BYTES_PER_SIGNATURE: usize = 100;
Comment on lines +290 to +291
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would declare these variables close to the declarations of the address and signature types. Also, each app will have its own definition of address and signature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I fully follow your comment: Are you suggesting contacting the app to receive the sizes of addresses and signatures? If not, where exactly would you declare those constants?

Copy link
Contributor

@romac romac Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add them as static const to the Address and SigningScheme traits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's pretty brittle as well, and could break things if the upper bound is not tight enough, eg. some application uses a signature scheme with variable-size signatures and defines Signature::MAX_SIZE to a large value just in case.

Copy link
Contributor

@romac romac Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best way would probably be to encode the commit certificate (or even the full sync response) using the (Network)Codec and use the actual computed size. Not great for performance, we should probably either measure the impact upfront, or go with it and only optimize this if we see too big a perf impact. What do you think?

Copy link
Contributor Author

@insumity insumity Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best way would probably be to encode the commit certificate (or even the full sync response) using the (Network)Codec and use the actual computed size.

I agree this is the right away to go about it. I'll rewrite the PR to take this into account.


// a `CommitSignature` consists of an address and a signature
const MAX_BYTES_PER_COMMIT_SIGNATURE: usize =
MAX_BYTES_PER_ADDRESS + MAX_BYTES_PER_SIGNATURE;

let mut values = Vec::new();
let mut height = *range.start();

let mut response_size_bytes = 0;
while height <= *range.end() {
let value = self
.host
Expand All @@ -296,6 +309,24 @@ where
.success_or(eyre!("Failed to get decided value for height {height}"))?;

if let Some(value) = value {
let value_size_bytes = value.value_bytes.len();

let num_commit_signature = value.certificate.commit_signatures.len();
let certificate_size_estimate =
num_commit_signature * MAX_BYTES_PER_COMMIT_SIGNATURE;

let total_value_size_bytes = value_size_bytes + certificate_size_estimate;

// check if adding this value would exceed the max-response limit
if response_size_bytes + total_value_size_bytes
> self.sync_config.max_response_size
{
warn!("Maximum byte size limit ({} bytes) would be exceeded (current: {}, value + certificate estimate: {}), stopping at height {}",
self.sync_config.max_response_size, response_size_bytes, total_value_size_bytes, height);
break;
}

response_size_bytes += total_value_size_bytes;
values.push(value);
} else {
warn!("Decided value not found for height {height}");
Expand Down
1 change: 1 addition & 0 deletions code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ fn apply_params(config: &mut Config, params: &TestParams) {
config.value_sync.enabled = params.enable_value_sync;
config.value_sync.parallel_requests = params.parallel_requests;
config.value_sync.batch_size = params.batch_size;
config.value_sync.max_response_size = params.max_response_size;
config.consensus.p2p.protocol = params.protocol;
config.test.max_block_size = params.block_size;
config.test.txs_per_part = params.txs_per_part;
Expand Down
70 changes: 66 additions & 4 deletions code/crates/sync/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,36 @@ where
"Received response from different peer than expected"
Copy link
Contributor

@romac romac Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably use this PR to abort processing the response in the case where we got a response from another peer than the one we expected.

);
}

let range_len = requested_range.end().as_u64() - requested_range.start().as_u64() + 1;
if (response.values.len() as u64) < range_len {
re_request_values_from_peer_except(co, state, metrics, request_id, None).await?;

if response.values.len() < range_len as usize {
// NOTE: We cannot simply call `re_request_values_from_peer_except` here.
// Although we received some values from the peer, these values have not yet been processed
// by the consensus engine. If we called `re_request_values_from_peer_except`, we would
// end up re-requesting the entire original range (including values we already received),
// causing the syncing peer to repeatedly send multiple requests until the already-received
// values are fully processed.
// To tackle this, we first update the current pending request with the range of values
// it provides we received, and then issue a new request with the remaining values.
let new_start = requested_range
.start()
.increment_by(response.values.len() as u64);

let end = *requested_range.end();

if response.values.is_empty() {
error!(%request_id, %peer_id, "Received response contains no values");
} else {
// The response of this request only provided `response.values.len()` values,
// so update the pending request accordingly
let updated_range = *requested_range.start()..=new_start.decrement().unwrap();
state.update_request(request_id, peer_id, updated_range);
}

// Issue a new request to any peer, not necessarily the same one, for the remaining values
let new_range = new_start..=end;
request_values_range(co, state, metrics, new_range).await?;
}
}

Expand Down Expand Up @@ -360,7 +387,7 @@ where
// Validate response from host
let batch_size = end.as_u64() - start.as_u64() + 1;
if batch_size != values.len() as u64 {
error!(
warn!(
"Received {} values from host, expected {batch_size}",
values.len()
)
Expand Down Expand Up @@ -483,7 +510,7 @@ async fn request_values<Ctx>(
where
Ctx: Context,
{
let max_parallel_requests = max(1, state.config.parallel_requests);
let max_parallel_requests = state.max_parallel_requests();

if state.pending_requests.len() as u64 >= max_parallel_requests {
info!(
Expand Down Expand Up @@ -515,6 +542,41 @@ where
Ok(())
}

/// Request multiple batches of values in parallel.
async fn request_values_range<Ctx>(
co: Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
range: RangeInclusive<Ctx::Height>,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
let max_parallel_requests = state.max_parallel_requests();

if state.pending_requests.len() as u64 >= max_parallel_requests {
info!(
%max_parallel_requests,
pending_requests = %state.pending_requests.len(),
range = %DisplayRange::<Ctx>(&range),
"Maximum number of parallel requests reached, skipping request for values"
);

return Ok(());
};

// Get a random peer that can provide the values in the range.
let Some((peer, range)) = state.random_peer_with(&range) else {
// No connected peer reached this height yet, we can stop syncing here.
debug!(range = %DisplayRange::<Ctx>(&range), "No peer to request sync from");
return Ok(());
};

request_values_from_peer(&co, state, metrics, range, peer).await?;

Ok(())
}

async fn request_values_from_peer<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
Expand Down
15 changes: 15 additions & 0 deletions code/crates/sync/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,25 @@ where
}
}

/// The maximum number of parallel requests that can be made to peers.
/// If the configuration is set to 0, it defaults to 1.
pub fn max_parallel_requests(&self) -> u64 {
max(1, self.config.parallel_requests)
}

pub fn update_status(&mut self, status: Status<Ctx>) {
self.peers.insert(status.peer_id, status);
}

pub fn update_request(
&mut self,
request_id: OutboundRequestId,
peer_id: PeerId,
range: RangeInclusive<Ctx::Height>,
) {
self.pending_requests.insert(request_id, (range, peer_id));
}

/// Filter peers to only include those that can provide the given range of values, or at least a prefix of the range.
///
/// If there is no peer with all requested values, select a peer that has a tip at or above the start of the range.
Expand Down
3 changes: 3 additions & 0 deletions code/crates/test/framework/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct TestParams {
pub value_payload: ValuePayload,
pub max_retain_blocks: usize,
pub stable_block_times: bool,
pub max_response_size: ByteSize,
}

impl Default for TestParams {
Expand All @@ -32,6 +33,7 @@ impl Default for TestParams {
value_payload: ValuePayload::ProposalAndParts,
max_retain_blocks: 50,
stable_block_times: true,
max_response_size: ByteSize::mib(1),
}
}
}
Expand All @@ -41,6 +43,7 @@ impl TestParams {
config.value_sync.enabled = self.enable_value_sync;
config.value_sync.parallel_requests = self.parallel_requests;
config.value_sync.batch_size = self.batch_size;
config.value_sync.max_response_size = self.max_response_size;
config.consensus.p2p.protocol = self.protocol;
config.consensus.value_payload = self.value_payload;
config.test.max_block_size = self.block_size;
Expand Down
Loading