diff --git a/Jasper.sln b/Jasper.sln index adc384855..7f72b14cb 100644 --- a/Jasper.sln +++ b/Jasper.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 15 -VisualStudioVersion = 15.0.26730.16 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29806.167 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper", "src\Jasper\Jasper.csproj", "{E617B35A-B0D2-43BB-A723-D29B992EE744}" EndProject @@ -15,45 +15,45 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{A806 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Publisher", "src\Publisher\Publisher.csproj", "{2AE00C65-E72F-4F95-9554-55E9B7D1A519}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.JsonCommands", "src\Jasper.JsonCommands\Jasper.JsonCommands.csproj", "{627A87BA-75B9-4C2E-861E-02880CA04295}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.JsonCommands", "src\Jasper.JsonCommands\Jasper.JsonCommands.csproj", "{627A87BA-75B9-4C2E-861E-02880CA04295}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Persistence", "Persistence", "{166943FC-7D19-4C4A-9E74-02A2CB49CD6B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Persistence.Marten", "src\Jasper.Persistence.Marten\Jasper.Persistence.Marten.csproj", "{79B5A55F-4AC6-46BA-829B-00A3ACE540D8}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Persistence.Marten", "src\Jasper.Persistence.Marten\Jasper.Persistence.Marten.csproj", "{79B5A55F-4AC6-46BA-829B-00A3ACE540D8}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Persistence.SqlServer", "src\Jasper.Persistence.SqlServer\Jasper.Persistence.SqlServer.csproj", "{A762EA81-6ECD-4DDD-B073-AC00BA4521CD}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Persistence.SqlServer", "src\Jasper.Persistence.SqlServer\Jasper.Persistence.SqlServer.csproj", "{A762EA81-6ECD-4DDD-B073-AC00BA4521CD}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{CA5A0AA5-2CAD-4F42-AF73-980469934F27}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestingSupport", "src\TestingSupport\TestingSupport.csproj", "{9D1DBC42-96F7-4516-8C47-9DD35C6D9AAD}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestingSupport", "src\TestingSupport\TestingSupport.csproj", "{9D1DBC42-96F7-4516-8C47-9DD35C6D9AAD}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples", "src\Samples\Samples.csproj", "{32E92CD7-190E-4E9E-8230-1D7E5E19A539}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples", "src\Samples\Samples.csproj", "{32E92CD7-190E-4E9E-8230-1D7E5E19A539}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Testing", "Testing", "{2257A448-52A2-466A-ABC5-BD63018F004A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Persistence.Postgresql", "src\Jasper.Persistence.Postgresql\Jasper.Persistence.Postgresql.csproj", "{4860ADD6-EC91-4DA8-84AC-9882B6210D4D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Persistence.Postgresql", "src\Jasper.Persistence.Postgresql\Jasper.Persistence.Postgresql.csproj", "{4860ADD6-EC91-4DA8-84AC-9882B6210D4D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StorytellerRunner", "src\StorytellerRunner\StorytellerRunner.csproj", "{0197CBCD-7A5F-4A52-9294-32336B312F9C}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StorytellerRunner", "src\StorytellerRunner\StorytellerRunner.csproj", "{0197CBCD-7A5F-4A52-9294-32336B312F9C}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Persistence.Database", "src\Jasper.Persistence.Database\Jasper.Persistence.Database.csproj", "{442F7B7D-C529-4C17-A6F6-C578CA5C06F5}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Persistence.Database", "src\Jasper.Persistence.Database\Jasper.Persistence.Database.csproj", "{442F7B7D-C529-4C17-A6F6-C578CA5C06F5}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Persistence.Testing", "src\Jasper.Persistence.Testing\Jasper.Persistence.Testing.csproj", "{899902B6-63DB-4FED-ABC7-9AE35CCE1DB6}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Persistence.Testing", "src\Jasper.Persistence.Testing\Jasper.Persistence.Testing.csproj", "{899902B6-63DB-4FED-ABC7-9AE35CCE1DB6}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Testing", "src\Jasper.Testing\Jasper.Testing.csproj", "{1C7783B1-CC8E-4225-9B9D-30C05A99B912}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Testing", "src\Jasper.Testing\Jasper.Testing.csproj", "{1C7783B1-CC8E-4225-9B9D-30C05A99B912}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Persistence.EntityFrameworkCore", "src\Jasper.Persistence.EntityFrameworkCore\Jasper.Persistence.EntityFrameworkCore.csproj", "{D830F62B-1031-47D5-AF3B-CC48A178FE43}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Persistence.EntityFrameworkCore", "src\Jasper.Persistence.EntityFrameworkCore\Jasper.Persistence.EntityFrameworkCore.csproj", "{D830F62B-1031-47D5-AF3B-CC48A178FE43}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.RabbitMQ", "src\Jasper.RabbitMQ\Jasper.RabbitMQ.csproj", "{1B86F467-4DC6-4D30-9201-FD1BD44C3271}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.RabbitMQ", "src\Jasper.RabbitMQ\Jasper.RabbitMQ.csproj", "{1B86F467-4DC6-4D30-9201-FD1BD44C3271}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.RabbitMQ.Tests", "src\Jasper.RabbitMQ.Tests\Jasper.RabbitMQ.Tests.csproj", "{57273C2A-3F16-49B7-AB6C-80C6F44A60FE}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.RabbitMQ.Tests", "src\Jasper.RabbitMQ.Tests\Jasper.RabbitMQ.Tests.csproj", "{57273C2A-3F16-49B7-AB6C-80C6F44A60FE}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.TestSupport.Storyteller", "src\Jasper.TestSupport.Storyteller\Jasper.TestSupport.Storyteller.csproj", "{40670392-73E5-499C-8324-EE40BA6B5A10}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.TestSupport.Storyteller", "src\Jasper.TestSupport.Storyteller\Jasper.TestSupport.Storyteller.csproj", "{40670392-73E5-499C-8324-EE40BA6B5A10}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.TestSupport.Tests", "src\Jasper.TestSupport.Tests\Jasper.TestSupport.Tests.csproj", "{D09FBD2B-87AD-47CC-9191-5B4E06A48FBC}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.TestSupport.Tests", "src\Jasper.TestSupport.Tests\Jasper.TestSupport.Tests.csproj", "{D09FBD2B-87AD-47CC-9191-5B4E06A48FBC}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.AzureServiceBus", "src\Jasper.AzureServiceBus\Jasper.AzureServiceBus.csproj", "{CA4812BF-8580-4891-95FE-518930FCF859}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.AzureServiceBus", "src\Jasper.AzureServiceBus\Jasper.AzureServiceBus.csproj", "{CA4812BF-8580-4891-95FE-518930FCF859}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.AzureServiceBus.Tests", "src\Jasper.AzureServiceBus.Tests\Jasper.AzureServiceBus.Tests.csproj", "{6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.AzureServiceBus.Tests", "src\Jasper.AzureServiceBus.Tests\Jasper.AzureServiceBus.Tests.csproj", "{6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Http", "Http", "{7687741C-5880-464B-A51D-CAA0C0B1CE0D}" EndProject @@ -63,6 +63,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Http.Testing", "src\ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EFPlusSqlServerConsole", "src\EFPlusSqlServerConsole\EFPlusSqlServerConsole.csproj", "{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.ConfluentKafka", "src\Jasper.ConfluentKafka\Jasper.ConfluentKafka.csproj", "{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.ConfluentKafka.Tests", "src\Jasper.ConfluentKafka.Tests\Jasper.ConfluentKafka.Tests.csproj", "{B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Pulsar", "src\Jasper.Pulsar\Jasper.Pulsar.csproj", "{BB253930-8225-4737-9BB0-6F89A4073225}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Pulsar.Tests", "src\Jasper.Pulsar.Tests\Jasper.Pulsar.Tests.csproj", "{0A6C2CD0-23AF-45AB-A737-9D1D64693717}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -373,20 +381,68 @@ Global {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x64.Build.0 = Release|Any CPU {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x86.ActiveCfg = Release|Any CPU {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x86.Build.0 = Release|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x64.ActiveCfg = Debug|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x64.Build.0 = Debug|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x86.ActiveCfg = Debug|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x86.Build.0 = Debug|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|Any CPU.Build.0 = Release|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x64.ActiveCfg = Release|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x64.Build.0 = Release|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x86.ActiveCfg = Release|Any CPU + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x86.Build.0 = Release|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|x64.ActiveCfg = Debug|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|x64.Build.0 = Debug|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|x86.ActiveCfg = Debug|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|x86.Build.0 = Debug|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|Any CPU.Build.0 = Release|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x64.ActiveCfg = Release|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x64.Build.0 = Release|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x86.ActiveCfg = Release|Any CPU + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x86.Build.0 = Release|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|x64.ActiveCfg = Debug|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|x64.Build.0 = Debug|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|x86.ActiveCfg = Debug|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Debug|x86.Build.0 = Debug|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Release|Any CPU.Build.0 = Release|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Release|x64.ActiveCfg = Release|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Release|x64.Build.0 = Release|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Release|x86.ActiveCfg = Release|Any CPU + {BB253930-8225-4737-9BB0-6F89A4073225}.Release|x86.Build.0 = Release|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Debug|x64.ActiveCfg = Debug|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Debug|x64.Build.0 = Debug|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Debug|x86.ActiveCfg = Debug|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Debug|x86.Build.0 = Debug|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|Any CPU.Build.0 = Release|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x64.ActiveCfg = Release|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x64.Build.0 = Release|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x86.ActiveCfg = Release|Any CPU + {0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution - {2AE00C65-E72F-4F95-9554-55E9B7D1A519} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} - {3FA62C1A-7046-4CFB-8124-284EC7BD3660} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} {4F18A2E4-5056-48C8-89BA-4837F6F983E4} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} + {3FA62C1A-7046-4CFB-8124-284EC7BD3660} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} + {51279890-5EDE-42A3-8D8D-EE0230CE0944} = {2257A448-52A2-466A-ABC5-BD63018F004A} + {2AE00C65-E72F-4F95-9554-55E9B7D1A519} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} + {627A87BA-75B9-4C2E-861E-02880CA04295} = {CA5A0AA5-2CAD-4F42-AF73-980469934F27} {79B5A55F-4AC6-46BA-829B-00A3ACE540D8} = {166943FC-7D19-4C4A-9E74-02A2CB49CD6B} {A762EA81-6ECD-4DDD-B073-AC00BA4521CD} = {166943FC-7D19-4C4A-9E74-02A2CB49CD6B} - {627A87BA-75B9-4C2E-861E-02880CA04295} = {CA5A0AA5-2CAD-4F42-AF73-980469934F27} - {32E92CD7-190E-4E9E-8230-1D7E5E19A539} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} - {51279890-5EDE-42A3-8D8D-EE0230CE0944} = {2257A448-52A2-466A-ABC5-BD63018F004A} {9D1DBC42-96F7-4516-8C47-9DD35C6D9AAD} = {2257A448-52A2-466A-ABC5-BD63018F004A} + {32E92CD7-190E-4E9E-8230-1D7E5E19A539} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} {4860ADD6-EC91-4DA8-84AC-9882B6210D4D} = {166943FC-7D19-4C4A-9E74-02A2CB49CD6B} {0197CBCD-7A5F-4A52-9294-32336B312F9C} = {2257A448-52A2-466A-ABC5-BD63018F004A} {442F7B7D-C529-4C17-A6F6-C578CA5C06F5} = {166943FC-7D19-4C4A-9E74-02A2CB49CD6B} @@ -402,6 +458,10 @@ Global {AB120B77-376F-4F84-8FAC-297A066E9434} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D} {39F644C3-832A-471D-8827-BDC6B270F73B} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D} {5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B} = {A806095A-9D66-4D55-8662-1FC67E90F6FB} + {ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A} = {CA5A0AA5-2CAD-4F42-AF73-980469934F27} + {B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A} = {CA5A0AA5-2CAD-4F42-AF73-980469934F27} + {BB253930-8225-4737-9BB0-6F89A4073225} = {CA5A0AA5-2CAD-4F42-AF73-980469934F27} + {0A6C2CD0-23AF-45AB-A737-9D1D64693717} = {CA5A0AA5-2CAD-4F42-AF73-980469934F27} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {D92D723F-44EC-4C1E-AAC3-C162FCEAAA08} diff --git a/docker-compose.yml b/docker-compose.yml index 5f9053c2f..7e42ed9c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,3 +16,13 @@ services: - "ACCEPT_EULA=Y" - "SA_PASSWORD=P@55w0rd" - "MSSQL_PID=Developer" + pulsar: + image: apachepulsar/pulsar + ports: + - "6650:6650" + environment: + - PULSAR_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g" + command: > + /bin/bash -c + "bin/apply-config-from-env.py conf/standalone.conf + && bin/pulsar standalone" \ No newline at end of file diff --git a/src/Jasper.AzureServiceBus.Tests/Jasper.AzureServiceBus.Tests.csproj b/src/Jasper.AzureServiceBus.Tests/Jasper.AzureServiceBus.Tests.csproj index 91ca1a574..7eb6d8d9c 100644 --- a/src/Jasper.AzureServiceBus.Tests/Jasper.AzureServiceBus.Tests.csproj +++ b/src/Jasper.AzureServiceBus.Tests/Jasper.AzureServiceBus.Tests.csproj @@ -1,28 +1,28 @@ - + - - netcoreapp3.0 + + netcoreapp3.0 - false - + false + - - - - - + + + + + - - - - + + + + - + - - - Servers.cs - - + + + Servers.cs + + diff --git a/src/Jasper.AzureServiceBus.Tests/Samples.cs b/src/Jasper.AzureServiceBus.Tests/Samples.cs index bdb01f59d..21a8338f9 100644 --- a/src/Jasper.AzureServiceBus.Tests/Samples.cs +++ b/src/Jasper.AzureServiceBus.Tests/Samples.cs @@ -3,10 +3,10 @@ using Baseline; using Jasper.Attributes; using Jasper.AzureServiceBus.Internal; +using Jasper.Transports; using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Primitives; using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; namespace Jasper.AzureServiceBus.Tests @@ -64,7 +64,7 @@ public AzureServiceBusTopicSendingApp( ) - public class MySpecialProtocol : IAzureServiceBusProtocol + public class MySpecialProtocol : ITransportProtocol { public Message WriteFromEnvelope(Envelope envelope) { diff --git a/src/Jasper.AzureServiceBus/AzureServiceBusEndpoint.cs b/src/Jasper.AzureServiceBus/AzureServiceBusEndpoint.cs index 81e88493c..a85024d39 100644 --- a/src/Jasper.AzureServiceBus/AzureServiceBusEndpoint.cs +++ b/src/Jasper.AzureServiceBus/AzureServiceBusEndpoint.cs @@ -8,6 +8,7 @@ using Jasper.Transports; using Jasper.Transports.Sending; using Jasper.Util; +using Microsoft.Azure.ServiceBus; namespace Jasper.AzureServiceBus { @@ -33,7 +34,7 @@ public AzureServiceBusEndpoint(Uri uri) : base(uri) public string QueueName { get; set; } public string TopicName { get; set; } - public IAzureServiceBusProtocol Protocol { get; set; } = new DefaultAzureServiceBusProtocol(); + public ITransportProtocol Protocol { get; set; } = new DefaultAzureServiceBusProtocol(); public override Uri Uri => buildUri(false); @@ -139,7 +140,7 @@ protected internal override void StartListening(IMessagingRoot root, ITransportR protected override ISender CreateSender(IMessagingRoot root) { if (Parent.ConnectionString == null) throw new InvalidOperationException("There is no configured connection string for Azure Service Bus, or it is empty"); - return new AzureServiceBusSender(this, Parent, root.TransportLogger, root.Cancellation); + return new AzureServiceBusSender(this, Parent); } } } diff --git a/src/Jasper.AzureServiceBus/AzureServiceBusListenerConfiguration.cs b/src/Jasper.AzureServiceBus/AzureServiceBusListenerConfiguration.cs index 3291239fc..8e6e3d80f 100644 --- a/src/Jasper.AzureServiceBus/AzureServiceBusListenerConfiguration.cs +++ b/src/Jasper.AzureServiceBus/AzureServiceBusListenerConfiguration.cs @@ -1,4 +1,6 @@ using Jasper.Configuration; +using Jasper.Transports; +using Microsoft.Azure.ServiceBus; namespace Jasper.AzureServiceBus { @@ -14,7 +16,7 @@ public AzureServiceBusListenerConfiguration(AzureServiceBusEndpoint endpoint) : /// /// /// - public AzureServiceBusListenerConfiguration Protocol() where T : IAzureServiceBusProtocol, new() + public AzureServiceBusListenerConfiguration Protocol() where T : ITransportProtocol, new() { return Protocol(new T()); } @@ -25,7 +27,7 @@ public AzureServiceBusListenerConfiguration(AzureServiceBusEndpoint endpoint) : /// /// /// - public AzureServiceBusListenerConfiguration Protocol(IAzureServiceBusProtocol protocol) + public AzureServiceBusListenerConfiguration Protocol(ITransportProtocol protocol) { endpoint.Protocol = protocol; return this; diff --git a/src/Jasper.AzureServiceBus/AzureServiceBusSubscriberConfiguration.cs b/src/Jasper.AzureServiceBus/AzureServiceBusSubscriberConfiguration.cs index 9824fb38a..95111e368 100644 --- a/src/Jasper.AzureServiceBus/AzureServiceBusSubscriberConfiguration.cs +++ b/src/Jasper.AzureServiceBus/AzureServiceBusSubscriberConfiguration.cs @@ -1,4 +1,6 @@ using Jasper.Configuration; +using Jasper.Transports; +using Microsoft.Azure.ServiceBus; namespace Jasper.AzureServiceBus { @@ -14,7 +16,7 @@ public AzureServiceBusSubscriberConfiguration(AzureServiceBusEndpoint endpoint) /// /// /// - public AzureServiceBusSubscriberConfiguration Protocol() where T : IAzureServiceBusProtocol, new() + public AzureServiceBusSubscriberConfiguration Protocol() where T : ITransportProtocol, new() { return Protocol(new T()); } @@ -25,7 +27,7 @@ public AzureServiceBusSubscriberConfiguration(AzureServiceBusEndpoint endpoint) /// /// /// - public AzureServiceBusSubscriberConfiguration Protocol(IAzureServiceBusProtocol protocol) + public AzureServiceBusSubscriberConfiguration Protocol(ITransportProtocol protocol) { _endpoint.Protocol = protocol; return this; diff --git a/src/Jasper.AzureServiceBus/IAzureServiceBusProtocol.cs b/src/Jasper.AzureServiceBus/IAzureServiceBusProtocol.cs deleted file mode 100644 index 331a50cbc..000000000 --- a/src/Jasper.AzureServiceBus/IAzureServiceBusProtocol.cs +++ /dev/null @@ -1,27 +0,0 @@ -using Microsoft.Azure.ServiceBus; - -namespace Jasper.AzureServiceBus -{ - // SAMPLE: IAzureServiceBusProtocol - /// - /// Used to "map" incoming Azure Service Bus Message objects to Jasper Envelopes. Can be implemented to - /// connect Jasper to non-Jasper applications - /// - public interface IAzureServiceBusProtocol - { - /// - /// Creates an Azure Service Bus Message object for a Jasper Envelope - /// - /// - /// - Message WriteFromEnvelope(Envelope envelope); - - /// - /// Creates an Envelope for the incoming Azure Service Bus Message - /// - /// - /// - Envelope ReadEnvelope(Message message); - } - // ENDSAMPLE -} diff --git a/src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs b/src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs index b29fa51db..bd7459d86 100644 --- a/src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs +++ b/src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs @@ -16,7 +16,7 @@ public class AzureServiceBusListener : IListener private readonly IList _clientEntities = new List(); private readonly AzureServiceBusEndpoint _endpoint; private readonly ITransportLogger _logger; - private readonly IAzureServiceBusProtocol _protocol; + private readonly ITransportProtocol _protocol; private readonly AzureServiceBusTransport _transport; private IReceiverCallback _callback; diff --git a/src/Jasper.AzureServiceBus/Internal/AzureServiceBusSender.cs b/src/Jasper.AzureServiceBus/Internal/AzureServiceBusSender.cs index 9c19b0b76..d122176cc 100644 --- a/src/Jasper.AzureServiceBus/Internal/AzureServiceBusSender.cs +++ b/src/Jasper.AzureServiceBus/Internal/AzureServiceBusSender.cs @@ -1,11 +1,9 @@ using System; -using System.Collections.Concurrent; -using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; using Baseline; using Jasper.Logging; +using Jasper.Transports; using Jasper.Transports.Sending; using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core; @@ -14,40 +12,19 @@ namespace Jasper.AzureServiceBus.Internal { public class AzureServiceBusSender : ISender { - private readonly IAzureServiceBusProtocol _protocol; + private readonly ITransportProtocol _protocol; private readonly AzureServiceBusEndpoint _endpoint; private readonly AzureServiceBusTransport _transport; - private readonly ITransportLogger _logger; - private readonly CancellationToken _cancellation; private ISenderClient _sender; - private ActionBlock _sending; - private ISenderCallback _callback; - - public AzureServiceBusSender(AzureServiceBusEndpoint endpoint, AzureServiceBusTransport transport, ITransportLogger logger, - CancellationToken cancellation) + public bool SupportsNativeScheduledSend { get; } = true; + public Uri Destination => _endpoint.Uri; + + public AzureServiceBusSender(AzureServiceBusEndpoint endpoint, AzureServiceBusTransport transport) { _protocol = endpoint.Protocol; _endpoint = endpoint; _transport = transport; - _logger = logger; - _cancellation = cancellation; - Destination = endpoint.Uri; - } - - public void Dispose() - { - _sender?.CloseAsync().GetAwaiter().GetResult(); - } - - public Uri Destination { get; } - public int QueuedCount => _sending.InputCount; - public bool Latched { get; private set; } - - - public void Start(ISenderCallback callback) - { - _callback = callback; - + // The variance here should be in constructing the sending & buffer blocks if (_endpoint.TopicName.IsEmpty()) { @@ -55,11 +32,6 @@ public void Start(ISenderCallback callback) ? new MessageSender(_transport.ConnectionString, _endpoint.QueueName, _transport.TokenProvider, _transport.TransportType, _transport.RetryPolicy) : new MessageSender(_transport.ConnectionString, _endpoint.QueueName, _transport.RetryPolicy); - - _sending = new ActionBlock(sendBySession, new ExecutionDataflowBlockOptions - { - CancellationToken = _cancellation - }); } else { @@ -68,44 +40,31 @@ public void Start(ISenderCallback callback) _transport.TransportType, _transport.RetryPolicy) : new TopicClient(_transport.ConnectionString, _endpoint.TopicName, _transport.RetryPolicy); - - _sending = new ActionBlock(sendBySession, new ExecutionDataflowBlockOptions - { - CancellationToken = _cancellation - }); } } - - public Task Enqueue(Envelope envelope) + public void Dispose() { - _sending.Post(envelope); - - return Task.CompletedTask; + _sender?.CloseAsync().GetAwaiter().GetResult(); } - public async Task LatchAndDrain() + public Task Send(Envelope envelope) { - Latched = true; - - await _sender.CloseAsync(); + var message = _protocol.WriteFromEnvelope(envelope); + message.SessionId = Guid.NewGuid().ToString(); - _sending.Complete(); - _logger.CircuitBroken(Destination); - } - - public void Unlatch() - { - _logger.CircuitResumed(Destination); + if (envelope.IsDelayed(DateTime.UtcNow)) + { + return _sender.ScheduleMessageAsync(message, envelope.ExecutionTime.Value); + } - Start(_callback); - Latched = false; + return _sender.SendAsync(message); } - + public async Task Ping(CancellationToken cancellationToken) { - var envelope = Envelope.ForPing(Destination); + var envelope = Envelope.ForPing(_endpoint.Uri); var message = _protocol.WriteFromEnvelope(envelope); message.SessionId = Guid.NewGuid().ToString(); @@ -113,39 +72,5 @@ public async Task Ping(CancellationToken cancellationToken) return true; } - - public bool SupportsNativeScheduledSend { get; } = true; - - private async Task sendBySession(Envelope envelope) - { - try - { - var message = _protocol.WriteFromEnvelope(envelope); - message.SessionId = Guid.NewGuid().ToString(); - - - if (envelope.IsDelayed(DateTime.UtcNow)) - { - await _sender.ScheduleMessageAsync(message, envelope.ExecutionTime.Value); - } - else - { - await _sender.SendAsync(message); - } - - await _callback.Successful(envelope); - } - catch (Exception e) - { - try - { - await _callback.ProcessingFailure(envelope, e); - } - catch (Exception exception) - { - _logger.LogException(exception); - } - } - } } } diff --git a/src/Jasper.AzureServiceBus/Internal/DefaultAzureServiceBusProtocol.cs b/src/Jasper.AzureServiceBus/Internal/DefaultAzureServiceBusProtocol.cs index 010721af5..d85dee696 100644 --- a/src/Jasper.AzureServiceBus/Internal/DefaultAzureServiceBusProtocol.cs +++ b/src/Jasper.AzureServiceBus/Internal/DefaultAzureServiceBusProtocol.cs @@ -1,11 +1,12 @@ using System; using Baseline; +using Jasper.Transports; using Microsoft.Azure.ServiceBus; namespace Jasper.AzureServiceBus.Internal { // SAMPLE: DefaultAzureServiceBusProtocol - public class DefaultAzureServiceBusProtocol : IAzureServiceBusProtocol + public class DefaultAzureServiceBusProtocol : ITransportProtocol { public virtual Message WriteFromEnvelope(Envelope envelope) { diff --git a/src/Jasper.ConfluentKafka.Tests/Jasper.ConfluentKafka.Tests.csproj b/src/Jasper.ConfluentKafka.Tests/Jasper.ConfluentKafka.Tests.csproj new file mode 100644 index 000000000..4bf5e2989 --- /dev/null +++ b/src/Jasper.ConfluentKafka.Tests/Jasper.ConfluentKafka.Tests.csproj @@ -0,0 +1,30 @@ + + + + netcoreapp3.0 + + false + + + + + + + + + + + + + + + + + + + + Servers.cs + + + + diff --git a/src/Jasper.ConfluentKafka.Tests/KafkaEndpointTester.cs b/src/Jasper.ConfluentKafka.Tests/KafkaEndpointTester.cs new file mode 100644 index 000000000..bbff770e9 --- /dev/null +++ b/src/Jasper.ConfluentKafka.Tests/KafkaEndpointTester.cs @@ -0,0 +1,61 @@ +using System; +using Jasper.ConfluentKafka; +using Shouldly; +using Xunit; + +namespace Jasper.Kafka.Tests +{ + public class KafkaEndpointTester + { + [Fact] + public void parse_non_durable_uri() + { + var endpoint = new KafkaEndpoint(); + endpoint.Parse(new Uri("kafka://topic/key1")); + + endpoint.IsDurable.ShouldBeFalse(); + endpoint.TopicName.ShouldBe("key1"); + } + + [Fact] + public void parse_durable_uri() + { + var endpoint = new KafkaEndpoint(); + endpoint.Parse(new Uri("kafka://topic/key1/durable")); + + endpoint.IsDurable.ShouldBeTrue(); + endpoint.TopicName.ShouldBe("key1"); + } + + [Fact] + public void build_uri_for_subscription_and_topic() + { + new KafkaEndpoint + { + TopicName = "key1" + } + .Uri.ShouldBe(new Uri("kafka://topic/key1")); + } + + [Fact] + public void generate_reply_uri_for_non_durable() + { + new KafkaEndpoint + { + TopicName = "key1" + } + .ReplyUri().ShouldBe(new Uri("kafka://topic/key1")); + } + + [Fact] + public void generate_reply_uri_for_durable() + { + new KafkaEndpoint + { + TopicName = "key1", + IsDurable = true + }.ReplyUri().ShouldBe(new Uri("kafka://topic/key1/durable")); + } + + } +} diff --git a/src/Jasper.ConfluentKafka.Tests/end_to_end.cs b/src/Jasper.ConfluentKafka.Tests/end_to_end.cs new file mode 100644 index 000000000..52f9d9e48 --- /dev/null +++ b/src/Jasper.ConfluentKafka.Tests/end_to_end.cs @@ -0,0 +1,192 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using AutoFixture; +using AutoFixture.Xunit2; +using Baseline.Dates; +using Confluent.Kafka; +using Jasper.Tracking; +using Jasper.Util; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using TestingSupport; +using TestingSupport.Compliance; +using TestingSupport.ErrorHandling; +using TestMessages; +using Xunit; + +namespace Jasper.ConfluentKafka.Tests +{ + [Obsolete("try to replace with compliance tests")] + public class end_to_end + { + private static string KafkaServer = "b-1.jj-test.y7lv7k.c5.kafka.us-east-1.amazonaws.com:9094,b-2.jj-test.y7lv7k.c5.kafka.us-east-1.amazonaws.com:9094"; + private static ProducerConfig ProducerConfig = new ProducerConfig + { + BootstrapServers = KafkaServer, + SecurityProtocol = SecurityProtocol.Ssl + }; + private static ProducerConfig FailureProducerConfig = new ProducerConfig + { + BootstrapServers = "badaddress", + MessageTimeoutMs = 1000 + }; + + private static ConsumerConfig ConsumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaServer, + SecurityProtocol = SecurityProtocol.Ssl, + GroupId = nameof(end_to_end), + }; + + public class Sender : JasperOptions + { + public const string Topic = "jasper-compliance"; + public static string ReplyTopic = $"{Topic}-reply"; + public Sender() + { + Endpoints.ConfigureKafka(); + Endpoints.PublishAllMessages().ToKafkaTopic(Topic, ProducerConfig); + Endpoints.ListenToKafkaTopic(ReplyTopic, ConsumerConfig).UseForReplies(); + } + } + + public class FailureSender : JasperOptions + { + public const string Topic = "jasper-compliance"; + public FailureSender() + { + Endpoints.ConfigureKafka(); + Endpoints.PublishAllMessages().ToKafkaTopic(Topic, FailureProducerConfig); + } + } + + public class Receiver : JasperOptions + { + public Receiver(string topic) + { + Endpoints.ConfigureKafka(); + Endpoints.PublishAllMessages().ToKafkaTopic(Sender.ReplyTopic, ProducerConfig); + Endpoints.ListenToKafkaTopic(topic, ConsumerConfig); + } + } + + + public class KafkaSendingComplianceTests : SendingCompliance + { + public KafkaSendingComplianceTests() : base($"kafka://topic/{Sender.Topic}".ToUri(), 15.Seconds()) + { + var sender = new Sender(); + + SenderIs(sender); + + var receiver = new Receiver(Sender.Topic); + + ReceiverIs(receiver); + } + + [Fact] + public async Task publish_failures_reported_to_caller() + { + theSender = null; + SenderIs(); + + _ = await theSender.TrackActivity(60.Seconds()) + .DoNotAssertOnExceptionsDetected() + .DoNotAssertTimeout() + .ExecuteAndWait(c => + { + Should.Throw(c.Publish(new Message1())); + return Task.CompletedTask; + }); + } + } + + + + + + // SAMPLE: can_stop_and_start_ASB + [Fact] + public async Task can_send_and_receive_from_kafka() + { + using var host = JasperHost.For(); + await host + // The TrackActivity() method starts a Fluent Interface + // that gives you fine-grained control over the + // message tracking + .TrackActivity() + .Timeout(30.Seconds()) + // Include the external transports in the determination + // of "completion" + .IncludeExternalTransports() + .SendMessageAndWait(new ColorChosen { Name = "Red" }); + + var colors = host.Get(); + + colors.Name.ShouldBe("Red"); + } + + [Fact] + public async Task send_multiple_messages_in_order() + { + var colorsChosens = Enumerable.Range(0, 100).Select(i => new ColorChosen {Name = i.ToString()}); + var sequence = Guid.NewGuid().ToString(); + using var host = JasperHost.For(host => + { + host.Endpoints.ConfigureKafka(); + host.Endpoints.ListenToKafkaTopic("messages", ConsumerConfig).Sequential(); + host.Endpoints.Publish(pub => pub.Message().ToKafkaTopic("messages", ProducerConfig) + .CustomizeOutgoing(e => e.Headers.Add("MessageKey", sequence)) // use the same message key in Kafka + ); + host.Handlers.IncludeType(); + host.Services.AddSingleton(); + host.Extensions.UseMessageTrackingTestingSupport(); + }); + + ITrackedSession session = await host + .TrackActivity() + .Timeout(60.Seconds()) + .IncludeExternalTransports() + .ExecuteAndWait(async ctx => + { + foreach (ColorChosen colorsChosen in colorsChosens) + { + await ctx.Publish(colorsChosen); + } + }); + + IEnumerable colorsSent = session.AllRecordsInOrder() + .Where(e => e.EventType == EventType.Sent) + .Select(e => e.Envelope.Message).Cast().Select(c => c.Name); + IEnumerable colorsPublished = colorsChosens.Select(c => c.Name); + + colorsSent.ShouldBe(colorsPublished); + } + + // ENDSAMPLE + public class KafkaUsingApp : JasperOptions + { + public KafkaUsingApp() + { + Endpoints.ConfigureKafka(); + Endpoints.ListenToKafkaTopic("messages", ConsumerConfig); + Endpoints.Publish(pub => pub.Message().ToKafkaTopic("messages", ProducerConfig)); + + Handlers.IncludeType(); + + Services.AddSingleton(); + + Extensions.UseMessageTrackingTestingSupport(); + } + + public override void Configure(IHostEnvironment hosting, IConfiguration config) + { + //Endpoints.ConfigureAzureServiceBus(config.GetValue("AzureServiceBusConnectionString")); + } + } + } +} diff --git a/src/Jasper.ConfluentKafka/Exceptions/KafkaSenderException.cs b/src/Jasper.ConfluentKafka/Exceptions/KafkaSenderException.cs new file mode 100644 index 000000000..bd538219e --- /dev/null +++ b/src/Jasper.ConfluentKafka/Exceptions/KafkaSenderException.cs @@ -0,0 +1,15 @@ +using System; +using Confluent.Kafka; + +namespace Jasper.ConfluentKafka.Exceptions +{ + public class KafkaSenderException : ApplicationException + { + public Error Error { get; } + + public KafkaSenderException(Error error) : base(error.Reason) + { + Error = error; + } + } +} diff --git a/src/Jasper.ConfluentKafka/Exceptions/UnsupportedFeatureException.cs b/src/Jasper.ConfluentKafka/Exceptions/UnsupportedFeatureException.cs new file mode 100644 index 000000000..6bb8a46be --- /dev/null +++ b/src/Jasper.ConfluentKafka/Exceptions/UnsupportedFeatureException.cs @@ -0,0 +1,14 @@ +using System; + +namespace Jasper.ConfluentKafka.Exceptions +{ + public class UnsupportedFeatureException : ApplicationException + { + public string Feature { get; } + public UnsupportedFeatureException(string feature) + : base ($"Confluent Kafka Transport does not support {feature}") + { + Feature = feature; + } + } +} diff --git a/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaListener.cs b/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaListener.cs new file mode 100644 index 000000000..540e07db1 --- /dev/null +++ b/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaListener.cs @@ -0,0 +1,100 @@ +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using Jasper.ConfluentKafka; +using Jasper.Logging; +using Jasper.Transports; + +namespace Jasper.Kafka.Internal +{ + public class ConfluentKafkaListener : IListener + { + private readonly CancellationToken _cancellation; + private readonly IConsumer _consumer; + private readonly KafkaEndpoint _endpoint; + private readonly ITransportLogger _logger; + private IReceiverCallback _callback; + private readonly ITransportProtocol> _protocol; + private Task _consumerTask; + + public ConfluentKafkaListener(KafkaEndpoint endpoint, ITransportLogger logger, CancellationToken cancellation) + { + _endpoint = endpoint; + _logger = logger; + _cancellation = cancellation; + _protocol = new KafkaTransportProtocol(); + _consumer = new ConsumerBuilder(endpoint.ConsumerConfig).Build(); + } + + public void Dispose() + { + _consumer?.Dispose(); + _consumerTask?.Dispose(); + } + + public Uri Address => _endpoint.Uri; + public ListeningStatus Status { get; set; } + + public void Start(IReceiverCallback callback) + { + _callback = callback; + + _consumer.Subscribe(new []{ _endpoint.TopicName }); + + _consumerTask = ConsumeAsync(); + + _logger.ListeningStatusChange(ListeningStatus.Accepting); + } + + private async Task ConsumeAsync() + { + while (!_cancellation.IsCancellationRequested) + { + ConsumeResult message; + try + { + message = await Task.Run(() => _consumer.Consume(), _cancellation); + } + catch (Confluent.Kafka.ConsumeException cex) + { + if (cex.Error.Code == ErrorCode.PolicyViolation) + { + throw; + } + + continue; + } + catch (Exception ex) + { + _logger.LogException(ex, message: $"Error consuming message from Kafka topic {_endpoint.TopicName}"); + continue; + } + + Envelope envelope; + + try + { + envelope = _protocol.ReadEnvelope(message.Message); + } + catch (Exception ex) + { + _logger.LogException(ex, message: $"Error trying to map an incoming Kafka {_endpoint.TopicName} Topic message to an Envelope. See the Dead Letter Queue"); + continue; + } + + try + { + await _callback.Received(Address, new[] {envelope}); + + _consumer.Commit(message); + } + catch (Exception e) + { + _logger.LogException(e, envelope.Id, "Error trying to receive a message from " + Address); + } + } + } + } +} diff --git a/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaSender.cs b/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaSender.cs new file mode 100644 index 000000000..1ef127e0b --- /dev/null +++ b/src/Jasper.ConfluentKafka/Internal/ConfluentKafkaSender.cs @@ -0,0 +1,69 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using Jasper.ConfluentKafka.Exceptions; +using Jasper.Logging; +using Jasper.Transports; +using Jasper.Transports.Sending; + +namespace Jasper.ConfluentKafka.Internal +{ + public class ConfluentKafkaSender : ISender + { + private readonly ITransportProtocol> _protocol; + private readonly IProducer _publisher; + private readonly KafkaEndpoint _endpoint; + public bool SupportsNativeScheduledSend { get; } = false; + public Uri Destination => _endpoint.Uri; + public ConfluentKafkaSender(KafkaEndpoint endpoint) + { + if(endpoint?.ProducerConfig == null) + throw new ArgumentNullException(nameof(KafkaEndpoint.ProducerConfig)); + + _endpoint = endpoint; + _publisher = new ProducerBuilder(endpoint.ProducerConfig) + .SetErrorHandler((producer, error) => + { + if (error.IsFatal) + { + throw new KafkaSenderException(error); + } + }) + .Build(); + _protocol = new KafkaTransportProtocol(); + } + + public void Dispose() + { + _publisher?.Dispose(); + } + + public async Task Ping(CancellationToken cancellationToken) + { + Envelope envelope = Envelope.ForPing(Destination); + try + { + await Send(envelope); + } + catch + { + return false; + } + + return true; + } + + public Task Send(Envelope envelope) + { + if (envelope.IsDelayed(DateTime.UtcNow)) + { + throw new UnsupportedFeatureException("Delayed Message Delivery"); + } + + Message message = _protocol.WriteFromEnvelope(envelope); + + return _publisher.ProduceAsync(_endpoint.TopicName, message); + } + } +} diff --git a/src/Jasper.ConfluentKafka/Internal/KafkaTopicRouter.cs b/src/Jasper.ConfluentKafka/Internal/KafkaTopicRouter.cs new file mode 100644 index 000000000..747b924ad --- /dev/null +++ b/src/Jasper.ConfluentKafka/Internal/KafkaTopicRouter.cs @@ -0,0 +1,30 @@ +using System; +using Baseline; +using Jasper.Configuration; +using Jasper.Runtime.Routing; + +namespace Jasper.ConfluentKafka.Internal +{ + public class KafkaTopicRouter : TopicRouter + { + public override Uri BuildUriForTopic(string topicName) + { + var endpoint = new KafkaEndpoint + { + IsDurable = true, + TopicName = topicName + }; + + return endpoint.Uri; + } + + public override KafkaSubscriberConfiguration FindConfigurationForTopic(string topicName, + IEndpoints endpoints) + { + var uri = BuildUriForTopic(topicName); + var endpoint = endpoints.As().GetOrCreateEndpoint(uri); + + return new KafkaSubscriberConfiguration((KafkaEndpoint) endpoint); + } + } +} diff --git a/src/Jasper.ConfluentKafka/Jasper.ConfluentKafka.csproj b/src/Jasper.ConfluentKafka/Jasper.ConfluentKafka.csproj new file mode 100644 index 000000000..d3587cd7d --- /dev/null +++ b/src/Jasper.ConfluentKafka/Jasper.ConfluentKafka.csproj @@ -0,0 +1,34 @@ + + + + Kafka Transport for Jasper Messaging Systems using Confluent Kafka Client + Jeremy D. Miller, Jarrod Johnson + netstandard2.1 + portable + Jasper.ConfluentKafka + Jasper.ConfluentKafka + https://avatars2.githubusercontent.com/u/10048186?v=3&s=200 + http://jasperfx.github.io + https://github.com/JasperFX/jasper/blob/master/LICENSE.txt + false + false + false + false + false + + + + + + + + + + + + + CommonAssemblyInfo.cs + + + + diff --git a/src/Jasper.ConfluentKafka/KafkaEndpoint.cs b/src/Jasper.ConfluentKafka/KafkaEndpoint.cs new file mode 100644 index 000000000..19f1ce3a5 --- /dev/null +++ b/src/Jasper.ConfluentKafka/KafkaEndpoint.cs @@ -0,0 +1,103 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Baseline; +using Confluent.Kafka; +using Jasper.Configuration; +using Jasper.ConfluentKafka.Internal; +using Jasper.Kafka.Internal; +using Jasper.Runtime; +using Jasper.Transports; +using Jasper.Transports.Sending; +using Jasper.Util; + +namespace Jasper.ConfluentKafka +{ + public class KafkaEndpoint : Endpoint + { + private const string TopicToken = "topic"; + public string TopicName { get; set; } + public ProducerConfig ProducerConfig { get; set; } + public ConsumerConfig ConsumerConfig { get; set; } + public override Uri Uri => BuildUri(); + + public KafkaEndpoint() + { + + } + public KafkaEndpoint(Uri uri) : base(uri) + { + + } + + + private Uri BuildUri(bool forReply = false) + { + var list = new List(); + + if (TopicName.IsNotEmpty()) + { + list.Add(TopicToken); + list.Add(TopicName.ToLowerInvariant()); + } + + if (forReply && IsDurable) + { + list.Add(TransportConstants.Durable); + } + + var uri = $"{Protocols.Kafka}://{list.Join("/")}".ToUri(); + + return uri; + } + + public override void Parse(Uri uri) + { + if (uri.Scheme != Protocols.Kafka) + { + throw new ArgumentOutOfRangeException($"This is not a Kafka Transport Uri"); + } + + var raw = uri.Segments.Where(x => x != "/").Select(x => x.Trim('/')); + var segments = new Queue(); + segments.Enqueue(uri.Host); + foreach (var segment in raw) + { + segments.Enqueue(segment); + } + + while (segments.Any()) + { + if (segments.Peek().EqualsIgnoreCase(TopicToken)) + { + segments.Dequeue(); // token + TopicName = segments.Dequeue(); // value + } + else if (segments.Peek().EqualsIgnoreCase(TransportConstants.Durable)) + { + segments.Dequeue(); // token + IsDurable = true; + } + else + { + throw new InvalidOperationException($"The Uri '{uri}' is invalid for a Kafka Transport endpoint"); + } + } + } + + protected internal override void StartListening(IMessagingRoot root, ITransportRuntime runtime) + { + if (!IsListener) return; + + var listener = new ConfluentKafkaListener(this, root.TransportLogger, root.Cancellation); + runtime.AddListener(listener, this); + } + + protected override ISender CreateSender(IMessagingRoot root) + { + return new ConfluentKafkaSender(this); + } + + public override Uri ReplyUri() => BuildUri(true); + } +} diff --git a/src/Jasper.ConfluentKafka/KafkaListenerConfiguration.cs b/src/Jasper.ConfluentKafka/KafkaListenerConfiguration.cs new file mode 100644 index 000000000..0541f62ad --- /dev/null +++ b/src/Jasper.ConfluentKafka/KafkaListenerConfiguration.cs @@ -0,0 +1,12 @@ +using Jasper.Configuration; +using Jasper.ConfluentKafka; + +namespace Jasper.Kafka +{ + public class KafkaListenerConfiguration : ListenerConfiguration + { + public KafkaListenerConfiguration(KafkaEndpoint endpoint) : base(endpoint) + { + } + } +} diff --git a/src/Jasper.ConfluentKafka/KafkaSubscriberConfiguration.cs b/src/Jasper.ConfluentKafka/KafkaSubscriberConfiguration.cs new file mode 100644 index 000000000..57b9ece0e --- /dev/null +++ b/src/Jasper.ConfluentKafka/KafkaSubscriberConfiguration.cs @@ -0,0 +1,12 @@ +using Jasper.Configuration; + +namespace Jasper.ConfluentKafka +{ + public class KafkaSubscriberConfiguration : SubscriberConfiguration + { + public KafkaSubscriberConfiguration(KafkaEndpoint endpoint) : base(endpoint) + { + } + + } +} diff --git a/src/Jasper.ConfluentKafka/KafkaTransport.cs b/src/Jasper.ConfluentKafka/KafkaTransport.cs new file mode 100644 index 000000000..b8110f0a8 --- /dev/null +++ b/src/Jasper.ConfluentKafka/KafkaTransport.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using Confluent.Kafka; +using Jasper.ConfluentKafka.Internal; +using Jasper.Transports; + +namespace Jasper.ConfluentKafka +{ + public static class Protocols + { + public const string Kafka = "kafka"; + + } + + public class KafkaTransport : TransportBase + { + private readonly Dictionary _endpoints; + + public KafkaTopicRouter Topics { get; } = new KafkaTopicRouter(); + public KafkaTransport() : base(ConfluentKafka.Protocols.Kafka) + { + _endpoints = new Dictionary(); + } + + protected override IEnumerable endpoints() => _endpoints.Values; + + protected override KafkaEndpoint findEndpointByUri(Uri uri) + { + if (!_endpoints.ContainsKey(uri)) + { + _endpoints.Add(uri, new KafkaEndpoint(uri)); + } + + return _endpoints[uri]; + } + + public KafkaEndpoint EndpointForTopic(string topicName, ProducerConfig producerConifg) => + AddOrUpdateEndpoint(topicName, endpoint => endpoint.ProducerConfig = producerConifg); + + public KafkaEndpoint EndpointForTopic(string topicName, ConsumerConfig consumerConifg) => + AddOrUpdateEndpoint(topicName, endpoint => endpoint.ConsumerConfig = consumerConifg); + + KafkaEndpoint AddOrUpdateEndpoint(string topicName, Action configure) + { + var endpoint = new KafkaEndpoint + { + TopicName = topicName + }; + + if (_endpoints.ContainsKey(endpoint.Uri)) + { + endpoint = _endpoints[endpoint.Uri]; + configure(endpoint); + } + else + { + configure(endpoint); + _endpoints.Add(endpoint.Uri, endpoint); + } + + return endpoint; + } + + } +} diff --git a/src/Jasper.ConfluentKafka/KafkaTransportConfigurationExtensions.cs b/src/Jasper.ConfluentKafka/KafkaTransportConfigurationExtensions.cs new file mode 100644 index 000000000..e1d6b4516 --- /dev/null +++ b/src/Jasper.ConfluentKafka/KafkaTransportConfigurationExtensions.cs @@ -0,0 +1,95 @@ +using System; +using Baseline; +using Confluent.Kafka; +using Jasper.Configuration; +using Jasper.Kafka; + +namespace Jasper.ConfluentKafka +{ + public static class KafkaTransportConfigurationExtensions + {/// + /// Quick access to the kafka Transport within this application. + /// This is for advanced usage + /// + /// + /// + internal static KafkaTransport KafkaTransport(this IEndpoints endpoints) + { + var transports = endpoints.As(); + + var transport = transports.Get(); + + if (transport == null) + { + transport = new KafkaTransport(); + transports.Add(transport); + } + + transports.Subscribers.Fill(transport.Topics); + + return transport; + } + /// + /// Configure connection and authentication information about the Azure Service Bus usage + /// within this Jasper application + /// + /// + /// + public static void ConfigureKafka(this IEndpoints endpoints, Action configure) + { + var transport = endpoints.KafkaTransport(); + endpoints.As().Subscribers.Fill(transport.Topics); + configure(transport); + } + + /// + /// Configure connection and authentication information about the Azure Service Bus usage + /// within this Jasper application + /// + /// + /// + public static void ConfigureKafka(this IEndpoints endpoints) + { + endpoints.ConfigureKafka(_ => + { + + }); + } + + /// + /// Listen for incoming messages at the designated Kafka Topic by name + /// + /// + /// + /// + /// + /// + /// + public static KafkaListenerConfiguration ListenToKafkaTopic(this IEndpoints endpoints, string topicName, ConsumerConfig consumerConfig) + { + var endpoint = endpoints.KafkaTransport().EndpointForTopic(topicName, consumerConfig); + endpoint.IsListener = true; + return new KafkaListenerConfiguration(endpoint); + } + + /// + /// Publish matching messages to Kafka Topic using provided Producer Configuration + /// + /// + /// This is used as the topic name when publishing. Can be either a binding key or a queue name or a static topic name if the exchange is topic-based + /// Optional, you only need to supply this if you are using a non-default exchange + /// + public static KafkaSubscriberConfiguration ToKafkaTopic(this IPublishToExpression publishing, string topicName, ProducerConfig producerConfig) + { + var transports = publishing.As().Parent; + var transport = transports.Get(); + var endpoint = transport.EndpointForTopic(topicName, producerConfig); + + // This is necessary unfortunately to hook up the subscription rules + publishing.To(endpoint.Uri); + + return new KafkaSubscriberConfiguration(endpoint); + } + + } +} diff --git a/src/Jasper.ConfluentKafka/KafkaTransportProtocol.cs b/src/Jasper.ConfluentKafka/KafkaTransportProtocol.cs new file mode 100644 index 000000000..b7f4b7281 --- /dev/null +++ b/src/Jasper.ConfluentKafka/KafkaTransportProtocol.cs @@ -0,0 +1,66 @@ +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Confluent.Kafka; +using Jasper.Transports; + +namespace Jasper.ConfluentKafka +{ + public class KafkaTransportProtocol : ITransportProtocol> + { + public const string KafkaMessageKeyHeader = "Confluent.Kafka.Message.Key"; + + public Message WriteFromEnvelope(Envelope envelope) + { + var message = new Message + { + Headers = new Headers(), + Value = envelope.Data + }; + + IDictionary envelopHeaders = new Dictionary(); + envelope.WriteToDictionary(envelopHeaders); + + var headers = new Headers(); + foreach (Header header in envelopHeaders.Select(h => new Header(h.Key, Encoding.UTF8.GetBytes(h.Value.ToString())))) + { + headers.Add(header); + } + + message.Headers = headers; + + if (!envelopHeaders.TryGetValue(KafkaMessageKeyHeader, out object msgKey)) return message; + + if (msgKey is byte[] key) + { + message.Key = key; + } + else + { + message.Key = Encoding.UTF8.GetBytes(msgKey.ToString()); + } + + return message; + } + + public Envelope ReadEnvelope(Message message) + { + var env = new Envelope() + { + Data = message.Value + }; + + Dictionary incomingHeaders = message.Headers.Select(h => new {h.Key, Value = h.GetValueBytes()}) + .ToDictionary(k => k.Key, v => (object)Encoding.UTF8.GetString(v.Value)); + + if(message.Key != null && !incomingHeaders.ContainsKey(KafkaMessageKeyHeader)) + { + env.Headers.Add(KafkaMessageKeyHeader, Encoding.UTF8.GetString(message.Key)); + } + + env.ReadPropertiesFromDictionary(incomingHeaders); + + return env; + } + } +} diff --git a/src/Jasper.Pulsar.Tests/Jasper.Pulsar.Tests.csproj b/src/Jasper.Pulsar.Tests/Jasper.Pulsar.Tests.csproj new file mode 100644 index 000000000..71c4ca988 --- /dev/null +++ b/src/Jasper.Pulsar.Tests/Jasper.Pulsar.Tests.csproj @@ -0,0 +1,29 @@ + + + + netcoreapp3.1 + false + + + + + + + + + + + + + + + + + + + + Servers.cs + + + + diff --git a/src/Jasper.Pulsar.Tests/PulsarEndpointTester.cs b/src/Jasper.Pulsar.Tests/PulsarEndpointTester.cs new file mode 100644 index 000000000..4789e50f0 --- /dev/null +++ b/src/Jasper.Pulsar.Tests/PulsarEndpointTester.cs @@ -0,0 +1,125 @@ +using System; +using AutoFixture.Xunit2; +using Shouldly; +using Xunit; + +namespace Jasper.Pulsar.Tests +{ + public class PulsarEndpointTester + { + [Fact] + public void parse_non_durable_uri() + { + var endpoint = new PulsarEndpoint(); + endpoint.Parse(new Uri($"{PulsarPersistence.Persistent}://tenant/jasper/key1")); + + endpoint.IsDurable.ShouldBeFalse(); + endpoint.Topic.TopicName.ShouldBe("key1"); + } + + [Theory, AutoData] + public void persistent_pulsar_topic_parts_match(string tenant, string @namespace, string topic) + { + var endpoint = new PulsarEndpoint(); + endpoint.Parse(new Uri($"{PulsarPersistence.Persistent}://{tenant}/{@namespace}/{topic}")); + + endpoint.Topic.Persistence.ShouldBe(PulsarPersistence.Persistent); + endpoint.Topic.Tenant.ShouldBe(tenant); + endpoint.Topic.Namespace.ShouldBe(@namespace); + endpoint.Topic.TopicName.ShouldBe(topic); + } + + [Theory, AutoData] + public void non_persistent_pulsar_topic_parts_match(string tenant, string @namespace, string topic) + { + var endpoint = new PulsarEndpoint(); + endpoint.Parse(new Uri($"{PulsarPersistence.NonPersistent}://{tenant}/{@namespace}/{topic}")); + + endpoint.Topic.Persistence.ShouldBe(PulsarPersistence.NonPersistent); + endpoint.Topic.Tenant.ShouldBe(tenant); + endpoint.Topic.Namespace.ShouldBe(@namespace); + endpoint.Topic.TopicName.ShouldBe(topic); + } + + [Fact] + public void parse_non_durable_persistent_uri() + { + var endpoint = new PulsarEndpoint(); + endpoint.Parse(new Uri($"{PulsarPersistence.Persistent}://tenant/jasper/key1")); + + endpoint.IsDurable.ShouldBeFalse(); + endpoint.Topic.Persistence.ShouldBe(PulsarPersistence.Persistent); + } + + [Fact] + public void parse_non_durable_non_persistent_uri() + { + var endpoint = new PulsarEndpoint(); + endpoint.Parse(new Uri($"{PulsarPersistence.NonPersistent}://tenant/jasper/key1")); + + endpoint.IsDurable.ShouldBeFalse(); + endpoint.Topic.Persistence.ShouldBe(PulsarPersistence.NonPersistent); + } + + [Fact] + public void parse_durable_persistent_uri() + { + var endpoint = new PulsarEndpoint(); + endpoint.Parse(new Uri($"{PulsarPersistence.Persistent}://tenant/jasper/key1/durable")); + + endpoint.IsDurable.ShouldBeTrue(); + endpoint.Topic.Persistence.ShouldBe(PulsarPersistence.Persistent); + } + + [Fact] + public void parse_durable_non_persistent_uri() + { + var endpoint = new PulsarEndpoint(); + endpoint.Parse(new Uri($"{PulsarPersistence.NonPersistent}://tenant/jasper/key1/durable")); + + endpoint.IsDurable.ShouldBeTrue(); + endpoint.Topic.Persistence.ShouldBe(PulsarPersistence.NonPersistent); + } + + [Fact] + public void parse_durable_uri() + { + var endpoint = new PulsarEndpoint(); + endpoint.Parse(new Uri($"{PulsarPersistence.Persistent}://tenant/jasper/key1/durable")); + + endpoint.IsDurable.ShouldBeTrue(); + endpoint.Topic.TopicName.ShouldBe("key1"); + } + + [Fact] + public void build_uri_for_subscription_and_topic() + { + new PulsarEndpoint + { + Topic = $"{PulsarPersistence.Persistent}://tenant/jasper/key1" + } + .Uri.ShouldBe(new Uri($"{PulsarPersistence.Persistent}://tenant/jasper/key1")); + } + + [Fact] + public void generate_reply_uri_for_non_durable() + { + new PulsarEndpoint + { + Topic = $"{PulsarPersistence.Persistent}://tenant/jasper/key1" + } + .ReplyUri().ShouldBe(new Uri($"{PulsarPersistence.Persistent}://tenant/jasper/key1")); + } + + [Fact] + public void generate_reply_uri_for_durable() + { + new PulsarEndpoint + { + Topic = $"{PulsarPersistence.Persistent}://tenant/jasper/key1", + IsDurable = true + }.ReplyUri().ShouldBe(new Uri($"{PulsarPersistence.Persistent}://tenant/jasper/key1/durable")); + } + + } +} diff --git a/src/Jasper.Pulsar.Tests/PulsarSendingComplianceTests.cs b/src/Jasper.Pulsar.Tests/PulsarSendingComplianceTests.cs new file mode 100644 index 000000000..8cf1e027c --- /dev/null +++ b/src/Jasper.Pulsar.Tests/PulsarSendingComplianceTests.cs @@ -0,0 +1,98 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Baseline.Dates; +using DotPulsar; +using DotPulsar.Internal; +using Jasper.Tracking; +using Newtonsoft.Json; +using Shouldly; +using TestingSupport.Compliance; +using TestMessages; +using Xunit; + +namespace Jasper.Pulsar.Tests +{ + public class PulsarSendingComplianceTestsShell + { + private static string Server = "pulsar://localhost:6650"; + + public class Sender : JasperOptions + { + public Sender(string topic) + { + Endpoints.ConfigurePulsar(new PulsarClientBuilder() + .ServiceUrl(new Uri(Server))); + Endpoints.PublishAllMessages().ToPulsarTopic(new ProducerOptions(topic)); + Endpoints.ListenToPulsarTopic(Guid.NewGuid().ToString(), topic + "-reply").UseForReplies(); + } + } + + public class Receiver : JasperOptions + { + public Receiver(string topic) + { + Endpoints.ConfigurePulsar(new PulsarClientBuilder().ServiceUrl(new Uri(Server))); + Endpoints.PublishAllMessages().ToPulsarTopic(topic + "-reply"); + Endpoints.ListenToPulsarTopic(Guid.NewGuid().ToString(), topic); + } + } + + public class PulsarSendingComplianceTests : SendingCompliance + { + public static string Topic { get; } = "persistent://public/default/jasper"; + + public PulsarSendingComplianceTests() : base(new Uri(Topic), 30.Seconds()) + { + var sender = new Sender(Topic); + + SenderIs(sender); + + var receiver = new Receiver(Topic); + + ReceiverIs(receiver); + + Thread.Sleep(2000); + } + + [Fact] + + public async Task publish_failures_reported_to_caller() + { + _ = await theSender.TrackActivity(10.Seconds()) + .DoNotAssertOnExceptionsDetected() + .DoNotAssertTimeout() + .ExecuteAndWait(c => + { + var serializationException = Should.Throw(c.Publish(new PoisonEnvelop())); + serializationException.InnerException.ShouldBeOfType(); + return Task.CompletedTask; + }); + } + + [Fact] + + public async Task publish_succeeds() + { + _ = await theSender.TrackActivity(10.Seconds()) + .DoNotAssertOnExceptionsDetected() + .DoNotAssertTimeout() + .ExecuteAndWait(c => c.Publish(new Message1())); + } + } + } + + public class PoisionMessageException : Exception + { + public const string PoisonMessage = "Poison message"; + public PoisionMessageException() : base(PoisonMessage) + { + + } + } + + public class PoisonEnvelop : Envelope + { + public new byte[] Data => throw new PoisionMessageException(); + } +} diff --git a/src/Jasper.Pulsar/Exceptions/PulsarSenderException.cs b/src/Jasper.Pulsar/Exceptions/PulsarSenderException.cs new file mode 100644 index 000000000..a79d98db4 --- /dev/null +++ b/src/Jasper.Pulsar/Exceptions/PulsarSenderException.cs @@ -0,0 +1,14 @@ +using System; + +namespace Jasper.Pulsar.Exceptions +{ + public class PulsarSenderException : ApplicationException + { + //public Error Error { get; } + + //public PulsarSenderException(Error error) : base(error.Reason) + //{ + // Error = error; + //} + } +} diff --git a/src/Jasper.Pulsar/Internal/PulsarListener.cs b/src/Jasper.Pulsar/Internal/PulsarListener.cs new file mode 100644 index 000000000..3dfa6d06d --- /dev/null +++ b/src/Jasper.Pulsar/Internal/PulsarListener.cs @@ -0,0 +1,77 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using DotPulsar; +using DotPulsar.Abstractions; +using Jasper.Logging; +using Jasper.Transports; + +namespace Jasper.Pulsar.Internal +{ + public class PulsarListener : IListener + { + private readonly CancellationToken _cancellation; + private readonly IConsumer _consumer; + private readonly PulsarEndpoint _endpoint; + private readonly ITransportLogger _logger; + private IReceiverCallback _callback; + private readonly ITransportProtocol _protocol; + private Task _consumerTask; + + public PulsarListener(PulsarEndpoint endpoint, ITransportLogger logger, CancellationToken cancellation) + { + _endpoint = endpoint; + _logger = logger; + _cancellation = cancellation; + _protocol = new PulsarTransportProtocol(); + _consumer = endpoint.PulsarClient.CreateConsumer(endpoint.ConsumerOptions); + } + + public void Dispose() + { + _consumer?.DisposeAsync(); + _consumerTask?.Dispose(); + } + + public Uri Address => _endpoint.Uri; + public ListeningStatus Status { get; set; } + + public void Start(IReceiverCallback callback) + { + _callback = callback; + + _consumerTask = ConsumeAsync(); + + _logger.ListeningStatusChange(ListeningStatus.Accepting); + } + + private async Task ConsumeAsync() + { + await foreach (Message message in _consumer.Messages(_cancellation)) + { + Envelope envelope; + + try + { + envelope = _protocol.ReadEnvelope(new PulsarMessage(message.Data, message.Properties)); + } + catch (Exception ex) + { + _logger.LogException(ex, message: $"Error trying to map an incoming Pulsar {_endpoint.Topic} Topic message to an Envelope. See the Dead Letter Queue"); + continue; + } + + try + { + await _callback.Received(Address, new[] {envelope}); + + await _consumer.Acknowledge(message, _cancellation); + } + catch (Exception e) + { + _logger.LogException(e, envelope.Id, "Error trying to receive a message from " + Address); + } + } + } + } +} diff --git a/src/Jasper.Pulsar/Internal/PulsarMessage.cs b/src/Jasper.Pulsar/Internal/PulsarMessage.cs new file mode 100644 index 000000000..5d0dda5df --- /dev/null +++ b/src/Jasper.Pulsar/Internal/PulsarMessage.cs @@ -0,0 +1,33 @@ +using System.Buffers; +using System.Collections.Generic; +using DotPulsar; + +namespace Jasper.Pulsar.Internal +{ + internal class PulsarMessage + { + public MessageMetadata Metadata { get; } = new MessageMetadata(); + public ReadOnlySequence Data { get; } + public IReadOnlyDictionary Properties { get; } = new Dictionary(); + + public PulsarMessage(ReadOnlySequence data) + { + Data = data; + } + + public PulsarMessage(ReadOnlySequence data, MessageMetadata metadata) + { + Data = data; + Metadata = metadata; + } + + public PulsarMessage(byte[] data, MessageMetadata metadata) : this(new ReadOnlySequence(data), metadata) + { + } + + public PulsarMessage(ReadOnlySequence data, IReadOnlyDictionary properties) : this(data) + { + Properties = properties; + } + } +} diff --git a/src/Jasper.Pulsar/Internal/PulsarTopicRouter.cs b/src/Jasper.Pulsar/Internal/PulsarTopicRouter.cs new file mode 100644 index 000000000..914ac1442 --- /dev/null +++ b/src/Jasper.Pulsar/Internal/PulsarTopicRouter.cs @@ -0,0 +1,30 @@ +using System; +using Baseline; +using Jasper.Configuration; +using Jasper.Runtime.Routing; + +namespace Jasper.Pulsar.Internal +{ + public class PulsarTopicRouter : TopicRouter + { + public override Uri BuildUriForTopic(string topic) + { + var endpoint = new PulsarEndpoint + { + IsDurable = true, + Topic = topic + }; + + return endpoint.Uri; + } + + public override PulsarSubscriberConfiguration FindConfigurationForTopic(string topicName, + IEndpoints endpoints) + { + Uri uri = BuildUriForTopic(topicName); + Endpoint endpoint = endpoints.As().GetOrCreateEndpoint(uri); + + return new PulsarSubscriberConfiguration((PulsarEndpoint) endpoint); + } + } +} diff --git a/src/Jasper.Pulsar/Internal/PulsarTransportProtocol.cs b/src/Jasper.Pulsar/Internal/PulsarTransportProtocol.cs new file mode 100644 index 000000000..e1db00733 --- /dev/null +++ b/src/Jasper.Pulsar/Internal/PulsarTransportProtocol.cs @@ -0,0 +1,65 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using DotPulsar; +using Jasper.Pulsar.Internal; +using Jasper.Transports; + +namespace Jasper.Pulsar +{ + internal class PulsarTransportProtocol : ITransportProtocol + { + public const string PulsarMessageKeyHeader = "Pulsar.Message.Key"; + public const string PulsarMessageSequenceIdHeader = "Pulsar.Message.SequenceId"; + + private readonly Dictionary> _pulsarMsgPropTypes = new Dictionary>() + { + { PulsarMessageKeyHeader, (metadata, val) => metadata.Key = val?.ToString() }, + { PulsarMessageSequenceIdHeader, (metadata, val) => metadata.SequenceId = val != null ? ulong.Parse(val.ToString()) : default }, + }; + + public PulsarMessage WriteFromEnvelope(Envelope envelope) + { + IDictionary envelopHeaders = new Dictionary(); + envelope.WriteToDictionary(envelopHeaders); + + var metadata = new MessageMetadata(); + + foreach (var header in envelopHeaders.Where(h => !_pulsarMsgPropTypes.Keys.Contains(h.Key))) + { + metadata[header.Key] = header.Value.ToString(); + } + + SetMetaDataFromHeaderValues(metadata, envelopHeaders); + + return new PulsarMessage(envelope.Data, metadata); + } + + private void SetMetaDataFromHeaderValues(MessageMetadata metadata, IDictionary envelopHeaders) + { + foreach (var pulsarMsgPropType in _pulsarMsgPropTypes) + { + SetMetaDataFromHeaderValue(metadata, envelopHeaders, pulsarMsgPropType.Key, pulsarMsgPropType.Value); + } + } + + private void SetMetaDataFromHeaderValue(MessageMetadata metadata, IDictionary envelopHeaders, string propertyName, Action propertySetter) + { + if (envelopHeaders.TryGetValue(propertyName, out object headerValue)) propertySetter(metadata, headerValue); + } + + public Envelope ReadEnvelope(PulsarMessage message) + { + var envelope = new Envelope + { + Data = message.Data.ToArray(), + Headers = message.Properties.ToDictionary(ks => ks.Key, vs => vs.Value) + }; + + envelope.ReadPropertiesFromDictionary(message.Properties.ToDictionary(ks => ks.Key, vs => (object)vs.Value)); + + return envelope; + } + } +} diff --git a/src/Jasper.Pulsar/Internal/PulssarSender.cs b/src/Jasper.Pulsar/Internal/PulssarSender.cs new file mode 100644 index 000000000..7371c68a2 --- /dev/null +++ b/src/Jasper.Pulsar/Internal/PulssarSender.cs @@ -0,0 +1,58 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using DotPulsar.Abstractions; +using Jasper.Transports; +using Jasper.Transports.Sending; + +namespace Jasper.Pulsar.Internal +{ + public class PulsarSender : ISender + { + private readonly ITransportProtocol _protocol; + private readonly IProducer _publisher; + private readonly PulsarEndpoint _endpoint; + private readonly CancellationToken _cancellationToken; + public bool SupportsNativeScheduledSend { get; } = false; + public Uri Destination => _endpoint.Uri; + public PulsarSender(PulsarEndpoint endpoint, CancellationToken cancellationToken) + { + _endpoint = endpoint; + _cancellationToken = cancellationToken; + _publisher = endpoint.PulsarClient.CreateProducer(endpoint.ProducerOptions); + _protocol = new PulsarTransportProtocol(); + } + + public void Dispose() + { + _publisher?.DisposeAsync(); + } + + public async Task Ping(CancellationToken cancellationToken) + { + Envelope envelope = Envelope.ForPing(Destination); + try + { + await Send(envelope); + } + catch + { + return false; + } + + return true; + } + + public async Task Send(Envelope envelope) + { + if (envelope.IsDelayed(DateTime.UtcNow)) + { + throw new Exception("Delayed Message Delivery"); + } + + var message = _protocol.WriteFromEnvelope(envelope); + + _ = await _publisher.Send(message.Metadata, message.Data, _cancellationToken); + } + } +} diff --git a/src/Jasper.Pulsar/Jasper.Pulsar.csproj b/src/Jasper.Pulsar/Jasper.Pulsar.csproj new file mode 100644 index 000000000..81d7faa2c --- /dev/null +++ b/src/Jasper.Pulsar/Jasper.Pulsar.csproj @@ -0,0 +1,34 @@ + + + + Pulsar support for Jasper Messaging Systems + Jeremy D. Miller, Jarrod Johnson + netstandard2.1 + portable + Jasper.Pulsar + Jasper.Pulsar + https://avatars2.githubusercontent.com/u/10048186?v=3&s=200 + http://jasperfx.github.io + https://github.com/JasperFX/jasper/blob/master/LICENSE.txt + false + false + false + false + false + + + + + + + + + CommonAssemblyInfo.cs + + + + + + + + diff --git a/src/Jasper.Pulsar/PulsarEndpoint.cs b/src/Jasper.Pulsar/PulsarEndpoint.cs new file mode 100644 index 000000000..f028e4c4d --- /dev/null +++ b/src/Jasper.Pulsar/PulsarEndpoint.cs @@ -0,0 +1,70 @@ +using System; +using DotPulsar; +using DotPulsar.Abstractions; +using Jasper.Configuration; +using Jasper.Pulsar.Internal; +using Jasper.Runtime; +using Jasper.Transports; +using Jasper.Transports.Sending; + +namespace Jasper.Pulsar +{ + public class PulsarEndpoint : Endpoint + { + public PulsarTopic Topic { get; set; } + + public override Uri Uri => BuildUri(false); + public ConsumerOptions ConsumerOptions { get; set; } + public ProducerOptions ProducerOptions { get; set; } + public IPulsarClient PulsarClient { get; set; } + + public PulsarEndpoint() + { + + } + + public PulsarEndpoint(string topic) + { + Topic = topic; + } + + public PulsarEndpoint(Uri uri) : base(uri) + { + Topic = uri; + } + + public override void Parse(Uri uri) + { + IsDurable = uri.ToString().EndsWith(TransportConstants.Durable); + var url = uri.ToString(); + string pulsarTopic = url.Substring(0, url.Length - (IsDurable ? TransportConstants.Durable.Length + 1 : 0)); + + Topic = new PulsarTopic(pulsarTopic); + } + + private Uri BuildUri(bool forReply = false) + { + if (forReply && IsDurable) + { + return new Uri(Topic + $"/{TransportConstants.Durable}"); + } + + return Topic; + } + + protected internal override void StartListening(IMessagingRoot root, ITransportRuntime runtime) + { + if (!IsListener) return; + + var listener = new PulsarListener(this, root.TransportLogger, root.Cancellation); + runtime.AddListener(listener, this); + } + + protected override ISender CreateSender(IMessagingRoot root) + { + return new PulsarSender(this, root.Cancellation); + } + + public override Uri ReplyUri() => BuildUri(true); + } +} diff --git a/src/Jasper.Pulsar/PulsarListenerConfiguration.cs b/src/Jasper.Pulsar/PulsarListenerConfiguration.cs new file mode 100644 index 000000000..b3d0d54d9 --- /dev/null +++ b/src/Jasper.Pulsar/PulsarListenerConfiguration.cs @@ -0,0 +1,12 @@ +using Jasper.Configuration; +using Jasper.Pulsar; + +namespace Jasper.Pulsar +{ + public class PulsarListenerConfiguration : ListenerConfiguration + { + public PulsarListenerConfiguration(PulsarEndpoint endpoint) : base(endpoint) + { + } + } +} diff --git a/src/Jasper.Pulsar/PulsarSubscriberConfiguration.cs b/src/Jasper.Pulsar/PulsarSubscriberConfiguration.cs new file mode 100644 index 000000000..95851dd98 --- /dev/null +++ b/src/Jasper.Pulsar/PulsarSubscriberConfiguration.cs @@ -0,0 +1,12 @@ +using Jasper.Configuration; + +namespace Jasper.Pulsar +{ + public class PulsarSubscriberConfiguration : SubscriberConfiguration + { + public PulsarSubscriberConfiguration(PulsarEndpoint endpoint) : base(endpoint) + { + } + + } +} diff --git a/src/Jasper.Pulsar/PulsarTopic.cs b/src/Jasper.Pulsar/PulsarTopic.cs new file mode 100644 index 000000000..708b7d349 --- /dev/null +++ b/src/Jasper.Pulsar/PulsarTopic.cs @@ -0,0 +1,49 @@ +using System; +using System.Linq; +using System.Text.RegularExpressions; + +namespace Jasper.Pulsar +{ + public static class PulsarPersistence + { + public const string Persistent = "persistent"; + public const string NonPersistent = "non-persistent"; + } + + public struct PulsarTopic + { + public string Persistence { get; } + public string Tenant { get; } + public string Namespace { get; } + public string TopicName { get; } + + private const string PulsarTopicRegex = @"(non-persistent|persistent)://([-A-Za-z0-9]*)/([-A-Za-z0-9]*)/([-A-Za-z0-9]*)?"; + + private const string InvalidTopicFormatMessage = + "Invalid Pulsar topic. Expecting format of \"{persistent|non-persistent}://tenant/namespace/topic\""; + + public PulsarTopic(Uri topic) : this(topic?.ToString()) + { + + } + + public PulsarTopic(string fullyQualifiedTopic) + { + MatchCollection match = Regex.Matches(fullyQualifiedTopic, PulsarTopicRegex, RegexOptions.Compiled); + + if (!match.Any()) + throw new ArgumentException(InvalidTopicFormatMessage, nameof(fullyQualifiedTopic)); + + Persistence = match[0].Groups[1].Captures[0].Value; + Tenant = match[0].Groups[2].Captures[0].Value; + Namespace = match[0].Groups[3].Captures[0].Value; + TopicName = match[0].Groups[4].Captures[0].Value; + } + + public static explicit operator string(PulsarTopic topic) => topic.ToString(); + public static implicit operator Uri(PulsarTopic topic) => new Uri(topic.ToString()); + public static implicit operator PulsarTopic(string topic) => new PulsarTopic(topic); + public static implicit operator PulsarTopic(Uri topic) => new PulsarTopic(topic); + + public override string ToString() => $"{Persistence}://{Tenant}/{Namespace}/{TopicName}"; } +} diff --git a/src/Jasper.Pulsar/PulsarTransport.cs b/src/Jasper.Pulsar/PulsarTransport.cs new file mode 100644 index 000000000..b41d1fb2c --- /dev/null +++ b/src/Jasper.Pulsar/PulsarTransport.cs @@ -0,0 +1,78 @@ +using System; +using System.Collections.Generic; +using DotPulsar; +using DotPulsar.Abstractions; +using Jasper.Pulsar.Internal; +using Jasper.Transports; + +namespace Jasper.Pulsar +{ + public static class Protocols + { + public static readonly string[] Pulsar = { "persistent", "non-persistent" }; + } + + public class PulsarTransport : TransportBase + { + private readonly Dictionary _endpoints; + + public PulsarTopicRouter Topics { get; } = new PulsarTopicRouter(); + public PulsarTransport() : base(Pulsar.Protocols.Pulsar) + { + _endpoints = new Dictionary(); + } + + public IPulsarClient PulsarClient { get; set; } + + protected override IEnumerable endpoints() => _endpoints.Values; + + protected override PulsarEndpoint findEndpointByUri(Uri uri) + { + if (!_endpoints.ContainsKey(uri)) + { + _endpoints.Add(uri, new PulsarEndpoint(uri) + { + PulsarClient = PulsarClient + }); + } + + return _endpoints[uri]; + } + + public PulsarEndpoint EndpointFor(ProducerOptions producerConifg) => + AddOrUpdateEndpoint(endpoint => + { + endpoint.Topic = producerConifg.Topic; + endpoint.ProducerOptions = producerConifg; + }); + + public PulsarEndpoint EndpointFor(ConsumerOptions consumerConifg) => + AddOrUpdateEndpoint(endpoint => + { + endpoint.Topic = consumerConifg.Topic; + endpoint.ConsumerOptions = consumerConifg; + }); + + PulsarEndpoint AddOrUpdateEndpoint(Action configure) + { + var endpoint = new PulsarEndpoint + { + PulsarClient = PulsarClient + }; + + configure(endpoint); + + if (_endpoints.ContainsKey(endpoint.Uri)) + { + endpoint = _endpoints[endpoint.Uri]; + } + else + { + _endpoints.Add(endpoint.Uri, endpoint); + } + + return endpoint; + } + + } +} diff --git a/src/Jasper.Pulsar/PulsarTransportConfigurationExtensions.cs b/src/Jasper.Pulsar/PulsarTransportConfigurationExtensions.cs new file mode 100644 index 000000000..89c741123 --- /dev/null +++ b/src/Jasper.Pulsar/PulsarTransportConfigurationExtensions.cs @@ -0,0 +1,124 @@ +using System; +using Baseline; +using DotPulsar; +using DotPulsar.Abstractions; +using DotPulsar.Internal; +using Jasper.Configuration; +using Jasper.Pulsar; + +namespace Jasper.Pulsar +{ + public static class PulsarTransportConfigurationExtensions + {/// + /// Quick access to the pulsar Transport within this application. + /// This is for advanced usage + /// + /// + /// + internal static PulsarTransport PulsarTransport(this IEndpoints endpoints) + { + var transports = endpoints.As(); + + var transport = transports.Get(); + + if (transport == null) + { + transport = new PulsarTransport(); + transports.Add(transport); + } + + transports.Subscribers.Fill(transport.Topics); + + return transport; + } + /// + /// Configure connection and authentication information about the Azure Service Bus usage + /// within this Jasper application + /// + /// + /// + public static void ConfigurePulsar(this IEndpoints endpoints, Action configure) + { + var transport = endpoints.PulsarTransport(); + endpoints.As().Subscribers.Fill(transport.Topics); + configure(transport); + } + + /// + /// Configure connection and authentication information about the Azure Service Bus usage + /// within this Jasper application + /// + /// + /// + public static void ConfigurePulsar(this IEndpoints endpoints, IPulsarClient client) + { + endpoints.ConfigurePulsar(_ => { _.PulsarClient = client; }); + } + + + public static void ConfigurePulsar(this IEndpoints endpoints, string pulsarCluster) + { + endpoints.ConfigurePulsar(_ => + { + _.PulsarClient = new PulsarClientBuilder().ServiceUrl(new Uri(pulsarCluster)).Build(); + }); + } + + public static void ConfigurePulsar(this IEndpoints endpoints, Uri pulsarCluster) + { + endpoints.ConfigurePulsar(_ => + { + _.PulsarClient = new PulsarClientBuilder().ServiceUrl(pulsarCluster).Build(); + }); + } + + public static void ConfigurePulsar(this IEndpoints endpoints, IPulsarClientBuilder pulsarClientBuilder) + { + endpoints.ConfigurePulsar(_ => + { + _.PulsarClient = pulsarClientBuilder.Build(); + }); + } + + /// + /// Listen for incoming messages at the designated Pulsar Topic by name + /// + /// + /// + /// + /// + /// + /// + public static PulsarListenerConfiguration ListenToPulsarTopic(this IEndpoints endpoints, string subscription, string topicName) => + ListenToPulsarTopic(endpoints, new ConsumerOptions(subscription, topicName)); + + public static PulsarListenerConfiguration ListenToPulsarTopic(this IEndpoints endpoints, ConsumerOptions consumerConfig) + { + var endpoint = endpoints.PulsarTransport().EndpointFor(consumerConfig); + endpoint.IsListener = true; + return new PulsarListenerConfiguration(endpoint); + } + + /// + /// Publish matching messages to Pulsar Topic using provided Producer Configuration + /// + /// + /// This is used as the topic name when publishing. Can be either a binding key or a queue name or a static topic name if the exchange is topic-based + /// Optional, you only need to supply this if you are using a non-default exchange + /// + public static PulsarSubscriberConfiguration ToPulsarTopic(this IPublishToExpression publishing, string topicName) => ToPulsarTopic(publishing, new ProducerOptions(topicName)); + + public static PulsarSubscriberConfiguration ToPulsarTopic(this IPublishToExpression publishing, ProducerOptions producerOptions) + { + var transports = publishing.As().Parent; + var transport = transports.Get(); + var endpoint = transport.EndpointFor(producerOptions); + + // This is necessary unfortunately to hook up the subscription rules + publishing.To(endpoint.Uri); + + return new PulsarSubscriberConfiguration(endpoint); + } + + } +} diff --git a/src/Jasper.RabbitMQ/Internal/RabbitMqConnectionAgent.cs b/src/Jasper.RabbitMQ/Internal/RabbitMqConnectionAgent.cs index 3c785d8dc..71bbe2576 100644 --- a/src/Jasper.RabbitMQ/Internal/RabbitMqConnectionAgent.cs +++ b/src/Jasper.RabbitMQ/Internal/RabbitMqConnectionAgent.cs @@ -21,7 +21,7 @@ public void Dispose() teardownConnection(); } - internal void Connect() + internal void EnsureConnected() { lock (_locker) { diff --git a/src/Jasper.RabbitMQ/Internal/RabbitMqEndpoint.cs b/src/Jasper.RabbitMQ/Internal/RabbitMqEndpoint.cs index 5531ca95c..1990da238 100644 --- a/src/Jasper.RabbitMQ/Internal/RabbitMqEndpoint.cs +++ b/src/Jasper.RabbitMQ/Internal/RabbitMqEndpoint.cs @@ -2,13 +2,11 @@ using System.Collections.Generic; using System.Linq; using Baseline; -using ImTools; using Jasper.Configuration; using Jasper.Runtime; using Jasper.Transports; using Jasper.Transports.Sending; using Jasper.Util; -using Microsoft.CodeAnalysis.CSharp.Syntax; namespace Jasper.RabbitMQ.Internal { @@ -127,10 +125,8 @@ protected internal override void StartListening(IMessagingRoot root, ITransportR protected override ISender CreateSender(IMessagingRoot root) { - return new RabbitMqSender(root.TransportLogger, this, Parent, root.Cancellation); + return new RabbitMqSender(this, this.Parent); } - - } diff --git a/src/Jasper.RabbitMQ/Internal/RabbitMqListener.cs b/src/Jasper.RabbitMQ/Internal/RabbitMqListener.cs index 8892487bb..39daba259 100644 --- a/src/Jasper.RabbitMQ/Internal/RabbitMqListener.cs +++ b/src/Jasper.RabbitMQ/Internal/RabbitMqListener.cs @@ -1,4 +1,4 @@ -using System; +using System; using Baseline; using Jasper.Logging; using Jasper.Transports; @@ -48,7 +48,7 @@ public void Start(IReceiverCallback callback) { if (callback == null) return; - Connect(); + EnsureConnected(); _callback = callback; _consumer = new MessageConsumer(callback, _logger, Channel, _mapper, Address) diff --git a/src/Jasper.RabbitMQ/Internal/RabbitMqSender.cs b/src/Jasper.RabbitMQ/Internal/RabbitMqSender.cs index 8b8ba8dbb..3e242841f 100644 --- a/src/Jasper.RabbitMQ/Internal/RabbitMqSender.cs +++ b/src/Jasper.RabbitMQ/Internal/RabbitMqSender.cs @@ -1,9 +1,6 @@ -using System; -using System.Security.Authentication.ExtendedProtection; +using System; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; -using Jasper.Logging; using Jasper.Transports; using Jasper.Transports.Sending; using RabbitMQ.Client; @@ -12,77 +9,39 @@ namespace Jasper.RabbitMQ.Internal { public class RabbitMqSender : RabbitMqConnectionAgent, ISender { - private readonly CancellationToken _cancellation; - private readonly ITransportLogger _logger; private readonly IRabbitMqProtocol _protocol; - private ISenderCallback _callback; - private ActionBlock _sending; private readonly string _exchangeName; private readonly string _key; private readonly bool _isDurable; + public bool SupportsNativeScheduledSend { get; } = false; + public Uri Destination { get; } - public RabbitMqSender(ITransportLogger logger, RabbitMqEndpoint endpoint, RabbitMqTransport transport, - CancellationToken cancellation) : base(transport) + public RabbitMqSender(RabbitMqEndpoint endpoint, RabbitMqTransport transport) : base(transport) { _protocol = endpoint.Protocol; - _logger = logger; - _cancellation = cancellation; Destination = endpoint.Uri; - + _isDurable = endpoint.IsDurable; _exchangeName = endpoint.ExchangeName == TransportConstants.Default ? "" : endpoint.ExchangeName; _key = endpoint.RoutingKey ?? endpoint.QueueName ?? ""; } - - - public void Start(ISenderCallback callback) - { - Connect(); - - _callback = callback; - - _sending = new ActionBlock(send, new ExecutionDataflowBlockOptions - { - CancellationToken = _cancellation - }); - } - - public Task Enqueue(Envelope envelope) +#pragma warning disable 1998 + public async Task Send(Envelope envelope) +#pragma warning restore 1998 { - _sending.Post(envelope); - - return Task.CompletedTask; - } - - public Uri Destination { get; } - public int QueuedCount => _sending.InputCount; - - public bool Latched { get; private set; } - - public Task LatchAndDrain() - { - Latched = true; - - Stop(); - - _sending.Complete(); - - - _logger.CircuitBroken(Destination); - - return Task.CompletedTask; - } + EnsureConnected(); + if (State == AgentState.Disconnected) + throw new InvalidOperationException($"The RabbitMQ agent for {Destination} is disconnected"); + var props = Channel.CreateBasicProperties(); + props.Persistent = _isDurable; - public void Unlatch() - { - _logger.CircuitResumed(Destination); + _protocol.WriteFromEnvelope(envelope, props); - Start(_callback); - Latched = false; + Channel.BasicPublish(_exchangeName, _key, props, envelope.Data); } public Task Ping(CancellationToken cancellationToken) @@ -104,36 +63,5 @@ public Task Ping(CancellationToken cancellationToken) } } } - - public bool SupportsNativeScheduledSend { get; } = false; - - private async Task send(Envelope envelope) - { - if (State == AgentState.Disconnected) - throw new InvalidOperationException($"The RabbitMQ agent for {Destination} is disconnected"); - - try - { - var props = Channel.CreateBasicProperties(); - props.Persistent = _isDurable; - - _protocol.WriteFromEnvelope(envelope, props); - - Channel.BasicPublish(_exchangeName, _key, props, envelope.Data); - - await _callback.Successful(envelope); - } - catch (Exception e) - { - try - { - await _callback.ProcessingFailure(envelope, e); - } - catch (Exception exception) - { - _logger.LogException(exception); - } - } - } } } diff --git a/src/Jasper.Testing/Configuration/TransportCollectionTests.cs b/src/Jasper.Testing/Configuration/TransportCollectionTests.cs index 87f6be921..8a2a7e00e 100644 --- a/src/Jasper.Testing/Configuration/TransportCollectionTests.cs +++ b/src/Jasper.Testing/Configuration/TransportCollectionTests.cs @@ -23,7 +23,7 @@ public class TransportCollectionTests public void add_transport() { var transport = Substitute.For(); - transport.Protocol.Returns("fake"); + transport.Protocols.Returns(new []{"fake"}); var collection = new TransportCollection(new JasperOptions()) {transport}; @@ -133,7 +133,7 @@ public void Dispose() throw new NotImplementedException(); } - public string Protocol { get; } = "fake"; + public ICollection Protocols { get; } = new []{"fake"}; public ISendingAgent BuildSendingAgent(Uri uri, IMessagingRoot root, CancellationToken cancellation) { throw new NotImplementedException(); diff --git a/src/Jasper.Testing/Runtime/ping_handling.cs b/src/Jasper.Testing/Runtime/ping_handling.cs index c01f08cc4..f2ebf8b9c 100644 --- a/src/Jasper.Testing/Runtime/ping_handling.cs +++ b/src/Jasper.Testing/Runtime/ping_handling.cs @@ -20,7 +20,7 @@ public async Task ping_happy_path_with_tcp() var sender = new BatchedSender("tcp://localhost:2222".ToUri(), new SocketSenderProtocol(), CancellationToken.None, TransportLogger.Empty()); - sender.Start(new StubSenderCallback()); + sender.RegisterCallback(new StubSenderCallback()); await sender.Ping(CancellationToken.None); } diff --git a/src/Jasper.Testing/Transports/Sending/BatchedSenderTests.cs b/src/Jasper.Testing/Transports/Sending/BatchedSenderTests.cs index 9e0175487..7bda03c00 100644 --- a/src/Jasper.Testing/Transports/Sending/BatchedSenderTests.cs +++ b/src/Jasper.Testing/Transports/Sending/BatchedSenderTests.cs @@ -1,4 +1,4 @@ -using System.Threading; +using System.Threading; using System.Threading.Tasks; using Baseline; using Jasper.Logging; @@ -16,7 +16,8 @@ public BatchedSenderTests() { theSender = new BatchedSender(TransportConstants.RepliesUri, theProtocol, theCancellation.Token, TransportLogger.Empty()); - theSender.Start(theSenderCallback); + + theSender.RegisterCallback(theSenderCallback); theBatch = new OutgoingMessageBatch(theSender.Destination, new[] { diff --git a/src/Jasper.Testing/Transports/Sending/NulloSenderTester.cs b/src/Jasper.Testing/Transports/Sending/NulloSenderTester.cs index 88da40996..5fc5f9a68 100644 --- a/src/Jasper.Testing/Transports/Sending/NulloSenderTester.cs +++ b/src/Jasper.Testing/Transports/Sending/NulloSenderTester.cs @@ -1,8 +1,8 @@ +using System.Threading; using System.Threading.Tasks; using Jasper.Testing.Messaging; using Jasper.Transports.Sending; using Jasper.Util; -using NSubstitute; using Xunit; namespace Jasper.Testing.Transports.Sending @@ -14,14 +14,9 @@ public async Task enqueue_automatically_marks_envelope_as_successful() { var sender = new NulloSender("tcp://localhost:3333".ToUri()); - var callback = Substitute.For(); - sender.Start(callback); - var env = ObjectMother.Envelope(); - await sender.Enqueue(env); - - callback.Received().Successful(env); + await sender.Send(env); } } } diff --git a/src/Jasper.Testing/Transports/Tcp/LightweightTcpTransportCompliance.cs b/src/Jasper.Testing/Transports/Tcp/LightweightTcpTransportCompliance.cs index 5bd3eac5c..ab9766089 100644 --- a/src/Jasper.Testing/Transports/Tcp/LightweightTcpTransportCompliance.cs +++ b/src/Jasper.Testing/Transports/Tcp/LightweightTcpTransportCompliance.cs @@ -1,3 +1,9 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.NetworkInformation; +using System.Net.Sockets; using Jasper.Util; using TestingSupport.Compliance; using Xunit; @@ -6,28 +12,54 @@ namespace Jasper.Testing.Transports.Tcp { public class Sender : JasperOptions { + public Sender(int portNumber) + { + Endpoints.ListenForMessagesFrom($"tcp://localhost:{portNumber}/incoming".ToUri()); + } + public Sender() + : this(2389) { - Endpoints.ListenForMessagesFrom($"tcp://localhost:2289/incoming".ToUri()); + } } public class Receiver : JasperOptions { - public Receiver() + public Receiver(int portNumber) + { + Endpoints.ListenForMessagesFrom($"tcp://localhost:{portNumber}/incoming".ToUri()); + } + + public Receiver() : this(2388) + { + + } + } + + + public class PortFinder + { + private static readonly IPEndPoint DefaultLoopbackEndpoint = new IPEndPoint(IPAddress.Loopback, port: 0); + + public static int GetAvailablePort() { - Endpoints.ListenForMessagesFrom($"tcp://localhost:2288/incoming".ToUri()); + using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + socket.Bind(DefaultLoopbackEndpoint); + var port = ((IPEndPoint)socket.LocalEndPoint).Port; + return port; } } + [Collection("compliance")] public class LightweightTcpTransportCompliance : SendingCompliance { - public LightweightTcpTransportCompliance() : base($"tcp://localhost:2288/incoming".ToUri()) + public LightweightTcpTransportCompliance() : base($"tcp://localhost:{PortFinder.GetAvailablePort()}/incoming".ToUri()) { - SenderIs(); + SenderIs(new Sender(PortFinder.GetAvailablePort())); - ReceiverIs(); + ReceiverIs(new Receiver(theOutboundAddress.Port)); } } } diff --git a/src/Jasper.Testing/Transports/Tcp/Protocol/ProtocolContext.cs b/src/Jasper.Testing/Transports/Tcp/Protocol/ProtocolContext.cs index 46a3ae3ed..b1676a80e 100644 --- a/src/Jasper.Testing/Transports/Tcp/Protocol/ProtocolContext.cs +++ b/src/Jasper.Testing/Transports/Tcp/Protocol/ProtocolContext.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Linq; using System.Net; using System.Net.Sockets; @@ -21,7 +21,7 @@ public abstract class ProtocolContext : IDisposable private readonly IPAddress theAddress = IPAddress.Loopback; private readonly int thePort = ++NextPort; private readonly ListeningAgent _listener; - private readonly Uri destination; + public readonly Uri Destination; private readonly OutgoingMessageBatch theMessageBatch; @@ -30,7 +30,7 @@ public abstract class ProtocolContext : IDisposable public ProtocolContext() { - destination = $"durable://localhost:{thePort}/incoming".ToUri(); + Destination = $"durable://localhost:{thePort}/incoming".ToUri(); _listener = new ListeningAgent(theReceiver, theAddress, thePort, "durable", CancellationToken.None); @@ -44,7 +44,7 @@ public ProtocolContext() outgoingMessage() }; - theMessageBatch = new OutgoingMessageBatch(destination, messages); + theMessageBatch = new OutgoingMessageBatch(Destination, messages); } public void Dispose() @@ -57,8 +57,8 @@ private Envelope outgoingMessage() { return new Envelope { - Destination = destination, - Data = new byte[] {1, 2, 3, 4, 5, 6, 7}, + Destination = Destination, + Data = new byte[] { 1, 2, 3, 4, 5, 6, 7 }, SentAt = DateTime.Today.ToUniversalTime() }; } @@ -69,10 +69,10 @@ protected async Task afterSending() using (var client = new TcpClient()) { - if (Dns.GetHostName() == destination.Host) - await client.ConnectAsync(IPAddress.Loopback, destination.Port); + if (Dns.GetHostName() == Destination.Host) + await client.ConnectAsync(IPAddress.Loopback, Destination.Port); - await client.ConnectAsync(destination.Host, destination.Port); + await client.ConnectAsync(Destination.Host, Destination.Port); await WireProtocol.Send(client.GetStream(), theMessageBatch, null, theSender); } diff --git a/src/Jasper/AssemblyAttributes.cs b/src/Jasper/AssemblyAttributes.cs index d56975d74..af98138a6 100644 --- a/src/Jasper/AssemblyAttributes.cs +++ b/src/Jasper/AssemblyAttributes.cs @@ -1,6 +1,5 @@ -using System.Runtime.CompilerServices; +using System.Runtime.CompilerServices; using Jasper.Attributes; -using Jasper.Configuration; using Lamar; [assembly: IgnoreAssembly] @@ -13,9 +12,12 @@ [assembly: InternalsVisibleTo("Jasper.Http")] [assembly: InternalsVisibleTo("Jasper.RabbitMq.Tests")] [assembly: InternalsVisibleTo("Jasper.AzureServiceBus")] +[assembly: InternalsVisibleTo("Jasper.ConfluentKafka")] [assembly: InternalsVisibleTo("Jasper.AzureServiceBus.Tests")] [assembly: InternalsVisibleTo("Jasper.Persistence.Testing")] [assembly: InternalsVisibleTo("Jasper.Persistence.Database")] [assembly: InternalsVisibleTo("Jasper.Persistence.Marten")] [assembly: InternalsVisibleTo("Jasper.Persistence.EntityFrameworkCore")] +[assembly: InternalsVisibleTo("Jasper.Pulsar")] +[assembly: InternalsVisibleTo("Jasper.Pulsar.Tests")] [assembly: InternalsVisibleTo("StorytellerSpecs")] diff --git a/src/Jasper/Configuration/TransportCollection.cs b/src/Jasper/Configuration/TransportCollection.cs index e4120ceeb..1d3a2b2f9 100644 --- a/src/Jasper/Configuration/TransportCollection.cs +++ b/src/Jasper/Configuration/TransportCollection.cs @@ -31,7 +31,10 @@ public ITransport TransportForScheme(string scheme) public void Add(ITransport transport) { - _transports.SmartAdd(transport.Protocol, transport); + foreach (var protocol in transport.Protocols) + { + _transports.SmartAdd(protocol, transport); + } } public T Get() where T : ITransport, new() diff --git a/src/Jasper/Logging/MessageLogger.cs b/src/Jasper/Logging/MessageLogger.cs index 2d5bab2a2..ba21f75a5 100644 --- a/src/Jasper/Logging/MessageLogger.cs +++ b/src/Jasper/Logging/MessageLogger.cs @@ -1,4 +1,4 @@ -using System; +using System; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -20,14 +20,14 @@ public class MessageLogger : IMessageLogger private readonly Action _executionStarted; private readonly ILogger _logger; - private readonly Action _messageFailed; - private readonly Action _messageSucceeded; + private readonly Action _messageFailed; + private readonly Action _messageSucceeded; private readonly IMetrics _metrics; private readonly Action _movedToErrorQueue; - private readonly Action _noHandler; + private readonly Action _noHandler; private readonly Action _noRoutes; - private readonly Action _received; - private readonly Action _sent; + private readonly Action _received; + private readonly Action _sent; private readonly Action _undeliverable; public static MessageLogger Empty() @@ -40,10 +40,10 @@ public MessageLogger(ILoggerFactory factory, IMetrics metrics) _metrics = metrics; _logger = factory.CreateLogger("Jasper.Messages"); - _sent = LoggerMessage.Define(LogLevel.Debug, SentEventId, + _sent = LoggerMessage.Define(LogLevel.Debug, SentEventId, "Enqueued for sending {Name}#{Id} to {Destination}"); - _received = LoggerMessage.Define(LogLevel.Debug, ReceivedEventId, + _received = LoggerMessage.Define(LogLevel.Debug, ReceivedEventId, "Received {Name}#{Id} at {Destination} from {ReplyUri}"); _executionStarted = LoggerMessage.Define(LogLevel.Debug, ExecutionStartedEventId, @@ -53,13 +53,13 @@ public MessageLogger(ILoggerFactory factory, IMetrics metrics) "Finished processing {Name}#{Id}"); _messageSucceeded = - LoggerMessage.Define(LogLevel.Information, MessageSucceededEventId, + LoggerMessage.Define(LogLevel.Information, MessageSucceededEventId, "Successfully processed message {Name}#{envelope} from {ReplyUri}"); - _messageFailed = LoggerMessage.Define(LogLevel.Error, MessageFailedEventId, + _messageFailed = LoggerMessage.Define(LogLevel.Error, MessageFailedEventId, "Failed to process message {Name}#{envelope} from {ReplyUri}"); - _noHandler = LoggerMessage.Define(LogLevel.Information, NoHandlerEventId, + _noHandler = LoggerMessage.Define(LogLevel.Information, NoHandlerEventId, "No known handler for {Name}#{Id} from {ReplyUri}"); _noRoutes = LoggerMessage.Define(LogLevel.Information, NoRoutesEventId, @@ -74,13 +74,13 @@ public MessageLogger(ILoggerFactory factory, IMetrics metrics) public virtual void Sent(Envelope envelope) { - _sent(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.Destination, null); + _sent(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.Destination?.ToString(), null); } public virtual void Received(Envelope envelope) { - _received(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.Destination, - envelope.ReplyUri, null); + _received(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.Destination?.ToString(), + envelope.ReplyUri?.ToString(), null); } public virtual void ExecutionStarted(Envelope envelope) @@ -96,18 +96,18 @@ public virtual void ExecutionFinished(Envelope envelope) public virtual void MessageSucceeded(Envelope envelope) { _metrics.MessageExecuted(envelope); - _messageSucceeded(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.ReplyUri, null); + _messageSucceeded(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.ReplyUri?.ToString(), null); } public virtual void MessageFailed(Envelope envelope, Exception ex) { _metrics.MessageExecuted(envelope); - _messageFailed(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.ReplyUri, ex); + _messageFailed(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.ReplyUri?.ToString(), ex); } public virtual void NoHandlerFor(Envelope envelope) { - _noHandler(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.ReplyUri, null); + _noHandler(_logger, envelope.GetMessageTypeName(), envelope.Id, envelope.ReplyUri?.ToString(), null); } public virtual void NoRoutesFor(Envelope envelope) diff --git a/src/Jasper/Persistence/Durability/DurableSendingAgent.cs b/src/Jasper/Persistence/Durability/DurableSendingAgent.cs index 846102eb7..57f17eb81 100644 --- a/src/Jasper/Persistence/Durability/DurableSendingAgent.cs +++ b/src/Jasper/Persistence/Durability/DurableSendingAgent.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -77,10 +77,9 @@ protected override async Task afterRestarting(ISender sender) foreach (var envelope in toRetry) { - await _sender.Enqueue(envelope); + await _sender.Send(envelope); } } - public override Task Successful(OutgoingMessageBatch outgoing) { return _policy.ExecuteAsync(c => _persistence.DeleteOutgoing(outgoing.Messages.ToArray()), _settings.Cancellation); @@ -97,7 +96,7 @@ protected override async Task storeAndForward(Envelope envelope) { await _persistence.StoreOutgoing(envelope, _settings.UniqueNodeId); - await _sender.Enqueue(envelope); + await _sender.Send(envelope); } } diff --git a/src/Jasper/Tracking/JasperHostMessageTrackingExtensions.cs b/src/Jasper/Tracking/JasperHostMessageTrackingExtensions.cs index 5f0d02c27..92b54ea44 100644 --- a/src/Jasper/Tracking/JasperHostMessageTrackingExtensions.cs +++ b/src/Jasper/Tracking/JasperHostMessageTrackingExtensions.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; using Baseline.Dates; using Jasper.Logging; @@ -33,6 +33,13 @@ public static TrackedSessionConfiguration TrackActivity(this IHost host) return new TrackedSessionConfiguration(session); } + public static TrackedSessionConfiguration TrackActivity(this IHost host, TimeSpan trackingTimeout) + { + var session = new TrackedSession(host); + session.Timeout = trackingTimeout; + return new TrackedSessionConfiguration(session); + } + /// /// Send a message through the service bus and wait until that message /// and all cascading messages have been successfully processed diff --git a/src/Jasper/Tracking/TrackedSession.cs b/src/Jasper/Tracking/TrackedSession.cs index 49045ffe1..58023fda7 100644 --- a/src/Jasper/Tracking/TrackedSession.cs +++ b/src/Jasper/Tracking/TrackedSession.cs @@ -45,6 +45,7 @@ public void WatchOther(IHost host) public TimeSpan Timeout { get; set; } = 5.Seconds(); public bool AssertNoExceptions { get; set; } = true; + public bool AssertNoTimeout { get; set; } = true; public Func Execution { get; set; } = c => Task.CompletedTask; @@ -166,9 +167,6 @@ public async Task ExecuteAndTrack() _stopwatch.Start(); - - - try { using (var scope = _primaryHost.Services.As().GetNestedContainer()) @@ -191,7 +189,7 @@ public async Task ExecuteAndTrack() if (AssertNoExceptions) AssertNoExceptionsWereThrown(); - AssertNotTimedOut(); + if (AssertNoExceptions) AssertNotTimedOut(); } public Task Track() @@ -262,12 +260,23 @@ public bool IsCompleted() public void LogException(Exception exception, string serviceName) { + Debug.WriteLine($"Exception Occurred in {serviceName}: {exception}"); _exceptions.Add(exception); } public void AddCondition(ITrackedCondition condition) { + Debug.WriteLine($"Condition Added: {condition}"); _conditions.Add(condition); } + + public override string ToString() + { + var conditionas = $"Conditions:\n{ _conditions.Select(x => x.ToString()).Join("\n")}"; + var activity = $"Activity:\n{ AllRecordsInOrder().Select(x => x.ToString()).Join("\n")}"; + var exceptions = $"Exceptions:\n{ _exceptions.Select(x => x.ToString()).Join("\n")}"; + + return $"{conditionas}\n\n{activity}\\{exceptions}"; + } } } diff --git a/src/Jasper/Tracking/TrackedSessionConfiguration.cs b/src/Jasper/Tracking/TrackedSessionConfiguration.cs index 941875db9..66671341b 100644 --- a/src/Jasper/Tracking/TrackedSessionConfiguration.cs +++ b/src/Jasper/Tracking/TrackedSessionConfiguration.cs @@ -68,6 +68,12 @@ public TrackedSessionConfiguration DoNotAssertOnExceptionsDetected() return this; } + public TrackedSessionConfiguration DoNotAssertTimeout() + { + _session.AssertNoTimeout = false; + return this; + } + public TrackedSessionConfiguration WaitForMessageToBeReceivedAt(IHost host) { var condition = new WaitForMessage @@ -93,6 +99,13 @@ public async Task ExecuteAndWait(Func ac return _session; } + public async Task ExecuteWithoutWaiting(Func action) + { + _session.Execution = action; + await _session.ExecuteAndTrack(); + return _session; + } + /// /// Invoke a message inline from the current Jasper application /// and wait for all cascading activity to complete diff --git a/src/Jasper/Transports/ITransport.cs b/src/Jasper/Transports/ITransport.cs index c34f82faf..bac03fcda 100644 --- a/src/Jasper/Transports/ITransport.cs +++ b/src/Jasper/Transports/ITransport.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using Jasper.Configuration; using Jasper.Runtime; @@ -7,7 +7,7 @@ namespace Jasper.Transports { public interface ITransport : IDisposable { - string Protocol { get; } + ICollection Protocols { get; } Endpoint ReplyEndpoint(); diff --git a/src/Jasper/Transports/ITransportProtocol.cs b/src/Jasper/Transports/ITransportProtocol.cs new file mode 100644 index 000000000..21d780f63 --- /dev/null +++ b/src/Jasper/Transports/ITransportProtocol.cs @@ -0,0 +1,19 @@ +namespace Jasper.Transports +{ + public interface ITransportProtocol + { + /// + /// Creates a transport message object from a Jasper Envelope + /// + /// + /// + TTransportMsg WriteFromEnvelope(Envelope envelope); + + /// + /// Creates an Envelope from the incoming transport message + /// + /// + /// + Envelope ReadEnvelope(TTransportMsg message); + } +} diff --git a/src/Jasper/Transports/Local/LightweightLocalSendingAgent.cs b/src/Jasper/Transports/Local/LightweightLocalSendingAgent.cs index 81c076680..4139a4c89 100644 --- a/src/Jasper/Transports/Local/LightweightLocalSendingAgent.cs +++ b/src/Jasper/Transports/Local/LightweightLocalSendingAgent.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; using Jasper.Configuration; using Jasper.Logging; diff --git a/src/Jasper/Transports/Local/LocalTransport.cs b/src/Jasper/Transports/Local/LocalTransport.cs index 5edb1dbc1..596f33898 100644 --- a/src/Jasper/Transports/Local/LocalTransport.cs +++ b/src/Jasper/Transports/Local/LocalTransport.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using Baseline; @@ -50,7 +50,7 @@ public void Initialize(IMessagingRoot root) } - public string Protocol { get; } = TransportConstants.Local; + public ICollection Protocols { get; } = new []{ TransportConstants.Local }; void ITransport.StartSenders(IMessagingRoot root, ITransportRuntime runtime) { diff --git a/src/Jasper/Transports/Sending/BatchedSender.cs b/src/Jasper/Transports/Sending/BatchedSender.cs index 9e8e4927d..44ceed35c 100644 --- a/src/Jasper/Transports/Sending/BatchedSender.cs +++ b/src/Jasper/Transports/Sending/BatchedSender.cs @@ -1,15 +1,14 @@ -using System; +using System; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using Jasper.Logging; using Jasper.Transports.Tcp; using Jasper.Transports.Util; -using LamarCodeGeneration.Frames; namespace Jasper.Transports.Sending { - public class BatchedSender : ISender + public class BatchedSender : ISender, ISenderRequiresCallback { private readonly CancellationToken _cancellation; private readonly ITransportLogger _logger; @@ -22,20 +21,12 @@ public class BatchedSender : ISender private ActionBlock _sender; private ActionBlock _serializing; - public BatchedSender(Uri destination, ISenderProtocol protocol, CancellationToken cancellation, - ITransportLogger logger) + public BatchedSender(Uri destination, ISenderProtocol protocol, CancellationToken cancellation, ITransportLogger logger) { Destination = destination; _protocol = protocol; _cancellation = cancellation; _logger = logger; - } - - public Uri Destination { get; } - - public void Start(ISenderCallback callback) - { - _callback = callback; _sender = new ActionBlock(SendBatch, new ExecutionDataflowBlockOptions { @@ -50,16 +41,16 @@ public void Start(ISenderCallback callback) }, _cancellation); _serializing = new ActionBlock(async e => + { + try { - try - { - await _batching.SendAsync(e); - } - catch (Exception ex) - { - _logger.LogException(ex, message: $"Error while trying to serialize envelope {e}"); - } - }, + await _batching.SendAsync(e); + } + catch (Exception ex) + { + _logger.LogException(ex, message: $"Error while trying to serialize envelope {e}"); + } + }, new ExecutionDataflowBlockOptions { CancellationToken = _cancellation, @@ -81,7 +72,7 @@ public void Start(ISenderCallback callback) return batch; }, new ExecutionDataflowBlockOptions - {BoundedCapacity = DataflowBlockOptions.Unbounded, MaxDegreeOfParallelism = 10, CancellationToken = _cancellation}); + { BoundedCapacity = DataflowBlockOptions.Unbounded, MaxDegreeOfParallelism = 10, CancellationToken = _cancellation }); _batchWriting.Completion.ContinueWith(x => { @@ -97,6 +88,8 @@ public void Start(ISenderCallback callback) }, _cancellation); } + public Uri Destination { get; } + public int QueuedCount => _queued + _batching.ItemCount + _serializing.InputCount; public bool Latched { get; private set; } @@ -119,7 +112,6 @@ public void Unlatch() { _logger.CircuitResumed(Destination); - Start(_callback); Latched = false; } @@ -133,7 +125,7 @@ public async Task Ping(CancellationToken cancellationToken) public bool SupportsNativeScheduledSend { get; } = true; - public Task Enqueue(Envelope message) + public Task Send(Envelope message) { if (_batching == null) throw new InvalidOperationException("This agent has not been started"); @@ -150,6 +142,8 @@ public void Dispose() _batching?.Dispose(); } + public void RegisterCallback(ISenderCallback senderCallback) => _callback = senderCallback; + public async Task SendBatch(OutgoingMessageBatch batch) { if (_cancellation.IsCancellationRequested) return; diff --git a/src/Jasper/Transports/Sending/ISender.cs b/src/Jasper/Transports/Sending/ISender.cs index cfb8ae9a6..d6d600330 100644 --- a/src/Jasper/Transports/Sending/ISender.cs +++ b/src/Jasper/Transports/Sending/ISender.cs @@ -1,32 +1,19 @@ -using System; +using System; using System.Threading; using System.Threading.Tasks; namespace Jasper.Transports.Sending { + public interface ISenderRequiresCallback : IDisposable + { + void RegisterCallback(ISenderCallback senderCallback); + } + public interface ISender : IDisposable { + bool SupportsNativeScheduledSend { get; } Uri Destination { get; } - - int QueuedCount { get; } - - bool Latched { get; } - void Start(ISenderCallback callback); - - Task Enqueue(Envelope envelope); - - Task LatchAndDrain(); - void Unlatch(); - - /// - /// Simply try to reach the endpoint to verify it can receive - /// - /// - /// Task Ping(CancellationToken cancellationToken); - - bool SupportsNativeScheduledSend { get; } - - + Task Send(Envelope envelope); } } diff --git a/src/Jasper/Transports/Sending/ISendingAgent.cs b/src/Jasper/Transports/Sending/ISendingAgent.cs index d9e1f615a..9429a1a0e 100644 --- a/src/Jasper/Transports/Sending/ISendingAgent.cs +++ b/src/Jasper/Transports/Sending/ISendingAgent.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; using Jasper.Configuration; @@ -15,8 +15,6 @@ public interface ISendingAgent : IDisposable bool SupportsNativeScheduledSend { get; } - - // This would be called in the future by the outbox, assuming // that the envelope is already persisted and just needs to be sent out Task EnqueueOutgoing(Envelope envelope); diff --git a/src/Jasper/Transports/Sending/LightweightSendingAgent.cs b/src/Jasper/Transports/Sending/LightweightSendingAgent.cs index 8ab9e2719..0d6c0b910 100644 --- a/src/Jasper/Transports/Sending/LightweightSendingAgent.cs +++ b/src/Jasper/Transports/Sending/LightweightSendingAgent.cs @@ -1,4 +1,4 @@ -using System.Collections.Generic; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Baseline; @@ -39,7 +39,7 @@ protected override Task afterRestarting(ISender sender) foreach (var envelope in toRetry) { // It's perfectly okay to not wait on the task here - _sender.Enqueue(envelope); + _sender.Send(envelope); } return Task.CompletedTask; @@ -57,7 +57,7 @@ public override Task Successful(Envelope outgoing) protected override Task storeAndForward(Envelope envelope) { - return _sender.Enqueue(envelope); + return _sender.Send(envelope); } public override bool IsDurable { get; } = false; diff --git a/src/Jasper/Transports/Sending/NulloSender.cs b/src/Jasper/Transports/Sending/NulloSender.cs index 860cfbbf2..73a4b8a5f 100644 --- a/src/Jasper/Transports/Sending/NulloSender.cs +++ b/src/Jasper/Transports/Sending/NulloSender.cs @@ -1,14 +1,11 @@ using System; using System.Threading; using System.Threading.Tasks; -using LamarCodeGeneration.Frames; namespace Jasper.Transports.Sending { - public class NulloSender : ISender + public class NulloSender : ISender { - private ISenderCallback _callback; - public NulloSender(Uri destination) { Destination = destination; @@ -18,35 +15,9 @@ public void Dispose() { } - public Uri Destination { get; } - public int QueuedCount => 0; - public bool Latched => false; - public void Start(ISenderCallback callback) - { - _callback = callback; - } - - public Task Enqueue(Envelope envelope) - { - _callback.Successful(envelope); - return Task.CompletedTask; - } - - public Task LatchAndDrain() - { - return Task.CompletedTask; - } - - public void Unlatch() - { - - } - - public Task Ping(CancellationToken cancellationToken) - { - return Task.FromResult(true); - } - public bool SupportsNativeScheduledSend { get; } = false; + public Uri Destination { get; } + public Task Send(Envelope envelope) => Task.CompletedTask; + public Task Ping(CancellationToken cancellationToken) => Task.FromResult(true); } } diff --git a/src/Jasper/Transports/Sending/SendingAgent.cs b/src/Jasper/Transports/Sending/SendingAgent.cs index b69dd9095..838040c26 100644 --- a/src/Jasper/Transports/Sending/SendingAgent.cs +++ b/src/Jasper/Transports/Sending/SendingAgent.cs @@ -1,7 +1,8 @@ -using System; +using System; using System.Linq; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Baseline; using Jasper.Configuration; using Jasper.Logging; @@ -18,14 +19,19 @@ public abstract class SendingAgent : ISendingAgent, ISenderCallback, ICircuit private int _failureCount; private CircuitWatcher _circuitWatcher; - public SendingAgent(ITransportLogger logger, IMessageLogger messageLogger, ISender sender, - AdvancedSettings settings, Endpoint endpoint) + public SendingAgent(ITransportLogger logger, IMessageLogger messageLogger, ISender sender, AdvancedSettings settings, Endpoint endpoint) { _logger = logger; _messageLogger = messageLogger; _sender = sender; _settings = settings; Endpoint = endpoint; + _sending = new ActionBlock(sendViaSender, Endpoint.ExecutionOptions); + + _sending.Completion.ContinueWith(t => + { + Console.WriteLine(t.Exception?.ToString()); + }); } public Endpoint Endpoint { get; } @@ -39,7 +45,7 @@ public void Dispose() _sender.Dispose(); } - public bool Latched => _sender.Latched; + public bool Latched { get; private set; } public abstract bool IsDurable { get; } private void setDefaults(Envelope envelope) @@ -52,8 +58,8 @@ private void setDefaults(Envelope envelope) public async Task EnqueueOutgoing(Envelope envelope) { setDefaults(envelope); - await _sender.Enqueue(envelope); - _messageLogger.Sent(envelope); + _sending.Post(envelope); + _messageLogger.Sent(envelope); } public async Task StoreAndForward(Envelope envelope) @@ -67,13 +73,69 @@ public async Task StoreAndForward(Envelope envelope) protected abstract Task storeAndForward(Envelope envelope); - public bool SupportsNativeScheduledSend => _sender.SupportsNativeScheduledSend; + public Task TryToResume(CancellationToken cancellationToken) + { + return _sender.Ping(cancellationToken); + } + TimeSpan ICircuit.RetryInterval => Endpoint.PingIntervalForCircuitResume; + + Task ICircuit.Resume(CancellationToken cancellationToken) + { + _circuitWatcher = null; + + Unlatch(); + + return afterRestarting(_sender); + } + + protected abstract Task afterRestarting(ISender sender); + + public abstract Task Successful(Envelope outgoing); + + private ActionBlock _sending; + public Task LatchAndDrain() + { + Latched = true; + + _sending.Complete(); + + _logger.CircuitBroken(Destination); + + return Task.CompletedTask; + } + + public void Unlatch() + { + _logger.CircuitResumed(Destination); + + Latched = false; + } + private async Task sendViaSender(Envelope envelope) + { + try + { + await _sender.Send(envelope); + + await Successful(envelope); + } + catch (Exception e) + { + try + { + await ((ISenderCallback)this).ProcessingFailure(envelope, e); + } + catch (Exception exception) + { + _logger.LogException(exception); + } + } + } public async Task MarkFailed(OutgoingMessageBatch batch) { // If it's already latched, just enqueue again - if (_sender.Latched) + if (Latched) { await EnqueueForRetry(batch); return; @@ -83,51 +145,35 @@ public async Task MarkFailed(OutgoingMessageBatch batch) if (_failureCount >= Endpoint.FailuresBeforeCircuitBreaks) { - await _sender.LatchAndDrain(); + await LatchAndDrain(); await EnqueueForRetry(batch); _circuitWatcher = new CircuitWatcher(this, _settings.Cancellation); - //_circuitWatcher = new CircuitWatcher(_sender, _endpoint.PingIntervalForCircuitResume, restartSending); } else { foreach (var envelope in batch.Messages) { #pragma warning disable 4014 - _sender.Enqueue(envelope); + _sender.Send(envelope); #pragma warning restore 4014 } } } - public Task TryToResume(CancellationToken cancellationToken) - { - return _sender.Ping(cancellationToken); - } - - TimeSpan ICircuit.RetryInterval => Endpoint.PingIntervalForCircuitResume; public abstract Task EnqueueForRetry(OutgoingMessageBatch batch); - Task ICircuit.Resume(CancellationToken cancellationToken) - { - _circuitWatcher = null; - - _sender.Unlatch(); - - return afterRestarting(_sender); - } - - protected abstract Task afterRestarting(ISender sender); public Task MarkSuccess() { _failureCount = 0; - _sender.Unlatch(); + Unlatch(); _circuitWatcher = null; return Task.CompletedTask; } + Task ISenderCallback.TimedOut(OutgoingMessageBatch outgoing) { @@ -160,7 +206,7 @@ Task ISenderCallback.ProcessingFailure(OutgoingMessageBatch outgoing) Task ISenderCallback.ProcessingFailure(Envelope outgoing, Exception exception) { - var batch = new OutgoingMessageBatch(outgoing.Destination, new[] {outgoing}); + var batch = new OutgoingMessageBatch(outgoing.Destination, new[] { outgoing }); _logger.OutgoingBatchFailed(batch, exception); return MarkFailed(batch); } @@ -180,8 +226,7 @@ Task ISenderCallback.SenderIsLatched(OutgoingMessageBatch outgoing) public abstract Task Successful(OutgoingMessageBatch outgoing); - public abstract Task Successful(Envelope outgoing); - + public bool SupportsNativeScheduledSend => _sender.SupportsNativeScheduledSend; } } diff --git a/src/Jasper/Transports/TransportBase.cs b/src/Jasper/Transports/TransportBase.cs index b9b322baa..99c546fb0 100644 --- a/src/Jasper/Transports/TransportBase.cs +++ b/src/Jasper/Transports/TransportBase.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using Jasper.Configuration; @@ -11,10 +11,18 @@ namespace Jasper.Transports { public TransportBase(string protocol) { - Protocol = protocol; + Protocols.Add(protocol); } - public string Protocol { get; } + public TransportBase(IEnumerable protocols) + { + foreach (string protocol in protocols) + { + Protocols.Add(protocol); + } + } + + public ICollection Protocols { get; } = new List(); public IEnumerable Endpoints() { @@ -82,7 +90,9 @@ public Endpoint GetOrCreateEndpoint(Uri uri) var endpoint = findEndpointByUri(canonicizeUri(uri)); - // It's coded this way so you don't override + if(endpoint == null) + + // It's coded this way so you don't override // durability if it's already set if (shouldBeDurable) endpoint.IsDurable = true; diff --git a/src/Jasper/Transports/TransportRuntime.cs b/src/Jasper/Transports/TransportRuntime.cs index 449720e23..94da66aa5 100644 --- a/src/Jasper/Transports/TransportRuntime.cs +++ b/src/Jasper/Transports/TransportRuntime.cs @@ -67,10 +67,14 @@ public ISendingAgent AddSubscriber(Uri replyUri, ISender sender, Endpoint endpoi : new LightweightSendingAgent(_root.TransportLogger, _root.MessageLogger, sender, _root.Settings, endpoint); agent.ReplyUri = replyUri; - sender.Start((ISenderCallback) agent); endpoint.Agent = agent; + if (sender is ISenderRequiresCallback senderRequiringCallback && agent is ISenderCallback callbackAgent) + { + senderRequiringCallback.RegisterCallback(callbackAgent); + } + AddSendingAgent(agent); AddSubscriber(endpoint); @@ -156,11 +160,9 @@ public void AddListener(IListener listener, Endpoint settings) ? (IWorkerQueue) new DurableWorkerQueue(settings, _root.Pipeline, _root.Settings, _root.Persistence, _root.TransportLogger) : new LightweightWorkerQueue(settings, _root.TransportLogger, _root.Pipeline, _root.Settings); - - + _listeners.Add(worker); - worker.StartListening(listener); } diff --git a/src/StorytellerSpecs/Fixtures/RetryAgentFixture.cs b/src/StorytellerSpecs/Fixtures/RetryAgentFixture.cs index c1d8ac453..d1fc1d265 100644 --- a/src/StorytellerSpecs/Fixtures/RetryAgentFixture.cs +++ b/src/StorytellerSpecs/Fixtures/RetryAgentFixture.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -38,11 +38,7 @@ void IDisposable.Dispose() { } - void ISender.Start(ISenderCallback callback) - { - } - - Task ISender.Enqueue(Envelope envelope) + Task ISender.Send(Envelope envelope) { _enqueued.Add(envelope); return Task.CompletedTask; @@ -50,17 +46,15 @@ Task ISender.Enqueue(Envelope envelope) Uri ISender.Destination { get; } - int ISender.QueuedCount { get; } - - bool ISender.Latched => _latched; + bool Latched => _latched; - Task ISender.LatchAndDrain() + Task LatchAndDrain() { _latched = true; return Task.CompletedTask; } - void ISender.Unlatch() + void Unlatch() { _unlatched = true; } diff --git a/src/StorytellerSpecs/Stub/StubEndpoint.cs b/src/StorytellerSpecs/Stub/StubEndpoint.cs index d79cb9cdc..ec7e1630b 100644 --- a/src/StorytellerSpecs/Stub/StubEndpoint.cs +++ b/src/StorytellerSpecs/Stub/StubEndpoint.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -17,7 +17,6 @@ public class StubEndpoint : Endpoint, ISendingAgent, ISender, IDisposable public readonly IList Callbacks = new List(); public readonly IList Sent = new List(); - private ISenderCallback _callback; private IMessageLogger _logger; private IHandlerPipeline _pipeline; private Uri _replyUri; @@ -31,44 +30,22 @@ public StubEndpoint(Uri destination, StubTransport stubTransport) } public Endpoint Endpoint => this; + public bool Latched { get; set; } public override Uri Uri => $"stub://{Name}".ToUri(); - - public int QueuedCount { get; } - - public void Start(ISenderCallback callback) - { - _callback = callback; - } - - public Task Enqueue(Envelope envelope) + + public Task Send(Envelope envelope) { Sent.Add(envelope); return _pipeline?.Invoke(envelope, new StubChannelCallback(this, envelope)) ?? Task.CompletedTask; } - public Task LatchAndDrain() - { - return Task.CompletedTask; - } - - public void Unlatch() - { - Latched = false; - } - - public Task Ping(CancellationToken cancellationToken) - { - return Task.FromResult(true); - } - + public Task Ping(CancellationToken cancellationToken) => Task.FromResult(true); public void Dispose() { } - public bool Latched { get; set; } - public Uri Destination { get; } Uri ISendingAgent.ReplyUri diff --git a/src/StorytellerSpecs/Stub/StubTransport.cs b/src/StorytellerSpecs/Stub/StubTransport.cs index e89e462d5..fac13474f 100644 --- a/src/StorytellerSpecs/Stub/StubTransport.cs +++ b/src/StorytellerSpecs/Stub/StubTransport.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using Baseline; @@ -67,7 +67,7 @@ public void Dispose() { } - public string Protocol { get; } = "stub"; + public ICollection Protocols { get; } = new []{"stub"}; public void StartSenders(IMessagingRoot root, ITransportRuntime runtime) { diff --git a/src/TestingSupport/Compliance/SendingCompliance.cs b/src/TestingSupport/Compliance/SendingCompliance.cs index cc3f20ddf..2e1f48464 100644 --- a/src/TestingSupport/Compliance/SendingCompliance.cs +++ b/src/TestingSupport/Compliance/SendingCompliance.cs @@ -2,6 +2,7 @@ using System.Data; using System.IO; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Baseline.Dates; using Jasper; @@ -14,6 +15,8 @@ using TestingSupport.ErrorHandling; using TestMessages; using Xunit; +using Xunit.Abstractions; +using Xunit.Sdk; namespace TestingSupport.Compliance { @@ -22,6 +25,8 @@ public abstract class SendingCompliance : IDisposable protected IHost theSender; protected IHost theReceiver; protected Uri theOutboundAddress; + private readonly ITestOutputHelper _testOutputHelper = new TestOutputHelper(); + private readonly TimeSpan _defaultTimeout = 5.Seconds(); protected readonly ErrorCausingMessage theMessage = new ErrorCausingMessage(); private ITrackedSession _session; @@ -31,10 +36,15 @@ protected SendingCompliance(Uri destination) theOutboundAddress = destination; } + protected SendingCompliance(Uri destination, TimeSpan defaultTimeout) + { + theOutboundAddress = destination; + _defaultTimeout = defaultTimeout; + } + public void SenderIs() where T : JasperOptions, new() { theSender = JasperHost.For(configureSender); - } public void TheOnlyAppIs() where T : JasperOptions, new() @@ -113,13 +123,13 @@ public void Dispose() public async Task can_apply_requeue_mechanics() { - var session = await theSender.TrackActivity() + var session = await theSender.TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .DoNotAssertOnExceptionsDetected() .ExecuteAndWait(c => c.SendToDestination(theOutboundAddress, new Message2())); - + session.FindSingleTrackedMessageOfType(EventType.MessageSucceeded) .ShouldNotBeNull(); @@ -128,7 +138,7 @@ public async Task can_apply_requeue_mechanics() [Fact] public async Task can_send_from_one_node_to_another_by_destination() { - var session = await theSender.TrackActivity() + var session = await theSender.TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .DoNotAssertOnExceptionsDetected() .ExecuteAndWait(c => c.SendToDestination(theOutboundAddress, new Message1())); @@ -143,7 +153,7 @@ public async Task can_send_from_one_node_to_another_by_publishing_rule() { var message1 = new Message1(); - var session = await theSender.TrackActivity() + var session = await theSender.TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .DoNotAssertOnExceptionsDetected() .SendMessageAndWait(message1); @@ -156,7 +166,7 @@ public async Task can_send_from_one_node_to_another_by_publishing_rule() [Fact] public async Task tags_the_envelope_with_the_source() { - var session = await theSender.TrackActivity() + var session = await theSender.TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .DoNotAssertOnExceptionsDetected() .ExecuteAndWait(c => c.SendToDestination(theOutboundAddress, new Message1())); @@ -175,7 +185,7 @@ public async Task tracking_correlation_id_on_everything() var id2 = Guid.Empty; var session2 = await theSender - .TrackActivity() + .TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .ExecuteAndWait(async context => @@ -200,7 +210,7 @@ public async Task tracking_correlation_id_on_everything() public async Task schedule_send() { var session = await theSender - .TrackActivity() + .TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .Timeout(15.Seconds()) .WaitForMessageToBeReceivedAt(theReceiver ?? theSender) @@ -219,7 +229,7 @@ public async Task schedule_send() protected async Task afterProcessingIsComplete() { _session = await theSender - .TrackActivity() + .TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .DoNotAssertOnExceptionsDetected() .SendMessageAndWait(theMessage); @@ -232,7 +242,7 @@ protected async Task afterProcessingIsComplete() protected async Task shouldSucceedOnAttempt(int attempt) { var session = await theSender - .TrackActivity() + .TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .Timeout(15.Seconds()) .DoNotAssertOnExceptionsDetected() @@ -266,7 +276,7 @@ protected async Task shouldSucceedOnAttempt(int attempt) protected async Task shouldMoveToErrorQueueOnAttempt(int attempt) { var session = await theSender - .TrackActivity() + .TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .DoNotAssertOnExceptionsDetected() .Timeout(30.Seconds()) @@ -343,7 +353,7 @@ public async Task explicit_respond_to_sender() var ping = new PingMessage(); var session = await theSender - .TrackActivity() + .TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .Timeout(30.Seconds()) .SendMessageAndWait(ping); @@ -358,7 +368,7 @@ public async Task requested_response() var ping = new ImplicitPing(); var session = await theSender - .TrackActivity() + .TrackActivity(_defaultTimeout) .AlsoTrack(theReceiver) .Timeout(30.Seconds()) .ExecuteAndWait(x => x.SendAndExpectResponseFor(ping));