From ccd3cb92bd0949a2f58a5aeaa9dc45bb16c534a4 Mon Sep 17 00:00:00 2001 From: Tobias Harnickell Date: Thu, 18 Sep 2025 09:13:59 +0200 Subject: [PATCH 1/3] fix(controller): Correct event schedule interval * Update controller throttle so events schedule at `max(lastRun+interval, now+minSync)` * Rework `TestShouldRunOnce` flow to cover the stricter scheduling contract * Add per-source tracking of when node data is required * Add lazy node handler registration * Introduce node-address diffing handler to prevent heartbeat-only trigger * Adjust service informer tests for new detection path and explicit node-event opt-in Signed-off-by: Tobias Harnickell --- controller/controller.go | 31 ++----- controller/controller_test.go | 65 +++++++-------- source/service.go | 152 +++++++++++++++++++++++++++++++++- source/service_test.go | 18 ++-- 4 files changed, 201 insertions(+), 65 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index bdbddfb64f..8529ff1eb8 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -259,24 +259,6 @@ func (c *Controller) RunOnce(ctx context.Context) error { return nil } -func earliest(r time.Time, times ...time.Time) time.Time { - for _, t := range times { - if t.Before(r) { - r = t - } - } - return r -} - -func latest(r time.Time, times ...time.Time) time.Time { - for _, t := range times { - if t.After(r) { - r = t - } - } - return r -} - // Counts the intersections of records in endpoint and registry. func countMatchingAddressRecords(rec *metricsRecorder, endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint, metric metrics.GaugeVecMetric) { recordsMap := make(map[string]map[string]struct{}) @@ -318,13 +300,12 @@ func countAddressRecords(rec *metricsRecorder, endpoints []*endpoint.Endpoint, m func (c *Controller) ScheduleRunOnce(now time.Time) { c.runAtMutex.Lock() defer c.runAtMutex.Unlock() - c.nextRunAt = latest( - c.lastRunAt.Add(c.MinEventSyncInterval), - earliest( - now.Add(5*time.Second), - c.nextRunAt, - ), - ) + preferred := c.lastRunAt.Add(c.Interval) + eventReadyAt := now.Add(c.MinEventSyncInterval) + if preferred.Before(eventReadyAt) { + preferred = eventReadyAt + } + c.nextRunAt = preferred } func (c *Controller) ShouldRunOnce(now time.Time) bool { diff --git a/controller/controller_test.go b/controller/controller_test.go index 5abd5677e5..4f5251f879 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -280,58 +280,57 @@ func TestRun(t *testing.T) { func TestShouldRunOnce(t *testing.T) { ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 15 * time.Second} - now := time.Now() + start := time.Now() // First run of Run loop should execute RunOnce - assert.True(t, ctrl.ShouldRunOnce(now)) - assert.Equal(t, now.Add(10*time.Minute), ctrl.nextRunAt) + assert.True(t, ctrl.ShouldRunOnce(start)) + assert.Equal(t, start.Add(ctrl.Interval), ctrl.nextRunAt) // Second run should not - assert.False(t, ctrl.ShouldRunOnce(now)) - ctrl.lastRunAt = now + assert.False(t, ctrl.ShouldRunOnce(start)) + ctrl.lastRunAt = start - now = now.Add(10 * time.Second) + eventTime := start.Add(10 * time.Second) // Changes happen in ingresses or services - ctrl.ScheduleRunOnce(now) - ctrl.ScheduleRunOnce(now) + ctrl.ScheduleRunOnce(eventTime) + ctrl.ScheduleRunOnce(eventTime) - // Because we batch changes, ShouldRunOnce returns False at first - assert.False(t, ctrl.ShouldRunOnce(now)) - assert.False(t, ctrl.ShouldRunOnce(now.Add(100*time.Microsecond))) - - // But after MinInterval we should run reconciliation - now = now.Add(5 * time.Second) - assert.True(t, ctrl.ShouldRunOnce(now)) + // Because we batch changes, ShouldRunOnce returns False until the throttle conditions are satisfied + earliest := ctrl.lastRunAt.Add(ctrl.Interval) + if ready := eventTime.Add(ctrl.MinEventSyncInterval); ready.After(earliest) { + earliest = ready + } + assert.False(t, ctrl.ShouldRunOnce(earliest.Add(-time.Second))) + assert.True(t, ctrl.ShouldRunOnce(earliest)) + assert.False(t, ctrl.ShouldRunOnce(earliest)) - // But just one time - assert.False(t, ctrl.ShouldRunOnce(now)) + // Simulate successful reconciliation updating lastRunAt + ctrl.lastRunAt = earliest // We should wait maximum possible time after last reconciliation started - now = now.Add(10*time.Minute - time.Second) - assert.False(t, ctrl.ShouldRunOnce(now)) + beforeNext := ctrl.lastRunAt.Add(ctrl.Interval - time.Second) + assert.False(t, ctrl.ShouldRunOnce(beforeNext)) // After exactly Interval it's OK again to reconcile - now = now.Add(time.Second) - assert.True(t, ctrl.ShouldRunOnce(now)) - - // But not two times - assert.False(t, ctrl.ShouldRunOnce(now)) + onInterval := beforeNext.Add(time.Second) + assert.True(t, ctrl.ShouldRunOnce(onInterval)) + assert.False(t, ctrl.ShouldRunOnce(onInterval)) - // Multiple ingresses or services changes, closer than MinInterval from each other - ctrl.lastRunAt = now - firstChangeTime := now + // Multiple ingresses or services changes, closer than MinEventSyncInterval from each other + ctrl.lastRunAt = onInterval + firstChangeTime := onInterval secondChangeTime := firstChangeTime.Add(time.Second) // First change ctrl.ScheduleRunOnce(firstChangeTime) // Second change ctrl.ScheduleRunOnce(secondChangeTime) - // Executions should be spaced by at least MinEventSyncInterval - assert.False(t, ctrl.ShouldRunOnce(now.Add(5*time.Second))) - - // Should not postpone the reconciliation further than firstChangeTime + MinInterval - now = now.Add(ctrl.MinEventSyncInterval) - assert.True(t, ctrl.ShouldRunOnce(now)) + earliest = ctrl.lastRunAt.Add(ctrl.Interval) + if ready := secondChangeTime.Add(ctrl.MinEventSyncInterval); ready.After(earliest) { + earliest = ready + } + assert.False(t, ctrl.ShouldRunOnce(earliest.Add(-time.Second))) + assert.True(t, ctrl.ShouldRunOnce(earliest)) } func testControllerFiltersDomains(t *testing.T, configuredEndpoints []*endpoint.Endpoint, domainFilter *endpoint.DomainFilter, providerEndpoints []*endpoint.Endpoint, expectedChanges []*plan.Changes) { diff --git a/source/service.go b/source/service.go index 2e73defcf2..53590a8b7d 100644 --- a/source/service.go +++ b/source/service.go @@ -79,6 +79,9 @@ type serviceSource struct { endpointSlicesInformer discoveryinformers.EndpointSliceInformer podInformer coreinformers.PodInformer nodeInformer coreinformers.NodeInformer + nodeEventHandler func() + nodeEventHandlerRegistered bool + nodeEventsNeeded bool serviceTypeFilter *serviceTypes exposeInternalIPv6 bool @@ -401,6 +404,7 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri log.Warnf("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer", endpointSlice.Namespace, endpointSlice.Name) continue } + sc.markNodeEventsNeeded() node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName) if err != nil { log.Errorf("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints", pod.Spec.NodeName, pod.GetName(), err) @@ -749,6 +753,8 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe nodes []*v1.Node ) + sc.markNodeEventsNeeded() + if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal { nodes = sc.nodesExternalTrafficPolicyTypeLocal(svc) } else { @@ -845,9 +851,151 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) { if sc.listenEndpointEvents && sc.serviceTypeFilter.isRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) { _, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) } - if sc.serviceTypeFilter.isRequired(v1.ServiceTypeNodePort) { - _, _ = sc.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) + sc.nodeEventHandler = handler + if !sc.nodeEventsNeeded && sc.publishHostIP { + sc.nodeEventsNeeded = true + } + if !sc.nodeEventsNeeded && sc.detectNodeDependentServices() { + sc.nodeEventsNeeded = true + } + sc.ensureNodeEventHandlerRegistered() +} + +func (sc *serviceSource) detectNodeDependentServices() bool { + if sc.nodeInformer == nil { + return false + } + services, err := sc.listServicesForNodeDetection() + if err != nil { + log.Debugf("skipping node dependency detection: %v", err) + return false + } + services = sc.filterByServiceType(services) + services, err = sc.filterByAnnotations(services) + if err != nil { + log.Errorf("unable to apply annotation filter during node dependency detection: %v", err) + return true + } + for _, svc := range services { + if sc.serviceRequiresNodeData(svc) { + return true + } + } + return false +} + +func (sc *serviceSource) listServicesForNodeDetection() ([]*v1.Service, error) { + if sc.serviceInformer == nil { + return nil, fmt.Errorf("service informer not configured") + } + var ( + services []*v1.Service + listErr error + ) + func() { + defer func() { + if r := recover(); r != nil { + listErr = fmt.Errorf("service informer indexer unavailable: %v", r) + } + }() + services, listErr = sc.serviceInformer.Lister().Services(sc.namespace).List(sc.labelSelector) + }() + return services, listErr +} + +func (sc *serviceSource) serviceRequiresNodeData(svc *v1.Service) bool { + if svc.Spec.Type == v1.ServiceTypeNodePort { + return true + } + if svc.Spec.ClusterIP == v1.ClusterIPNone { + endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations) + if endpointsType == EndpointsTypeNodeExternalIP { + return true + } + if sc.publishHostIP { + return true + } + } + return false +} + +func (sc *serviceSource) ensureNodeEventHandlerRegistered() { + if !sc.nodeEventsNeeded || sc.nodeEventHandlerRegistered || sc.nodeInformer == nil || sc.nodeEventHandler == nil { + return + } + _, _ = sc.nodeInformer.Informer().AddEventHandler(&nodeAddressChangeHandler{ + source: sc, + handler: sc.nodeEventHandler, + }) + sc.nodeEventHandlerRegistered = true +} + +func (sc *serviceSource) markNodeEventsNeeded() { + if sc.nodeEventsNeeded { + return + } + sc.nodeEventsNeeded = true + sc.ensureNodeEventHandlerRegistered() +} + +type nodeAddressChangeHandler struct { + source *serviceSource + handler func() +} + +func (h *nodeAddressChangeHandler) OnAdd(obj interface{}, isInInitialList bool) { + if isInInitialList { + return + } + h.handler() +} + +func (h *nodeAddressChangeHandler) OnUpdate(oldObj, newObj interface{}) { + oldNode, okOld := oldObj.(*v1.Node) + newNode, okNew := newObj.(*v1.Node) + if !okOld || !okNew { + h.handler() + return + } + if nodeAddressSetsEqual(oldNode, newNode) { + return + } + h.handler() +} + +func (h *nodeAddressChangeHandler) OnDelete(obj interface{}) { + h.handler() +} + +func nodeAddressSetsEqual(a, b *v1.Node) bool { + return addressSetEquals(nodeAddressSet(a), nodeAddressSet(b)) +} + +func nodeAddressSet(node *v1.Node) map[string]struct{} { + if node == nil { + return nil + } + result := make(map[string]struct{}) + for _, address := range node.Status.Addresses { + switch address.Type { + case v1.NodeExternalIP, v1.NodeInternalIP: + key := string(address.Type) + "|" + address.Address + result[key] = struct{}{} + } + } + return result +} + +func addressSetEquals(a, b map[string]struct{}) bool { + if len(a) != len(b) { + return false + } + for key := range a { + if _, ok := b[key]; !ok { + return false + } } + return true } type serviceTypes struct { diff --git a/source/service_test.go b/source/service_test.go index 79d8e37e6b..1552557973 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -5018,16 +5018,17 @@ func TestServiceSource_AddEventHandler(t *testing.T) { name string filter []string times int + prepare func(s *serviceSource) asserts func(t *testing.T, s *serviceSource) }{ { name: "AddEventHandler should trigger all event handlers when empty filter is provided", filter: []string{}, - times: 3, + times: 2, asserts: func(t *testing.T, s *serviceSource) { - fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 2) fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 1) - fakeNodeInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeNodeInformer.AssertNumberOfCalls(t, "Informer", 0) }, }, { @@ -5035,7 +5036,7 @@ func TestServiceSource_AddEventHandler(t *testing.T) { filter: []string{string(v1.ServiceTypeExternalName), string(v1.ServiceTypeLoadBalancer)}, times: 1, asserts: func(t *testing.T, s *serviceSource) { - fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 2) fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 0) fakeNodeInformer.AssertNumberOfCalls(t, "Informer", 0) }, @@ -5045,7 +5046,7 @@ func TestServiceSource_AddEventHandler(t *testing.T) { filter: []string{string(v1.ServiceTypeExternalName), string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeClusterIP)}, times: 2, asserts: func(t *testing.T, s *serviceSource) { - fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 2) fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 1) fakeNodeInformer.AssertNumberOfCalls(t, "Informer", 0) }, @@ -5054,6 +5055,9 @@ func TestServiceSource_AddEventHandler(t *testing.T) { name: "AddEventHandler should configure all service event handlers", filter: []string{string(v1.ServiceTypeNodePort)}, times: 3, + prepare: func(s *serviceSource) { + s.nodeEventsNeeded = true + }, asserts: func(t *testing.T, s *serviceSource) { fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 1) @@ -5085,6 +5089,10 @@ func TestServiceSource_AddEventHandler(t *testing.T) { listenEndpointEvents: true, } + if tt.prepare != nil { + tt.prepare(svcSource) + } + svcSource.AddEventHandler(t.Context(), func() {}) assert.Equal(t, tt.times, infSvc.times+infEdp.times+infNode.times) From 13ba4b8940ab214f6f34481b1ba20ed1dc638174 Mon Sep 17 00:00:00 2001 From: Tobias Harnickell Date: Fri, 19 Sep 2025 12:40:51 +0200 Subject: [PATCH 2/3] fix(service): Dynamic node event handling Reverts controller changes from ccd3cb92 to separate scheduling fix from service logic updates. Introduced new service logic: * Lazy register node informer only when needed * Add node address change handler with minimal comparisons * support detection of node-dependent services before enabling node events * Update service tests Signed-off-by: Tobias Harnickell --- controller/controller.go | 31 +++++++++--- controller/controller_test.go | 65 ++++++++++++------------ source/service.go | 94 ++++++++++------------------------- source/service_test.go | 11 ++-- 4 files changed, 87 insertions(+), 114 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index 8529ff1eb8..bdbddfb64f 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -259,6 +259,24 @@ func (c *Controller) RunOnce(ctx context.Context) error { return nil } +func earliest(r time.Time, times ...time.Time) time.Time { + for _, t := range times { + if t.Before(r) { + r = t + } + } + return r +} + +func latest(r time.Time, times ...time.Time) time.Time { + for _, t := range times { + if t.After(r) { + r = t + } + } + return r +} + // Counts the intersections of records in endpoint and registry. func countMatchingAddressRecords(rec *metricsRecorder, endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint, metric metrics.GaugeVecMetric) { recordsMap := make(map[string]map[string]struct{}) @@ -300,12 +318,13 @@ func countAddressRecords(rec *metricsRecorder, endpoints []*endpoint.Endpoint, m func (c *Controller) ScheduleRunOnce(now time.Time) { c.runAtMutex.Lock() defer c.runAtMutex.Unlock() - preferred := c.lastRunAt.Add(c.Interval) - eventReadyAt := now.Add(c.MinEventSyncInterval) - if preferred.Before(eventReadyAt) { - preferred = eventReadyAt - } - c.nextRunAt = preferred + c.nextRunAt = latest( + c.lastRunAt.Add(c.MinEventSyncInterval), + earliest( + now.Add(5*time.Second), + c.nextRunAt, + ), + ) } func (c *Controller) ShouldRunOnce(now time.Time) bool { diff --git a/controller/controller_test.go b/controller/controller_test.go index 4f5251f879..5abd5677e5 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -280,57 +280,58 @@ func TestRun(t *testing.T) { func TestShouldRunOnce(t *testing.T) { ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 15 * time.Second} - start := time.Now() + now := time.Now() // First run of Run loop should execute RunOnce - assert.True(t, ctrl.ShouldRunOnce(start)) - assert.Equal(t, start.Add(ctrl.Interval), ctrl.nextRunAt) + assert.True(t, ctrl.ShouldRunOnce(now)) + assert.Equal(t, now.Add(10*time.Minute), ctrl.nextRunAt) // Second run should not - assert.False(t, ctrl.ShouldRunOnce(start)) - ctrl.lastRunAt = start + assert.False(t, ctrl.ShouldRunOnce(now)) + ctrl.lastRunAt = now - eventTime := start.Add(10 * time.Second) + now = now.Add(10 * time.Second) // Changes happen in ingresses or services - ctrl.ScheduleRunOnce(eventTime) - ctrl.ScheduleRunOnce(eventTime) + ctrl.ScheduleRunOnce(now) + ctrl.ScheduleRunOnce(now) - // Because we batch changes, ShouldRunOnce returns False until the throttle conditions are satisfied - earliest := ctrl.lastRunAt.Add(ctrl.Interval) - if ready := eventTime.Add(ctrl.MinEventSyncInterval); ready.After(earliest) { - earliest = ready - } - assert.False(t, ctrl.ShouldRunOnce(earliest.Add(-time.Second))) - assert.True(t, ctrl.ShouldRunOnce(earliest)) - assert.False(t, ctrl.ShouldRunOnce(earliest)) + // Because we batch changes, ShouldRunOnce returns False at first + assert.False(t, ctrl.ShouldRunOnce(now)) + assert.False(t, ctrl.ShouldRunOnce(now.Add(100*time.Microsecond))) + + // But after MinInterval we should run reconciliation + now = now.Add(5 * time.Second) + assert.True(t, ctrl.ShouldRunOnce(now)) - // Simulate successful reconciliation updating lastRunAt - ctrl.lastRunAt = earliest + // But just one time + assert.False(t, ctrl.ShouldRunOnce(now)) // We should wait maximum possible time after last reconciliation started - beforeNext := ctrl.lastRunAt.Add(ctrl.Interval - time.Second) - assert.False(t, ctrl.ShouldRunOnce(beforeNext)) + now = now.Add(10*time.Minute - time.Second) + assert.False(t, ctrl.ShouldRunOnce(now)) // After exactly Interval it's OK again to reconcile - onInterval := beforeNext.Add(time.Second) - assert.True(t, ctrl.ShouldRunOnce(onInterval)) - assert.False(t, ctrl.ShouldRunOnce(onInterval)) + now = now.Add(time.Second) + assert.True(t, ctrl.ShouldRunOnce(now)) + + // But not two times + assert.False(t, ctrl.ShouldRunOnce(now)) - // Multiple ingresses or services changes, closer than MinEventSyncInterval from each other - ctrl.lastRunAt = onInterval - firstChangeTime := onInterval + // Multiple ingresses or services changes, closer than MinInterval from each other + ctrl.lastRunAt = now + firstChangeTime := now secondChangeTime := firstChangeTime.Add(time.Second) // First change ctrl.ScheduleRunOnce(firstChangeTime) // Second change ctrl.ScheduleRunOnce(secondChangeTime) - earliest = ctrl.lastRunAt.Add(ctrl.Interval) - if ready := secondChangeTime.Add(ctrl.MinEventSyncInterval); ready.After(earliest) { - earliest = ready - } - assert.False(t, ctrl.ShouldRunOnce(earliest.Add(-time.Second))) - assert.True(t, ctrl.ShouldRunOnce(earliest)) + // Executions should be spaced by at least MinEventSyncInterval + assert.False(t, ctrl.ShouldRunOnce(now.Add(5*time.Second))) + + // Should not postpone the reconciliation further than firstChangeTime + MinInterval + now = now.Add(ctrl.MinEventSyncInterval) + assert.True(t, ctrl.ShouldRunOnce(now)) } func testControllerFiltersDomains(t *testing.T, configuredEndpoints []*endpoint.Endpoint, domainFilter *endpoint.DomainFilter, providerEndpoints []*endpoint.Endpoint, expectedChanges []*plan.Changes) { diff --git a/source/service.go b/source/service.go index 53590a8b7d..932bf9d01b 100644 --- a/source/service.go +++ b/source/service.go @@ -80,8 +80,6 @@ type serviceSource struct { podInformer coreinformers.PodInformer nodeInformer coreinformers.NodeInformer nodeEventHandler func() - nodeEventHandlerRegistered bool - nodeEventsNeeded bool serviceTypeFilter *serviceTypes exposeInternalIPv6 bool @@ -400,11 +398,10 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri targets := annotations.TargetsFromTargetAnnotation(pod.Annotations) if len(targets) == 0 { if endpointsType == EndpointsTypeNodeExternalIP { - if sc.nodeInformer == nil { + if sc.nodeInformer == nil { log.Warnf("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer", endpointSlice.Namespace, endpointSlice.Name) continue } - sc.markNodeEventsNeeded() node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName) if err != nil { log.Errorf("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints", pod.Spec.NodeName, pod.GetName(), err) @@ -753,7 +750,6 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe nodes []*v1.Node ) - sc.markNodeEventsNeeded() if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal { nodes = sc.nodesExternalTrafficPolicyTypeLocal(svc) @@ -852,92 +848,52 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) { _, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) } sc.nodeEventHandler = handler - if !sc.nodeEventsNeeded && sc.publishHostIP { - sc.nodeEventsNeeded = true - } - if !sc.nodeEventsNeeded && sc.detectNodeDependentServices() { - sc.nodeEventsNeeded = true + // Register node handler eagerly if node data may influence endpoints. + if sc.nodeInformer != nil && sc.nodeEventHandler != nil { + explicitNodePort := sc.serviceTypeFilter != nil && sc.serviceTypeFilter.enabled && sc.serviceTypeFilter.types[v1.ServiceTypeNodePort] + if sc.publishHostIP || explicitNodePort || sc.anyServiceRequiresNodeData() { + _, _ = sc.nodeInformer.Informer().AddEventHandler(&nodeAddressChangeHandler{source: sc, handler: sc.nodeEventHandler}) + } } - sc.ensureNodeEventHandlerRegistered() } -func (sc *serviceSource) detectNodeDependentServices() bool { - if sc.nodeInformer == nil { +// anyServiceRequiresNodeData checks current services for node-related endpoint needs +func (sc *serviceSource) anyServiceRequiresNodeData() bool { + if sc.serviceInformer == nil { return false } - services, err := sc.listServicesForNodeDetection() + var services []*v1.Service + var err error + + func() { + defer func() { _ = recover() }() + services, err = sc.serviceInformer.Lister().Services(sc.namespace).List(sc.labelSelector) + }() if err != nil { - log.Debugf("skipping node dependency detection: %v", err) return false } services = sc.filterByServiceType(services) services, err = sc.filterByAnnotations(services) if err != nil { - log.Errorf("unable to apply annotation filter during node dependency detection: %v", err) return true } + if len(services) == 0 { + return false + } for _, svc := range services { - if sc.serviceRequiresNodeData(svc) { + if svc.Spec.Type == v1.ServiceTypeNodePort { return true } - } - return false -} - -func (sc *serviceSource) listServicesForNodeDetection() ([]*v1.Service, error) { - if sc.serviceInformer == nil { - return nil, fmt.Errorf("service informer not configured") - } - var ( - services []*v1.Service - listErr error - ) - func() { - defer func() { - if r := recover(); r != nil { - listErr = fmt.Errorf("service informer indexer unavailable: %v", r) + if svc.Spec.ClusterIP == v1.ClusterIPNone { + endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations) + if endpointsType == EndpointsTypeNodeExternalIP || sc.publishHostIP { + return true } - }() - services, listErr = sc.serviceInformer.Lister().Services(sc.namespace).List(sc.labelSelector) - }() - return services, listErr -} - -func (sc *serviceSource) serviceRequiresNodeData(svc *v1.Service) bool { - if svc.Spec.Type == v1.ServiceTypeNodePort { - return true - } - if svc.Spec.ClusterIP == v1.ClusterIPNone { - endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations) - if endpointsType == EndpointsTypeNodeExternalIP { - return true - } - if sc.publishHostIP { - return true } } return false } -func (sc *serviceSource) ensureNodeEventHandlerRegistered() { - if !sc.nodeEventsNeeded || sc.nodeEventHandlerRegistered || sc.nodeInformer == nil || sc.nodeEventHandler == nil { - return - } - _, _ = sc.nodeInformer.Informer().AddEventHandler(&nodeAddressChangeHandler{ - source: sc, - handler: sc.nodeEventHandler, - }) - sc.nodeEventHandlerRegistered = true -} - -func (sc *serviceSource) markNodeEventsNeeded() { - if sc.nodeEventsNeeded { - return - } - sc.nodeEventsNeeded = true - sc.ensureNodeEventHandlerRegistered() -} - type nodeAddressChangeHandler struct { source *serviceSource handler func() diff --git a/source/service_test.go b/source/service_test.go index 1552557973..9a7addd73d 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -5022,7 +5022,7 @@ func TestServiceSource_AddEventHandler(t *testing.T) { asserts func(t *testing.T, s *serviceSource) }{ { - name: "AddEventHandler should trigger all event handlers when empty filter is provided", + name: "AddEventHandler registers service + endpoint handlers only (empty filter, no node need)", filter: []string{}, times: 2, asserts: func(t *testing.T, s *serviceSource) { @@ -5032,7 +5032,7 @@ func TestServiceSource_AddEventHandler(t *testing.T) { }, }, { - name: "AddEventHandler should trigger only service event handler", + name: "AddEventHandler only service handler (no endpoint slice types enabled)", filter: []string{string(v1.ServiceTypeExternalName), string(v1.ServiceTypeLoadBalancer)}, times: 1, asserts: func(t *testing.T, s *serviceSource) { @@ -5042,7 +5042,7 @@ func TestServiceSource_AddEventHandler(t *testing.T) { }, }, { - name: "AddEventHandler should configure only service event handler", + name: "AddEventHandler service + endpoint slice handlers", filter: []string{string(v1.ServiceTypeExternalName), string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeClusterIP)}, times: 2, asserts: func(t *testing.T, s *serviceSource) { @@ -5052,12 +5052,9 @@ func TestServiceSource_AddEventHandler(t *testing.T) { }, }, { - name: "AddEventHandler should configure all service event handlers", + name: "AddEventHandler registers node handler for NodePort", filter: []string{string(v1.ServiceTypeNodePort)}, times: 3, - prepare: func(s *serviceSource) { - s.nodeEventsNeeded = true - }, asserts: func(t *testing.T, s *serviceSource) { fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 1) From a7b49c4147b260a02b7aa29dd8d09195ac575ecb Mon Sep 17 00:00:00 2001 From: Tobias Harnickell Date: Fri, 19 Sep 2025 12:53:04 +0200 Subject: [PATCH 3/3] fix(service): Run linter Signed-off-by: Tobias Harnickell --- source/service.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/service.go b/source/service.go index 932bf9d01b..f8bed5cef3 100644 --- a/source/service.go +++ b/source/service.go @@ -398,7 +398,7 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri targets := annotations.TargetsFromTargetAnnotation(pod.Annotations) if len(targets) == 0 { if endpointsType == EndpointsTypeNodeExternalIP { - if sc.nodeInformer == nil { + if sc.nodeInformer == nil { log.Warnf("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer", endpointSlice.Namespace, endpointSlice.Name) continue } @@ -750,7 +750,6 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe nodes []*v1.Node ) - if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal { nodes = sc.nodesExternalTrafficPolicyTypeLocal(svc) } else {