21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -86,6 +86,7 @@ restate-utoipa = { path = "crates/utoipa" }
 restate-vqueues = { path = "crates/vqueues" }
 restate-wal-protocol = { path = "crates/wal-protocol" }
 restate-worker = { path = "crates/worker" }
+restate-ingestion-client = { path = "crates/ingestion-client" }

 # this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
 # outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead
24 changes: 9 additions & 15 deletions crates/bifrost/src/background_appender.rs
@@ -8,12 +8,10 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

-use std::sync::Arc;
-
 use futures::FutureExt;
 use pin_project::pin_project;
 use restate_types::logs::Record;
-use tokio::sync::{Notify, mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot};
 use tracing::{trace, warn};

 use restate_core::{ShutdownError, TaskCenter, TaskHandle, cancellation_watcher};
@@ -130,8 +128,8 @@ where
                     batch.push(record);
                     notif_buffer.push(tx);
                 }
-                AppendOperation::Canary(notify) => {
-                    notify.notify_one();
+                AppendOperation::Canary(tx) => {
+                    notif_buffer.push(tx);
                 }
                 AppendOperation::MarkAsPreferred => {
                     appender.mark_as_preferred();
@@ -353,23 +351,19 @@ impl<T: StorageEncode> LogSender<T> {
         Ok(CommitToken { rx })
     }

-    /// Wait for previously enqueued records to be committed
-    ///
-    /// Not cancellation safe. Every call will attempt to acquire capacity on the channel and send
-    /// a new message to the appender.
-    pub async fn notify_committed(&self) -> Result<(), EnqueueError<()>> {
+    /// Returns a [`CommitToken`] that is resolved once all previously enqueued records are committed.
+    pub async fn notify_committed(&self) -> Result<CommitToken, EnqueueError<()>> {
         let Ok(permit) = self.tx.reserve().await else {
             // channel is closed; this should only happen when the appender is draining or has
             // already been drained
             return Err(EnqueueError::Closed(()));
         };

-        let notify = Arc::new(Notify::new());
-        let canary = AppendOperation::Canary(notify.clone());
+        let (tx, rx) = oneshot::channel();
+        let canary = AppendOperation::Canary(tx);
         permit.send(canary);

-        notify.notified().await;
-        Ok(())
+        Ok(CommitToken { rx })
     }

     /// Marks this node as a preferred writer for the underlying log
@@ -422,7 +416,7 @@ enum AppendOperation {
     EnqueueWithNotification(Record, oneshot::Sender<()>),
     // A message denoting a request to be notified when it's processed by the appender.
     // It's used to check if previously enqueued appends have been committed or not.
-    Canary(Arc<Notify>),
+    Canary(oneshot::Sender<()>),
     /// Lets bifrost know that this node is the preferred writer of this log
     MarkAsPreferred,
     /// Lets bifrost know that this node might not be the preferred writer of this log
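Why the `Arc<Notify>` to `oneshot::Sender<()>` change matters: with `Notify`, `notify_committed` had to await the notification inside the function itself, so cancelling the returned future lost track of the in-flight canary (hence the old "not cancellation safe" warning). A oneshot channel instead yields a receiver the caller can hold as a `CommitToken` and await, store, or drop at any time. Below is a minimal, self-contained sketch of the pattern; the `AppendOperation` and `CommitToken` shapes mirror the diff, but the single-variant enum and the toy loop are illustrative stand-ins, and the real appender resolves the canary together with the committed batch rather than on dequeue:

```rust
use tokio::sync::{mpsc, oneshot};

// Illustrative stand-ins for the types in this diff, not the real bifrost
// definitions.
enum AppendOperation {
    Canary(oneshot::Sender<()>),
}

struct CommitToken {
    rx: oneshot::Receiver<()>,
}

// Toy appender loop: resolves each canary when it is dequeued. The real
// appender pushes the sender into `notif_buffer`, so it resolves only once
// the whole preceding batch has been committed.
async fn appender_loop(mut rx: mpsc::Receiver<AppendOperation>) {
    while let Some(AppendOperation::Canary(tx)) = rx.recv().await {
        let _ = tx.send(());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tokio::spawn(appender_loop(rx));

    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(AppendOperation::Canary(ack_tx)).await.unwrap();

    // The token is a plain value: it can be stored, awaited later, or dropped
    // without leaving a dangling waiter. This is the cancellation-safety win.
    let token = CommitToken { rx: ack_rx };
    token.rx.await.expect("appender dropped the canary");
}
```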
44 changes: 26 additions & 18 deletions crates/bifrost/src/record.rs
@@ -12,6 +12,7 @@ use core::str;
 use std::marker::PhantomData;
 use std::sync::Arc;

+use bytes::Bytes;
 use restate_types::logs::{BodyWithKeys, HasRecordKeys, Keys, Lsn, Record};
 use restate_types::logs::{LogletOffset, SequenceNumber};
 use restate_types::storage::{PolyBytes, StorageDecode, StorageDecodeError, StorageEncode};
@@ -200,29 +201,19 @@ pub struct Gap<S> {
     pub to: S,
 }

+#[derive(Clone)]
 pub struct InputRecord<T> {
     created_at: NanosSinceEpoch,
     keys: Keys,
-    body: Arc<dyn StorageEncode>,
+    body: PolyBytes,
     _phantom: PhantomData<T>,
 }

-impl<T> Clone for InputRecord<T> {
-    fn clone(&self) -> Self {
-        Self {
-            created_at: self.created_at,
-            keys: self.keys.clone(),
-            body: Arc::clone(&self.body),
-            _phantom: self._phantom,
-        }
-    }
-}
-
 // This is a zero-cost transformation. The type is erased at runtime, but the underlying
 // layout is identical.
 impl<T: StorageEncode> InputRecord<T> {
     pub fn into_record(self) -> Record {
-        Record::from_parts(self.created_at, self.keys, PolyBytes::Typed(self.body))
+        Record::from_parts(self.created_at, self.keys, self.body)
     }
 }

@@ -231,7 +222,24 @@ impl<T: StorageEncode> InputRecord<T> {
         Self {
             created_at,
             keys,
-            body,
+            body: PolyBytes::Typed(body),
             _phantom: PhantomData,
         }
     }
+
+    /// Builds an [`InputRecord<T>`] directly from raw bytes without validating the payload.
+    ///
+    /// # Safety
+    /// Caller must guarantee the bytes are a correctly storage-encoded `T`.
+    pub unsafe fn from_bytes_unchecked(
+        created_at: NanosSinceEpoch,
+        keys: Keys,
+        body: Bytes,
+    ) -> Self {
+        Self {
+            created_at,
+            keys,
+            body: PolyBytes::Bytes(body),
+            _phantom: PhantomData,
+        }
+    }
@@ -246,7 +254,7 @@ impl<T: StorageEncode + HasRecordKeys> From<Arc<T>> for InputRecord<T> {
         InputRecord {
             created_at: NanosSinceEpoch::now(),
             keys: val.record_keys(),
-            body: val,
+            body: PolyBytes::Typed(val),
             _phantom: PhantomData,
         }
     }
@@ -257,7 +265,7 @@ impl From<String> for InputRecord<String> {
         InputRecord {
             created_at: NanosSinceEpoch::now(),
             keys: Keys::None,
-            body: Arc::new(val),
+            body: PolyBytes::Typed(Arc::new(val)),
             _phantom: PhantomData,
         }
     }
@@ -268,7 +276,7 @@ impl From<&str> for InputRecord<String> {
         InputRecord {
             created_at: NanosSinceEpoch::now(),
             keys: Keys::None,
-            body: Arc::new(String::from(val)),
+            body: PolyBytes::Typed(Arc::new(String::from(val))),
             _phantom: PhantomData,
         }
     }
@@ -279,7 +287,7 @@ impl<T: StorageEncode> From<BodyWithKeys<T>> for InputRecord<T> {
         InputRecord {
             created_at: NanosSinceEpoch::now(),
             keys: val.record_keys(),
-            body: Arc::new(val.into_inner()),
+            body: PolyBytes::Typed(Arc::new(val.into_inner())),
             _phantom: PhantomData,
         }
     }
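The net effect in record.rs: `InputRecord` now erases its body into `PolyBytes` at construction time, so `Clone` can simply be derived (both `PolyBytes` variants are cheap to clone) and `into_record` becomes a plain move, while the new unsafe constructor lets an ingestion path hand over pre-encoded bytes without re-encoding. A rough sketch of the two construction paths, with mocked stand-ins for the restate types (the `PolyBytes` shape is inferred from its usage in this diff):

```rust
use std::sync::Arc;

use bytes::Bytes;

// Mocked stand-ins, inferred from the diff; not the restate_types definitions.
trait StorageEncode {}
impl StorageEncode for String {}

#[derive(Clone)]
enum PolyBytes {
    // A typed value that will be storage-encoded when the record is serialized.
    Typed(Arc<dyn StorageEncode>),
    // A payload that is already storage-encoded, e.g. received over the wire.
    Bytes(Bytes),
}

#[derive(Clone)] // derivable now: no hand-written Clone impl needed
struct InputRecord {
    body: PolyBytes,
}

impl InputRecord {
    fn new(body: Arc<dyn StorageEncode>) -> Self {
        Self { body: PolyBytes::Typed(body) }
    }

    /// Mirrors the diff's `from_bytes_unchecked`: nothing validates that
    /// `body` decodes to the expected type, hence the `unsafe` marker.
    unsafe fn from_bytes_unchecked(body: Bytes) -> Self {
        Self { body: PolyBytes::Bytes(body) }
    }
}

fn main() {
    let typed = InputRecord::new(Arc::new("hello".to_owned()));
    // Hypothetical pre-encoded payload; its validity is the caller's contract.
    let raw = unsafe { InputRecord::from_bytes_unchecked(Bytes::from_static(b"\x05hello")) };
    let _copies = (typed.clone(), raw.clone());
}
```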
12 changes: 2 additions & 10 deletions crates/core/src/worker_api/partition_processor_rpc_client.rs
@@ -71,10 +71,6 @@ pub enum RpcErrorKind {
     Busy,
     #[error("internal error: {0}")]
     Internal(String),
-    #[error("partition processor starting")]
-    Starting,
-    #[error("partition processor stopping")]
-    Stopping,
 }

 impl PartitionProcessorInvocationClientError {
@@ -106,10 +102,8 @@ impl RpcError {
         match self.source {
             RpcErrorKind::Connect(_)
             | RpcErrorKind::NotLeader
-            | RpcErrorKind::Starting
             | RpcErrorKind::Busy
-            | RpcErrorKind::SendFailed
-            | RpcErrorKind::Stopping => {
+            | RpcErrorKind::SendFailed => {
                 // These are pre-flight errors that we can distinguish,
                 // and for which we know for certain that no message was proposed yet to the log.
                 true
@@ -143,7 +137,7 @@ impl From<RpcReplyError> for RpcErrorKind {
             RpcReplyError::ServiceNotFound | RpcReplyError::SortCodeNotFound => Self::NotLeader,
             RpcReplyError::LoadShedding => Self::Busy,
             RpcReplyError::ServiceNotReady => Self::Busy,
-            RpcReplyError::ServiceStopped => Self::Stopping,
+            RpcReplyError::ServiceStopped => Self::NotLeader,
         }
     }
 }
@@ -154,8 +148,6 @@ impl From<PartitionProcessorRpcError> for RpcErrorKind {
             PartitionProcessorRpcError::NotLeader(_) => RpcErrorKind::NotLeader,
             PartitionProcessorRpcError::LostLeadership(_) => RpcErrorKind::LostLeadership,
             PartitionProcessorRpcError::Internal(msg) => RpcErrorKind::Internal(msg),
-            PartitionProcessorRpcError::Starting => RpcErrorKind::Starting,
-            PartitionProcessorRpcError::Stopping => RpcErrorKind::Stopping,
         }
     }
 }
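Collapsing `Starting`/`Stopping` into the existing `Busy`/`NotLeader` buckets keeps the caller-facing contract binary: either the error happened pre-flight (nothing was proposed to the log yet, so a blind retry is safe) or it happened after a proposal may have been made (retry only if the request is idempotent). A hedged sketch of how a client might consume that classification; `is_safe_to_retry` and the retry loop are hypothetical, since the method containing the visible `match` lies outside the hunk:

```rust
use std::future::Future;
use std::time::Duration;

// Simplified mirror of RpcErrorKind after this change; Connect's payload is
// omitted for brevity.
#[derive(Debug)]
enum RpcErrorKind {
    Connect,
    NotLeader,
    LostLeadership,
    SendFailed,
    Busy,
    Internal(String),
}

impl RpcErrorKind {
    // Hypothetical accessor: pre-flight failures never reached the log, so
    // retrying cannot duplicate a proposal.
    fn is_safe_to_retry(&self) -> bool {
        matches!(
            self,
            Self::Connect | Self::NotLeader | Self::Busy | Self::SendFailed
        )
    }
}

// Illustrative retry loop keyed purely on the pre-flight classification.
async fn call_with_retry<F, Fut, T>(mut op: F) -> Result<T, RpcErrorKind>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, RpcErrorKind>>,
{
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if e.is_safe_to_retry() => {
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            // Post-proposal errors (e.g. LostLeadership) bubble up; the caller
            // must decide based on the request's idempotency.
            Err(e) => return Err(e),
        }
    }
}
```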
30 changes: 30 additions & 0 deletions crates/ingestion-client/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "restate-ingestion-client"
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+license.workspace = true
+publish = false
+
+[dependencies]
+arc-swap = { workspace = true }
+dashmap = { workspace = true }
+futures = { workspace = true }
+pin-project = { workspace = true }
+thiserror = { workspace = true }
+tokio-stream = { workspace = true }
+tokio-util = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+
+restate-core = { workspace = true }
+restate-types = { workspace = true }
+restate-workspace-hack = { workspace = true }
+
+[dev-dependencies]
+bytes = { workspace = true }
+googletest = { workspace = true }
+test-log = { workspace = true }
+
+restate-core = { workspace = true, features = ["test-util"] }