Skip to content

Commit 2ed990b

Browse files
authored
Clean up filters (#802)
1 parent ecfe869 commit 2ed990b

File tree

3 files changed

+45
-57
lines changed

3 files changed

+45
-57
lines changed

pkg/epp/scheduling/config.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ limitations under the License.
1616

1717
package scheduling
1818

19-
import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
19+
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
23+
)
2024

2125
// NewSchedulerConfig creates a new SchedulerConfig object with the given plugins.
2226
func NewSchedulerConfig(preSchedulePlugins []plugins.PreSchedule, filters []plugins.Filter, scorers map[plugins.Scorer]int,
@@ -39,16 +43,14 @@ type SchedulerConfig struct {
3943
postSchedulePlugins []plugins.PostSchedule
4044
}
4145

42-
var defPlugin = &defaultPlugin{}
43-
4446
// When the scheduler is initialized with NewScheduler function, this config will be used as default.
4547
// it's possible to call NewSchedulerWithConfig to pass a different argument.
4648

4749
// For build time plugins changes, it's recommended to change the defaultConfig variable in this file.
4850
var defaultConfig = &SchedulerConfig{
4951
preSchedulePlugins: []plugins.PreSchedule{},
50-
filters: []plugins.Filter{defPlugin},
52+
filters: []plugins.Filter{&filter.SheddableRequestFilter{}, filter.LowLatencyFilter},
5153
scorers: map[plugins.Scorer]int{},
52-
picker: defPlugin,
54+
picker: &picker.RandomPicker{},
5355
postSchedulePlugins: []plugins.PostSchedule{},
5456
}

pkg/epp/scheduling/plugins/filter/filter.go

+38
Original file line numberDiff line numberDiff line change
@@ -276,3 +276,41 @@ func (pp podPredicate) and(another podPredicate) podPredicate {
276276
return pp(req, pod) && another(req, pod)
277277
}
278278
}
279+
280+
var LowLatencyFilter = &DecisionTreeFilter{
281+
Current: LowQueueFilter,
282+
NextOnSuccess: &DecisionTreeFilter{
283+
Current: LoRAAffinityFilter,
284+
NextOnSuccessOrFailure: &DecisionTreeFilter{
285+
Current: LeastQueueFilter,
286+
NextOnSuccessOrFailure: &DecisionTreeFilter{
287+
Current: LeastKVCacheFilter,
288+
},
289+
},
290+
},
291+
NextOnFailure: &DecisionTreeFilter{
292+
Current: LeastQueueFilter,
293+
NextOnSuccessOrFailure: &DecisionTreeFilter{
294+
Current: LoRAAffinityFilter,
295+
NextOnSuccessOrFailure: &DecisionTreeFilter{
296+
Current: LeastKVCacheFilter,
297+
},
298+
},
299+
},
300+
}
301+
302+
type SheddableRequestFilter struct{}
303+
304+
func (p *SheddableRequestFilter) Name() string {
305+
return "SheddableRequestFilter"
306+
}
307+
308+
func (p *SheddableRequestFilter) Filter(ctx *types.SchedulingContext, pods []types.Pod) []types.Pod {
309+
if ctx.Req.Critical {
310+
// Allow all pods to pass through if the request is critical, even if all pods reach their capacity.
311+
return pods
312+
}
313+
314+
// Only allow pods that have enough capacity to handle the request.
315+
return HasCapacityFilter.Filter(ctx, pods)
316+
}

pkg/epp/scheduling/scheduler.go

-52
Original file line numberDiff line numberDiff line change
@@ -26,47 +26,11 @@ import (
2626
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2727
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
2828
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
29-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
30-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
3129
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3230
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
3331
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3432
)
3533

36-
var (
37-
lowLatencyFilter = &filter.DecisionTreeFilter{
38-
Current: filter.LowQueueFilter,
39-
NextOnSuccess: &filter.DecisionTreeFilter{
40-
Current: filter.LoRAAffinityFilter,
41-
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
42-
Current: filter.LeastQueueFilter,
43-
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
44-
Current: filter.LeastKVCacheFilter,
45-
},
46-
},
47-
},
48-
NextOnFailure: &filter.DecisionTreeFilter{
49-
Current: filter.LeastQueueFilter,
50-
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
51-
Current: filter.LoRAAffinityFilter,
52-
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
53-
Current: filter.LeastKVCacheFilter,
54-
},
55-
},
56-
},
57-
}
58-
59-
sheddableRequestFilter = &filter.DecisionTreeFilter{
60-
// When there is at least one model server that's not queuing requests, and still has KV
61-
// cache below a certain threshold, we consider this model server has capacity to handle
62-
// a sheddable request without impacting critical requests.
63-
Current: filter.HasCapacityFilter,
64-
NextOnSuccess: lowLatencyFilter,
65-
// If all pods are queuing or running above the KVCache threshold, we drop the sheddable
66-
// request to make room for critical requests. for this, we don't define nextOnFailure.
67-
}
68-
)
69-
7034
func NewScheduler(datastore Datastore) *Scheduler {
7135
return NewSchedulerWithConfig(datastore, defaultConfig)
7236
}
@@ -206,19 +170,3 @@ func (s *Scheduler) runPostSchedulePlugins(ctx *types.SchedulingContext, res *ty
206170
metrics.RecordSchedulerPluginProcessingLatency(plugins.PostSchedulePluginType, plugin.Name(), time.Since(before))
207171
}
208172
}
209-
210-
type defaultPlugin struct {
211-
picker.RandomPicker
212-
}
213-
214-
func (p *defaultPlugin) Name() string {
215-
return "DefaultPlugin"
216-
}
217-
218-
func (p *defaultPlugin) Filter(ctx *types.SchedulingContext, pods []types.Pod) []types.Pod {
219-
if ctx.Req.Critical {
220-
return lowLatencyFilter.Filter(ctx, pods)
221-
}
222-
223-
return sheddableRequestFilter.Filter(ctx, pods)
224-
}

0 commit comments

Comments
 (0)