Skip to content

Conversation

@EtherZa
Copy link

@EtherZa EtherZa commented Jul 19, 2025

This is a partial implementation of the feature requested in #218 (limited to subscriber middleware)

The change modifies HandlerInvoker to add middleware to the execution pipeline.

I acknowledge that both the PR and implementation are unsolicited, and am happy to modify/withdraw as required.

Example usage:

services.AddAWSMessageBus(builder =>
{
    builder.AddMiddleware<LoggingMiddleware>();  // Singleton by default
    ...
}

public class LoggingMiddleware(ILogger<LoggingMiddleware> Logger) : IMiddleware
{
    public Task<MessageProcessStatus> InvokeAsync<T>(MessageEnvelope<T> messageEnvelope, RequestDelegate next, CancellationToken token = default)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            return next();
        }
        finally
        {
            stopwatch.Stop();
            Logger.LogInformation("Processed message with ID: {MessageId} and Type: {MessageType} in {Elapsed}",
                messageEnvelope.Id,
                messageEnvelope.MessageTypeIdentifier,
                stopwatch.Elapsed);
        }
    }
}

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@normj normj self-requested a review July 24, 2025 16:46
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements subscriber middleware functionality for the AWS Messaging library, allowing users to add middleware components to the message processing pipeline. The middleware executes in the order of registration and can intercept, modify, or handle messages before they reach the message handlers.

Key changes:

  • Added IMiddleware interface and related infrastructure for middleware support
  • Modified HandlerInvoker to execute middleware pipeline before message handlers
  • Enhanced configuration classes to register and manage middleware

Reviewed Changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 13 comments.

Show a summary per file
File Description
src/AWS.Messaging/IMiddleware.cs Defines the middleware interface and request delegate
src/AWS.Messaging/Services/HandlerInvoker.cs Implements middleware pipeline execution logic
src/AWS.Messaging/Configuration/SubscriberMiddleware.cs Configuration class for tracking middleware types and lifetimes
src/AWS.Messaging/Configuration/MessageBusBuilder.cs Adds middleware registration methods and DI setup
src/AWS.Messaging/Configuration/SubscriberMapping.cs Enhanced to cache middleware method info for reflection
test/AWS.Messaging.UnitTests/Models/SubscriberMiddlewareModels.cs Test models for middleware functionality
test/AWS.Messaging.UnitTests/HandlerInvokerTests.cs Unit tests for middleware execution behavior
sampleapps/SubscriberService/Middleware/SampleMiddleware.cs Sample middleware implementation

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@EtherZa EtherZa force-pushed the feature/middleware branch from 726da60 to 8f91ed8 Compare August 21, 2025 08:15
@EtherZa
Copy link
Author

EtherZa commented Aug 21, 2025

Typos and documentation added as per copilot. Squashed and rebased on dev.

@GarrettBeatty
Copy link
Contributor

thanks for the PR! will take a look at it next week

@EtherZa
Copy link
Author

EtherZa commented Aug 23, 2025

I have added a commit to introduce a IMessageErrorHandler which, if registered, will be executed on an exception being thrown by the IMessageHandler or IMiddleware instances. The error handler can then override the exception or have the invoker attempt to re-execute the messaging pipeline with a new DI scope.

Sample usage:

Retry - apply a jitter on a transient error to avoid having a collection of messages failing simultaneously on a dead-lock only to be sent back to the queue and be processed again at the same time, causing the dead-lock once more. As the DI scope is recreated, scoped instances reset any non-recoverable states eg. shared entity framework context.

Failed - override exception to return MessageProcessStatus.Failed() status. Useful for exceptions that may be resolved over an extended time or in a difference process eg. OutOfMemoryException, SocketException, etc

Success - override exception to return MessageProcessStatus.Success() status. Redirect a message to a DLQ to avoid reprocessing on a non-transient, unhandled exception. Responding with Success will remove the message from the original queue.


I am well aware that this is undiscussed functionality on an already undiscussed PR. While I have been using both implementations with success since April, I understand if one or both are a step to far. As such, I have left the two as distinct commits which can be squashed or removed as desired.

One area that I suspect may be less than desirable in the later commit, is the caching of generic invocations for HandlerInvoker in SubscriberMapping. This creates a tight coupling of the HandlerInvoker in the registration process. The coupling could possibly be relaxed, by extracting the cache from SubscriberMapping into it's own registration.

If LangVersion were to be bumped from 10 to 11, one possible implementation could be by adding a static abstract member on the IHandlerInvoker which then would allow an implementation to perform its own caching. The selected IHandlerInvoker type would still need to registered somewhere to be identified though -possibly as part of the MessageBusBuilder configuration.

Please let me know if you would like any changes to ease your comfort with the submission, alternatively please feel free to add/remove as you see fit or ignore it all together.

@EtherZa EtherZa force-pushed the feature/middleware branch from f4b534d to d9a500a Compare August 25, 2025 01:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants