Skip to content

Commit 48cc9a0

Browse files
committed
Externalize Scheduler's saturation logic
This commit refactors the request processing pipeline, externalizing saturation detection and criticality-based service differentiation from the Scheduler. These responsibilities are now primarily managed by the `RequestControl.Director`. This change is a preparatory step for the introduction of a new Flow Controller component, which will eventually absorb these admission control duties.
Key changes include:
- Introduced a `PreDispatch` method on `RequestControl.Director`. It uses the `SaturationDetector` for admission control of non-critical requests and handles request criticality to determine whether saturation checks are bypassed.
- The saturation detection logic for dropping non-critical requests is intentionally preserved within the `Director` at this stage. This allows the option to bypass the future Flow Controller component during its maturation, ensuring the existing saturation and sheddable request behavior can be maintained as a fallback.
- Simplified the `Scheduler` to focus solely on preference-based filtering and pod selection for requests that have already been admitted by the `Director`.
- Removed the `HasCapacityFilter` and the distinct critical/sheddable filter paths from the `Scheduler`'s internal logic. The `Scheduler` now applies a single, unified preference filter chain (`lowLatencyFilter`) to all incoming requests.
- Updated `main.go` to instantiate the `SaturationDetector` and `RequestControl.Director`, wiring them into the request handling flow.
- Updated tests across `scheduler_test.go`, `director_test.go`, and `filter_test.go` to align with the new component responsibilities, adding coverage where necessary.
This refactoring leads to a cleaner architecture, making the `Scheduler` a more focused component and centralizing initial admission control logic, while paving the way for the future Flow Controller. This is aligned with the direction in `0683-epp-architecture-proposal` and should be nearly a no-op in terms of EPP behavior.
1 parent cb52769 commit 48cc9a0

File tree

13 files changed

+1351
-514
lines changed

13 files changed

+1351
-514
lines changed

cmd/epp/main.go

+46-16
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4343
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4444
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
45+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4546
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4647
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
4748
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -136,13 +137,19 @@ func run() error {
136137
})
137138
setupLog.Info("Flags processed", "flags", flags)
138139

139-
// Init runtime.
140+
// --- Load Configurations from Environment Variables ---
141+
// Note: Scheduler config is loaded via its package init currently. We may
142+
// want to load it here explicitly:
143+
sdConfig := saturationdetector.LoadConfigFromEnv()
144+
145+
// --- Get Kubernetes Config ---
140146
cfg, err := ctrl.GetConfig()
141147
if err != nil {
142-
setupLog.Error(err, "Failed to get rest config")
148+
setupLog.Error(err, "Failed to get Kubernetes rest config")
143149
return err
144150
}
145151

152+
// --- Setup Manager ---
146153
poolNamespacedName := types.NamespacedName{
147154
Name: *poolName,
148155
Namespace: *poolNamespace,
@@ -153,7 +160,7 @@ func run() error {
153160
return err
154161
}
155162

156-
// Set up mapper for metric scraping.
163+
// --- Setup Datastore ---
157164
mapping, err := backendmetrics.NewMetricMapping(
158165
*totalQueuedRequestsMetric,
159166
*kvCacheUsagePercentageMetric,
@@ -164,47 +171,60 @@ func run() error {
164171
return err
165172
}
166173
verifyMetricMapping(*mapping, setupLog)
167-
168174
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
169-
// Setup runner.
170175
ctx := ctrl.SetupSignalHandler()
176+
appDatastore := datastore.NewDatastore(ctx, pmf)
171177

172-
datastore := datastore.NewDatastore(ctx, pmf)
178+
// --- Initialize EPP Components ---
179+
appScheduler := scheduling.NewScheduler(appDatastore)
180+
181+
appSaturationDetector, err := saturationdetector.NewDetector(
182+
*sdConfig,
183+
appDatastore,
184+
ctrl.Log.WithName("saturation-detector"),
185+
)
186+
if err != nil {
187+
setupLog.Error(err, "Failed to create SaturationDetector")
188+
return err
189+
}
173190

174-
scheduler := scheduling.NewScheduler(datastore)
191+
// --- Setup ExtProc Server Runner ---
175192
serverRunner := &runserver.ExtProcServerRunner{
176193
GrpcPort: *grpcPort,
177194
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
178195
DestinationEndpointHintKey: *destinationEndpointHintKey,
179196
PoolNamespacedName: poolNamespacedName,
180-
Datastore: datastore,
197+
Datastore: appDatastore,
181198
SecureServing: *secureServing,
182199
CertPath: *certPath,
183200
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
184-
Scheduler: scheduler,
201+
Scheduler: appScheduler,
202+
SaturationDetector: appSaturationDetector,
185203
}
186204
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
187-
setupLog.Error(err, "Failed to setup ext-proc controllers")
205+
setupLog.Error(err, "Failed to setup EPP controllers")
188206
return err
189207
}
190208

209+
// --- Add Runnables to Manager ---
210+
191211
// Register health server.
192-
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
212+
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), appDatastore, *grpcHealthPort); err != nil {
193213
return err
194214
}
195215

196216
// Register ext-proc server.
197-
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
198-
setupLog.Error(err, "Failed to register ext-proc gRPC server")
217+
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
199218
return err
200219
}
201220

202221
// Register metrics handler.
203-
if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil {
222+
if err := registerMetricsHandler(mgr, *metricsPort, cfg, appDatastore); err != nil {
204223
return err
205224
}
206225

207-
// Start the manager. This blocks until a signal is received.
226+
// --- Start Manager ---
227+
// This blocks until a signal is received.
208228
setupLog.Info("Controller manager starting")
209229
if err := mgr.Start(ctx); err != nil {
210230
setupLog.Error(err, "Error starting controller manager")
@@ -232,6 +252,17 @@ func initLogging(opts *zap.Options) {
232252
ctrl.SetLogger(logger)
233253
}
234254

255+
// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the
256+
// manager.
257+
func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
258+
if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
259+
setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
260+
return err
261+
}
262+
setupLog.Info("ExtProc server runner added to manager.")
263+
return nil
264+
}
265+
235266
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
236267
func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
237268
srv := grpc.NewServer()
@@ -321,5 +352,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
321352
if mapping.LoraRequestInfo == nil {
322353
logger.Info("Not scraping metric: LoraRequestInfo")
323354
}
324-
325355
}

0 commit comments

Comments
 (0)