diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 3bb3256fe8c..4406918d239 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -64,10 +64,6 @@ const ( typeMetadata = "metadata" instanceIngestionRateTickInterval = time.Second - - // mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and - // it was based on empirical observation: See BenchmarkMergeSlicesParallel - mergeSlicesParallelism = 8 ) // Distributor is a storage.SampleAppender and a client.Querier which @@ -973,7 +969,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t for i, resp := range resps { values[i] = resp.([]string) } - r := util.MergeSlicesParallel(mergeSlicesParallelism, values...) + r := util.MergeSlicesParallel(util.DefaultMergeSlicesParallelism, values...) span.SetTag("result_length", len(r)) return r, nil } @@ -1043,7 +1039,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, for i, resp := range resps { values[i] = resp.([]string) } - r := util.MergeSlicesParallel(mergeSlicesParallelism, values...) + r := util.MergeSlicesParallel(util.DefaultMergeSlicesParallelism, values...) span.SetTag("result_length", len(r)) return r, nil diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index a8b0f0027cb..aeaf53c3bc7 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -407,7 +407,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, match return nil, nil, err } - return strutil.MergeSlices(resValueSets...), resWarnings, nil + return util.MergeSlicesParallel(util.DefaultMergeSlicesParallelism, resValueSets...), resWarnings, nil } func (q *blocksStoreQuerier) Close() error { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index f624644866a..de1cfaa2313 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -892,6 +892,57 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { } } +func BenchmarkBlocksStoreQuerier_Labels(b *testing.B) { + const ( + minT = int64(10) + maxT = int64(20) + ) + ctx := user.InjectOrgID(context.Background(), "user-1") + reg := prometheus.NewPedanticRegistry() + blocks := bucketindex.Blocks{} + resps := map[BlocksStoreClient][]ulid.ULID{} + for i := 0; i < 500; i++ { + b := &bucketindex.Block{ID: ulid.MustNew(uint64(i), nil)} + blocks = append(blocks, b) + values := []string{} + for j := i; j < i+300; j++ { + values = append(values, fmt.Sprintf("Value_%v", j)) + } + sort.Strings(values) + resps[&storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: values, + Warnings: []string{}, + Hints: mockValuesHints(b.ID), + }, + }] = []ulid.ULID{b.ID} + } + + stores := &blocksStoreSetMock{mockedResponses: []interface{}{resps}, rotateMockResults: true} + finder := &blocksFinderMock{} + + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(reg), + limits: &blocksStoreLimitsMock{}, + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _, _, err := q.LabelValues(ctx, labels.MetricName) + require.NoError(b, err) + } +} + func TestBlocksStoreQuerier_Labels(t *testing.T) { t.Parallel() @@ -1619,6 +1670,8 @@ type blocksStoreSetMock struct { mockedResponses []interface{} nextResult int + + rotateMockResults bool } func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string, _ map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) { @@ -1629,6 +1682,10 @@ func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.U res := m.mockedResponses[m.nextResult] m.nextResult++ + if m.rotateMockResults { + m.nextResult = m.nextResult % len(m.mockedResponses) + } + if err, ok := res.(error); ok { return nil, err } diff --git a/pkg/util/strings.go b/pkg/util/strings.go index 30a3283c536..13dffa15784 100644 --- a/pkg/util/strings.go +++ b/pkg/util/strings.go @@ -7,6 +7,12 @@ import ( "github.com/bboreham/go-loser" ) +const ( + // DefaultMergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and + // it was based on empirical observation: See BenchmarkMergeSlicesParallel + DefaultMergeSlicesParallelism = 8 +) + // StringsContain returns true if the search value is within the list of input values. func StringsContain(values []string, search string) bool { for _, v := range values {