Skip to content

Commit 77513b0

Browse files
committed
feat(dash-exporter): use metadata table for rolling gen
1 parent 7b868a1 commit 77513b0

File tree

3 files changed

+104
-36
lines changed

3 files changed

+104
-36
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
-- Bookkeeping table for rolling generation: each row pairs a rolling's name
-- with an epoch (last_epoch) for which that rolling was recorded.
CREATE TABLE IF NOT EXISTS _exporter_rolling_metadata (
4+
rolling_name String,
5+
last_epoch Int64,
6+
-- Filled in with the insert time (now()) when the INSERT does not supply a value.
updated_at DateTime DEFAULT now()
7+
)
8+
-- Per the ClickHouse documentation, ReplacingMergeTree collapses rows that share
-- the same sorting key during background merges, so repeated inserts of the same
-- (last_epoch, rolling_name) pair are eventually reduced to one row.
ENGINE = ReplacingMergeTree
9+
ORDER BY (last_epoch, rolling_name)
10+
-- The primary key repeats the ORDER BY expression exactly.
PRIMARY KEY (last_epoch, rolling_name)
11+
SETTINGS non_replicated_deduplication_window = 2048, replicated_deduplication_window = 2048;
12+
-- +goose StatementEnd
13+
14+
-- +goose Down
15+
-- +goose StatementBegin
16+
-- Rollback: remove the rolling metadata table; IF EXISTS keeps this from failing when the table is already gone.
DROP TABLE IF EXISTS _exporter_rolling_metadata;
17+
-- +goose StatementEnd

backend/pkg/exporter/db/db.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2331,8 +2331,47 @@ func CheckIfAggregateIsBotchedEpochStart(a AggregateType, t []AggregateBackfillM
23312331
return is_dirty, nil
23322332
}
23332333

2334+
func CheckIfRollingIsGenerated(rolling Rollings, last_epoch int64) (bool, error) {
2335+
var exists bool
2336+
q := goqu.Dialect("postgres").Select(
2337+
goqu.COUNT(goqu.Star()).Gt(0).As("exists"),
2338+
).
2339+
From(goqu.T(ExporterRollingMetadataTableName)).
2340+
Where(
2341+
goqu.I("rolling_name").Eq(string(rolling)),
2342+
goqu.I("last_epoch").Gte(last_epoch),
2343+
)
2344+
sql, args, err := q.Prepared(true).ToSQL()
2345+
if err != nil {
2346+
return false, fmt.Errorf("error building query to check if rolling is generated: %w", err)
2347+
}
2348+
2349+
err = db.ClickHouseWriter.Get(&exists, sql, args...)
2350+
if err != nil {
2351+
return false, fmt.Errorf("error checking if rolling is generated: %w", err)
2352+
}
2353+
return exists, nil
2354+
}
2355+
2356+
func MarkRollingAsGenerated(rolling Rollings, last_epoch int64) error {
2357+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
2358+
defer cancel()
2359+
batch, err := db.ClickHouseNativeWriter.PrepareBatch(ctx, `INSERT INTO `+ExporterRollingMetadataTableName)
2360+
if err != nil {
2361+
return fmt.Errorf("error preparing batch: %w", err)
2362+
}
2363+
if err := batch.Append(rolling, last_epoch); err != nil {
2364+
return fmt.Errorf("error appending row to batch: %w", err)
2365+
}
2366+
if err := batch.Send(); err != nil {
2367+
return fmt.Errorf("error sending batch: %w", err)
2368+
}
2369+
return nil
2370+
}
2371+
23342372
const ExporterMetadataTableName = "_exporter_metadata" // look i hate metadata tables as much as the next guy but this is a necessary evil
23352373
const ExporterBackfillMetadataTableName = "_exporter_backfill_metadata"
2374+
const ExporterRollingMetadataTableName = "_exporter_rolling_metadata"
23362375
const ExporterAggregateBackfillMetadataTableName = "_exporter_aggregate_backfill_metadata"
23372376
const EpochWriterSink = "_insert_sink_validator_dashboard_data_epoch"
23382377
const BackfillRoiSink = "_insert_sink_backfill_validator_dashboard_data_roi"

backend/pkg/exporter/modules/dashboard_data_rollings.go

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,6 @@ func (d *dashboardData) handleRollings() error {
5252
if err != nil {
5353
return errors.Wrap(err, "failed to do rollings")
5454
}
55-
for _, rolling := range rollings {
56-
rolling := rolling
57-
eg.Go(func() error {
58-
return d.swapRollingTables(rolling)
59-
})
60-
}
61-
err = eg.Wait()
62-
if err != nil {
63-
return errors.Wrap(err, "failed to swap rolling tables")
64-
}
6555
return nil
6656
}
6757

@@ -87,23 +77,46 @@ func (d *dashboardData) fillUnsafeRolling(rolling edb.Rollings) error {
8777
}
8878
metrics.State.WithLabelValues("dashboard_data_exporter_finished_epoch").Set(float64(finishedEpoch))
8979
// if finishedEpoch is not the same as the safeepoch we skip updating the rolling so resyncing after falling back is fast
90-
if safeEpoch := d.latestSafeEpoch.Load(); finishedEpoch != safeEpoch {
91-
d.log.Infof("skipping rolling %s update, finished epoch %d, safe epoch %d", rolling, finishedEpoch, safeEpoch)
92-
return nil
93-
}
94-
rollingEpoch, err := edb.GetRollingLastEpoch(rolling)
80+
/*
81+
if safeEpoch := d.latestSafeEpoch.Load(); finishedEpoch != safeEpoch {
82+
d.log.Infof("skipping rolling %s update, finished epoch %d, safe epoch %d", rolling, finishedEpoch, safeEpoch)
83+
return nil
84+
}
85+
*/
86+
87+
// check if we have work to do
88+
rollingGenerated, err := edb.CheckIfRollingIsGenerated(rolling, finishedEpoch)
9589
if err != nil {
96-
return errors.Wrap(err, "failed to get rolling last epoch")
90+
return errors.Wrap(err, "failed to check if rolling is generated")
9791
}
98-
metrics.State.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_epoch", rolling)).Set(float64(rollingEpoch))
99-
if rollingEpoch >= finishedEpoch {
100-
d.log.Debugf("rolling %s is up to date", rolling)
92+
if rollingGenerated {
93+
d.log.Debugf("rolling %s is already generated for epoch %d", rolling, finishedEpoch)
10194
return nil
10295
}
96+
97+
// metrics crap
98+
defer func() {
99+
rollingEpoch, err := edb.GetRollingLastEpoch(rolling)
100+
if err != nil {
101+
d.log.Error(err, "failed to get rolling last epoch", 0)
102+
return
103+
}
104+
metrics.State.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_epoch", rolling)).Set(float64(rollingEpoch))
105+
}()
103106
defer func() {
104107
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_overall", rolling)).Observe(time.Since(start).Seconds())
105108
}()
106-
d.log.Infof("rolling %s is outdated, latest epoch %d, latest finished epoch %d", rolling, rollingEpoch, finishedEpoch)
109+
110+
// also cleanup crap
111+
defer func() {
112+
err := edb.NukeUnsafeRollingTable(rolling)
113+
if err != nil {
114+
d.log.Error(err, "failed to nuke unsafe rolling table", 0)
115+
}
116+
}()
117+
118+
//d.log.Infof("rolling %s is outdated, latest epoch %d, latest finished epoch %d", rolling, rollingEpoch, finishedEpoch)
119+
d.log.Infof("rolling %s needs generation for epoch %d", rolling, finishedEpoch)
107120

108121
now := time.Now()
109122
// next, nuke the unsafe rolling tables to prepare them for us to fill them
@@ -112,6 +125,7 @@ func (d *dashboardData) fillUnsafeRolling(rolling edb.Rollings) error {
112125
return errors.Wrap(err, "failed to nuke unsafe rolling table")
113126
}
114127
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_nuke_unsafe", rolling)).Observe(time.Since(now).Seconds())
128+
115129
// now we fetch the start & end for each pre-aggregated table we use
116130
minTs := utils.EpochToTime(uint64(finishedEpoch)).Add(-rolling.GetDuration())
117131
// we also need to add one epoch to the minTs because each epoch timestamp is the start of the epoch
@@ -141,6 +155,7 @@ func (d *dashboardData) fillUnsafeRolling(rolling edb.Rollings) error {
141155
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_minmax", table)).Observe(time.Since(now).Seconds())
142156
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_minmax", rolling)).Observe(time.Since(now).Seconds())
143157
}
158+
144159
// now the transfer logic for each source
145160
eg := errgroup.Group{}
146161
eg.SetLimit(int(utils.Config.DashboardExporter.RollingPartsInParallel))
@@ -154,6 +169,10 @@ func (d *dashboardData) fillUnsafeRolling(rolling edb.Rollings) error {
154169
eg.Go(func() error {
155170
d.log.Infof("transferring rolling source %s to rolling %s", source, rolling)
156171
now := time.Now()
172+
// add caching for bounded source reads that dont change? so hourly read for the 7d can be reused for up to 9 rolling gen, for example
173+
// daily could be reused for up to a day for >90d rollings
174+
// basically if a min and max is defined it can be cached
175+
// ye we should really be doing this
157176
err := edb.TransferRollingSourceToRolling(rolling, source, *minmax)
158177
if err != nil {
159178
return errors.Wrap(err, "failed to transfer rolling source to rolling")
@@ -170,28 +189,21 @@ func (d *dashboardData) fillUnsafeRolling(rolling edb.Rollings) error {
170189
if err != nil {
171190
return errors.Wrap(err, "failed to transfer all rolling sources")
172191
}
173-
return nil
174-
}
175192

176-
func (d *dashboardData) swapRollingTables(rolling edb.Rollings) error {
177-
now := time.Now()
178-
// swap or not, we want unsafe clean so clickhouse doesnt waste compute on it. if it fails the next attempt would nuke it at startup anyways
179-
defer func() {
180-
err := edb.NukeUnsafeRollingTable(rolling)
181-
if err != nil {
182-
d.log.Error(err, "failed to nuke unsafe rolling table", 0)
183-
}
184-
}()
185-
err := edb.SwapRollingTables(rolling)
193+
// now we swap the tables
194+
err = edb.SwapRollingTables(rolling)
186195
if err != nil {
187196
return errors.Wrap(err, "failed to swap rolling tables")
188197
}
189-
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_swap", rolling)).Observe(time.Since(now).Seconds()) // update metric after run
198+
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_swap", rolling)).Observe(time.Since(now).Seconds())
190199

191-
rollingEpoch, err := edb.GetRollingLastEpoch(rolling)
200+
// mark rolling as generated for this epoch
201+
err = edb.MarkRollingAsGenerated(rolling, finishedEpoch)
192202
if err != nil {
193-
d.log.Error(err, "failed to get rolling last epoch", 0)
203+
return errors.Wrap(err, "failed to mark rolling as generated")
194204
}
195-
metrics.State.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_rolling_%s_epoch", rolling)).Set(float64(rollingEpoch))
205+
206+
d.log.Infof("completed rolling %s generation for epoch %d in %s", rolling, finishedEpoch, time.Since(start))
207+
196208
return nil
197209
}

0 commit comments

Comments
 (0)