diff --git a/cmd/shell-operator/start.go b/cmd/shell-operator/start.go index a4aedb5e..ed979928 100644 --- a/cmd/shell-operator/start.go +++ b/cmd/shell-operator/start.go @@ -16,6 +16,7 @@ import ( "github.com/flant/shell-operator/pkg/app" shell_operator "github.com/flant/shell-operator/pkg/shell-operator" + operatorconfig "github.com/flant/shell-operator/pkg/shell-operator/config" utils_signal "github.com/flant/shell-operator/pkg/utils/signal" ) @@ -30,7 +31,7 @@ func start(logger *log.Logger) func(_ *kingpin.ParseContext) error { ctx := context.Background() telemetryShutdown := registerTelemetry(ctx) // Init logging and initialize a ShellOperator instance. - operator, err := shell_operator.Init(logger.Named("shell-operator")) + operator, err := shell_operator.NewShellOperatorWithOptions(context.TODO(), operatorconfig.WithLogger(logger)) if err != nil { return fmt.Errorf("init failed: %w", err) } diff --git a/examples/API_USAGE_EXAMPLES.md b/examples/API_USAGE_EXAMPLES.md new file mode 100644 index 00000000..f5a44265 --- /dev/null +++ b/examples/API_USAGE_EXAMPLES.md @@ -0,0 +1,166 @@ +# Shell Operator API Usage Examples + +## Complete API Reference + +The Shell Operator now provides a flexible, options-based API for initialization. Here are all the available ways to create a ShellOperator instance: + +### 1. Basic Constructor (Original) +```go +import ( + "github.com/flant/shell-operator/pkg/shell-operator" + "log" +) + +// Simple constructor - uses default logger and no metrics storage +operator := shell_operator.NewShellOperator() +``` + +### 2. Configuration-Based Constructor +```go +import ( + "context" + "github.com/flant/shell-operator/pkg/shell-operator" + "github.com/flant/shell-operator/pkg/shell-operator/config" +) + +// Using the flexible configuration system +operator, err := shell_operator.NewShellOperatorWithOptions(ctx, + config.WithLogger(logger), + config.WithMetricStorage(storage), + config.WithHookMetricStorage(hookStorage), + config.WithHooksDir("/custom/hooks"), +) +``` + +### 3. With Separate Metric Storages +```go +import ( + "github.com/flant/shell-operator/pkg/shell-operator" + metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" +) + +builtinStorage := metricsstorage.NewStorage("builtin-metrics") +hookStorage := metricsstorage.NewStorage("hook-metrics") + +operator, err := shell_operator.NewShellOperatorWithOptions(ctx, + shell_operator.WithMetricStorage(builtinStorage), + shell_operator.WithHookMetricStorage(hookStorage), +) +``` + +### 4. With Convenience Function for Both Storages +```go +import ( + "github.com/flant/shell-operator/pkg/shell-operator" + metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" +) + +storage := metricsstorage.NewStorage("my-app") + +// Set both metric storages at once +operator, err := shell_operator.NewShellOperatorWithOptions(ctx, + config.WithMetricStorages(storage, storage), // Same storage for both +) +``` + +### 5. Convenience Constructor with Logger +```go +// When you primarily need to provide a logger +operator, err := shell_operator.NewShellOperatorWithLogger(ctx, logger, + config.WithMetricStorage(storage), +) +``` + +### 6. Configuration Presets +```go +// Development configuration with sensible defaults +cfg := shell_operator.NewDevelopmentConfig() +operator, err := shell_operator.NewShellOperatorWithConfig(ctx, cfg) + +// Production configuration +cfg := shell_operator.NewProductionConfig() +operator, err := shell_operator.NewShellOperatorWithConfig(ctx, cfg) +``` + +### 7. Advanced Configuration +```go +// Full control over configuration +cfg := config.NewShellOperatorConfig( + config.WithLogger(customLogger), + config.WithMetricStorage(metricsStorage), + config.WithHookMetricStorage(hookMetricsStorage), + config.WithListenAddress("0.0.0.0"), + config.WithListenPort("9090"), + config.WithHooksDir("/app/hooks"), + config.WithTempDir("/tmp/shell-operator"), +) + +operator, err := shell_operator.NewShellOperatorWithConfig(ctx, cfg) +``` + +## Available Configuration Options + +### Configuration Options (for NewShellOperatorWithOptions) +All configuration options are available in the `config` package: +- `config.WithLogger(logger *log.Logger)` - Set the logger +- `config.WithMetricStorage(storage)` - Set the main metrics storage +- `config.WithHookMetricStorage(storage)` - Set the hook-specific metrics storage +- `config.WithMetricStorages(metricStorage, hookStorage)` - Set both metric storages at once +- `config.WithListenAddress(address string)` - HTTP server listen address +- `config.WithListenPort(port string)` - HTTP server listen port +- `config.WithHooksDir(dir string)` - Directory containing hooks +- `config.WithTempDir(dir string)` - Temporary directory +- `config.WithDebugUnixSocket(socket string)` - Debug unix socket path +- `config.WithDebugHttpServerAddr(addr string)` - Debug HTTP server address + +### Convenience Options +- `config.WithListenConfig(address, port string)` - Set both listen address and port +- `config.WithDirectories(hooksDir, tempDir string)` - Set both hooks and temp directories +- `config.WithDebugConfig(unixSocket, httpAddr string)` - Set both debug configurations +- `config.WithMetricStorages(metricStorage, hookStorage)` - Set both metric storages + +## Migration from Old API + +### Before (Old Init function) +```go +// Old way - rigid initialization +operator, err := shell_operator.Init(logger, metricsStorage) +``` + +### After (New Options Pattern) +```go +// New way - flexible configuration options +operator, err := shell_operator.NewShellOperatorWithOptions(ctx, + config.WithLogger(logger), + config.WithMetricStorage(metricsStorage), +) + +// Or using convenience function +operator, err := shell_operator.NewShellOperatorWithLogger(ctx, logger, + config.WithMetricStorage(metricsStorage), +) +``` + +## Error Handling + +All constructor functions return an error if configuration validation fails: + +```go +operator, err := shell_operator.NewShellOperatorWithOptions(ctx, + shell_operator.WithHooksDir(""), // Invalid: empty hooks directory +) +if err != nil { + log.Fatalf("Failed to create operator: %v", err) +} +``` + +## Best Practices + +1. **Use separate metric storage options**: `WithMetricStorage()` and `WithHookMetricStorage()` for explicit control +2. **Use convenience functions when appropriate**: `WithMetricStorages()` when both storages are the same +3. **Use configuration options for complex setups**: When you need multiple configuration parameters +4. **Use presets for common scenarios**: Development vs Production configurations +5. **Always handle errors**: Constructor functions validate configuration and can fail +6. **Prefer explicit options**: Makes configuration clear and maintainable + +This API provides backward compatibility while enabling flexible, maintainable configuration patterns. diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go deleted file mode 100644 index fa3b4ba5..00000000 --- a/internal/metrics/metrics.go +++ /dev/null @@ -1,7 +0,0 @@ -package metrics - -const ( - TasksQueueActionDurationSeconds = "{PREFIX}tasks_queue_action_duration_seconds" - TasksQueueCompactionInQueueTasks = "d8_telemetry_{PREFIX}tasks_queue_compaction_in_queue_tasks" - TasksQueueCompactionReached = "d8_telemetry_{PREFIX}tasks_queue_compaction_reached" -) diff --git a/pkg/kube_events_manager/error_handler.go b/pkg/kube_events_manager/error_handler.go index 49956094..271df8f9 100644 --- a/pkg/kube_events_manager/error_handler.go +++ b/pkg/kube_events_manager/error_handler.go @@ -10,6 +10,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" + "github.com/flant/shell-operator/pkg/metrics" utils "github.com/flant/shell-operator/pkg/utils/labels" ) @@ -63,7 +64,7 @@ func (weh *WatchErrorHandler) handler(_ *cache.Reflector, err error) { } if weh.metricStorage != nil { - weh.metricStorage.CounterAdd("{PREFIX}kubernetes_client_watch_errors_total", 1.0, map[string]string{"error_type": errorType}) + weh.metricStorage.CounterAdd(metrics.KubernetesClientWatchErrorsTotal, 1.0, map[string]string{"error_type": errorType}) } } diff --git a/pkg/kube_events_manager/monitor_test.go b/pkg/kube_events_manager/monitor_test.go index c261c0ec..feff07a7 100644 --- a/pkg/kube_events_manager/monitor_test.go +++ b/pkg/kube_events_manager/monitor_test.go @@ -16,6 +16,7 @@ import ( "github.com/flant/kube-client/manifest" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metric" + "github.com/flant/shell-operator/pkg/metrics" ) func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { @@ -43,18 +44,18 @@ func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { metricStorage := metric.NewStorageMock(t) metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - metrics := []string{ - "{PREFIX}kube_event_duration_seconds", - "{PREFIX}kube_jq_filter_duration_seconds", + metricsList := []string{ + metrics.KubeEventDurationSeconds, + metrics.KubeJqFilterDurationSeconds, } - assert.Contains(t, metrics, metric) + assert.Contains(t, metricsList, metric) assert.NotZero(t, value) assert.Equal(t, map[string]string(nil), labels) assert.Nil(t, buckets) }) - metricStorage.GaugeSetMock.When("{PREFIX}kube_snapshot_objects", 1, map[string]string(nil)).Then() - metricStorage.GaugeSetMock.When("{PREFIX}kube_snapshot_objects", 2, map[string]string(nil)).Then() - metricStorage.GaugeSetMock.When("{PREFIX}kube_snapshot_objects", 3, map[string]string(nil)).Then() + metricStorage.GaugeSetMock.When(metrics.KubeSnapshotObjects, 1, map[string]string(nil)).Then() + metricStorage.GaugeSetMock.When(metrics.KubeSnapshotObjects, 2, map[string]string(nil)).Then() + metricStorage.GaugeSetMock.When(metrics.KubeSnapshotObjects, 3, map[string]string(nil)).Then() mon := NewMonitor(context.Background(), fc.Client, metricStorage, monitorCfg, func(ev kemtypes.KubeEvent) { objsMutex.Lock() diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 72a38c2c..ebb8a5f6 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -19,6 +19,7 @@ import ( klient "github.com/flant/kube-client/client" "github.com/flant/shell-operator/pkg/filter/jq" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" + "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/utils/measure" ) @@ -224,7 +225,7 @@ func (ei *resourceInformer) loadExistedObjects() error { var err error func() { defer measure.Duration(func(d time.Duration) { - ei.metricStorage.HistogramObserve("{PREFIX}kube_jq_filter_duration_seconds", d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) + ei.metricStorage.HistogramObserve(metrics.KubeJqFilterDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) })() filter := jq.NewFilter() objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, &obj) @@ -254,7 +255,7 @@ func (ei *resourceInformer) loadExistedObjects() error { } ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects)) - ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) + ei.metricStorage.GaugeSet(metrics.KubeSnapshotObjects, float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) return nil } @@ -287,7 +288,7 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty } defer measure.Duration(func(d time.Duration) { - ei.metricStorage.HistogramObserve("{PREFIX}kube_event_duration_seconds", d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) + ei.metricStorage.HistogramObserve(metrics.KubeEventDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) })() defer trace.StartRegion(context.Background(), "handleWatchEvent").End() @@ -304,7 +305,7 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty var err error func() { defer measure.Duration(func(d time.Duration) { - ei.metricStorage.HistogramObserve("{PREFIX}kube_jq_filter_duration_seconds", d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) + ei.metricStorage.HistogramObserve(metrics.KubeJqFilterDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) })() filter := jq.NewFilter() objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, obj) @@ -351,7 +352,7 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty ei.cachedObjectsIncrement.Modified++ } // Update metrics. - ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) + ei.metricStorage.GaugeSet(metrics.KubeSnapshotObjects, float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) ei.cacheLock.Unlock() if skipEvent { return @@ -369,7 +370,7 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty ei.cachedObjectsInfo.Deleted++ ei.cachedObjectsIncrement.Deleted++ // Update metrics. - ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) + ei.metricStorage.GaugeSet(metrics.KubeSnapshotObjects, float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) ei.cacheLock.Unlock() } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 00000000..e98f2cf7 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,393 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package metrics provides centralized metric names and registration functions for shell-operator. +// All metric names use constants to ensure consistency and prevent typos. +// The {PREFIX} placeholder is replaced by the metrics storage with the appropriate prefix. +package metrics + +import ( + "fmt" + "time" + + metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" + "github.com/deckhouse/deckhouse/pkg/metrics-storage/options" + + "github.com/flant/shell-operator/pkg/task" +) + +// Metric name constants organized by functional area. +// Each constant represents a unique metric name used throughout shell-operator. +const ( + // ============================================================================ + // Common Metrics + // ============================================================================ + // LiveTicks is a counter that increases every 10 seconds to indicate shell-operator is alive + LiveTicks = "{PREFIX}live_ticks" + + // ============================================================================ + // Task Queue Metrics + // ============================================================================ + // TasksQueueActionDurationSeconds measures task queue operation durations + TasksQueueActionDurationSeconds = "{PREFIX}tasks_queue_action_duration_seconds" + // TasksQueueLength shows the current length of the task queue + TasksQueueLength = "{PREFIX}tasks_queue_length" + // TasksQueueCompactionInQueueTasks tracks telemetry for queue compaction + TasksQueueCompactionInQueueTasks = "d8_telemetry_{PREFIX}tasks_queue_compaction_in_queue_tasks" + // TasksQueueCompactionReached tracks when queue compaction is reached + TasksQueueCompactionReached = "d8_telemetry_{PREFIX}tasks_queue_compaction_reached" + + // ============================================================================ + // Hook Execution Metrics + // ============================================================================ + // Kubernetes Bindings + HookEnableKubernetesBindingsSeconds = "{PREFIX}hook_enable_kubernetes_bindings_seconds" + HookEnableKubernetesBindingsErrorsTotal = "{PREFIX}hook_enable_kubernetes_bindings_errors_total" + HookEnableKubernetesBindingsSuccess = "{PREFIX}hook_enable_kubernetes_bindings_success" + + // Hook Runtime Metrics + HookRunSeconds = "{PREFIX}hook_run_seconds" + HookRunUserCPUSeconds = "{PREFIX}hook_run_user_cpu_seconds" + HookRunSysCPUSeconds = "{PREFIX}hook_run_sys_cpu_seconds" + HookRunMaxRSSBytes = "{PREFIX}hook_run_max_rss_bytes" + + // Hook Execution Results + HookRunErrorsTotal = "{PREFIX}hook_run_errors_total" + HookRunAllowedErrorsTotal = "{PREFIX}hook_run_allowed_errors_total" + HookRunSuccessTotal = "{PREFIX}hook_run_success_total" + + // Task Queue Wait Time + TaskWaitInQueueSecondsTotal = "{PREFIX}task_wait_in_queue_seconds_total" + + // ============================================================================ + // Kubernetes Events Manager Metrics + // ============================================================================ + // KubeSnapshotObjects counts cached objects for particular bindings + KubeSnapshotObjects = "{PREFIX}kube_snapshot_objects" + // KubeJqFilterDurationSeconds measures jq filter execution time + KubeJqFilterDurationSeconds = "{PREFIX}kube_jq_filter_duration_seconds" + // KubeEventDurationSeconds measures Kubernetes event handling time + KubeEventDurationSeconds = "{PREFIX}kube_event_duration_seconds" + // KubernetesClientWatchErrorsTotal counts Kubernetes client watch errors + KubernetesClientWatchErrorsTotal = "{PREFIX}kubernetes_client_watch_errors_total" +) + +// ============================================================================ +// Registration Functions +// ============================================================================ + +// RegisterHookMetrics registers all hook-related metrics with the provided storage. +// This includes metrics for hook execution, Kubernetes bindings, and resource usage. +// Used specifically by shell-operator's HookManager. +func RegisterHookMetrics(metricStorage metricsstorage.Storage) error { + // Register Kubernetes bindings metrics + if err := registerKubernetesBindingsMetrics(metricStorage); err != nil { + return fmt.Errorf("register kubernetes bindings metrics: %w", err) + } + + // Register hook execution metrics + if err := registerHookExecutionMetrics(metricStorage); err != nil { + return fmt.Errorf("register hook execution metrics: %w", err) + } + + return nil +} + +// registerKubernetesBindingsMetrics registers metrics related to Kubernetes bindings setup +func registerKubernetesBindingsMetrics(metricStorage metricsstorage.Storage) error { + hookLabels := []string{"hook"} + + _, err := metricStorage.RegisterGauge( + HookEnableKubernetesBindingsSeconds, hookLabels, + options.WithHelp("Duration of enabling kubernetes bindings in seconds"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookEnableKubernetesBindingsSeconds, err) + } + + _, err = metricStorage.RegisterCounter( + HookEnableKubernetesBindingsErrorsTotal, hookLabels, + options.WithHelp("Counter of failed attempts to start Kubernetes informers for a hook"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookEnableKubernetesBindingsErrorsTotal, err) + } + + _, err = metricStorage.RegisterGauge( + HookEnableKubernetesBindingsSuccess, hookLabels, + options.WithHelp("Gauge indicating successful start of Kubernetes informers (0.0 = not started, 1.0 = started)"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookEnableKubernetesBindingsSuccess, err) + } + + return nil +} + +// registerHookExecutionMetrics registers metrics related to hook execution and resource usage +func registerHookExecutionMetrics(metricStorage metricsstorage.Storage) error { + // Common labels for hook execution metrics + executionLabels := []string{"hook", "binding", "queue"} + + // Standard histogram buckets for timing metrics (microseconds to minutes) + timingBuckets := []float64{ + 0.0, + 0.02, 0.05, // 20,50 milliseconds + 0.1, 0.2, 0.5, // 100,200,500 milliseconds + 1, 2, 5, // 1,2,5 seconds + 10, 20, 50, // 10,20,50 seconds + 100, 200, 500, // 100,200,500 seconds + } + + // Register execution duration metric + _, err := metricStorage.RegisterHistogram( + HookRunSeconds, executionLabels, timingBuckets, + options.WithHelp("Histogram of hook execution times in seconds"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookRunSeconds, err) + } + + // Register CPU usage metrics + _, err = metricStorage.RegisterHistogram( + HookRunUserCPUSeconds, executionLabels, timingBuckets, + options.WithHelp("Histogram of hook user CPU usage in seconds"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookRunUserCPUSeconds, err) + } + + _, err = metricStorage.RegisterHistogram( + HookRunSysCPUSeconds, executionLabels, timingBuckets, + options.WithHelp("Histogram of hook system CPU usage in seconds"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookRunSysCPUSeconds, err) + } + // Register memory usage metric + _, err = metricStorage.RegisterGauge( + HookRunMaxRSSBytes, executionLabels, + options.WithHelp("Gauge of maximum resident set size used by hook in bytes"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookRunMaxRSSBytes, err) + } + + // Register execution result counters + _, err = metricStorage.RegisterCounter( + HookRunErrorsTotal, executionLabels, + options.WithHelp("Counter of hook execution errors (allowFailure: false)"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookRunErrorsTotal, err) + } + + _, err = metricStorage.RegisterCounter( + HookRunAllowedErrorsTotal, executionLabels, + options.WithHelp("Counter of hook execution errors that are allowed to fail (allowFailure: true)"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookRunAllowedErrorsTotal, err) + } + + _, err = metricStorage.RegisterCounter( + HookRunSuccessTotal, executionLabels, + options.WithHelp("Counter of successful hook executions"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", HookRunSuccessTotal, err) + } + + // Register queue wait time metric + _, err = metricStorage.RegisterCounter( + TaskWaitInQueueSecondsTotal, executionLabels, + options.WithHelp("Counter of seconds that hooks waited in queue before execution"), + ) + if err != nil { + return fmt.Errorf("can not register %s: %w", TaskWaitInQueueSecondsTotal, err) + } + + return nil +} + +func RegisterOperatorMetrics(metricStorage metricsstorage.Storage, kubeEventManagerLabels []string) error { + if err := RegisterCommonMetrics(metricStorage); err != nil { + return fmt.Errorf("register common metrics: %w", err) + } + + if err := RegisterTaskQueueMetrics(metricStorage); err != nil { + return fmt.Errorf("register task queue metrics: %w", err) + } + + if err := RegisterKubeEventsManagerMetrics(metricStorage, kubeEventManagerLabels); err != nil { + return fmt.Errorf("register kube events manager metrics: %w", err) + } + + return nil +} + +// RegisterCommonMetrics registers base metrics used across shell-operator and addon-operator. +// These are fundamental metrics that indicate the health and activity of the operator. +func RegisterCommonMetrics(metricStorage metricsstorage.Storage) error { + _, err := metricStorage.RegisterCounter( + LiveTicks, []string{}, + options.WithHelp("Counter that increases every 10 seconds to indicate shell-operator is alive"), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", LiveTicks, err) + } + + return nil +} + +// RegisterTaskQueueMetrics registers metrics related to task queue operations and performance. +// Used by both shell-operator and addon-operator to monitor queue health and performance. +func RegisterTaskQueueMetrics(metricStorage metricsstorage.Storage) error { + // Task queue operation labels + queueActionLabels := []string{"queue_name", "queue_action"} + queueLabels := []string{"queue"} + + // High-resolution buckets for queue operations (microseconds to milliseconds) + queueOperationBuckets := []float64{ + 0.0, + 0.0001, 0.0002, 0.0005, // 100, 200, 500 microseconds + 0.001, 0.002, 0.005, // 1,2,5 milliseconds + 0.01, 0.02, 0.05, // 10,20,50 milliseconds + 0.1, 0.2, 0.5, // 100,200,500 milliseconds + } + + // Register queue operation duration metric + _, err := metricStorage.RegisterHistogram( + TasksQueueActionDurationSeconds, queueActionLabels, queueOperationBuckets, + options.WithHelp("Histogram of task queue operation durations in seconds"), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", TasksQueueActionDurationSeconds, err) + } + + // Register queue length metric + _, err = metricStorage.RegisterGauge( + TasksQueueLength, queueLabels, + options.WithHelp("Gauge showing the length of the task queue"), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", TasksQueueLength, err) + } + + return nil +} + +// RegisterKubeEventsManagerMetrics registers metrics for Kubernetes events manager. +// These metrics monitor Kubernetes API interactions, object caching, and event processing. +// Used by both shell-operator and addon-operator. +func RegisterKubeEventsManagerMetrics(metricStorage metricsstorage.Storage, labels []string) error { + // Error type labels for watch error tracking + errorTypeLabels := []string{"error_type"} + + // Buckets for Kubernetes operation timing (milliseconds to seconds) + kubeOperationBuckets := []float64{ + 0.0, + 0.001, 0.002, 0.005, // 1,2,5 milliseconds + 0.01, 0.02, 0.05, // 10,20,50 milliseconds + 0.1, 0.2, 0.5, // 100,200,500 milliseconds + 1, 2, 5, 10, // 1,2,5,10 seconds + } + + // Register snapshot objects gauge + _, err := metricStorage.RegisterGauge( + KubeSnapshotObjects, labels, + options.WithHelp("Gauge with count of cached objects (snapshot) for particular binding"), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", KubeSnapshotObjects, err) + } + + // Register jq filter duration histogram + _, err = metricStorage.RegisterHistogram( + KubeJqFilterDurationSeconds, labels, kubeOperationBuckets, + options.WithHelp("Histogram of jq filter execution duration in seconds"), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", KubeJqFilterDurationSeconds, err) + } + + // Register Kubernetes event handling duration histogram + _, err = metricStorage.RegisterHistogram( + KubeEventDurationSeconds, labels, kubeOperationBuckets, + options.WithHelp("Histogram of Kubernetes event handling duration in seconds"), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", KubeEventDurationSeconds, err) + } + + // Register watch errors counter + _, err = metricStorage.RegisterCounter( + KubernetesClientWatchErrorsTotal, errorTypeLabels, + options.WithHelp("Counter of Kubernetes client watch errors by error type"), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", KubernetesClientWatchErrorsTotal, err) + } + + return nil +} + +// ============================================================================ +// Background Metric Updaters +// ============================================================================ + +const ( + liveTicksInterval = 10 * time.Second + queueLengthUpdateInterval = 5 * time.Second +) + +// StartLiveTicksUpdater starts a goroutine that periodically updates +// the live_ticks metric every 10 seconds. +// This metric can be used to verify that addon-operator is alive and functioning. +func StartLiveTicksUpdater(metricStorage metricsstorage.Storage) { + if metricStorage == nil { + return + } + + // Addon-operator live ticks. + go func() { + ticker := time.NewTicker(liveTicksInterval) + + for range ticker.C { + metricStorage.CounterAdd(LiveTicks, 1.0, map[string]string{}) + } + }() +} + +// StartTasksQueueLengthUpdater starts a goroutine that periodically updates +// the tasks_queue_length metric every 5 seconds. +// This metric shows the number of pending tasks in each queue, which can be useful +// for monitoring system load and potential backlog issues. +func StartTasksQueueLengthUpdater(metricStorage metricsstorage.Storage, tqs task.TaskQueueSet) { + if metricStorage == nil { + return + } + + go func() { + ticker := time.NewTicker(queueLengthUpdateInterval) + + for range ticker.C { + // Gather task queues lengths. + tqs.Iterate(func(queue task.TaskQueue) { + queueLen := float64(queue.Length()) + metricStorage.GaugeSet(TasksQueueLength, queueLen, map[string]string{"queue": queue.GetName()}) + }) + } + }() +} diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index cb3f2b41..1a408ad1 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -1,3 +1,21 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package shell_operator provides bootstrap functionality for creating and initializing ShellOperator instances. +// +// This file contains the initialization and assembly logic for ShellOperator. +// Configuration is handled via the ShellOperatorConfig struct and related functions defined in config/config.go. package shell_operator import ( @@ -12,73 +30,94 @@ import ( "github.com/flant/shell-operator/pkg/filter/jq" "github.com/flant/shell-operator/pkg/hook" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" + "github.com/flant/shell-operator/pkg/metrics" schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager" + operatorconfig "github.com/flant/shell-operator/pkg/shell-operator/config" "github.com/flant/shell-operator/pkg/task/queue" utils "github.com/flant/shell-operator/pkg/utils/file" "github.com/flant/shell-operator/pkg/webhook/admission" "github.com/flant/shell-operator/pkg/webhook/conversion" ) -// Init initialize logging, ensures directories and creates -// a ShellOperator instance with all dependencies. -func Init(logger *log.Logger) (*ShellOperator, error) { - runtimeConfig := config.NewConfig(logger) - // Init logging subsystem. - app.SetupLogging(runtimeConfig, logger) +// NewShellOperatorWithConfig creates a fully configured ShellOperator instance with all dependencies. +// This replaces the old Init function with a more flexible constructor approach. +func NewShellOperatorWithConfig(ctx context.Context, cfg *operatorconfig.ShellOperatorConfig) (*ShellOperator, error) { + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid configuration: %w", err) + } + + op := NewShellOperator( + ctx, + WithLogger(cfg.Logger), + WithMetricStorage(cfg.MetricStorage), + WithHookMetricStorage(cfg.HookMetricStorage), + ) - // Log version and jq filtering implementation. - logger.Info(app.AppStartMessage) + // Initialize runtime configuration and logging + runtimeConfig := config.NewConfig(op.logger) + app.SetupLogging(runtimeConfig, op.logger) + + // Log version and jq filtering implementation + op.logger.Info(app.AppStartMessage) fl := jq.NewFilter() - logger.Debug(fl.FilterInfo()) + op.logger.Debug(fl.FilterInfo()) - hooksDir, err := utils.RequireExistingDirectory(app.HooksDir) + // Validate and prepare directories + hooksDir, err := utils.RequireExistingDirectory(cfg.HooksDir) if err != nil { - logger.Log(context.TODO(), log.LevelFatal.Level(), "hooks directory is required", log.Err(err)) - return nil, err + return nil, fmt.Errorf("hooks directory validation failed: %w", err) } - tempDir, err := utils.EnsureTempDirectory(app.TempDir) + tempDir, err := utils.EnsureTempDirectory(cfg.TempDir) if err != nil { - logger.Log(context.TODO(), log.LevelFatal.Level(), "temp directory", log.Err(err)) - return nil, err + return nil, fmt.Errorf("temp directory setup failed: %w", err) } - op := NewShellOperator(context.TODO(), WithLogger(logger)) - - // Debug server. - debugServer, err := RunDefaultDebugServer(app.DebugUnixSocket, app.DebugHttpServerAddr, op.logger.Named("debug-server")) + // Start debug server + debugServer, err := RunDefaultDebugServer(cfg.DebugUnixSocket, cfg.DebugHttpServerAddr, + op.logger.Named("debug-server")) if err != nil { - logger.Log(context.TODO(), log.LevelFatal.Level(), "start Debug server", log.Err(err)) - return nil, err + return nil, fmt.Errorf("failed to start debug server: %w", err) } - err = op.AssembleCommonOperator(app.ListenAddress, app.ListenPort, []string{ - "hook", - "binding", - "queue", - }) + err = metrics.RegisterOperatorMetrics(op.MetricStorage, []string{"hook", "binding", "queue"}) if err != nil { - logger.Log(context.TODO(), log.LevelFatal.Level(), "essemble common operator", log.Err(err)) - return nil, err + return nil, fmt.Errorf("failed to register operator metrics: %w", err) } - err = op.assembleShellOperator(hooksDir, tempDir, debugServer, runtimeConfig) - if err != nil { - logger.Log(context.TODO(), log.LevelFatal.Level(), "essemble shell operator", log.Err(err)) - return nil, err + // Assemble common components + if err := op.AssembleCommonOperator(cfg.ListenAddress, cfg.ListenPort); err != nil { + return nil, fmt.Errorf("failed to assemble common operator: %w", err) + } + + // Assemble shell-operator specific components + if err := op.assembleShellOperator(hooksDir, tempDir, debugServer, runtimeConfig); err != nil { + return nil, fmt.Errorf("failed to assemble shell operator: %w", err) } return op, nil } +// NewShellOperatorWithOptions creates a ShellOperator instance using functional options. +func NewShellOperatorWithOptions(ctx context.Context, options ...operatorconfig.ConfigOption) (*ShellOperator, error) { + cfg := operatorconfig.NewShellOperatorConfig(options...) + return NewShellOperatorWithConfig(ctx, cfg) +} + +// Init provides backward compatibility with the old initialization function. +// Deprecated: Use NewShellOperatorWithOptions for more flexibility. +func Init(logger *log.Logger) (*ShellOperator, error) { + return NewShellOperatorWithOptions(context.TODO(), operatorconfig.WithLogger(logger)) +} + // AssembleCommonOperator instantiate common dependencies. These dependencies // may be used for shell-operator derivatives, like addon-operator. // requires listenAddress, listenPort to run http server for operator APIs -func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string, kubeEventsManagerLabels []string) error { +func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string) error { op.APIServer = newBaseHTTPServer(listenAddress, listenPort) // built-in metrics - op.setupMetricStorage(kubeEventsManagerLabels) + op.setupMetricStorage() // metrics from user's hooks op.setupHookMetricStorage() @@ -116,7 +155,9 @@ func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string func (op *ShellOperator) assembleShellOperator(hooksDir string, tempDir string, debugServer *debug.Server, runtimeConfig *config.Config) error { registerRootRoute(op) // for shell-operator only - registerHookMetrics(op.HookMetricStorage) + if err := metrics.RegisterHookMetrics(op.HookMetricStorage); err != nil { + return fmt.Errorf("register hook metrics: %w", err) + } op.RegisterDebugQueueRoutes(debugServer) op.RegisterDebugHookRoutes(debugServer) diff --git a/pkg/shell-operator/combine_binding_context.go b/pkg/shell-operator/combine_binding_context.go index defc7696..b37120d2 100644 --- a/pkg/shell-operator/combine_binding_context.go +++ b/pkg/shell-operator/combine_binding_context.go @@ -1,3 +1,17 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package shell_operator import ( @@ -23,7 +37,7 @@ type CombineResult struct { // If input task has no metadata, result will be nil. // Metadata should implement HookNameAccessor, BindingContextAccessor and MonitorIDAccessor interfaces. // DEV WARNING! Do not use HookMetadataAccessor here. Use only *Accessor interfaces because this method is used from addon-operator. -func (op *ShellOperator) combineBindingContextForHook(tqs *queue.TaskQueueSet, q *queue.TaskQueue, t task.Task) *CombineResult { +func (op *ShellOperator) combineBindingContextForHook(tqs *queue.TaskQueueSet, q task.TaskQueue, t task.Task) *CombineResult { if q == nil { return nil } diff --git a/pkg/shell-operator/combine_binding_context_test.go b/pkg/shell-operator/combine_binding_context_test.go index 5022dc03..43ea4864 100644 --- a/pkg/shell-operator/combine_binding_context_test.go +++ b/pkg/shell-operator/combine_binding_context_test.go @@ -1,3 +1,17 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package shell_operator import ( @@ -8,12 +22,12 @@ import ( . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" - "github.com/flant/shell-operator/internal/metrics" bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" "github.com/flant/shell-operator/pkg/hook/task_metadata" "github.com/flant/shell-operator/pkg/hook/types" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metric" + "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/task" "github.com/flant/shell-operator/pkg/task/queue" ) @@ -37,8 +51,8 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) { TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) TaskQueues.WithContext(context.Background()) TaskQueues.NewNamedQueue("test_multiple_hooks", - func(_ context.Context, _ task.Task) queue.TaskResult { - return queue.TaskResult{ + func(_ context.Context, _ task.Task) task.Result { + return task.Result{ Status: "Success", } }, @@ -147,8 +161,8 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) { TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) TaskQueues.WithContext(context.Background()) TaskQueues.NewNamedQueue("test_no_combine", - func(_ context.Context, _ task.Task) queue.TaskResult { - return queue.TaskResult{ + func(_ context.Context, _ task.Task) task.Result { + return task.Result{ Status: "Success", } }, @@ -222,8 +236,8 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) TaskQueues.WithContext(context.Background()) TaskQueues.NewNamedQueue("test_multiple_hooks", - func(_ context.Context, _ task.Task) queue.TaskResult { - return queue.TaskResult{ + func(_ context.Context, _ task.Task) task.Result { + return task.Result{ Status: "Success", } }, @@ -340,8 +354,8 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) TaskQueues.WithContext(context.Background()) TaskQueues.NewNamedQueue("test_multiple_hooks", - func(_ context.Context, _ task.Task) queue.TaskResult { - return queue.TaskResult{ + func(_ context.Context, _ task.Task) task.Result { + return task.Result{ Status: "Success", } }, diff --git a/pkg/shell-operator/config/config.go b/pkg/shell-operator/config/config.go new file mode 100644 index 00000000..13951d06 --- /dev/null +++ b/pkg/shell-operator/config/config.go @@ -0,0 +1,175 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + + "github.com/deckhouse/deckhouse/pkg/log" + metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" + + "github.com/flant/shell-operator/pkg/app" +) + +// ShellOperatorConfig holds configuration for ShellOperator initialization +type ShellOperatorConfig struct { + Logger *log.Logger + ListenAddress string + ListenPort string + HooksDir string + TempDir string + DebugUnixSocket string + DebugHttpServerAddr string + MetricStorage metricsstorage.Storage + HookMetricStorage metricsstorage.Storage +} + +// ConfigOption defines a functional option for ShellOperatorConfig +type ConfigOption func(*ShellOperatorConfig) + +// NewShellOperatorConfig creates a new configuration with default values and applies the provided options. +// If no logger is provided via options, a default logger will be created. +func NewShellOperatorConfig(options ...ConfigOption) *ShellOperatorConfig { + config := &ShellOperatorConfig{ + ListenAddress: app.ListenAddress, + ListenPort: app.ListenPort, + HooksDir: app.HooksDir, + TempDir: app.TempDir, + DebugUnixSocket: app.DebugUnixSocket, + DebugHttpServerAddr: app.DebugHttpServerAddr, + } + + // Apply all provided options + for _, option := range options { + option(config) + } + + return config +} + +// Validate validates the configuration and returns an error if invalid +func (cfg *ShellOperatorConfig) Validate() error { + if cfg.ListenAddress == "" { + return fmt.Errorf("listen address cannot be empty") + } + if cfg.ListenPort == "" { + return fmt.Errorf("listen port cannot be empty") + } + if cfg.HooksDir == "" { + return fmt.Errorf("hooks directory cannot be empty") + } + if cfg.TempDir == "" { + return fmt.Errorf("temp directory cannot be empty") + } + + return nil +} + +// WithLogger sets the logger for the operator configuration +func WithLogger(logger *log.Logger) ConfigOption { + return func(config *ShellOperatorConfig) { + config.Logger = logger + } +} + +// WithListenAddress sets the listen address for the HTTP server +func WithListenAddress(address string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.ListenAddress = address + } +} + +// WithListenPort sets the listen port for the HTTP server +func WithListenPort(port string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.ListenPort = port + } +} + +// WithHooksDir sets the directory containing hooks +func WithHooksDir(dir string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.HooksDir = dir + } +} + +// WithTempDir sets the temporary directory +func WithTempDir(dir string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.TempDir = dir + } +} + +// WithDebugUnixSocket sets the debug unix socket path +func WithDebugUnixSocket(socket string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.DebugUnixSocket = socket + } +} + +// WithDebugHttpServerAddr sets the debug HTTP server address +func WithDebugHttpServerAddr(addr string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.DebugHttpServerAddr = addr + } +} + +// WithMetricStorage sets the metric storage for built-in metrics +func WithMetricStorage(storage metricsstorage.Storage) ConfigOption { + return func(config *ShellOperatorConfig) { + config.MetricStorage = storage + } +} + +// WithHookMetricStorage sets the metric storage for hook metrics +func WithHookMetricStorage(storage metricsstorage.Storage) ConfigOption { + return func(config *ShellOperatorConfig) { + config.HookMetricStorage = storage + } +} + +// Convenience options that combine multiple settings + +// WithListenConfig sets both listen address and port +func WithListenConfig(address, port string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.ListenAddress = address + config.ListenPort = port + } +} + +// WithDirectories sets both hooks and temp directories +func WithDirectories(hooksDir, tempDir string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.HooksDir = hooksDir + config.TempDir = tempDir + } +} + +// WithDebugConfig sets both debug unix socket and HTTP server address +func WithDebugConfig(unixSocket, httpAddr string) ConfigOption { + return func(config *ShellOperatorConfig) { + config.DebugUnixSocket = unixSocket + config.DebugHttpServerAddr = httpAddr + } +} + +// WithMetricStorages sets both metric storage and hook metric storage +func WithMetricStorages(metricStorage, hookMetricStorage metricsstorage.Storage) ConfigOption { + return func(config *ShellOperatorConfig) { + config.MetricStorage = metricStorage + config.HookMetricStorage = hookMetricStorage + } +} diff --git a/pkg/shell-operator/debug_server.go b/pkg/shell-operator/debug_server.go index 3b406d82..037d78a7 100644 --- a/pkg/shell-operator/debug_server.go +++ b/pkg/shell-operator/debug_server.go @@ -1,3 +1,17 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package shell_operator import ( diff --git a/pkg/shell-operator/http_server.go b/pkg/shell-operator/http_server.go index aef13d0b..15697570 100644 --- a/pkg/shell-operator/http_server.go +++ b/pkg/shell-operator/http_server.go @@ -1,3 +1,17 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package shell_operator import ( diff --git a/pkg/shell-operator/kube_client.go b/pkg/shell-operator/kube_client.go index 4b1a3e04..84c56244 100644 --- a/pkg/shell-operator/kube_client.go +++ b/pkg/shell-operator/kube_client.go @@ -1,3 +1,17 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package shell_operator import ( diff --git a/pkg/shell-operator/manager_events_handler.go b/pkg/shell-operator/manager_events_handler.go index cac19d1e..72114ff6 100644 --- a/pkg/shell-operator/manager_events_handler.go +++ b/pkg/shell-operator/manager_events_handler.go @@ -1,3 +1,17 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package shell_operator import ( @@ -86,9 +100,9 @@ func (m *ManagerEventsHandler) Start() { return } - m.taskQueues.DoWithLock(func(tqs *queue.TaskQueueSet) { + m.taskQueues.DoWithLock(func(tqs task.TaskQueueSet) { for _, resTask := range tailTasks { - if q := tqs.Queues[resTask.GetQueueName()]; q == nil { + if q := tqs.GetByName(resTask.GetQueueName()); q == nil { log.Error("Possible bug!!! Got task for queue but queue is not created yet.", slog.String("queueName", resTask.GetQueueName()), slog.String("description", resTask.GetDescription())) diff --git a/pkg/shell-operator/metrics.go b/pkg/shell-operator/metrics.go new file mode 100644 index 00000000..1caff0e7 --- /dev/null +++ b/pkg/shell-operator/metrics.go @@ -0,0 +1,31 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shell_operator + +import ( + "net/http" +) + +// setupMetricStorage creates and initializes metrics storage for built-in operator metrics. +// If MetricStorage is already set via options, it uses that; otherwise creates a new one. +func (op *ShellOperator) setupMetricStorage() { + op.APIServer.RegisterRoute(http.MethodGet, "/metrics", op.MetricStorage.Handler().ServeHTTP) +} + +// setupHookMetricStorage creates and initializes metrics storage for hook metrics. +// If HookMetricStorage is already set via options, it uses that; otherwise creates a new one. +func (op *ShellOperator) setupHookMetricStorage() { + op.APIServer.RegisterRoute(http.MethodGet, "/metrics/hooks", op.HookMetricStorage.Handler().ServeHTTP) +} diff --git a/pkg/shell-operator/metrics_hooks.go b/pkg/shell-operator/metrics_hooks.go deleted file mode 100644 index 7e385043..00000000 --- a/pkg/shell-operator/metrics_hooks.go +++ /dev/null @@ -1,84 +0,0 @@ -package shell_operator - -import ( - "net/http" - - metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" - - "github.com/flant/shell-operator/pkg/app" -) - -func (op *ShellOperator) setupHookMetricStorage() { - metricStorage := metricsstorage.NewMetricStorage( - metricsstorage.WithPrefix(app.PrometheusMetricsPrefix), - metricsstorage.WithNewRegistry(), - metricsstorage.WithLogger(op.logger.Named("metric-storage")), - ) - op.APIServer.RegisterRoute(http.MethodGet, "/metrics/hooks", metricStorage.Handler().ServeHTTP) - // create new metric storage for hooks - // register scrape handler - op.HookMetricStorage = metricStorage -} - -// specific metrics for shell-operator HookManager -func registerHookMetrics(metricStorage metricsstorage.Storage) { - // Metrics for enable kubernetes bindings. - _, _ = metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_seconds", []string{"hook"}) - _, _ = metricStorage.RegisterCounter("{PREFIX}hook_enable_kubernetes_bindings_errors_total", []string{"hook"}) - _, _ = metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_success", []string{"hook"}) - - // Metrics for hook executions. - labels := []string{ - "hook", - "binding", - "queue", - } - // Duration of hook execution. - _, _ = metricStorage.RegisterHistogram( - "{PREFIX}hook_run_seconds", - labels, - []float64{ - 0.0, - 0.02, 0.05, // 20,50 milliseconds - 0.1, 0.2, 0.5, // 100,200,500 milliseconds - 1, 2, 5, // 1,2,5 seconds - 10, 20, 50, // 10,20,50 seconds - 100, 200, 500, // 100,200,500 seconds - }, - ) - - // System CPU usage. - _, _ = metricStorage.RegisterHistogram( - "{PREFIX}hook_run_user_cpu_seconds", - labels, - []float64{ - 0.0, - 0.02, 0.05, // 20,50 milliseconds - 0.1, 0.2, 0.5, // 100,200,500 milliseconds - 1, 2, 5, // 1,2,5 seconds - 10, 20, 50, // 10,20,50 seconds - 100, 200, 500, // 100,200,500 seconds - }, - ) - // User CPU usage. - _, _ = metricStorage.RegisterHistogram( - "{PREFIX}hook_run_sys_cpu_seconds", - labels, - []float64{ - 0.0, - 0.02, 0.05, // 20,50 milliseconds - 0.1, 0.2, 0.5, // 100,200,500 milliseconds - 1, 2, 5, // 1,2,5 seconds - 10, 20, 50, // 10,20,50 seconds - 100, 200, 500, // 100,200,500 seconds - }, - ) - // Max RSS in bytes. - _, _ = metricStorage.RegisterGauge("{PREFIX}hook_run_max_rss_bytes", labels) - - _, _ = metricStorage.RegisterCounter("{PREFIX}hook_run_errors_total", labels) - _, _ = metricStorage.RegisterCounter("{PREFIX}hook_run_allowed_errors_total", labels) - _, _ = metricStorage.RegisterCounter("{PREFIX}hook_run_success_total", labels) - // hook_run task waiting time - _, _ = metricStorage.RegisterCounter("{PREFIX}task_wait_in_queue_seconds_total", labels) -} diff --git a/pkg/shell-operator/metrics_operator.go b/pkg/shell-operator/metrics_operator.go deleted file mode 100644 index 1b155910..00000000 --- a/pkg/shell-operator/metrics_operator.go +++ /dev/null @@ -1,88 +0,0 @@ -package shell_operator - -import ( - "net/http" - - metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" - - "github.com/flant/shell-operator/internal/metrics" - "github.com/flant/shell-operator/pkg/app" -) - -// setupMetricStorage creates and initializes metrics storage for built-in operator metrics -func (op *ShellOperator) setupMetricStorage(kubeEventsManagerLabels []string) { - metricStorage := metricsstorage.NewMetricStorage( - metricsstorage.WithPrefix(app.PrometheusMetricsPrefix), - metricsstorage.WithLogger(op.logger.Named("metric-storage")), - ) - - registerCommonMetrics(metricStorage) - registerTaskQueueMetrics(metricStorage) - registerKubeEventsManagerMetrics(metricStorage, kubeEventsManagerLabels) - - op.APIServer.RegisterRoute(http.MethodGet, "/metrics", metricStorage.Handler().ServeHTTP) - // create new metric storage for hooks - // register scrape handler - op.MetricStorage = metricStorage -} - -// registerCommonMetrics register base metric -// This function is used in the addon-operator -func registerCommonMetrics(metricStorage metricsstorage.Storage) { - _, _ = metricStorage.RegisterCounter("{PREFIX}live_ticks", []string{}) -} - -// registerTaskQueueMetrics -// This function is used in the addon-operator -func registerTaskQueueMetrics(metricStorage metricsstorage.Storage) { - _, _ = metricStorage.RegisterHistogram( - metrics.TasksQueueActionDurationSeconds, - []string{ - "queue_name", - "queue_action", - }, - []float64{ - 0.0, - 0.0001, 0.0002, 0.0005, // 100, 200, 500 microseconds - 0.001, 0.002, 0.005, // 1,2,5 milliseconds - 0.01, 0.02, 0.05, // 10,20,50 milliseconds - 0.1, 0.2, 0.5, // 100,200,500 milliseconds - }, - ) - - _, _ = metricStorage.RegisterGauge("{PREFIX}tasks_queue_length", []string{"queue"}) -} - -// registerKubeEventsManagerMetrics registers metrics for kube_event_manager -// This function is used in the addon-operator -func registerKubeEventsManagerMetrics(metricStorage metricsstorage.Storage, labels []string) { - // Count of objects in snapshot for one kubernets bindings. - _, _ = metricStorage.RegisterGauge("{PREFIX}kube_snapshot_objects", labels) - // Duration of jqFilter applying. - _, _ = metricStorage.RegisterHistogram( - "{PREFIX}kube_jq_filter_duration_seconds", - labels, - []float64{ - 0.0, - 0.001, 0.002, 0.005, // 1,2,5 milliseconds - 0.01, 0.02, 0.05, // 10,20,50 milliseconds - 0.1, 0.2, 0.5, // 100,200,500 milliseconds - 1, 2, 5, 10, // 1,2,5,10 seconds - }, - ) - // Duration of handling kubernetes event. - _, _ = metricStorage.RegisterHistogram( - "{PREFIX}kube_event_duration_seconds", - labels, - []float64{ - 0.0, - 0.001, 0.002, 0.005, // 1,2,5 milliseconds - 0.01, 0.02, 0.05, // 10,20,50 milliseconds - 0.1, 0.2, 0.5, // 100,200,500 milliseconds - 1, 2, 5, 10, // 1,2,5,10 seconds - }, - ) - - // Count of watch errors. - _, _ = metricStorage.RegisterCounter("{PREFIX}kubernetes_client_watch_errors_total", []string{"error_type"}) -} diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index 44970b89..41f7ecfd 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -1,3 +1,17 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package shell_operator import ( @@ -13,6 +27,7 @@ import ( v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" klient "github.com/flant/kube-client/client" + "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/hook" bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" "github.com/flant/shell-operator/pkg/hook/controller" @@ -21,6 +36,7 @@ import ( objectpatch "github.com/flant/shell-operator/pkg/kube/object_patch" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemTypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" + "github.com/flant/shell-operator/pkg/metrics" schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager" "github.com/flant/shell-operator/pkg/task" "github.com/flant/shell-operator/pkg/task/queue" @@ -74,6 +90,18 @@ func WithLogger(logger *log.Logger) Option { } } +func WithMetricStorage(storage metricsstorage.Storage) Option { + return func(operator *ShellOperator) { + operator.MetricStorage = storage + } +} + +func WithHookMetricStorage(storage metricsstorage.Storage) Option { + return func(operator *ShellOperator) { + operator.HookMetricStorage = storage + } +} + func NewShellOperator(ctx context.Context, opts ...Option) *ShellOperator { cctx, cancel := context.WithCancel(ctx) @@ -90,6 +118,23 @@ func NewShellOperator(ctx context.Context, opts ...Option) *ShellOperator { so.logger = log.NewLogger().Named("shell-operator") } + // Use provided metric storage or create default + if so.MetricStorage == nil { + so.MetricStorage = metricsstorage.NewMetricStorage( + metricsstorage.WithPrefix(app.PrometheusMetricsPrefix), + metricsstorage.WithLogger(so.logger.Named("metric-storage")), + ) + } + + // Use provided hook metric storage or create default + if so.HookMetricStorage == nil { + so.HookMetricStorage = metricsstorage.NewMetricStorage( + metricsstorage.WithPrefix(app.PrometheusMetricsPrefix), + metricsstorage.WithNewRegistry(), + metricsstorage.WithLogger(so.logger.Named("hook-metric-storage")), + ) + } + return so } @@ -417,10 +462,10 @@ func (op *ShellOperator) conversionEventHandler(ctx context.Context, crdName str } // taskHandler -func (op *ShellOperator) taskHandler(ctx context.Context, t task.Task) queue.TaskResult { +func (op *ShellOperator) taskHandler(ctx context.Context, t task.Task) task.Result { logEntry := op.logger.With("operator.component", "taskRunner") hookMeta := task_metadata.HookMetadataAccessor(t) - var res queue.TaskResult + var res task.Result switch t.GetType() { case task_metadata.HookRun: @@ -448,7 +493,7 @@ func (op *ShellOperator) taskHandler(ctx context.Context, t task.Task) queue.Tas } // taskHandleEnableKubernetesBindings creates task for each Kubernetes binding in the hook and queues them. -func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context, t task.Task) queue.TaskResult { +func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context, t task.Task) task.Result { ctx, span := otel.Tracer(serviceName).Start(ctx, "taskHandleEnableKubernetesBindings") defer span.End() @@ -458,10 +503,10 @@ func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context, "hook": hookMeta.HookName, } defer measure.Duration(func(d time.Duration) { - op.MetricStorage.GaugeSet("{PREFIX}hook_enable_kubernetes_bindings_seconds", d.Seconds(), metricLabels) + op.MetricStorage.GaugeSet(metrics.HookEnableKubernetesBindingsSeconds, d.Seconds(), metricLabels) })() - var res queue.TaskResult + var res task.Result hookLogLabels := map[string]string{} hookLogLabels["hook"] = hookMeta.HookName hookLogLabels["binding"] = "" @@ -517,14 +562,14 @@ func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context, res.AddHeadTasks(hookRunTasks...) } - op.MetricStorage.CounterAdd("{PREFIX}hook_enable_kubernetes_bindings_errors_total", errors, metricLabels) - op.MetricStorage.GaugeAdd("{PREFIX}hook_enable_kubernetes_bindings_success", success, metricLabels) + op.MetricStorage.CounterAdd(metrics.HookEnableKubernetesBindingsErrorsTotal, errors, metricLabels) + op.MetricStorage.GaugeAdd(metrics.HookEnableKubernetesBindingsSuccess, success, metricLabels) return res } // TODO use Context to pass labels and a queue name -func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) queue.TaskResult { +func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) task.Result { ctx, span := otel.Tracer(serviceName).Start(ctx, "taskHandleHookRun") defer span.End() @@ -534,7 +579,7 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que err := taskHook.RateLimitWait(context.Background()) if err != nil { // This could happen when the Context is canceled, so just repeat the task until the queue is stopped. - return queue.TaskResult{ + return task.Result{ Status: "Repeat", } } @@ -545,10 +590,10 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que "queue": t.GetQueueName(), } taskWaitTime := time.Since(t.GetQueuedAt()).Seconds() - op.MetricStorage.CounterAdd("{PREFIX}task_wait_in_queue_seconds_total", taskWaitTime, metricLabels) + op.MetricStorage.CounterAdd(metrics.TaskWaitInQueueSecondsTotal, taskWaitTime, metricLabels) defer measure.Duration(func(d time.Duration) { - op.MetricStorage.HistogramObserve("{PREFIX}hook_run_seconds", d.Seconds(), metricLabels, nil) + op.MetricStorage.HistogramObserve(metrics.HookRunSeconds, d.Seconds(), metricLabels, nil) })() hookLogLabels := map[string]string{} @@ -595,7 +640,7 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que } } - var res queue.TaskResult + var res task.Result // Default when shouldRunHook is false. res.Status = "Success" @@ -625,9 +670,9 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que taskLogEntry.Info("Hook executed successfully") res.Status = "Success" } - op.MetricStorage.CounterAdd("{PREFIX}hook_run_allowed_errors_total", allowed, metricLabels) - op.MetricStorage.CounterAdd("{PREFIX}hook_run_errors_total", errors, metricLabels) - op.MetricStorage.CounterAdd("{PREFIX}hook_run_success_total", success, metricLabels) + op.MetricStorage.CounterAdd(metrics.HookRunAllowedErrorsTotal, allowed, metricLabels) + op.MetricStorage.CounterAdd(metrics.HookRunErrorsTotal, errors, metricLabels) + op.MetricStorage.CounterAdd(metrics.HookRunSuccessTotal, success, metricLabels) } // Unlock Kubernetes events for all monitors when Synchronization task is done. @@ -667,9 +712,9 @@ func (op *ShellOperator) handleRunHook(ctx context.Context, t task.Task, taskHoo if result.Usage != nil { taskLogEntry.Debug("Usage", slog.String("value", fmt.Sprintf("%+v", result.Usage))) - op.MetricStorage.HistogramObserve("{PREFIX}hook_run_sys_seconds", result.Usage.Sys.Seconds(), metricLabels, nil) - op.MetricStorage.HistogramObserve("{PREFIX}hook_run_user_seconds", result.Usage.User.Seconds(), metricLabels, nil) - op.MetricStorage.GaugeSet("{PREFIX}hook_run_max_rss_bytes", float64(result.Usage.MaxRss)*1024, metricLabels) + op.MetricStorage.HistogramObserve(metrics.HookRunSysCPUSeconds, result.Usage.Sys.Seconds(), metricLabels, nil) + op.MetricStorage.HistogramObserve(metrics.HookRunUserCPUSeconds, result.Usage.User.Seconds(), metricLabels, nil) + op.MetricStorage.GaugeSet(metrics.HookRunMaxRSSBytes, float64(result.Usage.MaxRss)*1024, metricLabels) } // Try to apply Kubernetes actions. @@ -717,7 +762,7 @@ func (op *ShellOperator) handleRunHook(ctx context.Context, t task.Task, taskHoo // If input task has no metadata, result will be nil. // Metadata should implement HookNameAccessor, BindingContextAccessor and MonitorIDAccessor interfaces. // DEV WARNING! Do not use HookMetadataAccessor here. Use only *Accessor interfaces because this method is used from addon-operator. -func (op *ShellOperator) CombineBindingContextForHook(q *queue.TaskQueue, t task.Task, stopCombineFn func(tsk task.Task) bool) *CombineResult { +func (op *ShellOperator) CombineBindingContextForHook(q task.TaskQueue, t task.Task, stopCombineFn func(tsk task.Task) bool) *CombineResult { if q == nil { return nil } @@ -901,6 +946,31 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) { } } +func (op *ShellOperator) runMetrics() { + if op.MetricStorage == nil { + return + } + + // live ticks. + go func() { + for { + op.MetricStorage.CounterAdd("{PREFIX}live_ticks", 1.0, map[string]string{}) + time.Sleep(10 * time.Second) + } + }() + + // task queue length + go func() { + for { + op.TaskQueues.Iterate(func(queue task.TaskQueue) { + queueLen := float64(queue.Length()) + op.MetricStorage.GaugeSet("{PREFIX}tasks_queue_length", queueLen, map[string]string{"queue": queue.GetName()}) + }) + time.Sleep(5 * time.Second) + } + }() +} + // initAndStartHookQueues create all queues defined in hooks func (op *ShellOperator) initAndStartHookQueues() { schHooks, _ := op.HookManager.GetHooksInOrder(types.Schedule) @@ -934,31 +1004,6 @@ func (op *ShellOperator) initAndStartHookQueues() { } } -func (op *ShellOperator) runMetrics() { - if op.MetricStorage == nil { - return - } - - // live ticks. - go func() { - for { - op.MetricStorage.CounterAdd("{PREFIX}live_ticks", 1.0, map[string]string{}) - time.Sleep(10 * time.Second) - } - }() - - // task queue length - go func() { - for { - op.TaskQueues.Iterate(func(queue *queue.TaskQueue) { - queueLen := float64(queue.Length()) - op.MetricStorage.GaugeSet("{PREFIX}tasks_queue_length", queueLen, map[string]string{"queue": queue.Name}) - }) - time.Sleep(5 * time.Second) - } - }() -} - // Shutdown pause kubernetes events handling and stop queues. Wait for queues to stop. func (op *ShellOperator) Shutdown() { op.ScheduleManager.Stop() diff --git a/pkg/shell-operator/operator_test.go b/pkg/shell-operator/operator_test.go index a870a3fb..81610d44 100644 --- a/pkg/shell-operator/operator_test.go +++ b/pkg/shell-operator/operator_test.go @@ -1,3 +1,17 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package shell_operator import ( @@ -8,10 +22,10 @@ import ( . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" - "github.com/flant/shell-operator/internal/metrics" . "github.com/flant/shell-operator/pkg/hook/task_metadata" htypes "github.com/flant/shell-operator/pkg/hook/types" "github.com/flant/shell-operator/pkg/metric" + "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/task" utils "github.com/flant/shell-operator/pkg/utils/file" ) @@ -35,8 +49,9 @@ func Test_Operator_startup_tasks(t *testing.T) { metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { }) - op := NewShellOperator(context.Background(), WithLogger(log.NewNop())) + op := NewShellOperator(context.Background()) op.MetricStorage = metricStorage + op.logger = log.NewNop() op.SetupEventManagers() op.setupHookManagers(hooksDir, "") diff --git a/pkg/task/dump/dump.go b/pkg/task/dump/dump.go index b0ec1241..b2c215cf 100644 --- a/pkg/task/dump/dump.go +++ b/pkg/task/dump/dump.go @@ -40,7 +40,7 @@ func TaskMainQueue(tqs *queue.TaskQueueSet, format string) interface{} { } else { tasks := getTasksForQueue(q) dq = dumpQueue{ - Name: q.Name, + Name: q.GetName(), TasksCount: q.Length(), Status: q.GetStatus(), Tasks: tasks, @@ -78,16 +78,16 @@ func TaskQueues(tqs *queue.TaskQueueSet, format string, showEmpty bool) interfac tasksCount := 0 mainTasksCount := 0 - tqs.Iterate(func(queue *queue.TaskQueue) { + tqs.Iterate(func(queue task.TaskQueue) { if queue == nil { return } - if queue.Name == tqs.MainName { + if queue.GetName() == tqs.MainName { mainTasksCount = queue.Length() if queue.IsEmpty() { mainQueue := dumpQueue{ - Name: queue.Name, + Name: queue.GetName(), TasksCount: queue.Length(), } result.Empty = append(result.Empty, mainQueue) @@ -95,7 +95,7 @@ func TaskQueues(tqs *queue.TaskQueueSet, format string, showEmpty bool) interfac } else { tasks := getTasksForQueue(queue) mainQueue := dumpQueue{ - Name: queue.Name, + Name: queue.GetName(), TasksCount: queue.Length(), Status: queue.GetStatus(), Tasks: tasks, @@ -110,14 +110,14 @@ func TaskQueues(tqs *queue.TaskQueueSet, format string, showEmpty bool) interfac if queue.IsEmpty() { emptyQueues++ result.Empty = append(result.Empty, dumpQueue{ - Name: queue.Name, + Name: queue.GetName(), }) } else { activeQueues++ tasksCount += queue.Length() tasks := getTasksForQueue(queue) result.Active = append(result.Active, dumpQueue{ - Name: queue.Name, + Name: queue.GetName(), TasksCount: queue.Length(), Status: queue.GetStatus(), Tasks: tasks, @@ -156,7 +156,7 @@ func pluralize(n int, zero, one, many string) string { return fmt.Sprintf("%d %s", n, description) } -func getTasksForQueue(q *queue.TaskQueue) []dumpTask { +func getTasksForQueue(q task.TaskQueue) []dumpTask { tasks := make([]dumpTask, 0, q.Length()) index := 1 diff --git a/pkg/task/dump/dump_test.go b/pkg/task/dump/dump_test.go index c43e84b7..833ae7f6 100644 --- a/pkg/task/dump/dump_test.go +++ b/pkg/task/dump/dump_test.go @@ -11,9 +11,9 @@ import ( "github.com/stretchr/testify/assert" "sigs.k8s.io/yaml" - "github.com/flant/shell-operator/internal/metrics" "github.com/flant/shell-operator/pkg/hook/task_metadata" "github.com/flant/shell-operator/pkg/metric" + "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/task" "github.com/flant/shell-operator/pkg/task/queue" ) @@ -55,11 +55,11 @@ func Test_Sort_ByNamespaceAndName(t *testing.T) { } } -func fillQueue(q *queue.TaskQueue, n int) { +func fillQueue(q task.TaskQueue, n int) { for i := 0; i < n; i++ { - t := &task.BaseTask{Id: fmt.Sprintf("test_task_%s_%04d", q.Name, i)} + t := &task.BaseTask{Id: fmt.Sprintf("test_task_%s_%04d", q.GetName(), i)} t.WithMetadata(task_metadata.HookMetadata{ - HookName: fmt.Sprintf("test_task_%s_%04d", q.Name, i), + HookName: fmt.Sprintf("test_task_%s_%04d", q.GetName(), i), }) q.AddFirst(t) } diff --git a/pkg/task/queue.go b/pkg/task/queue.go new file mode 100644 index 00000000..658f8a2b --- /dev/null +++ b/pkg/task/queue.go @@ -0,0 +1,59 @@ +package task + +import ( + "context" + "time" +) + +type TaskQueue interface { + // Status and lifecycle methods + GetStatus() string + SetStatus(status string) + GetName() string + GetHandler() func(ctx context.Context, t Task) Result + SetHandler(handler func(ctx context.Context, t Task) Result) + Start(ctx context.Context) + Stop() + + // Queue operations + IsEmpty() bool + Length() int + + // Add operations + AddFirst(tasks ...Task) + AddLast(tasks ...Task) + AddAfter(id string, tasks ...Task) + AddBefore(id string, newTask Task) + + // Remove operations + RemoveFirst() Task + RemoveLast() Task + Remove(id string) Task + + // Get operations + GetFirst() Task + GetLast() Task + Get(id string) Task + + // Iteration and filtering + Iterate(doFn func(Task)) + Filter(filterFn func(Task) bool) + + // Utility methods + CancelTaskDelay() + MeasureActionTime(action string) func() + String() string +} + +type TaskQueueSet interface { + Stop() + StartMain(ctx context.Context) + Start(ctx context.Context) + Add(queue TaskQueue) + GetByName(name string) TaskQueue + GetMain() TaskQueue + DoWithLock(fn func(tqs TaskQueueSet)) + Iterate(doFn func(queue TaskQueue)) + Remove(name string) + WaitStopWithTimeout(timeout time.Duration) +} diff --git a/pkg/task/queue/queue_set.go b/pkg/task/queue/queue_set.go index e4d7517c..317c7dc9 100644 --- a/pkg/task/queue/queue_set.go +++ b/pkg/task/queue/queue_set.go @@ -23,12 +23,12 @@ type TaskQueueSet struct { cancel context.CancelFunc m sync.RWMutex - Queues map[string]*TaskQueue + Queues map[string]task.TaskQueue } func NewTaskQueueSet() *TaskQueueSet { return &TaskQueueSet{ - Queues: make(map[string]*TaskQueue), + Queues: make(map[string]task.TaskQueue), MainName: MainQueueName, } } @@ -69,13 +69,13 @@ func (tqs *TaskQueueSet) Start(ctx context.Context) { tqs.m.RUnlock() } -func (tqs *TaskQueueSet) Add(queue *TaskQueue) { +func (tqs *TaskQueueSet) Add(queue task.TaskQueue) { tqs.m.Lock() - tqs.Queues[queue.Name] = queue + tqs.Queues[queue.GetName()] = queue tqs.m.Unlock() } -func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...TaskQueueOption) { +func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) task.Result, opts ...TaskQueueOption) { q := NewTasksQueue( tqs.metricStorage, WithName(name), @@ -96,7 +96,7 @@ func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Con tqs.m.Unlock() } -func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue { +func (tqs *TaskQueueSet) GetByName(name string) task.TaskQueue { tqs.m.RLock() defer tqs.m.RUnlock() ts, exists := tqs.Queues[name] @@ -106,7 +106,7 @@ func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue { return nil } -func (tqs *TaskQueueSet) GetMain() *TaskQueue { +func (tqs *TaskQueueSet) GetMain() task.TaskQueue { return tqs.GetByName(tqs.MainName) } @@ -115,7 +115,7 @@ func (tqs *TaskQueueSet) GetMain() *TaskQueue { tqs.GetMain().Pop() }) */ -func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet)) { +func (tqs *TaskQueueSet) DoWithLock(fn func(tqs task.TaskQueueSet)) { tqs.m.Lock() defer tqs.m.Unlock() if fn != nil { @@ -124,7 +124,7 @@ func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet)) { } // Iterate run doFn for every task. -func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue)) { +func (tqs *TaskQueueSet) Iterate(doFn func(queue task.TaskQueue)) { if doFn == nil { return } @@ -142,7 +142,7 @@ func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue)) { // TODO sort names for _, q := range tqs.Queues { - if q.Name != tqs.MainName { + if q.GetName() != tqs.MainName { doFn(q) } } @@ -171,7 +171,7 @@ func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration) { stopped := true tqs.m.RLock() for _, q := range tqs.Queues { - if q.Status != "stop" { + if q.GetStatus() != "stop" { stopped = false break } diff --git a/pkg/task/queue/task_counter.go b/pkg/task/queue/task_counter.go index 100a255b..d6ae3e9c 100644 --- a/pkg/task/queue/task_counter.go +++ b/pkg/task/queue/task_counter.go @@ -3,7 +3,7 @@ package queue import ( metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" - "github.com/flant/shell-operator/internal/metrics" + "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/task" ) diff --git a/pkg/task/queue/task_queue.go b/pkg/task/queue/task_queue.go deleted file mode 100644 index 5d8003da..00000000 --- a/pkg/task/queue/task_queue.go +++ /dev/null @@ -1,931 +0,0 @@ -// !DEPRECATED -package queue - -import ( - "context" - "fmt" - "log/slog" - "os" - "sort" - "strings" - "sync" - "time" - - "github.com/deckhouse/deckhouse/pkg/log" - metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" - - "github.com/flant/shell-operator/internal/metrics" - bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" - "github.com/flant/shell-operator/pkg/hook/task_metadata" - "github.com/flant/shell-operator/pkg/task" - "github.com/flant/shell-operator/pkg/utils/exponential_backoff" - "github.com/flant/shell-operator/pkg/utils/measure" -) - -/* -WARNING: This file is deprecated and will be removed in the future. -Mainly used for benchmark purposes. -*/ - -type TaskQueueSlice struct { - logger *log.Logger - - m sync.RWMutex - metricStorage metricsstorage.Storage - ctx context.Context - cancel context.CancelFunc - - waitMu sync.Mutex - waitInProgress bool - cancelDelay bool - - isCompactable bool - CompactableTypes map[task.TaskType]struct{} - - items []task.Task - started bool // a flag to ignore multiple starts - - // Log debug messages if true. - debug bool - - Name string - Handler func(ctx context.Context, t task.Task) TaskResult - Status string - - // Callback for task compaction events - CompactionCallback func(compactedTasks []task.Task, targetTask task.Task) - - measureActionFn func() - measureActionFnOnce sync.Once - - // Timing settings. - WaitLoopCheckInterval time.Duration - DelayOnQueueIsEmpty time.Duration - DelayOnRepeat time.Duration - ExponentialBackoffFn func(failureCount int) time.Duration -} - -func NewTasksQueueSlice() *TaskQueueSlice { - return &TaskQueueSlice{ - items: make([]task.Task, 0), - // Default timings - logger: log.NewNop(), - WaitLoopCheckInterval: DefaultWaitLoopCheckInterval, - DelayOnQueueIsEmpty: DefaultDelayOnQueueIsEmpty, - DelayOnRepeat: DefaultDelayOnRepeat, - ExponentialBackoffFn: func(failureCount int) time.Duration { - return exponential_backoff.CalculateDelay(DefaultInitialDelayOnFailedTask, failureCount) - }, - } -} - -func (q *TaskQueueSlice) WithContext(ctx context.Context) { - q.ctx, q.cancel = context.WithCancel(ctx) -} - -func (q *TaskQueueSlice) WithLogger(logger *log.Logger) { - q.logger = logger -} - -func (q *TaskQueueSlice) WithMetricStorage(mstor metricsstorage.Storage) *TaskQueueSlice { - q.metricStorage = mstor - - return q -} - -func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice { - q.CompactableTypes = make(map[task.TaskType]struct{}, len(taskTypes)) - for _, taskType := range taskTypes { - q.CompactableTypes[taskType] = struct{}{} - } - return q -} - -func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice { - q.Name = name - return q -} - -func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice { - q.Handler = fn - return q -} - -func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice { - q.CompactionCallback = callback - return q -} - -// MeasureActionTime is a helper to measure execution time of queue's actions -func (q *TaskQueueSlice) MeasureActionTime(action string) func() { - if q.metricStorage == nil { - return func() {} - } - q.measureActionFnOnce.Do(func() { - if os.Getenv("QUEUE_ACTIONS_METRICS") == "no" { - q.measureActionFn = func() {} - } else { - q.measureActionFn = measure.Duration(func(d time.Duration) { - q.metricStorage.HistogramObserve(metrics.TasksQueueActionDurationSeconds, d.Seconds(), map[string]string{"queue_name": q.Name, "queue_action": action}, nil) - }) - } - }) - return q.measureActionFn -} - -func (q *TaskQueueSlice) GetStatus() string { - defer q.MeasureActionTime("GetStatus")() - q.m.RLock() - defer q.m.RUnlock() - return q.Status -} - -func (q *TaskQueueSlice) SetStatus(status string) { - q.m.Lock() - q.Status = status - q.m.Unlock() -} - -func (q *TaskQueueSlice) IsEmpty() bool { - defer q.MeasureActionTime("IsEmpty")() - q.m.RLock() - defer q.m.RUnlock() - isEmpty := q.isEmpty() - - return isEmpty -} - -func (q *TaskQueueSlice) isEmpty() bool { - return len(q.items) == 0 -} - -func (q *TaskQueueSlice) Length() int { - defer q.MeasureActionTime("Length")() - q.m.RLock() - defer q.m.RUnlock() - return len(q.items) -} - -// AddFirst adds new head element. -func (q *TaskQueueSlice) AddFirst(tasks ...task.Task) { - defer q.MeasureActionTime("AddFirst")() - q.withLock(func() { - q.addFirst(tasks...) - }) -} - -// addFirst adds new head element. -func (q *TaskQueueSlice) addFirst(tasks ...task.Task) { - // Also, add tasks in reverse order - // at the start of the queue. The first task in HeadTasks - // become the new first task in the queue. - for i := len(tasks) - 1; i >= 0; i-- { - q.items = append([]task.Task{tasks[i]}, q.items...) - } -} - -// RemoveFirst deletes a head element, so head is moved. -func (q *TaskQueueSlice) RemoveFirst() task.Task { - defer q.MeasureActionTime("RemoveFirst")() - var t task.Task - - q.withLock(func() { - t = q.removeFirst() - }) - - return t -} - -// removeFirst deletes a head element, so head is moved. -func (q *TaskQueueSlice) removeFirst() task.Task { - if q.isEmpty() { - return nil - } - - t := q.items[0] - q.items = q.items[1:] - - return t -} - -// GetFirst returns a head element. -func (q *TaskQueueSlice) GetFirst() task.Task { - defer q.MeasureActionTime("GetFirst")() - q.m.RLock() - defer q.m.RUnlock() - if q.isEmpty() { - return nil - } - task := q.items[0] - return task -} - -// AddLast adds new tail element. -func (q *TaskQueueSlice) AddLast(tasks ...task.Task) { - defer q.MeasureActionTime("AddLast")() - q.withLock(func() { - q.addLast(tasks...) - }) -} - -// addFirst adds new tail element. -func (q *TaskQueueSlice) addLast(tasks ...task.Task) { - for _, t := range tasks { - q.items = append(q.items, t) - taskType := t.GetType() - - if q.isCompactable { - continue - } - - if _, ok := q.CompactableTypes[taskType]; ok { - q.isCompactable = true - } - } - - if q.isCompactable && len(q.items) > 100 { - q.compaction() - q.isCompactable = false - } -} - -// compaction merges HookRun tasks for the same hook. -// DEV WARNING! Do not use HookMetadataAccessor here. Use only *Accessor interfaces because this method is used from addon-operator. -func (q *TaskQueueSlice) compaction() { - if len(q.items) == 0 { - return - } - - // Предварительно выделяем память для результата - result := make([]task.Task, 0, len(q.items)) - - hookGroups := make(map[string][]int, 10) // hookName -> []indices - var hookOrder []string - - for i, task := range q.items { - if _, ok := q.CompactableTypes[task.GetType()]; !ok { - result = append(result, task) - continue - } - hm := task.GetMetadata() - if isNil(hm) || task.IsProcessing() { - result = append(result, task) // Nil metadata и processing задачи сразу в результат - continue - } - - // Safety check to ensure we can access hook name - hookNameAccessor, ok := hm.(task_metadata.HookNameAccessor) - if !ok { - result = append(result, task) // Cannot access hook name, skip compaction - continue - } - hookName := hookNameAccessor.GetHookName() - if _, exists := hookGroups[hookName]; !exists { - hookOrder = append(hookOrder, hookName) - } - hookGroups[hookName] = append(hookGroups[hookName], i) - } - - // Обрабатываем группы хуков - O(N) в худшем случае - for _, hookName := range hookOrder { - indices := hookGroups[hookName] - - if len(indices) == 1 { - // Только одна задача - добавляем как есть - result = append(result, q.items[indices[0]]) - continue - } - - // Находим задачу с минимальным индексом как целевую - minIndex := indices[0] - for _, idx := range indices { - if idx < minIndex { - minIndex = idx - } - } - - // Safety check to ensure minIndex is valid - if minIndex < 0 || minIndex >= len(q.items) { - continue - } - - targetTask := q.items[minIndex] - targetHm := targetTask.GetMetadata() - if targetHm == nil { - continue - } - - // Safety checks for type assertions - bindingContextAccessor, ok := targetHm.(task_metadata.BindingContextAccessor) - if !ok { - continue - } - monitorIDAccessor, ok := targetHm.(task_metadata.MonitorIDAccessor) - if !ok { - continue - } - - contexts := bindingContextAccessor.GetBindingContext() - monitorIDs := monitorIDAccessor.GetMonitorIDs() - // Предварительно вычисляем общий размер - totalContexts := len(contexts) - totalMonitorIDs := len(monitorIDs) - - for _, idx := range indices { - if idx == minIndex { - continue // Пропускаем целевую задачу - } - existingHm := q.items[idx].GetMetadata() - if existingHm != nil { - if bindingContextAccessor, ok := existingHm.(task_metadata.BindingContextAccessor); ok { - totalContexts += len(bindingContextAccessor.GetBindingContext()) - } - if monitorIDAccessor, ok := existingHm.(task_metadata.MonitorIDAccessor); ok { - totalMonitorIDs += len(monitorIDAccessor.GetMonitorIDs()) - } - } - } - - // Создаем новые слайсы с правильным размером - // Safety check to ensure we don't create negative-sized slices - if totalContexts < 0 { - totalContexts = 0 - } - if totalMonitorIDs < 0 { - totalMonitorIDs = 0 - } - newContexts := make([]bindingcontext.BindingContext, totalContexts) - newMonitorIDs := make([]string, totalMonitorIDs) - - // Копируем контексты целевой задачи - if len(contexts) > 0 && len(newContexts) > 0 { - copySize := len(contexts) - if copySize > len(newContexts) { - copySize = len(newContexts) - } - copy(newContexts[:copySize], contexts[:copySize]) - } - if len(monitorIDs) > 0 && len(newMonitorIDs) > 0 { - copySize := len(monitorIDs) - if copySize > len(newMonitorIDs) { - copySize = len(newMonitorIDs) - } - copy(newMonitorIDs[:copySize], monitorIDs[:copySize]) - } - - // Копируем контексты от остальных задач - contextIndex := len(contexts) - monitorIndex := len(monitorIDs) - - for _, idx := range indices { - if idx == minIndex { - continue - } - // Safety check to ensure idx is valid - if idx < 0 || idx >= len(q.items) { - continue - } - existingHm := q.items[idx].GetMetadata() - if existingHm == nil { - continue - } - - // Safety checks for type assertions - bindingContextAccessor, ok := existingHm.(task_metadata.BindingContextAccessor) - if !ok { - continue - } - monitorIDAccessor, ok := existingHm.(task_metadata.MonitorIDAccessor) - if !ok { - continue - } - - existingContexts := bindingContextAccessor.GetBindingContext() - existingMonitorIDs := monitorIDAccessor.GetMonitorIDs() - - if len(existingContexts) > 0 && contextIndex < len(newContexts) { - // Safety check to ensure we don't exceed slice bounds - remainingSpace := len(newContexts) - contextIndex - if remainingSpace > 0 { - copySize := len(existingContexts) - if copySize > remainingSpace { - copySize = remainingSpace - } - copy(newContexts[contextIndex:contextIndex+copySize], existingContexts[:copySize]) - } - } - contextIndex += len(existingContexts) - - if len(existingMonitorIDs) > 0 && monitorIndex < len(newMonitorIDs) { - // Safety check to ensure we don't exceed slice bounds - remainingSpace := len(newMonitorIDs) - monitorIndex - if remainingSpace > 0 { - copySize := len(existingMonitorIDs) - if copySize > remainingSpace { - copySize = remainingSpace - } - copy(newMonitorIDs[monitorIndex:monitorIndex+copySize], existingMonitorIDs[:copySize]) - } - } - monitorIndex += len(existingMonitorIDs) - } - - // Обновляем метаданные - bindingContextSetter, ok := targetHm.(task_metadata.BindingContextSetter) - if !ok { - continue - } - withContext := bindingContextSetter.SetBindingContext(compactBindingContexts(newContexts)) - - monitorIDSetter, ok := withContext.(task_metadata.MonitorIDSetter) - if !ok { - continue - } - withContext = monitorIDSetter.SetMonitorIDs(newMonitorIDs) - targetTask.UpdateMetadata(withContext) - - // Просто добавляем в конец, потом отсортируем - result = append(result, targetTask) - - // Call compaction callback if set - if q.CompactionCallback != nil && len(indices) > 1 { - compactedTasks := make([]task.Task, 0, len(indices)-1) - for _, idx := range indices { - if idx != minIndex { - compactedTasks = append(compactedTasks, q.items[idx]) - } - } - q.CompactionCallback(compactedTasks, targetTask) - } - } - - positionMap := make(map[task.Task]int, len(q.items)) - for i, task := range q.items { - positionMap[task] = i - } - - sort.Slice(result, func(i, j int) bool { - posI := positionMap[result[i]] - posJ := positionMap[result[j]] - return posI < posJ - }) - - q.items = result - // reset dirty flag - q.isCompactable = false -} - -// RemoveLast deletes a tail element, so tail is moved. -func (q *TaskQueueSlice) RemoveLast() task.Task { - defer q.MeasureActionTime("RemoveLast")() - var t task.Task - - q.withLock(func() { - t = q.removeLast() - }) - - return t -} - -// RemoveLast deletes a tail element, so tail is moved. -func (q *TaskQueueSlice) removeLast() task.Task { - if q.isEmpty() { - return nil - } - - t := q.items[len(q.items)-1] - if len(q.items) == 1 { - q.items = make([]task.Task, 0) - } else { - q.items = q.items[:len(q.items)-1] - } - - return t -} - -// GetLast returns a tail element. -func (q *TaskQueueSlice) GetLast() task.Task { - defer q.MeasureActionTime("GetLast")() - var t task.Task - - q.withRLock(func() { - t = q.getLast() - }) - - return t -} - -// GetLast returns a tail element. -func (q *TaskQueueSlice) getLast() task.Task { - if q.isEmpty() { - return nil - } - - return q.items[len(q.items)-1] -} - -// Get returns a task by id. -func (q *TaskQueueSlice) Get(id string) task.Task { - defer q.MeasureActionTime("Get")() - var t task.Task - - q.withRLock(func() { - t = q.get(id) - }) - - return t -} - -// Get returns a task by id. -func (q *TaskQueueSlice) get(id string) task.Task { - for _, t := range q.items { - if t.GetId() == id { - return t - } - } - - return nil -} - -// AddAfter inserts a task after the task with specified id. -func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task) { - defer q.MeasureActionTime("AddAfter")() - q.withLock(func() { - q.addAfter(id, newTask) - }) -} - -// addAfter inserts a task after the task with specified id. -func (q *TaskQueueSlice) addAfter(id string, newTask task.Task) { - newItems := make([]task.Task, len(q.items)+1) - - idFound := false - for i, t := range q.items { - if !idFound { - // copy task while id not found - newItems[i] = t - if t.GetId() == id { - idFound = true - // when id is found, inject new task after task with equal id - newItems[i+1] = newTask - } - } else { - // when id is found, copy other tasks to i+1 position - newItems[i+1] = t - } - } - - if !idFound { - newItems[len(q.items)] = newTask - } - - q.items = newItems -} - -// AddBefore inserts a task before the task with specified id. -func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task) { - defer q.MeasureActionTime("AddBefore")() - q.withLock(func() { - q.addBefore(id, newTask) - }) -} - -// addBefore inserts a task before the task with specified id. -func (q *TaskQueueSlice) addBefore(id string, newTask task.Task) { - newItems := make([]task.Task, len(q.items)+1) - - idFound := false - for i, t := range q.items { - if !idFound { - if t.GetId() != id { - // copy task while id not found - newItems[i] = t - } else { - idFound = true - // when id is found, inject newTask to a current position - // and copy current task to i+1 position - newItems[i] = newTask - newItems[i+1] = t - } - } else { - // when id is found, copy other taskы to i+1 position - newItems[i+1] = t - } - } - - q.items = newItems -} - -// Remove finds element by id and deletes it. -func (q *TaskQueueSlice) Remove(id string) task.Task { - defer q.MeasureActionTime("Remove")() - var t task.Task - - q.withLock(func() { - t = q.remove(id) - }) - - return t -} - -func (q *TaskQueueSlice) remove(id string) task.Task { - delId := -1 - for i, item := range q.items { - if item.GetId() == id { - delId = i - break - } - } - if delId == -1 { - return nil - } - - t := q.items[delId] - q.items = append(q.items[:delId], q.items[delId+1:]...) - - return t -} - -func (q *TaskQueueSlice) SetDebug(debug bool) { - q.debug = debug -} - -func (q *TaskQueueSlice) debugf(format string, args ...interface{}) { - if !q.debug { - return - } - log.Debug("DEBUG", fmt.Sprintf(format, args...)) -} - -func (q *TaskQueueSlice) Stop() { - if q.cancel != nil { - q.cancel() - } -} - -func (q *TaskQueueSlice) Start(ctx context.Context) { - if q.started { - return - } - - if q.Handler == nil { - log.Error("should set handler before start in queue", slog.String("name", q.Name)) - q.SetStatus("no handler set") - return - } - - go func() { - q.SetStatus("") - var sleepDelay time.Duration - for { - q.debugf("queue %s: wait for task, delay %d", q.Name, sleepDelay) - t := q.waitForTask(sleepDelay) - if t == nil { - q.SetStatus("stop") - log.Info("queue stopped", slog.String("name", q.Name)) - return - } - - // dump task and a whole queue - q.debugf("queue %s: tasks after wait %s", q.Name, q.String()) - q.debugf("queue %s: task to handle '%s'", q.Name, t.GetType()) - - // compact queue if it's dirty - q.withLock(func() { - if q.isCompactable { - q.compaction() - q.isCompactable = false - } - }) - - // set that current task is being processed, so we don't merge it with other tasks - t.SetProcessing(true) - - // Now the task can be handled! - var nextSleepDelay time.Duration - q.SetStatus("run first task") - taskRes := q.Handler(ctx, t) - - // Check Done channel after long-running operation. - select { - case <-q.ctx.Done(): - log.Info("queue stopped after task handling", slog.String("name", q.Name)) - q.SetStatus("stop") - return - default: - } - - switch taskRes.Status { - case Fail: - // Reset processing flag for failed task - t.SetProcessing(false) - // Exponential backoff delay before retry. - nextSleepDelay = q.ExponentialBackoffFn(t.GetFailureCount()) - t.IncrementFailureCount() - q.SetStatus(fmt.Sprintf("sleep after fail for %s", nextSleepDelay.String())) - case Success, Keep: - // Insert new tasks right after the current task in reverse order. - q.withLock(func() { - for i := len(taskRes.afterTasks) - 1; i >= 0; i-- { - q.addAfter(t.GetId(), taskRes.afterTasks[i]) - } - // Remove current task on success. - if taskRes.Status == Success { - q.remove(t.GetId()) - } else { - // Reset processing flag for kept task - t.SetProcessing(false) - } - - q.addFirst(taskRes.headTasks...) - - // Add tasks to the end of the queue - q.addLast(taskRes.GetTailTasks()...) - }) - q.SetStatus("") - case Repeat: - // Reset processing flag for repeated task - t.SetProcessing(false) - // repeat a current task after a small delay - nextSleepDelay = q.DelayOnRepeat - q.SetStatus("repeat head task") - } - - if taskRes.DelayBeforeNextTask != 0 { - nextSleepDelay = taskRes.DelayBeforeNextTask - q.SetStatus(fmt.Sprintf("sleep for %s", nextSleepDelay.String())) - } - - sleepDelay = nextSleepDelay - - if taskRes.AfterHandle != nil { - taskRes.AfterHandle() - } - - q.debugf("queue %s: tasks after handle %s", q.Name, q.String()) - } - }() - q.started = true -} - -// waitForTask returns a task that can be processed or a nil if context is canceled. -// sleepDelay is used to sleep before check a task, e.g. in case of failed previous task. -// If queue is empty, then it will be checked every DelayOnQueueIsEmpty. -func (q *TaskQueueSlice) waitForTask(sleepDelay time.Duration) task.Task { - // Check Done channel. - select { - case <-q.ctx.Done(): - return nil - default: - } - - // Shortcut: return the first task if the queue is not empty and delay is not required. - if !q.IsEmpty() && sleepDelay == 0 { - return q.GetFirst() - } - - // Initialize wait settings. - waitBegin := time.Now() - waitUntil := q.DelayOnQueueIsEmpty - if sleepDelay != 0 { - waitUntil = sleepDelay - } - - checkTicker := time.NewTicker(q.WaitLoopCheckInterval) - q.waitMu.Lock() - q.waitInProgress = true - q.cancelDelay = false - q.waitMu.Unlock() - - origStatus := q.GetStatus() - - defer func() { - checkTicker.Stop() - q.waitMu.Lock() - q.waitInProgress = false - q.cancelDelay = false - q.waitMu.Unlock() - q.SetStatus(origStatus) - }() - - // Wait for the queued task with some delay. - // Every tick increases the 'elapsed' counter until it outgrows the waitUntil value. - // Or, delay can be canceled to handle new head task immediately. - for { - checkTask := false - select { - case <-q.ctx.Done(): - // Queue is stopped. - return nil - case <-checkTicker.C: - // Check and update waitUntil. - elapsed := time.Since(waitBegin) - - q.waitMu.Lock() - if q.cancelDelay { - // Reset waitUntil to check task immediately. - waitUntil = elapsed - } - q.waitMu.Unlock() - - // Wait loop is done or canceled: break select to check for the head task. - if elapsed >= waitUntil { - // Increase waitUntil to wait on the next iteration and go check for the head task. - checkTask = true - } - } - - // Break the for-loop to see if the head task can be returned. - if checkTask { - if q.IsEmpty() { - // No task to return: increase wait time. - waitUntil += q.DelayOnQueueIsEmpty - } else { - return q.GetFirst() - } - } - - // Wait loop still in progress: update queue status. - waitTime := time.Since(waitBegin).Truncate(time.Second) - if sleepDelay == 0 { - q.SetStatus(fmt.Sprintf("waiting for task %s", waitTime.String())) - } else { - delay := sleepDelay.Truncate(time.Second) - q.SetStatus(fmt.Sprintf("%s (%s left of %s delay)", origStatus, (delay - waitTime).String(), delay.String())) - } - } -} - -// CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay. -func (q *TaskQueueSlice) CancelTaskDelay() { - q.waitMu.Lock() - if q.waitInProgress { - q.cancelDelay = true - } - q.waitMu.Unlock() -} - -// Iterate run doFn for every task. -func (q *TaskQueueSlice) Iterate(doFn func(task.Task)) { - if doFn == nil { - return - } - - defer q.MeasureActionTime("Iterate")() - - q.withRLock(func() { - for _, t := range q.items { - doFn(t) - } - }) -} - -// Filter run filterFn on every task and remove each with false result. -func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool) { - if filterFn == nil { - return - } - - defer q.MeasureActionTime("Filter")() - - q.withLock(func() { - newItems := make([]task.Task, 0) - for _, t := range q.items { - if filterFn(t) { - newItems = append(newItems, t) - } - } - q.items = newItems - }) -} - -// TODO define mapping method with QueueAction to insert, modify and delete tasks. - -// Dump tasks in queue to one line -func (q *TaskQueueSlice) String() string { - var buf strings.Builder - var index int - qLen := q.Length() - q.Iterate(func(t task.Task) { - buf.WriteString(fmt.Sprintf("[%s,id=%10.10s]", t.GetDescription(), t.GetId())) - index++ - if index == qLen { - return - } - buf.WriteString(", ") - }) - - return buf.String() -} - -func (q *TaskQueueSlice) withLock(fn func()) { - q.m.Lock() - fn() - q.m.Unlock() -} - -func (q *TaskQueueSlice) withRLock(fn func()) { - q.m.RLock() - fn() - q.m.RUnlock() -} diff --git a/pkg/task/queue/task_queue_benchmark_test.go b/pkg/task/queue/task_queue_benchmark_test.go index 8cdb8950..58c2d150 100644 --- a/pkg/task/queue/task_queue_benchmark_test.go +++ b/pkg/task/queue/task_queue_benchmark_test.go @@ -140,47 +140,6 @@ func benchmarkGetByID(b *testing.B, queue Queue, size int) { } } -/* Old code */ -func BenchmarkTaskQueueSlice_AddLast_100(b *testing.B) { - benchmarkAddLast(b, NewTasksQueueSlice(), 100) -} - -func BenchmarkTaskQueueSlice_AddLast_1000(b *testing.B) { - benchmarkAddLast(b, NewTasksQueueSlice(), 1000) -} - -func BenchmarkTaskQueueSlice_AddFirst_100(b *testing.B) { - benchmarkAddFirst(b, NewTasksQueueSlice(), 100) -} - -func BenchmarkTaskQueueSlice_AddFirst_1000(b *testing.B) { - benchmarkAddFirst(b, NewTasksQueueSlice(), 1000) -} - -func BenchmarkTaskQueueSlice_RemoveFirst_100(b *testing.B) { - benchmarkRemoveFirst(b, NewTasksQueueSlice(), 100) -} - -func BenchmarkTaskQueueSlice_RemoveFirst_1000(b *testing.B) { - benchmarkRemoveFirst(b, NewTasksQueueSlice(), 1000) -} - -func BenchmarkTaskQueueSlice_GetFirst_100(b *testing.B) { - benchmarkGetFirst(b, NewTasksQueueSlice(), 100) -} - -func BenchmarkTaskQueueSlice_GetFirst_1000(b *testing.B) { - benchmarkGetFirst(b, NewTasksQueueSlice(), 1000) -} - -func BenchmarkTaskQueueSlice_GetByID_100(b *testing.B) { - benchmarkGetByID(b, NewTasksQueueSlice(), 100) -} - -func BenchmarkTaskQueueSlice_GetByID_1000(b *testing.B) { - benchmarkGetByID(b, NewTasksQueueSlice(), 1000) -} - /* New code */ func newBenchmarkTasksQueue(b *testing.B) *TaskQueue { @@ -294,37 +253,6 @@ func benchmarkTaskQueueCompaction(b *testing.B, size int) { } } -func benchmarkTaskQueueSliceCompaction(b *testing.B, size int) { - for i := 0; i < b.N; i++ { - b.StopTimer() - q := NewTasksQueueSlice() - q.WithCompactableTypes([]task.TaskType{task_metadata.HookRun}) - tasks := createCompactionBenchmarkData(b, size) - // Setup queue without triggering compaction - q.items = append(q.items, tasks...) - - b.StartTimer() - q.compaction() - } -} - -/* Old code */ -func BenchmarkTaskQueueSlice_Compaction_10(b *testing.B) { - benchmarkTaskQueueSliceCompaction(b, 10) -} - -func BenchmarkTaskQueueSlice_Compaction_100(b *testing.B) { - benchmarkTaskQueueSliceCompaction(b, 100) -} - -func BenchmarkTaskQueueSlice_Compaction_500(b *testing.B) { - benchmarkTaskQueueSliceCompaction(b, 500) -} - -func BenchmarkTaskQueueSlice_Compaction_1000(b *testing.B) { - benchmarkTaskQueueSliceCompaction(b, 1000) -} - /* New code */ func BenchmarkTaskQueue_Compaction_10(b *testing.B) { benchmarkTaskQueueCompaction(b, 10) diff --git a/pkg/task/queue/task_queue_list.go b/pkg/task/queue/task_queue_list.go index 9177af08..0f0aa62a 100644 --- a/pkg/task/queue/task_queue_list.go +++ b/pkg/task/queue/task_queue_list.go @@ -13,9 +13,9 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" - "github.com/flant/shell-operator/internal/metrics" bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" "github.com/flant/shell-operator/pkg/hook/task_metadata" + "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/task" "github.com/flant/shell-operator/pkg/utils/exponential_backoff" "github.com/flant/shell-operator/pkg/utils/list" @@ -34,6 +34,8 @@ config parameter. This implementation uses container/list for O(1) queue operations and a map for O(1) task lookup by ID. */ +const compactionThreshold = 100 + var ( DefaultWaitLoopCheckInterval = 125 * time.Millisecond DefaultDelayOnQueueIsEmpty = 250 * time.Millisecond @@ -104,9 +106,9 @@ type TaskQueue struct { started bool // a flag to ignore multiple starts - Name string - Handler func(ctx context.Context, t task.Task) TaskResult - Status string + name string + handler func(ctx context.Context, t task.Task) task.Result + status string measureActionFn func() measureActionFnOnce sync.Once @@ -143,14 +145,14 @@ func WithContext(ctx context.Context) TaskQueueOption { // WithName sets the name for the TaskQueue func WithName(name string) TaskQueueOption { return func(q *TaskQueue) { - q.Name = name + q.name = name } } // WithHandler sets the task handler for the TaskQueue -func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption { +func WithHandler(fn func(ctx context.Context, t task.Task) task.Result) TaskQueueOption { return func(q *TaskQueue) { - q.Handler = fn + q.handler = fn } } @@ -201,7 +203,7 @@ func NewTasksQueue(metricStorage metricsstorage.Storage, opts ...TaskQueueOption opt(q) } - q.queueTasksCounter = NewTaskCounter(q.Name, q.compactableTypes, q.metricStorage) + q.queueTasksCounter = NewTaskCounter(q.name, q.compactableTypes, q.metricStorage) return q } @@ -257,7 +259,7 @@ func (q *TaskQueue) MeasureActionTime(action string) func() { q.measureActionFn = func() {} } else { q.measureActionFn = measure.Duration(func(d time.Duration) { - q.metricStorage.HistogramObserve(metrics.TasksQueueActionDurationSeconds, d.Seconds(), map[string]string{"queue_name": q.Name, "queue_action": action}, nil) + q.metricStorage.HistogramObserve(metrics.TasksQueueActionDurationSeconds, d.Seconds(), map[string]string{"queue_name": q.name, "queue_action": action}, nil) }) } }) @@ -269,15 +271,30 @@ func (q *TaskQueue) GetStatus() string { defer q.MeasureActionTime("GetStatus")() q.m.RLock() defer q.m.RUnlock() - return q.Status + return q.status } func (q *TaskQueue) SetStatus(status string) { q.m.Lock() - q.Status = status + q.status = status q.m.Unlock() } +// non thread safe method +func (q *TaskQueue) GetName() string { + return q.name +} + +// non thread safe method. for tests only +func (q *TaskQueue) GetHandler() func(ctx context.Context, t task.Task) task.Result { + return q.handler +} + +// non thread safe method. for tests only +func (q *TaskQueue) SetHandler(handler func(ctx context.Context, t task.Task) task.Result) { + q.handler = handler +} + func (q *TaskQueue) IsEmpty() bool { defer q.MeasureActionTime("IsEmpty")() q.m.RLock() @@ -369,7 +386,7 @@ func (q *TaskQueue) addLast(tasks ...task.Task) { for _, t := range tasks { q.lazydebug("adding task to queue", func() []any { return []any{ - slog.String("queue", q.Name), + slog.String("queue", q.name), slog.String("task_id", t.GetId()), slog.String("task_type", string(t.GetType())), slog.String("task_description", t.GetDescription()), @@ -378,7 +395,7 @@ func (q *TaskQueue) addLast(tasks ...task.Task) { }) if _, ok := q.idIndex[t.GetId()]; ok { - q.logger.Warn("task collision detected, unexpected behavior possible", slog.String("queue", q.Name), slog.String("task_id", t.GetId())) + q.logger.Warn("task collision detected, unexpected behavior possible", slog.String("queue", q.name), slog.String("task_id", t.GetId())) } element := q.items.PushBack(t) @@ -391,7 +408,7 @@ func (q *TaskQueue) addLast(tasks ...task.Task) { if _, ok := q.compactableTypes[taskType]; ok { q.lazydebug("task is mergeable, marking queue as dirty", func() []any { return []any{ - slog.String("queue", q.Name), + slog.String("queue", q.name), slog.String("task_id", t.GetId()), slog.String("task_type", string(taskType)), slog.Int("queue_length", q.items.Len()), @@ -403,7 +420,7 @@ func (q *TaskQueue) addLast(tasks ...task.Task) { if q.items.Len() > compactionThreshold && q.queueTasksCounter.IsAnyCapReached() { q.lazydebug("triggering compaction due to queue length", func() []any { return []any{ - slog.String("queue", q.Name), + slog.String("queue", q.name), slog.Int("queue_length", q.items.Len()), slog.Int("compaction_threshold", compactionThreshold), } @@ -414,7 +431,7 @@ func (q *TaskQueue) addLast(tasks ...task.Task) { q.lazydebug("compaction finished", func() []any { return []any{ - slog.String("queue", q.Name), + slog.String("queue", q.name), slog.Int("queue_length_before", currentQueue), slog.Int("queue_length_after", q.items.Len()), } @@ -423,7 +440,7 @@ func (q *TaskQueue) addLast(tasks ...task.Task) { } else { q.lazydebug("task is not mergeable", func() []any { return []any{ - slog.String("queue", q.Name), + slog.String("queue", q.name), slog.String("task_id", t.GetId()), slog.String("task_type", string(taskType)), } @@ -805,8 +822,8 @@ func (q *TaskQueue) Start(ctx context.Context) { return } - if q.Handler == nil { - log.Error("should set handler before start in queue", slog.String("name", q.Name)) + if q.handler == nil { + log.Error("should set handler before start in queue", slog.String("name", q.name)) q.SetStatus("no handler set") return } @@ -815,24 +832,24 @@ func (q *TaskQueue) Start(ctx context.Context) { q.SetStatus("") var sleepDelay time.Duration for { - q.logger.Debug("queue: wait for task", slog.String("queue", q.Name), slog.Duration("sleep_delay", sleepDelay)) + q.logger.Debug("queue: wait for task", slog.String("queue", q.name), slog.Duration("sleep_delay", sleepDelay)) t := q.waitForTask(sleepDelay) if t == nil { q.SetStatus("stop") - q.logger.Info("queue stopped", slog.String("name", q.Name)) + q.logger.Info("queue stopped", slog.String("name", q.name)) return } q.withLock(func() { if q.queueTasksCounter.IsAnyCapReached() { q.lazydebug("triggering compaction before task processing", func() []any { - return []any{slog.String("queue", q.Name), slog.String("task_id", t.GetId()), slog.String("task_type", string(t.GetType())), slog.Int("queue_length", q.items.Len())} + return []any{slog.String("queue", q.name), slog.String("task_id", t.GetId()), slog.String("task_type", string(t.GetType())), slog.Int("queue_length", q.items.Len())} }) q.compaction(q.queueTasksCounter.GetReachedCap()) q.lazydebug("compaction completed, queue no longer dirty", func() []any { - return []any{slog.String("queue", q.Name), slog.Int("queue_length_after", q.items.Len())} + return []any{slog.String("queue", q.name), slog.Int("queue_length_after", q.items.Len())} }) } }) @@ -843,38 +860,38 @@ func (q *TaskQueue) Start(ctx context.Context) { // use lazydebug because it dumps whole queue and task q.lazydebug("queue tasks after wait", func() []any { return []any{ - slog.String("queue", q.Name), + slog.String("queue", q.name), slog.String("tasks", q.String()), } }) q.lazydebug("queue task to handle", func() []any { return []any{ - slog.String("queue", q.Name), + slog.String("queue", q.name), slog.String("task_type", string(t.GetType())), } }) var nextSleepDelay time.Duration q.SetStatus("run first task") - taskRes := q.Handler(ctx, t) + taskRes := q.handler(ctx, t) // Check Done channel after long-running operation. select { case <-q.ctx.Done(): - q.logger.Info("queue stopped after task handling", slog.String("name", q.Name)) + q.logger.Info("queue stopped after task handling", slog.String("name", q.name)) q.SetStatus("stop") return default: } switch taskRes.Status { - case Success, Keep: + case task.Success, task.Keep: // Insert new tasks right after the current task in reverse order. q.withLock(func() { q.addAfter(t.GetId(), taskRes.GetAfterTasks()...) - if taskRes.Status == Success { + if taskRes.Status == task.Success { q.remove(t.GetId()) } t.SetProcessing(false) // release processing flag @@ -888,7 +905,7 @@ func (q *TaskQueue) Start(ctx context.Context) { q.addLast(taskRes.GetTailTasks()...) }) q.SetStatus("") - case Fail: + case task.Fail: if len(taskRes.GetAfterTasks()) > 0 || len(taskRes.GetHeadTasks()) > 0 || len(taskRes.GetTailTasks()) > 0 { q.logger.Warn("result is fail, cannot process tasks in result", slog.Int("after_task_count", len(taskRes.GetAfterTasks())), @@ -901,7 +918,7 @@ func (q *TaskQueue) Start(ctx context.Context) { nextSleepDelay = q.ExponentialBackoffFn(t.GetFailureCount()) t.IncrementFailureCount() q.SetStatus(fmt.Sprintf("sleep after fail for %s", nextSleepDelay.String())) - case Repeat: + case task.Repeat: if len(taskRes.GetAfterTasks()) > 0 || len(taskRes.GetHeadTasks()) > 0 || len(taskRes.GetTailTasks()) > 0 { q.logger.Warn("result is repeat, cannot process tasks in result", slog.Int("after_task_count", len(taskRes.GetAfterTasks())), @@ -927,7 +944,7 @@ func (q *TaskQueue) Start(ctx context.Context) { } // use lazydebug because it dumps whole queue q.lazydebug("queue: tasks after handle", func() []any { - return []any{slog.String("queue", q.Name), slog.String("tasks", q.String())} + return []any{slog.String("queue", q.name), slog.String("tasks", q.String())} }) } }() diff --git a/pkg/task/queue/task_queue_requeue_test.go b/pkg/task/queue/task_queue_requeue_test.go index 6ef61c36..455df2c8 100644 --- a/pkg/task/queue/task_queue_requeue_test.go +++ b/pkg/task/queue/task_queue_requeue_test.go @@ -10,9 +10,9 @@ import ( . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" - "github.com/flant/shell-operator/internal/metrics" "github.com/flant/shell-operator/pkg/hook/task_metadata" "github.com/flant/shell-operator/pkg/metric" + "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/task" ) @@ -54,7 +54,7 @@ func Test_TaskQueueList_Requeue(t *testing.T) { q.DelayOnRepeat = 5 * time.Millisecond // Define the handler for tasks - q.Handler = func(_ context.Context, tsk task.Task) TaskResult { + q.handler = func(_ context.Context, tsk task.Task) task.Result { mu.Lock() executionOrder = append(executionOrder, tsk.GetId()) mu.Unlock() @@ -62,8 +62,8 @@ func Test_TaskQueueList_Requeue(t *testing.T) { if tsk.GetId() == "RequeueTask" { // If there are other tasks in the queue, move this task to the end. if q.Length() > 1 { - res := TaskResult{Status: Success} - res.tailTasks = append(res.tailTasks, tsk) + res := task.Result{Status: task.Success} + res.AddTailTasks(tsk) return res } @@ -71,11 +71,11 @@ func Test_TaskQueueList_Requeue(t *testing.T) { // If no other tasks, wait for the signal to finish. <-requeueTaskCanFinish close(requeueTaskFinished) - return TaskResult{Status: Success} + return task.Result{Status: task.Success} } // For simple tasks, just succeed. - return TaskResult{Status: Success} + return task.Result{Status: task.Success} } // Add the "requeue" task first. @@ -131,113 +131,3 @@ func Test_TaskQueueList_Requeue(t *testing.T) { g.Expect(q.Length()).To(Equal(0)) } - -func Test_TaskQueue_Requeue(t *testing.T) { - g := NewWithT(t) - - metricStorage := metric.NewStorageMock(t) - metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds) - assert.NotZero(t, value) - assert.Equal(t, map[string]string{ - "queue_action": "AddLast", - "queue_name": "requeue-test-queue", - }, labels) - assert.Nil(t, buckets) - }) - - // A channel to control when RequeueTask can finish. - requeueTaskCanFinish := make(chan struct{}) - - // A channel to signal that RequeueTask has finished. - requeueTaskFinished := make(chan struct{}) - - // Store execution order. - executionOrder := make([]string, 0) - mu := &sync.Mutex{} - - // Create a new task queue - q := NewTasksQueueSlice() - q.WithMetricStorage(metricStorage) - q.WithName("requeue-test-queue") - q.WithContext(context.Background()) - - q.WaitLoopCheckInterval = 5 * time.Millisecond - q.DelayOnQueueIsEmpty = 5 * time.Millisecond - q.DelayOnRepeat = 5 * time.Millisecond - - // Define the handler for tasks - q.WithHandler(func(_ context.Context, tsk task.Task) TaskResult { - mu.Lock() - executionOrder = append(executionOrder, tsk.GetId()) - mu.Unlock() - - if tsk.GetId() == "RequeueTask" { - // If there are other tasks in the queue, move this task to the end. - if q.Length() > 1 { - return TaskResult{Status: Success, tailTasks: []task.Task{tsk}} - } - - // If no other tasks, wait for the signal to finish. - <-requeueTaskCanFinish - close(requeueTaskFinished) - return TaskResult{Status: Success} - } - - // For simple tasks, just succeed. - return TaskResult{Status: Success} - }) - - // Add the "requeue" task first. - requeueTask := task.BaseTask{Id: "RequeueTask", Type: task_metadata.HookRun} - q.AddLast(&requeueTask) - - // Add a few simple tasks. - for i := 0; i < 3; i++ { - simpleTask := task.BaseTask{Id: fmt.Sprintf("SimpleTask-%d", i), Type: task_metadata.HookRun} - q.AddLast(&simpleTask) - } - - g.Expect(q.Length()).To(Equal(4)) - - // Start processing the queue in a separate goroutine. - go q.Start(context.Background()) - defer q.Stop() - - // Wait until all simple tasks are processed. - g.Eventually(func() int { - mu.Lock() - defer mu.Unlock() - count := 0 - for _, id := range executionOrder { - if id != "RequeueTask" { - count++ - } - } - return count - }, "5s", "10ms").Should(Equal(3), "All simple tasks should run") - - // Verify the order of execution so far. - // The RequeueTask should have been processed and moved to the back. - mu.Lock() - // The first task should be the RequeueTask. - g.Expect(executionOrder[0]).To(Equal("RequeueTask")) - // The next 3 tasks should be SimpleTasks - g.Expect(executionOrder[1:4]).To(Equal([]string{"SimpleTask-0", "SimpleTask-1", "SimpleTask-2"})) - mu.Unlock() - - // Allow the RequeueTask to finish. - close(requeueTaskCanFinish) - - // Wait for the RequeueTask to finish. - g.Eventually(requeueTaskFinished, "5s", "10ms").Should(BeClosed()) - - // Check final execution order. - mu.Lock() - g.Expect(len(executionOrder)).To(BeNumerically(">=", 5)) - // Last executed task must be RequeueTask - g.Expect(executionOrder[len(executionOrder)-1]).To(Equal("RequeueTask")) - mu.Unlock() - - g.Expect(q.Length()).To(Equal(0)) -} diff --git a/pkg/task/queue/task_queue_test.go b/pkg/task/queue/task_queue_test.go deleted file mode 100644 index 1ed9bf3b..00000000 --- a/pkg/task/queue/task_queue_test.go +++ /dev/null @@ -1,426 +0,0 @@ -package queue - -import ( - "bytes" - "context" - "fmt" - "strings" - "testing" - "time" - - . "github.com/onsi/gomega" - "github.com/stretchr/testify/assert" - - "github.com/flant/shell-operator/internal/metrics" - "github.com/flant/shell-operator/pkg/hook/task_metadata" - htypes "github.com/flant/shell-operator/pkg/hook/types" - "github.com/flant/shell-operator/pkg/metric" - "github.com/flant/shell-operator/pkg/task" -) - -func DumpTaskIds(q *TaskQueue) string { - var buf bytes.Buffer - var index int - q.Iterate(func(t task.Task) { - buf.WriteString(fmt.Sprintf("%d: %s\n", index, t.GetId())) - index++ - }) - return buf.String() -} - -func Test_TasksQueue_Remove(t *testing.T) { - g := NewWithT(t) - - metricStorage := metric.NewStorageMock(t) - metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds) - assert.NotZero(t, value) - assert.Equal(t, map[string]string{ - "queue_action": "AddFirst", - "queue_name": "", - }, labels) - assert.Nil(t, buckets) - }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { - }) - - q := NewTasksQueue(metricStorage) - - // Remove just one element - Task := &task.BaseTask{Id: "First one"} - q.AddFirst(Task) - g.Expect(q.Length()).To(Equal(1)) - q.Remove("First one") - g.Expect(q.Length()).To(Equal(0)) - - // Remove element in the middle - for i := 0; i < 5; i++ { - Task := &task.BaseTask{Id: fmt.Sprintf("task_%02d", i)} - q.AddFirst(Task) - } - g.Expect(q.Length()).To(Equal(5)) - q.Remove("task_02") - g.Expect(q.Length()).To(Equal(4)) - - idsDump := DumpTaskIds(q) - - g.Expect(idsDump).To(And( - ContainSubstring("task_00"), - ContainSubstring("task_01"), - ContainSubstring("task_03"), - ContainSubstring("task_04"), - )) - - // Remove last element - q.Remove("task_04") - g.Expect(q.Length()).To(Equal(3)) - - idsDump = DumpTaskIds(q) - - g.Expect(idsDump).To(And( - ContainSubstring("task_00"), - ContainSubstring("task_01"), - ContainSubstring("task_03"), - )) - - // Remove first element by id - q.Remove("task_00") - g.Expect(q.Length()).To(Equal(2)) - - idsDump = DumpTaskIds(q) - - g.Expect(idsDump).To(And( - ContainSubstring("task_01"), - ContainSubstring("task_03"), - )) -} - -func Test_TasksQueue_RemoveFirst(t *testing.T) { - g := NewWithT(t) - - metricStorage := metric.NewStorageMock(t) - metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds) - assert.NotZero(t, value) - assert.Equal(t, map[string]string{ - "queue_action": "AddFirst", - "queue_name": "", - }, labels) - assert.Nil(t, buckets) - }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { - }) - - q := NewTasksQueue(metricStorage) - - // Remove just one element - Task := &task.BaseTask{Id: "First one"} - q.AddFirst(Task) - g.Expect(q.Length()).To(Equal(1)) - q.RemoveFirst() - g.Expect(q.Length()).To(Equal(0)) - - // Remove element in the middle - for i := 0; i < 5; i++ { - Task := &task.BaseTask{Id: fmt.Sprintf("task_%02d", i)} - q.AddFirst(Task) - } - g.Expect(q.Length()).To(Equal(5)) - q.RemoveFirst() - g.Expect(q.Length()).To(Equal(4)) - - idsDump := DumpTaskIds(q) - - g.Expect(idsDump).To(And( - ContainSubstring("task_00"), - ContainSubstring("task_01"), - ContainSubstring("task_02"), - )) - - // Remove last element - q.RemoveFirst() - g.Expect(q.Length()).To(Equal(3)) - - idsDump = DumpTaskIds(q) - - g.Expect(idsDump).To(And( - ContainSubstring("task_00"), - ContainSubstring("task_01"), - ContainSubstring("task_02"), - )) - - // Remove first element by id - q.RemoveFirst() - g.Expect(q.Length()).To(Equal(2)) - - idsDump = DumpTaskIds(q) - - g.Expect(idsDump).To(And( - ContainSubstring("task_01"), - ContainSubstring("task_00"), - )) -} - -func Test_ExponentialBackoff(t *testing.T) { - g := NewWithT(t) - - metricStorage := metric.NewStorageMock(t) - metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds) - assert.NotZero(t, value) - assert.Equal(t, map[string]string{ - "queue_action": "AddFirst", - "queue_name": "test-queue", - }, labels) - assert.Nil(t, buckets) - }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { - }) - - // Init and prefill queue. - q := NewTasksQueue(metricStorage, WithContext(context.TODO()), WithName("test-queue")) - // Since we don't want the test to run for too long, we don't - // want to use lengthy times. - q.WaitLoopCheckInterval = 5 * time.Millisecond // default is 125ms - q.DelayOnQueueIsEmpty = 5 * time.Millisecond // default is 250ms - q.DelayOnRepeat = 5 * time.Millisecond // default is 25ms - // Add one task. - Task := &task.BaseTask{Id: "First one"} - q.AddFirst(Task) - - // Set handler to fail 10 times and catch timestamps for each task execution. - runsAt := make([]time.Time, 0) - failureCounts := make([]int, 0) - const fails = 10 - failsCount := fails - queueStopCh := make(chan struct{}, 1) - q.Handler = func(_ context.Context, t task.Task) TaskResult { - var res TaskResult - runsAt = append(runsAt, time.Now()) - failureCounts = append(failureCounts, t.GetFailureCount()) - if failsCount > 0 { - res.Status = Fail - failsCount-- - - return res - } - - res.Status = Success - res.AfterHandle = func() { - close(queueStopCh) - } - - return res - } - - // Set exponential backoff to the constant delay just to wait more than DelayOnQueueIsEmpty. - // It is a test of delaying between task runs, not a test of exponential distribution. - mockExponentialDelay := 30 * time.Millisecond - q.ExponentialBackoffFn = func(_ int) time.Duration { - return mockExponentialDelay - } - - q.Start(context.TODO()) - - // Expect taskHandler returns Success result. - g.Eventually(queueStopCh, "5s", "20ms").Should(BeClosed(), "Should handle first task in queue successfully") - - // Expect taskHandler called 'fails' times. - g.Expect(Task.GetFailureCount()).Should(Equal(fails), "task should fail %d times.", fails) - - prev := failureCounts[0] - for i := 1; i < len(failureCounts); i++ { - cur := failureCounts[i] - g.Expect(cur > prev).Should(BeTrue(), "taskHandler should receive task with growing FailureCount. Got %d after %d", cur, prev) - } - - // Expect mean delay is greater than mocked delay. - mean, _ := calculateMeanDelay(runsAt) - g.Expect(mean).Should(BeNumerically(">", mockExponentialDelay), - "mean delay of %d fails should be more than %s, got %s. Check exponential delaying not broken in Start or waitForTask.", - fails, mockExponentialDelay.String(), mean.Truncate(100*time.Microsecond).String()) -} - -func calculateMeanDelay(in []time.Time) (time.Duration, []int64) { - var sum int64 - - // Calculate deltas from timestamps. - prev := in[0].UnixNano() - deltas := make([]int64, 0, len(in)-1) - for i := 1; i < len(in); i++ { - delta := in[i].UnixNano() - prev - prev = in[i].UnixNano() - deltas = append(deltas, delta) - sum += delta - } - mean := time.Duration(sum / int64(len(deltas))) - - return mean, deltas -} - -func Test_CancelDelay(t *testing.T) { - g := NewWithT(t) - - metricStorage := metric.NewStorageMock(t) - metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds) - assert.NotZero(t, value) - assert.Equal(t, map[string]string{ - "queue_action": "AddFirst", - "queue_name": "test-queue", - }, labels) - assert.Nil(t, buckets) - }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { - }) - - // Init and prefill queue. - q := NewTasksQueue(metricStorage, WithContext(context.TODO()), WithName("test-queue")) - // Since we don't want the test to run for too long, we don't - // want to use lengthy times. - q.WaitLoopCheckInterval = 5 * time.Millisecond // default is 125ms - q.DelayOnQueueIsEmpty = 5 * time.Millisecond // default is 250ms - q.DelayOnRepeat = 5 * time.Millisecond // default is 25ms - // Add 'always fail' task. - ErrTask := &task.BaseTask{Id: "erroneous"} - q.AddFirst(ErrTask) - - HealingTask := &task.BaseTask{Id: "healing"} - - // Set handler to always return fail for task "erroneous", and return success for other tasks. - // Catch time between "erroneous" task handling and "healing" task handling. - startedAt := time.Now() - endedAt := startedAt - delayStartsCh := make(chan struct{}, 1) - healingDoneCh := make(chan struct{}, 1) - q.Handler = func(_ context.Context, t task.Task) TaskResult { - var res TaskResult - if t.GetId() == ErrTask.GetId() { - res.Status = Fail - // Close chan after first delay. - if t.GetFailureCount() == 1 { - res.AfterHandle = func() { - close(delayStartsCh) - } - } - - return res - } - - if t.GetId() == HealingTask.GetId() { - endedAt = time.Now() - res.AfterHandle = func() { - close(healingDoneCh) - } - } - - res.Status = Success - - return res - } - - // Set exponential backoff to the constant delay just to wait more than DelayOnQueueIsEmpty. - // It is a test of delaying between task runs, not a test of exponential distribution. - mockExponentialDelay := 150 * time.Millisecond - q.ExponentialBackoffFn = func(_ int) time.Duration { - return mockExponentialDelay - } - - // Start handling 'erroneous' task. - q.Start(context.TODO()) - - // Expect taskHandler returns Success result. - g.Eventually(delayStartsCh, "5s", "20ms").Should(BeClosed(), "Should handle failed task and starts a delay") - - // Add healing task and cancel delay in parallel. - go func() { - q.AddFirst(HealingTask) - q.CancelTaskDelay() - }() - - // Expect queue handles 'healing' task. - g.Eventually(healingDoneCh, "5s", "20ms").Should(BeClosed(), "Should handle 'healing' task") - - elapsed := endedAt.Sub(startedAt).Truncate(100 * time.Microsecond) - - // Expect elapsed is less than mocked delay. - g.Expect(elapsed).Should(BeNumerically(">", mockExponentialDelay), - "Should delay after failed task. Got delay of %s, expect more than %s. Check delay for failed task not broken in Start or waitForTask.", - elapsed.String(), mockExponentialDelay.String()) - g.Expect(elapsed).Should(BeNumerically("<", 2*mockExponentialDelay), - "Should stop delaying after CancelTaskDelay call. Got delay of %s, expect less than %s. Check cancel delay not broken in Start or waitForTask.", - elapsed.String(), (2 * mockExponentialDelay).String()) -} - -func Test_QueueDump_HookMetadata_Task_Description(t *testing.T) { - g := NewWithT(t) - - logLabels := map[string]string{ - "hook": "hook1.sh", - } - - metricStorage := metric.NewStorageMock(t) - metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds) - assert.NotZero(t, value) - assert.Equal(t, map[string]string{ - "queue_action": "AddLast", - "queue_name": "", - }, labels) - assert.Nil(t, buckets) - }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { - }) - - q := NewTasksQueue(metricStorage) - - q.AddLast(task.NewTask(task_metadata.EnableKubernetesBindings). - WithMetadata(task_metadata.HookMetadata{ - HookName: "hook1.sh", - Binding: string(task_metadata.EnableKubernetesBindings), - })) - - q.AddLast(task.NewTask(task_metadata.HookRun). - WithMetadata(task_metadata.HookMetadata{ - HookName: "hook1.sh", - BindingType: htypes.OnKubernetesEvent, - Binding: "monitor_pods", - }). - WithLogLabels(logLabels). - WithQueueName("main")) - - q.AddLast(task.NewTask(task_metadata.HookRun). - WithMetadata(task_metadata.HookMetadata{ - HookName: "hook1.sh", - BindingType: htypes.Schedule, - AllowFailure: true, - Binding: "every 1 sec", - Group: "monitor_pods", - }). - WithLogLabels(logLabels). - WithQueueName("main")) - - queueDump := taskQueueToText(q) - - g.Expect(queueDump).Should(ContainSubstring("hook1.sh"), "Queue dump should reveal a hook name.") - g.Expect(queueDump).Should(ContainSubstring("EnableKubernetesBindings"), "Queue dump should reveal EnableKubernetesBindings.") - g.Expect(queueDump).Should(ContainSubstring(":kubernetes:"), "Queue dump should show kubernetes binding.") - g.Expect(queueDump).Should(ContainSubstring(":schedule:"), "Queue dump should show schedule binding.") - g.Expect(queueDump).Should(ContainSubstring("group=monitor_pods"), "Queue dump should show group name.") -} - -func taskQueueToText(q *TaskQueue) string { - var buf strings.Builder - buf.WriteString(fmt.Sprintf("Queue '%s': length %d, status: '%s'\n", q.Name, q.Length(), q.Status)) - buf.WriteString("\n") - - index := 1 - q.Iterate(func(task task.Task) { - buf.WriteString(fmt.Sprintf("%2d. ", index)) - buf.WriteString(task.GetDescription()) - buf.WriteString("\n") - index++ - }) - - return buf.String() -} diff --git a/pkg/task/queue/task_result.go b/pkg/task/queue/task_result.go deleted file mode 100644 index 3c7e45a8..00000000 --- a/pkg/task/queue/task_result.go +++ /dev/null @@ -1,65 +0,0 @@ -package queue - -import ( - "time" - - "github.com/flant/shell-operator/pkg/task" -) - -type TaskStatus string - -const ( - Success TaskStatus = "Success" - Fail TaskStatus = "Fail" - Repeat TaskStatus = "Repeat" - Keep TaskStatus = "Keep" -) - -const compactionThreshold = 100 - -type TaskResult struct { - Status TaskStatus - headTasks []task.Task - tailTasks []task.Task - afterTasks []task.Task - - DelayBeforeNextTask time.Duration - - AfterHandle func() -} - -func (res *TaskResult) GetHeadTasks() []task.Task { - return res.headTasks -} - -func (res *TaskResult) GetTailTasks() []task.Task { - return res.tailTasks -} - -func (res *TaskResult) GetAfterTasks() []task.Task { - return res.afterTasks -} - -func (res *TaskResult) AddHeadTasks(t ...task.Task) { - if res.headTasks == nil { - res.headTasks = make([]task.Task, 0, len(t)) - } - - res.headTasks = append(res.headTasks, t...) -} - -func (res *TaskResult) AddTailTasks(t ...task.Task) { - if res.tailTasks == nil { - res.tailTasks = make([]task.Task, 0, len(t)) - } - - res.tailTasks = append(res.tailTasks, t...) -} - -func (res *TaskResult) AddAfterTasks(t ...task.Task) { - if res.afterTasks == nil { - res.afterTasks = make([]task.Task, 0, len(t)) - } - - res.afterTasks = append(res.afterTasks, t...) -} diff --git a/pkg/task/result.go b/pkg/task/result.go new file mode 100644 index 00000000..025e7dd4 --- /dev/null +++ b/pkg/task/result.go @@ -0,0 +1,61 @@ +package task + +import ( + "time" +) + +type Status string + +const ( + Success Status = "Success" + Fail Status = "Fail" + Repeat Status = "Repeat" + Keep Status = "Keep" +) + +type Result struct { + Status Status + headTasks []Task + tailTasks []Task + afterTasks []Task + + DelayBeforeNextTask time.Duration + + AfterHandle func() +} + +func (res *Result) GetHeadTasks() []Task { + return res.headTasks +} + +func (res *Result) GetTailTasks() []Task { + return res.tailTasks +} + +func (res *Result) GetAfterTasks() []Task { + return res.afterTasks +} + +func (res *Result) AddHeadTasks(t ...Task) { + if res.headTasks == nil { + res.headTasks = make([]Task, 0, len(t)) + } + + res.headTasks = append(res.headTasks, t...) +} + +func (res *Result) AddTailTasks(t ...Task) { + if res.tailTasks == nil { + res.tailTasks = make([]Task, 0, len(t)) + } + + res.tailTasks = append(res.tailTasks, t...) +} + +func (res *Result) AddAfterTasks(t ...Task) { + if res.afterTasks == nil { + res.afterTasks = make([]Task, 0, len(t)) + } + + res.afterTasks = append(res.afterTasks, t...) +} diff --git a/test/hook/context/context_combiner.go b/test/hook/context/context_combiner.go index f4005681..a34bbc68 100644 --- a/test/hook/context/context_combiner.go +++ b/test/hook/context/context_combiner.go @@ -27,7 +27,7 @@ const TestQueueName string = "test-queue" // with tasks. type ContextCombiner struct { op *shell_operator.ShellOperator - q *queue.TaskQueue + q task.TaskQueue } func NewContextCombiner() *ContextCombiner { diff --git a/test/integration/kube_event_manager/kube_event_manager_test.go b/test/integration/kube_event_manager/kube_event_manager_test.go index c991efbb..4dc399d9 100644 --- a/test/integration/kube_event_manager/kube_event_manager_test.go +++ b/test/integration/kube_event_manager/kube_event_manager_test.go @@ -17,6 +17,7 @@ import ( kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metric" + "github.com/flant/shell-operator/pkg/metrics" . "github.com/flant/shell-operator/test/integration/suite" testutils "github.com/flant/shell-operator/test/utils" ) @@ -36,8 +37,8 @@ var _ = Describe("Binding 'kubernetes' with kind 'Pod' should emit KubeEvent obj defer GinkgoRecover() fmt.Fprintf(GinkgoWriter, "HistogramObserve: %s, value: %f\n", metric, value) Expect(metric).To(Or( - Equal("{PREFIX}kube_jq_filter_duration_seconds"), - Equal("{PREFIX}kube_event_duration_seconds"), + Equal(metrics.KubeJqFilterDurationSeconds), + Equal(metrics.KubeEventDurationSeconds), )) Expect(value).To(BeNumerically(">=", 0)) Expect(labels).To(BeNil()) @@ -46,7 +47,7 @@ var _ = Describe("Binding 'kubernetes' with kind 'Pod' should emit KubeEvent obj metricStorage.GaugeSetMock.Set(func(metric string, value float64, labels map[string]string) { defer GinkgoRecover() fmt.Fprintf(GinkgoWriter, "GaugeSet: %s, value: %f\n", metric, value) - Expect(metric).To(Equal("{PREFIX}kube_snapshot_objects")) + Expect(metric).To(Equal(metrics.KubeSnapshotObjects)) Expect(value).To(BeNumerically(">=", 0)) Expect(labels).To(BeEmpty()) }) @@ -97,8 +98,8 @@ var _ = Describe("Binding 'kubernetes' with kind 'Pod' should emit KubeEvent obj Expect(snapshot).Should(HaveLen(0), "No pods in default namespace. Snapshot at start should have no objects.") // Trigger metrics - KubeEventsManager.MetricStorage().GaugeSet("{PREFIX}kube_snapshot_objects", 0, nil) - KubeEventsManager.MetricStorage().HistogramObserve("{PREFIX}kube_jq_filter_duration_seconds", 0, nil, nil) + KubeEventsManager.MetricStorage().GaugeSet(metrics.KubeSnapshotObjects, 0, nil) + KubeEventsManager.MetricStorage().HistogramObserve(metrics.KubeJqFilterDurationSeconds, 0, nil, nil) fmt.Fprintf(GinkgoWriter, "Finished test: should have cached objects\n") }, SpecTimeout(30*time.Second))