diff --git a/src/go/pkg/synk/BUILD.bazel b/src/go/pkg/synk/BUILD.bazel index c10d04b0..d83f6ef5 100644 --- a/src/go/pkg/synk/BUILD.bazel +++ b/src/go/pkg/synk/BUILD.bazel @@ -11,6 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//src/go/pkg/apis/apps/v1alpha1:go_default_library", + "//src/go/pkg/client/versioned:go_default_library", "@com_github_cenkalti_backoff//:go_default_library", "@com_github_googlecloudrobotics_ilog//:go_default_library", "@com_github_pkg_errors//:go_default_library", @@ -45,6 +46,7 @@ go_test( embed = [":go_default_library"], deps = [ "//src/go/pkg/apis/apps/v1alpha1:go_default_library", + "//src/go/pkg/client/versioned/fake:go_default_library", "@com_github_pkg_errors//:go_default_library", "@io_k8s_api//core/v1:go_default_library", "@io_k8s_apimachinery//pkg/api/errors:go_default_library", diff --git a/src/go/pkg/synk/synk.go b/src/go/pkg/synk/synk.go index 5aaa6f0b..a9660063 100644 --- a/src/go/pkg/synk/synk.go +++ b/src/go/pkg/synk/synk.go @@ -30,6 +30,7 @@ import ( "github.com/cenkalti/backoff" apps "github.com/googlecloudrobotics/core/src/go/pkg/apis/apps/v1alpha1" + crcapi "github.com/googlecloudrobotics/core/src/go/pkg/client/versioned" "github.com/googlecloudrobotics/ilog" "github.com/pkg/errors" "go.opencensus.io/trace" @@ -56,19 +57,24 @@ import ( // src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go const totalAnnotationSizeLimitB int = 256 * (1 << 10) // 256 kB +// Can be overridden for testing. +var metav1Now = metav1.Now + // Synk allows to synchronize sets of resources with a fixed cluster. type Synk struct { discovery discovery.CachedDiscoveryInterface client dynamic.Interface + rsClient crcapi.Interface mapper meta.RESTMapper resetMapper func() } // New returns a new Synk object that acts against the cluster for the given configuration. -func New(client dynamic.Interface, discovery discovery.CachedDiscoveryInterface) *Synk { +func New(client dynamic.Interface, rsClient crcapi.Interface, discovery discovery.CachedDiscoveryInterface) *Synk { s := &Synk{ discovery: discovery, client: client, + rsClient: rsClient, } // Store reset function seperately to allow reasonable tests. m := restmapper.NewDeferredDiscoveryRESTMapper(discovery) @@ -83,6 +89,10 @@ func NewForConfig(cfg *rest.Config) (*Synk, error) { if err != nil { return nil, err } + rsClient, err := crcapi.NewForConfig(cfg) + if err != nil { + return nil, err + } discovery, err := discovery.NewDiscoveryClientForConfig(cfg) if err != nil { return nil, err @@ -91,7 +101,7 @@ func NewForConfig(cfg *rest.Config) (*Synk, error) { // Without initial invalidation all calls will fail. cachedDiscovery.Invalidate() - return New(client, cachedDiscovery), nil + return New(client, rsClient, cachedDiscovery), nil } // TODO: determine options that allow us to be semantically compatible with @@ -215,7 +225,7 @@ func (s *Synk) Init() error { func (s *Synk) Delete(ctx context.Context, name string) error { policy := metav1.DeletePropagationForeground deleteOpts := metav1.DeleteOptions{PropagationPolicy: &policy} - return s.client.Resource(resourceSetGVR).DeleteCollection(ctx, deleteOpts, metav1.ListOptions{ + return s.rsClient.AppsV1alpha1().ResourceSets().DeleteCollection(ctx, deleteOpts, metav1.ListOptions{ LabelSelector: fmt.Sprintf("name=%s", name), }) } @@ -471,7 +481,7 @@ func (s *Synk) initialize( rs.Status = apps.ResourceSetStatus{ Phase: apps.ResourceSetPhasePending, - StartedAt: metav1.Now(), + StartedAt: metav1Now(), } if err := s.createResourceSet(ctx, &rs); err != nil { return nil, nil, errors.Wrapf(err, "create resources object %q", rs.Name) @@ -862,25 +872,9 @@ func (s *Synk) crdAvailable(ucrd *unstructured.Unstructured) (bool, error) { return true, nil } -var resourceSetGVR = schema.GroupVersionResource{ - Group: "apps.cloudrobotics.com", - Version: "v1alpha1", - Resource: "resourcesets", -} - func (s *Synk) createResourceSet(ctx context.Context, rs *apps.ResourceSet) error { - rs.Kind = "ResourceSet" - rs.APIVersion = "apps.cloudrobotics.com/v1alpha1" - - var u unstructured.Unstructured - if err := convert(rs, &u); err != nil { - return err - } - res, err := s.client.Resource(resourceSetGVR).Create(ctx, &u, metav1.CreateOptions{}) - if err != nil { - return err - } - return convert(res, rs) + _, err := s.rsClient.AppsV1alpha1().ResourceSets().Create(ctx, rs, metav1.CreateOptions{}) + return err } type applyResult struct { @@ -959,29 +953,21 @@ func (s *Synk) updateResourceSetStatus(ctx context.Context, rs *apps.ResourceSet build(applied, &rs.Status.Applied) build(failed, &rs.Status.Failed) - rs.Status.FinishedAt = metav1.Now() + rs.Status.FinishedAt = metav1Now() if len(rs.Status.Failed) > 0 { rs.Status.Phase = apps.ResourceSetPhaseFailed } else { rs.Status.Phase = apps.ResourceSetPhaseSettled } - var u unstructured.Unstructured - if err := convert(rs, &u); err != nil { - return err - } - res, err := s.client.Resource(resourceSetGVR).Update(ctx, &u, metav1.UpdateOptions{}) - if err != nil { - return errors.Wrap(err, "update ResourceSet status") - } - return convert(res, rs) + _, err := s.rsClient.AppsV1alpha1().ResourceSets().Update(ctx, rs, metav1.UpdateOptions{}) + return err } // deleteFailedResourceSets deletes all failed ResourceSets of the given name // that have a lower version. func (s *Synk) deleteFailedResourceSets(ctx context.Context, name string, version int32) error { - c := s.client.Resource(resourceSetGVR) - + c := s.rsClient.AppsV1alpha1().ResourceSets() list, err := c.List(ctx, metav1.ListOptions{ LabelSelector: "name=" + name, }) @@ -989,11 +975,7 @@ func (s *Synk) deleteFailedResourceSets(ctx context.Context, name string, versio return errors.Wrap(err, "list existing resources") } for _, r := range list.Items { - phase, found, err := unstructured.NestedString(r.Object, "status", "phase") - if err != nil { - return errors.Wrapf(err, "failed to get status.phase from ResourceSet %q", r.GetName()) - } - if !found || phase != "Failed" { + if r.Status.Phase != apps.ResourceSetPhaseFailed { continue } n, v, ok := decodeResourceSetName(r.GetName()) @@ -1014,7 +996,7 @@ func (s *Synk) deleteFailedResourceSets(ctx context.Context, name string, versio // deleteResourceSets deletes all ResourceSets of the given name that have a // lower version. func (s *Synk) deleteResourceSets(ctx context.Context, name string, version int32) error { - c := s.client.Resource(resourceSetGVR) + c := s.rsClient.AppsV1alpha1().ResourceSets() list, err := c.List(ctx, metav1.ListOptions{ LabelSelector: "name=" + name, @@ -1040,7 +1022,7 @@ func (s *Synk) deleteResourceSets(ctx context.Context, name string, version int3 // next returns the next version for the resources name. func (s *Synk) next(ctx context.Context, name string) (version int32, err error) { - list, err := s.client.Resource(resourceSetGVR).List(ctx, metav1.ListOptions{}) + list, err := s.rsClient.AppsV1alpha1().ResourceSets().List(ctx, metav1.ListOptions{}) if err != nil { return 0, errors.Wrap(err, "list existing ResourceSets") } diff --git a/src/go/pkg/synk/synk_test.go b/src/go/pkg/synk/synk_test.go index 1aa25d31..260b4b2c 100644 --- a/src/go/pkg/synk/synk_test.go +++ b/src/go/pkg/synk/synk_test.go @@ -20,8 +20,11 @@ import ( "reflect" "strings" "testing" + "time" apps "github.com/googlecloudrobotics/core/src/go/pkg/apis/apps/v1alpha1" + crcapi "github.com/googlecloudrobotics/core/src/go/pkg/client/versioned" + crcfake "github.com/googlecloudrobotics/core/src/go/pkg/client/versioned/fake" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -59,6 +62,22 @@ func (d *fakeCachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGro }, nil } +// Helpers to override time when testing. +var ( + fakeEndTime = metav1.Date(2025, 01, 01, 0, 0, 0, 0, time.UTC) +) + +func fakeTime(t *testing.T, fakeTime metav1.Time) { + t.Helper() + oldFunc := metav1Now + t.Cleanup(func() { + metav1Now = oldFunc + }) + metav1Now = func() metav1.Time { + return fakeTime + } +} + type fixture struct { *testing.T fake *k8stest.Fake @@ -78,8 +97,9 @@ func (f *fixture) newSynk() *Synk { scheme.AddToScheme(sc) apps.AddToScheme(sc) // For tests with CRDs. var ( - client = dynamicfake.NewSimpleDynamicClient(sc, f.objects...) - s = New(client, &fakeCachedDiscoveryClient{}) + client = dynamicfake.NewSimpleDynamicClient(sc, f.objects...) + rsClient = crcfake.NewSimpleClientset() + s = New(client, rsClient, &fakeCachedDiscoveryClient{}) ) s.mapper = testrestmapper.TestOnlyStaticRESTMapper(sc) s.resetMapper = func() {} @@ -167,11 +187,11 @@ func TestSynk_initialize(t *testing.T) { if err != nil { t.Fatal(err) } - got, err := s.client.Resource(resourceSetGVR).Get(ctx, "test.v1", metav1.GetOptions{}) + got, err := s.rsClient.AppsV1alpha1().ResourceSets().Get(ctx, "test.v1", metav1.GetOptions{}) if err != nil { t.Fatal(err) } - var want unstructured.Unstructured + var want apps.ResourceSet unmarshalYAML(t, &want, ` apiVersion: apps.cloudrobotics.com/v1alpha1 kind: ResourceSet @@ -200,13 +220,11 @@ status: if want.GetName() != got.GetName() { t.Errorf("expected name %q but got %q", want.GetName(), got.GetName()) } - wantPhase, _, _ := unstructured.NestedString(want.Object, "status", "phase") - gotPhase, _, _ := unstructured.NestedString(got.Object, "status", "phase") - if wantPhase != gotPhase { - t.Errorf("expected status phase %q but got %q", wantPhase, gotPhase) + if want.Status.Phase != got.Status.Phase { + t.Errorf("expected status phase %q but got %q", want.Status.Phase, got.Status.Phase) } - if !reflect.DeepEqual(want.Object["spec"], got.Object["spec"]) { - t.Errorf("expected spec\n%v\nbut got\n%v", want.Object["spec"], got.Object["spec"]) + if !reflect.DeepEqual(want.Spec, got.Spec) { + t.Errorf("expected spec\n%v\nbut got\n%v", want.Spec, got.Spec) } } @@ -240,15 +258,16 @@ func TestSynk_updateResourceSetStatus(t *testing.T) { action: apps.ResourceActionCreate, }, } + fakeTime(t, fakeEndTime) err := s.updateResourceSetStatus(ctx, rs, results) if err != nil { t.Fatal(err) } - got, err := s.client.Resource(resourceSetGVR).Get(ctx, "set1", metav1.GetOptions{}) + got, err := s.rsClient.AppsV1alpha1().ResourceSets().Get(ctx, "set1", metav1.GetOptions{}) if err != nil { t.Fatal(err) } - var want unstructured.Unstructured + var want apps.ResourceSet unmarshalYAML(t, &want, ` apiVersion: apps.cloudrobotics.com/v1alpha1 kind: ResourceSet @@ -283,15 +302,10 @@ status: name: deploy1 action: Create `) - if v, _, _ := unstructured.NestedString(got.Object, "status", "finishedAt"); v == "" { - t.Errorf("finishedAt timestamp was not set") - } - // Remove unknown timestamps before running DeepEqual. - unstructured.RemoveNestedField(got.Object, "status", "startedAt") - unstructured.RemoveNestedField(got.Object, "status", "finishedAt") + want.Status.FinishedAt = fakeEndTime - if !reflect.DeepEqual(got.Object["status"], want.Object["status"]) { - t.Errorf("expected status:\n%q\nbut got:\n%q", want.Object["status"], got.Object["status"]) + if !reflect.DeepEqual(got.Status, want.Status) { + t.Errorf("expected status:\n%q\nbut got:\n%q", want.Status, got.Status) } } @@ -502,11 +516,11 @@ func TestSynk_skipsTestResources(t *testing.T) { if err != nil { t.Fatal(err) } - got, err := s.client.Resource(resourceSetGVR).Get(ctx, "test.v1", metav1.GetOptions{}) + got, err := s.rsClient.AppsV1alpha1().ResourceSets().Get(ctx, "test.v1", metav1.GetOptions{}) if err != nil { t.Fatal(err) } - var want unstructured.Unstructured + var want apps.ResourceSet unmarshalYAML(t, &want, ` apiVersion: apps.cloudrobotics.com/v1alpha1 kind: ResourceSet @@ -524,69 +538,95 @@ spec: status: phase: Pending `) - if !reflect.DeepEqual(want.Object["spec"], got.Object["spec"]) { - t.Errorf("expected spec\n%v\nbut got\n%v", want.Object["spec"], got.Object["spec"]) + if !reflect.DeepEqual(want.Spec, got.Spec) { + t.Errorf("expected spec\n%v\nbut got\n%v", want.Spec, got.Spec) + } +} + +func createResourceSet(t *testing.T, rsClient crcapi.Interface, name, version string) { + t.Helper() + if _, err := rsClient.AppsV1alpha1().ResourceSets().Create(context.TODO(), &apps.ResourceSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name + "." + version, + Labels: map[string]string{"name": name}, + }, + }, metav1.CreateOptions{}); err != nil { + t.Error(err) + } +} + +func createFailedResourceSet(t *testing.T, rsClient crcapi.Interface, name, version string) { + t.Helper() + if _, err := rsClient.AppsV1alpha1().ResourceSets().Create(context.TODO(), &apps.ResourceSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name + "." + version, + Labels: map[string]string{"name": name}, + }, + Status: apps.ResourceSetStatus{ + Phase: apps.ResourceSetPhaseFailed, + }, + }, metav1.CreateOptions{}); err != nil { + t.Error(err) } } func TestSynk_deleteResourceSets(t *testing.T) { ctx := context.Background() - nu := func(name, version string) *unstructured.Unstructured { - u := newUnstructured("apps.cloudrobotics.com/v1alpha1", "ResourceSet", "", name+"."+version) - u.SetLabels(map[string]string{"name": name}) - return u - } f := newFixture(t) - f.addObjects( - nu("test", "v2"), - nu("bad_name", ""), - nu("other", "v3"), - nu("test", "v4"), - nu("test", "v7"), - nu("test", "v8"), - ) synk := f.newSynk() + rsClient := synk.rsClient.AppsV1alpha1().ResourceSets() + createResourceSet(t, synk.rsClient, "bad_name", "") + createResourceSet(t, synk.rsClient, "other", "v3") + createResourceSet(t, synk.rsClient, "test", "v2") + createResourceSet(t, synk.rsClient, "test", "v4") + createResourceSet(t, synk.rsClient, "test", "v7") + createResourceSet(t, synk.rsClient, "test", "v8") + wantNames := []string{"bad_name.", "other.v3", "test.v7", "test.v8"} + wantDeletions := []string{"test.v2", "test.v4"} err := synk.deleteResourceSets(ctx, "test", 7) if err != nil { t.Fatal(err) } - f.expectActions( - k8stest.NewRootDeleteAction(resourceSetGVR, "test.v2"), - k8stest.NewRootDeleteAction(resourceSetGVR, "test.v4"), - ) - f.verifyWriteActions() + for _, name := range wantNames { + if _, err := rsClient.Get(ctx, name, metav1.GetOptions{}); err != nil { + t.Errorf("unexpected error getting ResourceSet %q: err=%v; want nil error", err, err) + } + } + for _, name := range wantDeletions { + if _, err := rsClient.Get(ctx, name, metav1.GetOptions{}); !k8serrors.IsNotFound(err) { + t.Errorf("unexpected error getting ResourceSet %q: err=%v; want not found", name, err) + } + } } func TestSynk_deleteFailedResourceSets(t *testing.T) { ctx := context.Background() - nu := func(name, version string, failed bool) *unstructured.Unstructured { - u := newUnstructured("apps.cloudrobotics.com/v1alpha1", "ResourceSet", "", name+"."+version) - u.SetLabels(map[string]string{"name": name}) - if failed { - unstructured.SetNestedField(u.Object, "Failed", "status", "phase") - } - return u - } f := newFixture(t) - f.addObjects( - nu("test", "v2", true), - nu("test", "v4", false), - nu("test", "v6", true), - nu("test", "v7", true), - nu("test", "v8", true), - ) synk := f.newSynk() + rsClient := synk.rsClient.AppsV1alpha1().ResourceSets() + createFailedResourceSet(t, synk.rsClient, "test", "v2") + createResourceSet(t, synk.rsClient, "test", "v4") + createFailedResourceSet(t, synk.rsClient, "test", "v6") + createFailedResourceSet(t, synk.rsClient, "test", "v7") + createFailedResourceSet(t, synk.rsClient, "test", "v8") + wantNames := []string{"test.v7", "test.v8"} + wantDeletions := []string{"test.v2", "test.v6"} err := synk.deleteFailedResourceSets(ctx, "test", 7) if err != nil { t.Fatal(err) } - f.expectActions( - k8stest.NewRootDeleteAction(resourceSetGVR, "test.v2"), - k8stest.NewRootDeleteAction(resourceSetGVR, "test.v6"), - ) - f.verifyWriteActions() + for _, name := range wantNames { + if _, err := rsClient.Get(ctx, name, metav1.GetOptions{}); err != nil { + t.Errorf("unexpected error getting ResourceSet %q: err=%v; want nil error", err, err) + } + } + for _, name := range wantDeletions { + if _, err := rsClient.Get(ctx, name, metav1.GetOptions{}); !k8serrors.IsNotFound(err) { + t.Errorf("unexpected error getting ResourceSet %q: err=%v; want not found", name, err) + } + } } func TestSynk_populateNamespaces(t *testing.T) {