-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Description
Please describe your problem in detail
I'm ready to add preemptionPolicy-related logic to the reclaim action, but I'm confused about what should happen when a task's preemptionPolicy is Never: do I need to push the job and queue back so that other tasks or jobs can continue to reclaim resources? You can see the relevant logic in the reclaim action:
volcano/pkg/scheduler/actions/reclaim/reclaim.go
Lines 86 to 220 in 0843c0d
| for { | |
| // If no queues, break | |
| if queues.Empty() { | |
| break | |
| } | |
| var job *api.JobInfo | |
| var task *api.TaskInfo | |
| queue := queues.Pop().(*api.QueueInfo) | |
| if ssn.Overused(queue) { | |
| klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name) | |
| continue | |
| } | |
| // Found "high" priority job | |
| jobs, found := preemptorsMap[queue.UID] | |
| if !found || jobs.Empty() { | |
| continue | |
| } else { | |
| job = jobs.Pop().(*api.JobInfo) | |
| } | |
| // Found "high" priority task to reclaim others | |
| if tasks, found := preemptorTasks[job.UID]; !found || tasks.Empty() { | |
| continue | |
| } else { | |
| task = tasks.Pop().(*api.TaskInfo) | |
| } | |
| if !ssn.Allocatable(queue, task) { | |
| klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name) | |
| continue | |
| } | |
| if !ssn.Preemptive(queue, task) { | |
| klog.V(3).Infof("Queue <%s> can not reclaim by preempt others when considering task <%s> , ignore it.", queue.Name, task.Name) | |
| continue | |
| } | |
| if err := ssn.PrePredicateFn(task); err != nil { | |
| klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) | |
| continue | |
| } | |
| assigned := false | |
| // we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action | |
| totalNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task) | |
| for _, n := range totalNodes { | |
| // When filtering candidate nodes, need to consider the node statusSets instead of the err information. | |
| // refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422 | |
| if err := ssn.PredicateForPreemptAction(task, n); err != nil { | |
| klog.V(4).Infof("Reclaim predicate for task %s/%s on node %s return error %v ", task.Namespace, task.Name, n.Name, err) | |
| continue | |
| } | |
| klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name) | |
| var reclaimees []*api.TaskInfo | |
| for _, task := range n.Tasks { | |
| // Ignore non running task. | |
| if task.Status != api.Running { | |
| continue | |
| } | |
| if !task.Preemptable { | |
| continue | |
| } | |
| if j, found := ssn.Jobs[task.Job]; !found { | |
| continue | |
| } else if j.Queue != job.Queue { | |
| q := ssn.Queues[j.Queue] | |
| if !q.Reclaimable() { | |
| continue | |
| } | |
| // Clone task to avoid modify Task's status on node. | |
| reclaimees = append(reclaimees, task.Clone()) | |
| } | |
| } | |
| if len(reclaimees) == 0 { | |
| klog.V(4).Infof("No reclaimees on Node <%s>.", n.Name) | |
| continue | |
| } | |
| victims := ssn.Reclaimable(task, reclaimees) | |
| if err := util.ValidateVictims(task, n, victims); err != nil { | |
| klog.V(3).Infof("No validated victims on Node <%s>: %v", n.Name, err) | |
| continue | |
| } | |
| victimsQueue := ssn.BuildVictimsPriorityQueue(victims) | |
| resreq := task.InitResreq.Clone() | |
| reclaimed := api.EmptyResource() | |
| // Reclaim victims for tasks. | |
| for !victimsQueue.Empty() { | |
| reclaimee := victimsQueue.Pop().(*api.TaskInfo) | |
| klog.Errorf("Try to reclaim Task <%s/%s> for Tasks <%s/%s>", | |
| reclaimee.Namespace, reclaimee.Name, task.Namespace, task.Name) | |
| if err := ssn.Evict(reclaimee, "reclaim"); err != nil { | |
| klog.Errorf("Failed to reclaim Task <%s/%s> for Tasks <%s/%s>: %v", | |
| reclaimee.Namespace, reclaimee.Name, task.Namespace, task.Name, err) | |
| continue | |
| } | |
| reclaimed.Add(reclaimee.Resreq) | |
| // If reclaimed enough resources, break loop to avoid Sub panic. | |
| if resreq.LessEqual(reclaimed, api.Zero) { | |
| break | |
| } | |
| } | |
| klog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.", | |
| reclaimed, task.Namespace, task.Name, task.InitResreq) | |
| if task.InitResreq.LessEqual(reclaimed, api.Zero) { | |
| if err := ssn.Pipeline(task, n.Name); err != nil { | |
| klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>", | |
| task.Namespace, task.Name, n.Name) | |
| } | |
| // Ignore error of pipeline, will be corrected in next scheduling loop. | |
| assigned = true | |
| break | |
| } | |
| } | |
| if assigned { | |
| jobs.Push(job) | |
| } | |
| queues.Push(queue) | |
| } |
volcano/pkg/scheduler/actions/reclaim/reclaim.go
Lines 110 to 112 in 0843c0d
| if tasks, found := preemptorTasks[job.UID]; !found || tasks.Empty() { | |
| continue | |
| } else { |
volcano/pkg/scheduler/actions/reclaim/reclaim.go
Lines 116 to 119 in 0843c0d
| if !ssn.Allocatable(queue, task) { | |
| klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name) | |
| continue | |
| } |
volcano/pkg/scheduler/actions/reclaim/reclaim.go
Lines 121 to 124 in 0843c0d
| if !ssn.Preemptive(queue, task) { | |
| klog.V(3).Infof("Queue <%s> can not reclaim by preempt others when considering task <%s> , ignore it.", queue.Name, task.Name) | |
| continue | |
| } |
volcano/pkg/scheduler/actions/reclaim/reclaim.go
Lines 126 to 129 in 0843c0d
| if err := ssn.PrePredicateFn(task); err != nil { | |
| klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) | |
| continue | |
| } |
If the task fails any of the Allocatable, Preemptive, or PrePredicateFn checks, the queue is never pushed back, so it's unclear whether other tasks in the same job, or other jobs in the same queue, can still reclaim resources. I'm a little confused by the logic here. I think that when I implement
preemptionPolicy, the job and queue need to be pushed back so that others can continue reclaiming.
You can also see the logic in allocate:
volcano/pkg/scheduler/actions/allocate/allocate.go
Lines 192 to 199 in 0843c0d
| for !tasks.Empty() { | |
| task := tasks.Pop().(*api.TaskInfo) | |
| if !ssn.Allocatable(queue, task) { | |
| klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name) | |
| continue | |
| } | |
Here the allocate action iterates with a `!tasks.Empty()` loop, so if a task fails the Allocatable check, it is reasonable to `continue`, allowing other tasks to proceed with allocation.
9.20 updated:
After discussing with @Monokaix @hwdef @lowang-bh , we think there are some problems in reclaim action, need to refactor the reclaim action
Any other relevant information
No response