Skip to content

Commit 4303305

Browse files
committed
context propagation fixes
1 parent 1d67b78 commit 4303305

File tree

12 files changed

+197
-228
lines changed

12 files changed

+197
-228
lines changed

src/DotNetty.Common/Concurrency/AbstractPromise.cs

Lines changed: 93 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,55 @@ namespace DotNetty.Common.Concurrency
55
{
66
using System;
77
using System.Collections.Generic;
8+
using System.Diagnostics.Contracts;
9+
using System.Reflection;
810
using System.Runtime.CompilerServices;
911
using System.Runtime.ExceptionServices;
12+
using System.Runtime.InteropServices.ComTypes;
1013
using System.Threading;
1114
using System.Threading.Tasks;
1215
using System.Threading.Tasks.Sources;
1316

1417
public abstract class AbstractPromise : IPromise, IValueTaskSource
1518
{
16-
struct CompletionData
17-
{
18-
public Action<object> Continuation { get; }
19-
public object State { get; }
20-
public ExecutionContext ExecutionContext { get; }
21-
public SynchronizationContext SynchronizationContext { get; }
22-
23-
public CompletionData(Action<object> continuation, object state, ExecutionContext executionContext, SynchronizationContext synchronizationContext)
24-
{
25-
this.Continuation = continuation;
26-
this.State = state;
27-
this.ExecutionContext = executionContext;
28-
this.SynchronizationContext = synchronizationContext;
29-
}
30-
}
31-
32-
const short SourceToken = 0;
33-
3419
static readonly ContextCallback ExecutionContextCallback = Execute;
35-
static readonly SendOrPostCallback SyncContextCallbackWithExecutionContext = ExecuteWithExecutionContext;
3620
static readonly SendOrPostCallback SyncContextCallback = Execute;
21+
static readonly SendOrPostCallback SyncContextCallbackWithExecutionContext = ExecuteWithExecutionContext;
22+
static readonly Action<object> TaskSchedulerCallback = Execute;
23+
static readonly Action<object> TaskScheduleCallbackWithExecutionContext = ExecuteWithExecutionContext;
3724

38-
static readonly Exception CanceledException = new OperationCanceledException();
39-
static readonly Exception CompletedNoException = new Exception();
25+
static readonly Exception CompletedSentinel = new Exception();
4026

27+
short currentId;
4128
protected Exception exception;
29+
30+
Action<object> continuation;
31+
object state;
32+
ExecutionContext executionContext;
33+
object schedulingContext;
34+
35+
public ValueTask ValueTask => new ValueTask(this, this.currentId);
4236

43-
int callbackCount;
44-
CompletionData[] completions;
45-
46-
public bool TryComplete() => this.TryComplete0(CompletedNoException);
37+
public bool TryComplete() => this.TryComplete0(CompletedSentinel, out _);
4738

48-
public bool TrySetException(Exception exception) => this.TryComplete0(exception);
39+
public bool TrySetException(Exception exception) => this.TryComplete0(exception, out _);
4940

50-
public bool TrySetCanceled() => this.TryComplete0(CanceledException);
41+
public bool TrySetCanceled(CancellationToken cancellationToken = default(CancellationToken)) => this.TryComplete0(new OperationCanceledException(cancellationToken), out _);
5142

52-
protected virtual bool TryComplete0(Exception exception)
43+
protected virtual bool TryComplete0(Exception exception, out bool continuationInvoked)
5344
{
45+
continuationInvoked = false;
46+
5447
if (this.exception == null)
5548
{
5649
// Set the exception object to the exception passed in or a sentinel value
5750
this.exception = exception;
58-
this.TryExecuteCompletions();
51+
52+
if (this.continuation != null)
53+
{
54+
this.ExecuteContinuation();
55+
continuationInvoked = true;
56+
}
5957
return true;
6058
}
6159

@@ -66,15 +64,17 @@ protected virtual bool TryComplete0(Exception exception)
6664

6765
public virtual ValueTaskSourceStatus GetStatus(short token)
6866
{
67+
this.EnsureValidToken(token);
68+
6969
if (this.exception == null)
7070
{
7171
return ValueTaskSourceStatus.Pending;
7272
}
73-
else if (this.exception == CompletedNoException)
73+
else if (this.exception == CompletedSentinel)
7474
{
7575
return ValueTaskSourceStatus.Succeeded;
7676
}
77-
else if (this.exception == CanceledException)
77+
else if (this.exception is OperationCanceledException)
7878
{
7979
return ValueTaskSourceStatus.Canceled;
8080
}
@@ -86,150 +86,117 @@ public virtual ValueTaskSourceStatus GetStatus(short token)
8686

8787
public virtual void GetResult(short token)
8888
{
89+
this.EnsureValidToken(token);
90+
8991
if (this.exception == null)
9092
{
9193
throw new InvalidOperationException("Attempt to get result on not yet completed promise");
9294
}
9395

94-
this.IsCompletedOrThrow();
95-
}
96+
this.currentId++;
9697

97-
public virtual void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
98-
{
99-
if (this.completions == null)
100-
{
101-
this.completions = new CompletionData[1];
102-
}
103-
104-
int newIndex = this.callbackCount;
105-
this.callbackCount++;
106-
107-
if (newIndex == this.completions.Length)
108-
{
109-
var newArray = new CompletionData[this.completions.Length * 2];
110-
Array.Copy(this.completions, newArray, this.completions.Length);
111-
this.completions = newArray;
112-
}
113-
114-
this.completions[newIndex] = new CompletionData(
115-
continuation,
116-
state,
117-
(flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0 ? ExecutionContext.Capture() : null,
118-
(flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0 ? SynchronizationContext.Current : null
119-
);
120-
121-
if (this.exception != null)
98+
if (this.exception != CompletedSentinel)
12299
{
123-
this.TryExecuteCompletions();
100+
this.ThrowLatchedException();
124101
}
125102
}
126103

127-
public static implicit operator ValueTask(AbstractPromise promise) => new ValueTask(promise, SourceToken);
128-
129-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
130-
bool IsCompletedOrThrow()
104+
public virtual void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
131105
{
132-
if (this.exception == null)
133-
{
134-
return false;
135-
}
106+
this.EnsureValidToken(token);
136107

137-
if (this.exception != CompletedNoException)
108+
if (this.continuation != null)
138109
{
139-
this.ThrowLatchedException();
110+
throw new InvalidOperationException("Attempt to subscribe same promise twice");
140111
}
141112

142-
return true;
143-
}
144-
145-
[MethodImpl(MethodImplOptions.NoInlining)]
146-
void ThrowLatchedException() => ExceptionDispatchInfo.Capture(this.exception).Throw();
147-
148-
bool TryExecuteCompletions()
149-
{
150-
if (this.callbackCount == 0 || this.completions == null)
151-
{
152-
return false;
153-
}
113+
this.continuation = continuation;
114+
this.state = state;
115+
this.executionContext = (flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0 ? ExecutionContext.Capture() : null;
154116

155-
List<Exception> exceptions = null;
156-
157-
for (int i = 0; i < this.callbackCount; i++)
117+
if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
158118
{
159-
try
119+
SynchronizationContext sc = SynchronizationContext.Current;
120+
if (sc != null && sc.GetType() != typeof(SynchronizationContext))
160121
{
161-
CompletionData completion = this.completions[i];
162-
ExecuteCompletion(completion);
122+
this.schedulingContext = sc;
163123
}
164-
catch (Exception ex)
124+
else
165125
{
166-
if (exceptions == null)
126+
TaskScheduler ts = TaskScheduler.Current;
127+
if (ts != TaskScheduler.Default)
167128
{
168-
exceptions = new List<Exception>();
129+
this.schedulingContext = ts;
169130
}
170-
171-
exceptions.Add(ex);
172131
}
173132
}
174133

175-
if (exceptions == null)
134+
if (this.exception != null)
176135
{
177-
return true;
136+
this.ExecuteContinuation();
178137
}
179-
180-
throw new AggregateException(exceptions);
181138
}
139+
140+
public static implicit operator ValueTask(AbstractPromise promise) => promise.ValueTask;
141+
142+
[MethodImpl(MethodImplOptions.NoInlining)]
143+
void ThrowLatchedException() => ExceptionDispatchInfo.Capture(this.exception).Throw();
182144

183145
[MethodImpl(MethodImplOptions.AggressiveInlining)]
184-
protected void ClearCallbacks()
146+
protected void ClearCallback()
185147
{
186-
if (this.callbackCount > 0)
148+
this.continuation = null;
149+
this.state = null;
150+
this.executionContext = null;
151+
this.schedulingContext = null;
152+
}
153+
154+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
155+
void EnsureValidToken(short token)
156+
{
157+
if (this.currentId != token)
187158
{
188-
this.callbackCount = 0;
189-
Array.Clear(this.completions, 0, this.completions.Length);
159+
throw new InvalidOperationException("Incorrect ValueTask token");
190160
}
191-
}
161+
}
192162

193163
[MethodImpl(MethodImplOptions.AggressiveInlining)]
194-
static void ExecuteCompletion(CompletionData completion)
164+
void ExecuteContinuation()
195165
{
196-
if (completion.SynchronizationContext == null)
166+
ExecutionContext executionContext = this.executionContext;
167+
object schedulingContext = this.schedulingContext;
168+
169+
if (schedulingContext == null)
197170
{
198-
if (completion.ExecutionContext == null)
171+
if (executionContext == null)
199172
{
200-
completion.Continuation(completion.State);
173+
this.ExecuteContinuation0();
201174
}
202175
else
203176
{
204-
//boxing
205-
ExecutionContext.Run(completion.ExecutionContext, ExecutionContextCallback, completion);
177+
ExecutionContext.Run(executionContext, ExecutionContextCallback, this);
206178
}
207179
}
180+
else if (schedulingContext is SynchronizationContext sc)
181+
{
182+
sc.Post(executionContext == null ? SyncContextCallback : SyncContextCallbackWithExecutionContext, this);
183+
}
208184
else
209185
{
210-
if (completion.ExecutionContext == null)
211-
{
212-
//boxing
213-
completion.SynchronizationContext.Post(SyncContextCallback, completion);
214-
}
215-
else
216-
{
217-
//boxing
218-
completion.SynchronizationContext.Post(SyncContextCallbackWithExecutionContext, completion);
219-
}
186+
TaskScheduler ts = (TaskScheduler)schedulingContext;
187+
Contract.Assert(ts != null, "Expected a TaskScheduler");
188+
Task.Factory.StartNew(executionContext == null ? TaskSchedulerCallback : TaskScheduleCallbackWithExecutionContext, this, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
220189
}
221190
}
222191

223-
static void Execute(object state)
224-
{
225-
CompletionData completion = (CompletionData)state;
226-
completion.Continuation(completion.State);
227-
}
228-
229-
static void ExecuteWithExecutionContext(object state)
192+
static void Execute(object state) => ((AbstractPromise)state).ExecuteContinuation0();
193+
194+
static void ExecuteWithExecutionContext(object state) => ExecutionContext.Run(((AbstractPromise)state).executionContext, ExecutionContextCallback, state);
195+
196+
protected virtual void ExecuteContinuation0()
230197
{
231-
CompletionData completion = (CompletionData)state;
232-
ExecutionContext.Run(completion.ExecutionContext, ExecutionContextCallback, state);
198+
Contract.Assert(this.continuation != null);
199+
this.continuation(this.state);
233200
}
234201
}
235202
}

0 commit comments

Comments
 (0)