Skip to content

Commit 09b0474

Browse files
committed
refactor!(blankhost): make blankhost more amenable to a lifecycle
1 parent 9d8465f commit 09b0474

File tree

1 file changed

+85
-36
lines changed

1 file changed

+85
-36
lines changed

p2p/host/blank/blank.go

Lines changed: 85 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,33 @@ import (
1414
"github.com/libp2p/go-libp2p/core/peerstore"
1515
"github.com/libp2p/go-libp2p/core/protocol"
1616
"github.com/libp2p/go-libp2p/core/record"
17-
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
18-
1917
logging "github.com/libp2p/go-libp2p/gologshim"
18+
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
2019

2120
ma "github.com/multiformats/go-multiaddr"
2221
mstream "github.com/multiformats/go-multistream"
2322
)
2423

2524
var log = logging.Logger("blankhost")
2625

27-
// BlankHost is the thinnest implementation of the host.Host interface
26+
// BlankHost is a thin implementation of the host.Host interface
2827
type BlankHost struct {
29-
n network.Network
30-
mux *mstream.MultistreamMuxer[protocol.ID]
31-
cmgr connmgr.ConnManager
32-
eventbus event.Bus
33-
emitters struct {
28+
N network.Network
29+
M *mstream.MultistreamMuxer[protocol.ID]
30+
E event.Bus
31+
ConnMgr connmgr.ConnManager
32+
// SkipInitSignedRecord is a flag to skip the initialization of a signed record for the host
33+
SkipInitSignedRecord bool
34+
emitters struct {
3435
evtLocalProtocolsUpdated event.Emitter
3536
}
37+
onStop []func() error
3638
}
3739

3840
type config struct {
39-
cmgr connmgr.ConnManager
40-
eventBus event.Bus
41+
cmgr connmgr.ConnManager
42+
eventBus event.Bus
43+
skipInitSignedRecord bool
4144
}
4245

4346
type Option = func(cfg *config)
@@ -54,6 +57,12 @@ func WithEventBus(eventBus event.Bus) Option {
5457
}
5558
}
5659

60+
func SkipInitSignedRecord() Option {
61+
return func(cfg *config) {
62+
cfg.skipInitSignedRecord = true
63+
}
64+
}
65+
5766
func NewBlankHost(n network.Network, options ...Option) *BlankHost {
5867
cfg := config{
5968
cmgr: &connmgr.NullConnMgr{},
@@ -63,36 +72,72 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
6372
}
6473

6574
bh := &BlankHost{
66-
n: n,
67-
cmgr: cfg.cmgr,
68-
mux: mstream.NewMultistreamMuxer[protocol.ID](),
69-
eventbus: cfg.eventBus,
75+
N: n,
76+
ConnMgr: cfg.cmgr,
77+
M: mstream.NewMultistreamMuxer[protocol.ID](),
78+
E: cfg.eventBus,
79+
80+
SkipInitSignedRecord: cfg.skipInitSignedRecord,
7081
}
71-
if bh.eventbus == nil {
72-
bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer()))
82+
83+
if err := bh.Start(); err != nil {
84+
log.Error("error creating blank host", "err", err)
85+
return nil
86+
}
87+
88+
return bh
89+
}
90+
91+
func (bh *BlankHost) Start() error {
92+
if bh.E == nil {
93+
bh.E = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer()))
7394
}
7495

7596
// subscribe the connection manager to network notifications (has no effect with NullConnMgr)
76-
n.Notify(bh.cmgr.Notifee())
97+
notifee := bh.ConnMgr.Notifee()
98+
bh.N.Notify(notifee)
99+
bh.onStop = append(bh.onStop, func() error {
100+
bh.N.StopNotify(notifee)
101+
return nil
102+
})
77103

78104
var err error
79-
if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
80-
return nil
105+
if bh.emitters.evtLocalProtocolsUpdated, err = bh.E.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
106+
return err
81107
}
108+
bh.onStop = append(bh.onStop, func() error {
109+
bh.emitters.evtLocalProtocolsUpdated.Close()
110+
return nil
111+
})
82112

83-
n.SetStreamHandler(bh.newStreamHandler)
113+
bh.N.SetStreamHandler(bh.newStreamHandler)
114+
bh.onStop = append(bh.onStop, func() error {
115+
bh.N.SetStreamHandler(func(s network.Stream) { s.Reset() })
116+
return nil
117+
})
84118

85119
// persist a signed peer record for self to the peerstore.
86-
if err := bh.initSignedRecord(); err != nil {
87-
log.Error("error creating blank host", "err", err)
88-
return nil
120+
if !bh.SkipInitSignedRecord {
121+
if err := bh.initSignedRecord(); err != nil {
122+
log.Error("error creating blank host", "err", err)
123+
return err
124+
}
89125
}
90126

91-
return bh
127+
return nil
128+
}
129+
130+
func (bh *BlankHost) Stop() error {
131+
var err error
132+
for _, f := range bh.onStop {
133+
err = errors.Join(err, f())
134+
}
135+
bh.onStop = nil
136+
return err
92137
}
93138

94139
func (bh *BlankHost) initSignedRecord() error {
95-
cab, ok := peerstore.GetCertifiedAddrBook(bh.n.Peerstore())
140+
cab, ok := peerstore.GetCertifiedAddrBook(bh.N.Peerstore())
96141
if !ok {
97142
log.Error("peerstore does not support signed records")
98143
return errors.New("peerstore does not support signed records")
@@ -114,7 +159,7 @@ func (bh *BlankHost) initSignedRecord() error {
114159
var _ host.Host = (*BlankHost)(nil)
115160

116161
func (bh *BlankHost) Addrs() []ma.Multiaddr {
117-
addrs, err := bh.n.InterfaceListenAddresses()
162+
addrs, err := bh.N.InterfaceListenAddresses()
118163
if err != nil {
119164
log.Debug("error retrieving network interface addrs", "err", err)
120165
return nil
@@ -124,14 +169,18 @@ func (bh *BlankHost) Addrs() []ma.Multiaddr {
124169
}
125170

126171
func (bh *BlankHost) Close() error {
127-
return bh.n.Close()
172+
var err error
173+
if bh.onStop != nil {
174+
err = bh.Stop()
175+
}
176+
return errors.Join(err, bh.N.Close())
128177
}
129178

130179
func (bh *BlankHost) Connect(ctx context.Context, ai peer.AddrInfo) error {
131180
// absorb addresses into peerstore
132181
bh.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.TempAddrTTL)
133182

134-
cs := bh.n.ConnsToPeer(ai.ID)
183+
cs := bh.N.ConnsToPeer(ai.ID)
135184
if len(cs) > 0 {
136185
return nil
137186
}
@@ -144,15 +193,15 @@ func (bh *BlankHost) Connect(ctx context.Context, ai peer.AddrInfo) error {
144193
}
145194

146195
func (bh *BlankHost) Peerstore() peerstore.Peerstore {
147-
return bh.n.Peerstore()
196+
return bh.N.Peerstore()
148197
}
149198

150199
func (bh *BlankHost) ID() peer.ID {
151-
return bh.n.LocalPeer()
200+
return bh.N.LocalPeer()
152201
}
153202

154203
func (bh *BlankHost) NewStream(ctx context.Context, p peer.ID, protos ...protocol.ID) (network.Stream, error) {
155-
s, err := bh.n.NewStream(ctx, p)
204+
s, err := bh.N.NewStream(ctx, p)
156205
if err != nil {
157206
return nil, fmt.Errorf("failed to open stream: %w", err)
158207
}
@@ -204,7 +253,7 @@ func (bh *BlankHost) SetStreamHandlerMatch(pid protocol.ID, m func(protocol.ID)
204253
func (bh *BlankHost) newStreamHandler(s network.Stream) {
205254
protoID, handle, err := bh.Mux().Negotiate(s)
206255
if err != nil {
207-
log.Info("protocol negotiation failed", "err", err)
256+
log.Error("protocol negotiation failed", "err", err)
208257
s.Reset()
209258
return
210259
}
@@ -216,18 +265,18 @@ func (bh *BlankHost) newStreamHandler(s network.Stream) {
216265

217266
// TODO: i'm not sure this really needs to be here
218267
func (bh *BlankHost) Mux() protocol.Switch {
219-
return bh.mux
268+
return bh.M
220269
}
221270

222271
// TODO: also not sure this fits... Might be better ways around this (leaky abstractions)
223272
func (bh *BlankHost) Network() network.Network {
224-
return bh.n
273+
return bh.N
225274
}
226275

227276
func (bh *BlankHost) ConnManager() connmgr.ConnManager {
228-
return bh.cmgr
277+
return bh.ConnMgr
229278
}
230279

231280
func (bh *BlankHost) EventBus() event.Bus {
232-
return bh.eventbus
281+
return bh.E
233282
}

0 commit comments

Comments
 (0)