[MEL] - Implement delayed message accumulation in native mode #3389
base: master
Changes from all commits
dee2081
68c9112
4170ed1
ffb0921
3aad133
3e23fa1
6a1a0c5
a46c9ce
a584333
40b5e8f
d5ed1e8
bcfd767
8f53088
6f1b33f
8d788ee
4b42aaa
c290a9c
cd2cb43
c100876
4329432
@@ -0,0 +1,138 @@
package mel

import (
	"context"
	"errors"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
)
// DelayedMessageBacklogEntry contains metadata relating to delayed messages, required for merkle-tree verification
type DelayedMessageBacklogEntry struct {
	Index                       uint64      // Global delayed index of a delayed inbox message with respect to the chain
	MsgHash                     common.Hash // Hash of the delayed inbox message
	MelStateParentChainBlockNum uint64      // Parent chain block number of the MEL state in which this delayed inbox message was SEEN
}
// DelayedMessageBacklog is a data structure that holds metadata related to delayed messages that have been SEEN by MEL but not yet READ.
// This enables verification of delayed messages read from a database against the current Merkle root of the head MEL state. The MEL state
// also contains compact witnesses of a Merkle tree representing all seen delayed messages. To prove that a delayed message is part of
// this Merkle tree, this data structure can be used to verify Merkle proofs against the MEL state.
type DelayedMessageBacklog struct {
	ctx                          context.Context
	capacity                     int
	entries                      []*DelayedMessageBacklogEntry
	dirtiesStartPos              int // starting position of the dirties in the entries list, i.e. items added while processing a state
	initMessage                  *DelayedInboxMessage
	finalizedAndReadIndexFetcher func(context.Context) (uint64, error)
}
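
To make the SEEN/READ distinction concrete (a summary inferred from the comments in this diff, not stated explicitly): an entry is appended when MEL first observes a delayed message on the parent chain (SEEN), and it becomes eligible for trimming once the message has been consumed downstream (READ) and its parent chain block is finalized.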

func NewDelayedMessageBacklog(ctx context.Context, capacity int, finalizedAndReadIndexFetcher func(context.Context) (uint64, error), opts ...func(*DelayedMessageBacklog)) (*DelayedMessageBacklog, error) {
	if capacity == 0 {
		return nil, fmt.Errorf("capacity of DelayedMessageBacklog cannot be zero")
	}
	if finalizedAndReadIndexFetcher == nil {
		return nil, fmt.Errorf("finalizedAndReadIndexFetcher of DelayedMessageBacklog cannot be nil")
	}
	backlog := &DelayedMessageBacklog{
		ctx:                          ctx,
		capacity:                     capacity,
		entries:                      make([]*DelayedMessageBacklogEntry, 0),
		initMessage:                  nil,
		finalizedAndReadIndexFetcher: finalizedAndReadIndexFetcher,
	}
	for _, opt := range opts {
		opt(backlog)
	}
	return backlog, nil
}

// WithUnboundedCapacity configures the backlog to never trim: capacity is zeroed and the fetcher is dropped, making clear a no-op. Used in tests.
func WithUnboundedCapacity(d *DelayedMessageBacklog) {
	d.capacity = 0
	d.finalizedAndReadIndexFetcher = nil
}
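
For illustration, a hedged sketch of constructing both variants; the fetcher body below is a hypothetical stand-in:

ctx := context.Background()

// Stand-in fetcher (hypothetical): a real one would return the index up to which
// delayed messages are both finalized on the parent chain and read.
finalizedAndRead := func(ctx context.Context) (uint64, error) { return 0, nil }

// Bounded: clear() becomes eligible to trim once entries exceed the capacity of 1024.
bounded, err := NewDelayedMessageBacklog(ctx, 1024, finalizedAndRead)
if err != nil {
	panic(err) // sketch only; handle the error properly in real code
}

// Unbounded (as in the unit test below): the option zeroes capacity and nils the
// fetcher, so clear() never trims.
unbounded, err := NewDelayedMessageBacklog(ctx, 1, finalizedAndRead, WithUnboundedCapacity)
_, _ = bounded, unbounded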

// Add takes a DelayedMessageBacklogEntry and appends it to the backlog, provided the entry passes validation. It also attempts to trim the backlog if capacity is exceeded
func (d *DelayedMessageBacklog) Add(entry *DelayedMessageBacklogEntry) error {
	if len(d.entries) > 0 {
		expectedIndex := d.entries[0].Index + uint64(len(d.entries))
		if entry.Index != expectedIndex {
			return fmt.Errorf("message index %d is not sequential, expected %d", entry.Index, expectedIndex)
		}
	}
	d.entries = append(d.entries, entry)
	return d.clear()
}
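
To make the sequential check concrete, a small hedged illustration (indices arbitrary; b is a backlog built as above): the expected next index is always the first stored index plus the current length.

_ = b.Add(&DelayedMessageBacklogEntry{Index: 5}) // first entry is accepted unconditionally
_ = b.Add(&DelayedMessageBacklogEntry{Index: 6}) // expected 5 + 1 = 6, accepted
err := b.Add(&DelayedMessageBacklogEntry{Index: 8})
// err: "message index 8 is not sequential, expected 7"
_ = err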

func (d *DelayedMessageBacklog) Get(index uint64) (*DelayedMessageBacklogEntry, error) {
	if len(d.entries) == 0 {
		return nil, errors.New("delayed message backlog is empty")
	}
	if index < d.entries[0].Index || index > d.entries[len(d.entries)-1].Index {
		return nil, fmt.Errorf("queried index: %d out of bounds, delayed message backlog's starting index: %d, ending index: %d", index, d.entries[0].Index, d.entries[len(d.entries)-1].Index)
	}
	pos := index - d.entries[0].Index
	entry := d.entries[pos]
	if entry.Index != index {
		return nil, fmt.Errorf("index mismatch in the delayed message backlog entry. Queried index: %d, backlog entry's index: %d", index, entry.Index)
	}
	return entry, nil
}
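
A note on the lookup above: because Add enforces contiguous indices, Get computes the slice position directly as the queried index minus the first stored index, an O(1) operation; the final index-equality check is a defensive guard against a corrupted backlog rather than part of the search.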

func (d *DelayedMessageBacklog) CommitDirties() { d.dirtiesStartPos = len(d.entries) } // Commit dirties to the entries by moving dirtiesStartPos to the end
func (d *DelayedMessageBacklog) Len() int { return len(d.entries) } // Used for testing the InitializeDelayedMessageBacklog function in melrunner
func (d *DelayedMessageBacklog) GetInitMsg() *DelayedInboxMessage { return d.initMessage }
func (d *DelayedMessageBacklog) setInitMsg(msg *DelayedInboxMessage) { d.initMessage = msg }
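
A brief sketch of the dirty-entry lifecycle (mirroring the unit test further below): entries appended while a state is being processed stay dirty until CommitDirties moves the boundary past them.

_ = b.Add(&DelayedMessageBacklogEntry{Index: 0}) // dirty: added while processing a state
_ = b.Add(&DelayedMessageBacklogEntry{Index: 1}) // dirty
b.CommitDirties() // dirtiesStartPos = 2; both entries are now committed
_ = b.Add(&DelayedMessageBacklogEntry{Index: 2}) // dirty again; clone() would drop this one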

// clear removes from the backlog (if it exceeds capacity) the entries corresponding to delayed messages that are both READ and belong to finalized parent chain blocks
func (d *DelayedMessageBacklog) clear() error {
	if len(d.entries) <= d.capacity {
		return nil
	}
	if d.finalizedAndReadIndexFetcher != nil && d.dirtiesStartPos > 0 { // if all entries are currently dirty we don't trim the finalized ones
		finalizedDelayedMessagesRead, err := d.finalizedAndReadIndexFetcher(d.ctx)
		if err != nil {
			log.Error("Unable to trim finalized and read delayed messages from DelayedMessageBacklog, will be retried later", "err", err)
			return nil // we should not interrupt delayed message accumulation if we cannot trim the backlog, since it's not high priority
		}
		if finalizedDelayedMessagesRead > d.entries[0].Index {
			leftTrimPos := min(finalizedDelayedMessagesRead-d.entries[0].Index, uint64(len(d.entries)))
			// #nosec G115
			leftTrimPos = min(leftTrimPos, uint64(d.dirtiesStartPos)) // cannot clear dirties yet, they will be cleared out in the next attempt
			d.entries = d.entries[leftTrimPos:]
			// #nosec G115
			d.dirtiesStartPos -= int(leftTrimPos) // adjust start position of dirties
		}
	}
	return nil
}

Contributor: It is more idiomatic for clear to take in a context rather than storing a context in the struct, IMO

Author: I had thought a lot about this, the ctx would have to trickle down from
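
A worked example of the trim arithmetic, with assumed values: suppose entries hold indices 10 through 29 (20 entries), dirtiesStartPos is 15, and the fetcher returns 18. Then leftTrimPos = min(18-10, 20) = 8, further capped at min(8, 15) = 8, so the first 8 entries (indices 10-17) are dropped, the backlog now starts at index 18, and dirtiesStartPos becomes 15-8 = 7.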

// reorg removes from the backlog the entries that corresponded to the reorged-out parent chain blocks
func (d *DelayedMessageBacklog) reorg(newDelayedMessagedSeen uint64) error {
	if d.dirtiesStartPos != len(d.entries) {
		return fmt.Errorf("delayedMessageBacklog dirties is non-empty when reorg was called, size of dirties: %d", len(d.entries)-d.dirtiesStartPos)
	}
	if len(d.entries) == 0 {
		return nil
	}
	if newDelayedMessagedSeen >= d.entries[0].Index {
		rightTrimPos := newDelayedMessagedSeen - d.entries[0].Index
		if rightTrimPos > uint64(len(d.entries)) {
			return fmt.Errorf("newDelayedMessagedSeen: %d during a reorg is greater (by more than 1) than the greatest delayed message index stored in backlog: %d", newDelayedMessagedSeen, d.entries[len(d.entries)-1].Index)
		}
		d.entries = d.entries[:rightTrimPos]
	} else {
		d.entries = make([]*DelayedMessageBacklogEntry, 0)
	}
	d.dirtiesStartPos = len(d.entries)
	return nil
}
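
For example, if the backlog holds indices 10 through 24 and reorg is called with newDelayedMessagedSeen = 20, then rightTrimPos = 20 - 10 = 10 and the entries are truncated to indices 10 through 19: every entry whose index is 20 or higher is discarded, and dirtiesStartPos is reset to the new length.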

// clone is a shallow clone of DelayedMessageBacklog; uncommitted dirties are dropped from entries and the receiver itself is returned
func (d *DelayedMessageBacklog) clone() *DelayedMessageBacklog {
	// Remove dirties from entries
	d.entries = d.entries[:d.dirtiesStartPos]
	return d
}
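
Note that clone returns the receiver after truncating away the uncommitted dirties rather than allocating a copy; presumably callers take a clone only at points where a view of just the committed entries is wanted, as the unit test below exercises.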
@@ -0,0 +1,78 @@
package mel

import (
	"context"
	"reflect"
	"strings"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestDelayedMessageBacklog(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	backlog, err := NewDelayedMessageBacklog(ctx, 1, func(ctx context.Context) (uint64, error) { return 0, nil }, WithUnboundedCapacity)
	require.NoError(t, err)

	// Verify handling of dirties
	for i := uint64(0); i < 2; i++ {
		require.NoError(t, backlog.Add(&DelayedMessageBacklogEntry{Index: i}))
	}
	backlog.CommitDirties()
	require.True(t, backlog.dirtiesStartPos == 2)
	// Add dirties and verify that calling clone removes them
	for i := uint64(2); i < 5; i++ {
		require.NoError(t, backlog.Add(&DelayedMessageBacklogEntry{Index: i}))
	}
	backlog.clone() // should remove all the dirties from the entries list
	require.True(t, len(backlog.entries) == 2)
	numEntries := uint64(25)
	for i := uint64(2); i < numEntries; i++ {
		require.NoError(t, backlog.Add(&DelayedMessageBacklogEntry{Index: i}))
	}
	backlog.CommitDirties()
	// #nosec G115
	require.True(t, uint64(backlog.dirtiesStartPos) == numEntries)

	// Test that clone works
	cloned := backlog.clone()
	if !reflect.DeepEqual(backlog, cloned) {
		t.Fatal("cloned doesn't match original")
	}

	// Test failures with Get
	// Entry not found
	_, err = backlog.Get(numEntries + 1)
	if err == nil {
		t.Fatal("backlog Get function should've errored for an invalid index query")
	}
	if !strings.Contains(err.Error(), "out of bounds") {
		t.Fatalf("unexpected error: %s", err.Error())
	}
	// Index mismatch
	failIndex := uint64(3)
	backlog.entries[failIndex].Index = failIndex + 1 // shouldn't match
	_, err = backlog.Get(failIndex)
	if err == nil {
		t.Fatal("backlog Get function should've errored for an invalid entry in the backlog")
	}
	if !strings.Contains(err.Error(), "index mismatch in the delayed message backlog entry") {
		t.Fatalf("unexpected error: %s", err.Error())
	}

	// Verify that advancing finalizedAndRead trims the read (and finalized) entries while keeping the unread ones
	finalizedAndRead := uint64(7)
	backlog.finalizedAndReadIndexFetcher = func(context.Context) (uint64, error) { return finalizedAndRead, nil }
	require.NoError(t, backlog.clear())
	require.True(t, len(backlog.entries) == int(numEntries-finalizedAndRead)) // #nosec G115
	require.True(t, backlog.entries[0].Index == finalizedAndRead)

	// Verify that reorg handling works as expected: a reorg of 5 indexes
	newSeen := numEntries - 5
	require.NoError(t, backlog.reorg(newSeen))
	// as the backlog hasn't been updated with new finalized info, its starting elements remain unchanged; only the right side is trimmed, up to delayed index (newSeen-1)
	require.True(t, len(backlog.entries) == int(newSeen-finalizedAndRead)) // #nosec G115
	require.True(t, backlog.entries[len(backlog.entries)-1].Index == newSeen-1)
}