diff --git a/.changeset/cyan-ducks-wonder.md b/.changeset/cyan-ducks-wonder.md
new file mode 100644
index 000000000..123897616
--- /dev/null
+++ b/.changeset/cyan-ducks-wonder.md
@@ -0,0 +1,5 @@
+---
+"@workflow/core": patch
+---
+
+Allow retrying a step that fails without proper cleanup
diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts
index 006aa64ea..3d61a0622 100644
--- a/packages/core/e2e/e2e.test.ts
+++ b/packages/core/e2e/e2e.test.ts
@@ -719,4 +719,49 @@ describe('e2e', () => {
       expect(stepCompletedEvents).toHaveLength(1);
     }
   );
+
+  test(
+    'processExitResilienceWorkflow - step self-terminates on first attempt',
+    { timeout: 60_000 },
+    async () => {
+      // This test verifies that the workflow system is resilient to fatal process crashes.
+      // The step calls process.exit() on the first attempt, simulating an unhandled
+      // fatal error, but the system should recover and retry on a new process.
+      const run = await triggerWorkflow('processExitResilienceWorkflow', []);
+      const returnValue = await getWorkflowReturnValue(run.runId);
+
+      // The workflow should complete successfully after the process crash
+      expect(returnValue).toMatchObject({
+        attempt: 2,
+        status: 'recovered',
+      });
+
+      // Verify the run data shows successful completion
+      const { json: runData } = await cliInspectJson(
+        `runs ${run.runId} --withData`
+      );
+      expect(runData).toMatchObject({
+        runId: run.runId,
+        status: 'completed',
+        output: { attempt: 2, status: 'recovered' },
+      });
+
+      // Query steps to verify the step was retried
+      const { json: stepsData } = await cliInspectJson(
+        `steps --runId ${run.runId} --withData`
+      );
+      expect(stepsData).toBeDefined();
+      expect(Array.isArray(stepsData)).toBe(true);
+      expect(stepsData.length).toBeGreaterThan(0);
+
+      // Find the stepThatExitsOnFirstAttempt step
+      const exitStep = stepsData.find((s: any) =>
+        s.stepName.includes('stepThatExitsOnFirstAttempt')
+      );
+      expect(exitStep).toBeDefined();
+      expect(exitStep.status).toBe('completed');
+      expect(exitStep.attempt).toBe(2);
+      expect(exitStep.output).toEqual([{ attempt: 2, status: 'recovered' }]);
+    }
+  );
 });
diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts
index 069accbb3..ba20bc08b 100644
--- a/packages/core/src/runtime.ts
+++ b/packages/core/src/runtime.ts
@@ -640,12 +640,14 @@ export const stepEntrypoint =
     let result: unknown;
     const attempt = step.attempt + 1;
     try {
-      if (step.status !== 'pending') {
-        // We should only be running the step if it's pending
-        // (initial state, or state set on re-try), so the step has been
+      if (!['pending', 'running'].includes(step.status)) {
+        // We should only be running the step if it's either
+        // a) pending - the initial state, or the state set on retry, or
+        // b) running - left behind when a step failed mid-execution (e.g. a function timeout).
+        // Any other status means the step has been
         // invoked erroneously.
        console.error(
-          `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending", got "${step.status}" instead, skipping execution`
+          `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution`
         );
         span?.setAttributes({
           ...Attribute.StepSkipped(true),
@@ -698,11 +700,24 @@ export const stepEntrypoint =
         () => stepFn(...args)
       );
 
+      // NOTE: None of the code from this point on is guaranteed to run:
+      // the step might fail or hit a function timeout, and the process might be SIGKILL'd.
+      // The workflow runtime must be resilient to the code below never executing on a failed step.
+
       result = dehydrateStepReturnValue(result, ops);
       waitUntil(Promise.all(ops));
 
-      // Update the event log with the step result
+      // Mark the step as completed first. This ordering is important: if a
+      // concurrent execution already marked the step as completed, this request
+      // should throw, preventing a duplicate step_completed event in the log.
+      // TODO: this should really be atomic and handled by the world
+      await world.steps.update(workflowRunId, stepId, {
+        status: 'completed',
+        output: result as Serializable,
+      });
+
+      // Then, append the step_completed event to the event log
       await world.events.create(workflowRunId, {
         eventType: 'step_completed',
         correlationId: stepId,
@@ -711,11 +726,6 @@ export const stepEntrypoint =
         },
       });
 
-      await world.steps.update(workflowRunId, stepId, {
-        status: 'completed',
-        output: result as Serializable,
-      });
-
       span?.setAttributes({
         ...Attribute.StepStatus('completed'),
         ...Attribute.StepResultType(typeof result),
diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts
index bd4f1faa7..61b46cbd9 100644
--- a/workbench/example/workflows/99_e2e.ts
+++ b/workbench/example/workflows/99_e2e.ts
@@ -512,3 +512,36 @@ async function doubleNumber(x: number) {
   'use step';
   return x * 2;
 }
+
+//////////////////////////////////////////////////////////
+
+async function stepThatExitsOnFirstAttempt() {
+  'use step';
+  const { attempt } = getStepMetadata();
+  console.log(`stepThatExitsOnFirstAttempt - attempt: ${attempt}`);
+
+  // Kill the process on the first attempt to simulate a fatal crash
+  if (attempt === 1) {
+    console.log(
+      `Attempt ${attempt} - calling process.exit() to simulate fatal crash`
+    );
+    process.exit(1);
+  }
+
+  console.log(`Attempt ${attempt} - succeeding after process recovered`);
+  return { attempt, status: 'recovered' };
+}
+
+export async function processExitResilienceWorkflow() {
+  'use workflow';
+  console.log('Starting process exit resilience workflow');
+
+  // This step should crash the process on the first attempt,
+  // but the workflow should recover and retry on a new process
+  const result = await stepThatExitsOnFirstAttempt();
+
+  console.log(
+    `Workflow completed successfully after process crash: ${JSON.stringify(result)}`
+  );
+  return result;
+}
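Note on the reordering in packages/core/src/runtime.ts above: marking the step completed via world.steps.update before appending the step_completed event means a concurrent (or zombie) execution that loses the race fails at the update and never writes a duplicate event. Below is a minimal TypeScript sketch of that idea; StepStore, EventLog, and finishStep are hypothetical stand-ins invented for illustration, not the actual @workflow/core world API, and the conditional complete() assumes the update throws when the step is already completed, as the PR comment describes.

// Sketch: why "mark completed, then append event" tolerates crashed retries.
// StepStore.complete is assumed to be conditional, mirroring how
// world.steps.update is expected to throw when it loses the race.
type StepStatus = 'pending' | 'running' | 'completed';

interface StepRecord {
  status: StepStatus;
  output?: unknown;
}

class StepStore {
  private steps = new Map<string, StepRecord>();

  create(stepId: string): void {
    this.steps.set(stepId, { status: 'pending' });
  }

  // Conditional update: only one caller can move a step to 'completed'.
  complete(stepId: string, output: unknown): void {
    const step = this.steps.get(stepId);
    if (!step || step.status === 'completed') {
      throw new Error(`step ${stepId} already completed (concurrent execution?)`);
    }
    this.steps.set(stepId, { status: 'completed', output });
  }
}

class EventLog {
  readonly events: { eventType: string; correlationId: string }[] = [];

  append(eventType: string, correlationId: string): void {
    this.events.push({ eventType, correlationId });
  }
}

// Two executions of the same step race to finish; only the winner reaches
// the append, so the log never records step_completed twice.
function finishStep(stepId: string, output: unknown, steps: StepStore, log: EventLog): void {
  steps.complete(stepId, output); // throws for the loser
  log.append('step_completed', stepId); // reached only by the winner
}

const steps = new StepStore();
const log = new EventLog();
steps.create('step-1');
finishStep('step-1', { ok: true }, steps, log); // winner completes the step
try {
  finishStep('step-1', { ok: true }, steps, log); // loser: throws, writes no event
} catch {
  // the retry lost the race; the log still holds exactly one event
}
console.log(log.events.length); // 1

The TODO in the diff points at the remaining gap this sketch shares: a crash between the update and the append still leaves a completed step with no step_completed event, which is why the comment asks for the two writes to eventually become atomic in the world.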