Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions code/crates/app-channel/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@

use eyre::Result;

use malachitebft_engine::consensus::{ConsensusMsg, ConsensusRef};
use malachitebft_engine::util::events::TxEvent;
use tokio::sync::mpsc::Receiver;

use crate::app::metrics::{Metrics, SharedRegistry};
use crate::app::node::{self, EngineHandle, NodeConfig};
use crate::app::spawn::{
Expand All @@ -17,6 +13,10 @@ use crate::app::types::core::Context;
use crate::msgs::ConsensusRequest;
use crate::spawn::{spawn_host_actor, spawn_network_actor};
use crate::Channels;
use malachitebft_app::types::sync;
use malachitebft_engine::consensus::{ConsensusMsg, ConsensusRef};
use malachitebft_engine::util::events::TxEvent;
use tokio::sync::mpsc::Receiver;

pub async fn start_engine<Node, Ctx, WalCodec, NetCodec>(
ctx: Ctx,
Expand All @@ -33,6 +33,7 @@ where
WalCodec: codec::WalCodec<Ctx> + Clone,
NetCodec: codec::ConsensusCodec<Ctx>,
NetCodec: codec::SyncCodec<Ctx>,
NetCodec: codec::HasEncodedLen<sync::Response<Ctx>>,
{
let start_height = start_height.unwrap_or_default();

Expand Down
17 changes: 9 additions & 8 deletions code/crates/app-channel/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
//! Utility functions for spawning the actor system and connecting it to the application.

use eyre::Result;
use tokio::sync::mpsc;

use malachitebft_engine::consensus::ConsensusCodec;
use malachitebft_engine::host::HostRef;
use malachitebft_engine::network::NetworkRef;
use malachitebft_engine::sync::SyncCodec;

use crate::app;
use crate::app::config::ConsensusConfig;
use crate::app::metrics::Metrics;
Expand All @@ -16,6 +8,14 @@ use crate::app::types::core::Context;
use crate::app::types::Keypair;
use crate::connector::Connector;
use crate::{AppMsg, NetworkMsg};
use eyre::Result;
use malachitebft_app::types::codec::HasEncodedLen;
use malachitebft_app::types::sync;
use malachitebft_engine::consensus::ConsensusCodec;
use malachitebft_engine::host::HostRef;
use malachitebft_engine::network::NetworkRef;
use malachitebft_engine::sync::SyncCodec;
use tokio::sync::mpsc;

pub async fn spawn_host_actor<Ctx>(
metrics: Metrics,
Expand All @@ -38,6 +38,7 @@ where
Ctx: Context,
Codec: ConsensusCodec<Ctx>,
Codec: SyncCodec<Ctx>,
Codec: HasEncodedLen<sync::Response<Ctx>>,
{
let (tx, mut rx) = mpsc::channel::<NetworkMsg<Ctx>>(1);

Expand Down
7 changes: 4 additions & 3 deletions code/crates/app/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use std::path::Path;
use std::time::Duration;

use eyre::Result;
use tokio::task::JoinHandle;
use tracing::Span;

use malachitebft_codec::HasEncodedLen;
use malachitebft_engine::consensus::{Consensus, ConsensusCodec, ConsensusParams, ConsensusRef};
use malachitebft_engine::host::HostRef;
use malachitebft_engine::network::{Network, NetworkRef};
Expand All @@ -18,6 +16,8 @@ use malachitebft_network::{
ChannelNames, Config as NetworkConfig, DiscoveryConfig, GossipSubConfig, Keypair,
};
use malachitebft_sync as sync;
use tokio::task::JoinHandle;
use tracing::Span;

use crate::config::{ConsensusConfig, PubSubProtocol, ValueSyncConfig};
use crate::metrics::{Metrics, SharedRegistry};
Expand Down Expand Up @@ -60,6 +60,7 @@ where
Ctx: Context,
Codec: ConsensusCodec<Ctx>,
Codec: SyncCodec<Ctx>,
Codec: HasEncodedLen<sync::Response<Ctx>>,
{
let config = make_gossip_config(cfg);

Expand Down
1 change: 1 addition & 0 deletions code/crates/app/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod sync {

pub mod codec {
pub use malachitebft_codec::Codec;
pub use malachitebft_codec::HasEncodedLen;
pub use malachitebft_engine::consensus::ConsensusCodec;
pub use malachitebft_engine::sync::SyncCodec;
pub use malachitebft_engine::wal::WalCodec;
Expand Down
5 changes: 5 additions & 0 deletions code/crates/codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ pub trait Codec<T>: Send + Sync + 'static {
fn decode(&self, bytes: Bytes) -> Result<T, Self::Error>;
fn encode(&self, msg: &T) -> Result<Bytes, Self::Error>;
}

/// Codec extension trait for types that can also compute the length of the encoded data.
pub trait HasEncodedLen<T>: Codec<T> {
fn encoded_len(&self, msg: &T) -> Result<usize, <Self as Codec<T>>::Error>;
}
4 changes: 4 additions & 0 deletions code/crates/core-consensus/src/util/bounded_queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use tracing::warn;

/// A data structure that maintains a queue of values associated with monotonically increasing indices.
///
Expand Down Expand Up @@ -46,6 +47,8 @@ where
if &index < max_index {
let max_index = max_index.clone();

warn!("Bounded queue is full, dropping value");

// Remove the highest index
self.queue.remove(&max_index);

Expand All @@ -56,6 +59,7 @@ where
}
}

warn!("Bounded queue is full, no value is inserted");
false
}

Expand Down
46 changes: 28 additions & 18 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,20 @@ where
// Process values sequentially starting from the lowest height
let mut height = start_height;
for value in values.iter() {
self.process_sync_response(&myself, state, peer, height, value)
.await?;
if let Err(e) = self
.process_sync_response(&myself, state, peer, height, value)
.await
{
// At this point, `process_sync_response` has already sent a message
// about an invalid value, etc. to the sync actor. The sync actor
// will then, re-request this range again from some peer.
// Because of this, in case of failing to process the response, we need
// to exit early this loop to avoid issuing multiple parallel requests
// for the same range of values. There's also no benefit in processing
// the rest of the values.
error!(%start_height, %height, %request_id, "Failed to process sync response:{e:?}");
break;
}

height = height.increment();
}
Expand Down Expand Up @@ -650,22 +662,20 @@ where
where
Ctx: Context,
{
if let Err(e) = self
.process_input(
myself,
state,
ConsensusInput::SyncValueResponse(CoreValueResponse::new(
peer,
value.value_bytes.clone(),
value.certificate.clone(),
)),
)
.await
{
error!(%height, "Error when processing received synced block: {e}");
}

Ok(())
self.process_input(
myself,
state,
ConsensusInput::SyncValueResponse(CoreValueResponse::new(
peer,
value.value_bytes.clone(),
value.certificate.clone(),
)),
)
.await
.map_err(|e| {
error!(%height, error = ?e, "Error when processing received synced block");
e.into()
})
}

async fn timeout_elapsed(
Expand Down
24 changes: 21 additions & 3 deletions code/crates/engine/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::{BTreeSet, HashMap};
use std::marker::PhantomData;

use async_trait::async_trait;
use derive_where::derive_where;
use eyre::eyre;
use libp2p::identity::Keypair;
use libp2p::request_response;
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
use std::collections::{BTreeSet, HashMap};
use std::marker::PhantomData;
use tokio::task::JoinHandle;
use tracing::{error, trace};

Expand Down Expand Up @@ -71,6 +70,7 @@ where
Ctx: Context,
Codec: ConsensusCodec<Ctx>,
Codec: SyncCodec<Ctx>,
Codec: codec::HasEncodedLen<sync::Response<Ctx>>,
{
pub async fn spawn(
keypair: Keypair,
Expand Down Expand Up @@ -167,6 +167,9 @@ pub enum Msg<Ctx: Context> {
/// Send a response for a request to a peer
OutgoingResponse(InboundRequestId, Response<Ctx>),

/// Request the total number of bytes that are to be transmitted for this response
GetResponseSize(Response<Ctx>, RpcReplyPort<usize>),

/// Request for number of peers from gossip
GetState { reply: RpcReplyPort<usize> },

Expand All @@ -187,6 +190,7 @@ where
Codec: codec::Codec<sync::Request<Ctx>>,
Codec: codec::Codec<sync::Response<Ctx>>,
Codec: codec::Codec<LivenessMsg<Ctx>>,
Codec: codec::HasEncodedLen<sync::Response<Ctx>>,
{
type Msg = Msg<Ctx>;
type State = State<Ctx>;
Expand Down Expand Up @@ -328,6 +332,20 @@ where
};
}

Msg::GetResponseSize(response, reply_to) => {
let encoded_len = self.codec.encoded_len(&response);

match encoded_len {
Ok(size) => {
reply_to.send(size)?;
}
Err(e) => {
error!(?response, "Failed to encode response message: {e:?}");
return Ok(());
}
};
}

Msg::NewEvent(Event::Listening(addr)) => {
listen_addrs.push(addr.clone());
output_port.send(NetworkEvent::Listening(addr));
Expand Down
48 changes: 43 additions & 5 deletions code/crates/engine/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ use rand::SeedableRng;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn, Instrument};

use crate::host::{HostMsg, HostRef};
use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status};
use crate::util::ticker::ticker;
use crate::util::timers::{TimeoutElapsed, TimerScheduler};
use malachitebft_codec as codec;
use malachitebft_core_consensus::PeerId;
use malachitebft_core_types::{CommitCertificate, Context, Height};
use malachitebft_sync::Response::ValueResponse;
use malachitebft_sync::{
self as sync, HeightStartType, InboundRequestId, OutboundRequestId, RawDecidedValue, Request,
Response, Resumable,
};

use crate::host::{HostMsg, HostRef};
use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status};
use crate::util::ticker::ticker;
use crate::util::timers::{TimeoutElapsed, TimerScheduler};

/// Codec for sync protocol messages
///
/// This trait is automatically implemented for any type that implements:
Expand Down Expand Up @@ -285,6 +285,8 @@ where
Effect::GetDecidedValues(request_id, range, r) => {
let mut values = Vec::new();
let mut height = *range.start();

let mut response_size_bytes = 0;
while height <= *range.end() {
let value = self
.host
Expand All @@ -296,7 +298,43 @@ where
.success_or(eyre!("Failed to get decided value for height {height}"))?;

if let Some(value) = value {
let value_response = ValueResponse(sync::ValueResponse::new(
*range.start(),
vec![value.clone()],
));

let result = ractor::call!(self.gossip, move |reply_to| {
NetworkMsg::GetResponseSize(value_response.clone(), reply_to)
});

let total_value_size_bytes = match result {
Ok(value_in_bytes) => value_in_bytes,
Err(e) => {
error!("Failed to get response size for value, stopping at for height {}: {:?}", height, e);
break;
}
};

// check if adding this value would exceed the max-response limit
if response_size_bytes + total_value_size_bytes
> self.sync_config.max_response_size
{
warn!("Maximum byte size limit ({} bytes) would be exceeded (current: {} + upcoming value: {}), stopping at height {}",
self.sync_config.max_response_size, response_size_bytes, total_value_size_bytes, height);
break;
}

response_size_bytes += total_value_size_bytes;
values.push(value);

if response_size_bytes == self.sync_config.max_response_size {
info!(
"Reached maximum byte size limit ({} bytes) exactly at height {}",
self.sync_config.max_response_size, height
);

break;
}
} else {
warn!("Decided value not found for height {height}");
break;
Expand Down
11 changes: 10 additions & 1 deletion code/crates/starknet/host/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes;
use prost::Message;

use malachitebft_codec::Codec;
use malachitebft_codec::{Codec, HasEncodedLen};
use malachitebft_core_consensus::{LivenessMsg, PeerId, ProposedValue, SignedConsensusMsg};
use malachitebft_core_types::{
CommitCertificate, CommitSignature, NilOrVal, PolkaCertificate, PolkaSignature, Round,
Expand Down Expand Up @@ -222,6 +222,15 @@ impl Codec<sync::Request<MockContext>> for ProtobufCodec {
}
}

impl HasEncodedLen<sync::Response<MockContext>> for ProtobufCodec {
fn encoded_len(
&self,
response: &sync::Response<MockContext>,
) -> Result<usize, <Self as Codec<sync::Response<MockContext>>>::Error> {
Ok(encode_sync_response(response)?.encoded_len())
}
}

pub fn decode_sync_response(
proto_response: proto::sync::SyncResponse,
) -> Result<sync::Response<MockContext>, ProtoError> {
Expand Down
1 change: 1 addition & 0 deletions code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ fn apply_params(config: &mut Config, params: &TestParams) {
config.value_sync.enabled = params.enable_value_sync;
config.value_sync.parallel_requests = params.parallel_requests;
config.value_sync.batch_size = params.batch_size;
config.value_sync.max_response_size = params.max_response_size;
config.consensus.p2p.protocol = params.protocol;
config.test.max_block_size = params.block_size;
config.test.txs_per_part = params.txs_per_part;
Expand Down
Loading
Loading