Skip to content

Commit 818aa71

Browse files
committed
more testing and test fixes
1 parent 4542e0b commit 818aa71

File tree

20 files changed

+773
-268
lines changed

20 files changed

+773
-268
lines changed

integration/querier_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,3 +1450,124 @@ func TestQuerierDistributedExecution(t *testing.T) {
14501450
require.NoError(t, err)
14511451
require.Equal(t, expectedVector2, val.(model.Vector))
14521452
}
1453+
1454+
func TestDistributedExecResults(t *testing.T) {
1455+
// e2e test setup
1456+
s, err := e2e.NewScenario(networkName)
1457+
require.NoError(t, err)
1458+
defer s.Close()
1459+
1460+
// initialize the flags
1461+
flags := mergeFlags(
1462+
BlocksStorageFlags(),
1463+
map[string]string{
1464+
"-blocks-storage.tsdb.block-ranges-period": (5 * time.Second).String(),
1465+
"-blocks-storage.tsdb.ship-interval": "1s",
1466+
"-blocks-storage.tsdb.retention-period": ((5 * time.Second * 2) - 1).String(),
1467+
"-querier.thanos-engine": "true",
1468+
"-querier.distributed-exec-enabled": "true",
1469+
},
1470+
)
1471+
1472+
flags2 := mergeFlags(
1473+
BlocksStorageFlags(),
1474+
map[string]string{
1475+
"-blocks-storage.tsdb.block-ranges-period": (5 * time.Second).String(),
1476+
"-blocks-storage.tsdb.ship-interval": "1s",
1477+
"-blocks-storage.tsdb.retention-period": ((5 * time.Second * 2) - 1).String(),
1478+
"-querier.thanos-engine": "true",
1479+
"-querier.distributed-exec-enabled": "false",
1480+
},
1481+
)
1482+
1483+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
1484+
consul := e2edb.NewConsul()
1485+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1486+
1487+
// START SERVICE 1
1488+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1489+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1490+
queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
1491+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1492+
require.NoError(t, s.StartAndWaitReady(queryScheduler, distributor, ingester, storeGateway))
1493+
flags = mergeFlags(flags, map[string]string{
1494+
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
1495+
})
1496+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags, map[string]string{
1497+
"-frontend.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
1498+
}), "")
1499+
require.NoError(t, s.Start(queryFrontend))
1500+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1501+
"-querier.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
1502+
}), "")
1503+
require.NoError(t, s.StartAndWaitReady(querier))
1504+
1505+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1506+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
1507+
1508+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
1509+
require.NoError(t, err)
1510+
1511+
// START SERVICE 2
1512+
distributor2 := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags2, "")
1513+
ingester2 := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags2, "")
1514+
queryScheduler2 := e2ecortex.NewQueryScheduler("query-scheduler", flags2, "")
1515+
storeGateway2 := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1516+
require.NoError(t, s.StartAndWaitReady(queryScheduler2, distributor2, ingester2, storeGateway2))
1517+
flags2 = mergeFlags(flags2, map[string]string{
1518+
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway2.NetworkGRPCEndpoint()}, ","),
1519+
})
1520+
queryFrontend2 := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags2, map[string]string{
1521+
"-frontend.scheduler-address": queryScheduler2.NetworkGRPCEndpoint(),
1522+
}), "")
1523+
require.NoError(t, s.Start(queryFrontend2))
1524+
querier2 := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags2, map[string]string{
1525+
"-querier.scheduler-address": queryScheduler2.NetworkGRPCEndpoint(),
1526+
}), "")
1527+
require.NoError(t, s.StartAndWaitReady(querier2))
1528+
require.NoError(t, distributor2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1529+
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
1530+
c2, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend2.HTTPEndpoint(), "", "", "user-2")
1531+
require.NoError(t, err)
1532+
1533+
// INJECT DATA
1534+
series1Timestamp := time.Now()
1535+
series2Timestamp := series1Timestamp
1536+
series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
1537+
series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})
1538+
1539+
res, err := c.Push(series1)
1540+
require.NoError(t, err)
1541+
require.Equal(t, 200, res.StatusCode)
1542+
res, err = c2.Push(series1)
1543+
require.NoError(t, err)
1544+
require.Equal(t, 200, res.StatusCode)
1545+
1546+
res, err = c.Push(series2)
1547+
require.NoError(t, err)
1548+
require.Equal(t, 200, res.StatusCode)
1549+
res, err = c2.Push(series2)
1550+
require.NoError(t, err)
1551+
require.Equal(t, 200, res.StatusCode)
1552+
1553+
var val model.Value
1554+
val, err = c.Query("series_1", series1Timestamp)
1555+
require.NoError(t, err)
1556+
require.Equal(t, expectedVector1, val.(model.Vector))
1557+
val, err = c.Query("series_2", series2Timestamp)
1558+
require.NoError(t, err)
1559+
require.Equal(t, expectedVector2, val.(model.Vector))
1560+
1561+
val, err = c2.Query("series_1", series1Timestamp)
1562+
require.NoError(t, err)
1563+
require.Equal(t, expectedVector1, val.(model.Vector))
1564+
val, err = c2.Query("series_2", series2Timestamp)
1565+
require.NoError(t, err)
1566+
require.Equal(t, expectedVector2, val.(model.Vector))
1567+
1568+
val, err = c.Query("sum(series_1)+sum(series_2)", series1Timestamp)
1569+
require.NoError(t, err)
1570+
val_distributed, err := c2.Query("sum(series_1)+sum(series_2)", series1Timestamp)
1571+
require.NoError(t, err)
1572+
require.Equal(t, val, val_distributed)
1573+
}

pkg/api/handlers_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package api
22

33
import (
44
"encoding/json"
5+
"github.com/cortexproject/cortex/pkg/querier"
56
"io"
67
"net/http"
78
"net/http/httptest"
@@ -232,7 +233,7 @@ func TestBuildInfoAPI(t *testing.T) {
232233
version.Version = tc.version
233234
version.Branch = tc.branch
234235
version.Revision = tc.revision
235-
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, nil, &FakeLogger{}, false)
236+
handler := NewQuerierHandler(cfg, querier.Config{}, nil, nil, nil, nil, nil, nil, &FakeLogger{}, false)
236237
writer := httptest.NewRecorder()
237238
req := httptest.NewRequest("GET", "/api/v1/status/buildinfo", nil)
238239
req = req.WithContext(user.InjectOrgID(req.Context(), "test"))

pkg/cortex/modules.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ import (
77
"github.com/cortexproject/cortex/pkg/distributed_execution"
88
"github.com/cortexproject/cortex/pkg/ring/client"
99
"log/slog"
10-
"net"
1110
"net/http"
1211
"runtime"
1312
"runtime/debug"
14-
"strconv"
1513
"strings"
1614
"time"
1715

@@ -370,12 +368,6 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
370368
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
371369
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
372370

373-
ipAddr, err := ring.GetInstanceAddr(t.Cfg.Alertmanager.ShardingRing.InstanceAddr, t.Cfg.Alertmanager.ShardingRing.InstanceInterfaceNames, util_log.Logger)
374-
if err != nil {
375-
return nil, err
376-
}
377-
serverAddress := net.JoinHostPort(ipAddr, strconv.Itoa(t.Cfg.Server.GRPCListenPort))
378-
379371
// Create new map for caching partial results during distributed execution
380372
var queryResultCache *distributed_execution.QueryResultCache
381373
var queryServer *distributed_execution.QuerierServer
@@ -451,12 +443,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
451443
}
452444

453445
if t.Cfg.Querier.DistributedExecEnabled {
454-
querierPool := distributed_execution.NewQuerierPool(t.Cfg.QueryScheduler.GRPCClientConfig, prometheus.DefaultRegisterer, util_log.Logger)
446+
querierPool := distributed_execution.NewQuerierPool(t.Cfg.Worker.GRPCClientConfig, prometheus.DefaultRegisterer, util_log.Logger)
455447
internalQuerierRouter = injectPool(internalQuerierRouter, querierPool)
456448
//go watchQuerierRingAndUpdatePool(context.Background(), t.Ring, querierPool)
457449
}
458450

459-
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer, serverAddress, t.Cfg.Querier.DistributedExecEnabled, queryResultCache)
451+
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled, queryResultCache)
460452
}
461453

462454
func injectPool(next http.Handler, pool *client.Pool) http.Handler {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package distributed_execution
2+
3+
//func TestDistributedExec(t *testing.T) {
4+
//
5+
// // non-fragmented query
6+
// now := time.Now()
7+
// plan := createTestLogicalPlan(t, now, now, 0, "sum() + sum()")
8+
// oneFrag := []plan_fragments.Fragment{
9+
// {
10+
// Node: plan.Root(),
11+
// FragmentID: 0,
12+
// ChildIDs: []uint64{},
13+
// IsRoot: true,
14+
// },
15+
// }
16+
//
17+
// // fragmented query
18+
// plan2 := createTestLogicalPlan(t, now, now, 0, "sum() + sum()")
19+
// dOptimizer := DistributedOptimizer{}
20+
// node2, _, err := dOptimizer.Optimize(plan2.Root())
21+
// require.NoError(t, err)
22+
//
23+
// fragments, err := FragmentLogicalPlanNode(uint64(2), node2)
24+
// require.NoError(t, err)
25+
//
26+
// // get results
27+
//
28+
// // compare if they are the same
29+
//}

pkg/distributed_execution/id.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,7 @@ type fragmentMetadata struct {
1313
isRoot bool
1414
}
1515

16-
func InjectFragmentMetaData(ctx context.Context, fragmentID uint64, queryID uint64, isRoot bool, childIDs []uint64, childAddr []string) context.Context {
17-
18-
childIDToAddr := make(map[uint64]string, len(childIDs))
19-
for i, childID := range childIDs {
20-
childIDToAddr[childID] = childAddr[i]
21-
}
16+
func InjectFragmentMetaData(ctx context.Context, fragmentID uint64, queryID uint64, isRoot bool, childIDToAddr map[uint64]string) context.Context {
2217

2318
return context.WithValue(ctx, fragmentMetadataKey{}, fragmentMetadata{
2419
queryID: queryID,

pkg/distributed_execution/id_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ func TestFragmentMetadata(t *testing.T) {
4545
t.Run(tt.name, func(t *testing.T) {
4646
// injection
4747
ctx := context.Background()
48-
newCtx := InjectFragmentMetaData(ctx, tt.fragID, tt.queryID, tt.isRoot, tt.childIDs, tt.childAddr)
48+
49+
childIDToAddr := make(map[uint64]string)
50+
for i, childID := range tt.childIDs {
51+
childIDToAddr[childID] = tt.childAddr[i]
52+
}
53+
newCtx := InjectFragmentMetaData(ctx, tt.fragID, tt.queryID, tt.isRoot, childIDToAddr)
4954

5055
// extraction
5156
isRoot, queryID, fragmentID, childAddrs, ok := ExtractFragmentMetaData(newCtx)

pkg/distributed_execution/querier_service_client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@ func NewQuerierPool(cfg grpcclient.Config, reg prometheus.Registerer, log log.Lo
3535
}, []string{"operation", "status_code"})
3636

3737
clientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
38-
Name: "cortex_querier_query_clients",
39-
Help: "The current number of clients connected to querier.",
38+
Namespace: "cortex",
39+
Name: "cortex_querier_query_clients",
40+
Help: "TThe current number of clients connected to querier.",
41+
ConstLabels: map[string]string{"client": "querier"},
4042
})
4143

4244
poolConfig := client.PoolConfig{

0 commit comments

Comments
 (0)