Skip to content

Commit 7588f68

Browse files
committed
Don't block fsnotify watcher during stack application
Decouple the stack application from the fsnotify watcher loop. This prevents the loop from stalling due to a long-running application and reduces the risk of the operating system buffer getting overflowed. Trigger applications via a separate channel with a buffer size of one item. This naturally implements debouncing. Fixes a panic that could occur when the Run method exits before the initial apply event has been sent from the separate goroutine. Signed-off-by: Tom Wieczorek <[email protected]>
1 parent b622a54 commit 7588f68

File tree

1 file changed

+39
-38
lines changed

1 file changed

+39
-38
lines changed

pkg/applier/stackapplier.go

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"time"
1212

1313
"github.com/avast/retry-go"
14-
"github.com/k0sproject/k0s/pkg/debounce"
1514
"github.com/k0sproject/k0s/pkg/kubernetes"
1615

1716
"github.com/fsnotify/fsnotify"
@@ -55,57 +54,59 @@ func (s *StackApplier) Run(ctx context.Context) error {
5554
return nil // The context is already done.
5655
}
5756

57+
trigger := make(chan struct{}, 1)
58+
ctx, cancel := context.WithCancel(ctx)
59+
defer cancel()
60+
5861
watcher, err := fsnotify.NewWatcher()
5962
if err != nil {
6063
return fmt.Errorf("failed to create watcher: %w", err)
6164
}
62-
defer watcher.Close()
63-
64-
debounceCtx, cancelDebouncer := context.WithCancel(ctx)
65-
defer cancelDebouncer()
66-
67-
debouncer := debounce.Debouncer[fsnotify.Event]{
68-
Input: watcher.Events,
69-
Timeout: 1 * time.Second,
70-
Filter: s.triggersApply,
71-
Callback: func(fsnotify.Event) { s.apply(debounceCtx) },
72-
}
73-
74-
// Send an artificial event to ensure that an initial apply will happen.
75-
go func() { watcher.Events <- fsnotify.Event{} }()
7665

77-
// Consume and log any errors.
7866
go func() {
79-
for {
80-
err, ok := <-watcher.Errors
81-
if !ok {
82-
return
83-
}
84-
s.log.WithError(err).Error("Error while watching stack")
85-
}
67+
defer watcher.Close()
68+
defer close(trigger)
69+
err = s.runWatcher(watcher, trigger, ctx.Done())
8670
}()
8771

88-
err = watcher.Add(s.path)
89-
if err != nil {
90-
return fmt.Errorf("failed to watch %q: %w", s.path, err)
72+
if addErr := watcher.Add(s.path); addErr != nil {
73+
return fmt.Errorf("failed to watch %q: %w", s.path, addErr)
74+
}
75+
76+
for range trigger {
77+
s.apply(ctx)
9178
}
9279

93-
_ = debouncer.Run(debounceCtx)
94-
return nil
80+
return err
9581
}
9682

97-
func (*StackApplier) triggersApply(event fsnotify.Event) bool {
98-
// Always let the initial apply happen
99-
if event == (fsnotify.Event{}) {
100-
return true
101-
}
83+
func (s *StackApplier) runWatcher(watcher *fsnotify.Watcher, trigger chan<- struct{}, stop <-chan struct{}) error {
84+
const timeout = 1 * time.Second // debounce events for one second
85+
timer := time.NewTimer(timeout)
86+
defer timer.Stop()
10287

103-
// Only consider events on manifest files
104-
if match, _ := filepath.Match(manifestFilePattern, filepath.Base(event.Name)); !match {
105-
return false
106-
}
88+
for {
89+
select {
90+
case err := <-watcher.Errors:
91+
return fmt.Errorf("while watching stack: %w", err)
92+
93+
case event := <-watcher.Events:
94+
// Only consider events on manifest files
95+
if match, _ := filepath.Match(manifestFilePattern, filepath.Base(event.Name)); !match {
96+
continue
97+
}
98+
timer.Reset(timeout)
10799

108-
return true
100+
case <-timer.C:
101+
select {
102+
case trigger <- struct{}{}:
103+
default:
104+
}
105+
106+
case <-stop:
107+
return nil
108+
}
109+
}
109110
}
110111

111112
func (s *StackApplier) apply(ctx context.Context) {

0 commit comments

Comments
 (0)