diff --git a/Amqp/src/Akka.Streams.Amqp.Tests/AmqpConnectionProvidersTest.cs b/Amqp/src/Akka.Streams.Amqp.Tests/AmqpConnectionProvidersTest.cs new file mode 100644 index 000000000..3ef3bdfe2 --- /dev/null +++ b/Amqp/src/Akka.Streams.Amqp.Tests/AmqpConnectionProvidersTest.cs @@ -0,0 +1,256 @@ +using System; +using System.Collections.Generic; +using System.Text; +using FluentAssertions; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; +using Xunit; + +namespace Akka.Streams.Amqp.Tests +{ + public class AmqpConnectionProvidersTest : Akka.TestKit.Xunit2.TestKit + { + private readonly ActorMaterializer _mat; + public AmqpConnectionProvidersTest() + { + _mat = ActorMaterializer.Create(Sys); + } + + [Fact] + public void + The_AMQP_default_connection_providers_should_create_new_a_new_connection_per_invocation_of_LocalAmqpConnection() + { + var connectionProvider = AmqpLocalConnectionProvider.Instance; + var connection1 = connectionProvider.Get(); + var connection2 = connectionProvider.Get(); + Assert.NotEqual(connection1, connection2); + connectionProvider.Release(connection1); + connectionProvider.Release(connection2); + } + + [Fact] + public void Not_error_if_releasing_already_closed_LocalAmqpCOnnection() + { + var connectionProvider = AmqpLocalConnectionProvider.Instance; + var connection1 = connectionProvider.Get(); + var connection2 = connectionProvider.Get(); + connectionProvider.Release(connection1); + connectionProvider.Release(connection2); + } + + [Fact] + public void Create_new_connection_per_invocation_of_AmqpConnectionUri() + { + var connectionProvider = AmqpUriConnectionProvider.Create("amqp://localhost:5672"); + var connection1 = connectionProvider.Get(); + var connection2 = connectionProvider.Get(); + Assert.NotEqual(connection1, connection2); + connectionProvider.Release(connection1); + connectionProvider.Release(connection2); + } + + [Fact] + public void Not_error_if_releasing_already_closed_AmqpConnectionUri() + { + var connectionProvider = AmqpUriConnectionProvider.Create("amqp://localhost:5672"); + var connection1 = connectionProvider.Get(); + var connection2 = connectionProvider.Get(); + connectionProvider.Release(connection1); + connectionProvider.Release(connection2); + } + + [Fact] + public void Create_new_connection_per_invocation_of_AmqpConnectionDetails() + { + var connectionProvider = AmqpDetailsConnectionProvider.Create("localhost", 5672); + var connection1 = connectionProvider.Get(); + var connection2 = connectionProvider.Get(); + Assert.NotEqual(connection1, connection2); + connectionProvider.Release(connection1); + connectionProvider.Release(connection2); + } + + [Fact] + public void Not_error_if_releasing_already_closed_AmqpConnectionDetails() + { + var connectionProvider = AmqpDetailsConnectionProvider.Create("localhost", 5672); + var connection1 = connectionProvider.Get(); + var connection2 = connectionProvider.Get(); + connectionProvider.Release(connection1); + connectionProvider.Release(connection2); + } + + [Fact] + public void Create_new_connection_per_invocation_of_AmqpConnectionFactory() + { + var connectionFactory = new ConnectionFactory(); + var connectionProvider = AmqpConnectionFactoryConnectionProvider.Create(connectionFactory).WithHostsAndPorts(("localhost", 5672)); + var connection1 = connectionProvider.Get(); + var connection2 = connectionProvider.Get(); + Assert.NotEqual(connection1, connection2); + connectionProvider.Release(connection1); + connectionProvider.Release(connection2); + } + + [Fact] + public void Not_error_if_releasing_already_closed_AmqpConnectionFactory() + { + var connectionFactory = new ConnectionFactory(); + var connectionProvider = AmqpConnectionFactoryConnectionProvider.Create(connectionFactory).WithHostsAndPorts(("localhost", 5672)); + var connection1 = connectionProvider.Get(); + var connection2 = connectionProvider.Get(); + connectionProvider.Release(connection1); + connectionProvider.Release(connection2); + } + + [Fact] + public void + Reusable_connection_provider_with_automatic_release_should_reuse_the_same_connection_from_LocalAmqpConnection_and_release_it_when_last_client_disconnects() + { + var connectionProvider = AmqpLocalConnectionProvider.Instance; + var reusableConnectionProvider = AmqpCachedConnectionProvider.Create(connectionProvider); + var connection1 = reusableConnectionProvider.Get(); + var connection2 = reusableConnectionProvider.Get(); + Assert.Equal(connection1, connection2); + reusableConnectionProvider.Release(connection1); + Assert.True(connection1.IsOpen); + Assert.True(connection2.IsOpen); + reusableConnectionProvider.Release(connection2); + Assert.False(connection1.IsOpen); + Assert.False(connection2.IsOpen); + } + + [Fact] + public void Reuse_the_same_connection_from_AmqpConnectionUri_and_release_it_when_last_client_disconnects() + { + var connectionProvider = AmqpUriConnectionProvider.Create("amqp://localhost:5672"); + var reusableConnectionProvider = AmqpCachedConnectionProvider.Create(connectionProvider); + var connection1 = reusableConnectionProvider.Get(); + var connection2 = reusableConnectionProvider.Get(); + Assert.Equal(connection1, connection2); + reusableConnectionProvider.Release(connection1); + Assert.True(connection1.IsOpen); + Assert.True(connection2.IsOpen); + reusableConnectionProvider.Release(connection2); + Assert.False(connection1.IsOpen); + Assert.False(connection2.IsOpen); + } + + [Fact] + public void Reuse_the_same_connection_from_AmqpConnectionDetails_and_release_it_when_last_client_disconnects() + { + var connectionProvider = AmqpDetailsConnectionProvider.Create("localhost", 5672); + var reusableConnectionProvider = AmqpCachedConnectionProvider.Create(connectionProvider); + var connection1 = reusableConnectionProvider.Get(); + var connection2 = reusableConnectionProvider.Get(); + Assert.Equal(connection1, connection2); + reusableConnectionProvider.Release(connection1); + Assert.True(connection1.IsOpen); + Assert.True(connection2.IsOpen); + reusableConnectionProvider.Release(connection2); + Assert.False(connection1.IsOpen); + Assert.False(connection2.IsOpen); + } + + [Fact] + public void Reuse_the_same_connection_from_AmqpConnectionFactory_and_release_it_when_last_client_disconnects() + { + var connectionFactory = new ConnectionFactory(); + var connectionProvider = AmqpConnectionFactoryConnectionProvider.Create(connectionFactory) + .WithHostsAndPorts(("localhost", 5672)); + var reusableConnectionProvider = AmqpCachedConnectionProvider.Create(connectionProvider); + var connection1 = reusableConnectionProvider.Get(); + var connection2 = reusableConnectionProvider.Get(); + Assert.Equal(connection1, connection2); + reusableConnectionProvider.Release(connection1); + Assert.True(connection1.IsOpen); + Assert.True(connection2.IsOpen); + reusableConnectionProvider.Release(connection2); + Assert.False(connection1.IsOpen); + Assert.False(connection2.IsOpen); + } + + + [Fact] + public void + The_AMQP_reusable_connection_provider_without_automatic_release_should_reuse_same_connection_from_LocalAmqpConnection() + { + var connectionProvider = AmqpLocalConnectionProvider.Instance; + var reusableConnectionProvider = + AmqpCachedConnectionProvider.Create(connectionProvider, automaticRelease: false); + var connection1 = reusableConnectionProvider.Get(); + var connection2 = reusableConnectionProvider.Get(); + Assert.Equal(connection1, connection2); + reusableConnectionProvider.Release(connection2); + Assert.False(connection1.IsOpen); + Assert.False(connection2.IsOpen); + } + + [Fact] + public void Reuse_the_same_connection_from_AmqpConnectionUri() + { + var connectionProvider = AmqpUriConnectionProvider.Create("amqp://localhost:5672"); + var reusableConnectionProvider = + AmqpCachedConnectionProvider.Create(connectionProvider, automaticRelease: false); + var connection1 = reusableConnectionProvider.Get(); + var connection2 = reusableConnectionProvider.Get(); + Assert.Equal(connection1, connection2); + reusableConnectionProvider.Release(connection2); + Assert.False(connection1.IsOpen); + Assert.False(connection2.IsOpen); + } + + [Fact] + public void Reuse_the_same_connection_from_AmqpConnectionDetails() + { + var connectionProvider = AmqpDetailsConnectionProvider.Create("localhost", 5672); + var reusableConnectionProvider = + AmqpCachedConnectionProvider.Create(connectionProvider, automaticRelease: false); + var connection1 = reusableConnectionProvider.Get(); + var connection2 = reusableConnectionProvider.Get(); + Assert.Equal(connection1, connection2); + reusableConnectionProvider.Release(connection2); + Assert.False(connection1.IsOpen); + Assert.False(connection2.IsOpen); + } + + [Fact] + public void Reuse_the_same_connection_from_AmqpConnectionFactory() + { + var connectionFactory = new ConnectionFactory(); + var connectionProvider = AmqpConnectionFactoryConnectionProvider.Create(connectionFactory) + .WithHostsAndPorts(("localhost", 5672)); + var reusableConnectionProvider = + AmqpCachedConnectionProvider.Create(connectionProvider, automaticRelease: false); + var connection1 = reusableConnectionProvider.Get(); + var connection2 = reusableConnectionProvider.Get(); + Assert.Equal(connection1, connection2); + reusableConnectionProvider.Release(connection2); + Assert.False(connection1.IsOpen); + Assert.False(connection2.IsOpen); + } + + [Fact] + public void Not_leave_the_provider_in_an_invalid_state_if_getting_connection_fails() + { + var connectionProvider = AmqpDetailsConnectionProvider.Create("localhost", 5673); + var reusableConnectionProvider = AmqpCachedConnectionProvider.Create(connectionProvider, false); + try + { + reusableConnectionProvider.Get(); + } + catch (Exception e) + { + e.Should().BeOfType(); + } + try + { + reusableConnectionProvider.Get(); + } + catch (Exception e) + { + e.Should().BeOfType(); + } + } + } +} diff --git a/Amqp/src/Akka.Streams.Amqp.Tests/AmqpConnectorsTest.cs b/Amqp/src/Akka.Streams.Amqp.Tests/AmqpConnectorsTest.cs index 9e938c8c2..66775c7a1 100644 --- a/Amqp/src/Akka.Streams.Amqp.Tests/AmqpConnectorsTest.cs +++ b/Amqp/src/Akka.Streams.Amqp.Tests/AmqpConnectorsTest.cs @@ -18,14 +18,14 @@ namespace Akka.Streams.Amqp.Tests /// public class AmqpConnectorsTest : Akka.TestKit.Xunit2.TestKit { - private readonly AmqpConnectionDetails _connectionSettings; + private readonly AmqpConnectionProvider _connectionProvider; private readonly ActorMaterializer _mat; public AmqpConnectorsTest() { _mat = ActorMaterializer.Create(Sys); - _connectionSettings = - AmqpConnectionDetails.Create("localhost", 5672) + _connectionProvider = + AmqpDetailsConnectionProvider.Create("localhost", 5672) .WithAutomaticRecoveryEnabled(true) .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1)); } @@ -41,13 +41,13 @@ public void Publish_and_consume_elements_through_a_simple_queue_again_in_the_sam //create sink var amqpSink = AmqpSink.CreateSimple( - AmqpSinkSettings.Create(_connectionSettings) + AmqpSinkSettings.Create(_connectionProvider) .WithRoutingKey(queueName) .WithDeclarations(queueDeclaration)); //create source var amqpSource = AmqpSource.AtMostOnceSource( - NamedQueueSourceSettings.Create(DefaultAmqpConnection.Instance, queueName).WithDeclarations(queueDeclaration), + NamedQueueSourceSettings.Create(AmqpLocalConnectionProvider.Instance, queueName).WithDeclarations(queueDeclaration), bufferSize: 10); //run sink @@ -72,10 +72,10 @@ public void Publish_via_RPC_and_then_consume_through_a_simple_queue_again_in_the //#create-rpc-flow var amqpRpcFlow = AmqpRpcFlow.CreateSimple( - AmqpSinkSettings.Create(_connectionSettings).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); + AmqpSinkSettings.Create(_connectionProvider).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); //#create-rpc-flow - var amqpSource = AmqpSource.AtMostOnceSource(NamedQueueSourceSettings.Create(_connectionSettings, queueName), bufferSize: 1); + var amqpSource = AmqpSource.AtMostOnceSource(NamedQueueSourceSettings.Create(_connectionProvider, queueName), bufferSize: 1); var input = new[] {"one", "two", "three", "four", "five"}; @@ -92,7 +92,7 @@ public void Publish_via_RPC_and_then_consume_through_a_simple_queue_again_in_the //#run-rpc-flow rpcQueueNameTask.Result.Should().NotBeNullOrWhiteSpace("RPC flow materializes into response queue name"); - var amqpSink = AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionSettings)); + var amqpSink = AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionProvider)); amqpSource .Select(msg => new OutgoingMessage(msg.Bytes.Concat(ByteString.FromString("a")), false, false, msg.Properties)) @@ -111,9 +111,9 @@ public void Publish_via_RPC_which_expects_2_responses_per_message_and_then_consu var queueDeclaration = QueueDeclaration.Create(queueName); var amqpRpcFlow = AmqpRpcFlow.CreateSimple( - AmqpSinkSettings.Create(_connectionSettings).WithRoutingKey(queueName).WithDeclarations(queueDeclaration), repliesPerMessage: 2); + AmqpSinkSettings.Create(_connectionProvider).WithRoutingKey(queueName).WithDeclarations(queueDeclaration), repliesPerMessage: 2); - var amqpSource = AmqpSource.AtMostOnceSource(NamedQueueSourceSettings.Create(_connectionSettings, queueName), bufferSize: 1); + var amqpSource = AmqpSource.AtMostOnceSource(NamedQueueSourceSettings.Create(_connectionProvider, queueName), bufferSize: 1); var input = new[] {"one", "two", "three", "four", "five"}; @@ -128,7 +128,7 @@ public void Publish_via_RPC_which_expects_2_responses_per_message_and_then_consu var probe = t.Item2; rpcQueueF.Result.Should().NotBeNullOrWhiteSpace("RPC flow materializes into response queue name"); - var amqpSink = AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionSettings)); + var amqpSink = AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionProvider)); amqpSource .SelectMany(b => @@ -149,7 +149,7 @@ public void Publish_via_RPC_which_expects_2_responses_per_message_and_then_consu public void Correctly_close_a_AmqpRpcFlow_when_stream_is_closed_without_passing_any_elements() { Source.Empty() - .Via(AmqpRpcFlow.CreateSimple(AmqpSinkSettings.Create(_connectionSettings))) + .Via(AmqpRpcFlow.CreateSimple(AmqpSinkSettings.Create(_connectionProvider))) .RunWith(this.SinkProbe(), _mat) .EnsureSubscription() .ExpectComplete(); @@ -163,7 +163,7 @@ public void Handle_missing_reply_to_header_correctly() Source .Single(outgoingMessage) .WatchTermination(Keep.Right) - .To(AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionSettings))) + .To(AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionProvider))) .Run(_mat) .Wait(); @@ -171,7 +171,7 @@ public void Handle_missing_reply_to_header_correctly() { Source .Single(outgoingMessage) - .ToMaterialized(AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionSettings, failIfReplyToMissing: true)), Keep.Right) + .ToMaterialized(AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionProvider, failIfReplyToMissing: true)), Keep.Right) .Run(_mat) .Wait(); @@ -190,9 +190,9 @@ public void Not_fail_on_a_fast_producer_and_a_slow_consumer() var queueDeclaration = QueueDeclaration.Create(queueName); var amqpSource = AmqpSource.AtMostOnceSource( - NamedQueueSourceSettings.Create(_connectionSettings, queueName).WithDeclarations(queueDeclaration), bufferSize: 2); + NamedQueueSourceSettings.Create(_connectionProvider, queueName).WithDeclarations(queueDeclaration), bufferSize: 2); - var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionSettings).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); + var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionProvider).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); var publisher = this.CreatePublisherProbe(); var subscriber = this.CreateSubscriberProbe(); amqpSink.AddAttributes(Attributes.CreateInputBuffer(1, 1)).RunWith(Source.FromPublisher(publisher), _mat); @@ -241,7 +241,7 @@ public void Pub_sub_from_one_source_with_multiple_sinks() var exchangeDeclaration = ExchangeDeclaration.Create(exchangeName, "fanout"); //#create-exchange-sink - var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionSettings).WithExchange(exchangeName).WithDeclarations(exchangeDeclaration)); + var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionProvider).WithExchange(exchangeName).WithDeclarations(exchangeDeclaration)); //#create-exchange-source const int fanoutSize = 4; @@ -251,7 +251,7 @@ public void Pub_sub_from_one_source_with_multiple_sinks() .Aggregate(Source.Empty<(int, string)>(), (source, fanoutBranch) => source.Merge( AmqpSource.AtMostOnceSource( - TemporaryQueueSourceSettings.Create(_connectionSettings, exchangeName).WithDeclarations(exchangeDeclaration), + TemporaryQueueSourceSettings.Create(_connectionProvider, exchangeName).WithDeclarations(exchangeDeclaration), bufferSize: 1) .Select(msg => (branch: fanoutBranch, message: msg.Bytes.ToString()))) ); @@ -277,11 +277,11 @@ public void Publish_and_consume_elements_through_a_simple_queue_again_in_the_sam { var queueName = "amqp-conn-it-spec-simple-queue-" + Environment.TickCount; var queueDeclaration = QueueDeclaration.Create(queueName); - var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionSettings).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); + var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionProvider).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); //#create-source-withoutautoack var amqpSource = AmqpSource.CommittableSource( - NamedQueueSourceSettings.Create(_connectionSettings, queueName).WithDeclarations(queueDeclaration), + NamedQueueSourceSettings.Create(_connectionProvider, queueName).WithDeclarations(queueDeclaration), bufferSize: 10); //#create-source-withoutautoack @@ -308,13 +308,13 @@ public void Republish_message_without_autoAck_if_nack_is_sent() { var queueName = "amqp-conn-it-spec-simple-queue-" + Environment.TickCount; var queueDeclaration = QueueDeclaration.Create(queueName); - var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionSettings).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); + var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionProvider).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); var input = new[] {"one", "two", "three", "four", "five"}; Source.From(input).Select(ByteString.FromString).RunWith(amqpSink, _mat).Wait(); var amqpSource = AmqpSource.CommittableSource( - NamedQueueSourceSettings.Create(_connectionSettings, queueName).WithDeclarations(queueDeclaration), bufferSize: 10); + NamedQueueSourceSettings.Create(_connectionProvider, queueName).WithDeclarations(queueDeclaration), bufferSize: 10); //#run-source-withoutautoack-and-nack var result1 = amqpSource @@ -347,10 +347,10 @@ public void Keep_connection_open_if_downstream_closes_and_there_are_pending_acks var queueName = "amqp-conn-it-spec-simple-queue-" + Environment.TickCount; var queueDeclaration = QueueDeclaration.Create(queueName); - var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionSettings).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); + var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionProvider).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); var amqpSource = AmqpSource.CommittableSource( - NamedQueueSourceSettings.Create(_connectionSettings, queueName).WithDeclarations(queueDeclaration), bufferSize: 10); + NamedQueueSourceSettings.Create(_connectionProvider, queueName).WithDeclarations(queueDeclaration), bufferSize: 10); var input = new[] {"one", "two", "three", "four", "five"}; Source.From(input).Select(ByteString.FromString).RunWith(amqpSink, _mat).Wait(); @@ -367,12 +367,12 @@ public void Not_republish_message_without_autoAck_false_if_nack_is_sent() { var queueName = "amqp-conn-it-spec-simple-queue-" + Environment.TickCount; var queueDeclaration = QueueDeclaration.Create(queueName); - var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionSettings).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); + var amqpSink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(_connectionProvider).WithRoutingKey(queueName).WithDeclarations(queueDeclaration)); var input = new[] {"one", "two", "three", "four", "five"}; Source.From(input).Select(ByteString.FromString).RunWith(amqpSink, _mat).Wait(); var amqpSource = AmqpSource.CommittableSource( - NamedQueueSourceSettings.Create(_connectionSettings, queueName).WithDeclarations(queueDeclaration), bufferSize: 10); + NamedQueueSourceSettings.Create(_connectionProvider, queueName).WithDeclarations(queueDeclaration), bufferSize: 10); var result1 = amqpSource .SelectAsync(1, async cm => @@ -406,7 +406,7 @@ public void Publish_via_RPC_and_then_consume_through_a_simple_queue_again_in_the var input = new[] {"one", "two", "three", "four", "five"}; var amqpRpcFlow = AmqpRpcFlow.CommittableFlow( - AmqpSinkSettings.Create(_connectionSettings).WithRoutingKey(queueName).WithDeclarations(queueDeclaration), bufferSize: 10); + AmqpSinkSettings.Create(_connectionProvider).WithRoutingKey(queueName).WithDeclarations(queueDeclaration), bufferSize: 10); var t = Source.From(input) @@ -425,9 +425,9 @@ public void Publish_via_RPC_and_then_consume_through_a_simple_queue_again_in_the var probe = t.Item2; rpcQueueF.Wait(); - var amqpSink = AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionSettings)); + var amqpSink = AmqpSink.ReplyTo(AmqpReplyToSinkSettings.Create(_connectionProvider)); - var amqpSource = AmqpSource.AtMostOnceSource(NamedQueueSourceSettings.Create(_connectionSettings, queueName), bufferSize: 1); + var amqpSource = AmqpSource.AtMostOnceSource(NamedQueueSourceSettings.Create(_connectionProvider, queueName), bufferSize: 1); amqpSource .Select(b => new OutgoingMessage(b.Bytes, false, false, b.Properties)) @@ -448,12 +448,12 @@ public void Set_routing_key_per_message_and_consume_them_in_the_same_process() var bindingDeclaration = BindingDeclaration.Create(queueName, exchangeName).WithRoutingKey(GetRoutingKey("*")); var amqpSink = AmqpSink.Create( - AmqpSinkSettings.Create(_connectionSettings) + AmqpSinkSettings.Create(_connectionProvider) .WithExchange(exchangeName) .WithDeclarations(exchangeDeclaration, queueDeclaration, bindingDeclaration)); var amqpSource = AmqpSource.AtMostOnceSource( - NamedQueueSourceSettings.Create(_connectionSettings, queueName).WithDeclarations(exchangeDeclaration, queueDeclaration, bindingDeclaration), + NamedQueueSourceSettings.Create(_connectionProvider, queueName).WithDeclarations(exchangeDeclaration, queueDeclaration, bindingDeclaration), bufferSize: 10); var input = new[] {"one", "two", "three", "four", "five"}; @@ -479,7 +479,7 @@ public void Publish_from_one_source_and_consume_elements_with_multiple_sinks() var queueName = "amqp-conn-it-spec-work-queues-" + Environment.TickCount; var queueDeclaration = QueueDeclaration.Create(queueName); var amqpSink = AmqpSink.CreateSimple( - AmqpSinkSettings.Create(_connectionSettings) + AmqpSinkSettings.Create(_connectionProvider) .WithRoutingKey(queueName) .WithDeclarations(queueDeclaration)); @@ -494,7 +494,7 @@ public void Publish_from_one_source_and_consume_elements_with_multiple_sinks() { var source = b.Add( AmqpSource.AtMostOnceSource( - NamedQueueSourceSettings.Create(_connectionSettings, queueName).WithDeclarations(queueDeclaration), + NamedQueueSourceSettings.Create(_connectionProvider, queueName).WithDeclarations(queueDeclaration), bufferSize: 1)); b.From(source.Outlet).To(merge.In(n)); @@ -517,7 +517,7 @@ public void Publish_elements_with_flow_then_consume_them_with_source() var amqpFlow = AmqpFlow.Create( - AmqpSinkSettings.Create(_connectionSettings) + AmqpSinkSettings.Create(_connectionProvider) .WithRoutingKey(queueName) .WithDeclarations(queueDeclaration)); @@ -536,7 +536,7 @@ public void Publish_elements_with_flow_then_consume_them_with_source() var consumed = AmqpSource.AtMostOnceSource( - NamedQueueSourceSettings.Create(DefaultAmqpConnection.Instance, queueName).WithDeclarations(queueDeclaration), + NamedQueueSourceSettings.Create(AmqpLocalConnectionProvider.Instance, queueName).WithDeclarations(queueDeclaration), bufferSize: 10) .Select(m => m.Bytes.ToString(Encoding.UTF8)) .Take(input.Length) @@ -551,7 +551,7 @@ public void Publish_elements_with_flow_then_consume_them_with_source() public void Correctly_close_a_AmqpFlow_when_stream_is_closed_without_passing_any_elements() { Source.Empty<(OutgoingMessage, int)>() - .Via(AmqpFlow.Create(AmqpSinkSettings.Create(_connectionSettings))) + .Via(AmqpFlow.Create(AmqpSinkSettings.Create(_connectionProvider))) .RunWith(this.SinkProbe(), _mat) .EnsureSubscription() .ExpectComplete(); @@ -569,12 +569,12 @@ public void Set_routing_key_per_message_while_publishing_with_flow_and_consume_t var bindingDeclaration = BindingDeclaration.Create(queueName, exchangeName).WithRoutingKey(GetRoutingKey("*")); var amqpFlow = AmqpFlow.Create( - AmqpSinkSettings.Create(_connectionSettings) + AmqpSinkSettings.Create(_connectionProvider) .WithExchange(exchangeName) .WithDeclarations(exchangeDeclaration, queueDeclaration, bindingDeclaration)); var amqpSource = AmqpSource.AtMostOnceSource( - NamedQueueSourceSettings.Create(_connectionSettings, queueName).WithDeclarations(exchangeDeclaration, queueDeclaration, bindingDeclaration), + NamedQueueSourceSettings.Create(_connectionProvider, queueName).WithDeclarations(exchangeDeclaration, queueDeclaration, bindingDeclaration), bufferSize: 10); var input = new[] {"one", "two", "three", "four", "five"}; diff --git a/Amqp/src/Akka.Streams.Amqp/AmqpConnectionProvider.cs b/Amqp/src/Akka.Streams.Amqp/AmqpConnectionProvider.cs new file mode 100644 index 000000000..fb5cc8100 --- /dev/null +++ b/Amqp/src/Akka.Streams.Amqp/AmqpConnectionProvider.cs @@ -0,0 +1,419 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Akka.Util; +using RabbitMQ.Client; + +namespace Akka.Streams.Amqp +{ + /// + /// Only for internal implementations + /// + public abstract class AmqpConnectionProvider + { + public abstract IConnection Get(); + + public virtual void Release(IConnection connection) + { + if(connection.IsOpen) + connection.Close(); + } + } + + /// + /// Connects to a local AMQP broker at the default port with no password. + /// + public sealed class AmqpLocalConnectionProvider : AmqpConnectionProvider + { + private AmqpLocalConnectionProvider() + { + + } + public static AmqpLocalConnectionProvider Instance=> new AmqpLocalConnectionProvider(); + public override IConnection Get() => new ConnectionFactory().CreateConnection(); + } + + public sealed class AmqpUriConnectionProvider : AmqpConnectionProvider + { + private readonly Uri _uri; + + public static AmqpUriConnectionProvider Create(Uri uri)=> new AmqpUriConnectionProvider(uri); + + public static AmqpConnectionProvider Create(string uri)=> new AmqpUriConnectionProvider(new Uri(uri)); + + private AmqpUriConnectionProvider(Uri uri) + { + _uri = uri; + } + public override IConnection Get() + { + var factory = new ConnectionFactory(); + factory.Uri = _uri; + return factory.CreateConnection(); + } + } + + public sealed class AmqpDetailsConnectionProvider : AmqpConnectionProvider + { + private AmqpDetailsConnectionProvider(IReadOnlyList<(string host, int port)> hostAndPortList, + AmqpCredentials? credentials = null, + string virtualHost = null, + SslOption ssl = null, + ushort? requestedHeartbeat = null, + TimeSpan? connectionTimeout = null, + TimeSpan? handshakeTimeout = null, + TimeSpan? networkRecoveryInterval = null, + bool? automaticRecoveryEnabled = null, + bool? topologyRecoveryEnabled = null, + string connectionName = null) + { + HostAndPortList = hostAndPortList; + Credentials = credentials; + VirtualHost = virtualHost; + Ssl = ssl; + RequestedHeartbeat = requestedHeartbeat; + ConnectionTimeout = connectionTimeout; + HandshakeTimeout = handshakeTimeout; + NetworkRecoveryInterval = networkRecoveryInterval; + AutomaticRecoveryEnabled = automaticRecoveryEnabled; + TopologyRecoveryEnabled = topologyRecoveryEnabled; + ConnectionName = connectionName; + } + + public IReadOnlyList<(string host, int port)> HostAndPortList { get; } + public AmqpCredentials? Credentials { get; } + public string VirtualHost { get; } + public SslOption Ssl { get; } + public ushort? RequestedHeartbeat { get; } + public TimeSpan? ConnectionTimeout { get; } + public TimeSpan? HandshakeTimeout { get; } + public TimeSpan? NetworkRecoveryInterval { get; } + public bool? AutomaticRecoveryEnabled { get; } + public bool? TopologyRecoveryEnabled { get; } + public string ConnectionName { get; } + + public static AmqpDetailsConnectionProvider Create(string host, int port) => + new AmqpDetailsConnectionProvider(new List<(string host, int port)> { (host, port) }); + + public AmqpDetailsConnectionProvider WithHostsAndPorts((string host, int port) hostAndPort, + params (string host, int port)[] hostAndPortList) + { + return new AmqpDetailsConnectionProvider(new List<(string host, int port)>(hostAndPortList.ToList()) { hostAndPort }, + Credentials, VirtualHost, Ssl, RequestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithCredentials(AmqpCredentials credentials) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, credentials, VirtualHost, Ssl, RequestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithVirtualHost(string virtualHost) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, virtualHost, Ssl, RequestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithSsl(SslOption sslOption) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, VirtualHost, sslOption, RequestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithRequestedHeartbeat(ushort requestedHeartbeat) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, VirtualHost, Ssl, requestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithConnectionTimeout(TimeSpan connectionTimeout) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, + connectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithHandshakeTimeout(TimeSpan handshakeTimeout) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, + ConnectionTimeout, handshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithNetworkRecoveryInterval(TimeSpan networkRecoveryInterval) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, networkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithAutomaticRecoveryEnabled(bool automaticRecoveryEnabled) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, automaticRecoveryEnabled, + TopologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithTopologyRecoveryEnabled(bool topologyRecoveryEnabled) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + topologyRecoveryEnabled, ConnectionName); + } + + public AmqpDetailsConnectionProvider WithConnectionName(string connectionName) + { + return new AmqpDetailsConnectionProvider(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, + ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, + TopologyRecoveryEnabled, connectionName); + } + + public override IConnection Get() + { + var factory = new ConnectionFactory(); + if (Credentials.HasValue) + { + factory.UserName = Credentials.Value.Username; + factory.Password = Credentials.Value.Password; + } + if (!string.IsNullOrEmpty(VirtualHost)) + factory.VirtualHost = VirtualHost; + if (Ssl != null) + factory.Ssl = Ssl; + if (AutomaticRecoveryEnabled.HasValue) + factory.AutomaticRecoveryEnabled = AutomaticRecoveryEnabled.Value; + if (RequestedHeartbeat.HasValue) + factory.RequestedHeartbeat = RequestedHeartbeat.Value; + if (NetworkRecoveryInterval.HasValue) + factory.NetworkRecoveryInterval = NetworkRecoveryInterval.Value; + if (TopologyRecoveryEnabled.HasValue) + factory.TopologyRecoveryEnabled = TopologyRecoveryEnabled.Value; + if (ConnectionTimeout.HasValue) + factory.ContinuationTimeout = ConnectionTimeout.Value; + if (HandshakeTimeout.HasValue) + factory.HandshakeContinuationTimeout = HandshakeTimeout.Value; + return factory.CreateConnection( + new DefaultEndpointResolver(HostAndPortList.Select(hp => new AmqpTcpEndpoint(hp.host, hp.port))), + ConnectionName ?? ""); + } + + public override string ToString() + { + return + $"AmqpDetailsConnectionProvider(HostAndPortList=({HostAndPortList.Select(x => $"[{x.host}:{x.port}]").Aggregate((left, right) => $"{right}, {left}")}), Credentials={Credentials}, VirtualHost={VirtualHost})"; + } + } + + public sealed class AmqpConnectionFactoryConnectionProvider : AmqpConnectionProvider + { + private readonly ConnectionFactory _factory; + private readonly IReadOnlyList<(string host, int port)> _hostAndPortList; + + public static AmqpConnectionFactoryConnectionProvider Create(ConnectionFactory factory) => + new AmqpConnectionFactoryConnectionProvider(factory); + + private AmqpConnectionFactoryConnectionProvider(ConnectionFactory factory, + IReadOnlyList<(string host, int port)> hostAndPortList = null) + { + _factory = factory; + _hostAndPortList = hostAndPortList?? new List<(string host, int port)>(); + } + + public IReadOnlyList<(string host, int port)> HostAndPortList => _hostAndPortList.Any() + ? _hostAndPortList.ToList() + : new List<(string host, int port)> {(_factory.HostName, _factory.Port)}; + + public AmqpConnectionFactoryConnectionProvider WithHostsAndPorts((string host, int port) hostAndPort, + params (string host, int port)[] hostAndPortList) + { + return new AmqpConnectionFactoryConnectionProvider(_factory, + new List<(string host, int port)>(hostAndPortList.ToList()) {hostAndPort}); + } + + public override IConnection Get() + { + return _factory.CreateConnection(HostAndPortList.Select(hp => new AmqpTcpEndpoint(hp.host, hp.port)) + .ToList()); + } + } + + public sealed class AmqpCachedConnectionProvider : AmqpConnectionProvider + { + private AtomicReference _state = new AtomicReference(Empty.Instance); + + public AmqpConnectionProvider Provider { get; } + public bool AutomaticRelease { get; } + + public static AmqpCachedConnectionProvider + Create(AmqpConnectionProvider provider, bool automaticRelease = true) => + new AmqpCachedConnectionProvider(provider, automaticRelease); + + private AmqpCachedConnectionProvider(AmqpConnectionProvider provider, bool automaticRelease = true) + { + Provider = provider; + AutomaticRelease = automaticRelease; + } + + public override IConnection Get() + { + var state = _state.Value; + switch (state) + { + case Empty x: + { + if (_state.CompareAndSet(Empty.Instance, Connecting.Instance)) + { + try + { + var connection = Provider.Get(); + if (!_state.CompareAndSet(Connecting.Instance, new Connected(connection, 1))) + throw new InvalidOperationException( + "Unexpected concurrent modification while creating the connection."); + return connection; + } + catch (InvalidOperationException) + { + throw; + } + catch (Exception) + { + _state.CompareAndSet(Connecting.Instance, Empty.Instance); + throw; + } + } + return Get(); + } + case Connecting x: + { + return Get(); + } + case Connected c: + { + if (_state.CompareAndSet(c, new Connected(c.Connection, c.Clients + 1))) + return c.Connection; + return Get(); + } + case Closing x: + return Get(); + default: + throw new Exception("invalid state!"); + } + } + + public override void Release(IConnection connection) + { + var state = _state.Value; + switch (state) + { + case Empty x: + { + throw new InvalidOperationException("There is no connection to release."); + } + case Connecting x: + Release(connection); + break; + case Connected c: + { + if(!c.Connection.Equals(connection)) + throw new InvalidOperationException("Can't release a connection that's not owned by this provider"); + if (c.Clients == 1 || !AutomaticRelease) + { + if (_state.CompareAndSet(c, Closing.Instance)) + { + Provider.Release(connection); + if (!_state.CompareAndSet(Closing.Instance, Empty.Instance)) + throw new InvalidOperationException( + "Unexpected concurrent modification while closing the connection."); + } + break; + } + else + { + if(!_state.CompareAndSet(c, new Connected(c.Connection, c.Clients - 1))) + Release(connection); + break; + } + } + case Closing x: + { + Release(connection); + break; + } + default: + throw new Exception("invalid state!"); + } + } + + + private interface IState { } + + private sealed class Empty : IState + { + public static Empty Instance { get; } = new Empty(); + } + + private sealed class Connecting : IState + { + public static Connecting Instance { get; } = new Connecting(); + } + + private sealed class Connected : IState, IEquatable + { + public IConnection Connection { get; } + public int Clients { get; } + + public Connected(IConnection connection, int clients) + { + Connection = connection; + Clients = clients; + } + + public bool Equals(Connected other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return Equals(Connection, other.Connection) && Clients == other.Clients; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + return obj is Connected && Equals((Connected) obj); + } + + public override int GetHashCode() + { + unchecked + { + return ((Connection != null ? Connection.GetHashCode() : 0) * 397) ^ Clients; + } + } + + public static bool operator ==(Connected left, Connected right) + { + return Equals(left, right); + } + + public static bool operator !=(Connected left, Connected right) + { + return !Equals(left, right); + } + } + + private sealed class Closing : IState + { + public static Closing Instance { get; } = new Closing(); + } + + } + + +} diff --git a/Amqp/src/Akka.Streams.Amqp/AmqpConnector.cs b/Amqp/src/Akka.Streams.Amqp/AmqpConnector.cs index d10332f8e..3d669e8b6 100644 --- a/Amqp/src/Akka.Streams.Amqp/AmqpConnector.cs +++ b/Amqp/src/Akka.Streams.Amqp/AmqpConnector.cs @@ -6,72 +6,6 @@ namespace Akka.Streams.Amqp { - /// - /// Internal API - /// - internal class AmqpConnector - { - public static IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings settings) - { - var factory = new ConnectionFactory(); - switch (settings) - { - case AmqpConnectionUri connectionUri: - factory.Uri = connectionUri.Uri; - break; - case AmqpConnectionDetails details: - { - if (details.Credentials.HasValue) - { - factory.UserName = details.Credentials.Value.Username; - factory.Password = details.Credentials.Value.Password; - } - if (!string.IsNullOrEmpty(details.VirtualHost)) - factory.VirtualHost = details.VirtualHost; - if (details.Ssl != null) - factory.Ssl = details.Ssl; - if (details.AutomaticRecoveryEnabled.HasValue) - factory.AutomaticRecoveryEnabled = details.AutomaticRecoveryEnabled.Value; - if (details.RequestedHeartbeat.HasValue) - factory.RequestedHeartbeat = details.RequestedHeartbeat.Value; - if (details.NetworkRecoveryInterval.HasValue) - factory.NetworkRecoveryInterval = details.NetworkRecoveryInterval.Value; - if (details.TopologyRecoveryEnabled.HasValue) - factory.TopologyRecoveryEnabled = details.TopologyRecoveryEnabled.Value; - if (details.ConnectionTimeout.HasValue) - factory.ContinuationTimeout = details.ConnectionTimeout.Value; - if (details.HandshakeTimeout.HasValue) - factory.HandshakeContinuationTimeout = details.HandshakeTimeout.Value; - break; - } - case DefaultAmqpConnection defaultConnection: - //leave it be as is - break; - } - return factory; - } - - public static IConnection NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) - { - switch (settings) - { - case AmqpConnectionDetails details: - { - if (details.HostAndPortList.Count > 0) - { - return factory.CreateConnection(details.HostAndPortList - .Select(pair => new AmqpTcpEndpoint(pair.host, pair.port)).ToList()); - } - else - { - throw new ArgumentException("You need to supply at least one host/port pair."); - } - } - default: - return factory.CreateConnection(); - } - } - } /// /// Internal API @@ -90,10 +24,6 @@ protected AmqpConnectorLogic(Shape shape) public abstract IAmqpConnectorSettings Settings { get; } - public abstract IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings settings); - - public abstract IConnection NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings); - public abstract void WhenConnected(); public abstract void OnFailure(Exception ex); @@ -102,8 +32,7 @@ public override void PreStart() { try { - var factory = ConnectionFactoryFrom(Settings.ConnectionSettings); - Connection = NewConnection(factory, Settings.ConnectionSettings); + Connection = Settings.ConnectionProvider.Get(); Channel = Connection.CreateModel(); ShutdownCallback = GetAsyncCallback(args => { @@ -168,9 +97,8 @@ public override void PostStop() } if (Connection != null) { - if(Connection.IsOpen) - Connection.Close(); Connection.ConnectionShutdown -= OnConnectionShutdown; + Settings.ConnectionProvider.Release(Connection); Connection = null; } } diff --git a/Amqp/src/Akka.Streams.Amqp/AmqpFlowStage.cs b/Amqp/src/Akka.Streams.Amqp/AmqpFlowStage.cs index 637318d4f..4f6416a3d 100644 --- a/Amqp/src/Akka.Streams.Amqp/AmqpFlowStage.cs +++ b/Amqp/src/Akka.Streams.Amqp/AmqpFlowStage.cs @@ -79,13 +79,7 @@ public Logic(AmqpFlowStage stage, TaskCompletionSource promi public string RoutingKey => _stage.Settings.RoutingKey ?? ""; public override IAmqpConnectorSettings Settings => _stage.Settings; - - public override IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings settings) => - AmqpConnector.ConnectionFactoryFrom(settings); - - public override IConnection NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) => - AmqpConnector.NewConnection(factory, settings); - + public override void WhenConnected() { _shutdownCallback = GetAsyncCallback(args => diff --git a/Amqp/src/Akka.Streams.Amqp/AmqpReplyToSinkStage.cs b/Amqp/src/Akka.Streams.Amqp/AmqpReplyToSinkStage.cs index db831a2e2..04bd99353 100644 --- a/Amqp/src/Akka.Streams.Amqp/AmqpReplyToSinkStage.cs +++ b/Amqp/src/Akka.Streams.Amqp/AmqpReplyToSinkStage.cs @@ -76,12 +76,7 @@ public AmqpSinkStageLogic(AmqpReplyToSinkStage stage, TaskCompletionSource public override IAmqpConnectorSettings Settings => _stage.Settings; - public override IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings settings) => - AmqpConnector.ConnectionFactoryFrom(settings); - - public override IConnection NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) => - AmqpConnector.NewConnection(factory, settings); - + public override void WhenConnected() { _shutdownCallback = GetAsyncCallback(args => diff --git a/Amqp/src/Akka.Streams.Amqp/AmqpRpcFlowStage.cs b/Amqp/src/Akka.Streams.Amqp/AmqpRpcFlowStage.cs index 66ba0d058..ff50d5237 100644 --- a/Amqp/src/Akka.Streams.Amqp/AmqpRpcFlowStage.cs +++ b/Amqp/src/Akka.Streams.Amqp/AmqpRpcFlowStage.cs @@ -121,13 +121,7 @@ int ExpectedResponses() } public override IAmqpConnectorSettings Settings => _stage.Settings; - - public override IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings settings) => - AmqpConnector.ConnectionFactoryFrom(settings); - - public override IConnection NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) => - AmqpConnector.NewConnection(factory, settings); - + public override void WhenConnected() { var shutdownCallback = GetAsyncCallback<(string consumerTag, ShutdownEventArgs args)>(tuple => diff --git a/Amqp/src/Akka.Streams.Amqp/AmqpSinkStage.cs b/Amqp/src/Akka.Streams.Amqp/AmqpSinkStage.cs index 0af80f1d1..1cb06f326 100644 --- a/Amqp/src/Akka.Streams.Amqp/AmqpSinkStage.cs +++ b/Amqp/src/Akka.Streams.Amqp/AmqpSinkStage.cs @@ -66,14 +66,7 @@ public AmqpSinkStageLogic(AmqpSinkStage stage, TaskCompletionSource promis public string RoutingKey => _stage.Settings.RoutingKey ?? ""; - public override IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings settings) - { - return AmqpConnector.ConnectionFactoryFrom(settings); - } - - public override IConnection NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) => - AmqpConnector.NewConnection(factory, settings); - + public override void WhenConnected() { _shutdownCallback = GetAsyncCallback(args => diff --git a/Amqp/src/Akka.Streams.Amqp/AmqpSourceStage.cs b/Amqp/src/Akka.Streams.Amqp/AmqpSourceStage.cs index ec1198da7..fae7d5f73 100644 --- a/Amqp/src/Akka.Streams.Amqp/AmqpSourceStage.cs +++ b/Amqp/src/Akka.Streams.Amqp/AmqpSourceStage.cs @@ -67,13 +67,7 @@ public AmqpSourceStageLogic(AmqpSourceStage stage) : base(stage.Shape) } public override IAmqpConnectorSettings Settings => _stage.Settings; - - public override IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings settings) => - AmqpConnector.ConnectionFactoryFrom(settings); - - public override IConnection NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) => - AmqpConnector.NewConnection(factory, settings); - + public override void WhenConnected() { // we have only one consumer per connection so global is ok diff --git a/Amqp/src/Akka.Streams.Amqp/Model.cs b/Amqp/src/Akka.Streams.Amqp/Model.cs index 186ab7a1b..7518b241d 100644 --- a/Amqp/src/Akka.Streams.Amqp/Model.cs +++ b/Amqp/src/Akka.Streams.Amqp/Model.cs @@ -7,7 +7,7 @@ namespace Akka.Streams.Amqp { public interface IAmqpConnectorSettings { - IAmqpConnectionSettings ConnectionSettings { get; } + AmqpConnectionProvider ConnectionProvider { get; } IReadOnlyList Declarations { get; } } @@ -17,12 +17,12 @@ public interface IAmqpSourceSettings : IAmqpConnectorSettings public sealed class NamedQueueSourceSettings : IAmqpSourceSettings { - private NamedQueueSourceSettings(IAmqpConnectionSettings connectionSettings, string queue, + private NamedQueueSourceSettings(AmqpConnectionProvider connectionProvider, string queue, IReadOnlyList declarations = null, bool noLocal = false, bool exclusive = false, string consumerTag = null, IReadOnlyDictionary arguments = null) { - ConnectionSettings = connectionSettings; + ConnectionProvider = connectionProvider; Queue = queue; Declarations = declarations ?? new List(); NoLocal = noLocal; @@ -31,7 +31,7 @@ private NamedQueueSourceSettings(IAmqpConnectionSettings connectionSettings, str Arguments = arguments ?? new Dictionary(); } - public IAmqpConnectionSettings ConnectionSettings { get; } + public AmqpConnectionProvider ConnectionProvider { get; } public string Queue { get; } public IReadOnlyList Declarations { get; } public bool NoLocal { get; } @@ -39,282 +39,131 @@ private NamedQueueSourceSettings(IAmqpConnectionSettings connectionSettings, str public string ConsumerTag { get; } public IReadOnlyDictionary Arguments { get; } - public static NamedQueueSourceSettings Create(IAmqpConnectionSettings connectionSettings, string queue) + public static NamedQueueSourceSettings Create(AmqpConnectionProvider connectionProvider, string queue) { - return new NamedQueueSourceSettings(connectionSettings, queue); + return new NamedQueueSourceSettings(connectionProvider, queue); } public NamedQueueSourceSettings WithDeclarations(params IDeclaration[] declarations) { - return new NamedQueueSourceSettings(ConnectionSettings, Queue, declarations, NoLocal, Exclusive, ConsumerTag, Arguments); + return new NamedQueueSourceSettings(ConnectionProvider, Queue, declarations, NoLocal, Exclusive, ConsumerTag, Arguments); } public NamedQueueSourceSettings WithNoLocal(bool noLocal) { - return new NamedQueueSourceSettings(ConnectionSettings, Queue, Declarations, noLocal, Exclusive, ConsumerTag, Arguments); + return new NamedQueueSourceSettings(ConnectionProvider, Queue, Declarations, noLocal, Exclusive, ConsumerTag, Arguments); } public NamedQueueSourceSettings WithExclusive(bool exclusive) { - return new NamedQueueSourceSettings(ConnectionSettings, Queue, Declarations, NoLocal, exclusive, ConsumerTag, Arguments); + return new NamedQueueSourceSettings(ConnectionProvider, Queue, Declarations, NoLocal, exclusive, ConsumerTag, Arguments); } public NamedQueueSourceSettings WithConsumerTag(string consumerTag) { - return new NamedQueueSourceSettings(ConnectionSettings, Queue, Declarations, NoLocal, Exclusive, consumerTag, Arguments); + return new NamedQueueSourceSettings(ConnectionProvider, Queue, Declarations, NoLocal, Exclusive, consumerTag, Arguments); } public NamedQueueSourceSettings WithArguments(params KeyValuePair[] arguments) { - return new NamedQueueSourceSettings(ConnectionSettings, Queue, Declarations, NoLocal, Exclusive, ConsumerTag, + return new NamedQueueSourceSettings(ConnectionProvider, Queue, Declarations, NoLocal, Exclusive, ConsumerTag, arguments.ToDictionary(key => key.Key, val => val.Value)); } public NamedQueueSourceSettings WithArguments(params (string paramName, object paramValue)[] arguments) { - return new NamedQueueSourceSettings(ConnectionSettings, Queue, Declarations, NoLocal, Exclusive, ConsumerTag, + return new NamedQueueSourceSettings(ConnectionProvider, Queue, Declarations, NoLocal, Exclusive, ConsumerTag, arguments.ToDictionary(key => key.paramName, val => val.paramValue)); } public NamedQueueSourceSettings WithArguments(string key, object value) { - return new NamedQueueSourceSettings(ConnectionSettings, Queue, Declarations, NoLocal, Exclusive, + return new NamedQueueSourceSettings(ConnectionProvider, Queue, Declarations, NoLocal, Exclusive, ConsumerTag, new Dictionary {{key, value}}); } public override string ToString() => - $"NamedQueueSourceSettings(ConnectionSettings={ConnectionSettings}, Queue={Queue}, Declarations={Declarations.Count}, NoLocal={NoLocal}, Exclusive={Exclusive}, Arguments={Arguments.Count})"; + $"NamedQueueSourceSettings(ConnectionSettings={ConnectionProvider}, Queue={Queue}, Declarations={Declarations.Count}, NoLocal={NoLocal}, Exclusive={Exclusive}, Arguments={Arguments.Count})"; } public sealed class TemporaryQueueSourceSettings : IAmqpSourceSettings { - private TemporaryQueueSourceSettings(IAmqpConnectionSettings connectionSettings, string exchange, + private TemporaryQueueSourceSettings(AmqpConnectionProvider connectionProvider, string exchange, IReadOnlyList declarations = null, string routingKey = null) { - ConnectionSettings = connectionSettings; + ConnectionProvider = connectionProvider; Exchange = exchange; Declarations = declarations ?? new List(); RoutingKey = routingKey; } - public IAmqpConnectionSettings ConnectionSettings { get; } + public AmqpConnectionProvider ConnectionProvider { get; } public string Exchange { get; } public IReadOnlyList Declarations { get; } public string RoutingKey { get; } - public static TemporaryQueueSourceSettings Create(IAmqpConnectionSettings connectionSettings, string exchange) => - new TemporaryQueueSourceSettings(connectionSettings, exchange); + public static TemporaryQueueSourceSettings Create(AmqpConnectionProvider connectionProvider, string exchange) => + new TemporaryQueueSourceSettings(connectionProvider, exchange); public TemporaryQueueSourceSettings WithRoutingKey(string routingKey) => - new TemporaryQueueSourceSettings(ConnectionSettings, Exchange, Declarations, routingKey); + new TemporaryQueueSourceSettings(ConnectionProvider, Exchange, Declarations, routingKey); public TemporaryQueueSourceSettings WithDeclarations(params IDeclaration[] declarations) => - new TemporaryQueueSourceSettings(ConnectionSettings, Exchange, declarations, RoutingKey); + new TemporaryQueueSourceSettings(ConnectionProvider, Exchange, declarations, RoutingKey); public override string ToString() => - $"TemporaryQueueSourceSettings(ConnectionSettings={ConnectionSettings},Exchange={Exchange}, Declarations={Declarations.Count}, RoutingKey={RoutingKey})"; + $"TemporaryQueueSourceSettings(ConnectionSettings={ConnectionProvider},Exchange={Exchange}, Declarations={Declarations.Count}, RoutingKey={RoutingKey})"; } public sealed class AmqpReplyToSinkSettings : IAmqpConnectorSettings { - private AmqpReplyToSinkSettings(IAmqpConnectionSettings connectionSettings, bool failIfReplyToMissing = true) + private AmqpReplyToSinkSettings(AmqpConnectionProvider connectionProvider, bool failIfReplyToMissing = true) { - ConnectionSettings = connectionSettings; + ConnectionProvider = connectionProvider; FailIfReplyToMissing = failIfReplyToMissing; Declarations = new List(); } - public IAmqpConnectionSettings ConnectionSettings { get; } + public AmqpConnectionProvider ConnectionProvider { get; } public bool FailIfReplyToMissing { get; } public IReadOnlyList Declarations { get; } - public static AmqpReplyToSinkSettings Create(IAmqpConnectionSettings connectionSettings, bool failIfReplyToMissing = true) + public static AmqpReplyToSinkSettings Create(AmqpConnectionProvider connectionProvider, bool failIfReplyToMissing = true) { - return new AmqpReplyToSinkSettings(connectionSettings, failIfReplyToMissing); + return new AmqpReplyToSinkSettings(connectionProvider, failIfReplyToMissing); } } public sealed class AmqpSinkSettings : IAmqpConnectorSettings { - private AmqpSinkSettings(IAmqpConnectionSettings connectionSettings, string exchange = null, + private AmqpSinkSettings(AmqpConnectionProvider connectionProvider, string exchange = null, string routingKey = null, IReadOnlyList declarations = null) { - ConnectionSettings = connectionSettings; + ConnectionProvider = connectionProvider; Exchange = exchange; RoutingKey = routingKey; Declarations = declarations ?? new List(); } - public IAmqpConnectionSettings ConnectionSettings { get; } + public AmqpConnectionProvider ConnectionProvider { get; } public string Exchange { get; } public string RoutingKey { get; } public IReadOnlyList Declarations { get; } - public static AmqpSinkSettings Create(IAmqpConnectionSettings connectionSettings = null) => - new AmqpSinkSettings(connectionSettings ?? DefaultAmqpConnection.Instance); + public static AmqpSinkSettings Create(AmqpConnectionProvider connectionProvider) => + new AmqpSinkSettings(connectionProvider); - public AmqpSinkSettings WithExchange(string exchange) => new AmqpSinkSettings(ConnectionSettings, exchange, RoutingKey, Declarations); + public AmqpSinkSettings WithExchange(string exchange) => new AmqpSinkSettings(ConnectionProvider, exchange, RoutingKey, Declarations); - public AmqpSinkSettings WithRoutingKey(string routingKey) => new AmqpSinkSettings(ConnectionSettings, Exchange, routingKey, Declarations); + public AmqpSinkSettings WithRoutingKey(string routingKey) => new AmqpSinkSettings(ConnectionProvider, Exchange, routingKey, Declarations); public AmqpSinkSettings WithDeclarations(params IDeclaration[] declarations) => - new AmqpSinkSettings(ConnectionSettings, Exchange, RoutingKey, declarations); + new AmqpSinkSettings(ConnectionProvider, Exchange, RoutingKey, declarations); public override string ToString() => - $"AmqpSinkSettings(ConnectionSettings={ConnectionSettings}, Exchange={Exchange}, RoutingKey={RoutingKey}, Delcarations={Declarations.Count})"; + $"AmqpSinkSettings(ConnectionSettings={ConnectionProvider}, Exchange={Exchange}, RoutingKey={RoutingKey}, Delcarations={Declarations.Count})"; } - - /// - /// Only for internal implementations - /// - public interface IAmqpConnectionSettings - { - } - - /// - /// Connects to a local AMQP broker at the default port with no password. - /// - // ReSharper disable once InheritdocConsiderUsage - public class DefaultAmqpConnection : IAmqpConnectionSettings - { - public static IAmqpConnectionSettings Instance => new DefaultAmqpConnection(); - } - - public sealed class AmqpConnectionUri : IAmqpConnectionSettings - { - private AmqpConnectionUri(Uri uri) - { - Uri = uri; - } - - public Uri Uri { get; } - - public static AmqpConnectionUri Create(string uri) => new AmqpConnectionUri(new Uri(uri)); - public static AmqpConnectionUri Create(Uri uri) => new AmqpConnectionUri(uri); - - public override string ToString() => $"AmqpConnectionUri(Uri={Uri})"; - } - - public sealed class AmqpConnectionDetails : IAmqpConnectionSettings - { - private AmqpConnectionDetails(IReadOnlyList<(string host, int port)> hostAndPortList, - AmqpCredentials? credentials = null, - string virtualHost = null, - SslOption ssl = null, - ushort? requestedHeartbeat = null, - TimeSpan? connectionTimeout = null, - TimeSpan? handshakeTimeout = null, - TimeSpan? networkRecoveryInterval = null, - bool? automaticRecoveryEnabled = null, - bool? topologyRecoveryEnabled = null) - { - HostAndPortList = hostAndPortList; - Credentials = credentials; - VirtualHost = virtualHost; - Ssl = ssl; - RequestedHeartbeat = requestedHeartbeat; - ConnectionTimeout = connectionTimeout; - HandshakeTimeout = handshakeTimeout; - NetworkRecoveryInterval = networkRecoveryInterval; - AutomaticRecoveryEnabled = automaticRecoveryEnabled; - TopologyRecoveryEnabled = topologyRecoveryEnabled; - } - - public IReadOnlyList<(string host, int port)> HostAndPortList { get; } - public AmqpCredentials? Credentials { get; } - public string VirtualHost { get; } - public SslOption Ssl { get; } - public ushort? RequestedHeartbeat { get; } - public TimeSpan? ConnectionTimeout { get; } - public TimeSpan? HandshakeTimeout { get; } - public TimeSpan? NetworkRecoveryInterval { get; } - public bool? AutomaticRecoveryEnabled { get; } - public bool? TopologyRecoveryEnabled { get; } - - public static AmqpConnectionDetails Create(string host, int port) => - new AmqpConnectionDetails(new List<(string host, int port)> {(host, port)}); - - public AmqpConnectionDetails WithHostsAndPorts((string host, int port) hostAndPort, - params (string host, int port)[] hostAndPortList) - { - return new AmqpConnectionDetails(new List<(string host, int port)>(hostAndPortList.ToList()) {hostAndPort}, - Credentials, VirtualHost, Ssl, RequestedHeartbeat, - ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithCredentials(AmqpCredentials credentials) - { - return new AmqpConnectionDetails(HostAndPortList, credentials, VirtualHost, Ssl, RequestedHeartbeat, - ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithVirtualHost(string virtualHost) - { - return new AmqpConnectionDetails(HostAndPortList, Credentials, virtualHost, Ssl, RequestedHeartbeat, - ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithSsl(SslOption sslOption) - { - return new AmqpConnectionDetails(HostAndPortList, Credentials, VirtualHost, sslOption, RequestedHeartbeat, - ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithRequestedHeartbeat(ushort requestedHeartbeat) - { - return new AmqpConnectionDetails(HostAndPortList, Credentials, VirtualHost, Ssl, requestedHeartbeat, - ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithConnectionTimeout(TimeSpan connectionTimeout) - { - return new AmqpConnectionDetails(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, - connectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithHandshakeTimeout(TimeSpan handshakeTimeout) - { - return new AmqpConnectionDetails(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, - ConnectionTimeout, handshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithNetworkRecoveryInterval(TimeSpan networkRecoveryInterval) - { - return new AmqpConnectionDetails(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, - ConnectionTimeout, HandshakeTimeout, networkRecoveryInterval, AutomaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithAutomaticRecoveryEnabled(bool automaticRecoveryEnabled) - { - return new AmqpConnectionDetails(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, - ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, automaticRecoveryEnabled, - TopologyRecoveryEnabled); - } - - public AmqpConnectionDetails WithTopologyRecoveryEnabled(bool topologyRecoveryEnabled) - { - return new AmqpConnectionDetails(HostAndPortList, Credentials, VirtualHost, Ssl, RequestedHeartbeat, - ConnectionTimeout, HandshakeTimeout, NetworkRecoveryInterval, AutomaticRecoveryEnabled, - topologyRecoveryEnabled); - } - - - public override string ToString() - { - return - $"AmqpConnectionDetails(HostAndPortList=({HostAndPortList.Select(x => $"[{x.host}:{x.port}]").Aggregate((left, right) => $"{right}, {left}")}), Credentials={Credentials}, VirtualHost={VirtualHost})"; - } - } - public struct AmqpCredentials : IEquatable { public string Username { get; }