Skip to content

Commit 5854b79

Browse files
committed
basichost: improve autonatv2 reachability logic
This improves the reachability detection logic by introducing the concept of primary and secondary addresses. If we have a webtransport address which shares the IP and Port with a QUIC address, the WebTransport address will be considered secondary and the QUIC address will be considered primary. If the Primary is reachable or unreachable, we require only one confirmation for the Secondary address. This speeds up address verification considerably.
1 parent 2346193 commit 5854b79

File tree

2 files changed

+243
-28
lines changed

2 files changed

+243
-28
lines changed

p2p/host/basic/addrs_reachability_tracker.go

Lines changed: 141 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,10 @@ const (
361361
// and then a success(...S S S S F S). The confidence in the targetConfidence window will be equal to
362362
// targetConfidence, the last F and S cancel each other, and we won't probe again for maxProbeInterval.
363363
maxRecentDialsWindow = targetConfidence + 2
364+
// secondaryAddrsScalingFactor is the multiplier applied to secondary address dial outcomes. For secondary
365+
// addr, if the primary addr is reachable, a single successful dial is enough to consider the secondary addr
366+
// reachable.
367+
secondaryAddrsScalingFactor = targetConfidence
364368
// highConfidenceAddrProbeInterval is the maximum interval between probes for an address
365369
highConfidenceAddrProbeInterval = 1 * time.Hour
366370
// maxProbeResultTTL is the maximum time to keep probe results for an address
@@ -380,7 +384,8 @@ type probeManager struct {
380384
inProgressProbes map[string]int // addr -> count
381385
inProgressProbesTotal int
382386
statuses map[string]*addrStatus
383-
addrs []ma.Multiaddr
387+
primaryAddrs []ma.Multiaddr
388+
secondaryAddrs []ma.Multiaddr
384389
}
385390

386391
// newProbeManager creates a new probe manager.
@@ -397,7 +402,19 @@ func (m *probeManager) AppendConfirmedAddrs(reachable, unreachable, unknown []ma
397402
m.mx.Lock()
398403
defer m.mx.Unlock()
399404

400-
for _, a := range m.addrs {
405+
for _, a := range m.primaryAddrs {
406+
s := m.statuses[string(a.Bytes())]
407+
s.RemoveBefore(m.now().Add(-maxProbeResultTTL)) // cleanup stale results
408+
switch s.Reachability() {
409+
case network.ReachabilityPublic:
410+
reachable = append(reachable, a)
411+
case network.ReachabilityPrivate:
412+
unreachable = append(unreachable, a)
413+
case network.ReachabilityUnknown:
414+
unknown = append(unknown, a)
415+
}
416+
}
417+
for _, a := range m.secondaryAddrs {
401418
s := m.statuses[string(a.Bytes())]
402419
s.RemoveBefore(m.now().Add(-maxProbeResultTTL)) // cleanup stale results
403420
switch s.Reachability() {
@@ -425,9 +442,20 @@ func (m *probeManager) UpdateAddrs(addrs []ma.Multiaddr) {
425442
statuses[k] = &addrStatus{Addr: addr}
426443
} else {
427444
statuses[k] = m.statuses[k]
445+
// our addresses have changed, we might have removed the primary address
446+
statuses[k].primary = nil
447+
}
448+
}
449+
assignPrimaryAddrs(statuses)
450+
m.primaryAddrs = m.primaryAddrs[:0]
451+
m.secondaryAddrs = m.secondaryAddrs[:0]
452+
for _, a := range addrs {
453+
if statuses[string(a.Bytes())].primary == nil {
454+
m.primaryAddrs = append(m.primaryAddrs, a)
455+
} else {
456+
m.secondaryAddrs = append(m.secondaryAddrs, a)
428457
}
429458
}
430-
m.addrs = addrs
431459
m.statuses = statuses
432460
}
433461

@@ -439,32 +467,50 @@ func (m *probeManager) GetProbe() probe {
439467
defer m.mx.Unlock()
440468

441469
now := m.now()
442-
for i, a := range m.addrs {
443-
ab := a.Bytes()
444-
pc := m.statuses[string(ab)].RequiredProbeCount(now)
445-
if m.inProgressProbes[string(ab)] >= pc {
446-
continue
470+
reqs := make(probe, 0, maxAddrsPerRequest)
471+
reqs = m.appendRequestsToProbe(reqs, m.primaryAddrs, now)
472+
reqs = m.appendRequestsToProbe(reqs, m.secondaryAddrs, now)
473+
if len(reqs) >= maxAddrsPerRequest {
474+
reqs = reqs[:maxAddrsPerRequest]
475+
}
476+
return reqs
477+
}
478+
479+
func (m *probeManager) appendRequestsToProbe(reqs probe, addrs []ma.Multiaddr, now time.Time) probe {
480+
n := len(addrs)
481+
i := n
482+
if len(reqs) == 0 {
483+
for i = 0; i < n; i++ {
484+
s := m.statuses[string(addrs[i].Bytes())]
485+
pc := s.RequiredProbeCount(now)
486+
if pc == 0 || m.inProgressProbes[string(addrs[i].Bytes())] >= pc {
487+
continue
488+
}
489+
reqs = append(reqs, autonatv2.Request{Addr: addrs[i], SendDialData: true})
490+
break
447491
}
448-
reqs := make(probe, 0, maxAddrsPerRequest)
449-
reqs = append(reqs, autonatv2.Request{Addr: a, SendDialData: true})
450-
// We have the first(primary) address. Append other addresses, ignoring inprogress probes
451-
// on secondary addresses. The expectation is that the primary address will
452-
// be dialed.
453-
for j := 1; j < len(m.addrs); j++ {
454-
k := (i + j) % len(m.addrs)
455-
ab := m.addrs[k].Bytes()
456-
pc := m.statuses[string(ab)].RequiredProbeCount(now)
492+
}
493+
494+
// We have the first address. Append other addresses, ignoring inprogress probes.
495+
// The expectation is that the first address will be dialed.
496+
if len(reqs) > 0 {
497+
for j := range n {
498+
k := (j + i) % n
499+
if k == i {
500+
continue
501+
}
502+
s := m.statuses[string(addrs[k].Bytes())]
503+
pc := s.RequiredProbeCount(now)
457504
if pc == 0 {
458505
continue
459506
}
460-
reqs = append(reqs, autonatv2.Request{Addr: m.addrs[k], SendDialData: true})
507+
reqs = append(reqs, autonatv2.Request{Addr: addrs[k], SendDialData: true})
461508
if len(reqs) >= maxAddrsPerRequest {
462509
break
463510
}
464511
}
465-
return reqs
466512
}
467-
return nil
513+
return reqs
468514
}
469515

470516
// MarkProbeInProgress should be called when a probe is started.
@@ -499,10 +545,10 @@ func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error
499545
defer m.mx.Unlock()
500546

501547
// decrement in-progress count for the first address
502-
primaryAddrKey := string(reqs[0].Addr.Bytes())
503-
m.inProgressProbes[primaryAddrKey]--
504-
if m.inProgressProbes[primaryAddrKey] <= 0 {
505-
delete(m.inProgressProbes, primaryAddrKey)
548+
firstAddrKey := string(reqs[0].Addr.Bytes())
549+
m.inProgressProbes[firstAddrKey]--
550+
if m.inProgressProbes[firstAddrKey] <= 0 {
551+
delete(m.inProgressProbes, firstAddrKey)
506552
}
507553
m.inProgressProbesTotal--
508554

@@ -511,17 +557,17 @@ func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error
511557
return
512558
}
513559

514-
// Consider only primary address as refused. This increases the number of
560+
// Consider only first address as refused. This increases the number of
515561
// refused probes, but refused probes are cheap for a server as no dials are made.
516562
if res.AllAddrsRefused {
517-
if s, ok := m.statuses[primaryAddrKey]; ok {
563+
if s, ok := m.statuses[firstAddrKey]; ok {
518564
s.AddRefusal(now)
519565
}
520566
return
521567
}
522568
dialAddrKey := string(res.Addr.Bytes())
523-
if dialAddrKey != primaryAddrKey {
524-
if s, ok := m.statuses[primaryAddrKey]; ok {
569+
if dialAddrKey != firstAddrKey {
570+
if s, ok := m.statuses[firstAddrKey]; ok {
525571
s.AddRefusal(now)
526572
}
527573
}
@@ -539,6 +585,7 @@ type dialOutcome struct {
539585

540586
type addrStatus struct {
541587
Addr ma.Multiaddr
588+
primary *addrStatus
542589
lastRefusalTime time.Time
543590
consecutiveRefusals int
544591
dialTimes []time.Time
@@ -670,6 +717,15 @@ func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successe
670717
failures++
671718
}
672719
}
720+
if s.primary != nil {
721+
prch, _, _ := s.primary.reachabilityAndCounts()
722+
switch prch {
723+
case network.ReachabilityPublic:
724+
successes *= secondaryAddrsScalingFactor
725+
case network.ReachabilityPrivate:
726+
failures *= secondaryAddrsScalingFactor
727+
}
728+
}
673729
if successes-failures >= minConfidence {
674730
return network.ReachabilityPublic, successes, failures
675731
}
@@ -678,3 +734,60 @@ func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successe
678734
}
679735
return network.ReachabilityUnknown, successes, failures
680736
}
737+
738+
var errNotTW = errors.New("not a thinwaist address")
739+
740+
func thinWaistPart(a ma.Multiaddr) (ma.Multiaddr, error) {
741+
if len(a) < 2 {
742+
return nil, errNotTW
743+
}
744+
if c0, c1 := a[0].Code(), a[1].Code(); (c0 != ma.P_IP4 && c0 != ma.P_IP6) || (c1 != ma.P_TCP && c1 != ma.P_UDP) {
745+
return nil, errNotTW
746+
}
747+
return a[:2], nil
748+
}
749+
750+
func assignPrimaryAddrs(statuses map[string]*addrStatus) {
751+
twMap := make(map[string][]ma.Multiaddr, len(statuses))
752+
for _, s := range statuses {
753+
twp, err := thinWaistPart(s.Addr)
754+
if err != nil {
755+
continue
756+
}
757+
twMap[string(twp.Bytes())] = append(twMap[string(twp.Bytes())], s.Addr)
758+
}
759+
760+
score := func(a ma.Multiaddr) int {
761+
score := 0
762+
for _, p := range a {
763+
switch p.Code() {
764+
case ma.P_QUIC_V1, ma.P_TCP:
765+
score += 1
766+
case ma.P_WEBTRANSPORT:
767+
score += 1 << 1
768+
case ma.P_WEBRTC:
769+
score += 1 << 2
770+
case ma.P_WS, ma.P_WSS:
771+
score += 1 << 3
772+
}
773+
}
774+
if score == 0 {
775+
return 1 << 20
776+
}
777+
return score
778+
}
779+
for _, addrs := range twMap {
780+
if len(addrs) <= 1 {
781+
continue
782+
}
783+
slices.SortFunc(addrs, func(a, b ma.Multiaddr) int {
784+
return score(a) - score(b)
785+
})
786+
primary := addrs[0]
787+
ps := statuses[string(primary.Bytes())]
788+
for _, a := range addrs[1:] {
789+
s := statuses[string(a.Bytes())]
790+
s.primary = ps
791+
}
792+
}
793+
}

p2p/host/basic/addrs_reachability_tracker_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/libp2p/go-libp2p/core/network"
2020
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
2121
ma "github.com/multiformats/go-multiaddr"
22+
"github.com/multiformats/go-multiaddr/matest"
2223
"github.com/stretchr/testify/assert"
2324
"github.com/stretchr/testify/require"
2425
)
@@ -193,6 +194,48 @@ func TestProbeManager(t *testing.T) {
193194
require.Empty(t, reachable)
194195
require.Empty(t, unreachable)
195196
})
197+
198+
t.Run("primary secondary", func(t *testing.T) {
199+
quic := ma.StringCast("/ip4/1.1.1.1/udp/1/quic-v1")
200+
webrtc := ma.StringCast("/ip4/1.1.1.1/udp/1/webrtc-direct")
201+
tcp := ma.StringCast("/ip4/1.1.1.1/tcp/1")
202+
websocket := ma.StringCast("/ip4/1.1.1.1/tcp/1/ws")
203+
pm := makeNewProbeManager([]ma.Multiaddr{tcp, websocket, webrtc, quic})
204+
// tcp private
205+
for range targetConfidence {
206+
reqs := nextProbe(pm)
207+
matest.AssertEqualMultiaddr(t, tcp, reqs[0].Addr)
208+
matest.AssertEqualMultiaddr(t, quic, reqs[1].Addr)
209+
matest.AssertEqualMultiaddr(t, websocket, reqs[2].Addr)
210+
matest.AssertEqualMultiaddr(t, webrtc, reqs[3].Addr)
211+
pm.CompleteProbe(reqs, autonatv2.Result{Addr: tcp, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
212+
}
213+
// quic public
214+
for range targetConfidence {
215+
reqs := nextProbe(pm)
216+
matest.AssertEqualMultiaddr(t, quic, reqs[0].Addr)
217+
matest.AssertEqualMultiaddr(t, websocket, reqs[1].Addr)
218+
matest.AssertEqualMultiaddr(t, webrtc, reqs[2].Addr)
219+
pm.CompleteProbe(reqs, autonatv2.Result{Addr: quic, Idx: 0, Reachability: network.ReachabilityPublic}, nil)
220+
}
221+
// only 1 check now required for websocket
222+
for range 1 {
223+
reqs := nextProbe(pm)
224+
matest.AssertEqualMultiaddr(t, websocket, reqs[0].Addr)
225+
matest.AssertEqualMultiaddr(t, webrtc, reqs[1].Addr)
226+
pm.CompleteProbe(reqs, autonatv2.Result{Addr: websocket, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
227+
}
228+
// 3 checks required for webrtc because its reachability is different from quic
229+
for range targetConfidence {
230+
reqs := nextProbe(pm)
231+
matest.AssertEqualMultiaddr(t, webrtc, reqs[0].Addr)
232+
pm.CompleteProbe(reqs, autonatv2.Result{Addr: webrtc, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
233+
}
234+
235+
reachable, unreachable, _ := pm.AppendConfirmedAddrs(nil, nil, nil)
236+
matest.AssertMultiaddrsMatch(t, reachable, []ma.Multiaddr{quic})
237+
matest.AssertMultiaddrsMatch(t, unreachable, []ma.Multiaddr{tcp, websocket, webrtc})
238+
})
196239
}
197240

198241
type mockAutoNATClient struct {
@@ -720,6 +763,65 @@ func TestAddrStatusProbeCount(t *testing.T) {
720763
}
721764
}
722765

766+
func TestAssignPrimaryAddress(t *testing.T) {
767+
webTransport1 := ma.StringCast("/ip4/127.0.0.1/udp/1/quic-v1/webtransport")
768+
quic1 := ma.StringCast("/ip4/127.0.0.1/udp/1/quic-v1")
769+
webRTC1 := ma.StringCast("/ip4/127.0.0.1/udp/1/webrtc-direct")
770+
771+
webTransport2 := ma.StringCast("/ip4/127.0.0.1/udp/2/quic-v1/webtransport")
772+
quic2 := ma.StringCast("/ip4/127.0.0.1/udp/2/quic-v1")
773+
webRTC2 := ma.StringCast("/ip4/127.0.0.1/udp/2/webrtc-direct")
774+
775+
tcp1 := ma.StringCast("/ip4/127.0.0.1/tcp/1")
776+
ws1 := ma.StringCast("/ip4/127.0.0.1/tcp/1/ws")
777+
778+
tests := [][]struct{ secondary, primary ma.Multiaddr }{
779+
{
780+
{webTransport1, quic1},
781+
{webRTC1, quic1},
782+
},
783+
{
784+
{webTransport1, quic1},
785+
{webRTC1, quic1},
786+
{webTransport2, quic2},
787+
{webRTC2, quic2},
788+
},
789+
{
790+
{webTransport1, quic1},
791+
{webRTC1, quic1},
792+
{webTransport2, quic2},
793+
{webRTC2, quic2},
794+
{ws1, tcp1},
795+
},
796+
{
797+
{webTransport1, nil},
798+
{quic2, nil},
799+
{ws1, nil},
800+
},
801+
}
802+
for i, tt := range tests {
803+
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
804+
statuses := make(map[string]*addrStatus)
805+
for _, p := range tt {
806+
if p.primary != nil {
807+
statuses[string(p.primary.Bytes())] = &addrStatus{Addr: p.primary}
808+
}
809+
statuses[string(p.secondary.Bytes())] = &addrStatus{Addr: p.secondary}
810+
}
811+
assignPrimaryAddrs(statuses)
812+
for _, p := range tt {
813+
if p.primary != nil {
814+
require.Nil(t, statuses[string(p.primary.Bytes())].primary)
815+
require.Equal(t, statuses[string(p.secondary.Bytes())].primary, statuses[string(p.primary.Bytes())])
816+
} else {
817+
require.Nil(t, statuses[string(p.secondary.Bytes())].primary)
818+
}
819+
}
820+
})
821+
}
822+
823+
}
824+
723825
func BenchmarkAddrTracker(b *testing.B) {
724826
cl := clock.NewMock()
725827
t := newProbeManager(cl.Now)

0 commit comments

Comments
 (0)