Merged
1 change: 0 additions & 1 deletion go.mod
@@ -59,7 +59,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/kms v1.45.6
github.com/aws/smithy-go v1.23.0
github.com/beevik/ntp v1.4.3
github.com/benbjohnson/clock v1.3.5 // project archived on 2023-05-18
github.com/blang/semver/v4 v4.0.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/containerd/cgroups/v3 v3.0.5
2 changes: 0 additions & 2 deletions go.sum
@@ -87,8 +87,6 @@ github.com/aws/smithy-go v1.23.0 h1:8n6I3gXzWJB2DxBDnfxgBaSX6oe0d/t10qGz7OKqMCE=
github.com/aws/smithy-go v1.23.0/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho=
github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas=
probe runner (file path not shown)
@@ -11,8 +11,8 @@ import (
"net"
"sync"
"syscall"
"time"

"github.com/benbjohnson/clock"
"github.com/siderolabs/gen/channel"
"go.uber.org/zap"

@@ -21,9 +21,8 @@ import (

// Runner describes a state of running probe.
type Runner struct {
ID string
Spec network.ProbeSpecSpec
Clock clock.Clock
ID string
Spec network.ProbeSpecSpec

cancel context.CancelFunc
wg sync.WaitGroup
@@ -61,11 +60,7 @@ func (runner *Runner) Stop() {
func (runner *Runner) run(ctx context.Context, notifyCh chan<- Notification, logger *zap.Logger) {
logger = logger.With(zap.String("probe", runner.ID))

if runner.Clock == nil {
runner.Clock = clock.New()
}

ticker := runner.Clock.Ticker(runner.Spec.Interval)
ticker := time.NewTicker(runner.Spec.Interval)
defer ticker.Stop()

consecutiveFailures := 0
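With the clock abstraction gone, the runner paces its probe loop with a plain time.Ticker. The following is a minimal sketch of that ticker-driven loop pattern, not the actual Runner implementation; runLoop and probeOnce are illustrative names:

package probesketch

import (
	"context"
	"time"
)

// runLoop sketches the pattern the runner now uses: a time.Ticker paced by
// the probe interval, stopped on exit, and a select loop that quits when the
// context is cancelled. probeOnce is a hypothetical stand-in for the real
// probe logic.
func runLoop(ctx context.Context, interval time.Duration, probeOnce func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			_ = probeOnce(ctx) // failure counting and notifications omitted in this sketch
		}
	}
}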
probe runner tests (file path not shown)
@@ -10,9 +10,9 @@ import (
"net/http/httptest"
"net/url"
"testing"
"testing/synctest"
"time"

"github.com/benbjohnson/clock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
@@ -22,8 +22,6 @@ import (
)

func TestProbeHTTP(t *testing.T) {
t.Parallel()

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
@@ -43,7 +41,7 @@ func TestProbeHTTP(t *testing.T) {
},
}

ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

notifyCh := make(chan probe.Notification)
@@ -80,73 +78,76 @@ func TestProbeHTTP(t *testing.T) {
}

func TestProbeConsecutiveFailures(t *testing.T) {
t.Parallel()

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(server.Close)

u, err := url.Parse(server.URL)
require.NoError(t, err)

mockClock := clock.NewMock()

p := probe.Runner{
ID: "consecutive-failures",
Spec: network.ProbeSpecSpec{
Interval: 10 * time.Millisecond,
FailureThreshold: 3,
TCP: network.TCPProbeSpec{
Endpoint: u.Host,
Timeout: time.Second,
// Use synctest.Test to run the test in a controlled time bubble.
// This allows us to test time-dependent behavior without actual delays,
// making the test both faster and more deterministic.
synctest.Test(t, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

u, err := url.Parse(server.URL)
require.NoError(t, err)

p := probe.Runner{
ID: "consecutive-failures",
Spec: network.ProbeSpecSpec{
Interval: 10 * time.Millisecond,
FailureThreshold: 3,
TCP: network.TCPProbeSpec{
Endpoint: u.Host,
Timeout: time.Second,
},
},
},
Clock: mockClock,
}
}

ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second)
t.Cleanup(cancel)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

notifyCh := make(chan probe.Notification)
notifyCh := make(chan probe.Notification)

p.Start(ctx, notifyCh, zaptest.NewLogger(t))
t.Cleanup(p.Stop)

// first iteration should succeed
assert.Equal(t, probe.Notification{
ID: "consecutive-failures",
Status: network.ProbeStatusSpec{
Success: true,
},
}, <-notifyCh)
p.Start(ctx, notifyCh, zaptest.NewLogger(t))
defer p.Stop()

// stop the test server, probe should fail
server.Close()

for range p.Spec.FailureThreshold - 1 {
// probe should fail, but no notification should be sent yet (failure threshold not reached)
mockClock.Add(p.Spec.Interval)
// first iteration should succeed
assert.Equal(t, probe.Notification{
ID: "consecutive-failures",
Status: network.ProbeStatusSpec{
Success: true,
},
}, <-notifyCh)

select {
case ev := <-notifyCh:
require.Fail(t, "unexpected notification", "got: %v", ev)
case <-time.After(100 * time.Millisecond):
// stop the test server, probe should fail
server.Close()

for range p.Spec.FailureThreshold - 1 {
// probe should fail, but no notification should be sent yet (failure threshold not reached)
// synctest.Wait() waits until all goroutines in the bubble are durably blocked,
// which happens when the ticker in the probe runner is waiting for the next interval
synctest.Wait()

select {
case ev := <-notifyCh:
require.Fail(t, "unexpected notification", "got: %v", ev)
default:
// Expected: no notification yet
}
}
}

// advance clock to trigger another failure(s)
mockClock.Add(p.Spec.Interval)
// wait for next interval to trigger failure notification
synctest.Wait()

notify := <-notifyCh
assert.Equal(t, "consecutive-failures", notify.ID)
assert.False(t, notify.Status.Success)
assert.Contains(t, notify.Status.LastError, "connection refused")
notify := <-notifyCh
assert.Equal(t, "consecutive-failures", notify.ID)
assert.False(t, notify.Status.Success)
assert.Contains(t, notify.Status.LastError, "connection refused")

// advance clock to trigger another failure(s)
mockClock.Add(p.Spec.Interval)
// wait for next interval to trigger another failure notification
synctest.Wait()

notify = <-notifyCh
assert.Equal(t, "consecutive-failures", notify.ID)
assert.False(t, notify.Status.Success)
notify = <-notifyCh
assert.Equal(t, "consecutive-failures", notify.ID)
assert.False(t, notify.Status.Success)
})
}
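The comments in the test above describe the testing/synctest bubble: inside synctest.Test (stable as of Go 1.25), timers use a fake clock that only advances when every goroutine in the bubble is durably blocked, and synctest.Wait returns once all goroutines other than the caller are durably blocked or have exited. The following is a small, self-contained sketch of that pattern, independent of the probe code; the test name and channel are illustrative:

package sketch_test

import (
	"testing"
	"testing/synctest"
	"time"
)

// TestTickerInBubble exercises a 10ms ticker without any real-world delay:
// whenever both goroutines block, the bubble's fake clock jumps to the next
// tick. Requires Go 1.25+ for testing/synctest.
func TestTickerInBubble(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		ticks := make(chan struct{})

		go func() {
			ticker := time.NewTicker(10 * time.Millisecond)
			defer ticker.Stop()

			for range 3 {
				<-ticker.C
				ticks <- struct{}{}
			}
		}()

		for range 3 {
			<-ticks // fires "instantly" in bubble time

			// Wait returns once the ticker goroutine is durably blocked
			// on its next tick (or has exited after the last send).
			synctest.Wait()
		}
	})
}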
16 changes: 5 additions & 11 deletions internal/app/machined/pkg/controllers/runtime/cri_image_gc.go
@@ -10,7 +10,6 @@ import (
"fmt"
"time"

"github.com/benbjohnson/clock"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/pkg/namespaces"
@@ -38,7 +37,6 @@ const ImageGCGracePeriod = 4 * ImageCleanupInterval
// CRIImageGCController renders manifests based on templates and config/secrets.
type CRIImageGCController struct {
ImageServiceProvider func() (ImageServiceProvider, error)
Clock clock.Clock

imageFirstSeenUnreferenced map[string]time.Time
}
@@ -114,10 +112,6 @@ func (ctrl *CRIImageGCController) Run(ctx context.Context, r controller.Runtime,
ctrl.ImageServiceProvider = defaultImageServiceProvider
}

if ctrl.Clock == nil {
ctrl.Clock = clock.New()
}

if ctrl.imageFirstSeenUnreferenced == nil {
ctrl.imageFirstSeenUnreferenced = map[string]time.Time{}
}
@@ -128,7 +122,7 @@ func (ctrl *CRIImageGCController) Run(ctx context.Context, r controller.Runtime,
imageServiceProvider ImageServiceProvider
)

ticker := ctrl.Clock.Ticker(ImageCleanupInterval)
ticker := time.NewTicker(ImageCleanupInterval)
defer ticker.Stop()

defer func() {
@@ -179,7 +173,7 @@ func (ctrl *CRIImageGCController) Run(ctx context.Context, r controller.Runtime,

kubeletSpec, err := safe.ReaderGet[*k8s.KubeletSpec](ctx, r, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, k8s.KubeletID, resource.VersionUndefined))
if err != nil && !state.IsNotFoundError(err) {
return fmt.Errorf("error getting etcd spec: %w", err)
return fmt.Errorf("error getting kubelet spec: %w", err)
}

if kubeletSpec != nil {
@@ -285,14 +279,14 @@ func (ctrl *CRIImageGCController) cleanup(ctx context.Context, logger *zap.Logger,
}

if _, ok := ctrl.imageFirstSeenUnreferenced[image.Name]; !ok {
ctrl.imageFirstSeenUnreferenced[image.Name] = ctrl.Clock.Now()
ctrl.imageFirstSeenUnreferenced[image.Name] = time.Now()
}

// calculate image age two ways, and pick the minimum:
// * as CRI reports it, which is the time image got pulled
// * as we see it, this means the image won't be deleted until it reaches the age of ImageGCGracePeriod from the moment it became unreferenced
imageAgeCRI := ctrl.Clock.Since(image.CreatedAt)
imageAgeInternal := ctrl.Clock.Since(ctrl.imageFirstSeenUnreferenced[image.Name])
imageAgeCRI := time.Since(image.CreatedAt)
imageAgeInternal := time.Since(ctrl.imageFirstSeenUnreferenced[image.Name])

imageAge := min(imageAgeCRI, imageAgeInternal)

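The cleanup comment above describes the grace-period rule: an image's age is the smaller of its CRI-reported age (time since pull) and the time since the controller first saw it unreferenced, so an image only becomes eligible for collection once both measures exceed the grace period. A minimal sketch of that decision with hypothetical names (shouldDelete is not part of the controller's API):

package gcsketch

import "time"

// shouldDelete reports whether an image has aged past the grace period under
// both measures: its CRI creation time and the moment it was first observed
// unreferenced. Taking the minimum keeps a freshly-unreferenced image around
// even if it was pulled long ago.
func shouldDelete(createdAt, firstSeenUnreferenced time.Time, gracePeriod time.Duration) bool {
	imageAge := min(time.Since(createdAt), time.Since(firstSeenUnreferenced))

	return imageAge >= gracePeriod
}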