Skip to content

Commit 9540136

Browse files
authored
feat(kube): Add kubeconfig autodiscovery and loading (#9)
Signed-off-by: Kyle Squizzato <[email protected]>
1 parent 8bda323 commit 9540136

13 files changed

+1284
-46
lines changed

.prettierignore

Whitespace-only changes.

README.md

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ FLAGS
4545
-?, --help display help
4646
-p, --plain if true, output in plain text format (default: JSON format)
4747
-u, --unique if true, only show the first instance of each connection
48-
--kubeconfig STRING path to kubeconfig file (Kubernetes lookups enabled if provided)
49-
--external if true, only show traffic to external destinations
48+
--kubeconfig STRING path to kubeconfig file (Kubernetes lookups enabled if provided; if not set, kubeconfig auto-discovery is used by default)
49+
-e, --external if true, only show traffic to external destinations
5050
--internal-networks STRING comma-separated list of internal network CIDRs to filter out when using --external
5151
(default: 127.0.0.0/8,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16)
5252
--version display program version
@@ -82,14 +82,4 @@ sudo apt install linux-headers-$(uname -r) \
8282
linux-tools-generic
8383
```
8484
85-
### Testing a Release with Cluster Provisioner
86-
Build the test release of `pktstat-bpf`:
8785
88-
```shell
89-
make generate build
90-
```
91-
92-
Move the release into the `network-report-collector-path` set in
93-
`/etc/cluster-provisioner/cluster-provisioner.yaml` (default: `/opt/cluster-provisioner/bin/pktstat-bpf`) and spin up a cluster
94-
or VM. Cluster Provisioner will use the binary at that path to collect
95-
network reports.

config_discovery.go

Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
// @license
2+
// Copyright (C) 2024 Dinko Korunic
3+
//
4+
// Permission is hereby granted, free of charge, to any person obtaining a copy
5+
// of this software and associated documentation files (the "Software"), to deal
6+
// in the Software without restriction, including without limitation the rights
7+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
// copies of the Software, and to permit persons to whom the Software is
9+
// furnished to do so, subject to the following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included in all
12+
// copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
// SOFTWARE.
21+
22+
package main
23+
24+
import (
25+
"context"
26+
"fmt"
27+
"log"
28+
"os"
29+
"path/filepath"
30+
"sync"
31+
32+
"github.com/fsnotify/fsnotify"
33+
"k8s.io/client-go/tools/clientcmd"
34+
)
35+
36+
// ConfigChangeCallback is called when a valid kubeconfig is found or changed
37+
type ConfigChangeCallback func(configPath string)
38+
39+
// ConfigDiscovery manages kubeconfig discovery and monitoring
40+
type ConfigDiscovery interface {
41+
// Start begins watching for kubeconfig files
42+
Start(ctx context.Context) error
43+
44+
// GetCurrentPath returns the active kubeconfig path
45+
GetCurrentPath() string
46+
47+
// Subscribe registers a callback for config changes
48+
Subscribe(callback ConfigChangeCallback)
49+
}
50+
51+
// configDiscovery is the concrete implementation of ConfigDiscovery
52+
type configDiscovery struct {
53+
// Standard paths to scan for kubeconfig files
54+
standardPaths []string
55+
56+
// Current active kubeconfig path
57+
currentPath string
58+
59+
// Mutex to protect currentPath
60+
pathMutex sync.RWMutex
61+
62+
// Callbacks to invoke when config changes
63+
callbacks []ConfigChangeCallback
64+
65+
// Mutex to protect callbacks
66+
callbackMutex sync.RWMutex
67+
68+
// Filesystem watcher
69+
watcher *fsnotify.Watcher
70+
71+
// Whether discovery is running
72+
running bool
73+
74+
// Mutex to protect running state
75+
runningMutex sync.Mutex
76+
}
77+
78+
// NewConfigDiscovery creates a new ConfigDiscovery instance
79+
func NewConfigDiscovery() ConfigDiscovery {
80+
homeDir, _ := os.UserHomeDir()
81+
82+
return &configDiscovery{
83+
standardPaths: []string{
84+
os.Getenv("KUBECONFIG"),
85+
filepath.Join(homeDir, ".kube", "config"),
86+
"/tmp/kubeconfig-local", // kind
87+
"/var/lib/rancher/k3s/agent/k3scontroller.kubeconfig", // k3s secondary nodes (must be before k3s control plane)
88+
"/etc/rancher/k3s/k3s.yaml", // k3s control plane
89+
"/etc/rancher/rke2/rke2.yaml", // rke2
90+
"/var/lib/embedded-cluster/k0s/pki/admin.conf", // embedded-cluster control plane
91+
"/var/lib/embedded-cluster/k0s/kubelet.conf", // embedded-cluster secondary nodes
92+
"/var/home/core/kubeconfig", // openshift
93+
"/etc/kubernetes/admin.conf", // kurl control plane
94+
"/etc/kubernetes/kubelet.conf", // kurl secondary nodes
95+
},
96+
callbacks: make([]ConfigChangeCallback, 0),
97+
}
98+
}
99+
100+
// Start begins watching for kubeconfig files
101+
func (cd *configDiscovery) Start(ctx context.Context) error {
102+
cd.runningMutex.Lock()
103+
if cd.running {
104+
cd.runningMutex.Unlock()
105+
return fmt.Errorf("config discovery already running")
106+
}
107+
cd.running = true
108+
cd.runningMutex.Unlock()
109+
110+
// Initialize filesystem watcher
111+
watcher, err := fsnotify.NewWatcher()
112+
if err != nil {
113+
cd.runningMutex.Lock()
114+
cd.running = false
115+
cd.runningMutex.Unlock()
116+
return fmt.Errorf("failed to create filesystem watcher: %v", err)
117+
}
118+
cd.watcher = watcher
119+
120+
// Scan standard paths immediately
121+
initialPath := cd.scanStandardPaths()
122+
if initialPath != "" {
123+
cd.setCurrentPath(initialPath)
124+
cd.notifyCallbacks(initialPath)
125+
}
126+
127+
// Watch directories for file creation/modification
128+
cd.setupWatchers()
129+
130+
// Start watching goroutine
131+
go cd.watchLoop(ctx)
132+
133+
return nil
134+
}
135+
136+
// GetCurrentPath returns the active kubeconfig path
137+
func (cd *configDiscovery) GetCurrentPath() string {
138+
cd.pathMutex.RLock()
139+
defer cd.pathMutex.RUnlock()
140+
return cd.currentPath
141+
}
142+
143+
// Subscribe registers a callback for config changes
144+
func (cd *configDiscovery) Subscribe(callback ConfigChangeCallback) {
145+
cd.callbackMutex.Lock()
146+
defer cd.callbackMutex.Unlock()
147+
cd.callbacks = append(cd.callbacks, callback)
148+
}
149+
150+
// scanStandardPaths scans all standard paths and returns the first valid kubeconfig
151+
func (cd *configDiscovery) scanStandardPaths() string {
152+
for _, path := range cd.standardPaths {
153+
if path == "" {
154+
continue
155+
}
156+
157+
// Check if file exists
158+
if _, err := os.Stat(path); err != nil {
159+
continue
160+
}
161+
162+
// Validate the kubeconfig
163+
if cd.validateKubeconfig(path) {
164+
log.Printf("Found valid kubeconfig at: %s", path)
165+
return path
166+
} else {
167+
log.Printf("Found kubeconfig at %s but it is invalid", path)
168+
}
169+
}
170+
171+
return ""
172+
}
173+
174+
// validateKubeconfig checks if a file is a valid kubeconfig
175+
func (cd *configDiscovery) validateKubeconfig(path string) bool {
176+
// Try to load the config
177+
_, err := clientcmd.LoadFromFile(path)
178+
if err != nil {
179+
log.Printf("Invalid kubeconfig at %s: %v", path, err)
180+
return false
181+
}
182+
183+
return true
184+
}
185+
186+
// setupWatchers sets up filesystem watchers for kubeconfig directories
187+
func (cd *configDiscovery) setupWatchers() {
188+
// Watch directories that might contain kubeconfig files
189+
watchDirs := make(map[string]bool)
190+
191+
for _, path := range cd.standardPaths {
192+
if path == "" {
193+
continue
194+
}
195+
196+
dir := filepath.Dir(path)
197+
if _, ok := watchDirs[dir]; ok {
198+
continue
199+
}
200+
watchDirs[dir] = true
201+
202+
// Check if directory exists
203+
if _, err := os.Stat(dir); err != nil {
204+
// Directory doesn't exist yet, try to watch parent
205+
parentDir := filepath.Dir(dir)
206+
if _, err := os.Stat(parentDir); err == nil {
207+
_ = cd.watcher.Add(parentDir)
208+
log.Printf("Watching parent directory for kubeconfig: %s", parentDir)
209+
}
210+
continue
211+
}
212+
213+
// Watch the directory
214+
err := cd.watcher.Add(dir)
215+
if err != nil {
216+
log.Printf("Failed to watch directory %s: %v", dir, err)
217+
} else {
218+
log.Printf("Watching directory for kubeconfig: %s", dir)
219+
}
220+
}
221+
}
222+
223+
// watchLoop processes filesystem events
224+
func (cd *configDiscovery) watchLoop(ctx context.Context) {
225+
defer cd.watcher.Close()
226+
227+
for {
228+
select {
229+
case <-ctx.Done():
230+
log.Printf("Config discovery stopped")
231+
cd.runningMutex.Lock()
232+
cd.running = false
233+
cd.runningMutex.Unlock()
234+
return
235+
236+
case event, ok := <-cd.watcher.Events:
237+
if !ok {
238+
return
239+
}
240+
241+
// Check if this event is for one of our watched paths
242+
if cd.isWatchedPath(event.Name) {
243+
if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create {
244+
log.Printf("Detected kubeconfig change: %s (op: %s)", event.Name, event.Op)
245+
246+
// Validate and potentially update current path
247+
if cd.validateKubeconfig(event.Name) {
248+
currentPath := cd.GetCurrentPath()
249+
if currentPath != event.Name {
250+
log.Printf("Switching to kubeconfig: %s", event.Name)
251+
cd.setCurrentPath(event.Name)
252+
cd.notifyCallbacks(event.Name)
253+
} else {
254+
log.Printf("Kubeconfig modified, reloading: %s", event.Name)
255+
cd.notifyCallbacks(event.Name)
256+
}
257+
}
258+
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
259+
currentPath := cd.GetCurrentPath()
260+
if currentPath == event.Name {
261+
log.Printf("Current kubeconfig removed: %s, searching for alternative", event.Name)
262+
cd.setCurrentPath("")
263+
264+
// Try to find another valid config
265+
newPath := cd.scanStandardPaths()
266+
if newPath != "" {
267+
cd.setCurrentPath(newPath)
268+
cd.notifyCallbacks(newPath)
269+
} else {
270+
log.Printf("No alternative kubeconfig found")
271+
// Notify callbacks with empty path to disable Kubernetes features
272+
cd.notifyCallbacks("")
273+
}
274+
}
275+
}
276+
}
277+
278+
// If a directory was created, set up watchers for it
279+
if event.Op&fsnotify.Create == fsnotify.Create {
280+
if info, err := os.Stat(event.Name); err == nil && info.IsDir() {
281+
if cd.shouldWatchDir(event.Name) {
282+
_ = cd.watcher.Add(event.Name)
283+
log.Printf("Started watching newly created directory: %s", event.Name)
284+
}
285+
}
286+
}
287+
288+
case err, ok := <-cd.watcher.Errors:
289+
if !ok {
290+
return
291+
}
292+
log.Printf("Filesystem watcher error: %v", err)
293+
}
294+
}
295+
}
296+
297+
// isWatchedPath checks if a path matches one of our standard paths
298+
func (cd *configDiscovery) isWatchedPath(path string) bool {
299+
for _, standardPath := range cd.standardPaths {
300+
if standardPath == "" {
301+
continue
302+
}
303+
if path == standardPath {
304+
return true
305+
}
306+
}
307+
return false
308+
}
309+
310+
// shouldWatchDir checks if we should watch a directory
311+
func (cd *configDiscovery) shouldWatchDir(dir string) bool {
312+
for _, standardPath := range cd.standardPaths {
313+
if standardPath == "" {
314+
continue
315+
}
316+
if filepath.Dir(standardPath) == dir {
317+
return true
318+
}
319+
}
320+
return false
321+
}
322+
323+
// setCurrentPath sets the current kubeconfig path
324+
func (cd *configDiscovery) setCurrentPath(path string) {
325+
cd.pathMutex.Lock()
326+
defer cd.pathMutex.Unlock()
327+
cd.currentPath = path
328+
}
329+
330+
// notifyCallbacks invokes all registered callbacks
331+
func (cd *configDiscovery) notifyCallbacks(path string) {
332+
cd.callbackMutex.RLock()
333+
callbacks := make([]ConfigChangeCallback, len(cd.callbacks))
334+
copy(callbacks, cd.callbacks)
335+
cd.callbackMutex.RUnlock()
336+
337+
for _, callback := range callbacks {
338+
callback(path)
339+
}
340+
}

0 commit comments

Comments
 (0)