Skip to content

✨ Use Secret as provider cache #827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/v1alpha2/provider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
ConfigMapNameLabel = "provider.cluster.x-k8s.io/name"

CompressedAnnotation = "provider.cluster.x-k8s.io/compressed"
TrueValue = "true"

MetadataConfigMapKey = "metadata"
ComponentsConfigMapKey = "components"
Expand Down
1 change: 0 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"

ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"

operatorv1 "sigs.k8s.io/cluster-api-operator/api/v1alpha2"
Expand Down
185 changes: 180 additions & 5 deletions internal/controller/genericprovider_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
operatorv1 "sigs.k8s.io/cluster-api-operator/api/v1alpha2"
"sigs.k8s.io/cluster-api-operator/internal/controller/genericprovider"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
configclient "sigs.k8s.io/cluster-api/cmd/clusterctl/client/config"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand All @@ -56,6 +59,7 @@ type GenericProviderReconciler struct {

const (
appliedSpecHashAnnotation = "operator.cluster.x-k8s.io/applied-spec-hash"
cacheOwner = "capi-operator"
)

func (r *GenericProviderReconciler) BuildWithManager(ctx context.Context, mgr ctrl.Manager) (*ctrl.Builder, error) {
Expand Down Expand Up @@ -88,14 +92,17 @@ func (r *GenericProviderReconciler) BuildWithManager(ctx context.Context, mgr ct
reconciler := NewPhaseReconciler(*r, r.Provider, r.ProviderList)

r.ReconcilePhases = []PhaseFn{
reconciler.ApplyFromCache,
reconciler.PreflightChecks,
reconciler.InitializePhaseReconciler,
reconciler.DownloadManifests,
reconciler.Load,
reconciler.Fetch,
reconciler.Store,
reconciler.Upgrade,
reconciler.Install,
reconciler.ReportStatus,
reconciler.Finalize,
}

r.DeletePhases = []PhaseFn{
Expand Down Expand Up @@ -175,6 +182,7 @@ func (r *GenericProviderReconciler) Reconcile(ctx context.Context, req reconcile

if r.Provider.GetAnnotations()[appliedSpecHashAnnotation] == specHash {
log.Info("No changes detected, skipping further steps")

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -316,17 +324,184 @@ func addObjectToHash(hash hash.Hash, object interface{}) error {
return nil
}

// providerHash calculates hash for provider and referenced objects.
func providerHash(ctx context.Context, client client.Client, hash hash.Hash, provider genericprovider.GenericProvider) error {
log := log.FromContext(ctx)

err := addObjectToHash(hash, provider.GetSpec())
if err != nil {
log.Error(err, "failed to calculate provider hash")

return err
}

if err := addConfigSecretToHash(ctx, client, hash, provider); err != nil {
log.Error(err, "failed to calculate secret hash")

return err
}

return nil
}

func calculateHash(ctx context.Context, k8sClient client.Client, provider genericprovider.GenericProvider) (string, error) {
hash := sha256.New()

err := addObjectToHash(hash, provider.GetSpec())
err := providerHash(ctx, k8sClient, hash, provider)

return fmt.Sprintf("%x", hash.Sum(nil)), err
}

// ApplyFromCache applies provider configuration from cache and returns true if the cache did not change.
func (p *PhaseReconciler) ApplyFromCache(ctx context.Context) (*Result, error) {
log := log.FromContext(ctx)

secret := &corev1.Secret{}
if err := p.ctrlClient.Get(ctx, client.ObjectKey{Name: ProviderCacheName(p.provider), Namespace: p.provider.GetNamespace()}, secret); apierrors.IsNotFound(err) {
// secret does not exist, nothing to apply
return &Result{}, nil
} else if err != nil {
log.Error(err, "failed to get provider cache")

return &Result{}, fmt.Errorf("failed to get provider cache: %w", err)
}

// calculate combined hash for provider and config map cache
hash := sha256.New()
if err := providerHash(ctx, p.ctrlClient, hash, p.provider); err != nil {
log.Error(err, "failed to calculate provider hash")

return &Result{}, err
}

if err := addObjectToHash(hash, secret.Data); err != nil {
log.Error(err, "failed to calculate config map hash")

return &Result{}, err
}

cacheHash := fmt.Sprintf("%x", hash.Sum(nil))
if secret.GetAnnotations()[appliedSpecHashAnnotation] != cacheHash || p.provider.GetAnnotations()[appliedSpecHashAnnotation] != cacheHash {
log.Info("Provider or cache state has changed", "cacheHash", cacheHash, "providerHash", secret.GetAnnotations()[appliedSpecHashAnnotation])

return &Result{}, nil
}

log.Info("Applying provider configuration from cache")

errs := []error{}

mr := configclient.NewMemoryReader()

if err := mr.Init(ctx, ""); err != nil {
return &Result{}, err
}

// Fetch configuration variables from the secret. See API field docs for more info.
if err := initReaderVariables(ctx, p.ctrlClient, mr, p.provider); err != nil {
return &Result{}, err
}

for _, manifest := range secret.Data {
if secret.GetAnnotations()[operatorv1.CompressedAnnotation] == operatorv1.TrueValue {
break
}

manifests := []unstructured.Unstructured{}

err := json.Unmarshal(manifest, &manifests)
if err != nil {
log.Error(err, "failed to convert yaml to unstructured")

return &Result{}, err
}

for _, manifest := range manifests {
if err := p.ctrlClient.Patch(ctx, &manifest, client.Apply, client.ForceOwnership, client.FieldOwner(cacheOwner)); err != nil {
errs = append(errs, err)
}
}
}

for _, binaryManifest := range secret.Data {
if secret.GetAnnotations()[operatorv1.CompressedAnnotation] != operatorv1.TrueValue {
break
}

manifest, err := decompressData(binaryManifest)
if err != nil {
log.Error(err, "failed to decompress yaml")

return &Result{}, err
}

manifests := []unstructured.Unstructured{}

err = json.Unmarshal(manifest, &manifests)
if err != nil {
log.Error(err, "failed to convert yaml to unstructured")

return &Result{}, err
}

for _, manifest := range manifests {
if err := p.ctrlClient.Patch(ctx, &manifest, client.Apply, client.ForceOwnership, client.FieldOwner(cacheOwner)); err != nil {
errs = append(errs, err)
}
}
}

if err := kerrors.NewAggregate(errs); err != nil {
log.Error(err, "failed to apply objects from cache")

return &Result{}, err
}

log.Info("Applied all objects from cache")

return &Result{Completed: true}, nil
}

// setCacheHash calculates current provider and secret hash, and updates it on the secret.
func setCacheHash(ctx context.Context, cl client.Client, provider genericprovider.GenericProvider) error {
secret := &corev1.Secret{}
if err := cl.Get(ctx, client.ObjectKey{Name: ProviderCacheName(provider), Namespace: provider.GetNamespace()}, secret); err != nil {
return fmt.Errorf("failed to get cache secret: %w", err)
}

helper, err := patch.NewHelper(secret, cl)
if err != nil {
return "", err
return err
}

if err := addConfigSecretToHash(ctx, k8sClient, hash, provider); err != nil {
return "", err
hash := sha256.New()

if err := providerHash(ctx, cl, hash, provider); err != nil {
return err
}

if err := addObjectToHash(hash, secret.Data); err != nil {
return err
}

return fmt.Sprintf("%x", hash.Sum(nil)), nil
cacheHash := fmt.Sprintf("%x", hash.Sum(nil))

annotations := secret.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}

annotations[appliedSpecHashAnnotation] = cacheHash
secret.SetAnnotations(annotations)

// Set hash on the provider to avoid cache re-use on re-creation
annotations = provider.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}

annotations[appliedSpecHashAnnotation] = cacheHash
provider.SetAnnotations(annotations)

return helper.Patch(ctx, secret)
}
62 changes: 54 additions & 8 deletions internal/controller/manifests_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"compress/gzip"
"context"
"fmt"
"io"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -40,6 +41,7 @@ const (
configMapSourceLabel = "provider.cluster.x-k8s.io/source"
configMapSourceAnnotation = "provider.cluster.x-k8s.io/source"
operatorManagedLabel = "managed-by.operator.cluster.x-k8s.io"
operatorCacheLabel = "cached-by.operator.cluster.x-k8s.io"

maxConfigMapSize = 1 * 1024 * 1024
ociSource = "oci"
Expand Down Expand Up @@ -138,6 +140,16 @@ func (p *PhaseReconciler) checkConfigMapExists(ctx context.Context, labelSelecto
return len(configMapList.Items) == 1, nil
}

// Finalize applies combined hash to a configMap, in order to mark provider provisioning completed.
func (p *PhaseReconciler) Finalize(ctx context.Context) (*Result, error) {
err := setCacheHash(ctx, p.ctrlClient, p.provider)
if err != nil {
ctrl.LoggerFrom(ctx).V(5).Error(err, "Failed to update providers hash")
}

return &Result{}, wrapPhaseError(err, "failed to update providers hash", operatorv1.ProviderInstalledCondition)
}

// prepareConfigMapLabels returns labels that identify a config map with downloaded manifests.
func (p *PhaseReconciler) prepareConfigMapLabels() map[string]string {
return ProviderLabels(p.provider)
Expand Down Expand Up @@ -178,17 +190,10 @@ func TemplateManifestsConfigMap(provider operatorv1.GenericProvider, labels map[
configMap.Data[operatorv1.ComponentsConfigMapKey] = string(components)
} else {
var componentsBuf bytes.Buffer
zw := gzip.NewWriter(&componentsBuf)

_, err := zw.Write(components)
if err != nil {
if err := compressData(&componentsBuf, components); err != nil {
return nil, fmt.Errorf("cannot compress data for provider %s/%s: %w", provider.GetNamespace(), provider.GetName(), err)
}

if err := zw.Close(); err != nil {
return nil, err
}

configMap.BinaryData = map[string][]byte{
operatorv1.ComponentsConfigMapKey: componentsBuf.Bytes(),
}
Expand All @@ -211,6 +216,41 @@ func TemplateManifestsConfigMap(provider operatorv1.GenericProvider, labels map[
return configMap, nil
}

// compressData takes a bytes.Buffer and data, and compresses data into it.
func compressData(componentsBuf *bytes.Buffer, data []byte) (err error) {
zw := gzip.NewWriter(componentsBuf)

_, err = zw.Write(data)
defer func() {
err = zw.Close()
}()

if err != nil {
return fmt.Errorf("cannot compress data: %w", err)
}

return
}

// decompressData takes a compressed data, and decompresses it.
func decompressData(compressedData []byte) (data []byte, err error) {
zr, err := gzip.NewReader(bytes.NewReader(compressedData))
if err != nil {
return nil, fmt.Errorf("cannot open gzip reader from data: %w", err)
}

defer func() {
err = zr.Close()
}()

decompressedData, err := io.ReadAll(zr)
if err != nil {
return nil, fmt.Errorf("cannot decompress data: %w", err)
}

return decompressedData, nil
}

// OCIConfigMap templates config from the OCI source.
func OCIConfigMap(ctx context.Context, provider operatorv1.GenericProvider, auth *auth.Credential) (*corev1.ConfigMap, error) {
store, err := FetchOCI(ctx, provider, auth)
Expand Down Expand Up @@ -301,6 +341,12 @@ func ProviderLabels(provider operatorv1.GenericProvider) map[string]string {
return labels
}

// ProviderCacheName generates a cache name for a given provider.

func ProviderCacheName(provider operatorv1.GenericProvider) string {
return fmt.Sprintf("%s-%s-%s-cache", provider.GetType(), provider.GetName(), provider.GetSpec().Version)
}

// needToCompress checks whether the input data exceeds the maximum configmap
// size limit and returns whether it should be compressed.
func needToCompress(bs ...[]byte) bool {
Expand Down
Loading
Loading