Skip to content

Commit a9d0723

Browse files
authored
STE: Provide indicators of progress (#517)
extra: give access to executors within executor group
1 parent 5bfd679 commit a9d0723

16 files changed

+110
-23
lines changed

src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace DotNetty.Common.Concurrency
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Threading;
89
using System.Threading.Tasks;
910
using DotNetty.Common.Internal.Logging;
@@ -46,6 +47,11 @@ protected AbstractEventExecutor(IEventExecutorGroup parent)
4647
/// <inheritdoc cref="IEventExecutor"/>
4748
public bool InEventLoop => this.IsInEventLoop(Thread.CurrentThread);
4849

50+
/// <inheritdoc cref="IEventExecutor" />
51+
public IEnumerable<IEventExecutor> Items => this.GetItems();
52+
53+
protected abstract IEnumerable<IEventExecutor> GetItems();
54+
4955
/// <inheritdoc cref="IEventExecutor"/>
5056
public abstract bool IsInEventLoop(Thread thread);
5157

src/DotNetty.Common/Concurrency/AbstractEventExecutorGroup.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace DotNetty.Common.Concurrency
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Threading;
89
using System.Threading.Tasks;
910

@@ -20,6 +21,8 @@ public abstract class AbstractEventExecutorGroup : IEventExecutorGroup
2021

2122
public abstract Task TerminationCompletion { get; }
2223

24+
public IEnumerable<IEventExecutor> Items => this.GetItems();
25+
2326
public abstract IEventExecutor GetNext();
2427

2528
public void Execute(IRunnable task) => this.GetNext().Execute(task);
@@ -65,5 +68,7 @@ public abstract class AbstractEventExecutorGroup : IEventExecutorGroup
6568
public Task ShutdownGracefullyAsync() => this.ShutdownGracefullyAsync(DefaultShutdownQuietPeriod, DefaultShutdownTimeout);
6669

6770
public abstract Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);
71+
72+
protected abstract IEnumerable<IEventExecutor> GetItems();
6873
}
6974
}

src/DotNetty.Common/Concurrency/IEventExecutorGroup.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,19 @@
44
namespace DotNetty.Common.Concurrency
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Threading.Tasks;
89

910
/// <summary>
1011
/// Provides an access to a set of <see cref="IEventExecutor"/>s it manages.
1112
/// </summary>
1213
public interface IEventExecutorGroup : IScheduledExecutorService
1314
{
15+
/// <summary>
16+
/// Returns list of owned event executors.
17+
/// </summary>
18+
IEnumerable<IEventExecutor> Items { get; }
19+
1420
/// <summary>
1521
/// Returns <c>true</c> if and only if this executor is being shut down via <see cref="ShutdownGracefullyAsync()" />.
1622
/// </summary>

src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
4545
PreciseTimeSpan gracefulShutdownQuietPeriod;
4646
PreciseTimeSpan gracefulShutdownTimeout;
4747
readonly ISet<Action> shutdownHooks = new HashSet<Action>();
48+
long progress;
4849

4950
/// <summary>Creates a new instance of <see cref="SingleThreadEventExecutor"/>.</summary>
5051
public SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval)
@@ -86,6 +87,21 @@ protected SingleThreadEventExecutor(IEventExecutorGroup parent, string threadNam
8687
/// </summary>
8788
public TaskScheduler Scheduler => this.scheduler;
8889

90+
/// <summary>
91+
/// Allows to track whether executor is progressing through its backlog. Useful for diagnosing / mitigating stalls due to blocking calls in conjunction with IsBacklogEmpty property.
92+
/// </summary>
93+
public long Progress => Volatile.Read(ref this.progress);
94+
95+
/// <summary>
96+
/// Indicates whether executor's backlog is empty. Useful for diagnosing / mitigating stalls due to blocking calls in conjunction with Progress property.
97+
/// </summary>
98+
public bool IsBacklogEmpty => this.taskQueue.IsEmpty;
99+
100+
/// <summary>
101+
/// Gets length of backlog of tasks queued for immediate execution.
102+
/// </summary>
103+
public int BacklogLength => this.taskQueue.Count;
104+
89105
void Loop()
90106
{
91107
this.SetCurrentExecutor(this);
@@ -140,6 +156,8 @@ public override void Execute(IRunnable task)
140156
}
141157
}
142158

159+
protected override IEnumerable<IEventExecutor> GetItems() => new[] { this };
160+
143161
protected void WakeUp(bool inEventLoop)
144162
{
145163
if (!inEventLoop || (this.executionState == ST_SHUTTING_DOWN))
@@ -152,12 +170,12 @@ protected void WakeUp(bool inEventLoop)
152170
/// Adds an <see cref="Action"/> which will be executed on shutdown of this instance.
153171
/// </summary>
154172
/// <param name="action">The <see cref="Action"/> to run on shutdown.</param>
155-
public void AddShutdownHook(Action action)
173+
public void AddShutdownHook(Action action)
156174
{
157-
if (this.InEventLoop)
175+
if (this.InEventLoop)
158176
{
159177
this.shutdownHooks.Add(action);
160-
}
178+
}
161179
else
162180
{
163181
this.Execute(() => this.shutdownHooks.Add(action));
@@ -169,53 +187,53 @@ public void AddShutdownHook(Action action)
169187
/// executed on shutdown of this instance.
170188
/// </summary>
171189
/// <param name="action">The <see cref="Action"/> to remove.</param>
172-
public void RemoveShutdownHook(Action action)
190+
public void RemoveShutdownHook(Action action)
173191
{
174-
if (this.InEventLoop)
192+
if (this.InEventLoop)
175193
{
176194
this.shutdownHooks.Remove(action);
177-
}
195+
}
178196
else
179197
{
180198
this.Execute(() => this.shutdownHooks.Remove(action));
181199
}
182200
}
183201

184-
bool RunShutdownHooks()
202+
bool RunShutdownHooks()
185203
{
186204
bool ran = false;
187-
205+
188206
// Note shutdown hooks can add / remove shutdown hooks.
189-
while (this.shutdownHooks.Count > 0)
207+
while (this.shutdownHooks.Count > 0)
190208
{
191209
var copy = this.shutdownHooks.ToArray();
192210
this.shutdownHooks.Clear();
193211

194212
for (var i = 0; i < copy.Length; i++)
195213
{
196-
try
214+
try
197215
{
198216
copy[i]();
199-
}
200-
catch (Exception ex)
217+
}
218+
catch (Exception ex)
201219
{
202220
Logger.Warn("Shutdown hook raised an exception.", ex);
203-
}
204-
finally
221+
}
222+
finally
205223
{
206224
ran = true;
207225
}
208226
}
209227
}
210228

211-
if (ran)
229+
if (ran)
212230
{
213231
this.lastExecutionTime = PreciseTimeSpan.FromStart;
214232
}
215233

216234
return ran;
217235
}
218-
236+
219237

220238
/// <inheritdoc cref="IEventExecutor"/>
221239
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
@@ -398,6 +416,7 @@ protected bool RunAllTasks()
398416

399417
while (true)
400418
{
419+
Volatile.Write(ref this.progress, this.progress + 1); // volatile write is enough as this is the only thread ever writing
401420
SafeExecute(task);
402421
task = this.PollTask();
403422
if (task == null)

src/DotNetty.Transport.Libuv/DispatcherEventLoop.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace DotNetty.Transport.Libuv
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Diagnostics;
89
using System.Diagnostics.Contracts;
910
using System.Runtime.InteropServices;
@@ -69,5 +70,7 @@ internal void Dispatch(NativeHandle handle)
6970
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);
7071

7172
public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;
73+
74+
public new IEnumerable<IEventLoop> Items => new[] { this };
7275
}
7376
}

src/DotNetty.Transport.Libuv/DispatcherEventLoopGroup.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
namespace DotNetty.Transport.Libuv
77
{
88
using System;
9+
using System.Collections.Generic;
910
using System.Threading.Tasks;
1011
using DotNetty.Common.Concurrency;
1112
using DotNetty.Transport.Channels;
@@ -29,6 +30,10 @@ public DispatcherEventLoopGroup()
2930

3031
internal DispatcherEventLoop Dispatcher => this.dispatcherEventLoop;
3132

33+
protected override IEnumerable<IEventExecutor> GetItems() => new[] { this.dispatcherEventLoop };
34+
35+
public new IEnumerable<IEventLoop> Items => new[] { this.dispatcherEventLoop };
36+
3237
IEventLoop IEventLoopGroup.GetNext() => (IEventLoop)this.GetNext();
3338

3439
public override IEventExecutor GetNext() => this.dispatcherEventLoop;

src/DotNetty.Transport.Libuv/EventLoop.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
namespace DotNetty.Transport.Libuv
55
{
6+
using System.Collections.Generic;
67
using System.Threading.Tasks;
78
using DotNetty.Transport.Channels;
89

@@ -19,5 +20,7 @@ public EventLoop(IEventLoopGroup parent, string threadName)
1920
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);
2021

2122
public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;
23+
24+
public new IEnumerable<IEventLoop> Items => new[] { this };
2225
}
2326
}

src/DotNetty.Transport.Libuv/EventLoopGroup.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
namespace DotNetty.Transport.Libuv
77
{
88
using System;
9+
using System.Collections.Generic;
910
using System.Linq;
1011
using System.Threading;
1112
using System.Threading.Tasks;
@@ -27,6 +28,8 @@ public sealed class EventLoopGroup : AbstractEventExecutorGroup, IEventLoopGroup
2728

2829
public override Task TerminationCompletion { get; }
2930

31+
public new IEnumerable<IEventLoop> Items => this.eventLoops;
32+
3033
public EventLoopGroup()
3134
: this(DefaultEventLoopCount)
3235
{
@@ -119,5 +122,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
119122
}
120123
return this.TerminationCompletion;
121124
}
125+
126+
protected override IEnumerable<IEventExecutor> GetItems() => this.eventLoops;
122127
}
123128
}

src/DotNetty.Transport.Libuv/LoopExecutor.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
namespace DotNetty.Transport.Libuv
88
{
99
using System;
10+
using System.Collections.Generic;
1011
using System.Diagnostics;
1112
using System.Diagnostics.Contracts;
1213
using System.Threading.Tasks;
@@ -17,7 +18,6 @@ namespace DotNetty.Transport.Libuv
1718
using System.Threading;
1819
using DotNetty.Common;
1920
using DotNetty.Transport.Libuv.Native;
20-
2121
using Timer = Native.Timer;
2222

2323
class LoopExecutor : AbstractScheduledEventExecutor
@@ -297,7 +297,7 @@ void RunAllTasks(long timeout)
297297
long runTasks = 0;
298298
long executionTime;
299299
this.wakeUp = false;
300-
for (;;)
300+
for (; ; )
301301
{
302302
SafeExecute(task);
303303

@@ -402,7 +402,7 @@ static bool RunAllTasksFrom(IQueue<IRunnable> taskQueue)
402402
{
403403
return false;
404404
}
405-
for (;;)
405+
for (; ; )
406406
{
407407
SafeExecute(task);
408408
task = PollTaskFrom(taskQueue);
@@ -488,7 +488,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
488488
bool inEventLoop = this.InEventLoop;
489489
bool wakeUpLoop;
490490
int oldState;
491-
for (;;)
491+
for (; ; )
492492
{
493493
if (this.IsShuttingDown)
494494
{
@@ -540,5 +540,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
540540

541541
return this.TerminationCompletion;
542542
}
543+
544+
protected override IEnumerable<IEventExecutor> GetItems() => new[] { this };
543545
}
544546
}

src/DotNetty.Transport.Libuv/WorkerEventLoop.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
namespace DotNetty.Transport.Libuv
88
{
99
using System;
10+
using System.Collections.Generic;
1011
using System.Diagnostics;
1112
using System.Diagnostics.Contracts;
1213
using System.Threading.Tasks;
@@ -112,6 +113,8 @@ void OnRead(Pipe handle, int status)
112113

113114
public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;
114115

116+
IEnumerable<IEventLoop> IEventLoopGroup.Items => new[] { this };
117+
115118
sealed class PipeConnect : ConnectRequest
116119
{
117120
const int MaximumRetryCount = 10;

src/DotNetty.Transport.Libuv/WorkerEventLoopGroup.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
namespace DotNetty.Transport.Libuv
77
{
88
using System;
9+
using System.Collections.Generic;
910
using System.Diagnostics;
1011
using System.Diagnostics.Contracts;
1112
using System.Linq;
@@ -84,6 +85,8 @@ public WorkerEventLoopGroup(DispatcherEventLoopGroup eventLoopGroup, int eventLo
8485

8586
internal string PipeName { get; }
8687

88+
IEnumerable<IEventLoop> IEventLoopGroup.Items => this.eventLoops;
89+
8790
internal void Accept(NativeHandle handle)
8891
{
8992
Debug.Assert(this.dispatcherLoop != null);
@@ -126,5 +129,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
126129
}
127130
return this.TerminationCompletion;
128131
}
132+
133+
protected override IEnumerable<IEventExecutor> GetItems() => this.eventLoops;
129134
}
130135
}

src/DotNetty.Transport/Channels/AffinitizedEventLoopGroup.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace DotNetty.Transport.Channels
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Threading.Tasks;
89
using DotNetty.Common.Concurrency;
910

@@ -23,6 +24,10 @@ public class AffinitizedEventLoopGroup : AbstractEventExecutorGroup, IEventLoopG
2324
/// <inheritdoc cref="IEventExecutorGroup"/>
2425
public override Task TerminationCompletion => this.innerGroup.TerminationCompletion;
2526

27+
protected override IEnumerable<IEventExecutor> GetItems() => this.innerGroup.Items;
28+
29+
public new IEnumerable<IEventLoop> Items => ((IEventLoopGroup)this.innerGroup).Items;
30+
2631
/// <summary>
2732
/// Creates a new instance of <see cref="AffinitizedEventLoopGroup"/>.
2833
/// </summary>

0 commit comments

Comments
 (0)