Skip to content

Setup a possible example to enforce FIFO like setup on claim #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
{
"editor.defaultFormatter": "biomejs.biome",
"[javascript]": {
"editor.defaultFormatter": "biomejs.biome"
},
"[typescript]": {
"editor.defaultFormatter": "biomejs.biome"
},
"[json]": {
"editor.defaultFormatter": "biomejs.biome"
},
"[jsonc]": {
"editor.defaultFormatter": "biomejs.biome"
},
"editor.codeActionsOnSave": {
"quickfix.biome": "explicit",
"source.organizeImports.biome": "explicit"
Expand Down
5 changes: 5 additions & 0 deletions biome.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
"recommended": true
}
},
"json": {
"formatter": {
"enabled": true
}
},
"javascript": {
"formatter": {
"enabled": true,
Expand Down
4 changes: 4 additions & 0 deletions packages/chrono-core/src/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ export type Task<TaskKind, TaskData> = {
data: TaskData;
/** The priority level of the task (lower numbers can indicate higher priority) */
priority?: number;
/** Identifier used to group tasks together so that they are processed sequentially. FIFO
* Should be used sparingly and only when necessary, as it can lead to performance issues.
*/
groupId?: string;
/** A key used for idempotency to prevent duplicate processing */
idempotencyKey?: string;
/** The original scheduled date when the task was first intended to run */
Expand Down
86 changes: 81 additions & 5 deletions packages/chrono-mongo-datastore/src/chrono-mongo-datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,100 @@ export class ChronoMongoDatastore<TaskMapping extends TaskMappingBase>
input: ClaimTaskInput<TaskKind>,
): Promise<Task<TaskKind, TaskMapping[TaskKind]> | undefined> {
const now = new Date();
const claimedAtStaleTimeout = new Date(now.getTime() - this.config.claimStaleTimeout);

const possibleTasks = await this.collection<TaskKind>()
.aggregate([
// 1. Initial Match: Find potential candidates
{
$match: {
kind: input.kind,
scheduledAt: { $lte: now },
$or: [
{ status: TaskStatus.PENDING },
{
status: TaskStatus.CLAIMED,
claimedAt: {
$lte: claimedAtStaleTimeout,
},
},
],
},
},
// 2. Lookup Blocking Tasks: Check for earlier, active tasks in the same group
{
$lookup: {
from: this.config.collectionName,
let: {
current_groupId: '$groupId',
current_originalScheduledAt: '$originalScheduleDate',
current_id: '$_id',
},
pipeline: [
{
$match: {
// Match documents that could potentially block the current one
groupId: { $ne: null }, // Only apply FIFO to tasks with a groupId
$expr: {
$and: [
{ $ne: ['$_id', '$$current_id'] }, // Not the same task
{ $eq: ['$groupId', '$$current_groupId'] }, // Same group
{ $lt: ['$originalScheduleDate', '$$current_originalScheduledAt'] }, // originalScheduled *before* the current task. TODO: what if originalScheduledAt are the same?
{ $in: ['$status', [TaskStatus.PENDING, TaskStatus.CLAIMED, TaskStatus.FAILED]] }, // Is in a blocking state (PENDING or CLAIMED or FAILED)
],
},
},
},
// Optimization: We only need to know if *any* blocker exists
{ $project: { _id: 1 } },
{ $limit: 1 },
],
as: 'blockingTasks',
},
},
// 3. Filter Based on Lookup: Only keep tasks with NO blocking tasks
{
$match: {
// Keep documents where the blockingTasks array is empty
blockingTasks: { $size: 0 },
},
},
// 4. Sort: Prioritize by priority then scheduledAt
{
$sort: {
priority: -1, // Then by priority (high to low)
scheduledAt: 1, // Enforce overall earliest schedule first
_id: 1, // Deterministic tie-breaker
},
},
// 5. Limit: Get only the single best candidate. Possible we might want more then one incase task is claimed by another process between aggregate and findOneAndUpdate
{
$limit: 1,
},
])
.toArray();

const possibleTask = possibleTasks.shift();

if (!possibleTask) {
return undefined;
}

const task = await this.collection<TaskKind>().findOneAndUpdate(
{
kind: input.kind,
scheduledAt: { $lte: now },
_id: possibleTask._id,
$or: [
{ status: TaskStatus.PENDING },
{
status: TaskStatus.CLAIMED,
claimedAt: {
$lte: new Date(now.getTime() - this.config.claimStaleTimeout),
$lte: claimedAtStaleTimeout,
},
},
],
},
{ $set: { status: TaskStatus.CLAIMED, claimedAt: now } },
{
sort: { priority: -1, scheduledAt: 1 },
// hint: IndexNames.CLAIM_DOCUMENT_INDEX as unknown as Document,
returnDocument: 'after',
},
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,73 @@ describe('ChronoMongoDatastore', () => {
expect.objectContaining({ id: task2.id, status: TaskStatus.CLAIMED }),
);
});

describe('fifo grouping', () => {
const input = {
kind: 'test' as const,
data: { test: 'test' },
priority: 1,
groupId: 'group1',
};

test('should claim tasks in FIFO order based on groupId', async () => {
const task1 = await dataStore.schedule({ ...input, when: new Date(Date.now() - 100) });
const task2 = await dataStore.schedule({ ...input, when: new Date(Date.now() - 50) });
const task3 = await dataStore.schedule({ ...input, when: new Date(Date.now() - 1) });

const possibleClaimedTasks = await Promise.all([
dataStore.claim({ kind: input.kind }),
dataStore.claim({ kind: input.kind }),
dataStore.claim({ kind: input.kind }),
]);

expect(possibleClaimedTasks.length).toEqual(3);

const claimedTasks = possibleClaimedTasks.filter(Boolean);
expect(claimedTasks).toHaveLength(1);
expect(claimedTasks[0]).toEqual(
expect.objectContaining({
id: task1.id,
kind: task1.kind,
status: TaskStatus.CLAIMED,
}),
);

await dataStore.complete(task1.id);

const possibleClaimedTasks2 = await Promise.all([
dataStore.claim({ kind: input.kind }),
dataStore.claim({ kind: input.kind }),
dataStore.claim({ kind: input.kind }),
]);
expect(possibleClaimedTasks2.length).toEqual(3);
const claimedTasks2 = possibleClaimedTasks2.filter(Boolean);
expect(claimedTasks2).toHaveLength(1);
expect(claimedTasks2[0]).toEqual(
expect.objectContaining({
id: task2.id,
kind: task2.kind,
status: TaskStatus.CLAIMED,
}),
);
await dataStore.complete(task2.id);
const possibleClaimedTasks3 = await Promise.all([
dataStore.claim({ kind: input.kind }),
dataStore.claim({ kind: input.kind }),
dataStore.claim({ kind: input.kind }),
]);
expect(possibleClaimedTasks3.length).toEqual(3);
const claimedTasks3 = possibleClaimedTasks3.filter(Boolean);
expect(claimedTasks3).toHaveLength(1);
expect(claimedTasks3[0]).toEqual(
expect.objectContaining({
id: task3.id,
kind: task3.kind,
status: TaskStatus.CLAIMED,
}),
);
});
});
});

describe('complete', () => {
Expand Down