Skip to content

Commit 61ce732

Browse files
committed
Address comments
1 parent 97acfa6 commit 61ce732

File tree

12 files changed

+200
-209
lines changed

12 files changed

+200
-209
lines changed

cmd/epp/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ func loadPrefixCacheConfig() prefix.Config {
121121
baseLogger := log.Log.WithName("env-config")
122122

123123
return prefix.Config{
124+
Weight: envutil.GetEnvInt("PREFIX_CACHE_WEIGHT", prefix.DefaultScorerWeight, baseLogger),
124125
HashBlockSize: envutil.GetEnvInt("PREFIX_CACHE_HASH_BLOCK_SIZE", prefix.DefaultHashBlockSize, baseLogger),
125126
MaxPrefixBlocksToMatch: envutil.GetEnvInt("PREFIX_CACHE_MAX_PREFIX_BLOCKS", prefix.DefaultMaxPrefixBlocks, baseLogger),
126127
LRUIndexerCapacity: envutil.GetEnvInt("PREFIX_CACHE_LRU_CAPACITY", prefix.DefaultLRUIndexerCapacity, baseLogger),

pkg/epp/metrics/metrics.go

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package metrics
1818

1919
import (
2020
"context"
21-
"runtime/debug"
2221
"sync"
2322
"time"
2423

@@ -210,17 +209,6 @@ var (
210209
[]string{"plugin_type", "plugin_name"},
211210
)
212211

213-
// Info Metrics
214-
InferenceExtensionInfo = compbasemetrics.NewGaugeVec(
215-
&compbasemetrics.GaugeOpts{
216-
Subsystem: InferenceExtension,
217-
Name: "info",
218-
Help: "General information of the current build of Inference Extension.",
219-
StabilityLevel: compbasemetrics.ALPHA,
220-
},
221-
[]string{"commit"},
222-
)
223-
224212
// Prefix indexer Metrics
225213
PrefixCacheSize = compbasemetrics.NewGaugeVec(
226214
&compbasemetrics.GaugeOpts{
@@ -254,6 +242,17 @@ var (
254242
},
255243
[]string{},
256244
)
245+
246+
// Info Metrics
247+
InferenceExtensionInfo = compbasemetrics.NewGaugeVec(
248+
&compbasemetrics.GaugeOpts{
249+
Subsystem: InferenceExtension,
250+
Name: "info",
251+
Help: "General information of the current build of Inference Extension.",
252+
StabilityLevel: compbasemetrics.ALPHA,
253+
},
254+
[]string{"commit"},
255+
)
257256
)
258257

259258
var registerMetrics sync.Once
@@ -414,21 +413,3 @@ func RecordInferenceExtensionInfo() {
414413
InferenceExtensionInfo.WithLabelValues(CommitSHA).Set(1)
415414
}
416415
}
417-
418-
func init() {
419-
info, ok := debug.ReadBuildInfo()
420-
if !ok {
421-
return
422-
}
423-
424-
var Commit = func(i *debug.BuildInfo) string {
425-
for _, setting := range i.Settings {
426-
if setting.Key == "vcs.revision" {
427-
return setting.Value
428-
}
429-
}
430-
return ""
431-
}(info)
432-
433-
CommitSHA = Commit
434-
}

pkg/epp/scheduling/config.go

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,14 @@ limitations under the License.
1616

1717
package scheduling
1818

19-
import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
19+
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
26+
)
2027

2128
// NewSchedulerConfig creates a new SchedulerConfig object with the given plugins.
2229
func NewSchedulerConfig(preSchedulePlugins []plugins.PreSchedule, filters []plugins.Filter, scorers map[plugins.Scorer]int,
@@ -52,3 +59,58 @@ var defaultConfig = &SchedulerConfig{
5259
picker: defPlugin,
5360
postSchedulePlugins: []plugins.PostSchedule{},
5461
}
62+
63+
func CreateConfig(opts ...ConfigOption) *SchedulerConfig {
64+
config := &SchedulerConfig{
65+
preSchedulePlugins: []plugins.PreSchedule{},
66+
postSchedulePlugins: []plugins.PostSchedule{},
67+
scorers: map[plugins.Scorer]int{},
68+
filters: []plugins.Filter{&sheddableRequestFilterV2{}},
69+
picker: &picker.MaxScorePicker{},
70+
}
71+
for _, opt := range opts {
72+
opt(config)
73+
}
74+
return config
75+
}
76+
77+
type ConfigOption func(*SchedulerConfig)
78+
79+
func WithPrefixPlugin(prefixConfig prefix.Config) ConfigOption {
80+
return func(cfg *SchedulerConfig) {
81+
prefixPlugin := prefix.New(prefixConfig)
82+
cfg.preSchedulePlugins = append(cfg.preSchedulePlugins, prefixPlugin)
83+
cfg.postSchedulePlugins = append(cfg.postSchedulePlugins, prefixPlugin)
84+
cfg.scorers[prefixPlugin] = prefixConfig.Weight
85+
}
86+
}
87+
88+
func WithQueuePlugin(queueConfig scorer.QueueScorerConfig) ConfigOption {
89+
return func(cfg *SchedulerConfig) {
90+
queuePlugin := &scorer.QueueScorer{}
91+
cfg.scorers[queuePlugin] = queueConfig.Weight
92+
}
93+
}
94+
95+
func WithKVCachePlugin(kvCacheConfig scorer.KVCacheScorerConfig) ConfigOption {
96+
return func(cfg *SchedulerConfig) {
97+
kvCachePlugin := &scorer.KVCacheScorer{}
98+
cfg.scorers[kvCachePlugin] = kvCacheConfig.Weight
99+
}
100+
}
101+
102+
type sheddableRequestFilterV2 struct{}
103+
104+
func (p *sheddableRequestFilterV2) Name() string {
105+
return "sheddableRequestFilterV2"
106+
}
107+
108+
func (p *sheddableRequestFilterV2) Filter(ctx *types.SchedulingContext, pods []types.Pod) []types.Pod {
109+
if ctx.Req.Critical {
110+
// Allow all pods to pass through if the request is critical, even if all pods reach their capacity.
111+
return pods
112+
}
113+
114+
// Only allow pods that have enough capacity to handle the request.
115+
return filter.HasCapacityFilter.Filter(ctx, pods)
116+
}

pkg/epp/scheduling/config_v2.go

Lines changed: 0 additions & 82 deletions
This file was deleted.

pkg/epp/scheduling/plugins/filter/filter.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -256,14 +256,6 @@ var HasCapacityFilter = &baseFilter{
256256
filter: toFilterFunc(queueThresholdPredicate(config.Conf.QueueThresholdCritical).and(kvCacheThresholdPredicate(config.Conf.KVCacheThreshold))),
257257
}
258258

259-
// NoopFilter is a filter that does not filter out any pods.
260-
var NoopFilter = &baseFilter{
261-
name: "noop",
262-
filter: toFilterFunc(func(req *types.LLMRequest, pod types.Pod) bool {
263-
return true
264-
}),
265-
}
266-
267259
// podPredicate is a filter function to check whether a pod is desired.
268260
type podPredicate func(req *types.LLMRequest, pod types.Pod) bool
269261

pkg/epp/scheduling/plugins/prefix/indexer.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,13 @@ import (
2424

2525
"sigs.k8s.io/controller-runtime/pkg/log"
2626
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
27-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2827
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2928
)
3029

3130
func newIndexer(maxCacheSize int) *indexer {
3231
t := &indexer{
3332
maxCacheSize: maxCacheSize,
34-
table: make(map[types.BlockHash]map[types.ServerID]*node),
33+
table: make(map[BlockHash]map[ServerID]*node),
3534
list: newLinkedList(),
3635
}
3736
go t.ReportCacheSize(time.Second)
@@ -43,15 +42,15 @@ func newIndexer(maxCacheSize int) *indexer {
4342
type indexer struct {
4443
mu sync.RWMutex
4544
maxCacheSize int
46-
table map[types.BlockHash]map[types.ServerID]*node // from any prefix cache to the cache entry to find the server
47-
list *linkedList // LRU list to keep track of the order of entries
45+
table map[BlockHash]map[ServerID]*node // from any prefix cache to the cache entry to find the server
46+
list *linkedList // LRU list to keep track of the order of entries
4847
}
4948

5049
// Get returns the set of servers that have the given prefix hash cached.
51-
func (i *indexer) Get(hash types.BlockHash) map[types.ServerID]bool {
50+
func (i *indexer) Get(hash BlockHash) map[ServerID]bool {
5251
i.mu.RLock()
5352
defer i.mu.RUnlock()
54-
res := map[types.ServerID]bool{}
53+
res := map[ServerID]bool{}
5554
for server := range i.table[hash] {
5655
res[server] = true
5756
}
@@ -61,15 +60,15 @@ func (i *indexer) Get(hash types.BlockHash) map[types.ServerID]bool {
6160
// Add adds a list of prefix hashes of a single request to the server the request was sent to.
6261
// The intuition is that this server is likely to have the prefix cached, so next time a request
6362
// sharing the longest prefix should be sent to the same server to take advantage of the cache hit.
64-
func (i *indexer) Add(hashes []types.BlockHash, server types.ServerID) {
63+
func (i *indexer) Add(hashes []BlockHash, server ServerID) {
6564
i.mu.Lock()
6665
defer i.mu.Unlock()
6766
for _, hash := range hashes {
6867
i.add(hash, server)
6968
}
7069
}
7170

72-
func (i *indexer) check(hash types.BlockHash, server types.ServerID) (*node, bool) {
71+
func (i *indexer) check(hash BlockHash, server ServerID) (*node, bool) {
7372
servers, ok := i.table[hash]
7473
if !ok {
7574
return nil, false
@@ -78,7 +77,7 @@ func (i *indexer) check(hash types.BlockHash, server types.ServerID) (*node, boo
7877
return n, ok
7978
}
8079

81-
func (i *indexer) add(hash types.BlockHash, server types.ServerID) {
80+
func (i *indexer) add(hash BlockHash, server ServerID) {
8281
node, exists := i.check(hash, server)
8382
if exists {
8483
i.list.moveToTail(node)
@@ -87,7 +86,7 @@ func (i *indexer) add(hash types.BlockHash, server types.ServerID) {
8786
}
8887
}
8988

90-
func (i *indexer) create(hash types.BlockHash, server types.ServerID) {
89+
func (i *indexer) create(hash BlockHash, server ServerID) {
9190
n := &node{
9291
hash: hash,
9392
server: server,
@@ -99,7 +98,7 @@ func (i *indexer) create(hash types.BlockHash, server types.ServerID) {
9998
}
10099

101100
if _, ok := i.table[hash]; !ok {
102-
i.table[hash] = make(map[types.ServerID]*node)
101+
i.table[hash] = make(map[ServerID]*node)
103102
}
104103
i.table[hash][server] = n
105104
i.list.add(n)

pkg/epp/scheduling/plugins/prefix/indexer_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,27 @@ import (
1919
"testing"
2020

2121
"github.com/stretchr/testify/assert"
22-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2322
)
2423

2524
func TestIndexer_AddAndGet(t *testing.T) {
2625
cache := newIndexer(2)
2726

28-
hash1 := types.BlockHash(1)
29-
server := types.ServerID{Namespace: "default", Name: "server1"}
27+
hash1 := BlockHash(1)
28+
server := ServerID{Namespace: "default", Name: "server1"}
3029

3130
// Add an entry to the cache
32-
cache.Add([]types.BlockHash{hash1}, server)
31+
cache.Add([]BlockHash{hash1}, server)
3332

3433
// Retrieve the entry
3534
assert.Equal(t, 1, cache.list.size, "Cache size should be 1 after adding an entry")
3635
servers := cache.Get(hash1)
3736
assert.Contains(t, servers, server, "Cache should contain the added server")
3837

3938
// Add another entry to the cache, the cache size should be incremented to 2.
40-
cache.Add([]types.BlockHash{types.BlockHash(2)}, server)
39+
cache.Add([]BlockHash{BlockHash(2)}, server)
4140
assert.Equal(t, 2, cache.list.size, "Cache size should be 2 after adding an entry")
4241

4342
// Add another entry to the cache, which should evict the first one due to max size.
44-
cache.Add([]types.BlockHash{types.BlockHash(3)}, server)
43+
cache.Add([]BlockHash{BlockHash(3)}, server)
4544
assert.Equal(t, 2, cache.list.size, "Cache size should still be 2 after adding an entry")
4645
}

pkg/epp/scheduling/plugins/prefix/linked_list.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@ limitations under the License.
1616

1717
package prefix
1818

19-
import (
20-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
21-
)
22-
2319
type linkedList struct {
2420
dummyHead *node // The head of the linked list (dummy node).
2521
tail *node // The tail of the linked list.
@@ -40,8 +36,8 @@ func newLinkedList() *linkedList {
4036
type node struct {
4137
prev *node
4238
next *node
43-
server types.ServerID
44-
hash types.BlockHash
39+
server ServerID
40+
hash BlockHash
4541
}
4642

4743
// add adds a node to the end of the linked list.

0 commit comments

Comments
 (0)