Skip to content

Commit b51855d

Browse files
committed
refactor: Separate packages to build p2p protocol ID and DHT
1 parent 49631b8 commit b51855d

File tree

12 files changed

+208
-104
lines changed

12 files changed

+208
-104
lines changed

consensus/p2p/buffered/buffered_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const (
3232
maxWait = 30 * time.Second
3333
)
3434

35+
var network = &utils.Mainnet
36+
3537
type TestMessage = consensus.ConsensusStreamId
3638

3739
type origin struct {
@@ -45,7 +47,7 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
4547
require.NoError(t, err)
4648

4749
nodes := testutils.BuildNetworks(t, testutils.LineNetworkConfig(nodeCount))
48-
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
50+
topics := nodes.JoinTopic(t, network, protocolID, topicName)
4951

5052
messages := make([][]*TestMessage, nodeCount)
5153
allMessages := make(map[string]origin)
@@ -136,7 +138,7 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
136138
require.NoError(t, err)
137139

138140
nodes := testutils.BuildNetworks(t, testutils.NewAdjacentNodes(1))
139-
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
141+
topics := nodes.JoinTopic(t, network, protocolID, topicName)
140142
topic := topics[0]
141143

142144
ctx, cancel := context.WithCancel(t.Context())

consensus/p2p/p2p.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/NethermindEth/juno/consensus/starknet"
1414
"github.com/NethermindEth/juno/consensus/types"
1515
"github.com/NethermindEth/juno/p2p/pubsub"
16+
"github.com/NethermindEth/juno/p2p/starknetp2p"
1617
"github.com/NethermindEth/juno/service"
1718
"github.com/NethermindEth/juno/utils"
1819
libp2p "github.com/libp2p/go-libp2p-pubsub"
@@ -24,10 +25,9 @@ import (
2425
type topicName string
2526

2627
const (
27-
chainID = "1" // TODO: Make this configurable
28-
consensusProtocolID = "consensus"
29-
proposalTopicName topicName = "consensus_proposals"
30-
voteTopicName topicName = "consensus_votes"
28+
chainID = "1" // TODO: Make this configurable
29+
proposalTopicName topicName = "consensus_proposals"
30+
voteTopicName topicName = "consensus_votes"
3131
)
3232

3333
type P2P[V types.Hashable[H], H types.Hash, A types.Addr] interface {
@@ -40,6 +40,7 @@ type P2P[V types.Hashable[H], H types.Hash, A types.Addr] interface {
4040
type p2p[V types.Hashable[H], H types.Hash, A types.Addr] struct {
4141
host host.Host
4242
log utils.Logger
43+
network *utils.Network
4344
commitNotifier chan types.Height
4445
broadcasters Broadcasters[V, H, A]
4546
listeners Listeners[V, H, A]
@@ -122,11 +123,11 @@ func New(
122123
func (p *p2p[V, H, A]) Run(ctx context.Context) error {
123124
gossipSub, err := pubsub.Run(
124125
ctx,
125-
chainID,
126-
consensusProtocolID,
127126
p.host,
128-
p.pubSubQueueSize,
127+
p.network,
128+
starknetp2p.ConsensusProtocolID,
129129
p.bootstrapPeersFn,
130+
p.pubSubQueueSize,
130131
)
131132
if err != nil {
132133
return fmt.Errorf("unable to create gossipsub with error: %w", err)

consensus/p2p/validator/proposal_stream_demux_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,18 @@ const (
3838
firstHalfSize = 2
3939
)
4040

41+
var network = &utils.Mainnet
42+
4143
func TestProposalStreamDemux(t *testing.T) {
4244
logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true)
4345
require.NoError(t, err)
4446

4547
nodes := testutils.BuildNetworks(t, testutils.NewAdjacentNodes(1))
46-
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
48+
topics := nodes.JoinTopic(t, network, protocolID, topicName)
4749
topic := topics[0]
4850

4951
commitNotifier := make(chan types.Height)
5052

51-
network := &utils.SepoliaIntegration
5253
executor := NewMockExecutor(t, network)
5354
database := memory.New()
5455
bc := blockchain.New(database, network)

consensus/p2p/vote/vote_broadcasters_listeners_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
maxWait = 5 * time.Second
3131
)
3232

33+
var network = &utils.Mainnet
34+
3335
func TestVoteBroadcastersAndListeners(t *testing.T) {
3436
logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true)
3537
require.NoError(t, err)
@@ -51,7 +53,7 @@ func TestVoteBroadcastersAndListeners(t *testing.T) {
5153
pending := maps.Clone(voteSet)
5254

5355
nodes := testutils.BuildNetworks(t, testutils.LineNetworkConfig(2))
54-
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
56+
topics := nodes.JoinTopic(t, network, protocolID, topicName)
5557

5658
source := topics[0]
5759
destination := topics[1]

mempool/p2p/p2p.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/NethermindEth/juno/consensus/p2p/config"
99
"github.com/NethermindEth/juno/mempool"
1010
"github.com/NethermindEth/juno/p2p/pubsub"
11+
"github.com/NethermindEth/juno/p2p/starknetp2p"
1112
"github.com/NethermindEth/juno/utils"
1213
"github.com/libp2p/go-libp2p/core/host"
1314
"github.com/libp2p/go-libp2p/core/peer"
@@ -23,6 +24,7 @@ const (
2324
type P2P struct {
2425
host host.Host
2526
log utils.Logger
27+
network *utils.Network
2628
pool mempool.Pool
2729
broadcaster transactionBroadcaster
2830
listener buffered.TopicSubscription
@@ -41,6 +43,7 @@ func New(
4143
return &P2P{
4244
host: host,
4345
log: log,
46+
network: network,
4447
pool: pool,
4548
broadcaster: NewTransactionBroadcaster(log, config.MempoolBroadcaster, config.RetryInterval),
4649
listener: NewTransactionListener(network, log, pool, config.MempoolListener),
@@ -52,11 +55,11 @@ func New(
5255
func (p *P2P) Run(ctx context.Context) error {
5356
gossipSub, err := pubsub.Run(
5457
ctx,
55-
chainID,
56-
mempoolProtocolID,
5758
p.host,
58-
p.config.PubSubQueueSize,
59+
p.network,
60+
starknetp2p.MempoolProtocolID,
5961
p.bootstrapPeersFn,
62+
p.config.PubSubQueueSize,
6063
)
6164
if err != nil {
6265
return fmt.Errorf("unable to create gossipsub with error: %w", err)

p2p/dht/dht.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package dht
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/NethermindEth/juno/p2p/starknetp2p"
9+
"github.com/NethermindEth/juno/utils"
10+
dht "github.com/libp2p/go-libp2p-kad-dht"
11+
"github.com/libp2p/go-libp2p/core/host"
12+
"github.com/libp2p/go-libp2p/core/peer"
13+
)
14+
15+
func New(
16+
ctx context.Context,
17+
host host.Host,
18+
network *utils.Network,
19+
starknetProtocol starknetp2p.Protocol,
20+
bootstrapPeersFn func() []peer.AddrInfo,
21+
) (*dht.IpfsDHT, error) {
22+
return dht.New(
23+
ctx,
24+
host,
25+
append(
26+
starknetp2p.PubSub(network, starknetProtocol),
27+
dht.BootstrapPeersFunc(bootstrapPeersFn),
28+
dht.Mode(dht.ModeServer),
29+
)...,
30+
)
31+
}
32+
33+
func ExtractPeers(peers string) ([]peer.AddrInfo, error) {
34+
if peers == "" {
35+
return nil, nil
36+
}
37+
38+
peerAddrs := []peer.AddrInfo{}
39+
for peerStr := range strings.SplitSeq(peers, ",") {
40+
peerAddr, err := peer.AddrInfoFromString(peerStr)
41+
if err != nil {
42+
return nil, fmt.Errorf("unable to parse peer address %q: %w", peerStr, err)
43+
}
44+
peerAddrs = append(peerAddrs, *peerAddr)
45+
}
46+
return peerAddrs, nil
47+
}

p2p/p2p.go

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,19 @@ import (
66
"errors"
77
"fmt"
88
"math/rand"
9-
"strings"
109
"time"
1110

1211
"github.com/Masterminds/semver/v3"
1312
"github.com/NethermindEth/juno/blockchain"
1413
"github.com/NethermindEth/juno/db"
14+
"github.com/NethermindEth/juno/p2p/dht"
1515
p2pPeers "github.com/NethermindEth/juno/p2p/peers"
16+
"github.com/NethermindEth/juno/p2p/starknetp2p"
1617
p2pSync "github.com/NethermindEth/juno/p2p/sync"
1718
junoSync "github.com/NethermindEth/juno/sync"
1819
"github.com/NethermindEth/juno/utils"
1920
"github.com/libp2p/go-libp2p"
20-
dht "github.com/libp2p/go-libp2p-kad-dht"
21+
libp2pdht "github.com/libp2p/go-libp2p-kad-dht"
2122
pubsub "github.com/libp2p/go-libp2p-pubsub"
2223
"github.com/libp2p/go-libp2p/core/crypto"
2324
"github.com/libp2p/go-libp2p/core/crypto/pb"
@@ -42,7 +43,7 @@ type Service struct {
4243
handler *p2pPeers.Handler
4344
log utils.SimpleLogger
4445

45-
dht *dht.IpfsDHT
46+
dht *libp2pdht.IpfsDHT
4647
pubsub *pubsub.PubSub
4748

4849
synchroniser *p2pSync.Service
@@ -116,29 +117,27 @@ func New(addr, publicAddr, version, peers, privKeyStr string, feederNode bool, b
116117
func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchain.Blockchain, snNetwork *utils.Network,
117118
log utils.SimpleLogger, database db.KeyValueStore,
118119
) (*Service, error) {
119-
var (
120-
peersAddrInfoS []peer.AddrInfo
121-
err error
122-
)
123-
124-
peersAddrInfoS, err = loadPeers(database)
120+
peersAddrInfoS, err := loadPeers(database)
125121
if err != nil {
126122
log.Warnw("Failed to load peers", "err", err)
127123
}
128124

129-
if peers != "" {
130-
for peerStr := range strings.SplitSeq(peers, ",") {
131-
var peerAddr *peer.AddrInfo
132-
peerAddr, err = peer.AddrInfoFromString(peerStr)
133-
if err != nil {
134-
return nil, fmt.Errorf("addr info from %q: %w", peerStr, err)
135-
}
136-
137-
peersAddrInfoS = append(peersAddrInfoS, *peerAddr)
138-
}
125+
configuredPeers, err := dht.ExtractPeers(peers)
126+
if err != nil {
127+
return nil, fmt.Errorf("unable to extract peers: %w", err)
139128
}
140129

141-
p2pdht, err := makeDHT(p2phost, peersAddrInfoS)
130+
peersAddrInfoS = append(peersAddrInfoS, configuredPeers...)
131+
132+
p2pdht, err := dht.New(
133+
context.Background(),
134+
p2phost,
135+
snNetwork,
136+
starknetp2p.SyncProtocolID,
137+
func() []peer.AddrInfo {
138+
return peersAddrInfoS
139+
},
140+
)
142141
if err != nil {
143142
return nil, err
144143
}
@@ -160,15 +159,6 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
160159
return s, nil
161160
}
162161

163-
func makeDHT(p2phost host.Host, addrInfos []peer.AddrInfo) (*dht.IpfsDHT, error) {
164-
return dht.New(context.Background(), p2phost,
165-
dht.ProtocolPrefix(p2pSync.Prefix),
166-
dht.BootstrapPeers(addrInfos...),
167-
dht.RoutingTableRefreshPeriod(routingTableRefreshPeriod),
168-
dht.Mode(dht.ModeServer),
169-
)
170-
}
171-
172162
func privateKey(privKeyStr string) (crypto.PrivKey, error) {
173163
if privKeyStr == "" {
174164
// Creates a new key pair for this host.
@@ -255,11 +245,11 @@ func (s *Service) Run(ctx context.Context) error {
255245
}
256246

257247
func (s *Service) setProtocolHandlers() {
258-
s.SetProtocolHandler(p2pSync.HeadersPID(), s.handler.HeadersHandler)
259-
s.SetProtocolHandler(p2pSync.EventsPID(), s.handler.EventsHandler)
260-
s.SetProtocolHandler(p2pSync.TransactionsPID(), s.handler.TransactionsHandler)
261-
s.SetProtocolHandler(p2pSync.ClassesPID(), s.handler.ClassesHandler)
262-
s.SetProtocolHandler(p2pSync.StateDiffPID(), s.handler.StateDiffHandler)
248+
s.SetProtocolHandler(starknetp2p.HeadersSyncSubProtocol, s.handler.HeadersHandler)
249+
s.SetProtocolHandler(starknetp2p.EventsSyncSubProtocol, s.handler.EventsHandler)
250+
s.SetProtocolHandler(starknetp2p.TransactionsSyncSubProtocol, s.handler.TransactionsHandler)
251+
s.SetProtocolHandler(starknetp2p.ClassesSyncSubProtocol, s.handler.ClassesHandler)
252+
s.SetProtocolHandler(starknetp2p.StateDiffSyncSubProtocol, s.handler.StateDiffHandler)
263253
}
264254

265255
func (s *Service) callAndLogErr(f func() error, msg string) {
@@ -309,8 +299,11 @@ func (s *Service) NewStream(ctx context.Context, pids ...protocol.ID) (network.S
309299
}
310300
}
311301

312-
func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Stream)) {
313-
s.host.SetStreamHandler(pid, handler)
302+
func (s *Service) SetProtocolHandler(
303+
syncSubProtocol starknetp2p.SyncSubProtocol,
304+
handler func(network.Stream),
305+
) {
306+
s.host.SetStreamHandler(starknetp2p.Sync(s.network, syncSubProtocol), handler)
314307
}
315308

316309
func (s *Service) WithListener(l junoSync.EventListener) {

p2p/pubsub/pubsub.go

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,18 @@ import (
55
"fmt"
66
"strings"
77

8+
"github.com/NethermindEth/juno/p2p/dht"
9+
"github.com/NethermindEth/juno/p2p/starknetp2p"
10+
"github.com/NethermindEth/juno/utils"
811
"github.com/libp2p/go-libp2p"
9-
dht "github.com/libp2p/go-libp2p-kad-dht"
1012
pubsub "github.com/libp2p/go-libp2p-pubsub"
1113
"github.com/libp2p/go-libp2p/core/crypto"
1214
"github.com/libp2p/go-libp2p/core/host"
1315
"github.com/libp2p/go-libp2p/core/peer"
14-
"github.com/libp2p/go-libp2p/core/protocol"
1516
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
1617
)
1718

18-
const (
19-
protocolPrefix = "starknet"
20-
gossipSubHistory = 60
21-
)
19+
const gossipSubHistory = 60
2220

2321
func GetHost(hostPrivateKey crypto.PrivKey, hostAddress string) (host.Host, error) {
2422
return libp2p.New(
@@ -40,21 +38,13 @@ func GetHost(hostPrivateKey crypto.PrivKey, hostAddress string) (host.Host, erro
4038

4139
func Run(
4240
ctx context.Context,
43-
chainID protocol.ID,
44-
protocolID protocol.ID,
4541
host host.Host,
46-
pubSubQueueSize int,
42+
network *utils.Network,
43+
starknetProtocol starknetp2p.Protocol,
4744
bootstrapPeersFn func() []peer.AddrInfo,
45+
pubSubQueueSize int,
4846
) (*pubsub.PubSub, error) {
49-
dht, err := dht.New(
50-
ctx,
51-
host,
52-
dht.ProtocolPrefix("/"+protocolPrefix),
53-
dht.ProtocolExtension("/"+chainID),
54-
dht.ProtocolExtension("/"+protocolID),
55-
dht.BootstrapPeersFunc(bootstrapPeersFn),
56-
dht.Mode(dht.ModeServer),
57-
)
47+
dht, err := dht.New(ctx, host, network, starknetProtocol, bootstrapPeersFn)
5848
if err != nil {
5949
return nil, fmt.Errorf("unable to create dht with error: %w", err)
6050
}

0 commit comments

Comments
 (0)