diff --git a/pkg/kube_events_manager/factory.go b/pkg/kube_events_manager/factory.go index a52ebab4..66cc7a9a 100644 --- a/pkg/kube_events_manager/factory.go +++ b/pkg/kube_events_manager/factory.go @@ -36,38 +36,44 @@ type Factory struct { handlerRegistrations map[string]cache.ResourceEventHandlerRegistration ctx context.Context cancel context.CancelFunc + // done is closed when the underlying informer.Run returns + done chan struct{} } type FactoryStore struct { mu sync.Mutex - data map[FactoryIndex]Factory + cond *sync.Cond + data map[FactoryIndex]*Factory } func NewFactoryStore() *FactoryStore { - return &FactoryStore{ - data: make(map[FactoryIndex]Factory), + fs := &FactoryStore{ + data: make(map[FactoryIndex]*Factory), } + fs.cond = sync.NewCond(&fs.mu) + return fs } func (c *FactoryStore) Reset() { c.mu.Lock() defer c.mu.Unlock() - c.data = make(map[FactoryIndex]Factory) + c.data = make(map[FactoryIndex]*Factory) } func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) { ctx, cancel := context.WithCancel(context.Background()) - c.data[index] = Factory{ + c.data[index] = &Factory{ shared: f, handlerRegistrations: make(map[string]cache.ResourceEventHandlerRegistration), ctx: ctx, cancel: cancel, + done: nil, } log.Debug("Factory store: added a new factory for index", slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) } -func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory { +func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) *Factory { f, ok := c.data[index] if ok { log.Debug("Factory store: the factory with index found", @@ -115,9 +121,18 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna slog.Int("value", len(factory.handlerRegistrations)), slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) - if !informer.HasSynced() { - go informer.Run(factory.ctx.Done()) + // Ensure informer.Run is started once and tracked + if factory.done == nil { + factory.done = make(chan struct{}) + go func() { + informer.Run(factory.ctx.Done()) + close(factory.done) + log.Debug("Factory store: informer goroutine exited", + slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + }() + } + if !informer.HasSynced() { if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return informer.HasSynced(), nil }); err != nil { @@ -131,11 +146,10 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna func (c *FactoryStore) Stop(informerId string, index FactoryIndex) { c.mu.Lock() - defer c.mu.Unlock() - f, ok := c.data[index] if !ok { // already deleted + c.mu.Unlock() return } @@ -146,15 +160,38 @@ func (c *FactoryStore) Stop(informerId string, index FactoryIndex) { slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()), log.Err(err)) } + delete(f.handlerRegistrations, informerId) log.Debug("Factory store: decreased usage counter of the factory", slog.Int("value", len(f.handlerRegistrations)), slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) if len(f.handlerRegistrations) == 0 { + log.Debug("Factory store: last handler removed, canceling shared informer", + slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + done := f.done f.cancel() + c.mu.Unlock() + if done != nil { + <-done + } + c.mu.Lock() delete(c.data, index) log.Debug("Factory store: deleted factory", slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + c.cond.Broadcast() + } + } + c.mu.Unlock() +} + +// WaitStopped blocks until there is no factory for the index +func (c *FactoryStore) WaitStopped(index FactoryIndex) { + c.mu.Lock() + for { + if _, ok := c.data[index]; !ok { + c.mu.Unlock() + return } + c.cond.Wait() } } diff --git a/pkg/kube_events_manager/kube_events_manager.go b/pkg/kube_events_manager/kube_events_manager.go index 1c6cff72..8e13a401 100644 --- a/pkg/kube_events_manager/kube_events_manager.go +++ b/pkg/kube_events_manager/kube_events_manager.go @@ -24,7 +24,8 @@ type KubeEventsManager interface { StopMonitor(monitorID string) error Ch() chan kemtypes.KubeEvent - PauseHandleEvents() + Stop() + Wait() } // kubeEventsManager is a main implementation of KubeEventsManager. @@ -138,14 +139,20 @@ func (mgr *kubeEventsManager) Ch() chan kemtypes.KubeEvent { return mgr.KubeEventCh } -// PauseHandleEvents set flags for all informers to ignore incoming events. -// Useful for shutdown without panicking. -// Calling cancel() leads to a race and panicking, see https://github.com/kubernetes/kubernetes/issues/59822 -func (mgr *kubeEventsManager) PauseHandleEvents() { +// Stop the kube events manager and all the informers inside monitors. +func (mgr *kubeEventsManager) Stop() { + mgr.cancel() +} + +func (mgr *kubeEventsManager) Wait() { mgr.m.RLock() - defer mgr.m.RUnlock() - for _, monitor := range mgr.Monitors { - monitor.PauseHandleEvents() + monitors := make([]Monitor, 0, len(mgr.Monitors)) + for _, mon := range mgr.Monitors { + monitors = append(monitors, mon) + } + mgr.m.RUnlock() + for _, mon := range monitors { + mon.Wait() } } diff --git a/pkg/kube_events_manager/monitor.go b/pkg/kube_events_manager/monitor.go index da11c384..9e252eb8 100644 --- a/pkg/kube_events_manager/monitor.go +++ b/pkg/kube_events_manager/monitor.go @@ -19,7 +19,7 @@ type Monitor interface { CreateInformers() error Start(context.Context) Stop() - PauseHandleEvents() + Wait() Snapshot() []kemtypes.ObjectAndFilterResult EnableKubeEventCb() GetConfig() *MonitorConfig @@ -373,22 +373,18 @@ func (m *monitor) Stop() { } } -// PauseHandleEvents set flags for all informers to ignore incoming events. -// Useful for shutdown without panicking. -// Calling cancel() leads to a race and panicking, see https://github.com/kubernetes/kubernetes/issues/59822 -func (m *monitor) PauseHandleEvents() { +// Wait waits for all started informers to stop +func (m *monitor) Wait() { for _, informer := range m.ResourceInformers { - informer.pauseHandleEvents() + informer.wait() } - m.VaryingInformers.RangeValue(func(value []*resourceInformer) { for _, informer := range value { - informer.pauseHandleEvents() + informer.wait() } }) - if m.NamespaceInformer != nil { - m.NamespaceInformer.pauseHandleEvents() + m.NamespaceInformer.wait() } } diff --git a/pkg/kube_events_manager/namespace_informer.go b/pkg/kube_events_manager/namespace_informer.go index fbce2814..5e0e8f3f 100644 --- a/pkg/kube_events_manager/namespace_informer.go +++ b/pkg/kube_events_manager/namespace_informer.go @@ -21,6 +21,7 @@ type namespaceInformer struct { ctx context.Context cancel context.CancelFunc stopped bool + done chan struct{} KubeClient *klient.Client Monitor *MonitorConfig @@ -128,13 +129,18 @@ func (ni *namespaceInformer) start() { return } cctx, cancel := context.WithCancel(ni.ctx) + ni.done = make(chan struct{}) go func() { <-ni.ctx.Done() ni.stopped = true cancel() }() - go ni.SharedInformer.Run(cctx.Done()) + go func() { + ni.SharedInformer.Run(cctx.Done()) + close(ni.done) + log.Debug("Namespace informer goroutine exited", slog.String("name", ni.Monitor.Metadata.DebugName)) + }() if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return ni.SharedInformer.HasSynced(), nil @@ -146,6 +152,8 @@ func (ni *namespaceInformer) start() { log.Debug("Informer is ready", slog.String("debugName", ni.Monitor.Metadata.DebugName)) } -func (ni *namespaceInformer) pauseHandleEvents() { - ni.stopped = true +func (ni *namespaceInformer) wait() { + if ni.done != nil { + <-ni.done + } } diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 3d15e570..ef9db494 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -469,9 +469,9 @@ func (ei *resourceInformer) start() { log.Debug("informer is ready", slog.String("debugName", ei.Monitor.Metadata.DebugName)) } -func (ei *resourceInformer) pauseHandleEvents() { - log.Debug("PAUSE resource informer", slog.String("debugName", ei.Monitor.Metadata.DebugName)) - ei.stopped = true +// wait blocks until the underlying shared informer for this FactoryIndex is stopped +func (ei *resourceInformer) wait() { + DefaultFactoryStore.WaitStopped(ei.FactoryIndex) } // CachedObjectsInfo returns info accumulated from start. diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index c68c2fe6..4c57464c 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -960,9 +960,18 @@ func (op *ShellOperator) runMetrics() { // Shutdown pause kubernetes events handling and stop queues. Wait for queues to stop. func (op *ShellOperator) Shutdown() { + op.logger.Info("shutdown begin", slog.String("phase", "shutdown")) op.ScheduleManager.Stop() - op.KubeEventsManager.PauseHandleEvents() + op.logger.Info("schedule manager stopped", slog.String("phase", "shutdown")) + + op.KubeEventsManager.Stop() + op.logger.Info("waiting informers", slog.String("phase", "shutdown")) + op.KubeEventsManager.Wait() + op.logger.Info("informers stopped", slog.String("phase", "shutdown")) + op.TaskQueues.Stop() + op.logger.Info("waiting task queues", slog.String("phase", "shutdown")) // Wait for queues to stop, but no more than 10 seconds op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout) + op.logger.Info("task queues stopped", slog.String("phase", "shutdown")) } diff --git a/test/integration/kube_event_manager/kube_event_manager_test.go b/test/integration/kube_event_manager/kube_event_manager_test.go index c991efbb..e2da5571 100644 --- a/test/integration/kube_event_manager/kube_event_manager_test.go +++ b/test/integration/kube_event_manager/kube_event_manager_test.go @@ -56,7 +56,7 @@ var _ = Describe("Binding 'kubernetes' with kind 'Pod' should emit KubeEvent obj AfterEach(func() { fmt.Fprintf(GinkgoWriter, "Starting AfterEach\n") - KubeEventsManager.PauseHandleEvents() + KubeEventsManager.Stop() fmt.Fprintf(GinkgoWriter, "Finished AfterEach\n") })