Skip to content

Commit 3691db4

Browse files
committed
feat: Rewrite WAL and avoid missing entries
1 parent f52e073 commit 3691db4

27 files changed

+527
-470
lines changed

consensus/consensus.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func Init(
5454

5555
proposalStore := proposal.ProposalStore[starknet.Hash]{}
5656
proposer := proposer.New(logger, &builder, &proposalStore, *nodeAddress, toValue)
57-
stateMachine := tendermint.New(tendermintDB, logger, *nodeAddress, proposer, validators, currentHeight)
57+
stateMachine := tendermint.New(logger, *nodeAddress, proposer, validators, currentHeight)
5858

5959
p2p := p2p.New(host, logger, &builder, &proposalStore, currentHeight, &config.DefaultBufferSizes, bootstrapPeersFn)
6060

consensus/db/db.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ type walMsgCount uint32
3333
// 2. Right before we broadcast a message
3434
//
3535
// We call Delete when we start a new height and commit a block
36-
//
37-
//go:generate mockgen -destination=../mocks/mock_db.go -package=mocks github.com/NethermindEth/juno/consensus/db TendermintDB
3836
type TendermintDB[V types.Hashable[H], H types.Hash, A types.Addr] interface {
3937
// Flush writes the accumulated batch operations to the underlying database.
4038
Flush() error

consensus/driver/driver.go

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package driver
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

78
"github.com/NethermindEth/juno/consensus/db"
@@ -62,16 +63,40 @@ func (d *Driver[V, H, A]) Run(ctx context.Context) error {
6263
}
6364
}()
6465

65-
d.stateMachine.ReplayWAL()
66+
if err := d.replay(ctx); err != nil {
67+
return err
68+
}
69+
70+
return d.listen(ctx)
71+
}
72+
73+
func (d *Driver[V, H, A]) replay(ctx context.Context) error {
74+
for walEntry, err := range d.db.LoadAllEntries() {
75+
if err != nil {
76+
return fmt.Errorf("failed to load WAL entries: %w", err)
77+
}
6678

79+
if _, err := d.execute(ctx, true, d.stateMachine.ProcessWAL(walEntry)); err != nil {
80+
return err
81+
}
82+
}
83+
84+
return nil
85+
}
86+
87+
func (d *Driver[V, H, A]) listen(ctx context.Context) error {
6788
for {
6889
select {
6990
case <-ctx.Done():
7091
return nil
7192
default:
7293
}
94+
7395
actions := d.stateMachine.ProcessStart(0)
74-
isCommitted := d.execute(ctx, actions)
96+
isCommitted, err := d.execute(ctx, false, actions)
97+
if err != nil {
98+
return err
99+
}
75100

76101
// Todo: check message signature everytime a message is received.
77102
// For the time being it can be assumed the signature is correct.
@@ -100,7 +125,10 @@ func (d *Driver[V, H, A]) Run(ctx context.Context) error {
100125
actions = d.stateMachine.ProcessPrecommit(p)
101126
}
102127

103-
isCommitted = d.execute(ctx, actions)
128+
isCommitted, err = d.execute(ctx, false, actions)
129+
if err != nil {
130+
return err
131+
}
104132
}
105133
}
106134
}
@@ -109,37 +137,55 @@ func (d *Driver[V, H, A]) Run(ctx context.Context) error {
109137
// It returns true if a commit action was executed. This is to notify the caller to start a new height with round 0.
110138
func (d *Driver[V, H, A]) execute(
111139
ctx context.Context,
112-
executingActions []actions.Action[V, H, A],
113-
) (isCommitted bool) {
114-
for _, action := range executingActions {
140+
isReplaying bool,
141+
resultActions []actions.Action[V, H, A],
142+
) (isCommitted bool, err error) {
143+
for _, action := range resultActions {
144+
if !isReplaying && action.RequiresWALFlush() {
145+
if err := d.db.Flush(); err != nil {
146+
return false, fmt.Errorf("failed to flush WAL: %w", err)
147+
}
148+
}
149+
115150
switch action := action.(type) {
151+
case *actions.WriteWAL[V, H, A]:
152+
if !isReplaying {
153+
if err := d.db.SetWALEntry(action.Entry); err != nil {
154+
return false, fmt.Errorf("failed to write WAL: %w", err)
155+
}
156+
}
157+
116158
case *actions.BroadcastProposal[V, H, A]:
117159
d.broadcasters.ProposalBroadcaster.Broadcast(ctx, (*types.Proposal[V, H, A])(action))
160+
118161
case *actions.BroadcastPrevote[H, A]:
119162
d.broadcasters.PrevoteBroadcaster.Broadcast(ctx, (*types.Prevote[H, A])(action))
163+
120164
case *actions.BroadcastPrecommit[H, A]:
121165
d.broadcasters.PrecommitBroadcaster.Broadcast(ctx, (*types.Precommit[H, A])(action))
166+
122167
case *actions.ScheduleTimeout:
123-
d.scheduledTms[types.Timeout(*action)] = time.AfterFunc(d.getTimeout(action.Step, action.Round), func() {
124-
select {
125-
case <-ctx.Done():
126-
case d.timeoutsCh <- types.Timeout(*action):
127-
}
128-
})
129-
case *actions.Commit[V, H, A]:
130-
if err := d.db.Flush(); err != nil {
131-
d.log.Fatalf("failed to flush WAL during commit", "height", action.Height, "round", action.Round, "err", err)
132-
}
168+
d.setTimeout(ctx, types.Timeout(*action))
133169

170+
case *actions.Commit[V, H, A]:
134171
d.log.Debugw("Committing", "height", action.Height, "round", action.Round)
135172
d.commitListener.OnCommit(ctx, action.Height, *action.Value)
136173

137174
if err := d.db.DeleteWALEntries(action.Height); err != nil {
138-
d.log.Errorw("failed to delete WAL messages during commit", "height", action.Height, "round", action.Round, "err", err)
175+
return true, fmt.Errorf("failed to delete WAL messages during commit: %w", err)
139176
}
140177

141-
return true
178+
return true, nil
142179
}
143180
}
144-
return false
181+
return false, nil
182+
}
183+
184+
func (d *Driver[V, H, A]) setTimeout(ctx context.Context, timeout types.Timeout) {
185+
d.scheduledTms[timeout] = time.AfterFunc(d.getTimeout(timeout.Step, timeout.Round), func() {
186+
select {
187+
case <-ctx.Done():
188+
case d.timeoutsCh <- timeout:
189+
}
190+
})
145191
}

consensus/driver/driver_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ func TestDriver(t *testing.T) {
157157
stateMachine := mocks.NewMockStateMachine[starknet.Value, starknet.Hash, starknet.Address](
158158
ctrl,
159159
)
160-
stateMachine.EXPECT().ReplayWAL().AnyTimes().Return() // ignore WAL replay logic here
161160

162161
commitAction := starknet.Commit(getRandProposal(random))
163162

consensus/mocks/mock_db.go

Lines changed: 0 additions & 99 deletions
This file was deleted.

consensus/mocks/mock_state_machine.go

Lines changed: 9 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consensus/starknet/starknet.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type (
2525
MessageHeader = types.MessageHeader[Address]
2626

2727
Action = actions.Action[Value, Hash, Address]
28+
WriteWAL = actions.WriteWAL[Value, Hash, Address]
2829
BroadcastProposal = actions.BroadcastProposal[Value, Hash, Address]
2930
BroadcastPrevote = actions.BroadcastPrevote[Hash, Address]
3031
BroadcastPrecommit = actions.BroadcastPrecommit[Hash, Address]

consensus/sync/sync_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestSync(t *testing.T) {
6565
cancel()
6666
})
6767

68-
stateMachine := tendermint.New(tmDB, logger, nodeAddr, mockApp, allNodes, types.Height(0))
68+
stateMachine := tendermint.New(logger, nodeAddr, mockApp, allNodes, types.Height(0))
6969

7070
proposalCh := make(chan *starknet.Proposal)
7171
prevoteCh := make(chan *starknet.Prevote)

0 commit comments

Comments
 (0)