5 changes: 5 additions & 0 deletions .changeset/cyan-ducks-wonder.md
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Allow a step to be retried if it fails without proper cleanup
45 changes: 45 additions & 0 deletions packages/core/e2e/e2e.test.ts
@@ -43,7 +43,7 @@
const url = new URL('/api/trigger', deploymentUrl);
url.searchParams.set('runId', runId);

const res = await fetch(url);

Check failure on line 46 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Windows Tests

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

TypeError: fetch failed ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:46:17 ❯ packages/core/e2e/e2e.test.ts:731:27 Caused by: Caused by: Error: read ECONNRESET ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { errno: -4077, code: 'ECONNRESET', syscall: 'read' }

Check failure on line 46 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (nextjs-webpack - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

TypeError: fetch failed ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:46:17 ❯ packages/core/e2e/e2e.test.ts:731:27 Caused by: Caused by: SocketError: other side closed ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'UND_ERR_SOCKET', socket: { localAddress: '::1', localPort: 54936, remoteAddress: '::1', remotePort: 3000, remoteFamily: 'IPv6', timeout: undefined, bytesWritten: 218, bytesRead: +0 } }

Check failure on line 46 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (nextjs-turbopack - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

TypeError: fetch failed ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:46:17 ❯ packages/core/e2e/e2e.test.ts:731:27 Caused by: Caused by: SocketError: other side closed ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'UND_ERR_SOCKET', socket: { localAddress: '::1', localPort: 37240, remoteAddress: '::1', remotePort: 3000, remoteFamily: 'IPv6', timeout: undefined, bytesWritten: 218, bytesRead: +0 } }

Check failure on line 46 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Prod Tests (nextjs-turbopack - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

TypeError: fetch failed ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:46:17 ❯ packages/core/e2e/e2e.test.ts:731:27 Caused by: Caused by: SocketError: other side closed ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'UND_ERR_SOCKET', socket: { localAddress: '::1', localPort: 45342, remoteAddress: '::1', remotePort: 3000, remoteFamily: 'IPv6', timeout: undefined, bytesWritten: 218, bytesRead: +0 } }

Check failure on line 46 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Prod Tests (nuxt - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

TypeError: fetch failed ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:46:17 ❯ packages/core/e2e/e2e.test.ts:731:27 Caused by: Caused by: SocketError: other side closed ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'UND_ERR_SOCKET', socket: { localAddress: '::1', localPort: 43068, remoteAddress: '::1', remotePort: 3000, remoteFamily: 'IPv6', timeout: undefined, bytesWritten: 218, bytesRead: +0 } }

Check failure on line 46 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Prod Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

TypeError: fetch failed ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:46:17 ❯ packages/core/e2e/e2e.test.ts:731:27 Caused by: Caused by: SocketError: other side closed ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'UND_ERR_SOCKET', socket: { localAddress: '::1', localPort: 46228, remoteAddress: '::1', remotePort: 4173, remoteFamily: 'IPv6', timeout: undefined, bytesWritten: 218, bytesRead: +0 } }

Check failure on line 46 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Prod Tests (nextjs-webpack - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

TypeError: fetch failed ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:46:17 ❯ packages/core/e2e/e2e.test.ts:731:27 Caused by: Caused by: SocketError: other side closed ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'UND_ERR_SOCKET', socket: { localAddress: '::1', localPort: 44012, remoteAddress: '::1', remotePort: 3000, remoteFamily: 'IPv6', timeout: undefined, bytesWritten: 218, bytesRead: +0 } }

Check failure on line 46 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Prod Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

TypeError: fetch failed ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:46:17 ❯ packages/core/e2e/e2e.test.ts:731:27 Caused by: Caused by: SocketError: other side closed ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'UND_ERR_SOCKET', socket: { localAddress: '::1', localPort: 55312, remoteAddress: '::1', remotePort: 3000, remoteFamily: 'IPv6', timeout: undefined, bytesWritten: 218, bytesRead: +0 } }

if (res.status === 202) {
// Workflow run is still running, so we need to wait and poll again
@@ -60,7 +60,7 @@
return res.body;
}

throw new Error(`Unexpected content type: ${contentType}`);

Check failure on line 63 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Vercel Prod Tests (sveltekit)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

Error: Unexpected content type: text/plain; charset=utf-8 ❯ getWorkflowReturnValue packages/core/e2e/e2e.test.ts:63:11 ❯ packages/core/e2e/e2e.test.ts:731:27
}
}

@@ -76,7 +76,7 @@
workflowFile: 'workflows/98_duplicate_case.ts',
workflowFn: 'addTenWorkflow',
},
])('addTenWorkflow', { timeout: 60_000 }, async (workflow) => {

Check failure on line 79 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > addTenWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:79:4

Check failure on line 79 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (nuxt - stable)

packages/core/e2e/e2e.test.ts > e2e > addTenWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:79:4
const run = await triggerWorkflow(workflow, [123]);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe(133);
@@ -97,25 +97,25 @@
]);
});

test('promiseAllWorkflow', { timeout: 60_000 }, async () => {

Check failure on line 100 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > promiseAllWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:100:3
const run = await triggerWorkflow('promiseAllWorkflow', []);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe('ABC');
});

test('promiseRaceWorkflow', { timeout: 60_000 }, async () => {

Check failure on line 106 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > promiseRaceWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:106:3
const run = await triggerWorkflow('promiseRaceWorkflow', []);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe('B');
});

test('promiseAnyWorkflow', { timeout: 60_000 }, async () => {

Check failure on line 112 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > promiseAnyWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:112:3
const run = await triggerWorkflow('promiseAnyWorkflow', []);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe('B');
});

test('readableStreamWorkflow', { timeout: 60_000 }, async () => {

Check failure on line 118 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > readableStreamWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:118:3
const run = await triggerWorkflow('readableStreamWorkflow', []);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBeInstanceOf(ReadableStream);
@@ -145,7 +145,7 @@
method: 'POST',
body: JSON.stringify({ token, data: { message: 'one' } }),
});
expect(res.status).toBe(200);

Check failure on line 148 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > hookWorkflow

AssertionError: expected 404 to be 200 // Object.is equality - Expected + Received - 200 + 404 ❯ packages/core/e2e/e2e.test.ts:148:24
let body = await res.json();
expect(body.runId).toBe(run.runId);

@@ -214,7 +214,7 @@
body: JSON.stringify({ message: 'one' }),
}
);
expect(res.status).toBe(202);

Check failure on line 217 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > webhookWorkflow

AssertionError: expected 404 to be 202 // Object.is equality - Expected + Received - 202 + 404 ❯ packages/core/e2e/e2e.test.ts:217:24
const body = await res.text();
expect(body).toBe('');

@@ -292,14 +292,14 @@
expect(body).toBe('');
});

test('sleepingWorkflow', { timeout: 60_000 }, async () => {

Check failure on line 295 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > sleepingWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:295:3
const run = await triggerWorkflow('sleepingWorkflow', []);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue.startTime).toBeLessThan(returnValue.endTime);
expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999);
});

test('nullByteWorkflow', { timeout: 60_000 }, async () => {

Check failure on line 302 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (sveltekit - stable)

packages/core/e2e/e2e.test.ts > e2e > nullByteWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:302:3
const run = await triggerWorkflow('nullByteWorkflow', []);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe('null byte \0');
@@ -719,4 +719,49 @@
expect(stepCompletedEvents).toHaveLength(1);
}
);

test(

Check failure on line 723 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (nuxt - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:723:3
'processExitResilienceWorkflow - step self-terminates on first attempt',
{ timeout: 60_000 },
async () => {
// This test verifies that the workflow system is resilient to fatal process crashes
// The step will call process.exit() on the first attempt, simulating an unhandled
// fatal error, but the system should recover and retry on a new process
const run = await triggerWorkflow('processExitResilienceWorkflow', []);
const returnValue = await getWorkflowReturnValue(run.runId);

// The workflow should complete successfully after the process crash
expect(returnValue).toMatchObject({

Check failure on line 734 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

AssertionError: expected { error: true, …(4) } to match object { attempt: 2, status: 'recovered' } (4 matching properties omitted from actual) - Expected + Received { - "attempt": 2, - "status": "recovered", + "status": 500, } ❯ packages/core/e2e/e2e.test.ts:734:27
attempt: 2,
status: 'recovered',
});

// Verify the run data shows successful completion
const { json: runData } = await cliInspectJson(
`runs ${run.runId} --withData`
);
expect(runData).toMatchObject({
runId: run.runId,
status: 'completed',
output: { attempt: 2, status: 'recovered' },
});

// Query steps to verify the step was retried
const { json: stepsData } = await cliInspectJson(
`steps --runId ${run.runId} --withData`
);
expect(stepsData).toBeDefined();
expect(Array.isArray(stepsData)).toBe(true);
expect(stepsData.length).toBeGreaterThan(0);

// Find the stepThatExitsOnFirstAttempt step
const exitStep = stepsData.find((s: any) =>
s.stepName.includes('stepThatExitsOnFirstAttempt')
);
expect(exitStep).toBeDefined();
expect(exitStep.status).toBe('completed');
expect(exitStep.attempt).toBe(2);
expect(exitStep.output).toEqual([{ attempt: 2, status: 'recovered' }]);

Check failure on line 764 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Vercel Prod Tests (nextjs-turbopack)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

AssertionError: expected [ { attempt: 1, status: 2 }, 2, …(1) ] to deeply equal [ { attempt: 2, status: 'recovered' } ] - Expected + Received [ { - "attempt": 2, - "status": "recovered", + "attempt": 1, + "status": 2, }, + 2, + "recovered", ] ❯ packages/core/e2e/e2e.test.ts:764:31

Check failure on line 764 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Vercel Prod Tests (nextjs-webpack)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

AssertionError: expected [ { attempt: 1, status: 2 }, 2, …(1) ] to deeply equal [ { attempt: 2, status: 'recovered' } ] - Expected + Received [ { - "attempt": 2, - "status": "recovered", + "attempt": 1, + "status": 2, }, + 2, + "recovered", ] ❯ packages/core/e2e/e2e.test.ts:764:31

Check failure on line 764 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Vercel Prod Tests (vite)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

AssertionError: expected [ { attempt: 1, status: 2 }, 2, …(1) ] to deeply equal [ { attempt: 2, status: 'recovered' } ] - Expected + Received [ { - "attempt": 2, - "status": "recovered", + "attempt": 1, + "status": 2, }, + 2, + "recovered", ] ❯ packages/core/e2e/e2e.test.ts:764:31

Check failure on line 764 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Vercel Prod Tests (nitro)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

AssertionError: expected [ { attempt: 1, status: 2 }, 2, …(1) ] to deeply equal [ { attempt: 2, status: 'recovered' } ] - Expected + Received [ { - "attempt": 2, - "status": "recovered", + "attempt": 1, + "status": 2, }, + 2, + "recovered", ] ❯ packages/core/e2e/e2e.test.ts:764:31

Check failure on line 764 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Vercel Prod Tests (example)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

AssertionError: expected [ { attempt: 1, status: 2 }, 2, …(1) ] to deeply equal [ { attempt: 2, status: 'recovered' } ] - Expected + Received [ { - "attempt": 2, - "status": "recovered", + "attempt": 1, + "status": 2, }, + 2, + "recovered", ] ❯ packages/core/e2e/e2e.test.ts:764:31

Check failure on line 764 in packages/core/e2e/e2e.test.ts

GitHub Actions / E2E Vercel Prod Tests (nuxt)

packages/core/e2e/e2e.test.ts > e2e > processExitResilienceWorkflow - step self-terminates on first attempt

AssertionError: expected [ { attempt: 1, status: 2 }, 2, …(1) ] to deeply equal [ { attempt: 2, status: 'recovered' } ] - Expected + Received [ { - "attempt": 2, - "status": "recovered", + "attempt": 1, + "status": 2, }, + 2, + "recovered", ] ❯ packages/core/e2e/e2e.test.ts:764:31
}
);
});
30 changes: 20 additions & 10 deletions packages/core/src/runtime.ts
@@ -640,12 +640,14 @@ export const stepEntrypoint =
  let result: unknown;
  const attempt = step.attempt + 1;
  try {
- if (step.status !== 'pending') {
- // We should only be running the step if it's pending
- // (initial state, or state set on re-try), so the step has been
+ if (!['pending', 'running'].includes(step.status)) {
+ // We should only be running the step if it's either
+ // a) pending - initial state, or state set on re-try
+ // b) running - if a step fails mid-execution, like a function timeout
+ // so the step has been
  // invoked erroneously.
  console.error(
- `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending", got "${step.status}" instead, skipping execution`
+ `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution`
  );
  span?.setAttributes({
  ...Attribute.StepSkipped(true),
@@ -698,11 +700,24 @@ export const stepEntrypoint =
  () => stepFn(...args)
  );

+ // NOTE: None of the code from this point is guaranteed to run
+ // Since the step might fail or cause a function timeout and the process might be SIGKILL'd
+ // The workflow runtime must be resilient to the below code not executing on a failed step
+
  result = dehydrateStepReturnValue(result, ops);

  waitUntil(Promise.all(ops));

- // Update the event log with the step result
+ // Mark the step as completed first. This order is important. If a concurrent
+ // execution marked the step as complete, this request should throw, and
+ // this prevents the step_completed event in the event log
+ // TODO: this should really be atomic and handled by the world
+ await world.steps.update(workflowRunId, stepId, {
+ status: 'completed',
+ output: result as Serializable,
+ });
+
+ // Then, append the event log with the step result
  await world.events.create(workflowRunId, {
  eventType: 'step_completed',
  correlationId: stepId,
@@ -711,11 +726,6 @@ export const stepEntrypoint =
  },
  });

- await world.steps.update(workflowRunId, stepId, {
- status: 'completed',
- output: result as Serializable,
- });
-
  span?.setAttributes({
  ...Attribute.StepStatus('completed'),
  ...Attribute.StepResultType(typeof result),
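The two ideas in the runtime.ts change (accepting steps in either "pending" or "running" status, and marking the step completed before appending the `step_completed` event) can be illustrated with a small in-memory sketch. This is not the actual `world` implementation: `InMemoryWorld`, `markCompleted`, `appendStepCompleted`, `isRunnable` and `finishStep` are hypothetical names, and the sketch assumes the step store rejects a second completion, which the diff comment only says "should" happen.

```typescript
// Hypothetical in-memory stand-in for the step store and event log.
type StepStatus = 'pending' | 'running' | 'completed' | 'failed';

interface StepRecord {
  status: StepStatus;
  output?: unknown;
}

interface StepCompletedEvent {
  eventType: 'step_completed';
  correlationId: string;
}

class InMemoryWorld {
  steps: Record<string, StepRecord> = {};
  events: StepCompletedEvent[] = [];

  // Assumption: completing an already completed step throws.
  markCompleted(stepId: string, output: unknown): void {
    const step = this.steps[stepId];
    if (!step) throw new Error(`Unknown step "${stepId}"`);
    if (step.status === 'completed') {
      throw new Error(`Step "${stepId}" is already completed`);
    }
    step.status = 'completed';
    step.output = output;
  }

  appendStepCompleted(stepId: string): void {
    this.events.push({ eventType: 'step_completed', correlationId: stepId });
  }
}

// Mirrors the relaxed guard: a step left in "running" by a crashed
// invocation may be executed again, just like a "pending" one.
function isRunnable(status: StepStatus): boolean {
  return status === 'pending' || status === 'running';
}

// Status update first, event second: if the update throws because a
// concurrent execution already completed the step, no event is appended.
function finishStep(world: InMemoryWorld, stepId: string, output: unknown): void {
  world.markCompleted(stepId, output);
  world.appendStepCompleted(stepId);
}
```

With this ordering, calling `finishStep` twice for the same step leaves exactly one `step_completed` entry in `events`, because the second call fails at `markCompleted` before it reaches the event log. As the TODO in the diff notes, the two writes are still not atomic.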
33 changes: 33 additions & 0 deletions workbench/example/workflows/99_e2e.ts
@@ -512,3 +512,36 @@ async function doubleNumber(x: number) {
'use step';
return x * 2;
}

//////////////////////////////////////////////////////////

async function stepThatExitsOnFirstAttempt() {
'use step';
const { attempt } = getStepMetadata();
console.log(`stepThatExitsOnFirstAttempt - attempt: ${attempt}`);

// Kill the process on the first attempt to simulate a fatal crash
if (attempt === 1) {
console.log(
`Attempt ${attempt} - calling process.exit() to simulate fatal crash`
);
process.exit(1);
}

console.log(`Attempt ${attempt} - succeeding after process recovered`);
return { attempt, status: 'recovered' };
}

export async function processExitResilienceWorkflow() {
'use workflow';
console.log('Starting process exit resilience workflow');

// This step should crash the process on the first attempt,
// but the workflow should recover and retry on a new process
const result = await stepThatExitsOnFirstAttempt();

console.log(
`Workflow completed successfully after process crash: ${JSON.stringify(result)}`
);
return result;
}
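To see why the e2e test expects `{ attempt: 2, status: 'recovered' }`, the crash-and-redeliver cycle can be modelled without a real process exit. The sketch below is a simplified simulation, not the workflow runtime: `SimStep`, `invokeStep`, `deliver`, `simulate` and the `SIMULATED_EXIT` sentinel are hypothetical, and it assumes the step is set to "running" with an incremented attempt counter before the step body executes.

```typescript
type Status = 'pending' | 'running' | 'completed';

interface SimStep {
  status: Status;
  attempt: number;
  output?: unknown;
}

// Sentinel thrown in place of process.exit(): nothing after the step body runs.
const SIMULATED_EXIT = { reason: 'simulated process exit' };

// One invocation of the step entrypoint, with a configurable status guard.
function invokeStep(
  step: SimStep,
  allowed: Status[],
  stepFn: (attempt: number) => unknown
): void {
  if (allowed.indexOf(step.status) === -1) return; // skipped as erroneous
  const attempt = step.attempt + 1;
  step.status = 'running'; // assumption: recorded before the body runs
  step.attempt = attempt;
  const output = stepFn(attempt); // a crash here skips the cleanup below
  step.status = 'completed';
  step.output = output;
}

// The queue redelivers the step message a fixed number of times.
function deliver(
  step: SimStep,
  allowed: Status[],
  stepFn: (attempt: number) => unknown,
  deliveries: number
): void {
  for (let i = 0; i < deliveries; i++) {
    try {
      invokeStep(step, allowed, stepFn);
    } catch (err) {
      if (err !== SIMULATED_EXIT) throw err;
    }
  }
}

function exitsOnFirstAttempt(attempt: number): { attempt: number; status: string } {
  if (attempt === 1) throw SIMULATED_EXIT;
  return { attempt, status: 'recovered' };
}

function simulate(allowed: Status[]): SimStep {
  const step: SimStep = { status: 'pending', attempt: 0 };
  deliver(step, allowed, exitsOnFirstAttempt, 3);
  return step;
}
```

In this model, `simulate(['pending'])` leaves the step stuck in "running" at attempt 1, since every redelivery is skipped by the guard, while `simulate(['pending', 'running'])` ends with the step completed on attempt 2.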