Skip to content

Commit 10ec261

Browse files
authored
Add prefix cache aware scheduling (#768)
* Add prefix cache aware scheduling * Replace scheduler v2 with config v2 * Add score weight to XXScorerConfig * Address comments * Clean up * Change to use container/list lib * cleanup * Add TODO * make linter happy
1 parent 64a37d1 commit 10ec261

14 files changed

+851
-6
lines changed

cmd/epp/main.go

+42
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/client-go/rest"
3535
"k8s.io/component-base/metrics/legacyregistry"
3636
ctrl "sigs.k8s.io/controller-runtime"
37+
"sigs.k8s.io/controller-runtime/pkg/log"
3738
"sigs.k8s.io/controller-runtime/pkg/log/zap"
3839
"sigs.k8s.io/controller-runtime/pkg/manager"
3940
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -43,7 +44,13 @@ import (
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4445
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
4546
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
47+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
48+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
49+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
50+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
51+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
4652
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
53+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
4754
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4855
)
4956

@@ -107,8 +114,22 @@ var (
107114
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
108115

109116
setupLog = ctrl.Log.WithName("setup")
117+
118+
// Environment variables
119+
schedulerV2 = envutil.GetEnvString("EXPERIMENTAL_USE_SCHEDULER_V2", "false", setupLog)
120+
prefixCacheScheduling = envutil.GetEnvString("ENABLE_PREFIX_CACHE_SCHEDULING", "false", setupLog)
110121
)
111122

123+
func loadPrefixCacheConfig() prefix.Config {
124+
baseLogger := log.Log.WithName("env-config")
125+
126+
return prefix.Config{
127+
HashBlockSize: envutil.GetEnvInt("PREFIX_CACHE_HASH_BLOCK_SIZE", prefix.DefaultHashBlockSize, baseLogger),
128+
MaxPrefixBlocksToMatch: envutil.GetEnvInt("PREFIX_CACHE_MAX_PREFIX_BLOCKS", prefix.DefaultMaxPrefixBlocks, baseLogger),
129+
LRUIndexerCapacity: envutil.GetEnvInt("PREFIX_CACHE_LRU_CAPACITY", prefix.DefaultLRUIndexerCapacity, baseLogger),
130+
}
131+
}
132+
112133
func main() {
113134
if err := run(); err != nil {
114135
os.Exit(1)
@@ -172,6 +193,27 @@ func run() error {
172193
datastore := datastore.NewDatastore(ctx, pmf)
173194

174195
scheduler := scheduling.NewScheduler(datastore)
196+
if schedulerV2 == "true" {
197+
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
198+
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
199+
scorers := map[plugins.Scorer]int{
200+
&scorer.QueueScorer{}: queueScorerWeight,
201+
&scorer.KVCacheScorer{}: kvCacheScorerWeight,
202+
}
203+
schedConfigOpts := []scheduling.ConfigOption{}
204+
if prefixCacheScheduling == "true" {
205+
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
206+
schedConfigOpts = append(schedConfigOpts, scheduling.AddPrefixPlugin(loadPrefixCacheConfig(), prefixScorerWeight))
207+
}
208+
schedulerConfig := scheduling.NewSchedulerConfig(
209+
[]plugins.PreSchedule{},
210+
[]plugins.Filter{filter.NewSheddableCapacityFilter()},
211+
scorers,
212+
picker.NewMaxScorePicker(),
213+
[]plugins.PostSchedule{},
214+
schedConfigOpts...)
215+
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
216+
}
175217
serverRunner := &runserver.ExtProcServerRunner{
176218
GrpcPort: *grpcPort,
177219
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,

pkg/epp/metrics/metrics.go

+56
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,40 @@ var (
209209
[]string{"plugin_type", "plugin_name"},
210210
)
211211

212+
// Prefix indexer Metrics
213+
PrefixCacheSize = compbasemetrics.NewGaugeVec(
214+
&compbasemetrics.GaugeOpts{
215+
Subsystem: InferenceExtension,
216+
Name: "prefix_indexer_size",
217+
Help: "Size of the prefix indexer.",
218+
StabilityLevel: compbasemetrics.ALPHA,
219+
},
220+
[]string{},
221+
)
222+
223+
PrefixCacheHitRatio = compbasemetrics.NewHistogramVec(
224+
&compbasemetrics.HistogramOpts{
225+
Subsystem: InferenceExtension,
226+
Name: "prefix_indexer_hit_ratio",
227+
Help: "Ratio of prefix length matched to total prefix length in the cache lookup.",
228+
// Buckets from 0.0 to 1.0 in increments
229+
Buckets: []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
230+
StabilityLevel: compbasemetrics.ALPHA,
231+
},
232+
[]string{},
233+
)
234+
235+
PrefixCacheHitLength = compbasemetrics.NewHistogramVec(
236+
&compbasemetrics.HistogramOpts{
237+
Subsystem: InferenceExtension,
238+
Name: "prefix_indexer_hit_bytes",
239+
Help: "Length of the prefix match in number of bytes in the cache lookup.",
240+
Buckets: []float64{0, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536},
241+
StabilityLevel: compbasemetrics.ALPHA,
242+
},
243+
[]string{},
244+
)
245+
212246
// Info Metrics
213247
InferenceExtensionInfo = compbasemetrics.NewGaugeVec(
214248
&compbasemetrics.GaugeOpts{
@@ -244,6 +278,10 @@ func Register() {
244278
legacyregistry.MustRegister(SchedulerE2ELatency)
245279

246280
legacyregistry.MustRegister(InferenceExtensionInfo)
281+
282+
legacyregistry.MustRegister(PrefixCacheSize)
283+
legacyregistry.MustRegister(PrefixCacheHitRatio)
284+
legacyregistry.MustRegister(PrefixCacheHitLength)
247285
})
248286
}
249287

@@ -352,6 +390,24 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
352390
SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
353391
}
354392

393+
// RecordPrefixCacheSize records the size of the prefix indexer in megabytes.
394+
func RecordPrefixCacheSize(size int64) {
395+
PrefixCacheSize.WithLabelValues().Set(float64(size))
396+
}
397+
398+
// RecordPrefixCacheMatch records both the hit ratio and hit length for a prefix indexer match.
399+
// matchedLength is the number of characters that matched, and totalLength is the total prefix length.
400+
func RecordPrefixCacheMatch(matchedLength, totalLength int) {
401+
// Record the hit length metric
402+
PrefixCacheHitLength.WithLabelValues().Observe(float64(matchedLength))
403+
404+
// Record the hit ratio metric if totalLength is positive
405+
if totalLength > 0 {
406+
ratio := float64(matchedLength) / float64(totalLength)
407+
PrefixCacheHitRatio.WithLabelValues().Observe(ratio)
408+
}
409+
}
410+
355411
func RecordInferenceExtensionInfo() {
356412
if CommitSHA != "" {
357413
InferenceExtensionInfo.WithLabelValues(CommitSHA).Set(1)

pkg/epp/metrics/metrics_test.go

+103
Original file line numberDiff line numberDiff line change
@@ -664,3 +664,106 @@ func TestSchedulerE2ELatency(t *testing.T) {
664664
})
665665
}
666666
}
667+
668+
func TestPrefixCacheMetrics(t *testing.T) {
669+
const (
670+
PrefixCacheSizeMetric = InferenceExtension + "_prefix_indexer_size"
671+
PrefixCacheHitRatioMetric = InferenceExtension + "_prefix_indexer_hit_ratio"
672+
PrefixCacheHitLengthMetric = InferenceExtension + "_prefix_indexer_hit_bytes"
673+
)
674+
675+
type cacheMatchRecord struct {
676+
matchedLength int
677+
totalLength int
678+
}
679+
680+
scenario := struct {
681+
name string
682+
cacheSizes []int64
683+
cacheMatches []cacheMatchRecord
684+
}{
685+
name: "multiple cache metrics",
686+
cacheSizes: []int64{1024, 2048, 4096},
687+
cacheMatches: []cacheMatchRecord{
688+
{
689+
matchedLength: 5,
690+
totalLength: 10,
691+
},
692+
{
693+
matchedLength: 0,
694+
totalLength: 10,
695+
},
696+
{
697+
matchedLength: 10,
698+
totalLength: 10,
699+
},
700+
{
701+
matchedLength: 7,
702+
totalLength: 10,
703+
},
704+
{
705+
matchedLength: 64,
706+
totalLength: 128,
707+
},
708+
{
709+
matchedLength: 0,
710+
totalLength: 128,
711+
},
712+
},
713+
}
714+
715+
Register()
716+
t.Run(scenario.name, func(t *testing.T) {
717+
// Record cache size metrics
718+
for _, size := range scenario.cacheSizes {
719+
RecordPrefixCacheSize(size)
720+
}
721+
722+
// Record cache match metrics (both hit ratio and hit length)
723+
for _, match := range scenario.cacheMatches {
724+
RecordPrefixCacheMatch(match.matchedLength, match.totalLength)
725+
}
726+
727+
// Verify cache size metrics
728+
wantCacheSizeMetrics, err := os.Open("testdata/prefix_indexer_size_metric")
729+
defer func() {
730+
if err := wantCacheSizeMetrics.Close(); err != nil {
731+
t.Error(err)
732+
}
733+
}()
734+
if err != nil {
735+
t.Fatal(err)
736+
}
737+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantCacheSizeMetrics, PrefixCacheSizeMetric); err != nil {
738+
t.Error(err)
739+
}
740+
741+
// Verify hit ratio metrics
742+
wantHitRatioMetrics, err := os.Open("testdata/prefix_indexer_hit_ratio_metric")
743+
defer func() {
744+
if err := wantHitRatioMetrics.Close(); err != nil {
745+
t.Error(err)
746+
}
747+
}()
748+
if err != nil {
749+
t.Fatal(err)
750+
}
751+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitRatioMetrics, PrefixCacheHitRatioMetric); err != nil {
752+
t.Error(err)
753+
}
754+
755+
// Verify hit length metrics
756+
wantHitLengthMetrics, err := os.Open("testdata/prefix_indexer_hit_bytes_metric")
757+
defer func() {
758+
if err := wantHitLengthMetrics.Close(); err != nil {
759+
t.Error(err)
760+
}
761+
}()
762+
if err != nil {
763+
t.Fatal(err)
764+
}
765+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitLengthMetrics, PrefixCacheHitLengthMetric); err != nil {
766+
t.Error(err)
767+
}
768+
})
769+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# HELP inference_extension_prefix_indexer_hit_bytes [ALPHA] Length of the prefix match in number of bytes in the cache lookup.
2+
# TYPE inference_extension_prefix_indexer_hit_bytes histogram
3+
inference_extension_prefix_indexer_hit_bytes_bucket{le="0"} 2
4+
inference_extension_prefix_indexer_hit_bytes_bucket{le="16"} 5
5+
inference_extension_prefix_indexer_hit_bytes_bucket{le="32"} 5
6+
inference_extension_prefix_indexer_hit_bytes_bucket{le="64"} 6
7+
inference_extension_prefix_indexer_hit_bytes_bucket{le="128"} 6
8+
inference_extension_prefix_indexer_hit_bytes_bucket{le="256"} 6
9+
inference_extension_prefix_indexer_hit_bytes_bucket{le="512"} 6
10+
inference_extension_prefix_indexer_hit_bytes_bucket{le="1024"} 6
11+
inference_extension_prefix_indexer_hit_bytes_bucket{le="2048"} 6
12+
inference_extension_prefix_indexer_hit_bytes_bucket{le="4096"} 6
13+
inference_extension_prefix_indexer_hit_bytes_bucket{le="8192"} 6
14+
inference_extension_prefix_indexer_hit_bytes_bucket{le="16384"} 6
15+
inference_extension_prefix_indexer_hit_bytes_bucket{le="32768"} 6
16+
inference_extension_prefix_indexer_hit_bytes_bucket{le="65536"} 6
17+
inference_extension_prefix_indexer_hit_bytes_bucket{le="+Inf"} 6
18+
inference_extension_prefix_indexer_hit_bytes_sum 86
19+
inference_extension_prefix_indexer_hit_bytes_count 6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# HELP inference_extension_prefix_indexer_hit_ratio [ALPHA] Ratio of prefix length matched to total prefix length in the cache lookup.
2+
# TYPE inference_extension_prefix_indexer_hit_ratio histogram
3+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0"} 2
4+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.1"} 2
5+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.2"} 2
6+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.3"} 2
7+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.4"} 2
8+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.5"} 4
9+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.6"} 4
10+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.7"} 5
11+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.8"} 5
12+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.9"} 5
13+
inference_extension_prefix_indexer_hit_ratio_bucket{le="1"} 6
14+
inference_extension_prefix_indexer_hit_ratio_bucket{le="+Inf"} 6
15+
inference_extension_prefix_indexer_hit_ratio_sum 2.7
16+
inference_extension_prefix_indexer_hit_ratio_count 6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# HELP inference_extension_prefix_indexer_size [ALPHA] Size of the prefix indexer.
2+
# TYPE inference_extension_prefix_indexer_size gauge
3+
inference_extension_prefix_indexer_size{} 4096

pkg/epp/scheduling/config.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,23 @@ package scheduling
1818

1919
import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
2122
)
2223

2324
// NewSchedulerConfig creates a new SchedulerConfig object with the given plugins.
2425
func NewSchedulerConfig(preSchedulePlugins []plugins.PreSchedule, filters []plugins.Filter, scorers map[plugins.Scorer]int,
25-
picker plugins.Picker, postSchedulePlugins []plugins.PostSchedule) *SchedulerConfig {
26-
return &SchedulerConfig{
26+
picker plugins.Picker, postSchedulePlugins []plugins.PostSchedule, opts ...ConfigOption) *SchedulerConfig {
27+
config := &SchedulerConfig{
2728
preSchedulePlugins: preSchedulePlugins,
2829
filters: filters,
2930
scorers: scorers,
3031
picker: picker,
3132
postSchedulePlugins: postSchedulePlugins,
3233
}
34+
for _, opt := range opts {
35+
opt(config)
36+
}
37+
return config
3338
}
3439

3540
// SchedulerConfig provides a configuration for the scheduler which influence routing decisions.
@@ -40,3 +45,16 @@ type SchedulerConfig struct {
4045
picker plugins.Picker
4146
postSchedulePlugins []plugins.PostSchedule
4247
}
48+
49+
type ConfigOption func(*SchedulerConfig)
50+
51+
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/813): Replace this
52+
// with a more generic way to add plugins.
53+
func AddPrefixPlugin(prefixConfig prefix.Config, weight int) ConfigOption {
54+
return func(cfg *SchedulerConfig) {
55+
prefixPlugin := prefix.New(prefixConfig)
56+
cfg.preSchedulePlugins = append(cfg.preSchedulePlugins, prefixPlugin)
57+
cfg.postSchedulePlugins = append(cfg.postSchedulePlugins, prefixPlugin)
58+
cfg.scorers[prefixPlugin] = weight
59+
}
60+
}

0 commit comments

Comments
 (0)