diff --git a/Makefile b/Makefile
index 5e71650f58..759bed4615 100644
--- a/Makefile
+++ b/Makefile
@@ -15,7 +15,7 @@
BIN_DIR=_output/bin
RELEASE_DIR=_output/release
REPO_PATH=volcano.sh/volcano
-IMAGE_PREFIX=volcanosh
+IMAGE_PREFIX=sailimages
CRD_OPTIONS ?= "crd:crdVersions=v1,generateEmbeddedObjectMeta=true"
CRD_OPTIONS_EXCLUDE_DESCRIPTION=${CRD_OPTIONS}",maxDescLen=0"
CC ?= "gcc"
@@ -89,9 +89,14 @@ vcctl: init
image_bins: vc-scheduler vc-controller-manager vc-webhook-manager
-images:
- for name in controller-manager scheduler webhook-manager; do\
- docker buildx build -t "${IMAGE_PREFIX}/vc-$$name:$(TAG)" . -f ./installer/dockerfile/$$name/Dockerfile --output=type=${BUILDX_OUTPUT_TYPE} --platform ${DOCKER_PLATFORMS} --build-arg APK_MIRROR=${APK_MIRROR}; \
+# rebuild every time the scheduler code changes
+scheduler-image:
+ docker buildx build -t "${IMAGE_PREFIX}/vc-sail-scheduler:v3" . -f ./installer/dockerfile/scheduler/Dockerfile --output=type=${BUILDX_OUTPUT_TYPE} --platform ${DOCKER_PLATFORMS}
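+# Example invocation (assumed values; buildx output type "docker" loads the image into the local daemon):
+#   make scheduler-image BUILDX_OUTPUT_TYPE=docker DOCKER_PLATFORMS=linux/amd64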
+
+# build only once
+images:
+ for name in controller-manager webhook-manager; do\
+ docker buildx build -t "${IMAGE_PREFIX}/vc-sail-$$name:v3" . -f ./installer/dockerfile/$$name/Dockerfile --output=type=${BUILDX_OUTPUT_TYPE} --platform ${DOCKER_PLATFORMS}; \
done
generate-code:
diff --git a/README.md b/README.md
index 807aa90cde..7931632d4a 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,4 @@
+
@@ -10,7 +11,6 @@
[](https://github.com/volcano-sh/volcano/releases)
[](https://github.com/volcano-sh/volcano/blob/master/LICENSE)
[](https://bestpractices.coreinfrastructure.org/projects/3012)
-[](https://scorecard.dev/viewer/?uri=github.com/volcano-sh/volcano)
[Volcano](https://volcano.sh/) is a batch system built on Kubernetes. It provides a suite of mechanisms that are commonly required by
@@ -43,7 +43,7 @@ Volcano is an incubating project of the [Cloud Native Computing Foundation](http
- [Intro: Kubernetes Batch Scheduling @ KubeCon 2019 EU](https://sched.co/MPi7)
- [Volcano 在 Kubernetes 中运行高性能作业实践 @ ArchSummit 2019](https://archsummit.infoq.cn/2019/shenzhen/presentation/1817)
-- [Volcano:基于云原生的高密计算解决方案 @ Huawei Connection 2019](https://e.huawei.com/cn/material/event/HC/09099dce0070415e9f26ada51b2216d7)
+- [Volcano:基于云原生的高密计算解决方案 @ Huawei Connection 2019](https://agenda.events.huawei.com/2019/cn/minisite/agenda.html#dayTab=day7&tagName=%7B%22language%22%3A%22Cn%22%7D&seminarId=1743)
- [Improving Performance of Deep Learning Workloads With Volcano @ KubeCon 2019 NA](https://sched.co/UaZi)
- [Batch Capability of Kubernetes Intro @ KubeCon 2019 NA](https://sched.co/Uajv)
- [Intro: Kubernetes Batch Scheduling @ KubeCon 2019 EU](https://sched.co/MPi7)
@@ -77,7 +77,7 @@ Note:
Install Volcano on an existing Kubernetes cluster. This way is both available for x86_64 and arm64 architecture.
```
-kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/release-1.10/installer/volcano-development.yaml
+kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/release-1.8/installer/volcano-development.yaml
```
Enjoy! Volcano will create the following resources in `volcano-system` namespace.
@@ -108,24 +108,6 @@ job.batch/volcano-admission-init 1/1 48s 96s
```
-### Install via helm
-
-To install official release, please visit to [helm-charts](https://github.com/volcano-sh/helm-charts) for details.
-
-```bash
-helm repo add volcano-sh https://volcano-sh.github.io/helm-charts
-helm install volcano volcano-sh/volcano -n volcano-system --create-namespace
-```
-
-Install from source code for developers:
-
-```bash
-helm install volcano installer/helm/chart/volcano --namespace volcano-system --create-namespace
-
-# list helm release
-helm list -n volcano-system
-```
-
### Install from code
If you don't have a kubernetes cluster, try one-click install from code base:
@@ -141,20 +123,18 @@ This way is only available for x86_64 temporarily.
If you want to get prometheus and grafana volcano dashboard after volcano installed, try following commands:
```bash
-make TAG=v1.10.0 generate-yaml
-kubectl create -f _output/release/volcano-monitoring-v1.10.0.yaml
+make TAG=v1.8.2 generate-yaml
+kubectl create -f _output/release/volcano-monitoring-v1.8.2.yaml
```
## Kubernetes compatibility
-| | Kubernetes 1.17 | Kubernetes 1.18 | Kubernetes 1.19 | Kubernetes 1.20 | Kubernetes 1.21 | Kubernetes 1.22 | Kubernetes 1.23 | Kubernetes 1.24 | Kubernetes 1.25 | Kubernetes 1.26 | Kubernetes 1.27 | Kubernetes 1.28 | Kubernetes 1.29 |Kubernetes 1.30 |
-|-----------------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|----------------|
-| Volcano v1.6 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | - | - | - | - | - | - |- |
-| Volcano v1.7 | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | - |_ |
-| Volcano v1.8 | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | - |- |
-| Volcano v1.9 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |- |
-| Volcano v1.10 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |
-| Volcano HEAD (master) | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |
+| | Kubernetes 1.17 | Kubernetes 1.18 | Kubernetes 1.19 | Kubernetes 1.20 | Kubernetes 1.21 | Kubernetes 1.22 | Kubernetes 1.23 | Kubernetes 1.24 | Kubernetes 1.25 | Kubernetes 1.27 | Kubernetes 1.28 |
+|------------------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|
+| Volcano v1.6 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | - | - | - | - |
+| Volcano v1.7 | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
+| Volcano v1.8 | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ | ✓ | ✓ |
+| Volcano HEAD (master) | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ | ✓ | ✓ |
Key:
* `✓` Volcano and the Kubernetes version are exactly compatible.
@@ -180,8 +160,6 @@ Resources:
If you have any question, feel free to reach out to us in the following ways:
-[Volcano Slack Channel](https://cloud-native.slack.com/archives/C011GJDQS0N) | [Join](https://slack.cncf.io/)
+[Volcano Slack Channel](https://volcano-sh.slack.com)
[Mailing List](https://groups.google.com/forum/#!forum/volcano-sh)
-
-Wechat: Add WeChat account `k8s2222` (华为云小助手2号) to let her pull you into the group.
\ No newline at end of file
diff --git a/installer/helm/chart/volcano/config/volcano-scheduler.conf b/installer/helm/chart/volcano/config/volcano-scheduler.conf
index 1bf55d4142..be6f8e8969 100644
--- a/installer/helm/chart/volcano/config/volcano-scheduler.conf
+++ b/installer/helm/chart/volcano/config/volcano-scheduler.conf
@@ -2,14 +2,16 @@ actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
+ - name: overcommit
+ arguments:
+ overcommit-factor: 20
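+        # overcommit-factor lets the enqueue action admit jobs beyond current cluster capacity;
+        # 20 is a value chosen for this setup (the upstream default is much lower)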
- name: gang
enablePreemptable: false
- name: conformance
- plugins:
- - name: overcommit
- name: drf
enablePreemptable: false
- name: predicates
- - name: capacity
+ - name: proportion
- name: nodeorder
- name: binpack
diff --git a/installer/helm/chart/volcano/templates/admission.yaml b/installer/helm/chart/volcano/templates/admission.yaml
index 03f3d8ab56..d0398a73ae 100644
--- a/installer/helm/chart/volcano/templates/admission.yaml
+++ b/installer/helm/chart/volcano/templates/admission.yaml
@@ -138,7 +138,7 @@ spec:
- --port={{.Values.basic.admission_port}}
- -v={{.Values.custom.admission_log_level}}
- 2>&1
- image: {{ .Values.basic.image_registry }}/{{.Values.basic.admission_image_name}}:{{.Values.basic.image_tag_version}}
+ image: {{ .Values.basic.image_registry }}/{{.Values.basic.admission_image_name}}:v3
imagePullPolicy: {{ .Values.basic.image_pull_policy }}
name: admission
{{- if .Values.custom.admission_resources }}
diff --git a/installer/helm/chart/volcano/templates/controllers.yaml b/installer/helm/chart/volcano/templates/controllers.yaml
index 826d4bd303..20ec3c6036 100644
--- a/installer/helm/chart/volcano/templates/controllers.yaml
+++ b/installer/helm/chart/volcano/templates/controllers.yaml
@@ -151,7 +151,7 @@ spec:
resources:
{{- toYaml .Values.custom.controller_resources | nindent 14 }}
{{- end }}
- image: {{ .Values.basic.image_registry }}/{{.Values.basic.controller_image_name}}:{{.Values.basic.image_tag_version}}
+ image: {{ .Values.basic.image_registry }}/{{.Values.basic.controller_image_name}}:v3
args:
- --logtostderr
- --enable-healthz=true
diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml
index 3a56a2f639..42aebec9ca 100644
--- a/installer/helm/chart/volcano/templates/scheduler.yaml
+++ b/installer/helm/chart/volcano/templates/scheduler.yaml
@@ -44,7 +44,7 @@ rules:
verbs: ["create", "get", "list", "watch", "delete"]
- apiGroups: ["batch.volcano.sh"]
resources: ["jobs"]
- verbs: ["get", "list", "watch", "update", "delete"]
+ verbs: ["get", "list", "watch", "update", "delete", "patch"]
- apiGroups: ["batch.volcano.sh"]
resources: ["jobs/status"]
verbs: ["update", "patch"]
@@ -169,7 +169,7 @@ spec:
{{- end }}
containers:
- name: {{ .Release.Name }}-scheduler
- image: {{ .Values.basic.image_registry }}/{{.Values.basic.scheduler_image_name}}:{{.Values.basic.image_tag_version}}
+ image: sailimages/vc-sail-scheduler:v3
{{- if .Values.custom.scheduler_resources }}
resources:
{{- toYaml .Values.custom.scheduler_resources | nindent 12 }}
@@ -188,7 +188,7 @@ spec:
env:
- name: DEBUG_SOCKET_DIR
value: /tmp/klog-socks
- imagePullPolicy: {{ .Values.basic.image_pull_policy }}
+ imagePullPolicy: Never
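+          # Never requires the vc-sail-scheduler image to already be present on the node,
+          # e.g. loaded by the local scheduler-image build target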
volumeMounts:
- name: scheduler-config
mountPath: /volcano.scheduler
diff --git a/installer/helm/chart/volcano/values.yaml b/installer/helm/chart/volcano/values.yaml
index 5cbccf0e7d..0144175ae5 100644
--- a/installer/helm/chart/volcano/values.yaml
+++ b/installer/helm/chart/volcano/values.yaml
@@ -1,13 +1,13 @@
basic:
- controller_image_name: "volcanosh/vc-controller-manager"
- scheduler_image_name: "volcanosh/vc-scheduler"
- admission_image_name: "volcanosh/vc-webhook-manager"
+ controller_image_name: "sailimages/vc-sail-controller-manager"
+ scheduler_image_name: "sailimages/vc-sail-scheduler"
+ admission_image_name: "sailimages/vc-sail-webhook-manager"
admission_secret_name: "volcano-admission-secret"
admission_config_file: "config/volcano-admission.conf"
scheduler_config_file: "config/volcano-scheduler.conf"
image_pull_secret: ""
image_pull_policy: "Always"
- image_tag_version: "v1.10.0"
+ image_tag_version: "v3"
admission_port: 8443
image_registry: "docker.io"
custom:
@@ -143,7 +143,7 @@ custom:
webhooks_namespace_selector_expressions: ~
-# Specify log level for Volcano main component
+# Specify log level for Volcano main component
admission_log_level: 4
scheduler_log_level: 3
controller_log_level: 4
diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go
index 3c99eefc1b..a617377238 100644
--- a/pkg/scheduler/actions/allocate/allocate.go
+++ b/pkg/scheduler/actions/allocate/allocate.go
@@ -17,18 +17,62 @@
package allocate
import (
- "time"
+ // "time"
"k8s.io/klog/v2"
- "volcano.sh/apis/pkg/apis/scheduling"
+ // disabling the following packages to make p3k8s work
+ // "volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/scheduler/api"
- "volcano.sh/volcano/pkg/scheduler/conf"
+ // "volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
- "volcano.sh/volcano/pkg/scheduler/metrics"
+ // "volcano.sh/volcano/pkg/scheduler/metrics"
"volcano.sh/volcano/pkg/scheduler/util"
+
+ // packages needed to make p3k8s work
+ // Author: Tianya Chen
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+
+ "encoding/json"
+ "fmt"
+ "os"
+ "reflect"
+ "sort"
+ "strconv"
+ "strings"
)
+/**************** p3k8s specific structs ********************/
+// Author: Tianya Chen
+type JobT struct {
+ JobID int `json:"jobID"`
+ JobType string `json:"jobType"`
+ K int `json:"k"`
+ Duration int `json:"duration"`
+ SlowDuration int `json:"slowDuration"`
+}
+
+type InputT struct {
+ RackCap []int `json:"rack_cap"`
+ NumLargeMachineRacks int `json:"numLargeMachineRacks"`
+ Queue []JobT `json:"queue"`
+ Machines []int `json:"machines"`
+}
+
+type OutputT struct {
+ JobID int `json:"jobID"`
+ Machines []int `json:"machines"`
+}
+
+type Message struct {
+ Input InputT `json:"input"`
+ Output interface{} `json:"output"`
+}
+
+/***************** p3k8s specific structs above ******************/
+
+// Xuye He: volcano v1.10 updates action struct
type Action struct {
session *framework.Session
// configured flag for error cache
@@ -47,294 +91,402 @@ func (alloc *Action) Name() string {
func (alloc *Action) Initialize() {}
-func (alloc *Action) parseArguments(ssn *framework.Session) {
- arguments := framework.GetArgOfActionFromConf(ssn.Configurations, alloc.Name())
- arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
-}
+// volcano v1.10 updates action struct
+// requires importing conf, omitted
+// func (alloc *Action) parseArguments(ssn *framework.Session) {
+// arguments := framework.GetArgOfActionFromConf(ssn.Configurations, alloc.Name())
+// arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
+// }
+// Custom execute function for p3k8s
+// Authors: Tianya Chen, Baljit Singh
func (alloc *Action) Execute(ssn *framework.Session) {
- klog.V(5).Infof("Enter Allocate ...")
- defer klog.V(5).Infof("Leaving Allocate ...")
-
- alloc.parseArguments(ssn)
-
- // the allocation for pod may have many stages
- // 1. pick a queue named Q (using ssn.QueueOrderFn)
- // 2. pick a job named J from Q (using ssn.JobOrderFn)
- // 3. pick a task T from J (using ssn.TaskOrderFn)
- // 4. use predicateFn to filter out node that T can not be allocated on.
- // 5. use ssn.NodeOrderFn to judge the best node and assign it to T
-
- // queues sort queues by QueueOrderFn.
- queues := util.NewPriorityQueue(ssn.QueueOrderFn)
- // jobsMap is used to find job with the highest priority in given queue.
- jobsMap := map[api.QueueID]*util.PriorityQueue{}
+ klog.V(3).Infof("Enter Allocate ...")
+ defer klog.V(3).Infof("Leaving Allocate ...")
+ // parse config, populates the Action struct
+ // alloc.parseArguments(ssn)
alloc.session = ssn
- alloc.pickUpQueuesAndJobs(queues, jobsMap)
- klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
- alloc.allocateResources(queues, jobsMap)
-}
-func (alloc *Action) pickUpQueuesAndJobs(queues *util.PriorityQueue, jobsMap map[api.QueueID]*util.PriorityQueue) {
- ssn := alloc.session
+ // Load configuration of policy
+ policyConf := ssn.GetPolicy("kube-system/scheduler-conf")
+ klog.V(3).Infof("Using policy %v.", policyConf)
+
+ // Select a policy function
+ policyFn := fifoRandomFn
+ switch policyConf {
+ case "fifoRandom":
+ policyFn = fifoRandomFn
+ case "fifoHeter":
+ policyFn = fifoHeterFn
+ case "sjfHeter":
+ policyFn = sjfHeterFn
+ case "custom":
+ policyFn = customFn
+ }
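+ // The policy name is read from the "policy" key of the kube-system/scheduler-conf
+ // ConfigMap (see Session.GetPolicy). A minimal way to set it, assuming kubectl access:
+ //   kubectl create configmap scheduler-conf -n kube-system --from-literal=policy=sjfHeter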
+
+ // Prepare job queue
+ jobQueue := util.NewPriorityQueue(ssn.JobOrderFn)
+ var trace string
+ var t *api.TaskInfo
for _, job := range ssn.Jobs {
- // If not config enqueue action, change Pending pg into Inqueue statue to avoid blocking job scheduling.
- if job.IsPending() {
- if conf.EnabledActionMap["enqueue"] {
- klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: job status is pending.",
- job.Namespace, job.Name, job.Queue)
- continue
- } else {
- klog.V(4).Infof("Job <%s/%s> Queue <%s> status update from pending to inqueue, reason: no enqueue action is configured.",
- job.Namespace, job.Name, job.Queue)
- job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
+ numPendingTasks := len(job.TaskStatusIndex[api.Pending])
+ if numPendingTasks >= int(job.MinAvailable) {
+ job = addJobProperty(job)
+ jobQueue.Push(job)
+ if trace == "" {
+ trace = job.Trace
+ } else if trace != job.Trace {
+ klog.Errorf("Found multiple traces (%v, %v) in Session %v",
+ trace, job.Trace, ssn.UID)
}
+ if t == nil {
+ t = getOneTask(job)
+ }
+ } else {
+ klog.V(3).Infof("Job <%v, %v> has %v tasks pending but requires %v tasks (creation in progress?).",
+ job.Namespace, job.Name, numPendingTasks, job.MinAvailable)
}
+ }
- if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
- klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
- continue
- }
+ // no jobs in the queue, exit
+ if jobQueue.Empty() {
+ klog.V(3).Infof("No jobs awaiting, DONE")
+ return
+ }
- if _, found := ssn.Queues[job.Queue]; !found {
- klog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
- job.Namespace, job.Name, job.Queue)
- continue
+ // drain the priority queue into a slice ordered by creation time (per JobOrderFn)
+ jobs := []*api.JobInfo{}
+ for {
+ job := jobQueue.Pop()
+ jobs = append(jobs, job.(*api.JobInfo))
+ if jobQueue.Empty() {
+ break
}
+ }
- if _, found := jobsMap[job.Queue]; !found {
- jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
- queues.Push(ssn.Queues[job.Queue])
+ // Prepare node info
+ nodes := []*api.NodeInfo{}
+ nodesAvailable := make(map[string]*api.NodeInfo)
+ selector := labels.SelectorFromSet(labels.Set(map[string]string{"type": "virtual-kubelet"}))
+ for _, node := range ssn.Nodes {
+ if selector.Matches(labels.Set(node.Node.Labels)) {
+ node = addNodeProperty(node)
+ if node.Rack < 0 {
+ continue
+ }
+ nodes = append(nodes, node)
+ if t.Resreq.LessEqual(node.Idle, api.Zero) {
+ nodesAvailable[node.Node.ObjectMeta.Name] = node
+ }
}
-
- klog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
- jobsMap[job.Queue].Push(job)
}
-}
-
-// allocateResources primarily accomplishes two steps:
-// 1. picks up tasks.
-// 2. allocates resources to these tasks. (this step is carried out by the allocateResourcesForTasks method.)
-func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[api.QueueID]*util.PriorityQueue) {
- ssn := alloc.session
- pendingTasks := map[api.JobID]*util.PriorityQueue{}
- allNodes := ssn.NodeList
+ // no nodes available, exit
+ if len(nodesAvailable) <= 0 {
+ klog.V(3).Infof("No nodes available, DONE")
+ return
+ }
- // To pick tuple for job, we choose to pick namespace firstly.
- // Because we believe that number of queues would less than namespaces in most case.
- // And, this action would make the resource usage among namespace balanced.
- for {
- if queues.Empty() {
- break
+ klog.V(3).Infof("%v/%v nodes available:", len(nodesAvailable), len(nodes))
+ for _, node := range nodes {
+ if _, found := nodesAvailable[node.Name]; found {
+ klog.V(3).Infof(" <%v>: available", node.Name)
+ } else {
+ klog.V(3).Infof(" <%v>", node.Name)
}
+ }
- queue := queues.Pop().(*api.QueueInfo)
+ nothingScheduled := true
+ var input InputT
- if ssn.Overused(queue) {
- klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
- continue
- }
+ // repeat until no more jobs can be scheduled (one job scheduled in each iteration)
+ for {
+ // Prepare policy input for grader json
+ input = prepareInput(jobs, nodes, nodesAvailable)
- klog.V(3).Infof("Try to allocate resource to Jobs in Queue <%s>", queue.Name)
+ // Call policy function to get allocation
+ allocation := policyFn(jobs, nodes)
- jobs, found := jobsMap[queue.UID]
- if !found || jobs.Empty() {
- klog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
- continue
+ // nothing could be scheduled
+ if len(allocation) == 0 {
+ break
}
- job := jobs.Pop().(*api.JobInfo)
- if _, found = pendingTasks[job.UID]; !found {
- tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
+ // Validate allocation returned by the policy
+ var jobAllocated *api.JobInfo
+ var jobAllocatedIdx int
+ validAllocation := true
+ // Tasks don't include reference to job, so need to traverse all jobs and tasks
+ for idx, job := range jobs {
+ nodeInUse := make(map[*api.NodeInfo]bool)
for _, task := range job.TaskStatusIndex[api.Pending] {
- // Skip tasks whose pod are scheduling gated
- if task.SchGated {
- continue
- }
-
- // Skip BestEffort task in 'allocate' action.
- if task.Resreq.IsEmpty() {
- klog.V(4).Infof("Task <%v/%v> is BestEffort task, skip it.",
- task.Namespace, task.Name)
- continue
+ node, taskAllocated := allocation[task]
+ // task found in allocation
+ if taskAllocated {
+ nothingScheduled = false
+ if jobAllocated == nil {
+ // we found the job
+ jobAllocated = job
+ jobAllocatedIdx = idx
+ } else {
+					// we already found an allocated task before; check that it belongs to the same job
+					// (the allocation must not span multiple jobs)
+ if job != jobAllocated {
+ validAllocation = false
+ klog.Errorf("ERROR! Allocation included both Job %v and %v.",
+ jobAllocated.Name, job.Name)
+ break
+ }
+ }
+ if nodeInUse[node] {
+ validAllocation = false
+ klog.Errorf("ERROR! Could not allocate Task <%v/%v>: Node %v already in use",
+ task.Namespace, task.Name, node.Name)
+ break
+ }
+ if !task.Resreq.LessEqual(node.Idle, api.Zero) {
+ validAllocation = false
+ klog.Errorf("ERROR! Could not allocate Task <%v/%v>: node enough idle resources in Node %v",
+ task.Namespace, task.Name, node.Name)
+ break
+ }
+ nodeInUse[node] = true
+ } else { // task not allocated by the policy
+ if jobAllocated != nil { // some task from this job was allocated, but this task wasn't
+ validAllocation = false
+ klog.Errorf("ERROR! Job %v partially allocated", job.Name)
+ break
+ } else {
+						// can continue to the next task
+ // not skipping the entire job, to detect partial allocations
+ continue
+ }
}
-
- tasks.Push(task)
}
- pendingTasks[job.UID] = tasks
+ // allocation included task(s) from this job
+ if jobAllocated != nil {
+ // no need to check other jobs
+ break
+ }
}
- tasks := pendingTasks[job.UID]
- if tasks.Empty() {
- // put queue back again and try other jobs in this queue
- queues.Push(queue)
- continue
+ if jobAllocated == nil {
+ // returned allocation does not contain tasks of a valid job from the queue
+ // no point to retry with the same inputs - exit the loop
+ break
}
- klog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
- tasks.Len(), job.Namespace, job.Name)
-
- alloc.allocateResourcesForTasks(tasks, job, jobs, queue, allNodes)
-
- // Put back the queue to priority queue after job's resource allocating finished,
- // To ensure that the priority of the queue is calculated based on the latest resource allocation situation.
- queues.Push(queue)
- }
-}
-
-func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *api.JobInfo, jobs *util.PriorityQueue, queue *api.QueueInfo, allNodes []*api.NodeInfo) {
- ssn := alloc.session
- stmt := framework.NewStatement(ssn)
- ph := util.NewPredicateHelper()
+ // prepare output for grader
+ var output OutputT
+ output.JobID = jobAllocated.ID
+	// find nodes in the returned allocation that belong to the allocated job
+ for _, task := range jobAllocated.TaskStatusIndex[api.Pending] {
+ node, found := allocation[task]
+ if found {
+ output.Machines = append(output.Machines, node.ID)
+ }
+ }
- for !tasks.Empty() {
- task := tasks.Pop().(*api.TaskInfo)
+ // Record scheduling decision in a json file
+ recordDecision(input, output, trace)
+
+ if validAllocation {
+ // Allocate tasks
+ for task, node := range allocation {
+ klog.V(3).Infof("Try to bind Task <%v/%v> to Node <%v>: <%v> vs. <%v>",
+ task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
+ if err := ssn.Allocate(task, node); err != nil {
+ klog.V(3).Infof("ERROR! Failed to bind Task %v, %v on %v in Session %v\n",
+ task.UID, task.Name, node.Name, ssn.UID)
+ } else {
+ ssn.UpdateScheduledTime(task)
+ // update nodesAvailable for next iteration
+ delete(nodesAvailable, node.Name)
+ }
+ }
- if !ssn.Allocatable(queue, task) {
- klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
- continue
}
- // check if the task with its spec has already predicates failed
- if job.TaskHasFitErrors(task) {
- klog.V(5).Infof("Task %s with role spec %s has already predicated failed, skip", task.Name, task.TaskRole)
- continue
+ // for _, j := range jobs {
+ // fmt.Printf("Printing job %v in jobs list and its annotations \n", j.Name)
+ // for _, info := range j.Tasks {
+ // p, e := ssn.KubeClient().CoreV1().Pods(j.Namespace).Get(context.TODO(), info.Pod.Name, metav1.GetOptions{})
+ // if e != nil {
+ // fmt.Printf("Error getting pod %v: %v\n", p.Name, e)
+ // } else {
+ // fmt.Printf("Printing annotations from pod %v: %v\n", p.Name, p.Annotations)
+ // }
+ // }
+ // }
+ // fmt.Printf("Finished printing all jobs in job list passed to the policy in iteration %v\n", itCounter)
+
+ // remove the allocated job from the list passed to the policy in the next loop iteration
+ // if allocation was not valid, the job will be considered again next time Execute() is called
+ jobs = append(jobs[:jobAllocatedIdx], jobs[jobAllocatedIdx+1:]...)
+
+ // if no more jobs or nodes, exit the loop
+ if len(jobs) == 0 {
+ klog.V(3).Infof("No jobs awaiting, DONE")
+ break
}
- klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)
+ klog.V(3).Infof("%v jobs awaiting:", len(jobs))
+ for _, job := range jobs {
+ klog.V(3).Infof(" <%v/%v>", job.Namespace, job.Name)
+ }
- if err := ssn.PrePredicateFn(task); err != nil {
- klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
- fitErrors := api.NewFitErrors()
- for _, ni := range allNodes {
- fitErrors.SetNodeError(ni.Name, err)
- }
- job.NodesFitErrors[task.UID] = fitErrors
+ if len(nodesAvailable) <= 0 {
+ klog.V(3).Infof("No nodes available, DONE")
break
}
- predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, alloc.predicate, alloc.enablePredicateErrorCache)
- if len(predicateNodes) == 0 {
- job.NodesFitErrors[task.UID] = fitErrors
- // Assume that all left tasks are allocatable, but can not meet gang-scheduling min member,
- // so we should break from continuously allocating.
- // otherwise, should continue to find other allocatable task
- if job.NeedContinueAllocating() {
- continue
+ klog.V(3).Infof("%v/%v nodes available:", len(nodesAvailable), len(nodes))
+ for _, node := range nodes {
+ if _, found := nodesAvailable[node.Name]; found {
+ klog.V(3).Infof(" <%v>: available", node.Name)
} else {
- break
+ klog.V(3).Infof(" <%v>", node.Name)
}
}
+ }
+ if nothingScheduled { // if nothing scheduled, record empty scheduling decision
+ var output OutputT // empty
+ recordDecision(input, output, trace)
+ }
+}
- // Candidate nodes are divided into two gradients:
- // - the first gradient node: a list of free nodes that satisfy the task resource request;
- // - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request;
- // Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list,
- // otherwise, score the second gradient node and select the appropriate node.
- var candidateNodes [][]*api.NodeInfo
- var idleCandidateNodes []*api.NodeInfo
- var futureIdleCandidateNodes []*api.NodeInfo
- for _, n := range predicateNodes {
- if task.InitResreq.LessEqual(n.Idle, api.Zero) {
- idleCandidateNodes = append(idleCandidateNodes, n)
- } else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
- futureIdleCandidateNodes = append(futureIdleCandidateNodes, n)
- } else {
- klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v",
- n.Name, n.Idle, n.FutureIdle(), task.Name)
- }
- }
- candidateNodes = append(candidateNodes, idleCandidateNodes)
- candidateNodes = append(candidateNodes, futureIdleCandidateNodes)
-
- var bestNode *api.NodeInfo
- for index, nodes := range candidateNodes {
- if klog.V(5).Enabled() {
- for _, node := range nodes {
- klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle())
- }
- }
- switch {
- case len(nodes) == 0:
- klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index)
- case len(nodes) == 1: // If only one node after predicate, just use it.
- bestNode = nodes[0]
- case len(nodes) > 1: // If more than one node after predicate, using "the best" one
- nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
-
- bestNode = ssn.BestNodeFn(task, nodeScores)
- if bestNode == nil {
- bestNode = util.SelectBestNode(nodeScores)
- }
- }
+func (alloc *Action) UnInitialize() {}
- // If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information.
- if bestNode != nil {
- break
- }
- }
+/*********************** p3k8s specific functions *********************************/
+// Author: Tianya Chen
- // Allocate idle resource to the task.
- if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) {
- klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, bestNode.Name)
- if err := stmt.Allocate(task, bestNode); err != nil {
- klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
- task.UID, bestNode.Name, ssn.UID, err)
- if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
- klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.",
- task.UID, bestNode.Name, ssn.UID, rollbackErr)
- }
- } else {
- metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
- metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
- }
+// keep track of input and output in the previous allocation decision
+var prevInput InputT
+var prevOutput OutputT
+
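+// recordDecision appends one JSON record per new decision to /tmp/trace-<trace>.json,
+// with a trailing comma after each record. Per the struct tags above, a record has
+// roughly this shape (illustrative, values elided):
+//   {"input":{"rack_cap":[...],"numLargeMachineRacks":...,"queue":[...],"machines":[...]},"output":{"jobID":...,"machines":[...]}}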
+func recordDecision(input InputT, output OutputT, trace string) {
+ // Marshal policy input and output to json and write to file
+ var message Message
+ message.Input = input
+ if len(output.Machines) > 0 {
+ sort.Ints(output.Machines)
+ message.Output = output
+ }
+	// save only if the input or output differs from the previous decision
+ if !reflect.DeepEqual(input, prevInput) || !reflect.DeepEqual(output, prevOutput) {
+ jobsInfo := []int{}
+ for _, jq := range input.Queue {
+ jobsInfo = append(jobsInfo, jq.JobID)
+ }
+ sort.Ints(jobsInfo)
+ nodesInfo := input.Machines
+ sort.Ints(nodesInfo)
+ if len(output.Machines) > 0 {
+ klog.Infof("Policy scheduled JobID=%v to %v (Input queue: %v, nodes: %v)",
+ output.JobID, output.Machines, jobsInfo, nodesInfo)
} else {
- klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
- task.Namespace, task.Name, bestNode.Name)
-
- // Allocate releasing resource to the task if any.
- if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) {
- klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
- task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing)
- if err := stmt.Pipeline(task, bestNode.Name, false); err != nil {
- klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
- task.UID, bestNode.Name, ssn.UID, err)
- if rollbackErr := stmt.UnPipeline(task); rollbackErr != nil {
- klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
- task.UID, bestNode.Name, ssn.UID, rollbackErr)
- }
- } else {
- metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
- metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
- }
- }
+ klog.Infof("Policy could not schedule any job (Input queue: %v, nodes: %v)",
+ jobsInfo, nodesInfo)
}
+ b, _ := json.Marshal(message)
+ traceFile, _ := os.OpenFile(fmt.Sprintf("/tmp/trace-%s.json", trace), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+ traceFile.Write(append(b, ','))
+ traceFile.Close()
+ } else {
+ klog.V(3).Infof("Same input, skip recording")
+ }
+ // remember input and output, to avoid saving identical scheduling decisions
+ prevInput = input
+ prevOutput = output
+}
- if ssn.JobReady(job) && !tasks.Empty() {
- jobs.Push(job)
- break
+func addJobProperty(job *api.JobInfo) *api.JobInfo {
+ for _, task := range job.TaskStatusIndex[api.Pending] {
+ // jobID, _ := strconv.ParseInt(job.Name[3 :], 10, 64)
+ jobID, _ := strconv.ParseInt(strings.Split(job.Name, "-")[1], 10, 64)
+ job.ID = int(jobID)
+ job.Trace = task.Pod.ObjectMeta.Labels["trace"]
+ job.Type = task.Pod.ObjectMeta.Labels["type"]
+ fastDuration, _ := strconv.ParseInt(task.Pod.ObjectMeta.Labels["FastDuration"], 10, 64)
+ job.FastDuration = int(fastDuration)
+ slowDuration, _ := strconv.ParseInt(task.Pod.ObjectMeta.Labels["SlowDuration"], 10, 64)
+ job.SlowDuration = int(slowDuration)
+ break
+ }
+ job.CreationTime = metav1.Now()
+ for _, task := range job.TaskStatusIndex[api.Pending] {
+ if task.Pod.ObjectMeta.CreationTimestamp.Before(&job.CreationTime) {
+ job.CreationTime = task.Pod.ObjectMeta.CreationTimestamp
}
}
+ return job
+}
- if ssn.JobReady(job) {
- stmt.Commit()
+func addNodeProperty(node *api.NodeInfo) *api.NodeInfo {
+ nodeID, _ := strconv.ParseInt(node.Node.ObjectMeta.Name[3:], 10, 64)
+ node.ID = int(nodeID)
+ if rack, found := node.Node.ObjectMeta.Labels["Rack"]; found {
+ rackID, _ := strconv.ParseInt(rack, 10, 64)
+ node.Rack = int(rackID)
} else {
- if !ssn.JobPipelined(job) {
- stmt.Discard()
- }
+ node.Rack = -1
}
+ if gpu, found := node.Node.ObjectMeta.Labels["GPU"]; found && gpu == "true" {
+ node.GPU = true
+ } else {
+ node.GPU = false
+ }
+ return node
}
-func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) error {
- // Check for Resource Predicate
- var statusSets api.StatusSets
- if ok, resources := task.InitResreq.LessEqualWithResourcesName(node.FutureIdle(), api.Zero); !ok {
- statusSets = append(statusSets, &api.Status{Code: api.Unschedulable, Reason: api.WrapInsufficientResourceReason(resources)})
- return api.NewFitErrWithStatus(task, node, statusSets...)
+func getOneTask(job *api.JobInfo) *api.TaskInfo {
+ for _, t := range job.TaskStatusIndex[api.Pending] {
+ return t
}
- return alloc.session.PredicateForAllocateAction(task, node)
+ return nil
}
-func (alloc *Action) UnInitialize() {}
+func prepareInput(jobs []*api.JobInfo, nodes []*api.NodeInfo, nodesAvailable map[string]*api.NodeInfo) InputT {
+ var input InputT
+
+ // Collect rack capacities and number of GPU racks from node info
+ rackCap := make(map[int]int)
+ for _, node := range nodes {
+ if node.Rack >= 0 {
+ if _, found := rackCap[node.Rack]; found {
+ rackCap[node.Rack] = rackCap[node.Rack] + 1
+ } else {
+ rackCap[node.Rack] = 1
+ }
+ if node.GPU {
+ if node.Rack > input.NumLargeMachineRacks {
+ input.NumLargeMachineRacks = node.Rack
+ }
+ }
+ }
+ }
+ for rackID := 1; rackID <= len(rackCap); rackID++ {
+ input.RackCap = append(input.RackCap, rackCap[rackID])
+ }
+
+ // Collect job info
+ for _, job := range jobs {
+ var queueJob JobT
+ queueJob.JobID = job.ID
+ queueJob.K = int(job.MinAvailable)
+ queueJob.JobType = job.Type
+ queueJob.Duration = job.FastDuration
+ queueJob.SlowDuration = job.SlowDuration
+ input.Queue = append(input.Queue, queueJob)
+ }
+
+ // Collect node info
+ for _, node := range nodesAvailable {
+ input.Machines = append(input.Machines, node.ID)
+ }
+
+ sort.Ints(input.Machines)
+
+ return input
+}
diff --git a/pkg/scheduler/actions/allocate/custom.go b/pkg/scheduler/actions/allocate/custom.go
new file mode 100644
index 0000000000..70b29e2bee
--- /dev/null
+++ b/pkg/scheduler/actions/allocate/custom.go
@@ -0,0 +1,20 @@
+package allocate
+
+import (
+ "volcano.sh/volcano/pkg/scheduler/api"
+)
+
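+// customFn is a stub policy: it walks every pending task of every job and assigns it to
+// the next node in order, without checking whether that node is already busy. It is
+// presumably the hook for the "custom" policy selected via the scheduler-conf ConfigMap.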
+func customFn(jobs []*api.JobInfo, nodes []*api.NodeInfo) map[*api.TaskInfo]*api.NodeInfo {
+ allocation := make(map[*api.TaskInfo]*api.NodeInfo)
+ i := 0
+ for _, job := range jobs {
+ for _, task := range job.TaskStatusIndex[api.Pending] {
+ if i >= len(nodes) {
+ break
+ }
+ allocation[task] = nodes[i]
+ i++
+ }
+ }
+ return allocation
+}
\ No newline at end of file
diff --git a/pkg/scheduler/actions/allocate/fifoHeter.go b/pkg/scheduler/actions/allocate/fifoHeter.go
new file mode 100644
index 0000000000..84456d241c
--- /dev/null
+++ b/pkg/scheduler/actions/allocate/fifoHeter.go
@@ -0,0 +1,30 @@
+package allocate
+
+import (
+ "volcano.sh/volcano/pkg/scheduler/api"
+ "math/rand"
+)
+
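+// fifoHeterFn is currently a stub: it picks a random job and fills the first idle nodes
+// in order. Per the TODO below, it is presumably meant to become a FIFO policy that is
+// aware of heterogeneous (GPU vs. non-GPU rack) machines.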
+func fifoHeterFn(jobs []*api.JobInfo, nodes []*api.NodeInfo) map[*api.TaskInfo]*api.NodeInfo {
+ // TODO: UPDATE THIS FUNCTION
+ allocation := make(map[*api.TaskInfo]*api.NodeInfo)
+ i := 0
+ job := jobs[rand.Intn(len(jobs))]
+ for _, task := range job.TaskStatusIndex[api.Pending] {
+		for i < len(nodes) && len(nodes[i].Tasks) > 0 {
+			// skip nodes that already have tasks running
+			i++
+		}
+ if i>=len(nodes) {
+ // out of nodes
+ break
+ }
+ allocation[task] = nodes[i]
+ i++
+ }
+ if len(job.TaskStatusIndex[api.Pending]) != len(allocation) {
+ // could not allocate all the tasks, return empty allocation
+ allocation = make(map[*api.TaskInfo]*api.NodeInfo)
+ }
+ return allocation
+}
diff --git a/pkg/scheduler/actions/allocate/fifoRandom.go b/pkg/scheduler/actions/allocate/fifoRandom.go
new file mode 100644
index 0000000000..ac9ab94566
--- /dev/null
+++ b/pkg/scheduler/actions/allocate/fifoRandom.go
@@ -0,0 +1,43 @@
+package allocate
+
+import (
+ "volcano.sh/volcano/pkg/scheduler/api"
+ "math/rand"
+ "time"
+)
+
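+// fifoRandomFn takes the first job in FIFO order and places its pending tasks on randomly
+// chosen idle nodes; it returns an empty allocation if the job cannot be fully placed.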
+func fifoRandomFn(jobs []*api.JobInfo, nodes []*api.NodeInfo) map[*api.TaskInfo]*api.NodeInfo {
+ // TODO: UPDATE THIS FUNCTION
+ allocation := make(map[*api.TaskInfo]*api.NodeInfo)
+
+ if len(jobs) == 0 {
+ return allocation
+ }
+
+ job := jobs[0]
+ // get available nodes
+ availableNodeIdx := []int{}
+ for i, node := range nodes {
+ if len(node.Tasks) == 0 {
+ availableNodeIdx = append(availableNodeIdx, i)
+ }
+ }
+ // shuffle the nodes index
+ rand.Seed(time.Now().UnixNano())
+ rand.Shuffle(len(availableNodeIdx), func(i, j int) {availableNodeIdx[i], availableNodeIdx[j] = availableNodeIdx[j], availableNodeIdx[i]})
+ // allocate
+ i := 0
+ for _, task := range job.TaskStatusIndex[api.Pending] {
+ if i < len(availableNodeIdx) {
+ allocation[task] = nodes[availableNodeIdx[i]]
+ } else {
+ break
+ }
+ i++
+ }
+ if len(job.TaskStatusIndex[api.Pending]) != len(allocation) {
+ // could not allocate all the tasks, return empty allocation
+ allocation = make(map[*api.TaskInfo]*api.NodeInfo)
+ }
+ return allocation
+}
diff --git a/pkg/scheduler/actions/allocate/sjfHeter.go b/pkg/scheduler/actions/allocate/sjfHeter.go
new file mode 100644
index 0000000000..81384b937a
--- /dev/null
+++ b/pkg/scheduler/actions/allocate/sjfHeter.go
@@ -0,0 +1,30 @@
+package allocate
+
+import (
+ "volcano.sh/volcano/pkg/scheduler/api"
+ "math/rand"
+)
+
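+// sjfHeterFn is currently the same stub as fifoHeterFn. Per the TODO below, it is
+// presumably meant to become a shortest-job-first policy using the job duration fields.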
+func sjfHeterFn(jobs []*api.JobInfo, nodes []*api.NodeInfo) map[*api.TaskInfo]*api.NodeInfo {
+ // TODO: UPDATE THIS FUNCTION
+ allocation := make(map[*api.TaskInfo]*api.NodeInfo)
+ i := 0
+ job := jobs[rand.Intn(len(jobs))]
+ for _, task := range job.TaskStatusIndex[api.Pending] {
+		for i < len(nodes) && len(nodes[i].Tasks) > 0 {
+			// skip nodes that already have tasks running
+			i++
+		}
+ if i>=len(nodes) {
+ // out of nodes
+ break
+ }
+ allocation[task] = nodes[i]
+ i++
+ }
+ if len(job.TaskStatusIndex[api.Pending]) != len(allocation) {
+ // could not allocate all the tasks, return empty allocation
+ allocation = make(map[*api.TaskInfo]*api.NodeInfo)
+ }
+ return allocation
+}
diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go
index 6699ee5f19..872384991d 100644
--- a/pkg/scheduler/api/job_info.go
+++ b/pkg/scheduler/api/job_info.go
@@ -370,6 +370,15 @@ type JobInfo struct {
// * value means workload can use all the revocable node for during node active revocable time.
RevocableZone string
Budget *DisruptionBudget
+
+ // CMU719 p3 custom properties
+ // Author: Tianya Chen
+ ID int
+ Trace string
+ Type string
+ FastDuration int
+ SlowDuration int
+ CreationTime metav1.Time
}
// NewJobInfo creates a new jobInfo for set of tasks
diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go
index c749cd903c..36f338ef66 100644
--- a/pkg/scheduler/api/node_info.go
+++ b/pkg/scheduler/api/node_info.go
@@ -88,6 +88,12 @@ type NodeInfo struct {
// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
// state information.
ImageStates map[string]*k8sframework.ImageStateSummary
+
+ // CMU719 p3 custom properties
+ // Author: Tianya Chen
+ ID int
+ Rack int
+ GPU bool
}
// FutureIdle returns resources that will be idle in the future:
diff --git a/pkg/scheduler/cache/cache_extension.go b/pkg/scheduler/cache/cache_extension.go
new file mode 100644
index 0000000000..daf642c32f
--- /dev/null
+++ b/pkg/scheduler/cache/cache_extension.go
@@ -0,0 +1,112 @@
+package cache
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/tools/cache"
+ "time"
+
+ volapi "volcano.sh/volcano/pkg/scheduler/api"
+)
+
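+// UpdateScheduledTime records when a task was scheduled by patching a "scheduledTime"
+// annotation onto the task's Pod and, for single-replica vcjobs, onto the owning Job in
+// the "default" namespace. It retries up to retryTimes times on failure.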
+func (sc *SchedulerCache) UpdateScheduledTime(task *volapi.TaskInfo) error {
+ sc.Mutex.Lock()
+ defer sc.Mutex.Unlock()
+ updateInterval := 3 * time.Second
+ retryTimes := 5
+ scheduledTime := metav1.NewTime(time.Now())
+ for i := 0; i < retryTimes; i++ {
+ pod, err := sc.kubeClient.CoreV1().Pods(task.Pod.Namespace).Get(context.TODO(), task.Pod.Name, metav1.GetOptions{})
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ return nil
+ }
+ return err
+ }
+
+ refJob := pod.Annotations["volcano.sh/job-name"]
+ if refJob == "" {
+ fmt.Printf("Sh*t we don't have volcano.sh/job-name label that is supposed to be here present for pod %v\n", pod.Name)
+ }
+
+ var perr error
+		// indexing a nil map returns the zero value, so one check covers both the nil and empty cases
+		if pod.ObjectMeta.Annotations["scheduledTime"] == "" {
+ a := map[string]string{"scheduledTime": scheduledTime.Rfc3339Copy().String()}
+ m := map[string]interface{}{"annotations": a}
+ pat := map[string]interface{}{"metadata": m}
+
+ bt, err := json.Marshal(pat)
+ if err != nil {
+ fmt.Printf("Error with marshalling patch %v\n", err)
+ }
+ _, perr = sc.kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), task.Pod.Name, types.StrategicMergePatchType, bt, metav1.PatchOptions{})
+
+ if perr != nil {
+ fmt.Printf("Error with patching pod %v\n", err)
+ return perr
+ }
+ }
+
+
+ // Patch a vcjob
+		vcj, err := sc.vcClient.BatchV1alpha1().Jobs("default").Get(context.TODO(), refJob, metav1.GetOptions{})
+		// Skip if the Get failed; the vcjob patch is usually only needed for single-replica jobs
+		if err == nil && len(vcj.Spec.Tasks) > 0 && vcj.Spec.Tasks[0].Replicas == 1 {
+ var patches []map[string]interface{}
+ var patch map[string]interface{}
+ if len(vcj.Annotations) == 0 {
+ patch = map[string]interface{}{
+ "op": "add",
+ "path": "/metadata/annotations",
+ "value": map[string]string{"scheduledTime": scheduledTime.Rfc3339Copy().String()},
+ }
+ patches = append(patches, patch)
+ } else {
+ patch = map[string]interface{}{
+ "op": "add",
+ "path": "/metadata/annotations/scheduledTime",
+ "value": scheduledTime.Rfc3339Copy().String(),
+ }
+ patches = append(patches, patch)
+ }
+
+ bt, err := json.Marshal(patches)
+ if err != nil {
+ fmt.Printf("Error with marshalling patch %v\n", err)
+ }
+ vcj, jerr := sc.vcClient.BatchV1alpha1().Jobs("default").Patch(context.TODO(), refJob, types.JSONPatchType, bt, metav1.PatchOptions{})
+ if jerr != nil {
+ fmt.Printf("Patching vcjob %v not successful: %v\n", refJob, err)
+ return jerr
+ }
+ fmt.Printf("New annotations of job %v are %v\n", refJob, vcj.Annotations)
+ if perr == nil && jerr == nil {
+ return nil
+ }
+ }
+ if perr == nil {
+ return nil
+ }
+
+ time.Sleep(updateInterval)
+ }
+ return fmt.Errorf("update pod scheduled time failed after %d retries", retryTimes)
+}
+
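+// LoadSchedulerConf fetches the ConfigMap referenced by path ("namespace/name") and
+// returns its Data map.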
+func (sc *SchedulerCache) LoadSchedulerConf(path string) (map[string]string, error) {
+ ns, name, err := cache.SplitMetaNamespaceKey(path)
+ if err != nil {
+ return nil, err
+ }
+
+ confMap, err := sc.kubeClient.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
+ if err != nil {
+ return nil, err
+ }
+
+ return confMap.Data, nil
+}
diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go
index 94ab18f373..4a475daafd 100644
--- a/pkg/scheduler/cache/interface.go
+++ b/pkg/scheduler/cache/interface.go
@@ -87,6 +87,11 @@ type Cache interface {
// EventRecorder returns the event recorder
EventRecorder() record.EventRecorder
+
+ // cmu719 p3k8s specific functions
+ // Author: Tianya Chen
+ LoadSchedulerConf(path string) (map[string]string, error)
+ UpdateScheduledTime(task *api.TaskInfo) error
}
// VolumeBinder interface for allocate and bind volumes
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index 03e40c30fd..5814b6cae8 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -477,7 +477,10 @@ func (ssn *Session) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
}
}
} else {
- ssn.cache.RevertVolumes(task, podVolumes)
+ // p3k8s changes
+ // Author: Baljit Singh
+ fmt.Printf("entering revert volumes through jobready with task %v\n", task.Name)
+ // ssn.cache.RevertVolumes(task, podVolumes)
}
return nil
diff --git a/pkg/scheduler/framework/session_extension.go b/pkg/scheduler/framework/session_extension.go
new file mode 100644
index 0000000000..7dc59d22cc
--- /dev/null
+++ b/pkg/scheduler/framework/session_extension.go
@@ -0,0 +1,28 @@
+package framework
+
+import (
+ "k8s.io/klog/v2"
+ "volcano.sh/volcano/pkg/scheduler/api"
+)
+
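+// GetPolicy returns the "policy" key from the ConfigMap referenced by schedulerConf
+// ("namespace/name"), falling back to "fifoRandom" if the ConfigMap or key is missing.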
+func (ssn *Session) GetPolicy(schedulerConf string) string {
+ var err error
+ conf := map[string]string{"policy": "fifoRandom"}
+ if conf, err = ssn.cache.LoadSchedulerConf(schedulerConf); err != nil {
+ klog.Errorf("Failed to load scheduler policy '%s', using default fifoRandom policy: %v",
+ schedulerConf, err)
+ }
+
+ policyConf, found := conf["policy"]
+ if !found {
+ policyConf = "fifoRandom"
+ }
+
+ return policyConf
+}
+
+func (ssn *Session) UpdateScheduledTime(task *api.TaskInfo) {
+ if err := ssn.cache.UpdateScheduledTime(task); err != nil {
+ klog.Errorf("Failed to update scheduled time of task <%v/%v>: %v", task.Namespace, task.Name, err)
+ }
+}
diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go
index 36557ec587..44dc9ed9f6 100644
--- a/pkg/scheduler/framework/session_plugins.go
+++ b/pkg/scheduler/framework/session_plugins.go
@@ -521,21 +521,23 @@ func (ssn *Session) ReservedNodes() {
// JobOrderFn invoke joborder function of the plugins
func (ssn *Session) JobOrderFn(l, r interface{}) bool {
- for _, tier := range ssn.Tiers {
- for _, plugin := range tier.Plugins {
- if !isEnabled(plugin.EnabledJobOrder) {
- continue
- }
- jof, found := ssn.jobOrderFns[plugin.Name]
- if !found {
- continue
- }
- if j := jof(l, r); j != 0 {
- return j < 0
+	// changes specific to p3k8s --> order only by creation time, then UID
+ /*
+ for _, tier := range ssn.Tiers {
+ for _, plugin := range tier.Plugins {
+ if !isEnabled(plugin.EnabledJobOrder) {
+ continue
+ }
+ jof, found := ssn.jobOrderFns[plugin.Name]
+ if !found {
+ continue
+ }
+ if j := jof(l, r); j != 0 {
+ return j < 0
+ }
}
}
- }
-
+ */
// If no job order funcs, order job by CreationTimestamp first, then by UID.
lv := l.(*api.JobInfo)
rv := r.(*api.JobInfo)
diff --git a/pkg/scheduler/util.go b/pkg/scheduler/util.go
index 57c03c4d53..50159cdf24 100644
--- a/pkg/scheduler/util.go
+++ b/pkg/scheduler/util.go
@@ -31,18 +31,15 @@ import (
)
var DefaultSchedulerConf = `
-actions: "enqueue, allocate, backfill"
+actions: "allocate"
tiers:
- plugins:
- name: priority
- name: gang
- - name: conformance
- plugins:
- - name: overcommit
- name: drf
- name: predicates
- name: proportion
- - name: nodeorder
`
func UnmarshalSchedulerConf(confStr string) ([]framework.Action, []conf.Tier, []conf.Configuration, map[string]string, error) {