diff --git a/CHANGELOG.md b/CHANGELOG.md
index 553c6733e9d..392bff0582a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@
 * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893
 * [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978
+* [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation. #7003
 * [ENHANCEMENT] Modernizes the entire codebase by using go modernize tool. #7005
 * [ENHANCEMENT] Overrides Exporter: Expose all fields that can be converted to float64. Also, the label value `max_local_series_per_metric` got renamed to `max_series_per_metric`, and `max_local_series_per_user` got renamed to `max_series_per_user`. #6979
 * [ENHANCEMENT] Ingester: Add `cortex_ingester_tsdb_wal_replay_unknown_refs_total` and `cortex_ingester_tsdb_wbl_replay_unknown_refs_total` metrics to track unknown series references during wal/wbl replaying. #6945
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 68873e26cb1..a642c4f3241 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4252,6 +4252,11 @@ query_rejection:
 # CLI flag: -parquet-converter.tenant-shard-size
 [parquet_converter_tenant_shard_size: <float> | default = 0]
 
+# Additional label names for specific tenants to sort by after metric name, in
+# order of precedence. These are applied during Parquet file generation.
+# CLI flag: -parquet-converter.sort-columns
+[parquet_converter_sort_columns: <list of strings> | default = []]
+
 # S3 server-side encryption type. Required to enable server-side encryption
 # overrides for a specific tenant. If not set, the default S3 client settings
 # are used.
diff --git a/docs/guides/parquet-mode.md b/docs/guides/parquet-mode.md
index 3bec5d11147..c68dd691ba6 100644
--- a/docs/guides/parquet-mode.md
+++ b/docs/guides/parquet-mode.md
@@ -106,6 +106,9 @@ limits:
 
   # Shard size for shuffle sharding (0 = disabled)
   parquet_converter_tenant_shard_size: 0.8
+
+  # Defines sort columns applied during Parquet file generation for specific tenants
+  parquet_converter_sort_columns: ["label1", "label2"]
 ```
 
 You can also configure per-tenant settings using runtime configuration:
@@ -115,6 +118,7 @@ overrides:
   tenant-1:
     parquet_converter_enabled: true
     parquet_converter_tenant_shard_size: 2
+    parquet_converter_sort_columns: ["cluster", "namespace"]
   tenant-2:
     parquet_converter_enabled: false
 ```
@@ -280,6 +284,7 @@ cortex_parquet_queryable_cache_misses_total
 1. **Row Group Size**: Adjust `max_rows_per_row_group` based on your query patterns
 2. **Cache Size**: Tune `parquet_queryable_shard_cache_size` based on available memory
 3. **Concurrency**: Adjust `meta_sync_concurrency` based on object storage performance
+4. **Sort Columns**: Configure `parquet_converter_sort_columns` based on your most common query filters to improve query performance
 
 ### Fallback Configuration
 
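The net effect of the configuration above is a per-tenant sort order in which the metric name always comes first and the tenant's configured labels follow in the order listed. A minimal sketch of that precedence, assuming only `labels.MetricName` from Prometheus; the `buildSortColumns` helper is hypothetical and exists purely for illustration, not in this change:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// buildSortColumns mirrors the converter change in this PR: labels.MetricName
// ("__name__") always sorts first, then any tenant-configured columns follow
// in the order they were listed.
func buildSortColumns(userConfigured []string) []string {
	cols := []string{labels.MetricName}
	return append(cols, userConfigured...)
}

func main() {
	// With the per-tenant override from the guide above:
	fmt.Println(buildSortColumns([]string{"cluster", "namespace"}))
	// Output: [__name__ cluster namespace]
}
```

Sorting by frequently filtered labels such as `cluster` and `namespace` can group related series into the same row groups, which is presumably why the tuning guidance above recommends matching sort columns to common query filters.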
diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go
index ccfcdd0da24..477149705c7 100644
--- a/pkg/parquetconverter/converter.go
+++ b/pkg/parquetconverter/converter.go
@@ -139,7 +139,6 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
 		metrics: newMetrics(registerer),
 		bkt:     bkt,
 		baseConverterOptions: []convert.ConvertOption{
-			convert.WithSortBy(labels.MetricName),
 			convert.WithColDuration(time.Hour * 8),
 			convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
 		},
@@ -430,6 +429,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
 
 		converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String()))
 
+		sortColumns := []string{labels.MetricName}
+		userConfiguredSortColumns := c.limits.ParquetConverterSortColumns(userID)
+		sortColumns = append(sortColumns, userConfiguredSortColumns...)
+		converterOpts = append(converterOpts, convert.WithSortBy(sortColumns...))
+
 		if c.cfg.FileBufferEnabled {
 			converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
 		}
diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go
index fbaa947b95e..81caa86c63d 100644
--- a/pkg/parquetconverter/converter_test.go
+++ b/pkg/parquetconverter/converter_test.go
@@ -59,7 +59,19 @@ func TestConverter(t *testing.T) {
 	flagext.DefaultValues(limits)
 	limits.ParquetConverterEnabled = true
 
-	c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)
+	userSpecificSortColumns := []string{"cluster", "namespace"}
+
+	// Create a mock tenant limits implementation
+	tenantLimits := &mockTenantLimits{
+		limits: map[string]*validation.Limits{
+			user: {
+				ParquetConverterSortColumns: userSpecificSortColumns,
+				ParquetConverterEnabled:     true,
+			},
+		},
+	}
+
+	c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, tenantLimits)
 
 	ctx := context.Background()
 
@@ -157,7 +169,7 @@ func prepareConfig() Config {
 	return cfg
 }
 
-func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Converter, log.Logger, prometheus.Gatherer) {
+func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits, tenantLimits validation.TenantLimits) (*Converter, log.Logger, prometheus.Gatherer) {
 	storageCfg := cortex_tsdb.BlocksStorageConfig{}
 	blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
 	flagext.DefaultValues(&storageCfg)
@@ -176,7 +188,7 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket,
 		flagext.DefaultValues(limits)
 	}
 
-	overrides := validation.NewOverrides(*limits, nil)
+	overrides := validation.NewOverrides(*limits, tenantLimits)
 
 	scanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{
 		Strategy: cortex_tsdb.UserScanStrategyList,
@@ -384,3 +396,19 @@ func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDe
 		},
 	}, nil
 }
+
+// mockTenantLimits implements the validation.TenantLimits interface for testing.
+type mockTenantLimits struct {
+	limits map[string]*validation.Limits
+}
+
+func (m *mockTenantLimits) ByUserID(userID string) *validation.Limits {
+	if limits, ok := m.limits[userID]; ok {
+		return limits
+	}
+	return nil
+}
+
+func (m *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
+	return m.limits
+}
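The test wiring above is easiest to read end to end: `mockTenantLimits` feeds per-tenant values into `validation.NewOverrides`, and the converter later reads them back through the `Overrides` getter. Below is a self-contained sketch of that round trip, assuming the Cortex `validation` and `flagext` packages behave as used in the test above; `staticTenantLimits` is an illustrative stand-in mirroring `mockTenantLimits`, and the tenant IDs are placeholders:

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/flagext"
	"github.com/cortexproject/cortex/pkg/util/validation"
)

// staticTenantLimits is a throwaway stand-in for the mockTenantLimits type
// added in the test diff above.
type staticTenantLimits struct {
	limits map[string]*validation.Limits
}

func (s *staticTenantLimits) ByUserID(userID string) *validation.Limits {
	return s.limits[userID] // nil for unknown tenants
}

func (s *staticTenantLimits) AllByUserID() map[string]*validation.Limits {
	return s.limits
}

func main() {
	var defaults validation.Limits
	flagext.DefaultValues(&defaults)

	overrides := validation.NewOverrides(defaults, &staticTenantLimits{
		limits: map[string]*validation.Limits{
			"tenant-1": {ParquetConverterSortColumns: []string{"cluster", "namespace"}},
		},
	})

	// The per-tenant override is returned for tenant-1...
	fmt.Println(overrides.ParquetConverterSortColumns("tenant-1")) // [cluster namespace]
	// ...while unknown tenants are expected to fall back to the (empty) default.
	fmt.Println(overrides.ParquetConverterSortColumns("tenant-2")) // []
}
```

Keeping the override behind `TenantLimits` means the feature stays opt-in per tenant: only tenants with an explicit entry get extra sort columns.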
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index 1305c4034c5..c0611bff908 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -220,9 +220,9 @@ type Limits struct {
 	CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`
 
 	// Parquet converter
-	ParquetConverterEnabled         bool    `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"`
-	ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"`
-
+	ParquetConverterEnabled         bool     `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"`
+	ParquetConverterTenantShardSize float64  `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"`
+	ParquetConverterSortColumns     []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"`
 	// This config doesn't have a CLI flag registered here because they're registered in
 	// their own original config struct.
 	S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."`
@@ -325,6 +325,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 
 	f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
 	f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")
+	f.Var((*flagext.StringSlice)(&l.ParquetConverterSortColumns), "parquet-converter.sort-columns", "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.")
 
 	// Parquet Queryable enforced limits.
 	f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.")
@@ -903,6 +904,11 @@ func (o *Overrides) ParquetConverterEnabled(userID string) bool {
 	return o.GetOverridesForUser(userID).ParquetConverterEnabled
 }
 
+// ParquetConverterSortColumns returns the additional sort columns for parquet files.
+func (o *Overrides) ParquetConverterSortColumns(userID string) []string {
+	return o.GetOverridesForUser(userID).ParquetConverterSortColumns
+}
+
 // ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage.
 func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int {
 	return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount
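Because the flag is registered through `flagext.StringSlice`, each occurrence of `-parquet-converter.sort-columns` should contribute one value; this append-per-occurrence behavior is an assumption about Cortex's `flagext.StringSlice` (it differs from comma-separated list flags) and is worth double-checking. A standalone sketch under that assumption:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/flagext"
)

func main() {
	// Mirrors the RegisterFlags change above, in isolation.
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	var sortColumns flagext.StringSlice
	fs.Var(&sortColumns, "parquet-converter.sort-columns",
		"Additional label names to sort by after metric name, in order of precedence.")

	// Assumption: StringSlice.Set appends, so repeating the flag
	// accumulates values in the order they appear on the command line.
	_ = fs.Parse([]string{
		"-parquet-converter.sort-columns=cluster",
		"-parquet-converter.sort-columns=namespace",
	})

	fmt.Println(sortColumns) // [cluster namespace]
}
```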