diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs index 66815e16d..0ed2e4a3b 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -15,7 +15,6 @@ namespace DotNetCore.CAP.RabbitMQ; public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer { - private readonly SemaphoreSlim _semaphore; private readonly string _groupName; private readonly bool _usingTaskRun; private readonly Func _msgCallback; @@ -23,6 +22,8 @@ public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer private readonly Func>>? _customHeadersBuilder; private readonly IServiceProvider _serviceProvider; + protected SemaphoreSlim Semaphore { get; } + public RabbitMQBasicConsumer(IModel? model, byte concurrent, string groupName, Func msgCallback, @@ -31,7 +32,7 @@ public RabbitMQBasicConsumer(IModel? model, IServiceProvider serviceProvider) : base(model) { - _semaphore = new SemaphoreSlim(concurrent); + Semaphore = new SemaphoreSlim(concurrent); _groupName = groupName; _usingTaskRun = concurrent > 0; _msgCallback = msgCallback; @@ -45,7 +46,7 @@ public override async Task HandleBasicDeliver(string consumerTag, ulong delivery { if (_usingTaskRun) { - await _semaphore.WaitAsync(); + await Semaphore.WaitAsync(); _ = Task.Run(Consume).ConfigureAwait(false); } @@ -85,20 +86,28 @@ Task Consume() } } - public void BasicAck(ulong deliveryTag) + public virtual void BasicAck(ulong deliveryTag) { if (Model.IsOpen) Model.BasicAck(deliveryTag, false); - _semaphore.Release(); + Semaphore.Release(); } - public void BasicReject(ulong deliveryTag) + public virtual void BasicReject(ulong deliveryTag) { if (Model.IsOpen) Model.BasicReject(deliveryTag, true); - _semaphore.Release(); + Semaphore.Release(); + } + + public virtual void BasicNack(ulong deliveryTag) + { + if (Model.IsOpen) + Model.BasicNack(deliveryTag, false, true); + + Semaphore.Release(); } public override async Task OnCancel(params string[] consumerTags)