Skip to content

Commit 49d58c5

Browse files
committed
basichost: improve autonatv2 reachability logic
This improves the reachability detection logic by introducing the concept of primary and secondary addresses. If we have a WebTransport address which shares its IP and port with a QUIC address, the WebTransport address is considered secondary and the QUIC address is considered primary. Once the primary address is known to be reachable or unreachable, a single probe result that agrees with it is enough to confirm the secondary address; a secondary address whose results disagree with its primary still needs the usual number of confirmations. This speeds up address verification considerably.
1 parent 4d77c2b commit 49d58c5

File tree

2 files changed

+245
-28
lines changed

2 files changed

+245
-28
lines changed

p2p/host/basic/addrs_reachability_tracker.go

Lines changed: 143 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,10 @@ const (
361361
// and then a success(...S S S S F S). The confidence in the targetConfidence window will be equal to
362362
// targetConfidence, the last F and S cancel each other, and we won't probe again for maxProbeInterval.
363363
maxRecentDialsWindow = targetConfidence + 2
364+
// secondaryAddrsScalingFactor is the multiplier applied to secondary address dial outcomes. For secondary
365+
// addr, if the primary addr is reachable, a single successful dial is enough to consider the secondary addr
366+
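// reachable. Likewise, if the primary addr is unreachable, a single failed dial is enough to consider the
// secondary addr unreachable.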
367+
secondaryAddrsScalingFactor = targetConfidence
364368
// highConfidenceAddrProbeInterval is the maximum interval between probes for an address
365369
highConfidenceAddrProbeInterval = 1 * time.Hour
366370
// maxProbeResultTTL is the maximum time to keep probe results for an address
@@ -380,7 +384,8 @@ type probeManager struct {
380384
inProgressProbes map[string]int // addr -> count
381385
inProgressProbesTotal int
382386
statuses map[string]*addrStatus
383-
addrs []ma.Multiaddr
387+
primaryAddrs []ma.Multiaddr
388+
secondaryAddrs []ma.Multiaddr
384389
}
385390

386391
// newProbeManager creates a new probe manager.
@@ -397,7 +402,19 @@ func (m *probeManager) AppendConfirmedAddrs(reachable, unreachable, unknown []ma
397402
m.mx.Lock()
398403
defer m.mx.Unlock()
399404

400-
for _, a := range m.addrs {
405+
for _, a := range m.primaryAddrs {
406+
s := m.statuses[string(a.Bytes())]
407+
s.RemoveBefore(m.now().Add(-maxProbeResultTTL)) // cleanup stale results
408+
switch s.Reachability() {
409+
case network.ReachabilityPublic:
410+
reachable = append(reachable, a)
411+
case network.ReachabilityPrivate:
412+
unreachable = append(unreachable, a)
413+
case network.ReachabilityUnknown:
414+
unknown = append(unknown, a)
415+
}
416+
}
417+
for _, a := range m.secondaryAddrs {
401418
s := m.statuses[string(a.Bytes())]
402419
s.RemoveBefore(m.now().Add(-maxProbeResultTTL)) // cleanup stale results
403420
switch s.Reachability() {
@@ -425,9 +442,20 @@ func (m *probeManager) UpdateAddrs(addrs []ma.Multiaddr) {
425442
statuses[k] = &addrStatus{Addr: addr}
426443
} else {
427444
statuses[k] = m.statuses[k]
445+
// our addresses have changed, we might have removed the primary address
446+
statuses[k].primary = nil
447+
}
448+
}
449+
assignPrimaryAddrs(statuses)
450+
m.primaryAddrs = m.primaryAddrs[:0]
451+
m.secondaryAddrs = m.secondaryAddrs[:0]
452+
for _, a := range addrs {
453+
if statuses[string(a.Bytes())].primary == nil {
454+
m.primaryAddrs = append(m.primaryAddrs, a)
455+
} else {
456+
m.secondaryAddrs = append(m.secondaryAddrs, a)
428457
}
429458
}
430-
m.addrs = addrs
431459
m.statuses = statuses
432460
}
433461

@@ -439,32 +467,52 @@ func (m *probeManager) GetProbe() probe {
439467
defer m.mx.Unlock()
440468

441469
now := m.now()
442-
for i, a := range m.addrs {
443-
ab := a.Bytes()
444-
pc := m.statuses[string(ab)].RequiredProbeCount(now)
445-
if m.inProgressProbes[string(ab)] >= pc {
446-
continue
470+
reqs := make(probe, 0, maxAddrsPerRequest)
471+
reqs = m.appendRequestsToProbe(reqs, m.primaryAddrs, now)
472+
reqs = m.appendRequestsToProbe(reqs, m.secondaryAddrs, now)
473+
if len(reqs) >= maxAddrsPerRequest {
474+
reqs = reqs[:maxAddrsPerRequest]
475+
}
476+
return reqs
477+
}
478+
479+
func (m *probeManager) appendRequestsToProbe(reqs probe, addrs []ma.Multiaddr, now time.Time) probe {
480+
n := len(addrs)
481+
i := n
482+
if len(reqs) == 0 {
483+
for i = 0; i < n; i++ {
484+
sab := string(addrs[i].Bytes())
485+
s := m.statuses[sab]
486+
pc := s.RequiredProbeCount(now)
487+
if pc == 0 || m.inProgressProbes[sab] >= pc {
488+
continue
489+
}
490+
reqs = append(reqs, autonatv2.Request{Addr: addrs[i], SendDialData: true})
491+
break
447492
}
448-
reqs := make(probe, 0, maxAddrsPerRequest)
449-
reqs = append(reqs, autonatv2.Request{Addr: a, SendDialData: true})
450-
// We have the first(primary) address. Append other addresses, ignoring inprogress probes
451-
// on secondary addresses. The expectation is that the primary address will
452-
// be dialed.
453-
for j := 1; j < len(m.addrs); j++ {
454-
k := (i + j) % len(m.addrs)
455-
ab := m.addrs[k].Bytes()
456-
pc := m.statuses[string(ab)].RequiredProbeCount(now)
493+
}
494+
495+
// We have the first address. Append other addresses, ignoring inprogress probes.
496+
// The expectation is that the first address will be dialed.
497+
if len(reqs) > 0 {
498+
for j := range n {
499+
k := (j + i) % n
500+
if k == i {
501+
continue
502+
}
503+
sab := string(addrs[k].Bytes())
504+
s := m.statuses[sab]
505+
pc := s.RequiredProbeCount(now)
457506
if pc == 0 {
458507
continue
459508
}
460-
reqs = append(reqs, autonatv2.Request{Addr: m.addrs[k], SendDialData: true})
509+
reqs = append(reqs, autonatv2.Request{Addr: addrs[k], SendDialData: true})
461510
if len(reqs) >= maxAddrsPerRequest {
462511
break
463512
}
464513
}
465-
return reqs
466514
}
467-
return nil
515+
return reqs
468516
}
469517

470518
// MarkProbeInProgress should be called when a probe is started.
@@ -499,10 +547,10 @@ func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error
499547
defer m.mx.Unlock()
500548

501549
// decrement in-progress count for the first address
502-
primaryAddrKey := string(reqs[0].Addr.Bytes())
503-
m.inProgressProbes[primaryAddrKey]--
504-
if m.inProgressProbes[primaryAddrKey] <= 0 {
505-
delete(m.inProgressProbes, primaryAddrKey)
550+
firstAddrKey := string(reqs[0].Addr.Bytes())
551+
m.inProgressProbes[firstAddrKey]--
552+
if m.inProgressProbes[firstAddrKey] <= 0 {
553+
delete(m.inProgressProbes, firstAddrKey)
506554
}
507555
m.inProgressProbesTotal--
508556

@@ -511,17 +559,17 @@ func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error
511559
return
512560
}
513561

514-
// Consider only primary address as refused. This increases the number of
562+
// Consider only first address as refused. This increases the number of
515563
// refused probes, but refused probes are cheap for a server as no dials are made.
516564
if res.AllAddrsRefused {
517-
if s, ok := m.statuses[primaryAddrKey]; ok {
565+
if s, ok := m.statuses[firstAddrKey]; ok {
518566
s.AddRefusal(now)
519567
}
520568
return
521569
}
522570
dialAddrKey := string(res.Addr.Bytes())
523-
if dialAddrKey != primaryAddrKey {
524-
if s, ok := m.statuses[primaryAddrKey]; ok {
571+
if dialAddrKey != firstAddrKey {
572+
if s, ok := m.statuses[firstAddrKey]; ok {
525573
s.AddRefusal(now)
526574
}
527575
}
@@ -539,6 +587,7 @@ type dialOutcome struct {
539587

540588
type addrStatus struct {
541589
Addr ma.Multiaddr
590+
primary *addrStatus
542591
lastRefusalTime time.Time
543592
consecutiveRefusals int
544593
dialTimes []time.Time
@@ -670,6 +719,15 @@ func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successe
670719
failures++
671720
}
672721
}
722+
if s.primary != nil {
723+
prch, _, _ := s.primary.reachabilityAndCounts()
724+
switch prch {
725+
case network.ReachabilityPublic:
726+
successes *= secondaryAddrsScalingFactor
727+
case network.ReachabilityPrivate:
728+
failures *= secondaryAddrsScalingFactor
729+
}
730+
}
673731
if successes-failures >= minConfidence {
674732
return network.ReachabilityPublic, successes, failures
675733
}
@@ -678,3 +736,60 @@ func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successe
678736
}
679737
return network.ReachabilityUnknown, successes, failures
680738
}
739+
740+
var errNotTW = errors.New("not a thinwaist address")
741+
742+
func thinWaistPart(a ma.Multiaddr) (ma.Multiaddr, error) {
743+
if len(a) < 2 {
744+
return nil, errNotTW
745+
}
746+
if c0, c1 := a[0].Code(), a[1].Code(); (c0 != ma.P_IP4 && c0 != ma.P_IP6) || (c1 != ma.P_TCP && c1 != ma.P_UDP) {
747+
return nil, errNotTW
748+
}
749+
return a[:2], nil
750+
}
751+
752+
func assignPrimaryAddrs(statuses map[string]*addrStatus) {
753+
twMap := make(map[string][]ma.Multiaddr, len(statuses))
754+
for _, s := range statuses {
755+
twp, err := thinWaistPart(s.Addr)
756+
if err != nil {
757+
continue
758+
}
759+
twMap[string(twp.Bytes())] = append(twMap[string(twp.Bytes())], s.Addr)
760+
}
761+
762+
score := func(a ma.Multiaddr) int {
763+
score := 0
764+
for _, p := range a {
765+
switch p.Code() {
766+
case ma.P_QUIC_V1, ma.P_TCP:
767+
score += 1
768+
case ma.P_WEBTRANSPORT:
769+
score += 1 << 1
770+
case ma.P_WEBRTC:
771+
score += 1 << 2
772+
case ma.P_WS, ma.P_WSS:
773+
score += 1 << 3
774+
}
775+
}
776+
if score == 0 {
777+
return 1 << 20
778+
}
779+
return score
780+
}
781+
for _, addrs := range twMap {
782+
if len(addrs) <= 1 {
783+
continue
784+
}
785+
slices.SortFunc(addrs, func(a, b ma.Multiaddr) int {
786+
return score(a) - score(b)
787+
})
788+
primary := addrs[0]
789+
ps := statuses[string(primary.Bytes())]
790+
for _, a := range addrs[1:] {
791+
s := statuses[string(a.Bytes())]
792+
s.primary = ps
793+
}
794+
}
795+
}

p2p/host/basic/addrs_reachability_tracker_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/libp2p/go-libp2p/core/network"
2020
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
2121
ma "github.com/multiformats/go-multiaddr"
22+
"github.com/multiformats/go-multiaddr/matest"
2223
"github.com/stretchr/testify/assert"
2324
"github.com/stretchr/testify/require"
2425
)
@@ -193,6 +194,48 @@ func TestProbeManager(t *testing.T) {
193194
require.Empty(t, reachable)
194195
require.Empty(t, unreachable)
195196
})
197+
198+
t.Run("primary secondary", func(t *testing.T) {
199+
quic := ma.StringCast("/ip4/1.1.1.1/udp/1/quic-v1")
200+
webrtc := ma.StringCast("/ip4/1.1.1.1/udp/1/webrtc-direct")
201+
tcp := ma.StringCast("/ip4/1.1.1.1/tcp/1")
202+
websocket := ma.StringCast("/ip4/1.1.1.1/tcp/1/ws")
203+
pm := makeNewProbeManager([]ma.Multiaddr{tcp, websocket, webrtc, quic})
204+
// tcp private
205+
for range targetConfidence {
206+
reqs := nextProbe(pm)
207+
matest.AssertEqualMultiaddr(t, tcp, reqs[0].Addr)
208+
matest.AssertEqualMultiaddr(t, quic, reqs[1].Addr)
209+
matest.AssertEqualMultiaddr(t, websocket, reqs[2].Addr)
210+
matest.AssertEqualMultiaddr(t, webrtc, reqs[3].Addr)
211+
pm.CompleteProbe(reqs, autonatv2.Result{Addr: tcp, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
212+
}
213+
// quic public
214+
for range targetConfidence {
215+
reqs := nextProbe(pm)
216+
matest.AssertEqualMultiaddr(t, quic, reqs[0].Addr)
217+
matest.AssertEqualMultiaddr(t, websocket, reqs[1].Addr)
218+
matest.AssertEqualMultiaddr(t, webrtc, reqs[2].Addr)
219+
pm.CompleteProbe(reqs, autonatv2.Result{Addr: quic, Idx: 0, Reachability: network.ReachabilityPublic}, nil)
220+
}
221+
// only 1 check now required for websocket
222+
for range 1 {
223+
reqs := nextProbe(pm)
224+
matest.AssertEqualMultiaddr(t, websocket, reqs[0].Addr)
225+
matest.AssertEqualMultiaddr(t, webrtc, reqs[1].Addr)
226+
pm.CompleteProbe(reqs, autonatv2.Result{Addr: websocket, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
227+
}
228+
// 3 checks required for webrtc because its reachability is different from quic
229+
for range targetConfidence {
230+
reqs := nextProbe(pm)
231+
matest.AssertEqualMultiaddr(t, webrtc, reqs[0].Addr)
232+
pm.CompleteProbe(reqs, autonatv2.Result{Addr: webrtc, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
233+
}
234+
235+
reachable, unreachable, _ := pm.AppendConfirmedAddrs(nil, nil, nil)
236+
matest.AssertMultiaddrsMatch(t, reachable, []ma.Multiaddr{quic})
237+
matest.AssertMultiaddrsMatch(t, unreachable, []ma.Multiaddr{tcp, websocket, webrtc})
238+
})
196239
}
197240

198241
type mockAutoNATClient struct {
@@ -720,6 +763,65 @@ func TestAddrStatusProbeCount(t *testing.T) {
720763
}
721764
}
722765

766+
func TestAssignPrimaryAddress(t *testing.T) {
767+
webTransport1 := ma.StringCast("/ip4/127.0.0.1/udp/1/quic-v1/webtransport")
768+
quic1 := ma.StringCast("/ip4/127.0.0.1/udp/1/quic-v1")
769+
webRTC1 := ma.StringCast("/ip4/127.0.0.1/udp/1/webrtc-direct")
770+
771+
webTransport2 := ma.StringCast("/ip4/127.0.0.1/udp/2/quic-v1/webtransport")
772+
quic2 := ma.StringCast("/ip4/127.0.0.1/udp/2/quic-v1")
773+
webRTC2 := ma.StringCast("/ip4/127.0.0.1/udp/2/webrtc-direct")
774+
775+
tcp1 := ma.StringCast("/ip4/127.0.0.1/tcp/1")
776+
ws1 := ma.StringCast("/ip4/127.0.0.1/tcp/1/ws")
777+
778+
tests := [][]struct{ secondary, primary ma.Multiaddr }{
779+
{
780+
{webTransport1, quic1},
781+
{webRTC1, quic1},
782+
},
783+
{
784+
{webTransport1, quic1},
785+
{webRTC1, quic1},
786+
{webTransport2, quic2},
787+
{webRTC2, quic2},
788+
},
789+
{
790+
{webTransport1, quic1},
791+
{webRTC1, quic1},
792+
{webTransport2, quic2},
793+
{webRTC2, quic2},
794+
{ws1, tcp1},
795+
},
796+
{
797+
{webTransport1, nil},
798+
{quic2, nil},
799+
{ws1, nil},
800+
},
801+
}
802+
for i, tt := range tests {
803+
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
804+
statuses := make(map[string]*addrStatus)
805+
for _, p := range tt {
806+
if p.primary != nil {
807+
statuses[string(p.primary.Bytes())] = &addrStatus{Addr: p.primary}
808+
}
809+
statuses[string(p.secondary.Bytes())] = &addrStatus{Addr: p.secondary}
810+
}
811+
assignPrimaryAddrs(statuses)
812+
for _, p := range tt {
813+
if p.primary != nil {
814+
require.Nil(t, statuses[string(p.primary.Bytes())].primary)
815+
require.Equal(t, statuses[string(p.secondary.Bytes())].primary, statuses[string(p.primary.Bytes())])
816+
} else {
817+
require.Nil(t, statuses[string(p.secondary.Bytes())].primary)
818+
}
819+
}
820+
})
821+
}
822+
823+
}
824+
723825
func BenchmarkAddrTracker(b *testing.B) {
724826
cl := clock.NewMock()
725827
t := newProbeManager(cl.Now)

0 commit comments

Comments
 (0)