From 6ff046054001fcb86641c14c911c313dccdcab94 Mon Sep 17 00:00:00 2001 From: Angith Date: Sun, 31 Aug 2025 23:47:07 +0530 Subject: [PATCH 1/6] feat(parquetconverter): add support for additional sort columns during Parquet file generation Signed-off-by: Angith --- docs/configuration/config-file-reference.md | 5 ++ docs/guides/parquet-mode.md | 8 +++ pkg/parquetconverter/converter.go | 19 +++++- pkg/parquetconverter/converter_test.go | 73 ++++++++++++++++++++- pkg/util/validation/limits.go | 11 +++- 5 files changed, 110 insertions(+), 6 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index be3b38f6e65..bbb2fdccfa3 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -186,6 +186,11 @@ parquet_converter: # CLI flag: -parquet-converter.file-buffer-enabled [file_buffer_enabled: | default = true] + # Configure the sort columns, in order of precedence, to improve query performance. + # These will be applied during Parquet file generation. + # CLI flag: -parquet-converter.sort-columns + [sort_columns: | default = []] + # Local directory path for caching TSDB blocks during parquet conversion. # CLI flag: -parquet-converter.data-dir [data_dir: | default = "./data"] diff --git a/docs/guides/parquet-mode.md b/docs/guides/parquet-mode.md index 3bec5d11147..978a7669367 100644 --- a/docs/guides/parquet-mode.md +++ b/docs/guides/parquet-mode.md @@ -83,6 +83,9 @@ parquet_converter: # Enable file buffering to reduce memory usage file_buffer_enabled: true + # Defines additional sort columns applied during Parquet file generation. + sort_columns: ["label1", "label2"] + # Ring configuration for distributed conversion ring: kvstore: @@ -106,6 +109,9 @@ limits: # Shard size for shuffle sharding (0 = disabled) parquet_converter_tenant_shard_size: 0.8 + + # Defines sort columns applied during Parquet file generation for specific tenants + parquet_converter_sort_columns: ["label1", "label2"] ``` You can also configure per-tenant settings using runtime configuration: @@ -115,6 +121,7 @@ overrides: tenant-1: parquet_converter_enabled: true parquet_converter_tenant_shard_size: 2 + parquet_converter_sort_columns: ["cluster", "namespace"] tenant-2: parquet_converter_enabled: false ``` @@ -280,6 +287,7 @@ cortex_parquet_queryable_cache_misses_total 1. **Row Group Size**: Adjust `max_rows_per_row_group` based on your query patterns 2. **Cache Size**: Tune `parquet_queryable_shard_cache_size` based on available memory 3. **Concurrency**: Adjust `meta_sync_concurrency` based on object storage performance +4. 
**Sort Columns**: Configure `sort_columns` based on your most common query filters to improve query performance ### Fallback Configuration diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index ccfcdd0da24..b3d4112145b 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -39,6 +39,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" + "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -58,6 +59,7 @@ type Config struct { ConversionInterval time.Duration `yaml:"conversion_interval"` MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` FileBufferEnabled bool `yaml:"file_buffer_enabled"` + SortColumns []string `yaml:"sort_columns"` DataDir string `yaml:"data_dir"` @@ -109,6 +111,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.") f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.") f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.") + f.Var((*flagext.StringSlice)(&cfg.SortColumns), "parquet-converter.sort-columns", "Configure the sort columns, in order of precedence, to improve query performance. These will be applied during parquet file generation.") } func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) { @@ -126,6 +129,13 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR } func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter { + // Create base sort columns with metric name as the primary sort column + sortColumns := []string{labels.MetricName} + if len(cfg.SortColumns) > 0 { + sortColumns = append(sortColumns, cfg.SortColumns...) + } + cfg.SortColumns = sortColumns + c := &Converter{ cfg: cfg, reg: registerer, @@ -139,7 +149,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex metrics: newMetrics(registerer), bkt: bkt, baseConverterOptions: []convert.ConvertOption{ - convert.WithSortBy(labels.MetricName), + convert.WithSortBy(sortColumns...), convert.WithColDuration(time.Hour * 8), convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup), }, @@ -430,6 +440,13 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String())) + userConfiguredSortColumns := c.limits.ParquetConverterSortColumns(userID) + if len(userConfiguredSortColumns) > 0 { + sortColumns := []string{labels.MetricName} + sortColumns = append(sortColumns, userConfiguredSortColumns...) 
+ converterOpts = append(converterOpts, convert.WithSortBy(sortColumns...)) + } + if c.cfg.FileBufferEnabled { converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*"))) } diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index 70b6469a7ba..66c29a5aa13 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -59,7 +59,19 @@ func TestConverter(t *testing.T) { flagext.DefaultValues(limits) limits.ParquetConverterEnabled = true - c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits) + userSpecificSortColumns := []string{"cluster", "namespace"} + + // Create a mock tenant limits implementation + tenantLimits := &mockTenantLimits{ + limits: map[string]*validation.Limits{ + user: { + ParquetConverterSortColumns: userSpecificSortColumns, + ParquetConverterEnabled: true, + }, + }, + } + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, tenantLimits) ctx := context.Background() @@ -157,7 +169,7 @@ func prepareConfig() Config { return cfg } -func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Converter, log.Logger, prometheus.Gatherer) { +func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits, tenantLimits validation.TenantLimits) (*Converter, log.Logger, prometheus.Gatherer) { storageCfg := cortex_tsdb.BlocksStorageConfig{} blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour} flagext.DefaultValues(&storageCfg) @@ -176,7 +188,7 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, flagext.DefaultValues(limits) } - overrides := validation.NewOverrides(*limits, nil) + overrides := validation.NewOverrides(*limits, tenantLimits) scanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{ Strategy: cortex_tsdb.UserScanStrategyList, @@ -350,6 +362,45 @@ func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) { assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID))) } +func TestConverter_SortColumns(t *testing.T) { + bucketClient, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + + testCases := []struct { + desc string + cfg Config + expectedSortColumns []string + }{ + { + desc: "no additional sort columns are added", + cfg: Config{ + MetaSyncConcurrency: 1, + DataDir: t.TempDir(), + }, + expectedSortColumns: []string{labels.MetricName}, + }, + { + desc: "additional sort columns are added", + cfg: Config{ + MetaSyncConcurrency: 1, + DataDir: t.TempDir(), + SortColumns: []string{"cluster", "namespace"}, + }, + expectedSortColumns: []string{labels.MetricName, "cluster", "namespace"}, + }, + } + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + c, _, _ := prepare(t, tC.cfg, objstore.WithNoopInstr(bucketClient), limits, nil) + assert.Equal(t, tC.expectedSortColumns, c.cfg.SortColumns, + "Converter should be created with the expected sort columns") + }) + } +} + // mockBucket implements objstore.Bucket for testing type mockBucket struct { objstore.Bucket @@ -384,3 +435,19 @@ func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDe }, }, nil } + +// mockTenantLimits implements the validation.TenantLimits interface for 
testing +type mockTenantLimits struct { + limits map[string]*validation.Limits +} + +func (m *mockTenantLimits) ByUserID(userID string) *validation.Limits { + if limits, ok := m.limits[userID]; ok { + return limits + } + return nil +} + +func (m *mockTenantLimits) AllByUserID() map[string]*validation.Limits { + return m.limits +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index fd077ebd18c..2b945d14b08 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -219,8 +219,9 @@ type Limits struct { CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"` // Parquet converter - ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"` - ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"` + ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"` + ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"` + ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns" doc:"hidden"` // This config doesn't have a CLI flag registered here because they're registered in // their own original config struct. @@ -324,6 +325,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.") f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.") + f.Var((*flagext.StringSlice)(&l.ParquetConverterSortColumns), "parquet-converter.sort-columns", "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.") // Parquet Queryable enforced limits. f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.") @@ -904,6 +906,11 @@ func (o *Overrides) ParquetConverterEnabled(userID string) bool { return o.GetOverridesForUser(userID).ParquetConverterEnabled } +// ParquetConverterSortColumns returns the additional sort columns for parquet files. +func (o *Overrides) ParquetConverterSortColumns(userID string) []string { + return o.GetOverridesForUser(userID).ParquetConverterSortColumns +} + // ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage. 
func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int { return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount From 386ce103b36c56ae5d9134e70b532cfa69fa491b Mon Sep 17 00:00:00 2001 From: Angith Date: Mon, 1 Sep 2025 23:42:26 +0530 Subject: [PATCH 2/6] docs: update CHANGELOG.md to include support for sort columns during Parquet file generation Signed-off-by: Angith --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fc6b0c741d..16620596e59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893 * [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978 +* [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003 * [ENHANCEMENT] Ingester: Add `cortex_ingester_tsdb_wal_replay_unknown_refs_total` and `cortex_ingester_tsdb_wbl_replay_unknown_refs_total` metrics to track unknown series references during wal/wbl replaying. #6945 * [ENHANCEMENT] Ruler: Emit an error message when the rule synchronization fails. #6902 * [ENHANCEMENT] Querier: Support snappy and zstd response compression for `-querier.response-compression` flag. #6848 From 42b59a3409eee38b5eba3848a9135abfc9a06cfd Mon Sep 17 00:00:00 2001 From: Angith Date: Sun, 7 Sep 2025 23:05:48 +0530 Subject: [PATCH 3/6] fix: Remove duplicate flag registration for parquet-converter.sort-columns Signed-off-by: Angith --- docs/configuration/config-file-reference.md | 4 ++-- docs/guides/parquet-mode.md | 2 +- pkg/parquetconverter/converter.go | 18 +++++++++--------- pkg/parquetconverter/converter_test.go | 8 ++++---- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 894c88e736f..1bd35c2e4cd 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -186,10 +186,10 @@ parquet_converter: # CLI flag: -parquet-converter.file-buffer-enabled [file_buffer_enabled: | default = true] - # Configure the sort columns, in order of precedence, to improve query performance. + # Configure the additional sort columns, in order of precedence, to improve query performance. # These will be applied during Parquet file generation. # CLI flag: -parquet-converter.sort-columns - [sort_columns: | default = []] + [additional_sort_columns: | default = []] # Local directory path for caching TSDB blocks during parquet conversion. # CLI flag: -parquet-converter.data-dir diff --git a/docs/guides/parquet-mode.md b/docs/guides/parquet-mode.md index 978a7669367..5eaadd80405 100644 --- a/docs/guides/parquet-mode.md +++ b/docs/guides/parquet-mode.md @@ -84,7 +84,7 @@ parquet_converter: file_buffer_enabled: true # Defines additional sort columns applied during Parquet file generation. 
- sort_columns: ["label1", "label2"] + additional_sort_columns: ["label1", "label2"] # Ring configuration for distributed conversion ring: diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index b3d4112145b..60604af37c5 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -55,11 +55,11 @@ const ( var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) type Config struct { - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - ConversionInterval time.Duration `yaml:"conversion_interval"` - MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` - FileBufferEnabled bool `yaml:"file_buffer_enabled"` - SortColumns []string `yaml:"sort_columns"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + ConversionInterval time.Duration `yaml:"conversion_interval"` + MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` + FileBufferEnabled bool `yaml:"file_buffer_enabled"` + AdditionalSortColumns []string `yaml:"additional_sort_columns"` DataDir string `yaml:"data_dir"` @@ -111,7 +111,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.") f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.") f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.") - f.Var((*flagext.StringSlice)(&cfg.SortColumns), "parquet-converter.sort-columns", "Configure the sort columns, in order of precedence, to improve query performance. These will be applied during parquet file generation.") + f.Var((*flagext.StringSlice)(&cfg.AdditionalSortColumns), "parquet-converter.additional-sort-columns", "Configure the additional sort columns, in order of precedence, to improve query performance. These will be applied during parquet file generation.") } func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) { @@ -131,10 +131,10 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter { // Create base sort columns with metric name as the primary sort column sortColumns := []string{labels.MetricName} - if len(cfg.SortColumns) > 0 { - sortColumns = append(sortColumns, cfg.SortColumns...) + if len(cfg.AdditionalSortColumns) > 0 { + sortColumns = append(sortColumns, cfg.AdditionalSortColumns...) 
} - cfg.SortColumns = sortColumns + cfg.AdditionalSortColumns = sortColumns c := &Converter{ cfg: cfg, diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index 66c29a5aa13..f3b323f6f94 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -385,9 +385,9 @@ func TestConverter_SortColumns(t *testing.T) { { desc: "additional sort columns are added", cfg: Config{ - MetaSyncConcurrency: 1, - DataDir: t.TempDir(), - SortColumns: []string{"cluster", "namespace"}, + MetaSyncConcurrency: 1, + DataDir: t.TempDir(), + AdditionalSortColumns: []string{"cluster", "namespace"}, }, expectedSortColumns: []string{labels.MetricName, "cluster", "namespace"}, }, @@ -395,7 +395,7 @@ func TestConverter_SortColumns(t *testing.T) { for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { c, _, _ := prepare(t, tC.cfg, objstore.WithNoopInstr(bucketClient), limits, nil) - assert.Equal(t, tC.expectedSortColumns, c.cfg.SortColumns, + assert.Equal(t, tC.expectedSortColumns, c.cfg.AdditionalSortColumns, "Converter should be created with the expected sort columns") }) } From afe4e2a1c65ae3051d168af9541ba0a0536925cd Mon Sep 17 00:00:00 2001 From: Angith Date: Sun, 21 Sep 2025 18:38:49 +0530 Subject: [PATCH 4/6] fix: exluded sort columns from base converter options Signed-off-by: Angith --- docs/configuration/config-file-reference.md | 5 --- docs/guides/parquet-mode.md | 5 +-- pkg/parquetconverter/converter.go | 27 ++++---------- pkg/parquetconverter/converter_test.go | 39 --------------------- 4 files changed, 8 insertions(+), 68 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 1bd35c2e4cd..faec3da324a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -186,11 +186,6 @@ parquet_converter: # CLI flag: -parquet-converter.file-buffer-enabled [file_buffer_enabled: | default = true] - # Configure the additional sort columns, in order of precedence, to improve query performance. - # These will be applied during Parquet file generation. - # CLI flag: -parquet-converter.sort-columns - [additional_sort_columns: | default = []] - # Local directory path for caching TSDB blocks during parquet conversion. # CLI flag: -parquet-converter.data-dir [data_dir: | default = "./data"] diff --git a/docs/guides/parquet-mode.md b/docs/guides/parquet-mode.md index 5eaadd80405..c79584dcb42 100644 --- a/docs/guides/parquet-mode.md +++ b/docs/guides/parquet-mode.md @@ -83,9 +83,6 @@ parquet_converter: # Enable file buffering to reduce memory usage file_buffer_enabled: true - # Defines additional sort columns applied during Parquet file generation. - additional_sort_columns: ["label1", "label2"] - # Ring configuration for distributed conversion ring: kvstore: @@ -287,7 +284,7 @@ cortex_parquet_queryable_cache_misses_total 1. **Row Group Size**: Adjust `max_rows_per_row_group` based on your query patterns 2. **Cache Size**: Tune `parquet_queryable_shard_cache_size` based on available memory 3. **Concurrency**: Adjust `meta_sync_concurrency` based on object storage performance -4. **Sort Columns**: Configure `sort_columns` based on your most common query filters to improve query performance +4. 
**Sort Columns**: Configure `parquet_converter_sort_columns` based on your most common query filters to improve query performance ### Fallback Configuration diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 60604af37c5..477149705c7 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -39,7 +39,6 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" - "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -55,11 +54,10 @@ const ( var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) type Config struct { - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - ConversionInterval time.Duration `yaml:"conversion_interval"` - MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` - FileBufferEnabled bool `yaml:"file_buffer_enabled"` - AdditionalSortColumns []string `yaml:"additional_sort_columns"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + ConversionInterval time.Duration `yaml:"conversion_interval"` + MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` + FileBufferEnabled bool `yaml:"file_buffer_enabled"` DataDir string `yaml:"data_dir"` @@ -111,7 +109,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.") f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.") f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.") - f.Var((*flagext.StringSlice)(&cfg.AdditionalSortColumns), "parquet-converter.additional-sort-columns", "Configure the additional sort columns, in order of precedence, to improve query performance. These will be applied during parquet file generation.") } func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) { @@ -129,13 +126,6 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR } func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter { - // Create base sort columns with metric name as the primary sort column - sortColumns := []string{labels.MetricName} - if len(cfg.AdditionalSortColumns) > 0 { - sortColumns = append(sortColumns, cfg.AdditionalSortColumns...) 
- } - cfg.AdditionalSortColumns = sortColumns - c := &Converter{ cfg: cfg, reg: registerer, @@ -149,7 +139,6 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex metrics: newMetrics(registerer), bkt: bkt, baseConverterOptions: []convert.ConvertOption{ - convert.WithSortBy(sortColumns...), convert.WithColDuration(time.Hour * 8), convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup), }, @@ -440,12 +429,10 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String())) + sortColumns := []string{labels.MetricName} userConfiguredSortColumns := c.limits.ParquetConverterSortColumns(userID) - if len(userConfiguredSortColumns) > 0 { - sortColumns := []string{labels.MetricName} - sortColumns = append(sortColumns, userConfiguredSortColumns...) - converterOpts = append(converterOpts, convert.WithSortBy(sortColumns...)) - } + sortColumns = append(sortColumns, userConfiguredSortColumns...) + converterOpts = append(converterOpts, convert.WithSortBy(sortColumns...)) if c.cfg.FileBufferEnabled { converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*"))) diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index a34fe498c31..81caa86c63d 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -362,45 +362,6 @@ func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) { assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID))) } -func TestConverter_SortColumns(t *testing.T) { - bucketClient, err := filesystem.NewBucket(t.TempDir()) - require.NoError(t, err) - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.ParquetConverterEnabled = true - - testCases := []struct { - desc string - cfg Config - expectedSortColumns []string - }{ - { - desc: "no additional sort columns are added", - cfg: Config{ - MetaSyncConcurrency: 1, - DataDir: t.TempDir(), - }, - expectedSortColumns: []string{labels.MetricName}, - }, - { - desc: "additional sort columns are added", - cfg: Config{ - MetaSyncConcurrency: 1, - DataDir: t.TempDir(), - AdditionalSortColumns: []string{"cluster", "namespace"}, - }, - expectedSortColumns: []string{labels.MetricName, "cluster", "namespace"}, - }, - } - for _, tC := range testCases { - t.Run(tC.desc, func(t *testing.T) { - c, _, _ := prepare(t, tC.cfg, objstore.WithNoopInstr(bucketClient), limits, nil) - assert.Equal(t, tC.expectedSortColumns, c.cfg.AdditionalSortColumns, - "Converter should be created with the expected sort columns") - }) - } -} - // mockBucket implements objstore.Bucket for testing type mockBucket struct { objstore.Bucket From b457ded166c0ae57ee5b2e700d7601f9db8abb5b Mon Sep 17 00:00:00 2001 From: Angith Date: Wed, 24 Sep 2025 00:20:06 +0530 Subject: [PATCH 5/6] doc: updated config-file-reference.md with parquet_converter_sort_columns configuration Signed-off-by: Angith --- docs/configuration/config-file-reference.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 68873e26cb1..a642c4f3241 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4252,6 +4252,11 @@ query_rejection: # CLI flag: -parquet-converter.tenant-shard-size [parquet_converter_tenant_shard_size: | default = 0] +# 
Additional label names for specific tenants to sort by after metric name, in +# order of precedence. These are applied during Parquet file generation. +# CLI flag: -parquet-converter.sort-columns +[parquet_converter_sort_columns: | default = []] + # S3 server-side encryption type. Required to enable server-side encryption # overrides for a specific tenant. If not set, the default S3 client settings # are used. From 920c851bb357d5ac2ab69bc322cabc9b0b9d9f21 Mon Sep 17 00:00:00 2001 From: Angith Date: Wed, 24 Sep 2025 20:32:56 +0530 Subject: [PATCH 6/6] doc: cleaned up white noise Signed-off-by: Angith --- docs/guides/parquet-mode.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/guides/parquet-mode.md b/docs/guides/parquet-mode.md index c79584dcb42..c68dd691ba6 100644 --- a/docs/guides/parquet-mode.md +++ b/docs/guides/parquet-mode.md @@ -106,7 +106,7 @@ limits: # Shard size for shuffle sharding (0 = disabled) parquet_converter_tenant_shard_size: 0.8 - + # Defines sort columns applied during Parquet file generation for specific tenants parquet_converter_sort_columns: ["label1", "label2"] ```
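
Note on the resulting configuration surface: after [PATCH 4/6] the converter-level `sort_columns` / `additional_sort_columns` setting is gone, the metric name (`__name__`) is always the primary sort column, and any additional sort columns come only from the per-tenant `parquet_converter_sort_columns` limit (CLI flag `-parquet-converter.sort-columns`). A minimal runtime-overrides sketch of that final state, using the illustrative tenant ID and label names from the docs in these patches:

```yaml
# Global default in the limits block; an empty list means blocks are
# sorted by metric name only during Parquet file generation.
limits:
  parquet_converter_sort_columns: []

# Per-tenant override via runtime configuration (tenant ID and label
# names here are examples, not required values).
overrides:
  tenant-1:
    parquet_converter_enabled: true
    parquet_converter_sort_columns: ["cluster", "namespace"]
```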