This repository was archived by the owner on Apr 24, 2023. It is now read-only.

Commit a823627

Ignore mostly empty nodes when scheduling small-ish apps (#104)
1 parent 837eb14 commit a823627

File tree: 2 files changed (+101, −23 lines)


pkg/binpack/minimal_fragmentation.go

Lines changed: 57 additions & 19 deletions
@@ -22,8 +22,8 @@ import (
 	"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
 )
 
-// MinimalFragmentation is a SparkBinPackFunction that tries to put the driver pod on the first possible node with
-// enough capacity, and then tries to pack executors onto as few nodes as possible.
+// MinimalFragmentation is a SparkBinPackFunction that tries to minimize spark app fragmentation across the cluster.
+// see minimalFragmentation for more details.
 var MinimalFragmentation = SparkBinPackFunction(func(
 	ctx context.Context,
 	driverResources, executorResources *resources.Resources,
@@ -33,66 +33,104 @@ var MinimalFragmentation = SparkBinPackFunction(func(
 	return SparkBinPack(ctx, driverResources, executorResources, executorCount, driverNodePriorityOrder, executorNodePriorityOrder, nodesSchedulingMetadata, minimalFragmentation)
 })
 
-// minimalFragmentation attempts to pack executors onto as few nodes as possible, ideally a single one
+// minimalFragmentation attempts to pack executors onto as few nodes as possible, ideally a single one.
 // nodePriorityOrder is still used as a guideline, i.e. if an application can fit on multiple nodes, it will pick
-// the first eligible node according to nodePriorityOrder
+// the first eligible node according to nodePriorityOrder. additionally, minimalFragmentation will attempt to avoid
+// mostly empty nodes unless those are required for scheduling or they provide a perfect fit, see a couple examples below.
 //
-// for instance if nodePriorityOrder = [a, b, c, d, e]
-// and we can fit 1 executor on a, 1 executor on b, 3 executors on c, 5 executors on d, 5 executors on e
+// 'mostly' empty nodes are currently defined as the ones having capacity >= (executor count + max capacity) / 2
+//
+// for instance if nodePriorityOrder = [a, b, c, d, e, f]
+// and we can fit 1 executor on a, 1 executor on b, 3 executors on c, 5 executors on d, 5 executors on e, 17 executors on f
 // and executorCount = 11, then we will return:
 // [d, d, d, d, d, e, e, e, e, e, a], true
 //
 // if instead we have executorCount = 6, then we will return:
 // [d, d, d, d, d, a], true
+//
+// if instead we have executorCount = 15, then we will return:
+// [d, d, d, d, d, e, e, e, e, e, c, c, c, a, b], true
+//
+// if instead we have executorCount = 17, then we will return:
+// [f, f, ..., f], true
+//
+// if instead we have executorCount = 19, then we will return:
+// [f, f, ..., f, a, b], true
 func minimalFragmentation(
 	_ context.Context,
 	executorResources *resources.Resources,
 	executorCount int,
 	nodePriorityOrder []string,
 	nodeGroupSchedulingMetadata resources.NodeGroupSchedulingMetadata,
 	reservedResources resources.NodeGroupResources) ([]string, bool) {
-	executorNodes := make([]string, 0, executorCount)
 	if executorCount == 0 {
-		return executorNodes, true
+		return []string{}, true
 	}
 
 	nodeCapacities := capacity.GetNodeCapacities(nodePriorityOrder, nodeGroupSchedulingMetadata, reservedResources, executorResources)
 	nodeCapacities = capacity.FilterOutNodesWithoutCapacity(nodeCapacities)
+	if len(nodeCapacities) == 0 {
+		return nil, false
+	}
+
 	sort.SliceStable(nodeCapacities, func(i, j int) bool {
 		return nodeCapacities[i].Capacity < nodeCapacities[j].Capacity
 	})
+	maxCapacity := nodeCapacities[len(nodeCapacities)-1].Capacity
+	if executorCount < maxCapacity {
+		targetCapacity := (executorCount + maxCapacity) / 2
+		firstNodeWithAtLeastTargetCapacity := sort.Search(len(nodeCapacities), func(i int) bool {
+			return nodeCapacities[i].Capacity >= targetCapacity
+		})
+
+		// try scheduling on a subset of nodes that excludes the 'emptiest' nodes
+		if executorNodes, ok := internalMinimalFragmentation(executorCount, nodeCapacities[:firstNodeWithAtLeastTargetCapacity]); ok {
+			return executorNodes, ok
+		}
+	}
+
+	// fall back to using empty nodes
+	return internalMinimalFragmentation(executorCount, nodeCapacities)
+}
+
+func internalMinimalFragmentation(
+	executorCount int,
+	nodeCapacities []capacity.NodeAndExecutorCapacity) ([]string, bool) {
+	nodeCapacitiesCopy := make([]capacity.NodeAndExecutorCapacity, 0, len(nodeCapacities))
+	nodeCapacitiesCopy = append(nodeCapacitiesCopy, nodeCapacities...)
+	executorNodes := make([]string, 0, executorCount)
 
 	// as long as we have nodes where we could schedule executors
-	for len(nodeCapacities) > 0 {
+	for len(nodeCapacitiesCopy) > 0 {
 		// pick the first node that could fit all the executors (if there's one)
-		position := sort.Search(len(nodeCapacities), func(i int) bool {
-			return nodeCapacities[i].Capacity >= executorCount
+		position := sort.Search(len(nodeCapacitiesCopy), func(i int) bool {
+			return nodeCapacitiesCopy[i].Capacity >= executorCount
 		})
 
-		if position != len(nodeCapacities) {
+		if position != len(nodeCapacitiesCopy) {
 			// we found a node that has the required capacity, schedule everything there and we're done
-			return append(executorNodes, repeat(nodeCapacities[position].NodeName, executorCount)...), true
+			return append(executorNodes, repeat(nodeCapacitiesCopy[position].NodeName, executorCount)...), true
 		}
 
 		// we will need multiple nodes for scheduling, thus we'll try to schedule executors on nodes with the most capacity
-		maxCapacity := nodeCapacities[len(nodeCapacities)-1].Capacity
-		firstNodeWithMaxCapacityIdx := sort.Search(len(nodeCapacities), func(i int) bool {
-			return nodeCapacities[i].Capacity >= maxCapacity
+		maxCapacity := nodeCapacitiesCopy[len(nodeCapacitiesCopy)-1].Capacity
+		firstNodeWithMaxCapacityIdx := sort.Search(len(nodeCapacitiesCopy), func(i int) bool {
+			return nodeCapacitiesCopy[i].Capacity >= maxCapacity
		})

		// the loop will exit because maxCapacity is always > 0
		currentPos := firstNodeWithMaxCapacityIdx
-		for ; executorCount >= maxCapacity && currentPos < len(nodeCapacities); currentPos++ {
+		for ; executorCount >= maxCapacity && currentPos < len(nodeCapacitiesCopy); currentPos++ {
			// we can skip the check on firstNodeWithMaxCapacityIdx since we know at least one node will be found
-			executorNodes = append(executorNodes, repeat(nodeCapacities[currentPos].NodeName, maxCapacity)...)
+			executorNodes = append(executorNodes, repeat(nodeCapacitiesCopy[currentPos].NodeName, maxCapacity)...)
			executorCount -= maxCapacity
		}

		if executorCount == 0 {
			return executorNodes, true
		}

-		nodeCapacities = append(nodeCapacities[:firstNodeWithMaxCapacityIdx], nodeCapacities[currentPos:]...)
+		nodeCapacitiesCopy = append(nodeCapacitiesCopy[:firstNodeWithMaxCapacityIdx], nodeCapacitiesCopy[currentPos:]...)
	}

	return nil, false
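
To make the new cutoff concrete, here is a small, self-contained Go sketch (not part of the library) that reproduces the executorCount = 11 walkthrough from the doc comment above. The nodeCapacity struct and packGreedily helper are illustrative stand-ins for capacity.NodeAndExecutorCapacity and internalMinimalFragmentation, assuming the same ascending-capacity sort and largest-nodes-first packing shown in the diff.

// illustrative sketch only: nodeCapacity and packGreedily are hypothetical stand-ins
// for the library's capacity.NodeAndExecutorCapacity and internalMinimalFragmentation.
package main

import (
	"fmt"
	"sort"
)

type nodeCapacity struct {
	name     string
	capacity int // how many executors still fit on this node
}

// packGreedily mirrors the loop in internalMinimalFragmentation: use a single node
// if one can hold all remaining executors, otherwise drain the largest nodes first.
func packGreedily(executorCount int, caps []nodeCapacity) ([]string, bool) {
	caps = append([]nodeCapacity(nil), caps...) // don't mutate the caller's slice
	sort.SliceStable(caps, func(i, j int) bool { return caps[i].capacity < caps[j].capacity })
	placed := make([]string, 0, executorCount)
	for len(caps) > 0 {
		// first node (in ascending capacity order) that fits all remaining executors
		pos := sort.Search(len(caps), func(i int) bool { return caps[i].capacity >= executorCount })
		if pos != len(caps) {
			for k := 0; k < executorCount; k++ {
				placed = append(placed, caps[pos].name)
			}
			return placed, true
		}
		// otherwise fill the nodes that share the current maximum capacity
		max := caps[len(caps)-1].capacity
		first := sort.Search(len(caps), func(i int) bool { return caps[i].capacity >= max })
		cur := first
		for ; executorCount >= max && cur < len(caps); cur++ {
			for k := 0; k < max; k++ {
				placed = append(placed, caps[cur].name)
			}
			executorCount -= max
		}
		if executorCount == 0 {
			return placed, true
		}
		caps = append(caps[:first], caps[cur:]...)
	}
	return nil, false
}

func main() {
	// capacities from the doc comment: a=1, b=1, c=3, d=5, e=5, f=17
	caps := []nodeCapacity{{"a", 1}, {"b", 1}, {"c", 3}, {"d", 5}, {"e", 5}, {"f", 17}}
	executorCount := 11

	maxCapacity := 0
	for _, c := range caps {
		if c.capacity > maxCapacity {
			maxCapacity = c.capacity
		}
	}

	// the new cutoff: nodes with capacity >= (executorCount + maxCapacity) / 2 = 14
	// count as 'mostly empty', so f (capacity 17) is excluded from the first attempt
	if executorCount < maxCapacity {
		target := (executorCount + maxCapacity) / 2
		smaller := make([]nodeCapacity, 0, len(caps))
		for _, c := range caps {
			if c.capacity < target {
				smaller = append(smaller, c)
			}
		}
		if nodes, ok := packGreedily(executorCount, smaller); ok {
			fmt.Println(nodes, ok) // [d d d d d e e e e e a] true
			return
		}
	}
	// fall back to all nodes, 'mostly empty' ones included
	fmt.Println(packGreedily(executorCount, caps))
}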

pkg/binpack/minimal_fragmentation_test.go

Lines changed: 44 additions & 4 deletions
@@ -85,7 +85,7 @@ func TestMinimalFragmentation(t *testing.T) {
 			willFit:        true,
 			expectedCounts: map[string]int{"n1": 1},
 		}, {
-			name:              "executors fit on a single node",
+			name:              "executors fits perfectly on a single node",
 			driverResources:   resources.CreateResources(1, 3, 0),
 			executorResources: resources.CreateResources(2, 5, 0),
 			numExecutors:      5,
@@ -98,6 +98,20 @@ func TestMinimalFragmentation(t *testing.T) {
 			expectedDriverNode: "n1",
 			willFit:            true,
 			expectedCounts:     map[string]int{"n3": 5},
+		}, {
+			name:              "executors would fit on a single node, but we consider this too wasteful and revert to fragmenting",
+			driverResources:   resources.CreateResources(1, 3, 0),
+			executorResources: resources.CreateResources(2, 5, 0),
+			numExecutors:      5,
+			nodesSchedulingMetadata: resources.NodeGroupSchedulingMetadata(map[string]*resources.NodeSchedulingMetadata{
+				"n1": resources.CreateSchedulingMetadata(10, 25, 6, "zone1"),
+				"n2": resources.CreateSchedulingMetadata(5, 25, 6, "zone1"),
+				"n3": resources.CreateSchedulingMetadata(100, 100, 6, "zone1"),
+			}),
+			nodePriorityOrder:  []string{"n1", "n2", "n3"},
+			expectedDriverNode: "n1",
+			willFit:            true,
+			expectedCounts:     map[string]int{"n1": 4, "n2": 1},
 		}, {
 			name:              "executors fit on the smallest nodes that can accommodate all of them",
 			driverResources:   resources.CreateResources(1, 3, 0),
@@ -107,18 +121,34 @@ func TestMinimalFragmentation(t *testing.T) {
 				"n1": resources.CreateSchedulingMetadata(200, 500, 6, "zone1"),
 				"n2": resources.CreateSchedulingMetadata(100, 250, 6, "zone1"),
 			}),
-			nodePriorityOrder:  []string{"n1", "n2", "n3"},
+			nodePriorityOrder:  []string{"n1", "n2"},
 			expectedDriverNode: "n1",
 			willFit:            true,
 			expectedCounts:     map[string]int{"n2": 5},
+		}, {
+			name:              "consider 'empty' nodes when the app can't fit on 'non-empty' ones",
+			driverResources:   resources.CreateResources(1, 3, 0),
+			executorResources: resources.CreateResources(2, 5, 0),
+			numExecutors:      5,
+			nodesSchedulingMetadata: resources.NodeGroupSchedulingMetadata(map[string]*resources.NodeSchedulingMetadata{
+				"n1": resources.CreateSchedulingMetadata(1, 3, 6, "zone1"),
+				"n2": resources.CreateSchedulingMetadata(8, 10, 6, "zone1"),
+				"n3": resources.CreateSchedulingMetadata(20, 25, 6, "zone1"),
+				"n4": resources.CreateSchedulingMetadata(20, 25, 6, "zone1"),
+				"n5": resources.CreateSchedulingMetadata(20, 25, 6, "zone1"),
+			}),
+			nodePriorityOrder:  []string{"n1", "n2", "n3", "n4", "n5"},
+			expectedDriverNode: "n1",
+			willFit:            true,
+			expectedCounts:     map[string]int{"n3": 5},
 		}, {
 			name:              "when available resources are equal, prefer nodes according to the requested priorities",
 			driverResources:   resources.CreateResources(1, 3, 0),
 			executorResources: resources.CreateResources(2, 5, 0),
 			numExecutors:      5,
 			nodesSchedulingMetadata: resources.NodeGroupSchedulingMetadata(map[string]*resources.NodeSchedulingMetadata{
-				"n1": resources.CreateSchedulingMetadata(10, 25, 6, "zone1"),
-				"n2": resources.CreateSchedulingMetadata(5, 25, 6, "zone1"),
+				"n1": resources.CreateSchedulingMetadata(1, 3, 6, "zone1"),
+				"n2": resources.CreateSchedulingMetadata(8, 10, 6, "zone1"),
 				"n3": resources.CreateSchedulingMetadata(20, 25, 6, "zone1"),
 				"n4": resources.CreateSchedulingMetadata(20, 25, 6, "zone1"),
 				"n5": resources.CreateSchedulingMetadata(20, 25, 6, "zone1"),
@@ -141,6 +171,16 @@ func TestMinimalFragmentation(t *testing.T) {
 			expectedDriverNode: "n1",
 			willFit:            true,
 			expectedCounts:     map[string]int{"n2": 1, "n3": 4},
+		}, {
+			name:              "driver fits but not executors",
+			driverResources:   resources.CreateResources(2, 3, 1),
+			executorResources: resources.CreateResources(1, 1, 0),
+			numExecutors:      1,
+			nodesSchedulingMetadata: resources.NodeGroupSchedulingMetadata(map[string]*resources.NodeSchedulingMetadata{
+				"n1": resources.CreateSchedulingMetadata(2, 3, 1, "zone1"),
+			}),
+			nodePriorityOrder: []string{"n1"},
+			willFit:           false,
 		}, {
 			name:            "driver memory does not fit",
 			driverResources: resources.CreateResources(2, 4, 1),
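
The expected counts in the new "too wasteful" test case above follow from the same cutoff arithmetic. The snippet below is a hypothetical back-of-the-envelope check, assuming CreateSchedulingMetadata(cpu, memory, gpu, zone) describes available CPU and memory and that the driver (1 CPU, 3 memory) is placed on n1 before executor capacity is computed; executorCapacity is an illustrative helper, not a library function.

// hypothetical arithmetic check for the 'too wasteful' test case above
package main

import "fmt"

// executorCapacity returns how many (execCPU, execMem) executors fit in (cpu, mem).
func executorCapacity(cpu, mem, execCPU, execMem int) int {
	byCPU, byMem := cpu/execCPU, mem/execMem
	if byCPU < byMem {
		return byCPU
	}
	return byMem
}

func main() {
	// executor shape: 2 CPU, 5 memory; the driver (1 CPU, 3 memory) lands on n1 first
	n1 := executorCapacity(10-1, 25-3, 2, 5) // 4
	n2 := executorCapacity(5, 25, 2, 5)      // 2
	n3 := executorCapacity(100, 100, 2, 5)   // 20

	executorCount, maxCapacity := 5, n3
	target := (executorCount + maxCapacity) / 2 // 12

	fmt.Println(n1, n2, n3, target) // 4 2 20 12
	// n3 (capacity 20 >= 12) counts as 'mostly empty' and is skipped on the first pass,
	// so the five executors land as n1: 4, n2: 1, matching the test's expectedCounts.
}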
