Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
caa37cc
reprovide sweep draft
guillaumemichel Jun 10, 2025
302cfa8
update reprovider dep
guillaumemichel Jun 11, 2025
1c9e27b
go mod tidy
guillaumemichel Jun 11, 2025
f406fde
fix provider type
guillaumemichel Jun 11, 2025
7b11dfc
change router type
guillaumemichel Jun 11, 2025
a60eaee
dual reprovider
guillaumemichel Jun 11, 2025
456c69b
revert to provider.System
guillaumemichel Jun 11, 2025
b62ac64
back to start
guillaumemichel Jun 11, 2025
99f0550
SweepingReprovider test
guillaumemichel Jun 12, 2025
d767715
fix nil pointer deref
guillaumemichel Jun 12, 2025
92710b3
noop provider for nil dht
guillaumemichel Jun 12, 2025
c2d6065
disabled initial network estimation
guillaumemichel Jun 12, 2025
4333fcf
another iteration
guillaumemichel Jun 12, 2025
b8fdd9d
suppress missing self addrs err
guillaumemichel Jun 12, 2025
a7f0dd0
silence empty rt err on lan dht
guillaumemichel Jun 12, 2025
45b7887
comments
guillaumemichel Jun 13, 2025
6644657
new attempt at integrating
guillaumemichel Jun 24, 2025
7b7e528
reverting changes in core/node/libp2p/routing.go
guillaumemichel Jun 24, 2025
f7ab84f
removing SweepingProvider
guillaumemichel Jun 24, 2025
02f979d
make reprovider optional
guillaumemichel Jun 24, 2025
4025316
add noop reprovider
guillaumemichel Jun 24, 2025
2dfcd3d
update KeyChanFunc type alias
guillaumemichel Jun 24, 2025
d7d3e2b
restore boxo KeyChanFunc
guillaumemichel Jun 24, 2025
d13eaa6
fix missing KeyChanFunc
guillaumemichel Jun 24, 2025
4829261
test(sharness): PARALLEL=1 and timeout 30m
lidel Jun 24, 2025
5a57c4f
initialize MHStore
guillaumemichel Jun 25, 2025
bf5125a
revert workflow debug
guillaumemichel Jun 25, 2025
82d6fee
config
guillaumemichel Jun 25, 2025
d140885
config docs
guillaumemichel Jun 25, 2025
d67173a
merged IpfsNode provider and reprovider
guillaumemichel Jul 8, 2025
688944a
move Provider interface to from kad-dht to node
guillaumemichel Jul 8, 2025
107a0bb
moved Provider interface from kad-dht to kubo/core/node
guillaumemichel Jul 8, 2025
9531315
mod_tidy
gammazero Aug 12, 2025
5ad8e3a
Add Clear to Provider interface
gammazero Aug 12, 2025
49f74e8
use latest kad-dht commit
guillaumemichel Aug 12, 2025
b650b9f
make linter happy
guillaumemichel Aug 12, 2025
6788e6e
updated boxo provide interface
guillaumemichel Aug 12, 2025
9764140
boxo PR fix
guillaumemichel Aug 12, 2025
c571318
using latest kad-dht commit
guillaumemichel Aug 20, 2025
1a389ff
use latest boxo release
guillaumemichel Aug 20, 2025
55df232
Merge branch 'master' into reprovide-sweep
guillaumemichel Aug 20, 2025
a44d3f5
fix fx
guillaumemichel Aug 20, 2025
0718927
fx cyclic deps
guillaumemichel Aug 20, 2025
c9fbe62
fix merge issues
guillaumemichel Aug 20, 2025
dc2bb67
extended tests
guillaumemichel Aug 21, 2025
814085f
Merge branch 'master' into reprovide-sweep
guillaumemichel Aug 22, 2025
7903873
don't provide LAN DHT
guillaumemichel Aug 22, 2025
812f7fc
docs
guillaumemichel Aug 22, 2025
aaba9b4
Merge branch 'master' into reprovide-sweep
guillaumemichel Aug 22, 2025
0dbf157
restore dual dht provider
guillaumemichel Aug 22, 2025
4aadf79
don't start provider before it is online
guillaumemichel Aug 27, 2025
0c09e67
address linter
guillaumemichel Aug 27, 2025
1f668b9
dual/provider fix
guillaumemichel Aug 27, 2025
28d3ef1
add delay in provider tests for dht bootstrap
guillaumemichel Aug 27, 2025
ec952cf
add OfflineDelay parameter to config
guillaumemichel Aug 27, 2025
a5d3866
remove increase number of workers in test
guillaumemichel Aug 27, 2025
138a866
Merge branch 'master' into reprovide-sweep
guillaumemichel Aug 27, 2025
b540fba
improved keystore gc process
guillaumemichel Sep 1, 2025
4bf892d
fix: replace incorrect logger import in coreapi
lidel Sep 1, 2025
2d2eb5d
fix: remove duplicate WithDefault call in provider config
lidel Sep 1, 2025
f51ee49
fix: use correct option method for burst workers
lidel Sep 1, 2025
3300818
fix: improve error messages for experimental sweeping provider
lidel Sep 1, 2025
a287541
docs: remove obsolete KeyStoreGCInterval config
lidel Sep 1, 2025
00121ce
docs: add TODO placeholder changelog for experimental sweeping DHT pr…
lidel Sep 1, 2025
4f17c53
Merge remote-tracking branch 'origin/master' into reprovide-sweep
lidel Sep 1, 2025
e71bd8c
fix: provideKeysRec go routine
guillaumemichel Sep 2, 2025
9315679
clear keystore on close
guillaumemichel Sep 2, 2025
2b566a1
fix: datastore prefix
guillaumemichel Sep 2, 2025
2c3d1c8
fix: improve error handling in provideKeysRec
lidel Sep 2, 2025
98f4f90
address gammazero's review
guillaumemichel Sep 4, 2025
80c22f0
rename BurstProvider to LegacyProvider
guillaumemichel Sep 5, 2025
2ce2762
use latest provider/keystore
guillaumemichel Sep 5, 2025
1981a63
enable reprovide sweep by default
guillaumemichel Sep 5, 2025
643a544
Merge branch 'master' into reprovide-sweep
guillaumemichel Sep 8, 2025
72b66b3
boxo: make mfs StartProviding async
guillaumemichel Sep 8, 2025
1719d77
Merge branch 'reprovide-sweep' into reprovide-sweep-enabled
guillaumemichel Sep 8, 2025
7c8e299
not supplying DHT shouldn't make fx fail
guillaumemichel Sep 8, 2025
86bbdd3
bump boxo
guillaumemichel Sep 9, 2025
306624d
Merge branch 'master' into reprovide-sweep
guillaumemichel Sep 9, 2025
27a266a
merge: resolve conflicts with master
lidel Sep 9, 2025
dbe9fff
chore: update boxo to f2b4e12fb9a8ac138ccb82aae3b51ec51d9f631c
lidel Sep 9, 2025
20e4570
use latest kad-dht/boxo
guillaumemichel Sep 14, 2025
6919d40
Buffered SweepingProvider wrapper
guillaumemichel Sep 14, 2025
cfb6e22
Merge branch 'reprovide-sweep' into reprovide-sweep-enabled
guillaumemichel Sep 14, 2025
d77ce2d
use latest kad-dht commit
guillaumemichel Sep 17, 2025
35bffb8
allow no DHT router
guillaumemichel Sep 17, 2025
e1601f3
Merge branch 'reprovide-sweep' into reprovide-sweep-enabled
guillaumemichel Sep 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/sharness.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ on:
workflow_dispatch:
pull_request:
paths-ignore:
- '**/*.md'
- "**/*.md"
push:
branches:
- 'master'
- "master"

concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }}
Expand Down
25 changes: 25 additions & 0 deletions config/reprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import (
const (
DefaultReproviderInterval = time.Hour * 22 // https://github.com/ipfs/kubo/pull/9326
DefaultReproviderStrategy = "all"

DefaultReproviderSweepEnabled = true
DefaultReproviderSweepMaxWorkers = 4
DefaultReproviderSweepDedicatedPeriodicWorkers = 2
DefaultReproviderSweepDedicatedBurstWorkers = 1
DefaultReproviderSweepMaxProvideConnsPerWorker = 16
DefaultReproviderSweepKeyStoreBatchSize = 1 << 14 // ~544 KiB per batch (1 multihash = 34 bytes)
DefaultReproviderSweepOfflineDelay = 2 * time.Hour
)

type ReproviderStrategy int
Expand All @@ -24,6 +32,23 @@ const (
type Reprovider struct {
Interval *OptionalDuration `json:",omitempty"` // Time period to reprovide locally stored objects to the network
Strategy *OptionalString `json:",omitempty"` // Which keys to announce

Sweep Sweep
}

// Sweep configuration describes how the Sweeping Reprovider is configured if enabled.
type Sweep struct {
Enabled Flag `json:",omitempty"`

MaxWorkers *OptionalInteger // Max number of concurrent workers performing a provide operation.
DedicatedPeriodicWorkers *OptionalInteger // Number of workers dedicated to periodic reprovides.
DedicatedBurstWorkers *OptionalInteger // Number of workers dedicated to initial provides or burst reproviding keyspace regions after a period of inactivity.
MaxProvideConnsPerWorker *OptionalInteger // Number of connections that a worker is able to open to send provider records during a (re)provide operation.

KeyStoreGCInterval *OptionalDuration // Interval for garbage collection in KeyStore.
KeyStoreBatchSize *OptionalInteger // Number of multihashes to keep in memory when gc'ing the KeyStore.

OfflineDelay *OptionalDuration // Delay after which the provides changes state from Disconnected to Offline.
}

func ParseReproviderStrategy(s string) ReproviderStrategy {
Expand Down
8 changes: 7 additions & 1 deletion core/commands/provide.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"errors"
"fmt"
"io"
"text/tabwriter"
Expand Down Expand Up @@ -118,7 +119,12 @@ This interface is not stable and may change from release to release.
return ErrNotOnline
}

stats, err := nd.Provider.Stat()
provideSys, ok := nd.Provider.(provider.System)
if !ok {
return errors.New("stats not available with experimental sweeping provider (Reprovider.Sweep.Enabled=true)")
}

stats, err := provideSys.Stat()
if err != nil {
return err
}
Expand Down
42 changes: 17 additions & 25 deletions core/commands/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/ipfs/kubo/config"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/node"
mh "github.com/multiformats/go-multihash"

dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipns"
Expand Down Expand Up @@ -207,9 +209,9 @@ var provideRefRoutingCmd = &cmds.Command{
go func() {
defer cancel()
if rec {
provideErr = provideKeysRec(ctx, nd.Routing, nd.DAG, cids)
provideErr = provideCidsRec(ctx, nd.Provider, nd.DAG, cids)
} else {
provideErr = provideKeys(ctx, nd.Routing, cids)
provideErr = provideCids(nd.Provider, cids)
}
if provideErr != nil {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Expand Down Expand Up @@ -274,8 +276,12 @@ Trigger reprovider to announce our data to network.
if cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval) == 0 {
return errors.New("invalid configuration: Reprovider.Interval is set to '0'")
}
provideSys, ok := nd.Provider.(*node.LegacyProvider)
if !ok {
return errors.New("manual reprovide not available with experimental sweeping provider (Reprovider.Sweep.Enabled=true)")
}

err = nd.Provider.Reprovide(req.Context)
err = provideSys.Reprovide(req.Context)
if err != nil {
return err
}
Expand All @@ -284,39 +290,25 @@ Trigger reprovider to announce our data to network.
},
}

func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
return err
}
func provideCids(prov node.DHTProvider, cids []cid.Cid) error {
mhs := make([]mh.Multihash, len(cids))
for i, c := range cids {
mhs[i] = c.Hash()
}
return nil
return prov.StartProviding(true, mhs...)
}

func provideKeysRec(ctx context.Context, r routing.Routing, dserv ipld.DAGService, cids []cid.Cid) error {
provided := cid.NewSet()
func provideCidsRec(ctx context.Context, prov node.DHTProvider, dserv ipld.DAGService, cids []cid.Cid) error {
for _, c := range cids {
kset := cid.NewSet()

err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
if err != nil {
return err
}

for _, k := range kset.Keys() {
if provided.Has(k) {
continue
}

err = r.Provide(ctx, k, true)
if err != nil {
return err
}
provided.Add(k)
if err = provideCids(prov, kset.Keys()); err != nil {
return err
}
}

return nil
}

Expand Down
42 changes: 21 additions & 21 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,31 @@ type IpfsNode struct {
RecordValidator record.Validator

// Online
PeerHost p2phost.Host `optional:"true"` // the network host (server+client)
Peering *peering.PeeringService `optional:"true"`
Filters *ma.Filters `optional:"true"`
Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper
Routing irouting.ProvideManyRouter `optional:"true"` // the routing system. recommend ipfs-dht
ContentDiscovery routing.ContentDiscovery `optional:"true"` // the discovery part of the routing system
DNSResolver *madns.Resolver // the DNS resolver
IPLDPathResolver pathresolver.Resolver `name:"ipldPathResolver"` // The IPLD path resolver
UnixFSPathResolver pathresolver.Resolver `name:"unixFSPathResolver"` // The UnixFS path resolver
OfflineIPLDPathResolver pathresolver.Resolver `name:"offlineIpldPathResolver"` // The IPLD path resolver that uses only locally available blocks
OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"` // The UnixFS path resolver that uses only locally available blocks
Exchange exchange.Interface // the block exchange + strategy
Bitswap *bitswap.Bitswap `optional:"true"` // The Bitswap instance
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Provider provider.System // the value provider system
ProvidingStrategy config.ReproviderStrategy `optional:"true"`
ProvidingKeyChanFunc provider.KeyChanFunc `optional:"true"`
IpnsRepub *ipnsrp.Republisher `optional:"true"`
ResourceManager network.ResourceManager `optional:"true"`
PeerHost p2phost.Host `optional:"true"` // the network host (server+client)
Peering *peering.PeeringService `optional:"true"`
Filters *ma.Filters `optional:"true"`
Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper
ContentDiscovery routing.ContentDiscovery `optional:"true"` // the discovery part of the routing system
DNSResolver *madns.Resolver // the DNS resolver
IPLDPathResolver pathresolver.Resolver `name:"ipldPathResolver"` // The IPLD path resolver
UnixFSPathResolver pathresolver.Resolver `name:"unixFSPathResolver"` // The UnixFS path resolver
OfflineIPLDPathResolver pathresolver.Resolver `name:"offlineIpldPathResolver"` // The IPLD path resolver that uses only locally available blocks
OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"` // The UnixFS path resolver that uses only locally available blocks
Exchange exchange.Interface // the block exchange + strategy
Bitswap *bitswap.Bitswap `optional:"true"` // The Bitswap instance
Namesys namesys.NameSystem // the name system, resolves paths to hashes
ProvidingStrategy config.ReproviderStrategy `optional:"true"`
ProvidingKeyChanFunc provider.KeyChanFunc `optional:"true"`
IpnsRepub *ipnsrp.Republisher `optional:"true"`
ResourceManager network.ResourceManager `optional:"true"`

PubSub *pubsub.PubSub `optional:"true"`
PSRouter *psrouter.PubsubValueStore `optional:"true"`

DHT *ddht.DHT `optional:"true"`
DHTClient routing.Routing `name:"dhtc" optional:"true"`
Routing irouting.ProvideManyRouter `optional:"true"` // the routing system. recommend ipfs-dht
Provider node.DHTProvider // the value provider system
DHT *ddht.DHT `optional:"true"`
DHTClient routing.Routing `name:"dhtc" optional:"true"`

P2P *p2p.P2P `optional:"true"`

Expand Down
6 changes: 1 addition & 5 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ import (
dag "github.com/ipfs/boxo/ipld/merkledag"
pathresolver "github.com/ipfs/boxo/path/resolver"
pin "github.com/ipfs/boxo/pinning/pinner"
provider "github.com/ipfs/boxo/provider"
offlineroute "github.com/ipfs/boxo/routing/offline"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/config"
coreiface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
Expand All @@ -45,8 +43,6 @@ import (
"github.com/ipfs/kubo/repo"
)

var log = logging.Logger("coreapi")

type CoreAPI struct {
nctx context.Context

Expand All @@ -73,7 +69,7 @@ type CoreAPI struct {
ipldPathResolver pathresolver.Resolver
unixFSPathResolver pathresolver.Resolver

provider provider.System
provider node.DHTProvider
providingStrategy config.ReproviderStrategy

pubSub *pubsub.PubSub
Expand Down
74 changes: 49 additions & 25 deletions core/coreapi/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
cidutil "github.com/ipfs/go-cidutil"
coreiface "github.com/ipfs/kubo/core/coreiface"
caopts "github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/core/node"
"github.com/ipfs/kubo/tracing"
peer "github.com/libp2p/go-libp2p/core/peer"
routing "github.com/libp2p/go-libp2p/core/routing"
mh "github.com/multiformats/go-multihash"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -148,9 +149,9 @@ func (api *RoutingAPI) Provide(ctx context.Context, path path.Path, opts ...caop
}

if settings.Recursive {
err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c})
err = provideKeysRec(ctx, api.provider, api.blockstore, []cid.Cid{c})
} else {
err = provideKeys(ctx, api.routing, []cid.Cid{c})
err = api.provider.StartProviding(false, c.Hash())
}
if err != nil {
return err
Expand All @@ -159,41 +160,64 @@ func (api *RoutingAPI) Provide(ctx context.Context, path path.Path, opts ...caop
return nil
}

func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
return err
}
}
return nil
}

func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, cids []cid.Cid) error {
func provideKeysRec(ctx context.Context, prov node.DHTProvider, bs blockstore.Blockstore, cids []cid.Cid) error {
provided := cidutil.NewStreamingSet()

errCh := make(chan error)
// Error channel with buffer size 1 to avoid blocking the goroutine
errCh := make(chan error, 1)
go func() {
// Always close provided.New to signal completion
defer close(provided.New)
// Also close error channel to distinguish between "no error" and "pending error"
defer close(errCh)

dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
for _, c := range cids {
err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
if err != nil {
errCh <- err
if err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx)); err != nil {
// Send error to channel. If context is cancelled while trying to send,
// exit immediately as the main loop will return ctx.Err()
select {
case errCh <- err:
// Error sent successfully, exit goroutine
case <-ctx.Done():
// Context cancelled, exit without sending error
return
}
return
}
}
// All CIDs walked successfully, goroutine will exit and channels will close
}()

keys := make([]mh.Multihash, 0)
for {
select {
case k := <-provided.New:
err := r.Provide(ctx, k, true)
if err != nil {
return err
}
case err := <-errCh:
return err
case <-ctx.Done():
// Context cancelled, return immediately
return ctx.Err()
case err := <-errCh:
// Received error from DAG walk, return it
return err
case c, ok := <-provided.New:
if !ok {
// Channel closed means goroutine finished.
// CRITICAL: Check for any error that was sent just before channel closure.
// This handles the race where error is sent to errCh but main loop
// sees provided.New close first.
select {
case err := <-errCh:
if err != nil {
return err
}
// errCh closed with nil, meaning success
default:
// No pending error in errCh
}
// All CIDs successfully processed, start providing
return prov.StartProviding(true, keys...)
}
// Accumulate the CID for providing
keys = append(keys, c.Hash())
}
}
}
Expand Down
Loading
Loading