Skip to content

Allow store gateway to ignore syncing blocks older than a certain time #6830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
* [ENHANCEMENT] Query Frontend: Enhance the performance of the JSON codec. #6816
* [ENHANCEMENT] Store Gateway: Allow ignoring syncing blocks older than a certain time using `ignore_blocks_before`. #6830
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,11 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

# The blocks created before `now() - ignore_blocks_before` will not be
# synced. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-before
[ignore_blocks_before: <duration> | default = 0s]

bucket_index:
# True to enable querier and store-gateway to discover blocks in the
# storage via bucket index instead of bucket scanning.
Expand Down
5 changes: 5 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,11 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

# The blocks created before `now() - ignore_blocks_before` will not be
# synced. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-before
[ignore_blocks_before: <duration> | default = 0s]

bucket_index:
# True to enable querier and store-gateway to discover blocks in the
# storage via bucket index instead of bucket scanning.
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,11 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

# The blocks created before `now() - ignore_blocks_before` will not be synced.
# 0 to disable.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-before
[ignore_blocks_before: <duration> | default = 0s]

bucket_index:
# True to enable querier and store-gateway to discover blocks in the storage
# via bucket index instead of bucket scanning.
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ type BucketStoreConfig struct {
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`

Expand Down Expand Up @@ -364,6 +365,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
"Default is 6h, half of the default value for -compactor.deletion-delay.")
f.DurationVar(&cfg.IgnoreBlocksWithin, "blocks-storage.bucket-store.ignore-blocks-within", 0, "The blocks created since `now() - ignore_blocks_within` will not be synced. This should be used together with `-querier.query-store-after` to filter out the blocks that are too new to be queried. A reasonable value for this flag would be `-querier.query-store-after - blocks-storage.bucket-store.bucket-index.max-stale-period` to give some buffer. 0 to disable.")
f.DurationVar(&cfg.IgnoreBlocksBefore, "blocks-storage.bucket-store.ignore-blocks-before", 0, "The blocks created before `now() - ignore_blocks_before` will not be synced. 0 to disable.")
f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", store.DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.")
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazily memory-map an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will release memory-mapped index-headers after 'idle timeout' inactivity.")
Expand Down
17 changes: 16 additions & 1 deletion pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
thanos_metadata "github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
thanos_model "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
Expand Down Expand Up @@ -548,7 +550,20 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
fetcherReg := prometheus.NewRegistry()

// The sharding strategy filter MUST be before the ones we create here (order matters).
filters := append([]block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}, []block.MetadataFilter{
filters := []block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}

if u.cfg.BucketStore.IgnoreBlocksBefore > 0 {
// We don't want to filter out any blocks on the max-time side, so set the
// max-time bound slightly in the future (now() + 1s) so that blocks up to
// now are always eligible. Only the min-time bound (ignore_blocks_before)
// filters here, mirroring how IgnoreBlocksWithin bounds the other side.
filterMaxTimeDuration := model.Duration(time.Second)
filterMinTime := thanos_model.TimeOrDurationValue{}
ignoreBlocksBefore := -model.Duration(u.cfg.BucketStore.IgnoreBlocksBefore)
filterMinTime.Dur = &ignoreBlocksBefore
filters = append(filters, block.NewTimePartitionMetaFilter(filterMinTime, thanos_model.TimeOrDurationValue{Dur: &filterMaxTimeDuration}))
}

filters = append(filters, []block.MetadataFilter{
block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg),
// Use our own custom implementation.
NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay, u.cfg.BucketStore.MetaSyncConcurrency),
Expand Down
73 changes: 73 additions & 0 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,79 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl
assert.Equal(t, 1, len(series))
}

// TestBucketStores_SyncBlocksWithIgnoreBlocksBefore verifies that blocks whose
// time range falls entirely before now() - ignore_blocks_before are skipped
// during sync (reported as "time-excluded"), while younger blocks are loaded.
func TestBucketStores_SyncBlocksWithIgnoreBlocksBefore(t *testing.T) {
	t.Parallel()

	const userID = "user-1"
	const metricName = "test_metric"

	ctx := context.Background()
	cfg := prepareStorageConfig(t)

	// Blocks older than two hours must be ignored by the sync.
	cfg.BucketStore.IgnoreBlocksBefore = 2 * time.Hour

	storageDir := t.TempDir()
	now := time.Now()

	// Create three blocks of different ages, each spanning one hour:
	// the first is beyond the 2h cutoff (expected time-excluded), the other
	// two are recent enough to be synced and loaded.
	for _, spec := range []struct {
		suffix string
		start  time.Time
	}{
		{suffix: "_old", start: now.Add(-5 * time.Hour)},        // should be ignored (time-excluded)
		{suffix: "_recent", start: now.Add(-1 * time.Hour)},     // should be synced
		{suffix: "_current", start: now.Add(-30 * time.Minute)}, // should be synced
	} {
		generateStorageBlock(t, storageDir, userID, metricName+spec.suffix,
			spec.start.UnixMilli(), spec.start.Add(time.Hour).UnixMilli(), 15)
	}

	bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
	require.NoError(t, err)

	reg := prometheus.NewPedanticRegistry()
	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil),
		objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
	require.NoError(t, err)

	// Run the initial sync; the min-time metadata filter should drop the old block.
	require.NoError(t, stores.InitialSync(ctx))

	// Only the two young blocks are loaded; the old one shows up as time-excluded.
	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_blocks_meta_synced Reflects current state of synced blocks (over all tenants).
# TYPE cortex_blocks_meta_synced gauge
cortex_blocks_meta_synced{state="corrupted-meta-json"} 0
cortex_blocks_meta_synced{state="duplicate"} 0
cortex_blocks_meta_synced{state="failed"} 0
cortex_blocks_meta_synced{state="label-excluded"} 0
cortex_blocks_meta_synced{state="loaded"} 2
cortex_blocks_meta_synced{state="marked-for-deletion"} 0
cortex_blocks_meta_synced{state="marked-for-no-compact"} 0
cortex_blocks_meta_synced{state="no-meta-json"} 0
cortex_blocks_meta_synced{state="time-excluded"} 1
cortex_blocks_meta_synced{state="too-fresh"} 0
# HELP cortex_blocks_meta_syncs_total Total blocks metadata synchronization attempts
# TYPE cortex_blocks_meta_syncs_total counter
cortex_blocks_meta_syncs_total 3
# HELP cortex_bucket_store_blocks_meta_sync_failures_total Total blocks metadata synchronization failures
# TYPE cortex_bucket_store_blocks_meta_sync_failures_total counter
cortex_bucket_store_blocks_meta_sync_failures_total 0
# HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts.
# TYPE cortex_bucket_store_block_loads_total counter
cortex_bucket_store_block_loads_total 2
# HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks.
# TYPE cortex_bucket_store_blocks_loaded gauge
cortex_bucket_store_blocks_loaded{user="user-1"} 2
`), "cortex_bucket_store_block_loads_total", "cortex_bucket_store_blocks_loaded", "cortex_blocks_meta_synced"))
}

func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig {
cfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&cfg)
Expand Down
Loading