Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/scheduler/cache/usagedb/api/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package api

import (
"maps"
"time"

"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/queue_info"
Expand All @@ -19,6 +20,15 @@ type UsageDBConfig struct {
UsageParams *UsageParams `yaml:"usageParams" json:"usageParams"`
}

// DeepCopy returns a new UsageDBConfig holding the same client type and
// connection settings as c, with UsageParams duplicated through
// UsageParams.DeepCopy.
//
// It returns nil when c is nil, so callers may invoke it on an optional
// *UsageDBConfig field without checking the field first.
func (c *UsageDBConfig) DeepCopy() *UsageDBConfig {
	if c == nil {
		return nil
	}

	out := &UsageDBConfig{
		ClientType:             c.ClientType,
		ConnectionString:       c.ConnectionString,
		ConnectionStringEnvVar: c.ConnectionStringEnvVar,
	}
	// UsageParams is optional; only copy it when set so this method does not
	// depend on the callee tolerating a nil receiver.
	if c.UsageParams != nil {
		out.UsageParams = c.UsageParams.DeepCopy()
	}
	return out
}

// GetUsageParams returns the usage params if set, and default params if not set.
func (c *UsageDBConfig) GetUsageParams() *UsageParams {
up := UsageParams{}
Expand All @@ -43,3 +53,13 @@ type UsageParams struct {
// ExtraParams are extra parameters for the usage db client, which are client specific.
ExtraParams map[string]string `yaml:"extraParams" json:"extraParams"`
}

// DeepCopy returns a new UsageParams with the same field values as up and an
// independent clone of the ExtraParams map (a nil map stays nil).
//
// It returns nil when up is nil, so it is safe to call on an optional
// *UsageParams field.
//
// NOTE(review): HalfLifePeriod, WindowSize, WindowType and
// TumblingWindowCronString are copied by plain assignment; if any of them is
// a pointer type the copy still shares the pointee with up — confirm against
// the struct definition.
func (up *UsageParams) DeepCopy() *UsageParams {
	if up == nil {
		return nil
	}

	return &UsageParams{
		HalfLifePeriod:           up.HalfLifePeriod,
		WindowSize:               up.WindowSize,
		WindowType:               up.WindowType,
		TumblingWindowCronString: up.TumblingWindowCronString,
		ExtraParams:              maps.Clone(up.ExtraParams),
	}
}
39 changes: 39 additions & 0 deletions pkg/scheduler/conf/default_conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2025 NVIDIA CORPORATION
// SPDX-License-Identifier: Apache-2.0

package conf

// GetDefaultSchedulerConfiguration builds and returns a new
// SchedulerConfiguration containing the built-in action list and a single
// tier of plugins. Every call constructs a fresh value.
func GetDefaultSchedulerConfiguration() *SchedulerConfiguration {
	const defaultActions = "allocate, consolidation, reclaim, preempt, stalegangeviction"

	// Argument-less plugins listed ahead of nodeplacement, in order.
	beforePlacement := []string{
		"predicates",
		"proportion",
		"priority",
		"elastic",
		"kubeflow",
		"ray",
		"nodeavailability",
		"gpusharingorder",
		"gpupack",
		"resourcetype",
		"subgrouporder",
		"taskorder",
		"nominatednode",
		"dynamicresources",
	}
	// Argument-less plugins listed after nodeplacement, in order.
	afterPlacement := []string{"minruntime", "topology"}

	// Size the slice exactly so its length and capacity match the plugin count.
	plugins := make([]PluginOption, 0, len(beforePlacement)+1+len(afterPlacement))
	for _, name := range beforePlacement {
		plugins = append(plugins, PluginOption{Name: name})
	}
	plugins = append(plugins, PluginOption{
		Name: "nodeplacement",
		Arguments: map[string]string{
			"cpu": "binpack",
			"gpu": "binpack",
		},
	})
	for _, name := range afterPlacement {
		plugins = append(plugins, PluginOption{Name: name})
	}

	return &SchedulerConfiguration{
		Actions: defaultActions,
		Tiers:   []Tier{{Plugins: plugins}},
	}
}
45 changes: 45 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ limitations under the License.
package conf

import (
"maps"
"slices"
"time"

"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -59,33 +61,76 @@ type SchedulerConfiguration struct {
UsageDBConfig *usagedbapi.UsageDBConfig `yaml:"usageDBConfig,omitempty" json:"usageDBConfig,omitempty"`
}

// DeepCopy returns a copy of s in which the tiers (including each tier's
// plugin list and plugin arguments), the QueueDepthPerAction map and the
// UsageDBConfig are all duplicated, so mutating the result does not affect s.
//
// It returns nil when s is nil. A nil Tiers slice, a nil QueueDepthPerAction
// map and a nil UsageDBConfig each stay nil in the copy.
func (s *SchedulerConfiguration) DeepCopy() *SchedulerConfiguration {
	if s == nil {
		return nil
	}

	// slices.Clone duplicates only the outer slice (and keeps nil as nil);
	// every element would still share its Plugins with s, so each one is
	// replaced by a deep copy.
	tiers := slices.Clone(s.Tiers)
	for i := range tiers {
		tiers[i] = *s.Tiers[i].DeepCopy()
	}

	out := &SchedulerConfiguration{
		Actions:             s.Actions,
		Tiers:               tiers,
		QueueDepthPerAction: maps.Clone(s.QueueDepthPerAction),
	}
	// UsageDBConfig is optional; skip the call when it is unset instead of
	// invoking a method through a nil pointer.
	if s.UsageDBConfig != nil {
		out.UsageDBConfig = s.UsageDBConfig.DeepCopy()
	}
	return out
}

// Tier defines a plugin tier: an ordered list of plugin options that is
// serialised under the "plugins" key in both YAML and JSON.
type Tier struct {
// Plugins holds the options of every plugin configured in this tier.
Plugins []PluginOption `yaml:"plugins" json:"plugins"`
}

// DeepCopy returns a new Tier whose Plugins slice holds a deep copy of every
// plugin option in t, so the result shares no slice or map with t.
//
// It returns nil when t is nil, and a nil Plugins slice stays nil so the copy
// compares equal to its source.
func (t *Tier) DeepCopy() *Tier {
	if t == nil {
		return nil
	}

	out := &Tier{}
	if t.Plugins != nil {
		out.Plugins = make([]PluginOption, len(t.Plugins))
		// Index instead of ranging by value: avoids copying each PluginOption
		// once into the loop variable before copying it again.
		for i := range t.Plugins {
			out.Plugins[i] = *t.Plugins[i].DeepCopy()
		}
	}
	return out
}

// PluginOption defines the options of a single plugin: its name, a set of
// per-callback disable flags, and free-form string arguments.
type PluginOption struct {
// Name is the name of the plugin.
Name string `yaml:"name" json:"name"`

// JobOrderDisabled defines whether jobOrderFn is disabled.
//
// Deprecated: To disable, don't set this plugin in the config.
JobOrderDisabled bool `yaml:"disableJobOrder" json:"disableJobOrder"`
// TaskOrderDisabled defines whether taskOrderFn is disabled.
// NOTE(review): unlike most sibling flags, no Deprecated marker is attached
// to this field — confirm whether that is intentional.
TaskOrderDisabled bool `yaml:"disableTaskOrder" json:"disableTaskOrder"`
// PreemptableDisabled defines whether preemptableFn is disabled.
//
// Deprecated: To disable, don't set this plugin in the config.
PreemptableDisabled bool `yaml:"disablePreemptable" json:"disablePreemptable"`
// ReclaimableDisabled defines whether reclaimableFn is disabled.
//
// Deprecated: To disable, don't set this plugin in the config.
ReclaimableDisabled bool `yaml:"disableReclaimable" json:"disableReclaimable"`
// QueueOrderDisabled defines whether queueOrderFn is disabled.
// NOTE(review): unlike most sibling flags, no Deprecated marker is attached
// to this field — confirm whether that is intentional.
QueueOrderDisabled bool `yaml:"disableQueueOrder" json:"disableQueueOrder"`
// PredicateDisabled defines whether predicateFn is disabled.
//
// Deprecated: To disable, don't set this plugin in the config.
PredicateDisabled bool `yaml:"disablePredicate" json:"disablePredicate"`
// NodeOrderDisabled defines whether NodeOrderFn is disabled.
//
// Deprecated: To disable, don't set this plugin in the config.
NodeOrderDisabled bool `yaml:"disableNodeOrder" json:"disableNodeOrder"`

// Arguments defines the different arguments that can be given to different
// plugins, as string key/value pairs.
Arguments map[string]string `yaml:"arguments" json:"arguments"`
}

// DeepCopy returns a pointer to a new PluginOption carrying the same name and
// disable flags as p, with the Arguments map cloned so the two values do not
// share it. A nil Arguments map stays nil.
func (p *PluginOption) DeepCopy() *PluginOption {
	// A struct assignment carries over Name and every *Disabled flag at once.
	dup := *p
	// Arguments is the only reference-typed field; give the duplicate its own map.
	dup.Arguments = maps.Clone(p.Arguments)
	return &dup
}

type SchedulingNodePoolParams struct {
NodePoolLabelKey string
NodePoolLabelValue string
Expand Down
36 changes: 2 additions & 34 deletions pkg/scheduler/conf_util/scheduler_conf_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,42 +30,10 @@ import (
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework"
)

/*
defaultSchedulerConf is the default scheduler configuration, unless overridden
from the operator (in the scheduler configmap). It is a raw string holding an
"actions" entry and a "tiers" list of plugin names; nodeplacement is the only
entry that also carries arguments.
*/
const defaultSchedulerConf = `
actions: "allocate, consolidation, reclaim, preempt, stalegangeviction"
tiers:
- plugins:
- name: predicates
- name: proportion
- name: priority
- name: elastic
- name: kubeflow
- name: ray
- name: nodeavailability
- name: gpusharingorder
- name: gpupack
- name: resourcetype
- name: subgrouporder
- name: taskorder
- name: nominatednode
- name: dynamicresources
- name: nodeplacement
arguments:
cpu: binpack
gpu: binpack
- name: minruntime
- name: topology
`

func ResolveConfigurationFromFile(confPath string) (*conf.SchedulerConfiguration, error) {
schedulerConfStr, err := readSchedulerConf(confPath)
if err != nil {
return nil, err
}
defaultConfig := conf.GetDefaultSchedulerConfiguration()

defaultConfig, err := loadSchedulerConf(defaultSchedulerConf)
schedulerConfStr, err := readSchedulerConf(confPath)
if err != nil {
return nil, err
}
Expand Down
64 changes: 31 additions & 33 deletions test/e2e/modules/configurations/feature_flags/placement_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ package feature_flags
import (
"context"
"fmt"
"slices"
"strings"

"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf"
testContext "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/context"
)

Expand All @@ -24,38 +24,36 @@ const (
// SetPlacementStrategy builds a scheduler configuration in which both the GPU
// and CPU placement arguments are set to strategy, and passes it to
// updateKaiSchedulerConfigMap, returning that call's error.
//
// The "consolidation" action is included only when the strategy is not
// SpreadStrategy, and the "gpusharingorder" plugin is appended only when the
// strategy is binpackStrategy.
//
// NOTE(review): this span carries two variants of the body — one wrapped in a
// closure that fills innerConfig, and one that fills a
// conf.SchedulerConfiguration directly — confirm which one is current.
// NOTE(review): the direct variant appends plugins to the Tiers returned by
// conf.GetDefaultSchedulerConfiguration(); confirm that list does not already
// contain the plugins appended here.
func SetPlacementStrategy(
ctx context.Context, testCtx *testContext.TestContext, strategy string,
) error {
return updateKaiSchedulerConfigMap(ctx, testCtx, func() (*config, error) {
placementArguments := map[string]string{
gpuResource: strategy, cpuResource: strategy,
}

innerConfig := config{}

actions := []string{"allocate"}
if placementArguments[gpuResource] != SpreadStrategy && placementArguments[cpuResource] != SpreadStrategy {
actions = append(actions, "consolidation")
}
actions = append(actions, []string{"reclaim", "preempt", "stalegangeviction"}...)

innerConfig.Actions = strings.Join(actions, ", ")

innerConfig.Tiers = slices.Clone(defaultKaiSchedulerPlugins)
innerConfig.Tiers[0].Plugins = append(
innerConfig.Tiers[0].Plugins,
plugin{Name: fmt.Sprintf("gpu%s", strings.Replace(placementArguments[gpuResource], "bin", "", 1))},
plugin{
Name: "nodeplacement",
Arguments: placementArguments,
},
placementArguments := map[string]string{
gpuResource: strategy, cpuResource: strategy,
}

config := conf.SchedulerConfiguration{}

actions := []string{"allocate"}
if placementArguments[gpuResource] != SpreadStrategy && placementArguments[cpuResource] != SpreadStrategy {
actions = append(actions, "consolidation")
}
actions = append(actions, []string{"reclaim", "preempt", "stalegangeviction"}...)

config.Actions = strings.Join(actions, ", ")

config.Tiers = conf.GetDefaultSchedulerConfiguration().Tiers
config.Tiers[0].Plugins = append(
config.Tiers[0].Plugins,
conf.PluginOption{Name: fmt.Sprintf("gpu%s", strings.Replace(placementArguments[gpuResource], "bin", "", 1))},
conf.PluginOption{
Name: "nodeplacement",
Arguments: placementArguments,
},
)

if placementArguments[gpuResource] == binpackStrategy {
config.Tiers[0].Plugins = append(
config.Tiers[0].Plugins,
conf.PluginOption{Name: "gpusharingorder"},
)
}

if placementArguments[gpuResource] == binpackStrategy {
innerConfig.Tiers[0].Plugins = append(
innerConfig.Tiers[0].Plugins,
plugin{Name: "gpusharingorder"},
)
}

return &innerConfig, nil
})
return updateKaiSchedulerConfigMap(ctx, testCtx, &config)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf"
"github.com/NVIDIA/KAI-scheduler/test/e2e/modules/constant"
testContext "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/context"
"github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait"
Expand All @@ -21,58 +22,13 @@ const (
schedulerConfigDataKey = "config.yaml"
)

// config is the scheduler configuration document this package marshals to
// YAML: an action string, an optional list of tiers, and an optional
// per-action queue depth map.
type config struct {
// Actions is the action list as a single string, stored under "actions".
Actions string `yaml:"actions"`
// Tiers is omitted from the YAML output when empty.
Tiers []tier `yaml:"tiers,omitempty"`
// QueueDepthPerAction is keyed by string and omitted from the YAML output when empty.
QueueDepthPerAction map[string]int `yaml:"queueDepthPerAction,omitempty"`
}

// tier is one entry of config.Tiers: a list of plugins stored under "plugins".
type tier struct {
Plugins []plugin `yaml:"plugins"`
}

// plugin is one entry of tier.Plugins: a plugin name plus optional string
// arguments.
type plugin struct {
Name string `yaml:"name"`
// Arguments is omitted from the YAML output when empty.
Arguments map[string]string `yaml:"arguments,omitempty"`
}

// defaultKaiSchedulerPlugins is a single-tier list of argument-less plugins.
// It contains no nodeplacement, gpupack, gpuspread or gpusharingorder entry.
var defaultKaiSchedulerPlugins = []tier{
{
Plugins: []plugin{
{Name: "predicates"},
{Name: "proportion"},
{Name: "priority"},
{Name: "nodeavailability"},
{Name: "resourcetype"},
{Name: "podaffinity"},
{Name: "elastic"},
{Name: "kubeflow"},
{Name: "ray"},
{Name: "taskorder"},
{Name: "subgrouporder"},
{Name: "nominatednode"},
{Name: "dynamicresources"},
{Name: "minruntime"},
{Name: "topology"},
},
},
}

// getSchedulerConfigMapData is a callback that produces the config to write,
// or an error if it cannot be built.
type getSchedulerConfigMapData func() (*config, error)

func updateKaiSchedulerConfigMap(ctx context.Context, testCtx *testContext.TestContext, getCmData getSchedulerConfigMapData) error {
func updateKaiSchedulerConfigMap(ctx context.Context, testCtx *testContext.TestContext, config *conf.SchedulerConfiguration) error {
schedulerConfig, err := testCtx.KubeClientset.CoreV1().ConfigMaps(constant.SystemPodsNamespace).
Get(ctx, schedulerConfigMapName, metav1.GetOptions{})
if err != nil {
return err
}

innerConfig, err := getCmData()
if err != nil {
return err
}

data, marshalErr := yaml.Marshal(&innerConfig)
data, marshalErr := yaml.Marshal(config)
if marshalErr != nil {
return marshalErr
}
Expand Down
Loading