|
6 | 6 | "database/sql" |
7 | 7 | "encoding/binary" |
8 | 8 | "encoding/hex" |
| 9 | + "errors" |
9 | 10 | "fmt" |
10 | 11 | "math/big" |
11 | 12 | "sync/atomic" |
@@ -101,27 +102,39 @@ func (d *executionDepositsExporter) Init() error { |
101 | 102 | } |
102 | 103 | d.DepositMethod = depositMethod |
103 | 104 |
|
104 | | - // check if any log_index is missing, if yes we have to do a soft re-export |
105 | | - // ideally i would check for gaps in the merkletree-index column, but this is extremely annoying as its stored as little endian bytes in the db |
106 | | - var isV2Check bool |
107 | | - err = db.WriterDb.Get(&isV2Check, "select count(*) = count(log_index) as is_v2 from eth1_deposits") |
| 105 | + // check for gaps in the merkletree-index column |
| 106 | + log.Info("checking for gaps in eth1_deposits table") |
| 107 | + gapsInTable, err := db.GetGapsInEth1DepositsTable() |
108 | 108 | if err != nil { |
109 | 109 | return err |
110 | 110 | } |
| 111 | + log.Infof("found %v gaps in eth1_deposits table", len(gapsInTable)) |
111 | 112 |
|
112 | | - if isV2Check { |
113 | | - // get latest block from db |
114 | | - err = db.WriterDb.Get(&d.LastExportedBlock, "select block_number from eth1_deposits order by block_number desc limit 1") |
| 113 | + for _, gap := range gapsInTable { |
| 114 | + log.Infof("gap in eth1_deposits table: %v", gap) |
| 115 | + deposits, err := d.fetchDeposits(uint64(gap.FromBlock), uint64(gap.ToBlock)) |
115 | 116 | if err != nil { |
116 | | - if err == sql.ErrNoRows { |
117 | | - d.LastExportedBlock = utils.Config.Indexer.ELDepositContractFirstBlock |
118 | | - } else { |
119 | | - return err |
120 | | - } |
| 117 | + return err |
| 118 | + } |
| 119 | + // can return more than the expected missing deposits as the start and end blocks are inclusive |
| 120 | + if int64(len(deposits)) < gap.MissingCount { |
| 121 | + return fmt.Errorf("only %d of %d expected deposits found for gap in eth1_deposits table: %v", len(deposits), gap.MissingCount, gap) |
| 122 | + } |
| 123 | + log.Infof("saving %v deposits", len(deposits)) |
| 124 | + err = d.saveDeposits(deposits) |
| 125 | + if err != nil { |
| 126 | + return err |
| 127 | + } |
| 128 | + } |
| 129 | + |
| 130 | + // get latest block from db |
| 131 | + err = db.WriterDb.Get(&d.LastExportedBlock, "select block_number from eth1_deposits order by block_number desc limit 1") |
| 132 | + if err != nil { |
| 133 | + if errors.Is(err, sql.ErrNoRows) { |
| 134 | + d.LastExportedBlock = utils.Config.Indexer.ELDepositContractFirstBlock |
| 135 | + } else { |
| 136 | + return err |
121 | 137 | } |
122 | | - } else { |
123 | | - log.Warnf("log_index is missing in eth1_deposits table, starting from the beginning") |
124 | | - d.LastExportedBlock = utils.Config.Indexer.ELDepositContractFirstBlock |
125 | 138 | } |
126 | 139 |
|
127 | 140 | val, err := db.PersistentRedisDbClient.Get(context.Background(), d.LastExportedFinalizedBlockRedisKey).Uint64() |
|
0 commit comments