Skip to content

Commit b275441

Browse files
committed
Add prefix cache aware scheduling
1 parent a6ee559 commit b275441

14 files changed

+921
-0
lines changed

cmd/epp/main.go

+23
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/client-go/rest"
3535
"k8s.io/component-base/metrics/legacyregistry"
3636
ctrl "sigs.k8s.io/controller-runtime"
37+
"sigs.k8s.io/controller-runtime/pkg/log"
3738
"sigs.k8s.io/controller-runtime/pkg/log/zap"
3839
"sigs.k8s.io/controller-runtime/pkg/manager"
3940
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -42,7 +43,9 @@ import (
4243
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4445
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
46+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
4547
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
48+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
4649
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4750
)
4851

@@ -106,8 +109,24 @@ var (
106109
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
107110

108111
setupLog = ctrl.Log.WithName("setup")
112+
113+
// Environment variables
114+
schedulerV2 = envutil.GetEnvString("EXPERIMENTAL_USE_SCHEDULE_V2", "false", setupLog)
115+
prefixCacheConfig = loadPrefixCacheConfig()
109116
)
110117

118+
func loadPrefixCacheConfig() prefix.Config {
119+
// logger := zap.New(zap.RawZapOpts(uberzap.AddCaller()))
120+
// log.SetLogger(logger)
121+
baseLogger := log.Log.WithName("env-config")
122+
123+
return prefix.Config{
124+
HashBlockSize: envutil.GetEnvInt("PREFIX_CACHE_HASH_BLOCK_SIZE", prefix.DefaultCacheBlockSize, baseLogger),
125+
MaxPrefixBlocksToMatch: envutil.GetEnvInt("PREFIX_CACHE_MAX_PREFIX_BLOCKS", prefix.DefaultMaxPrefixBlocks, baseLogger),
126+
LRUIndexerCapacity: envutil.GetEnvInt("PREFIX_CACHE_MAX_CACHE_SIZE_MB", prefix.DefaultLRUIndexerCapacity, baseLogger),
127+
}
128+
}
129+
111130
func main() {
112131
if err := run(); err != nil {
113132
os.Exit(1)
@@ -171,6 +190,10 @@ func run() error {
171190
datastore := datastore.NewDatastore(ctx, pmf)
172191

173192
scheduler := scheduling.NewScheduler(datastore)
193+
if schedulerV2 == "true" {
194+
setupLog.Info("Creating scheduler with prefixCache plugin", "prefix cache config", prefixCacheConfig)
195+
scheduler = scheduling.NewSchedulerV2(datastore, prefixCacheConfig)
196+
}
174197
serverRunner := &runserver.ExtProcServerRunner{
175198
GrpcPort: *grpcPort,
176199
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,

pkg/epp/metrics/metrics.go

+75
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package metrics
1818

1919
import (
2020
"context"
21+
"runtime/debug"
2122
"sync"
2223
"time"
2324

@@ -219,6 +220,40 @@ var (
219220
},
220221
[]string{"commit"},
221222
)
223+
224+
// Prefix indexer Metrics
225+
PrefixCacheSize = compbasemetrics.NewGaugeVec(
226+
&compbasemetrics.GaugeOpts{
227+
Subsystem: InferenceExtension,
228+
Name: "prefix_indexer_size",
229+
Help: "Size of the prefix indexer.",
230+
StabilityLevel: compbasemetrics.ALPHA,
231+
},
232+
[]string{},
233+
)
234+
235+
PrefixCacheHitRatio = compbasemetrics.NewHistogramVec(
236+
&compbasemetrics.HistogramOpts{
237+
Subsystem: InferenceExtension,
238+
Name: "prefix_indexer_hit_ratio",
239+
Help: "Ratio of prefix length matched to total prefix length in the cache lookup.",
240+
// Buckets from 0.0 to 1.0 in increments
241+
Buckets: []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
242+
StabilityLevel: compbasemetrics.ALPHA,
243+
},
244+
[]string{},
245+
)
246+
247+
PrefixCacheHitLength = compbasemetrics.NewHistogramVec(
248+
&compbasemetrics.HistogramOpts{
249+
Subsystem: InferenceExtension,
250+
Name: "prefix_indexer_hit_bytes",
251+
Help: "Length of the prefix match in number of bytes in the cache lookup.",
252+
Buckets: []float64{0, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536},
253+
StabilityLevel: compbasemetrics.ALPHA,
254+
},
255+
[]string{},
256+
)
222257
)
223258

224259
var registerMetrics sync.Once
@@ -244,6 +279,10 @@ func Register() {
244279
legacyregistry.MustRegister(SchedulerE2ELatency)
245280

246281
legacyregistry.MustRegister(InferenceExtensionInfo)
282+
283+
legacyregistry.MustRegister(PrefixCacheSize)
284+
legacyregistry.MustRegister(PrefixCacheHitRatio)
285+
legacyregistry.MustRegister(PrefixCacheHitLength)
247286
})
248287
}
249288

@@ -352,8 +391,44 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
352391
SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
353392
}
354393

394+
// RecordPrefixCacheSize records the size of the prefix indexer in megabytes.
395+
func RecordPrefixCacheSize(size int64) {
396+
PrefixCacheSize.WithLabelValues().Set(float64(size))
397+
}
398+
399+
// RecordPrefixCacheMatch records both the hit ratio and hit length for a prefix indexer match.
400+
// matchedLength is the number of characters that matched, and totalLength is the total prefix length.
401+
func RecordPrefixCacheMatch(matchedLength, totalLength int) {
402+
// Record the hit length metric
403+
PrefixCacheHitLength.WithLabelValues().Observe(float64(matchedLength))
404+
405+
// Record the hit ratio metric if totalLength is positive
406+
if totalLength > 0 {
407+
ratio := float64(matchedLength) / float64(totalLength)
408+
PrefixCacheHitRatio.WithLabelValues().Observe(ratio)
409+
}
410+
}
411+
355412
func RecordInferenceExtensionInfo() {
356413
if CommitSHA != "" {
357414
InferenceExtensionInfo.WithLabelValues(CommitSHA).Set(1)
358415
}
359416
}
417+
418+
func init() {
419+
info, ok := debug.ReadBuildInfo()
420+
if !ok {
421+
return
422+
}
423+
424+
var Commit = func(i *debug.BuildInfo) string {
425+
for _, setting := range i.Settings {
426+
if setting.Key == "vcs.revision" {
427+
return setting.Value
428+
}
429+
}
430+
return ""
431+
}(info)
432+
433+
CommitSHA = Commit
434+
}

pkg/epp/metrics/metrics_test.go

+103
Original file line numberDiff line numberDiff line change
@@ -663,3 +663,106 @@ func TestSchedulerE2ELatency(t *testing.T) {
663663
})
664664
}
665665
}
666+
667+
func TestPrefixCacheMetrics(t *testing.T) {
668+
const (
669+
PrefixCacheSizeMetric = InferenceExtension + "_prefix_indexer_size"
670+
PrefixCacheHitRatioMetric = InferenceExtension + "_prefix_indexer_hit_ratio"
671+
PrefixCacheHitLengthMetric = InferenceExtension + "_prefix_indexer_hit_bytes"
672+
)
673+
674+
type cacheMatchRecord struct {
675+
matchedLength int
676+
totalLength int
677+
}
678+
679+
scenario := struct {
680+
name string
681+
cacheSizes []int64
682+
cacheMatches []cacheMatchRecord
683+
}{
684+
name: "multiple cache metrics",
685+
cacheSizes: []int64{1024, 2048, 4096},
686+
cacheMatches: []cacheMatchRecord{
687+
{
688+
matchedLength: 5,
689+
totalLength: 10,
690+
},
691+
{
692+
matchedLength: 0,
693+
totalLength: 10,
694+
},
695+
{
696+
matchedLength: 10,
697+
totalLength: 10,
698+
},
699+
{
700+
matchedLength: 7,
701+
totalLength: 10,
702+
},
703+
{
704+
matchedLength: 64,
705+
totalLength: 128,
706+
},
707+
{
708+
matchedLength: 0,
709+
totalLength: 128,
710+
},
711+
},
712+
}
713+
714+
Register()
715+
t.Run(scenario.name, func(t *testing.T) {
716+
// Record cache size metrics
717+
for _, size := range scenario.cacheSizes {
718+
RecordPrefixCacheSize(size)
719+
}
720+
721+
// Record cache match metrics (both hit ratio and hit length)
722+
for _, match := range scenario.cacheMatches {
723+
RecordPrefixCacheMatch(match.matchedLength, match.totalLength)
724+
}
725+
726+
// Verify cache size metrics
727+
wantCacheSizeMetrics, err := os.Open("testdata/prefix_indexer_size_metric")
728+
defer func() {
729+
if err := wantCacheSizeMetrics.Close(); err != nil {
730+
t.Error(err)
731+
}
732+
}()
733+
if err != nil {
734+
t.Fatal(err)
735+
}
736+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantCacheSizeMetrics, PrefixCacheSizeMetric); err != nil {
737+
t.Error(err)
738+
}
739+
740+
// Verify hit ratio metrics
741+
wantHitRatioMetrics, err := os.Open("testdata/prefix_indexer_hit_ratio_metric")
742+
defer func() {
743+
if err := wantHitRatioMetrics.Close(); err != nil {
744+
t.Error(err)
745+
}
746+
}()
747+
if err != nil {
748+
t.Fatal(err)
749+
}
750+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitRatioMetrics, PrefixCacheHitRatioMetric); err != nil {
751+
t.Error(err)
752+
}
753+
754+
// Verify hit length metrics
755+
wantHitLengthMetrics, err := os.Open("testdata/prefix_indexer_hit_bytes_metric")
756+
defer func() {
757+
if err := wantHitLengthMetrics.Close(); err != nil {
758+
t.Error(err)
759+
}
760+
}()
761+
if err != nil {
762+
t.Fatal(err)
763+
}
764+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitLengthMetrics, PrefixCacheHitLengthMetric); err != nil {
765+
t.Error(err)
766+
}
767+
})
768+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# HELP inference_extension_prefix_indexer_hit_bytes [ALPHA] Length of the prefix match in number of bytes in the cache lookup.
2+
# TYPE inference_extension_prefix_indexer_hit_bytes histogram
3+
inference_extension_prefix_indexer_hit_bytes_bucket{le="0"} 2
4+
inference_extension_prefix_indexer_hit_bytes_bucket{le="16"} 5
5+
inference_extension_prefix_indexer_hit_bytes_bucket{le="32"} 5
6+
inference_extension_prefix_indexer_hit_bytes_bucket{le="64"} 6
7+
inference_extension_prefix_indexer_hit_bytes_bucket{le="128"} 6
8+
inference_extension_prefix_indexer_hit_bytes_bucket{le="256"} 6
9+
inference_extension_prefix_indexer_hit_bytes_bucket{le="512"} 6
10+
inference_extension_prefix_indexer_hit_bytes_bucket{le="1024"} 6
11+
inference_extension_prefix_indexer_hit_bytes_bucket{le="2048"} 6
12+
inference_extension_prefix_indexer_hit_bytes_bucket{le="4096"} 6
13+
inference_extension_prefix_indexer_hit_bytes_bucket{le="8192"} 6
14+
inference_extension_prefix_indexer_hit_bytes_bucket{le="16384"} 6
15+
inference_extension_prefix_indexer_hit_bytes_bucket{le="32768"} 6
16+
inference_extension_prefix_indexer_hit_bytes_bucket{le="65536"} 6
17+
inference_extension_prefix_indexer_hit_bytes_bucket{le="+Inf"} 6
18+
inference_extension_prefix_indexer_hit_bytes_sum 86
19+
inference_extension_prefix_indexer_hit_bytes_count 6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# HELP inference_extension_prefix_indexer_hit_ratio [ALPHA] Ratio of prefix length matched to total prefix length in the cache lookup.
2+
# TYPE inference_extension_prefix_indexer_hit_ratio histogram
3+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0"} 2
4+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.1"} 2
5+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.2"} 2
6+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.3"} 2
7+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.4"} 2
8+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.5"} 4
9+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.6"} 4
10+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.7"} 5
11+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.8"} 5
12+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.9"} 5
13+
inference_extension_prefix_indexer_hit_ratio_bucket{le="1"} 6
14+
inference_extension_prefix_indexer_hit_ratio_bucket{le="+Inf"} 6
15+
inference_extension_prefix_indexer_hit_ratio_sum 2.7
16+
inference_extension_prefix_indexer_hit_ratio_count 6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# HELP inference_extension_prefix_indexer_size [ALPHA] Size of the prefix indexer.
2+
# TYPE inference_extension_prefix_indexer_size gauge
3+
inference_extension_prefix_indexer_size{} 4096

pkg/epp/scheduling/plugins/filter/filter.go

+8
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,14 @@ var HasCapacityFilter = &baseFilter{
256256
filter: toFilterFunc(queueThresholdPredicate(config.Conf.QueueThresholdCritical).and(kvCacheThresholdPredicate(config.Conf.KVCacheThreshold))),
257257
}
258258

259+
// NoopFilter is a filter that does not filter out any pods.
260+
var NoopFilter = &baseFilter{
261+
name: "noop",
262+
filter: toFilterFunc(func(req *types.LLMRequest, pod types.Pod) bool {
263+
return true
264+
}),
265+
}
266+
259267
// podPredicate is a filter function to check whether a pod is desired.
260268
type podPredicate func(req *types.LLMRequest, pod types.Pod) bool
261269

0 commit comments

Comments
 (0)