Skip to content

Commit b73698c

Browse files
radioheadclaude
andauthored
Add EventTimeout option and use contextual ListerWatcher methods (#901)
### What - Add EventTimeout option to CustomCacheInformerOptions and KubernetesBasedInformerOptions - Update ListerWatcher to use contextual ListWithContextFunc and WatchFuncWithContext - Modify NewCustomCacheInformer signature to require CustomCacheInformerOptions parameter ### Why - EventTimeout prevents resource leaks by dropping events that take too long to process - Contextual ListerWatcher improves cancellation handling and resource cleanup during watch operations - Configurable options provide better control over informer behavior Part of #902 --------- Signed-off-by: Igor Suleymanov <[email protected]> Co-authored-by: Claude <[email protected]>
1 parent 39ebadc commit b73698c

File tree

11 files changed

+2389
-70
lines changed

11 files changed

+2389
-70
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ vendor/
2525

2626
# gorelease dist dir
2727
dist/
28+
29+
# WIP docs
30+
docs/wip
31+

.golangci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ linters:
108108
- third_party$
109109
- builtin$
110110
- examples$
111+
- k8s/client
112+
- k8s/cache
111113
formatters:
112114
enable:
113115
- gofmt
@@ -124,3 +126,5 @@ formatters:
124126
- third_party$
125127
- builtin$
126128
- examples$
129+
- k8s/client
130+
- k8s/cache

AGENTS.md

Lines changed: 972 additions & 0 deletions
Large diffs are not rendered by default.

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
AGENTS.md

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ require (
4141
k8s.io/apiserver v0.34.1
4242
k8s.io/client-go v0.34.1
4343
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f
44+
k8s.io/klog/v2 v2.130.1
4445
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b
4546
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
4647
sigs.k8s.io/structured-merge-diff/v6 v6.3.0
@@ -140,7 +141,6 @@ require (
140141
gopkg.in/inf.v0 v0.9.1 // indirect
141142
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
142143
k8s.io/component-base v0.34.1 // indirect
143-
k8s.io/klog/v2 v2.130.1 // indirect
144144
k8s.io/kms v0.34.1 // indirect
145145
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect
146146
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect

k8s/cache/controller.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// The code in this file is copied from k8s.io/client-go/tools/cache/controller.go.
2+
// It contains minor modifications to the original code.
3+
4+
/*
5+
Copyright 2014 The Kubernetes Authors.
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
*/
19+
20+
package cache
21+
22+
import (
23+
"context"
24+
"sync"
25+
"time"
26+
27+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
"k8s.io/client-go/tools/cache"
30+
"k8s.io/utils/clock"
31+
)
32+
33+
var _ cache.Controller = &Controller{}
34+
35+
// Controller is a controller that uses a reflector to watch a resource and update a queue.
36+
// It is copied over from k8s.io/client-go/tools/cache/controller.go.
37+
type Controller struct {
38+
config cache.Config
39+
clock clock.Clock
40+
reflector *Reflector
41+
reflectorMutex sync.RWMutex
42+
// UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server.
43+
// Defaults to false.
44+
UseWatchList bool
45+
// WatchListPageSize is the requested chunk size for paginated LIST operations.
46+
// An empty value (0) will use client-go's default pagination behavior.
47+
WatchListPageSize int64
48+
}
49+
50+
// NewController makes a new Controller from the given Config.
51+
func NewController(c *cache.Config) *Controller {
52+
ctlr := &Controller{
53+
config: *c,
54+
clock: &clock.RealClock{},
55+
}
56+
return ctlr
57+
}
58+
59+
// Run implements [Controller.Run].
60+
func (c *Controller) Run(stopCh <-chan struct{}) {
61+
c.RunWithContext(wait.ContextForChannel(stopCh))
62+
}
63+
64+
// RunWithContext implements [Controller.RunWithContext].
65+
func (c *Controller) RunWithContext(ctx context.Context) {
66+
defer utilruntime.HandleCrashWithContext(ctx)
67+
68+
go func() {
69+
<-ctx.Done()
70+
c.config.Queue.Close()
71+
}()
72+
73+
var useWatchList *bool
74+
if c.UseWatchList {
75+
val := true
76+
useWatchList = &val
77+
}
78+
79+
r := NewReflectorWithOptions(
80+
cache.ToListerWatcherWithContext(c.config.ListerWatcher),
81+
c.config.ObjectType,
82+
c.config.Queue,
83+
ReflectorOptions{
84+
ResyncPeriod: c.config.FullResyncPeriod,
85+
MinWatchTimeout: c.config.MinWatchTimeout,
86+
TypeDescription: c.config.ObjectDescription,
87+
Clock: c.clock,
88+
UseWatchList: useWatchList,
89+
},
90+
)
91+
r.ShouldResync = c.config.ShouldResync
92+
r.WatchListPageSize = c.WatchListPageSize
93+
94+
c.reflectorMutex.Lock()
95+
c.reflector = r
96+
c.reflectorMutex.Unlock()
97+
98+
var wg wait.Group
99+
100+
wg.StartWithContext(ctx, r.RunWithContext)
101+
102+
wait.UntilWithContext(ctx, c.processLoop, time.Second)
103+
wg.Wait()
104+
}
105+
106+
// HasSynced returns true once this cache has completed an initial resource listing.
107+
func (c *Controller) HasSynced() bool {
108+
return c.config.Queue.HasSynced()
109+
}
110+
111+
// LastSyncResourceVersion returns the last sync resource version of the cache.
112+
func (c *Controller) LastSyncResourceVersion() string {
113+
c.reflectorMutex.RLock()
114+
defer c.reflectorMutex.RUnlock()
115+
if c.reflector == nil {
116+
return ""
117+
}
118+
return c.reflector.LastSyncResourceVersion()
119+
}
120+
121+
// processLoop drains the work queue.
122+
func (c *Controller) processLoop(ctx context.Context) {
123+
for {
124+
select {
125+
case <-ctx.Done():
126+
// If the context is canceled, we have to exit the loop, even without draining the queue.
127+
// So we close the queue and return.
128+
c.config.Queue.Close()
129+
return
130+
default:
131+
_, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
132+
if err != nil {
133+
if err == cache.ErrFIFOClosed {
134+
return
135+
}
136+
}
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)