Skip to content

Commit 4a84ed8

Browse files
authored
sqs-consumer 14 fix (#340)
* Fixing sqs-consumer 14 acknowledgment * Fixing tests * Adding tests to make sure that acknowledgement works * Trying to fix test * Release prepare
1 parent 52b6398 commit 4a84ed8

7 files changed

+89
-15
lines changed

packages/sqs/lib/sqs/AbstractSqsConsumer.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ export abstract class AbstractSqsConsumer<
223223
...this.consumerOptionsOverride,
224224
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: fixme
225225
handleMessage: async (message: SQSMessage) => {
226-
if (message === null) return
226+
if (message === null) return message
227227

228228
const messageProcessingStartTimestamp = Date.now()
229229

@@ -239,7 +239,8 @@ export abstract class AbstractSqsConsumer<
239239
queueName: this.queueName,
240240
messageId: messageId.result,
241241
})
242-
return
242+
243+
return message
243244
}
244245
const { parsedMessage, originalMessage } = deserializedMessage.result
245246

@@ -254,7 +255,8 @@ export abstract class AbstractSqsConsumer<
254255
queueName: this.queueName,
255256
messageId: this.tryToExtractId(message).result,
256257
})
257-
return
258+
259+
return message
258260
}
259261

260262
const acquireLockResult = this.isDeduplicationEnabledForMessage(parsedMessage)
@@ -271,6 +273,7 @@ export abstract class AbstractSqsConsumer<
271273
parsedMessage,
272274
messageProcessingStartTimestamp,
273275
)
276+
274277
return message
275278
}
276279

@@ -288,7 +291,8 @@ export abstract class AbstractSqsConsumer<
288291
queueName: this.queueName,
289292
messageId: this.tryToExtractId(message).result,
290293
})
291-
return
294+
295+
return message
292296
}
293297

294298
// @ts-expect-error
@@ -326,6 +330,7 @@ export abstract class AbstractSqsConsumer<
326330
messageProcessingStartTimestamp,
327331
queueName: this.queueName,
328332
})
333+
329334
return message
330335
}
331336

@@ -348,6 +353,7 @@ export abstract class AbstractSqsConsumer<
348353
messageProcessingStartTimestamp,
349354
queueName: this.queueName,
350355
})
356+
351357
return Promise.reject(result.error)
352358
},
353359
})

packages/sqs/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/sqs",
3-
"version": "22.2.1",
3+
"version": "22.3.0",
44
"private": false,
55
"license": "MIT",
66
"description": "SQS adapter for message-queue-toolkit",

packages/sqs/test/consumers/SqsPermissionConsumer.messageDeduplication.spec.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { randomUUID } from 'node:crypto'
22
import { setTimeout } from 'node:timers/promises'
3+
import { ReceiveMessageCommand, type SQSClient } from '@aws-sdk/client-sqs'
34
import { waitAndRetry } from '@lokalise/node-core'
45
import type { MessageDeduplicationConfig } from '@message-queue-toolkit/core'
56
import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store'
@@ -18,6 +19,7 @@ import type {
1819

1920
describe('SqsPermissionConsumer', () => {
2021
let diContainer: AwilixContainer<Dependencies>
22+
let sqsClient: SQSClient
2123
let messageDeduplicationStore: RedisMessageDeduplicationStore
2224
let messageDeduplicationConfig: MessageDeduplicationConfig
2325
let publisher: SqsPermissionPublisher
@@ -26,6 +28,7 @@ describe('SqsPermissionConsumer', () => {
2628
diContainer = await registerDependencies({
2729
permissionConsumer: asValue(() => undefined),
2830
})
31+
sqsClient = diContainer.cradle.sqsClient
2932
publisher = diContainer.cradle.permissionPublisher
3033
messageDeduplicationStore = new RedisMessageDeduplicationStore({
3134
redis: diContainer.cradle.redis,
@@ -84,6 +87,16 @@ describe('SqsPermissionConsumer', () => {
8487
})
8588

8689
await consumer.close()
90+
91+
// Verify that both messages were acknowledged (removed from queue)
92+
const receiveCommandResult = await sqsClient.send(
93+
new ReceiveMessageCommand({
94+
QueueUrl: consumer.queueProps.url,
95+
MaxNumberOfMessages: 1,
96+
WaitTimeSeconds: 1,
97+
}),
98+
)
99+
expect(receiveCommandResult.Messages).toBeUndefined()
87100
})
88101

89102
it('consumes second message immediately when the first one failed to be processed', async () => {
@@ -334,7 +347,7 @@ describe('SqsPermissionConsumer', () => {
334347
messageDeduplicationConfig,
335348
enableConsumerDeduplication: true,
336349
addHandlerOverride: async () => {
337-
await setTimeout(2500)
350+
await setTimeout(3000)
338351
return Promise.resolve({ result: 'success' })
339352
},
340353
})

packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,16 @@ describe('SqsPermissionConsumer', () => {
773773

774774
expect(consumer.addCounter).toBe(0)
775775
expect(consumer.removeCounter).toBe(0)
776+
777+
// Verify that message was acknowledged (removed from queue)
778+
const receiveCommandResult = await sqsClient.send(
779+
new ReceiveMessageCommand({
780+
QueueUrl: consumer.queueProps.url,
781+
MaxNumberOfMessages: 1,
782+
WaitTimeSeconds: 1,
783+
}),
784+
)
785+
expect(receiveCommandResult.Messages).toBeUndefined()
776786
})
777787

778788
it('Processes messages', async () => {
@@ -795,6 +805,16 @@ describe('SqsPermissionConsumer', () => {
795805

796806
expect(consumer.addCounter).toBe(1)
797807
expect(consumer.removeCounter).toBe(2)
808+
809+
// Verify that all messages were acknowledged (removed from queue)
810+
const receiveCommandResult = await sqsClient.send(
811+
new ReceiveMessageCommand({
812+
QueueUrl: consumer.queueProps.url,
813+
MaxNumberOfMessages: 1,
814+
WaitTimeSeconds: 1,
815+
}),
816+
)
817+
expect(receiveCommandResult.Messages).toBeUndefined()
798818
})
799819
})
800820

@@ -855,6 +875,16 @@ describe('SqsPermissionConsumer', () => {
855875
expect(consumer.addCounter).toBe(messagesAmount)
856876
// Verifies that no message is lost
857877
expect(consumer.processedMessagesIds).toHaveLength(messagesAmount)
878+
879+
// Verify that all messages were acknowledged (removed from queue)
880+
const receiveCommandResult = await sqsClient.send(
881+
new ReceiveMessageCommand({
882+
QueueUrl: consumer.queueProps.url,
883+
MaxNumberOfMessages: 1,
884+
WaitTimeSeconds: 1,
885+
}),
886+
)
887+
expect(receiveCommandResult.Messages).toBeUndefined()
858888
})
859889
})
860890

packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { ListQueueTagsCommand, SendMessageCommand, type SQSClient } from '@aws-sdk/client-sqs'
1+
import {
2+
ListQueueTagsCommand,
3+
ReceiveMessageCommand,
4+
SendMessageCommand,
5+
type SQSClient,
6+
} from '@aws-sdk/client-sqs'
27
import { waitAndRetry } from '@lokalise/node-core'
38
import type { AwilixContainer } from 'awilix'
49
import { Consumer } from 'sqs-consumer'
@@ -260,7 +265,7 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => {
260265
queueUrl: consumer.dlqUrl,
261266
handleMessage: (message: SQSMessage) => {
262267
dlqMessage = message
263-
return Promise.resolve()
268+
return Promise.resolve(message)
264269
},
265270
})
266271
dlqConsumer.start()
@@ -276,6 +281,16 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => {
276281
timestamp: expect.any(String),
277282
_internalRetryLaterCount: 0,
278283
})
284+
285+
// Verify that all messages were acknowledged (removed from queue)
286+
const receiveCommandResult = await sqsClient.send(
287+
new ReceiveMessageCommand({
288+
QueueUrl: consumer.queueProps.url,
289+
MaxNumberOfMessages: 1,
290+
WaitTimeSeconds: 1,
291+
}),
292+
)
293+
expect(receiveCommandResult.Messages).toBeUndefined()
279294
})
280295

281296
it('messages with retryLater should be retried with exponential delay and not go to DLQ', async () => {
@@ -308,6 +323,16 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => {
308323
expect(handlerSpyResult.processingResult).toEqual({ status: 'consumed' })
309324
expect(handlerSpyResult.message).toMatchObject({ id: '1', messageType: 'remove' })
310325

326+
// Verify that message was acknowledged after successful processing
327+
const receiveCommandResult = await sqsClient.send(
328+
new ReceiveMessageCommand({
329+
QueueUrl: consumer.queueProps.url,
330+
MaxNumberOfMessages: 1,
331+
WaitTimeSeconds: 1,
332+
}),
333+
)
334+
expect(receiveCommandResult.Messages).toBeUndefined()
335+
311336
expect(counter).toBe(2)
312337

313338
// delay is 1s, but consumer can take the message
@@ -331,7 +356,7 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => {
331356
queueUrl: consumer.dlqUrl,
332357
handleMessage: (message: SQSMessage) => {
333358
dlqMessage = message
334-
return Promise.resolve()
359+
return Promise.resolve(message)
335360
},
336361
})
337362
dlqConsumer.start()
@@ -373,7 +398,7 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => {
373398
queueUrl: consumer.dlqUrl,
374399
handleMessage: (message: SQSMessage) => {
375400
dlqMessage = message
376-
return Promise.resolve()
401+
return Promise.resolve(message)
377402
},
378403
})
379404
dlqConsumer.start()
@@ -424,7 +449,7 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => {
424449
queueUrl: consumer.dlqUrl,
425450
handleMessage: (message: SQSMessage) => {
426451
dlqMessage = message
427-
return Promise.resolve()
452+
return Promise.resolve(message)
428453
},
429454
})
430455
dlqConsumer.start()

packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ describe('SqsPermissionPublisher - payload offloading', () => {
6262
queueUrl,
6363
handleMessage: (message: Message) => {
6464
if (message === null) {
65-
return Promise.resolve()
65+
return Promise.resolve(message)
6666
}
6767
receivedSqsMessages.push(message)
68-
return Promise.resolve()
68+
return Promise.resolve(message)
6969
},
7070
sqs: sqsClient,
7171
messageAttributeNames: [OFFLOADED_PAYLOAD_SIZE_ATTRIBUTE],

packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,15 +455,15 @@ describe('SqsPermissionPublisher', () => {
455455
queueUrl: queueUrl,
456456
handleMessage: (message: SQSMessage) => {
457457
if (message === null) {
458-
return Promise.resolve()
458+
return Promise.resolve(message)
459459
}
460460
const decodedMessage = deserializeSQSMessage(
461461
message as any,
462462
PERMISSIONS_ADD_MESSAGE_SCHEMA,
463463
new FakeConsumerErrorResolver(),
464464
)
465465
receivedMessage = decodedMessage.result!
466-
return Promise.resolve()
466+
return Promise.resolve(message)
467467
},
468468
sqs: diContainer.cradle.sqsClient,
469469
})

0 commit comments

Comments
 (0)