diff --git a/docs/developer/designs/topology-awareness/README.md b/docs/developer/designs/topology-awareness/README.md index fae975f7a..af6e10106 100644 --- a/docs/developer/designs/topology-awareness/README.md +++ b/docs/developer/designs/topology-awareness/README.md @@ -301,6 +301,7 @@ The implementation will proceed in phases: 3. Implementation of Approach 1 in a topology plugin (can be the same plugin from step 2.) + ## Alternatives Considered - Using existing Kubernetes topology mechanisms like topology spread constraints, pod affinity diff --git a/pkg/scheduler/plugins/scores/scores.go b/pkg/scheduler/plugins/scores/scores.go index 723ee61e4..40d1bd13c 100644 --- a/pkg/scheduler/plugins/scores/scores.go +++ b/pkg/scheduler/plugins/scores/scores.go @@ -8,6 +8,7 @@ const ( ResourceType = 10 Availability = 100 GpuSharing = 1000 - K8sPlugins = 10000 - NominatedNode = 100000 + Topology = 10000 + K8sPlugins = 100000 + NominatedNode = 1000000 ) diff --git a/pkg/scheduler/plugins/topology/topology_plugin.go b/pkg/scheduler/plugins/topology/topology_plugin.go index ff2cb5ec8..106b311c5 100644 --- a/pkg/scheduler/plugins/topology/topology_plugin.go +++ b/pkg/scheduler/plugins/topology/topology_plugin.go @@ -4,20 +4,23 @@ package topology import ( + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info" - "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/k8s_internal" kueuev1alpha1 "sigs.k8s.io/kueue/apis/kueue/v1alpha1" ) const ( topologyPluginName = "topology" - noNodeName = "" ) type topologyPlugin struct { - enabled bool - TopologyTrees map[string]*TopologyInfo + enabled bool + taskOrderFunc common_info.LessFn + sessionStateGetter k8s_internal.SessionStateProvider + nodesInfos map[string]*node_info.NodeInfo + TopologyTrees map[string]*TopologyInfo } func New(pluginArgs map[string]string) framework.Plugin { @@ -33,19 +36,25 @@ func (t *topologyPlugin) Name() string { func (t *topologyPlugin) OnSessionOpen(ssn *framework.Session) { topologies := ssn.Topologies + t.taskOrderFunc = ssn.TaskOrderFn + t.sessionStateGetter = ssn + t.nodesInfos = ssn.Nodes t.initializeTopologyTree(topologies, ssn) - ssn.AddEventHandler(&framework.EventHandler{ - AllocateFunc: t.handleAllocate(ssn), - DeallocateFunc: t.handleDeallocate(ssn), - }) + //pre-predicate to generate the whole topology tree and store per workload + ssn.AddPrePredicateFn(t.prePredicateFn) + //predicate to filter nodes that are related to parts of the tree that cannot accommodate the workload - this is for "required" use only + ssn.AddPredicateFn(t.predicateFn) + //node order to sort the nodes according to topology nodes score - this is for "prefer" use only + ssn.AddNodeOrderFn(t.nodeOrderFn) } func (t *topologyPlugin) initializeTopologyTree(topologies []*kueuev1alpha1.Topology, ssn *framework.Session) { for _, singleTopology := range topologies { topologyTree := &TopologyInfo{ - Name: singleTopology.Name, - Domains: map[TopologyDomainID]*TopologyDomainInfo{}, + Name: singleTopology.Name, + //Domains: map[TopologyDomainID]*TopologyDomainInfo{}, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{}, Root: NewTopologyDomainInfo(TopologyDomainID("root"), "datacenter", "cluster", 0), TopologyResource: singleTopology, } @@ -69,10 +78,16 @@ func (*topologyPlugin) addNodeDataToTopology(topologyTree *TopologyInfo, singleT } domainId := calcDomainId(levelIndex, singleTopology.Spec.Levels, nodeInfo.Node.Labels) - domainInfo, foundLevelLabel := topologyTree.Domains[domainId] + domainLevel := level.NodeLabel + domainsForLevel, foundLevelLabel := topologyTree.DomainsByLevel[domainLevel] if !foundLevelLabel { + topologyTree.DomainsByLevel[level.NodeLabel] = map[TopologyDomainID]*TopologyDomainInfo{} + domainsForLevel = topologyTree.DomainsByLevel[level.NodeLabel] + } + domainInfo, foundDomain := domainsForLevel[domainId] + if !foundDomain { domainInfo = NewTopologyDomainInfo(domainId, domainName, level.NodeLabel, levelIndex+1) - topologyTree.Domains[domainId] = domainInfo + domainsForLevel[domainId] = domainInfo } domainInfo.AddNode(nodeInfo) @@ -86,48 +101,4 @@ func (*topologyPlugin) addNodeDataToTopology(topologyTree *TopologyInfo, singleT topologyTree.Root.AddNode(nodeInfo) } -func (t *topologyPlugin) handleAllocate(ssn *framework.Session) func(event *framework.Event) { - return t.updateTopologyGivenPodEvent(ssn, func(domainInfo *TopologyDomainInfo, podInfo *pod_info.PodInfo) { - domainInfo.AllocatedResources.AddResourceRequirements(podInfo.AcceptedResource) - domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] = - domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] + 1 - }) -} - -func (t *topologyPlugin) handleDeallocate(ssn *framework.Session) func(event *framework.Event) { - return t.updateTopologyGivenPodEvent(ssn, func(domainInfo *TopologyDomainInfo, podInfo *pod_info.PodInfo) { - domainInfo.AllocatedResources.SubResourceRequirements(podInfo.AcceptedResource) - domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] = - domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] - 1 - }) -} - -func (t *topologyPlugin) updateTopologyGivenPodEvent( - ssn *framework.Session, - domainUpdater func(domainInfo *TopologyDomainInfo, podInfo *pod_info.PodInfo), -) func(event *framework.Event) { - return func(event *framework.Event) { - pod := event.Task.Pod - nodeName := event.Task.NodeName - if nodeName == noNodeName { - return - } - node := ssn.Nodes[nodeName].Node - podInfo := ssn.Nodes[nodeName].PodInfos[pod_info.PodKey(pod)] - - for _, topologyTree := range t.TopologyTrees { - leafDomainId := calcLeafDomainId(topologyTree.TopologyResource, node.Labels) - domainInfo := topologyTree.Domains[leafDomainId] - for domainInfo != nil { - domainUpdater(domainInfo, podInfo) - - if domainInfo.Nodes[nodeName] != nil { - break - } - domainInfo = domainInfo.Parent - } - } - } -} - func (t *topologyPlugin) OnSessionClose(ssn *framework.Session) {} diff --git a/pkg/scheduler/plugins/topology/topology_plugin_job_filtering.go b/pkg/scheduler/plugins/topology/topology_plugin_job_filtering.go new file mode 100644 index 000000000..efd34f241 --- /dev/null +++ b/pkg/scheduler/plugins/topology/topology_plugin_job_filtering.go @@ -0,0 +1,357 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package topology + +import ( + "errors" + "fmt" + "slices" + "sort" + + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/scores" + "k8s.io/apimachinery/pkg/types" + k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type topologyStateData struct { + relevantDomains []*TopologyDomainInfo +} + +func (t *topologyStateData) Clone() k8sframework.StateData { + return &topologyStateData{ + relevantDomains: t.relevantDomains, + } +} + +type jobAllocationMetaData struct { + maxPodResources *resource_info.ResourceRequirements + allocationTestPods []*pod_info.PodInfo + tasksToAllocate []*pod_info.PodInfo +} + +func (t *topologyPlugin) prePredicateFn(_ *pod_info.PodInfo, job *podgroup_info.PodGroupInfo) error { + topologyTree, err := t.getJobTopology(job) + if err != nil { + return err + } + if topologyTree == nil { + return nil + } + + //Check cycle cache to see if the calculation has already been done + jobAllocateableDomains, _ := t.loadAllocateableDomainsFromCache(types.UID(job.PodGroupUID)) + if jobAllocateableDomains != nil { + return nil + } + + // Calc tree job allocation data + maxAllocatablePods, err := t.calcTreeAllocatable(job, topologyTree) + if err != nil { + return err + } + + // Get best domains for the job + var jobAllocateableDomain []*TopologyDomainInfo + if maxAllocatablePods >= len(podgroup_info.GetTasksToAllocate(job, t.taskOrderFunc, true)) { + jobAllocateableDomain, err = t.getBestjobAllocateableDomains(job, topologyTree) + if err != nil { + return err + } + } + + // Clean allocation data from the tree + for _, levelDomains := range topologyTree.DomainsByLevel { + for _, domain := range levelDomains { + domain.AllocatablePods = 0 + } + } + + if len(jobAllocateableDomain) == 0 { + log.InfraLogger.V(6).Infof("no relevant domains found for job %s, workload topology name: %s", + job.PodGroup.Name, topologyTree.Name) + } + + //Save results to cycle cache + cycleJobState := (*k8sframework.CycleState)(t.sessionStateGetter.GetK8sStateForPod(job.PodGroupUID)) + cycleJobState.Write( + k8sframework.StateKey(topologyPluginName), + &topologyStateData{relevantDomains: jobAllocateableDomain}, + ) + + return nil +} + +func (t *topologyPlugin) getJobTopology(job *podgroup_info.PodGroupInfo) (*TopologyInfo, error) { + jobTopologyName := job.PodGroup.Spec.TopologyConstraint.Topology + if jobTopologyName == "" { + return nil, nil + } + topologyTree := t.TopologyTrees[jobTopologyName] + if topologyTree == nil { + return nil, fmt.Errorf("matching topology tree haven't been found for job %s, workload topology name: %s", + job.PodGroup.Name, jobTopologyName) + } + return topologyTree, nil +} + +func (t *topologyPlugin) calcTreeAllocatable(job *podgroup_info.PodGroupInfo, topologyTree *TopologyInfo) (int, error) { + jobAllocationMetaData, err := initJobAllocationMetadataStruct(job, t) + if err != nil { + return 0, err + } + + return t.calcSubTreeAllocatable(jobAllocationMetaData, topologyTree.Root) +} + +func initJobAllocationMetadataStruct(job *podgroup_info.PodGroupInfo, t *topologyPlugin) (*jobAllocationMetaData, error) { + tasksToAllocate := podgroup_info.GetTasksToAllocate(job, t.taskOrderFunc, true) + maxPodResources := resource_info.NewResourceRequirements(0, 0, 0) + for _, podInfo := range tasksToAllocate { + err := maxPodResources.SetMaxResource(podInfo.ResReq) + if err != nil { + return nil, err + } + } + initialAllocationTestPods := []*pod_info.PodInfo{ + {Name: "1-pods-resources", ResReq: maxPodResources}, + } + jobAllocationData := &jobAllocationMetaData{ + maxPodResources: maxPodResources, + allocationTestPods: initialAllocationTestPods, + tasksToAllocate: tasksToAllocate, + } + return jobAllocationData, nil +} + +func (t *topologyPlugin) calcSubTreeAllocatable(jobAllocationData *jobAllocationMetaData, rootDomain *TopologyDomainInfo) (int, error) { + if rootDomain == nil { + return 0, nil + } + + if len(rootDomain.Children) == 0 { + for _, node := range rootDomain.Nodes { + rootDomain.AllocatablePods += calcNodeAccomedation(jobAllocationData, node) + } + return rootDomain.AllocatablePods, nil + } + + for _, child := range rootDomain.Children { + childAllocateable, err := t.calcSubTreeAllocatable(jobAllocationData, child) + if err != nil { + return 0, err + } + rootDomain.AllocatablePods += childAllocateable + } + return rootDomain.AllocatablePods, nil +} + +func (t *topologyPlugin) getBestjobAllocateableDomains(job *podgroup_info.PodGroupInfo, topologyTree *TopologyInfo) ([]*TopologyDomainInfo, error) { + relevantLevels, err := t.calculateRelevantDomainLevels(job, topologyTree.Name, topologyTree) + if err != nil { + return nil, err + } + taskToAllocateCount := len(podgroup_info.GetTasksToAllocate(job, t.taskOrderFunc, true)) + + maxDepthDomains := []*TopologyDomainInfo{} + for _, level := range relevantLevels { + for _, domain := range topologyTree.DomainsByLevel[level] { + if domain.AllocatablePods < taskToAllocateCount { // Filter domains that cannot allocate the job + continue + } + + maxDepthDomains = append(maxDepthDomains, domain) + } + if len(maxDepthDomains) > 0 { + break + } + } + + if job.PodGroup.Spec.TopologyConstraint.PreferredTopologyLevel != "" { + return t.improveChoiseForPreference(maxDepthDomains, job) + } + + return maxDepthDomains, nil +} + +func (t *topologyPlugin) improveChoiseForPreference(maxDepthDomains []*TopologyDomainInfo, job *podgroup_info.PodGroupInfo) ([]*TopologyDomainInfo, error) { + // if Preferred is defined and we found a domain on the prefered level, return it + taskToAllocateCount := len(podgroup_info.GetTasksToAllocate(job, t.taskOrderFunc, true)) + if maxDepthDomains[0].Level == job.PodGroup.Spec.TopologyConstraint.PreferredTopologyLevel { + return []*TopologyDomainInfo{maxDepthDomains[0]}, nil + } + + // else, look for a subgroup of children domains that allows the job to be allocated + bestChildrenSubset := []*TopologyDomainInfo{} + for _, domain := range maxDepthDomains { + childDomainSubset := getJobAllocateableChildrenSubset(domain, taskToAllocateCount) + if len(bestChildrenSubset) == 0 || len(childDomainSubset) < len(bestChildrenSubset) { + bestChildrenSubset = childDomainSubset + } + } + return bestChildrenSubset, nil +} + +func getJobAllocateableChildrenSubset(domain *TopologyDomainInfo, taskToAllocateCount int) []*TopologyDomainInfo { + children := slices.Clone(domain.Children) + sort.SliceStable(children, func(i, j int) bool { + return children[i].AllocatablePods > children[j].AllocatablePods + }) + + allocateablePodsSum := 0 + childDomainSubset := []*TopologyDomainInfo{} + for _, childDomain := range children { + allocateablePodsSum += childDomain.AllocatablePods + childDomainSubset = append(childDomainSubset, childDomain) + if allocateablePodsSum >= taskToAllocateCount { + break + } + } + return childDomainSubset +} + +func calcNodeAccomedation(jobAllocationMetaData *jobAllocationMetaData, node *node_info.NodeInfo) int { + allocateablePodsCount := 0 + for _, resourceRepresentorPod := range jobAllocationMetaData.allocationTestPods { + if node.IsTaskAllocatable(resourceRepresentorPod) { + allocateablePodsCount++ + } else { + break + } + } + // Add more to jobResourcesAllocationsRepresentors until node cannot accommodate any more pods + if allocateablePodsCount == len(jobAllocationMetaData.allocationTestPods) { + for i := allocateablePodsCount; i < len(jobAllocationMetaData.tasksToAllocate); i++ { + latestTestPod := jobAllocationMetaData.allocationTestPods[len(jobAllocationMetaData.allocationTestPods)-1] + + iAllocationsTestPod := &pod_info.PodInfo{ + Name: fmt.Sprintf("%d-pods-resources", allocateablePodsCount+1), + ResReq: calcNextAllocationTestPodResources(latestTestPod.ResReq, jobAllocationMetaData.maxPodResources), + } + jobAllocationMetaData.allocationTestPods = append(jobAllocationMetaData.allocationTestPods, iAllocationsTestPod) + if node.IsTaskAllocatable(iAllocationsTestPod) { + allocateablePodsCount++ + } else { + break + } + } + } + return allocateablePodsCount +} + +func calcNextAllocationTestPodResources(previousTestResources, maxPodResources *resource_info.ResourceRequirements) *resource_info.ResourceRequirements { + nPlus1Resources := previousTestResources.Clone() + nPlus1Resources.BaseResource.Add(&maxPodResources.BaseResource) + if len(nPlus1Resources.GpuResourceRequirement.MigResources()) > 0 { + for migResource, quant := range maxPodResources.GpuResourceRequirement.MigResources() { + nPlus1Resources.GpuResourceRequirement.MigResources()[migResource] += quant + } + } else { + updatedGpuResource := resource_info.NewGpuResourceRequirementWithMultiFraction( + nPlus1Resources.GetNumOfGpuDevices(), + nPlus1Resources.GpuFractionalPortion(), + nPlus1Resources.GpuMemory()) + nPlus1Resources.GpuResourceRequirement = *updatedGpuResource + } + return nPlus1Resources +} + +func (*topologyPlugin) calculateRelevantDomainLevels( + job *podgroup_info.PodGroupInfo, jobTopologyName string, + topologyTree *TopologyInfo) ([]string, error) { + requiredPlacement := job.PodGroup.Spec.TopologyConstraint.RequiredTopologyLevel + preferredPlacement := job.PodGroup.Spec.TopologyConstraint.PreferredTopologyLevel + if requiredPlacement == "" && preferredPlacement == "" { + return nil, fmt.Errorf("no topology placement annotations found for job %s, workload topology name: %s", job.PodGroup.Name, jobTopologyName) + } + + foundRequiredLevel := false + foundPreferredLevel := false + relevantLevels := []string{} + abovePreferredLevel := preferredPlacement == "" + for _, level := range topologyTree.TopologyResource.Spec.Levels { + if preferredPlacement != "" && preferredPlacement == level.NodeLabel { + foundPreferredLevel = true + abovePreferredLevel = true + } + + if !abovePreferredLevel { + continue + } + relevantLevels = append(relevantLevels, level.NodeLabel) + + if requiredPlacement != "" && requiredPlacement == level.NodeLabel { + foundRequiredLevel = true + break // Next level won't fulfill the required placement + } + } + if requiredPlacement != "" && !foundRequiredLevel { + return nil, fmt.Errorf("the topology %s doesn't have a level matching the required(%s) spesified for the job %s", + jobTopologyName, requiredPlacement, job.Name, + ) + } + if preferredPlacement != "" && !foundPreferredLevel { + return nil, fmt.Errorf("the topology %s doesn't have a level matching the preffered(%s) spesified for the job %s", + jobTopologyName, preferredPlacement, job.Name, + ) + } + return relevantLevels, nil +} + +func (t *topologyPlugin) predicateFn(pod *pod_info.PodInfo, job *podgroup_info.PodGroupInfo, node *node_info.NodeInfo) error { + jobAllocateableDomains, err := t.loadAllocateableDomainsFromCache(job.PodGroupUID) + if err != nil { + return err + } + + // For topology stage 1 - choose one domain, accept nodes allocation only if they are part of the chosen domain + if len(jobAllocateableDomains) > 0 { + chosenDomain := jobAllocateableDomains[0] + if chosenDomain.Nodes[node.Node.Name] == nil { + return fmt.Errorf("the node %s is not part of the chosen topology domain for the job %s. The chosen domain is %s", + node.Node.Name, job.PodGroup.Name, chosenDomain.Name) + } + } + + return nil +} + +func (t *topologyPlugin) nodeOrderFn(pod *pod_info.PodInfo, node *node_info.NodeInfo) (float64, error) { + score := 0.0 + + jobAllocateableDomains, err := t.loadAllocateableDomainsFromCache(types.UID(pod.Job)) + if err != nil { + return score, err + } + + // For topology stage 1 - choose one domain, accept nodes allocation only if they are part of the chosen domain + if len(jobAllocateableDomains) > 0 { + chosenDomain := jobAllocateableDomains[0] + if chosenDomain.Nodes[node.Node.Name] != nil { + score = scores.Topology + } + } + + return score, nil +} + +func (t *topologyPlugin) loadAllocateableDomainsFromCache(podGroupUID types.UID) ([]*TopologyDomainInfo, error) { + cycleJobState := (*k8sframework.CycleState)(t.sessionStateGetter.GetK8sStateForPod(podGroupUID)) + if cycleJobState == nil { + return nil, nil + } + jobTopologyStateData, err := cycleJobState.Read(k8sframework.StateKey(topologyPluginName)) + if err != nil { + if errors.Is(err, k8sframework.ErrNotFound) { + return nil, nil + } + return nil, err + } + jobAllocateableDomains := jobTopologyStateData.(*topologyStateData).relevantDomains + return jobAllocateableDomains, nil +} diff --git a/pkg/scheduler/plugins/topology/topology_plugin_job_filtering_test.go b/pkg/scheduler/plugins/topology/topology_plugin_job_filtering_test.go new file mode 100644 index 000000000..4ed2df510 --- /dev/null +++ b/pkg/scheduler/plugins/topology/topology_plugin_job_filtering_test.go @@ -0,0 +1,1504 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package topology + +import ( + "slices" + "sort" + "testing" + + "k8s.io/utils/ptr" + kueuev1alpha1 "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + + enginev2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/nodes_fake" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestTopologyPlugin_calculateRelevantDomainLevels(t *testing.T) { + tests := []struct { + name string + job *podgroup_info.PodGroupInfo + jobTopologyName string + topologyTree *TopologyInfo + expectedLevels []string + expectedError string + }{ + { + name: "both required and preferred placement specified", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "zone", + PreferredTopologyLevel: "rack", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + }, + expectedLevels: []string{ + "rack", + "zone", + }, + expectedError: "", + }, + { + name: "only required placement specified", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "zone", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + }, + expectedLevels: []string{ + "rack", + "zone", + }, + expectedError: "", + }, + { + name: "only preferred placement specified", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + PreferredTopologyLevel: "rack", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + }, + expectedLevels: []string{ + "rack", + "zone", + "datacenter", + }, + expectedError: "", + }, + { + name: "no placement annotations specified", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Annotations: map[string]string{}, + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{}, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + }, + expectedLevels: nil, + expectedError: "no topology placement annotations found for job test-job, workload topology name: test-topology", + }, + { + name: "required placement not found in topology", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "nonexistent", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + }, + expectedLevels: nil, + expectedError: "the topology test-topology doesn't have a level matching the required(nonexistent) spesified for the job test-job", + }, + { + name: "preferred placement not found in topology", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + PreferredTopologyLevel: "nonexistent", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + }, + expectedLevels: nil, + expectedError: "the topology test-topology doesn't have a level matching the preffered(nonexistent) spesified for the job test-job", + }, + { + name: "required placement at first level", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "rack", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + }, + expectedLevels: []string{ + "rack", + }, + expectedError: "", + }, + { + name: "preferred placement at first level", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + PreferredTopologyLevel: "rack", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + }, + expectedLevels: []string{ + "rack", + "zone", + "datacenter", + }, + expectedError: "", + }, + { + name: "preferred placement at middle level", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + PreferredTopologyLevel: "zone", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + }, + expectedLevels: []string{ + "zone", + "datacenter", + }, + expectedError: "", + }, + { + name: "single level topology with preferred placement", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + PreferredTopologyLevel: "zone", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "zone"}, + }, + }, + }, + }, + expectedLevels: []string{ + "zone", + }, + expectedError: "", + }, + { + name: "complex topology with multiple levels", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "region", + PreferredTopologyLevel: "zone", + }, + }, + }, + }, + jobTopologyName: "test-topology", + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "region"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + }, + expectedLevels: []string{ + "zone", + "region", + }, + expectedError: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &topologyPlugin{} + + result, err := plugin.calculateRelevantDomainLevels(tt.job, tt.jobTopologyName, tt.topologyTree) + + // Check error + if tt.expectedError != "" { + if err == nil { + t.Errorf("expected error '%s', but got nil", tt.expectedError) + return + } + if err.Error() != tt.expectedError { + t.Errorf("expected error '%s', but got '%s'", tt.expectedError, err.Error()) + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + // Check result + if tt.expectedLevels == nil { + if result != nil { + t.Errorf("expected nil result, but got %v", result) + } + return + } + + if result == nil { + t.Errorf("expected result %v, but got nil", tt.expectedLevels) + return + } + + // Compare maps + if len(result) != len(tt.expectedLevels) { + t.Errorf("expected %d levels, but got %d", len(tt.expectedLevels), len(result)) + } + + if !slices.Equal(result, tt.expectedLevels) { + t.Errorf("expected %v, but got %v", tt.expectedLevels, result) + } + }) + } +} + +func TestTopologyPlugin_calcTreeAllocatable(t *testing.T) { + tests := []struct { + name string + job *jobs_fake.TestJobBasic + allocatedPodGroups []*jobs_fake.TestJobBasic + nodes map[string]nodes_fake.TestNodeBasic + nodesToDomains map[string]TopologyDomainID + setupTopologyTree func() *TopologyInfo + expectedMaxAllocatablePods int + expectedDomains map[TopologyDomainID]*TopologyDomainInfo + }{ + { + name: "two level topology - parent takes child values when children can allocate full job", + job: &jobs_fake.TestJobBasic{ + Name: "test-job", + RequiredCPUsPerTask: 500, + Tasks: []*tasks_fake.TestTaskBasic{ + {State: pod_status.Pending}, + {State: pod_status.Pending}, + }, + }, + nodes: map[string]nodes_fake.TestNodeBasic{ + "node-1": { + CPUMillis: 1000, + GPUs: 6, + MaxTaskNum: ptr.To(100), + }, + "node-2": { + CPUMillis: 1000, + GPUs: 6, + MaxTaskNum: ptr.To(100), + }, + }, + nodesToDomains: map[string]TopologyDomainID{ + "node-1": "rack1.zone1", + "node-2": "rack2.zone1", + }, + setupTopologyTree: func() *TopologyInfo { + tree := &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "rack": { + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + Nodes: map[string]*node_info.NodeInfo{}, + }, + "rack2.zone1": { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + Nodes: map[string]*node_info.NodeInfo{}, + }, + }, + "zone": { + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + Nodes: map[string]*node_info.NodeInfo{}, + }, + }, + }, + } + + tree.Root = tree.DomainsByLevel["zone"]["zone1"] + + // Set parent relationships + tree.DomainsByLevel["zone"]["zone1"].Children = []*TopologyDomainInfo{ + tree.DomainsByLevel["rack"]["rack1.zone1"], + tree.DomainsByLevel["rack"]["rack2.zone1"], + } + tree.DomainsByLevel["rack"]["rack1.zone1"].Parent = tree.DomainsByLevel["zone"]["zone1"] + tree.DomainsByLevel["rack"]["rack2.zone1"].Parent = tree.DomainsByLevel["zone"]["zone1"] + + return tree + }, + expectedMaxAllocatablePods: 4, + expectedDomains: map[TopologyDomainID]*TopologyDomainInfo{ + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + AllocatablePods: 2, + }, + "rack2.zone1": { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + AllocatablePods: 2, + }, + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + AllocatablePods: 4, + }, + }, + }, + { + name: "children cannot allocate full job individually - parent sums allocations", + job: &jobs_fake.TestJobBasic{ + Name: "test-job", + RequiredCPUsPerTask: 800, // Each node can only fit 1 pod (1000/800 = 1) + Tasks: []*tasks_fake.TestTaskBasic{ + {State: pod_status.Pending}, + {State: pod_status.Pending}, + }, + }, + nodes: map[string]nodes_fake.TestNodeBasic{ + "node-1": { + CPUMillis: 1000, + GPUs: 6, + MaxTaskNum: ptr.To(100), + }, + "node-2": { + CPUMillis: 1000, + GPUs: 6, + MaxTaskNum: ptr.To(100), + }, + }, + nodesToDomains: map[string]TopologyDomainID{ + "node-1": "rack1.zone1", + "node-2": "rack2.zone1", + }, + setupTopologyTree: func() *TopologyInfo { + tree := &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "rack": { + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + Nodes: map[string]*node_info.NodeInfo{}, + }, + "rack2.zone1": { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + Nodes: map[string]*node_info.NodeInfo{}, + }, + }, + "zone": { + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + Nodes: map[string]*node_info.NodeInfo{}, + }, + }, + }, + } + + tree.Root = tree.DomainsByLevel["zone"]["zone1"] + + // Set parent relationships + tree.DomainsByLevel["zone"]["zone1"].Children = []*TopologyDomainInfo{ + tree.DomainsByLevel["rack"]["rack1.zone1"], + tree.DomainsByLevel["rack"]["rack2.zone1"], + } + tree.DomainsByLevel["rack"]["rack1.zone1"].Parent = tree.DomainsByLevel["zone"]["zone1"] + tree.DomainsByLevel["rack"]["rack2.zone1"].Parent = tree.DomainsByLevel["zone"]["zone1"] + + return tree + }, + expectedMaxAllocatablePods: 2, + expectedDomains: map[TopologyDomainID]*TopologyDomainInfo{ + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + AllocatablePods: 1, // Can only fit 1 pod + }, + "rack2.zone1": { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + AllocatablePods: 1, // Can only fit 1 pod + }, + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + AllocatablePods: 2, // Sum of children allocations: 1 + 1 + }, + }, + }, + { + name: "mixed distances - parent takes minimum distance", + job: &jobs_fake.TestJobBasic{ + Name: "test-job", + RequiredCPUsPerTask: 500, + Tasks: []*tasks_fake.TestTaskBasic{ + {State: pod_status.Pending}, + {State: pod_status.Pending}, + }, + }, + nodes: map[string]nodes_fake.TestNodeBasic{ + "node-1": { + CPUMillis: 500, + GPUs: 6, + MaxTaskNum: ptr.To(100), + }, + "node-2": { + CPUMillis: 500, + GPUs: 6, + MaxTaskNum: ptr.To(100), + }, + "node-3": { + CPUMillis: 1000, + GPUs: 6, + MaxTaskNum: ptr.To(100), + }, + }, + nodesToDomains: map[string]TopologyDomainID{ + "node-1": "rack1.zone1", + "node-2": "rack1.zone1", + "node-3": "rack2.zone1", + }, + setupTopologyTree: func() *TopologyInfo { + tree := &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "rack": { + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + Nodes: map[string]*node_info.NodeInfo{}, + }, + "rack2.zone1": { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + Nodes: map[string]*node_info.NodeInfo{}, + }, + }, + "zone": { + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + Nodes: map[string]*node_info.NodeInfo{}, + }, + }, + }, + } + + tree.Root = tree.DomainsByLevel["zone"]["zone1"] + + // Set parent relationships + tree.DomainsByLevel["zone"]["zone1"].Children = []*TopologyDomainInfo{ + tree.DomainsByLevel["rack"]["rack1.zone1"], + tree.DomainsByLevel["rack"]["rack2.zone1"], + } + tree.DomainsByLevel["rack"]["rack1.zone1"].Parent = tree.DomainsByLevel["zone"]["zone1"] + tree.DomainsByLevel["rack"]["rack2.zone1"].Parent = tree.DomainsByLevel["zone"]["zone1"] + + return tree + }, + expectedMaxAllocatablePods: 4, + expectedDomains: map[TopologyDomainID]*TopologyDomainInfo{ + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + AllocatablePods: 2, + }, + "rack2.zone1": { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + AllocatablePods: 2, + }, + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + AllocatablePods: 4, + }, + }, + }, + { + name: "no leaf domains - no allocateable domains", + job: &jobs_fake.TestJobBasic{ + Name: "test-job", + RequiredCPUsPerTask: 2000, // Too much for any node + Tasks: []*tasks_fake.TestTaskBasic{ + {State: pod_status.Pending}, + }, + }, + nodes: map[string]nodes_fake.TestNodeBasic{ + "node-1": { + CPUMillis: 1000, + GPUs: 6, + MaxTaskNum: ptr.To(100), + }, + }, + nodesToDomains: map[string]TopologyDomainID{ + "node-1": "zone1", + }, + setupTopologyTree: func() *TopologyInfo { + tree := &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "zone"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "zone": { + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + Nodes: map[string]*node_info.NodeInfo{}, + }, + }, + }, + } + + tree.Root = tree.DomainsByLevel["zone"]["zone1"] + + return tree + }, + expectedMaxAllocatablePods: 0, + expectedDomains: map[TopologyDomainID]*TopologyDomainInfo{ + // No domains should have allocations since no nodes can accommodate the job + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + jobName := tt.job.Name + clusterPodGroups := append(tt.allocatedPodGroups, tt.job) + jobsInfoMap, tasksToNodeMap, _ := jobs_fake.BuildJobsAndTasksMaps(clusterPodGroups) + nodesInfoMap := nodes_fake.BuildNodesInfoMap(tt.nodes, tasksToNodeMap) + job := jobsInfoMap[common_info.PodGroupID(jobName)] + + topologyTree := tt.setupTopologyTree() + for nodeName, domainId := range tt.nodesToDomains { + nodeInfo := nodesInfoMap[nodeName] + domain := topologyTree.DomainsByLevel[topologyTree.TopologyResource.Spec.Levels[0].NodeLabel][domainId] + for domain != nil { + if nodeInfo.Node.Labels == nil { + nodeInfo.Node.Labels = map[string]string{ + domain.Level: domain.Name, + } + } else { + nodeInfo.Node.Labels[domain.Level] = domain.Name + } + domain.AddNode(nodeInfo) + domain = domain.Parent + } + } + + session := &framework.Session{ + Nodes: nodesInfoMap, + PodGroupInfos: jobsInfoMap, + Topologies: []*kueuev1alpha1.Topology{topologyTree.TopologyResource}, + } + plugin := &topologyPlugin{ + sessionStateGetter: session, + nodesInfos: nodesInfoMap, + } + + // Call the function under test + maxAllocatablePods, err := plugin.calcTreeAllocatable(job, topologyTree) + if err != nil { + t.Errorf("failed to calc tree allocatable. job: %s, error: %v", job.PodGroup.Name, err) + } + + // Assert + if maxAllocatablePods != tt.expectedMaxAllocatablePods { + t.Errorf("expected max allocatable pods %d, got %d", tt.expectedMaxAllocatablePods, maxAllocatablePods) + } + + if len(tt.expectedDomains) == 0 { + // Check that no domains have allocations + for _, levelDomains := range topologyTree.DomainsByLevel { + for _, domain := range levelDomains { + if domain.AllocatablePods != 0 { + t.Errorf("expected domain %s to have 0 AllocatablePods, got %d", + domain.ID, domain.AllocatablePods) + } + } + } + return + } + + for domainID, expectedDomain := range tt.expectedDomains { + actualDomain, exists := topologyTree.DomainsByLevel[expectedDomain.Level][domainID] + if !exists { + t.Errorf("expected domain %s not found", domainID) + continue + } + if actualDomain.AllocatablePods != expectedDomain.AllocatablePods { + t.Errorf("domain %s: expected AllocatablePods %d, got %d", + domainID, expectedDomain.AllocatablePods, actualDomain.AllocatablePods) + } + } + }) + } +} + +func TestTopologyPlugin_getBestjobAllocateableDomains(t *testing.T) { + tests := []struct { + name string + job *podgroup_info.PodGroupInfo + topologyTree *TopologyInfo + taskOrderFunc common_info.LessFn + expectedDomains []*TopologyDomainInfo + expectedError string + }{ + { + name: "single domain with minimum distance", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + MinAvailable: 2, + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "zone", + PreferredTopologyLevel: "rack", + }, + }, + }, + PodInfos: map[common_info.PodID]*pod_info.PodInfo{ + "pod1": {Name: "pod1", Status: pod_status.Pending}, + "pod2": {Name: "pod2", Status: pod_status.Pending}, + }, + }, + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "rack": { + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + AllocatablePods: 2, + }, + "rack2.zone1": { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + AllocatablePods: 1, + }, + }, + "zone": { + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + AllocatablePods: 3, + }, + }, + }, + }, + taskOrderFunc: func(l, r interface{}) bool { + return l.(*pod_info.PodInfo).Name < r.(*pod_info.PodInfo).Name + }, + expectedDomains: []*TopologyDomainInfo{ + { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + AllocatablePods: 2, + }, + }, + expectedError: "", + }, + { + name: "multiple domains with same minimum distance", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + MinAvailable: 2, + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "zone", + }, + }, + }, + PodInfos: map[common_info.PodID]*pod_info.PodInfo{ + "pod1": {Name: "pod1", Status: pod_status.Pending}, + "pod2": {Name: "pod2", Status: pod_status.Pending}, + }, + }, + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "rack": { + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + AllocatablePods: 2, + }, + "rack2.zone1": { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + AllocatablePods: 2, + }, + }, + "zone": { + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + AllocatablePods: 4, + }, + }, + }, + }, + taskOrderFunc: func(l, r interface{}) bool { + return l.(*pod_info.PodInfo).Name < r.(*pod_info.PodInfo).Name + }, + expectedDomains: []*TopologyDomainInfo{ + { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + AllocatablePods: 2, + }, + { + ID: "rack2.zone1", + Name: "rack2", + Level: "rack", + AllocatablePods: 2, + }, + }, + expectedError: "", + }, + + { + name: "no domains can allocate the job", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + MinAvailable: 2, + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "zone", + }, + }, + }, + PodInfos: map[common_info.PodID]*pod_info.PodInfo{ + "pod1": {Name: "pod1", Status: pod_status.Pending}, + "pod2": {Name: "pod2", Status: pod_status.Pending}, + }, + }, + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "rack": { + "rack1.zone1": { + ID: "rack1.zone1", + Name: "rack1", + Level: "rack", + AllocatablePods: 1, // Can only fit 1 pod, job needs 2 + }, + }, + "zone": { + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + AllocatablePods: 1, // Can only fit 1 pod, job needs 2 + }, + }, + }, + }, + taskOrderFunc: func(l, r interface{}) bool { + return l.(*pod_info.PodInfo).Name < r.(*pod_info.PodInfo).Name + }, + expectedDomains: []*TopologyDomainInfo{}, + expectedError: "", + }, + { + name: "no relevant domain levels", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + MinAvailable: 1, + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "zone", + PreferredTopologyLevel: "rack", + }, + }, + }, + PodInfos: map[common_info.PodID]*pod_info.PodInfo{ + "pod1": {Name: "pod1", Status: pod_status.Pending}, + }, + }, + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "datacenter"}, + {NodeLabel: "region"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "datacenter": { + "datacenter1": { + ID: "datacenter1", + Name: "datacenter1", + Level: "datacenter", + AllocatablePods: 1, + }, + }, + }, + }, + taskOrderFunc: func(l, r interface{}) bool { + return l.(*pod_info.PodInfo).Name < r.(*pod_info.PodInfo).Name + }, + expectedDomains: nil, + expectedError: "the topology test-topology doesn't have a level matching the required(zone) spesified for the job test-job", + }, + { + name: "complex topology with multiple levels", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + MinAvailable: 3, + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "region", + PreferredTopologyLevel: "zone", + }, + }, + }, + PodInfos: map[common_info.PodID]*pod_info.PodInfo{ + "pod1": {Name: "pod1", Status: pod_status.Pending}, + "pod2": {Name: "pod2", Status: pod_status.Pending}, + "pod3": {Name: "pod3", Status: pod_status.Pending}, + }, + }, + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "region"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "rack": { + "rack1.zone1.region1": { + ID: "rack1.zone1.region1", + Name: "rack1", + Level: "rack", + AllocatablePods: 3, + }, + "rack2.zone1.region1": { + ID: "rack2.zone1.region1", + Name: "rack2", + Level: "rack", + AllocatablePods: 3, + }, + }, + "zone": { + "zone1.region1": { + ID: "zone1.region1", + Name: "zone1", + Level: "zone", + AllocatablePods: 6, + }, + }, + "region": { + "region1": { + ID: "region1", + Name: "region1", + Level: "region", + AllocatablePods: 9, + }, + }, + }, + }, + taskOrderFunc: func(l, r interface{}) bool { + return l.(*pod_info.PodInfo).Name < r.(*pod_info.PodInfo).Name + }, + expectedDomains: []*TopologyDomainInfo{ + { + ID: "zone1.region1", + Name: "zone1", + Level: "zone", + AllocatablePods: 6, + }, + }, + expectedError: "", + }, + { + name: "mixed task statuses - some pending, some running", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + MinAvailable: 2, + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "zone", + }, + }, + }, + PodInfos: map[common_info.PodID]*pod_info.PodInfo{ + "pod1": {Name: "pod1", Status: pod_status.Running}, + "pod2": {Name: "pod2", Status: pod_status.Pending}, + "pod3": {Name: "pod3", Status: pod_status.Pending}, + }, + }, + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "zone"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "zone": { + "zone1": { + ID: "zone1", + Name: "zone1", + Level: "zone", + AllocatablePods: 2, + }, + }, + }, + }, + taskOrderFunc: func(l, r interface{}) bool { + return l.(*pod_info.PodInfo).Name < r.(*pod_info.PodInfo).Name + }, + expectedDomains: []*TopologyDomainInfo{ + { + ID: "zone1", + Name: "zone1", + Level: "zone", + AllocatablePods: 2, + }, + }, + expectedError: "", + }, + { + name: "Return children subset", + job: &podgroup_info.PodGroupInfo{ + Name: "test-job", + MinAvailable: 4, + PodGroup: &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: enginev2alpha2.PodGroupSpec{ + TopologyConstraint: enginev2alpha2.TopologyConstraint{ + RequiredTopologyLevel: "region", + PreferredTopologyLevel: "rack", + }, + }, + }, + PodInfos: map[common_info.PodID]*pod_info.PodInfo{ + "pod1": {Name: "pod1", Status: pod_status.Pending}, + "pod2": {Name: "pod2", Status: pod_status.Pending}, + "pod3": {Name: "pod3", Status: pod_status.Pending}, + "pod4": {Name: "pod4", Status: pod_status.Pending}, + }, + }, + topologyTree: &TopologyInfo{ + Name: "test-topology", + TopologyResource: &kueuev1alpha1.Topology{ + Spec: kueuev1alpha1.TopologySpec{ + Levels: []kueuev1alpha1.TopologyLevel{ + {NodeLabel: "rack"}, + {NodeLabel: "zone"}, + {NodeLabel: "region"}, + {NodeLabel: "datacenter"}, + }, + }, + }, + DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{ + "rack": { + "rack1.zone1.region1": { + ID: "rack1.zone1.region1", + Name: "rack1", + Level: "rack", + AllocatablePods: 3, + }, + "rack2.zone1.region1": { + ID: "rack2.zone1.region1", + Name: "rack2", + Level: "rack", + AllocatablePods: 3, + }, + "rack1.zone2.region1": { + ID: "rack1.zone2.region1", + Name: "rack1", + Level: "rack", + AllocatablePods: 2, + }, + "rack2.zone2.region1": { + ID: "rack2.zone1.region1", + Name: "rack2", + Level: "rack", + AllocatablePods: 1, + }, + "rack3.zone3.region1": { + ID: "rack3.zone2.region1", + Name: "rack3", + Level: "rack", + AllocatablePods: 1, + }, + "rack4.zone2.region1": { + ID: "rack4.zone2.region1", + Name: "rack4", + Level: "rack", + AllocatablePods: 1, + }, + "rack5.zone2.region1": { + ID: "rack5.zone2.region1", + Name: "rack5", + Level: "rack", + AllocatablePods: 1, + }, + }, + "zone": { + "zone1.region1": { + ID: "zone1.region1", + Name: "zone1", + Level: "zone", + AllocatablePods: 6, + Children: []*TopologyDomainInfo{ + { + ID: "rack1.zone1.region1", + Name: "rack1", + Level: "rack", + AllocatablePods: 3, + }, + { + ID: "rack2.zone1.region1", + Name: "rack2", + Level: "rack", + AllocatablePods: 3, + }, + }, + }, + "zone2.region1": { + ID: "zone2.region1", + Name: "zone2", + Level: "zone", + AllocatablePods: 6, + Children: []*TopologyDomainInfo{ + { + ID: "rack1.zone2.region1", + Name: "rack1", + Level: "rack", + AllocatablePods: 2, + }, + { + ID: "rack2.zone1.region1", + Name: "rack2", + Level: "rack", + AllocatablePods: 1, + }, + { + ID: "rack3.zone2.region1", + Name: "rack3", + Level: "rack", + AllocatablePods: 1, + }, + { + ID: "rack4.zone2.region1", + Name: "rack4", + Level: "rack", + AllocatablePods: 1, + }, + { + ID: "rack5.zone2.region1", + Name: "rack5", + Level: "rack", + AllocatablePods: 1, + }, + }, + }, + }, + "region": { + "region1": { + ID: "region1", + Name: "region1", + Level: "region", + AllocatablePods: 9, + }, + }, + }, + }, + taskOrderFunc: func(l, r interface{}) bool { + return l.(*pod_info.PodInfo).Name < r.(*pod_info.PodInfo).Name + }, + expectedDomains: []*TopologyDomainInfo{ + { + ID: "rack1.zone1.region1", + Name: "rack1", + Level: "rack", + AllocatablePods: 3, + }, + { + ID: "rack2.zone1.region1", + Name: "rack2", + Level: "rack", + AllocatablePods: 3, + }, + }, + expectedError: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &topologyPlugin{ + taskOrderFunc: tt.taskOrderFunc, + } + + result, err := plugin.getBestjobAllocateableDomains(tt.job, tt.topologyTree) + + // Check error + if tt.expectedError != "" { + if err == nil { + t.Errorf("expected error '%s', but got nil", tt.expectedError) + return + } + if err.Error() != tt.expectedError { + t.Errorf("expected error '%s', but got '%s'", tt.expectedError, err.Error()) + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + // Check result + if len(result) != len(tt.expectedDomains) { + t.Errorf("expected %d domains, but got %d", len(tt.expectedDomains), len(result)) + return + } + + // Sort both slices by domain ID for consistent comparison + sortDomains := func(domains []*TopologyDomainInfo) { + sort.Slice(domains, func(i, j int) bool { + return domains[i].ID < domains[j].ID + }) + } + sortDomains(result) + sortDomains(tt.expectedDomains) + + for i, expectedDomain := range tt.expectedDomains { + if i >= len(result) { + t.Errorf("expected domain at index %d not found in result", i) + continue + } + + actualDomain := result[i] + if actualDomain.ID != expectedDomain.ID { + t.Errorf("domain %d: expected ID %s, got %s", i, expectedDomain.ID, actualDomain.ID) + } + if actualDomain.Name != expectedDomain.Name { + t.Errorf("domain %d: expected Name %s, got %s", i, expectedDomain.Name, actualDomain.Name) + } + if actualDomain.Level != expectedDomain.Level { + t.Errorf("domain %d: expected Level %s, got %s", i, expectedDomain.Level, actualDomain.Level) + } + if actualDomain.AllocatablePods != expectedDomain.AllocatablePods { + t.Errorf("domain %d: expected AllocatablePods %d, got %d", i, expectedDomain.AllocatablePods, actualDomain.AllocatablePods) + } + } + }) + } +} diff --git a/pkg/scheduler/plugins/topology/topology_plugin_test.go b/pkg/scheduler/plugins/topology/topology_plugin_test.go index 96e0b9677..2cf356b9d 100644 --- a/pkg/scheduler/plugins/topology/topology_plugin_test.go +++ b/pkg/scheduler/plugins/topology/topology_plugin_test.go @@ -9,16 +9,13 @@ import ( kubeaischedulerver "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned/fake" "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" - "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info" - "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework" "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -185,636 +182,27 @@ func TestTopologyPlugin_initializeTopologyTree(t *testing.T) { assert.Equal(t, 1, len(topologyTrees)) testTopologyObj := topologyTrees["test-topology"] assert.Equal(t, "test-topology", testTopologyObj.Name) - assert.Equal(t, 5, len(testTopologyObj.Domains)) + assert.Equal(t, 2, len(testTopologyObj.DomainsByLevel)) - assert.Equal(t, "test-block-1", testTopologyObj.Domains["test-block-1"].Name) - assert.Equal(t, 1, testTopologyObj.Domains["test-block-1"].Depth) - assert.Equal(t, "CPU: 2 (cores), memory: 0 (GB), Gpus: 2", - testTopologyObj.Domains["test-block-1"].AvailableResources.String()) - assert.Equal(t, "CPU: 1 (cores), memory: 0 (GB), Gpus: 1", - testTopologyObj.Domains["test-block-1"].AllocatedResources.String()) + blockDomains := testTopologyObj.DomainsByLevel["test-topology-label/block"] + rackDomains := testTopologyObj.DomainsByLevel["test-topology-label/rack"] - assert.Equal(t, "test-rack-1", testTopologyObj.Domains["test-block-1.test-rack-1"].Name) - assert.Equal(t, "test-block-1", testTopologyObj.Domains["test-block-1.test-rack-1"].Parent.Name) - assert.Equal(t, 2, testTopologyObj.Domains["test-block-1.test-rack-1"].Depth) - assert.Equal(t, "CPU: 1 (cores), memory: 0 (GB), Gpus: 1", - testTopologyObj.Domains["test-block-1.test-rack-1"].AvailableResources.String()) - assert.Equal(t, "CPU: 0.5 (cores), memory: 0 (GB), Gpus: 1", - testTopologyObj.Domains["test-block-1.test-rack-1"].AllocatedResources.String()) + assert.Equal(t, "test-block-1", blockDomains["test-block-1"].Name) + assert.Equal(t, "test-topology-label/block", blockDomains["test-block-1"].Level) + assert.Equal(t, 1, blockDomains["test-block-1"].Depth) - assert.Equal(t, "test-rack-2", testTopologyObj.Domains["test-block-1.test-rack-2"].Name) - assert.Equal(t, "test-block-1", testTopologyObj.Domains["test-block-1.test-rack-2"].Parent.Name) - assert.Equal(t, 2, testTopologyObj.Domains["test-block-1.test-rack-2"].Depth) - assert.Equal(t, "CPU: 1 (cores), memory: 0 (GB), Gpus: 1", - testTopologyObj.Domains["test-block-1.test-rack-2"].AvailableResources.String()) - assert.Equal(t, "CPU: 0.5 (cores), memory: 0 (GB), Gpus: 0", - testTopologyObj.Domains["test-block-1.test-rack-2"].AllocatedResources.String()) + assert.Equal(t, "test-rack-1", rackDomains["test-block-1.test-rack-1"].Name) + assert.Equal(t, "test-block-1", rackDomains["test-block-1.test-rack-1"].Parent.Name) + assert.Equal(t, 2, rackDomains["test-block-1.test-rack-1"].Depth) - assert.Equal(t, "test-block-2", testTopologyObj.Domains["test-block-2"].Name) - assert.Equal(t, 1, testTopologyObj.Domains["test-block-2"].Depth) - assert.Equal(t, "CPU: 1 (cores), memory: 0 (GB), Gpus: 3", - testTopologyObj.Domains["test-block-2"].AvailableResources.String()) - assert.Equal(t, "CPU: 1 (cores), memory: 0 (GB), Gpus: 3", - testTopologyObj.Domains["test-block-2"].AllocatedResources.String()) + assert.Equal(t, "test-rack-2", rackDomains["test-block-1.test-rack-2"].Name) + assert.Equal(t, "test-block-1", rackDomains["test-block-1.test-rack-2"].Parent.Name) + assert.Equal(t, 2, rackDomains["test-block-1.test-rack-2"].Depth) - assert.Equal(t, "test-rack-1", testTopologyObj.Domains["test-block-2.test-rack-1"].Name) - assert.Equal(t, "test-block-2", testTopologyObj.Domains["test-block-2.test-rack-1"].Parent.Name) - assert.Equal(t, 2, testTopologyObj.Domains["test-block-2.test-rack-1"].Depth) - assert.Equal(t, "CPU: 1 (cores), memory: 0 (GB), Gpus: 3", - testTopologyObj.Domains["test-block-2.test-rack-1"].AvailableResources.String()) - assert.Equal(t, "CPU: 1 (cores), memory: 0 (GB), Gpus: 3", - testTopologyObj.Domains["test-block-2.test-rack-1"].AllocatedResources.String()) -} - -func TestTopologyPlugin_HandleAllocate(t *testing.T) { - tests := []struct { - name string - setupTopology func() *topologyPlugin - setupEvent func() *framework.Event - podInfos map[common_info.PodID]*pod_info.PodInfo - expectedAllocs map[string]map[string]int64 // topologyName -> domainID -> expected CPU allocation - }{ - { - name: "SingleTopologySingleDomain", - setupTopology: func() *topologyPlugin { - plugin := &topologyPlugin{ - enabled: true, - TopologyTrees: make(map[string]*TopologyInfo), - } - - // Create topology resource - topology := &kueuev1alpha1.Topology{ - ObjectMeta: metav1.ObjectMeta{Name: "test-topology"}, - Spec: kueuev1alpha1.TopologySpec{ - Levels: []kueuev1alpha1.TopologyLevel{ - {NodeLabel: "zone"}, - }, - }, - } - - // Create domain info - domainInfo := NewTopologyDomainInfo("zone1", "zone1", "zone", 1) - domainInfo.AvailableResources = resource_info.NewResource(4000, 0, 0) - domainInfo.AllocatedResources = resource_info.EmptyResource() - domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] = 0 - - // Create topology tree - topologyTree := &TopologyInfo{ - Name: "test-topology", - Domains: map[TopologyDomainID]*TopologyDomainInfo{"zone1": domainInfo}, - Root: NewTopologyDomainInfo("root", "datacenter", "cluster", 0), - TopologyResource: topology, - } - - // Set parent relationship - domainInfo.Parent = topologyTree.Root - - plugin.TopologyTrees["test-topology"] = topologyTree - return plugin - }, - setupEvent: func() *framework.Event { - return &framework.Event{ - Task: &pod_info.PodInfo{ - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }, - NodeName: "node1", - }, - } - }, - podInfos: map[common_info.PodID]*pod_info.PodInfo{ - "test-pod": { - AcceptedResource: resource_info.NewResourceRequirements(0, 1000, 0), - }, - }, - expectedAllocs: map[string]map[string]int64{ - "test-topology": { - "zone1": 1000, // Expect 1 CPU allocation - }, - }, - }, - { - name: "MultiLevelTopology", - setupTopology: func() *topologyPlugin { - plugin := &topologyPlugin{ - enabled: true, - TopologyTrees: make(map[string]*TopologyInfo), - } - - // Create topology resource - topology := &kueuev1alpha1.Topology{ - ObjectMeta: metav1.ObjectMeta{Name: "multi-topology"}, - Spec: kueuev1alpha1.TopologySpec{ - Levels: []kueuev1alpha1.TopologyLevel{ - {NodeLabel: "zone"}, - {NodeLabel: "rack"}, - }, - }, - } - - // Create domain hierarchy - zoneDomain := NewTopologyDomainInfo("zone1", "zone1", "zone", 1) - zoneDomain.AvailableResources = resource_info.NewResource(8, 16, 0) - zoneDomain.AllocatedResources = resource_info.EmptyResource() - zoneDomain.AllocatedResources.BaseResource.ScalarResources()["pods"] = 0 - - rackDomain := NewTopologyDomainInfo("zone1.rack1", "rack1", "rack", 2) - rackDomain.AvailableResources = resource_info.NewResource(4, 8, 0) - rackDomain.AllocatedResources = resource_info.EmptyResource() - rackDomain.AllocatedResources.BaseResource.ScalarResources()["pods"] = 0 - - // Create topology tree - topologyTree := &TopologyInfo{ - Name: "multi-topology", - Domains: map[TopologyDomainID]*TopologyDomainInfo{ - "zone1": zoneDomain, - "zone1.rack1": rackDomain, - }, - Root: NewTopologyDomainInfo("root", "datacenter", "cluster", 0), - TopologyResource: topology, - } - - // Set parent relationships - rackDomain.Parent = zoneDomain - zoneDomain.Parent = topologyTree.Root - - plugin.TopologyTrees["multi-topology"] = topologyTree - return plugin - }, - setupEvent: func() *framework.Event { - return &framework.Event{ - Task: &pod_info.PodInfo{ - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }, - NodeName: "node1", - }, - } - }, - podInfos: map[common_info.PodID]*pod_info.PodInfo{ - "test-pod": { - AcceptedResource: resource_info.NewResourceRequirements(0, 1000, 0), - }, - }, - expectedAllocs: map[string]map[string]int64{ - "multi-topology": { - "zone1": 1000, // Expect 1 CPU allocation in zone - "zone1.rack1": 1000, // Expect 1 CPU allocation in rack - }, - }, - }, - { - name: "MultipleTopologies", - setupTopology: func() *topologyPlugin { - plugin := &topologyPlugin{ - enabled: true, - TopologyTrees: make(map[string]*TopologyInfo), - } - - // Create first topology - topology1 := &kueuev1alpha1.Topology{ - ObjectMeta: metav1.ObjectMeta{Name: "topology-1"}, - Spec: kueuev1alpha1.TopologySpec{ - Levels: []kueuev1alpha1.TopologyLevel{ - {NodeLabel: "zone"}, - }, - }, - } - - domain1 := NewTopologyDomainInfo("zone1", "zone1", "zone", 1) - domain1.AvailableResources = resource_info.NewResource(4, 8, 0) - domain1.AllocatedResources = resource_info.EmptyResource() - domain1.AllocatedResources.BaseResource.ScalarResources()["pods"] = 0 - - topologyTree1 := &TopologyInfo{ - Name: "topology-1", - Domains: map[TopologyDomainID]*TopologyDomainInfo{"zone1": domain1}, - Root: NewTopologyDomainInfo("root", "datacenter", "cluster", 0), - TopologyResource: topology1, - } - domain1.Parent = topologyTree1.Root - - // Create second topology - topology2 := &kueuev1alpha1.Topology{ - ObjectMeta: metav1.ObjectMeta{Name: "topology-2"}, - Spec: kueuev1alpha1.TopologySpec{ - Levels: []kueuev1alpha1.TopologyLevel{ - {NodeLabel: "region"}, - }, - }, - } - - domain2 := NewTopologyDomainInfo("region1", "region1", "region", 1) - domain2.AvailableResources = resource_info.NewResource(8, 16, 0) - domain2.AllocatedResources = resource_info.EmptyResource() - domain2.AllocatedResources.BaseResource.ScalarResources()["pods"] = 0 - - topologyTree2 := &TopologyInfo{ - Name: "topology-2", - Domains: map[TopologyDomainID]*TopologyDomainInfo{"region1": domain2}, - Root: NewTopologyDomainInfo("root", "datacenter", "cluster", 0), - TopologyResource: topology2, - } - domain2.Parent = topologyTree2.Root - - plugin.TopologyTrees["topology-1"] = topologyTree1 - plugin.TopologyTrees["topology-2"] = topologyTree2 - return plugin - }, - setupEvent: func() *framework.Event { - return &framework.Event{ - Task: &pod_info.PodInfo{ - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }, - NodeName: "node1", - }, - } - }, - podInfos: map[common_info.PodID]*pod_info.PodInfo{ - "test-pod": { - AcceptedResource: resource_info.NewResourceRequirements(0, 1000, 0), - }, - }, - expectedAllocs: map[string]map[string]int64{ - "topology-1": { - "zone1": 1000, // Expect 1 CPU allocation - }, - "topology-2": { - "region1": 1000, // Expect 1 CPU allocation - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Setup topology plugin - plugin := tt.setupTopology() - - // Create mock session - ssn := &framework.Session{ - Nodes: map[string]*node_info.NodeInfo{ - "node1": { - Node: &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node1", - Labels: map[string]string{ - "zone": "zone1", - "rack": "rack1", - "region": "region1", - }, - }, - }, - PodInfos: tt.podInfos, - }, - }, - } - - // Create event - event := tt.setupEvent() - - // Get the allocate handler - allocateHandler := plugin.handleAllocate(ssn) - - // Execute the handler - allocateHandler(event) - - // Verify allocations - for topologyName, expectedDomains := range tt.expectedAllocs { - topologyTree, exists := plugin.TopologyTrees[topologyName] - assert.True(t, exists, "Topology %s should exist", topologyName) - - for domainID, expectedCPU := range expectedDomains { - domainInfo, exists := topologyTree.Domains[TopologyDomainID(domainID)] - assert.True(t, exists, "Domain %s should exist in topology %s", domainID, topologyName) - assert.Equal(t, expectedCPU, int64(domainInfo.AllocatedResources.Cpu()), - "Expected %d CPU allocation in domain %s of topology %s", expectedCPU, domainID, topologyName) - assert.Equal(t, int64(1), domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"], - "Expected 1 allocated pod in domain %s of topology %s", domainID, topologyName) - } - } - }) - } -} - -func TestTopologyPlugin_HandleDeallocate(t *testing.T) { - tests := []struct { - name string - setupTopology func() *topologyPlugin - setupEvent func() *framework.Event - podInfos map[common_info.PodID]*pod_info.PodInfo - expectedAllocs map[string]map[string]int64 // topologyName -> domainID -> expected CPU allocation - expectedPodsNum map[string]map[string]int - }{ - { - name: "SingleTopologySingleDomain", - setupTopology: func() *topologyPlugin { - plugin := &topologyPlugin{ - enabled: true, - TopologyTrees: make(map[string]*TopologyInfo), - } - - // Create topology resource - topology := &kueuev1alpha1.Topology{ - ObjectMeta: metav1.ObjectMeta{Name: "test-topology"}, - Spec: kueuev1alpha1.TopologySpec{ - Levels: []kueuev1alpha1.TopologyLevel{ - {NodeLabel: "zone"}, - }, - }, - } - - // Create domain info - domainInfo := NewTopologyDomainInfo("zone1", "zone1", "zone", 1) - domainInfo.AvailableResources = resource_info.NewResource(4000, 0, 0) - domainInfo.AllocatedResources = resource_info.NewResource(1000, 0, 0) - domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] = 1 - - // Create topology tree - topologyTree := &TopologyInfo{ - Name: "test-topology", - Domains: map[TopologyDomainID]*TopologyDomainInfo{"zone1": domainInfo}, - Root: NewTopologyDomainInfo("root", "datacenter", "cluster", 0), - TopologyResource: topology, - } - - // Set parent relationship - domainInfo.Parent = topologyTree.Root - - plugin.TopologyTrees["test-topology"] = topologyTree - return plugin - }, - setupEvent: func() *framework.Event { - return &framework.Event{ - Task: &pod_info.PodInfo{ - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }, - NodeName: "node1", - }, - } - }, - podInfos: map[common_info.PodID]*pod_info.PodInfo{ - "test-pod": { - AcceptedResource: resource_info.NewResourceRequirements(0, 1000, 0), - }, - }, - expectedAllocs: map[string]map[string]int64{ - "test-topology": { - "zone1": 0, // Expect 1 CPU allocation - }, - }, - expectedPodsNum: map[string]map[string]int{ - "test-topology": { - "zone1": 0, - }, - }, - }, - { - name: "MultiLevelTopology", - setupTopology: func() *topologyPlugin { - plugin := &topologyPlugin{ - enabled: true, - TopologyTrees: make(map[string]*TopologyInfo), - } - - // Create topology resource - topology := &kueuev1alpha1.Topology{ - ObjectMeta: metav1.ObjectMeta{Name: "multi-topology"}, - Spec: kueuev1alpha1.TopologySpec{ - Levels: []kueuev1alpha1.TopologyLevel{ - {NodeLabel: "zone"}, - {NodeLabel: "rack"}, - }, - }, - } - - // Create domain hierarchy - zoneDomain := NewTopologyDomainInfo("zone1", "zone1", "zone", 1) - zoneDomain.AvailableResources = resource_info.NewResource(8, 16, 0) - zoneDomain.AllocatedResources = resource_info.NewResource(3000, 0, 0) - zoneDomain.AllocatedResources.BaseResource.ScalarResources()["pods"] = 2 - - rackDomain := NewTopologyDomainInfo("zone1.rack1", "rack1", "rack", 2) - rackDomain.AvailableResources = resource_info.NewResource(4, 8, 0) - rackDomain.AllocatedResources = resource_info.NewResource(2000, 0, 0) - rackDomain.AllocatedResources.BaseResource.ScalarResources()["pods"] = 1 - - // Create topology tree - topologyTree := &TopologyInfo{ - Name: "multi-topology", - Domains: map[TopologyDomainID]*TopologyDomainInfo{ - "zone1": zoneDomain, - "zone1.rack1": rackDomain, - }, - Root: NewTopologyDomainInfo("root", "datacenter", "cluster", 0), - TopologyResource: topology, - } - - // Set parent relationships - rackDomain.Parent = zoneDomain - zoneDomain.Parent = topologyTree.Root - - plugin.TopologyTrees["multi-topology"] = topologyTree - return plugin - }, - setupEvent: func() *framework.Event { - return &framework.Event{ - Task: &pod_info.PodInfo{ - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }, - NodeName: "node1", - }, - } - }, - podInfos: map[common_info.PodID]*pod_info.PodInfo{ - "test-pod": { - AcceptedResource: resource_info.NewResourceRequirements(0, 1000, 0), - }, - }, - expectedAllocs: map[string]map[string]int64{ - "multi-topology": { - "zone1": 2000, // Expect 1 CPU allocation in zone - "zone1.rack1": 1000, // Expect 1 CPU allocation in rack - }, - }, - expectedPodsNum: map[string]map[string]int{ - "multi-topology": { - "zone1": 1, - "zone1.rack1": 0, - }, - }, - }, - { - name: "MultipleTopologies", - setupTopology: func() *topologyPlugin { - plugin := &topologyPlugin{ - enabled: true, - TopologyTrees: make(map[string]*TopologyInfo), - } - - // Create first topology - topology1 := &kueuev1alpha1.Topology{ - ObjectMeta: metav1.ObjectMeta{Name: "topology-1"}, - Spec: kueuev1alpha1.TopologySpec{ - Levels: []kueuev1alpha1.TopologyLevel{ - {NodeLabel: "zone"}, - }, - }, - } - - domain1 := NewTopologyDomainInfo("zone1", "zone1", "zone", 1) - domain1.AvailableResources = resource_info.NewResource(4, 8, 0) - domain1.AllocatedResources = resource_info.NewResource(2000, 0, 0) - domain1.AllocatedResources.BaseResource.ScalarResources()["pods"] = 5 - - topologyTree1 := &TopologyInfo{ - Name: "topology-1", - Domains: map[TopologyDomainID]*TopologyDomainInfo{"zone1": domain1}, - Root: NewTopologyDomainInfo("root", "datacenter", "cluster", 0), - TopologyResource: topology1, - } - domain1.Parent = topologyTree1.Root - - // Create second topology - topology2 := &kueuev1alpha1.Topology{ - ObjectMeta: metav1.ObjectMeta{Name: "topology-2"}, - Spec: kueuev1alpha1.TopologySpec{ - Levels: []kueuev1alpha1.TopologyLevel{ - {NodeLabel: "region"}, - }, - }, - } - - domain2 := NewTopologyDomainInfo("region1", "region1", "region", 1) - domain2.AvailableResources = resource_info.NewResource(8, 16, 0) - domain2.AllocatedResources = resource_info.NewResource(1000, 0, 0) - domain2.AllocatedResources.BaseResource.ScalarResources()["pods"] = 2 - - topologyTree2 := &TopologyInfo{ - Name: "topology-2", - Domains: map[TopologyDomainID]*TopologyDomainInfo{"region1": domain2}, - Root: NewTopologyDomainInfo("root", "datacenter", "cluster", 0), - TopologyResource: topology2, - } - domain2.Parent = topologyTree2.Root - - plugin.TopologyTrees["topology-1"] = topologyTree1 - plugin.TopologyTrees["topology-2"] = topologyTree2 - return plugin - }, - setupEvent: func() *framework.Event { - return &framework.Event{ - Task: &pod_info.PodInfo{ - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }, - NodeName: "node1", - }, - } - }, - podInfos: map[common_info.PodID]*pod_info.PodInfo{ - "test-pod": { - AcceptedResource: resource_info.NewResourceRequirements(0, 1000, 0), - }, - }, - expectedAllocs: map[string]map[string]int64{ - "topology-1": { - "zone1": 1000, // Expect 1 CPU allocation - }, - "topology-2": { - "region1": 0, // Expect 1 CPU allocation - }, - }, - expectedPodsNum: map[string]map[string]int{ - "topology-1": { - "zone1": 4, - }, - "topology-2": { - "region1": 1, - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Setup topology plugin - plugin := tt.setupTopology() + assert.Equal(t, "test-block-2", blockDomains["test-block-2"].Name) + assert.Equal(t, 1, blockDomains["test-block-2"].Depth) - // Create mock session - ssn := &framework.Session{ - Nodes: map[string]*node_info.NodeInfo{ - "node1": { - Node: &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node1", - Labels: map[string]string{ - "zone": "zone1", - "rack": "rack1", - "region": "region1", - }, - }, - }, - PodInfos: tt.podInfos, - }, - }, - } - - // Create event - event := tt.setupEvent() - - // Get the allocate handler - deallocateHandler := plugin.handleDeallocate(ssn) - - // Execute the handler - deallocateHandler(event) - - // Verify allocations - for topologyName, expectedDomains := range tt.expectedAllocs { - topologyTree, exists := plugin.TopologyTrees[topologyName] - assert.True(t, exists, "Topology %s should exist", topologyName) - - for domainID, expectedCPU := range expectedDomains { - domainInfo, exists := topologyTree.Domains[TopologyDomainID(domainID)] - assert.True(t, exists, "Domain %s should exist in topology %s", domainID, topologyName) - assert.Equal(t, expectedCPU, int64(domainInfo.AllocatedResources.Cpu()), - "Expected %d CPU allocation in domain %s of topology %s", expectedCPU, domainID, topologyName) - } - } - - // Verify pods - for topologyName, expectedDomains := range tt.expectedPodsNum { - topologyTree, exists := plugin.TopologyTrees[topologyName] - assert.True(t, exists, "Topology %s should exist", topologyName) - - for domainID, expectedPodsNum := range expectedDomains { - domainInfo, exists := topologyTree.Domains[TopologyDomainID(domainID)] - assert.True(t, exists, "Domain %s should exist in topology %s", domainID, topologyName) - assert.Equal(t, int64(expectedPodsNum), domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"], - "Expected 1 allocated pod in domain %s of topology %s", domainID, topologyName) - } - } - }) - } + assert.Equal(t, "test-rack-1", rackDomains["test-block-2.test-rack-1"].Name) + assert.Equal(t, "test-block-2", rackDomains["test-block-2.test-rack-1"].Parent.Name) + assert.Equal(t, 2, rackDomains["test-block-2.test-rack-1"].Depth) } diff --git a/pkg/scheduler/plugins/topology/topology_structs.go b/pkg/scheduler/plugins/topology/topology_structs.go index df3446c70..dd8fcd9fb 100644 --- a/pkg/scheduler/plugins/topology/topology_structs.go +++ b/pkg/scheduler/plugins/topology/topology_structs.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info" - "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info" kueuev1alpha1 "sigs.k8s.io/kueue/apis/kueue/v1alpha1" ) @@ -20,7 +19,10 @@ type TopologyInfo struct { Root *TopologyDomainInfo // Map of all domains by their ID for quick lookup - Domains map[TopologyDomainID]*TopologyDomainInfo + //Domains map[TopologyDomainID]*TopologyDomainInfo + + // Map of all domains by their level for quick lookup + DomainsByLevel map[string]map[TopologyDomainID]*TopologyDomainInfo // Name of this topology configuration Name string @@ -49,34 +51,22 @@ type TopologyDomainInfo struct { // Nodes that belong to this domain Nodes map[string]*node_info.NodeInfo - // Total available resources in this domain - AvailableResources *resource_info.Resource - - // Total allocated resources in this domain - AllocatedResources *resource_info.Resource - // Number of pods that can be allocated in this domain for the job AllocatablePods int - // List of resources requested by each pod in the job this tree is built for - RequestedResources *resource_info.ResourceRequirements - // Depth in the tree from root (0 for root) Depth int } func NewTopologyDomainInfo(id TopologyDomainID, name, level string, depth int) *TopologyDomainInfo { return &TopologyDomainInfo{ - ID: id, - Name: name, - Level: level, - Parent: nil, - Children: []*TopologyDomainInfo{}, - Nodes: map[string]*node_info.NodeInfo{}, - AvailableResources: resource_info.EmptyResource(), - AllocatedResources: resource_info.EmptyResource(), - RequestedResources: nil, - Depth: depth, + ID: id, + Name: name, + Level: level, + Parent: nil, + Children: []*TopologyDomainInfo{}, + Nodes: map[string]*node_info.NodeInfo{}, + Depth: depth, } } @@ -93,15 +83,6 @@ func calcDomainId(leafLevelIndex int, levels []kueuev1alpha1.TopologyLevel, node return TopologyDomainID(strings.Join(domainsNames, ".")) } -func calcLeafDomainId(topologyResource *kueuev1alpha1.Topology, nodeLabels map[string]string) TopologyDomainID { - return calcDomainId(len(topologyResource.Spec.Levels)-1, topologyResource.Spec.Levels, nodeLabels) -} - func (t *TopologyDomainInfo) AddNode(nodeInfo *node_info.NodeInfo) { t.Nodes[nodeInfo.Node.Name] = nodeInfo - t.AvailableResources.Add(nodeInfo.Allocatable) - t.AllocatedResources.Add(nodeInfo.Used) - // TODO: do not take into account fractional gpus - t.AllocatedResources.BaseResource.ScalarResources()["pods"] = - t.AllocatedResources.BaseResource.ScalarResources()["pods"] + int64(len(nodeInfo.PodInfos)) }