Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions block/internal/cache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,20 @@ func benchSetupStore(b *testing.B, n int, txsPer int, chainID string) store.Stor
}
st := store.New(ds)
ctx := context.Background()
batch, err := st.NewBatch(ctx)
if err != nil {
b.Fatal(err)
}
for i := 1; i <= n; i++ {
h, d := types.GetRandomBlock(uint64(i), txsPer, chainID)
if err := st.SaveBlockData(ctx, h, d, &types.Signature{}); err != nil {
if err := batch.SaveBlockData(h, d, &types.Signature{}); err != nil {
b.Fatal(err)
}
}
if err := st.SetHeight(ctx, uint64(n)); err != nil {
if err := batch.SetHeight(uint64(n)); err != nil {
b.Fatal(err)
}
if err := batch.Commit(); err != nil {
b.Fatal(err)
}
return st
Expand Down
7 changes: 5 additions & 2 deletions block/internal/cache/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,17 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
h3, d3 := types.GetRandomBlock(3, 2, chainID)

// persist in store and set height
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
for _, pair := range []struct {
h *types.SignedHeader
d *types.Data
}{{h1, d1}, {h2, d2}, {h3, d3}} {
err := st.SaveBlockData(ctx, pair.h, pair.d, &types.Signature{})
err := batch.SaveBlockData(pair.h, pair.d, &types.Signature{})
require.NoError(t, err)
}
require.NoError(t, st.SetHeight(ctx, 3))
require.NoError(t, batch.SetHeight(3))
require.NoError(t, batch.Commit())

// construct manager which brings up pending managers
cfg := tempConfig(t)
Expand Down
5 changes: 4 additions & 1 deletion block/internal/cache/pending_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func TestPendingBase_PersistLastSubmitted(t *testing.T) {
require.NoError(t, err)

// store height 3 to make numPending meaningful
require.NoError(t, st.SetHeight(ctx, 3))
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SetHeight(3))
require.NoError(t, batch.Commit())
assert.Equal(t, uint64(3), ph.NumPendingHeaders())

// set last submitted higher and ensure metadata is written
Expand Down
17 changes: 13 additions & 4 deletions block/internal/cache/pending_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ func TestPendingData_BasicFlow(t *testing.T) {
h2, d2 := types.GetRandomBlock(2, 1, chainID)
h3, d3 := types.GetRandomBlock(3, 2, chainID)

batch, err := store.NewBatch(ctx)
require.NoError(t, err)
for _, p := range []struct {
h *types.SignedHeader
d *types.Data
}{{h1, d1}, {h2, d2}, {h3, d3}} {
require.NoError(t, store.SaveBlockData(ctx, p.h, p.d, &types.Signature{}))
require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{}))
}
require.NoError(t, store.SetHeight(ctx, 3))
require.NoError(t, batch.SetHeight(3))
require.NoError(t, batch.Commit())

pendingData, err := NewPendingData(store, zerolog.Nop())
require.NoError(t, err)
Expand Down Expand Up @@ -69,7 +72,10 @@ func TestPendingData_InitFromMetadata(t *testing.T) {
require.NoError(t, store.SetMetadata(ctx, LastSubmittedDataHeightKey, bz))

// store height is 3
require.NoError(t, store.SetHeight(ctx, 3))
batch, err := store.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SetHeight(3))
require.NoError(t, batch.Commit())

pendingData, err := NewPendingData(store, zerolog.Nop())
require.NoError(t, err)
Expand All @@ -82,7 +88,10 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) {
store := memStore(t)

// Set height to 1 but do not save any block data
require.NoError(t, store.SetHeight(ctx, 1))
batch, err := store.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SetHeight(1))
require.NoError(t, batch.Commit())

pendingData, err := NewPendingData(store, zerolog.Nop())
require.NoError(t, err)
Expand Down
14 changes: 10 additions & 4 deletions block/internal/cache/pending_headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {
h2, d2 := types.GetRandomBlock(2, 1, chainID)
h3, d3 := types.GetRandomBlock(3, 2, chainID)

batch, err := store.NewBatch(ctx)
require.NoError(t, err)
for _, p := range []struct {
h *types.SignedHeader
d *types.Data
}{{h1, d1}, {h2, d2}, {h3, d3}} {
require.NoError(t, store.SaveBlockData(ctx, p.h, p.d, &types.Signature{}))
require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{}))
}
require.NoError(t, store.SetHeight(ctx, 3))
require.NoError(t, batch.SetHeight(3))
require.NoError(t, batch.Commit())

pendingHeaders, err := NewPendingHeaders(store, zerolog.Nop())
require.NoError(t, err)
Expand Down Expand Up @@ -67,8 +70,11 @@ func TestPendingHeaders_EmptyWhenUpToDate(t *testing.T) {
store := memStore(t)

h, d := types.GetRandomBlock(1, 1, "ph-up")
require.NoError(t, store.SaveBlockData(ctx, h, d, &types.Signature{}))
require.NoError(t, store.SetHeight(ctx, 1))
batch, err := store.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(h, d, &types.Signature{}))
require.NoError(t, batch.SetHeight(1))
require.NoError(t, batch.Commit())

pendingHeaders, err := NewPendingHeaders(store, zerolog.Nop())
require.NoError(t, err)
Expand Down
49 changes: 36 additions & 13 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,22 @@ func (e *Executor) initializeState() error {
AppHash: stateRoot,
DAHeight: 0,
}

// Set store height and save initial state
batch, err := e.store.NewBatch(e.ctx)
if err != nil {
return fmt.Errorf("failed to create batch for initialization: %w", err)
}
if err := batch.SetHeight(state.LastBlockHeight); err != nil {
return fmt.Errorf("failed to set store height: %w", err)
}
if err := batch.Commit(); err != nil {
return fmt.Errorf("failed to commit initial state: %w", err)
}
}

e.setLastState(state)

// Set store height
if err := e.store.SetHeight(e.ctx, state.LastBlockHeight); err != nil {
return fmt.Errorf("failed to set store height: %w", err)
}

e.logger.Info().Uint64("height", state.LastBlockHeight).
Str("chain_id", state.ChainID).Msg("initialized state")

Expand Down Expand Up @@ -349,9 +356,18 @@ func (e *Executor) produceBlock() error {
}

// saved early for crash recovery, will be overwritten later with the final signature
if err = e.store.SaveBlockData(e.ctx, header, data, &types.Signature{}); err != nil {
b, err := e.store.NewBatch(context.Background())
if err != nil {
return fmt.Errorf("failed to create batch: %w", err)
}

if err = b.SaveBlockData(header, data, &types.Signature{}); err != nil {
return fmt.Errorf("failed to save block: %w", err)
}

if err := b.Commit(); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
}

newState, err := e.applyBlock(e.ctx, header.Header, data)
Expand All @@ -372,20 +388,27 @@ func (e *Executor) produceBlock() error {
return fmt.Errorf("failed to validate block: %w", err)
}

if err := e.store.SaveBlockData(e.ctx, header, data, &signature); err != nil {
batch, err := e.store.NewBatch(e.ctx)
if err != nil {
return fmt.Errorf("failed to create batch: %w", err)
}

if err := batch.SaveBlockData(header, data, &signature); err != nil {
return fmt.Errorf("failed to save block: %w", err)
}

// Once the SaveBlockData has been saved we must update the height and the state.
// context.TODO() should be reverted to the real context (e.ctx) once https://github.com/evstack/ev-node/issues/2274 has been implemented, this prevents context cancellation
if err := e.store.SetHeight(context.TODO(), newHeight); err != nil {
if err := batch.SetHeight(newHeight); err != nil {
return fmt.Errorf("failed to update store height: %w", err)
}

if err := e.updateState(context.TODO(), newState); err != nil {
if err := e.updateState(batch, newState); err != nil {
return fmt.Errorf("failed to update state: %w", err)
}

if err := batch.Commit(); err != nil {
return fmt.Errorf("failed to commit block data: %w", err)
}

// broadcast header and data to P2P network
g, ctx := errgroup.WithContext(e.ctx)
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) })
Expand Down Expand Up @@ -604,8 +627,8 @@ func (e *Executor) validateBlock(lastState types.State, header *types.SignedHead
}

// updateState saves the new state
func (e *Executor) updateState(ctx context.Context, newState types.State) error {
if err := e.store.UpdateState(ctx, newState); err != nil {
func (e *Executor) updateState(batch store.Batch, newState types.State) error {
if err := batch.UpdateState(newState); err != nil {
return err
}

Expand Down
6 changes: 5 additions & 1 deletion block/internal/executing/executor_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
pendingHeader.DataHash = pendingData.DACommitment()

// Save pending block data (this is what would happen during a crash)
err = memStore.SaveBlockData(context.Background(), pendingHeader, pendingData, &types.Signature{})
batch, err := memStore.NewBatch(context.Background())
require.NoError(t, err)
err = batch.SaveBlockData(pendingHeader, pendingData, &types.Signature{})
require.NoError(t, err)
err = batch.Commit()
require.NoError(t, err)

// Stop first executor (simulating crash/restart)
Expand Down
9 changes: 6 additions & 3 deletions block/internal/submitting/da_submitter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
// persist to store
sig1t := types.Signature(sig1)
sig2t := types.Signature(sig2)
require.NoError(t, st.SaveBlockData(context.Background(), hdr1, data1, &sig1t))
require.NoError(t, st.SaveBlockData(context.Background(), hdr2, data2, &sig2t))
require.NoError(t, st.SetHeight(context.Background(), 2))
batch, err := st.NewBatch(context.Background())
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(hdr1, data1, &sig1t))
require.NoError(t, batch.SaveBlockData(hdr2, data2, &sig2t))
require.NoError(t, batch.SetHeight(2))
require.NoError(t, batch.Commit())

// Dummy DA
dummyDA := coreda.NewDummyDA(10_000_000, 0, 0, 10*time.Millisecond)
Expand Down
40 changes: 26 additions & 14 deletions block/internal/submitting/da_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,15 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) {
// Save to store to make them pending
sig1 := header1.Signature
sig2 := header2.Signature
require.NoError(t, st.SaveBlockData(ctx, header1, data1, &sig1))
require.NoError(t, st.SaveBlockData(ctx, header2, data2, &sig2))
require.NoError(t, st.SetHeight(ctx, 2))
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(header1, data1, &sig1))
require.NoError(t, batch.SaveBlockData(header2, data2, &sig2))
require.NoError(t, batch.SetHeight(2))
require.NoError(t, batch.Commit())

// Submit headers
err := submitter.SubmitHeaders(ctx, cm)
err = submitter.SubmitHeaders(ctx, cm)
require.NoError(t, err)

// Verify headers are marked as DA included
Expand Down Expand Up @@ -253,12 +256,15 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) {
// Save to store to make them pending
sig1 := types.Signature([]byte("sig1"))
sig2 := types.Signature([]byte("sig2"))
require.NoError(t, st.SaveBlockData(ctx, header1, data1, &sig1))
require.NoError(t, st.SaveBlockData(ctx, header2, data2, &sig2))
require.NoError(t, st.SetHeight(ctx, 2))
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(header1, data1, &sig1))
require.NoError(t, batch.SaveBlockData(header2, data2, &sig2))
require.NoError(t, batch.SetHeight(2))
require.NoError(t, batch.Commit())

// Submit data
err := submitter.SubmitData(ctx, cm, signer, gen)
err = submitter.SubmitData(ctx, cm, signer, gen)
require.NoError(t, err)

// Verify data is marked as DA included
Expand Down Expand Up @@ -302,11 +308,14 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) {

// Save to store
sig := types.Signature([]byte("sig"))
require.NoError(t, st.SaveBlockData(ctx, header, emptyData, &sig))
require.NoError(t, st.SetHeight(ctx, 1))
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(header, emptyData, &sig))
require.NoError(t, batch.SetHeight(1))
require.NoError(t, batch.Commit())

// Submit data - should succeed but skip empty data
err := submitter.SubmitData(ctx, cm, signer, gen)
err = submitter.SubmitData(ctx, cm, signer, gen)
require.NoError(t, err)

// Empty data should not be marked as DA included (it's implicitly included)
Expand Down Expand Up @@ -353,11 +362,14 @@ func TestDASubmitter_SubmitData_NilSigner(t *testing.T) {

// Save to store
sig := types.Signature([]byte("sig"))
require.NoError(t, st.SaveBlockData(ctx, header, data, &sig))
require.NoError(t, st.SetHeight(ctx, 1))
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(header, data, &sig))
require.NoError(t, batch.SetHeight(1))
require.NoError(t, batch.Commit())

// Submit data with nil signer - should fail
err := submitter.SubmitData(ctx, cm, nil, gen)
err = submitter.SubmitData(ctx, cm, nil, gen)
require.Error(t, err)
assert.Contains(t, err.Error(), "signer is nil")
}
Expand Down
27 changes: 19 additions & 8 deletions block/internal/submitting/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func TestSubmitter_IsHeightDAIncluded(t *testing.T) {

ctx := t.Context()
cm, st := newTestCacheAndStore(t)
require.NoError(t, st.SetHeight(ctx, 5))
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SetHeight(5))
require.NoError(t, batch.Commit())

s := &Submitter{store: st, cache: cm, logger: zerolog.Nop()}
s.ctx = ctx
Expand Down Expand Up @@ -245,9 +248,12 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) {
require.NotEqual(t, d1.DACommitment(), d2.DACommitment())

sig := types.Signature([]byte("sig"))
require.NoError(t, st.SaveBlockData(ctx, h1, d1, &sig))
require.NoError(t, st.SaveBlockData(ctx, h2, d2, &sig))
require.NoError(t, st.SetHeight(ctx, 2))
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(h1, d1, &sig))
require.NoError(t, batch.SaveBlockData(h2, d2, &sig))
require.NoError(t, batch.SetHeight(2))
require.NoError(t, batch.Commit())

cm.SetHeaderDAIncluded(h1.Hash().String(), 100)
cm.SetDataDAIncluded(d1.DACommitment().String(), 100)
Expand Down Expand Up @@ -286,10 +292,12 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) {
// helper to create a minimal header and data for tests
func newHeaderAndData(chainID string, height uint64, nonEmpty bool) (*types.SignedHeader, *types.Data) {
now := time.Now()
h := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{ChainID: chainID, Height: height, Time: uint64(now.UnixNano())}, ProposerAddress: []byte{1}}}
d := &types.Data{Metadata: &types.Metadata{ChainID: chainID, Height: height, Time: uint64(now.UnixNano())}}
// Use height multiplied by a large offset to ensure uniqueness across multiple calls
timestamp := uint64(now.UnixNano()) + (height * 1000000)
h := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{ChainID: chainID, Height: height, Time: timestamp}, ProposerAddress: []byte{1}}}
d := &types.Data{Metadata: &types.Metadata{ChainID: chainID, Height: height, Time: timestamp}}
if nonEmpty {
d.Txs = types.Txs{types.Tx(fmt.Sprintf("any-unique-tx-%d", now.UnixNano()))}
d.Txs = types.Txs{types.Tx(fmt.Sprintf("any-unique-tx-%d-%d", height, now.UnixNano()))}
}
return h, d
}
Expand Down Expand Up @@ -337,7 +345,10 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) {
}

// Make there be pending headers and data by setting store height > last submitted
require.NoError(t, st.SetHeight(ctx, 2))
batch, err := st.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SetHeight(2))
require.NoError(t, batch.Commit())

// Start and wait for calls
require.NoError(t, s.Start(ctx))
Expand Down
Loading