2 changes: 1 addition & 1 deletion pkg/distributed_execution/codec_test.go
@@ -34,7 +34,7 @@ func TestUnmarshalWithLogicalPlan(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
plan, _, err := CreateTestLogicalPlan(tc.query, start, end, step)
plan, err := CreateTestLogicalPlan(tc.query, start, end, step)
require.NoError(t, err)
require.NotNil(t, plan)

4 changes: 2 additions & 2 deletions pkg/distributed_execution/distributed_optimizer.go
@@ -1,10 +1,9 @@
package distributed_execution

import (
"github.com/thanos-io/promql-engine/query"

"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
)

// This is a simplified implementation that only handles binary aggregation cases
@@ -18,6 +17,7 @@ type DistributedOptimizer struct{}
func (d *DistributedOptimizer) Optimize(root logicalplan.Node, opts *query.Options) (logicalplan.Node, annotations.Annotations) {
warns := annotations.New()

// insert remote nodes
logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {

if (*current).Type() == logicalplan.BinaryNode && d.hasAggregation(current) {
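For context (not part of the diff): based on the fragmenter tests added in this PR, the optimizer is expected to wrap each aggregation operand of a binary expression in a Remote node while leaving the binary node itself at the root. A minimal sketch of that expectation, using the package's CreateTestLogicalPlan helper (which, after this PR, already appends the DistributedOptimizer to the default optimizers):

package distributed_execution

import (
	"fmt"
	"time"

	"github.com/thanos-io/promql-engine/logicalplan"
)

// Sketch only: checks the expected shape of an optimized plan.
func ExampleDistributedOptimizer_sketch() {
	plan, err := CreateTestLogicalPlan(`sum(rate(a[5m])) + sum(rate(b[5m]))`, time.Now(), time.Now(), 0)
	if err != nil {
		panic(err)
	}

	root := (*plan).Root()
	// The root should still be a binary node; each sum(...) operand is expected
	// to be wrapped in a Remote node, which the fragmenter below turns into its
	// own fragment.
	fmt.Println(root.Type() == logicalplan.BinaryNode)
	// Output: true
}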
50 changes: 1 addition & 49 deletions pkg/distributed_execution/distributed_optimizer_test.go
@@ -4,10 +4,8 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
)

func TestDistributedOptimizer(t *testing.T) {
@@ -58,7 +56,7 @@ func TestDistributedOptimizer(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
lp, _, err := CreateTestLogicalPlan(tc.query, now, now, time.Minute)
lp, err := CreateTestLogicalPlan(tc.query, now, now, time.Minute)
require.NoError(t, err)

node := (*lp).Root()
@@ -75,49 +73,3 @@ func TestDistributedOptimizer(t *testing.T) {
})
}
}

func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
if step == 0 {
return start, start
}
return start, end
}

func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, query.Options, error) {

start, end = getStartAndEnd(start, end, step)

qOpts := query.Options{
Start: start,
End: end,
Step: step,
StepsBatch: 10,
NoStepSubqueryIntervalFn: func(duration time.Duration) time.Duration {
return 0
},
LookbackDelta: 0,
EnablePerStepStats: false,
}

expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
if err != nil {
return nil, qOpts, err
}

planOpts := logicalplan.PlanOptions{
DisableDuplicateLabelCheck: false,
}

logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts)
if err != nil {
return nil, qOpts, err
}
optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)

distributedOptimizer := DistributedOptimizer{}
dOptimizedNode, _ := distributedOptimizer.Optimize(optimizedPlan.Root(), &qOpts)

plan := logicalplan.New(dOptimizedNode, &qOpts, planOpts)

return &plan, qOpts, nil
}
100 changes: 88 additions & 12 deletions pkg/distributed_execution/plan_fragments/fragmenter.go
@@ -1,27 +1,103 @@
package plan_fragments

import "github.com/thanos-io/promql-engine/logicalplan"
import (
"encoding/binary"

"github.com/google/uuid"
"github.com/thanos-io/promql-engine/logicalplan"

"github.com/cortexproject/cortex/pkg/distributed_execution"
)

// Fragmenter interface
type Fragmenter interface {
// Fragment splits the logical query plan into fragments and always returns them in child-to-root order;
// in other words, the order of the fragments in the returned slice is the order in which they will be scheduled
Fragment(node logicalplan.Node) ([]Fragment, error)
Fragment(queryID uint64, node logicalplan.Node) ([]Fragment, error)
}

type DummyFragmenter struct {
func getNewID() uint64 {
id := uuid.New()
return binary.BigEndian.Uint64(id[:8])
}

func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) {
// simple logic without distributed optimizer
return []Fragment{
{
type PlanFragmenter struct {
}

func (f *PlanFragmenter) Fragment(queryID uint64, node logicalplan.Node) ([]Fragment, error) {
fragments := []Fragment{}

nodeToFragmentID := make(map[*logicalplan.Node]uint64)
nodeToSubtreeFragmentIDs := make(map[*logicalplan.Node][]uint64)

logicalplan.TraverseBottomUp(nil, &node, func(parent, current *logicalplan.Node) bool {
childFragmentIDs := make(map[uint64]bool)
children := (*current).Children()

for _, child := range children {
if subtreeIDs, exists := nodeToSubtreeFragmentIDs[child]; exists {
for _, fragmentID := range subtreeIDs {
childFragmentIDs[fragmentID] = true
}
}
}

childIDs := make([]uint64, 0, len(childFragmentIDs))
for fragmentID := range childFragmentIDs {
childIDs = append(childIDs, fragmentID)
}

if parent == nil { // root fragment
newFragment := Fragment{
Node: *current,
FragmentID: getNewID(),
ChildIDs: childIDs,
IsRoot: true,
}
fragments = append(fragments, newFragment)

// cache subtree fragment IDs for this node
nodeToSubtreeFragmentIDs[current] = childIDs

} else if distributed_execution.RemoteNode == (*current).Type() {
remoteNode := (*current).(*distributed_execution.Remote)
fragmentID := getNewID()
nodeToFragmentID[current] = fragmentID

// Set the fragment key for the remote node
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
remoteNode.FragmentKey = key

newFragment := Fragment{
Node: remoteNode.Expr,
FragmentID: fragmentID,
ChildIDs: childIDs,
IsRoot: false,
}

fragments = append(fragments, newFragment)

subtreeIDs := append([]uint64{fragmentID}, childIDs...)
nodeToSubtreeFragmentIDs[current] = subtreeIDs
} else {
nodeToSubtreeFragmentIDs[current] = childIDs
}

return false
})

if len(fragments) > 0 {
return fragments, nil
} else {
Contributor:
Nit. You can remove this else as we already return above
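A sketch of that simplification (same behaviour as the code below, just without the else branch):

	if len(fragments) > 0 {
		return fragments, nil
	}

	// for non-query API calls
	// --> treat as root fragment and immediately return the result
	return []Fragment{{
		Node:       node,
		FragmentID: uint64(0),
		ChildIDs:   []uint64{},
		IsRoot:     true,
	}}, nil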

// for non-query API calls
// --> treat as root fragment and immediately return the result
return []Fragment{{
Node: node,
FragmentID: uint64(1),
FragmentID: uint64(0),
ChildIDs: []uint64{},
IsRoot: true,
},
}, nil
}}, nil
}
}

type Fragment struct {
@@ -47,6 +123,6 @@ func (s *Fragment) IsEmpty() bool {
return true
}

func NewDummyFragmenter() Fragmenter {
return &DummyFragmenter{}
func NewPlanFragmenter() Fragmenter {
return &PlanFragmenter{}
}
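For orientation (not part of the diff), a rough usage sketch of the new fragmenter; scheduleFragment is a hypothetical stand-in for the scheduler's enqueue logic shown further below:

// Sketch, assuming lpNode is the root of a plan already optimized with the
// DistributedOptimizer and queryID identifies the query being scheduled.
fragmenter := plan_fragments.NewPlanFragmenter()
fragments, err := fragmenter.Fragment(queryID, lpNode)
if err != nil {
	return err
}
// Fragments come back in child-to-root order, so enqueueing them in slice order
// schedules child fragments before the fragments that consume their results.
for _, f := range fragments {
	scheduleFragment(f) // hypothetical hook
}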
32 changes: 26 additions & 6 deletions pkg/distributed_execution/plan_fragments/fragmenter_test.go
@@ -6,9 +6,12 @@ import (

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util/logical_plan"
"github.com/cortexproject/cortex/pkg/distributed_execution"
)

// Tests fragmentation of logical plans, verifying that the fragments contain correct metadata.
// Note: The number of fragments is determined by the distributed optimizer's strategy -
// if the optimizer logic changes, this test will need to be updated accordingly.
func TestFragmenter(t *testing.T) {
type testCase struct {
name string
@@ -19,8 +22,6 @@ }
}

now := time.Now()

// more tests will be added when distributed optimizer and fragmenter are implemented
tests := []testCase{
{
name: "simple logical query plan - no fragmentation",
@@ -29,18 +30,37 @@ func TestFragmenter(t *testing.T) {
end: now,
expectedFragments: 1,
},
{
name: "binary operation with aggregations",
query: "sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])) + sum(rate(node_memory_Active_bytes[5m]))",
start: now,
end: now,
expectedFragments: 3,
},
{
name: "multiple binary operation with aggregations",
query: "sum(rate(http_requests_total{job=\"api\"}[5m])) + sum(rate(http_requests_total{job=\"web\"}[5m])) + sum(rate(http_requests_total{job=\"cache\"}[5m])) + sum(rate(http_requests_total{job=\"db\"}[5m]))",
start: now,
end: now,
expectedFragments: 7,
Contributor:
The test should try to compare the output fragments instead of number of fragments to be safer
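One shape that suggestion could take (a hypothetical sketch; the expectedFragment type and the tc.expected field are invented for illustration): describe the expected fragments per test case rather than only their count.

// Hypothetical per-case expectation, e.g. for the binary-operation case:
// expected: []expectedFragment{{isRoot: false, numChildren: 0}, {isRoot: false, numChildren: 0}, {isRoot: true, numChildren: 2}}
type expectedFragment struct {
	isRoot      bool
	numChildren int
}

for i, exp := range tc.expected {
	require.Equal(t, exp.isRoot, res[i].IsRoot)
	require.Len(t, res[i].ChildIDs, exp.numChildren)
}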

},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
lp, err := distributed_execution.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
require.NoError(t, err)

fragmenter := NewDummyFragmenter()
res, err := fragmenter.Fragment((*lp).Root())
fragmenter := NewPlanFragmenter()
res, err := fragmenter.Fragment(uint64(1), (*lp).Root())

require.NoError(t, err)
require.Equal(t, tc.expectedFragments, len(res))

// check the metadata of the fragments of binary expressions
if len(res) == 3 {
Contributor:
Why do we need this check? This only works for the second test case?
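A hypothetical way to address this: drop the hard-coded count and assert the root/child relationship whenever the plan was fragmented at all, so the check covers every test case.

// Hypothetical replacement for the len(res) == 3 special case. Fragments are
// returned child-to-root, so the root fragment is last.
if len(res) > 1 {
	root := res[len(res)-1]
	require.True(t, root.IsRoot)
	for _, f := range res[:len(res)-1] {
		require.False(t, f.IsRoot)
		require.Contains(t, root.ChildIDs, f.FragmentID)
	}
}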

require.Equal(t, []uint64{res[0].FragmentID, res[1].FragmentID}, res[2].ChildIDs)
}
})
}
}
@@ -1,4 +1,4 @@
package logical_plan
package distributed_execution

import (
"time"
@@ -44,7 +44,7 @@ func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.
if err != nil {
return nil, err
}
optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
optimizedPlan, _ := logicalPlan.Optimize(append(logicalplan.DefaultOptimizers, &DistributedOptimizer{}))

return &optimizedPlan, nil
}
4 changes: 2 additions & 2 deletions pkg/scheduler/scheduler.go
@@ -125,7 +125,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
connectedFrontends: map[string]*connectedFrontend{},

fragmentTable: fragment_table.NewFragmentTable(2 * time.Minute),
fragmenter: plan_fragments.NewDummyFragmenter(),
fragmenter: plan_fragments.NewPlanFragmenter(),
distributedExecEnabled: distributedExecEnabled,
queryFragmentRegistry: map[queryKey][]uint64{},
}
@@ -351,7 +351,7 @@ func (s *Scheduler) fragmentAndEnqueueRequest(frontendContext context.Context, f
return err
}

fragments, err := s.fragmenter.Fragment(lpNode)
fragments, err := s.fragmenter.Fragment(msg.QueryID, lpNode)
if err != nil {
return err
}