diff --git a/epoch.go b/epoch.go index 3873d0a..d4e1af9 100644 --- a/epoch.go +++ b/epoch.go @@ -978,6 +978,9 @@ func (e *Epoch) persistFinalization(finalization Finalization) error { // or otherwise write it to the WAL in order to commit it later. startRound := e.round nextSeqToCommit := e.nextSeqToCommit() + + e.sched.ExecuteDependents(finalization.Finalization.Digest) + if finalization.Finalization.Seq == nextSeqToCommit { if err := e.indexFinalizations(finalization.Finalization.Round); err != nil { e.Logger.Error("Failed to index finalizations", zap.Error(err)) @@ -1327,6 +1330,8 @@ func (e *Epoch) persistNotarization(notarization Notarization) error { return err } + e.sched.ExecuteDependents(notarization.Vote.Digest) + round := notarization.Vote.Round for _, signer := range notarization.QC.Signers() { if signerIndex := e.nodes.IndexOf(signer); signerIndex != -1 { @@ -1559,7 +1564,10 @@ func (e *Epoch) handleBlockMessage(message *BlockMessage, from NodeID) error { // Schedule the block to be verified once its direct predecessor have been verified, // or if it can be verified immediately. - e.Logger.Debug("Scheduling block verification", zap.Uint64("round", md.Round)) + e.Logger.Debug("Scheduling block verification", + zap.Uint64("round", md.Round), + zap.Uint64("seq", md.Seq), + zap.Bool("ready", canBeImmediatelyVerified)) e.sched.Schedule(task, md.Prev, canBeImmediatelyVerified) return nil @@ -1881,12 +1889,9 @@ func (e *Epoch) createNotarizedBlockVerificationTask(block Block, notarization N func (e *Epoch) isBlockReadyToBeScheduled(seq uint64, prev Digest) bool { if seq > 0 { - // A block can be scheduled if its predecessor either exists in storage, - // or there exists a round object for it. - // Since we only create a round object after we verify the block, - // it means we have verified this block in the past. - _, ok := e.locateBlock(seq-1, prev[:]) - return ok + // A block can be scheduled if its predecessor is either notarized or finalized. + _, notarizedOrFinalized, _ := e.locateBlock(seq-1, prev[:]) + return notarizedOrFinalized != nil } // The first block is always ready to be scheduled return true @@ -1918,7 +1923,7 @@ func (e *Epoch) verifyProposalMetadataAndBlacklist(block Block) bool { // If it's the former, we need to find the parent of the block and ensure it is correct. prevBlacklist := NewBlacklist(uint16(len(e.nodes))) if bh.Seq > 0 { - prevBlock, found := e.locateBlock(bh.Seq-1, bh.Prev[:]) + prevBlock, _, found := e.locateBlock(bh.Seq-1, bh.Prev[:]) if !found { e.Logger.Debug("Could not find parent block with given digest", zap.Uint64("blockSeq", bh.Seq-1), @@ -1974,46 +1979,57 @@ func (e *Epoch) verifyProposalMetadataAndBlacklist(block Block) bool { // 2) Else, on storage. // Compares to the given digest, and if it's the same, returns it. // Otherwise, returns false. -func (e *Epoch) locateBlock(seq uint64, digest []byte) (VerifiedBlock, bool) { +func (e *Epoch) locateBlock(seq uint64, digest []byte) (VerifiedBlock, *notarizationOrFinalization, bool) { // TODO index rounds by digest too to make it quicker // TODO: optimize this by building an index from digest to round for _, round := range e.rounds { dig := round.block.BlockHeader().Digest if bytes.Equal(dig[:], digest) { - return round.block, true + nof := ¬arizationOrFinalization{ + Notarization: round.notarization, + Finalization: round.finalization, + } + if nof.Notarization == nil && nof.Finalization == nil { + return nil, nil, false + } + return round.block, nof, true } } height := e.nextSeqToCommit() // Not in memory, and no block resides in storage. if height == 0 { - return nil, false + return nil, nil, false } // If the given block has a sequence that is higher than the last block we committed to storage, // we don't have the block in our storage. maxSeq := height - 1 if maxSeq < seq { - return nil, false + return nil, nil, false } if seq >= e.nextSeqToCommit() { e.Logger.Debug("Requested block sequence we have not yet committed to storage", zap.Uint64("requestedSeq", seq), zap.Uint64("numBlocks", e.nextSeqToCommit())) - return nil, false + return nil, nil, false } - block, _, ok := e.retrieveBlockOrHalt(seq) + block, finalization, ok := e.retrieveBlockOrHalt(seq) if !ok { - return nil, false + return nil, nil, false + } + + nof := ¬arizationOrFinalization{ + Finalization: &finalization, } dig := block.BlockHeader().Digest if bytes.Equal(dig[:], digest) { - return block, true + return block, nof, true } - return nil, false + return nil, nil, false } func (e *Epoch) buildBlock() { @@ -2071,7 +2087,7 @@ func (e *Epoch) buildBlock() { func (e *Epoch) retrieveBlacklistOfParentBlock(metadata ProtocolMetadata) (Blacklist, bool) { var blacklist Blacklist if metadata.Seq > 0 { - prevBlock, ok := e.locateBlock(metadata.Seq-1, metadata.Prev[:]) + prevBlock, _, ok := e.locateBlock(metadata.Seq-1, metadata.Prev[:]) if !ok { e.Logger.Error("Failed locating previous block", zap.Uint64("round", metadata.Round), @@ -3020,3 +3036,8 @@ type messagesForRound struct { finalization *Finalization notarization *Notarization } + +type notarizationOrFinalization struct { + *Notarization + *Finalization +} diff --git a/epoch_test.go b/epoch_test.go index 274d651..226c790 100644 --- a/epoch_test.go +++ b/epoch_test.go @@ -12,6 +12,7 @@ import ( rand2 "math/rand" "strings" "sync" + "sync/atomic" "testing" "time" @@ -32,6 +33,71 @@ var ( } ) +func TestBlockNotVerifiedIfParentNotNotarized(t *testing.T) { + bb := &testutil.TestBlockBuilder{Out: make(chan *testutil.TestBlock, 1)} + + nodes := []NodeID{{1}, {2}, {3}, {4}} + + comm := testutil.NewNoopComm(nodes) + conf, _, _ := testutil.DefaultTestNodeEpochConfig(t, nodes[3], comm, bb) + + e, err := NewEpoch(conf) + require.NoError(t, err) + + require.NoError(t, e.Start()) + + blocks := createBlocks(t, nodes, 2) + + var block1Verified atomic.Bool + + var wg sync.WaitGroup + wg.Add(1) + + block0 := blocks[0].VerifiedBlock.(*testutil.TestBlock) + block0.OnVerify = func() { + wg.Done() + } + block1 := blocks[1].VerifiedBlock.(*testutil.TestBlock) + block1.OnVerify = func() { + block1Verified.Store(true) + } + + v0, err := testutil.NewTestVote(block0, nodes[0]) + require.NoError(t, err) + + v1, err := testutil.NewTestVote(block1, nodes[1]) + require.NoError(t, err) + + emptyNotarization := testutil.NewEmptyNotarization(nodes, 0) + + err = e.HandleMessage(&Message{ + BlockMessage: &BlockMessage{ + Vote: *v0, + Block: block0, + }, + }, nodes[0]) + require.NoError(t, err) + + wg.Wait() + + err = e.HandleMessage(&Message{ + BlockMessage: &BlockMessage{ + Vote: *v1, + Block: block1, + }, + }, nodes[1]) + require.NoError(t, err) + + err = e.HandleMessage(&Message{ + EmptyNotarization: emptyNotarization, + }, nodes[1]) + require.NoError(t, err) + + require.Never(t, func() bool { + return block1Verified.Load() + }, time.Second, 100*time.Millisecond) +} + func TestEpochHandleNotarizationFutureRound(t *testing.T) { bb := &testutil.TestBlockBuilder{} nodes := []NodeID{{1}, {2}, {3}, {4}} diff --git a/sched.go b/sched.go index edcb2de..0812570 100644 --- a/sched.go +++ b/sched.go @@ -140,6 +140,23 @@ func (as *Scheduler) Schedule(f func() Digest, prev Digest, ready bool) { as.signal.Broadcast() // (11) } +func (as *Scheduler) ExecuteDependents(dep Digest) { + as.lock.Lock() + defer as.lock.Unlock() + + if as.close { + return + } + + newlyReadyTasks := as.pending.Remove(dep) + if len(newlyReadyTasks) == 0 { + return + } + as.ready = append(as.ready, newlyReadyTasks...) + + as.signal.Broadcast() +} + type Task struct { F func() Digest Parent Digest