Skip to content

Commit ac359b2

Browse files
committed
Set InvocationStatus correctly when resuming invocation with vqueues
When resuming an invocation using the vqueue scheduler, we cannot go directly into InvocationStatus::Invoked, since the invocation first needs to be scheduled again. As part of the scheduling, the system needs to go through a few state changes that depend on the InvocationStatus. Hence, this commit sets the InvocationStatus to Inboxed when moving an item back into the inbox stage.
1 parent 30eea20 commit ac359b2

File tree

5 files changed

+140
-43
lines changed

5 files changed

+140
-43
lines changed

crates/storage-api/src/invocation_status_table/mod.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,20 @@ impl InvocationStatus {
271271
})
272272
| InvocationStatus::Completed(CompletedInvocation {
273273
journal_metadata, ..
274+
})
275+
// When using vqueues, we can be in the Inboxed state after yielding from running. In this
276+
// case, there will be a journal created for this invocation.
277+
| InvocationStatus::Inboxed(InboxedInvocation {
278+
metadata:
279+
PreFlightInvocationMetadata {
280+
input:
281+
PreFlightInvocationArgument::Journal(PreFlightInvocationJournal {
282+
journal_metadata,
283+
..
284+
}),
285+
..
286+
},
287+
..
274288
}) => Some(journal_metadata),
275289
_ => None,
276290
}
@@ -539,6 +553,30 @@ impl PreFlightInvocationMetadata {
539553
}
540554
}
541555

556+
pub fn from_in_flight_invocation_metadata(
557+
in_flight_invocation_metadata: InFlightInvocationMetadata,
558+
) -> Self {
559+
Self {
560+
response_sinks: in_flight_invocation_metadata.response_sinks,
561+
timestamps: in_flight_invocation_metadata.timestamps,
562+
invocation_target: in_flight_invocation_metadata.invocation_target,
563+
source: in_flight_invocation_metadata.source,
564+
execution_time: in_flight_invocation_metadata.execution_time,
565+
completion_retention_duration: in_flight_invocation_metadata
566+
.completion_retention_duration,
567+
journal_retention_duration: in_flight_invocation_metadata.journal_retention_duration,
568+
idempotency_key: in_flight_invocation_metadata.idempotency_key,
569+
created_using_restate_version: in_flight_invocation_metadata
570+
.created_using_restate_version,
571+
random_seed: in_flight_invocation_metadata.random_seed,
572+
// we must have created the journal when coming from an InFlightInvocationMetadata
573+
input: PreFlightInvocationArgument::Journal(PreFlightInvocationJournal {
574+
journal_metadata: in_flight_invocation_metadata.journal_metadata,
575+
pinned_deployment: in_flight_invocation_metadata.pinned_deployment,
576+
}),
577+
}
578+
}
579+
542580
pub fn span_context(&self) -> &ServiceInvocationSpanContext {
543581
self.input.span_context()
544582
}
@@ -579,6 +617,20 @@ impl InboxedInvocation {
579617
timestamp,
580618
)
581619
}
620+
621+
/// Important: This conversion should only be used by the new vqueue based scheduler
622+
pub fn from_in_flight_invocation_metadata(
623+
in_flight_invocation_metadata: InFlightInvocationMetadata,
624+
) -> Self {
625+
Self {
626+
// The new vqueue based scheduler no longer uses the old inboxes. Hence, we can put an
627+
// arbitrary value here.
628+
inbox_sequence_number: 1,
629+
metadata: PreFlightInvocationMetadata::from_in_flight_invocation_metadata(
630+
in_flight_invocation_metadata,
631+
),
632+
}
633+
}
582634
}
583635

584636
/// This map is used to record trim points and determine whether a completion from an old epoch should be accepted or rejected.

crates/worker/src/partition/state_machine/lifecycle/paused.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use restate_storage_api::invocation_status_table::{
1515
InvocationStatus, ReadInvocationStatusTable, WriteInvocationStatusTable,
1616
};
1717
use restate_storage_api::journal_events::WriteJournalEventsTable;
18+
use restate_storage_api::service_status_table::WriteVirtualObjectStatusTable;
1819
use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable};
1920
use restate_types::config::Configuration;
2021
use restate_types::identifiers::InvocationId;
@@ -32,7 +33,8 @@ where
3233
+ WriteInvocationStatusTable
3334
+ WriteJournalEventsTable
3435
+ WriteVQueueTable
35-
+ ReadVQueueTable,
36+
+ ReadVQueueTable
37+
+ WriteVirtualObjectStatusTable,
3638
{
3739
async fn apply(self, ctx: &'ctx mut StateMachineApplyContext<'s, S>) -> Result<(), Error> {
3840
let OnPausedCommand {

crates/worker/src/partition/state_machine/lifecycle/resume.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
// by the Apache License, Version 2.0.
1010

1111
use restate_invoker_api::InvokeInputJournal;
12-
use restate_storage_api::invocation_status_table::InvocationStatus;
12+
use restate_storage_api::invocation_status_table::{InboxedInvocation, InvocationStatus};
1313
use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable};
1414
use restate_types::config::Configuration;
1515
use restate_types::identifiers::InvocationId;
@@ -45,16 +45,22 @@ where
4545
if Configuration::pinned().common.experimental_enable_vqueues {
4646
ctx.vqueue_move_invocation_to_inbox_stage(&self.invocation_id)
4747
.await?;
48+
// When moving an invocation back into the inbox stage, we have to update the
49+
// InvocationStatus accordingly. Otherwise, we miss running changes on the VQueues
50+
// inbox.
51+
*self.invocation_status = InvocationStatus::Inboxed(
52+
InboxedInvocation::from_in_flight_invocation_metadata(metadata.clone()),
53+
);
4854
} else {
4955
ctx.action_collector.push(Action::Invoke {
5056
invocation_id: self.invocation_id,
5157
invocation_epoch: current_invocation_epoch,
5258
invocation_target,
5359
invoke_input_journal: InvokeInputJournal::NoCachedJournal,
5460
});
55-
}
5661

57-
*self.invocation_status = InvocationStatus::Invoked(metadata.clone());
62+
*self.invocation_status = InvocationStatus::Invoked(metadata.clone());
63+
}
5864

5965
Ok(())
6066
}

crates/worker/src/partition/state_machine/lifecycle/suspend.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use crate::partition::state_machine::{CommandHandler, Error, ParkCause, StateMachineApplyContext};
1212
use restate_storage_api::invocation_status_table::{InvocationStatus, WriteInvocationStatusTable};
1313
use restate_storage_api::journal_table_v2::ReadJournalTable;
14+
use restate_storage_api::service_status_table::WriteVirtualObjectStatusTable;
1415
use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable};
1516
use restate_types::config::Configuration;
1617
use restate_types::identifiers::InvocationId;
@@ -27,7 +28,11 @@ pub struct OnSuspendCommand {
2728
impl<'ctx, 's: 'ctx, S> CommandHandler<&'ctx mut StateMachineApplyContext<'s, S>>
2829
for OnSuspendCommand
2930
where
30-
S: ReadJournalTable + WriteInvocationStatusTable + WriteVQueueTable + ReadVQueueTable,
31+
S: ReadJournalTable
32+
+ WriteInvocationStatusTable
33+
+ WriteVQueueTable
34+
+ ReadVQueueTable
35+
+ WriteVirtualObjectStatusTable,
3136
{
3237
async fn apply(self, ctx: &'ctx mut StateMachineApplyContext<'s, S>) -> Result<(), Error> {
3338
debug_assert!(

crates/worker/src/partition/state_machine/mod.rs

Lines changed: 70 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,17 +1177,6 @@ impl<S> StateMachineApplyContext<'_, S> {
11771177
where
11781178
S: WriteJournalTable + WriteInvocationStatusTable + WriteVQueueTable + ReadVQueueTable,
11791179
{
1180-
// Usage metering for "actions" should include the Input journal entry
1181-
// type, but it gets filtered out before reaching the state machine.
1182-
// Therefore we count it here, as a special case.
1183-
if self.is_leader {
1184-
counter!(
1185-
USAGE_LEADER_JOURNAL_ENTRY_COUNT,
1186-
"entry" => "Command/Input",
1187-
)
1188-
.increment(1);
1189-
}
1190-
11911180
let invoke_input_journal = if let Some(invocation_input) = invocation_input {
11921181
self.init_journal(
11931182
invocation_id,
@@ -1255,6 +1244,17 @@ impl<S> StateMachineApplyContext<'_, S> {
12551244
where
12561245
S: WriteJournalTable,
12571246
{
1247+
// Usage metering for "actions" should include the Input journal entry
1248+
// type, but it gets filtered out before reaching the state machine.
1249+
// Therefore we count it here, as a special case.
1250+
if self.is_leader {
1251+
counter!(
1252+
USAGE_LEADER_JOURNAL_ENTRY_COUNT,
1253+
"entry" => "Command/Input",
1254+
)
1255+
.increment(1);
1256+
}
1257+
12581258
debug_if_leader!(self.is_leader, "Init journal with input entry");
12591259

12601260
// In our current data model, ServiceInvocation has always an input, so initial length is 1
@@ -2860,27 +2860,31 @@ impl<S> StateMachineApplyContext<'_, S> {
28602860
invocation_target.invocation_target_ty(),
28612861
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive)
28622862
) {
2863-
let keyed_service_id = invocation_target.as_keyed_service_id().expect(
2864-
"When the handler type is Exclusive, the invocation target must have a key",
2865-
);
2866-
match self
2867-
.storage
2868-
.get_virtual_object_status(&keyed_service_id)
2869-
.await?
2870-
{
2871-
VirtualObjectStatus::Locked(iid) => {
2872-
panic!(
2873-
"invariant violated trying to run an invocation {invocation_id} on a VO while another invocation {iid} is holding the lock"
2874-
);
2875-
}
2876-
VirtualObjectStatus::Unlocked => {
2877-
// Lock the service
2878-
self.storage
2879-
.put_virtual_object_status(
2880-
&keyed_service_id,
2881-
&VirtualObjectStatus::Locked(invocation_id),
2882-
)
2883-
.map_err(Error::Storage)?;
2863+
// We might have already locked the service before if we are resuming, for example.
2864+
// Hence, only lock it if we are not holding a token yet.
2865+
if !card.priority.token_held() {
2866+
let keyed_service_id = invocation_target.as_keyed_service_id().expect(
2867+
"When the handler type is Exclusive, the invocation target must have a key",
2868+
);
2869+
match self
2870+
.storage
2871+
.get_virtual_object_status(&keyed_service_id)
2872+
.await?
2873+
{
2874+
VirtualObjectStatus::Locked(iid) => {
2875+
panic!(
2876+
"invariant violated trying to run an invocation {invocation_id} on a VO while another invocation {iid} is holding the lock"
2877+
);
2878+
}
2879+
VirtualObjectStatus::Unlocked => {
2880+
// Lock the service
2881+
self.storage
2882+
.put_virtual_object_status(
2883+
&keyed_service_id,
2884+
&VirtualObjectStatus::Locked(invocation_id),
2885+
)
2886+
.map_err(Error::Storage)?;
2887+
}
28842888
}
28852889
}
28862890
}
@@ -4379,15 +4383,25 @@ impl<S> StateMachineApplyContext<'_, S> {
43794383
let current_invocation_epoch = metadata.current_invocation_epoch;
43804384

43814385
metadata.timestamps.update(self.record_created_at);
4382-
let invocation_target = metadata.invocation_target.clone();
4383-
self.storage
4384-
.put_invocation_status(&invocation_id, &InvocationStatus::Invoked(metadata))
4385-
.map_err(Error::Storage)?;
43864386

43874387
if Configuration::pinned().common.experimental_enable_vqueues {
43884388
self.vqueue_move_invocation_to_inbox_stage(&invocation_id)
43894389
.await?;
4390+
self.storage
4391+
.put_invocation_status(
4392+
&invocation_id,
4393+
&InvocationStatus::Inboxed(
4394+
InboxedInvocation::from_in_flight_invocation_metadata(metadata),
4395+
),
4396+
)
4397+
.map_err(Error::Storage)?;
43904398
} else {
4399+
let invocation_target = metadata.invocation_target.clone();
4400+
4401+
self.storage
4402+
.put_invocation_status(&invocation_id, &InvocationStatus::Invoked(metadata))
4403+
.map_err(Error::Storage)?;
4404+
43914405
self.action_collector.push(Action::Invoke {
43924406
invocation_id,
43934407
invocation_epoch: current_invocation_epoch,
@@ -4416,7 +4430,10 @@ impl<S> StateMachineApplyContext<'_, S> {
44164430
waiting_for_completed_entries: HashSet<EntryIndex>,
44174431
) -> Result<(), Error>
44184432
where
4419-
S: WriteInvocationStatusTable + WriteVQueueTable + ReadVQueueTable,
4433+
S: WriteInvocationStatusTable
4434+
+ WriteVQueueTable
4435+
+ ReadVQueueTable
4436+
+ WriteVirtualObjectStatusTable,
44204437
{
44214438
debug_if_leader!(
44224439
self.is_leader,
@@ -5077,7 +5094,7 @@ impl<S> StateMachineApplyContext<'_, S> {
50775094
cause: ParkCause,
50785095
) -> Result<(), Error>
50795096
where
5080-
S: WriteVQueueTable + ReadVQueueTable,
5097+
S: WriteVQueueTable + ReadVQueueTable + WriteVirtualObjectStatusTable,
50815098
{
50825099
let qid = Self::vqueue_id_from_invocation(invocation_id, invocation_target);
50835100

@@ -5139,6 +5156,21 @@ impl<S> StateMachineApplyContext<'_, S> {
51395156
)
51405157
.await?;
51415158

5159+
// If we release the concurrency token for exclusive VO handlers, then we also need to
5160+
// unlock it so that other invocations can make progress.
5161+
if should_release_concurrency_token
5162+
&& invocation_target.invocation_target_ty()
5163+
== InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive)
5164+
{
5165+
let keyed_service_id = invocation_target.as_keyed_service_id().expect(
5166+
"When the handler type is Exclusive, the invocation target must have a key",
5167+
);
5168+
// Mark the virtual object as unlocked now that the concurrency token is released
5169+
self.storage
5170+
.put_virtual_object_status(&keyed_service_id, &VirtualObjectStatus::Unlocked)
5171+
.map_err(Error::Storage)?;
5172+
}
5173+
51425174
Ok(())
51435175
}
51445176

0 commit comments

Comments
 (0)