Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 64 additions & 74 deletions reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@ import (
"github.com/cilium/statedb"
)

// incrementalRound is the shared context for incremental reconciliation and retries.
type incrementalRound[Obj comparable] struct {
// incremental is the shared context for incremental reconciliation and retries.
type incremental[Obj comparable] struct {
metrics Metrics
moduleID cell.FullModuleID
config *config[Obj]
retries *retries
primaryIndexer statedb.Indexer[Obj]
db *statedb.DB
ctx context.Context
txn statedb.ReadTxn
table statedb.RWTable[Obj]

// numReconciled counts the number of objects that have been reconciled in this
Expand All @@ -47,48 +45,40 @@ type opResult struct {
id uint64 // the "pending" identifier
}

func (r *reconciler[Obj]) incremental(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) []error {
round := incrementalRound[Obj]{
moduleID: r.ModuleID,
metrics: r.config.Metrics,
config: &r.config,
retries: r.retries,
primaryIndexer: r.primaryIndexer,
db: r.DB,
ctx: ctx,
txn: txn,
table: r.config.Table,
results: make(map[Obj]opResult),
}

func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) []error {
// Reconcile new and changed objects using either Operations
// or BatchOperations.
if r.config.BatchOperations != nil {
round.batch(changes)
if incr.config.BatchOperations != nil {
incr.batch(ctx, txn, changes)
} else {
round.single(changes)
incr.single(ctx, txn, changes)
}

// Process objects that need to be retried that were not cleared.
round.processRetries()
incr.processRetries(ctx, txn)

// Finally commit the status updates.
newErrors := round.commitStatus()
newErrors := incr.commitStatus()

// Since all failures are retried, we can return the errors from the retry
// queue which includes both errors occurred in this round and the old
// errors.
errs := round.retries.errors()
round.metrics.ReconciliationErrors(r.ModuleID, newErrors, len(errs))
errs := incr.retries.errors()
incr.metrics.ReconciliationErrors(incr.moduleID, newErrors, len(errs))

// Prepare for next round
incr.numReconciled = 0
clear(incr.results)

return errs
}

func (round *incrementalRound[Obj]) single(changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) {
func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) {
// Iterate in revision order through new and changed objects.
for change, rev := range changes {
obj := change.Object

status := round.config.GetObjectStatus(obj)
status := incr.config.GetObjectStatus(obj)
if !change.Deleted && !status.IsPendingOrRefreshing() {
// Only process objects that are pending reconciliation, e.g.
// changed from outside.
Expand All @@ -97,25 +87,25 @@ func (round *incrementalRound[Obj]) single(changes iter.Seq2[statedb.Change[Obj]
}

// Clear retries as the object has changed.
round.retries.Clear(obj)
incr.retries.Clear(obj)

round.processSingle(obj, rev, change.Deleted)
round.numReconciled++
if round.numReconciled >= round.config.IncrementalRoundSize {
incr.processSingle(ctx, txn, obj, rev, change.Deleted)
incr.numReconciled++
if incr.numReconciled >= incr.config.IncrementalRoundSize {
break
}
}
}

func (round *incrementalRound[Obj]) batch(changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) {
ops := round.config.BatchOperations
func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) {
ops := incr.config.BatchOperations
updateBatch := []BatchEntry[Obj]{}
deleteBatch := []BatchEntry[Obj]{}

for change, rev := range changes {
obj := change.Object

status := round.config.GetObjectStatus(obj)
status := incr.config.GetObjectStatus(obj)
if !change.Deleted && !status.IsPendingOrRefreshing() {
// Only process objects that are pending reconciliation, e.g.
// changed from outside.
Expand All @@ -124,75 +114,75 @@ func (round *incrementalRound[Obj]) batch(changes iter.Seq2[statedb.Change[Obj],
}

// Clear an existing retry as the object has changed.
round.retries.Clear(obj)
incr.retries.Clear(obj)

// Clone the object so we or the operations can mutate it.
orig := obj
obj = round.config.CloneObject(obj)
obj = incr.config.CloneObject(obj)

if change.Deleted {
deleteBatch = append(deleteBatch, BatchEntry[Obj]{Object: obj, Revision: rev, original: orig})
} else {
updateBatch = append(updateBatch, BatchEntry[Obj]{Object: obj, Revision: rev, original: orig})
}

round.numReconciled++
if round.numReconciled >= round.config.IncrementalRoundSize {
incr.numReconciled++
if incr.numReconciled >= incr.config.IncrementalRoundSize {
break
}
}

// Process the delete batch first to make room.
if len(deleteBatch) > 0 {
start := time.Now()
ops.DeleteBatch(round.ctx, round.txn, deleteBatch)
round.metrics.ReconciliationDuration(
round.moduleID,
ops.DeleteBatch(ctx, txn, deleteBatch)
incr.metrics.ReconciliationDuration(
incr.moduleID,
OpDelete,
time.Since(start),
)
for _, entry := range deleteBatch {
if entry.Result != nil {
// Delete failed, queue a retry for it.
round.retries.Add(entry.original, entry.Revision, true, entry.Result)
incr.retries.Add(entry.original, entry.Revision, true, entry.Result)
}
}
}

// And then the update batch.
if len(updateBatch) > 0 {
start := time.Now()
ops.UpdateBatch(round.ctx, round.txn, updateBatch)
round.metrics.ReconciliationDuration(
round.moduleID,
ops.UpdateBatch(ctx, txn, updateBatch)
incr.metrics.ReconciliationDuration(
incr.moduleID,
OpUpdate,
time.Since(start),
)

for _, entry := range updateBatch {
status := round.config.GetObjectStatus(entry.Object)
status := incr.config.GetObjectStatus(entry.Object)
if entry.Result == nil {
round.retries.Clear(entry.Object)
incr.retries.Clear(entry.Object)
}
round.results[entry.Object] = opResult{rev: entry.Revision, id: status.ID, err: entry.Result, original: entry.original}
incr.results[entry.Object] = opResult{rev: entry.Revision, id: status.ID, err: entry.Result, original: entry.original}
}
}
}

func (round *incrementalRound[Obj]) processRetries() {
func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) {
now := time.Now()
for round.numReconciled < round.config.IncrementalRoundSize {
item, ok := round.retries.Top()
for incr.numReconciled < incr.config.IncrementalRoundSize {
item, ok := incr.retries.Top()
if !ok || item.retryAt.After(now) {
break
}
round.retries.Pop()
round.processSingle(item.object.(Obj), item.rev, item.delete)
round.numReconciled++
incr.retries.Pop()
incr.processSingle(ctx, txn, item.object.(Obj), item.rev, item.delete)
incr.numReconciled++
}
}

func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision, delete bool) {
func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.ReadTxn, obj Obj, rev statedb.Revision, delete bool) {
start := time.Now()

var (
Expand All @@ -201,38 +191,38 @@ func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision,
)
if delete {
op = OpDelete
err = round.config.Operations.Delete(round.ctx, round.txn, rev, obj)
err = incr.config.Operations.Delete(ctx, txn, rev, obj)
if err != nil {
// Deletion failed. Retry again later.
round.retries.Add(obj, rev, true, err)
incr.retries.Add(obj, rev, true, err)
}
} else {
// Clone the object so it can be mutated by Update()
orig := obj
obj = round.config.CloneObject(obj)
obj = incr.config.CloneObject(obj)
op = OpUpdate
err = round.config.Operations.Update(round.ctx, round.txn, rev, obj)
status := round.config.GetObjectStatus(obj)
round.results[obj] = opResult{original: orig, id: status.ID, rev: rev, err: err}
err = incr.config.Operations.Update(ctx, txn, rev, obj)
status := incr.config.GetObjectStatus(obj)
incr.results[obj] = opResult{original: orig, id: status.ID, rev: rev, err: err}
}
round.metrics.ReconciliationDuration(round.moduleID, op, time.Since(start))
incr.metrics.ReconciliationDuration(incr.moduleID, op, time.Since(start))

if err == nil {
round.retries.Clear(obj)
incr.retries.Clear(obj)
}
}

func (round *incrementalRound[Obj]) commitStatus() (numErrors int) {
if len(round.results) == 0 {
func (incr *incremental[Obj]) commitStatus() (numErrors int) {
if len(incr.results) == 0 {
// Nothing to commit.
return
}

wtxn := round.db.WriteTxn(round.table)
wtxn := incr.db.WriteTxn(incr.table)
defer wtxn.Commit()

// Commit status for updated objects.
for obj, result := range round.results {
for obj, result := range incr.results {
// Update the object if it is unchanged. It may happen that the object has
// been updated in the meanwhile, in which case we skip updating the status
// and reprocess the object on the next round.
Expand All @@ -245,7 +235,7 @@ func (round *incrementalRound[Obj]) commitStatus() (numErrors int) {
numErrors++
}

current, exists, err := round.table.CompareAndSwap(wtxn, result.rev, round.config.SetObjectStatus(obj, status))
current, exists, err := incr.table.CompareAndSwap(wtxn, result.rev, incr.config.SetObjectStatus(obj, status))
if errors.Is(err, statedb.ErrRevisionNotEqual) && exists {
// The object had changed. Check if the pending status still carries the same
// identifier and if so update the object. This is an optimization for supporting
Expand All @@ -255,19 +245,19 @@ func (round *incrementalRound[Obj]) commitStatus() (numErrors int) {
// The limitation of this approach is that we cannot support the reconciler
// modifying the object during reconciliation as the following will forget
// the changes.
currentStatus := round.config.GetObjectStatus(current)
currentStatus := incr.config.GetObjectStatus(current)
if currentStatus.Kind == StatusKindPending && currentStatus.ID == result.id {
current = round.config.CloneObject(current)
current = round.config.SetObjectStatus(current, status)
round.table.Insert(wtxn, current)
current = incr.config.CloneObject(current)
current = incr.config.SetObjectStatus(current, status)
incr.table.Insert(wtxn, current)
}
}

if result.err != nil && err == nil {
// Reconciliation of the object had failed and the status was updated
// successfully (object had not changed). Queue the retry for the object.
newRevision := round.table.Revision(wtxn)
round.retries.Add(result.original.(Obj), newRevision, false, result.err)
newRevision := incr.table.Revision(wtxn)
incr.retries.Add(result.original.(Obj), newRevision, false, result.err)
}
}
return
Expand Down
13 changes: 12 additions & 1 deletion reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health)
tableInitialized := false
_, tableInitWatch := r.config.Table.Initialized(txn)

incremental := incremental[Obj]{
moduleID: r.ModuleID,
metrics: r.config.Metrics,
config: &r.config,
retries: r.retries,
primaryIndexer: r.primaryIndexer,
db: r.DB,
table: r.config.Table,
results: make(map[Obj]opResult),
}

for {
// Throttle a bit before reconciliation to allow for a bigger batch to arrive and
// for objects to settle.
Expand Down Expand Up @@ -89,7 +100,7 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health)

// Perform incremental reconciliation and retries of previously failed
// objects.
errs := r.incremental(ctx, txn, changes)
errs := incremental.run(ctx, txn, changes)

if tableInitialized && (prune || externalPrune) {
if err := r.prune(ctx, txn); err != nil {
Expand Down