diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index f3837b14329..d1613bb6cac 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,23 @@ +#### 1.4.8 June 17 2020 #### +**Maintenance Release for Akka.NET 1.4** + +Akka.NET v1.4.8 features some important bug fixes for Akka.NET v1.4: + +* [Akka: fix issue with setting IActorRefProvider via BootstrapSetup](https://github.com/akkadotnet/akka.net/pull/4473) +* [Akka.Cluster: Akka v1.4 Idle CPU usage increased comparing v1.3](https://github.com/akkadotnet/akka.net/issues/4434) +* [Akka.Cluster.Sharding: Backport of the feature called ClusterDistribution in Lagom](https://github.com/akkadotnet/akka.net/pull/4455) +* [Akka.TestKit: added ActorSystemSetup overload for TestKits](https://github.com/akkadotnet/akka.net/pull/4464) + +To see the [full set of fixes in Akka.NET v1.4.8, please see the milestone on Github](https://github.com/akkadotnet/akka.net/milestone/39). + +| COMMITS | LOC+ | LOC- | AUTHOR | +| --- | --- | --- | --- | +| 7 | 310 | 24 | Aaron Stannard | +| 6 | 8 | 8 | dependabot-preview[bot] | +| 2 | 669 | 10 | Ismael Hamed | +| 2 | 38 | 27 | Gregorius Soedharmo | +| 1 | 1 | 1 | Bartosz Sypytkowski | + #### 1.4.7 May 26 2020 #### **Maintenance Release for Akka.NET 1.4** diff --git a/build-system/azure-pipeline.mntr-template.yaml b/build-system/azure-pipeline.mntr-template.yaml index 7a86f3279af..bfbebe829c4 100644 --- a/build-system/azure-pipeline.mntr-template.yaml +++ b/build-system/azure-pipeline.mntr-template.yaml @@ -16,6 +16,11 @@ jobs: pool: vmImage: ${{ parameters.vmImage }} steps: + - task: UseDotNet@2 + displayName: 'Use .NET Core SDK 3.1.105' + inputs: + packageType: sdk + version: 3.1.105 - task: Bash@3 displayName: Linux / OSX Build inputs: diff --git a/build-system/azure-pipeline.template.yaml b/build-system/azure-pipeline.template.yaml index 7e9d56aa4f9..0415ac80c11 100644 --- a/build-system/azure-pipeline.template.yaml +++ b/build-system/azure-pipeline.template.yaml @@ -16,6 +16,11 @@ jobs: pool: vmImage: ${{ parameters.vmImage }} steps: + - task: UseDotNet@2 + displayName: 'Use .NET Core SDK 3.1.105' + inputs: + packageType: sdk + version: 3.1.105 - task: Bash@3 displayName: Linux / OSX Build inputs: diff --git a/build-system/nightly-builds.yaml b/build-system/nightly-builds.yaml index c8e91b27bf1..ed97c2b011e 100644 --- a/build-system/nightly-builds.yaml +++ b/build-system/nightly-builds.yaml @@ -19,6 +19,11 @@ variables: - group: nugetKeys #create this group with SECRET variables `nugetKey` steps: +- task: UseDotNet@2 + displayName: 'Use .NET Core SDK 3.1.105' + inputs: + packageType: sdk + version: 3.1.105 - task: BatchScript@1 displayName: 'FAKE Build' inputs: diff --git a/build-system/pr-validation.yaml b/build-system/pr-validation.yaml index dc9d1d0751c..8c259c129ca 100644 --- a/build-system/pr-validation.yaml +++ b/build-system/pr-validation.yaml @@ -6,11 +6,12 @@ trigger: - dev - v1.* - master + - feature/* pr: autoCancel: true # indicates whether additional pushes to a PR should cancel in-progress runs for the same PR. Defaults to true branches: - include: [ dev, v1.*, master ] # branch names which will trigger a build + include: [ dev, v1.*, master, feature/* ] # branch names which will trigger a build name: $(Year:yyyy).$(Month).$(DayOfMonth)$(Rev:.r) @@ -25,6 +26,11 @@ jobs: clean: false # whether to fetch clean each time submodules: recursive # set to 'true' for a single level of submodules or 'recursive' to get submodules of submodules persistCredentials: true + - task: UseDotNet@2 + displayName: 'Use .NET Core SDK 3.1.105' + inputs: + packageType: sdk + version: 3.1.105 - task: BatchScript@1 displayName: Windows Build inputs: diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml index 4e1fc043246..253dce9e9ba 100644 --- a/build-system/windows-release.yaml +++ b/build-system/windows-release.yaml @@ -22,6 +22,11 @@ variables: - name: githubRepositoryName value: akkadotnet/akka.net steps: +- task: UseDotNet@2 + displayName: 'Use .NET Core SDK 3.1.105' + inputs: + packageType: sdk + version: 3.1.105 - task: BatchScript@1 displayName: 'FAKE Build' inputs: diff --git a/build.ps1 b/build.ps1 index 5415a1e06ac..7cfd2b5e988 100644 --- a/build.ps1 +++ b/build.ps1 @@ -31,7 +31,7 @@ Param( $FakeVersion = "4.63.0" $DotNetChannel = "LTS"; -$DotNetVersion = "3.1.100"; +$DotNetVersion = "3.1.105"; $DotNetInstallerUri = "https://dot.net/v1/dotnet-install.ps1"; $NugetVersion = "4.3.0"; $NugetUrl = "https://dist.nuget.org/win-x86-commandline/v$NugetVersion/nuget.exe" diff --git a/docs/articles/clustering/cluster-sharded-daemon-process.md b/docs/articles/clustering/cluster-sharded-daemon-process.md new file mode 100644 index 00000000000..87c27173368 --- /dev/null +++ b/docs/articles/clustering/cluster-sharded-daemon-process.md @@ -0,0 +1,31 @@ +# Sharded Daemon Process + +> [!WARNING] +>This module is currently marked as [may change](../utilities/may-change.md) because it is a new feature that +>needs feedback from real usage before finalizing the API. This means that API or semantics can change without +>warning or deprecation period. It is also not recommended to use this module in production just yet. + +## Introduction + +Sharded Daemon Process provides a way to run `N` actors, each given a numeric id starting from `0` that are then kept alive +and balanced across the cluster. When a rebalance is needed the actor is stopped and, triggered by a keep alive running on +all nodes, started on a new node (the keep alive should be seen as an implementation detail and may change in future versions). + +The intended use case is for splitting data processing workloads across a set number of workers that each get to work on a subset +of the data that needs to be processed. This is commonly needed to create projections based on the event streams available +from all the [Persistent Actors](../persistence/event-sourcing.md) in a CQRS application. Events are tagged with one out of `N` tags +used to split the workload of consuming and updating a projection between `N` workers. + +For cases where a single actor needs to be kept alive see [Cluster Singleton](cluster-singleton.md) + +## Basic example + +To set up a set of actors running with Sharded Daemon process each node in the cluster needs to run the same initialization +when starting up: + +[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs?name=tag-processing)] + +## Scalability + +This cluster tool is intended for small numbers of consumers and will not scale well to a large set. In large clusters +it is recommended to limit the nodes the sharded daemon process will run on using a role. \ No newline at end of file diff --git a/docs/articles/toc.yml b/docs/articles/toc.yml index 7c265437ff0..258f863aa37 100644 --- a/docs/articles/toc.yml +++ b/docs/articles/toc.yml @@ -172,6 +172,8 @@ href: clustering/cluster-client.md - name: Cluster Sharding href: clustering/cluster-sharding.md + - name: Sharded Daemon Process + href: clustering/cluster-sharded-daemon-process.md - name: Cluster Metrics href: clustering/cluster-metrics.md - name: Distributed Data diff --git a/src/common.props b/src/common.props index 5d351627977..12cf95ec503 100644 --- a/src/common.props +++ b/src/common.props @@ -2,7 +2,7 @@ Copyright © 2013-2020 Akka.NET Team Akka.NET Team - 1.4.4 + 1.4.8 http://getakka.net/images/akkalogo.png https://github.com/akkadotnet/akka.net https://github.com/akkadotnet/akka.net/blob/master/LICENSE @@ -11,16 +11,16 @@ 2.4.1 16.6.1 - 0.9.15 + 0.9.16 12.0.3 2.0.1 - 3.12.1 + 3.12.3 netcoreapp3.1 net461 netstandard2.0 net452 4.14.0 - 2.14.2 + 2.14.3 2.0.3 4.7.0 akka;actors;actor model;Akka;concurrency @@ -29,21 +29,7 @@ true - Maintenance Release for Akka.NET 1.4** -Akka.NET v1.4.4 includes one major fix for HOCON fallback configurations, a new module (Akka.Coordination), and some major improvements to Akka.Cluster.Tools / Akka.Cluster.Sharding: -[Akka.Coordination: Lease API & integration](https://github.com/akkadotnet/akka.net/pull/4344) -[Akka: Timers for self scheduled messages added, FSM timers fixes](https://github.com/akkadotnet/akka.net/pull/3778) -[Akka **Important** Bugfix: Config.WithFallback is acting inconsistently](https://github.com/akkadotnet/akka.net/pull/4358) -[Akka.Cluster.Sharding: Updates](https://github.com/akkadotnet/akka.net/pull/4354) -To see the full set of changes for Akka.NET 1.4.4, please [see the 1.4.4 milestone](https://github.com/akkadotnet/akka.net/milestone/35). -| COMMITS | LOC+ | LOC- | AUTHOR | -| --- | --- | --- | --- | -| 3 | 4845 | 225 | zbynek001 | -| 2 | 3 | 3 | dependabot-preview[bot] | -| 2 | 159 | 0 | Aaron Stannard | -| 2 | 1099 | 23 | Gregorius Soedharmo | -| 1 | 34 | 5 | Petri Kero | -| 1 | 1 | 1 | Felix Reisinger | + Placeholder for nightlies** diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ShardedDaemonProcessSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ShardedDaemonProcessSpec.cs new file mode 100644 index 00000000000..c778ef93b93 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ShardedDaemonProcessSpec.cs @@ -0,0 +1,174 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2020 Lightbend Inc. +// Copyright (C) 2013-2020 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.TestKit; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.Remote.TestKit; +using FluentAssertions; + +namespace Akka.Cluster.Sharding.Tests.MultiNode +{ + public class ShardedDaemonProcessSpecConfig : MultiNodeConfig + { + public RoleName First { get; } + public RoleName Second { get; } + public RoleName Third { get; } + + public ShardedDaemonProcessSpecConfig() + { + First = Role("first"); + Second = Role("second"); + Third = Role("third"); + + CommonConfig = DebugConfig(false) + .WithFallback(ConfigurationFactory.ParseString(@" + akka.loglevel = INFO + akka.cluster.sharded-daemon-process {{ + sharding {{ + # First is likely to be ignored as shard coordinator not ready + retry-interval = 0.2s + }} + # quick ping to make test swift + keep-alive-interval = 1s + }} + ")) + .WithFallback(ClusterSharding.DefaultConfig()) + .WithFallback(ClusterSingletonManager.DefaultConfig()) + .WithFallback(MultiNodeClusterSpec.ClusterConfig()); + } + } + + public class ShardedDaemonProcessMultiNode : ShardedDaemonProcessSpec + { + public ShardedDaemonProcessMultiNode() : this(new ShardedDaemonProcessSpecConfig()) { } + protected ShardedDaemonProcessMultiNode(ShardedDaemonProcessSpecConfig config) : base(config, typeof(ShardedDaemonProcessMultiNode)) { } + } + + public abstract class ShardedDaemonProcessSpec : MultiNodeClusterSpec + { + private readonly ShardedDaemonProcessSpecConfig _config; + + protected ShardedDaemonProcessSpec(ShardedDaemonProcessSpecConfig config, Type type) + : base(config, type) + { + _config = config; + } + + [MultiNodeFact] + public void ShardedDaemonProcess_Specs() + { + ShardedDaemonProcess_Should_Init_Actor_Set(); + } + + public void ShardedDaemonProcess_Should_Init_Actor_Set() + { + // HACK + RunOn(() => FormCluster(_config.First, _config.Second, _config.Third), _config.First); + + var probe = CreateTestProbe(); + ShardedDaemonProcess.Get(Sys).Init("the-fearless", 4, id => ProcessActor.Props(id, probe.Ref)); + EnterBarrier("actor-set-initialized"); + + RunOn(() => + { + var startedIds = Enumerable.Range(0, 4).Select(_ => + { + var evt = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + evt.Event.Should().Be("Started"); + return evt.Id; + }).ToList(); + startedIds.Count.Should().Be(4); + }, _config.First); + EnterBarrier("actor-set-started"); + } + + private void FormCluster(RoleName first, params RoleName[] rest) + { + RunOn(() => + { + Cluster.Join(GetAddress(first)); + AwaitAssert(() => + { + Cluster.State.Members.Select(i => i.UniqueAddress).Should().Contain(Cluster.SelfUniqueAddress); + Cluster.State.Members.Select(i => i.Status).Should().OnlyContain(i => i == MemberStatus.Up); + }); + }, first); + EnterBarrier(first.Name + "-joined"); + + foreach (var node in rest) + { + RunOn(() => + { + Cluster.Join(GetAddress(first)); + AwaitAssert(() => + { + Cluster.State.Members.Select(i => i.UniqueAddress).Should().Contain(Cluster.SelfUniqueAddress); + Cluster.State.Members.Select(i => i.Status).Should().OnlyContain(i => i == MemberStatus.Up); + }); + }, node); + } + EnterBarrier("all-joined"); + } + } + + internal class ProcessActor : UntypedActor + { + #region Protocol + + [Serializable] + public sealed class Stop + { + public static readonly Stop Instance = new Stop(); + private Stop() { } + } + + #endregion + + public static Props Props(int id, IActorRef probe) => + Actor.Props.Create(() => new ProcessActor(id, probe)); + + public ProcessActor(int id, IActorRef probe) + { + Probe = probe; + Id = id; + } + + public IActorRef Probe { get; } + public int Id { get; } + + protected override void PreStart() + { + base.PreStart(); + Probe.Tell(new ProcessActorEvent(Id, "Started")); + } + + protected override void OnReceive(object message) + { + if (message is Stop) + { + Probe.Tell(new ProcessActorEvent(Id, "Stopped")); + Context.Stop(Self); + } + } + } + + internal sealed class ProcessActorEvent + { + public ProcessActorEvent(int id, object @event) + { + Id = id; + Event = @event; + } + + public int Id { get; } + public object Event { get; } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs new file mode 100644 index 00000000000..4a866c9c193 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs @@ -0,0 +1,158 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2020 Lightbend Inc. +// Copyright (C) 2013-2020 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using Xunit; + +namespace Akka.Cluster.Sharding.Tests +{ + public class ShardedDaemonProcessSpec : AkkaSpec + { + private sealed class Stop + { + public static Stop Instance { get; } = new Stop(); + private Stop() { } + } + + private sealed class Started + { + public int Id { get; } + public IActorRef SelfRef { get; } + + public Started(int id, IActorRef selfRef) + { + Id = id; + SelfRef = selfRef; + } + } + + private class MyActor : UntypedActor + { + public int Id { get; } + public IActorRef Probe { get; } + + public static Props Props(int id, IActorRef probe) => + Actor.Props.Create(() => new MyActor(id, probe)); + + public MyActor(int id, IActorRef probe) + { + Id = id; + Probe = probe; + } + + protected override void PreStart() + { + base.PreStart(); + Probe.Tell(new Started(Id, Context.Self)); + } + + protected override void OnReceive(object message) + { + if (message is Stop _) + Context.Stop(Self); + } + } + + private static Config GetConfig() + { + return ConfigurationFactory.ParseString(@" + akka.actor.provider = cluster + akka.remote.dot-netty.tcp.port = 0 + akka.remote.dot-netty.tcp.hostname = 127.0.0.1 + + # ping often/start fast for test + akka.cluster.sharded-daemon-process.keep-alive-interval = 1s + + akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off") + .WithFallback(ClusterSharding.DefaultConfig()) + .WithFallback(ClusterSingletonProxy.DefaultConfig()); + } + + public ShardedDaemonProcessSpec() + : base(GetConfig()) + { } + + [Fact] + public void ShardedDaemonProcess_must_have_a_single_node_cluster_running_first() + { + var probe = CreateTestProbe(); + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + probe.AwaitAssert(() => Cluster.Get(Sys).SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3)); + } + + [Fact] + public void ShardedDaemonProcess_must_start_N_actors_with_unique_ids() + { + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + + var probe = CreateTestProbe(); + ShardedDaemonProcess.Get(Sys).Init("a", 5, id => MyActor.Props(id, probe.Ref)); + + var started = probe.ReceiveN(5); + started.Count.ShouldBe(5); + } + + [Fact] + public void ShardedDaemonProcess_must_restart_actors_if_they_stop() + { + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + + var probe = CreateTestProbe(); + ShardedDaemonProcess.Get(Sys).Init("stop", 2, id => MyActor.Props(id, probe.Ref)); + + foreach (var started in Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg())) + started.SelfRef.Tell(Stop.Instance); + + // periodic ping every 1s makes it restart + Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg(TimeSpan.FromSeconds(3))); + } + + [Fact] + public void ShardedDaemonProcess_must_not_run_if_the_role_does_not_match_node_role() + { + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + + var probe = CreateTestProbe(); + var settings = ShardedDaemonProcessSettings.Create(Sys).WithShardingSettings(ClusterShardingSettings.Create(Sys).WithRole("workers")); + ShardedDaemonProcess.Get(Sys).Init("roles", 3, id => MyActor.Props(id, probe.Ref), settings); + + probe.ExpectNoMsg(); + } + + // only used in documentation + private class TagProcessor : ReceiveActor + { + public string Tag { get; } + + public static Props Props(string tag) => + Actor.Props.Create(() => new TagProcessor(tag)); + + public TagProcessor(string tag) => Tag = tag; + + protected override void PreStart() + { + // start the processing ... + base.PreStart(); + Context.System.Log.Debug("Starting processor for tag {0}", Tag); + } + } + + private void DocExample() + { + #region tag-processing + var tags = new[] { "tag-1", "tag-2", "tag-3" }; + ShardedDaemonProcess.Get(Sys).Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id])); + #endregion + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 40bac68c6e5..fecca87fb6d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -117,7 +117,7 @@ public override int GetHashCode() /// to start(it does not guarantee the entity successfully started) /// [Serializable] - public sealed class StartEntityAck : IClusterShardingSerializable + public sealed class StartEntityAck : IClusterShardingSerializable, IDeadLetterSuppression { /// /// An identifier of a newly started entity. Unique in scope of a given shard. diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs new file mode 100644 index 00000000000..77dc48b203a --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs @@ -0,0 +1,179 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2020 Lightbend Inc. +// Copyright (C) 2013-2020 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Actor; +using Akka.Annotations; +using Akka.Util.Internal; + +namespace Akka.Cluster.Sharding +{ + internal class KeepAlivePinger : UntypedActor, IWithTimers + { + private sealed class Tick + { + public static Tick Instance { get; } = new Tick(); + private Tick() { } + } + + public string Name { get; } + public string[] Identities { get; } + public IActorRef ShardingRef { get; } + public ShardedDaemonProcessSettings Settings { get; } + + public ITimerScheduler Timers { get; set; } + + public static Props Props(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef) => + Actor.Props.Create(() => new KeepAlivePinger(settings, name, identities, shardingRef)); + + public KeepAlivePinger(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef) + { + Settings = settings; + Name = name; + Identities = identities; + ShardingRef = shardingRef; + } + + protected override void PreStart() + { + base.PreStart(); + + Context.System.Log.Debug("Starting Sharded Daemon Process KeepAlivePinger for [{0}], with ping interval [{1}]"); + Timers.StartPeriodicTimer("tick", Tick.Instance, Settings.KeepAliveInterval); + TriggerStartAll(); + } + + protected override void OnReceive(object message) + { + if (message is Tick _) + { + TriggerStartAll(); + Context.System.Log.Debug("Periodic ping sent to [{0}] processes", Identities.Length); + } + } + + private void TriggerStartAll() => Identities.ForEach(id => ShardingRef.Tell(new ShardRegion.StartEntity(id))); + } + + internal sealed class MessageExtractor : HashCodeMessageExtractor + { + public MessageExtractor(int maxNumberOfShards) + : base(maxNumberOfShards) + { } + + public override string EntityId(object message) => (message as ShardingEnvelope)?.EntityId; + public override object EntityMessage(object message) => (message as ShardingEnvelope)?.Message; + public override string ShardId(object message) => message is ShardRegion.StartEntity se ? se.EntityId : EntityId(message); + } + + /// + /// Default envelope type that may be used with Cluster Sharding. + /// + /// The alternative way of routing messages through sharding is to not use envelopes, + /// and have the message types themselves carry identifiers. + /// + /// + public sealed class ShardingEnvelope + { + public string EntityId { get; } + public object Message { get; } + + public ShardingEnvelope(string entityId, object message) + { + EntityId = entityId; + Message = message; + } + } + + /// + /// This extension runs a pre set number of actors in a cluster. + /// + /// The typical use case is when you have a task that can be divided in a number of workers, each doing a + /// sharded part of the work, for example consuming the read side events from Akka Persistence through + /// tagged events where each tag decides which consumer that should consume the event. + /// + /// Each named set needs to be started on all the nodes of the cluster on start up. + /// + /// The processes are spread out across the cluster, when the cluster topology changes the processes may be stopped + /// and started anew on a new node to rebalance them. + /// + /// Not for user extension. + /// + [ApiMayChange] + public class ShardedDaemonProcess : IExtension + { + private readonly ExtendedActorSystem _system; + + public ShardedDaemonProcess(ExtendedActorSystem system) => _system = system; + + public static ShardedDaemonProcess Get(ActorSystem system) => + system.WithExtension(); + + /// + /// Start a specific number of actors that is then kept alive in the cluster. + /// + /// TBD + /// TBD + /// Given a unique id of `0` until `numberOfInstance` create an entity actor. + public void Init(string name, int numberOfInstances, Func propsFactory) + { + Init(name, numberOfInstances, propsFactory, ShardedDaemonProcessSettings.Create(_system)); + } + + /// + /// Start a specific number of actors, each with a unique numeric id in the set, that is then kept alive in the cluster. + /// + /// TBD + /// TBD + /// Given a unique id of `0` until `numberOfInstance` create an entity actor. + /// TBD + public void Init(string name, int numberOfInstances, Func propsFactory, ShardedDaemonProcessSettings settings) + { + // One shard per actor identified by the numeric id encoded in the entity id + var numberOfShards = numberOfInstances; + var entityIds = Enumerable.Range(0, numberOfInstances).Select(i => i.ToString()).ToArray(); + + // Defaults in `akka.cluster.sharding` but allow overrides specifically for actor-set + var shardingBaseSettings = settings.ShardingSettings; + if (shardingBaseSettings == null) + { + var shardingConfig = _system.Settings.Config.GetConfig("akka.cluster.sharded-daemon-process.sharding"); + var coordinatorSingletonConfig = _system.Settings.Config.GetConfig(shardingConfig.GetString("coordinator-singleton")); + shardingBaseSettings = ClusterShardingSettings.Create(shardingConfig, coordinatorSingletonConfig); + } + + var shardingSettings = new ClusterShardingSettings( + shardingBaseSettings.Role, + false, // remember entities disabled + "", + "", + TimeSpan.Zero, // passivation disabled + StateStoreMode.DData, + shardingBaseSettings.TunningParameters, + shardingBaseSettings.CoordinatorSingletonSettings); + + if (string.IsNullOrEmpty(shardingSettings.Role) || Cluster.Get(_system).SelfRoles.Contains(shardingSettings.Role)) + { + var shardRegion = ClusterSharding.Get(_system).Start( + typeName: $"sharded-daemon-process-{name}", + entityPropsFactory: entityId => propsFactory(int.Parse(entityId)), + settings: shardingSettings, + messageExtractor: new MessageExtractor(numberOfShards)); + + _system.ActorOf( + KeepAlivePinger.Props(settings, name, entityIds, shardRegion), + $"ShardedDaemonProcessKeepAlive-{name}"); + } + } + } + + public class ShardedDaemonProcessExtensionProvider : ExtensionIdProvider + { + public override ShardedDaemonProcess CreateExtension(ExtendedActorSystem system) => new ShardedDaemonProcess(system); + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcessSettings.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcessSettings.cs new file mode 100644 index 00000000000..c1508422c04 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcessSettings.cs @@ -0,0 +1,72 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2020 Lightbend Inc. +// Copyright (C) 2013-2020 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Annotations; +using Akka.Configuration; + +namespace Akka.Cluster.Sharding +{ + [Serializable] + [ApiMayChange] + public sealed class ShardedDaemonProcessSettings + { + /// + /// The interval each parent of the sharded set is pinged from each node in the cluster. + /// + public readonly TimeSpan KeepAliveInterval; + + /// + /// Specify sharding settings that should be used for the sharded daemon process instead of loading from config. + /// + public readonly ClusterShardingSettings ShardingSettings; + + /// + /// Create default settings for system + /// + public static ShardedDaemonProcessSettings Create(ActorSystem system) + { + return FromConfig(system.Settings.Config.GetConfig("akka.cluster.sharded-daemon-process")); + } + + public static ShardedDaemonProcessSettings FromConfig(Config config) + { + var keepAliveInterval = config.GetTimeSpan("keep-alive-interval"); + return new ShardedDaemonProcessSettings(keepAliveInterval); + } + + /// + /// Not for user constructions, use factory methods to instantiate. + /// + private ShardedDaemonProcessSettings(TimeSpan keepAliveInterval, ClusterShardingSettings shardingSettings = null) + { + KeepAliveInterval = keepAliveInterval; + ShardingSettings = shardingSettings; + } + + /// + /// NOTE: How the sharded set is kept alive may change in the future meaning this setting may go away. + /// + /// The interval each parent of the sharded set is pinged from each node in the cluster. + public ShardedDaemonProcessSettings WithKeepAliveInterval(TimeSpan keepAliveInterval) + { + return new ShardedDaemonProcessSettings(keepAliveInterval, ShardingSettings); + } + + /// + /// Specify sharding settings that should be used for the sharded daemon process instead of loading from config. + /// Some settings can not be changed (remember-entities and related settings, passivation, number-of-shards), + /// changing those settings will be ignored. + /// + /// TBD + public ShardedDaemonProcessSettings WithShardingSettings(ClusterShardingSettings shardingSettings) + { + return new ShardedDaemonProcessSettings(KeepAliveInterval, shardingSettings); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf index b69d6c0dad5..36f571ec725 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf @@ -268,6 +268,20 @@ akka.cluster.sharding { } # //#sharding-ext-config +akka.cluster.sharded-daemon-process { + # Settings for the sharded dameon process internal usage of sharding are using the akka.cluste.sharding defaults. + # Some of the settings can be overriden specifically for the sharded daemon process here. For example can the + # `role` setting limit what nodes the daemon processes and the keep alive pingers will run on. + # Some settings can not be changed (remember-entitites and related settings, passivation, number-of-shards), + # overriding those settings will be ignored. + sharding = ${akka.cluster.sharding} + + # Each entity is pinged at this interval from each node in the + # cluster to trigger a start if it has stopped, for example during + # rebalancing. + # Note: How the set of actors is kept alive may change in the future meaning this setting may go away. + keep-alive-interval = 10s +} # Protobuf serializer for Cluster Sharding messages akka.actor { diff --git a/src/contrib/dependencyinjection/Akka.DI.Core/Akka.DI.Core.csproj b/src/contrib/dependencyinjection/Akka.DI.Core/Akka.DI.Core.csproj index fa890aafc33..858bae5d912 100644 --- a/src/contrib/dependencyinjection/Akka.DI.Core/Akka.DI.Core.csproj +++ b/src/contrib/dependencyinjection/Akka.DI.Core/Akka.DI.Core.csproj @@ -27,7 +27,7 @@ - + \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj b/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj index cd1c14a1016..2270bb1cbae 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj +++ b/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs b/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs index 485ba9fbd4f..1236d6eaca3 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs +++ b/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs @@ -7,6 +7,7 @@ using System; using Akka.Actor; +using Akka.Actor.Setup; using Akka.Configuration; using Akka.Event; using Akka.TestKit.Xunit.Internals; @@ -45,6 +46,19 @@ public TestKit(ActorSystem system = null, ITestOutputHelper output = null) InitializeLogger(Sys); } + /// + /// Initializes a new instance of the class. + /// + /// The to use for configuring the ActorSystem. + /// The name of the system. The default name is "test". + /// The provider used to write test output. The default value is . + public TestKit(ActorSystemSetup config, string actorSystemName = null, ITestOutputHelper output = null) + : base(Assertions, config, actorSystemName) + { + Output = output; + InitializeLogger(Sys); + } + /// /// Initializes a new instance of the class. /// diff --git a/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs b/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs index 24009875e8c..c1b0cd06209 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs +++ b/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs @@ -7,6 +7,7 @@ using System; using Akka.Actor; +using Akka.Actor.Setup; using Akka.Configuration; using Akka.Event; using Akka.TestKit.Xunit2.Internals; @@ -45,6 +46,19 @@ public TestKit(ActorSystem system = null, ITestOutputHelper output = null) InitializeLogger(Sys); } + /// + /// Initializes a new instance of the class. + /// + /// The to use for configuring the ActorSystem. + /// The name of the system. The default name is "test". + /// The provider used to write test output. The default value is . + public TestKit(ActorSystemSetup config, string actorSystemName = null, ITestOutputHelper output = null) + : base(Assertions, config, actorSystemName) + { + Output = output; + InitializeLogger(Sys); + } + /// /// Initializes a new instance of the class. /// diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt index 66bd20ef2d2..09338c9831b 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt @@ -80,13 +80,13 @@ namespace Akka.Cluster public Akka.Cluster.ClusterEvent.CurrentClusterState Copy(System.Collections.Immutable.ImmutableSortedSet members = null, System.Collections.Immutable.ImmutableHashSet unreachable = null, System.Collections.Immutable.ImmutableHashSet seenBy = null, Akka.Actor.Address leader = null, System.Collections.Immutable.ImmutableDictionary roleLeaderMap = null) { } public Akka.Actor.Address RoleLeader(string role) { } } - public interface IClusterDomainEvent { } - public interface IMemberEvent : Akka.Cluster.ClusterEvent.IClusterDomainEvent + public interface IClusterDomainEvent : Akka.Event.IDeadLetterSuppression { } + public interface IMemberEvent : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Event.IDeadLetterSuppression { Akka.Cluster.Member Member { get; } } - public interface IReachabilityEvent : Akka.Cluster.ClusterEvent.IClusterDomainEvent { } - public sealed class LeaderChanged : Akka.Cluster.ClusterEvent.IClusterDomainEvent + public interface IReachabilityEvent : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Event.IDeadLetterSuppression { } + public sealed class LeaderChanged : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Event.IDeadLetterSuppression { public LeaderChanged(Akka.Actor.Address leader) { } public Akka.Actor.Address Leader { get; } @@ -117,7 +117,7 @@ namespace Akka.Cluster public override bool Equals(object obj) { } public override int GetHashCode() { } } - public abstract class MemberStatusChange : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Cluster.ClusterEvent.IMemberEvent + public abstract class MemberStatusChange : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Cluster.ClusterEvent.IMemberEvent, Akka.Event.IDeadLetterSuppression { protected readonly Akka.Cluster.Member _member; protected MemberStatusChange(Akka.Cluster.Member member, Akka.Cluster.MemberStatus validStatus) { } @@ -134,7 +134,7 @@ namespace Akka.Cluster { public MemberWeaklyUp(Akka.Cluster.Member member) { } } - public abstract class ReachabilityEvent : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Cluster.ClusterEvent.IReachabilityEvent + public abstract class ReachabilityEvent : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Cluster.ClusterEvent.IReachabilityEvent, Akka.Event.IDeadLetterSuppression { protected ReachabilityEvent(Akka.Cluster.Member member) { } public Akka.Cluster.Member Member { get; } @@ -146,7 +146,7 @@ namespace Akka.Cluster { public ReachableMember(Akka.Cluster.Member member) { } } - public sealed class RoleLeaderChanged : Akka.Cluster.ClusterEvent.IClusterDomainEvent + public sealed class RoleLeaderChanged : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Event.IDeadLetterSuppression { public RoleLeaderChanged(string role, Akka.Actor.Address leader) { } public Akka.Actor.Address Leader { get; } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt index b4bf1399368..785e8a4a162 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt @@ -190,7 +190,7 @@ namespace Akka.Cluster.Sharding public override bool Equals(object obj) { } public override int GetHashCode() { } } - public sealed class StartEntityAck : Akka.Cluster.Sharding.IClusterShardingSerializable + public sealed class StartEntityAck : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Event.IDeadLetterSuppression { public readonly string EntityId; public readonly string ShardId; @@ -213,6 +213,35 @@ namespace Akka.Cluster.Sharding public readonly string ShardId; public ShardState(string shardId, System.Collections.Immutable.IImmutableSet entityIds) { } } + [Akka.Annotations.ApiMayChangeAttribute()] + public class ShardedDaemonProcess : Akka.Actor.IExtension + { + public ShardedDaemonProcess(Akka.Actor.ExtendedActorSystem system) { } + public static Akka.Cluster.Sharding.ShardedDaemonProcess Get(Akka.Actor.ActorSystem system) { } + public void Init(string name, int numberOfInstances, System.Func propsFactory) { } + public void Init(string name, int numberOfInstances, System.Func propsFactory, Akka.Cluster.Sharding.ShardedDaemonProcessSettings settings) { } + } + public class ShardedDaemonProcessExtensionProvider : Akka.Actor.ExtensionIdProvider + { + public ShardedDaemonProcessExtensionProvider() { } + public override Akka.Cluster.Sharding.ShardedDaemonProcess CreateExtension(Akka.Actor.ExtendedActorSystem system) { } + } + [Akka.Annotations.ApiMayChangeAttribute()] + public sealed class ShardedDaemonProcessSettings + { + public readonly System.TimeSpan KeepAliveInterval; + public readonly Akka.Cluster.Sharding.ClusterShardingSettings ShardingSettings; + public static Akka.Cluster.Sharding.ShardedDaemonProcessSettings Create(Akka.Actor.ActorSystem system) { } + public static Akka.Cluster.Sharding.ShardedDaemonProcessSettings FromConfig(Akka.Configuration.Config config) { } + public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithKeepAliveInterval(System.TimeSpan keepAliveInterval) { } + public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithShardingSettings(Akka.Cluster.Sharding.ClusterShardingSettings shardingSettings) { } + } + public sealed class ShardingEnvelope + { + public ShardingEnvelope(string entityId, object message) { } + public string EntityId { get; } + public object Message { get; } + } public enum StateStoreMode { Persistence = 0, diff --git a/src/core/Akka.Cluster.Tests/ProviderSelectionSpec.cs b/src/core/Akka.Cluster.Tests/ProviderSelectionSpec.cs new file mode 100644 index 00000000000..675d3b8f1fc --- /dev/null +++ b/src/core/Akka.Cluster.Tests/ProviderSelectionSpec.cs @@ -0,0 +1,76 @@ +using Akka.Actor; +using Akka.Actor.Setup; +using Akka.Configuration; +using FluentAssertions; +using Xunit; + +namespace Akka.Cluster.Tests +{ + public class ProviderSelectionSpec + { + public ActorSystemSetup Setup { get; } = ActorSystemSetup.Create(); + public Config LocalConfig { get; } = ConfigurationFactory.Load(); + + public Settings SettingsWith(string key) + { + var c = ConfigurationFactory.ParseString($"akka.actor.provider = \"{key}\"") + .WithFallback(LocalConfig); + return new Settings(null, c, Setup); + } + + [Fact] + public void ProviderSelectionMustCreateLocalProviderSelection() + { + var ps = ProviderSelection.Local.Instance; + ps.Fqn.Should() + .Be(ProviderSelection.LocalActorRefProvider); + ps.HasCluster.Should().BeFalse(); + SettingsWith("local").ProviderClass.Should().Be(ps.Fqn); + } + + [Fact] + public void ProviderSelectionMustCreateRemoteProviderSelection() + { + var ps = ProviderSelection.Remote.Instance; + ps.Fqn.Should() + .Be(ProviderSelection.RemoteActorRefProvider); + ps.HasCluster.Should().BeFalse(); + ProviderSelection.GetProvider("remote").Should().Be(ProviderSelection.Remote.Instance); + SettingsWith("remote").ProviderClass.Should().Be(ps.Fqn); + } + + [Fact] + public void ProviderSelectionMustCreateClusterProviderSelection() + { + var ps = ProviderSelection.Cluster.Instance; + ps.Fqn.Should() + .Be(ProviderSelection.ClusterActorRefProvider); + ps.HasCluster.Should().BeTrue(); + ProviderSelection.GetProvider("cluster").Should().Be(ProviderSelection.Cluster.Instance); + SettingsWith("cluster").ProviderClass.Should().Be(ps.Fqn); + } + + [Fact] + public void ProviderSelectionMustCreateCustomProviderSelection() + { + var other = ProviderSelection.ClusterActorRefProvider; + var ps = new ProviderSelection.Custom(other, "cluster"); + ps.Fqn.Should() + .Be(other); + ps.HasCluster.Should().BeFalse(); + SettingsWith(other).ProviderClass.Should().Be(ps.Fqn); + } + + [Fact] + public void ProviderSelectionMustCreateActorSystemWithCustomProviderSelection() + { + var other = ProviderSelection.ClusterActorRefProvider; + var ps = new ProviderSelection.Custom(other, "test"); + using (var actorSystem = ActorSystem.Create("Test1", BootstrapSetup.Create().WithActorRefProvider(ps))) + { + actorSystem.Settings.ProviderClass.Should().Be(ps.Fqn); + } + + } + } +} diff --git a/src/core/Akka.Cluster/ClusterEvent.cs b/src/core/Akka.Cluster/ClusterEvent.cs index b4681c0aa53..0ac9ed65763 100644 --- a/src/core/Akka.Cluster/ClusterEvent.cs +++ b/src/core/Akka.Cluster/ClusterEvent.cs @@ -62,7 +62,7 @@ public enum SubscriptionInitialStateMode /// /// Marker interface for cluster domain events /// - public interface IClusterDomainEvent { } + public interface IClusterDomainEvent : IDeadLetterSuppression { } /// /// A snapshot of the current state of the diff --git a/src/core/Akka.NodeTestRunner/Akka.NodeTestRunner.csproj b/src/core/Akka.NodeTestRunner/Akka.NodeTestRunner.csproj index e56af60f41b..59804994ecc 100644 --- a/src/core/Akka.NodeTestRunner/Akka.NodeTestRunner.csproj +++ b/src/core/Akka.NodeTestRunner/Akka.NodeTestRunner.csproj @@ -22,7 +22,7 @@ - + diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs index 4b1b67faef0..731b41139a9 100644 --- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using Akka.Actor.Setup; using Akka.Configuration; namespace Akka.Persistence.TestKit @@ -22,6 +23,25 @@ namespace Akka.Persistence.TestKit /// public abstract class PersistenceTestKit : TestKit { + /// + /// Create a new instance of the class. + /// A new system with the specified configuration will be created. + /// + /// Test ActorSystem configuration + /// Optional: The name of the actor system + /// TBD + protected PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = null, ITestOutputHelper output = null) + : base(GetConfig(setup), actorSystemName, output) + { + var persistenceExtension = Persistence.Instance.Apply(Sys); + + JournalActorRef = persistenceExtension.JournalFor(null); + Journal = TestJournal.FromRef(JournalActorRef); + + SnapshotsActorRef = persistenceExtension.SnapshotStoreFor(null); + Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef); + } + /// /// Create a new instance of the class. /// A new system with the specified configuration will be created. @@ -287,6 +307,36 @@ public Task WithSnapshotDelete(Func behaviorS return Task.FromResult(true); }); + /// + /// Loads from embedded resources actor system persistence configuration with and + /// configured as default persistence plugins. + /// + /// Custom configuration that was passed in the constructor. + /// Actor system configuration object. + /// + private static ActorSystemSetup GetConfig(ActorSystemSetup customConfig) + { + var bootstrapSetup = customConfig.Get(); + var config = bootstrapSetup.FlatSelect(x => x.Config); + var actorProvider = bootstrapSetup.FlatSelect(x => x.ActorRefProvider); + var newSetup = BootstrapSetup.Create(); + if (config.HasValue) + { + newSetup = newSetup.WithConfig(GetConfig(config.Value)); + } + else + { + newSetup = newSetup.WithConfig(GetConfig(Config.Empty)); + } + + if (actorProvider.HasValue) + { + newSetup = newSetup.WithActorRefProvider(actorProvider.Value); + } + + return customConfig.WithSetup(newSetup); + } + /// /// Loads from embedded resources actor system persistence configuration with and /// configured as default persistence plugins. diff --git a/src/core/Akka.Remote.Tests/Akka.Remote.Tests.csproj b/src/core/Akka.Remote.Tests/Akka.Remote.Tests.csproj index 25ec4489567..d5e0c571a2e 100644 --- a/src/core/Akka.Remote.Tests/Akka.Remote.Tests.csproj +++ b/src/core/Akka.Remote.Tests/Akka.Remote.Tests.csproj @@ -24,8 +24,8 @@ - - + + diff --git a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs index 1afe3df14b8..54005363745 100644 --- a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs +++ b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs @@ -10,6 +10,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Akka.Configuration; using Akka.Remote.Transport.DotNetty; using Akka.TestKit; using DotNetty.Buffers; @@ -57,7 +58,26 @@ public DotNettyBatchWriterSpecs(ITestOutputHelper helper) : base(helper) Flush = new FlushLogger(helper); } + [Fact] + public void Bugfix4434_should_overwrite_default_BatchWriterSettings() + { + Config c = @" + akka.remote.dot-netty.tcp{ + batching{ + enabled = false + max-pending-writes = 50 + max-pending-bytes = 32k + flush-interval = 10ms + } + } + "; + var s = DotNettyTransportSettings.Create(c.GetConfig("akka.remote.dot-netty.tcp")); + s.BatchWriterSettings.EnableBatching.Should().BeFalse(); + s.BatchWriterSettings.FlushInterval.Should().NotBe(BatchWriterSettings.DefaultFlushInterval); + s.BatchWriterSettings.MaxPendingBytes.Should().NotBe(BatchWriterSettings.DefaultMaxPendingBytes); + s.BatchWriterSettings.MaxPendingWrites.Should().NotBe(BatchWriterSettings.DefaultMaxPendingWrites); + } /// /// Stay below the write / count and write / byte threshold. Rely on the timer. diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index f515db244d5..dc778be5d0e 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -328,7 +328,8 @@ private void SetInitialChannelPipeline(IChannel channel) } } - pipeline.AddLast("BatchWriter", new BatchWriter(Settings.BatchWriterSettings)); + if(Settings.BatchWriterSettings.EnableBatching) + pipeline.AddLast("BatchWriter", new BatchWriter(Settings.BatchWriterSettings)); } private void SetClientPipeline(IChannel channel, Address remoteAddress) diff --git a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs index 16a0e84c23f..37dd4c92199 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs @@ -40,7 +40,9 @@ protected override void RegisterListener(IChannel channel, IHandleEventListener protected override AssociationHandle CreateHandle(IChannel channel, Address localAddress, Address remoteAddress) { - return new TcpAssociationHandle(localAddress, remoteAddress, Transport, channel); + if(Transport.Settings.BatchWriterSettings.EnableBatching) + return new BatchingTcpAssociationHandle(localAddress, remoteAddress, Transport, channel); + return new NonBatchingTcpAssociationHandle(localAddress, remoteAddress, Transport, channel); } public override void ChannelInactive(IChannelHandlerContext context) @@ -159,11 +161,11 @@ private void InitOutbound(IChannel channel, IPEndPoint socketAddress, object msg } } - internal sealed class TcpAssociationHandle : AssociationHandle + internal sealed class BatchingTcpAssociationHandle : AssociationHandle { private readonly IChannel _channel; - public TcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel) + public BatchingTcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel) : base(localAddress, remoteAddress) { _channel = channel; @@ -193,7 +195,41 @@ public override void Disassociate() _channel.CloseAsync(); } } - + + internal sealed class NonBatchingTcpAssociationHandle : AssociationHandle + { + private readonly IChannel _channel; + + public NonBatchingTcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel) + : base(localAddress, remoteAddress) + { + _channel = channel; + } + + public override bool Write(ByteString payload) + { + if (_channel.Open) + { + var data = ToByteBuffer(_channel, payload); + _channel.WriteAndFlushAsync(data); + return true; + } + return false; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static IByteBuffer ToByteBuffer(IChannel channel, ByteString payload) + { + var buffer = Unpooled.WrappedBuffer(payload.ToByteArray()); + return buffer; + } + + public override void Disassociate() + { + _channel.CloseAsync(); + } + } + internal sealed class TcpTransport : DotNettyTransport { public TcpTransport(ActorSystem system, Config config) : base(system, config) diff --git a/src/core/Akka.Streams/Dsl/Restart.cs b/src/core/Akka.Streams/Dsl/Restart.cs index 8f95295d713..aac8621f41b 100644 --- a/src/core/Akka.Streams/Dsl/Restart.cs +++ b/src/core/Akka.Streams/Dsl/Restart.cs @@ -494,7 +494,7 @@ protected SubSinkInlet CreateSubInlet(Outlet outlet) Fail(Out, ex); else { - Log.Warning($"Restarting graph due to failure. Stacktrace: {ex.StackTrace}"); + Log.Warning(ex, "Restarting graph due to failure."); ScheduleRestartTimer(); } })); diff --git a/src/core/Akka.TestKit.Tests/ActorSystemSetupSpecs.cs b/src/core/Akka.TestKit.Tests/ActorSystemSetupSpecs.cs new file mode 100644 index 00000000000..2d9034e6f88 --- /dev/null +++ b/src/core/Akka.TestKit.Tests/ActorSystemSetupSpecs.cs @@ -0,0 +1,30 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2020 Lightbend Inc. +// Copyright (C) 2013-2020 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Actor.Setup; +using Xunit; + +namespace Akka.TestKit.Tests +{ + /// + /// Validate that an inside the testkit + /// can be configured using an instance. + /// + public class ActorSystemSetupSpecs : AkkaSpec + { + public static readonly ActorSystemSetup Setup = ActorSystemSetup.Create().WithSetup(BootstrapSetup.Create().WithConfig(@"akka.hi = true")); + + public ActorSystemSetupSpecs() : base(Setup) { } + + [Fact] + public void ShouldReadConfigFromActorSystemSetup() + { + Assert.True(Sys.Settings.Config.HasPath("akka.hi")); + } + } +} diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index 38212506197..c046d11e588 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Actor.Internal; +using Akka.Actor.Setup; using Akka.Configuration; using Akka.Event; using Akka.Pattern; @@ -72,7 +73,7 @@ public TestState() /// This exception is thrown when the given is undefined. /// protected TestKitBase(ITestKitAssertions assertions, ActorSystem system = null, string testActorName=null) - : this(assertions, system, _defaultConfig, null, testActorName) + : this(assertions, system, ActorSystemSetup.Empty.WithSetup(BootstrapSetup.Create().WithConfig(_defaultConfig)), null, testActorName) { } @@ -80,19 +81,35 @@ protected TestKitBase(ITestKitAssertions assertions, ActorSystem system = null, /// Create a new instance of the class. /// A new system with the specified configuration will be created. /// - /// TBD + /// The set of assertions used by the TestKit. + /// The to use for the configuring the system. + /// Optional: the name of the ActorSystem. + /// Optional: the name of the TestActor. + /// + /// This exception is thrown when the given is undefined. + /// + protected TestKitBase(ITestKitAssertions assertions, ActorSystemSetup setup, string actorSystemName = null, string testActorName = null) + : this(assertions, null, setup, actorSystemName, testActorName) + { + } + + /// + /// Create a new instance of the class. + /// A new system with the specified configuration will be created. + /// + /// The set of assertions used by the TestKit. /// The configuration to use for the system. - /// TBD - /// Optional: The name of the TestActor. + /// Optional: the name of the ActorSystem. + /// Optional: the name of the TestActor. /// /// This exception is thrown when the given is undefined. /// protected TestKitBase(ITestKitAssertions assertions, Config config, string actorSystemName = null, string testActorName = null) - : this(assertions, null, config ?? ConfigurationFactory.Empty, actorSystemName, testActorName) + : this(assertions, null, ActorSystemSetup.Empty.WithSetup(BootstrapSetup.Create().WithConfig(config)), actorSystemName, testActorName) { } - private TestKitBase(ITestKitAssertions assertions, ActorSystem system, Config config, string actorSystemName, string testActorName) + private TestKitBase(ITestKitAssertions assertions, ActorSystem system, ActorSystemSetup config, string actorSystemName, string testActorName) { if(assertions == null) throw new ArgumentNullException(nameof(assertions), "The supplied assertions must not be null."); @@ -108,14 +125,24 @@ private TestKitBase(ITestKitAssertions assertions, ActorSystem system, Config co /// The configuration that will use if it's null. /// The name that will use if it's null. /// The name of the test actor. Can be null. - protected void InitializeTest(ActorSystem system, Config config, string actorSystemName, string testActorName) + protected void InitializeTest(ActorSystem system, ActorSystemSetup config, string actorSystemName, string testActorName) { _testState = new TestState(); if (system == null) { - var configWithDefaultFallback = config.SafeWithFallback(_defaultConfig); - system = ActorSystem.Create(actorSystemName ?? "test", configWithDefaultFallback); + var boostrap = config.Get(); + var configWithDefaultFallback = boostrap.HasValue + ? boostrap.Value.Config.Select(c => c == _defaultConfig ? c : c.WithFallback(_defaultConfig)) + : _defaultConfig; + + var newBootstrap = BootstrapSetup.Create().WithConfig(configWithDefaultFallback.Value); + if (boostrap.FlatSelect(x => x.ActorRefProvider).HasValue) + { + newBootstrap = + newBootstrap.WithActorRefProvider(boostrap.FlatSelect(x => x.ActorRefProvider).Value); + } + system = ActorSystem.Create(actorSystemName ?? "test", config.WithSetup(newBootstrap)); } _testState.System = system; @@ -160,6 +187,18 @@ protected void InitializeTest(ActorSystem system, Config config, string actorSys _testState.TestActor = testActor; } + /// + /// Initializes the for a new spec. + /// + /// The actor system this test will use. Can be null. + /// The configuration that will use if it's null. + /// The name that will use if it's null. + /// The name of the test actor. Can be null. + protected void InitializeTest(ActorSystem system, Config config, string actorSystemName, string testActorName) + { + InitializeTest(system, ActorSystemSetup.Create(BootstrapSetup.Create().WithConfig(config)), actorSystemName, testActorName); + } + private TimeSpan SingleExpectDefaultTimeout { get { return _testState.TestKitSettings.SingleExpectDefault; } } /// diff --git a/src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs b/src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs index 4a1339ac6c8..edc336fbd89 100644 --- a/src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs +++ b/src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs @@ -13,6 +13,7 @@ using System.Threading; using System.Threading.Tasks; using Akka.Actor; +using Akka.Actor.Setup; using Akka.Configuration; using Akka.TestKit.Internal.StringMatcher; using Akka.TestKit.TestEvent; @@ -60,6 +61,12 @@ public AkkaSpec(Config config = null, ITestOutputHelper output = null) BeforeAll(); } + public AkkaSpec(ActorSystemSetup setup, ITestOutputHelper output = null) + : base(setup, GetCallerName(), output) + { + BeforeAll(); + } + public AkkaSpec(ITestOutputHelper output, Config config = null) : base(config.SafeWithFallback(_akkaSpecConfig), GetCallerName(), output) { diff --git a/src/core/Akka/Actor/ActorSystem.cs b/src/core/Akka/Actor/ActorSystem.cs index b9646f47774..8d8658ad3d3 100644 --- a/src/core/Akka/Actor/ActorSystem.cs +++ b/src/core/Akka/Actor/ActorSystem.cs @@ -33,12 +33,13 @@ private ProviderSelection(string identifier, string fqn, bool hasCluster) internal string Fqn { get; } internal bool HasCluster { get; } + internal const string LocalActorRefProvider = "Akka.Actor.LocalActorRefProvider, Akka"; internal const string RemoteActorRefProvider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"; internal const string ClusterActorRefProvider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"; public sealed class Local : ProviderSelection { - private Local() : base("local", typeof(LocalActorRefProvider).FullName, false) + private Local() : base("local", LocalActorRefProvider, false) { } diff --git a/src/core/Akka/Actor/Settings.cs b/src/core/Akka/Actor/Settings.cs index 29e3fb3098f..e9169ad40ba 100644 --- a/src/core/Akka/Actor/Settings.cs +++ b/src/core/Akka/Actor/Settings.cs @@ -80,7 +80,7 @@ public Settings(ActorSystem system, Config config, ActorSystemSetup setup) var providerSelectionSetup = Setup.Get() .FlatSelect(_ => _.ActorRefProvider) - .Select(_ => _.Identifier) + .Select(_ => _.Fqn) .GetOrElse(Config.GetString("akka.actor.provider", null)); ProviderSelectionType = ProviderSelection.GetProvider(providerSelectionSetup); diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index 977024cd287..0b52c829763 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -218,9 +218,7 @@ private static DedicatedThreadPoolSettings ConfigureSettings(Config config) /// internal sealed class ThreadPoolExecutorServiceFactory : ExecutorServiceConfigurator { -#if APPDOMAIN private static readonly bool IsFullTrusted = AppDomain.CurrentDomain.IsFullyTrusted; -#endif /// /// TBD @@ -229,10 +227,9 @@ internal sealed class ThreadPoolExecutorServiceFactory : ExecutorServiceConfigur /// TBD public override ExecutorService Produce(string id) { -#if APPDOMAIN if (IsFullTrusted) return new FullThreadPoolExecutorServiceImpl(id); -#endif + return new PartialTrustThreadPoolExecutorService(id); } diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 90c3f17b35e..1b630aa3031 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -11,7 +11,6 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; -using Akka.Configuration; using Helios.Concurrency; using ConfigurationFactory = Akka.Configuration.ConfigurationFactory; @@ -45,7 +44,6 @@ protected ThreadPoolExecutorService(string id) : base(id) } } -#if UNSAFE_THREADING /// /// INTERNAL API /// @@ -68,7 +66,6 @@ public FullThreadPoolExecutorServiceImpl(string id) : base(id) { } } -#endif /// /// INTERNAL API