Skip to content

Commit ad2db37

Browse files
authored
Merge pull request #827 from Danil-Grigorev/config-map-cache-hash-wip
✨ Use Secret as provider cache
2 parents 7d3bc72 + 7dd20b3 commit ad2db37

File tree

5 files changed

+372
-45
lines changed

5 files changed

+372
-45
lines changed

api/v1alpha2/provider_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
ConfigMapNameLabel = "provider.cluster.x-k8s.io/name"
3131

3232
CompressedAnnotation = "provider.cluster.x-k8s.io/compressed"
33+
TrueValue = "true"
3334

3435
MetadataConfigMapKey = "metadata"
3536
ComponentsConfigMapKey = "components"

cmd/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ import (
4343
"sigs.k8s.io/controller-runtime/pkg/config"
4444
"sigs.k8s.io/controller-runtime/pkg/controller"
4545
"sigs.k8s.io/controller-runtime/pkg/healthz"
46-
4746
ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
4847

4948
operatorv1 "sigs.k8s.io/cluster-api-operator/api/v1alpha2"

internal/controller/genericprovider_controller.go

Lines changed: 180 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,21 @@ import (
2727
corev1 "k8s.io/api/core/v1"
2828
apierrors "k8s.io/apimachinery/pkg/api/errors"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3031
kerrors "k8s.io/apimachinery/pkg/util/errors"
3132
"k8s.io/client-go/rest"
3233
operatorv1 "sigs.k8s.io/cluster-api-operator/api/v1alpha2"
3334
"sigs.k8s.io/cluster-api-operator/internal/controller/genericprovider"
3435
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
36+
configclient "sigs.k8s.io/cluster-api/cmd/clusterctl/client/config"
3537
"sigs.k8s.io/cluster-api/util/conditions"
3638
"sigs.k8s.io/cluster-api/util/patch"
3739
ctrl "sigs.k8s.io/controller-runtime"
3840
"sigs.k8s.io/controller-runtime/pkg/client"
3941
"sigs.k8s.io/controller-runtime/pkg/controller"
4042
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
4143
"sigs.k8s.io/controller-runtime/pkg/handler"
44+
"sigs.k8s.io/controller-runtime/pkg/log"
4245
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4346
)
4447

@@ -56,6 +59,7 @@ type GenericProviderReconciler struct {
5659

5760
const (
5861
appliedSpecHashAnnotation = "operator.cluster.x-k8s.io/applied-spec-hash"
62+
cacheOwner = "capi-operator"
5963
)
6064

6165
func (r *GenericProviderReconciler) BuildWithManager(ctx context.Context, mgr ctrl.Manager) (*ctrl.Builder, error) {
@@ -88,14 +92,17 @@ func (r *GenericProviderReconciler) BuildWithManager(ctx context.Context, mgr ct
8892
reconciler := NewPhaseReconciler(*r, r.Provider, r.ProviderList)
8993

9094
r.ReconcilePhases = []PhaseFn{
95+
reconciler.ApplyFromCache,
9196
reconciler.PreflightChecks,
9297
reconciler.InitializePhaseReconciler,
9398
reconciler.DownloadManifests,
9499
reconciler.Load,
95100
reconciler.Fetch,
101+
reconciler.Store,
96102
reconciler.Upgrade,
97103
reconciler.Install,
98104
reconciler.ReportStatus,
105+
reconciler.Finalize,
99106
}
100107

101108
r.DeletePhases = []PhaseFn{
@@ -175,6 +182,7 @@ func (r *GenericProviderReconciler) Reconcile(ctx context.Context, req reconcile
175182

176183
if r.Provider.GetAnnotations()[appliedSpecHashAnnotation] == specHash {
177184
log.Info("No changes detected, skipping further steps")
185+
178186
return ctrl.Result{}, nil
179187
}
180188

@@ -316,17 +324,184 @@ func addObjectToHash(hash hash.Hash, object interface{}) error {
316324
return nil
317325
}
318326

327+
// providerHash calculates hash for provider and referenced objects.
328+
func providerHash(ctx context.Context, client client.Client, hash hash.Hash, provider genericprovider.GenericProvider) error {
329+
log := log.FromContext(ctx)
330+
331+
err := addObjectToHash(hash, provider.GetSpec())
332+
if err != nil {
333+
log.Error(err, "failed to calculate provider hash")
334+
335+
return err
336+
}
337+
338+
if err := addConfigSecretToHash(ctx, client, hash, provider); err != nil {
339+
log.Error(err, "failed to calculate secret hash")
340+
341+
return err
342+
}
343+
344+
return nil
345+
}
346+
319347
func calculateHash(ctx context.Context, k8sClient client.Client, provider genericprovider.GenericProvider) (string, error) {
320348
hash := sha256.New()
321349

322-
err := addObjectToHash(hash, provider.GetSpec())
350+
err := providerHash(ctx, k8sClient, hash, provider)
351+
352+
return fmt.Sprintf("%x", hash.Sum(nil)), err
353+
}
354+
355+
// ApplyFromCache applies provider configuration from cache and returns true if the cache did not change.
356+
func (p *PhaseReconciler) ApplyFromCache(ctx context.Context) (*Result, error) {
357+
log := log.FromContext(ctx)
358+
359+
secret := &corev1.Secret{}
360+
if err := p.ctrlClient.Get(ctx, client.ObjectKey{Name: ProviderCacheName(p.provider), Namespace: p.provider.GetNamespace()}, secret); apierrors.IsNotFound(err) {
361+
// secret does not exist, nothing to apply
362+
return &Result{}, nil
363+
} else if err != nil {
364+
log.Error(err, "failed to get provider cache")
365+
366+
return &Result{}, fmt.Errorf("failed to get provider cache: %w", err)
367+
}
368+
369+
// calculate combined hash for provider and config map cache
370+
hash := sha256.New()
371+
if err := providerHash(ctx, p.ctrlClient, hash, p.provider); err != nil {
372+
log.Error(err, "failed to calculate provider hash")
373+
374+
return &Result{}, err
375+
}
376+
377+
if err := addObjectToHash(hash, secret.Data); err != nil {
378+
log.Error(err, "failed to calculate config map hash")
379+
380+
return &Result{}, err
381+
}
382+
383+
cacheHash := fmt.Sprintf("%x", hash.Sum(nil))
384+
if secret.GetAnnotations()[appliedSpecHashAnnotation] != cacheHash || p.provider.GetAnnotations()[appliedSpecHashAnnotation] != cacheHash {
385+
log.Info("Provider or cache state has changed", "cacheHash", cacheHash, "providerHash", secret.GetAnnotations()[appliedSpecHashAnnotation])
386+
387+
return &Result{}, nil
388+
}
389+
390+
log.Info("Applying provider configuration from cache")
391+
392+
errs := []error{}
393+
394+
mr := configclient.NewMemoryReader()
395+
396+
if err := mr.Init(ctx, ""); err != nil {
397+
return &Result{}, err
398+
}
399+
400+
// Fetch configuration variables from the secret. See API field docs for more info.
401+
if err := initReaderVariables(ctx, p.ctrlClient, mr, p.provider); err != nil {
402+
return &Result{}, err
403+
}
404+
405+
for _, manifest := range secret.Data {
406+
if secret.GetAnnotations()[operatorv1.CompressedAnnotation] == operatorv1.TrueValue {
407+
break
408+
}
409+
410+
manifests := []unstructured.Unstructured{}
411+
412+
err := json.Unmarshal(manifest, &manifests)
413+
if err != nil {
414+
log.Error(err, "failed to convert yaml to unstructured")
415+
416+
return &Result{}, err
417+
}
418+
419+
for _, manifest := range manifests {
420+
if err := p.ctrlClient.Patch(ctx, &manifest, client.Apply, client.ForceOwnership, client.FieldOwner(cacheOwner)); err != nil {
421+
errs = append(errs, err)
422+
}
423+
}
424+
}
425+
426+
for _, binaryManifest := range secret.Data {
427+
if secret.GetAnnotations()[operatorv1.CompressedAnnotation] != operatorv1.TrueValue {
428+
break
429+
}
430+
431+
manifest, err := decompressData(binaryManifest)
432+
if err != nil {
433+
log.Error(err, "failed to decompress yaml")
434+
435+
return &Result{}, err
436+
}
437+
438+
manifests := []unstructured.Unstructured{}
439+
440+
err = json.Unmarshal(manifest, &manifests)
441+
if err != nil {
442+
log.Error(err, "failed to convert yaml to unstructured")
443+
444+
return &Result{}, err
445+
}
446+
447+
for _, manifest := range manifests {
448+
if err := p.ctrlClient.Patch(ctx, &manifest, client.Apply, client.ForceOwnership, client.FieldOwner(cacheOwner)); err != nil {
449+
errs = append(errs, err)
450+
}
451+
}
452+
}
453+
454+
if err := kerrors.NewAggregate(errs); err != nil {
455+
log.Error(err, "failed to apply objects from cache")
456+
457+
return &Result{}, err
458+
}
459+
460+
log.Info("Applied all objects from cache")
461+
462+
return &Result{Completed: true}, nil
463+
}
464+
465+
// setCacheHash calculates current provider and secret hash, and updates it on the secret.
466+
func setCacheHash(ctx context.Context, cl client.Client, provider genericprovider.GenericProvider) error {
467+
secret := &corev1.Secret{}
468+
if err := cl.Get(ctx, client.ObjectKey{Name: ProviderCacheName(provider), Namespace: provider.GetNamespace()}, secret); err != nil {
469+
return fmt.Errorf("failed to get cache secret: %w", err)
470+
}
471+
472+
helper, err := patch.NewHelper(secret, cl)
323473
if err != nil {
324-
return "", err
474+
return err
325475
}
326476

327-
if err := addConfigSecretToHash(ctx, k8sClient, hash, provider); err != nil {
328-
return "", err
477+
hash := sha256.New()
478+
479+
if err := providerHash(ctx, cl, hash, provider); err != nil {
480+
return err
481+
}
482+
483+
if err := addObjectToHash(hash, secret.Data); err != nil {
484+
return err
329485
}
330486

331-
return fmt.Sprintf("%x", hash.Sum(nil)), nil
487+
cacheHash := fmt.Sprintf("%x", hash.Sum(nil))
488+
489+
annotations := secret.GetAnnotations()
490+
if annotations == nil {
491+
annotations = map[string]string{}
492+
}
493+
494+
annotations[appliedSpecHashAnnotation] = cacheHash
495+
secret.SetAnnotations(annotations)
496+
497+
// Set hash on the provider to avoid cache re-use on re-creation
498+
annotations = provider.GetAnnotations()
499+
if annotations == nil {
500+
annotations = map[string]string{}
501+
}
502+
503+
annotations[appliedSpecHashAnnotation] = cacheHash
504+
provider.SetAnnotations(annotations)
505+
506+
return helper.Patch(ctx, secret)
332507
}

internal/controller/manifests_downloader.go

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"compress/gzip"
2222
"context"
2323
"fmt"
24+
"io"
2425

2526
corev1 "k8s.io/api/core/v1"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -40,6 +41,7 @@ const (
4041
configMapSourceLabel = "provider.cluster.x-k8s.io/source"
4142
configMapSourceAnnotation = "provider.cluster.x-k8s.io/source"
4243
operatorManagedLabel = "managed-by.operator.cluster.x-k8s.io"
44+
operatorCacheLabel = "cached-by.operator.cluster.x-k8s.io"
4345

4446
maxConfigMapSize = 1 * 1024 * 1024
4547
ociSource = "oci"
@@ -138,6 +140,16 @@ func (p *PhaseReconciler) checkConfigMapExists(ctx context.Context, labelSelecto
138140
return len(configMapList.Items) == 1, nil
139141
}
140142

143+
// Finalize applies combined hash to a configMap, in order to mark provider provisioning completed.
144+
func (p *PhaseReconciler) Finalize(ctx context.Context) (*Result, error) {
145+
err := setCacheHash(ctx, p.ctrlClient, p.provider)
146+
if err != nil {
147+
ctrl.LoggerFrom(ctx).V(5).Error(err, "Failed to update providers hash")
148+
}
149+
150+
return &Result{}, wrapPhaseError(err, "failed to update providers hash", operatorv1.ProviderInstalledCondition)
151+
}
152+
141153
// prepareConfigMapLabels returns labels that identify a config map with downloaded manifests.
142154
func (p *PhaseReconciler) prepareConfigMapLabels() map[string]string {
143155
return ProviderLabels(p.provider)
@@ -178,17 +190,10 @@ func TemplateManifestsConfigMap(provider operatorv1.GenericProvider, labels map[
178190
configMap.Data[operatorv1.ComponentsConfigMapKey] = string(components)
179191
} else {
180192
var componentsBuf bytes.Buffer
181-
zw := gzip.NewWriter(&componentsBuf)
182-
183-
_, err := zw.Write(components)
184-
if err != nil {
193+
if err := compressData(&componentsBuf, components); err != nil {
185194
return nil, fmt.Errorf("cannot compress data for provider %s/%s: %w", provider.GetNamespace(), provider.GetName(), err)
186195
}
187196

188-
if err := zw.Close(); err != nil {
189-
return nil, err
190-
}
191-
192197
configMap.BinaryData = map[string][]byte{
193198
operatorv1.ComponentsConfigMapKey: componentsBuf.Bytes(),
194199
}
@@ -211,6 +216,41 @@ func TemplateManifestsConfigMap(provider operatorv1.GenericProvider, labels map[
211216
return configMap, nil
212217
}
213218

219+
// compressData takes a bytes.Buffer and data, and compresses data into it.
220+
func compressData(componentsBuf *bytes.Buffer, data []byte) (err error) {
221+
zw := gzip.NewWriter(componentsBuf)
222+
223+
_, err = zw.Write(data)
224+
defer func() {
225+
err = zw.Close()
226+
}()
227+
228+
if err != nil {
229+
return fmt.Errorf("cannot compress data: %w", err)
230+
}
231+
232+
return
233+
}
234+
235+
// decompressData takes a compressed data, and decompresses it.
236+
func decompressData(compressedData []byte) (data []byte, err error) {
237+
zr, err := gzip.NewReader(bytes.NewReader(compressedData))
238+
if err != nil {
239+
return nil, fmt.Errorf("cannot open gzip reader from data: %w", err)
240+
}
241+
242+
defer func() {
243+
err = zr.Close()
244+
}()
245+
246+
decompressedData, err := io.ReadAll(zr)
247+
if err != nil {
248+
return nil, fmt.Errorf("cannot decompress data: %w", err)
249+
}
250+
251+
return decompressedData, nil
252+
}
253+
214254
// OCIConfigMap templates config from the OCI source.
215255
func OCIConfigMap(ctx context.Context, provider operatorv1.GenericProvider, auth *auth.Credential) (*corev1.ConfigMap, error) {
216256
store, err := FetchOCI(ctx, provider, auth)
@@ -301,6 +341,12 @@ func ProviderLabels(provider operatorv1.GenericProvider) map[string]string {
301341
return labels
302342
}
303343

344+
// ProviderCacheName generates a cache name for a given provider.
345+
346+
func ProviderCacheName(provider operatorv1.GenericProvider) string {
347+
return fmt.Sprintf("%s-%s-%s-cache", provider.GetType(), provider.GetName(), provider.GetSpec().Version)
348+
}
349+
304350
// needToCompress checks whether the input data exceeds the maximum configmap
305351
// size limit and returns whether it should be compressed.
306352
func needToCompress(bs ...[]byte) bool {

0 commit comments

Comments
 (0)