Skip to content

Commit 611df04

Browse files
committed
basichost: move EvtLocalAddrsChanged to addrs_manager
We'll deprecate this event, but we still have to keep sending this for a few more releases. More importantly, we need to update the peerstore with the host's addresses and it's better to do this *before* sending update events so that consumers of the event can rely on the host addrs being updated in the peerstore.
1 parent 7ceafe9 commit 611df04

File tree

3 files changed

+252
-223
lines changed

3 files changed

+252
-223
lines changed

p2p/host/basic/addrs_manager.go

Lines changed: 227 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
"github.com/libp2p/go-libp2p/core/crypto"
1415
"github.com/libp2p/go-libp2p/core/event"
1516
"github.com/libp2p/go-libp2p/core/network"
17+
"github.com/libp2p/go-libp2p/core/peer"
18+
"github.com/libp2p/go-libp2p/core/peerstore"
19+
"github.com/libp2p/go-libp2p/core/record"
1620
"github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff"
1721
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
1822
"github.com/libp2p/go-netroute"
@@ -28,6 +32,13 @@ var (
2832
natTypeChangeTickrInterval = time.Minute
2933
)
3034

35+
const maxPeerRecordSize = 8 * 1024 // 8k to be compatible with identify's limit
36+
37+
// addrStore is a minimal interface for storing peer addresses
38+
type addrStore interface {
39+
SetAddrs(peer.ID, []ma.Multiaddr, time.Duration)
40+
}
41+
3142
type observedAddrsManager interface {
3243
Addrs(minObservers int) []ma.Multiaddr
3344
AddrsFor(local ma.Multiaddr) []ma.Multiaddr
@@ -58,9 +69,6 @@ type addrsManager struct {
5869
interfaceAddrs *interfaceAddrsCache
5970
addrsReachabilityTracker *addrsReachabilityTracker
6071

61-
// addrsUpdatedChan is notified when addrs change. This is provided by the caller.
62-
addrsUpdatedChan chan struct{}
63-
6472
// triggerAddrsUpdateChan is used to trigger an addresses update.
6573
triggerAddrsUpdateChan chan chan struct{}
6674
// started is used to check whether the addrsManager has started.
@@ -73,6 +81,11 @@ type addrsManager struct {
7381
addrsMx sync.RWMutex
7482
currentAddrs hostAddrs
7583

84+
signKey crypto.PrivKey
85+
addrStore addrStore
86+
signedRecordStore peerstore.CertifiedAddrBook
87+
hostID peer.ID
88+
7689
wg sync.WaitGroup
7790
ctx context.Context
7891
ctxCancel context.CancelFunc
@@ -86,10 +99,13 @@ func newAddrsManager(
8699
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr,
87100
disableObservedAddrs bool,
88101
observedAddrsManager observedAddrsManager,
89-
addrsUpdatedChan chan struct{},
90102
client autonatv2Client,
91103
enableMetrics bool,
92104
registerer prometheus.Registerer,
105+
disableSignedPeerRecord bool,
106+
signKey crypto.PrivKey,
107+
addrStore addrStore,
108+
hostID peer.ID,
93109
) (*addrsManager, error) {
94110
ctx, cancel := context.WithCancel(context.Background())
95111
as := &addrsManager{
@@ -100,8 +116,10 @@ func newAddrsManager(
100116
addrsFactory: addrsFactory,
101117
triggerAddrsUpdateChan: make(chan chan struct{}, 1),
102118
triggerReachabilityUpdate: make(chan struct{}, 1),
103-
addrsUpdatedChan: addrsUpdatedChan,
104119
interfaceAddrs: &interfaceAddrsCache{},
120+
signKey: signKey,
121+
addrStore: addrStore,
122+
hostID: hostID,
105123
ctx: ctx,
106124
ctxCancel: cancel,
107125
}
@@ -127,6 +145,14 @@ func newAddrsManager(
127145
}
128146
}
129147

148+
if !disableSignedPeerRecord {
149+
var ok bool
150+
as.signedRecordStore, ok = as.addrStore.(peerstore.CertifiedAddrBook)
151+
if !ok {
152+
return nil, errors.New("peerstore doesn't implement CertifiedAddrBook interface")
153+
}
154+
}
155+
130156
if client != nil {
131157
var metricsTracker MetricsTracker
132158
if enableMetrics {
@@ -254,6 +280,12 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
254280
return fmt.Errorf("error creating reachability subscriber: %s", err)
255281
}
256282

283+
localAddrsEmitter, err := a.bus.Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful)
284+
if err != nil {
285+
return fmt.Errorf("error creating local addrs emitter: %s", err)
286+
}
287+
defer func() { retErr = closeIfError(retErr, localAddrsEmitter, "local addrs emitter") }()
288+
257289
var relayAddrs []ma.Multiaddr
258290
// update relay addrs in case we're private
259291
select {
@@ -275,17 +307,19 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
275307
a.started.Store(true)
276308
// update addresses before starting the worker loop. This ensures that any address updates
277309
// before calling addrsManager.Start are correctly reported after Start returns.
278-
a.updateAddrs(relayAddrs)
310+
ha := a.updateAddrs(relayAddrs)
311+
a.updatePeerStore(ha.addrs, nil)
279312

280313
a.wg.Add(1)
281-
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, relayAddrs)
314+
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, localAddrsEmitter, relayAddrs)
282315
return nil
283316
}
284317

285318
func (a *addrsManager) background(
286319
autoRelayAddrsSub,
287320
autonatReachabilitySub event.Subscription,
288321
emitter event.Emitter,
322+
localAddrsEmitter event.Emitter,
289323
relayAddrs []ma.Multiaddr,
290324
) {
291325
defer a.wg.Done()
@@ -302,6 +336,10 @@ func (a *addrsManager) background(
302336
if err != nil {
303337
log.Warnf("error closing host reachability emitter: %s", err)
304338
}
339+
err = localAddrsEmitter.Close()
340+
if err != nil {
341+
log.Warnf("error closing local addrs emitter: %s", err)
342+
}
305343
}()
306344

307345
ticker := time.NewTicker(addrChangeTickrInterval)
@@ -314,7 +352,7 @@ func (a *addrsManager) background(
314352
close(notifCh)
315353
notifCh = nil
316354
}
317-
a.notifyAddrsChanged(emitter, previousAddrs, currAddrs)
355+
a.notifyAddrsChanged(emitter, localAddrsEmitter, previousAddrs, currAddrs)
318356
previousAddrs = currAddrs
319357
select {
320358
case <-ticker.C:
@@ -399,19 +437,18 @@ func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs {
399437
}
400438
}
401439

402-
func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, current hostAddrs) {
440+
func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, localAddrsEmitter event.Emitter, previous, current hostAddrs) {
403441
if areAddrsDifferent(previous.localAddrs, current.localAddrs) {
404442
log.Debugf("host local addresses updated: %s", current.localAddrs)
405443
if a.addrsReachabilityTracker != nil {
406444
a.addrsReachabilityTracker.UpdateAddrs(current.localAddrs)
407445
}
408446
}
409447
if areAddrsDifferent(previous.addrs, current.addrs) {
410-
log.Debugf("host addresses updated: %s", current.localAddrs)
411-
select {
412-
case a.addrsUpdatedChan <- struct{}{}:
413-
default:
414-
}
448+
log.Debugf("host addresses updated: %s", current.addrs)
449+
450+
// Emit EvtLocalAddressesUpdated event and handle peerstore operations
451+
a.handleHostAddrsUpdated(localAddrsEmitter, current.addrs, previous.addrs)
415452
}
416453

417454
// We *must* send both reachability changed and addrs changed events from the
@@ -617,6 +654,182 @@ func areAddrsDifferent(prev, current []ma.Multiaddr) bool {
617654
return false
618655
}
619656

657+
// diffAddrs diffs prev and current addrs and returns added, maintained, and removed addrs.
658+
// Both prev and current are expected to be sorted using ma.Compare()
659+
func (a *addrsManager) diffAddrs(prev, current []ma.Multiaddr) (added, maintained, removed []ma.Multiaddr) {
660+
i, j := 0, 0
661+
for i < len(prev) && j < len(current) {
662+
cmp := prev[i].Compare(current[j])
663+
switch {
664+
case cmp < 0:
665+
// prev < current
666+
removed = append(removed, prev[i])
667+
i++
668+
case cmp > 0:
669+
// current < prev
670+
added = append(added, current[j])
671+
j++
672+
default:
673+
maintained = append(maintained, current[j])
674+
i++
675+
j++
676+
}
677+
}
678+
// All remaining current addresses are added
679+
added = append(added, current[j:]...)
680+
681+
// All remaining previous addresses are removed
682+
removed = append(removed, prev[i:]...)
683+
return
684+
}
685+
686+
// makeSignedPeerRecord creates a signed peer record for the given addresses
687+
func (a *addrsManager) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) {
688+
if a.signKey == nil {
689+
return nil, fmt.Errorf("signKey is nil")
690+
}
691+
// Limit the length of currentAddrs to ensure that our signed peer records aren't rejected
692+
peerRecordSize := 64 // HostID
693+
k, err := a.signKey.Raw()
694+
var nk int
695+
if err == nil {
696+
nk = len(k)
697+
} else {
698+
nk = 1024 // In case of error, use a large enough value.
699+
}
700+
peerRecordSize += 2 * nk // 1 for signature, 1 for public key
701+
// we want the final address list to be small for keeping the signed peer record in size
702+
addrs = trimHostAddrList(addrs, maxPeerRecordSize-peerRecordSize-256) // 256 B of buffer
703+
rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{
704+
ID: a.hostID,
705+
Addrs: addrs,
706+
})
707+
return record.Seal(rec, a.signKey)
708+
}
709+
710+
// trimHostAddrList trims the address list to fit within the maximum size
711+
func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
712+
totalSize := 0
713+
for _, a := range addrs {
714+
totalSize += len(a.Bytes())
715+
}
716+
if totalSize <= maxSize {
717+
return addrs
718+
}
719+
720+
score := func(addr ma.Multiaddr) int {
721+
var res int
722+
if manet.IsPublicAddr(addr) {
723+
res |= 1 << 12
724+
} else if !manet.IsIPLoopback(addr) {
725+
res |= 1 << 11
726+
}
727+
var protocolWeight int
728+
ma.ForEach(addr, func(c ma.Component) bool {
729+
switch c.Protocol().Code {
730+
case ma.P_QUIC_V1:
731+
protocolWeight = 5
732+
case ma.P_TCP:
733+
protocolWeight = 4
734+
case ma.P_WSS:
735+
protocolWeight = 3
736+
case ma.P_WEBTRANSPORT:
737+
protocolWeight = 2
738+
case ma.P_WEBRTC_DIRECT:
739+
protocolWeight = 1
740+
case ma.P_P2P:
741+
return false
742+
}
743+
return true
744+
})
745+
res |= 1 << protocolWeight
746+
return res
747+
}
748+
749+
slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
750+
return score(b) - score(a) // b-a for reverse order
751+
})
752+
totalSize = 0
753+
for i, a := range addrs {
754+
totalSize += len(a.Bytes())
755+
if totalSize > maxSize {
756+
addrs = addrs[:i]
757+
break
758+
}
759+
}
760+
return addrs
761+
}
762+
763+
// handleHostAddrsUpdated emits an EvtLocalAddressesUpdated event and updates the addresses in the peerstore.
764+
func (a *addrsManager) handleHostAddrsUpdated(emitter event.Emitter, currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
765+
added, maintained, removed := a.diffAddrs(lastAddrs, currentAddrs)
766+
if len(added) == 0 && len(removed) == 0 {
767+
return
768+
}
769+
770+
sr := a.updatePeerStore(currentAddrs, removed)
771+
772+
evt := &event.EvtLocalAddressesUpdated{
773+
Diffs: true,
774+
Current: make([]event.UpdatedAddress, 0, len(currentAddrs)),
775+
Removed: make([]event.UpdatedAddress, 0, len(removed)),
776+
SignedPeerRecord: sr,
777+
}
778+
779+
for _, addr := range maintained {
780+
evt.Current = append(evt.Current, event.UpdatedAddress{
781+
Address: addr,
782+
Action: event.Maintained,
783+
})
784+
}
785+
786+
for _, addr := range added {
787+
evt.Current = append(evt.Current, event.UpdatedAddress{
788+
Address: addr,
789+
Action: event.Added,
790+
})
791+
}
792+
793+
for _, addr := range removed {
794+
evt.Removed = append(evt.Removed, event.UpdatedAddress{
795+
Address: addr,
796+
Action: event.Removed,
797+
})
798+
}
799+
800+
// emit addr change event
801+
if err := emitter.Emit(*evt); err != nil {
802+
log.Warnf("error emitting event for updated addrs: %s", err)
803+
}
804+
}
805+
806+
// updatePeerStore updates the peer store and returns the signed peer record.
807+
// If the signed peer record is not created, it returns nil.
808+
func (a *addrsManager) updatePeerStore(currentAddrs []ma.Multiaddr, removedAddrs []ma.Multiaddr) *record.Envelope {
809+
// update host addresses in the peer store
810+
a.addrStore.SetAddrs(a.hostID, currentAddrs, peerstore.PermanentAddrTTL)
811+
a.addrStore.SetAddrs(a.hostID, removedAddrs, 0)
812+
813+
var sr *record.Envelope
814+
// Our addresses have changed.
815+
// store the signed peer record in the peer store.
816+
if a.signedRecordStore != nil {
817+
var err error
818+
// add signed peer record to the event
819+
// in case of an error drop this event.
820+
sr, err = a.makeSignedPeerRecord(currentAddrs)
821+
if err != nil {
822+
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
823+
return nil
824+
}
825+
if _, err := a.signedRecordStore.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
826+
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
827+
return nil
828+
}
829+
}
830+
return sr
831+
}
832+
620833
const interfaceAddrsCacheTTL = time.Minute
621834

622835
type interfaceAddrsCache struct {

0 commit comments

Comments
 (0)