diff --git a/src/libp2p/Libp2p.Core.Benchmarks/Program.cs b/src/libp2p/Libp2p.Core.Benchmarks/Program.cs index 22ceb117..34a48f8c 100644 --- a/src/libp2p/Libp2p.Core.Benchmarks/Program.cs +++ b/src/libp2p/Libp2p.Core.Benchmarks/Program.cs @@ -42,7 +42,7 @@ await Task.Run(async () => { try { - d = (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow()); + d = await revChan.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow(); i += d.Length; } catch diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs index 5aa0bab8..1c63d34e 100644 --- a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs +++ b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs @@ -28,12 +28,12 @@ .. additionalProtocols public class TestPeerFactory(IProtocolStackSettings protocolStackSettings, PeerStore peerStore, ActivitySource? activitySource = null, ILoggerFactory? loggerFactory = null) : PeerFactory(protocolStackSettings, peerStore, activitySource) { - ConcurrentDictionary peers = new(); + readonly ConcurrentDictionary peers = new(); public override ILocalPeer Create(Identity? identity = default) { ArgumentNullException.ThrowIfNull(identity); - return peers.GetOrAdd(identity.PeerId, (p) => new TestLocalPeer(identity, protocolStackSettings, peerStore, activitySource, loggerFactory)); + return peers.GetOrAdd(identity.PeerId, (p) => new TestLocalPeer(identity, protocolStackSettings, base.PeerStore, activitySource, loggerFactory)); } } diff --git a/src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs b/src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs index f3a37083..93a3619f 100644 --- a/src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs +++ b/src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs @@ -7,11 +7,11 @@ namespace Nethermind.Libp2p.Core.TestsBase; -public class LocalPeerStub : ILocalPeer +public sealed class LocalPeerStub : ILocalPeer { public LocalPeerStub() { - Identity = new(Enumerable.Repeat((byte)42, 32).ToArray()); + Identity = new([.. Enumerable.Repeat((byte)42, 32)]); Address = $"/p2p/{Identity.PeerId}"; } @@ -20,7 +20,7 @@ public LocalPeerStub() public ObservableCollection ListenAddresses => throw new NotImplementedException(); - public event Connected? OnConnected = _ => Task.CompletedTask; + public event Connected? OnConnected = _ => { }; public Task DialAsync(Multiaddress addr, CancellationToken token = default) { @@ -42,22 +42,16 @@ public ValueTask DisposeAsync() return ValueTask.CompletedTask; } - public Task StartListenAsync(Multiaddress[] addrs, CancellationToken token = default) + public Task StartListenAsync(Multiaddress[]? addrs = null, CancellationToken token = default) { return Task.CompletedTask; } } -public class TestRemotePeer : ISession +public class TestRemotePeer(Multiaddress addr) : ISession { - public TestRemotePeer(Multiaddress addr) - { - Identity = TestPeers.Identity(addr); - Address = addr; - } - - public Identity Identity { get; set; } - public Multiaddress Address { get; set; } + public Identity Identity { get; set; } = TestPeers.Identity(addr); + public Multiaddress Address { get; set; } = addr; public Multiaddress RemoteAddress => $"/p2p/{Identity.PeerId}"; diff --git a/src/libp2p/Libp2p.Core/Context/NewSessionContext.cs b/src/libp2p/Libp2p.Core/Context/NewSessionContext.cs index 2ef84f69..351263c9 100644 --- a/src/libp2p/Libp2p.Core/Context/NewSessionContext.cs +++ b/src/libp2p/Libp2p.Core/Context/NewSessionContext.cs @@ -6,9 +6,10 @@ namespace Nethermind.Libp2p.Core.Context; -public class NewSessionContext(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions, +public sealed class NewSessionContext(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions, ActivitySource? activitySource, Activity? parentActivity, ILoggerFactory? loggerFactory = null) - : ContextBase(localPeer, session, protocol, isListener, upgradeOptions, activitySource, parentActivity), INewSessionContext + : ContextBase(localPeer, session, protocol, isListener, upgradeOptions, activitySource, + activitySource?.CreateActivity("New session", ActivityKind.Internal, parentActivity?.Context ?? default)), INewSessionContext { private readonly ILogger? logger = loggerFactory?.CreateLogger(); @@ -16,8 +17,6 @@ public class NewSessionContext(LocalPeer localPeer, LocalPeer.Session session, P public CancellationToken Token => session.ConnectionToken; - public Activity? Activity { get; } = activitySource?.CreateActivity("New session", ActivityKind.Internal, parentActivity?.Context ?? default); - public void Dispose() { logger?.LogDebug("Disposing session context {Id}", Id); diff --git a/src/libp2p/Libp2p.Core/ILocalPeer.cs b/src/libp2p/Libp2p.Core/ILocalPeer.cs index e3ad12c6..95ff69c9 100644 --- a/src/libp2p/Libp2p.Core/ILocalPeer.cs +++ b/src/libp2p/Libp2p.Core/ILocalPeer.cs @@ -26,4 +26,4 @@ public interface ILocalPeer : IAsyncDisposable event Connected? OnConnected; } -public delegate Task Connected(ISession newSession); +public delegate void Connected(ISession newSession); diff --git a/src/libp2p/Libp2p.Core/MultiplexerSettings.cs b/src/libp2p/Libp2p.Core/MultiplexerSettings.cs index 6c12af8b..63e3403b 100644 --- a/src/libp2p/Libp2p.Core/MultiplexerSettings.cs +++ b/src/libp2p/Libp2p.Core/MultiplexerSettings.cs @@ -4,9 +4,10 @@ namespace Nethermind.Libp2p.Core; public class MultiplexerSettings { - public List _multiplexers = []; + private readonly List _multiplexers = []; public IEnumerable Multiplexers => _multiplexers; + public void Add(IProtocol multiplexerProtocol) { _multiplexers.Add(multiplexerProtocol); diff --git a/src/libp2p/Libp2p.Core/Peer.cs b/src/libp2p/Libp2p.Core/Peer.cs index ddb45cab..578c053f 100644 --- a/src/libp2p/Libp2p.Core/Peer.cs +++ b/src/libp2p/Libp2p.Core/Peer.cs @@ -24,7 +24,7 @@ public partial class LocalPeer(Identity identity, PeerStore peerStore, IProtocol protected readonly IProtocolStackSettings _protocolStackSettings = protocolStackSettings; protected readonly Activity? peerActivity = activitySource?.StartActivity($"Peer {identity.PeerId}", ActivityKind.Internal, rootActivity?.Id); - Dictionary> listenerReadyTcs = []; + readonly Dictionary> listenerReadyTcs = []; public ObservableCollection Sessions { get; } = []; public override string ToString() @@ -56,7 +56,7 @@ protected virtual Multiaddress[] GetDefaultAddresses() throw new Libp2pSetupException($"Protocols are not set in {nameof(_protocolStackSettings)}"); } - return _protocolStackSettings.TopProtocols.SelectMany(p => ITransportProtocol.GetDefaultAddresses(p.Protocol, Identity.PeerId)).ToArray(); + return [.. _protocolStackSettings.TopProtocols.SelectMany(p => ITransportProtocol.GetDefaultAddresses(p.Protocol, Identity.PeerId))]; } protected virtual IEnumerable PrepareAddresses(Multiaddress[] addrs) @@ -115,13 +115,13 @@ public virtual async Task StartListenAsync(Multiaddress[]? addrs = default, Canc ListenAddresses.Remove(tcs.Task.Result); } listenActivity?.Dispose(); - }); + }, token); - listenTasks.Add(tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(5000)).ContinueWith(t => + listenTasks.Add(tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(5000), token).ContinueWith(t => { if (t.IsFaulted) { - _logger?.LogDebug($"Failed to start listener for address {addr}"); + _logger?.LogDebug("Failed to start listener for address {addr}", addr); return null; } @@ -137,7 +137,7 @@ public virtual async Task StartListenAsync(Multiaddress[]? addrs = default, Canc if (addr is not null) { - _logger?.LogDebug($"Adding listener address {addr}"); + _logger?.LogDebug("Adding listener address {addr}", addr); ListenAddresses.Add(addr); } } @@ -169,7 +169,7 @@ public INewSessionContext UpgradeToSession(Session session, ProtocolRef proto, b _ = session.DisconnectAsync(); throw new SessionExistsException(remotePeerId); } - _logger?.LogDebug($"New session with {remotePeerId} ({session.RemoteAddress})"); + _logger?.LogDebug("New session with {remotePeerId} ({remoteAddress})", remotePeerId, session.RemoteAddress); Sessions.Add(session); } @@ -195,12 +195,12 @@ internal IEnumerable GetProtocolsFor(ProtocolRef protocol) throw new Libp2pSetupException($"Protocols are not set in {nameof(_protocolStackSettings)}"); } - if (!_protocolStackSettings.Protocols.ContainsKey(protocol)) + if (!_protocolStackSettings.Protocols.TryGetValue(protocol, out ProtocolRef[]? value)) { throw new Libp2pSetupException($"{protocol} is not added"); } - return _protocolStackSettings.Protocols[protocol].Select(p => p.Protocol); + return value.Select(p => p.Protocol); } @@ -264,7 +264,7 @@ public async Task DialAsync(Multiaddress addr, CancellationToken token Task dialingTask = transportProtocol.DialAsync(ctx, addr, token); - _ = dialingTask.ContinueWith(t => dialActivity?.Dispose()); + _ = dialingTask.ContinueWith(t => dialActivity?.Dispose(), token); Task dialingResult = await Task.WhenAny(dialingTask, session.Connected); @@ -306,7 +306,7 @@ private static void MapToTaskCompletionSource(Task t, TaskCompletionSource false, UpgradeModeOverride.Listen => true, _ => isListener }; - _logger?.LogInformation($"Upgrade and bind {parentProtocol} to {top}, listen={isListener}"); + _logger?.LogInformation("Upgrade and bind {parentProtocol} to {protocol}, listen={isListener}", parentProtocol, top, isListener); Task upgradeTask; Activity? upgrageActivity = activitySource?.StartActivity($"Upgrade to {top.Protocol.Id}, {(isListener ? "listen" : "dial")}", ActivityKind.Internal, activity?.Id); @@ -410,14 +410,14 @@ internal Task Upgrade(Session session, IChannel downChannel, ProtocolRef parentP }); } - upgradeTask.ContinueWith(t => + _ = upgradeTask.ContinueWith(t => { if (t.IsFaulted) { - _logger?.LogError($"Upgrade task failed with {t.Exception}"); + _logger?.LogError("Upgrade task failed with {exception}", t.Exception); } - _ = downChannel.CloseAsync(); - _logger?.LogInformation($"Finished#2 {parentProtocol} to {top}, listen={isListener}"); + _ = downChannel.CloseAsync().AsTask(); + _logger?.LogInformation("Finished#2 {parentProtocol} to {top}, listen={isListener}", parentProtocol, top, isListener); upgrageActivity?.Dispose(); }); @@ -432,6 +432,7 @@ internal Task Upgrade(Session session, IChannel downChannel, ProtocolRef parentP public async ValueTask DisposeAsync() { + GC.SuppressFinalize(this); await Task.WhenAll(Sessions.ToArray().Select(s => s?.DisconnectAsync() ?? Task.CompletedTask)); peerActivity?.Dispose(); } diff --git a/src/libp2p/Libp2p.Core/Stream.cs b/src/libp2p/Libp2p.Core/Stream.cs index 7f3e36e5..80c37a3f 100644 --- a/src/libp2p/Libp2p.Core/Stream.cs +++ b/src/libp2p/Libp2p.Core/Stream.cs @@ -1,24 +1,17 @@ // SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited // SPDX-License-Identifier: MIT -using Microsoft.Extensions.Logging; -using Nethermind.Libp2p.Core; using System.Buffers; -public class ChannelStream : Stream +namespace Nethermind.Libp2p.Core; + +public class ChannelStream(IChannel chan) : Stream { - private readonly IChannel _chan; - private readonly ILogger logger; + private readonly IChannel _chan = chan ?? throw new ArgumentNullException(nameof(chan)); private bool _disposed = false; private bool _canRead = true; private bool _canWrite = true; - // Constructor - public ChannelStream(IChannel chan) - { - _chan = chan ?? throw new ArgumentNullException(nameof(_chan)); - } - public override bool CanRead => _canRead; public override bool CanSeek => false; public override bool CanWrite => _canWrite; @@ -59,7 +52,7 @@ public override void Write(byte[] buffer, int offset, int count) public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - if ((await _chan.WriteAsync(new ReadOnlySequence(buffer.AsMemory(offset, count)))) != IOResult.Ok) + if ((await _chan.WriteAsync(new ReadOnlySequence(buffer.AsMemory(offset, count)), cancellationToken)) != IOResult.Ok) { _canWrite = false; } @@ -72,7 +65,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, { if (buffer is { Length: 0 } && _canRead) return 0; - ReadResult result = await _chan.ReadAsync(buffer.Length, ReadBlockingMode.WaitAny); + ReadResult result = await _chan.ReadAsync(buffer.Length, ReadBlockingMode.WaitAny, cancellationToken); if (result.Result != IOResult.Ok) { _canRead = false; diff --git a/src/libp2p/Libp2p.Core/TransportContext.cs b/src/libp2p/Libp2p.Core/TransportContext.cs index ce5fb77d..95a9d659 100644 --- a/src/libp2p/Libp2p.Core/TransportContext.cs +++ b/src/libp2p/Libp2p.Core/TransportContext.cs @@ -8,6 +8,9 @@ namespace Nethermind.Libp2p.Core; public class TransportContext(LocalPeer peer, ProtocolRef proto, bool isListener, Activity? activity) : ITransportContext { + protected readonly ProtocolRef proto = proto; + protected readonly LocalPeer peer = peer; + public Identity Identity => peer.Identity; public ILocalPeer Peer => peer; public bool IsListener => isListener; @@ -24,11 +27,11 @@ public virtual INewConnectionContext CreateConnection() } } -public class DialerTransportContext(LocalPeer peer, LocalPeer.Session session, ProtocolRef proto, Activity? context) - : TransportContext(peer, proto, false, context) +public class DialerTransportContext(LocalPeer peer, LocalPeer.Session session, ProtocolRef proto, Activity? activity) + : TransportContext(peer, proto, false, activity) { public override INewConnectionContext CreateConnection() { - return peer.CreateConnection(proto, session, false, context); + return peer.CreateConnection(proto, session, false, Activity); } } diff --git a/src/libp2p/Libp2p.OpenTelemetry/Libp2p.OpenTelemetry.csproj b/src/libp2p/Libp2p.OpenTelemetry/Libp2p.OpenTelemetry.csproj index 1a47cdc7..5ff1a40c 100644 --- a/src/libp2p/Libp2p.OpenTelemetry/Libp2p.OpenTelemetry.csproj +++ b/src/libp2p/Libp2p.OpenTelemetry/Libp2p.OpenTelemetry.csproj @@ -33,8 +33,6 @@ - - diff --git a/src/libp2p/Libp2p.Protocols.Noise/NoiseProtocol.cs b/src/libp2p/Libp2p.Protocols.Noise/NoiseProtocol.cs index 4a542887..26256def 100644 --- a/src/libp2p/Libp2p.Protocols.Noise/NoiseProtocol.cs +++ b/src/libp2p/Libp2p.Protocols.Noise/NoiseProtocol.cs @@ -12,6 +12,7 @@ using Nethermind.Libp2p.Protocols.Noise.Dto; using PublicKey = Nethermind.Libp2p.Core.Dto.PublicKey; +#pragma warning disable IDE0130 namespace Nethermind.Libp2p.Protocols; /// @@ -25,11 +26,12 @@ public class NoiseProtocol(MultiplexerSettings? multiplexerSettings = null, ILog ); private readonly ILogger? _logger = loggerFactory?.CreateLogger(); + private NoiseExtensions _extensions => new() { StreamMuxers = { } // TODO: return the following after go question resolution: //{ - // multiplexerSettings is null || !multiplexerSettings.Multiplexers.Any() ? ["na"] : [.. multiplexerSettings.Multiplexers.Select(proto => proto.Id)] + // multiplexerSettings is null || !multiplexerSettings.Multiplexers.Any() ? ["na"] : [.. multiplexerSettings.Multiplexers.Select(proto => proto.Id)] //} }; @@ -64,9 +66,7 @@ public async Task DialAsync(IChannel downChannel, IConnectionContext context) context.State.RemotePublicKey = msg1KeyDecoded; // TODO: verify signature - List responderMuxers = msg1Decoded.Extensions.StreamMuxers - .Where(m => !string.IsNullOrEmpty(m)) - .ToList(); + List responderMuxers = [.. msg1Decoded.Extensions.StreamMuxers.Where(m => !string.IsNullOrEmpty(m))]; IProtocol? commonMuxer = null;// multiplexerSettings?.Multiplexers.FirstOrDefault(m => responderMuxers.Contains(m.Id)); UpgradeOptions? upgradeOptions = null; @@ -134,9 +134,7 @@ public async Task ListenAsync(IChannel downChannel, IConnectionContext context) ReadOnlySequence msg0Bytes = await downChannel.ReadAsync(len).OrThrow(); handshakeState.ReadMessage(msg0Bytes.ToArray(), buffer); - byte[] msg = Encoding.UTF8.GetBytes(PayloadSigPrefix) - .Concat(ByteString.CopyFrom(serverStatic.PublicKey)) - .ToArray(); + byte[] msg = [.. Encoding.UTF8.GetBytes(PayloadSigPrefix), .. ByteString.CopyFrom(serverStatic.PublicKey)]; byte[] sig = context.Peer.Identity.Sign(msg); NoiseHandshakePayload payload = new() @@ -164,7 +162,7 @@ public async Task ListenAsync(IChannel downChannel, IConnectionContext context) // TODO: verify signature Transport? transport = msg2.Transport; - List initiatorMuxers = msg2Decoded.Extensions.StreamMuxers.Where(m => !string.IsNullOrEmpty(m)).ToList(); + List initiatorMuxers = [.. msg2Decoded.Extensions.StreamMuxers.Where(m => !string.IsNullOrEmpty(m))]; IProtocol? commonMuxer = null; // multiplexerSettings?.Multiplexers.FirstOrDefault(m => initiatorMuxers.Contains(m.Id)); UpgradeOptions? upgradeOptions = null; diff --git a/src/libp2p/Libp2p.Protocols.Pubsub/PubsubProtocol.cs b/src/libp2p/Libp2p.Protocols.Pubsub/PubsubProtocol.cs index 4663ba6a..28fc7d59 100644 --- a/src/libp2p/Libp2p.Protocols.Pubsub/PubsubProtocol.cs +++ b/src/libp2p/Libp2p.Protocols.Pubsub/PubsubProtocol.cs @@ -7,12 +7,14 @@ using Nethermind.Libp2p.Protocols.Pubsub.Dto; using System.Diagnostics; +#pragma warning disable IDE0130 namespace Nethermind.Libp2p.Protocols; /// /// https://github.com/libp2p/specs/tree/master/pubsub /// public abstract class PubsubProtocol : ISessionProtocol +#pragma warning restore IDE0079 // Remove unnecessary suppression { private readonly ILogger? _logger; private readonly PubsubRouter router; @@ -34,7 +36,7 @@ public async Task DialAsync(IChannel channel, ISessionContext context) PeerId? remotePeerId = context.State.RemotePeerId; - _logger?.LogDebug($"Dialed({context.Id}) {context.State.RemoteAddress}"); + _logger?.LogDebug("Dialed({contextId}) {remoteAddress}", context.Id, context.State.RemoteAddress); TaskCompletionSource dialTcs = new(); CancellationToken token = router.OutboundConnection(context.State.RemoteAddress, Id, dialTcs.Task, (rpc) => @@ -61,12 +63,12 @@ public async Task ListenAsync(IChannel channel, ISessionContext context) PeerId? remotePeerId = context.State.RemotePeerId; - _logger?.LogDebug($"Listen({context.Id}) to {context.State.RemoteAddress}"); + _logger?.LogDebug("Listen({contextId}) to {remoteAddress}", context.Id, context.State.RemoteAddress); TaskCompletionSource listTcs = new(); TaskCompletionSource dialTcs = new(); - CancellationToken token = router.InboundConnection(context.State.RemoteAddress, Id, listTcs.Task, dialTcs.Task, () => + CancellationToken token = router.InboundConnection(context.State.RemoteAddress, Id, listTcs.Task, () => { _ = context.DialAsync(this); return dialTcs.Task; @@ -79,16 +81,14 @@ public async Task ListenAsync(IChannel channel, ISessionContext context) Rpc? rpc = await channel.ReadPrefixedProtobufAsync(Rpc.Parser, token); if (rpc is null) { - string logMessage = $"Received a broken message or EOF from {remotePeerId}"; - _logger?.LogDebug(logMessage); - context.Activity?.AddEvent(new ActivityEvent(logMessage)); + _logger?.LogDebug("Received a broken message or EOF from {remotePeerId}", remotePeerId); + context.Activity?.AddEvent(new ActivityEvent($"Received a broken message or EOF from {remotePeerId}")); break; } else { - string logMessage = $"Received message from {remotePeerId}: {rpc}"; - _logger?.LogTrace(logMessage); - context.Activity?.AddEvent(new ActivityEvent(logMessage)); + _logger?.LogTrace("Received message from {remotePeerId}: {rpc}", remotePeerId, rpc); + context.Activity?.AddEvent(new ActivityEvent($"Received message from {remotePeerId}: {rpc}")); router.OnRpc(remotePeerId, rpc); } } diff --git a/src/libp2p/Libp2p.Protocols.Pubsub/PubsubRouter.cs b/src/libp2p/Libp2p.Protocols.Pubsub/PubsubRouter.cs index 9f8debe1..3cc8e9b0 100644 --- a/src/libp2p/Libp2p.Protocols.Pubsub/PubsubRouter.cs +++ b/src/libp2p/Libp2p.Protocols.Pubsub/PubsubRouter.cs @@ -32,23 +32,14 @@ public override string ToString() public const string GossipsubProtocolVersionV11 = "/meshsub/1.1.0"; public const string GossipsubProtocolVersionV12 = "/meshsub/1.2.0"; - class PubsubPeer + class PubsubPeer(PeerId peerId, string protocolId, ILogger? logger, Multiaddress address, ConnectionInitiation initialisedBy) { - public PubsubPeer(PeerId peerId, string protocolId, ILogger? logger) - { - PeerId = peerId; - _logger = logger; - Protocol = protocolId switch - { - GossipsubProtocolVersionV10 => PubsubProtocol.GossipsubV10, - GossipsubProtocolVersionV11 => PubsubProtocol.GossipsubV11, - GossipsubProtocolVersionV12 => PubsubProtocol.GossipsubV12, - _ => PubsubProtocol.Floodsub, - }; - TokenSource = new CancellationTokenSource(); - Backoff = []; - SendRpcQueue = new ConcurrentQueue(); - } + public CancellationTokenSource TokenSource { get; init; } = new CancellationTokenSource(); + public PeerId PeerId { get; set; } = peerId; + public bool IsGossipSub => (Protocol & PubsubProtocol.AnyGossipsub) != PubsubProtocol.None; + public bool IsFloodSub => Protocol == PubsubProtocol.Floodsub; + public ConnectionInitiation InititatedBy { get; internal set; } = initialisedBy; + public Multiaddress Address { get; } = address; public enum PubsubProtocol { @@ -57,7 +48,9 @@ public enum PubsubProtocol GossipsubV10 = 2, GossipsubV11 = 4, GossipsubV12 = 8, - AnyGossipsub = GossipsubV10 | GossipsubV11 | GossipsubV12, + GossipsubV13 = 16, + GossipsubV14 = 32, + AnyGossipsub = GossipsubV10 | GossipsubV11 | GossipsubV12 | GossipsubV13 | GossipsubV14, } public void Send(Rpc rpc) @@ -74,16 +67,16 @@ public void Send(Rpc rpc) } } } - public Dictionary Backoff { get; internal set; } - public ConcurrentQueue SendRpcQueue { get; } + public Dictionary Backoff { get; internal set; } = []; + public ConcurrentQueue SendRpcQueue { get; } = new ConcurrentQueue(); private Action? _sendRpc; - private readonly ILogger? _logger; + private readonly ILogger? _logger = logger; public Action? SendRpc { get => _sendRpc; set { - _logger?.LogDebug($"Set SENDRPC for {PeerId}: {value}"); + _logger?.LogDebug("Set SENDRPC for {peerId}: {value}", PeerId, value); _sendRpc = value; if (_sendRpc is not null) lock (SendRpcQueue) @@ -95,15 +88,14 @@ public Action? SendRpc } } } - public CancellationTokenSource TokenSource { get; init; } - public PeerId PeerId { get; set; } - - public PubsubProtocol Protocol { get; set; } - public bool IsGossipSub => (Protocol & PubsubProtocol.AnyGossipsub) != PubsubProtocol.None; - public bool IsFloodSub => Protocol == PubsubProtocol.Floodsub; - public ConnectionInitiation InititatedBy { get; internal set; } - public Multiaddress Address { get; internal set; } + public PubsubProtocol Protocol { get; set; } = protocolId switch + { + GossipsubProtocolVersionV10 => PubsubProtocol.GossipsubV10, + GossipsubProtocolVersionV11 => PubsubProtocol.GossipsubV11, + GossipsubProtocolVersionV12 => PubsubProtocol.GossipsubV12, + _ => PubsubProtocol.Floodsub, + }; } private static readonly CancellationToken Canceled; @@ -172,7 +164,7 @@ public PubsubRouter(PeerStore store, PubsubSettings? settings = null, ILoggerFac public Task StartAsync(ILocalPeer localPeer, CancellationToken token = default) { - logger?.LogDebug($"Running pubsub for {string.Join(",", localPeer.ListenAddresses)}"); + logger?.LogDebug("Running pubsub for {listenAddresses}", string.Join(",", localPeer.ListenAddresses)); if (this.localPeer is not null) { @@ -218,6 +210,8 @@ async Task LoopReconnect() private async Task Connect(Multiaddress[] addrs, CancellationToken token, bool reconnect = false) { + ArgumentNullException.ThrowIfNull(localPeer); + try { ISession session = await localPeer.DialAsync(addrs, token); @@ -246,7 +240,7 @@ private async Task Connect(Multiaddress[] addrs, CancellationToken token, bool r _ = session.DisconnectAsync(); return; } - logger?.LogDebug($"Dialing ended to {session.RemoteAddress}"); + logger?.LogDebug("Dialing ended to {remoteAddress}", session.RemoteAddress); if (peerState.TryGetValue(session.RemoteAddress.GetPeerId()!, out PubsubPeer? state) && state.InititatedBy == ConnectionInitiation.Remote) { _ = session.DisconnectAsync(); @@ -255,13 +249,14 @@ private async Task Connect(Multiaddress[] addrs, CancellationToken token, bool r } catch (Exception e) { - logger?.LogDebug($"Adding reconnections for {string.Join(",", addrs.Select(a => a.ToString()))}: {e.Message}"); + logger?.LogDebug("Adding reconnections for {addrs}: {message}", string.Join(",", addrs.Select(a => a.ToString())), e.Message); if (reconnect) reconnections.Add(new Reconnection(addrs, _settings.ReconnectionAttempts)); } } public void Dispose() { + GC.SuppressFinalize(this); _messageCache.Dispose(); _limboMessageCache.Dispose(); } @@ -272,7 +267,7 @@ private void Reconnect(CancellationToken token) for (int rCount = 0; reconnections.TryTake(out Reconnection? rec) && rCount < MaxParallelReconnections; rCount++) { - logger?.LogDebug($"Reconnect to {string.Join(",", rec.Addresses.Select(a => a.ToString()))}"); + logger?.LogDebug("Reconnect to {addrs}", string.Join(",", rec.Addresses.Select(a => a.ToString()))); _ = Connect(rec.Addresses, token, true).ContinueWith(t => { if (t.IsFaulted && rec.Attempts != 1) @@ -292,9 +287,9 @@ public Task Heartbeat() { if (mesh.Value.Count < _settings.LowestDegree) { - PeerId[] peersToGraft = gPeers[mesh.Key] + PeerId[] peersToGraft = [.. gPeers[mesh.Key] .Where(p => !mesh.Value.Contains(p) && (peerState.GetValueOrDefault(p)?.Backoff.TryGetValue(mesh.Key, out DateTime backoff) != true || backoff < DateTime.Now)) - .Take(_settings.Degree - mesh.Value.Count).ToArray(); + .Take(_settings.Degree - mesh.Value.Count)]; foreach (PeerId peerId in peersToGraft) { mesh.Value.Add(peerId); @@ -305,7 +300,7 @@ public Task Heartbeat() } else if (mesh.Value.Count > _settings.HighestDegree) { - PeerId[] peerstoPrune = mesh.Value.Take(mesh.Value.Count - _settings.HighestDegree).ToArray(); + PeerId[] peerstoPrune = [.. mesh.Value.Take(mesh.Value.Count - _settings.HighestDegree)]; foreach (PeerId? peerId in peerstoPrune) { mesh.Value.Remove(peerId); @@ -381,7 +376,7 @@ internal CancellationToken OutboundConnection(Multiaddress addr, string protocol return Canceled; } - PubsubPeer peer = peerState.GetOrAdd(peerId, (id) => new PubsubPeer(peerId, protocolId, logger) { Address = addr, SendRpc = sendRpc, InititatedBy = ConnectionInitiation.Local }); + PubsubPeer peer = peerState.GetOrAdd(peerId, (id) => new PubsubPeer(peerId, protocolId, logger, addr, ConnectionInitiation.Local) { SendRpc = sendRpc, InititatedBy = ConnectionInitiation.Local }); lock (peer) { @@ -424,9 +419,9 @@ internal CancellationToken OutboundConnection(Multiaddress addr, string protocol reconnections.Add(new Reconnection([addr], _settings.ReconnectionAttempts)); }); - string[] topics = topicState.Keys.ToArray(); + string[] topics = [.. topicState.Keys]; - if (topics.Any()) + if (topics.Length != 0) { logger?.LogDebug("Topics sent to {peerId}: {topics}", peerId, string.Join(",", topics)); @@ -439,9 +434,9 @@ internal CancellationToken OutboundConnection(Multiaddress addr, string protocol } } - internal CancellationToken InboundConnection(Multiaddress addr, string protocolId, Task listTask, Task dialTask, Func subDial) + internal CancellationToken InboundConnection(Multiaddress remoteAddr, string protocolId, Task listTask, Func subDial) { - PeerId? peerId = addr.GetPeerId(); + PeerId? peerId = remoteAddr.GetPeerId(); if (peerId is null || peerId == localPeer!.Identity.PeerId) { @@ -449,7 +444,7 @@ internal CancellationToken InboundConnection(Multiaddress addr, string protocolI } PubsubPeer? newPeer = null; - PubsubPeer existingPeer = peerState.GetOrAdd(peerId, (id) => newPeer = new PubsubPeer(peerId, protocolId, logger) { Address = addr, InititatedBy = ConnectionInitiation.Remote }); + PubsubPeer existingPeer = peerState.GetOrAdd(peerId, (id) => newPeer = new PubsubPeer(peerId, protocolId, logger, remoteAddr, ConnectionInitiation.Remote)); lock (existingPeer) { @@ -476,7 +471,7 @@ internal CancellationToken InboundConnection(Multiaddress addr, string protocolI { topicPeers.Value.Remove(peerId); } - reconnections.Add(new Reconnection([addr], _settings.ReconnectionAttempts)); + reconnections.Add(new Reconnection([remoteAddr], _settings.ReconnectionAttempts)); }); subDial(); diff --git a/src/samples/chat/ConsoleReader.cs b/src/samples/chat/ConsoleReader.cs index d11eb6bd..3e6a7e5e 100644 --- a/src/samples/chat/ConsoleReader.cs +++ b/src/samples/chat/ConsoleReader.cs @@ -14,16 +14,16 @@ public Task ReadLineAsync(CancellationToken token = default) if (!_isRequested) { _isRequested = true; - Task.Run(() => + _ = Task.Run(() => { - string? input = Console.ReadLine(); - while (_requests.TryDequeue(out TaskCompletionSource src)) + string input = Console.ReadLine()!; + while (_requests.TryDequeue(out TaskCompletionSource? src)) { Task.Run(() => src.SetResult(input)); } _isRequested = false; - }); + }, token); } return result.Task; diff --git a/src/samples/chat/Program.cs b/src/samples/chat/Program.cs index bdb7b9a8..d88950df 100644 --- a/src/samples/chat/Program.cs +++ b/src/samples/chat/Program.cs @@ -41,14 +41,14 @@ } else { - Identity optionalFixedIdentity = new(Enumerable.Repeat((byte)42, 32).ToArray()); + Identity optionalFixedIdentity = new([.. Enumerable.Repeat((byte)42, 32)]); await using ILocalPeer peer = peerFactory.Create(optionalFixedIdentity); string addrTemplate = args.Contains("-quic") ? "/ip4/0.0.0.0/udp/{0}/quic-v1" : "/ip4/0.0.0.0/tcp/{0}"; - peer.OnConnected += async newSession => logger.LogInformation("A peer connected {remote}", newSession.RemoteAddress); + peer.OnConnected += newSession => logger.LogInformation("A peer connected {remote}", newSession.RemoteAddress); await peer.StartListenAsync( [string.Format(addrTemplate, args.Length > 0 && args[0] == "-sp" ? args[1] : "0")], diff --git a/src/samples/transport-interop/Program.cs b/src/samples/transport-interop/Program.cs index 62cc206d..2744e4da 100644 --- a/src/samples/transport-interop/Program.cs +++ b/src/samples/transport-interop/Program.cs @@ -63,16 +63,16 @@ { if (ip == "0.0.0.0") { - List d = NetworkInterface.GetAllNetworkInterfaces()! + List d = [.. NetworkInterface.GetAllNetworkInterfaces()! .Where(i => i.Name == "eth0" || (i.OperationalStatus == OperationalStatus.Up && - i.NetworkInterfaceType == NetworkInterfaceType.Ethernet)).ToList(); + i.NetworkInterfaceType == NetworkInterfaceType.Ethernet))]; IEnumerable addresses = NetworkInterface.GetAllNetworkInterfaces()! .Where(i => i.Name == "eth0" || (i.OperationalStatus == OperationalStatus.Up && i.NetworkInterfaceType == NetworkInterfaceType.Ethernet && - i.GetIPProperties().GatewayAddresses.Any()) + i.GetIPProperties().GatewayAddresses.Count != 0) ).First() .GetIPProperties() .UnicastAddresses @@ -86,7 +86,7 @@ CancellationTokenSource listennTcs = new(); await localPeer.StartListenAsync([builder.MakeAddress(ip)], listennTcs.Token); - localPeer.OnConnected += (session) => { Log($"Connected {session.RemoteAddress}"); return Task.CompletedTask; }; + localPeer.OnConnected += (session) => Log($"Connected {session.RemoteAddress}"); Log($"Listening on {string.Join(", ", localPeer.ListenAddresses)}"); db.ListRightPush(new RedisKey("listenerAddr"), new RedisValue(localPeer.ListenAddresses.First().ToString())); await Task.Delay(testTimeoutSeconds * 1000); @@ -156,7 +156,10 @@ protected override ProtocolRef[] BuildStack(IEnumerable additionalP _ => throw new NotImplementedException(), }]; - selector = Connect(selector, transportStack, [Get()], muxerStack, [Get()]); + selector = Connect(selector, + transportStack, [Get()], + securityStack, [Get()], + muxerStack, [Get()]); } ProtocolRef[] apps = [Get(), Get()];