Merged
5 changes: 5 additions & 0 deletions .changeset/cyan-ducks-wonder.md
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Allow step retrying if it fails without proper cleanup
30 changes: 19 additions & 11 deletions packages/core/src/runtime.ts
@@ -640,12 +640,13 @@ export const stepEntrypoint =
let result: unknown;
const attempt = step.attempt + 1;
try {
if (step.status !== 'pending') {
// We should only be running the step if it's pending
// (initial state, or state set on re-try), so the step has been
// invoked erroneously.
if (!['pending', 'running'].includes(step.status)) {
Member

If this gets called with status running, should we check the updatedAt timestamp to confirm we're at least a reasonable amount of time away from the last update?

I checked the logs for Vade, which sees roughly 300 of these catch clauses per day. With this code change, all of them would have resulted in double step runs

Collaborator Author

hmm I think that gets hairy and can cause nasty bugs down the line. What's a "safe amount of time"?

I think it's better to err towards at-least-once semantics than to run the risk of hanging the workflow and never running the step

Collaborator Author

(can be convinced otherwise)
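
To make the trade-off in this thread concrete, here is a minimal sketch of the two guards being discussed. Everything in it is hypothetical: `StepSnapshot`, `shouldRunStep`, `shouldRunStepWithStaleness`, and the `staleAfterMs` threshold are illustrative names and are not part of `@workflow/core`; the `failed` status is also an assumption. The first function mirrors the check in this PR, the second adds the `updatedAt` staleness window suggested in review.

```typescript
// Hypothetical sketch, not the @workflow/core implementation.
type StepStatus = "pending" | "running" | "completed" | "failed"; // "failed" is assumed

interface StepSnapshot {
  status: StepStatus;
  updatedAt: Date; // assumed field, named after the review comment
}

// Guard as changed in this PR: "pending" and "running" are both runnable,
// which gives at-least-once execution for steps that died mid-run.
function shouldRunStep(step: StepSnapshot): boolean {
  return step.status === "pending" || step.status === "running";
}

// Variant proposed in review: only re-run a "running" step when its last
// update is at least `staleAfterMs` old. Picking that threshold is the
// open question raised in the reply.
function shouldRunStepWithStaleness(
  step: StepSnapshot,
  now: Date,
  staleAfterMs: number
): boolean {
  if (step.status === "pending") return true;
  if (step.status !== "running") return false;
  return now.getTime() - step.updatedAt.getTime() >= staleAfterMs;
}
```

With the first guard, a step that is still executing can be started a second time. With the second, a step whose process was killed is not retried until the window has elapsed, and a window that is too long delays recovery.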

// We should only be running the step if it's either
// a) pending - initial state, or state set on re-try
// b) running - if a step fails mid-execution, like a function timeout
// otherwise, the step has been invoked erroneously
console.error(
`[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending", got "${step.status}" instead, skipping execution`
`[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution`
);
span?.setAttributes({
...Attribute.StepSkipped(true),
@@ -698,11 +699,23 @@ export const stepEntrypoint =
() => stepFn(...args)
);

// NOTE: None of the code from this point is guaranteed to run
// Since the step might fail or cause a function timeout and the process might be SIGKILL'd
// The workflow runtime must be resilient to the below code not executing on a failed step
result = dehydrateStepReturnValue(result, ops, workflowRunId);

waitUntil(Promise.all(ops));

// Update the event log with the step result
// Mark the step as completed first. This order is important. If a concurrent
// execution marked the step as complete, this request should throw, and
// this prevents the step_completed event from being appended to the event log
// TODO: this should really be atomic and handled by the world
await world.steps.update(workflowRunId, stepId, {
status: 'completed',
output: result as Serializable,
});
Member

Problem now is that if we fail to create the event, e.g. time-out in between this update and the event creation, the run is eternally stuck while the step shows up as complete

Collaborator Author

yeah true :( this really needs to be atomic. Let's :itg:
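
The ordering discussed here can be sketched with an in-memory stand-in. This is only an illustration under stated assumptions: `InMemoryWorld`, `completeStep`, `appendEvent`, `finishStep`, and `demo` are hypothetical names, and the assumption that completing an already-completed step throws is taken from the code comment in this diff, not from a confirmed `world` API.

```typescript
// Hypothetical in-memory stand-in for the "world"; names are illustrative only.
type Status = "pending" | "running" | "completed";

class InMemoryWorld {
  private statuses = new Map<string, Status>();
  readonly events: { eventType: string; correlationId: string }[] = [];

  constructor(stepIds: string[]) {
    for (const id of stepIds) this.statuses.set(id, "running");
  }

  // Assumed behavior: completing an already-completed step throws.
  async completeStep(stepId: string): Promise<void> {
    if (this.statuses.get(stepId) === "completed") {
      throw new Error(`step ${stepId} is already completed`);
    }
    this.statuses.set(stepId, "completed");
  }

  async appendEvent(eventType: string, correlationId: string): Promise<void> {
    this.events.push({ eventType, correlationId });
  }
}

// Same order as the diff: status update first, event second.
async function finishStep(world: InMemoryWorld, stepId: string): Promise<void> {
  await world.completeStep(stepId); // throws for a second execution
  // A crash at this point leaves the step completed with no event,
  // which is the stuck-run case raised in the review comment.
  await world.appendEvent("step_completed", stepId);
}

// Run the same step twice and count written events and rejected attempts.
async function demo(): Promise<{ events: number; rejected: number }> {
  const world = new InMemoryWorld(["step_1"]);
  let rejected = 0;
  for (let i = 0; i < 2; i++) {
    try {
      await finishStep(world, "step_1");
    } catch (err) {
      rejected++;
    }
  }
  return { events: world.events.length, rejected };
}
```

In this sketch the second attempt is rejected before it can append a duplicate `step_completed` event. The two writes are still separate, so the sketch does not remove the failure window between them; that would need the atomic operation mentioned in the TODO.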


// Then, append the event log with the step result
await world.events.create(workflowRunId, {
eventType: 'step_completed',
correlationId: stepId,
@@ -711,11 +724,6 @@ export const stepEntrypoint =
},
});

await world.steps.update(workflowRunId, stepId, {
status: 'completed',
output: result as Serializable,
});

span?.setAttributes({
...Attribute.StepStatus('completed'),
...Attribute.StepResultType(typeof result),