Skip to content

Commit d388884

Browse files
committed
basichost: move EvtLocalAddrsChanged to addrs_manager
We'll deprecate this event, but we still have to keep sending this for a few more releases. More importantly, we need to update the peerstore with the host's addresses and it's better to do this *before* sending update events so that consumers of the event can rely on the host addrs being updated in the peerstore.
1 parent cc0711a commit d388884

File tree

3 files changed

+259
-226
lines changed

3 files changed

+259
-226
lines changed

p2p/host/basic/addrs_manager.go

Lines changed: 234 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
"github.com/libp2p/go-libp2p/core/crypto"
1415
"github.com/libp2p/go-libp2p/core/event"
1516
"github.com/libp2p/go-libp2p/core/network"
17+
"github.com/libp2p/go-libp2p/core/peer"
18+
"github.com/libp2p/go-libp2p/core/peerstore"
19+
"github.com/libp2p/go-libp2p/core/record"
1620
"github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff"
1721
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
1822
"github.com/libp2p/go-netroute"
@@ -26,6 +30,13 @@ const maxObservedAddrsPerListenAddr = 3
2630
// addrChangeTickrInterval is the interval to recompute host addrs.
2731
var addrChangeTickrInterval = 5 * time.Second
2832

33+
const maxPeerRecordSize = 8 * 1024 // 8k to be compatible with identify's limit
34+
35+
// addrStore is a minimal interface for storing peer addresses
36+
type addrStore interface {
37+
SetAddrs(peer.ID, []ma.Multiaddr, time.Duration)
38+
}
39+
2940
// ObservedAddrsManager maps our local listen addrs to externally observed addrs.
3041
type ObservedAddrsManager interface {
3142
Addrs(minObservers int) []ma.Multiaddr
@@ -51,9 +62,6 @@ type addrsManager struct {
5162
interfaceAddrs *interfaceAddrsCache
5263
addrsReachabilityTracker *addrsReachabilityTracker
5364

54-
// addrsUpdatedChan is notified when addrs change. This is provided by the caller.
55-
addrsUpdatedChan chan struct{}
56-
5765
// triggerAddrsUpdateChan is used to trigger an addresses update.
5866
triggerAddrsUpdateChan chan chan struct{}
5967
// started is used to check whether the addrsManager has started.
@@ -66,6 +74,11 @@ type addrsManager struct {
6674
addrsMx sync.RWMutex
6775
currentAddrs hostAddrs
6876

77+
signKey crypto.PrivKey
78+
addrStore addrStore
79+
signedRecordStore peerstore.CertifiedAddrBook
80+
hostID peer.ID
81+
6982
wg sync.WaitGroup
7083
ctx context.Context
7184
ctxCancel context.CancelFunc
@@ -78,10 +91,13 @@ func newAddrsManager(
7891
listenAddrs func() []ma.Multiaddr,
7992
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr,
8093
observedAddrsManager ObservedAddrsManager,
81-
addrsUpdatedChan chan struct{},
8294
client autonatv2Client,
8395
enableMetrics bool,
8496
registerer prometheus.Registerer,
97+
disableSignedPeerRecord bool,
98+
signKey crypto.PrivKey,
99+
addrStore addrStore,
100+
hostID peer.ID,
85101
) (*addrsManager, error) {
86102
ctx, cancel := context.WithCancel(context.Background())
87103
as := &addrsManager{
@@ -93,14 +109,24 @@ func newAddrsManager(
93109
addrsFactory: addrsFactory,
94110
triggerAddrsUpdateChan: make(chan chan struct{}, 1),
95111
triggerReachabilityUpdate: make(chan struct{}, 1),
96-
addrsUpdatedChan: addrsUpdatedChan,
97112
interfaceAddrs: &interfaceAddrsCache{},
113+
signKey: signKey,
114+
addrStore: addrStore,
115+
hostID: hostID,
98116
ctx: ctx,
99117
ctxCancel: cancel,
100118
}
101119
unknownReachability := network.ReachabilityUnknown
102120
as.hostReachability.Store(&unknownReachability)
103121

122+
if !disableSignedPeerRecord {
123+
var ok bool
124+
as.signedRecordStore, ok = as.addrStore.(peerstore.CertifiedAddrBook)
125+
if !ok {
126+
return nil, errors.New("peerstore doesn't implement CertifiedAddrBook interface")
127+
}
128+
}
129+
104130
if client != nil {
105131
var metricsTracker MetricsTracker
106132
if enableMetrics {
@@ -183,6 +209,15 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
183209
mc.Close(),
184210
)
185211
}
212+
mc = append(mc, emitter)
213+
214+
localAddrsEmitter, err := a.bus.Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful)
215+
if err != nil {
216+
return errors.Join(
217+
fmt.Errorf("error creating local addrs emitter: %s", err),
218+
mc.Close(),
219+
)
220+
}
186221

187222
var relayAddrs []ma.Multiaddr
188223
// update relay addrs in case we're private
@@ -205,15 +240,20 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
205240
a.started.Store(true)
206241
// update addresses before starting the worker loop. This ensures that any address updates
207242
// before calling addrsManager.Start are correctly reported after Start returns.
208-
a.updateAddrs(relayAddrs)
243+
ha := a.updateAddrs(relayAddrs)
244+
a.updatePeerStore(ha.addrs, nil)
209245

210246
a.wg.Add(1)
211-
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, relayAddrs)
247+
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, localAddrsEmitter, relayAddrs)
212248
return nil
213249
}
214250

215-
func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub event.Subscription,
216-
emitter event.Emitter, relayAddrs []ma.Multiaddr,
251+
func (a *addrsManager) background(
252+
autoRelayAddrsSub,
253+
autonatReachabilitySub event.Subscription,
254+
emitter event.Emitter,
255+
localAddrsEmitter event.Emitter,
256+
relayAddrs []ma.Multiaddr,
217257
) {
218258
defer a.wg.Done()
219259
defer func() {
@@ -229,6 +269,10 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
229269
if err != nil {
230270
log.Warnf("error closing host reachability emitter: %s", err)
231271
}
272+
err = localAddrsEmitter.Close()
273+
if err != nil {
274+
log.Warnf("error closing local addrs emitter: %s", err)
275+
}
232276
}()
233277

234278
ticker := time.NewTicker(addrChangeTickrInterval)
@@ -241,7 +285,7 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
241285
close(notifCh)
242286
notifCh = nil
243287
}
244-
a.notifyAddrsChanged(emitter, previousAddrs, currAddrs)
288+
a.handleAddrsUpdated(emitter, localAddrsEmitter, previousAddrs, currAddrs)
245289
previousAddrs = currAddrs
246290
select {
247291
case <-ticker.C:
@@ -294,19 +338,16 @@ func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs {
294338
}
295339
}
296340

297-
func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, current hostAddrs) {
341+
func (a *addrsManager) handleAddrsUpdated(emitter event.Emitter, localAddrsEmitter event.Emitter, previous, current hostAddrs) {
298342
if areAddrsDifferent(previous.localAddrs, current.localAddrs) {
299-
log.Debugf("host local addresses updated: %s", current.localAddrs)
343+
log.Debugf("host direct addresses updated: %s", current.localAddrs)
300344
if a.addrsReachabilityTracker != nil {
301345
a.addrsReachabilityTracker.UpdateAddrs(current.localAddrs)
302346
}
303347
}
304348
if areAddrsDifferent(previous.addrs, current.addrs) {
305-
log.Debugf("host addresses updated: %s", current.localAddrs)
306-
select {
307-
case a.addrsUpdatedChan <- struct{}{}:
308-
default:
309-
}
349+
log.Debugf("host addresses updated: %s", current.addrs)
350+
a.handleHostAddrsUpdated(localAddrsEmitter, current.addrs, previous.addrs)
310351
}
311352

312353
// We *must* send both reachability changed and addrs changed events from the
@@ -487,6 +528,100 @@ func (a *addrsManager) appendObservedAddrs(dst []ma.Multiaddr, listenAddrs, ifac
487528
return dst
488529
}
489530

531+
// makeSignedPeerRecord creates a signed peer record for the given addresses
532+
func (a *addrsManager) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) {
533+
if a.signKey == nil {
534+
return nil, errors.New("signKey is nil")
535+
}
536+
// Limit the length of currentAddrs to ensure that our signed peer records aren't rejected
537+
peerRecordSize := 64 // HostID
538+
k, err := a.signKey.Raw()
539+
var nk int
540+
if err == nil {
541+
nk = len(k)
542+
} else {
543+
nk = 1024 // In case of error, use a large enough value.
544+
}
545+
peerRecordSize += 2 * nk // 1 for signature, 1 for public key
546+
// we want the final address list to be small for keeping the signed peer record in size
547+
addrs = trimHostAddrList(addrs, maxPeerRecordSize-peerRecordSize-256) // 256 B of buffer
548+
rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{
549+
ID: a.hostID,
550+
Addrs: addrs,
551+
})
552+
return record.Seal(rec, a.signKey)
553+
}
554+
555+
// handleHostAddrsUpdated emits an EvtLocalAddressesUpdated event and updates the addresses in the peerstore.
556+
func (a *addrsManager) handleHostAddrsUpdated(emitter event.Emitter, currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
557+
added, maintained, removed := diffAddrs(lastAddrs, currentAddrs)
558+
if len(added) == 0 && len(removed) == 0 {
559+
return
560+
}
561+
562+
sr := a.updatePeerStore(currentAddrs, removed)
563+
564+
evt := &event.EvtLocalAddressesUpdated{
565+
Diffs: true,
566+
Current: make([]event.UpdatedAddress, 0, len(currentAddrs)),
567+
Removed: make([]event.UpdatedAddress, 0, len(removed)),
568+
SignedPeerRecord: sr,
569+
}
570+
571+
for _, addr := range maintained {
572+
evt.Current = append(evt.Current, event.UpdatedAddress{
573+
Address: addr,
574+
Action: event.Maintained,
575+
})
576+
}
577+
578+
for _, addr := range added {
579+
evt.Current = append(evt.Current, event.UpdatedAddress{
580+
Address: addr,
581+
Action: event.Added,
582+
})
583+
}
584+
585+
for _, addr := range removed {
586+
evt.Removed = append(evt.Removed, event.UpdatedAddress{
587+
Address: addr,
588+
Action: event.Removed,
589+
})
590+
}
591+
592+
// emit addr change event
593+
if err := emitter.Emit(*evt); err != nil {
594+
log.Warnf("error emitting event for updated addrs: %s", err)
595+
}
596+
}
597+
598+
// updatePeerStore updates the peer store and returns the signed peer record.
599+
// If the signed peer record is not created, it returns nil.
600+
func (a *addrsManager) updatePeerStore(currentAddrs []ma.Multiaddr, removedAddrs []ma.Multiaddr) *record.Envelope {
601+
// update host addresses in the peer store
602+
a.addrStore.SetAddrs(a.hostID, currentAddrs, peerstore.PermanentAddrTTL)
603+
a.addrStore.SetAddrs(a.hostID, removedAddrs, 0)
604+
605+
var sr *record.Envelope
606+
// Our addresses have changed.
607+
// store the signed peer record in the peer store.
608+
if a.signedRecordStore != nil {
609+
var err error
610+
// add signed peer record to the event
611+
// in case of an error drop this event.
612+
sr, err = a.makeSignedPeerRecord(currentAddrs)
613+
if err != nil {
614+
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
615+
return nil
616+
}
617+
if _, err := a.signedRecordStore.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
618+
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
619+
return nil
620+
}
621+
}
622+
return sr
623+
}
624+
490625
func areAddrsDifferent(prev, current []ma.Multiaddr) bool {
491626
// TODO: make the sorted nature of ma.Unique a guarantee in multiaddrs
492627
prev = ma.Unique(prev)
@@ -504,6 +639,88 @@ func areAddrsDifferent(prev, current []ma.Multiaddr) bool {
504639
return false
505640
}
506641

642+
// diffAddrs diffs prev and current addrs and returns added, maintained, and removed addrs.
643+
// Both prev and current are expected to be sorted using ma.Compare()
644+
func diffAddrs(prev, current []ma.Multiaddr) (added, maintained, removed []ma.Multiaddr) {
645+
i, j := 0, 0
646+
for i < len(prev) && j < len(current) {
647+
cmp := prev[i].Compare(current[j])
648+
switch {
649+
case cmp < 0:
650+
// prev < current
651+
removed = append(removed, prev[i])
652+
i++
653+
case cmp > 0:
654+
// current < prev
655+
added = append(added, current[j])
656+
j++
657+
default:
658+
maintained = append(maintained, current[j])
659+
i++
660+
j++
661+
}
662+
}
663+
// All remaining current addresses are added
664+
added = append(added, current[j:]...)
665+
666+
// All remaining previous addresses are removed
667+
removed = append(removed, prev[i:]...)
668+
return
669+
}
670+
671+
// trimHostAddrList trims the address list to fit within the maximum size
672+
func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
673+
totalSize := 0
674+
for _, a := range addrs {
675+
totalSize += len(a.Bytes())
676+
}
677+
if totalSize <= maxSize {
678+
return addrs
679+
}
680+
681+
score := func(addr ma.Multiaddr) int {
682+
var res int
683+
if manet.IsPublicAddr(addr) {
684+
res |= 1 << 12
685+
} else if !manet.IsIPLoopback(addr) {
686+
res |= 1 << 11
687+
}
688+
var protocolWeight int
689+
ma.ForEach(addr, func(c ma.Component) bool {
690+
switch c.Protocol().Code {
691+
case ma.P_QUIC_V1:
692+
protocolWeight = 5
693+
case ma.P_TCP:
694+
protocolWeight = 4
695+
case ma.P_WSS:
696+
protocolWeight = 3
697+
case ma.P_WEBTRANSPORT:
698+
protocolWeight = 2
699+
case ma.P_WEBRTC_DIRECT:
700+
protocolWeight = 1
701+
case ma.P_P2P:
702+
return false
703+
}
704+
return true
705+
})
706+
res |= 1 << protocolWeight
707+
return res
708+
}
709+
710+
slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
711+
return score(b) - score(a) // b-a for reverse order
712+
})
713+
totalSize = 0
714+
for i, a := range addrs {
715+
totalSize += len(a.Bytes())
716+
if totalSize > maxSize {
717+
addrs = addrs[:i]
718+
break
719+
}
720+
}
721+
return addrs
722+
}
723+
507724
const interfaceAddrsCacheTTL = time.Minute
508725

509726
type interfaceAddrsCache struct {

0 commit comments

Comments
 (0)