Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1d09051
Watch child objects and trigger instance reconciliation on change
barney-s May 30, 2025
94eafd1
replace watchset with informers (for now)
jakobmoellerdev Jul 23, 2025
e5b9bca
Merge remote-tracking branch 'upstream/main' into watchset
jakobmoellerdev Jul 23, 2025
f8ba07a
chore: remove example for separate PR
jakobmoellerdev Jul 23, 2025
ee7771a
chore: fixup context handling (attempt)
jakobmoellerdev Jul 24, 2025
864d50a
Merge remote-tracking branch 'refs/remotes/upstream/main' into watchset
jakobmoellerdev Jul 24, 2025
41738d0
test: add some tests for the instance resource reconcile
jakobmoellerdev Jul 24, 2025
e0bd3b6
refactor: replace dynamic client with metadata client
jakobmoellerdev Sep 15, 2025
558604e
Merge remote-tracking branch 'refs/remotes/upstream/main' into replac…
jakobmoellerdev Sep 15, 2025
988f728
refactor: replace dynamic client with metadata client
jakobmoellerdev Sep 15, 2025
263729c
refactor: replace instance-watch-resources with instance reconciliati…
jakobmoellerdev Sep 18, 2025
80c650d
Merge remote-tracking branch 'upstream/main' into replace-watchset-wi…
jakobmoellerdev Oct 1, 2025
6877414
refactor: replace instance-watch-resources with instance reconciliati…
jakobmoellerdev Sep 18, 2025
b01c70c
chore: merge cleanup
jakobmoellerdev Oct 1, 2025
af83d2c
Merge remote-tracking branch 'upstream/main' into replace-watchset-wi…
jakobmoellerdev Oct 2, 2025
cd1e97f
refactor: replace perGVRWatch with LazyInformer for improved informer…
jakobmoellerdev Oct 2, 2025
07abb25
Merge remote-tracking branch 'upstream/main' into replace-watchset-wi…
jakobmoellerdev Oct 9, 2025
a669a89
refactor: enhance LazyInformer with logging, context handling, and im…
jakobmoellerdev Oct 9, 2025
cc988bb
refactor: disable resync in DynamicController and improve configurati…
jakobmoellerdev Oct 9, 2025
8433e39
refactor: simplify test case for validating kind names
jakobmoellerdev Oct 9, 2025
7e88506
refactor: add configurable queue shutdown timeout in DynamicController
jakobmoellerdev Oct 9, 2025
0a5f974
Merge remote-tracking branch 'upstream/main' into replace-watchset-wi…
jakobmoellerdev Oct 20, 2025
4c9832c
chore: bring back accidentally removed comment
jakobmoellerdev Oct 20, 2025
cadb6be
fix: adjust TestStatus kind suffix in instance_resource_watch_test
jakobmoellerdev Oct 20, 2025
572b315
chore: RESTMapper inclusion and tests
jakobmoellerdev Oct 29, 2025
0665922
chore: update default InstancePolicy to Reactive in CRD and API types
jakobmoellerdev Oct 29, 2025
4d65398
chore: small fixup of invalid resync
jakobmoellerdev Oct 29, 2025
4d5b4da
chore: unify test because of runtime behavior of test context
jakobmoellerdev Oct 29, 2025
0359adb
chore: remove `ResourceGraphDefinitionInstancePolicy` and related log…
jakobmoellerdev Nov 3, 2025
d8eec74
feat: add dependency readiness check in instance reconciliation
govindup63 Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ func main() {
"Burst size of events for the dynamic controller rate limiter.")

// reconciler parameters
flag.IntVar(&resyncPeriod, "dynamic-controller-default-resync-period", 36000,
"interval at which the controller will re list resources even with no changes, in seconds")
flag.IntVar(&resyncPeriod, "dynamic-controller-default-resync-period", 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would still keep this 10hours, just in case we miss events, or encounter watch errors.

"interval at which the controller will re list resources even with no changes, in seconds. "+
"By default resync is disabled.")
flag.IntVar(&queueMaxRetries, "dynamic-controller-default-queue-max-retries", 20,
"maximum number of retries for an item in the queue will be retried before being dropped")
// log level flags
Expand Down Expand Up @@ -186,7 +187,7 @@ func main() {
MaxRetryDelay: maxRetryDelay,
RateLimit: rateLimit,
BurstLimit: burstLimit,
}, set.Dynamic())
}, set.Metadata(), set.RESTMapper())

resourceGraphDefinitionGraphBuilder, err := graph.NewBuilder(
restConfig, set.HTTPClient(),
Expand Down
19 changes: 19 additions & 0 deletions config/crd/bases/kro.run_resourcegraphdefinitions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ spec:
description: ResourceGraphDefinitionSpec defines the desired state of
ResourceGraphDefinition
properties:
reconcile:
description: Reconcile defines the desired reconciliation settings
for a ResourceGraphDefinition.
properties:
instancePolicy:
default: Reactive
description: |-
InstancePolicy defines the reconciliation policy for the instances
created from the ResourceGraphDefinition.
When set to "Periodic", or not set, the instances will be reconciled periodically.
When set to "Reactive", the instances will be reconciled when any changes are detected
on the resources owned by the instance. This change detection is achieved by watching
the resources owned by the instance and enqueueing a reconcile request for the controller
responsible for the instance.
enum:
- Periodic
- Reactive
type: string
type: object
resources:
description: The resources that are part of the resourcegraphdefinition.
items:
Expand Down
19 changes: 19 additions & 0 deletions helm/crds/kro.run_resourcegraphdefinitions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ spec:
description: ResourceGraphDefinitionSpec defines the desired state of
ResourceGraphDefinition
properties:
reconcile:
description: Reconcile defines the desired reconciliation settings
for a ResourceGraphDefinition.
properties:
instancePolicy:
default: Reactive
description: |-
InstancePolicy defines the reconciliation policy for the instances
created from the ResourceGraphDefinition.
When set to "Periodic", or not set, the instances will be reconciled periodically.
When set to "Reactive", the instances will be reconciled when any changes are detected
on the resources owned by the instance. This change detection is achieved by watching
the resources owned by the instance and enqueueing a reconcile request for the controller
responsible for the instance.
enum:
- Periodic
- Reactive
type: string
type: object
resources:
description: The resources that are part of the resourcegraphdefinition.
items:
Expand Down
12 changes: 12 additions & 0 deletions pkg/client/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
ctrlrtconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/release-utils/version"
Expand Down Expand Up @@ -58,6 +59,7 @@ type Set struct {
config *rest.Config
kubernetes *kubernetes.Clientset
dynamic *dynamic.DynamicClient
metadata metadata.Interface
apiExtensionsV1 *apiextensionsv1.ApiextensionsV1Client
// restMapper is a REST mapper for the Kubernetes API server
restMapper meta.RESTMapper
Expand Down Expand Up @@ -124,6 +126,11 @@ func (c *Set) init() error {
return err
}

c.metadata, err = metadata.NewForConfigAndClient(c.config, c.httpClient)
if err != nil {
return err
}

c.dynamic, err = dynamic.NewForConfigAndClient(c.config, c.httpClient)
if err != nil {
return err
Expand All @@ -146,6 +153,11 @@ func (c *Set) Kubernetes() kubernetes.Interface {
return c.kubernetes
}

// Metadata returns the metadata client
func (c *Set) Metadata() metadata.Interface {
return c.metadata
}

// Dynamic returns the dynamic client
func (c *Set) Dynamic() dynamic.Interface {
return c.dynamic
Expand Down
26 changes: 26 additions & 0 deletions pkg/controller/instance/controller_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,25 @@ func (igr *instanceGraphReconciler) updateResourceReadiness(resourceID string) {
}
}

// areDependenciesReady checks if all dependencies of a resource are ready
func (igr *instanceGraphReconciler) areDependenciesReady(resourceID string) bool {
dependencies := igr.runtime.ResourceDescriptor(resourceID).GetDependencies()

for _, depID := range dependencies {
// Check if dependency is resolved
if _, state := igr.runtime.GetResource(depID); state != runtime.ResourceStateResolved {
return false
}

// Check if dependency satisfies its readyWhen conditions
if ready, _, err := igr.runtime.IsResourceReady(depID); err != nil || !ready {
return false
}
}

return true
}

// reconcileInstance handles the reconciliation of an active instance
func (igr *instanceGraphReconciler) reconcileInstance(ctx context.Context) error {
instance := igr.runtime.GetInstance()
Expand Down Expand Up @@ -195,6 +214,13 @@ func (igr *instanceGraphReconciler) reconcileInstance(ctx context.Context) error
break
}

// Check if all dependencies are ready
if !igr.areDependenciesReady(resourceID) {
unresolvedResourceID = resourceID
prune = false
break
}

applyable := applyset.ApplyableObject{
Unstructured: resource,
ID: resourceID,
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/resourcegraphdefinition/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ func (r *ResourceGraphDefinitionReconciler) findRGDsForCRD(ctx context.Context,
}
}

func (r *ResourceGraphDefinitionReconciler) Reconcile(ctx context.Context, o *v1alpha1.ResourceGraphDefinition) (ctrl.Result, error) {
func (r *ResourceGraphDefinitionReconciler) Reconcile(
ctx context.Context,
o *v1alpha1.ResourceGraphDefinition,
) (ctrl.Result, error) {
if !o.DeletionTimestamp.IsZero() {
if err := r.cleanupResourceGraphDefinition(ctx, o); err != nil {
return ctrl.Result{}, err
Expand Down
38 changes: 28 additions & 10 deletions pkg/controller/resourcegraphdefinition/controller_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package resourcegraphdefinition
import (
"context"
"fmt"
"maps"
"slices"
"time"

v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand All @@ -25,7 +27,6 @@ import (

"github.com/kubernetes-sigs/kro/api/v1alpha1"
instancectrl "github.com/kubernetes-sigs/kro/pkg/controller/instance"
"github.com/kubernetes-sigs/kro/pkg/dynamiccontroller"
"github.com/kubernetes-sigs/kro/pkg/graph"
"github.com/kubernetes-sigs/kro/pkg/metadata"
)
Expand Down Expand Up @@ -72,15 +73,10 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(
mark.KindReady(crd.Status.AcceptedNames.Kind)
}

// Setup and start microcontroller
gvr := processedRGD.Instance.GetGroupVersionResource()
controller := r.setupMicroController(gvr, processedRGD, graphExecLabeler)

log.V(1).Info("reconciling resource graph definition micro controller")
// TODO: the context that is passed here is tied to the reconciliation of the rgd, we might need to make
// a new context with our own cancel function here to allow us to cleanly term the dynamic controller
// rather than have it ignore this context and use the background context.
if err := r.reconcileResourceGraphDefinitionMicroController(ctx, &gvr, controller.Reconcile); err != nil {
if err := r.reconcileResourceGraphDefinitionMicroController(ctx, processedRGD, graphExecLabeler); err != nil {
mark.ControllerFailedToStart(err.Error())
return processedRGD.TopologicalOrder, resourcesInfo, err
}
Expand All @@ -89,6 +85,14 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(
return processedRGD.TopologicalOrder, resourcesInfo, nil
}

func (r *ResourceGraphDefinitionReconciler) getResourceGVRsToWatchForRGD(processedRGD *graph.Graph) []schema.GroupVersionResource {
resourceHandlers := make(map[schema.GroupVersionResource]struct{}, len(processedRGD.Resources))
for _, resource := range processedRGD.Resources {
resourceHandlers[resource.GetGroupVersionResource()] = struct{}{}
}
return slices.Collect(maps.Keys(resourceHandlers))
}

// setupLabeler creates and merges the required labelers for the resource graph definition
func (r *ResourceGraphDefinitionReconciler) setupLabeler(rgd *v1alpha1.ResourceGraphDefinition) (metadata.Labeler, error) {
rgLabeler := metadata.NewResourceGraphDefinitionLabeler(rgd)
Expand All @@ -97,10 +101,10 @@ func (r *ResourceGraphDefinitionReconciler) setupLabeler(rgd *v1alpha1.ResourceG

// setupMicroController creates a new controller instance with the required configuration
func (r *ResourceGraphDefinitionReconciler) setupMicroController(
gvr schema.GroupVersionResource,
processedRGD *graph.Graph,
labeler metadata.Labeler,
) *instancectrl.Controller {
gvr := processedRGD.Instance.GetGroupVersionResource()
instanceLogger := r.instanceLogger.WithName(fmt.Sprintf("%s-controller", gvr.Resource)).WithValues(
"controller", gvr.Resource,
"controllerGroup", processedRGD.Instance.GetCRD().Spec.Group,
Expand Down Expand Up @@ -162,8 +166,22 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionCRD(
}

// reconcileResourceGraphDefinitionMicroController starts the microcontroller for handling the resources
func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(ctx context.Context, gvr *schema.GroupVersionResource, handler dynamiccontroller.Handler) error {
err := r.dynamicController.Register(ctx, *gvr, handler)
func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(
ctx context.Context,
processedRGD *graph.Graph,
graphExecLabeler metadata.Labeler,
) error {
// If we want to react to changes to resources, we need to watch for them
// and trigger reconciliations of the instances whenever these resources change.
resourceGVRsToWatch := r.getResourceGVRsToWatchForRGD(processedRGD)

// Setup and start microcontroller
controller := r.setupMicroController(processedRGD, graphExecLabeler)

ctrl.LoggerFrom(ctx).V(1).Info("reconciling resource graph definition micro controller")
gvr := processedRGD.Instance.GetGroupVersionResource()

err := r.dynamicController.Register(ctx, gvr, controller.Reconcile, resourceGVRsToWatch...)
if err != nil {
return newMicroControllerError(err)
}
Expand Down
Loading
Loading