@@ -23,6 +23,7 @@ import (
 
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/equality"
     "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/client-go/tools/record"
@@ -78,8 +79,24 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
         return ctrl.Result{}, err
     }
 
+    // Keep a copy of the old status for patching later
+    oldEtcdCluster := etcdCluster.DeepCopy()
+
+    // --- Defer the status update ---
+    // Use a closure to capture the logger and handle potential errors from updateStatusIfNeeded
+    defer func() {
+        if err := r.updateStatusIfNeeded(ctx, etcdCluster, oldEtcdCluster); err != nil {
+            // Log the error from status update, but don't change the Reconcile return value here.
+            // Controller Runtime will likely retry anyway if the status update failed.
+            logger.Error(err, "Deferred status update failed")
+        }
+    }()
+
     if etcdCluster.Spec.Size == 0 {
         logger.Info("EtcdCluster size is 0. Skipping next steps")
+        etcdCluster.Status.Phase = "Idle" // Example: Set a phase even for size 0
+        etcdCluster.Status.ReadyReplicas = 0
+        etcdCluster.Status.Members = 0
         return ctrl.Result{}, nil
     }
 
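The deferred call works because etcdCluster is a pointer captured by the closure, while oldEtcdCluster is a deep copy frozen at the moment the defer is registered; every change made to etcdCluster.Status later in Reconcile is therefore visible when the deferred function runs, and the snapshot is what it gets compared against. A minimal, self-contained sketch of that behavior (the types below are stand-ins, not the operator's real ones):

package main

import "fmt"

type Status struct{ Phase string }
type EtcdCluster struct{ Status Status }

// DeepCopy mimics the generated deepcopy: it returns an independent copy.
func (c *EtcdCluster) DeepCopy() *EtcdCluster { cp := *c; return &cp }

func main() {
    cluster := &EtcdCluster{Status: Status{Phase: "Unknown"}}
    old := cluster.DeepCopy()

    defer func() {
        // Runs last: cluster.Status reflects the change made below, while
        // old.Status still holds the snapshot, so a compare-then-patch
        // helper can tell that a status write is needed.
        fmt.Printf("patch needed: %v (%s -> %s)\n",
            old.Status != cluster.Status, old.Status.Phase, cluster.Status.Phase)
    }()

    cluster.Status.Phase = "Running" // mutation made later in the function body
}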
@@ -93,53 +110,81 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
         if errors.IsNotFound(err) {
             logger.Info("Creating StatefulSet with 0 replica", "expectedSize", etcdCluster.Spec.Size)
             // Create a new StatefulSet
-
             sts, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, 0, r.Scheme)
             if err != nil {
+                logger.Error(err, "Failed to create StatefulSet")
+                etcdCluster.Status.Phase = "Failed"
                 return ctrl.Result{}, err
             }
         } else {
             // If an error occurs during Get/Create, we'll requeue the item so we can
             // attempt processing again later. This could have been caused by a
             // temporary network failure, or any other transient reason.
             logger.Error(err, "Failed to get StatefulSet. Requesting requeue")
+            etcdCluster.Status.Phase = "Failed"
             return ctrl.Result{RequeueAfter: requeueDuration}, nil
         }
     }
 
+    // At this point, sts should exist (either found or created)
+    if sts == nil {
+        // This case should ideally not happen if error handling above is correct
+        err := fmt.Errorf("statefulSet is unexpectedly nil after get/create")
+        logger.Error(err, "Internal error")
+        etcdCluster.Status.Phase = "Failed"
+        return ctrl.Result{}, err // Return error, defer will update status
+    }
+
+    // Update status based on STS before proceeding
+    etcdCluster.Status.ReadyReplicas = sts.Status.ReadyReplicas
+
     // If the StatefulSet is not controlled by this EtcdCluster resource, log
     // a warning to the event recorder and return an error.
     err = checkStatefulSetControlledByEtcdOperator(etcdCluster, sts)
     if err != nil {
         logger.Error(err, "StatefulSet is not controlled by this EtcdCluster resource")
+        etcdCluster.Status.Phase = "Failed"
         return ctrl.Result{}, err
     }
 
     // If the StatefulSet size is 0, try to instantiate the cluster with 1 member
     if sts.Spec.Replicas != nil && *sts.Spec.Replicas == 0 {
         logger.Info("StatefulSet has 0 replicas. Trying to create a new cluster with 1 member")
-
         sts, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, 1, r.Scheme)
         if err != nil {
+            logger.Error(err, "Failed to scale StatefulSet to 1 replica")
+            etcdCluster.Status.Phase = "Failed"
             return ctrl.Result{}, err
         }
+        // Successfully scaled to 1, update status fields and requeue to wait for pod readiness
+        etcdCluster.Status.ReadyReplicas = sts.Status.ReadyReplicas
+        etcdCluster.Status.Phase = "Initializing"
+        // return ctrl.Result{RequeueAfter: requeueDuration}, nil // Requeue to check readiness, should we do it?
     }
 
     err = createHeadlessServiceIfNotExist(ctx, logger, r.Client, etcdCluster, r.Scheme)
     if err != nil {
+        logger.Error(err, "Failed to create Headless Service")
+        etcdCluster.Status.Phase = "Failed"
         return ctrl.Result{}, err
     }
 
     logger.Info("Now checking health of the cluster members")
     memberListResp, healthInfos, err := healthCheck(sts, logger)
     if err != nil {
+        logger.Error(err, "Health check failed")
+        etcdCluster.Status.Phase = "Degraded" // Or "Unavailable"?
         return ctrl.Result{}, fmt.Errorf("health check failed: %w", err)
     }
 
     memberCnt := 0
     if memberListResp != nil {
         memberCnt = len(memberListResp.Members)
     }
+    etcdCluster.Status.Members = int32(memberCnt)
+    // TODO: Update CurrentVersion from healthInfos
+    // TODO: Update Conditions based on healthInfos
+
     targetReplica := *sts.Spec.Replicas // Start with the current size of the stateful set
 
     // The number of replicas in the StatefulSet doesn't match the number of etcd members in the cluster.
@@ -151,6 +196,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
             logger.Info("Increasing StatefulSet replicas to match the etcd cluster member count", "oldReplicaCount", targetReplica, "newReplicaCount", newReplicaCount)
             _, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, newReplicaCount, r.Scheme)
             if err != nil {
+                logger.Error(err, "Failed to adjust StatefulSet replicas to match member count")
+                etcdCluster.Status.Phase = "Failed"
                 return ctrl.Result{}, err
             }
         } else {
@@ -159,9 +206,14 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
             logger.Info("Decreasing StatefulSet replicas to remove the unneeded Pod.", "oldReplicaCount", targetReplica, "newReplicaCount", newReplicaCount)
             _, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, newReplicaCount, r.Scheme)
             if err != nil {
+                logger.Error(err, "Failed to adjust StatefulSet replicas to match member count")
+                etcdCluster.Status.Phase = "Failed"
                 return ctrl.Result{}, err
             }
         }
+        // Successfully adjusted STS, update status and requeue
+        etcdCluster.Status.ReadyReplicas = sts.Status.ReadyReplicas
+        etcdCluster.Status.Phase = "Scaling"
         return ctrl.Result{RequeueAfter: requeueDuration}, nil
     }
 
@@ -175,6 +227,10 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
     // Find the leader status
     _, leaderStatus = etcdutils.FindLeaderStatus(healthInfos, logger)
     if leaderStatus == nil {
+        err := fmt.Errorf("couldn't find leader, memberCnt: %d", memberCnt)
+        logger.Error(err, "Leader election might be in progress or cluster unhealthy")
+        etcdCluster.Status.Phase = "Degraded" // Or Unavailable
+        // TODO: Add Condition
         // If the leader is not available, let's wait for the leader to be elected
         return ctrl.Result{}, fmt.Errorf("couldn't find leader, memberCnt: %d", memberCnt)
     }
@@ -191,13 +247,18 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
             eps = eps[:(len(eps) - 1)]
             err = etcdutils.PromoteLearner(eps, learner)
             if err != nil {
+                logger.Error(err, "Failed to promote learner")
+                etcdCluster.Status.Phase = "Failed" // Promotion failed
+                // TODO: Add Condition
                 // The member is not promoted yet, so we error out
                 return ctrl.Result{}, err
             }
+            etcdCluster.Status.Phase = "PromotingLearner" // Indicate promotion happened
         } else {
             // Learner is not yet ready. We can't add another learner or proceed further until this one is promoted
             // So let's requeue
             logger.Info("The learner member isn't ready to be promoted yet", "learnerID", learner)
+            etcdCluster.Status.Phase = "PromotingLearner" // Still trying to promote
             return ctrl.Result{RequeueAfter: requeueDuration}, nil
         }
     }
@@ -218,6 +279,9 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
         targetReplica++
         logger.Info("[Scale out] adding a new learner member to etcd cluster", "peerURLs", peerURL)
         if _, err := etcdutils.AddMember(eps, []string{peerURL}, true); err != nil {
+            logger.Error(err, "Failed to add learner member")
+            etcdCluster.Status.Phase = "Failed" // Scaling failed
+            // TODO: Add Condition
             return ctrl.Result{}, err
         }
 
@@ -232,31 +296,61 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
         logger.Info("[Scale in] removing one member", "memberID", memberID)
         eps = eps[:targetReplica]
         if err := etcdutils.RemoveMember(eps, memberID); err != nil {
+            logger.Error(err, "Failed to remove member")
+            etcdCluster.Status.Phase = "Failed" // Scaling failed
+            // TODO: Add Condition
             return ctrl.Result{}, err
         }
     }
 
     sts, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, targetReplica, r.Scheme)
     if err != nil {
+        logger.Error(err, "Failed to update StatefulSet during scaling")
+        etcdCluster.Status.Phase = "Failed"
         return ctrl.Result{}, err
     }
 
     allMembersHealthy, err := areAllMembersHealthy(sts, logger)
     if err != nil {
+        logger.Error(err, "Final health check failed")
+        etcdCluster.Status.Phase = "Degraded"
+        // TODO: Add Condition
         return ctrl.Result{}, err
     }
 
     if *sts.Spec.Replicas != int32(etcdCluster.Spec.Size) || !allMembersHealthy {
         // Requeue if the StatefulSet size is not equal to the expected size of the etcd cluster,
         // or if not all members of the cluster are healthy
+        etcdCluster.Status.Phase = "Degraded"
+        // TODO: Add Condition
         return ctrl.Result{RequeueAfter: requeueDuration}, nil
     }
 
+    etcdCluster.Status.Phase = "Running" // Final healthy state
+    // TODO: Set Available Condition to True
     logger.Info("EtcdCluster reconciled successfully")
     return ctrl.Result{}, nil
 
 }
 
+// updateStatusIfNeeded compares the old and new status and patches if changed.
+func (r *EtcdClusterReconciler) updateStatusIfNeeded(ctx context.Context, etcdCluster *ecv1alpha1.EtcdCluster, oldEtcdCluster *ecv1alpha1.EtcdCluster) error {
+    logger := log.FromContext(ctx)
+    // Compare the new status with the old status
+    if !equality.Semantic.DeepEqual(oldEtcdCluster.Status, etcdCluster.Status) {
+        logger.Info("Updating EtcdCluster status", "namespace", etcdCluster.Namespace, "name", etcdCluster.Name)
+        err := r.Status().Patch(ctx, etcdCluster, client.MergeFrom(oldEtcdCluster))
+        if err != nil {
+            logger.Error(err, "Failed to update EtcdCluster status", "namespace", etcdCluster.Namespace, "name", etcdCluster.Name)
+            return err // Return the error so the Reconcile loop retries
+        }
+        logger.Info("Successfully updated EtcdCluster status", "namespace", etcdCluster.Namespace, "name", etcdCluster.Name)
+    } else {
+        logger.V(1).Info("EtcdCluster status is already up-to-date", "namespace", etcdCluster.Namespace, "name", etcdCluster.Name) // Use V(1) for less important info
+    }
+    return nil // No error occurred during status update itself
+}
+
 // SetupWithManager sets up the controller with the Manager.
 func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
     r.Recorder = mgr.GetEventRecorderFor("etcdcluster-controller")
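A design note on the updateStatusIfNeeded helper added above: it writes status through Status().Patch with client.MergeFrom rather than Status().Update. Update submits the whole object and is rejected with a conflict error when the local copy's resourceVersion has gone stale, while a MergeFrom patch computed against the earlier DeepCopy sends only the fields that changed and, by default, carries no resourceVersion precondition. A rough sketch of the same pattern, using a Deployment as a stand-in object since the operator's EtcdCluster type isn't reproduced here:

package example

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// patchStatus snapshots the object, applies a status mutation, and then sends
// a merge patch for the status subresource relative to that snapshot.
func patchStatus(ctx context.Context, c client.Client, dep *appsv1.Deployment) error {
    old := dep.DeepCopy()
    dep.Status.ReadyReplicas = 3 // stand-in for the mutations made during reconciliation

    // Unlike c.Status().Update(ctx, dep), which replaces the whole status and
    // fails on a stale resourceVersion, this sends only the changed fields.
    return c.Status().Patch(ctx, dep, client.MergeFrom(old))
}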