Skip to content

Commit 0005d86

Browse files
authored
Remove sync dispose (#681)
1 parent f0acf60 commit 0005d86

File tree

6 files changed

+44
-29
lines changed

6 files changed

+44
-29
lines changed

source/Halibut.Tests/Queue/QueueMessageSerializerFixture.cs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@ namespace Halibut.Tests.Queue
1818
public class QueueMessageSerializerFixture : BaseTest
1919
{
2020
[Test]
21-
public void SerializeAndDeserializeSimpleStringMessage_ShouldRoundTrip()
21+
public async Task SerializeAndDeserializeSimpleStringMessage_ShouldRoundTrip()
2222
{
2323
// Arrange
2424
var sut = new QueueMessageSerializerBuilder().Build();
2525

2626
const string testMessage = "Hello, Queue!";
2727

2828
// Act
29-
var (json, dataStreams) = sut.WriteMessage(testMessage);
30-
var (deserializedMessage, deserializedDataStreams) = sut.ReadMessage<string>(json);
29+
var (json, dataStreams) = await sut.WriteMessage(testMessage);
30+
var (deserializedMessage, deserializedDataStreams) = await sut.ReadMessage<string>(json);
3131

3232
// Assert
3333
deserializedMessage.Should().Be(testMessage);
@@ -36,7 +36,7 @@ public void SerializeAndDeserializeSimpleStringMessage_ShouldRoundTrip()
3636
}
3737

3838
[Test]
39-
public void SerializeAndDeserializeRequestMessage_ShouldRoundTrip_RequestMessage()
39+
public async Task SerializeAndDeserializeRequestMessage_ShouldRoundTrip_RequestMessage()
4040
{
4141
// Arrange
4242
var sut = new QueueMessageSerializerBuilder().Build();
@@ -52,8 +52,8 @@ public void SerializeAndDeserializeRequestMessage_ShouldRoundTrip_RequestMessage
5252
};
5353

5454
// Act
55-
var (json, dataStreams) = sut.WriteMessage(request);
56-
var (deserializedMessage, deserializedDataStreams) = sut.ReadMessage<RequestMessage>(json);
55+
var (json, dataStreams) = await sut.WriteMessage(request);
56+
var (deserializedMessage, deserializedDataStreams) = await sut.ReadMessage<RequestMessage>(json);
5757

5858
// Assert
5959
deserializedMessage.Should().BeEquivalentTo(request);
@@ -62,7 +62,7 @@ public void SerializeAndDeserializeRequestMessage_ShouldRoundTrip_RequestMessage
6262
}
6363

6464
[Test]
65-
public void SerializeAndDeserializeRequestMessageWithDataStream_ShouldRoundTrip_RequestMessage()
65+
public async Task SerializeAndDeserializeRequestMessageWithDataStream_ShouldRoundTrip_RequestMessage()
6666
{
6767
var typeRegistry = new TypeRegistry();
6868
typeRegistry.Register(typeof(IHaveTypeWithDataStreamsService));
@@ -86,7 +86,7 @@ public void SerializeAndDeserializeRequestMessageWithDataStream_ShouldRoundTrip_
8686
};
8787

8888
// Act
89-
var (json, dataStreams) = sut.WriteMessage(request);
89+
var (json, dataStreams) = await sut.WriteMessage(request);
9090

9191
dataStreams[1].Should().BeOfType<RepeatingStringDataStream>();
9292

@@ -95,7 +95,7 @@ public void SerializeAndDeserializeRequestMessageWithDataStream_ShouldRoundTrip_
9595
jsonString.Should().Contain("TypeWithDataStreams");
9696
jsonString.Should().NotContain("RepeatingStringDataStream");
9797

98-
var (deserializedMessage, deserializedDataStreams) = sut.ReadMessage<RequestMessage>(json);
98+
var (deserializedMessage, deserializedDataStreams) = await sut.ReadMessage<RequestMessage>(json);
9999

100100
// Manually check each field of the deserializedMessage matches the request
101101
deserializedMessage.Id.Should().Be(request.Id);
@@ -113,7 +113,7 @@ public void SerializeAndDeserializeRequestMessageWithDataStream_ShouldRoundTrip_
113113
}
114114

115115
[Test]
116-
public void SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_ShouldRoundTrip()
116+
public async Task SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_ShouldRoundTrip()
117117
{
118118
// Arrange
119119
var sut = new QueueMessageSerializerBuilder()
@@ -126,8 +126,8 @@ public void SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_Should
126126
const string testMessage = "Hello, Queue!";
127127

128128
// Act
129-
var (json, dataStreams) = sut.WriteMessage(testMessage);
130-
var (deserializedMessage, deserializedDataStreams) = sut.ReadMessage<string>(json);
129+
var (json, dataStreams) = await sut.WriteMessage(testMessage);
130+
var (deserializedMessage, deserializedDataStreams) = await sut.ReadMessage<string>(json);
131131

132132
// Assert
133133
deserializedMessage.Should().Be(testMessage);
@@ -136,7 +136,7 @@ public void SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_Should
136136
}
137137

138138
[Test]
139-
public void SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_ShouldDisposeStreamsInCorrectOrder()
139+
public async Task SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_ShouldDisposeStreamsInCorrectOrder()
140140
{
141141
// Arrange
142142
var disposeOrderWriter = new List<string>();
@@ -161,8 +161,8 @@ public void SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_Should
161161
const string testMessage = "Hello, Queue!";
162162

163163
// Act
164-
var (json, dataStreams) = sut.WriteMessage(testMessage);
165-
var (deserializedMessage, deserializedDataStreams) = sut.ReadMessage<string>(json);
164+
var (json, dataStreams) = await sut.WriteMessage(testMessage);
165+
var (deserializedMessage, deserializedDataStreams) = await sut.ReadMessage<string>(json);
166166

167167
// Assert
168168
deserializedMessage.Should().Be(testMessage);

source/Halibut/Queue/QueueMessageSerializer.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Generic;
44
using System.IO;
55
using System.Text;
6+
using System.Threading.Tasks;
67
using Halibut.Queue.MessageStreamWrapping;
78
using Halibut.Transport.Protocol;
89
using Halibut.Util;
@@ -28,13 +29,13 @@ public QueueMessageSerializer(Func<StreamCapturingJsonSerializer> createStreamCa
2829
this.messageStreamWrappers = messageStreamWrappers;
2930
}
3031

31-
public (byte[], IReadOnlyList<DataStream>) WriteMessage<T>(T message)
32+
public async Task<(byte[], IReadOnlyList<DataStream>)> WriteMessage<T>(T message)
3233
{
3334
IReadOnlyList<DataStream> dataStreams;
3435

3536
using var ms = new MemoryStream();
3637
Stream stream = ms;
37-
using (var wrappedStreamDisposables = new DisposableCollection())
38+
await using (var wrappedStreamDisposables = new DisposableCollection())
3839
{
3940
stream = WrapInMessageSerialisationStreams(messageStreamWrappers, stream, wrappedStreamDisposables);
4041

@@ -71,11 +72,11 @@ public static Stream WrapInMessageSerialisationStreams(MessageStreamWrappers mes
7172
return stream;
7273
}
7374

74-
public (T Message, IReadOnlyList<DataStream> DataStreams) ReadMessage<T>(byte[] json)
75+
public async Task<(T Message, IReadOnlyList<DataStream> DataStreams)> ReadMessage<T>(byte[] json)
7576
{
7677
using var ms = new MemoryStream(json);
7778
Stream stream = ms;
78-
using var disposables = new DisposableCollection();
79+
await using var disposables = new DisposableCollection();
7980
stream = WrapStreamInMessageDeserialisationStreams(messageStreamWrappers, stream, disposables);
8081
using var sr = new StreamReader(stream, Encoding.UTF8
8182
#if NET8_0_OR_GREATER

source/Halibut/Queue/Redis/MessageStorage/MessageSerialiserAndDataStreamStorage.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public MessageSerialiserAndDataStreamStorage(QueueMessageSerializer queueMessage
2020

2121
public async Task<(RedisStoredMessage, HeartBeatDrivenDataStreamProgressReporter)> PrepareRequest(RequestMessage request, CancellationToken cancellationToken)
2222
{
23-
var (jsonRequestMessage, dataStreams) = queueMessageSerializer.WriteMessage(request);
23+
var (jsonRequestMessage, dataStreams) = await queueMessageSerializer.WriteMessage(request);
2424
SwitchDataStreamsToNotReportProgress(dataStreams);
2525
var dataStreamProgressReporter = HeartBeatDrivenDataStreamProgressReporter.CreateForDataStreams(dataStreams);
2626
var dataStreamMetadata = await storeDataStreamsForDistributedQueues.StoreDataStreams(dataStreams, cancellationToken);
@@ -41,7 +41,7 @@ static void SwitchDataStreamsToNotReportProgress(IReadOnlyList<DataStream> dataS
4141

4242
public async Task<(RequestMessage, RequestDataStreamsTransferProgress)> ReadRequest(RedisStoredMessage storedMessage, CancellationToken cancellationToken)
4343
{
44-
var (request, dataStreams) = queueMessageSerializer.ReadMessage<RequestMessage>(storedMessage.Message);
44+
var (request, dataStreams) = await queueMessageSerializer.ReadMessage<RequestMessage>(storedMessage.Message);
4545

4646
var rehydratableDataStreams = BuildUpRehydratableDataStreams(dataStreams, out var dataStreamTransferProgress);
4747

@@ -51,14 +51,14 @@ static void SwitchDataStreamsToNotReportProgress(IReadOnlyList<DataStream> dataS
5151

5252
public async Task<RedisStoredMessage> PrepareResponse(ResponseMessage response, CancellationToken cancellationToken)
5353
{
54-
var (jsonResponseMessage, dataStreams) = queueMessageSerializer.WriteMessage(response);
54+
var (jsonResponseMessage, dataStreams) = await queueMessageSerializer.WriteMessage(response);
5555
var dataStreamMetadata = await storeDataStreamsForDistributedQueues.StoreDataStreams(dataStreams, cancellationToken);
5656
return new RedisStoredMessage(jsonResponseMessage, dataStreamMetadata);
5757
}
5858

5959
public async Task<ResponseMessage> ReadResponse(RedisStoredMessage storedMessage, CancellationToken cancellationToken)
6060
{
61-
var (response, dataStreams) = queueMessageSerializer.ReadMessage<ResponseMessage>(storedMessage.Message);
61+
var (response, dataStreams) = await queueMessageSerializer.ReadMessage<ResponseMessage>(storedMessage.Message);
6262

6363
var rehydratableDataStreams = BuildUpRehydratableDataStreams(dataStreams, out _);
6464

source/Halibut/Queue/Redis/NodeHeartBeat/NodeHeartBeatSender.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public NodeHeartBeatSender(
4141

4242
async Task SendPulsesWhileProcessingRequest(Func<HeartBeatMessage> heartBeatMessageProvider, TimeSpan defaultDelayBetweenPulses, CancellationToken cancellationToken)
4343
{
44+
await NodeHeartBeatWatcher.WaitBeforePulses(cancellationToken);
45+
4446
log.Write(EventType.Diagnostic, "Starting heartbeat pulse loop for {0} node, request {1}", nodeSendingPulsesType, requestActivityId);
4547

4648
TimeSpan delayBetweenPulse;

source/Halibut/Queue/Redis/NodeHeartBeat/NodeHeartBeatWatcher.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public static async Task<NodeWatcherResult> WatchThatNodeProcessingTheRequestIsS
2222
IGetNotifiedOfHeartBeats notifiedOfHeartBeats,
2323
CancellationToken watchCancellationToken)
2424
{
25+
await WaitBeforePulses(watchCancellationToken);
2526
log = log.ForContext<NodeHeartBeatWatcher>();
2627
// Once the pending's CT has been cancelled we no longer care to keep observing
2728
await using var cts = new CancelOnDisposeCancellationToken(watchCancellationToken, redisPending.PendingRequestCancellationToken);
@@ -57,6 +58,7 @@ public static async Task<NodeWatcherResult> WatchThatNodeWhichSentTheRequestIsSt
5758
TimeSpan maxTimeBetweenSenderHeartBeetsBeforeSenderIsAssumedToBeOffline,
5859
CancellationToken watchCancellationToken)
5960
{
61+
await WaitBeforePulses(watchCancellationToken);
6062
try
6163
{
6264
return await WatchForPulsesFromNode(endpoint, requestActivityId, halibutRedisTransport, log, maxTimeBetweenSenderHeartBeetsBeforeSenderIsAssumedToBeOffline, HalibutQueueNodeSendingPulses.RequestSenderNode, watchCancellationToken);
@@ -70,6 +72,20 @@ public static async Task<NodeWatcherResult> WatchThatNodeWhichSentTheRequestIsSt
7072
return NodeWatcherResult.NodeMayHaveDisconnected;
7173
}
7274
}
75+
76+
public static TimeSpan delayBeforeCheckingForOrSendingPulses = TimeSpan.Zero;
77+
78+
public static async Task WaitBeforePulses(CancellationToken cancellationToken)
79+
{
80+
try
81+
{
82+
await Task.Delay(NodeHeartBeatWatcher.delayBeforeCheckingForOrSendingPulses, cancellationToken);
83+
}
84+
catch
85+
{
86+
87+
}
88+
}
7389

7490
static async Task<NodeWatcherResult> WatchForPulsesFromNode(Uri endpoint,
7591
Guid requestActivityId,
@@ -80,6 +96,7 @@ static async Task<NodeWatcherResult> WatchForPulsesFromNode(Uri endpoint,
8096
CancellationToken watchCancellationToken,
8197
Func<HeartBeatMessage, Task>? notifiedOfHeartBeats = null)
8298
{
99+
83100
log.ForContext<NodeHeartBeatSender>();
84101
log.Write(EventType.Diagnostic, "Starting to watch for pulses from {0} node, request {1}, endpoint {2}", watchingForPulsesFrom, requestActivityId, endpoint);
85102

source/Halibut/Util/DisposableCollection.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace Halibut.Util
88
{
9-
public class DisposableCollection : IDisposable, IAsyncDisposable
9+
public class DisposableCollection : IAsyncDisposable
1010
{
1111

1212
//Dispose in the reverse order of when they were added so we deal with nested objects correctly.
@@ -48,11 +48,6 @@ public IAsyncDisposable AddAsyncDisposable<T>(T asyncDisposable) where T : IAsyn
4848
return asyncDisposable;
4949
}
5050

51-
public void Dispose()
52-
{
53-
DisposeAsync().AsTask().GetAwaiter().GetResult();
54-
}
55-
5651
public async ValueTask DisposeAsync()
5752
{
5853
if (isDisposed) return;

0 commit comments

Comments
 (0)