Skip to content

Commit 3e5be60

Browse files
committed
distributed execution fuzz test
1 parent 6dc7422 commit 3e5be60

File tree

3 files changed

+199
-129
lines changed

3 files changed

+199
-129
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
//go:build integration_query_fuzz
2+
// +build integration_query_fuzz
3+
4+
package integration
5+
6+
import (
7+
"context"
8+
"math/rand"
9+
"path"
10+
"strconv"
11+
"strings"
12+
"testing"
13+
"time"
14+
15+
"github.com/cortexproject/promqlsmith"
16+
"github.com/prometheus/prometheus/model/labels"
17+
"github.com/prometheus/prometheus/prompb"
18+
"github.com/stretchr/testify/require"
19+
20+
"github.com/cortexproject/cortex/integration/e2e"
21+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
22+
"github.com/cortexproject/cortex/integration/e2ecortex"
23+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
24+
)
25+
26+
func TestDistributedExecutionFuzz(t *testing.T) {
27+
s, err := e2e.NewScenario(networkName)
28+
require.NoError(t, err)
29+
defer s.Close()
30+
31+
// start dependencies.
32+
consul1 := e2edb.NewConsulWithName("consul1")
33+
consul2 := e2edb.NewConsulWithName("consul2")
34+
require.NoError(t, s.StartAndWaitReady(consul1, consul2))
35+
36+
flags := mergeFlags(
37+
AlertmanagerLocalFlags(),
38+
map[string]string{
39+
"-store.engine": blocksStorageEngine,
40+
"-blocks-storage.backend": "filesystem",
41+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
42+
"-blocks-storage.tsdb.block-ranges-period": "2h",
43+
"-blocks-storage.tsdb.ship-interval": "1h",
44+
"-blocks-storage.bucket-store.sync-interval": "15m",
45+
"-blocks-storage.tsdb.retention-period": "2h",
46+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
47+
"-querier.query-store-for-labels-enabled": "true",
48+
// Ingester.
49+
"-ring.store": "consul",
50+
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
51+
// Distributor.
52+
"-distributor.replication-factor": "1",
53+
// Store-gateway.
54+
"-store-gateway.sharding-enabled": "false",
55+
// alert manager
56+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
57+
},
58+
)
59+
// make alert manager config dir
60+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
61+
62+
path1 := path.Join(s.SharedDir(), "cortex-1")
63+
path2 := path.Join(s.SharedDir(), "cortex-2")
64+
65+
flags1 := mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path1})
66+
67+
// Start first Cortex replicas
68+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul1.NetworkHTTPEndpoint(), flags1, "")
69+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul1.NetworkHTTPEndpoint(), flags1, "")
70+
queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags1, "")
71+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul1.NetworkHTTPEndpoint(), flags1, "")
72+
require.NoError(t, s.StartAndWaitReady(queryScheduler, distributor, ingester, storeGateway))
73+
flags1 = mergeFlags(flags1, map[string]string{
74+
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
75+
})
76+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags1, map[string]string{
77+
"-frontend.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
78+
}), "")
79+
require.NoError(t, s.Start(queryFrontend))
80+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul1.NetworkHTTPEndpoint(), mergeFlags(flags1, map[string]string{
81+
"-querier.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
82+
}), "")
83+
require.NoError(t, s.StartAndWaitReady(querier))
84+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
85+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
86+
c1, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
87+
require.NoError(t, err)
88+
89+
// Enable distributed execution for the second Cortex instance.
90+
flags2 := mergeFlags(flags, map[string]string{
91+
"-frontend.query-vertical-shard-size": "2",
92+
"-blocks-storage.filesystem.dir": path2,
93+
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
94+
"-querier.thanos-engine": "true",
95+
"-querier.distributed-exec-enabled": "true",
96+
"-api.querier-default-codec": "protobuf",
97+
})
98+
99+
distributor2 := e2ecortex.NewDistributor("distributor2", e2ecortex.RingStoreConsul, consul2.NetworkHTTPEndpoint(), flags2, "")
100+
ingester2 := e2ecortex.NewIngester("ingester2", e2ecortex.RingStoreConsul, consul2.NetworkHTTPEndpoint(), flags2, "")
101+
queryScheduler2 := e2ecortex.NewQueryScheduler("query-scheduler2", flags2, "")
102+
storeGateway2 := e2ecortex.NewStoreGateway("store-gateway2", e2ecortex.RingStoreConsul, consul2.NetworkHTTPEndpoint(), flags2, "")
103+
require.NoError(t, s.StartAndWaitReady(queryScheduler2, distributor2, ingester2, storeGateway2))
104+
flags2 = mergeFlags(flags1, map[string]string{
105+
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway2.NetworkGRPCEndpoint()}, ","),
106+
})
107+
queryFrontend2 := e2ecortex.NewQueryFrontend("query-frontend2", mergeFlags(flags2, map[string]string{
108+
"-frontend.scheduler-address": queryScheduler2.NetworkGRPCEndpoint(),
109+
}), "")
110+
require.NoError(t, s.Start(queryFrontend2))
111+
querier2 := e2ecortex.NewQuerier("querier2", e2ecortex.RingStoreConsul, consul2.NetworkHTTPEndpoint(), mergeFlags(flags2, map[string]string{
112+
"-querier.scheduler-address": queryScheduler2.NetworkGRPCEndpoint(),
113+
}), "")
114+
require.NoError(t, s.StartAndWaitReady(querier2))
115+
require.NoError(t, distributor2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
116+
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
117+
c2, err := e2ecortex.NewClient(distributor2.HTTPEndpoint(), queryFrontend2.HTTPEndpoint(), "", "", "user-1")
118+
require.NoError(t, err)
119+
120+
now := time.Now()
121+
// Push some series to Cortex.
122+
start := now.Add(-time.Minute * 10)
123+
end := now.Add(-time.Minute * 1)
124+
numSeries := 3
125+
numSamples := 20
126+
lbls := make([]labels.Labels, numSeries*2)
127+
serieses := make([]prompb.TimeSeries, numSeries*2)
128+
scrapeInterval := 30 * time.Second
129+
for i := 0; i < numSeries; i++ {
130+
series := e2e.GenerateSeriesWithSamples("test_series_a", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
131+
serieses[i] = series
132+
builder := labels.NewBuilder(labels.EmptyLabels())
133+
for _, lbl := range series.Labels {
134+
builder.Set(lbl.Name, lbl.Value)
135+
}
136+
lbls[i] = builder.Labels()
137+
}
138+
139+
// Generate another set of series for testing binary expression and vector matching.
140+
for i := numSeries; i < 2*numSeries; i++ {
141+
prompbLabels := []prompb.Label{{Name: "job", Value: "test"}, {Name: "series", Value: strconv.Itoa(i)}}
142+
switch i % 3 {
143+
case 0:
144+
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "200"})
145+
case 1:
146+
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "400"})
147+
default:
148+
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "500"})
149+
}
150+
series := e2e.GenerateSeriesWithSamples("test_series_b", start, scrapeInterval, i*numSamples, numSamples, prompbLabels...)
151+
serieses[i] = series
152+
builder := labels.NewBuilder(labels.EmptyLabels())
153+
for _, lbl := range series.Labels {
154+
builder.Set(lbl.Name, lbl.Value)
155+
}
156+
lbls[i] = builder.Labels()
157+
}
158+
res, err := c1.Push(serieses)
159+
require.NoError(t, err)
160+
require.Equal(t, 200, res.StatusCode)
161+
res, err = c2.Push(serieses)
162+
require.NoError(t, err)
163+
require.Equal(t, 200, res.StatusCode)
164+
165+
waitUntilReady(t, context.Background(), c1, c2, `{job="test"}`, start, end)
166+
167+
rnd := rand.New(rand.NewSource(now.Unix()))
168+
opts := []promqlsmith.Option{
169+
promqlsmith.WithEnableOffset(true),
170+
promqlsmith.WithEnableAtModifier(true),
171+
promqlsmith.WithEnabledFunctions(enabledFunctions),
172+
promqlsmith.WithEnabledAggrs(enabledAggrs),
173+
}
174+
ps := promqlsmith.New(rnd, lbls, opts...)
175+
176+
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
177+
}

integration/querier_test.go

Lines changed: 0 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,124 +1450,3 @@ func TestQuerierDistributedExecution(t *testing.T) {
14501450
require.NoError(t, err)
14511451
require.Equal(t, expectedVector2, val.(model.Vector))
14521452
}
1453-
1454-
// TestDistributedExecResults starts two sets of Cortex services in one scenario
// (both registered against the same Consul, with a single Minio), pushes
// series_1 and series_2 through both clients, and asserts that instant queries
// through each query-frontend return the expected vectors and that
// "sum(series_1)+sum(series_2)" yields equal results from both.
// The first set is configured from `flags` (distributed exec "true"); the second
// is configured from `flags2` (distributed exec "false").
func TestDistributedExecResults(t *testing.T) {
1455-
// e2e test setup
1456-
s, err := e2e.NewScenario(networkName)
1457-
require.NoError(t, err)
1458-
defer s.Close()
1459-
1460-
// initialize the flags
1461-
flags := mergeFlags(
1462-
BlocksStorageFlags(),
1463-
map[string]string{
1464-
"-blocks-storage.tsdb.block-ranges-period": (5 * time.Second).String(),
1465-
"-blocks-storage.tsdb.ship-interval": "1s",
1466-
"-blocks-storage.tsdb.retention-period": ((5 * time.Second * 2) - 1).String(),
1467-
"-querier.thanos-engine": "true",
1468-
"-querier.distributed-exec-enabled": "true",
1469-
},
1470-
)
1471-
1472-
// flags2 differs from flags only in "-querier.distributed-exec-enabled".
flags2 := mergeFlags(
1473-
BlocksStorageFlags(),
1474-
map[string]string{
1475-
"-blocks-storage.tsdb.block-ranges-period": (5 * time.Second).String(),
1476-
"-blocks-storage.tsdb.ship-interval": "1s",
1477-
"-blocks-storage.tsdb.retention-period": ((5 * time.Second * 2) - 1).String(),
1478-
"-querier.thanos-engine": "true",
1479-
"-querier.distributed-exec-enabled": "false",
1480-
},
1481-
)
1482-
1483-
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
1484-
consul := e2edb.NewConsul()
1485-
require.NoError(t, s.StartAndWaitReady(consul, minio))
1486-
1487-
// START SERVICE 1
1488-
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1489-
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1490-
queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
1491-
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1492-
require.NoError(t, s.StartAndWaitReady(queryScheduler, distributor, ingester, storeGateway))
1493-
flags = mergeFlags(flags, map[string]string{
1494-
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
1495-
})
1496-
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags, map[string]string{
1497-
"-frontend.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
1498-
}), "")
1499-
require.NoError(t, s.Start(queryFrontend))
1500-
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1501-
"-querier.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
1502-
}), "")
1503-
require.NoError(t, s.StartAndWaitReady(querier))
1504-
1505-
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1506-
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
1507-
1508-
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
1509-
require.NoError(t, err)
1510-
1511-
// START SERVICE 2
// NOTE(review): the second set reuses the service names of the first set
// ("distributor", "ingester", "query-scheduler", "store-gateway",
// "query-frontend", "querier") within the same scenario — confirm whether the
// e2e scenario permits duplicate names.
1512-
distributor2 := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags2, "")
1513-
ingester2 := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags2, "")
1514-
queryScheduler2 := e2ecortex.NewQueryScheduler("query-scheduler", flags2, "")
1515-
// NOTE(review): storeGateway2 is built from `flags`, not `flags2`, unlike the
// other second-set services — confirm this is intended.
storeGateway2 := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1516-
require.NoError(t, s.StartAndWaitReady(queryScheduler2, distributor2, ingester2, storeGateway2))
1517-
flags2 = mergeFlags(flags2, map[string]string{
1518-
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway2.NetworkGRPCEndpoint()}, ","),
1519-
})
1520-
queryFrontend2 := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags2, map[string]string{
1521-
"-frontend.scheduler-address": queryScheduler2.NetworkGRPCEndpoint(),
1522-
}), "")
1523-
require.NoError(t, s.Start(queryFrontend2))
1524-
querier2 := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags2, map[string]string{
1525-
"-querier.scheduler-address": queryScheduler2.NetworkGRPCEndpoint(),
1526-
}), "")
1527-
require.NoError(t, s.StartAndWaitReady(querier2))
1528-
require.NoError(t, distributor2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1529-
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
1530-
// NOTE(review): c2 writes through the first set's `distributor` (not
// distributor2) and uses tenant "user-2", while `c` uses "user-1" — confirm.
c2, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend2.HTTPEndpoint(), "", "", "user-2")
1531-
require.NoError(t, err)
1532-
1533-
// INJECT DATA
1534-
series1Timestamp := time.Now()
1535-
series2Timestamp := series1Timestamp
1536-
series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
1537-
series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})
1538-
1539-
// Push both series through both clients.
res, err := c.Push(series1)
1540-
require.NoError(t, err)
1541-
require.Equal(t, 200, res.StatusCode)
1542-
res, err = c2.Push(series1)
1543-
require.NoError(t, err)
1544-
require.Equal(t, 200, res.StatusCode)
1545-
1546-
res, err = c.Push(series2)
1547-
require.NoError(t, err)
1548-
require.Equal(t, 200, res.StatusCode)
1549-
res, err = c2.Push(series2)
1550-
require.NoError(t, err)
1551-
require.Equal(t, 200, res.StatusCode)
1552-
1553-
// Each client must return the expected vector for each series.
var val model.Value
1554-
val, err = c.Query("series_1", series1Timestamp)
1555-
require.NoError(t, err)
1556-
require.Equal(t, expectedVector1, val.(model.Vector))
1557-
val, err = c.Query("series_2", series2Timestamp)
1558-
require.NoError(t, err)
1559-
require.Equal(t, expectedVector2, val.(model.Vector))
1560-
1561-
val, err = c2.Query("series_1", series1Timestamp)
1562-
require.NoError(t, err)
1563-
require.Equal(t, expectedVector1, val.(model.Vector))
1564-
val, err = c2.Query("series_2", series2Timestamp)
1565-
require.NoError(t, err)
1566-
require.Equal(t, expectedVector2, val.(model.Vector))
1567-
1568-
// The same binary expression must give equal results through both frontends.
// NOTE(review): val_distributed comes from c2, whose frontend and querier are
// built from flags2 (distributed exec "false"); the name looks inverted — confirm.
val, err = c.Query("sum(series_1)+sum(series_2)", series1Timestamp)
1569-
require.NoError(t, err)
1570-
val_distributed, err := c2.Query("sum(series_1)+sum(series_2)", series1Timestamp)
1571-
require.NoError(t, err)
1572-
require.Equal(t, val, val_distributed)
1573-
}

pkg/api/queryapi/query_api.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,16 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
109109
ctx = engine.AddEngineTypeToContext(ctx, r)
110110
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
111111

112+
var isRoot bool
113+
var queryID, fragmentID uint64
114+
if q.distributedExecEnabled {
115+
isRoot, queryID, fragmentID, _, _ = distributed_execution.ExtractFragmentMetaData(ctx)
116+
if !isRoot {
117+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
118+
q.queryResultCache.InitWriting(key)
119+
}
120+
}
121+
112122
var qry promql.Query
113123
startTime := convertMsToTime(start)
114124
endTime := convertMsToTime(end)
@@ -118,10 +128,22 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
118128
if len(byteLP) != 0 {
119129
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
120130
if err != nil {
131+
if q.distributedExecEnabled {
132+
if !isRoot {
133+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
134+
q.queryResultCache.SetError(key)
135+
}
136+
}
121137
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
122138
}
123139
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query"))
124140
if err != nil {
141+
if q.distributedExecEnabled {
142+
if !isRoot {
143+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
144+
q.queryResultCache.SetError(key)
145+
}
146+
}
125147
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil}
126148
}
127149
} else { // if there is logical plan field is empty, fall back
@@ -142,14 +164,6 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
142164

143165
ctx = httputil.ContextFromRequest(ctx, r)
144166

145-
if q.distributedExecEnabled {
146-
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
147-
if !isRoot {
148-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
149-
q.queryResultCache.InitWriting(key)
150-
}
151-
}
152-
153167
res := qry.Exec(ctx)
154168
if res.Err != nil {
155169
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}

0 commit comments

Comments
 (0)