Skip to content

Commit 4801766

Browse files
committed
basichost: improve autonatv2 reachability logic
This improves the reachability detection logic by introducing the concept of primary and secondary addresses. If we have a webtransport address which shares the IP and Port with a QUIC address, the WebTransport address will be considered secondary and the QUIC address will be considered primary. If the Primary is reachable or unreachable, we require only one confirmation for the Secondary address. This speeds up address verification considerably.
1 parent 31f527d commit 4801766

File tree

2 files changed

+311
-35
lines changed

2 files changed

+311
-35
lines changed

p2p/host/basic/addrs_reachability_tracker.go

Lines changed: 171 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -361,9 +361,15 @@ const (
361361
// and then a success(...S S S S F S). The confidence in the targetConfidence window will be equal to
362362
// targetConfidence, the last F and S cancel each other, and we won't probe again for maxProbeInterval.
363363
maxRecentDialsWindow = targetConfidence + 2
364+
// secondaryAddrsScalingFactor is the multiplier applied to secondary address dial outcomes. For secondary
365+
// addr, if the primary addr is reachable, a single successful dial is enough to consider the secondary addr
366+
// reachable.
367+
secondaryAddrsScalingFactor = targetConfidence
364368
// highConfidenceAddrProbeInterval is the maximum interval between probes for an address
365369
highConfidenceAddrProbeInterval = 1 * time.Hour
366-
// maxProbeResultTTL is the maximum time to keep probe results for an address
370+
// highConfidenceSecondaryAddrProbeInterval is the maximum interval between probes for an address
371+
highConfidenceSecondaryAddrProbeInterval = 3 * time.Hour
372+
// maxProbeResultTTL is the maximum time to keep probe results for a primary address
367373
maxProbeResultTTL = maxRecentDialsWindow * highConfidenceAddrProbeInterval
368374
)
369375

@@ -380,7 +386,8 @@ type probeManager struct {
380386
inProgressProbes map[string]int // addr -> count
381387
inProgressProbesTotal int
382388
statuses map[string]*addrStatus
383-
addrs []ma.Multiaddr
389+
primaryAddrs []ma.Multiaddr
390+
secondaryAddrs []ma.Multiaddr
384391
}
385392

386393
// newProbeManager creates a new probe manager.
@@ -397,7 +404,20 @@ func (m *probeManager) AppendConfirmedAddrs(reachable, unreachable, unknown []ma
397404
m.mx.Lock()
398405
defer m.mx.Unlock()
399406

400-
for _, a := range m.addrs {
407+
for _, a := range m.primaryAddrs {
408+
s := m.statuses[string(a.Bytes())]
409+
s.RemoveBefore(m.now().Add(-maxProbeResultTTL)) // cleanup stale results
410+
switch s.Reachability() {
411+
case network.ReachabilityPublic:
412+
reachable = append(reachable, a)
413+
case network.ReachabilityPrivate:
414+
unreachable = append(unreachable, a)
415+
case network.ReachabilityUnknown:
416+
unknown = append(unknown, a)
417+
}
418+
}
419+
420+
for _, a := range m.secondaryAddrs {
401421
s := m.statuses[string(a.Bytes())]
402422
s.RemoveBefore(m.now().Add(-maxProbeResultTTL)) // cleanup stale results
403423
switch s.Reachability() {
@@ -425,9 +445,20 @@ func (m *probeManager) UpdateAddrs(addrs []ma.Multiaddr) {
425445
statuses[k] = &addrStatus{Addr: addr}
426446
} else {
427447
statuses[k] = m.statuses[k]
448+
// our addresses have changed, we may have removed the primary address
449+
statuses[k].primary = nil
450+
}
451+
}
452+
assignPrimaryAddrs(statuses)
453+
m.primaryAddrs = m.primaryAddrs[:0]
454+
m.secondaryAddrs = m.secondaryAddrs[:0]
455+
for _, a := range addrs {
456+
if statuses[string(a.Bytes())].primary == nil {
457+
m.primaryAddrs = append(m.primaryAddrs, a)
458+
} else {
459+
m.secondaryAddrs = append(m.secondaryAddrs, a)
428460
}
429461
}
430-
m.addrs = addrs
431462
m.statuses = statuses
432463
}
433464

@@ -438,33 +469,70 @@ func (m *probeManager) GetProbe() probe {
438469
m.mx.Lock()
439470
defer m.mx.Unlock()
440471

472+
/*
473+
- First, select the first address for the probe. The assumption is that this is the
474+
address which will be dialled.
475+
- Then, we fill the rest of the addresses in the probe while trying to ensure diversity.
476+
*/
441477
now := m.now()
442-
for i, a := range m.addrs {
443-
ab := a.Bytes()
444-
pc := m.statuses[string(ab)].RequiredProbeCount(now)
445-
if m.inProgressProbes[string(ab)] >= pc {
478+
// first check if the probe's first address is a primary address
479+
idx, ok := m.getFirstProbeAddrIdx(m.primaryAddrs, now)
480+
var reqs probe
481+
if ok {
482+
reqs = make(probe, 0, maxAddrsPerRequest)
483+
reqs = append(reqs, autonatv2.Request{Addr: m.primaryAddrs[idx], SendDialData: true})
484+
reqs = m.appendRequestsToProbe(reqs, m.primaryAddrs, idx, true, now)
485+
reqs = m.appendRequestsToProbe(reqs, m.secondaryAddrs, 0, false, now)
486+
} else {
487+
// no primary addresses available, try secondary.
488+
idx, ok := m.getFirstProbeAddrIdx(m.secondaryAddrs, now)
489+
if !ok {
490+
return nil
491+
}
492+
reqs = make(probe, 0, maxAddrsPerRequest)
493+
reqs = append(reqs, autonatv2.Request{Addr: m.secondaryAddrs[idx], SendDialData: true})
494+
reqs = m.appendRequestsToProbe(reqs, m.primaryAddrs, 0, false, now)
495+
reqs = m.appendRequestsToProbe(reqs, m.secondaryAddrs, idx, true, now)
496+
}
497+
498+
if len(reqs) >= maxAddrsPerRequest {
499+
reqs = reqs[:maxAddrsPerRequest]
500+
}
501+
return reqs
502+
}
503+
504+
// getFirstProbeAddrIdx returns the idx of the probe's first address
505+
func (m *probeManager) getFirstProbeAddrIdx(addrs []ma.Multiaddr, now time.Time) (int, bool) {
506+
for i, a := range addrs {
507+
s := m.statuses[string(a.Bytes())]
508+
pc := s.RequiredProbeCount(now)
509+
if pc == 0 || m.inProgressProbes[string(addrs[i].Bytes())] >= pc {
446510
continue
447511
}
448-
reqs := make(probe, 0, maxAddrsPerRequest)
449-
reqs = append(reqs, autonatv2.Request{Addr: a, SendDialData: true})
450-
// We have the first(primary) address. Append other addresses, ignoring inprogress probes
451-
// on secondary addresses. The expectation is that the primary address will
452-
// be dialed.
453-
for j := 1; j < len(m.addrs); j++ {
454-
k := (i + j) % len(m.addrs)
455-
ab := m.addrs[k].Bytes()
456-
pc := m.statuses[string(ab)].RequiredProbeCount(now)
457-
if pc == 0 {
458-
continue
459-
}
460-
reqs = append(reqs, autonatv2.Request{Addr: m.addrs[k], SendDialData: true})
461-
if len(reqs) >= maxAddrsPerRequest {
462-
break
463-
}
512+
return i, true
513+
}
514+
return -1, false
515+
}
516+
517+
// TODO: Refactor this
518+
func (m *probeManager) appendRequestsToProbe(reqs probe, addrs []ma.Multiaddr, st int, skipStart bool, now time.Time) probe {
519+
n := len(addrs)
520+
for j := range n {
521+
k := (j + st) % n // We start from index: st
522+
if skipStart && k == st {
523+
continue
524+
}
525+
s := m.statuses[string(addrs[k].Bytes())]
526+
pc := s.RequiredProbeCount(now)
527+
if pc == 0 {
528+
continue
529+
}
530+
reqs = append(reqs, autonatv2.Request{Addr: addrs[k], SendDialData: true})
531+
if len(reqs) >= maxAddrsPerRequest {
532+
break
464533
}
465-
return reqs
466534
}
467-
return nil
535+
return reqs
468536
}
469537

470538
// MarkProbeInProgress should be called when a probe is started.
@@ -499,10 +567,10 @@ func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error
499567
defer m.mx.Unlock()
500568

501569
// decrement in-progress count for the first address
502-
primaryAddrKey := string(reqs[0].Addr.Bytes())
503-
m.inProgressProbes[primaryAddrKey]--
504-
if m.inProgressProbes[primaryAddrKey] <= 0 {
505-
delete(m.inProgressProbes, primaryAddrKey)
570+
firstAddrKey := string(reqs[0].Addr.Bytes())
571+
m.inProgressProbes[firstAddrKey]--
572+
if m.inProgressProbes[firstAddrKey] <= 0 {
573+
delete(m.inProgressProbes, firstAddrKey)
506574
}
507575
m.inProgressProbesTotal--
508576

@@ -511,17 +579,17 @@ func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error
511579
return
512580
}
513581

514-
// Consider only primary address as refused. This increases the number of
582+
// Consider only first address as refused. This increases the number of
515583
// refused probes, but refused probes are cheap for a server as no dials are made.
516584
if res.AllAddrsRefused {
517-
if s, ok := m.statuses[primaryAddrKey]; ok {
585+
if s, ok := m.statuses[firstAddrKey]; ok {
518586
s.AddRefusal(now)
519587
}
520588
return
521589
}
522590
dialAddrKey := string(res.Addr.Bytes())
523-
if dialAddrKey != primaryAddrKey {
524-
if s, ok := m.statuses[primaryAddrKey]; ok {
591+
if dialAddrKey != firstAddrKey {
592+
if s, ok := m.statuses[firstAddrKey]; ok {
525593
s.AddRefusal(now)
526594
}
527595
}
@@ -539,6 +607,7 @@ type dialOutcome struct {
539607

540608
type addrStatus struct {
541609
Addr ma.Multiaddr
610+
primary *addrStatus
542611
lastRefusalTime time.Time
543612
consecutiveRefusals int
544613
dialTimes []time.Time
@@ -587,7 +656,8 @@ func (s *addrStatus) requiredProbeCountForConfirmation(now time.Time) int {
587656
}
588657
lastOutcome := s.outcomes[len(s.outcomes)-1]
589658
// If the last probe result is old, we need to retest
590-
if now.Sub(lastOutcome.At) > highConfidenceAddrProbeInterval {
659+
if d := now.Sub(lastOutcome.At); (s.primary == nil && d > highConfidenceAddrProbeInterval) ||
660+
(d > highConfidenceSecondaryAddrProbeInterval) {
591661
return 1
592662
}
593663
// if the last probe result was different from reachability, probe again.
@@ -670,6 +740,15 @@ func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successe
670740
failures++
671741
}
672742
}
743+
if s.primary != nil {
744+
prch, _, _ := s.primary.reachabilityAndCounts()
745+
switch prch {
746+
case network.ReachabilityPublic:
747+
successes *= secondaryAddrsScalingFactor
748+
case network.ReachabilityPrivate:
749+
failures *= secondaryAddrsScalingFactor
750+
}
751+
}
673752
if successes-failures >= minConfidence {
674753
return network.ReachabilityPublic, successes, failures
675754
}
@@ -678,3 +757,60 @@ func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successe
678757
}
679758
return network.ReachabilityUnknown, successes, failures
680759
}
760+
761+
var errNotTW = errors.New("not a thinwaist address")
762+
763+
func thinWaistPart(a ma.Multiaddr) (ma.Multiaddr, error) {
764+
if len(a) < 2 {
765+
return nil, errNotTW
766+
}
767+
if c0, c1 := a[0].Code(), a[1].Code(); (c0 != ma.P_IP4 && c0 != ma.P_IP6) || (c1 != ma.P_TCP && c1 != ma.P_UDP) {
768+
return nil, errNotTW
769+
}
770+
return a[:2], nil
771+
}
772+
773+
func assignPrimaryAddrs(statuses map[string]*addrStatus) {
774+
twMap := make(map[string][]ma.Multiaddr, len(statuses))
775+
for _, s := range statuses {
776+
twp, err := thinWaistPart(s.Addr)
777+
if err != nil {
778+
continue
779+
}
780+
twMap[string(twp.Bytes())] = append(twMap[string(twp.Bytes())], s.Addr)
781+
}
782+
783+
score := func(a ma.Multiaddr) int {
784+
score := 0
785+
for _, p := range a {
786+
switch p.Code() {
787+
case ma.P_QUIC_V1, ma.P_TCP:
788+
score += 1
789+
case ma.P_WEBTRANSPORT:
790+
score += 1 << 1
791+
case ma.P_WEBRTC:
792+
score += 1 << 2
793+
case ma.P_WS, ma.P_WSS:
794+
score += 1 << 3
795+
}
796+
}
797+
if score == 0 {
798+
return 1 << 20
799+
}
800+
return score
801+
}
802+
for _, addrs := range twMap {
803+
if len(addrs) <= 1 {
804+
continue
805+
}
806+
slices.SortFunc(addrs, func(a, b ma.Multiaddr) int {
807+
return score(a) - score(b)
808+
})
809+
primary := addrs[0]
810+
ps := statuses[string(primary.Bytes())]
811+
for _, a := range addrs[1:] {
812+
s := statuses[string(a.Bytes())]
813+
s.primary = ps
814+
}
815+
}
816+
}

0 commit comments

Comments
 (0)