Skip to content

Commit c8907fe

Browse files
committed
init distributed optimizer and shard storage implementation
1 parent 50b29ac commit c8907fe

File tree

10 files changed

+349
-23
lines changed

10 files changed

+349
-23
lines changed

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
367367

368368
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
369369
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
370+
t.Cfg.Worker.ListenPort = t.Cfg.Server.GRPCListenPort
370371

371372
// Create new map for caching partial results during distributed execution
372373
var queryResultCache *distributed_execution.QueryResultCache
@@ -447,7 +448,6 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
447448
internalQuerierRouter = injectPool(internalQuerierRouter, querierPool)
448449
//go watchQuerierRingAndUpdatePool(context.Background(), t.Ring, querierPool)
449450
}
450-
451451
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled, queryResultCache)
452452
}
453453

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package distributed_execution
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/prometheus/prometheus/promql/parser"
7+
"github.com/prometheus/prometheus/storage"
8+
"github.com/thanos-io/promql-engine/execution"
9+
"github.com/thanos-io/promql-engine/execution/exchange"
10+
"github.com/thanos-io/promql-engine/execution/model"
11+
"github.com/thanos-io/promql-engine/logicalplan"
12+
"github.com/thanos-io/promql-engine/query"
13+
)
14+
15+
// ShardedRemoteExecutions is a logical-plan node representing the same
// aggregation fanned out across multiple remote (sharded) executions whose
// partial results are merged by the parent operator.
type ShardedRemoteExecutions struct {
	// Expressions holds one sub-plan per shard; excluded from JSON encoding.
	Expressions []logicalplan.Node `json:"-"`
}
18+
19+
func (r *ShardedRemoteExecutions) MakeExecutionOperator(
20+
ctx context.Context,
21+
vectors *model.VectorPool,
22+
opts *query.Options,
23+
hints storage.SelectHints,
24+
) (model.VectorOperator, error) {
25+
operators := make([]model.VectorOperator, len(r.Expressions))
26+
var err error
27+
for i := 0; i < len(operators); i++ {
28+
operators[i], err = execution.New(ctx, r.Expressions[i], nil, opts)
29+
if err != nil {
30+
return nil, err
31+
}
32+
}
33+
coalesce := exchange.NewCoalesce(vectors, opts, 0, operators...)
34+
return exchange.NewConcurrent(coalesce, 2, opts), nil
35+
}
36+
37+
func copyHints(hints storage.SelectHints) storage.SelectHints {
38+
return storage.SelectHints{
39+
Start: hints.Start,
40+
End: hints.End,
41+
Limit: hints.Limit,
42+
Step: hints.Step,
43+
Func: hints.Func,
44+
Grouping: hints.Grouping,
45+
By: hints.By,
46+
Range: hints.Range,
47+
ShardCount: hints.ShardCount,
48+
ShardIndex: hints.ShardIndex,
49+
DisableTrimming: hints.DisableTrimming,
50+
}
51+
}
52+
53+
func (r ShardedRemoteExecutions) Clone() logicalplan.Node {
54+
clone := r
55+
clone.Expressions = make([]logicalplan.Node, len(r.Expressions))
56+
for i, e := range r.Expressions {
57+
clone.Expressions[i] = e.Clone().(*Remote)
58+
}
59+
return clone
60+
}
61+
62+
func (r ShardedRemoteExecutions) Children() []*logicalplan.Node {
63+
return []*logicalplan.Node{&r.Expressions[0], &r.Expressions[1]}
64+
}
65+
66+
func (r ShardedRemoteExecutions) String() string {
67+
return fmt.Sprintf("shard(%s)", "str")
68+
}
69+
70+
// ReturnType reports the value type this node produces.
// NOTE(review): RemoteNode is compared against Node types elsewhere in this
// package, yet here it is returned as a parser.ValueType — looks suspicious;
// confirm whether parser.ValueTypeVector was intended.
func (r ShardedRemoteExecutions) ReturnType() parser.ValueType { return RemoteNode }
71+
72+
// Type identifies this node as a sharded-remote-execution logical-plan node.
func (r ShardedRemoteExecutions) Type() logicalplan.NodeType { return ShardedRemoteExecutionNode }

pkg/distributed_execution/distributed_optimizer.go

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package distributed_execution
22

33
import (
44
"fmt"
5-
5+
"github.com/prometheus/prometheus/model/labels"
6+
"github.com/prometheus/prometheus/promql/parser"
67
"github.com/prometheus/prometheus/util/annotations"
78
"github.com/thanos-io/promql-engine/logicalplan"
89
)
@@ -22,6 +23,54 @@ func (d *DistributedOptimizer) Optimize(root logicalplan.Node) (logicalplan.Node
2223
return nil, *warns, fmt.Errorf("nil root node")
2324
}
2425

26+
//logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
27+
// if aggr, ok := (*current).(*logicalplan.Aggregation); ok {
28+
// // count -> sum( count(shard1) + count(shard2))
29+
// if aggr.Op == parser.COUNT {
30+
// subqueries := newRemoteAggregation(aggr, 2)
31+
// *current = &logicalplan.Aggregation{
32+
// Op: parser.SUM,
33+
// Expr: ShardedRemoteExecutions{Expressions: subqueries},
34+
// Param: aggr.Param,
35+
// Grouping: aggr.Grouping,
36+
// Without: aggr.Without,
37+
// }
38+
// }
39+
//
40+
// return true
41+
// }
42+
//
43+
// return false
44+
//})
45+
46+
logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) (stop bool) {
47+
if aggr, ok := (*current).(*logicalplan.Aggregation); ok {
48+
// sum -> sum( count(shard1) + count(shard2))
49+
if aggr.Op == parser.SUM {
50+
subqueries := newRemoteAggregation(aggr, 2)
51+
*current = &logicalplan.Aggregation{
52+
Op: parser.SUM,
53+
Expr: ShardedRemoteExecutions{Expressions: subqueries},
54+
Param: aggr.Param,
55+
Grouping: aggr.Grouping,
56+
Without: aggr.Without,
57+
}
58+
// count -> sum(count, count)
59+
} else if aggr.Op == parser.COUNT {
60+
subqueries := newRemoteAggregation(aggr, 2)
61+
*current = &logicalplan.Aggregation{
62+
Op: parser.SUM,
63+
Expr: ShardedRemoteExecutions{Expressions: subqueries},
64+
Param: aggr.Param,
65+
Grouping: aggr.Grouping,
66+
Without: aggr.Without,
67+
}
68+
}
69+
return true
70+
}
71+
return false
72+
})
73+
2574
logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
2675

2776
if (*current).Type() == logicalplan.BinaryNode {
@@ -36,5 +85,36 @@ func (d *DistributedOptimizer) Optimize(root logicalplan.Node) (logicalplan.Node
3685

3786
return false
3887
})
88+
3989
return root, *warns, nil
4090
}
91+
92+
func insertShardNum(root logicalplan.Node, shardCount int, shardIdx int) logicalplan.Node {
93+
logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
94+
if (*current).Type() == logicalplan.VectorSelectorNode {
95+
cur := (*current).(*logicalplan.VectorSelector)
96+
97+
cur.LabelMatchers = append(cur.LabelMatchers, labels.MustNewMatcher(labels.MatchEqual, "__cortex_ingester_shard__", fmt.Sprintf("%d_%d", shardCount, shardIdx)))
98+
}
99+
return false
100+
})
101+
return root
102+
}
103+
104+
func newRemoteAggregation(rootAggregation *logicalplan.Aggregation, shardNum int) []logicalplan.Node {
105+
nodes := []logicalplan.Node{}
106+
107+
for i := 0; i < shardNum; i++ {
108+
rc := rootAggregation.Expr.Clone()
109+
rc = insertShardNum(&Remote{
110+
Expr: &logicalplan.Aggregation{
111+
Op: rootAggregation.Op,
112+
Expr: rc,
113+
Param: rootAggregation.Param,
114+
Grouping: rootAggregation.Grouping,
115+
}}, shardNum, i)
116+
nodes = append(nodes, rc)
117+
}
118+
119+
return nodes
120+
}

pkg/distributed_execution/distributed_optimizer_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,55 @@ import (
1010
"github.com/thanos-io/promql-engine/query"
1111
)
1212

13+
func TestDistributedOptimizerWithShard(t *testing.T) {
14+
now := time.Now()
15+
testCases := []struct {
16+
name string
17+
query string
18+
start time.Time
19+
end time.Time
20+
step time.Duration
21+
remoteExecCount int
22+
}{
23+
{
24+
name: "sum",
25+
query: "sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m]))",
26+
start: now,
27+
end: now,
28+
step: time.Minute,
29+
remoteExecCount: 2,
30+
},
31+
{
32+
name: "count",
33+
query: "count(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m]))",
34+
start: now,
35+
end: now,
36+
step: time.Minute,
37+
remoteExecCount: 2,
38+
},
39+
}
40+
41+
for _, tc := range testCases {
42+
t.Run(tc.name, func(t *testing.T) {
43+
lp, _, err := CreateTestLogicalPlan(tc.query, tc.start, tc.end, tc.step)
44+
require.NoError(t, err)
45+
46+
d := DistributedOptimizer{}
47+
newRoot, _, err := d.Optimize((*lp).Root())
48+
require.NoError(t, err)
49+
50+
remoteNodeCount := 0
51+
logicalplan.TraverseBottomUp(nil, &newRoot, func(parent, current *logicalplan.Node) bool {
52+
if RemoteNode == (*current).Type() {
53+
remoteNodeCount++
54+
}
55+
return false
56+
})
57+
require.Equal(t, tc.remoteExecCount, remoteNodeCount)
58+
})
59+
}
60+
}
61+
1362
func TestDistributedOptimizer(t *testing.T) {
1463
now := time.Now()
1564
testCases := []struct {

pkg/distributed_execution/fragmenter.go

Lines changed: 114 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,34 +39,133 @@ func (s *Fragment) IsEmpty() bool {
3939
// FragmentLogicalPlanNode walks the logical plan bottom-up and splits it into
// Fragments at Remote-node boundaries. The fragment holding the original root
// is flagged IsRoot; each fragment created under a Remote node has its
// FragmentKey recorded on that Remote so the execution stage can locate the
// child's partial result.
//
// NOTE(review): the thresholds below (`< 2`, `<= 2`) and the two-element
// ChildIDs slices hard-code a fan-out of two shards — confirm before sharding
// by any other factor.
func FragmentLogicalPlanNode(queryID uint64, node logicalplan.Node) ([]Fragment, error) {
	newFragment := Fragment{}
	fragments := []Fragment{}

	// childIDs accumulates sibling fragment IDs under the current Remote
	// parent; nextChildIDs accumulates the IDs that become the root
	// fragment's children.
	childIDs := []uint64{}
	nextChildIDs := []uint64{}
	var prevID uint64 // ID of the most recently created non-root fragment

	logicalplan.TraverseBottomUp(nil, &node, func(parent, current *logicalplan.Node) bool {

		curlen := len(childIDs)

		if parent == nil { // root fragment
			if len(nextChildIDs) < 2 {
				// Fewer than two remote children were recorded: the root
				// fragment stands alone.
				newFragment = Fragment{
					Node:       node,
					FragmentID: getNewID(),
					ChildIDs:   []uint64{},
					IsRoot:     true,
				}
			} else {
				// Attach the two most recently recorded child fragment IDs
				// to the root fragment.
				newFragment = Fragment{
					Node:       node,
					FragmentID: getNewID(),
					ChildIDs:   []uint64{nextChildIDs[len(nextChildIDs)-2], nextChildIDs[len(nextChildIDs)-1]},
					IsRoot:     true,
				}
			}

			fragments = append(fragments, newFragment)

		} else if RemoteNode == (*current).Type() {
			// The current node is itself a Remote: remember the fragment
			// just created for its subtree so an ancestor can list it as a
			// child.
			nextChildIDs = append(nextChildIDs, prevID)

		} else if RemoteNode == (*parent).Type() {
			// This subtree hangs directly under a Remote node, so it becomes
			// a fragment of its own.
			if curlen <= 2 {
				newFragment = Fragment{
					Node:       *current,
					FragmentID: getNewID(),
					ChildIDs:   []uint64{},
					IsRoot:     false,
				}
				childIDs = append(childIDs, newFragment.FragmentID)
			} else {
				// NOTE(review): this branch stores the whole plan root
				// (`node`) rather than `*current` — looks like a bug; confirm.
				newFragment = Fragment{
					Node:       node,
					FragmentID: getNewID(),
					ChildIDs:   []uint64{childIDs[curlen-2], childIDs[curlen-1]},
					IsRoot:     false,
				}
				childIDs = []uint64{}
			}
			prevID = newFragment.FragmentID
			fragments = append(fragments, newFragment)

			// append remote node information that will be used in the execution stage
			key := MakeFragmentKey(queryID, newFragment.FragmentID)
			(*parent).(*Remote).FragmentKey = key
		}
		return false
	})

	// NOTE(review): fragments is initialized to a non-nil empty slice above,
	// so this else branch is unreachable as written — confirm whether the
	// intent was `len(fragments) == 0`.
	if fragments != nil {
		return fragments, nil
	} else {
		// for non-query API calls
		// --> treat as root fragment and immediately return the result
		return []Fragment{{
			Node:       node,
			FragmentID: uint64(0),
			ChildIDs:   []uint64{},
			IsRoot:     true,
		}}, nil
	}
}
114+
115+
// TraverseDown walks the plan tree depth-first (children are visited before
// the node itself on the way back up), tracking the depth in layer, and
// applies transform to each node. If any child's transform reported stop, the
// current node's transform is skipped.
//
// NOTE(review): the second value returned by the recursive call is discarded,
// and a stopped subtree makes this call return (false, false) instead of
// propagating stop=true upward — confirm this short-circuit is intended.
// `Node` is presumably a package-local alias for logicalplan.Node; verify.
func TraverseDown(parent *Node, current *Node, layer int,
	transform func(parent *Node, current *Node, layer int) (bool, bool)) (bool, bool) {
	var stop bool
	layer = layer + 1 // descend one level before visiting children

	for _, c := range (*current).Children() {
		newStop, _ := TraverseDown(current, c, layer, transform)
		stop = newStop || stop
	}
	if stop {
		return false, false
	}
	return transform(parent, current, layer)
}
129+
130+
func FragmentLogicalPlanNode2(queryID uint64, node logicalplan.Node) ([]Fragment, error) {
131+
newFragment := Fragment{}
132+
fragments := []Fragment{}
133+
134+
nextChildIDs := make(map[int][]uint64, 0)
135+
136+
layer := 0
137+
138+
TraverseDown(nil, &node, layer, func(parent, current *logicalplan.Node, layer int) (stop bool, remote bool) {
139+
140+
if parent == nil { // root fragment
46141
newFragment = Fragment{
47142
Node: node,
48143
FragmentID: getNewID(),
49-
ChildIDs: nextChildrenIDs,
144+
ChildIDs: nextChildIDs[layer],
50145
IsRoot: true,
51146
}
52147
fragments = append(fragments, newFragment)
53-
return false // break the loop
54-
}
55-
if RemoteNode == (*parent).Type() {
148+
return false, true
149+
} else if RemoteNode == (*parent).Type() {
56150
newFragment = Fragment{
57-
Node: *current,
151+
Node: node,
58152
FragmentID: getNewID(),
59-
ChildIDs: []uint64{},
153+
ChildIDs: nextChildIDs[layer],
60154
IsRoot: false,
61155
}
62-
fragments = append(fragments, newFragment)
63-
nextChildrenIDs = append(nextChildrenIDs, newFragment.FragmentID)
64-
65-
// append remote node information that will be used in the execution stage
66-
key := MakeFragmentKey(queryID, newFragment.FragmentID)
67-
(*parent).(*Remote).FragmentKey = key
156+
return false, true
68157
}
69-
return false
158+
159+
nextChildIDs[layer-1] = append(nextChildIDs[layer-1], newFragment.FragmentID)
160+
161+
fragments = append(fragments, newFragment)
162+
163+
// append remote node information that will be used in the execution stage
164+
key := MakeFragmentKey(queryID, newFragment.FragmentID)
165+
(*parent).(*Remote).FragmentKey = key
166+
167+
//isLeaf = false
168+
return false, false
70169
})
71170

72171
if fragments != nil {

0 commit comments

Comments
 (0)