Skip to content

Commit b6f8ab0

Browse files
committed
Stricter checks in noise; more atomicity in yamux
1 parent 598bb99 commit b6f8ab0

File tree

2 files changed

+32
-23
lines changed

2 files changed

+32
-23
lines changed

src/libp2p/Libp2p.Protocols.Noise/NoiseProtocol.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
1+
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
22
// SPDX-License-Identifier: MIT
33

44
using System.Buffers;
@@ -239,7 +239,7 @@ private static Task ExchangeData(Transport transport, IChannel downChannel, ICha
239239

240240
int bytesWritten = transport.WriteMessage(dataReadResult.Data.ToArray(), buffer.AsSpan(2));
241241
BinaryPrimitives.WriteUInt16BigEndian(buffer.AsSpan(), (ushort)bytesWritten);
242-
IOResult writeResult = await downChannel.WriteAsync(new ReadOnlySequence<byte>(buffer));
242+
IOResult writeResult = await downChannel.WriteAsync(new ReadOnlySequence<byte>(buffer, 0, 2 + bytesWritten));
243243
if (writeResult != IOResult.Ok)
244244
{
245245
logger?.LogDebug("End sending, due to {}", writeResult);
@@ -271,11 +271,14 @@ private static Task ExchangeData(Transport transport, IChannel downChannel, ICha
271271

272272
int bytesRead = transport.ReadMessage(dataReadResult.Data.ToArray(), buffer);
273273

274-
IOResult writeResult = await upChannel.WriteAsync(new ReadOnlySequence<byte>(buffer, 0, bytesRead));
275-
if (writeResult != IOResult.Ok)
274+
if (bytesRead != 0)
276275
{
277-
logger?.LogDebug("Receiving data failed due to {}", dataReadResult);
278-
return;
276+
IOResult writeResult = await upChannel.WriteAsync(new ReadOnlySequence<byte>(buffer, 0, bytesRead));
277+
if (writeResult != IOResult.Ok)
278+
{
279+
logger?.LogDebug("Receiving data failed due to {}", dataReadResult);
280+
return;
281+
}
279282
}
280283
}
281284
});

src/libp2p/Libp2p.Protocols.Yamux/YamuxProtocol.cs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Nethermind.Libp2p.Core.Exceptions;
77
using Nethermind.Libp2p.Protocols.Yamux;
88
using System.Buffers;
9+
using System.Collections.Concurrent;
910
using System.Diagnostics;
1011
using System.Runtime.CompilerServices;
1112

@@ -39,7 +40,7 @@ protected override async Task ConnectAsync(IChannel channel, IConnectionContext
3940
TaskAwaiter downChannelAwaiter = channel.GetAwaiter();
4041
channel.GetAwaiter().OnCompleted(() => context.Activity?.AddEvent(new ActivityEvent("channel closed")));
4142

42-
Dictionary<int, ChannelState> channels = [];
43+
ConcurrentDictionary<int, ChannelState> channels = [];
4344
INewSessionContext? session = null;
4445
Timer? timer = null;
4546

@@ -173,13 +174,13 @@ protected override async Task ConnectAsync(IChannel channel, IConnectionContext
173174
bool spent = channels[header.StreamID].LocalWindow.TrySpend((int)data.Length);
174175
if (!spent)
175176
{
176-
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Window spent out of budget", session.Id, header.StreamID);
177+
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Local window spent out of budget", session.Id, header.StreamID);
177178
await WriteGoAwayAsync(session.Id, channel, SessionTerminationCode.InternalError);
178179
return;
179180
}
180181

181-
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Spent window, was {available}, became {new}", session.Id,
182-
header.StreamID, available, channels[header.StreamID].LocalWindow.Available);
182+
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Local spent window, was {available}, became {new}", session.Id,
183+
header.StreamID, available, channels[header.StreamID].LocalWindow.Available);
183184

184185
_ = channels[header.StreamID].Channel!.WriteAsync(data).AsTask().ContinueWith((t) =>
185186
{
@@ -196,7 +197,7 @@ protected override async Task ConnectAsync(IChannel channel, IConnectionContext
196197
{
197198
int oldSize = channels[header.StreamID].RemoteWindow.Available;
198199
int newSize = channels[header.StreamID].RemoteWindow.Extend(header.Length);
199-
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Window update requested: {old} => {new}", session.Id, header.StreamID, oldSize, newSize);
200+
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Window update received: {old} => {new}", session.Id, header.StreamID, oldSize, newSize);
200201
}
201202

202203
if ((header.Flags & YamuxHeaderFlags.Fin) == YamuxHeaderFlags.Fin)
@@ -239,7 +240,7 @@ ChannelState CreateUpchannel(string contextId, int streamId, YamuxHeaderFlags in
239240

240241
upChannel.GetAwaiter().OnCompleted(() =>
241242
{
242-
channels.Remove(streamId);
243+
channels.TryRemove(streamId, out ChannelState? _);
243244
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Closed", contextId, streamId);
244245
});
245246

@@ -272,13 +273,15 @@ await WriteHeaderAsync(contextId, channel,
272273
{
273274
int sendingSize = await state.RemoteWindow.SpendOrWait((int)upData.Length - i, state.Channel!.CancellationToken);
274275

276+
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Remote window spend {sendingSize}", contextId, streamId, sendingSize);
277+
275278
await WriteHeaderAsync(contextId, channel,
276279
new YamuxHeader
277280
{
278281
Type = YamuxHeaderType.Data,
279282
Length = sendingSize,
280283
StreamID = streamId
281-
}, upData.Slice(i, sendingSize));
284+
}, new ReadOnlySequence<byte>(upData.Slice(i, sendingSize).ToArray()));
282285
i += sendingSize;
283286
}
284287
}
@@ -307,7 +310,7 @@ await WriteHeaderAsync(contextId, channel,
307310
StreamID = streamId
308311
});
309312
_ = upChannel.CloseAsync();
310-
channels.Remove(streamId);
313+
channels.TryRemove(streamId, out ChannelState? _);
311314

312315
_logger?.LogDebug("Ctx({ctx}), stream {stream id}: Unexpected error, closing: {error}", contextId, streamId, e.Message);
313316
}
@@ -344,16 +347,19 @@ void ExtendWindow(IChannel channel, string sessionId, int streamId, IOResult res
344347
{
345348
if (result == IOResult.Ok)
346349
{
347-
int extendedBy = channels[streamId].LocalWindow.ExtendIfNeeded();
348-
if (extendedBy is not 0)
350+
if (channels.TryGetValue(streamId, out ChannelState? channelState))
349351
{
350-
_ = WriteHeaderAsync(sessionId, channel,
351-
new YamuxHeader
352-
{
353-
Type = YamuxHeaderType.WindowUpdate,
354-
Length = extendedBy,
355-
StreamID = streamId
356-
});
352+
int extendedBy = channelState.LocalWindow.ExtendIfNeeded();
353+
if (extendedBy is not 0)
354+
{
355+
_ = WriteHeaderAsync(sessionId, channel,
356+
new YamuxHeader
357+
{
358+
Type = YamuxHeaderType.WindowUpdate,
359+
Length = extendedBy,
360+
StreamID = streamId
361+
});
362+
}
357363
}
358364
}
359365
}

0 commit comments

Comments
 (0)