diff --git a/reconciler/incremental.go b/reconciler/incremental.go index 2620ff9..462ea53 100644 --- a/reconciler/incremental.go +++ b/reconciler/incremental.go @@ -13,16 +13,14 @@ import ( "github.com/cilium/statedb" ) -// incrementalRound is the shared context for incremental reconciliation and retries. -type incrementalRound[Obj comparable] struct { +// incremental is the shared context for incremental reconciliation and retries. +type incremental[Obj comparable] struct { metrics Metrics moduleID cell.FullModuleID config *config[Obj] retries *retries primaryIndexer statedb.Indexer[Obj] db *statedb.DB - ctx context.Context - txn statedb.ReadTxn table statedb.RWTable[Obj] // numReconciled counts the number of objects that have been reconciled in this @@ -47,48 +45,40 @@ type opResult struct { id uint64 // the "pending" identifier } -func (r *reconciler[Obj]) incremental(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) []error { - round := incrementalRound[Obj]{ - moduleID: r.ModuleID, - metrics: r.config.Metrics, - config: &r.config, - retries: r.retries, - primaryIndexer: r.primaryIndexer, - db: r.DB, - ctx: ctx, - txn: txn, - table: r.config.Table, - results: make(map[Obj]opResult), - } - +func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) []error { // Reconcile new and changed objects using either Operations // or BatchOperations. - if r.config.BatchOperations != nil { - round.batch(changes) + if incr.config.BatchOperations != nil { + incr.batch(ctx, txn, changes) } else { - round.single(changes) + incr.single(ctx, txn, changes) } // Process objects that need to be retried that were not cleared. - round.processRetries() + incr.processRetries(ctx, txn) // Finally commit the status updates. - newErrors := round.commitStatus() + newErrors := incr.commitStatus() // Since all failures are retried, we can return the errors from the retry // queue which includes both errors occurred in this round and the old // errors. - errs := round.retries.errors() - round.metrics.ReconciliationErrors(r.ModuleID, newErrors, len(errs)) + errs := incr.retries.errors() + incr.metrics.ReconciliationErrors(incr.moduleID, newErrors, len(errs)) + + // Prepare for next round + incr.numReconciled = 0 + clear(incr.results) + return errs } -func (round *incrementalRound[Obj]) single(changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) { +func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) { // Iterate in revision order through new and changed objects. for change, rev := range changes { obj := change.Object - status := round.config.GetObjectStatus(obj) + status := incr.config.GetObjectStatus(obj) if !change.Deleted && !status.IsPendingOrRefreshing() { // Only process objects that are pending reconciliation, e.g. // changed from outside. @@ -97,25 +87,25 @@ func (round *incrementalRound[Obj]) single(changes iter.Seq2[statedb.Change[Obj] } // Clear retries as the object has changed. - round.retries.Clear(obj) + incr.retries.Clear(obj) - round.processSingle(obj, rev, change.Deleted) - round.numReconciled++ - if round.numReconciled >= round.config.IncrementalRoundSize { + incr.processSingle(ctx, txn, obj, rev, change.Deleted) + incr.numReconciled++ + if incr.numReconciled >= incr.config.IncrementalRoundSize { break } } } -func (round *incrementalRound[Obj]) batch(changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) { - ops := round.config.BatchOperations +func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) { + ops := incr.config.BatchOperations updateBatch := []BatchEntry[Obj]{} deleteBatch := []BatchEntry[Obj]{} for change, rev := range changes { obj := change.Object - status := round.config.GetObjectStatus(obj) + status := incr.config.GetObjectStatus(obj) if !change.Deleted && !status.IsPendingOrRefreshing() { // Only process objects that are pending reconciliation, e.g. // changed from outside. @@ -124,11 +114,11 @@ func (round *incrementalRound[Obj]) batch(changes iter.Seq2[statedb.Change[Obj], } // Clear an existing retry as the object has changed. - round.retries.Clear(obj) + incr.retries.Clear(obj) // Clone the object so we or the operations can mutate it. orig := obj - obj = round.config.CloneObject(obj) + obj = incr.config.CloneObject(obj) if change.Deleted { deleteBatch = append(deleteBatch, BatchEntry[Obj]{Object: obj, Revision: rev, original: orig}) @@ -136,8 +126,8 @@ func (round *incrementalRound[Obj]) batch(changes iter.Seq2[statedb.Change[Obj], updateBatch = append(updateBatch, BatchEntry[Obj]{Object: obj, Revision: rev, original: orig}) } - round.numReconciled++ - if round.numReconciled >= round.config.IncrementalRoundSize { + incr.numReconciled++ + if incr.numReconciled >= incr.config.IncrementalRoundSize { break } } @@ -145,16 +135,16 @@ func (round *incrementalRound[Obj]) batch(changes iter.Seq2[statedb.Change[Obj], // Process the delete batch first to make room. if len(deleteBatch) > 0 { start := time.Now() - ops.DeleteBatch(round.ctx, round.txn, deleteBatch) - round.metrics.ReconciliationDuration( - round.moduleID, + ops.DeleteBatch(ctx, txn, deleteBatch) + incr.metrics.ReconciliationDuration( + incr.moduleID, OpDelete, time.Since(start), ) for _, entry := range deleteBatch { if entry.Result != nil { // Delete failed, queue a retry for it. - round.retries.Add(entry.original, entry.Revision, true, entry.Result) + incr.retries.Add(entry.original, entry.Revision, true, entry.Result) } } } @@ -162,37 +152,37 @@ func (round *incrementalRound[Obj]) batch(changes iter.Seq2[statedb.Change[Obj], // And then the update batch. if len(updateBatch) > 0 { start := time.Now() - ops.UpdateBatch(round.ctx, round.txn, updateBatch) - round.metrics.ReconciliationDuration( - round.moduleID, + ops.UpdateBatch(ctx, txn, updateBatch) + incr.metrics.ReconciliationDuration( + incr.moduleID, OpUpdate, time.Since(start), ) for _, entry := range updateBatch { - status := round.config.GetObjectStatus(entry.Object) + status := incr.config.GetObjectStatus(entry.Object) if entry.Result == nil { - round.retries.Clear(entry.Object) + incr.retries.Clear(entry.Object) } - round.results[entry.Object] = opResult{rev: entry.Revision, id: status.ID, err: entry.Result, original: entry.original} + incr.results[entry.Object] = opResult{rev: entry.Revision, id: status.ID, err: entry.Result, original: entry.original} } } } -func (round *incrementalRound[Obj]) processRetries() { +func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) { now := time.Now() - for round.numReconciled < round.config.IncrementalRoundSize { - item, ok := round.retries.Top() + for incr.numReconciled < incr.config.IncrementalRoundSize { + item, ok := incr.retries.Top() if !ok || item.retryAt.After(now) { break } - round.retries.Pop() - round.processSingle(item.object.(Obj), item.rev, item.delete) - round.numReconciled++ + incr.retries.Pop() + incr.processSingle(ctx, txn, item.object.(Obj), item.rev, item.delete) + incr.numReconciled++ } } -func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision, delete bool) { +func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.ReadTxn, obj Obj, rev statedb.Revision, delete bool) { start := time.Now() var ( @@ -201,38 +191,38 @@ func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision, ) if delete { op = OpDelete - err = round.config.Operations.Delete(round.ctx, round.txn, rev, obj) + err = incr.config.Operations.Delete(ctx, txn, rev, obj) if err != nil { // Deletion failed. Retry again later. - round.retries.Add(obj, rev, true, err) + incr.retries.Add(obj, rev, true, err) } } else { // Clone the object so it can be mutated by Update() orig := obj - obj = round.config.CloneObject(obj) + obj = incr.config.CloneObject(obj) op = OpUpdate - err = round.config.Operations.Update(round.ctx, round.txn, rev, obj) - status := round.config.GetObjectStatus(obj) - round.results[obj] = opResult{original: orig, id: status.ID, rev: rev, err: err} + err = incr.config.Operations.Update(ctx, txn, rev, obj) + status := incr.config.GetObjectStatus(obj) + incr.results[obj] = opResult{original: orig, id: status.ID, rev: rev, err: err} } - round.metrics.ReconciliationDuration(round.moduleID, op, time.Since(start)) + incr.metrics.ReconciliationDuration(incr.moduleID, op, time.Since(start)) if err == nil { - round.retries.Clear(obj) + incr.retries.Clear(obj) } } -func (round *incrementalRound[Obj]) commitStatus() (numErrors int) { - if len(round.results) == 0 { +func (incr *incremental[Obj]) commitStatus() (numErrors int) { + if len(incr.results) == 0 { // Nothing to commit. return } - wtxn := round.db.WriteTxn(round.table) + wtxn := incr.db.WriteTxn(incr.table) defer wtxn.Commit() // Commit status for updated objects. - for obj, result := range round.results { + for obj, result := range incr.results { // Update the object if it is unchanged. It may happen that the object has // been updated in the meanwhile, in which case we skip updating the status // and reprocess the object on the next round. @@ -245,7 +235,7 @@ func (round *incrementalRound[Obj]) commitStatus() (numErrors int) { numErrors++ } - current, exists, err := round.table.CompareAndSwap(wtxn, result.rev, round.config.SetObjectStatus(obj, status)) + current, exists, err := incr.table.CompareAndSwap(wtxn, result.rev, incr.config.SetObjectStatus(obj, status)) if errors.Is(err, statedb.ErrRevisionNotEqual) && exists { // The object had changed. Check if the pending status still carries the same // identifier and if so update the object. This is an optimization for supporting @@ -255,19 +245,19 @@ func (round *incrementalRound[Obj]) commitStatus() (numErrors int) { // The limitation of this approach is that we cannot support the reconciler // modifying the object during reconciliation as the following will forget // the changes. - currentStatus := round.config.GetObjectStatus(current) + currentStatus := incr.config.GetObjectStatus(current) if currentStatus.Kind == StatusKindPending && currentStatus.ID == result.id { - current = round.config.CloneObject(current) - current = round.config.SetObjectStatus(current, status) - round.table.Insert(wtxn, current) + current = incr.config.CloneObject(current) + current = incr.config.SetObjectStatus(current, status) + incr.table.Insert(wtxn, current) } } if result.err != nil && err == nil { // Reconciliation of the object had failed and the status was updated // successfully (object had not changed). Queue the retry for the object. - newRevision := round.table.Revision(wtxn) - round.retries.Add(result.original.(Obj), newRevision, false, result.err) + newRevision := incr.table.Revision(wtxn) + incr.retries.Add(result.original.(Obj), newRevision, false, result.err) } } return diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index 3a24bcf..181365f 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -51,6 +51,17 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) tableInitialized := false _, tableInitWatch := r.config.Table.Initialized(txn) + incremental := incremental[Obj]{ + moduleID: r.ModuleID, + metrics: r.config.Metrics, + config: &r.config, + retries: r.retries, + primaryIndexer: r.primaryIndexer, + db: r.DB, + table: r.config.Table, + results: make(map[Obj]opResult), + } + for { // Throttle a bit before reconciliation to allow for a bigger batch to arrive and // for objects to settle. @@ -89,7 +100,7 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) // Perform incremental reconciliation and retries of previously failed // objects. - errs := r.incremental(ctx, txn, changes) + errs := incremental.run(ctx, txn, changes) if tableInitialized && (prune || externalPrune) { if err := r.prune(ctx, txn); err != nil {