From 1e0319d9a9324fc4400b32f245ea587e79458af4 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Fri, 6 Jun 2025 06:49:24 -0700 Subject: [PATCH 1/4] Add active series limit for nativeHistograms samples Signed-off-by: Paurush Garg --- docs/configuration/config-file-reference.md | 11 +++ pkg/ingester/ingester.go | 5 ++ pkg/ingester/limiter.go | 36 ++++++++-- pkg/ingester/limiter_test.go | 76 +++++++++++++++++++++ pkg/util/validation/limits.go | 35 ++++++++-- 5 files changed, 152 insertions(+), 11 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 4460ae63021..e27d87a5cc2 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3544,6 +3544,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -ingester.max-series-per-metric [max_series_per_metric: | default = 50000] +# The maximum number of active nativeHistograms series per user, per ingester. 0 +# to disable. +# CLI flag: -ingester.max-native-histograms-series-per-user +[max_native_histograms_series_per_user: | default = 5000000] + # The maximum number of active series per user, across the cluster before # replication. 0 to disable. Supported only if -distributor.shard-by-all-labels # is true. @@ -3555,6 +3560,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -ingester.max-global-series-per-metric [max_global_series_per_metric: | default = 0] +# The maximum number of active nativeHistograms series per user, across the +# cluster before replication. 0 to disable. Supported only if +# -distributor.shard-by-all-labels is true. +# CLI flag: -ingester.max-global-native-histograms-series-per-user +[max_global_native_histograms_series_per_user: | default = 0] + # [Experimental] Enable limits per LabelSet. Supported limits per labelSet: # [max_series] [limits_per_label_set: | default = []] diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index fec90ad821e..1139cd5017e 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -448,6 +448,11 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error { } } + // Total nativeHistograms series limit. + if err := u.limiter.AssertMaxNativeHistogramsSeriesPerUser(u.userID, u.activeSeries.ActiveNativeHistogram()); err != nil { + return err + } + // Total series limit. if err := u.limiter.AssertMaxSeriesPerUser(u.userID, int(u.Head().NumSeries())); err != nil { return err diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 94dd409b3bc..b7a96dbf342 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -12,10 +12,11 @@ import ( ) var ( - errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded") - errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded") - errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded") - errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded") + errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded") + errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded") + errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded") + errMaxNativeHistogramsSeriesPerUserLimitExceeded = errors.New("per-user nativeHistograms series limit exceeded") + errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded") ) type errMaxSeriesPerLabelSetLimitExceeded struct { @@ -95,6 +96,16 @@ func (l *Limiter) AssertMaxSeriesPerUser(userID string, series int) error { return errMaxSeriesPerUserLimitExceeded } +// AssertMaxNativeHistogramsSeriesPerUser limit has not been reached compared to the current +// number of nativeHistograms series in input and returns an error if so. +func (l *Limiter) AssertMaxNativeHistogramsSeriesPerUser(userID string, series int) error { + if actualLimit := l.maxNativeHistogramsSeriesPerUser(userID); series < actualLimit { + return nil + } + + return errMaxNativeHistogramsSeriesPerUserLimitExceeded +} + // AssertMaxMetricsWithMetadataPerUser limit has not been reached compared to the current // number of metrics with metadata in input and returns an error if so. func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int) error { @@ -158,6 +169,15 @@ func (l *Limiter) formatMaxSeriesPerUserError(userID string) error { minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit) } +func (l *Limiter) formatMaxNativeHistogramsSeriesPerUserError(userID string) error { + actualLimit := l.maxNativeHistogramsSeriesPerUser(userID) + localLimit := l.limits.MaxLocalNativeHistogramsSeriesPerUser(userID) + globalLimit := l.limits.MaxGlobalNativeHistogramsSeriesPerUser(userID) + + return fmt.Errorf("per-user nativeHistograms series limit of %d exceeded, %s (local limit: %d global limit: %d actual local limit: %d)", + minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit) +} + func (l *Limiter) formatMaxSeriesPerMetricError(userID string, metric string) error { actualLimit := l.maxSeriesPerMetric(userID) localLimit := l.limits.MaxLocalSeriesPerMetric(userID) @@ -248,6 +268,14 @@ func (l *Limiter) maxSeriesPerUser(userID string) int { ) } +func (l *Limiter) maxNativeHistogramsSeriesPerUser(userID string) int { + return l.maxByLocalAndGlobal( + userID, + l.limits.MaxLocalNativeHistogramsSeriesPerUser, + l.limits.MaxGlobalNativeHistogramsSeriesPerUser, + ) +} + func (l *Limiter) maxMetadataPerUser(userID string) int { return l.maxByLocalAndGlobal( userID, diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index a1043b053e5..d1cbe48c32b 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -54,6 +54,19 @@ func TestLimiter_maxSeriesPerUser(t *testing.T) { runLimiterMaxFunctionTest(t, applyLimits, runMaxFn, false) } +func TestLimiter_maxNativeHistogramsSeriesPerUser(t *testing.T) { + applyLimits := func(limits *validation.Limits, localLimit, globalLimit int) { + limits.MaxLocalNativeHistogramsSeriesPerUser = localLimit + limits.MaxGlobalNativeHistogramsSeriesPerUser = globalLimit + } + + runMaxFn := func(limiter *Limiter) int { + return limiter.maxNativeHistogramsSeriesPerUser("test") + } + + runLimiterMaxFunctionTest(t, applyLimits, runMaxFn, false) +} + func TestLimiter_maxMetadataPerUser(t *testing.T) { applyLimits := func(limits *validation.Limits, localLimit, globalLimit int) { limits.MaxLocalMetricsWithMetadataPerUser = localLimit @@ -425,6 +438,69 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) { } } +func TestLimiter_AssertMaxNativeHistogramsSeriesPerUser(t *testing.T) { + tests := map[string]struct { + maxLocalNativeHistogramsSeriesPerUser int + maxGlobalNativeHistogramsSeriesPerUser int + ringReplicationFactor int + ringIngesterCount int + shardByAllLabels bool + series int + expected error + }{ + "both local and global limit are disabled": { + maxLocalNativeHistogramsSeriesPerUser: 0, + maxGlobalNativeHistogramsSeriesPerUser: 0, + ringReplicationFactor: 1, + ringIngesterCount: 1, + shardByAllLabels: false, + series: 100, + expected: nil, + }, + "current number of series is below the limit": { + maxLocalNativeHistogramsSeriesPerUser: 0, + maxGlobalNativeHistogramsSeriesPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + shardByAllLabels: true, + series: 299, + expected: nil, + }, + "current number of series is above the limit": { + maxLocalNativeHistogramsSeriesPerUser: 0, + maxGlobalNativeHistogramsSeriesPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + shardByAllLabels: true, + series: 300, + expected: errMaxNativeHistogramsSeriesPerUserLimitExceeded, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + // Mock the ring + ring := &ringCountMock{} + ring.On("HealthyInstancesCount").Return(testData.ringIngesterCount) + ring.On("ZonesCount").Return(1) + + // Mock limits + limits, err := validation.NewOverrides(validation.Limits{ + MaxLocalNativeHistogramsSeriesPerUser: testData.maxLocalNativeHistogramsSeriesPerUser, + MaxGlobalNativeHistogramsSeriesPerUser: testData.maxGlobalNativeHistogramsSeriesPerUser, + }, nil) + require.NoError(t, err) + + limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "") + actual := limiter.AssertMaxNativeHistogramsSeriesPerUser("test", testData.series) + + assert.Equal(t, testData.expected, actual) + }) + } +} + func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) { tests := map[string]struct { diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 6419dc6ba89..76751d5dc31 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -23,7 +23,8 @@ import ( util_log "github.com/cortexproject/cortex/pkg/util/log" ) -var errMaxGlobalSeriesPerUserValidation = errors.New("The ingester.max-global-series-per-user limit is unsupported if distributor.shard-by-all-labels is disabled") +var errMaxGlobalSeriesPerUserValidation = errors.New("the ingester.max-global-series-per-user limit is unsupported if distributor.shard-by-all-labels is disabled") +var errMaxGlobalNativeHistogramsSeriesPerUserValidation = errors.New("the ingester.max-global-native-histograms-series-per-user limit is unsupported if distributor.shard-by-all-labels is disabled") var errDuplicateQueryPriorities = errors.New("duplicate entry of priorities found. Make sure they are all unique, including the default priority") var errCompilingQueryPriorityRegex = errors.New("error compiling query priority regex") var errDuplicatePerLabelSetLimit = errors.New("duplicate per labelSet limits found. Make sure they are all unique") @@ -149,12 +150,14 @@ type Limits struct { // Ingester enforced limits. // Series - MaxLocalSeriesPerUser int `yaml:"max_series_per_user" json:"max_series_per_user"` - MaxLocalSeriesPerMetric int `yaml:"max_series_per_metric" json:"max_series_per_metric"` - MaxGlobalSeriesPerUser int `yaml:"max_global_series_per_user" json:"max_global_series_per_user"` - MaxGlobalSeriesPerMetric int `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"` - LimitsPerLabelSet []LimitsPerLabelSet `yaml:"limits_per_label_set" json:"limits_per_label_set" doc:"nocli|description=[Experimental] Enable limits per LabelSet. Supported limits per labelSet: [max_series]"` - EnableNativeHistograms bool `yaml:"enable_native_histograms" json:"enable_native_histograms"` + MaxLocalSeriesPerUser int `yaml:"max_series_per_user" json:"max_series_per_user"` + MaxLocalSeriesPerMetric int `yaml:"max_series_per_metric" json:"max_series_per_metric"` + MaxLocalNativeHistogramsSeriesPerUser int `yaml:"max_native_histograms_series_per_user" json:"max_native_histograms_series_per_user"` + MaxGlobalSeriesPerUser int `yaml:"max_global_series_per_user" json:"max_global_series_per_user"` + MaxGlobalSeriesPerMetric int `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"` + MaxGlobalNativeHistogramsSeriesPerUser int `yaml:"max_global_native_histograms_series_per_user" json:"max_global_native_histograms_series_per_user"` + LimitsPerLabelSet []LimitsPerLabelSet `yaml:"limits_per_label_set" json:"limits_per_label_set" doc:"nocli|description=[Experimental] Enable limits per LabelSet. Supported limits per labelSet: [max_series]"` + EnableNativeHistograms bool `yaml:"enable_native_histograms" json:"enable_native_histograms"` // Metadata MaxLocalMetricsWithMetadataPerUser int `yaml:"max_metadata_per_user" json:"max_metadata_per_user"` @@ -267,6 +270,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.") f.IntVar(&l.MaxGlobalSeriesPerUser, "ingester.max-global-series-per-user", 0, "The maximum number of active series per user, across the cluster before replication. 0 to disable. Supported only if -distributor.shard-by-all-labels is true.") f.IntVar(&l.MaxGlobalSeriesPerMetric, "ingester.max-global-series-per-metric", 0, "The maximum number of active series per metric name, across the cluster before replication. 0 to disable.") + f.IntVar(&l.MaxLocalNativeHistogramsSeriesPerUser, "ingester.max-native-histograms-series-per-user", 5000000, "The maximum number of active nativeHistograms series per user, per ingester. 0 to disable.") + f.IntVar(&l.MaxGlobalNativeHistogramsSeriesPerUser, "ingester.max-global-native-histograms-series-per-user", 0, "The maximum number of active nativeHistograms series per user, across the cluster before replication. 0 to disable. Supported only if -distributor.shard-by-all-labels is true.") f.BoolVar(&l.EnableNativeHistograms, "blocks-storage.tsdb.enable-native-histograms", false, "[EXPERIMENTAL] True to enable native histogram.") f.IntVar(&l.MaxExemplars, "ingester.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. less than zero means disabled. If the value is set to zero, cortex will fallback to blocks-storage.tsdb.max-exemplars value.") f.Var(&l.OutOfOrderTimeWindow, "ingester.out-of-order-time-window", "[Experimental] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default.") @@ -341,6 +346,12 @@ func (l *Limits) Validate(shardByAllLabels bool) error { return errMaxGlobalSeriesPerUserValidation } + // The ingester.max-global-native-histograms-series-per-user metric is not supported + // if shard-by-all-labels is disabled + if l.MaxGlobalNativeHistogramsSeriesPerUser > 0 && !shardByAllLabels { + return errMaxGlobalNativeHistogramsSeriesPerUserValidation + } + if err := l.RulerExternalLabels.Validate(func(l labels.Label) error { if !model.LabelName(l.Name).IsValid() { return fmt.Errorf("%w: %q", errInvalidLabelName, l.Name) @@ -663,6 +674,11 @@ func (o *Overrides) MaxLocalSeriesPerUser(userID string) int { return o.GetOverridesForUser(userID).MaxLocalSeriesPerUser } +// MaxLocalNativeHistogramsSeriesPerUser returns the maximum number of nativeHistograms series a user is allowed to store in a single ingester. +func (o *Overrides) MaxLocalNativeHistogramsSeriesPerUser(userID string) int { + return o.GetOverridesForUser(userID).MaxLocalNativeHistogramsSeriesPerUser +} + // MaxLocalSeriesPerMetric returns the maximum number of series allowed per metric in a single ingester. func (o *Overrides) MaxLocalSeriesPerMetric(userID string) int { return o.GetOverridesForUser(userID).MaxLocalSeriesPerMetric @@ -673,6 +689,11 @@ func (o *Overrides) MaxGlobalSeriesPerUser(userID string) int { return o.GetOverridesForUser(userID).MaxGlobalSeriesPerUser } +// MaxGlobalNativeHistogramsSeriesPerUser returns the maximum number of nativeHistograms series a user is allowed to store across the cluster. +func (o *Overrides) MaxGlobalNativeHistogramsSeriesPerUser(userID string) int { + return o.GetOverridesForUser(userID).MaxGlobalNativeHistogramsSeriesPerUser +} + // EnableNativeHistograms returns whether the Ingester should accept NativeHistograms samples from this user. func (o *Overrides) EnableNativeHistograms(userID string) bool { return o.GetOverridesForUser(userID).EnableNativeHistograms From bf9a020839e22b645e03cb4dc3a2c82680003812 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Fri, 6 Jun 2025 06:58:32 -0700 Subject: [PATCH 2/4] Adding Changelog Signed-off-by: Paurush Garg --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f78f1b445c3..3bd7d859217 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## master / unreleased +* [ENHANCEMENT] Ingester: Add activeSeries limit specifically for NativeHistograms. #6796 * [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718 * [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603 * [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727 From e2ed3f3ce674e61001de4325a6abacebaf577611 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Fri, 6 Jun 2025 07:07:34 -0700 Subject: [PATCH 3/4] Resolving Errror in limiter Signed-off-by: Paurush Garg --- pkg/ingester/limiter.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index b7a96dbf342..7eb305e9aa3 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -145,6 +145,8 @@ func (l *Limiter) FormatError(userID string, err error, lbls labels.Labels) erro switch { case errors.Is(err, errMaxSeriesPerUserLimitExceeded): return l.formatMaxSeriesPerUserError(userID) + case errors.Is(err, errMaxNativeHistogramsSeriesPerUserLimitExceeded): + return l.formatMaxNativeHistogramsSeriesPerUserError(userID) case errors.Is(err, errMaxSeriesPerMetricLimitExceeded): return l.formatMaxSeriesPerMetricError(userID, lbls.Get(labels.MetricName)) case errors.Is(err, errMaxMetadataPerUserLimitExceeded): From d0affaa8b10aaa726a82794600aa134d07fd9668 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Fri, 6 Jun 2025 13:48:00 -0700 Subject: [PATCH 4/4] Adding testing Signed-off-by: Paurush Garg --- pkg/ingester/ingester.go | 40 ++++++++++------ pkg/ingester/ingester_test.go | 87 +++++++++++++++++++++++++++++++++++ pkg/ingester/limiter_test.go | 12 +++-- pkg/ingester/user_state.go | 7 +-- 4 files changed, 124 insertions(+), 22 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 1139cd5017e..fe20c1c243c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1224,21 +1224,22 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Keep track of some stats which are tracked only if the samples will be // successfully committed var ( - succeededSamplesCount = 0 - failedSamplesCount = 0 - succeededHistogramsCount = 0 - failedHistogramsCount = 0 - succeededExemplarsCount = 0 - failedExemplarsCount = 0 - startAppend = time.Now() - sampleOutOfBoundsCount = 0 - sampleOutOfOrderCount = 0 - sampleTooOldCount = 0 - newValueForTimestampCount = 0 - perUserSeriesLimitCount = 0 - perLabelSetSeriesLimitCount = 0 - perMetricSeriesLimitCount = 0 - discardedNativeHistogramCount = 0 + succeededSamplesCount = 0 + failedSamplesCount = 0 + succeededHistogramsCount = 0 + failedHistogramsCount = 0 + succeededExemplarsCount = 0 + failedExemplarsCount = 0 + startAppend = time.Now() + sampleOutOfBoundsCount = 0 + sampleOutOfOrderCount = 0 + sampleTooOldCount = 0 + newValueForTimestampCount = 0 + perUserSeriesLimitCount = 0 + perUserNativeHistogramsSeriesLimitCount = 0 + perLabelSetSeriesLimitCount = 0 + perMetricSeriesLimitCount = 0 + discardedNativeHistogramCount = 0 updateFirstPartial = func(errFn func() error) { if firstPartialErr == nil { @@ -1274,6 +1275,12 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels)) }) + case errors.Is(cause, errMaxNativeHistogramsSeriesPerUserLimitExceeded): + perUserNativeHistogramsSeriesLimitCount++ + updateFirstPartial(func() error { + return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels)) + }) + case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded): perMetricSeriesLimitCount++ updateFirstPartial(func() error { @@ -1517,6 +1524,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if perUserSeriesLimitCount > 0 { i.validateMetrics.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount)) } + if perUserNativeHistogramsSeriesLimitCount > 0 { + i.validateMetrics.DiscardedSamples.WithLabelValues(perUserNativeHistogramsSeriesLimit, userID).Add(float64(perUserNativeHistogramsSeriesLimitCount)) + } if perMetricSeriesLimitCount > 0 { i.validateMetrics.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount)) } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index b0098663ab7..998cbf883f5 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -868,6 +868,93 @@ func TestIngesterUserLimitExceeded(t *testing.T) { } +func TestIngesterUserLimitExceededForNativeHistograms(t *testing.T) { + limits := defaultLimitsTestConfig() + limits.EnableNativeHistograms = true + limits.MaxLocalNativeHistogramsSeriesPerUser = 1 + limits.MaxLocalSeriesPerUser = 1 + limits.MaxLocalMetricsWithMetadataPerUser = 1 + + userID := "1" + // Series + labels1 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}} + labels3 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}} + sampleNativeHistogram1 := cortexpb.HistogramToHistogramProto(0, tsdbutil.GenerateTestHistogram(1)) + sampleNativeHistogram2 := cortexpb.HistogramToHistogramProto(1, tsdbutil.GenerateTestHistogram(2)) + sampleNativeHistogram3 := cortexpb.HistogramToHistogramProto(0, tsdbutil.GenerateTestHistogram(3)) + + // Metadata + metadata1 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric", Type: cortexpb.COUNTER} + metadata2 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric2", Help: "a help for testmetric2", Type: cortexpb.COUNTER} + + dir := t.TempDir() + + chunksDir := filepath.Join(dir, "chunks") + blocksDir := filepath.Join(dir, "blocks") + require.NoError(t, os.Mkdir(chunksDir, os.ModePerm)) + require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) + + blocksIngesterGenerator := func(reg prometheus.Registerer) *Ingester { + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, reg) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + // Wait until it's ACTIVE + test.Poll(t, time.Second, ring.ACTIVE, func() interface{} { + return ing.lifecycler.GetState() + }) + + return ing + } + + tests := []string{"blocks"} + for i, ingGenerator := range []func(reg prometheus.Registerer) *Ingester{blocksIngesterGenerator} { + t.Run(tests[i], func(t *testing.T) { + reg := prometheus.NewRegistry() + ing := ingGenerator(reg) + + // Append only one series and one metadata first, expect no error. + ctx := user.InjectOrgID(context.Background(), userID) + _, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1}, nil, []*cortexpb.MetricMetadata{metadata1}, []cortexpb.Histogram{sampleNativeHistogram1}, cortexpb.API)) + require.NoError(t, err) + + testLimits := func(reg prometheus.Gatherer) { + // Append to two series, expect series-exceeded error. + _, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1, labels3}, nil, nil, []cortexpb.Histogram{sampleNativeHistogram2, sampleNativeHistogram3}, cortexpb.API)) + httpResp, ok := httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "returned error is not an httpgrpc response") + assert.Equal(t, http.StatusBadRequest, int(httpResp.Code)) + assert.Equal(t, wrapWithUser(makeLimitError(perUserNativeHistogramsSeriesLimit, ing.limiter.FormatError(userID, errMaxNativeHistogramsSeriesPerUserLimitExceeded, labels1)), userID).Error(), string(httpResp.Body)) + + // Append two metadata, expect no error since metadata is a best effort approach. + _, err = ing.Push(ctx, cortexpb.ToWriteRequest(nil, nil, []*cortexpb.MetricMetadata{metadata1, metadata2}, nil, cortexpb.API)) + require.NoError(t, err) + + // Read samples back via ingester queries. + res, _, err := runTestQuery(ctx, t, ing, labels.MatchEqual, model.MetricNameLabel, "testmetric") + require.NoError(t, err) + require.NotNil(t, res) + + // Verify metadata + m, err := ing.MetricsMetadata(ctx, &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""}) + require.NoError(t, err) + assert.Equal(t, []*cortexpb.MetricMetadata{metadata1}, m.Metadata) + } + + testLimits(reg) + + // Limits should hold after restart. + services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + // Use new registry to prevent metrics registration panic. + reg = prometheus.NewRegistry() + ing = ingGenerator(reg) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + testLimits(reg) + }) + } + +} + func benchmarkData(nSeries int) (allLabels []labels.Labels, allSamples []cortexpb.Sample) { for j := 0; j < nSeries; j++ { labels := chunk.BenchmarkLabels.Copy() diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index d1cbe48c32b..239b84ce85e 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -656,10 +656,11 @@ func TestLimiter_FormatError(t *testing.T) { // Mock limits limits, err := validation.NewOverrides(validation.Limits{ - MaxGlobalSeriesPerUser: 100, - MaxGlobalSeriesPerMetric: 20, - MaxGlobalMetricsWithMetadataPerUser: 10, - MaxGlobalMetadataPerMetric: 3, + MaxGlobalSeriesPerUser: 100, + MaxGlobalNativeHistogramsSeriesPerUser: 100, + MaxGlobalSeriesPerMetric: 20, + MaxGlobalMetricsWithMetadataPerUser: 10, + MaxGlobalMetadataPerMetric: 3, }, nil) require.NoError(t, err) @@ -669,6 +670,9 @@ func TestLimiter_FormatError(t *testing.T) { actual := limiter.FormatError("user-1", errMaxSeriesPerUserLimitExceeded, lbls) assert.EqualError(t, actual, "per-user series limit of 100 exceeded, please contact administrator to raise it (local limit: 0 global limit: 100 actual local limit: 100)") + actual = limiter.FormatError("user-1", errMaxNativeHistogramsSeriesPerUserLimitExceeded, lbls) + assert.EqualError(t, actual, "per-user nativeHistograms series limit of 100 exceeded, please contact administrator to raise it (local limit: 0 global limit: 100 actual local limit: 100)") + actual = limiter.FormatError("user-1", errMaxSeriesPerMetricLimitExceeded, lbls) assert.EqualError(t, actual, "per-metric series limit of 20 exceeded for metric testMetric, please contact administrator to raise it (local limit: 0 global limit: 20 actual local limit: 20)") diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 9ef89d48a92..5d04cee8a7d 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -16,9 +16,10 @@ import ( // DiscardedSamples metric labels const ( - perUserSeriesLimit = "per_user_series_limit" - perMetricSeriesLimit = "per_metric_series_limit" - perLabelsetSeriesLimit = "per_labelset_series_limit" + perUserSeriesLimit = "per_user_series_limit" + perUserNativeHistogramsSeriesLimit = "per_user_native_histograms_series_limit" + perMetricSeriesLimit = "per_metric_series_limit" + perLabelsetSeriesLimit = "per_labelset_series_limit" ) const numMetricCounterShards = 128