diff --git a/Jasper.sln b/Jasper.sln index 9b527d582..46bd0b3d4 100644 --- a/Jasper.sln +++ b/Jasper.sln @@ -59,13 +59,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.ConfluentKafka.Tests EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Http", "Http", "{7687741C-5880-464B-A51D-CAA0C0B1CE0D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Http", "src\Jasper.Http\Jasper.Http.csproj", "{AB120B77-376F-4F84-8FAC-297A066E9434}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Http", "src\Jasper.Http\Jasper.Http.csproj", "{AB120B77-376F-4F84-8FAC-297A066E9434}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Http.Testing", "src\Jasper.Http.Testing\Jasper.Http.Testing.csproj", "{39F644C3-832A-471D-8827-BDC6B270F73B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Http.Testing", "src\Jasper.Http.Testing\Jasper.Http.Testing.csproj", "{39F644C3-832A-471D-8827-BDC6B270F73B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EFPlusSqlServerConsole", "src\EFPlusSqlServerConsole\EFPlusSqlServerConsole.csproj", "{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EFPlusSqlServerConsole", "src\EFPlusSqlServerConsole\EFPlusSqlServerConsole.csproj", "{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Pulsar", "src\Jasper.Pulsar\Jasper.Pulsar.csproj", "{BB253930-8225-4737-9BB0-6F89A4073225}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Pulsar", "src\Jasper.Pulsar\Jasper.Pulsar.csproj", "{BB253930-8225-4737-9BB0-6F89A4073225}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Pulsar.Tests", "src\Jasper.Pulsar.Tests\Jasper.Pulsar.Tests.csproj", "{0A6C2CD0-23AF-45AB-A737-9D1D64693717}" EndProject @@ -345,6 +345,8 @@ Global {6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A}.Release|x64.Build.0 = Release|Any CPU {6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A}.Release|x86.ActiveCfg = Release|Any CPU {6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A}.Release|x86.Build.0 = Release|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.Build.0 = Debug|Any CPU {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x64.ActiveCfg = Debug|Any CPU {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x64.Build.0 = Debug|Any CPU {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x86.ActiveCfg = Debug|Any CPU @@ -355,8 +357,6 @@ Global {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x64.Build.0 = Release|Any CPU {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x86.ActiveCfg = Release|Any CPU {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x86.Build.0 = Release|Any CPU - {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.Build.0 = Debug|Any CPU {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|Any CPU.Build.0 = Debug|Any CPU {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -369,6 +369,42 @@ Global {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x64.Build.0 = Release|Any CPU {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x86.ActiveCfg = Release|Any CPU {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x86.Build.0 = Release|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|x64.ActiveCfg = Debug|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|x64.Build.0 = Debug|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|x86.ActiveCfg = Debug|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|x86.Build.0 = Debug|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Release|Any CPU.Build.0 = Release|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Release|x64.ActiveCfg = Release|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Release|x64.Build.0 = Release|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Release|x86.ActiveCfg = Release|Any CPU + {AB120B77-376F-4F84-8FAC-297A066E9434}.Release|x86.Build.0 = Release|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|x64.ActiveCfg = Debug|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|x64.Build.0 = Debug|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|x86.ActiveCfg = Debug|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|x86.Build.0 = Debug|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Release|Any CPU.Build.0 = Release|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Release|x64.ActiveCfg = Release|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Release|x64.Build.0 = Release|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Release|x86.ActiveCfg = Release|Any CPU + {39F644C3-832A-471D-8827-BDC6B270F73B}.Release|x86.Build.0 = Release|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|x64.ActiveCfg = Debug|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|x64.Build.0 = Debug|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|x86.ActiveCfg = Debug|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|x86.Build.0 = Debug|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|Any CPU.Build.0 = Release|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x64.ActiveCfg = Release|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x64.Build.0 = Release|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x86.ActiveCfg = Release|Any CPU + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x86.Build.0 = Release|Any CPU {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|Any CPU.Build.0 = Debug|Any CPU {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -393,20 +429,9 @@ Global {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x64.Build.0 = Release|Any CPU {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x86.ActiveCfg = Release|Any CPU {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x86.Build.0 = Release|Any CPU - {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|Any CPU.Build.0 = Debug|Any CPU - {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|Any CPU.ActiveCfg = Release|Any CPU - {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|Any CPU.Build.0 = Release|Any CPU - {AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|Any CPU.Build.0 = Debug|Any CPU - {AB120B77-376F-4F84-8FAC-297A066E9434}.Release|Any CPU.ActiveCfg = Release|Any CPU - {AB120B77-376F-4F84-8FAC-297A066E9434}.Release|Any CPU.Build.0 = Release|Any CPU - {39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|Any CPU.Build.0 = Debug|Any CPU - {39F644C3-832A-471D-8827-BDC6B270F73B}.Release|Any CPU.ActiveCfg = Release|Any CPU - {39F644C3-832A-471D-8827-BDC6B270F73B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {4F18A2E4-5056-48C8-89BA-4837F6F983E4} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} @@ -423,19 +448,19 @@ Global {899902B6-63DB-4FED-ABC7-9AE35CCE1DB6} = {166943FC-7D19-4C4A-9E74-02A2CB49CD6B} {1C7783B1-CC8E-4225-9B9D-30C05A99B912} = {2257A448-52A2-466A-ABC5-BD63018F004A} {D830F62B-1031-47D5-AF3B-CC48A178FE43} = {166943FC-7D19-4C4A-9E74-02A2CB49CD6B} - {AB120B77-376F-4F84-8FAC-297A066E9434} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D} - {39F644C3-832A-471D-8827-BDC6B270F73B} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D} - {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} + {1B86F467-4DC6-4D30-9201-FD1BD44C3271} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} + {57273C2A-3F16-49B7-AB6C-80C6F44A60FE} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} + {40670392-73E5-499C-8324-EE40BA6B5A10} = {2257A448-52A2-466A-ABC5-BD63018F004A} + {D09FBD2B-87AD-47CC-9191-5B4E06A48FBC} = {2257A448-52A2-466A-ABC5-BD63018F004A} {CA4812BF-8580-4891-95FE-518930FCF859} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} {6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} + {AB120B77-376F-4F84-8FAC-297A066E9434} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D} + {39F644C3-832A-471D-8827-BDC6B270F73B} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D} + {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} {BB253930-8225-4737-9BB0-6F89A4073225} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} {0A6C2CD0-23AF-45AB-A737-9D1D64693717} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} - {1B86F467-4DC6-4D30-9201-FD1BD44C3271} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} - {57273C2A-3F16-49B7-AB6C-80C6F44A60FE} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4} - {40670392-73E5-499C-8324-EE40BA6B5A10} = {2257A448-52A2-466A-ABC5-BD63018F004A} - {D09FBD2B-87AD-47CC-9191-5B4E06A48FBC} = {2257A448-52A2-466A-ABC5-BD63018F004A} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {D92D723F-44EC-4C1E-AAC3-C162FCEAAA08} diff --git a/docker-compose.yml b/docker-compose.yml index 5f9053c2f..7e42ed9c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,3 +16,13 @@ services: - "ACCEPT_EULA=Y" - "SA_PASSWORD=P@55w0rd" - "MSSQL_PID=Developer" + pulsar: + image: apachepulsar/pulsar + ports: + - "6650:6650" + environment: + - PULSAR_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g" + command: > + /bin/bash -c + "bin/apply-config-from-env.py conf/standalone.conf + && bin/pulsar standalone" \ No newline at end of file diff --git a/src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs b/src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs index bd7459d86..0a5eed55f 100644 --- a/src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs +++ b/src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Baseline; using Jasper.Logging; using Jasper.Transports; @@ -18,8 +19,6 @@ public class AzureServiceBusListener : IListener private readonly ITransportLogger _logger; private readonly ITransportProtocol _protocol; private readonly AzureServiceBusTransport _transport; - private IReceiverCallback _callback; - public AzureServiceBusListener(AzureServiceBusEndpoint endpoint, AzureServiceBusTransport transport, ITransportLogger logger, CancellationToken cancellation) @@ -28,8 +27,6 @@ public AzureServiceBusListener(AzureServiceBusEndpoint endpoint, AzureServiceBus _transport = transport; _logger = logger; _cancellation = cancellation; - - _protocol = endpoint.Protocol; Address = endpoint.Uri; } @@ -43,10 +40,38 @@ public void Dispose() public Uri Address { get; } public ListeningStatus Status { get; set; } - public void Start(IReceiverCallback callback) + private readonly BufferBlock<(Envelope Envelope, object AckObject)> _buffer = new BufferBlock<(Envelope Envelope, object AckObject)>(new DataflowBlockOptions + {// DO NOT CHANGE THESE SETTINGS THEY ARE IMPORTANT TO LINK RECEIVE DELEGATE WITH CONSUME() + BoundedCapacity = 1, + MaxMessagesPerTask = 1, + EnsureOrdered = true + }); + + public async IAsyncEnumerable<(Envelope Envelope, object AckObject)> Consume() + { + Start(); + + while(!_cancellation.IsCancellationRequested) + { + var received = await _buffer.ReceiveAsync(_cancellation); + yield return received; + } + } + + public Task Ack((Envelope Envelope, object AckObject) messageInfo) + { + var ackObj = ((Task Ack, Task Nack))messageInfo.AckObject; + return ackObj.Ack; + } + + public Task Nack((Envelope Envelope, object AckObject) messageInfo) { - _callback = callback; + var ackObj = ((Task Ack, Task Nack))messageInfo.AckObject; + return ackObj.Nack; + } + public void Start() + { var options = new SessionHandlerOptions(handleException); var connectionString = _transport.ConnectionString; @@ -60,26 +85,22 @@ public void Start(IReceiverCallback callback) if (topicName.IsEmpty()) { - var client = tokenProvider != null + QueueClient queueClient = tokenProvider != null ? new QueueClient(connectionString, queueName, tokenProvider, transportType, receiveMode, retryPolicy) : new QueueClient(connectionString, queueName, receiveMode, retryPolicy); - client.RegisterSessionHandler(handleMessage, options); - - _clientEntities.Add(client); + queueClient.RegisterSessionHandler(handleMessage, options); } else { - var client = tokenProvider != null + SubscriptionClient subscriptionClient = tokenProvider != null ? new SubscriptionClient(connectionString, topicName, subscriptionName, tokenProvider, transportType, receiveMode, retryPolicy) : new SubscriptionClient(connectionString, topicName, subscriptionName, receiveMode, retryPolicy); - client.RegisterSessionHandler(handleMessage, options); - - _clientEntities.Add(client); + subscriptionClient.RegisterSessionHandler(handleMessage, options); } } @@ -90,9 +111,9 @@ private Task handleException(ExceptionReceivedEventArgs arg) return Task.CompletedTask; } - private async Task handleMessage(IMessageSession session, Message message, CancellationToken token) + private async Task handleMessage(IMessageSession session, Message message, CancellationToken cancellationToken) { - var lockToken = message.SystemProperties.LockToken; + string lockToken = message.SystemProperties.LockToken; Envelope envelope; @@ -110,16 +131,7 @@ private async Task handleMessage(IMessageSession session, Message message, Cance return; } - try - { - await _callback.Received(Address, new[] {envelope}); - await session.CompleteAsync(lockToken); - } - catch (Exception e) - { - _logger.LogException(e, envelope.Id, "Error trying to receive a message from " + Address); - await session.AbandonAsync(lockToken); - } + await _buffer.SendAsync((envelope, (session.CompleteAsync(lockToken), session.AbandonAsync(lockToken))), cancellationToken); } } } diff --git a/src/Jasper.AzureServiceBus/Internal/DefaultAzureServiceBusProtocol.cs b/src/Jasper.AzureServiceBus/Internal/DefaultAzureServiceBusProtocol.cs index d85dee696..0a9b31d35 100644 --- a/src/Jasper.AzureServiceBus/Internal/DefaultAzureServiceBusProtocol.cs +++ b/src/Jasper.AzureServiceBus/Internal/DefaultAzureServiceBusProtocol.cs @@ -18,7 +18,6 @@ public virtual Message WriteFromEnvelope(Envelope envelope) ContentType = envelope.ContentType, ReplyTo = envelope.ReplyUri?.ToString(), ReplyToSessionId = envelope.CausationId.ToString(), - }; if (envelope.ExecutionTime.HasValue) @@ -50,7 +49,6 @@ public virtual Envelope ReadEnvelope(Message message) CausationId = message.ReplyToSessionId.IsNotEmpty() ? Guid.Parse(message.ReplyToSessionId) : Guid.Empty }; - envelope.ReadPropertiesFromDictionary(message.UserProperties); return envelope; diff --git a/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaListener.cs b/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaListener.cs index 540e07db1..e1e4cf8ff 100644 --- a/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaListener.cs +++ b/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaListener.cs @@ -1,5 +1,5 @@ using System; -using System.Diagnostics; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; @@ -15,7 +15,6 @@ public class ConfluentKafkaListener : IListener private readonly IConsumer _consumer; private readonly KafkaEndpoint _endpoint; private readonly ITransportLogger _logger; - private IReceiverCallback _callback; private readonly ITransportProtocol> _protocol; private Task _consumerTask; @@ -34,22 +33,25 @@ public void Dispose() _consumerTask?.Dispose(); } - public Uri Address => _endpoint.Uri; - public ListeningStatus Status { get; set; } - public void Start(IReceiverCallback callback) + public Task Ack((Envelope Envelope, object AckObject) messageInfo) { - _callback = callback; - - _consumer.Subscribe(new []{ _endpoint.TopicName }); + var achObj = messageInfo.AckObject as ConsumeResult; - _consumerTask = ConsumeAsync(); + _consumer.Commit(achObj); - _logger.ListeningStatusChange(ListeningStatus.Accepting); + return Task.CompletedTask; } - private async Task ConsumeAsync() + public Task Nack((Envelope Envelope, object AckObject) messageInfo) => Task.CompletedTask; + + public Uri Address => _endpoint.Uri; + public ListeningStatus Status { get; set; } + + public async IAsyncEnumerable<(Envelope Envelope, object AckObject)> Consume() { + _consumer.Subscribe(new[] { _endpoint.TopicName }); + while (!_cancellation.IsCancellationRequested) { ConsumeResult message; @@ -84,16 +86,7 @@ private async Task ConsumeAsync() continue; } - try - { - await _callback.Received(Address, new[] {envelope}); - - _consumer.Commit(message); - } - catch (Exception e) - { - _logger.LogException(e, envelope.Id, "Error trying to receive a message from " + Address); - } + yield return (envelope, message); } } } diff --git a/src/Jasper.Pulsar/Internal/PulsarListener.cs b/src/Jasper.Pulsar/Internal/PulsarListener.cs index 3dfa6d06d..7489baa75 100644 --- a/src/Jasper.Pulsar/Internal/PulsarListener.cs +++ b/src/Jasper.Pulsar/Internal/PulsarListener.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using DotPulsar; @@ -14,9 +15,7 @@ public class PulsarListener : IListener private readonly IConsumer _consumer; private readonly PulsarEndpoint _endpoint; private readonly ITransportLogger _logger; - private IReceiverCallback _callback; private readonly ITransportProtocol _protocol; - private Task _consumerTask; public PulsarListener(PulsarEndpoint endpoint, ITransportLogger logger, CancellationToken cancellation) { @@ -27,26 +26,13 @@ public PulsarListener(PulsarEndpoint endpoint, ITransportLogger logger, Cancella _consumer = endpoint.PulsarClient.CreateConsumer(endpoint.ConsumerOptions); } - public void Dispose() - { - _consumer?.DisposeAsync(); - _consumerTask?.Dispose(); - } - public Uri Address => _endpoint.Uri; public ListeningStatus Status { get; set; } - public void Start(IReceiverCallback callback) + public async IAsyncEnumerable<(Envelope Envelope, object AckObject)> Consume() { - _callback = callback; - - _consumerTask = ConsumeAsync(); - _logger.ListeningStatusChange(ListeningStatus.Accepting); - } - private async Task ConsumeAsync() - { await foreach (Message message in _consumer.Messages(_cancellation)) { Envelope envelope; @@ -57,21 +43,27 @@ private async Task ConsumeAsync() } catch (Exception ex) { - _logger.LogException(ex, message: $"Error trying to map an incoming Pulsar {_endpoint.Topic} Topic message to an Envelope. See the Dead Letter Queue"); + _logger.LogException(ex, message: $"Error trying to map an incoming Pulsar {_endpoint.Topic} Topic message to an Envelope"); continue; } - try - { - await _callback.Received(Address, new[] {envelope}); - - await _consumer.Acknowledge(message, _cancellation); - } - catch (Exception e) - { - _logger.LogException(e, envelope.Id, "Error trying to receive a message from " + Address); - } + yield return (envelope, message.MessageId); } } + + + public Task Ack((Envelope Envelope, object AckObject) messageInfo) + { + var ackObj = (MessageId)messageInfo.AckObject; + + return _consumer.Acknowledge(ackObj, _cancellation).AsTask(); + } + + public Task Nack((Envelope Envelope, object AckObject) messageInfo) => Task.CompletedTask; + + public void Dispose() + { + _consumer.DisposeAsync(); + } } } diff --git a/src/Jasper.Pulsar/Internal/PulsarMessage.cs b/src/Jasper.Pulsar/Internal/PulsarMessage.cs index 5d0dda5df..597792430 100644 --- a/src/Jasper.Pulsar/Internal/PulsarMessage.cs +++ b/src/Jasper.Pulsar/Internal/PulsarMessage.cs @@ -6,10 +6,25 @@ namespace Jasper.Pulsar.Internal { internal class PulsarMessage { + public MessageId MessageId { get; set; } public MessageMetadata Metadata { get; } = new MessageMetadata(); public ReadOnlySequence Data { get; } public IReadOnlyDictionary Properties { get; } = new Dictionary(); + public PulsarMessage(Message message) + { + MessageId = message.MessageId; + Metadata.Key = message.Key; + Metadata.SequenceId = message.SequenceId; + Metadata.EventTime = message.EventTime; + Metadata.EventTimeAsDateTimeOffset = message.EventTimeAsDateTimeOffset; + Metadata.KeyBytes = message.KeyBytes; + Metadata.OrderingKey = message.OrderingKey; + + Properties = message.Properties; + Data = message.Data; + } + public PulsarMessage(ReadOnlySequence data) { Data = data; diff --git a/src/Jasper.Pulsar/Internal/PulsarTransportProtocol.cs b/src/Jasper.Pulsar/Internal/PulsarTransportProtocol.cs index e1db00733..16a7317e5 100644 --- a/src/Jasper.Pulsar/Internal/PulsarTransportProtocol.cs +++ b/src/Jasper.Pulsar/Internal/PulsarTransportProtocol.cs @@ -5,6 +5,7 @@ using DotPulsar; using Jasper.Pulsar.Internal; using Jasper.Transports; +using Newtonsoft.Json; namespace Jasper.Pulsar { @@ -12,11 +13,13 @@ internal class PulsarTransportProtocol : ITransportProtocol { public const string PulsarMessageKeyHeader = "Pulsar.Message.Key"; public const string PulsarMessageSequenceIdHeader = "Pulsar.Message.SequenceId"; + public const string PulsarMessageIdHeader = "Pulsar.Message.MessageId"; - private readonly Dictionary> _pulsarMsgPropTypes = new Dictionary>() + private readonly Dictionary> _pulsarMsgPropTypes = new Dictionary>() { - { PulsarMessageKeyHeader, (metadata, val) => metadata.Key = val?.ToString() }, - { PulsarMessageSequenceIdHeader, (metadata, val) => metadata.SequenceId = val != null ? ulong.Parse(val.ToString()) : default }, + { PulsarMessageKeyHeader, (msg, val) => msg.Metadata.Key = val?.ToString() }, + { PulsarMessageSequenceIdHeader, (msg, val) => msg.Metadata.SequenceId = val != null ? ulong.Parse(val.ToString()) : default }, + { PulsarMessageIdHeader, (msg, val) => msg.MessageId = val != null ? JsonConvert.DeserializeObject(val.ToString()) : null }, }; public PulsarMessage WriteFromEnvelope(Envelope envelope) @@ -31,32 +34,34 @@ public PulsarMessage WriteFromEnvelope(Envelope envelope) metadata[header.Key] = header.Value.ToString(); } - SetMetaDataFromHeaderValues(metadata, envelopHeaders); - - return new PulsarMessage(envelope.Data, metadata); + var pulsarMessage = new PulsarMessage(envelope.Data, metadata); + SetMetaDataFromHeaderValues(pulsarMessage, envelopHeaders); + return pulsarMessage; } - private void SetMetaDataFromHeaderValues(MessageMetadata metadata, IDictionary envelopHeaders) + private void SetMetaDataFromHeaderValues(PulsarMessage msg, IDictionary envelopHeaders) { - foreach (var pulsarMsgPropType in _pulsarMsgPropTypes) + foreach (KeyValuePair> pulsarMsgPropType in _pulsarMsgPropTypes) { - SetMetaDataFromHeaderValue(metadata, envelopHeaders, pulsarMsgPropType.Key, pulsarMsgPropType.Value); + SetMetaDataFromHeaderValue(msg, envelopHeaders, pulsarMsgPropType.Key, pulsarMsgPropType.Value); } } - private void SetMetaDataFromHeaderValue(MessageMetadata metadata, IDictionary envelopHeaders, string propertyName, Action propertySetter) + private void SetMetaDataFromHeaderValue(PulsarMessage msg, IDictionary envelopHeaders, string propertyName, Action propertySetter) { - if (envelopHeaders.TryGetValue(propertyName, out object headerValue)) propertySetter(metadata, headerValue); + if (envelopHeaders.TryGetValue(propertyName, out object headerValue)) propertySetter(msg, headerValue); } public Envelope ReadEnvelope(PulsarMessage message) { var envelope = new Envelope - { + { Data = message.Data.ToArray(), - Headers = message.Properties.ToDictionary(ks => ks.Key, vs => vs.Value) + Headers = message.Properties.ToDictionary(ks => ks.Key, vs => vs.Value), }; + envelope.Headers.Add(PulsarMessageIdHeader, JsonConvert.SerializeObject(message.MessageId)); + envelope.ReadPropertiesFromDictionary(message.Properties.ToDictionary(ks => ks.Key, vs => (object)vs.Value)); return envelope; diff --git a/src/Jasper.RabbitMQ/Internal/RabbitMqEndpoint.cs b/src/Jasper.RabbitMQ/Internal/RabbitMqEndpoint.cs index 1990da238..3fe5d6b27 100644 --- a/src/Jasper.RabbitMQ/Internal/RabbitMqEndpoint.cs +++ b/src/Jasper.RabbitMQ/Internal/RabbitMqEndpoint.cs @@ -119,7 +119,7 @@ protected internal override void StartListening(IMessagingRoot root, ITransportR { if (!IsListener) return; - var listener = new RabbitMqListener(root.TransportLogger, this, Parent); + var listener = new RabbitMqListener(root.TransportLogger, this, Parent, root.Cancellation); runtime.AddListener(listener, this); } diff --git a/src/Jasper.RabbitMQ/Internal/RabbitMqListener.cs b/src/Jasper.RabbitMQ/Internal/RabbitMqListener.cs index 39daba259..de0f2681d 100644 --- a/src/Jasper.RabbitMQ/Internal/RabbitMqListener.cs +++ b/src/Jasper.RabbitMQ/Internal/RabbitMqListener.cs @@ -1,5 +1,8 @@ using System; -using Baseline; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Jasper.Logging; using Jasper.Transports; using RabbitMQ.Client; @@ -9,49 +12,59 @@ namespace Jasper.RabbitMQ.Internal public class RabbitMqListener : RabbitMqConnectionAgent, IListener { private readonly ITransportLogger _logger; + private readonly CancellationToken _cancellationToken; private readonly IRabbitMqProtocol _mapper; - private IReceiverCallback _callback; private MessageConsumer _consumer; private readonly string _routingKey; - public RabbitMqListener(ITransportLogger logger, - RabbitMqEndpoint endpoint, RabbitMqTransport transport) : base(transport) + public RabbitMqListener(ITransportLogger logger, RabbitMqEndpoint endpoint, RabbitMqTransport transport, CancellationToken cancellationToken) : base(transport) { _logger = logger; + _cancellationToken = cancellationToken; _mapper = endpoint.Protocol; Address = endpoint.Uri; _routingKey = endpoint.RoutingKey ?? endpoint.QueueName ?? ""; } - public ListeningStatus Status + public ListeningStatus Status { get; set; } + + + private readonly BufferBlock<(Envelope Envelope, object AckObject)> _buffer = new BufferBlock<(Envelope Envelope, object AckObject)>(new DataflowBlockOptions + {// DO NOT CHANGE THESE SETTINGS THEY ARE IMPORTANT TO LINK RECEIVE DELEGATE WITH CONSUME() + BoundedCapacity = 1, + MaxMessagesPerTask = 1, + EnsureOrdered = true + }); + public async IAsyncEnumerable<(Envelope Envelope, object AckObject)> Consume() { - get => _consumer != null ? ListeningStatus.Accepting : ListeningStatus.TooBusy; - set + Start(); + + while (!_cancellationToken.IsCancellationRequested) { - switch (value) - { - case ListeningStatus.TooBusy when _consumer != null: - _consumer.Dispose(); - - Channel.BasicCancel(_consumer.ConsumerTag); - _consumer = null; - break; - case ListeningStatus.Accepting when _consumer == null: - Start(_callback); - break; - } + yield return await _buffer.ReceiveAsync(_cancellationToken); } } - public void Start(IReceiverCallback callback) + public Task Ack((Envelope Envelope, object AckObject) messageInfo) + { + Channel.BasicAck((ulong)messageInfo.AckObject, false); + return Task.CompletedTask; + } + + public Task Nack((Envelope Envelope, object AckObject) messageInfo) { - if (callback == null) return; + Channel.BasicNack((ulong)messageInfo.AckObject, false, true); + return Task.CompletedTask; + } + public void Start() + { EnsureConnected(); - _callback = callback; - _consumer = new MessageConsumer(callback, _logger, Channel, _mapper, Address) + Status = ListeningStatus.Accepting; + + _consumer = new MessageConsumer(_logger, Channel, _mapper, Address, _buffer, _cancellationToken) { ConsumerTag = Guid.NewGuid().ToString() }; @@ -59,26 +72,27 @@ public void Start(IReceiverCallback callback) Channel.BasicConsume(_consumer, _routingKey); } - public Uri Address { get; } public class MessageConsumer : DefaultBasicConsumer, IDisposable { private readonly Uri _address; - private readonly IReceiverCallback _callback; - private readonly IModel _channel; + private readonly BufferBlock<(Envelope Envelope, object AckObject)> _receiveBuffer; + private readonly CancellationToken _cancellationToken; + public readonly IModel _channel; private readonly ITransportLogger _logger; private readonly IRabbitMqProtocol _mapper; private bool _latched; - public MessageConsumer(IReceiverCallback callback, ITransportLogger logger, IModel channel, - IRabbitMqProtocol mapper, Uri address) : base(channel) + public MessageConsumer(ITransportLogger logger, IModel channel, + IRabbitMqProtocol mapper, Uri address, BufferBlock<(Envelope Envelope, object AckObject)> receiveBuffer, CancellationToken cancellationToken) : base(channel) { - _callback = callback; _logger = logger; _channel = channel; _mapper = mapper; _address = address; + _receiveBuffer = receiveBuffer; + _cancellationToken = cancellationToken; } public void Dispose() @@ -90,8 +104,6 @@ public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, b string exchange, string routingKey, IBasicProperties properties, byte[] body) { - if (_callback == null) return; - if (_latched) { _channel.BasicReject(deliveryTag, true); @@ -117,18 +129,7 @@ public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, b return; } - _callback.Received(_address, new[] {envelope}).ContinueWith(t => - { - if (t.IsFaulted) - { - _logger.LogException(t.Exception, envelope.Id, "Failure to receive an incoming message"); - _channel.BasicNack(deliveryTag, false, true); - } - else - { - _channel.BasicAck(deliveryTag, false); - } - }); + _receiveBuffer.SendAsync((envelope, deliveryTag), _cancellationToken).Wait(_cancellationToken); } } } diff --git a/src/Jasper.Testing/Transports/Receiving/SocketListeningAgentTests.cs b/src/Jasper.Testing/Transports/Receiving/SocketListeningAgentTests.cs index 519785d82..87a794019 100644 --- a/src/Jasper.Testing/Transports/Receiving/SocketListeningAgentTests.cs +++ b/src/Jasper.Testing/Transports/Receiving/SocketListeningAgentTests.cs @@ -1,4 +1,4 @@ -using System.IO; +using System.IO; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -13,25 +13,29 @@ namespace Jasper.Testing.Transports.Receiving { public class SocketListeningAgentTests { - [Fact] - public async Task receive_when_it_is_latched() - { - var stream = new MemoryStream(); + // Moved to Agent (not listener + //[Fact] + //public async Task receive_when_it_is_latched() + //{ + // var stream = new MemoryStream(); - var agent = new SocketListener(IPAddress.Any, 5500, CancellationToken.None); - agent.Status = ListeningStatus.TooBusy; + // var agent = new SocketListener(IPAddress.Any, 5500, CancellationToken.None); + // agent.Status = ListeningStatus.TooBusy; - var callback = Substitute.For(); + // var callback = Substitute.For(); - await agent.HandleStream(callback, stream); + // await foreach (var recievedMessage in agent.Consume()) + // { + // break; + // } - stream.Position = 0; - var bytes = stream.ReadAllBytes(); + // stream.Position = 0; + // var bytes = stream.ReadAllBytes(); - bytes.ShouldBe(WireProtocol.ProcessingFailureBuffer); + // bytes.ShouldBe(WireProtocol.ProcessingFailureBuffer); - callback.DidNotReceive().Received(); - } + // callback.DidNotReceive().Received(); + //} [Fact] public void status_is_accepting_by_default() diff --git a/src/Jasper.Testing/Transports/Tcp/Protocol/ProtocolContext.cs b/src/Jasper.Testing/Transports/Tcp/Protocol/ProtocolContext.cs index b1676a80e..2b0f95d64 100644 --- a/src/Jasper.Testing/Transports/Tcp/Protocol/ProtocolContext.cs +++ b/src/Jasper.Testing/Transports/Tcp/Protocol/ProtocolContext.cs @@ -74,7 +74,33 @@ protected async Task afterSending() await client.ConnectAsync(Destination.Host, Destination.Port); - await WireProtocol.Send(client.GetStream(), theMessageBatch, null, theSender); + var callback = (ISenderCallback)theSender; + try + { + WireProtocol.SendStatus result = await WireProtocol.Send(client.GetStream(), theMessageBatch, null); + switch (result) + { + + case WireProtocol.SendStatus.Failure: + await callback.ProcessingFailure(theMessageBatch); + break; + case WireProtocol.SendStatus.Success: + await callback.Successful(theMessageBatch); + break; + case WireProtocol.SendStatus.SerializationFailure: + await callback.SerializationFailure(theMessageBatch); + break; + case WireProtocol.SendStatus.QueueDoesNotExist: + await callback.QueueDoesNotExist(theMessageBatch); + break; + default: + throw new ArgumentOutOfRangeException(); + } + } + catch (Exception e) + { + await callback.ProcessingFailure(theMessageBatch, e); + } } } diff --git a/src/Jasper/Jasper.csproj b/src/Jasper/Jasper.csproj index b6295f4aa..724b1e00f 100644 --- a/src/Jasper/Jasper.csproj +++ b/src/Jasper/Jasper.csproj @@ -1,4 +1,4 @@ - + In-memory command bus and asynchronous messaging framework Jeremy D. Miller, Mark Warpool @@ -14,7 +14,7 @@ false false false - 7.1 + 8.0 diff --git a/src/Jasper/Runtime/WorkerQueues/DurableWorkerQueue.cs b/src/Jasper/Runtime/WorkerQueues/DurableWorkerQueue.cs index 6f3e32b4a..fbc95114f 100644 --- a/src/Jasper/Runtime/WorkerQueues/DurableWorkerQueue.cs +++ b/src/Jasper/Runtime/WorkerQueues/DurableWorkerQueue.cs @@ -20,6 +20,7 @@ public class DurableWorkerQueue : IWorkerQueue, IChannelCallback, IHasNativeSche private readonly ActionBlock _receiver; private IListener _agent; private readonly AsyncRetryPolicy _policy; + private Task _listenerTask; public DurableWorkerQueue(Endpoint endpoint, IHandlerPipeline pipeline, AdvancedSettings settings, IEnvelopePersistence persistence, ITransportLogger logger) @@ -71,20 +72,39 @@ public Task ScheduleExecution(Envelope envelope) public void StartListening(IListener listener) { _agent = listener; - _agent.Start(this); - Address = _agent.Address; - } - - public Uri Address { get; set; } + _listenerTask = ConsumeFromAgent(); + } - public ListeningStatus Status + async Task ConsumeFromAgent() { - get => _agent.Status; - set => _agent.Status = value; + Status = ListeningStatus.Accepting; + var callback = ((IReceiverCallback)this); + await foreach ((Envelope Envelope, object AckObject) receivedInfo in _agent.Consume()) + { + Envelope envelope = receivedInfo.Envelope; + + try + { + await callback.Received(Address, new[] { envelope }); + + await _agent.Ack(receivedInfo); + await callback.Acknowledged(new[] { envelope }); + } + catch (Exception e) + { + _logger.LogException(e, envelope.Id, "Error trying to receive a message from " + Address); + await _agent.Nack(receivedInfo); + await callback.NotAcknowledged(new[] { envelope }); + } + } } + public Uri Address { get; set; } + + public ListeningStatus Status { get; set; } + async Task IReceiverCallback.Received(Uri uri, Envelope[] messages) { var now = DateTime.UtcNow; @@ -102,13 +122,12 @@ Task IReceiverCallback.NotAcknowledged(Envelope[] messages) return _persistence.DeleteIncomingEnvelopes(messages); } - Task IReceiverCallback.Failed(Exception exception, Envelope[] messages) + public Task Failed(Exception exception, Envelope[] messages) { _logger.LogException(new MessageFailureException(messages, exception)); return Task.CompletedTask; } - public void Dispose() { // nothing @@ -125,8 +144,7 @@ public async Task ProcessReceivedMessages(DateTime now, Uri uri, await _persistence.StoreIncoming(envelopes); - - foreach (var message in incoming) + foreach (Envelope message in incoming) { await Enqueue(message); } diff --git a/src/Jasper/Runtime/WorkerQueues/LightweightWorkerQueue.cs b/src/Jasper/Runtime/WorkerQueues/LightweightWorkerQueue.cs index 069492f1c..ad1ea0cb4 100644 --- a/src/Jasper/Runtime/WorkerQueues/LightweightWorkerQueue.cs +++ b/src/Jasper/Runtime/WorkerQueues/LightweightWorkerQueue.cs @@ -10,7 +10,7 @@ namespace Jasper.Runtime.WorkerQueues { - public class LightweightWorkerQueue : IWorkerQueue, IChannelCallback, IHasNativeScheduling + public class LightweightWorkerQueue : IWorkerQueue, IChannelCallback, IHasNativeScheduling, IReceiverCallback { private readonly ITransportLogger _logger; private readonly AdvancedSettings _settings; @@ -68,13 +68,39 @@ public Task ScheduleExecution(Envelope envelope) return Task.CompletedTask; } - + private Task _listeningTask; public void StartListening(IListener listener) { _agent = listener; - _agent.Start(this); - Address = _agent.Address; + + _listeningTask = ConsumeFromAgent(); + } + + async Task ConsumeFromAgent() + { + Status = ListeningStatus.Accepting; + + var callback = ((IReceiverCallback) this); + + await foreach ((Envelope Envelope, object AckObject) received in _agent.Consume()) + { + try + { + var envelope = received.Envelope; + + await callback.Received(Address, new[] {envelope}); + + await _agent.Ack(received); + await callback.Acknowledged(new[] {envelope}); + } + catch (Exception e) + { + _logger.LogException(e, received.Envelope.Id, "Error trying to receive a message from " + Address); + await _agent.Nack(received); + await callback.NotAcknowledged(new[] {received.Envelope}); + } + } } public Uri Address { get; set; } diff --git a/src/Jasper/Transports/IListener.cs b/src/Jasper/Transports/IListener.cs index 9ea7dcd2b..022a7bc2b 100644 --- a/src/Jasper/Transports/IListener.cs +++ b/src/Jasper/Transports/IListener.cs @@ -1,4 +1,6 @@ -using System; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; namespace Jasper.Transports { @@ -6,6 +8,8 @@ public interface IListener : IDisposable { Uri Address { get; } ListeningStatus Status { get; set; } - void Start(IReceiverCallback callback); + IAsyncEnumerable<(Envelope Envelope, object AckObject)> Consume(); + Task Ack((Envelope Envelope, object AckObject) messageInfo); + Task Nack((Envelope Envelope, object AckObject) messageInfo); } } diff --git a/src/Jasper/Transports/ListeningAgent.cs b/src/Jasper/Transports/ListeningAgent.cs index 0f74c06f7..32d005d37 100644 --- a/src/Jasper/Transports/ListeningAgent.cs +++ b/src/Jasper/Transports/ListeningAgent.cs @@ -1,10 +1,12 @@ -using System; +using System; +using System.Linq; using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using Jasper.Transports.Tcp; +using Jasper.Transports.Util; using Jasper.Util; namespace Jasper.Transports @@ -33,9 +35,28 @@ public ListeningAgent(IReceiverCallback callback, IPAddress ipaddr, int port, st _socketHandling = new ActionBlock(async s => { - using (var stream = new NetworkStream(s, true)) + await using var stream = new NetworkStream(s, true); + WireProtocol.BeginReceiveResult result = await WireProtocol.BeginReceive(stream, _uri); + + if (result.Status == ReceivedStatus.Successful) { - await WireProtocol.Receive(stream, _callback, _uri); + try + { + if (result.Messages.Any() && result.Messages.First().IsPing()) + { + await WireProtocol.EndReceive(stream, ReceivedStatus.Successful); + return; + } + + ReceivedStatus receivedStatus = await _callback.Received(_uri, result.Messages); + + await WireProtocol.EndReceive(stream, receivedStatus); + } + catch (Exception ex) + { + await _callback.Failed(ex, result.Messages); + await WireProtocol.EndReceive(stream, ReceivedStatus.ProcessFailure); + } } }, new ExecutionDataflowBlockOptions{CancellationToken = _cancellationToken}); diff --git a/src/Jasper/Transports/Sending/SocketSenderProtocol.cs b/src/Jasper/Transports/Sending/SocketSenderProtocol.cs index 52f853d30..d6b5f70d3 100644 --- a/src/Jasper/Transports/Sending/SocketSenderProtocol.cs +++ b/src/Jasper/Transports/Sending/SocketSenderProtocol.cs @@ -23,14 +23,32 @@ public async Task SendBatch(ISenderCallback callback, OutgoingMessageBatch batch if (connection.IsCompleted) using (var stream = client.GetStream()) { - var protocolTimeout = WireProtocol.Send(stream, batch, batch.Data, callback); + var protocolTimeout = WireProtocol.Send(stream, batch, batch.Data); //var protocolTimeout = .TimeoutAfter(5000); - await protocolTimeout.ConfigureAwait(false); + WireProtocol.SendStatus result = await protocolTimeout.ConfigureAwait(false); if (!protocolTimeout.IsCompleted) await callback.TimedOut(batch); if (protocolTimeout.IsFaulted) await callback.ProcessingFailure(batch, protocolTimeout.Exception); + + switch (result) + { + case WireProtocol.SendStatus.Failure: + await callback.ProcessingFailure(batch); + break; + case WireProtocol.SendStatus.Success: + await callback.Successful(batch); + break; + case WireProtocol.SendStatus.SerializationFailure: + await callback.SerializationFailure(batch); + break; + case WireProtocol.SendStatus.QueueDoesNotExist: + await callback.QueueDoesNotExist(batch); + break; + default: + throw new ArgumentOutOfRangeException(nameof(WireProtocol.SendStatus)); + } } else await callback.TimedOut(batch); diff --git a/src/Jasper/Transports/Tcp/ReceivedStatus.cs b/src/Jasper/Transports/Tcp/ReceivedStatus.cs index b1981f248..8c6278a55 100644 --- a/src/Jasper/Transports/Tcp/ReceivedStatus.cs +++ b/src/Jasper/Transports/Tcp/ReceivedStatus.cs @@ -1,9 +1,12 @@ -namespace Jasper.Transports.Tcp +namespace Jasper.Transports.Tcp { public enum ReceivedStatus { Successful, QueueDoesNotExist, - ProcessFailure + SerializationFailure, + ProcessFailure, + Acknowledged, + NotAcknowledged } } diff --git a/src/Jasper/Transports/Tcp/SocketListener.cs b/src/Jasper/Transports/Tcp/SocketListener.cs index 74141f214..cb3bdc04c 100644 --- a/src/Jasper/Transports/Tcp/SocketListener.cs +++ b/src/Jasper/Transports/Tcp/SocketListener.cs @@ -1,11 +1,11 @@ -using System; -using System.IO; +using System; +using System.Collections.Generic; using System.Net; using System.Net.Sockets; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; -using Jasper.Transports.Util; using Jasper.Util; namespace Jasper.Transports.Tcp @@ -15,9 +15,7 @@ public class SocketListener : IListener private readonly CancellationToken _cancellationToken; private readonly IPAddress _ipaddr; private readonly int _port; - private TcpListener _listener; - private Task _receivingLoop; - private ActionBlock _socketHandling; + private TcpListener _tcpListener; public SocketListener(IPAddress ipaddr, int port, CancellationToken cancellationToken) { @@ -28,47 +26,48 @@ public SocketListener(IPAddress ipaddr, int port, CancellationToken cancellation Address = $"tcp://{ipaddr}:{port}/".ToUri(); } - public void Start(IReceiverCallback callback) + public async IAsyncEnumerable<(Envelope Envelope, object AckObject)> Consume() { - _listener = new TcpListener(new IPEndPoint(_ipaddr, _port)); - _listener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _tcpListener = new TcpListener(new IPEndPoint(_ipaddr, _port)); + _tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _tcpListener.Start(); - _socketHandling = new ActionBlock(async s => + while (!_cancellationToken.IsCancellationRequested) { - using (var stream = new NetworkStream(s, true)) + Socket socket = await _tcpListener.AcceptSocketAsync(); + + await using var stream = new NetworkStream(socket, true); + + if (Status == ListeningStatus.TooBusy) { - await HandleStream(callback, stream); + await WireProtocol.EndReceive(stream, ReceivedStatus.ProcessFailure); + continue; } - }, new ExecutionDataflowBlockOptions{CancellationToken = _cancellationToken}); - _receivingLoop = Task.Run(async () => - { - _listener.Start(); + WireProtocol.BeginReceiveResult result = await WireProtocol.BeginReceive(stream, Address); + + if (result.Status != ReceivedStatus.Successful) continue; - while (!_cancellationToken.IsCancellationRequested) + foreach (Envelope resultMessage in result.Messages) { - var socket = await _listener.AcceptSocketAsync(); - await _socketHandling.SendAsync(socket, _cancellationToken); + yield return (resultMessage, null); } - }, _cancellationToken); + + await WireProtocol.EndReceive(stream, ReceivedStatus.Successful); + } } + public Task Ack((Envelope Envelope, object AckObject) messageInfo) => Task.CompletedTask; + public Task Nack((Envelope Envelope, object AckObject) messageInfo) => Task.CompletedTask; + public Uri Address { get; } public void Dispose() { - _socketHandling?.Complete(); - _listener?.Stop(); - _listener?.Server.Dispose(); + _tcpListener?.Stop(); + _tcpListener?.Server.Dispose(); } public ListeningStatus Status { get; set; } = ListeningStatus.Accepting; - - public Task HandleStream(IReceiverCallback callback, Stream stream) - { - if (Status == ListeningStatus.TooBusy) return stream.SendBuffer(WireProtocol.ProcessingFailureBuffer); - - return WireProtocol.Receive(stream, callback, Address); - } } } diff --git a/src/Jasper/Transports/Tcp/WireProtocol.cs b/src/Jasper/Transports/Tcp/WireProtocol.cs index dc9380662..4dc2fe41c 100644 --- a/src/Jasper/Transports/Tcp/WireProtocol.cs +++ b/src/Jasper/Transports/Tcp/WireProtocol.cs @@ -3,7 +3,6 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -using Jasper.Transports.Sending; using Jasper.Transports.Util; namespace Jasper.Transports.Tcp @@ -31,87 +30,89 @@ public static class WireProtocol // Nothing but actually sending here. Worry about timeouts and retries somewhere // else - public static async Task Send(Stream stream, OutgoingMessageBatch batch, byte[] messageBytes, - ISenderCallback callback) + public static async Task Send(Stream stream, OutgoingMessageBatch batch, byte[] messageBytes) { - messageBytes = messageBytes ?? Envelope.Serialize(batch.Messages); - - var lengthBytes = BitConverter.GetBytes(messageBytes.Length); + messageBytes ??= Envelope.Serialize(batch.Messages); + byte[] lengthBytes = BitConverter.GetBytes(messageBytes.Length); await stream.WriteAsync(lengthBytes, 0, lengthBytes.Length); - await stream.WriteAsync(messageBytes, 0, messageBytes.Length); // All four of the possible receive confirmation messages are the same length: 8 characters long encoded in UTF-16. - var confirmationBytes = await stream.ReadBytesAsync(ReceivedBuffer.Length).ConfigureAwait(false); + byte[] confirmationBytes = await stream.ReadBytesAsync(ReceivedBuffer.Length).ConfigureAwait(false); if (confirmationBytes.SequenceEqual(ReceivedBuffer)) { - await callback.Successful(batch); - - await stream.WriteAsync(AcknowledgedBuffer, 0, AcknowledgedBuffer.Length); + return SendStatus.Success; } else if (confirmationBytes.SequenceEqual(ProcessingFailureBuffer)) { - await callback.ProcessingFailure(batch); + return SendStatus.Failure; } else if (confirmationBytes.SequenceEqual(SerializationFailureBuffer)) { - await callback.SerializationFailure(batch); + return SendStatus.SerializationFailure; } else if (confirmationBytes.SequenceEqual(QueueDoesNotExistBuffer)) { - await callback.QueueDoesNotExist(batch); + return SendStatus.QueueDoesNotExist; } + + return SendStatus.Failure; } + public static Task Ack(Stream stream) => stream.WriteAsync(AcknowledgedBuffer, 0, AcknowledgedBuffer.Length); - public static async Task Receive(Stream stream, IReceiverCallback callback, Uri uri) + public enum SendStatus { - Envelope[] messages = null; + Failure, + Success, + SerializationFailure, + QueueDoesNotExist + } - try - { - var lengthBytes = await stream.ReadBytesAsync(sizeof(int)); - var length = BitConverter.ToInt32(lengthBytes, 0); - if (length == 0) return; + public class BeginReceiveResult + { + public ReceivedStatus Status { get; } + public Envelope[] Messages { get; } + public Exception Exception { get; } - var bytes = await stream.ReadBytesAsync(length); - messages = Envelope.ReadMany(bytes); - } - catch (Exception e) + BeginReceiveResult(ReceivedStatus status) { - await callback.Failed(e, messages); - await stream.SendBuffer(SerializationFailureBuffer); - return; + Status = status; } - try + public BeginReceiveResult(ReceivedStatus status, Envelope[] messages) : this(status) { - await receive(stream, callback, messages, uri); + Messages = messages; } - catch (Exception ex) + + public BeginReceiveResult(ReceivedStatus status, Exception exception) : this(status) { - await callback.Failed(ex, messages); - await stream.SendBuffer(ProcessingFailureBuffer); + Exception = exception; } } - private static async Task receive(Stream stream, IReceiverCallback callback, Envelope[] messages, Uri uri) + public static async Task BeginReceive(Stream stream, Uri uri) { - // Just a ping - if (messages.Any() && messages.First().IsPing()) + try { - await stream.SendBuffer(ReceivedBuffer); - - // We aren't gonna use this in this case - var ack = await stream.ReadExpectedBuffer(AcknowledgedBuffer); + var lengthBytes = await stream.ReadBytesAsync(sizeof(int)); + var length = BitConverter.ToInt32(lengthBytes, 0); + if (length == 0) return new BeginReceiveResult(ReceivedStatus.Successful, new Envelope[0]); - return; + var bytes = await stream.ReadBytesAsync(length); + var messages = Envelope.ReadMany(bytes); + return new BeginReceiveResult(ReceivedStatus.Successful, messages); } + catch (Exception e) + { + return new BeginReceiveResult(ReceivedStatus.SerializationFailure, e); + } + } - - var status = await callback.Received(uri, messages); + public static async Task EndReceive(Stream stream, ReceivedStatus status) + { switch (status) { case ReceivedStatus.ProcessFailure: @@ -126,14 +127,12 @@ private static async Task receive(Stream stream, IReceiverCallback callback, Env default: await stream.SendBuffer(ReceivedBuffer); - var ack = await stream.ReadExpectedBuffer(AcknowledgedBuffer); + bool ack = await stream.ReadExpectedBuffer(AcknowledgedBuffer); - if (ack) - await callback.Acknowledged(messages); - else - await callback.NotAcknowledged(messages); - break; + return ack ? ReceivedStatus.Acknowledged : ReceivedStatus.NotAcknowledged; } + + return status; } } }