Skip to content

Commit a16b71e

Browse files
committed
[PP] Handle IngestRequest message
Summary: Handle the incoming `IngestRequest` messages sent by the `ingestion-client`
1 parent 03cf056 commit a16b71e

File tree

8 files changed

+242
-65
lines changed

8 files changed

+242
-65
lines changed

crates/bifrost/src/record.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use core::str;
1212
use std::marker::PhantomData;
1313
use std::sync::Arc;
1414

15+
use bytes::Bytes;
1516
use restate_types::logs::{BodyWithKeys, HasRecordKeys, Keys, Lsn, Record};
1617
use restate_types::logs::{LogletOffset, SequenceNumber};
1718
use restate_types::storage::{PolyBytes, StorageDecode, StorageDecodeError, StorageEncode};
@@ -200,29 +201,19 @@ pub struct Gap<S> {
200201
pub to: S,
201202
}
202203

204+
#[derive(Clone)]
203205
pub struct InputRecord<T> {
204206
created_at: NanosSinceEpoch,
205207
keys: Keys,
206-
body: Arc<dyn StorageEncode>,
208+
body: PolyBytes,
207209
_phantom: PhantomData<T>,
208210
}
209211

210-
impl<T> Clone for InputRecord<T> {
211-
fn clone(&self) -> Self {
212-
Self {
213-
created_at: self.created_at,
214-
keys: self.keys.clone(),
215-
body: Arc::clone(&self.body),
216-
_phantom: self._phantom,
217-
}
218-
}
219-
}
220-
221212
// This is a zero-cost transformation. The type is erased at runtime, but the underlying
222213
// layout is identical.
223214
impl<T: StorageEncode> InputRecord<T> {
224215
pub fn into_record(self) -> Record {
225-
Record::from_parts(self.created_at, self.keys, PolyBytes::Typed(self.body))
216+
Record::from_parts(self.created_at, self.keys, self.body)
226217
}
227218
}
228219

@@ -231,7 +222,24 @@ impl<T: StorageEncode> InputRecord<T> {
231222
Self {
232223
created_at,
233224
keys,
234-
body,
225+
body: PolyBytes::Typed(body),
226+
_phantom: PhantomData,
227+
}
228+
}
229+
230+
/// Builds an [`InputRecord<T>`] directly from raw bytes without validating the payload.
231+
///
232+
/// # Safety
233+
/// Caller must guarantee the bytes are a correctly storage-encoded `T`.
234+
pub unsafe fn from_bytes_unchecked(
235+
created_at: NanosSinceEpoch,
236+
keys: Keys,
237+
body: Bytes,
238+
) -> Self {
239+
Self {
240+
created_at,
241+
keys,
242+
body: PolyBytes::Bytes(body),
235243
_phantom: PhantomData,
236244
}
237245
}
@@ -246,7 +254,7 @@ impl<T: StorageEncode + HasRecordKeys> From<Arc<T>> for InputRecord<T> {
246254
InputRecord {
247255
created_at: NanosSinceEpoch::now(),
248256
keys: val.record_keys(),
249-
body: val,
257+
body: PolyBytes::Typed(val),
250258
_phantom: PhantomData,
251259
}
252260
}
@@ -257,7 +265,7 @@ impl From<String> for InputRecord<String> {
257265
InputRecord {
258266
created_at: NanosSinceEpoch::now(),
259267
keys: Keys::None,
260-
body: Arc::new(val),
268+
body: PolyBytes::Typed(Arc::new(val)),
261269
_phantom: PhantomData,
262270
}
263271
}
@@ -268,7 +276,7 @@ impl From<&str> for InputRecord<String> {
268276
InputRecord {
269277
created_at: NanosSinceEpoch::now(),
270278
keys: Keys::None,
271-
body: Arc::new(String::from(val)),
279+
body: PolyBytes::Typed(Arc::new(String::from(val))),
272280
_phantom: PhantomData,
273281
}
274282
}
@@ -279,7 +287,7 @@ impl<T: StorageEncode> From<BodyWithKeys<T>> for InputRecord<T> {
279287
InputRecord {
280288
created_at: NanosSinceEpoch::now(),
281289
keys: val.record_keys(),
282-
body: Arc::new(val.into_inner()),
290+
body: PolyBytes::Typed(Arc::new(val.into_inner())),
283291
_phantom: PhantomData,
284292
}
285293
}

crates/core/src/worker_api/partition_processor_rpc_client.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,6 @@ pub enum RpcErrorKind {
7171
Busy,
7272
#[error("internal error: {0}")]
7373
Internal(String),
74-
#[error("partition processor starting")]
75-
Starting,
76-
#[error("partition processor stopping")]
77-
Stopping,
7874
}
7975

8076
impl PartitionProcessorInvocationClientError {
@@ -106,10 +102,8 @@ impl RpcError {
106102
match self.source {
107103
RpcErrorKind::Connect(_)
108104
| RpcErrorKind::NotLeader
109-
| RpcErrorKind::Starting
110105
| RpcErrorKind::Busy
111-
| RpcErrorKind::SendFailed
112-
| RpcErrorKind::Stopping => {
106+
| RpcErrorKind::SendFailed => {
113107
// These are pre-flight error that we can distinguish,
114108
// and for which we know for certain that no message was proposed yet to the log.
115109
true
@@ -143,7 +137,7 @@ impl From<RpcReplyError> for RpcErrorKind {
143137
RpcReplyError::ServiceNotFound | RpcReplyError::SortCodeNotFound => Self::NotLeader,
144138
RpcReplyError::LoadShedding => Self::Busy,
145139
RpcReplyError::ServiceNotReady => Self::Busy,
146-
RpcReplyError::ServiceStopped => Self::Stopping,
140+
RpcReplyError::ServiceStopped => Self::LostLeadership,
147141
}
148142
}
149143
}
@@ -154,8 +148,6 @@ impl From<PartitionProcessorRpcError> for RpcErrorKind {
154148
PartitionProcessorRpcError::NotLeader(_) => RpcErrorKind::NotLeader,
155149
PartitionProcessorRpcError::LostLeadership(_) => RpcErrorKind::LostLeadership,
156150
PartitionProcessorRpcError::Internal(msg) => RpcErrorKind::Internal(msg),
157-
PartitionProcessorRpcError::Starting => RpcErrorKind::Starting,
158-
PartitionProcessorRpcError::Stopping => RpcErrorKind::Stopping,
159151
}
160152
}
161153
}

crates/types/src/net/partition_processor.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,14 @@ pub enum PartitionProcessorRpcError {
142142
//Busy,
143143
#[error("internal error: {0}")]
144144
Internal(String),
145-
#[error("partition processor starting")]
146-
Starting,
147-
#[error("partition processor stopping")]
148-
Stopping,
149145
}
150146

151147
impl PartitionProcessorRpcError {
152148
pub fn likely_stale_route(&self) -> bool {
153149
match self {
154150
PartitionProcessorRpcError::NotLeader(_) => true,
155151
PartitionProcessorRpcError::LostLeadership(_) => true,
156-
PartitionProcessorRpcError::Stopping => true,
157152
PartitionProcessorRpcError::Internal(_) => false,
158-
PartitionProcessorRpcError::Starting => false,
159153
}
160154
}
161155
}

crates/worker/src/metric_definitions.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_
3333
pub const PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS: &str =
3434
"restate.partition.record_committed_to_read_latency.seconds";
3535

36+
pub const PARTITION_INGESTION_REQUEST_LEN: &str = "restate.partition.ingest.request.len";
37+
pub const PARTITION_INGESTION_REQUEST_SIZE: &str = "restate.partition.ingest.request.size.bytes";
38+
3639
pub(crate) fn describe_metrics() {
3740
describe_gauge!(
3841
PARTITION_BLOCKED_FLARE,
@@ -97,4 +100,16 @@ pub(crate) fn describe_metrics() {
97100
Unit::Count,
98101
"Number of records between last applied lsn and the log tail"
99102
);
103+
104+
describe_histogram!(
105+
PARTITION_INGESTION_REQUEST_LEN,
106+
Unit::Count,
107+
"Number of records in a single ingestion request"
108+
);
109+
110+
describe_histogram!(
111+
PARTITION_INGESTION_REQUEST_SIZE,
112+
Unit::Bytes,
113+
"Total size of records in a single ingestion request"
114+
);
100115
}

crates/worker/src/partition/leadership/leader_state.rs

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use restate_types::identifiers::{
3636
};
3737
use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification};
3838
use restate_types::logs::Keys;
39+
use restate_types::net::ingest::IngestRecord;
3940
use restate_types::net::partition_processor::{
4041
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
4142
};
@@ -435,14 +436,35 @@ impl LeaderState {
435436
Ok(commit_token) => {
436437
self.awaiting_rpc_self_propose.push(SelfAppendFuture::new(
437438
commit_token,
438-
success_response,
439-
reciprocal,
439+
|result: Result<(), PartitionProcessorRpcError>| {
440+
reciprocal.send(result.map(|_| success_response));
441+
},
440442
));
441443
}
442444
Err(e) => reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
443445
}
444446
}
445447

448+
pub async fn propose_many_with_callback<F>(
449+
&mut self,
450+
records: impl ExactSizeIterator<Item = IngestRecord>,
451+
callback: F,
452+
) where
453+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
454+
{
455+
match self
456+
.self_proposer
457+
.propose_many_with_notification(records)
458+
.await
459+
{
460+
Ok(commit_token) => {
461+
self.awaiting_rpc_self_propose
462+
.push(SelfAppendFuture::new(commit_token, callback));
463+
}
464+
Err(e) => callback(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
465+
}
466+
}
467+
446468
pub fn handle_actions(
447469
&mut self,
448470
invoker_tx: &mut impl restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
@@ -686,42 +708,72 @@ impl LeaderState {
686708
}
687709
}
688710

711+
trait CallbackInner: Send + Sync + 'static {
712+
fn call(self: Box<Self>, result: Result<(), PartitionProcessorRpcError>);
713+
}
714+
715+
impl<F> CallbackInner for F
716+
where
717+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
718+
{
719+
fn call(self: Box<Self>, result: Result<(), PartitionProcessorRpcError>) {
720+
self(result)
721+
}
722+
}
723+
724+
struct Callback {
725+
inner: Box<dyn CallbackInner>,
726+
}
727+
728+
impl Callback {
729+
fn call(self, result: Result<(), PartitionProcessorRpcError>) {
730+
self.inner.call(result);
731+
}
732+
}
733+
734+
impl<I> From<I> for Callback
735+
where
736+
I: CallbackInner,
737+
{
738+
fn from(value: I) -> Self {
739+
Self {
740+
inner: Box::new(value),
741+
}
742+
}
743+
}
744+
689745
struct SelfAppendFuture {
690746
commit_token: CommitToken,
691-
response: Option<(PartitionProcessorRpcResponse, RpcReciprocal)>,
747+
callback: Option<Callback>,
692748
}
693749

694750
impl SelfAppendFuture {
695-
fn new(
696-
commit_token: CommitToken,
697-
success_response: PartitionProcessorRpcResponse,
698-
response_reciprocal: RpcReciprocal,
699-
) -> Self {
751+
fn new(commit_token: CommitToken, callback: impl Into<Callback>) -> Self {
700752
Self {
701753
commit_token,
702-
response: Some((success_response, response_reciprocal)),
754+
callback: Some(callback.into()),
703755
}
704756
}
705757

706758
fn fail_with_internal(&mut self) {
707-
if let Some((_, reciprocal)) = self.response.take() {
708-
reciprocal.send(Err(PartitionProcessorRpcError::Internal(
759+
if let Some(callback) = self.callback.take() {
760+
callback.call(Err(PartitionProcessorRpcError::Internal(
709761
"error when proposing to bifrost".to_string(),
710762
)));
711763
}
712764
}
713765

714766
fn fail_with_lost_leadership(&mut self, this_partition_id: PartitionId) {
715-
if let Some((_, reciprocal)) = self.response.take() {
716-
reciprocal.send(Err(PartitionProcessorRpcError::LostLeadership(
767+
if let Some(callback) = self.callback.take() {
768+
callback.call(Err(PartitionProcessorRpcError::LostLeadership(
717769
this_partition_id,
718770
)));
719771
}
720772
}
721773

722774
fn succeed_with_appended(&mut self) {
723-
if let Some((success_response, reciprocal)) = self.response.take() {
724-
reciprocal.send(Ok(success_response));
775+
if let Some(callback) = self.callback.take() {
776+
callback.call(Ok(()))
725777
}
726778
}
727779
}

crates/worker/src/partition/leadership/mod.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,15 @@ use restate_types::errors::GenericError;
4949
use restate_types::identifiers::{InvocationId, PartitionKey, PartitionProcessorRpcRequestId};
5050
use restate_types::identifiers::{LeaderEpoch, PartitionLeaderEpoch};
5151
use restate_types::message::MessageIndex;
52+
use restate_types::net::ingest::IngestRecord;
5253
use restate_types::net::partition_processor::{
5354
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
5455
};
5556
use restate_types::partitions::Partition;
5657
use restate_types::partitions::state::PartitionReplicaSetStates;
5758
use restate_types::retries::with_jitter;
5859
use restate_types::schema::Schema;
59-
use restate_types::storage::StorageEncodeError;
60+
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
6061
use restate_vqueues::{SchedulerService, VQueuesMeta, VQueuesMetaMut};
6162
use restate_wal_protocol::Command;
6263
use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability};
@@ -86,7 +87,9 @@ pub(crate) enum Error {
8687
#[error("failed writing to bifrost: {0}")]
8788
Bifrost(#[from] restate_bifrost::Error),
8889
#[error("failed serializing payload: {0}")]
89-
Codec(#[from] StorageEncodeError),
90+
Encode(#[from] StorageEncodeError),
91+
#[error("failed deserializing payload: {0}")]
92+
Decode(#[from] StorageDecodeError),
9093
#[error(transparent)]
9194
Shutdown(#[from] ShutdownError),
9295
#[error("error when self proposing")]
@@ -648,6 +651,26 @@ impl<I> LeadershipState<I> {
648651
}
649652
}
650653
}
654+
655+
/// propose to this partition
656+
pub async fn propose_many_with_callback<F>(
657+
&mut self,
658+
records: impl ExactSizeIterator<Item = IngestRecord>,
659+
callback: F,
660+
) where
661+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
662+
{
663+
match &mut self.state {
664+
State::Follower | State::Candidate { .. } => callback(Err(
665+
PartitionProcessorRpcError::NotLeader(self.partition.partition_id),
666+
)),
667+
State::Leader(leader_state) => {
668+
leader_state
669+
.propose_many_with_callback(records, callback)
670+
.await;
671+
}
672+
}
673+
}
651674
}
652675
#[derive(Debug, derive_more::From)]
653676
struct TimerReader(PartitionStore);

0 commit comments

Comments
 (0)