This monorepo contains two packages for consuming AWS SQS messages with type safety and validation:
- @snow-tzu/sqs-listener - Framework-agnostic core package that works with vanilla Node.js, Express, Fastify, Koa, or any Node.js environment
- @snow-tzu/nest-sqs-listener - NestJS adapter that wraps the core with NestJS-specific features (dependency injection, lifecycle hooks, decorators)
Both packages share the same powerful features: type safety, automatic validation, flexible acknowledgement modes, concurrency control, and extensibility. The core package provides the foundation, while framework adapters add integration with specific frameworks.
Choose your package:
- 🎯 Using NestJS? → Install @snow-tzu/nest-sqs-listener (includes core automatically)
- 🚀 Using Express, Fastify, or vanilla Node.js? → Install @snow-tzu/sqs-listener
The framework-agnostic foundation that handles all SQS message consumption logic.
Features:
- ✅ Type-safe message handling with generics
- ✅ Automatic message validation with class-validator
- ✅ Flexible acknowledgement modes (ON_SUCCESS, MANUAL, ALWAYS)
- ✅ Concurrency control with configurable limits
- ✅ Custom message converters and error handlers
- ✅ Works with any Node.js framework or vanilla Node.js
Installation:
npm install @snow-tzu/sqs-listener @aws-sdk/client-sqs
Documentation: Core Package README
NestJS-specific wrapper that adds framework integration on top of the core package.
Additional Features:
- ✅ Full NestJS dependency injection support
- ✅ Automatic lifecycle management (OnModuleInit, OnModuleDestroy)
- ✅ NestJS Logger integration
- ✅ 100% backward compatible with v0.0.4
- ✅ Includes all core package features
Installation:
npm install @snow-tzu/nest-sqs-listener @aws-sdk/client-sqs
Documentation: NestJS Adapter README
┌─────────────────────────────────────────┐
│ Your Application │
│ (NestJS, Express, Vanilla Node.js) │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Framework Adapter (Optional) │
│ @snow-tzu/nest-sqs-listener │
│ - Lifecycle hooks │
│ - DI integration │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Core Package │
│ @snow-tzu/sqs-listener │
│ - Message consumption │
│ - Validation │
│ - Error handling │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ AWS SQS │
└─────────────────────────────────────────┘
Get started quickly with complete, runnable examples for your framework.
// 1. Install
npm install @snow-tzu/nest-sqs-listener @aws-sdk/client-sqs
// 2. Create your event class
import {IsString, IsNumber} from 'class-validator';
export class OrderCreatedEvent {
@IsString()
orderId: string;
@IsNumber()
amount: number;
}
// 3. Create your listener
import {Injectable} from '@nestjs/common';
import {QueueListener, MessageContext} from '@snow-tzu/nest-sqs-listener';
@Injectable()
export class OrderListener implements QueueListener<OrderCreatedEvent> {
async onMessage(event: OrderCreatedEvent, context: MessageContext): Promise<void> {
console.log(`Processing order ${event.orderId} for $${event.amount}`);
// Your business logic here
}
}
// 4. Configure in your module
import {Module} from '@nestjs/common';
import {SQSClient} from '@aws-sdk/client-sqs';
import {NestJSSqsMessageListenerContainer} from '@snow-tzu/nest-sqs-listener';
@Module({
providers: [
OrderListener,
{
provide: 'SQS_CLIENT',
useValue: new SQSClient({region: 'us-east-1'}),
},
{
provide: 'ORDER_CONTAINER',
useFactory: (listener, sqsClient) => {
const container = new NestJSSqsMessageListenerContainer(sqsClient);
container.configure(options => {
options
.queueName('order-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true);
});
container.setMessageListener(listener);
return container;
},
inject: [OrderListener, 'SQS_CLIENT'],
},
],
})
export class OrderModule {
}
That's it! The container automatically starts on application initialization and stops on shutdown.
See the NestJS basic example for a complete working application.
// 1. Install
npm install @snow-tzu/sqs-listener @aws-sdk/client-sqs
// 2. Create your event class
import {IsString, IsNumber} from 'class-validator';
export class OrderCreatedEvent {
@IsString()
orderId: string;
@IsNumber()
amount: number;
}
// 3. Create your listener
import {QueueListener, MessageContext} from '@snow-tzu/sqs-listener';
class OrderListener implements QueueListener<OrderCreatedEvent> {
async onMessage(event: OrderCreatedEvent, context: MessageContext): Promise<void> {
console.log(`Processing order ${event.orderId} for $${event.amount}`);
// Your business logic here
}
}
// 4. Set up the container
import {SQSClient} from '@aws-sdk/client-sqs';
import {SqsMessageListenerContainer} from '@snow-tzu/sqs-listener';
const sqsClient = new SQSClient({region: 'us-east-1'});
const container = new SqsMessageListenerContainer(sqsClient);
container.configure(options => {
options
.queueName('order-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true);
});
container.setMessageListener(new OrderListener());
// 5. Start listening
await container.start();
console.log('Listening for messages...');
// Graceful shutdown
process.on('SIGTERM', async () => {
await container.stop();
process.exit(0);
});
Key difference: You manually control the container lifecycle with start() and stop().
See the vanilla Node.js example for a complete working application.
// 1. Install
npm install @snow-tzu/sqs-listener @aws-sdk/client-sqs express
// 2. Create your event class
import {IsString, IsNumber} from 'class-validator';
export class OrderCreatedEvent {
@IsString()
orderId: string;
@IsNumber()
amount: number;
}
// 3. Set up Express app with SQS listener
import express from 'express';
import {SQSClient} from '@aws-sdk/client-sqs';
import {SqsMessageListenerContainer, QueueListener, MessageContext} from '@snow-tzu/sqs-listener';
const app = express();
app.use(express.json());
// Your listener
class OrderListener implements QueueListener<OrderCreatedEvent> {
async onMessage(event: OrderCreatedEvent, context: MessageContext): Promise<void> {
console.log(`Processing order ${event.orderId} for $${event.amount}`);
// Your business logic here
}
}
// Set up SQS container
const sqsClient = new SQSClient({region: 'us-east-1'});
const container = new SqsMessageListenerContainer(sqsClient);
container.configure(options => {
options
.queueName('order-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true);
});
container.setMessageListener(new OrderListener());
// Add Express routes
app.get('/health', (req, res) => {
res.json({status: 'ok'});
});
// Start both Express and SQS listener
const PORT = 3000;
app.listen(PORT, async () => {
console.log(`Express server running on port ${PORT}`);
await container.start();
console.log('SQS listener started');
});
// Graceful shutdown
process.on('SIGTERM', async () => {
await container.stop();
process.exit(0);
});
Perfect for: Running SQS listeners alongside your REST API or web server.
See the Express example for a complete working application.
- 🚀 Infrastructure abstraction - Focus on business logic while the package handles all SQS infrastructure concerns
- 💉 Full NestJS integration - Leverage dependency injection and lifecycle hooks for seamless integration
- 🔒 Type-safe - Generic types throughout for compile-time safety and better developer experience
- 🎯 Flexible acknowledgement - Choose between ON_SUCCESS, MANUAL, or ALWAYS acknowledgement modes
- 🔄 Concurrency control - Configurable parallel message processing with semaphore-based limits
- 🛠️ Highly customizable - Bring your own message converters and error handlers
- ✅ Testable - All components are injectable and mockable for easy unit and integration testing
- Packages
- Quick Start
- Features
- Installation
- Why This Package?
- Comparison
- Core Concepts
- Extensibility & Decorators
- Getting Started
- Validation
- Configuration & Acknowledgement
- Best Practices
- Advanced Usage
- Examples
- Testing
- Troubleshooting
- Contributing
- License
Choose the installation command based on your framework:
Install the NestJS adapter package, which includes the core package automatically:
npm install @snow-tzu/nest-sqs-listener @aws-sdk/client-sqs
The @snow-tzu/nest-sqs-listener package includes @snow-tzu/sqs-listener as a dependency, so you don't need to install the core package separately.
Install the framework-agnostic core package:
npm install @snow-tzu/sqs-listener @aws-sdk/client-sqs
For automatic message validation using decorators, install class-validator as a peer dependency:
npm install class-validator class-transformer
Note: class-transformer is already used internally for type conversion, but you may need to install it explicitly if it is not already in your project. This applies to both packages.
Existing solutions for SQS message consumption (AWS SDK, bbc/sqs-consumer, @ssut/nestjs-sqs) often:
- Mix infrastructure logic with business code
- Require manual parsing and error handling
- Are tightly coupled to AWS SDK types (e.g., SQS Message)
- Lack strong typing and validation
- Offer limited extensibility and testability
- Are framework-specific or require significant boilerplate for different frameworks
This package was created to solve these pain points by:
- Providing a framework-agnostic core that works with vanilla Node.js, Express, Fastify, Koa, or any Node.js environment
- Separating concerns cleanly, so core business logic is completely decoupled from framework concerns
- Supporting multiple frameworks, so the same features are available across NestJS, Express, or vanilla Node.js
- Abstracting all infrastructure concerns
- Providing a decorator-friendly, type-safe listener interface
- Supporting custom converters and error handlers
- Enabling centralized error handling and flexible acknowledgement modes
- Integrating seamlessly with NestJS DI and lifecycle hooks (via adapter)
Whether you're building a microservice with NestJS, adding background processing to your Express API, or creating a standalone Node.js worker, this package provides a consistent, type-safe approach to SQS message consumption.
| Capability | AWS SDK (raw) | bbc/sqs-consumer | @ssut/nestjs-sqs | @snow-tzu/sqs-listener (Core) | @snow-tzu/nest-sqs-listener (Adapter) |
|---|---|---|---|---|---|
| Framework Support | Any | Any | NestJS only | Any (Node.js) | NestJS only |
| Listener Payload | Raw JSON | Raw JSON | Raw SQS Message | Strong Domain Event | Strong Domain Event |
| Parsing | Manual | Manual | Manual | Automatic via converter | Automatic via converter |
| Type Safety | ❌ None | ❌ None | | ✅ Strong | ✅ Strong |
| NestJS DI Integration | ❌ No | ❌ No | ✅ Partial | N/A | ✅ Full |
| Architecture Separation | ❌ Poor | ❌ Poor | | ✅ Clean | ✅ Clean |
| Decorator-Friendly | ❌ No | ❌ No | ❌ No | ✅ Yes | ✅ Yes |
| Ack Modes | Manual only | Auto only | Auto only | ON_SUCCESS / ALWAYS / MANUAL | ON_SUCCESS / ALWAYS / MANUAL |
| Centralized Errors | ❌ No | ❌ No | | ✅ Yes | ✅ Yes |
| Custom Converters | ❌ No | ❌ No | ❌ No | ✅ Yes | ✅ Yes |
| Concurrency Control | Manual | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes |
| Testability | Poor | Hard | Limited | ✅ Excellent | ✅ Excellent |
| Extensibility | Low | Low | Low | High | High |
Note: The concepts below apply to both the core package (@snow-tzu/sqs-listener) and the NestJS adapter (@snow-tzu/nest-sqs-listener). The core functionality is identical; the adapter adds NestJS-specific lifecycle management and dependency injection integration.
The message listener container is the central component that manages the complete lifecycle of message consumption for a single queue.
Available in @snow-tzu/sqs-listener for framework-agnostic usage:
import {SqsMessageListenerContainer} from '@snow-tzu/sqs-listener';
const container = new SqsMessageListenerContainer(sqsClient);
// Manual lifecycle control
await container.start();
await container.stop();
Features:
- Polls an SQS queue using long polling
- Converts raw messages to typed payloads
- Validates messages (optional, using class-validator)
- Invokes your listener with the typed message
- Handles acknowledgement based on configured mode
- Manages concurrency limits
- Handles errors via error handlers
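Of the responsibilities above, the concurrency limit is the least visible from the outside. The sketch below shows one common way to bound parallel handlers with a counting semaphore. It is an illustration only: Semaphore and processBatch are hypothetical names, and the package's real implementation may differ.

```typescript
// Hypothetical counting semaphore; not taken from the package source.
class Semaphore {
  private available: number;
  private readonly waiters: Array<() => void> = [];

  constructor(permits: number) {
    this.available = permits;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available -= 1;
      return;
    }
    // No permit free: wait until release() hands one over.
    await new Promise<void>(resolve => {
      this.waiters.push(resolve);
    });
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // pass the permit straight to the next waiter
    } else {
      this.available += 1;
    }
  }
}

// Runs handler for every message, with at most maxConcurrent handlers in flight.
async function processBatch<T>(
  messages: T[],
  maxConcurrent: number,
  handler: (message: T) => Promise<void>,
): Promise<void> {
  const semaphore = new Semaphore(maxConcurrent);
  const inFlight = messages.map(async message => {
    await semaphore.acquire();
    try {
      await handler(message);
    } finally {
      semaphore.release(); // always free the permit, even if the handler throws
    }
  });
  await Promise.all(inFlight);
}
```

With maxConcurrent set to 2 and six messages, no more than two handlers run at the same time; the remaining four wait for a permit.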
Available in @snow-tzu/nest-sqs-listener for NestJS integration:
import {NestJSSqsMessageListenerContainer} from '@snow-tzu/nest-sqs-listener';
const container = new NestJSSqsMessageListenerContainer(sqsClient);
// Automatic lifecycle via NestJS hooks (OnModuleInit, OnModuleDestroy)
Additional Features:
- Extends SqsMessageListenerContainer with all core features
- Implements OnModuleInit - automatically starts when the NestJS module initializes
- Implements OnModuleDestroy - automatically stops on graceful shutdown
- Integrates with NestJS Logger
- Works seamlessly with NestJS dependency injection
Key Difference: The NestJS adapter manages the container lifecycle automatically through NestJS lifecycle hooks,
while the core package requires manual start() and stop() calls.
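To make that difference concrete, here is a minimal sketch of how an adapter can layer lifecycle hooks on top of a container that only exposes start() and stop(). CoreContainerStub and LifecycleManagedContainer are hypothetical stand-ins written for this example; the hook names mirror the NestJS OnModuleInit and OnModuleDestroy interfaces, but nothing here is the adapter's actual source.

```typescript
// Stand-in for the core container; only the lifecycle methods are modelled.
class CoreContainerStub {
  private running = false;

  async start(): Promise<void> {
    this.running = true;
  }

  async stop(): Promise<void> {
    this.running = false;
  }

  isRunning(): boolean {
    return this.running;
  }
}

// Same method names as the NestJS OnModuleInit / OnModuleDestroy interfaces.
interface ModuleInitHook {
  onModuleInit(): Promise<void>;
}

interface ModuleDestroyHook {
  onModuleDestroy(): Promise<void>;
}

// Hypothetical adapter: it adds no consumption logic, only lifecycle wiring.
class LifecycleManagedContainer
  extends CoreContainerStub
  implements ModuleInitHook, ModuleDestroyHook
{
  // The framework would call this once the module is initialized.
  async onModuleInit(): Promise<void> {
    await this.start();
  }

  // The framework would call this during graceful shutdown.
  async onModuleDestroy(): Promise<void> {
    await this.stop();
  }
}
```

Because the framework invokes the hooks, application code never calls start() or stop() itself; with the core package those two calls stay in your own bootstrap and shutdown code.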
The listener interface is identical in both packages:
// Available in both packages
import {QueueListener, MessageContext} from '@snow-tzu/sqs-listener';
// OR
import {QueueListener, MessageContext} from '@snow-tzu/nest-sqs-listener';
interface QueueListener<T> {
onMessage(payload: T, context: MessageContext): Promise<void>;
}
Your listener receives:
- payload: T - The strongly-typed, validated message payload
- context: MessageContext - Metadata and control methods for the message
Implementation:
// Vanilla Node.js / Express
class OrderListener implements QueueListener<OrderCreatedEvent> {
async onMessage(event: OrderCreatedEvent, context: MessageContext): Promise<void> {
// Your business logic
}
}
// NestJS with dependency injection
@Injectable()
class OrderListener implements QueueListener<OrderCreatedEvent> {
constructor(private orderService: OrderService) {
}
async onMessage(event: OrderCreatedEvent, context: MessageContext): Promise<void> {
await this.orderService.processOrder(event);
}
}
MessageContext provides access to message metadata and control methods:
interface MessageContext {
getMessageId(): string;
getReceiptHandle(): string;
getQueueUrl(): string;
getMessageAttributes(): SQSMessageAttributes;
getSystemAttributes(): Record<string, string>;
getApproximateReceiveCount(): number;
acknowledge(): Promise<void>; // For MANUAL acknowledgement mode
}
Usage Example:
async onMessage(event: OrderCreatedEvent, context: MessageContext): Promise<void> {
  console.log(`Processing message ${context.getMessageId()}`);
  // Check retry count
  if (context.getApproximateReceiveCount() > 3) {
    console.warn('Message has been retried multiple times');
  }
  // Manual acknowledgement (when using AcknowledgementMode.MANUAL)
  await this.processOrder(event);
  await context.acknowledge();
}
Core Package (@snow-tzu/sqs-listener):
import {
SqsMessageListenerContainer,
QueueListener,
MessageContext,
AcknowledgementMode,
ValidationFailureMode,
PayloadMessagingConverter,
QueueListenerErrorHandler,
LoggerInterface
} from '@snow-tzu/sqs-listener';
NestJS Adapter (@snow-tzu/nest-sqs-listener):
import {
NestJSSqsMessageListenerContainer, // NestJS-specific container
QueueListener, // Re-exported from core
MessageContext, // Re-exported from core
AcknowledgementMode, // Re-exported from core
ValidationFailureMode, // Re-exported from core
PayloadMessagingConverter, // Re-exported from core
QueueListenerErrorHandler, // Re-exported from core
// Note: Use NestJS Logger instead of LoggerInterface
} from '@snow-tzu/nest-sqs-listener';
The adapter re-exports all core types, so you only need to import from one package.
Note: Validation is a core feature available in both packages. Whether you're using @snow-tzu/sqs-listener (core) or @snow-tzu/nest-sqs-listener (adapter), the validation functionality works identically. All examples below apply to both packages.
This package integrates seamlessly with class-validator to automatically validate incoming SQS messages against your business rules. When enabled, messages are validated before reaching your listener, ensuring your business logic only processes valid data.
- Data Integrity: Ensure messages meet your business rules before processing
- Early Error Detection: Catch invalid messages before they cause runtime errors
- Clear Error Messages: Get detailed validation failures for debugging
- Flexible Error Handling: Choose how to handle invalid messages (throw, acknowledge, or reject)
- Type Safety: Leverage TypeScript decorators for compile-time and runtime validation
import {IsString, IsNumber, IsPositive, IsEmail, Min, Max} from 'class-validator';
export class OrderCreatedEvent {
@IsString()
orderId: string;
@IsString()
customerId: string;
@IsEmail()
customerEmail: string;
@IsNumber()
@IsPositive()
amount: number;
@IsNumber()
@Min(0)
@Max(100)
discountPercent: number;
}
// Import from either package - validation works the same way
import {ValidationFailureMode} from '@snow-tzu/sqs-listener';
// OR
import {ValidationFailureMode} from '@snow-tzu/nest-sqs-listener';
container.configure(options => {
options
.queueName('order-created-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true)
.validationFailureMode(ValidationFailureMode.THROW)
.validatorOptions({
whitelist: true, // Strip non-decorated properties
forbidNonWhitelisted: true, // Reject messages with unexpected properties
});
});
// NestJS example with dependency injection
@Injectable()
export class OrderCreatedListener implements QueueListener<OrderCreatedEvent> {
constructor(private orderService: OrderService) {
}
async onMessage(message: OrderCreatedEvent, context: MessageContext): Promise<void> {
// message is guaranteed to be valid - no need for manual validation!
await this.orderService.processOrder(message);
}
}
// Vanilla Node.js / Express example
class OrderCreatedListener implements QueueListener<OrderCreatedEvent> {
async onMessage(message: OrderCreatedEvent, context: MessageContext): Promise<void> {
// message is guaranteed to be valid - no need for manual validation!
console.log(`Processing order ${message.orderId}`);
}
}
Control what happens when a message fails validation:
Throws an error and invokes your error handler. The message remains in the queue for retry.
.validationFailureMode(ValidationFailureMode.THROW)
Use when:
- You want to handle validation errors in your error handler
- Invalid messages might become valid after a system fix
- You want validation errors to follow your standard error handling flow
Logs the validation error and immediately removes the message from the queue. Your listener is never invoked.
.validationFailureMode(ValidationFailureMode.ACKNOWLEDGE)
Use when:
- Invalid messages will never become valid (bad data from source)
- You want to discard invalid messages to prevent queue blocking
- You're monitoring logs for validation failures
Logs the validation error but doesn't acknowledge the message. The message will retry and eventually move to your dead-letter queue.
.validationFailureMode(ValidationFailureMode.REJECT)
Use when:
- You want invalid messages to go to a dead-letter queue for analysis
- You don't want to invoke error handler overhead for validation failures
- You're using a separate process to handle DLQ messages
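The three modes above differ in only two respects: how the failure is reported and whether the message is deleted. The sketch below restates the descriptions as a small lookup. FailureMode, FailureOutcome and outcomeFor are names invented for this illustration; they are not part of the package API.

```typescript
// String literals stand in for the ValidationFailureMode values described above.
type FailureMode = 'THROW' | 'ACKNOWLEDGE' | 'REJECT';

interface FailureOutcome {
  reportedVia: 'errorHandler' | 'log'; // where the validation failure surfaces
  deletesMessage: boolean;             // true if the message leaves the queue immediately
}

// Hypothetical helper that restates the documented behavior of each mode.
function outcomeFor(mode: FailureMode): FailureOutcome {
  switch (mode) {
    case 'THROW':
      // Error is thrown and passed to the error handler; message stays for retry.
      return { reportedVia: 'errorHandler', deletesMessage: false };
    case 'ACKNOWLEDGE':
      // Failure is logged and the message is removed at once.
      return { reportedVia: 'log', deletesMessage: true };
    case 'REJECT':
      // Failure is logged; the message is left to retry and eventually reach the DLQ.
      return { reportedVia: 'log', deletesMessage: false };
  }
}
```

In all three cases the listener itself is not invoked for the invalid message.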
Pass any class-validator ValidatorOptions to customize validation behavior:
container.configure(options => {
options
.targetClass(OrderCreatedEvent)
.enableValidation(true)
.validatorOptions({
skipMissingProperties: false, // Validate all properties
whitelist: true, // Strip properties without decorators
forbidNonWhitelisted: true, // Throw error for unexpected properties
forbidUnknownValues: true, // Throw error for unknown objects
groups: ['create'], // Only validate 'create' group
dismissDefaultMessages: false, // Include default error messages
validationError: {
target: false, // Don't include target in error
value: true, // Include value in error
},
});
});
Validate complex nested structures using @ValidateNested() and @Type():
import {IsString, IsNumber, IsPositive, ValidateNested, IsArray} from 'class-validator';
import {Type} from 'class-transformer';
export class OrderItem {
@IsString()
productId: string;
@IsNumber()
@IsPositive()
quantity: number;
@IsNumber()
@IsPositive()
price: number;
}
export class OrderCreatedEvent {
@IsString()
orderId: string;
@IsArray()
@ValidateNested({each: true})
@Type(() => OrderItem)
items: OrderItem[];
@IsNumber()
@IsPositive()
totalAmount: number;
}
If you're using a custom message converter (XML, Protobuf, etc.), you can still add validation using the ValidatingPayloadConverter decorator:
// Import from either package - works the same way
import {
ValidatingPayloadConverter,
ValidationFailureMode,
PayloadMessagingConverter
} from '@snow-tzu/sqs-listener';
// OR
import {
ValidatingPayloadConverter,
ValidationFailureMode,
PayloadMessagingConverter
} from '@snow-tzu/nest-sqs-listener';
// Your custom converter
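class XmlOrderConverter implements PayloadMessagingConverter<OrderCreatedEvent> {
convert(body: string): OrderCreatedEvent {
return this.parseXmlToOrder(body);
}
// Placeholder: implement with the XML parser of your choice
private parseXmlToOrder(xml: string): OrderCreatedEvent {
throw new Error('XML parsing not implemented');
}
}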
// Wrap with validation
const xmlConverter = new XmlOrderConverter();
const validatingConverter = new ValidatingPayloadConverter(
xmlConverter,
OrderCreatedEvent,
{
enableValidation: true,
validationFailureMode: ValidationFailureMode.THROW,
validatorOptions: {whitelist: true}
},
logger // Optional logger for ACKNOWLEDGE/REJECT modes
);
container.configure(options => {
options
.queueName('order-queue')
.messageConverter(validatingConverter);
});
How it works:
- Your custom converter transforms the message (XML → object)
- ValidatingPayloadConverter ensures it's a class instance
- class-validator validates the instance
- Your listener receives the validated message
This pattern works with any converter format: XML, Protobuf, Avro, CSV, etc.
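The wrapping step can be sketched without any dependencies. The example below is a simplified stand-in, not the real ValidatingPayloadConverter: it takes a plain validation function instead of class-validator, only models the "throw on failure" behavior, and every name in it (BodyConverter, ValidatingConverterSketch, CsvOrder and so on) is hypothetical.

```typescript
interface BodyConverter<T> {
  convert(body: string): T;
}

// Returns a list of human-readable problems; an empty list means "valid".
type PayloadValidator<T> = (payload: T) => string[];

// Decorator: same interface as the wrapped converter, plus a validation step.
class ValidatingConverterSketch<T> implements BodyConverter<T> {
  private readonly inner: BodyConverter<T>;
  private readonly validate: PayloadValidator<T>;

  constructor(inner: BodyConverter<T>, validate: PayloadValidator<T>) {
    this.inner = inner;
    this.validate = validate;
  }

  convert(body: string): T {
    const payload = this.inner.convert(body); // format-specific parsing
    const problems = this.validate(payload);  // validation of the parsed payload
    if (problems.length > 0) {
      throw new Error(`Validation failed: ${problems.join('; ')}`);
    }
    return payload; // only valid payloads reach the listener
  }
}

// Example inner converter for an "orderId,amount" CSV body.
interface CsvOrder {
  orderId: string;
  amount: number;
}

const csvOrderConverter: BodyConverter<CsvOrder> = {
  convert(body: string): CsvOrder {
    const [orderId = '', amount = ''] = body.split(',');
    return { orderId: orderId.trim(), amount: Number(amount) };
  },
};

const validateOrder: PayloadValidator<CsvOrder> = order => {
  const problems: string[] = [];
  if (order.orderId.length === 0) {
    problems.push('orderId must not be empty');
  }
  if (!Number.isFinite(order.amount) || order.amount <= 0) {
    problems.push('amount must be a positive number');
  }
  return problems;
};

const validatingCsvConverter = new ValidatingConverterSketch(csvOrderConverter, validateOrder);
```

Because the wrapper implements the same interface as the converter it wraps, the code that consumes converters does not need to know whether validation is present.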
container.configure(options => {
options
.queueName('order-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true);
// Uses THROW mode by default
});
container.configure(options => {
options
.queueName('order-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true)
.validationFailureMode(ValidationFailureMode.ACKNOWLEDGE)
.validatorOptions({
whitelist: true,
forbidNonWhitelisted: true,
});
});
const customConverter = new XmlOrderConverter();
container.configure(options => {
options
.queueName('order-queue')
.messageConverter(customConverter)
.targetClass(OrderCreatedEvent)
.enableValidation(true)
.validationFailureMode(ValidationFailureMode.REJECT);
// Container automatically wraps customConverter with ValidatingPayloadConverter
});
Validation is designed to fail gracefully:
- class-validator not installed: Validation is skipped, messages are processed normally
- No decorators on class: Validation passes, messages are processed normally
- Validation explicitly disabled: Validation is skipped entirely
This ensures your application continues to work even if validation dependencies are missing.
Note: Configuration options are identical in both the core package (@snow-tzu/sqs-listener) and the NestJS adapter (@snow-tzu/nest-sqs-listener). The key difference is in lifecycle management: NestJS automatically manages container startup/shutdown, while the core package requires manual control.
The configuration API is the same for both packages:
container.configure(options => {
options
.queueName('my-queue')
.pollTimeout(20)
.visibilityTimeout(30)
.maxConcurrentMessages(10)
.maxMessagesPerPoll(10)
.autoStartup(true)
.acknowledgementMode(AcknowledgementMode.ON_SUCCESS)
.messageConverter(customConverter)
// Validation options
.targetClass(MyEventClass)
.enableValidation(true)
.validationFailureMode(ValidationFailureMode.THROW)
.validatorOptions({whitelist: true});
});
Configuration Methods:
- queueName(name: string) - Queue name to consume from
- pollTimeout(seconds: number) - Long polling timeout (0-20 seconds)
- visibilityTimeout(seconds: number) - Message visibility timeout
- maxConcurrentMessages(count: number) - Maximum parallel message processing
- maxMessagesPerPoll(count: number) - Maximum messages per poll (1-10)
- autoStartup(enabled: boolean) - Start automatically on application startup
- acknowledgementMode(mode: AcknowledgementMode) - Message acknowledgement behavior
- messageConverter(converter: PayloadMessagingConverter<T>) - Custom message converter
- targetClass(type: Type<T>) - Target class for transformation and validation
- enableValidation(enabled: boolean) - Enable class-validator validation
- validationFailureMode(mode: ValidationFailureMode) - Validation failure behavior
- validatorOptions(options: ValidatorOptions) - class-validator options
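The ranges on pollTimeout and maxMessagesPerPoll match the limits of the SQS ReceiveMessage request. The sketch below shows how the polling options could plausibly translate into that request, with range checks up front. The mapping is an assumption made for illustration (the package's internals are not shown in this document), and PollingOptions, ReceiveMessageParams and toReceiveMessageParams are hypothetical names.

```typescript
interface PollingOptions {
  pollTimeout: number;        // seconds, 0-20
  visibilityTimeout: number;  // seconds
  maxMessagesPerPoll: number; // 1-10
}

// Field names follow the SQS ReceiveMessage request.
interface ReceiveMessageParams {
  QueueUrl: string;
  WaitTimeSeconds: number;
  VisibilityTimeout: number;
  MaxNumberOfMessages: number;
}

function assertIntegerInRange(name: string, value: number, min: number, max: number): void {
  if (!Number.isInteger(value) || value < min || value > max) {
    throw new RangeError(`${name} must be an integer between ${min} and ${max}, got ${value}`);
  }
}

// Assumed mapping from container options to a ReceiveMessage request.
function toReceiveMessageParams(queueUrl: string, options: PollingOptions): ReceiveMessageParams {
  assertIntegerInRange('pollTimeout', options.pollTimeout, 0, 20);
  assertIntegerInRange('maxMessagesPerPoll', options.maxMessagesPerPoll, 1, 10);
  // SQS caps the visibility timeout at 12 hours (43200 seconds).
  assertIntegerInRange('visibilityTimeout', options.visibilityTimeout, 0, 43200);
  return {
    QueueUrl: queueUrl,
    WaitTimeSeconds: options.pollTimeout,
    VisibilityTimeout: options.visibilityTimeout,
    MaxNumberOfMessages: options.maxMessagesPerPoll,
  };
}
```

Checking the ranges when the options are set, instead of waiting for SQS to reject the request, surfaces configuration mistakes at startup.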
With the NestJS adapter, the container automatically starts when the module initializes and stops on shutdown:
import {Module} from '@nestjs/common';
import {SQSClient} from '@aws-sdk/client-sqs';
import {
NestJSSqsMessageListenerContainer,
AcknowledgementMode
} from '@snow-tzu/nest-sqs-listener';
@Module({
providers: [
OrderListener,
{
provide: 'SQS_CLIENT',
useValue: new SQSClient({region: 'us-east-1'}),
},
{
provide: 'ORDER_CONTAINER',
useFactory: (listener, sqsClient) => {
const container = new NestJSSqsMessageListenerContainer(sqsClient);
container.configure(options => {
options
.queueName('order-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true)
.pollTimeout(20)
.maxConcurrentMessages(5)
.acknowledgementMode(AcknowledgementMode.ON_SUCCESS);
});
container.setMessageListener(listener);
return container;
},
inject: [OrderListener, 'SQS_CLIENT'],
},
],
})
export class OrderModule {
}
Lifecycle Management:
- ✅ Container starts automatically via OnModuleInit hook
- ✅ Container stops automatically via OnModuleDestroy hook
- ✅ No manual start() or stop() calls needed
- ✅ Integrates with NestJS graceful shutdown
With the core package, you manually control the container lifecycle:
import {SQSClient} from '@aws-sdk/client-sqs';
import {
SqsMessageListenerContainer,
AcknowledgementMode
} from '@snow-tzu/sqs-listener';
// Create and configure container
const sqsClient = new SQSClient({region: 'us-east-1'});
const container = new SqsMessageListenerContainer(sqsClient);
container.configure(options => {
options
.queueName('order-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true)
.pollTimeout(20)
.maxConcurrentMessages(5)
.acknowledgementMode(AcknowledgementMode.ON_SUCCESS);
});
container.setMessageListener(new OrderListener());
// Manually start the container
await container.start();
console.log('Container started, listening for messages...');
// Graceful shutdown handling
process.on('SIGTERM', async () => {
console.log('Shutting down gracefully...');
await container.stop();
process.exit(0);
});
process.on('SIGINT', async () => {
console.log('Shutting down gracefully...');
await container.stop();
process.exit(0);
});
Lifecycle Management:
- ⚠️ You must call container.start() to begin listening
- ⚠️ You must call container.stop() for graceful shutdown
- ⚠️ You must handle process signals (SIGTERM, SIGINT) manually
- ✅ Full control over when the container runs
With Express, you typically start the container alongside your HTTP server:
import express from 'express';
import {SQSClient} from '@aws-sdk/client-sqs';
import {
SqsMessageListenerContainer,
AcknowledgementMode
} from '@snow-tzu/sqs-listener';
const app = express();
// Configure SQS container
const sqsClient = new SQSClient({region: 'us-east-1'});
const container = new SqsMessageListenerContainer(sqsClient);
container.configure(options => {
options
.queueName('order-queue')
.targetClass(OrderCreatedEvent)
.enableValidation(true)
.pollTimeout(20)
.maxConcurrentMessages(5)
.acknowledgementMode(AcknowledgementMode.ON_SUCCESS);
});
container.setMessageListener(new OrderListener());
// Start both Express and SQS listener
const PORT = 3000;
const server = app.listen(PORT, async () => {
console.log(`Express server running on port ${PORT}`);
await container.start();
console.log('SQS listener started');
});
// Graceful shutdown for both services
const shutdown = async () => {
console.log('Shutting down gracefully...');
await container.stop();
// Stop accepting HTTP connections, then exit
server.close(() => process.exit(0));
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
Lifecycle Management:
- ⚠️ Start container after Express server is ready
- ⚠️ Stop container before process exits
- ✅ Run SQS listener alongside HTTP server
- ✅ Share dependencies between HTTP and SQS handlers
| Aspect | NestJS Adapter | Core Package |
|---|---|---|
| Startup | Automatic via OnModuleInit | Manual via container.start() |
| Shutdown | Automatic via OnModuleDestroy | Manual via container.stop() |
| Signal Handling | Built into NestJS | You must implement |
| Control | Framework-managed | Full manual control |
| Use Case | NestJS applications | Vanilla Node.js, Express, Fastify, etc. |
Acknowledgement modes control when messages are deleted from the queue. This behavior is identical in both packages.
Deletes message only if onMessage() completes successfully:
container.configure(options => {
options
.queueName('order-queue')
.acknowledgementMode(AcknowledgementMode.ON_SUCCESS);
});
Behavior:
- ✅ Message deleted if onMessage() completes without error
- ❌ Message remains in queue if error is thrown
- ✅ Message becomes visible again after visibility timeout
- ✅ Automatic retry on failure
Use when:
- You want automatic retry on failure
- Processing is idempotent
- You have a dead-letter queue configured for poison messages
Never automatically deletes messages - you control acknowledgement:
container.configure(options => {
options
.queueName('order-queue')
.acknowledgementMode(AcknowledgementMode.MANUAL);
});
// In your listener
class OrderListener implements QueueListener<OrderCreatedEvent> {
async onMessage(event: OrderCreatedEvent, context: MessageContext): Promise<void> {
try {
// Process the order
await this.processOrder(event);
// Explicitly acknowledge only on success
await context.acknowledge();
} catch (error) {
// Don't acknowledge - message will retry
console.error('Processing failed, message will retry', error);
throw error;
}
}
}
Behavior:
- ⚠️ Message is NEVER deleted automatically
- ✅ You call context.acknowledge() to delete
- ✅ Full control over when to acknowledge
- ✅ Can acknowledge at any point in your workflow
Use when:
- You need transactional processing (e.g., database commit + acknowledge)
- You want to acknowledge before processing completes
- You have complex conditional acknowledgement logic
- You're implementing saga patterns or distributed transactions
Always deletes message, even if processing fails:
container.configure(options => {
options
.queueName('order-queue')
.acknowledgementMode(AcknowledgementMode.ALWAYS);
});
Behavior:
- ✅ Message deleted even if onMessage() throws error
- ⚠️ Failed messages are lost (not retried)
- ✅ Queue never gets blocked by poison messages
- ⚠️ You must handle failures externally
Use when:
- Messages are non-critical (e.g., analytics events)
- You have external error tracking/logging
- You're using an external dead-letter mechanism
- You want to prevent queue blocking at all costs
| Mode | Auto Delete on Success | Auto Delete on Failure | Retry on Failure | Use Case |
|---|---|---|---|---|
| ON_SUCCESS | ✅ Yes | ❌ No | ✅ Yes | Most use cases, idempotent processing |
| MANUAL | ❌ No | ❌ No | Only if not acknowledged | Transactional workflows, complex logic |
| ALWAYS | ✅ Yes | ✅ Yes | ❌ No | Non-critical messages, external DLQ |
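The first two columns of the table reduce to a single decision made after the listener returns or throws. The following sketch expresses it as a pure function; AckMode and shouldAutoDelete are illustrative names chosen here, not identifiers exported by the package.

```typescript
// String literals stand in for the AcknowledgementMode values.
type AckMode = 'ON_SUCCESS' | 'MANUAL' | 'ALWAYS';

// Returns true if the container should delete the message on its own.
function shouldAutoDelete(mode: AckMode, listenerSucceeded: boolean): boolean {
  switch (mode) {
    case 'ON_SUCCESS':
      return listenerSucceeded; // delete only after a successful onMessage()
    case 'MANUAL':
      return false;             // deletion happens only via context.acknowledge()
    case 'ALWAYS':
      return true;              // delete regardless of the outcome
  }
}
```

Any message that is not deleted becomes visible again after the visibility timeout, which is what produces the retry behavior in the third column.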
Note: The decorator pattern is a core feature that works identically in both packages. Whether you're using @snow-tzu/sqs-listener (core) or @snow-tzu/nest-sqs-listener (adapter), you can implement decorators the same way. The examples below are framework-agnostic and work with vanilla Node.js, Express, or NestJS.
This package focuses on SQS message consumption and does not include built-in tracing or observability features. Instead, you can implement your own decorators to add cross-cutting concerns like tracing, metrics, or logging.
Decorators allow you to:
- Keep business logic clean and focused
- Use any observability tool (OpenTelemetry, New Relic, Datadog, etc.)
- Compose multiple decorators together
- Test business logic without tracing overhead
- Add or remove concerns without modifying core code
- Work consistently across any Node.js framework
This decorator works with both packages and any framework:
// Import from either package; it works the same way
import {QueueListener, MessageContext} from '@snow-tzu/sqs-listener';
// OR, when using the NestJS adapter (use only one of the two imports):
// import {QueueListener, MessageContext} from '@snow-tzu/nest-sqs-listener';
import {trace, context as otContext, SpanStatusCode} from '@opentelemetry/api';
export class TracingListener<T> implements QueueListener<T> {
constructor(private readonly listener: QueueListener<T>) {
}
async onMessage(payload: T, context: MessageContext): Promise<void> {
const tracer = trace.getTracer('sqs-listener');
const span = tracer.startSpan('sqs.consume', {
attributes: {
'messaging.system': 'aws_sqs',
'messaging.destination': context.getQueueUrl(),
'messaging.message_id': context.getMessageId(),
},
});
try {
await otContext.with(trace.setSpan(otContext.active(), span), async () => {
await this.listener.onMessage(payload, context);
});
span.setStatus({code: SpanStatusCode.OK});
} catch (error) {
span.recordException(error as Error);
span.setStatus({code: SpanStatusCode.ERROR});
throw error;
} finally {
span.end();
}
}
}
The decorator pattern works identically across all frameworks:
import {SqsMessageListenerContainer} from '@snow-tzu/sqs-listener';
// Create business logic listener
const businessListener = new OrderCreatedListener();
// Wrap with tracing decorator
const tracingListener = new TracingListener(businessListener);
// Register decorated listener with container
const container = new SqsMessageListenerContainer(sqsClient);
container.setMessageListener(tracingListener);
await container.start();
import {Module} from '@nestjs/common';
import {SQSClient} from '@aws-sdk/client-sqs';
import {NestJSSqsMessageListenerContainer} from '@snow-tzu/nest-sqs-listener';
@Module({
providers: [
OrderCreatedListener,
{
provide: 'ORDER_CONTAINER',
useFactory: (listener: OrderCreatedListener, sqsClient: SQSClient) => {
// Wrap with decorator
const tracingListener = new TracingListener(listener);
const container = new NestJSSqsMessageListenerContainer(sqsClient);
container.configure(options => options.queueName('order-queue'));
container.setMessageListener(tracingListener);
return container;
},
inject: [OrderCreatedListener, 'SQS_CLIENT'],
},
],
})
export class OrderModule {
}
You can chain multiple decorators together in any framework:
// Framework-agnostic logging decorator
export class LoggingListener<T> implements QueueListener<T> {
constructor(
private readonly listener: QueueListener<T>,
private readonly logger: { log: (msg: string) => void; error: (msg: string, err?: any) => void }
) {
}
async onMessage(payload: T, context: MessageContext): Promise<void> {
this.logger.log(`Processing message: ${context.getMessageId()}`);
const start = Date.now();
try {
await this.listener.onMessage(payload, context);
this.logger.log(`Completed in ${Date.now() - start}ms`);
} catch (error) {
this.logger.error(`Failed after ${Date.now() - start}ms`, error);
throw error;
}
}
}
// Framework-agnostic metrics decorator
export class MetricsListener<T> implements QueueListener<T> {
constructor(
private readonly listener: QueueListener<T>,
private readonly metrics: {
increment: (name: string) => void;
timing: (name: string, value: number) => void;
}
) {
}
async onMessage(payload: T, context: MessageContext): Promise<void> {
this.metrics.increment('messages.received');
const start = Date.now();
try {
await this.listener.onMessage(payload, context);
this.metrics.timing('messages.duration', Date.now() - start);
this.metrics.increment('messages.success');
} catch (error) {
this.metrics.increment('messages.error');
throw error;
}
}
}
// Compose decorators - works the same in any framework
const businessListener = new OrderCreatedListener();
const withLogging = new LoggingListener(businessListener, logger);
const withMetrics = new MetricsListener(withLogging, metricsService);
const withTracing = new TracingListener(withMetrics);
container.setMessageListener(withTracing);
While the decorator pattern is framework-agnostic, you may want to leverage framework-specific features:
When using NestJS, you can take advantage of dependency injection for decorators:
// Make decorators injectable for better DI integration
@Injectable()
export class TracingListenerFactory {
create<T>(listener: QueueListener<T>): QueueListener<T> {
return new TracingListener(listener);
}
}
// Use in module
@Module({
providers: [
OrderCreatedListener,
TracingListenerFactory,
{
provide: 'ORDER_CONTAINER',
useFactory: (
listener: OrderCreatedListener,
tracingFactory: TracingListenerFactory,
sqsClient: SQSClient
) => {
const decoratedListener = tracingFactory.create(listener);
const container = new NestJSSqsMessageListenerContainer(sqsClient);
container.setMessageListener(decoratedListener);
return container;
},
inject: [OrderCreatedListener, TracingListenerFactory, 'SQS_CLIENT'],
},
],
})
export class OrderModule {
}
For non-NestJS frameworks, you can use factory functions or simple composition:
// Factory function for creating decorated listeners
function createDecoratedListener<T>(
businessListener: QueueListener<T>,
logger: any,
metrics: any
): QueueListener<T> {
const withLogging = new LoggingListener(businessListener, logger);
const withMetrics = new MetricsListener(withLogging, metrics);
const withTracing = new TracingListener(withMetrics);
return withTracing;
}
// Usage
const listener = createDecoratedListener(
new OrderCreatedListener(),
console,
metricsClient
);
container.setMessageListener(listener);
See the advanced example for a complete NestJS implementation with OpenTelemetry, and the vanilla Node.js example for framework-agnostic patterns.
For type safety and to prevent naming collisions, use Symbols instead of strings for provider tokens:
// tokens.ts
export const ORDER_SQS_CLIENT = Symbol('ORDER_SQS_CLIENT');
export const ORDER_CONTAINER = Symbol('ORDER_CONTAINER');
export const NOTIFICATION_SQS_CLIENT = Symbol('NOTIFICATION_SQS_CLIENT');
export const NOTIFICATION_CONTAINER = Symbol('NOTIFICATION_CONTAINER');
// module.ts
@Module({
providers: [
{
provide: ORDER_SQS_CLIENT,
useFactory: () => new SQSClient({region: 'us-east-1'}),
},
{
provide: ORDER_CONTAINER,
useFactory: (listener, sqsClient) => {
const container = new SqsMessageListenerContainer(sqsClient);
// ... configuration
return container;
},
inject: [OrderCreatedListener, ORDER_SQS_CLIENT],
},
],
})
export class OrderModule {
}
Benefits:
- Compile-time type safety (no string typos)
- Prevents naming collisions between modules
- Better IDE support and refactoring
- Clear intent and self-documenting code
- Essential when working with multiple AWS accounts or containers
This pattern is especially important when connecting to multiple AWS accounts:
// Primary AWS account for orders
{
provide: ORDER_SQS_CLIENT,
useFactory: () => new SQSClient({
region: process.env.ORDER_AWS_REGION,
credentials: {
accessKeyId: process.env.ORDER_AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.ORDER_AWS_SECRET_ACCESS_KEY,
},
}),
}
// Secondary AWS account for notifications
{
provide: NOTIFICATION_SQS_CLIENT,
useFactory: () => new SQSClient({
region: process.env.NOTIFICATION_AWS_REGION,
credentials: {
accessKeyId: process.env.NOTIFICATION_AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.NOTIFICATION_AWS_SECRET_ACCESS_KEY,
},
}),
}
See the advanced example for a complete implementation.
Implement custom error handling logic:
import {Injectable, Logger} from '@nestjs/common';
import {QueueListenerErrorHandler, MessageContext} from '@snow-tzu/nest-sqs-listener';
@Injectable()
export class OrderErrorHandler implements QueueListenerErrorHandler {
constructor(private readonly logger: Logger) {
}
async handleError(error: Error, message: any, context: MessageContext): Promise<void> {
this.logger.error(`Error processing order: ${error.message}`, error.stack);
// Custom logic based on error type
if (error.name === 'ValidationError') {
// Remove invalid messages from queue
await context.acknowledge();
} else if (error.name === 'TemporaryError') {
// Let it retry (don't acknowledge)
return;
} else {
// Send to monitoring system (sendToMonitoring is application-specific and not shown here)
await this.sendToMonitoring(error, context);
}
}
}
// Register with container:
container.setErrorHandler(errorHandler);
Implement custom message parsing logic:
import {Injectable} from '@nestjs/common';
import {PayloadMessagingConverter} from '@snow-tzu/nest-sqs-listener';
@Injectable()
export class XmlOrderConverter implements PayloadMessagingConverter<OrderEvent> {
convert(body: string, attributes: SQSMessageAttributes): OrderEvent {
// Parse XML and return typed object
return this.parseXmlToOrder(body);
}
}
// Register with container:
container.configure(options => {
options.messageConverter(new XmlOrderConverter());
});
Configure multiple containers for different queues and regions:
@Module({
providers: [
// US East SQS Client
{
provide: 'US_EAST_SQS_CLIENT',
useFactory: () => new SQSClient({region: 'us-east-1'}),
},
// EU West SQS Client
{
provide: 'EU_WEST_SQS_CLIENT',
useFactory: () => new SQSClient({region: 'eu-west-1'}),
},
// US Container
{
provide: 'US_ORDER_CONTAINER',
useFactory: (listener, sqsClient) => {
const container = new SqsMessageListenerContainer<OrderEvent>(sqsClient);
container.configure(options => options.queueName('us-orders'));
container.setId('usOrderListener');
container.setMessageListener(listener);
return container;
},
inject: [OrderListener, 'US_EAST_SQS_CLIENT']
},
// EU Container
{
provide: 'EU_ORDER_CONTAINER',
useFactory: (listener, sqsClient) => {
const container = new SqsMessageListenerContainer<OrderEvent>(sqsClient);
container.configure(options => options.queueName('eu-orders'));
container.setId('euOrderListener');
container.setMessageListener(listener);
return container;
},
inject: [OrderListener, 'EU_WEST_SQS_CLIENT']
},
]
})
export class OrderModule {
}
The repository includes complete, runnable examples for different frameworks and use cases. All examples work with LocalStack for local testing without AWS credentials.
These examples use the NestJS adapter (@snow-tzu/nest-sqs-listener) with full dependency injection and lifecycle
management.
Package: @snow-tzu/nest-sqs-listener (NestJS Adapter)
Minimal NestJS setup demonstrating core functionality with a single queue listener.
What you'll learn:
- Single queue listener configuration with NestJS DI
- Automatic acknowledgement (ON_SUCCESS mode)
- Automatic lifecycle management (OnModuleInit/OnModuleDestroy)
- Message validation with class-validator
- Business logic separation with injectable services
- LocalStack setup for local testing
Perfect for: First-time NestJS users, simple use cases, learning the basics
Key files:
- `order-created.listener.ts` - Injectable listener with DI
- `order-created.event.ts` - Event class with validation decorators
- `order.module.ts` - NestJS module with container configuration
- `order.service.ts` - Business logic with dependency injection
Advanced Example ⭐⭐⭐
Package: @snow-tzu/nest-sqs-listener (NestJS Adapter)
Production-ready NestJS patterns with advanced features and multiple queue listeners.
What you'll learn:
- Listener decorator pattern for adding tracing, logging, and metrics
- OpenTelemetry distributed tracing implementation
- Custom error handling with retry logic and validation error detection
- Manual acknowledgement for fine-grained control
- Multiple queue listeners with different configurations
- Multiple AWS account connections using separate SQS clients
- Symbol-based dependency injection for type safety
- Advanced validation patterns with different failure modes (THROW, ACKNOWLEDGE)
Perfect for: Production NestJS applications, complex workflows, advanced patterns
Key files:
- `tracing.listener.ts` - Decorator pattern for cross-cutting concerns
- `custom-error.handler.ts` - Custom error handling with validation support
- `order-created.event.ts` - Event with strict UUID validation
- `notification.event.ts` - Event with ACKNOWLEDGE validation mode
- `notification.listener.ts` - Manual acknowledgement example
- `tokens.ts` - Symbol-based DI tokens
These examples use the core package (@snow-tzu/sqs-listener) directly, demonstrating usage without framework
dependencies.
Package: @snow-tzu/sqs-listener (Core Package)
Pure Node.js implementation with no framework dependencies, showing manual lifecycle management.
What you'll learn:
- Using the core package without any framework
- Manual container lifecycle control (`start()` and `stop()`)
- Custom logger implementation (LoggerInterface)
- Graceful shutdown handling with process signals
- Type-safe message handling without DI
- Minimal dependencies and maximum portability
Perfect for: Framework-agnostic applications, standalone workers, learning core concepts
Key files:
- `index.ts` - Manual container setup and lifecycle management
- `order-listener.ts` - Plain class implementing QueueListener
- `custom-logger.ts` - Custom logger implementation
- `order-created.event.ts` - Event class with validation
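Graceful shutdown with process signals can be sketched roughly as follows. This is not taken from the example's source: `stopOnSignals`, `Stoppable`, and `SignalSource` are hypothetical names, and the only container method relied on is the documented `stop()`.

```typescript
// Anything with a stop() method, such as the container described in this README.
interface Stoppable {
  stop(): Promise<void>;
}

// Minimal view of an event source; Node's `process` object is assumed to fit it.
interface SignalSource {
  once(signal: string, handler: () => void): unknown;
}

// Resolves with the signal name after the container has stopped.
function stopOnSignals(
  container: Stoppable,
  source: SignalSource,
  signals: string[] = ['SIGINT', 'SIGTERM'],
): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    let stopping = false;
    for (const signal of signals) {
      source.once(signal, () => {
        if (stopping) return; // ignore a second signal while stopping
        stopping = true;
        container.stop().then(() => resolve(signal), reject);
      });
    }
  });
}
```

In a worker's entry point this might be wired as `stopOnSignals(container, process).then(() => process.exit(0))` once `container.start()` has resolved.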
Package: @snow-tzu/sqs-listener (Core Package)
Integration with Express web framework, running SQS listener alongside HTTP server.
What you'll learn:
- Using the core package with Express.js
- Running SQS listener alongside a web server
- Coordinating multiple service lifecycles
- Graceful shutdown for both HTTP and SQS
- Sharing business logic between HTTP and SQS handlers
- Manual container lifecycle management
Perfect for: Express applications, REST APIs with background processing, hybrid services
Key files:
- `app.ts` - Express app setup with SQS integration
- `index.ts` - Coordinated startup of HTTP and SQS services
- `order-listener.ts` - Listener implementation
- `sqs-manager.ts` - SQS container lifecycle management
Applies to: Both packages (core and adapter)
Comprehensive guide to message validation patterns with class-validator.
What you'll learn:
- Basic validation setup with decorators
- Three validation failure modes (THROW, ACKNOWLEDGE, REJECT)
- Nested object and array validation
- Custom validators and conditional validation
- Handling validation errors in error handlers
- Testing validation with invalid messages
- Best practices for production use
Perfect for: Understanding validation features, implementing data integrity checks
All examples include LocalStack setup for testing without AWS credentials:
# From the examples directory
cd examples
# Start LocalStack
docker-compose up -d
# Create SQS queues
./scripts/setup-queues.sh
Choose an example based on your framework:
For NestJS examples (basic or advanced):
cd basic # or: cd advanced
npm install
cp .env.example .env
npm run start:dev
For vanilla Node.js example:
cd vanilla-nodejs
npm install
cp .env.example .env
npm start
For Express example:
cd express
npm install
cp .env.example .env
npm start
In another terminal, from the examples directory:
./scripts/send-test-messages.sh localstack
You should see messages being processed in the application logs.
./scripts/send-invalid-messages.sh
This sends messages with validation errors to test different validation failure modes.
| Example | Framework | Package | Lifecycle | DI | Best For |
|---|---|---|---|---|---|
| Vanilla Node.js | None | Core | Manual | No | Framework-agnostic usage |
| Express | Express.js | Core | Manual | No | Express applications |
| Basic | NestJS | Adapter | Automatic | Yes | Getting started with NestJS |
| Advanced | NestJS | Adapter | Automatic | Yes | Production NestJS patterns |
See the examples README for detailed instructions, troubleshooting, and additional commands.
import {Test} from '@nestjs/testing';
import {OrderCreatedListener} from './order-created.listener';
import {OrderService} from './order.service';
import {MessageContext} from '@snow-tzu/nest-sqs-listener';
describe('OrderCreatedListener', () => {
let listener: OrderCreatedListener;
let orderService: jest.Mocked<OrderService>;
let context: jest.Mocked<MessageContext>;
beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [
OrderCreatedListener,
{
provide: OrderService,
useValue: {
processNewOrder: jest.fn(),
},
},
],
}).compile();
listener = module.get(OrderCreatedListener);
orderService = module.get(OrderService);
context = {
getMessageId: jest.fn().mockReturnValue('msg-123'),
getReceiptHandle: jest.fn().mockReturnValue('handle-123'),
acknowledge: jest.fn(),
} as any;
});
it('should process order successfully', async () => {
const payload = {orderId: '123', customerId: '456', amount: 100};
await listener.onMessage(payload, context);
expect(orderService.processNewOrder).toHaveBeenCalledWith(payload);
});
});
import {Test} from '@nestjs/testing';
import {SqsMessageListenerContainer, QueueListener} from '@snow-tzu/nest-sqs-listener';
import {SQSClient, ReceiveMessageCommand} from '@aws-sdk/client-sqs';
import {mockClient} from 'aws-sdk-client-mock';
describe('SqsMessageListenerContainer Integration', () => {
let container: SqsMessageListenerContainer<OrderEvent>;
let sqsClientMock;
let listener: jest.Mocked<QueueListener<OrderEvent>>;
beforeEach(() => {
sqsClientMock = mockClient(SQSClient);
listener = {
onMessage: jest.fn(),
};
container = new SqsMessageListenerContainer(new SQSClient({}));
container.configure(options => {
options
.queueName('test-queue')
.autoStartup(false);
});
container.setMessageListener(listener);
});
it('should receive and process messages', async () => {
sqsClientMock.on(ReceiveMessageCommand).resolves({
Messages: [{
MessageId: '123',
ReceiptHandle: 'handle-123',
Body: JSON.stringify({orderId: '456', customerId: '789', amount: 100}),
}]
});
await container.start();
await new Promise(resolve => setTimeout(resolve, 100)); // Wait for processing
expect(listener.onMessage).toHaveBeenCalledWith(
{orderId: '456', customerId: '789', amount: 100},
expect.any(Object)
);
await container.stop();
});
});
import {Test} from '@nestjs/testing';
import {INestApplication} from '@nestjs/common';
import {SQSClient, CreateQueueCommand, SendMessageCommand} from '@aws-sdk/client-sqs';
import {AppModule} from './app.module';
describe('E2E with LocalStack', () => {
let app: INestApplication;
let sqsClient: SQSClient;
let queueUrl: string;
beforeAll(async () => {
sqsClient = new SQSClient({
region: 'us-east-1',
endpoint: 'http://localhost:4566', // LocalStack endpoint
credentials: {
accessKeyId: 'test',
secretAccessKey: 'test',
},
});
// Create test queue
const result = await sqsClient.send(new CreateQueueCommand({
QueueName: 'test-queue'
}));
queueUrl = result.QueueUrl!; // QueueUrl is typed as optional; asserted here because the create call succeeded
// Start NestJS app
const module = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = module.createNestApplication();
await app.init();
});
afterAll(async () => {
await app.close();
});
it('should process messages end-to-end', async () => {
// Send message to queue
await sqsClient.send(new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify({orderId: '789', customerId: '123', amount: 50}),
}));
// Wait for processing
await new Promise(resolve => setTimeout(resolve, 2000));
// Verify message was processed (check database, mock service calls, etc.)
});
});
Main container class for managing message consumption.
Constructor:
constructor(sqsClient: SQSClient)
Methods:
- `configure(callback: (options: ContainerOptions) => void): void` - Configure container options
- `setId(id: string): void` - Set container identifier
- `setMessageListener(listener: QueueListener<T>): void` - Set message listener
- `setErrorHandler(handler: QueueListenerErrorHandler): void` - Set error handler
- `start(): Promise<void>` - Manually start the container
- `stop(): Promise<void>` - Manually stop the container
Default JSON message converter with optional validation support.
Constructor:
constructor(
targetClass?: Type<T>,
options?: JsonPayloadConverterOptions,
logger?: Logger)
Options:
interface JsonPayloadConverterOptions {
enableValidation?: boolean;
validationFailureMode?: ValidationFailureMode;
validatorOptions?: ValidatorOptions;
}
Decorator that wraps any PayloadMessagingConverter to add validation capabilities.
Constructor:
constructor(
innerConverter: PayloadMessagingConverter<T>,
targetClass: Type<T>,
options?: JsonPayloadConverterOptions,
logger?: Logger)
Usage:
const xmlConverter = new XmlPayloadConverter();
const validatingConverter = new ValidatingPayloadConverter(
xmlConverter,
OrderCreatedEvent,
{enableValidation: true}
);
Error thrown when message validation fails (in THROW mode).
Properties:
- `message: string` - Human-readable error summary
- `validationErrors: ValidationError[]` - Array of class-validator errors
- `originalBody: string` - Raw message body for debugging
- `targetClass: string` - Class name that failed validation
Methods:
- `getConstraints()` - Get all constraint failures as a flat array
- `getFormattedErrors()` - Get formatted error messages
Default error handler that logs errors.
interface QueueListener<T> {
onMessage(payload: T, context: MessageContext): Promise<void>;
}
interface MessageContext {
getMessageId(): string;
getReceiptHandle(): string;
getQueueUrl(): string;
getMessageAttributes(): SQSMessageAttributes;
getSystemAttributes(): Record<string, string>;
getApproximateReceiveCount(): number;
acknowledge(): Promise<void>;
}
interface QueueListenerErrorHandler {
handleError(error: Error, message: any, context: MessageContext): Promise<void>;
}
interface PayloadMessagingConverter<T> {
convert(
body: string,
attributes: SQSMessageAttributes,
context?: MessageContext
): Promise<T> | T;
}
enum AcknowledgementMode {
ON_SUCCESS = 'ON_SUCCESS',
MANUAL = 'MANUAL',
ALWAYS = 'ALWAYS',
}
enum ValidationFailureMode {
THROW = 'THROW', // Throw error and invoke error handler (default)
ACKNOWLEDGE = 'ACKNOWLEDGE', // Log error and remove message from queue
REJECT = 'REJECT', // Log error, don't acknowledge (message retries)
}
Check:
- Verify SQSClient credentials and region are correct
- Ensure the queue name or URL is correct
- Check that `autoStartup` is set to `true` or manually call `start()`
- Verify IAM permissions include `sqs:ReceiveMessage`, `sqs:DeleteMessage`, and `sqs:GetQueueUrl`
- Check CloudWatch logs for any AWS SDK errors
Possible causes:
- `visibilityTimeout` is too short - increase it to give processing more time
- Processing is taking longer than visibility timeout
- Acknowledgement mode is set to `MANUAL` but `acknowledge()` is not being called
- Error in listener is preventing acknowledgement in `ON_SUCCESS` mode
Solutions:
- Increase `visibilityTimeout` to match your processing time
- Use `MANUAL` mode and call `acknowledge()` only after successful processing
- Implement proper error handling
Possible causes:
- `maxConcurrentMessages` is too high
- Messages are large and many are being processed simultaneously
- Memory leak in listener implementation
Solutions:
- Reduce `maxConcurrentMessages` to limit parallel processing
- Reduce `maxMessagesPerPoll` to fetch fewer messages at once
- Profile your listener code for memory leaks
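To see why a lower concurrency limit reduces memory pressure, it can help to picture what such a cap does: at most `limit` handlers hold a message in memory at once. The function below is an illustrative model only, not the container's implementation; `processWithLimit` is a hypothetical name.

```typescript
// Runs handler over items with at most `limit` calls in flight.
// Returns the peak number of simultaneous calls that was observed.
async function processWithLimit<T>(
  items: readonly T[],
  limit: number,
  handler: (item: T) => Promise<void>,
): Promise<number> {
  let next = 0;
  let inFlight = 0;
  let peak = 0;

  // Each worker pulls the next unclaimed item until none remain.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const item = items[next++] as T;
      inFlight++;
      peak = Math.max(peak, inFlight);
      try {
        await handler(item);
      } finally {
        inFlight--;
      }
    }
  }

  const workers = Array.from({ length: Math.min(limit, items.length) }, () => worker());
  await Promise.all(workers);
  return peak;
}
```

Under this model, peak memory is roughly bounded by the limit multiplied by the size of one in-flight message plus whatever the handler allocates for it.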
Check:
- Verify the container is registered as a provider
- Check that dependencies are properly injected
- Look for errors in application startup logs
- Verify SQSClient is properly configured and injected
Possible causes:
- Listener is throwing errors and acknowledgement mode is `ON_SUCCESS`
- Visibility timeout is too long
- Dead letter queue is not configured
Solutions:
- Implement proper error handling in your listener
- Configure a dead letter queue on your SQS queue
- Use `ALWAYS` acknowledgement mode if messages should be removed regardless of processing outcome
- Check error handler logs to identify processing issues
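One possible form of "proper error handling" for a message that keeps failing is to stop retrying after a number of deliveries, using the documented `getApproximateReceiveCount()`. The sketch below assumes `ON_SUCCESS` mode; `MaxReceiveListener` and `ExhaustedHandler` are hypothetical names, and the interfaces are reduced local copies of the documented ones.

```typescript
// Reduced local copies of the documented interfaces, so the sketch is self-contained.
interface MessageContext {
  getMessageId(): string;
  getApproximateReceiveCount(): number;
}

interface QueueListener<T> {
  onMessage(payload: T, context: MessageContext): Promise<void>;
}

// Hypothetical callback invoked once a message has been received too many times.
type ExhaustedHandler<T> = (payload: T, context: MessageContext) => Promise<void>;

class MaxReceiveListener<T> implements QueueListener<T> {
  private readonly inner: QueueListener<T>;
  private readonly maxReceives: number;
  private readonly onExhausted: ExhaustedHandler<T>;

  constructor(inner: QueueListener<T>, maxReceives: number, onExhausted: ExhaustedHandler<T>) {
    this.inner = inner;
    this.maxReceives = maxReceives;
    this.onExhausted = onExhausted;
  }

  async onMessage(payload: T, context: MessageContext): Promise<void> {
    if (context.getApproximateReceiveCount() > this.maxReceives) {
      // Park the message elsewhere, then resolve normally so that
      // ON_SUCCESS mode deletes it instead of retrying again.
      await this.onExhausted(payload, context);
      return;
    }
    await this.inner.onMessage(payload, context);
  }
}
```

A redrive policy on the queue itself moves a message to the dead-letter queue once its `maxReceiveCount` is exceeded, with no application code, so this decorator is mainly useful when the queue configuration cannot be changed.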
Solutions:
- Ensure your message class matches the JSON structure
- Use `class-transformer` decorators for complex types
- Implement a custom `PayloadMessagingConverter` for non-JSON formats
- Enable validation to catch type mismatches early
Possible causes:
- class-validator decorators don't match your data structure
- Nested objects missing `@ValidateNested()` or `@Type()` decorators
- String numbers not being transformed (e.g., "123" vs 123)
- Date strings not being transformed to Date objects
Solutions:
- Review your validation decorators against actual message structure
- Add `@Type()` decorator for nested objects and arrays
- Use `@Transform()` decorator for custom transformations
- Enable `validationError.value: true` to see actual values in errors
- Check validation error details in logs or error handler
Check:
- Verify class-validator is installed: `npm list class-validator`
- Ensure validation is enabled: `enableValidation(true)`
- Verify target class is set: `targetClass(YourEventClass)`
- Check that your event class has validation decorators
- Ensure decorators are imported from 'class-validator', not other packages
Possible causes:
- Validation is disabled (default behavior)
- No validation decorators on the class
- Wrong decorators being used
Solutions:
- Explicitly enable validation: `enableValidation(true)`
- Verify the target class is set: `targetClass(YourEventClass)`
- Add appropriate class-validator decorators to your event class
- Verify decorators are from 'class-validator' package
- Test validation in isolation with class-validator's `validate()` function
Contributions are welcome! Please open an issue or submit a pull request.
MIT