Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,12 @@ frontend-types:

addhooks:
git config core.hooksPath hooks

# Build the `bc` binary for linux/amd64 inside Docker and copy it out to
# ./bin/bc on the host. Useful for producing an amd64 binary from a
# non-amd64 (e.g. arm64 Mac) development machine.
# Uses a local buildx cache at /tmp/.buildx-cache to speed up rebuilds.
# NOTE(review): if `docker cp` fails, the tmp-bc-extract container and the
# beaconchain-backend:amd64 image are left behind — clean up manually.
.PHONY: docker-build-amd64-bc
docker-build-amd64-bc:
	mkdir -p bin
	docker buildx build --platform linux/amd64 --load -t beaconchain-backend:amd64 -f Dockerfile --cache-to=type=local,dest=/tmp/.buildx-cache --cache-from=type=local,src=/tmp/.buildx-cache ..
	docker create --platform linux/amd64 --name tmp-bc-extract beaconchain-backend:amd64
	docker cp tmp-bc-extract:/usr/local/bin/bc ./bin/bc
	docker rm -f tmp-bc-extract
	docker rmi -f beaconchain-backend:amd64
44 changes: 44 additions & 0 deletions backend/pkg/commons/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2576,3 +2576,47 @@ func HasEventsForEpoch(epoch uint64) (bool, error) {

return count > 0, nil
}

// GetGapsInEth1DepositsTable scans the eth1_deposits table for holes in the
// sequence of merkle tree indices and returns one row per gap found.
//
// The merkletree_index column stores the index as little endian bytes, so the
// query first reassembles each value into a bigint (summing byte*2^(8*i))
// before comparing neighbouring rows via a window function.
func GetGapsInEth1DepositsTable() ([]types.GapInEth1DepositsTableRow, error) {
	const gapQuery = `
	WITH deposit_idx AS (
		SELECT
			(
				SELECT SUM((get_byte(ed.merkletree_index, i)::bigint) << (8 * i))
				FROM generate_series(0, length(ed.merkletree_index) - 1) AS gs(i)
			) AS idx,
			ed.block_number
		FROM eth1_deposits AS ed
	),
	ordered AS (
		SELECT
			idx,
			block_number,
			lag(idx) OVER (ORDER BY idx DESC) AS prev_idx,
			lag(block_number) OVER (ORDER BY idx DESC) AS prev_block_number
		FROM deposit_idx
	)
	SELECT
		prev_idx AS higher_idx,
		prev_block_number AS to_block,
		idx AS lower_idx,
		block_number AS from_block,
		(prev_idx - idx - 1) AS missing_count,
		(idx + 1) AS missing_low, -- first missing value
		(prev_idx - 1) AS missing_high -- last missing value
	FROM ordered
	WHERE prev_idx IS NOT NULL
		AND (prev_idx - idx) > 1
	ORDER BY higher_idx DESC;
	`

	var gaps []types.GapInEth1DepositsTableRow
	if err := ReaderDb.Select(&gaps, gapQuery); err != nil {
		return nil, err
	}
	return gaps, nil
}
15 changes: 15 additions & 0 deletions backend/pkg/commons/types/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,3 +777,18 @@ type SlashingInfo struct {
SlashedValidatorPubkey []byte `db:"slashedvalidator_pubkey"`
Reason string `db:"reason"`
}

// GapInEth1DepositsTableRow describes one gap in the continuous series of
// merkle tree indices in the eth1_deposits table, as reported by
// GetGapsInEth1DepositsTable. Indices between MissingLow and MissingHigh
// (inclusive) are absent; the enclosing deposits were seen in blocks
// FromBlock and ToBlock.
type GapInEth1DepositsTableRow struct {
	HigherIndex  int64 `db:"higher_idx"`    // deposit index just above the gap
	ToBlock      int64 `db:"to_block"`      // block number of the deposit at HigherIndex
	LowerIndex   int64 `db:"lower_idx"`     // deposit index just below the gap
	FromBlock    int64 `db:"from_block"`    // block number of the deposit at LowerIndex
	MissingCount int64 `db:"missing_count"` // number of missing indices (HigherIndex - LowerIndex - 1)
	MissingLow   int64 `db:"missing_low"`   // first missing index (LowerIndex + 1)
	MissingHigh  int64 `db:"missing_high"`  // last missing index (HigherIndex - 1)
}

// String renders the gap as a human-readable summary for log output.
// Fix: the label for LowerIndex previously read "Index", which did not match
// the field name and made logs ambiguous.
func (gap GapInEth1DepositsTableRow) String() string {
	return fmt.Sprintf("{HigherIndex: %d, ToBlock: %d, LowerIndex: %d, FromBlock: %d, MissingCount: %d, MissingLow: %d, MissingHigh: %d}",
		gap.HigherIndex, gap.ToBlock, gap.LowerIndex, gap.FromBlock, gap.MissingCount, gap.MissingLow, gap.MissingHigh)
}
54 changes: 32 additions & 22 deletions backend/pkg/exporter/modules/execution_deposits_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"math/big"
"sync/atomic"
Expand Down Expand Up @@ -101,27 +102,39 @@ func (d *executionDepositsExporter) Init() error {
}
d.DepositMethod = depositMethod

// check if any log_index is missing, if yes we have to do a soft re-export
// ideally i would check for gaps in the merkletree-index column, but this is extremely annoying as its stored as little endian bytes in the db
var isV2Check bool
err = db.WriterDb.Get(&isV2Check, "select count(*) = count(log_index) as is_v2 from eth1_deposits")
// check for gaps in the merkletree-index column
log.Info("checking for gaps in eth1_deposits table")
gapsInTable, err := db.GetGapsInEth1DepositsTable()
if err != nil {
return err
}
log.Infof("found %v gaps in eth1_deposits table", len(gapsInTable))

if isV2Check {
// get latest block from db
err = db.WriterDb.Get(&d.LastExportedBlock, "select block_number from eth1_deposits order by block_number desc limit 1")
for _, gap := range gapsInTable {
log.Infof("gap in eth1_deposits table: %v", gap)
deposits, err := d.fetchDeposits(uint64(gap.FromBlock), uint64(gap.ToBlock))
if err != nil {
if err == sql.ErrNoRows {
d.LastExportedBlock = utils.Config.Indexer.ELDepositContractFirstBlock
} else {
return err
}
return err
}
// can return more than the expected missing deposits as the start and end blocks are inclusive
if int64(len(deposits)) < gap.MissingCount {
return fmt.Errorf("only %d of %d expected deposits found for gap in eth1_deposits table: %v", len(deposits), gap.MissingCount, gap)
}
log.Infof("saving %v deposits", len(deposits))
err = d.saveDeposits(deposits)
if err != nil {
return err
}
}

// get latest block from db
err = db.WriterDb.Get(&d.LastExportedBlock, "select block_number from eth1_deposits order by block_number desc limit 1")
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
d.LastExportedBlock = utils.Config.Indexer.ELDepositContractFirstBlock
} else {
return err
}
} else {
log.Warnf("log_index is missing in eth1_deposits table, starting from the beginning")
d.LastExportedBlock = utils.Config.Indexer.ELDepositContractFirstBlock
}

val, err := db.PersistentRedisDbClient.Get(context.Background(), d.LastExportedFinalizedBlockRedisKey).Uint64()
Expand Down Expand Up @@ -561,7 +574,6 @@ func (d *executionDepositsExporter) batchRequestHeadersAndTxs(blocksToFetch []ui
elems := make([]gethrpc.BatchElem, 0, len(blocksToFetch)+len(txsToFetch))
headers := make(map[uint64]*gethtypes.Header, len(blocksToFetch))
txs := make(map[string]*gethtypes.Transaction, len(txsToFetch))
errors := make([]error, 0, len(blocksToFetch)+len(txsToFetch))

for _, b := range blocksToFetch {
header := &gethtypes.Header{}
Expand All @@ -573,7 +585,6 @@ func (d *executionDepositsExporter) batchRequestHeadersAndTxs(blocksToFetch []ui
Error: err,
})
headers[b] = header
errors = append(errors, err)
}

for _, txHashHex := range txsToFetch {
Expand All @@ -586,7 +597,6 @@ func (d *executionDepositsExporter) batchRequestHeadersAndTxs(blocksToFetch []ui
Error: err,
})
txs[txHashHex] = tx
errors = append(errors, err)
}

lenElems := len(elems)
Expand All @@ -596,7 +606,7 @@ func (d *executionDepositsExporter) batchRequestHeadersAndTxs(blocksToFetch []ui
}

for i := 0; (i * 100) < lenElems; i++ {
start := (i * 100)
start := i * 100
end := start + 100

if end > lenElems {
Expand All @@ -609,9 +619,9 @@ func (d *executionDepositsExporter) batchRequestHeadersAndTxs(blocksToFetch []ui
}
}

for _, e := range errors {
if e != nil {
return nil, nil, e
for _, e := range elems {
if e.Error != nil {
return nil, nil, e.Error
}
}

Expand Down
Loading