Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 244 additions & 1 deletion pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"math"
"math/rand"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -56,6 +57,15 @@ const (
MinCandidateNodesPercentageKey = "minCandidateNodesPercentage"
MinCandidateNodesAbsoluteKey = "minCandidateNodesAbsolute"
MaxCandidateNodesAbsoluteKey = "maxCandidateNodesAbsolute"
GangPreemptionModeKey = "gangPreemptionMode"
)

// GangPreemptionMode selects how gang preemption chooses victims:
// disabled entirely, evicting the minimal set of victim tasks, or
// evicting every task of each selected victim job atomically.
type GangPreemptionMode uint8

const (
	// GPModeOff disables gang preemption (the default).
	GPModeOff GangPreemptionMode = iota
	// GPModeMinimal evicts only as many victim tasks as needed for the
	// preemptor to fit on the target node.
	GPModeMinimal
	// GPModeAtomic evicts all tasks of each selected victim job.
	GPModeAtomic
)

// String implements fmt.Stringer so the mode renders readably in logs
// (e.g. "minimal" instead of a bare number via %v).
func (m GangPreemptionMode) String() string {
	switch m {
	case GPModeMinimal:
		return "minimal"
	case GPModeAtomic:
		return "atomic"
	default:
		return "off"
	}
}

type Action struct {
Expand All @@ -69,6 +79,8 @@ type Action struct {
minCandidateNodesPercentage int
minCandidateNodesAbsolute int
maxCandidateNodesAbsolute int

gpMode GangPreemptionMode
}

func New() *Action {
Expand All @@ -79,6 +91,7 @@ func New() *Action {
minCandidateNodesPercentage: 10,
minCandidateNodesAbsolute: 1,
maxCandidateNodesAbsolute: 100,
gpMode: GPModeOff,
}
}

Expand All @@ -96,9 +109,26 @@ func (pmpt *Action) parseArguments(ssn *framework.Session) {
arguments.GetInt(&pmpt.minCandidateNodesPercentage, MinCandidateNodesPercentageKey)
arguments.GetInt(&pmpt.minCandidateNodesAbsolute, MinCandidateNodesAbsoluteKey)
arguments.GetInt(&pmpt.maxCandidateNodesAbsolute, MaxCandidateNodesAbsoluteKey)
var gpModeStr string
arguments.GetString(&gpModeStr, GangPreemptionModeKey)
pmpt.gpMode = parseGPMode(gpModeStr)
pmpt.ssn = ssn
}

// parseGPMode maps the user-supplied gangPreemptionMode argument to a
// GangPreemptionMode. Matching is case-insensitive and ignores
// surrounding whitespace; an empty or unrecognized value falls back to
// GPModeOff.
func parseGPMode(s string) GangPreemptionMode {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "", "off", "disabled":
		return GPModeOff
	case "minimal":
		return GPModeMinimal
	case "atomic":
		return GPModeAtomic
	default:
		// Include the offending value so misconfigurations are debuggable.
		klog.V(3).Infof("Unrecognized Gang Preemption Mode %q, defaulting to `disabled`", s)
		return GPModeOff
	}
}

func (pmpt *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Preempt ...")
defer klog.V(5).Infof("Leaving Preempt ...")
Expand Down Expand Up @@ -294,7 +324,9 @@ func (pmpt *Action) preempt(
if pmpt.enableTopologyAwarePreemption {
return pmpt.topologyAwarePreempt(ssn, stmt, preemptor, filter, predicateNodes)
}

if pmpt.gpMode != GPModeOff {
return pmpt.gangPreempt(ssn, stmt, preemptor, filter, predicateNodes)
}
return pmpt.normalPreempt(ssn, stmt, preemptor, filter, predicateNodes)
}

Expand Down Expand Up @@ -397,6 +429,217 @@ func (pmpt *Action) normalPreempt(
return assigned, nil
}

func (pmpt *Action) gangPreempt(
ssn *framework.Session,
stmt *framework.Statement,
preemptor *api.TaskInfo,
filter func(*api.TaskInfo) bool,
predicateNodes []*api.NodeInfo,
) (bool, error) {
klog.V(3).Infof("Running Gang Preemption with mode: %v", pmpt.gpMode)
preemptorJob, found := ssn.Jobs[preemptor.Job]
if !found {
return false, fmt.Errorf("not found Job %s in Session", preemptor.Job)
}
currentQueue := ssn.Queues[preemptorJob.Queue]
var preemptees []*api.TaskInfo
for _, node := range predicateNodes {
for _, task := range node.Tasks {
if filter == nil {
preemptees = append(preemptees, task.Clone())
} else if filter(task) {
preemptees = append(preemptees, task.Clone())
}
Comment on lines +448 to +452

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This conditional logic can be simplified. Both branches append task.Clone() to preemptees. You can combine them into a single if statement with an || operator for better readability.

            if filter == nil || filter(task) {
                preemptees = append(preemptees, task.Clone())
            }

}
}
preemptees = ssn.Preemptable(preemptor, preemptees)
nodeJobPreempteesMap := make(map[string]map[api.JobID][]*api.TaskInfo)
for _, p := range preemptees {
nodeName := p.NodeName
if _, ok := nodeJobPreempteesMap[nodeName]; !ok {
nodeJobPreempteesMap[nodeName] = make(map[api.JobID][]*api.TaskInfo)
}
jobID := p.Job
if preemptorJob.UID == jobID {
continue
}
nodeJobPreempteesMap[nodeName][jobID] = append(nodeJobPreempteesMap[nodeName][jobID], p)
}
// Node order comes into play to keep results deterministic when there is a tie for best fitting gang
nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
selectedNodes := util.SortNodes(nodeScores)
preempteeJobIds, targetNode := pmpt.findBestPreemptionTarget(selectedNodes, nodeJobPreempteesMap, preemptor)
if targetNode == nil {
klog.V(3).Infof("No suitable target nodes for preemptor: <%s/%s>", preemptor.Namespace, preemptor.Name)
return false, nil
}
preempted := api.EmptyResource()
var victims []*api.TaskInfo
for _, jid := range preempteeJobIds {
var vics []*api.TaskInfo
if pmpt.gpMode == GPModeMinimal {
vics = nodeJobPreempteesMap[targetNode.Name][jid]
} else {
for _, t := range ssn.Jobs[jid].Tasks {
vics = append(vics, t)
}
}
victims = append(victims, vics...)
}

if pmpt.gpMode == GPModeMinimal {
// If we are evicting minimal tasks, preempt the lower priority ones first.
vq := ssn.BuildVictimsPriorityQueue(victims, preemptor)
victims = victims[:0]
for vq.Len() > 0 {
victims = append(victims, vq.Pop().(*api.TaskInfo))
}
}

for _, preemptee := range victims {
metrics.RegisterPreemptionAttempts()
klog.V(3).Infof("Try to preempt Task <%s/%s> for Task <%s/%s>",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
if err := stmt.Evict(preemptee, "preempt"); err != nil {
klog.Errorf("Failed to preempt Task <%s/%s> for Task <%s/%s>: %v",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name, err)
continue
}
preempted.Add(preemptee.Resreq)
if pmpt.gpMode == GPModeMinimal && preemptor.InitResreq.LessEqual(targetNode.FutureIdle(), api.Zero) {
break
}
}
// If this check fails, it implies some Evictions failed.
// Since we are optimizing for gangs per node we should try again in next session
if ssn.Allocatable(currentQueue, preemptor) && preemptor.InitResreq.LessEqual(targetNode.FutureIdle(), api.Zero) {
if err := stmt.Pipeline(preemptor, targetNode.Name, !preempted.IsEmpty()); err != nil {
klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, targetNode.Name)
if rollbackErr := stmt.UnPipeline(preemptor); rollbackErr != nil {
klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
preemptor.UID, targetNode.Name, ssn.UID, rollbackErr)
}
}
}
return true, nil
}
Comment on lines +513 to +526

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The function gangPreempt may return true even when the preemptor task is not successfully pipelined. This is inconsistent with normalPreempt and could cause the scheduler to behave incorrectly, for example by not retrying to schedule a job that could have been scheduled. The function should only return true if the preemption leads to a successful allocation of the preemptor task.

To fix this, you can introduce an assigned boolean variable, similar to how it's used in normalPreempt, to track the outcome of the Pipeline operation and return its value at the end.

    assigned := false
    // If this check fails, it implies some Evictions failed.
    // Since we are optimizing for gangs per node we should try again in next session
    if ssn.Allocatable(currentQueue, preemptor) && preemptor.InitResreq.LessEqual(targetNode.FutureIdle(), api.Zero) {
        if err := stmt.Pipeline(preemptor, targetNode.Name, !preempted.IsEmpty()); err != nil {
            klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
                preemptor.Namespace, preemptor.Name, targetNode.Name)
            if rollbackErr := stmt.UnPipeline(preemptor); rollbackErr != nil {
                klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
                    preemptor.UID, targetNode.Name, ssn.UID, rollbackErr)
            }
        } else {
            assigned = true
        }
    }
    return assigned, nil
}


// Returns the node and the minimal-count set of victim jobs on that node.
// If no preemption is needed: jobs == nil, node != nil.
// If impossible on all nodes: jobs == nil, node == nil.
func (pmpt *Action) findBestPreemptionTarget(
selectedNodes []*api.NodeInfo,
nodeJobVictimsMap map[string]map[api.JobID][]*api.TaskInfo,
preemptor *api.TaskInfo,
) (jobs []api.JobID, node *api.NodeInfo) {
// ---- Phase 1: try best single-job victim (minimal overage) ----
var (
bestSingleOver *api.Resource
bestSingleJob api.JobID
bestSingleNode *api.NodeInfo
)
for _, n := range selectedNodes {
idle := n.FutureIdle()
idle.SetScalar("pods", 0)
// Fits without preemption
if preemptor.InitResreq.LessEqual(idle, api.Zero) {
return nil, n
}
need := preemptor.InitResreq.Clone()
// idle could be 0
need.Sub(idle)
for j, jtasks := range nodeJobVictimsMap[n.Name] {
sum := api.EmptyResource()
for _, t := range jtasks {
if t.Job == preemptor.Job {
continue
}
Comment on lines +555 to +557

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This check is redundant. The nodeJobVictimsMap is built from preemptees, which are already filtered in gangPreempt to exclude tasks from the preemptor's own job. Removing this unnecessary check will make the code cleaner.

sum.Add(t.Resreq)
}
if !need.LessEqual(sum, api.Zero) {
continue
}
diff := sum.Sub(need) // overage = sum - need
if bestSingleOver == nil || !bestSingleOver.LessEqual(diff, api.Zero) {
bestSingleOver = diff
bestSingleJob = j
bestSingleNode = n
}
}
}
if bestSingleOver != nil {
return []api.JobID{bestSingleJob}, bestSingleNode
}

// ---- Phase 2: greedy, largest first fit Jobs per node, minimal Job count combo per node ----
type combo struct {
node *api.NodeInfo
jobs []api.JobID
overage *api.Resource
}
var best *combo

for _, n := range selectedNodes {
idle := n.FutureIdle()
idle.SetScalar("pods", 0)
need := preemptor.InitResreq.Clone()
need.Sub(idle)

// Build candidates: per-job sum on this node
type jr struct {
id api.JobID
res *api.Resource
}
var cand []jr
for j, jtasks := range nodeJobVictimsMap[n.Name] {
sum := api.EmptyResource()
for _, t := range jtasks {
if t.Job == preemptor.Job {
continue
}
sum.Add(t.Resreq)
}
cand = append(cand, jr{j, sum})
}
// Sort descending by resource (largest first).
sort.Slice(cand, func(i, j int) bool {
return !cand[i].res.LessEqual(cand[j].res, api.Zero)
})
Comment on lines +606 to +608

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The sorting logic !cand[i].res.LessEqual(cand[j].res, api.Zero) is functionally correct for a descending sort, but it's not very readable. Using cand[j].res.Less(cand[i].res, api.Zero) would be more idiomatic and easier to understand, as it clearly expresses "is j less than i?".

        sort.Slice(cand, func(i, j int) bool {
            return cand[j].res.Less(cand[i].res, api.Zero)
        })


acc := api.EmptyResource()
var picked []api.JobID
for _, c := range cand {
if need.LessEqual(acc, api.Zero) {
break
}
picked = append(picked, c.id)
acc.Add(c.res)
}

if !need.LessEqual(acc, api.Zero) {
continue
}

over := acc.Clone()
over.Sub(need)
// If we don't have the best candidate jobs OR
// If number of selected Jobs is less than the best candidates (disrupt min gangs) OR
// If the disruption cost is same, pick the ones with the least wastage
if best == nil ||
len(picked) < len(best.jobs) ||
(len(picked) == len(best.jobs) && over.Less(best.overage, api.Zero)) {
cp := append([]api.JobID(nil), picked...)
best = &combo{node: n, jobs: cp, overage: over}
}
}

if best != nil {
return best.jobs, best.node
}
return nil, nil
}

func (pmpt *Action) taskEligibleToPreempt(preemptor *api.TaskInfo) error {
if preemptor.Pod.Spec.PreemptionPolicy != nil && *preemptor.Pod.Spec.PreemptionPolicy == v1.PreemptNever {
return fmt.Errorf("not eligible to preempt other tasks due to preemptionPolicy is Never")
Expand Down
Loading
Loading