Skip to content

Commit 75234be

Browse files
committed
Temporarily avoid InvocationStatus::Invoked to ::Inboxed state transition when using vqueues
While it is correct that we should go from InvocationStatus::Invoked to ::Inboxed when resuming an invocation using vqueues, it is currently supported by the state machine. For it to be fully supported, we need to support handling journal completions when being ::Inboxed. Morever, we need to handle cancellations of ::Inboxed items depending on whether the invocation was running before or not. Additionally, we need to support the hotfix_apply_cancellation_after_deployment_is_pinned when going from ::Suspended to ::Inbox, for example. Until this is done, we stay in ::Invoked when resuming an invocation even when using vqueues.
1 parent ac359b2 commit 75234be

File tree

2 files changed

+34
-17
lines changed

2 files changed

+34
-17
lines changed

crates/worker/src/partition/state_machine/lifecycle/resume.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
// by the Apache License, Version 2.0.
1010

1111
use restate_invoker_api::InvokeInputJournal;
12-
use restate_storage_api::invocation_status_table::{InboxedInvocation, InvocationStatus};
12+
use restate_storage_api::invocation_status_table::InvocationStatus;
1313
use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable};
1414
use restate_types::config::Configuration;
1515
use restate_types::identifiers::InvocationId;
@@ -45,12 +45,15 @@ where
4545
if Configuration::pinned().common.experimental_enable_vqueues {
4646
ctx.vqueue_move_invocation_to_inbox_stage(&self.invocation_id)
4747
.await?;
48-
// When moving an invocation back into the inbox stage, we have to update the
49-
// InvocationStatus accordingly. Otherwise, we miss running changes on the VQueues
50-
// inbox.
51-
*self.invocation_status = InvocationStatus::Inboxed(
52-
InboxedInvocation::from_in_flight_invocation_metadata(metadata.clone()),
53-
);
48+
49+
// todo enable once we properly support handling completions when being in status Inboxed
50+
// // When moving an invocation back into the inbox stage, we have to update the
51+
// // InvocationStatus accordingly. Otherwise, we miss running changes on the VQueues
52+
// // inbox.
53+
// *self.invocation_status = InvocationStatus::Inboxed(
54+
// InboxedInvocation::from_in_flight_invocation_metadata(metadata.clone()),
55+
// );
56+
*self.invocation_status = InvocationStatus::Invoked(metadata.clone());
5457
} else {
5558
ctx.action_collector.push(Action::Invoke {
5659
invocation_id: self.invocation_id,

crates/worker/src/partition/state_machine/mod.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2905,18 +2905,32 @@ impl<S> StateMachineApplyContext<'_, S> {
29052905
)
29062906
.await?;
29072907
}
2908-
InvocationStatus::Invoked(metadata) if self.is_leader => {
2909-
// just send to invoker
2910-
debug_if_leader!(self.is_leader, "Invoke");
2911-
self.action_collector.push(Action::VQInvoke {
2908+
InvocationStatus::Invoked(metadata) => {
2909+
// Temporary fix to transition the vqueue item back to the running stage. This is
2910+
// needed because we don't support yet going back from InvocationStatus::Invoked
2911+
// to InvocationStatus::Inboxed and then accepting journal completions.
2912+
// See https://github.com/restatedev/restate/blob/86d4d055ad8f3aa8b426c06486d52382a76bf9dd/crates/worker/src/partition/state_machine/lifecycle/resume.rs#L53
2913+
VQueues::new(
29122914
qid,
2913-
item_hash: card.unique_hash(),
2914-
invocation_id,
2915-
invocation_target: metadata.invocation_target,
2916-
invoke_input_journal: InvokeInputJournal::NoCachedJournal,
2917-
});
2915+
self.storage,
2916+
self.vqueues_cache,
2917+
self.is_leader.then_some(self.action_collector),
2918+
)
2919+
.attempt_to_run(at, card)
2920+
.await?;
2921+
2922+
if self.is_leader {
2923+
// just send to invoker
2924+
debug_if_leader!(self.is_leader, "Invoke");
2925+
self.action_collector.push(Action::VQInvoke {
2926+
qid,
2927+
item_hash: card.unique_hash(),
2928+
invocation_id,
2929+
invocation_target: metadata.invocation_target,
2930+
invoke_input_journal: InvokeInputJournal::NoCachedJournal,
2931+
});
2932+
}
29182933
}
2919-
InvocationStatus::Invoked(_) => { /* do nothing when not leader */ }
29202934
// Suspended invocations must first be put back on inbox. On wake-up, they
29212935
// transition back into Invoked state. So seeing a suspended invocation
29222936
// here means that some state transition is missing.

0 commit comments

Comments
 (0)