From 128c10bf9bf2473a10f4d89b460dd81fa7e5dfe0 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sat, 19 Dec 2020 00:22:29 -0600 Subject: [PATCH 1/6] implement FlushConsolidationHandler --- src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index 0f377718a57..4d79206cd34 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -15,6 +15,11 @@ namespace Akka.Remote.Transport.DotNetty { + internal class FlushConsolidationHandler : ChannelDuplexHandler + { + + } + /// /// INTERNAL API. /// From 796ea5f55bd3db181f40880bb631ed3f1fdd7867 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sat, 19 Dec 2020 01:04:56 -0600 Subject: [PATCH 2/6] first complete flush consilidator implementation --- .../Transport/DotNetty/BatchWriter.cs | 180 ++++++++++++++++++ .../Transport/DotNetty/DotNettyTransport.cs | 2 +- .../Transport/DotNetty/TcpTransport.cs | 2 +- 3 files changed, 182 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index 4d79206cd34..76791beb4ee 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using DotNetty.Buffers; @@ -17,7 +18,186 @@ namespace Akka.Remote.Transport.DotNetty { internal class FlushConsolidationHandler : ChannelDuplexHandler { + /// + /// The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a + /// read loop, or while batching outside of a read loop). + /// + public const int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256; + + public int ExplicitFlushAfterFlushes { get; } + public bool ConsolidateWhenNoReadInProgress { get; } + + public readonly IScheduler Scheduler; + private int _flushPendingCount; + private bool _readInProgress; + private IChannelHandlerContext _context; + private CancellationTokenSource _nextScheduledFlush; + private FlushTask _flushTask; + private Func _flushFunc; + + private class FlushTask : IRunnable + { + private readonly FlushConsolidationHandler _handler; + + public FlushTask(FlushConsolidationHandler handler) + { + _handler = handler; + } + + public void Run() + { + if (_handler._flushPendingCount > 0 && !_handler._readInProgress) + { + _handler._flushPendingCount = 0; + _handler._nextScheduledFlush?.Dispose(); + _handler._nextScheduledFlush = null; + _handler._context.Flush(); + } // else we'll flush when the read completes + } + } + + public FlushConsolidationHandler() : this(DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true){} + + public FlushConsolidationHandler(int explicitFlushAfterFlushes, bool consolidateWhenNoReadInProgress) + { + ExplicitFlushAfterFlushes = explicitFlushAfterFlushes; + ConsolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress; + if (consolidateWhenNoReadInProgress) + { + _flushTask = new FlushTask(this); + _flushFunc = () => + { + _flushTask?.Run(); + return true; + }; + } + } + public override void HandlerAdded(IChannelHandlerContext context) + { + _context = context; + } + + public override void Flush(IChannelHandlerContext context) + { + if (_readInProgress) + { + // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus + // we only need to flush if we reach the explicitFlushAfterFlushes limit. + if (++_flushPendingCount == ExplicitFlushAfterFlushes) + { + FlushNow(context); + } + } else if (ConsolidateWhenNoReadInProgress) + { + // Flush immediately if we reach the threshold, otherwise schedule + if (++_flushPendingCount == ExplicitFlushAfterFlushes) + { + FlushNow(context); + } + else + { + ScheduleFlush(context); + } + } + else + { + // Always flush directly + FlushNow(context); + } + } + + public override void ChannelReadComplete(IChannelHandlerContext context) + { + // This may be the last event in the read loop, so flush now! + ResetReadAndFlushIfNeeded(context); + context.FireChannelReadComplete(); + } + + public override void ChannelRead(IChannelHandlerContext context, object message) + { + _readInProgress = true; + context.FireChannelRead(message); + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + // To ensure we not miss to flush anything, do it now. + ResetReadAndFlushIfNeeded(context); + context.FireExceptionCaught(exception); + } + + public override Task DisconnectAsync(IChannelHandlerContext context) + { + // Try to flush one last time if flushes are pending before disconnect the channel. + ResetReadAndFlushIfNeeded(context); + return context.DisconnectAsync(); + } + + public override Task CloseAsync(IChannelHandlerContext context) + { + // Try to flush one last time if flushes are pending before disconnect the channel. + ResetReadAndFlushIfNeeded(context); + return context.CloseAsync(); + } + + public override void ChannelWritabilityChanged(IChannelHandlerContext context) + { + if (!context.Channel.IsWritable) + { + // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory. + FlushIfNeeded(context); + } + + context.FireChannelWritabilityChanged(); + } + + public override void HandlerRemoved(IChannelHandlerContext context) + { + FlushIfNeeded(context); + _flushFunc = null; + _flushTask = null; + } + + private void ResetReadAndFlushIfNeeded(IChannelHandlerContext ctx) + { + _readInProgress = false; + FlushIfNeeded(ctx); + } + + private void FlushIfNeeded(IChannelHandlerContext ctx) + { + if (_flushPendingCount > 0) + { + FlushNow(ctx); + } + } + + private void FlushNow(IChannelHandlerContext ctx) + { + CancelScheduledFlush(); + _flushPendingCount = 0; + ctx.Flush(); + } + + private void ScheduleFlush(IChannelHandlerContext ctx) + { + if (_nextScheduledFlush == null && ConsolidateWhenNoReadInProgress) + { + // Run as soon as possible, but still yield to give a chance for additional writes to enqueue. + _nextScheduledFlush = new CancellationTokenSource(); + ctx.Channel.EventLoop.SubmitAsync(_flushFunc, _nextScheduledFlush.Token); + } + } + + private void CancelScheduledFlush() + { + if (_nextScheduledFlush != null) + { + _nextScheduledFlush.Cancel(false); + _nextScheduledFlush = null; + } + } } /// diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 8aa82cdb6b4..01ae5918e48 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -329,7 +329,7 @@ private void SetInitialChannelPipeline(IChannel channel) } if(Settings.BatchWriterSettings.EnableBatching) - pipeline.AddLast("BatchWriter", new BatchWriter(Settings.BatchWriterSettings, System.Scheduler)); + pipeline.AddLast("BatchWriter", new FlushConsolidationHandler()); } private void SetClientPipeline(IChannel channel, Address remoteAddress) diff --git a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs index 37dd4c92199..0c689c5f9f2 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs @@ -176,7 +176,7 @@ public override bool Write(ByteString payload) if (_channel.Open) { var data = ToByteBuffer(_channel, payload); - _channel.WriteAsync(data); + _channel.WriteAndFlushAsync(data); return true; } return false; From 9cb131ee98ab13dffdc187fa02b9874a4e602cd7 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 22 Dec 2020 12:20:08 -0600 Subject: [PATCH 3/6] styling --- src/common.props | 30 ++----------------- .../Transport/DotNetty/BatchWriter.cs | 5 ++-- 2 files changed, 4 insertions(+), 31 deletions(-) diff --git a/src/common.props b/src/common.props index 8bb72f862b0..da8344b665c 100644 --- a/src/common.props +++ b/src/common.props @@ -2,7 +2,7 @@ Copyright © 2013-2020 Akka.NET Team Akka.NET Team - 1.4.13 + 1.4.14 https://getakka.net/images/akkalogo.png https://github.com/akkadotnet/akka.net https://github.com/akkadotnet/akka.net/blob/master/LICENSE @@ -28,33 +28,7 @@ true - Maintenance Release for Akka.NET 1.4** -Akka.NET v1.4.13 includes a number of bug fixes and enhancements: -`AppVersion` now uses Assembly Version by Default** -The new `AppVersion` setting, which is used to communicate application version numbers throughout Akka.Cluster and is used in scenarios such as Akka.Cluster.Sharding to help determine which nodes receive new shard allocations and which ones do not, now uses the following default HOCON setting: -``` -akka.cluster.app-version = assembly-version -``` -By default now the `AppVersion` communicated inside Akka.Cluster `Member` events uses the `Major.Minor.BuildNumber` from the `Assembly.GetEntryssembly()` or `Assembly.GetExecutingAssembly()` (in case the `EntryAssembly` is `null`). That way any updates made to your executable's (i.e. the .dll that hosts `Program.cs`) version number will be automatically reflected in the cluster now without Akka.NET developers having to set an additional configuration value during deployments. -Other bug fixes and improvements: -[Akka.IO: UdpExt.Manager: OverflowException when sending UDP packets to terminated clients](https://github.com/akkadotnet/akka.net/issues/4641) -[Akka.Configuration / Akka.Streams: Memory Leak when using many short lived instances of ActorMaterializer](https://github.com/akkadotnet/akka.net/issues/4659) -[Akka: Deprecate `PatternMatch`](https://github.com/akkadotnet/akka.net/issues/4658) -[Akka: FSM: exception in LogTermination changes stopEvent.Reason to Shutdown](https://github.com/akkadotnet/akka.net/issues/3723) -[Akka.Cluster.Tools: ClusterSingleton - Ignore possible state change in start](https://github.com/akkadotnet/akka.net/pull/4646) -[Akka.Cluster.Tools: DistributedPubSub - new setting and small fixes](https://github.com/akkadotnet/akka.net/pull/4649) -[Akka.DistributedData: `KeyNotFoundException` thrown periodically](https://github.com/akkadotnet/akka.net/issues/4639) -To see the [full set of fixes in Akka.NET v1.4.13, please see the milestone on Github](https://github.com/akkadotnet/akka.net/milestone/44). -| COMMITS | LOC+ | LOC- | AUTHOR | -| --- | --- | --- | --- | -| 5 | 316 | 29 | Aaron Stannard | -| 2 | 53 | 8 | Gregorius Soedharmo | -| 2 | 223 | 197 | zbynek001 | -| 2 | 2 | 2 | dependabot-preview[bot] | -| 2 | 11 | 3 | Ebere Abanonu | -| 1 | 37 | 27 | Razvan Goga | -| 1 | 217 | 11 | motmot80 | -| 1 | 2 | 0 | Ismael Hamed | + Placeholder for nightlies** diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index 76791beb4ee..745f15fee06 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -22,12 +22,11 @@ internal class FlushConsolidationHandler : ChannelDuplexHandler /// The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a /// read loop, or while batching outside of a read loop). /// - public const int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256; + public const int DefaultExplicitFlushAfterFlushes = 256; public int ExplicitFlushAfterFlushes { get; } public bool ConsolidateWhenNoReadInProgress { get; } - public readonly IScheduler Scheduler; private int _flushPendingCount; private bool _readInProgress; private IChannelHandlerContext _context; @@ -56,7 +55,7 @@ public void Run() } } - public FlushConsolidationHandler() : this(DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true){} + public FlushConsolidationHandler() : this(DefaultExplicitFlushAfterFlushes, true){} public FlushConsolidationHandler(int explicitFlushAfterFlushes, bool consolidateWhenNoReadInProgress) { From fc984ce8cefa3ed506c22f99ef27d5d818f704d9 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 22 Dec 2020 17:04:32 -0600 Subject: [PATCH 4/6] fixed batch writer specs --- .../Akka.Remote.Tests/RemoteConfigSpec.cs | 4 +- .../Transport/DotNettyBatchWriterSpecs.cs | 20 +- .../Transport/DotNetty/BatchWriter.cs | 207 ++++-------------- 3 files changed, 50 insertions(+), 181 deletions(-) diff --git a/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs b/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs index 4fb72cfcca9..0a9ebd4717e 100644 --- a/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs +++ b/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs @@ -174,9 +174,7 @@ public void Remoting_should_contain_correct_BatchWriter_settings_in_ReferenceCon var s = DotNettyTransportSettings.Create(c); s.BatchWriterSettings.EnableBatching.Should().BeTrue(); - s.BatchWriterSettings.FlushInterval.Should().Be(BatchWriterSettings.DefaultFlushInterval); - s.BatchWriterSettings.MaxPendingBytes.Should().Be(BatchWriterSettings.DefaultMaxPendingBytes); - s.BatchWriterSettings.MaxPendingWrites.Should().Be(BatchWriterSettings.DefaultMaxPendingWrites); + s.BatchWriterSettings.MaxExplicitFlushes.Should().Be(BatchWriterSettings.DefaultMaxPendingWrites); } } } diff --git a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs index 46b3b356757..b511a853a3a 100644 --- a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs +++ b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs @@ -66,17 +66,13 @@ public void Bugfix4434_should_overwrite_default_BatchWriterSettings() batching{ enabled = false max-pending-writes = 50 - max-pending-bytes = 32k - flush-interval = 10ms } } "; var s = DotNettyTransportSettings.Create(c.GetConfig("akka.remote.dot-netty.tcp")); s.BatchWriterSettings.EnableBatching.Should().BeFalse(); - s.BatchWriterSettings.FlushInterval.Should().NotBe(BatchWriterSettings.DefaultFlushInterval); - s.BatchWriterSettings.MaxPendingBytes.Should().NotBe(BatchWriterSettings.DefaultMaxPendingBytes); - s.BatchWriterSettings.MaxPendingWrites.Should().NotBe(BatchWriterSettings.DefaultMaxPendingWrites); + s.BatchWriterSettings.MaxExplicitFlushes.Should().NotBe(BatchWriterSettings.DefaultMaxPendingWrites); } /// @@ -85,7 +81,7 @@ public void Bugfix4434_should_overwrite_default_BatchWriterSettings() [Fact] public async Task BatchWriter_should_succeed_with_timer() { - var writer = new BatchWriter(new BatchWriterSettings(), Sys.Scheduler); + var writer = new FlushConsolidationHandler(); var ch = new EmbeddedChannel(Flush, writer); await Flush.Activated; @@ -98,14 +94,13 @@ public async Task BatchWriter_should_succeed_with_timer() var ints = Enumerable.Range(0, 4).ToArray(); foreach (var i in ints) { - _ = ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i)); + _ = ch.WriteAndFlushAsync(Unpooled.Buffer(1).WriteInt(i)); } // force write tasks to run ch.RunPendingTasks(); - ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4); - ch.OutboundMessages.Count.Should().Be(0); + ch.OutboundMessages.Count.Should().Be(ints.Length); await AwaitAssertAsync(() => { @@ -124,7 +119,7 @@ await AwaitAssertAsync(() => [Fact] public async Task BatchWriter_should_flush_messages_during_shutdown() { - var writer = new BatchWriter(new BatchWriterSettings(), Sys.Scheduler); + var writer = new FlushConsolidationHandler(); var ch = new EmbeddedChannel(Flush, writer); await Flush.Activated; @@ -136,14 +131,13 @@ public async Task BatchWriter_should_flush_messages_during_shutdown() var ints = Enumerable.Range(0, 10).ToArray(); foreach (var i in ints) { - _ = ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i)); + _ = ch.WriteAndFlushAsync(Unpooled.Buffer(1).WriteInt(i)); } // force write tasks to run ch.RunPendingTasks(); - ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4); - ch.OutboundMessages.Count.Should().Be(0); + ch.OutboundMessages.Count.Should().Be(ints.Length); // close channels _ = ch.CloseAsync(); diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index 745f15fee06..8878265486b 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -8,14 +8,32 @@ using System; using System.Threading; using System.Threading.Tasks; -using Akka.Actor; -using DotNetty.Buffers; -using DotNetty.Common.Concurrency; using DotNetty.Transport.Channels; using Akka.Configuration; namespace Akka.Remote.Transport.DotNetty { + + /* Adapted and Derived from https://github.com/netty/netty/blob/4.1/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java */ + /* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + + /// + /// INTERNAL API + /// internal class FlushConsolidationHandler : ChannelDuplexHandler { /// @@ -31,45 +49,34 @@ internal class FlushConsolidationHandler : ChannelDuplexHandler private bool _readInProgress; private IChannelHandlerContext _context; private CancellationTokenSource _nextScheduledFlush; - private FlushTask _flushTask; - private Func _flushFunc; - private class FlushTask : IRunnable - { - private readonly FlushConsolidationHandler _handler; - public FlushTask(FlushConsolidationHandler handler) + private static bool TryFlush(FlushConsolidationHandler handler) + { + if (handler._flushPendingCount > 0 && !handler._readInProgress) { - _handler = handler; - } + handler._flushPendingCount = 0; + handler._nextScheduledFlush?.Dispose(); + handler._nextScheduledFlush = null; + handler._context.Flush(); + return true; + } // else we'll flush when the read completes - public void Run() - { - if (_handler._flushPendingCount > 0 && !_handler._readInProgress) - { - _handler._flushPendingCount = 0; - _handler._nextScheduledFlush?.Dispose(); - _handler._nextScheduledFlush = null; - _handler._context.Flush(); - } // else we'll flush when the read completes - } + // didn't flush + return false; } + // cache the delegate + private static readonly Func FlushOp = obj => TryFlush((FlushConsolidationHandler)obj); + public FlushConsolidationHandler() : this(DefaultExplicitFlushAfterFlushes, true){} + public FlushConsolidationHandler(int explicitFlushAfterFlushes) : this(explicitFlushAfterFlushes, true) { } + public FlushConsolidationHandler(int explicitFlushAfterFlushes, bool consolidateWhenNoReadInProgress) { ExplicitFlushAfterFlushes = explicitFlushAfterFlushes; ConsolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress; - if (consolidateWhenNoReadInProgress) - { - _flushTask = new FlushTask(this); - _flushFunc = () => - { - _flushTask?.Run(); - return true; - }; - } } public override void HandlerAdded(IChannelHandlerContext context) @@ -154,8 +161,6 @@ public override void ChannelWritabilityChanged(IChannelHandlerContext context) public override void HandlerRemoved(IChannelHandlerContext context) { FlushIfNeeded(context); - _flushFunc = null; - _flushTask = null; } private void ResetReadAndFlushIfNeeded(IChannelHandlerContext ctx) @@ -185,7 +190,7 @@ private void ScheduleFlush(IChannelHandlerContext ctx) { // Run as soon as possible, but still yield to give a chance for additional writes to enqueue. _nextScheduledFlush = new CancellationTokenSource(); - ctx.Channel.EventLoop.SubmitAsync(_flushFunc, _nextScheduledFlush.Token); + ctx.Channel.EventLoop.SubmitAsync(FlushOp, this, _nextScheduledFlush.Token); } } @@ -213,18 +218,14 @@ internal class BatchWriterSettings public BatchWriterSettings(Config hocon) { EnableBatching = hocon.GetBoolean("enabled", true); - MaxPendingWrites = hocon.GetInt("max-pending-writes", DefaultMaxPendingWrites); - MaxPendingBytes = hocon.GetByteSize("max-pending-bytes", null) ?? DefaultMaxPendingBytes; - FlushInterval = hocon.GetTimeSpan("flush-interval", DefaultFlushInterval, false); + MaxExplicitFlushes = hocon.GetInt("max-pending-writes", DefaultMaxPendingWrites); } public BatchWriterSettings(TimeSpan? maxDuration = null, bool enableBatching = true, - int maxPendingWrites = DefaultMaxPendingWrites, long maxPendingBytes = DefaultMaxPendingBytes) + int maxExplicitFlushes = DefaultMaxPendingWrites, long maxPendingBytes = DefaultMaxPendingBytes) { EnableBatching = enableBatching; - MaxPendingWrites = maxPendingWrites; - FlushInterval = maxDuration ?? DefaultFlushInterval; - MaxPendingBytes = maxPendingBytes; + MaxExplicitFlushes = maxExplicitFlushes; } /// @@ -241,130 +242,6 @@ public BatchWriterSettings(TimeSpan? maxDuration = null, bool enableBatching = t /// /// Defaults to 30. /// - public int MaxPendingWrites { get; } - - /// - /// In the event of low-traffic channels, the maximum amount of time we'll wait before flushing writes. - /// - /// - /// Defaults to 40 milliseconds. - /// - public TimeSpan FlushInterval { get; } - - /// - /// The maximum number of outstanding bytes that can be written prior to a flush. - /// - /// - /// Defaults to 16kb. - /// - public long MaxPendingBytes { get; } - } - - /// - /// INTERNAL API. - /// - /// Responsible for batching socket writes together into fewer sys calls to the socket. - /// - internal class BatchWriter : ChannelHandlerAdapter - { - private class FlushCmd - { - public static readonly FlushCmd Instance = new FlushCmd(); - private FlushCmd() { } - } - - public readonly BatchWriterSettings Settings; - public readonly IScheduler Scheduler; - private ICancelable _flushSchedule; - - public BatchWriter(BatchWriterSettings settings, IScheduler scheduler) - { - Settings = settings; - Scheduler = scheduler; - } - - private int _currentPendingWrites = 0; - private long _currentPendingBytes; - - public bool HasPendingWrites => _currentPendingWrites > 0; - - public override void HandlerAdded(IChannelHandlerContext context) - { - ScheduleFlush(context); // only schedule flush operations when batching is enabled - base.HandlerAdded(context); - } - - public override Task WriteAsync(IChannelHandlerContext context, object message) - { - /* - * Need to add the write to the rest of the pipeline first before we - * include it in the formula for determining whether or not we flush - * right now. The reason being is that if we did this the other way around, - * we could flush first before the write was in the "flushable" buffer and - * this can lead to "dangling writes" that never actually get transmitted - * across the network. - */ - var write = base.WriteAsync(context, message); - - _currentPendingBytes += ((IByteBuffer)message).ReadableBytes; - _currentPendingWrites++; - if (_currentPendingWrites >= Settings.MaxPendingWrites - || _currentPendingBytes >= Settings.MaxPendingBytes) - { - context.Flush(); - Reset(); - } - - return write; - } - - public override void Flush(IChannelHandlerContext context) - { - // reset statistics upon flush - Reset(); - base.Flush(context); - } - - public override void UserEventTriggered(IChannelHandlerContext context, object evt) - { - if (evt is FlushCmd) - { - if (HasPendingWrites) - { - context.Flush(); - Reset(); - } - } - else - { - base.UserEventTriggered(context, evt); - } - } - - public override Task CloseAsync(IChannelHandlerContext context) - { - // flush any pending writes first - context.Flush(); - _flushSchedule?.Cancel(); - return base.CloseAsync(context); - } - - private void ScheduleFlush(IChannelHandlerContext context) - { - // Schedule a recurring flush - only fires when there's writable data - _flushSchedule = Scheduler.Advanced.ScheduleRepeatedlyCancelable(Settings.FlushInterval, - Settings.FlushInterval, - () => - { - // want to fire this event through the top of the pipeline - context.Channel.Pipeline.FireUserEventTriggered(FlushCmd.Instance); - }); - } - - public void Reset() - { - _currentPendingWrites = 0; - _currentPendingBytes = 0; - } + public int MaxExplicitFlushes { get; } } } From 0acef2488ff39dc2ea33830ae4d55ed7e60341f2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 22 Dec 2020 17:10:14 -0600 Subject: [PATCH 5/6] removed previous BatchWriter settings + handle --- .../Transport/DotNetty/DotNettyTransport.cs | 2 +- .../Transport/DotNetty/TcpTransport.cs | 60 +++---------------- 2 files changed, 10 insertions(+), 52 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 01ae5918e48..57f09d22f36 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -329,7 +329,7 @@ private void SetInitialChannelPipeline(IChannel channel) } if(Settings.BatchWriterSettings.EnableBatching) - pipeline.AddLast("BatchWriter", new FlushConsolidationHandler()); + pipeline.AddLast("BatchWriter", new FlushConsolidationHandler(Settings.BatchWriterSettings.MaxExplicitFlushes)); } private void SetClientPipeline(IChannel channel, Address remoteAddress) diff --git a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs index 0c689c5f9f2..42a5bd276f2 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs @@ -35,14 +35,12 @@ protected TcpHandlers(DotNettyTransport transport, ILoggingAdapter log) : base(t protected override void RegisterListener(IChannel channel, IHandleEventListener listener, object msg, IPEndPoint remoteAddress) { - this._listener = listener; + _listener = listener; } protected override AssociationHandle CreateHandle(IChannel channel, Address localAddress, Address remoteAddress) { - if(Transport.Settings.BatchWriterSettings.EnableBatching) - return new BatchingTcpAssociationHandle(localAddress, remoteAddress, Transport, channel); - return new NonBatchingTcpAssociationHandle(localAddress, remoteAddress, Transport, channel); + return new TcpAssociationHandle(localAddress, remoteAddress, Transport, channel); } public override void ChannelInactive(IChannelHandlerContext context) @@ -105,7 +103,7 @@ internal sealed class TcpServerHandler : TcpHandlers public TcpServerHandler(DotNettyTransport transport, ILoggingAdapter log, Task associationEventListener) : base(transport, log) { - this._associationEventListener = associationEventListener; + _associationEventListener = associationEventListener; } public override void ChannelActive(IChannelHandlerContext context) @@ -126,8 +124,7 @@ void InitInbound(IChannel channel, IPEndPoint socketAddress, object msg) socketAddress: socketAddress, schemeIdentifier: Transport.SchemeIdentifier, systemName: Transport.System.Name); - AssociationHandle handle; - Init(channel, socketAddress, remoteAddress, msg, out handle); + Init(channel, socketAddress, remoteAddress, msg, out var handle); listener.Notify(new InboundAssociation(handle)); }, TaskContinuationOptions.OnlyOnRanToCompletion); } @@ -155,52 +152,16 @@ public override void ChannelActive(IChannelHandlerContext context) private void InitOutbound(IChannel channel, IPEndPoint socketAddress, object msg) { - AssociationHandle handle; - Init(channel, socketAddress, _remoteAddress, msg, out handle); + Init(channel, socketAddress, _remoteAddress, msg, out var handle); _statusPromise.TrySetResult(handle); } } - internal sealed class BatchingTcpAssociationHandle : AssociationHandle + internal sealed class TcpAssociationHandle : AssociationHandle { private readonly IChannel _channel; - public BatchingTcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel) - : base(localAddress, remoteAddress) - { - _channel = channel; - } - - public override bool Write(ByteString payload) - { - if (_channel.Open) - { - var data = ToByteBuffer(_channel, payload); - _channel.WriteAndFlushAsync(data); - return true; - } - return false; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static IByteBuffer ToByteBuffer(IChannel channel, ByteString payload) - { - var buffer = Unpooled.WrappedBuffer(payload.ToByteArray()); - return buffer; - } - - public override void Disassociate() - { - _channel.Flush(); // flush before we close - _channel.CloseAsync(); - } - } - - internal sealed class NonBatchingTcpAssociationHandle : AssociationHandle - { - private readonly IChannel _channel; - - public NonBatchingTcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel) + public TcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel) : base(localAddress, remoteAddress) { _channel = channel; @@ -251,19 +212,16 @@ protected override async Task AssociateInternal(Address remot { throw HandleConnectException(remoteAddress, c, null); } - catch (AggregateException e) when (e.InnerException is ConnectException) + catch (AggregateException e) when (e.InnerException is ConnectException cause) { - var cause = (ConnectException)e.InnerException; throw HandleConnectException(remoteAddress, cause, e); } catch (ConnectTimeoutException t) { throw new InvalidAssociationException(t.Message); } - catch (AggregateException e) when (e.InnerException is ConnectTimeoutException) + catch (AggregateException e) when (e.InnerException is ConnectTimeoutException cause) { - var cause = (ConnectTimeoutException)e.InnerException; - throw new InvalidAssociationException(cause.Message); } } From 05b18ac877fc06b7915db7d70367c365c5d1aab2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 22 Dec 2020 17:11:26 -0600 Subject: [PATCH 6/6] removed unused batching settings from Akka.Remote --- .../Akka.Remote/Configuration/Remote.conf | 23 ++----------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/src/core/Akka.Remote/Configuration/Remote.conf b/src/core/Akka.Remote/Configuration/Remote.conf index 8ebcd54d840..364108afb2c 100644 --- a/src/core/Akka.Remote/Configuration/Remote.conf +++ b/src/core/Akka.Remote/Configuration/Remote.conf @@ -444,28 +444,9 @@ akka { # fast (< 40ms) acknowledgement for all periodic messages. enabled = true - # The max write threshold based on the number of logical messages regardless of their size. - # This is a safe default value - decrease it if you have a small number of remote actors - # who engage in frequent request->response communication which requires low latency (< 40ms). + # The maximum number of explicit flushes that will be allowed before being batched. + # Batching normally happens when the channel switches from writes to reads and self-tunes automatically. max-pending-writes = 30 - - # The max write threshold based on the byte size of all buffered messages. If there are 4 messages - # waiting to be written (with batching.max-pending-writes = 30) but their total size is greater than - # batching.max-pending-bytes, a flush will be triggered immediately. - # - # Increase this value is you have larger message sizes and watch to take advantage of batching, but - # otherwise leave it as-is. - # - # NOTE: this value should always be smaller than dot-netty.tcp.maximum-frame-size. - max-pending-bytes = 16k - - # In the event that neither the batching.max-pending-writes or batching.max-pending-bytes - # is hit we guarantee that all pending writes will be flushed within this interval. - # - # This setting, realistically, can't be enforced any lower than the OS' clock resolution (~20ms). - # If you have a very low-traffic system, either disable pooling altogether or lower the batching.max-pending-writes - # threshold to maximize throughput. Otherwise, leave this setting as-is. - flush-interval = 40ms } # If set to "" then the specified dispatcher