Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions consensus/p2p/buffered/buffered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
maxWait = 30 * time.Second
)

var network = &utils.Mainnet

type TestMessage = consensus.ConsensusStreamId

type origin struct {
Expand All @@ -45,7 +47,7 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
require.NoError(t, err)

nodes := testutils.BuildNetworks(t, testutils.LineNetworkConfig(nodeCount))
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
topics := nodes.JoinTopic(t, network, protocolID, topicName)

messages := make([][]*TestMessage, nodeCount)
allMessages := make(map[string]origin)
Expand Down Expand Up @@ -136,7 +138,7 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
require.NoError(t, err)

nodes := testutils.BuildNetworks(t, testutils.NewAdjacentNodes(1))
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
topics := nodes.JoinTopic(t, network, protocolID, topicName)
topic := topics[0]

ctx, cancel := context.WithCancel(t.Context())
Expand Down
16 changes: 9 additions & 7 deletions consensus/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/NethermindEth/juno/consensus/starknet"
"github.com/NethermindEth/juno/consensus/types"
"github.com/NethermindEth/juno/p2p/pubsub"
"github.com/NethermindEth/juno/p2p/starknetp2p"
"github.com/NethermindEth/juno/service"
"github.com/NethermindEth/juno/utils"
libp2p "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -24,10 +25,9 @@ import (
type topicName string

const (
chainID = "1" // TODO: Make this configurable
consensusProtocolID = "consensus"
proposalTopicName topicName = "consensus_proposals"
voteTopicName topicName = "consensus_votes"
chainID = "1" // TODO: Make this configurable
proposalTopicName topicName = "consensus_proposals"
voteTopicName topicName = "consensus_votes"
)

type P2P[V types.Hashable[H], H types.Hash, A types.Addr] interface {
Expand All @@ -40,6 +40,7 @@ type P2P[V types.Hashable[H], H types.Hash, A types.Addr] interface {
type p2p[V types.Hashable[H], H types.Hash, A types.Addr] struct {
host host.Host
log utils.Logger
network *utils.Network
commitNotifier chan types.Height
broadcasters Broadcasters[V, H, A]
listeners Listeners[V, H, A]
Expand Down Expand Up @@ -110,6 +111,7 @@ func New(
return &p2p[starknet.Value, starknet.Hash, starknet.Address]{
host: host,
log: log,
network: builder.Network(),
commitNotifier: commitNotifier,
broadcasters: broadcasters,
listeners: listeners,
Expand All @@ -122,11 +124,11 @@ func New(
func (p *p2p[V, H, A]) Run(ctx context.Context) error {
gossipSub, err := pubsub.Run(
ctx,
chainID,
consensusProtocolID,
p.host,
p.pubSubQueueSize,
p.network,
starknetp2p.ConsensusProtocolID,
p.bootstrapPeersFn,
p.pubSubQueueSize,
)
if err != nil {
return fmt.Errorf("unable to create gossipsub with error: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions consensus/p2p/validator/proposal_stream_demux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@ const (
firstHalfSize = 2
)

var network = &utils.Mainnet

func TestProposalStreamDemux(t *testing.T) {
logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true)
require.NoError(t, err)

nodes := testutils.BuildNetworks(t, testutils.NewAdjacentNodes(1))
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
topics := nodes.JoinTopic(t, network, protocolID, topicName)
topic := topics[0]

commitNotifier := make(chan types.Height)

network := &utils.SepoliaIntegration
executor := NewMockExecutor(t, network)
database := memory.New()
bc := blockchain.New(database, network)
Expand Down
4 changes: 3 additions & 1 deletion consensus/p2p/vote/vote_broadcasters_listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
maxWait = 5 * time.Second
)

var network = &utils.Mainnet

func TestVoteBroadcastersAndListeners(t *testing.T) {
logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true)
require.NoError(t, err)
Expand All @@ -51,7 +53,7 @@ func TestVoteBroadcastersAndListeners(t *testing.T) {
pending := maps.Clone(voteSet)

nodes := testutils.BuildNetworks(t, testutils.LineNetworkConfig(2))
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
topics := nodes.JoinTopic(t, network, protocolID, topicName)

source := topics[0]
destination := topics[1]
Expand Down
9 changes: 6 additions & 3 deletions mempool/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/NethermindEth/juno/consensus/p2p/config"
"github.com/NethermindEth/juno/mempool"
"github.com/NethermindEth/juno/p2p/pubsub"
"github.com/NethermindEth/juno/p2p/starknetp2p"
"github.com/NethermindEth/juno/utils"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -23,6 +24,7 @@ const (
type P2P struct {
host host.Host
log utils.Logger
network *utils.Network
pool mempool.Pool
broadcaster transactionBroadcaster
listener buffered.TopicSubscription
Expand All @@ -41,6 +43,7 @@ func New(
return &P2P{
host: host,
log: log,
network: network,
pool: pool,
broadcaster: NewTransactionBroadcaster(log, config.MempoolBroadcaster, config.RetryInterval),
listener: NewTransactionListener(network, log, pool, config.MempoolListener),
Expand All @@ -52,11 +55,11 @@ func New(
func (p *P2P) Run(ctx context.Context) error {
gossipSub, err := pubsub.Run(
ctx,
chainID,
mempoolProtocolID,
p.host,
p.config.PubSubQueueSize,
p.network,
starknetp2p.MempoolProtocolID,
p.bootstrapPeersFn,
p.config.PubSubQueueSize,
)
if err != nil {
return fmt.Errorf("unable to create gossipsub with error: %w", err)
Expand Down
47 changes: 47 additions & 0 deletions p2p/dht/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package dht

import (
"context"
"fmt"
"strings"

"github.com/NethermindEth/juno/p2p/starknetp2p"
"github.com/NethermindEth/juno/utils"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
)

func New(
ctx context.Context,
host host.Host,
network *utils.Network,
starknetProtocol starknetp2p.Protocol,
bootstrapPeersFn func() []peer.AddrInfo,
) (*dht.IpfsDHT, error) {
return dht.New(
ctx,
host,
append(
starknetp2p.DHT(network, starknetProtocol),
dht.BootstrapPeersFunc(bootstrapPeersFn),
dht.Mode(dht.ModeServer),
)...,
)
}

func ExtractPeers(peers string) ([]peer.AddrInfo, error) {
if peers == "" {
return nil, nil
}

peerAddrs := []peer.AddrInfo{}
for peerStr := range strings.SplitSeq(peers, ",") {
peerAddr, err := peer.AddrInfoFromString(peerStr)
if err != nil {
return nil, fmt.Errorf("unable to parse peer address %q: %w", peerStr, err)
}
peerAddrs = append(peerAddrs, *peerAddr)
}
return peerAddrs, nil
}
65 changes: 29 additions & 36 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
"errors"
"fmt"
"math/rand"
"strings"
"time"

"github.com/Masterminds/semver/v3"
"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/p2p/dht"
p2pPeers "github.com/NethermindEth/juno/p2p/peers"
"github.com/NethermindEth/juno/p2p/starknetp2p"
p2pSync "github.com/NethermindEth/juno/p2p/sync"
junoSync "github.com/NethermindEth/juno/sync"
"github.com/NethermindEth/juno/utils"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2pdht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/crypto/pb"
Expand All @@ -42,7 +43,7 @@ type Service struct {
handler *p2pPeers.Handler
log utils.SimpleLogger

dht *dht.IpfsDHT
dht *libp2pdht.IpfsDHT
pubsub *pubsub.PubSub

synchroniser *p2pSync.Service
Expand Down Expand Up @@ -116,29 +117,27 @@ func New(addr, publicAddr, version, peers, privKeyStr string, feederNode bool, b
func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchain.Blockchain, snNetwork *utils.Network,
log utils.SimpleLogger, database db.KeyValueStore,
) (*Service, error) {
var (
peersAddrInfoS []peer.AddrInfo
err error
)

peersAddrInfoS, err = loadPeers(database)
peersAddrInfoS, err := loadPeers(database)
if err != nil {
log.Warnw("Failed to load peers", "err", err)
}

if peers != "" {
for peerStr := range strings.SplitSeq(peers, ",") {
var peerAddr *peer.AddrInfo
peerAddr, err = peer.AddrInfoFromString(peerStr)
if err != nil {
return nil, fmt.Errorf("addr info from %q: %w", peerStr, err)
}

peersAddrInfoS = append(peersAddrInfoS, *peerAddr)
}
configuredPeers, err := dht.ExtractPeers(peers)
if err != nil {
return nil, fmt.Errorf("unable to extract peers: %w", err)
}

p2pdht, err := makeDHT(p2phost, peersAddrInfoS)
peersAddrInfoS = append(peersAddrInfoS, configuredPeers...)

p2pdht, err := dht.New(
context.Background(),
p2phost,
snNetwork,
starknetp2p.SyncProtocolID,
func() []peer.AddrInfo {
return peersAddrInfoS
},
)
if err != nil {
return nil, err
}
Expand All @@ -160,15 +159,6 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
return s, nil
}

func makeDHT(p2phost host.Host, addrInfos []peer.AddrInfo) (*dht.IpfsDHT, error) {
return dht.New(context.Background(), p2phost,
dht.ProtocolPrefix(p2pSync.Prefix),
dht.BootstrapPeers(addrInfos...),
dht.RoutingTableRefreshPeriod(routingTableRefreshPeriod),
dht.Mode(dht.ModeServer),
)
}

func privateKey(privKeyStr string) (crypto.PrivKey, error) {
if privKeyStr == "" {
// Creates a new key pair for this host.
Expand Down Expand Up @@ -255,11 +245,11 @@ func (s *Service) Run(ctx context.Context) error {
}

func (s *Service) setProtocolHandlers() {
s.SetProtocolHandler(p2pSync.HeadersPID(), s.handler.HeadersHandler)
s.SetProtocolHandler(p2pSync.EventsPID(), s.handler.EventsHandler)
s.SetProtocolHandler(p2pSync.TransactionsPID(), s.handler.TransactionsHandler)
s.SetProtocolHandler(p2pSync.ClassesPID(), s.handler.ClassesHandler)
s.SetProtocolHandler(p2pSync.StateDiffPID(), s.handler.StateDiffHandler)
s.SetProtocolHandler(starknetp2p.HeadersSyncSubProtocol, s.handler.HeadersHandler)
s.SetProtocolHandler(starknetp2p.EventsSyncSubProtocol, s.handler.EventsHandler)
s.SetProtocolHandler(starknetp2p.TransactionsSyncSubProtocol, s.handler.TransactionsHandler)
s.SetProtocolHandler(starknetp2p.ClassesSyncSubProtocol, s.handler.ClassesHandler)
s.SetProtocolHandler(starknetp2p.StateDiffSyncSubProtocol, s.handler.StateDiffHandler)
}

func (s *Service) callAndLogErr(f func() error, msg string) {
Expand Down Expand Up @@ -309,8 +299,11 @@ func (s *Service) NewStream(ctx context.Context, pids ...protocol.ID) (network.S
}
}

func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Stream)) {
s.host.SetStreamHandler(pid, handler)
func (s *Service) SetProtocolHandler(
syncSubProtocol starknetp2p.SyncSubProtocol,
handler func(network.Stream),
) {
s.host.SetStreamHandler(starknetp2p.Sync(s.network, syncSubProtocol), handler)
}

func (s *Service) WithListener(l junoSync.EventListener) {
Expand Down
Loading
Loading