Skip to content

Commit 538aa1b

Browse files
committed
Make MockPayloadSender a subclass of ApmChannel to assert tests
1 parent eef82db commit 538aa1b

File tree

6 files changed

+155
-94
lines changed

6 files changed

+155
-94
lines changed

src/Elastic.Apm/Ingest/ApmChannel.cs

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,24 @@
55
using System;
66
using System.Collections.Generic;
77
using System.Diagnostics;
8+
using System.Diagnostics.CodeAnalysis;
89
using System.IO;
10+
using System.Linq;
911
using System.Text.Encodings.Web;
1012
using System.Text.Json;
1113
using System.Text.Json.Serialization;
1214
using System.Threading;
1315
using System.Threading.Tasks;
1416
using Elastic.Apm.Api;
17+
using Elastic.Apm.Logging;
1518
using Elastic.Apm.Report;
1619
using Elastic.Channels;
1720
using Elastic.Ingest.Transport;
1821
using Elastic.Transport;
1922

2023
namespace Elastic.Apm.Ingest;
2124

25+
#nullable enable
2226
internal static class ApmChannelStatics
2327
{
2428
public static readonly byte[] LineFeed = { (byte)'\n' };
@@ -40,18 +44,56 @@ internal static class ApmChannelStatics
4044
/// </summary>
4145
public class ApmChannel
4246
: TransportChannelBase<ApmChannelOptions, IIntakeRoot, EventIntakeResponse, IntakeErrorItem>
43-
, IPayloadSender
47+
, IPayloadSender
4448
{
49+
private readonly List<Func<ITransaction, ITransaction?>> _transactionFilters = new();
50+
private readonly List<Func<ISpan, ISpan?>> _spanFilters = new();
51+
private readonly List<Func<IError, IError?>> _errorFilters = new();
52+
4553
/// <inheritdoc cref="ApmChannel"/>
46-
public ApmChannel(ApmChannelOptions options) : base(options) { }
54+
public ApmChannel(ApmChannelOptions options, IApmLogger? logger = null) : base(options) =>
55+
PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, null, logger ?? new TraceLogger(LogLevel.Trace));
56+
57+
public IError? Filter(IError error) => _errorFilters.Aggregate(error, (current, filter) => filter(current)!);
58+
59+
public ISpan? Filter(ISpan span) => _spanFilters.Aggregate(span, (current, filter) => filter(current)!);
4760

48-
void IPayloadSender.QueueError(IError error) => TryWrite(error);
61+
public ITransaction? Filter(ITransaction span) => _transactionFilters.Aggregate(span, (current, filter) => filter(current)!);
62+
63+
public bool TryFilter(IError error, [NotNullWhen(true)] out IError? filtered)
64+
{
65+
filtered = _errorFilters.Select(f => f(error)).TakeWhile(e => e != null).LastOrDefault();
66+
return filtered != null;
67+
}
4968

50-
void IPayloadSender.QueueMetrics(IMetricSet metrics) => TryWrite(metrics);
69+
public bool TryFilter(ISpan span, [NotNullWhen(true)] out ISpan? filtered)
70+
{
71+
filtered = _spanFilters.Select(f => f(span)).TakeWhile(e => e != null).LastOrDefault();
72+
return filtered != null;
73+
}
5174

52-
void IPayloadSender.QueueSpan(ISpan span) => TryWrite(span);
75+
public bool TryFilter(ITransaction transaction, [NotNullWhen(true)] out ITransaction? filtered)
76+
{
77+
filtered = _transactionFilters.Select(f => f(transaction)).TakeWhile(e => e != null).LastOrDefault();
78+
return filtered != null;
79+
}
5380

54-
void IPayloadSender.QueueTransaction(ITransaction transaction) => TryWrite(transaction);
81+
public virtual void QueueMetrics(IMetricSet metrics) => TryWrite(metrics);
82+
83+
public virtual void QueueError(IError error)
84+
{
85+
if (TryFilter(error, out var e)) TryWrite(e);
86+
}
87+
88+
public virtual void QueueSpan(ISpan span)
89+
{
90+
if (TryFilter(span, out var s)) TryWrite(s);
91+
}
92+
93+
public virtual void QueueTransaction(ITransaction transaction)
94+
{
95+
if (TryFilter(transaction, out var t)) TryWrite(t);
96+
}
5597

5698
//retry if APM server returns 429
5799
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Retry"/>
@@ -74,7 +116,8 @@ public ApmChannel(ApmChannelOptions options) : base(options) { }
74116
protected override bool RejectEvent((IIntakeRoot, IntakeErrorItem) @event) => false;
75117

76118
/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>
77-
protected override Task<EventIntakeResponse> ExportAsync(HttpTransport transport, ArraySegment<IIntakeRoot> page, CancellationToken ctx = default) =>
119+
protected override Task<EventIntakeResponse>
120+
ExportAsync(HttpTransport transport, ArraySegment<IIntakeRoot> page, CancellationToken ctx = default) =>
78121
transport.RequestAsync<EventIntakeResponse>(HttpMethod.POST, "/intake/v2/events",
79122
PostData.StreamHandler(page,
80123
(_, _) =>

src/Elastic.Apm/Ingest/ApmChannelOptions.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information
44

5+
using System;
56
using Elastic.Apm.Api;
67
using Elastic.Ingest.Transport;
78
using Elastic.Transport;
@@ -14,5 +15,11 @@ namespace Elastic.Apm.Ingest;
1415
public class ApmChannelOptions : TransportChannelOptionsBase<IIntakeRoot, EventIntakeResponse, IntakeErrorItem>
1516
{
1617
/// <inheritdoc cref="ApmChannelOptions"/>
17-
public ApmChannelOptions(HttpTransport transport) : base(transport) { }
18+
private ApmChannelOptions(HttpTransport transport) : base(transport) { }
19+
20+
public ApmChannelOptions(Uri serverEndpoint, TransportClient transportClient = null)
21+
: this(new DefaultHttpTransport(new TransportConfiguration(new SingleNodePool(serverEndpoint), connection: transportClient!)))
22+
{
23+
24+
}
1825
}

src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#endregion
2727

2828
#nullable enable
29+
#if !NET6_0_OR_GREATER
2930
namespace System.Diagnostics.CodeAnalysis
3031
{
3132
/// <summary>Specifies that an output will not be null even if the corresponding type allows it.</summary>
@@ -78,3 +79,4 @@ internal class DoesNotReturnIfAttribute : Attribute
7879
public bool ParameterValue { get; }
7980
}
8081
}
82+
#endif

test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs

Lines changed: 77 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,34 @@
1010
using System.Linq;
1111
using System.Threading;
1212
using Elastic.Apm.Api;
13+
using Elastic.Apm.Ingest;
1314
using Elastic.Apm.Libraries.Newtonsoft.Json.Linq;
1415
using Elastic.Apm.Logging;
1516
using Elastic.Apm.Metrics;
1617
using Elastic.Apm.Model;
1718
using Elastic.Apm.Report;
19+
using Elastic.Transport;
1820
using FluentAssertions;
1921

22+
#nullable enable
2023
namespace Elastic.Apm.Tests.Utilities
2124
{
22-
internal class MockPayloadSender : IPayloadSender
25+
internal class MockPayloadSender : ApmChannel
2326
{
2427
private static readonly JObject JsonSpanTypesData =
2528
JObject.Parse(File.ReadAllText("./TestResources/json-specs/span_types.json"));
2629

27-
private readonly List<IError> _errors = new List<IError>();
28-
private readonly List<Func<IError, IError>> _errorFilters = new List<Func<IError, IError>>();
29-
private readonly object _spanLock = new object();
30-
private readonly object _transactionLock = new object();
31-
private readonly object _metricsLock = new object();
32-
private readonly object _errorLock = new object();
33-
private readonly List<IMetricSet> _metrics = new List<IMetricSet>();
34-
private readonly List<Func<ISpan, ISpan>> _spanFilters = new List<Func<ISpan, ISpan>>();
35-
private readonly List<ISpan> _spans = new List<ISpan>();
36-
private readonly List<Func<ITransaction, ITransaction>> _transactionFilters = new List<Func<ITransaction, ITransaction>>();
37-
private readonly List<ITransaction> _transactions = new List<ITransaction>();
38-
39-
public MockPayloadSender(IApmLogger logger = null)
30+
private readonly object _spanLock = new();
31+
private readonly object _transactionLock = new();
32+
private readonly object _metricsLock = new();
33+
private readonly object _errorLock = new();
34+
private readonly List<IMetricSet> _metrics = new();
35+
private readonly List<IError> _errors = new();
36+
private readonly List<ISpan> _spans = new();
37+
private readonly List<ITransaction> _transactions = new();
38+
39+
public MockPayloadSender(IApmLogger? logger = null)
40+
: base(new ApmChannelOptions(new Uri("http://localhost:8080"), transportClient: new InMemoryConnection()), logger)
4041
{
4142
_waitHandles = new[] { new AutoResetEvent(false), new AutoResetEvent(false), new AutoResetEvent(false), new AutoResetEvent(false) };
4243

@@ -45,7 +46,6 @@ public MockPayloadSender(IApmLogger logger = null)
4546
_errorWaitHandle = _waitHandles[2];
4647
_metricSetWaitHandle = _waitHandles[3];
4748

48-
PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, MockApmServerInfo.Version710, logger ?? new NoopLogger());
4949
}
5050

5151
/// <summary>
@@ -61,6 +61,54 @@ public MockPayloadSender(IApmLogger logger = null)
6161
private readonly AutoResetEvent[] _waitHandles;
6262
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1);
6363

64+
public override bool TryWrite(IIntakeRoot item)
65+
{
66+
var written = base.TryWrite(item);
67+
switch (item)
68+
{
69+
case IError error:
70+
_errors.Add(error);
71+
_errorWaitHandle.Set();
72+
break;
73+
case ITransaction transaction:
74+
_transactions.Add(transaction);
75+
_transactionWaitHandle.Set();
76+
break;
77+
case ISpan span:
78+
_spans.Add(span);
79+
_spanWaitHandle.Set();
80+
break;
81+
case IMetricSet metricSet:
82+
_metrics.Add(metricSet);
83+
_metricSetWaitHandle.Set();
84+
break;
85+
}
86+
return written;
87+
}
88+
89+
public override void QueueError(IError error)
90+
{
91+
lock (_errorLock) base.QueueError(error);
92+
}
93+
94+
public override void QueueTransaction(ITransaction transaction)
95+
{
96+
lock (_transactionLock) base.QueueTransaction(transaction);
97+
}
98+
99+
public override void QueueSpan(ISpan span)
100+
{
101+
VerifySpan(span);
102+
lock (_spanLock) base.QueueSpan(span);
103+
}
104+
105+
public override void QueueMetrics(IMetricSet metricSet)
106+
{
107+
lock (_metricsLock) base.QueueMetrics(metricSet);
108+
}
109+
110+
111+
64112
/// <summary>
65113
/// Waits for any events to be queued
66114
/// </summary>
@@ -191,27 +239,27 @@ public IReadOnlyList<IError> Errors
191239
get
192240
{
193241
lock (_errorLock)
194-
return CreateImmutableSnapshot<IError>(_errors);
242+
return CreateImmutableSnapshot(_errors);
195243
}
196244
}
197245

198-
public Error FirstError => Errors.FirstOrDefault() as Error;
199-
public MetricSet FirstMetric => Metrics.FirstOrDefault() as MetricSet;
246+
public Error? FirstError => Errors.FirstOrDefault() as Error;
247+
public MetricSet? FirstMetric => Metrics.FirstOrDefault() as MetricSet;
200248

201249
/// <summary>
202250
/// The 1. Span on the 1. Transaction
203251
/// </summary>
204-
public Span FirstSpan => Spans.FirstOrDefault() as Span;
252+
public Span? FirstSpan => Spans.FirstOrDefault() as Span;
205253

206-
public Transaction FirstTransaction =>
254+
public Transaction? FirstTransaction =>
207255
Transactions.FirstOrDefault() as Transaction;
208256

209257
public IReadOnlyList<IMetricSet> Metrics
210258
{
211259
get
212260
{
213261
lock (_metricsLock)
214-
return CreateImmutableSnapshot<IMetricSet>(_metrics);
262+
return CreateImmutableSnapshot(_metrics);
215263
}
216264
}
217265

@@ -220,7 +268,7 @@ public IReadOnlyList<ISpan> Spans
220268
get
221269
{
222270
lock (_spanLock)
223-
return CreateImmutableSnapshot<ISpan>(_spans);
271+
return CreateImmutableSnapshot(_spans);
224272
}
225273
}
226274

@@ -229,45 +277,15 @@ public IReadOnlyList<ITransaction> Transactions
229277
get
230278
{
231279
lock (_transactionLock)
232-
return CreateImmutableSnapshot<ITransaction>(_transactions);
280+
return CreateImmutableSnapshot(_transactions);
233281
}
234282
}
235283

236284
public Span[] SpansOnFirstTransaction =>
237-
Spans.Where(n => n.TransactionId == Transactions.First().Id).Select(n => n as Span).ToArray();
238-
239-
public void QueueError(IError error)
240-
{
241-
lock (_errorLock)
242-
{
243-
error = _errorFilters.Aggregate(error,
244-
(current, filter) => filter(current));
245-
_errors.Add(error);
246-
_errorWaitHandle.Set();
247-
}
248-
}
249-
250-
public virtual void QueueTransaction(ITransaction transaction)
251-
{
252-
lock (_transactionLock)
253-
{
254-
transaction = _transactionFilters.Aggregate(transaction,
255-
(current, filter) => filter(current));
256-
_transactions.Add(transaction);
257-
_transactionWaitHandle.Set();
258-
}
259-
}
260-
261-
public void QueueSpan(ISpan span)
262-
{
263-
VerifySpan(span);
264-
lock (_spanLock)
265-
{
266-
span = _spanFilters.Aggregate(span, (current, filter) => filter(current));
267-
_spans.Add(span);
268-
_spanWaitHandle.Set();
269-
}
270-
}
285+
Spans
286+
.Where(n => n.TransactionId == Transactions.First().Id)
287+
.Select(n => (Span)n)
288+
.ToArray();
271289

272290
private void VerifySpan(ISpan span)
273291
{
@@ -279,7 +297,7 @@ private void VerifySpan(ISpan span)
279297
var spanTypeInfo = JsonSpanTypesData[type] as JObject;
280298
spanTypeInfo.Should().NotBeNull($"span type '{type}' is not allowed by the spec");
281299

282-
var allowNullSubtype = spanTypeInfo["allow_null_subtype"]?.Value<bool>();
300+
var allowNullSubtype = spanTypeInfo!["allow_null_subtype"]?.Value<bool>();
283301
var allowUnlistedSubtype = spanTypeInfo["allow_unlisted_subtype"]?.Value<bool>();
284302
var subTypes = spanTypeInfo["subtypes"];
285303
var hasSubtypes = subTypes != null && subTypes.Any();
@@ -289,7 +307,7 @@ private void VerifySpan(ISpan span)
289307
{
290308
if (!allowUnlistedSubtype.GetValueOrDefault() && hasSubtypes)
291309
{
292-
var subTypeInfo = subTypes[subType];
310+
var subTypeInfo = subTypes![subType];
293311
subTypeInfo.Should()
294312
.NotBeNull($"span subtype '{subType}' is not allowed by the spec for type '{type}'");
295313
}
@@ -305,15 +323,6 @@ private void VerifySpan(ISpan span)
305323
}
306324
}
307325

308-
public void QueueMetrics(IMetricSet metricSet)
309-
{
310-
lock (_metricsLock)
311-
{
312-
_metrics.Add(metricSet);
313-
_metricSetWaitHandle.Set();
314-
}
315-
}
316-
317326
public void Clear()
318327
{
319328
lock (_spanLock)

0 commit comments

Comments
 (0)