Skip to content

Commit de95cba

Browse files
committed
allow distributed-exec-enabled param changes in request for demo
1 parent 0514dea commit de95cba

File tree

7 files changed

+219
-21
lines changed

7 files changed

+219
-21
lines changed

pkg/frontend/frontend_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
291291
go grpcServer.Serve(grpcListen) //nolint:errcheck
292292

293293
var worker services.Service
294-
worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler), logger, nil)
294+
worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler), logger, nil, "", false, nil)
295295
require.NoError(t, err)
296296
require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))
297297

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package tripperware
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestLogicalPlanGenWithPromReq(t *testing.T) {
13+
testCases := []struct {
14+
name string
15+
queryType string // "instant" or "range"
16+
input *PrometheusRequest
17+
err error
18+
}{
19+
{
20+
name: "instant - rate vector selector",
21+
queryType: "instant",
22+
input: &PrometheusRequest{
23+
Start: 100000,
24+
End: 100000,
25+
Query: "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])",
26+
DistributedExec: true,
27+
},
28+
},
29+
{
30+
name: "instant - rate vector selector",
31+
queryType: "instant",
32+
input: &PrometheusRequest{
33+
Start: 100000,
34+
End: 100000,
35+
Query: "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])",
36+
DistributedExec: false,
37+
},
38+
},
39+
{
40+
name: "instant - memory usage expression",
41+
queryType: "instant",
42+
input: &PrometheusRequest{
43+
Start: 100000,
44+
End: 100000,
45+
Query: "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))",
46+
DistributedExec: true,
47+
},
48+
},
49+
{
50+
name: "instant - scalar only query",
51+
queryType: "instant",
52+
input: &PrometheusRequest{
53+
Start: 100000,
54+
End: 100000,
55+
Query: "42",
56+
DistributedExec: false,
57+
},
58+
},
59+
}
60+
61+
for i, tc := range testCases {
62+
tc := tc
63+
t.Run(strconv.Itoa(i)+"_"+tc.name, func(t *testing.T) {
64+
t.Parallel()
65+
66+
middleware := DistributedQueryMiddleware(time.Minute, 5*time.Minute)
67+
68+
handler := middleware.Wrap(HandlerFunc(func(_ context.Context, req Request) (Response, error) {
69+
return nil, nil
70+
}))
71+
72+
// additional validation on the test cases based on query type
73+
if tc.queryType == "range" {
74+
require.NotZero(t, tc.input.Step, "range query should have non-zero step")
75+
require.NotEqual(t, tc.input.Start, tc.input.End, "range query should have different start and end times")
76+
} else {
77+
require.Equal(t, tc.input.Start, tc.input.End, "instant query should have equal start and end times")
78+
require.Zero(t, tc.input.Step, "instant query should have zero step")
79+
}
80+
81+
// test: execute middleware to populate the logical plan
82+
_, err := handler.Do(context.Background(), tc.input)
83+
require.NoError(t, err)
84+
85+
if tc.input.DistributedExec {
86+
require.NotEmpty(t, tc.input.LogicalPlan, "logical plan should be populated")
87+
} else {
88+
require.Empty(t, tc.input.LogicalPlan, "logical plan should be empty")
89+
}
90+
91+
})
92+
}
93+
}

pkg/querier/tripperware/distributed_query.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,20 @@ func (d distributedQueryMiddleware) Do(ctx context.Context, r Request) (Response
8888
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
8989
}
9090

91-
startTime := time.Unix(0, promReq.Start*int64(time.Millisecond))
92-
endTime := time.Unix(0, promReq.End*int64(time.Millisecond))
93-
step := time.Duration(promReq.Step) * time.Millisecond
91+
if promReq.DistributedExec {
92+
startTime := time.Unix(0, promReq.Start*int64(time.Millisecond))
93+
endTime := time.Unix(0, promReq.End*int64(time.Millisecond))
94+
step := time.Duration(promReq.Step) * time.Millisecond
9495

95-
var err error
96+
var err error
9697

97-
newLogicalPlan, err := d.newLogicalPlan(promReq.Query, startTime, endTime, step)
98-
if err != nil {
99-
return nil, err
100-
}
98+
newLogicalPlan, err := d.newLogicalPlan(promReq.Query, startTime, endTime, step)
99+
if err != nil {
100+
return nil, err
101+
}
101102

102-
promReq.LogicalPlan = *newLogicalPlan
103+
promReq.LogicalPlan = *newLogicalPlan
104+
}
103105

104106
return d.next.Do(ctx, r)
105107
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package instantquery
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
7+
"github.com/stretchr/testify/require"
8+
"github.com/weaveworks/common/user"
9+
"net/http"
10+
"testing"
11+
"time"
12+
)
13+
14+
func TestRequestDistributedExec(t *testing.T) {
15+
t.Parallel()
16+
codec := testInstantQueryCodec
17+
18+
for _, tc := range []struct {
19+
url string
20+
expectedURL string
21+
expected *tripperware.PrometheusRequest
22+
expectedErr error
23+
}{
24+
{
25+
url: "/api/v1/query?query=sum%28container_memory_rss%29+by+%28namespace%29&stats=all&time=1536673680",
26+
expectedURL: "/api/v1/query?query=sum%28container_memory_rss%29+by+%28namespace%29&stats=all&time=1536673680",
27+
expected: &tripperware.PrometheusRequest{
28+
Path: "/api/v1/query",
29+
Time: 1536673680 * 1e3,
30+
Query: "sum(container_memory_rss) by (namespace)",
31+
Stats: "all",
32+
Headers: map[string][]string{
33+
"Test-Header": {"test"},
34+
},
35+
DistributedExec: false,
36+
},
37+
},
38+
{
39+
url: "/api/v1/query?query=sum%28container_memory_rss%29+by+%28namespace%29&time=1536673680&distributedExec=false",
40+
expectedURL: "/api/v1/query?query=sum%28container_memory_rss%29+by+%28namespace%29&time=1536673680",
41+
expected: &tripperware.PrometheusRequest{
42+
Path: "/api/v1/query",
43+
Time: 1536673680 * 1e3,
44+
Query: "sum(container_memory_rss) by (namespace)",
45+
DistributedExec: false,
46+
Stats: "",
47+
Headers: map[string][]string{
48+
"Test-Header": {"test"},
49+
},
50+
},
51+
},
52+
{
53+
url: "/api/v1/query?query=sum%28container_memory_rss%29+by+%28namespace%29&time=1536673680&distributedExec=true",
54+
expectedURL: "/api/v1/query?query=sum%28container_memory_rss%29+by+%28namespace%29&time=1536673680",
55+
expected: &tripperware.PrometheusRequest{
56+
Path: "/api/v1/query",
57+
Time: 1536673680 * 1e3,
58+
Query: "sum(container_memory_rss) by (namespace)",
59+
DistributedExec: true,
60+
Stats: "",
61+
Headers: map[string][]string{
62+
"Test-Header": {"test"},
63+
},
64+
},
65+
},
66+
} {
67+
tc := tc
68+
t.Run(tc.url, func(t *testing.T) {
69+
t.Parallel()
70+
r, err := http.NewRequest("POST", tc.url, http.NoBody)
71+
require.NoError(t, err)
72+
r.Header.Add("Test-Header", "test")
73+
74+
ctx := user.InjectOrgID(context.Background(), "1")
75+
76+
r = r.Clone(ctx)
77+
78+
if tc.expected.Time == 0 {
79+
now := time.Now()
80+
tc.expectedURL = fmt.Sprintf("%s%d", tc.expectedURL, now.Unix())
81+
tc.expected.Time = now.Unix() * 1e3
82+
}
83+
req, err := codec.DecodeRequest(ctx, r, []string{"Test-Header"})
84+
if err != nil {
85+
require.EqualValues(t, tc.expectedErr, err)
86+
return
87+
}
88+
require.EqualValues(t, tc.expected, req)
89+
90+
rdash, err := codec.EncodeRequest(context.Background(), req)
91+
require.NoError(t, err)
92+
require.EqualValues(t, tc.expectedURL, rdash.RequestURI)
93+
})
94+
}
95+
}

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"net/http"
88
"net/url"
9+
"strconv"
910
"strings"
1011
"time"
1112

@@ -81,6 +82,9 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
8182
result.Stats = r.FormValue("stats")
8283
result.Path = r.URL.Path
8384

85+
strBool := r.FormValue("distributedExec")
86+
result.DistributedExec, _ = strconv.ParseBool(strBool)
87+
8488
isSourceRuler := strings.Contains(r.Header.Get("User-Agent"), tripperware.RulerUserAgent)
8589
if isSourceRuler {
8690
// When the source is the Ruler, then forward whole headers

pkg/querier/tripperware/query.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -148,17 +148,18 @@ type CachingOptions struct {
148148

149149
type PrometheusRequest struct {
150150
Request
151-
Time int64
152-
Start int64
153-
End int64
154-
Step int64
155-
Timeout time.Duration
156-
Query string
157-
Path string
158-
Headers http.Header
159-
Stats string
160-
CachingOptions CachingOptions
161-
LogicalPlan logicalplan.Plan
151+
Time int64
152+
Start int64
153+
End int64
154+
Step int64
155+
Timeout time.Duration
156+
Query string
157+
Path string
158+
Headers http.Header
159+
Stats string
160+
CachingOptions CachingOptions
161+
LogicalPlan logicalplan.Plan
162+
DistributedExec bool
162163
}
163164

164165
func (m *PrometheusRequest) GetPath() string {

pkg/querier/tripperware/queryrange/query_range.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa
128128
return nil, queryapi.ErrNegativeStep
129129
}
130130

131+
strBool := r.FormValue("distributedExec")
132+
result.DistributedExec, _ = strconv.ParseBool(strBool)
133+
131134
// For safety, limit the number of returned points per timeseries.
132135
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
133136
if (result.End-result.Start)/result.Step > 11000 {

0 commit comments

Comments
 (0)