Skip to content

Commit b8d1eab

Browse files
committed
basichost: move EvtLocalAddrsChanged to addrs_manager
We'll deprecate this event, but we still have to keep sending this for a few more releases. More importantly, we need to update the peerstore with the host's addresses and it's better to do this *before* sending update events so that consumers of the event can rely on the host addrs being updated in the peerstore.
1 parent cc0711a commit b8d1eab

File tree

3 files changed

+260
-225
lines changed

3 files changed

+260
-225
lines changed

p2p/host/basic/addrs_manager.go

Lines changed: 235 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
"github.com/libp2p/go-libp2p/core/crypto"
1415
"github.com/libp2p/go-libp2p/core/event"
1516
"github.com/libp2p/go-libp2p/core/network"
17+
"github.com/libp2p/go-libp2p/core/peer"
18+
"github.com/libp2p/go-libp2p/core/peerstore"
19+
"github.com/libp2p/go-libp2p/core/record"
1620
"github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff"
1721
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
1822
"github.com/libp2p/go-netroute"
@@ -26,6 +30,13 @@ const maxObservedAddrsPerListenAddr = 3
2630
// addrChangeTickrInterval is the interval to recompute host addrs.
2731
var addrChangeTickrInterval = 5 * time.Second
2832

33+
const maxPeerRecordSize = 8 * 1024 // 8k to be compatible with identify's limit
34+
35+
// addrStore is a minimal interface for storing peer addresses
36+
type addrStore interface {
37+
SetAddrs(peer.ID, []ma.Multiaddr, time.Duration)
38+
}
39+
2940
// ObservedAddrsManager maps our local listen addrs to externally observed addrs.
3041
type ObservedAddrsManager interface {
3142
Addrs(minObservers int) []ma.Multiaddr
@@ -51,9 +62,6 @@ type addrsManager struct {
5162
interfaceAddrs *interfaceAddrsCache
5263
addrsReachabilityTracker *addrsReachabilityTracker
5364

54-
// addrsUpdatedChan is notified when addrs change. This is provided by the caller.
55-
addrsUpdatedChan chan struct{}
56-
5765
// triggerAddrsUpdateChan is used to trigger an addresses update.
5866
triggerAddrsUpdateChan chan chan struct{}
5967
// started is used to check whether the addrsManager has started.
@@ -66,6 +74,11 @@ type addrsManager struct {
6674
addrsMx sync.RWMutex
6775
currentAddrs hostAddrs
6876

77+
signKey crypto.PrivKey
78+
addrStore addrStore
79+
signedRecordStore peerstore.CertifiedAddrBook
80+
hostID peer.ID
81+
6982
wg sync.WaitGroup
7083
ctx context.Context
7184
ctxCancel context.CancelFunc
@@ -78,10 +91,13 @@ func newAddrsManager(
7891
listenAddrs func() []ma.Multiaddr,
7992
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr,
8093
observedAddrsManager ObservedAddrsManager,
81-
addrsUpdatedChan chan struct{},
8294
client autonatv2Client,
8395
enableMetrics bool,
8496
registerer prometheus.Registerer,
97+
disableSignedPeerRecord bool,
98+
signKey crypto.PrivKey,
99+
addrStore addrStore,
100+
hostID peer.ID,
85101
) (*addrsManager, error) {
86102
ctx, cancel := context.WithCancel(context.Background())
87103
as := &addrsManager{
@@ -93,14 +109,24 @@ func newAddrsManager(
93109
addrsFactory: addrsFactory,
94110
triggerAddrsUpdateChan: make(chan chan struct{}, 1),
95111
triggerReachabilityUpdate: make(chan struct{}, 1),
96-
addrsUpdatedChan: addrsUpdatedChan,
97112
interfaceAddrs: &interfaceAddrsCache{},
113+
signKey: signKey,
114+
addrStore: addrStore,
115+
hostID: hostID,
98116
ctx: ctx,
99117
ctxCancel: cancel,
100118
}
101119
unknownReachability := network.ReachabilityUnknown
102120
as.hostReachability.Store(&unknownReachability)
103121

122+
if !disableSignedPeerRecord {
123+
var ok bool
124+
as.signedRecordStore, ok = as.addrStore.(peerstore.CertifiedAddrBook)
125+
if !ok {
126+
return nil, errors.New("peerstore doesn't implement CertifiedAddrBook interface")
127+
}
128+
}
129+
104130
if client != nil {
105131
var metricsTracker MetricsTracker
106132
if enableMetrics {
@@ -183,6 +209,15 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
183209
mc.Close(),
184210
)
185211
}
212+
mc = append(mc, emitter)
213+
214+
localAddrsEmitter, err := a.bus.Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful)
215+
if err != nil {
216+
return errors.Join(
217+
fmt.Errorf("error creating local addrs emitter: %s", err),
218+
mc.Close(),
219+
)
220+
}
186221

187222
var relayAddrs []ma.Multiaddr
188223
// update relay addrs in case we're private
@@ -205,15 +240,20 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
205240
a.started.Store(true)
206241
// update addresses before starting the worker loop. This ensures that any address updates
207242
// before calling addrsManager.Start are correctly reported after Start returns.
208-
a.updateAddrs(relayAddrs)
243+
ha := a.updateAddrs(relayAddrs)
244+
a.updatePeerStore(ha.addrs, nil)
209245

210246
a.wg.Add(1)
211-
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, relayAddrs)
247+
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, localAddrsEmitter, relayAddrs)
212248
return nil
213249
}
214250

215-
func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub event.Subscription,
216-
emitter event.Emitter, relayAddrs []ma.Multiaddr,
251+
func (a *addrsManager) background(
252+
autoRelayAddrsSub,
253+
autonatReachabilitySub event.Subscription,
254+
emitter event.Emitter,
255+
localAddrsEmitter event.Emitter,
256+
relayAddrs []ma.Multiaddr,
217257
) {
218258
defer a.wg.Done()
219259
defer func() {
@@ -229,6 +269,10 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
229269
if err != nil {
230270
log.Warnf("error closing host reachability emitter: %s", err)
231271
}
272+
err = localAddrsEmitter.Close()
273+
if err != nil {
274+
log.Warnf("error closing local addrs emitter: %s", err)
275+
}
232276
}()
233277

234278
ticker := time.NewTicker(addrChangeTickrInterval)
@@ -241,7 +285,7 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
241285
close(notifCh)
242286
notifCh = nil
243287
}
244-
a.notifyAddrsChanged(emitter, previousAddrs, currAddrs)
288+
a.notifyAddrsChanged(emitter, localAddrsEmitter, previousAddrs, currAddrs)
245289
previousAddrs = currAddrs
246290
select {
247291
case <-ticker.C:
@@ -294,19 +338,18 @@ func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs {
294338
}
295339
}
296340

297-
func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, current hostAddrs) {
341+
func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, localAddrsEmitter event.Emitter, previous, current hostAddrs) {
298342
if areAddrsDifferent(previous.localAddrs, current.localAddrs) {
299343
log.Debugf("host local addresses updated: %s", current.localAddrs)
300344
if a.addrsReachabilityTracker != nil {
301345
a.addrsReachabilityTracker.UpdateAddrs(current.localAddrs)
302346
}
303347
}
304348
if areAddrsDifferent(previous.addrs, current.addrs) {
305-
log.Debugf("host addresses updated: %s", current.localAddrs)
306-
select {
307-
case a.addrsUpdatedChan <- struct{}{}:
308-
default:
309-
}
349+
log.Debugf("host addresses updated: %s", current.addrs)
350+
351+
// Emit EvtLocalAddressesUpdated event and handle peerstore operations
352+
a.handleHostAddrsUpdated(localAddrsEmitter, current.addrs, previous.addrs)
310353
}
311354

312355
// We *must* send both reachability changed and addrs changed events from the
@@ -504,6 +547,182 @@ func areAddrsDifferent(prev, current []ma.Multiaddr) bool {
504547
return false
505548
}
506549

550+
// diffAddrs diffs prev and current addrs and returns added, maintained, and removed addrs.
551+
// Both prev and current are expected to be sorted using ma.Compare()
552+
func (a *addrsManager) diffAddrs(prev, current []ma.Multiaddr) (added, maintained, removed []ma.Multiaddr) {
553+
i, j := 0, 0
554+
for i < len(prev) && j < len(current) {
555+
cmp := prev[i].Compare(current[j])
556+
switch {
557+
case cmp < 0:
558+
// prev < current
559+
removed = append(removed, prev[i])
560+
i++
561+
case cmp > 0:
562+
// current < prev
563+
added = append(added, current[j])
564+
j++
565+
default:
566+
maintained = append(maintained, current[j])
567+
i++
568+
j++
569+
}
570+
}
571+
// All remaining current addresses are added
572+
added = append(added, current[j:]...)
573+
574+
// All remaining previous addresses are removed
575+
removed = append(removed, prev[i:]...)
576+
return
577+
}
578+
579+
// makeSignedPeerRecord creates a signed peer record for the given addresses
580+
func (a *addrsManager) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) {
581+
if a.signKey == nil {
582+
return nil, fmt.Errorf("signKey is nil")
583+
}
584+
// Limit the length of currentAddrs to ensure that our signed peer records aren't rejected
585+
peerRecordSize := 64 // HostID
586+
k, err := a.signKey.Raw()
587+
var nk int
588+
if err == nil {
589+
nk = len(k)
590+
} else {
591+
nk = 1024 // In case of error, use a large enough value.
592+
}
593+
peerRecordSize += 2 * nk // 1 for signature, 1 for public key
594+
// we want the final address list to be small for keeping the signed peer record in size
595+
addrs = trimHostAddrList(addrs, maxPeerRecordSize-peerRecordSize-256) // 256 B of buffer
596+
rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{
597+
ID: a.hostID,
598+
Addrs: addrs,
599+
})
600+
return record.Seal(rec, a.signKey)
601+
}
602+
603+
// trimHostAddrList trims the address list to fit within the maximum size
604+
func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
605+
totalSize := 0
606+
for _, a := range addrs {
607+
totalSize += len(a.Bytes())
608+
}
609+
if totalSize <= maxSize {
610+
return addrs
611+
}
612+
613+
score := func(addr ma.Multiaddr) int {
614+
var res int
615+
if manet.IsPublicAddr(addr) {
616+
res |= 1 << 12
617+
} else if !manet.IsIPLoopback(addr) {
618+
res |= 1 << 11
619+
}
620+
var protocolWeight int
621+
ma.ForEach(addr, func(c ma.Component) bool {
622+
switch c.Protocol().Code {
623+
case ma.P_QUIC_V1:
624+
protocolWeight = 5
625+
case ma.P_TCP:
626+
protocolWeight = 4
627+
case ma.P_WSS:
628+
protocolWeight = 3
629+
case ma.P_WEBTRANSPORT:
630+
protocolWeight = 2
631+
case ma.P_WEBRTC_DIRECT:
632+
protocolWeight = 1
633+
case ma.P_P2P:
634+
return false
635+
}
636+
return true
637+
})
638+
res |= 1 << protocolWeight
639+
return res
640+
}
641+
642+
slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
643+
return score(b) - score(a) // b-a for reverse order
644+
})
645+
totalSize = 0
646+
for i, a := range addrs {
647+
totalSize += len(a.Bytes())
648+
if totalSize > maxSize {
649+
addrs = addrs[:i]
650+
break
651+
}
652+
}
653+
return addrs
654+
}
655+
656+
// handleHostAddrsUpdated emits an EvtLocalAddressesUpdated event and updates the addresses in the peerstore.
657+
func (a *addrsManager) handleHostAddrsUpdated(emitter event.Emitter, currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
658+
added, maintained, removed := a.diffAddrs(lastAddrs, currentAddrs)
659+
if len(added) == 0 && len(removed) == 0 {
660+
return
661+
}
662+
663+
sr := a.updatePeerStore(currentAddrs, removed)
664+
665+
evt := &event.EvtLocalAddressesUpdated{
666+
Diffs: true,
667+
Current: make([]event.UpdatedAddress, 0, len(currentAddrs)),
668+
Removed: make([]event.UpdatedAddress, 0, len(removed)),
669+
SignedPeerRecord: sr,
670+
}
671+
672+
for _, addr := range maintained {
673+
evt.Current = append(evt.Current, event.UpdatedAddress{
674+
Address: addr,
675+
Action: event.Maintained,
676+
})
677+
}
678+
679+
for _, addr := range added {
680+
evt.Current = append(evt.Current, event.UpdatedAddress{
681+
Address: addr,
682+
Action: event.Added,
683+
})
684+
}
685+
686+
for _, addr := range removed {
687+
evt.Removed = append(evt.Removed, event.UpdatedAddress{
688+
Address: addr,
689+
Action: event.Removed,
690+
})
691+
}
692+
693+
// emit addr change event
694+
if err := emitter.Emit(*evt); err != nil {
695+
log.Warnf("error emitting event for updated addrs: %s", err)
696+
}
697+
}
698+
699+
// updatePeerStore updates the peer store and returns the signed peer record.
700+
// If the signed peer record is not created, it returns nil.
701+
func (a *addrsManager) updatePeerStore(currentAddrs []ma.Multiaddr, removedAddrs []ma.Multiaddr) *record.Envelope {
702+
// update host addresses in the peer store
703+
a.addrStore.SetAddrs(a.hostID, currentAddrs, peerstore.PermanentAddrTTL)
704+
a.addrStore.SetAddrs(a.hostID, removedAddrs, 0)
705+
706+
var sr *record.Envelope
707+
// Our addresses have changed.
708+
// store the signed peer record in the peer store.
709+
if a.signedRecordStore != nil {
710+
var err error
711+
// add signed peer record to the event
712+
// in case of an error drop this event.
713+
sr, err = a.makeSignedPeerRecord(currentAddrs)
714+
if err != nil {
715+
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
716+
return nil
717+
}
718+
if _, err := a.signedRecordStore.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
719+
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
720+
return nil
721+
}
722+
}
723+
return sr
724+
}
725+
507726
const interfaceAddrsCacheTTL = time.Minute
508727

509728
type interfaceAddrsCache struct {

0 commit comments

Comments
 (0)