Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/libp2p/Libp2p.Core.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ await Task.Run(async () =>
{
try
{
d = (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow());
d = await revChan.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow();
i += d.Length;
}
catch
Expand Down
4 changes: 2 additions & 2 deletions src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ .. additionalProtocols

public class TestPeerFactory(IProtocolStackSettings protocolStackSettings, PeerStore peerStore, ActivitySource? activitySource = null, ILoggerFactory? loggerFactory = null) : PeerFactory(protocolStackSettings, peerStore, activitySource)
{
ConcurrentDictionary<PeerId, ILocalPeer> peers = new();
readonly ConcurrentDictionary<PeerId, ILocalPeer> peers = new();

public override ILocalPeer Create(Identity? identity = default)
{
ArgumentNullException.ThrowIfNull(identity);
return peers.GetOrAdd(identity.PeerId, (p) => new TestLocalPeer(identity, protocolStackSettings, peerStore, activitySource, loggerFactory));
return peers.GetOrAdd(identity.PeerId, (p) => new TestLocalPeer(identity, protocolStackSettings, base.PeerStore, activitySource, loggerFactory));
}
}

Expand Down
20 changes: 7 additions & 13 deletions src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

namespace Nethermind.Libp2p.Core.TestsBase;

public class LocalPeerStub : ILocalPeer
public sealed class LocalPeerStub : ILocalPeer
{
public LocalPeerStub()
{
Identity = new(Enumerable.Repeat((byte)42, 32).ToArray());
Identity = new([.. Enumerable.Repeat((byte)42, 32)]);
Address = $"/p2p/{Identity.PeerId}";
}

Expand All @@ -20,7 +20,7 @@ public LocalPeerStub()

public ObservableCollection<Multiaddress> ListenAddresses => throw new NotImplementedException();

public event Connected? OnConnected = _ => Task.CompletedTask;
public event Connected? OnConnected = _ => { };

public Task<ISession> DialAsync(Multiaddress addr, CancellationToken token = default)
{
Expand All @@ -42,22 +42,16 @@ public ValueTask DisposeAsync()
return ValueTask.CompletedTask;
}

public Task StartListenAsync(Multiaddress[] addrs, CancellationToken token = default)
public Task StartListenAsync(Multiaddress[]? addrs = null, CancellationToken token = default)
{
return Task.CompletedTask;
}
}

public class TestRemotePeer : ISession
public class TestRemotePeer(Multiaddress addr) : ISession
{
public TestRemotePeer(Multiaddress addr)
{
Identity = TestPeers.Identity(addr);
Address = addr;
}

public Identity Identity { get; set; }
public Multiaddress Address { get; set; }
public Identity Identity { get; set; } = TestPeers.Identity(addr);
public Multiaddress Address { get; set; } = addr;

public Multiaddress RemoteAddress => $"/p2p/{Identity.PeerId}";

Expand Down
7 changes: 3 additions & 4 deletions src/libp2p/Libp2p.Core/Context/NewSessionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@

namespace Nethermind.Libp2p.Core.Context;

public class NewSessionContext(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions,
public sealed class NewSessionContext(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions,
ActivitySource? activitySource, Activity? parentActivity, ILoggerFactory? loggerFactory = null)
: ContextBase(localPeer, session, protocol, isListener, upgradeOptions, activitySource, parentActivity), INewSessionContext
: ContextBase(localPeer, session, protocol, isListener, upgradeOptions, activitySource,
activitySource?.CreateActivity("New session", ActivityKind.Internal, parentActivity?.Context ?? default)), INewSessionContext
{
private readonly ILogger? logger = loggerFactory?.CreateLogger<NewSessionContext>();

public IEnumerable<UpgradeOptions> DialRequests => session.GetRequestQueue();

public CancellationToken Token => session.ConnectionToken;

public Activity? Activity { get; } = activitySource?.CreateActivity("New session", ActivityKind.Internal, parentActivity?.Context ?? default);

public void Dispose()
{
logger?.LogDebug("Disposing session context {Id}", Id);
Expand Down
2 changes: 1 addition & 1 deletion src/libp2p/Libp2p.Core/ILocalPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ public interface ILocalPeer : IAsyncDisposable
event Connected? OnConnected;
}

public delegate Task Connected(ISession newSession);
public delegate void Connected(ISession newSession);
3 changes: 2 additions & 1 deletion src/libp2p/Libp2p.Core/MultiplexerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
namespace Nethermind.Libp2p.Core;
public class MultiplexerSettings
{
public List<IProtocol> _multiplexers = [];
private readonly List<IProtocol> _multiplexers = [];

public IEnumerable<IProtocol> Multiplexers => _multiplexers;

public void Add(IProtocol multiplexerProtocol)
{
_multiplexers.Add(multiplexerProtocol);
Expand Down
33 changes: 17 additions & 16 deletions src/libp2p/Libp2p.Core/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public partial class LocalPeer(Identity identity, PeerStore peerStore, IProtocol
protected readonly IProtocolStackSettings _protocolStackSettings = protocolStackSettings;
protected readonly Activity? peerActivity = activitySource?.StartActivity($"Peer {identity.PeerId}", ActivityKind.Internal, rootActivity?.Id);

Dictionary<object, TaskCompletionSource<Multiaddress>> listenerReadyTcs = [];
readonly Dictionary<object, TaskCompletionSource<Multiaddress>> listenerReadyTcs = [];
public ObservableCollection<Session> Sessions { get; } = [];

public override string ToString()
Expand Down Expand Up @@ -56,7 +56,7 @@ protected virtual Multiaddress[] GetDefaultAddresses()
throw new Libp2pSetupException($"Protocols are not set in {nameof(_protocolStackSettings)}");
}

return _protocolStackSettings.TopProtocols.SelectMany(p => ITransportProtocol.GetDefaultAddresses(p.Protocol, Identity.PeerId)).ToArray();
return [.. _protocolStackSettings.TopProtocols.SelectMany(p => ITransportProtocol.GetDefaultAddresses(p.Protocol, Identity.PeerId))];
}

protected virtual IEnumerable<Multiaddress> PrepareAddresses(Multiaddress[] addrs)
Expand Down Expand Up @@ -115,13 +115,13 @@ public virtual async Task StartListenAsync(Multiaddress[]? addrs = default, Canc
ListenAddresses.Remove(tcs.Task.Result);
}
listenActivity?.Dispose();
});
}, token);

listenTasks.Add(tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(5000)).ContinueWith(t =>
listenTasks.Add(tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(5000), token).ContinueWith(t =>
{
if (t.IsFaulted)
{
_logger?.LogDebug($"Failed to start listener for address {addr}");
_logger?.LogDebug("Failed to start listener for address {addr}", addr);
return null;
}

Expand All @@ -137,7 +137,7 @@ public virtual async Task StartListenAsync(Multiaddress[]? addrs = default, Canc

if (addr is not null)
{
_logger?.LogDebug($"Adding listener address {addr}");
_logger?.LogDebug("Adding listener address {addr}", addr);
ListenAddresses.Add(addr);
}
}
Expand Down Expand Up @@ -169,7 +169,7 @@ public INewSessionContext UpgradeToSession(Session session, ProtocolRef proto, b
_ = session.DisconnectAsync();
throw new SessionExistsException(remotePeerId);
}
_logger?.LogDebug($"New session with {remotePeerId} ({session.RemoteAddress})");
_logger?.LogDebug("New session with {remotePeerId} ({remoteAddress})", remotePeerId, session.RemoteAddress);
Sessions.Add(session);
}

Expand All @@ -195,12 +195,12 @@ internal IEnumerable<IProtocol> GetProtocolsFor(ProtocolRef protocol)
throw new Libp2pSetupException($"Protocols are not set in {nameof(_protocolStackSettings)}");
}

if (!_protocolStackSettings.Protocols.ContainsKey(protocol))
if (!_protocolStackSettings.Protocols.TryGetValue(protocol, out ProtocolRef[]? value))
{
throw new Libp2pSetupException($"{protocol} is not added");
}

return _protocolStackSettings.Protocols[protocol].Select(p => p.Protocol);
return value.Select(p => p.Protocol);
}


Expand Down Expand Up @@ -264,7 +264,7 @@ public async Task<ISession> DialAsync(Multiaddress addr, CancellationToken token

Task dialingTask = transportProtocol.DialAsync(ctx, addr, token);

_ = dialingTask.ContinueWith(t => dialActivity?.Dispose());
_ = dialingTask.ContinueWith(t => dialActivity?.Dispose(), token);

Task dialingResult = await Task.WhenAny(dialingTask, session.Connected);

Expand Down Expand Up @@ -306,7 +306,7 @@ private static void MapToTaskCompletionSource(Task t, TaskCompletionSource<objec
{
if (t.IsCompletedSuccessfully)
{
tcs.SetResult(t.GetType().GenericTypeArguments.Any() ? t.GetType().GetProperty("Result")!.GetValue(t) : null);
tcs.SetResult(t.GetType().GenericTypeArguments.Length != 0 ? t.GetType().GetProperty("Result")!.GetValue(t) : null);
return;
}
if (t.IsCanceled)
Expand Down Expand Up @@ -344,7 +344,7 @@ internal Task Upgrade(Session session, IChannel downChannel, ProtocolRef parentP

isListener = options?.ModeOverride switch { UpgradeModeOverride.Dial => false, UpgradeModeOverride.Listen => true, _ => isListener };

_logger?.LogInformation($"Upgrade and bind {parentProtocol} to {top}, listen={isListener}");
_logger?.LogInformation("Upgrade and bind {parentProtocol} to {protocol}, listen={isListener}", parentProtocol, top, isListener);

Task upgradeTask;
Activity? upgrageActivity = activitySource?.StartActivity($"Upgrade to {top.Protocol.Id}, {(isListener ? "listen" : "dial")}", ActivityKind.Internal, activity?.Id);
Expand Down Expand Up @@ -410,14 +410,14 @@ internal Task Upgrade(Session session, IChannel downChannel, ProtocolRef parentP
});
}

upgradeTask.ContinueWith(t =>
_ = upgradeTask.ContinueWith(t =>
{
if (t.IsFaulted)
{
_logger?.LogError($"Upgrade task failed with {t.Exception}");
_logger?.LogError("Upgrade task failed with {exception}", t.Exception);
}
_ = downChannel.CloseAsync();
_logger?.LogInformation($"Finished#2 {parentProtocol} to {top}, listen={isListener}");
_ = downChannel.CloseAsync().AsTask();
_logger?.LogInformation("Finished#2 {parentProtocol} to {top}, listen={isListener}", parentProtocol, top, isListener);
upgrageActivity?.Dispose();
});

Expand All @@ -432,6 +432,7 @@ internal Task Upgrade(Session session, IChannel downChannel, ProtocolRef parentP

public async ValueTask DisposeAsync()
{
GC.SuppressFinalize(this);
await Task.WhenAll(Sessions.ToArray().Select(s => s?.DisconnectAsync() ?? Task.CompletedTask));
peerActivity?.Dispose();
}
Expand Down
19 changes: 6 additions & 13 deletions src/libp2p/Libp2p.Core/Stream.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

using Microsoft.Extensions.Logging;
using Nethermind.Libp2p.Core;
using System.Buffers;

public class ChannelStream : Stream
namespace Nethermind.Libp2p.Core;

public class ChannelStream(IChannel chan) : Stream
{
private readonly IChannel _chan;
private readonly ILogger<ChannelStream> logger;
private readonly IChannel _chan = chan ?? throw new ArgumentNullException(nameof(chan));
private bool _disposed = false;
private bool _canRead = true;
private bool _canWrite = true;

// Constructor
public ChannelStream(IChannel chan)
{
_chan = chan ?? throw new ArgumentNullException(nameof(_chan));
}

public override bool CanRead => _canRead;
public override bool CanSeek => false;
public override bool CanWrite => _canWrite;
Expand Down Expand Up @@ -59,7 +52,7 @@ public override void Write(byte[] buffer, int offset, int count)

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if ((await _chan.WriteAsync(new ReadOnlySequence<byte>(buffer.AsMemory(offset, count)))) != IOResult.Ok)
if ((await _chan.WriteAsync(new ReadOnlySequence<byte>(buffer.AsMemory(offset, count)), cancellationToken)) != IOResult.Ok)
{
_canWrite = false;
}
Expand All @@ -72,7 +65,7 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
{
if (buffer is { Length: 0 } && _canRead) return 0;

ReadResult result = await _chan.ReadAsync(buffer.Length, ReadBlockingMode.WaitAny);
ReadResult result = await _chan.ReadAsync(buffer.Length, ReadBlockingMode.WaitAny, cancellationToken);
if (result.Result != IOResult.Ok)
{
_canRead = false;
Expand Down
9 changes: 6 additions & 3 deletions src/libp2p/Libp2p.Core/TransportContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ namespace Nethermind.Libp2p.Core;

public class TransportContext(LocalPeer peer, ProtocolRef proto, bool isListener, Activity? activity) : ITransportContext
{
protected readonly ProtocolRef proto = proto;
protected readonly LocalPeer peer = peer;

public Identity Identity => peer.Identity;
public ILocalPeer Peer => peer;
public bool IsListener => isListener;
Expand All @@ -24,11 +27,11 @@ public virtual INewConnectionContext CreateConnection()
}
}

public class DialerTransportContext(LocalPeer peer, LocalPeer.Session session, ProtocolRef proto, Activity? context)
: TransportContext(peer, proto, false, context)
public class DialerTransportContext(LocalPeer peer, LocalPeer.Session session, ProtocolRef proto, Activity? activity)
: TransportContext(peer, proto, false, activity)
{
public override INewConnectionContext CreateConnection()
{
return peer.CreateConnection(proto, session, false, context);
return peer.CreateConnection(proto, session, false, Activity);
}
}
2 changes: 0 additions & 2 deletions src/libp2p/Libp2p.OpenTelemetry/Libp2p.OpenTelemetry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
<PackageReference Include="SimpleBase" />
<PackageReference Include="OpenTelemetry.Exporter.Console"/>
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
</ItemGroup>

</Project>
14 changes: 6 additions & 8 deletions src/libp2p/Libp2p.Protocols.Noise/NoiseProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Nethermind.Libp2p.Protocols.Noise.Dto;
using PublicKey = Nethermind.Libp2p.Core.Dto.PublicKey;

#pragma warning disable IDE0130
namespace Nethermind.Libp2p.Protocols;

/// <summary>
Expand All @@ -25,11 +26,12 @@ public class NoiseProtocol(MultiplexerSettings? multiplexerSettings = null, ILog
);

private readonly ILogger? _logger = loggerFactory?.CreateLogger<NoiseProtocol>();

private NoiseExtensions _extensions => new()
{
StreamMuxers = { } // TODO: return the following after go question resolution:
//{
// multiplexerSettings is null || !multiplexerSettings.Multiplexers.Any() ? ["na"] : [.. multiplexerSettings.Multiplexers.Select(proto => proto.Id)]
// multiplexerSettings is null || !multiplexerSettings.Multiplexers.Any() ? ["na"] : [.. multiplexerSettings.Multiplexers.Select(proto => proto.Id)]
//}
};

Expand Down Expand Up @@ -64,9 +66,7 @@ public async Task DialAsync(IChannel downChannel, IConnectionContext context)
context.State.RemotePublicKey = msg1KeyDecoded;
// TODO: verify signature

List<string> responderMuxers = msg1Decoded.Extensions.StreamMuxers
.Where(m => !string.IsNullOrEmpty(m))
.ToList();
List<string> responderMuxers = [.. msg1Decoded.Extensions.StreamMuxers.Where(m => !string.IsNullOrEmpty(m))];
IProtocol? commonMuxer = null;// multiplexerSettings?.Multiplexers.FirstOrDefault(m => responderMuxers.Contains(m.Id));

UpgradeOptions? upgradeOptions = null;
Expand Down Expand Up @@ -134,9 +134,7 @@ public async Task ListenAsync(IChannel downChannel, IConnectionContext context)
ReadOnlySequence<byte> msg0Bytes = await downChannel.ReadAsync(len).OrThrow();
handshakeState.ReadMessage(msg0Bytes.ToArray(), buffer);

byte[] msg = Encoding.UTF8.GetBytes(PayloadSigPrefix)
.Concat(ByteString.CopyFrom(serverStatic.PublicKey))
.ToArray();
byte[] msg = [.. Encoding.UTF8.GetBytes(PayloadSigPrefix), .. ByteString.CopyFrom(serverStatic.PublicKey)];
byte[] sig = context.Peer.Identity.Sign(msg);

NoiseHandshakePayload payload = new()
Expand Down Expand Up @@ -164,7 +162,7 @@ public async Task ListenAsync(IChannel downChannel, IConnectionContext context)
// TODO: verify signature

Transport? transport = msg2.Transport;
List<string> initiatorMuxers = msg2Decoded.Extensions.StreamMuxers.Where(m => !string.IsNullOrEmpty(m)).ToList();
List<string> initiatorMuxers = [.. msg2Decoded.Extensions.StreamMuxers.Where(m => !string.IsNullOrEmpty(m))];
IProtocol? commonMuxer = null; // multiplexerSettings?.Multiplexers.FirstOrDefault(m => initiatorMuxers.Contains(m.Id));

UpgradeOptions? upgradeOptions = null;
Expand Down
Loading
Loading