Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 43 additions & 39 deletions pkg/applier/stackapplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ package applier

import (
"context"
"errors"
"fmt"
"path/filepath"
"sync"
"time"

"github.com/avast/retry-go"
"github.com/k0sproject/k0s/pkg/debounce"
"github.com/k0sproject/k0s/pkg/kubernetes"

"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -49,63 +49,67 @@ func NewStackApplier(path string, kubeClientFactory kubernetes.ClientFactoryInte
}
}

// Run executes the initial apply and watches the stack for updates.
// Run watches the stack for updates and executes the initial apply.
func (s *StackApplier) Run(ctx context.Context) error {
if ctx.Err() != nil {
return nil // The context is already done.
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

watcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("failed to create watcher: %w", err)
}
defer watcher.Close()

debounceCtx, cancelDebouncer := context.WithCancel(ctx)
defer cancelDebouncer()
trigger := make(chan struct{}, 1)
watchErr := make(chan error, 1)
go func() { watchErr <- s.runWatcher(watcher, trigger, ctx.Done()) }()

debouncer := debounce.Debouncer[fsnotify.Event]{
Input: watcher.Events,
Timeout: 1 * time.Second,
Filter: s.triggersApply,
Callback: func(fsnotify.Event) { s.apply(debounceCtx) },
if err := watcher.Add(s.path); err != nil {
return fmt.Errorf("failed to watch %q: %w", s.path, err)
}

// Send an artificial event to ensure that an initial apply will happen.
go func() { watcher.Events <- fsnotify.Event{} }()

// Consume and log any errors.
go func() {
for {
err, ok := <-watcher.Errors
if !ok {
return
}
s.log.WithError(err).Error("Error while watching stack")
for {
select {
case <-trigger:
s.apply(ctx)
case err := <-watchErr:
return err
}
}()

err = watcher.Add(s.path)
if err != nil {
return fmt.Errorf("failed to watch %q: %w", s.path, err)
}

_ = debouncer.Run(debounceCtx)
return nil
}

func (*StackApplier) triggersApply(event fsnotify.Event) bool {
// Always let the initial apply happen
if event == (fsnotify.Event{}) {
return true
}
func (s *StackApplier) runWatcher(watcher *fsnotify.Watcher, trigger chan<- struct{}, stop <-chan struct{}) (err error) {
defer func() { err = errors.Join(err, watcher.Close()) }()

// Only consider events on manifest files
if match, _ := filepath.Match(manifestFilePattern, filepath.Base(event.Name)); !match {
return false
}
const timeout = 1 * time.Second // debounce events for one second
timer := time.NewTimer(timeout)
defer timer.Stop()

for {
select {
case err := <-watcher.Errors:
return fmt.Errorf("while watching stack: %w", err)

case event := <-watcher.Events:
// Only consider events on manifest files
if match, _ := filepath.Match(manifestFilePattern, filepath.Base(event.Name)); !match {
continue
}
timer.Reset(timeout)

return true
case <-timer.C:
select {
case trigger <- struct{}{}:
default:
}

case <-stop:
return nil
}
}
}

func (s *StackApplier) apply(ctx context.Context) {
Expand Down