From 1f0b4144789dd388964491afa26ac9bfb5f67b8c Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Thu, 1 Sep 2022 12:25:09 +0000 Subject: [PATCH 01/15] chore(dot/network): use `mdns` instead of `mdns_legacy` --- dot/network/mdns.go | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/dot/network/mdns.go b/dot/network/mdns.go index 97a08e2575..a6376f5295 100644 --- a/dot/network/mdns.go +++ b/dot/network/mdns.go @@ -5,18 +5,14 @@ package network import ( "context" - "time" "github.com/ChainSafe/gossamer/internal/log" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - libp2pdiscovery "github.com/libp2p/go-libp2p/p2p/discovery/mdns_legacy" + libp2pdiscovery "github.com/libp2p/go-libp2p/p2p/discovery/mdns" ) -// MDNSPeriod is 1 minute -const MDNSPeriod = time.Minute - -// Notifee See https://godoc.org/github.com/libp2p/go-libp2p/p2p/discovery#Notifee +// Notifee see https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.22.0/p2p/discovery/mdns#Notifee type Notifee struct { logger Logger ctx context.Context @@ -41,29 +37,18 @@ func newMDNS(host *host) *mdns { // startMDNS starts a new mDNS discovery service func (m *mdns) start() { m.logger.Debugf( - "Starting mDNS discovery service with host %s, period %s and protocol %s...", - m.host.id(), MDNSPeriod, m.host.protocolID) + "Starting mDNS discovery service with host %s and protocol %s...", + m.host.id(), m.host.protocolID) // create and start service - mdns, err := libp2pdiscovery.NewMdnsService( - m.host.ctx, - m.host.p2pHost, - MDNSPeriod, - string(m.host.protocolID), - ) - if err != nil { - m.logger.Errorf("Failed to start mDNS discovery service: %s", err) - return - } - - // register Notifee on service - mdns.RegisterNotifee(Notifee{ + serviceName := string(m.host.protocolID) + notifee := &Notifee{ logger: m.logger, ctx: m.host.ctx, host: m.host, - }) - - m.mdns = mdns + } + m.mdns = libp2pdiscovery.NewMdnsService( + m.host.p2pHost, serviceName, notifee) } // close shuts down the mDNS discovery service From c66dcbbdc66d5aa1b58a67134b927f8d25274788 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 12 Oct 2022 07:35:48 +0000 Subject: [PATCH 02/15] `internal/mdns` implementation - Inspired from mdns_legacy - Remove mdns dependency on libp2p - Improve existing code to be safer and service oriented --- dot/network/interfaces.go | 5 ++ dot/network/mdns.go | 80 ----------------- dot/network/service.go | 21 ++++- go.mod | 3 +- go.sum | 1 - internal/mdns/dialable.go | 59 +++++++++++++ internal/mdns/interfaces.go | 26 ++++++ internal/mdns/mdns.go | 172 ++++++++++++++++++++++++++++++++++++ internal/mdns/notifee.go | 42 +++++++++ 9 files changed, 322 insertions(+), 87 deletions(-) delete mode 100644 dot/network/mdns.go create mode 100644 internal/mdns/dialable.go create mode 100644 internal/mdns/interfaces.go create mode 100644 internal/mdns/mdns.go create mode 100644 internal/mdns/notifee.go diff --git a/dot/network/interfaces.go b/dot/network/interfaces.go index acff76d6ea..9d8f3ef281 100644 --- a/dot/network/interfaces.go +++ b/dot/network/interfaces.go @@ -18,3 +18,8 @@ type Logger interface { Warnf(format string, args ...interface{}) Errorf(format string, args ...interface{}) } + +type MDNS interface { + Start() error + Stop() error +} diff --git a/dot/network/mdns.go b/dot/network/mdns.go deleted file mode 100644 index a6376f5295..0000000000 --- a/dot/network/mdns.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2021 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package network - -import ( - "context" - - "github.com/ChainSafe/gossamer/internal/log" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/peerstore" - libp2pdiscovery "github.com/libp2p/go-libp2p/p2p/discovery/mdns" -) - -// Notifee see https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.22.0/p2p/discovery/mdns#Notifee -type Notifee struct { - logger Logger - ctx context.Context - host *host -} - -// mdns submodule -type mdns struct { - logger Logger - host *host - mdns libp2pdiscovery.Service -} - -// newMDNS creates a new mDNS instance from the host -func newMDNS(host *host) *mdns { - return &mdns{ - logger: log.NewFromGlobal(log.AddContext("module", "mdns")), - host: host, - } -} - -// startMDNS starts a new mDNS discovery service -func (m *mdns) start() { - m.logger.Debugf( - "Starting mDNS discovery service with host %s and protocol %s...", - m.host.id(), m.host.protocolID) - - // create and start service - serviceName := string(m.host.protocolID) - notifee := &Notifee{ - logger: m.logger, - ctx: m.host.ctx, - host: m.host, - } - m.mdns = libp2pdiscovery.NewMdnsService( - m.host.p2pHost, serviceName, notifee) -} - -// close shuts down the mDNS discovery service -func (m *mdns) close() error { - // check if service is running - if m.mdns == nil { - return nil - } - - // close service - err := m.mdns.Close() - if err != nil { - m.logger.Warnf("Failed to close mDNS discovery service: %s", err) - return err - } - - return nil -} - -// HandlePeerFound is event handler called when a peer is found -func (n Notifee) HandlePeerFound(p peer.AddrInfo) { - n.logger.Debugf( - "Peer %s found using mDNS discovery, with host %s", - p.ID, n.host.id()) - - n.host.p2pHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) - // connect to found peer - n.host.cm.peerSetHandler.AddPeer(0, p.ID) -} diff --git a/dot/network/service.go b/dot/network/service.go index 93df156132..5797453458 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -6,6 +6,7 @@ package network import ( "context" "errors" + "fmt" "math/big" "strings" "sync" @@ -14,6 +15,7 @@ import ( "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/mdns" "github.com/ChainSafe/gossamer/internal/metrics" "github.com/ChainSafe/gossamer/lib/common" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" @@ -103,7 +105,7 @@ type Service struct { cfg *Config host *host - mdns *mdns + mdns MDNS gossip *gossip bufPool *sync.Pool streamManager *streamManager @@ -186,12 +188,20 @@ func NewService(cfg *Config) (*Service, error) { }, } + serviceTag := string(host.protocolID) + notifee := mdns.NewNotifeeTracker(host.p2pHost.Peerstore(), host.cm.peerSetHandler) + mdnsLogger := log.NewFromGlobal(log.AddContext("module", "mdns")) + mdnsLogger.Debugf( + "Creating mDNS discovery service with host %s and protocol %s...", + host.id(), host.protocolID) + mdnsService := mdns.NewService(host.p2pHost, serviceTag, mdnsLogger, notifee) + network := &Service{ ctx: ctx, cancel: cancel, cfg: cfg, host: host, - mdns: newMDNS(host), + mdns: mdnsService, gossip: newGossip(), blockState: cfg.BlockState, transactionHandler: cfg.TransactionHandler, @@ -303,7 +313,10 @@ func (s *Service) Start() error { s.startPeerSetHandler() if !s.noMDNS { - s.mdns.start() + err = s.mdns.Start() + if err != nil { + return fmt.Errorf("starting mDNS service: %w", err) + } } if !s.noDiscover { @@ -443,7 +456,7 @@ func (s *Service) Stop() error { s.cancel() // close mDNS discovery service - err := s.mdns.close() + err := s.mdns.Stop() if err != nil { logger.Errorf("Failed to close mDNS discovery service: %s", err) } diff --git a/go.mod b/go.mod index b41672efa7..1746d8d60b 100644 --- a/go.mod +++ b/go.mod @@ -119,7 +119,6 @@ require ( github.com/libp2p/go-openssl v0.0.7 // indirect github.com/libp2p/go-reuseport v0.2.0 // indirect github.com/libp2p/go-yamux/v3 v3.1.2 // indirect - github.com/libp2p/zeroconf/v2 v2.1.1 // indirect github.com/lucas-clemente/quic-go v0.27.1 // indirect github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect @@ -170,7 +169,7 @@ require ( github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce // indirect github.com/vedhavyas/go-subkey v1.0.3 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect - github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 // indirect + github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect diff --git a/go.sum b/go.sum index ba10328498..0e77a12731 100644 --- a/go.sum +++ b/go.sum @@ -711,7 +711,6 @@ github.com/libp2p/go-yamux/v3 v3.0.1/go.mod h1:s2LsDhHbh+RfCsQoICSYt58U2f8ijtPAN github.com/libp2p/go-yamux/v3 v3.0.2/go.mod h1:s2LsDhHbh+RfCsQoICSYt58U2f8ijtPANFD8BmE74Bo= github.com/libp2p/go-yamux/v3 v3.1.2 h1:lNEy28MBk1HavUAlzKgShp+F6mn/ea1nDYWftZhFW9Q= github.com/libp2p/go-yamux/v3 v3.1.2/go.mod h1:jeLEQgLXqE2YqX1ilAClIfCMDY+0uXQUKmmb/qp0gT4= -github.com/libp2p/zeroconf/v2 v2.1.1 h1:XAuSczA96MYkVwH+LqqqCUZb2yH3krobMJ1YE+0hG2s= github.com/libp2p/zeroconf/v2 v2.1.1/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= diff --git a/internal/mdns/dialable.go b/internal/mdns/dialable.go new file mode 100644 index 0000000000..00ddc09560 --- /dev/null +++ b/internal/mdns/dialable.go @@ -0,0 +1,59 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package mdns + +import ( + "errors" + "fmt" + "net" + + manet "github.com/multiformats/go-multiaddr/net" +) + +var ( + ErrTCPListenAddressNotFound = errors.New("TCP listen address not found") +) + +func getMDNSIPsAndPort(p2pHost Networker) (ips []net.IP, port uint16, err error) { + tcpAddresses, err := getDialableListenAddrs(p2pHost) + if err != nil { + return nil, 0, fmt.Errorf("getting dialable listen addresses: %w", err) + } + + ips = make([]net.IP, len(tcpAddresses)) + for i := range tcpAddresses { + ips[i] = tcpAddresses[i].IP + } + port = uint16(tcpAddresses[0].Port) + + return ips, port, nil +} + +func getDialableListenAddrs(p2pHost Networker) (tcpAddresses []*net.TCPAddr, err error) { + multiAddresses, err := p2pHost.Network().InterfaceListenAddresses() + if err != nil { + return nil, fmt.Errorf("listing host interface listen addresses: %w", err) + } + + tcpAddresses = make([]*net.TCPAddr, 0, len(multiAddresses)) + for _, multiAddress := range multiAddresses { + netAddress, err := manet.ToNetAddr(multiAddress) + if err != nil { + continue + } + + tcpAddress, ok := netAddress.(*net.TCPAddr) + if !ok { + continue + } + + tcpAddresses = append(tcpAddresses, tcpAddress) + } + + if len(tcpAddresses) == 0 { + return nil, fmt.Errorf("%w: in %d multiaddresses", ErrTCPListenAddressNotFound, len(multiAddresses)) + } + + return tcpAddresses, nil +} diff --git a/internal/mdns/interfaces.go b/internal/mdns/interfaces.go new file mode 100644 index 0000000000..2610897dda --- /dev/null +++ b/internal/mdns/interfaces.go @@ -0,0 +1,26 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package mdns + +import ( + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" +) + +// Logger is a logger interface for the mDNS service. +type Logger interface { + Debugf(format string, args ...any) + Warnf(format string, args ...any) +} + +// IDNetworker can return the peer ID and a network interface. +type IDNetworker interface { + ID() peer.ID + Networker +} + +// Networker can return a network interface. +type Networker interface { + Network() network.Network +} diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go new file mode 100644 index 0000000000..d3e4f24a7e --- /dev/null +++ b/internal/mdns/mdns.go @@ -0,0 +1,172 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package mdns + +import ( + "fmt" + "net" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/hashicorp/mdns" + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +// Notifee is notified when a new peer is found. +type Notifee interface { + HandlePeerFound(peer.AddrInfo) +} + +// Service implements a mDNS service. +type Service struct { + server *mdns.Server + p2pHost IDNetworker + serviceTag string + notifee Notifee + logger Logger + started bool + stop chan struct{} + done chan struct{} +} + +// NewService creates and returns a new mDNS service. +func NewService(p2pHost IDNetworker, serviceTag string, + logger Logger, notifee Notifee) (service *Service) { + if serviceTag == "" { + serviceTag = "_ipfs-discovery._udp" + } + + return &Service{ + p2pHost: p2pHost, + serviceTag: serviceTag, + notifee: notifee, + logger: logger, + } +} + +// Start starts the mDNS service. +func (s *Service) Start() (err error) { + ips, port, err := getMDNSIPsAndPort(s.p2pHost) + if err != nil { + return fmt.Errorf("getting MDNS ips and port: %w", err) + } + + hostID := s.p2pHost.ID() + + hostIDPretty := hostID.Pretty() + txt := []string{hostIDPretty} + mdnsService, err := mdns.NewMDNSService(hostIDPretty, s.serviceTag, "", "", int(port), ips, txt) + if err != nil { + return fmt.Errorf("creating mDNS service: %w", err) + } + + server, err := mdns.NewServer(&mdns.Config{Zone: mdnsService}) + if err != nil { + return fmt.Errorf("creating mDNS server: %w", err) + } + s.server = server + + s.started = true + s.stop = make(chan struct{}) + s.done = make(chan struct{}) + + go s.run() + + return nil +} + +// Stop stops the mDNS service and server. +func (s *Service) Stop() (err error) { + if !s.started { + return nil + } + + defer func() { + s.started = false + }() + close(s.stop) + <-s.done + return s.server.Shutdown() +} + +func (s *Service) run() { + defer close(s.done) + + const pollPeriod = time.Minute + ticker := time.NewTicker(pollPeriod) + defer ticker.Stop() + + for { + entriesCh := make(chan *mdns.ServiceEntry, 16) + go func() { + for entry := range entriesCh { + s.handleEntry(entry) + } + }() + + const queryTimeout = 5 * time.Second + params := &mdns.QueryParam{ + Domain: "local", + Entries: entriesCh, + Service: s.serviceTag, + Timeout: queryTimeout, + } + err := mdns.Query(params) + if err != nil { + s.logger.Warnf("mdns query failed: %s", err) + } + close(entriesCh) + + select { + case <-ticker.C: + case <-s.stop: + return + } + } +} + +func (s *Service) handleEntry(entry *mdns.ServiceEntry) { + receivedPeerID, err := peer.Decode(entry.Info) + if err != nil { + s.logger.Warnf("error parsing peer ID from mdns entry: %s", err) + return + } + + if receivedPeerID == s.p2pHost.ID() { + return + } + + var ip net.IP + switch { + case entry.AddrV4 != nil: + ip = entry.AddrV4 + case entry.AddrV6 != nil: + ip = entry.AddrV6 + default: + s.logger.Warnf("mdns entry from peer id %s has no IP address", receivedPeerID) + return + } + + tcpAddress := &net.TCPAddr{ + IP: ip, + Port: entry.Port, + } + + multiAddress, err := manet.FromNetAddr(tcpAddress) + if err != nil { + s.logger.Warnf("failed converting tcp address from peer id %s to multiaddress: %s", + receivedPeerID, err) + return + } + + addressInfo := peer.AddrInfo{ + ID: receivedPeerID, + Addrs: []multiaddr.Multiaddr{multiAddress}, + } + + s.logger.Debugf("Peer %s has addresses %s", receivedPeerID, addressInfo.Addrs) + go s.notifee.HandlePeerFound(addressInfo) +} diff --git a/internal/mdns/notifee.go b/internal/mdns/notifee.go new file mode 100644 index 0000000000..72be0c29e9 --- /dev/null +++ b/internal/mdns/notifee.go @@ -0,0 +1,42 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package mdns + +import ( + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/multiformats/go-multiaddr" +) + +// AddressAdder is an interface that adds addresses. +type AddressAdder interface { + AddAddrs(p peer.ID, addrs []multiaddr.Multiaddr, ttl time.Duration) +} + +// PeerAdder adds peers. +type PeerAdder interface { + AddPeer(setID int, peerIDs ...peer.ID) +} + +// NewNotifeeTracker returns a new notifee tracker. +func NewNotifeeTracker(addressAdder AddressAdder, peerAdder PeerAdder) *NotifeeTracker { + return &NotifeeTracker{ + addressAdder: addressAdder, + peerAdder: peerAdder, + } +} + +// NotifeeTracker tracks new peers found. +type NotifeeTracker struct { + addressAdder AddressAdder + peerAdder PeerAdder +} + +// HandlePeerFound tracks the address info from the peer found. +func (n *NotifeeTracker) HandlePeerFound(p peer.AddrInfo) { + n.addressAdder.AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) + n.peerAdder.AddPeer(0, p.ID) +} From c5dd63f804d3564646bb2000cfbfb93e8b94a048 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 12 Oct 2022 07:37:16 +0000 Subject: [PATCH 03/15] Revert to whyrusleeping? --- internal/mdns/mdns.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index d3e4f24a7e..4bedd612e0 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -10,9 +10,9 @@ import ( "github.com/libp2p/go-libp2p-core/peer" - "github.com/hashicorp/mdns" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" + "github.com/whyrusleeping/mdns" ) // Notifee is notified when a new peer is found. From 16a37c5ae848dee3b7c3938d744871ee020dda41 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 12 Oct 2022 08:07:39 +0000 Subject: [PATCH 04/15] Default to nil ips and port 4001 if no dialable found --- internal/mdns/dialable.go | 7 ++++--- internal/mdns/mdns.go | 5 +---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/internal/mdns/dialable.go b/internal/mdns/dialable.go index 00ddc09560..905e0e9dc4 100644 --- a/internal/mdns/dialable.go +++ b/internal/mdns/dialable.go @@ -15,10 +15,11 @@ var ( ErrTCPListenAddressNotFound = errors.New("TCP listen address not found") ) -func getMDNSIPsAndPort(p2pHost Networker) (ips []net.IP, port uint16, err error) { +func getMDNSIPsAndPort(p2pHost Networker) (ips []net.IP, port uint16) { tcpAddresses, err := getDialableListenAddrs(p2pHost) if err != nil { - return nil, 0, fmt.Errorf("getting dialable listen addresses: %w", err) + const defaultPort = 4001 + return nil, defaultPort } ips = make([]net.IP, len(tcpAddresses)) @@ -27,7 +28,7 @@ func getMDNSIPsAndPort(p2pHost Networker) (ips []net.IP, port uint16, err error) } port = uint16(tcpAddresses[0].Port) - return ips, port, nil + return ips, port } func getDialableListenAddrs(p2pHost Networker) (tcpAddresses []*net.TCPAddr, err error) { diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index 4bedd612e0..5267e43311 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -49,10 +49,7 @@ func NewService(p2pHost IDNetworker, serviceTag string, // Start starts the mDNS service. func (s *Service) Start() (err error) { - ips, port, err := getMDNSIPsAndPort(s.p2pHost) - if err != nil { - return fmt.Errorf("getting MDNS ips and port: %w", err) - } + ips, port := getMDNSIPsAndPort(s.p2pHost) hostID := s.p2pHost.ID() From 13adee4041d1f2249ab74b3e51594a604f2694ad Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Thu, 1 Dec 2022 12:31:07 +0000 Subject: [PATCH 05/15] Re-use entries channel across ticks --- internal/mdns/mdns.go | 71 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 15 deletions(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index 5267e43311..a8ef890b5e 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -96,35 +96,76 @@ func (s *Service) run() { ticker := time.NewTicker(pollPeriod) defer ticker.Stop() + entriesListeningLoopStop := make(chan struct{}) + entriesListeningLoopDone := make(chan struct{}) + entriesCh := make(chan *mdns.ServiceEntry, 16) + entriesStartListening := make(chan struct{}) + entriesStopListening := make(chan struct{}) + + go s.handleEntries(entriesListeningLoopStop, entriesListeningLoopDone, + entriesStartListening, entriesStopListening, entriesCh) + + const queryTimeout = 5 * time.Second + params := &mdns.QueryParam{ + Domain: "local", + Entries: entriesCh, + Service: s.serviceTag, + Timeout: queryTimeout, + } + for { - entriesCh := make(chan *mdns.ServiceEntry, 16) - go func() { - for entry := range entriesCh { - s.handleEntry(entry) - } - }() - - const queryTimeout = 5 * time.Second - params := &mdns.QueryParam{ - Domain: "local", - Entries: entriesCh, - Service: s.serviceTag, - Timeout: queryTimeout, - } + entriesStartListening <- struct{}{} err := mdns.Query(params) if err != nil { s.logger.Warnf("mdns query failed: %s", err) } - close(entriesCh) + entriesStopListening <- struct{}{} + + // Drain the entries channel, we no longer care about entries. + for len(entriesCh) > 0 { + <-entriesCh + } select { case <-ticker.C: case <-s.stop: + close(entriesListeningLoopStop) + <-entriesListeningLoopDone + close(entriesCh) + close(entriesStartListening) + close(entriesStopListening) return } } } +func (s *Service) handleEntries(stop <-chan struct{}, done chan<- struct{}, + startListening, stopListening <-chan struct{}, entries <-chan *mdns.ServiceEntry) { + defer close(done) + + for { + // Wait for the start signal to start listening for entries + select { + case <-startListening: + case <-stop: + return + } + + continueListening := true + for continueListening { + // Listen for entries until we receive a stop listening signal. + select { + case entry := <-entries: + s.handleEntry(entry) + case <-stopListening: + continueListening = false + case <-stop: + return + } + } + } +} + func (s *Service) handleEntry(entry *mdns.ServiceEntry) { receivedPeerID, err := peer.Decode(entry.Info) if err != nil { From 30fc2c5ede305b6cffd657d176307b02f83a9737 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Thu, 1 Dec 2022 16:09:21 +0000 Subject: [PATCH 06/15] Add ready channels --- internal/mdns/mdns.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index a8ef890b5e..e5d3cdc407 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -69,8 +69,12 @@ func (s *Service) Start() (err error) { s.started = true s.stop = make(chan struct{}) s.done = make(chan struct{}) + ready := make(chan struct{}) - go s.run() + go s.run(ready) + // It takes a few milliseconds to launch a goroutine + // so we wait for the run goroutine to be ready. + <-ready return nil } @@ -89,21 +93,23 @@ func (s *Service) Stop() (err error) { return s.server.Shutdown() } -func (s *Service) run() { +func (s *Service) run(ready chan<- struct{}) { defer close(s.done) const pollPeriod = time.Minute ticker := time.NewTicker(pollPeriod) defer ticker.Stop() + handleEntriesReady := make(chan struct{}) entriesListeningLoopStop := make(chan struct{}) entriesListeningLoopDone := make(chan struct{}) entriesCh := make(chan *mdns.ServiceEntry, 16) entriesStartListening := make(chan struct{}) entriesStopListening := make(chan struct{}) - go s.handleEntries(entriesListeningLoopStop, entriesListeningLoopDone, + go s.handleEntries(handleEntriesReady, entriesListeningLoopStop, entriesListeningLoopDone, entriesStartListening, entriesStopListening, entriesCh) + <-handleEntriesReady const queryTimeout = 5 * time.Second params := &mdns.QueryParam{ @@ -113,6 +119,8 @@ func (s *Service) run() { Timeout: queryTimeout, } + close(ready) + for { entriesStartListening <- struct{}{} err := mdns.Query(params) @@ -139,9 +147,10 @@ func (s *Service) run() { } } -func (s *Service) handleEntries(stop <-chan struct{}, done chan<- struct{}, +func (s *Service) handleEntries(ready chan<- struct{}, stop <-chan struct{}, done chan<- struct{}, startListening, stopListening <-chan struct{}, entries <-chan *mdns.ServiceEntry) { defer close(done) + close(ready) for { // Wait for the start signal to start listening for entries From ea22ee8d0df4874cfba71c9af7ac014bb09de94b Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Thu, 1 Dec 2022 16:12:40 +0000 Subject: [PATCH 07/15] Return error from handleEntry and log them in handleEntries --- internal/mdns/mdns.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index e5d3cdc407..02ed2ca8df 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -4,6 +4,7 @@ package mdns import ( + "errors" "fmt" "net" "time" @@ -165,7 +166,10 @@ func (s *Service) handleEntries(ready chan<- struct{}, stop <-chan struct{}, don // Listen for entries until we receive a stop listening signal. select { case entry := <-entries: - s.handleEntry(entry) + err := s.handleEntry(entry) + if err != nil { + s.logger.Warnf("handling mDNS entry: %s", err) + } case <-stopListening: continueListening = false case <-stop: @@ -175,15 +179,18 @@ func (s *Service) handleEntries(ready chan<- struct{}, stop <-chan struct{}, don } } -func (s *Service) handleEntry(entry *mdns.ServiceEntry) { +var ( + errEntryHasNoIP = errors.New("MDNS entry has no IP address") +) + +func (s *Service) handleEntry(entry *mdns.ServiceEntry) (err error) { receivedPeerID, err := peer.Decode(entry.Info) if err != nil { - s.logger.Warnf("error parsing peer ID from mdns entry: %s", err) - return + return fmt.Errorf("parsing peer ID from mdns entry: %w", err) } if receivedPeerID == s.p2pHost.ID() { - return + return nil } var ip net.IP @@ -193,8 +200,7 @@ func (s *Service) handleEntry(entry *mdns.ServiceEntry) { case entry.AddrV6 != nil: ip = entry.AddrV6 default: - s.logger.Warnf("mdns entry from peer id %s has no IP address", receivedPeerID) - return + return fmt.Errorf("%w: from peer id %s", errEntryHasNoIP, receivedPeerID) } tcpAddress := &net.TCPAddr{ @@ -204,9 +210,8 @@ func (s *Service) handleEntry(entry *mdns.ServiceEntry) { multiAddress, err := manet.FromNetAddr(tcpAddress) if err != nil { - s.logger.Warnf("failed converting tcp address from peer id %s to multiaddress: %s", + return fmt.Errorf("converting tcp address from peer id %s to multiaddress: %w", receivedPeerID, err) - return } addressInfo := peer.AddrInfo{ @@ -216,4 +221,5 @@ func (s *Service) handleEntry(entry *mdns.ServiceEntry) { s.logger.Debugf("Peer %s has addresses %s", receivedPeerID, addressInfo.Addrs) go s.notifee.HandlePeerFound(addressInfo) + return nil } From 88e234d0e99b7c86ae10239d6d34b8fcba8a584d Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 2 Dec 2022 10:14:21 +0000 Subject: [PATCH 08/15] Change place where started is set in Start --- internal/mdns/mdns.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index 02ed2ca8df..2224f3706e 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -67,7 +67,6 @@ func (s *Service) Start() (err error) { } s.server = server - s.started = true s.stop = make(chan struct{}) s.done = make(chan struct{}) ready := make(chan struct{}) @@ -77,6 +76,8 @@ func (s *Service) Start() (err error) { // so we wait for the run goroutine to be ready. <-ready + s.started = true + return nil } From 92d02628cb2e35b66c83b7a957a589ee59bd3e34 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 2 Dec 2022 10:15:00 +0000 Subject: [PATCH 09/15] Return `nil` for Start on a started service --- internal/mdns/mdns.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index 2224f3706e..af621c2b79 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -50,6 +50,10 @@ func NewService(p2pHost IDNetworker, serviceTag string, // Start starts the mDNS service. func (s *Service) Start() (err error) { + if s.started { + return nil + } + ips, port := getMDNSIPsAndPort(s.p2pHost) hostID := s.p2pHost.ID() From 8ef4b7de98f6180083aa066a0afbc50eb95369e8 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 2 Dec 2022 10:22:24 +0000 Subject: [PATCH 10/15] Simplify back entries handling code --- internal/mdns/mdns.go | 72 +++++++++++-------------------------------- 1 file changed, 18 insertions(+), 54 deletions(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index af621c2b79..b1daf4531d 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -106,21 +106,9 @@ func (s *Service) run(ready chan<- struct{}) { ticker := time.NewTicker(pollPeriod) defer ticker.Stop() - handleEntriesReady := make(chan struct{}) - entriesListeningLoopStop := make(chan struct{}) - entriesListeningLoopDone := make(chan struct{}) - entriesCh := make(chan *mdns.ServiceEntry, 16) - entriesStartListening := make(chan struct{}) - entriesStopListening := make(chan struct{}) - - go s.handleEntries(handleEntriesReady, entriesListeningLoopStop, entriesListeningLoopDone, - entriesStartListening, entriesStopListening, entriesCh) - <-handleEntriesReady - const queryTimeout = 5 * time.Second params := &mdns.QueryParam{ Domain: "local", - Entries: entriesCh, Service: s.serviceTag, Timeout: queryTimeout, } @@ -128,62 +116,38 @@ func (s *Service) run(ready chan<- struct{}) { close(ready) for { - entriesStartListening <- struct{}{} + entriesListeningReady := make(chan struct{}) + entriesListeningDone := make(chan struct{}) + entriesCh := make(chan *mdns.ServiceEntry, 16) + go func() { + defer close(entriesListeningDone) + close(entriesListeningReady) + for entry := range entriesCh { + err := s.handleEntry(entry) + if err != nil { + s.logger.Warnf("handling mDNS entry: %s", err) + } + } + }() + <-entriesListeningReady + + params.Entries = entriesCh err := mdns.Query(params) if err != nil { s.logger.Warnf("mdns query failed: %s", err) } - entriesStopListening <- struct{}{} - // Drain the entries channel, we no longer care about entries. - for len(entriesCh) > 0 { - <-entriesCh - } + close(entriesCh) + <-entriesListeningDone select { case <-ticker.C: case <-s.stop: - close(entriesListeningLoopStop) - <-entriesListeningLoopDone - close(entriesCh) - close(entriesStartListening) - close(entriesStopListening) return } } } -func (s *Service) handleEntries(ready chan<- struct{}, stop <-chan struct{}, done chan<- struct{}, - startListening, stopListening <-chan struct{}, entries <-chan *mdns.ServiceEntry) { - defer close(done) - close(ready) - - for { - // Wait for the start signal to start listening for entries - select { - case <-startListening: - case <-stop: - return - } - - continueListening := true - for continueListening { - // Listen for entries until we receive a stop listening signal. - select { - case entry := <-entries: - err := s.handleEntry(entry) - if err != nil { - s.logger.Warnf("handling mDNS entry: %s", err) - } - case <-stopListening: - continueListening = false - case <-stop: - return - } - } - } -} - var ( errEntryHasNoIP = errors.New("MDNS entry has no IP address") ) From b5ec107a59f1260a025c5805a535a7e26597e4b5 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 2 Dec 2022 10:24:56 +0000 Subject: [PATCH 11/15] `pollPeriod` as service struct field --- internal/mdns/mdns.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index b1daf4531d..9ecf2d0aa8 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -28,6 +28,7 @@ type Service struct { serviceTag string notifee Notifee logger Logger + pollPeriod time.Duration started bool stop chan struct{} done chan struct{} @@ -45,6 +46,7 @@ func NewService(p2pHost IDNetworker, serviceTag string, serviceTag: serviceTag, notifee: notifee, logger: logger, + pollPeriod: time.Minute, } } @@ -102,8 +104,7 @@ func (s *Service) Stop() (err error) { func (s *Service) run(ready chan<- struct{}) { defer close(s.done) - const pollPeriod = time.Minute - ticker := time.NewTicker(pollPeriod) + ticker := time.NewTicker(s.pollPeriod) defer ticker.Stop() const queryTimeout = 5 * time.Second From 92882f446711c5c2f01f9aced9301e8ba9fb8bf8 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 2 Dec 2022 10:31:40 +0000 Subject: [PATCH 12/15] Small interface changes in dialable.go --- internal/mdns/dialable.go | 8 ++++---- internal/mdns/interfaces.go | 6 ++++++ internal/mdns/mdns.go | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/internal/mdns/dialable.go b/internal/mdns/dialable.go index 905e0e9dc4..1da61b7408 100644 --- a/internal/mdns/dialable.go +++ b/internal/mdns/dialable.go @@ -15,8 +15,8 @@ var ( ErrTCPListenAddressNotFound = errors.New("TCP listen address not found") ) -func getMDNSIPsAndPort(p2pHost Networker) (ips []net.IP, port uint16) { - tcpAddresses, err := getDialableListenAddrs(p2pHost) +func getMDNSIPsAndPort(network interfaceListenAddressesGetter) (ips []net.IP, port uint16) { + tcpAddresses, err := getDialableListenAddrs(network) if err != nil { const defaultPort = 4001 return nil, defaultPort @@ -31,8 +31,8 @@ func getMDNSIPsAndPort(p2pHost Networker) (ips []net.IP, port uint16) { return ips, port } -func getDialableListenAddrs(p2pHost Networker) (tcpAddresses []*net.TCPAddr, err error) { - multiAddresses, err := p2pHost.Network().InterfaceListenAddresses() +func getDialableListenAddrs(network interfaceListenAddressesGetter) (tcpAddresses []*net.TCPAddr, err error) { + multiAddresses, err := network.InterfaceListenAddresses() if err != nil { return nil, fmt.Errorf("listing host interface listen addresses: %w", err) } diff --git a/internal/mdns/interfaces.go b/internal/mdns/interfaces.go index 2610897dda..8021b4c579 100644 --- a/internal/mdns/interfaces.go +++ b/internal/mdns/interfaces.go @@ -6,6 +6,7 @@ package mdns import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" ) // Logger is a logger interface for the mDNS service. @@ -24,3 +25,8 @@ type IDNetworker interface { type Networker interface { Network() network.Network } + +// interfaceListenAddressesGetter returns the listen addresses of the interfaces. +type interfaceListenAddressesGetter interface { + InterfaceListenAddresses() ([]multiaddr.Multiaddr, error) +} diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index 9ecf2d0aa8..99f2200465 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -56,7 +56,7 @@ func (s *Service) Start() (err error) { return nil } - ips, port := getMDNSIPsAndPort(s.p2pHost) + ips, port := getMDNSIPsAndPort(s.p2pHost.Network()) hostID := s.p2pHost.ID() From dd1c1edb1f52289d51c51788edcbb405557ab5f5 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 14 Dec 2022 09:52:41 +0000 Subject: [PATCH 13/15] Re-organize service struct fields --- internal/mdns/mdns.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index 99f2200465..2e654225f1 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -23,15 +23,22 @@ type Notifee interface { // Service implements a mDNS service. type Service struct { - server *mdns.Server + // Dependencies and configuration injected p2pHost IDNetworker serviceTag string - notifee Notifee logger Logger + notifee Notifee + + // Constant fields pollPeriod time.Duration - started bool - stop chan struct{} - done chan struct{} + + // Fields set by the Start method. + server *mdns.Server + + // Internal service management fields. + started bool + stop chan struct{} + done chan struct{} } // NewService creates and returns a new mDNS service. From 618e98564e2ba38657b9c015b1cc39450ff1ff5b Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 14 Dec 2022 09:52:59 +0000 Subject: [PATCH 14/15] Add start stop mutex --- internal/mdns/mdns.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go index 2e654225f1..3f81fb3f25 100644 --- a/internal/mdns/mdns.go +++ b/internal/mdns/mdns.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net" + "sync" "time" "github.com/libp2p/go-libp2p-core/peer" @@ -36,9 +37,11 @@ type Service struct { server *mdns.Server // Internal service management fields. - started bool - stop chan struct{} - done chan struct{} + // startStopMutex is to prevent concurrent calls to Start and Stop. + startStopMutex sync.Mutex + started bool + stop chan struct{} + done chan struct{} } // NewService creates and returns a new mDNS service. @@ -59,6 +62,9 @@ func NewService(p2pHost IDNetworker, serviceTag string, // Start starts the mDNS service. func (s *Service) Start() (err error) { + s.startStopMutex.Lock() + defer s.startStopMutex.Unlock() + if s.started { return nil } @@ -96,6 +102,9 @@ func (s *Service) Start() (err error) { // Stop stops the mDNS service and server. func (s *Service) Stop() (err error) { + s.startStopMutex.Lock() + defer s.startStopMutex.Unlock() + if !s.started { return nil } From 6da74a2c7f004f8db5a1f7df6f15eb4266a7f276 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Mon, 2 Jan 2023 11:32:26 +0000 Subject: [PATCH 15/15] Fix linting --- dot/network/interfaces.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/network/interfaces.go b/dot/network/interfaces.go index 9d8f3ef281..6f9145367f 100644 --- a/dot/network/interfaces.go +++ b/dot/network/interfaces.go @@ -19,6 +19,7 @@ type Logger interface { Errorf(format string, args ...interface{}) } +// MDNS is the mDNS service interface. type MDNS interface { Start() error Stop() error