Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
46f8fad
fix(store): track store's contiguous head
cristaloleg Jan 8, 2025
cbcb738
more fixes
cristaloleg Jan 8, 2025
4bc4aa0
fix test
cristaloleg Jan 8, 2025
9e19840
fix sync test
cristaloleg Jan 8, 2025
cc8894c
fix sync test
cristaloleg Jan 8, 2025
73b65f2
test heightSub
cristaloleg Jan 9, 2025
b74e825
simplify test
cristaloleg Jan 9, 2025
2a317ad
wip
cristaloleg Jan 10, 2025
1903695
cleanup
cristaloleg Jan 10, 2025
268afc3
simplify heightSub
cristaloleg Jan 10, 2025
674e461
fix tests
cristaloleg Jan 13, 2025
a68fd8a
fixes
cristaloleg Jan 13, 2025
7c3cc9b
fix ctx
cristaloleg Jan 13, 2025
5d89381
review suggestions
cristaloleg Jan 14, 2025
b267831
rebase
cristaloleg Jan 14, 2025
beda8d1
tests
cristaloleg Jan 15, 2025
311561b
more tests
cristaloleg Jan 15, 2025
22a4b2f
review suggestions
cristaloleg Jan 22, 2025
26e6621
small refactoring
cristaloleg Jan 22, 2025
06e73c1
add heightSub.Init
cristaloleg Jan 23, 2025
b506084
do 1 more fetch before subscribe
cristaloleg Jan 23, 2025
7b5b282
review suggestions
cristaloleg Jan 24, 2025
ac9eb8c
do advance at start
cristaloleg Jan 24, 2025
7a53bc7
load key on start
cristaloleg Jan 27, 2025
3089d68
fix finding
cristaloleg Jan 27, 2025
75b0141
review suggestions
cristaloleg Jan 27, 2025
8194f52
rename to a better name
cristaloleg Jan 27, 2025
1375bd3
even better names
cristaloleg Jan 27, 2025
e9fbde6
sky is the limit
cristaloleg Jan 27, 2025
07babe7
hehe
cristaloleg Jan 27, 2025
459baad
drop param
cristaloleg Jan 27, 2025
9f1c92b
review suggestions
cristaloleg Jan 30, 2025
d554c55
fix
cristaloleg Jan 31, 2025
034e618
revert
cristaloleg Jan 31, 2025
aa97c39
more review suggestions
cristaloleg Feb 4, 2025
f2d860c
simplify
cristaloleg Feb 4, 2025
5518222
simplify again
cristaloleg Feb 4, 2025
aef55c4
simplify
cristaloleg Feb 4, 2025
ac051e4
linter
cristaloleg Feb 4, 2025
35856eb
think different
cristaloleg Feb 4, 2025
73b5c89
more suggestions
cristaloleg Feb 5, 2025
84a7090
upd cmnt
cristaloleg Feb 5, 2025
360a153
refactor(store): synchronize Store writes
Wondertan Jan 13, 2025
279e167
store: add batches tracking gaps
Wondertan Feb 11, 2025
d8a158a
batch read-only mode to protect from dirty writes
Wondertan Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
peer := createMocknet(t, 1)
s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore())
require.NoError(t, err)
head := headertest.RandDummyHeader(t)
head.HeightI %= 1000 // make it a bit lower
s.Init(context.Background(), head)
server, err := NewExchangeServer[*headertest.DummyHeader](
peer[0],
s,
Expand Down
192 changes: 151 additions & 41 deletions store/batch.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,129 @@
package store

import (
"slices"
"sync"

"github.com/celestiaorg/go-header"
)

// batch keeps an adjacent range of headers and loosely mimics the Store
// interface. NOTE: Can fully implement Store for a use case.
type batches[H header.Header[H]] struct {
batchesMu sync.RWMutex
batches []*batch[H]

batchLenLimit int
}

func newEmptyBatches[H header.Header[H]]() *batches[H] {
return &batches[H]{batches: make([]*batch[H], 0, 8)}
}

// Append must take adjacent range of Headers.
// Returns one of the internal batches once it reaches the length limit
// with true.
func (bs *batches[H]) Append(headers ...H) (*batch[H], bool) {
// TODO: Check if headers are adjacent?
if len(headers) == 0 {
return nil, false
}
bs.batchesMu.Lock()
defer bs.batchesMu.Unlock()

// 1. Add headers as a new batch
newBatch := newBatch[H](len(headers))
newBatch.Append(headers...)
bs.batches = append(bs.batches, newBatch)

// 2. Ensure all the batches are sorted in descending order
slices.SortFunc(bs.batches, func(a, b *batch[H]) int {
return int(b.Head() - a.Head())
})

// 3. Merge adjacent and overlapping batches
mergeIdx := 0
for idx := 1; idx < len(bs.batches); idx++ {
curr := bs.batches[mergeIdx]
next := bs.batches[idx]

if !next.IsReadOnly() && curr.Tail()-1 <= next.Head() {
curr.Append(next.GetAll()...)
} else {
mergeIdx++
bs.batches[mergeIdx] = next
}
}
clear(bs.batches[mergeIdx+1:])
bs.batches = bs.batches[:mergeIdx+1]

// 4. Mark filled batches as read only and return if any
for i := len(bs.batches) - 1; i >= 0; i-- {
// Why in reverse? There might be several batches
// but only one is processed, so there needs to be prioritization
// which in this case is for lower heights.
b := bs.batches[i]
if b.Len() >= bs.batchLenLimit {
b.MarkReadOnly()
return b, true
}
}

return nil, false
}

func (bs *batches[H]) GetByHeight(height uint64) (H, error) {
bs.batchesMu.RLock()
defer bs.batchesMu.RUnlock()

for _, b := range bs.batches {
if height >= b.Tail() && height <= b.Head() {
return b.GetByHeight(height)
}
}

var zero H
return zero, header.ErrNotFound
}

func (bs *batches[H]) Get(hash header.Hash) (H, error) {
bs.batchesMu.RLock()
defer bs.batchesMu.RUnlock()

for _, b := range bs.batches {
h, err := b.Get(hash)
if err == nil {
return h, nil
}
}

var zero H
return zero, header.ErrNotFound
}

func (bs *batches[H]) Has(hash header.Hash) bool {
bs.batchesMu.RLock()
defer bs.batchesMu.RUnlock()

for _, b := range bs.batches {
if b.Has(hash) {
return true
}
}

return false
}

// batch keeps a range of adjacent headers and loosely mimics the Store
// interface.
//
// It keeps a mapping 'height -> header' and 'hash -> height'
// unlike the Store which keeps 'hash -> header' and 'height -> hash'.
// The approach simplifies implementation for the batch and
// makes it better optimized for the GetByHeight case which is what we need.
type batch[H header.Header[H]] struct {
lk sync.RWMutex
heights map[string]uint64
headers []H
headers []H // in descending order

readOnly bool
}

// newBatch creates the batch with the given pre-allocated size.
Expand All @@ -27,80 +134,83 @@ func newBatch[H header.Header[H]](size int) *batch[H] {
}
}

func (b *batch[H]) MarkReadOnly() {
b.readOnly = true
}

func (b *batch[H]) IsReadOnly() bool {
return b.readOnly
}

func (b *batch[H]) Head() uint64 {
if len(b.headers) == 0 {
return 0
}
return b.headers[0].Height()
}

func (b *batch[H]) Tail() uint64 {
if len(b.headers) == 0 {
return 0
}
return b.headers[len(b.headers)-1].Height()
}

// Len gives current length of the batch.
func (b *batch[H]) Len() int {
b.lk.RLock()
defer b.lk.RUnlock()
return len(b.headers)
}

// GetAll returns a slice of all the headers in the batch.
func (b *batch[H]) GetAll() []H {
b.lk.RLock()
defer b.lk.RUnlock()
return b.headers
}

// Get returns a header by its hash.
func (b *batch[H]) Get(hash header.Hash) H {
b.lk.RLock()
defer b.lk.RUnlock()
func (b *batch[H]) Get(hash header.Hash) (H, error) {
height, ok := b.heights[hash.String()]
if !ok {
var zero H
return zero
return zero, header.ErrNotFound
}

return b.getByHeight(height)
return b.GetByHeight(height)
}

// GetByHeight returns a header by its height.
func (b *batch[H]) GetByHeight(height uint64) H {
b.lk.RLock()
defer b.lk.RUnlock()
return b.getByHeight(height)
}

func (b *batch[H]) getByHeight(height uint64) H {
var (
ln = uint64(len(b.headers))
zero H
)
if ln == 0 {
return zero
}

head := b.headers[ln-1].Height()
base := head - ln
if height > head || height <= base {
return zero
func (b *batch[H]) GetByHeight(height uint64) (H, error) {
h := b.headers[b.Head()-height]
if h.Height() != height {
var zero H
return zero, header.ErrNotFound
}

return b.headers[height-base-1]
return h, nil
}

// Append appends new headers to the batch.
func (b *batch[H]) Append(headers ...H) {
b.lk.Lock()
defer b.lk.Unlock()
head, tail := b.Head(), b.Tail()
for _, h := range headers {
b.headers = append(b.headers, h)
b.heights[h.Hash().String()] = h.Height()
if h.Height() >= tail && h.Height() <= head {
// overwrite if exists already
b.headers[head-h.Height()] = h
} else {
// add new
b.headers = append(b.headers, h)
b.heights[h.Hash().String()] = h.Height()
}
}
}

// Has checks whether header by the hash is present in the batch.
func (b *batch[H]) Has(hash header.Hash) bool {
b.lk.RLock()
defer b.lk.RUnlock()
_, ok := b.heights[hash.String()]
return ok
}

// Reset cleans references to batched headers.
func (b *batch[H]) Reset() {
b.lk.Lock()
defer b.lk.Unlock()
b.headers = b.headers[:0]
for k := range b.heights {
delete(b.heights, k)
Expand Down
Loading
Loading