Skip to content

Commit 4a7de3f

Browse files
committed
Externalize Scheduler's saturation logic
This commit refactors the request processing pipeline, externalizing saturation detection and criticality-based service differentiation from the Scheduler. These responsibilities are now primarily managed by the RequestControl.Director. This change is a preparatory step for the introduction of a new Flow Controller component, which will eventually absorb these admission control duties. Key changes include: - Introduced `PreDispatch` method to `RequestControl.Director` It utilizes the `SaturationDetector` for admission control of non-critical requests and handles request criticality to determine if saturation checks are bypassed. - The saturation detection logic for dropping non-critical requests is intentionally preserved within the `Director` at this stage. This allows the option to bypass the future Flow Controller component during its maturation, ensuring the existing saturation and sheddable request behavior can be maintained as a fallback. - Simplified the `Scheduler` to focus solely on preference-based filtering and pod selection for requests that have already been admitted by the `Director`. - Removed the `SheddableRequestFilter` and the distinct critical/sheddable filter paths from the `Scheduler`'s internal logic. The `Scheduler` now applies a single, unified preference filter chain to all incoming requests. - Updated `main.go` to instantiate the `SaturationDetector`, wiring it into the request handling flow. - Updated tests across `scheduler_test.go`, `director_test.go`, and `filter_test.go` to align with the new component responsibilities, adding additional coverage where necessary. This refactoring leads to a cleaner architecture, making the `Scheduler` a more focused component and centralizing initial admission control logic, while paving the way for the future Flow Controller. This is aligned with the direction in `0683-epp-architecture-proposal` and should be nearly no-op in terms of EPP behavior.
1 parent 3d99aa1 commit 4a7de3f

File tree

9 files changed

+603
-487
lines changed

9 files changed

+603
-487
lines changed

cmd/epp/main.go

+49-20
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ import (
4343
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4444
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4545
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
46+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4647
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4748
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
48-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
4949
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
5050
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
5151
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
@@ -157,13 +157,19 @@ func run() error {
157157
})
158158
setupLog.Info("Flags processed", "flags", flags)
159159

160-
// Init runtime.
160+
// --- Load Configurations from Environment Variables ---
161+
// Note: Scheduler config is loaded via its package init currently. We may
162+
// want to load it here explicitly:
163+
sdConfig := saturationdetector.LoadConfigFromEnv()
164+
165+
// --- Get Kubernetes Config ---
161166
cfg, err := ctrl.GetConfig()
162167
if err != nil {
163-
setupLog.Error(err, "Failed to get rest config")
168+
setupLog.Error(err, "Failed to get Kubernetes rest config")
164169
return err
165170
}
166171

172+
// --- Setup Manager ---
167173
poolNamespacedName := types.NamespacedName{
168174
Name: *poolName,
169175
Namespace: *poolNamespace,
@@ -174,7 +180,7 @@ func run() error {
174180
return err
175181
}
176182

177-
// Set up mapper for metric scraping.
183+
// --- Setup Datastore ---
178184
mapping, err := backendmetrics.NewMetricMapping(
179185
*totalQueuedRequestsMetric,
180186
*kvCacheUsagePercentageMetric,
@@ -185,14 +191,12 @@ func run() error {
185191
return err
186192
}
187193
verifyMetricMapping(*mapping, setupLog)
188-
189194
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
190-
// Setup runner.
191195
ctx := ctrl.SetupSignalHandler()
196+
appDatastore := datastore.NewDatastore(ctx, pmf)
192197

193-
datastore := datastore.NewDatastore(ctx, pmf)
194-
195-
scheduler := scheduling.NewScheduler(datastore)
198+
// --- Initialize EPP Components ---
199+
appScheduler := scheduling.NewScheduler(appDatastore)
196200
if schedulerV2 == "true" {
197201
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
198202
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
@@ -207,47 +211,62 @@ func run() error {
207211
}
208212
schedulerConfig := scheduling.NewSchedulerConfig(
209213
[]plugins.PreSchedule{},
210-
[]plugins.Filter{filter.NewSheddableCapacityFilter()},
214+
[]plugins.Filter{},
211215
scorers,
212216
picker.NewMaxScorePicker(),
213217
[]plugins.PostSchedule{},
214218
[]plugins.PostResponse{},
215219
schedConfigOpts...)
216-
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
220+
appScheduler = scheduling.NewSchedulerWithConfig(appDatastore, schedulerConfig)
221+
}
222+
223+
appSaturationDetector, err := saturationdetector.NewDetector(
224+
*sdConfig,
225+
appDatastore,
226+
ctrl.Log.WithName("saturation-detector"),
227+
)
228+
if err != nil {
229+
setupLog.Error(err, "Failed to create SaturationDetector")
230+
return err
217231
}
232+
233+
// --- Setup ExtProc Server Runner ---
218234
serverRunner := &runserver.ExtProcServerRunner{
219235
GrpcPort: *grpcPort,
220236
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
221237
DestinationEndpointHintKey: *destinationEndpointHintKey,
222238
PoolNamespacedName: poolNamespacedName,
223-
Datastore: datastore,
239+
Datastore: appDatastore,
224240
SecureServing: *secureServing,
225241
CertPath: *certPath,
226242
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
227-
Scheduler: scheduler,
243+
Scheduler: appScheduler,
244+
SaturationDetector: appSaturationDetector,
228245
}
229246
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
230-
setupLog.Error(err, "Failed to setup ext-proc controllers")
247+
setupLog.Error(err, "Failed to setup EPP controllers")
231248
return err
232249
}
233250

251+
// --- Add Runnables to Manager ---
252+
234253
// Register health server.
235-
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
254+
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), appDatastore, *grpcHealthPort); err != nil {
236255
return err
237256
}
238257

239258
// Register ext-proc server.
240-
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
241-
setupLog.Error(err, "Failed to register ext-proc gRPC server")
259+
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
242260
return err
243261
}
244262

245263
// Register metrics handler.
246-
if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil {
264+
if err := registerMetricsHandler(mgr, *metricsPort, cfg, appDatastore); err != nil {
247265
return err
248266
}
249267

250-
// Start the manager. This blocks until a signal is received.
268+
// --- Start Manager ---
269+
// This blocks until a signal is received.
251270
setupLog.Info("Controller manager starting")
252271
if err := mgr.Start(ctx); err != nil {
253272
setupLog.Error(err, "Error starting controller manager")
@@ -275,6 +294,17 @@ func initLogging(opts *zap.Options) {
275294
ctrl.SetLogger(logger)
276295
}
277296

297+
// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the
298+
// manager.
299+
func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
300+
if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
301+
setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
302+
return err
303+
}
304+
setupLog.Info("ExtProc server runner added to manager.")
305+
return nil
306+
}
307+
278308
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
279309
func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
280310
srv := grpc.NewServer()
@@ -364,5 +394,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
364394
if mapping.LoraRequestInfo == nil {
365395
logger.Info("Not scraping metric: LoraRequestInfo")
366396
}
367-
368397
}

0 commit comments

Comments
 (0)