Skip to content

Commit b231836

Browse files
committed
Respect sleep
1 parent fc002dc commit b231836

File tree

3 files changed

+25
-10
lines changed

3 files changed

+25
-10
lines changed

packages/world-postgres/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import { createWorld, createPgBossQueue } from "@workflow/world-postgres";
4242
const world = createWorld({
4343
connectionString: "postgres://username:password@localhost:5432/database",
4444
securityToken: "your-secret-token-here",
45-
queueFactorya: createPgBossHttpProxyQueue({
45+
queueFactory: createPgBossHttpProxyQueue({
4646
jobPrefix: "my-app",
4747
queueConcurrency: 10,
4848
})

packages/world-postgres/src/queue-drivers/pgboss.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,12 @@ export function createPgBossQueue(
5555
start: async () => {
5656
await ensureStarted();
5757

58-
const stepWorker = createWorker(proxy.proxyStep);
59-
const workflowWorker = createWorker(proxy.proxyWorkflow);
58+
const stepWorker = createWorker(boss, stepQueueName, proxy.proxyStep);
59+
const workflowWorker = createWorker(
60+
boss,
61+
workflowQueueName,
62+
proxy.proxyWorkflow
63+
);
6064

6165
for (let i = 0; i < opts.queueConcurrency; i++) {
6266
await boss.work(workflowQueueName, workflowWorker);
@@ -66,7 +70,11 @@ export function createPgBossQueue(
6670
};
6771
}
6872

69-
function createWorker(proxy: WkfProxy[keyof WkfProxy]) {
73+
function createWorker(
74+
boss: PgBoss,
75+
queueName: string,
76+
proxy: WkfProxy[keyof WkfProxy]
77+
) {
7078
return async ([job]: PgBoss.Job[]) => {
7179
const message = MessageData.parse(job.data);
7280

@@ -75,13 +83,21 @@ function createWorker(proxy: WkfProxy[keyof WkfProxy]) {
7583
try {
7684
const response = await proxy(message);
7785

78-
// TODO: Properly handle 503
7986
if (response.status === 503) {
80-
const body = (await response.json()) as {
81-
timeoutSeconds?: number;
82-
};
87+
const body = (await response.json()) as { timeoutSeconds?: number };
88+
8389
if (body.timeoutSeconds) {
84-
throw new Error(`Retry after ${body.timeoutSeconds}s`);
90+
await boss.send(queueName, job.data, {
91+
startAfter: new Date(Date.now() + body.timeoutSeconds * 1000),
92+
singletonKey: message?.idempotencyKey ?? message.messageId,
93+
retryLimit: 3,
94+
});
95+
96+
console.log(
97+
`[${job.id}] requeued: ${message.queueName} for ${body.timeoutSeconds}s`
98+
);
99+
100+
return;
85101
}
86102
}
87103

packages/world-postgres/src/queue.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ export function createQueue(
8383
messageId: message.messageId,
8484
});
8585

86-
// TODO: Understand what's this?
8786
let timeoutSeconds: number | null = null;
8887
if (typeof result?.timeoutSeconds === 'number') {
8988
timeoutSeconds = Math.min(

0 commit comments

Comments
 (0)