diff --git a/.vscode/settings.json b/.vscode/settings.json index 575d6e0..12d93a2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,17 @@ { "editor.defaultFormatter": "biomejs.biome", + "[javascript]": { + "editor.defaultFormatter": "biomejs.biome" + }, + "[typescript]": { + "editor.defaultFormatter": "biomejs.biome" + }, + "[json]": { + "editor.defaultFormatter": "biomejs.biome" + }, + "[jsonc]": { + "editor.defaultFormatter": "biomejs.biome" + }, "editor.codeActionsOnSave": { "quickfix.biome": "explicit", "source.organizeImports.biome": "explicit" diff --git a/biome.json b/biome.json index 2a7ade0..68a8240 100644 --- a/biome.json +++ b/biome.json @@ -25,6 +25,11 @@ "recommended": true } }, + "json": { + "formatter": { + "enabled": true + } + }, "javascript": { "formatter": { "enabled": true, diff --git a/packages/chrono-core/src/datastore.ts b/packages/chrono-core/src/datastore.ts index b084619..4f3c449 100644 --- a/packages/chrono-core/src/datastore.ts +++ b/packages/chrono-core/src/datastore.ts @@ -19,6 +19,10 @@ export type Task = { data: TaskData; /** The priority level of the task (lower numbers can indicate higher priority) */ priority?: number; + /** Identifier used to group tasks together so that they are processed sequentially. FIFO + * Should be used sparingly and only when necessary, as it can lead to performance issues. + */ + groupId?: string; /** A key used for idempotency to prevent duplicate processing */ idempotencyKey?: string; /** The original scheduled date when the task was first intended to run */ diff --git a/packages/chrono-mongo-datastore/src/chrono-mongo-datastore.ts b/packages/chrono-mongo-datastore/src/chrono-mongo-datastore.ts index 941b04e..05f82c3 100644 --- a/packages/chrono-mongo-datastore/src/chrono-mongo-datastore.ts +++ b/packages/chrono-mongo-datastore/src/chrono-mongo-datastore.ts @@ -118,24 +118,100 @@ export class ChronoMongoDatastore input: ClaimTaskInput, ): Promise | undefined> { const now = new Date(); + const claimedAtStaleTimeout = new Date(now.getTime() - this.config.claimStaleTimeout); + + const possibleTasks = await this.collection() + .aggregate([ + // 1. Initial Match: Find potential candidates + { + $match: { + kind: input.kind, + scheduledAt: { $lte: now }, + $or: [ + { status: TaskStatus.PENDING }, + { + status: TaskStatus.CLAIMED, + claimedAt: { + $lte: claimedAtStaleTimeout, + }, + }, + ], + }, + }, + // 2. Lookup Blocking Tasks: Check for earlier, active tasks in the same group + { + $lookup: { + from: this.config.collectionName, + let: { + current_groupId: '$groupId', + current_originalScheduledAt: '$originalScheduleDate', + current_id: '$_id', + }, + pipeline: [ + { + $match: { + // Match documents that could potentially block the current one + groupId: { $ne: null }, // Only apply FIFO to tasks with a groupId + $expr: { + $and: [ + { $ne: ['$_id', '$$current_id'] }, // Not the same task + { $eq: ['$groupId', '$$current_groupId'] }, // Same group + { $lt: ['$originalScheduleDate', '$$current_originalScheduledAt'] }, // originalScheduled *before* the current task. TODO: what if originalScheduledAt are the same? + { $in: ['$status', [TaskStatus.PENDING, TaskStatus.CLAIMED, TaskStatus.FAILED]] }, // Is in a blocking state (PENDING or CLAIMED or FAILED) + ], + }, + }, + }, + // Optimization: We only need to know if *any* blocker exists + { $project: { _id: 1 } }, + { $limit: 1 }, + ], + as: 'blockingTasks', + }, + }, + // 3. Filter Based on Lookup: Only keep tasks with NO blocking tasks + { + $match: { + // Keep documents where the blockingTasks array is empty + blockingTasks: { $size: 0 }, + }, + }, + // 4. Sort: Prioritize by priority then scheduledAt + { + $sort: { + priority: -1, // Then by priority (high to low) + scheduledAt: 1, // Enforce overall earliest schedule first + _id: 1, // Deterministic tie-breaker + }, + }, + // 5. Limit: Get only the single best candidate. Possible we might want more then one incase task is claimed by another process between aggregate and findOneAndUpdate + { + $limit: 1, + }, + ]) + .toArray(); + + const possibleTask = possibleTasks.shift(); + + if (!possibleTask) { + return undefined; + } + const task = await this.collection().findOneAndUpdate( { - kind: input.kind, - scheduledAt: { $lte: now }, + _id: possibleTask._id, $or: [ { status: TaskStatus.PENDING }, { status: TaskStatus.CLAIMED, claimedAt: { - $lte: new Date(now.getTime() - this.config.claimStaleTimeout), + $lte: claimedAtStaleTimeout, }, }, ], }, { $set: { status: TaskStatus.CLAIMED, claimedAt: now } }, { - sort: { priority: -1, scheduledAt: 1 }, - // hint: IndexNames.CLAIM_DOCUMENT_INDEX as unknown as Document, returnDocument: 'after', }, ); diff --git a/packages/chrono-mongo-datastore/test/unit/chrono-mongo-datastore.test.ts b/packages/chrono-mongo-datastore/test/unit/chrono-mongo-datastore.test.ts index 6d4146f..1a9fc87 100644 --- a/packages/chrono-mongo-datastore/test/unit/chrono-mongo-datastore.test.ts +++ b/packages/chrono-mongo-datastore/test/unit/chrono-mongo-datastore.test.ts @@ -198,6 +198,73 @@ describe('ChronoMongoDatastore', () => { expect.objectContaining({ id: task2.id, status: TaskStatus.CLAIMED }), ); }); + + describe('fifo grouping', () => { + const input = { + kind: 'test' as const, + data: { test: 'test' }, + priority: 1, + groupId: 'group1', + }; + + test('should claim tasks in FIFO order based on groupId', async () => { + const task1 = await dataStore.schedule({ ...input, when: new Date(Date.now() - 100) }); + const task2 = await dataStore.schedule({ ...input, when: new Date(Date.now() - 50) }); + const task3 = await dataStore.schedule({ ...input, when: new Date(Date.now() - 1) }); + + const possibleClaimedTasks = await Promise.all([ + dataStore.claim({ kind: input.kind }), + dataStore.claim({ kind: input.kind }), + dataStore.claim({ kind: input.kind }), + ]); + + expect(possibleClaimedTasks.length).toEqual(3); + + const claimedTasks = possibleClaimedTasks.filter(Boolean); + expect(claimedTasks).toHaveLength(1); + expect(claimedTasks[0]).toEqual( + expect.objectContaining({ + id: task1.id, + kind: task1.kind, + status: TaskStatus.CLAIMED, + }), + ); + + await dataStore.complete(task1.id); + + const possibleClaimedTasks2 = await Promise.all([ + dataStore.claim({ kind: input.kind }), + dataStore.claim({ kind: input.kind }), + dataStore.claim({ kind: input.kind }), + ]); + expect(possibleClaimedTasks2.length).toEqual(3); + const claimedTasks2 = possibleClaimedTasks2.filter(Boolean); + expect(claimedTasks2).toHaveLength(1); + expect(claimedTasks2[0]).toEqual( + expect.objectContaining({ + id: task2.id, + kind: task2.kind, + status: TaskStatus.CLAIMED, + }), + ); + await dataStore.complete(task2.id); + const possibleClaimedTasks3 = await Promise.all([ + dataStore.claim({ kind: input.kind }), + dataStore.claim({ kind: input.kind }), + dataStore.claim({ kind: input.kind }), + ]); + expect(possibleClaimedTasks3.length).toEqual(3); + const claimedTasks3 = possibleClaimedTasks3.filter(Boolean); + expect(claimedTasks3).toHaveLength(1); + expect(claimedTasks3[0]).toEqual( + expect.objectContaining({ + id: task3.id, + kind: task3.kind, + status: TaskStatus.CLAIMED, + }), + ); + }); + }); }); describe('complete', () => {