Skip to content

Conversation

@qiujian16
Copy link
Member

@qiujian16 qiujian16 commented Nov 11, 2025

Summary

Related issue(s)

Fixes #

Summary by CodeRabbit

  • Refactor

    • Decoupled informer lifecycle from watcher stores; startup helpers now return watcher stores instead of informer objects.
    • Simplified in-memory watcher initialization and removed internal logging from work handling paths.
    • Streamlined watcher store behavior to rely on local in-memory stores.
  • Bug Fixes

    • Manifest encode/decode and update logic consistently use Generation instead of parsing ResourceVersion.
    • Create/Patch flows preserve and propagate generation metadata correctly.
  • Tests

    • Replaced fake informer scaffolding with direct in-memory watcher population.
    • Integration tests updated to use watcher stores and patch-based updates where applicable.

@openshift-ci openshift-ci bot requested a review from deads2k November 11, 2025 02:44
@openshift-ci
Copy link

openshift-ci bot commented Nov 11, 2025

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: qiujian16

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Copy link

coderabbitai bot commented Nov 11, 2025

Walkthrough

Decouple AgentInformerWatcherStore from SharedIndexInformer by using an internal cache.Store; migrate store seeding in many tests from informer stores to the watcher store; switch ManifestWork handling to rely on Generation (and propagate deletion timestamps/finalizers) and update related codec, client, and integration test flows to use generation and patch-based updates.

Changes

Cohort / File(s) Summary
Store core changes
pkg/cloudevents/clients/store/informer.go, pkg/cloudevents/clients/work/store/informer.go
Remove informer field and SetInformer; initialize BaseClientWatchStore with cache.NewStore(...); HasInitiated() no longer depends on informer sync; rewrite HandleReceivedResource to copy metadata, preserve Finalizers/Status/DeletionTimestamp/Generation, and apply Update/Delete semantics; remove logger usage.
Unit tests — informer removal & direct store seeding
pkg/cloudevents/clients/addon/client_test.go, pkg/cloudevents/clients/cluster/client_test.go, pkg/cloudevents/clients/csr/client_test.go, pkg/cloudevents/clients/store/informer_test.go, pkg/cloudevents/clients/store/lister_test.go, pkg/cloudevents/clients/work/agent/client/manifestwork_test.go, pkg/cloudevents/clients/work/store/informer_test.go
Remove fake informer factories and SetInformer calls; seed test data directly via watcherStore.Store.Add(...) or watchStore.Add(...); tests now treat watcher store as the authoritative in-memory store.
Agent clientholder / informer wiring
pkg/cloudevents/clients/csr/clientholder.go
Remove fmt import and watcher-store-type enforcement; stop calling SetInformer; NewAgentClientHolder returns ClientHolder without configuring informer on watcher store.
Work store base & generation logic
pkg/cloudevents/clients/work/store/base.go, pkg/cloudevents/generic/clients/agentclient.go
Replace ResourceVersion parsing with Generation-based comparisons; adjust comparison operator (<= → <) for generation checks; remove strconv usage and update log texts to reference generation.
ManifestWork resource versioning & codec
pkg/cloudevents/clients/work/source/client/manifestwork.go, pkg/cloudevents/clients/work/source/codec/manifestbundle.go, pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go
getWorkResourceVersion now returns (resourceVersion, generation, error) and callers set newWork.Generation; Encode/Decode and tests use Generation instead of parsing ResourceVersion; tests updated to expect Generation.
Generic client logging simplification
pkg/cloudevents/generic/clients/baseclient.go
Replace lazy logging closures with direct string arguments in V(5) Info logs.
Integration tests & agent wiring
test/integration/cloudevents/agent/agent.go, test/integration/cloudevents/cloudevents_resync_test.go, test/integration/cloudevents/manifestworkclients_reync_test.go, test/integration/cloudevents/util/work.go, test/integration-test.mk
StartWorkAgent now returns *workstore.AgentInformerWatcherStore instead of informer; callers updated to list/seed via watcher store; resync test uses JSON merge-patch + agentWorkClient.Patch(...) for updates; NewManifestWork sets Generation: 1; test verbosity increased.
Misc tests adjustments
pkg/cloudevents/clients/cluster/client_test.go, pkg/cloudevents/clients/csr/client_test.go, pkg/cloudevents/clients/addon/client_test.go
Replace informer store.Add with watcherStore.Store.Add(...) for seeding objects and remove SetInformer calls.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Attention recommended:
    • pkg/cloudevents/clients/store/informer.go: verify HandleReceivedResource semantics (DeletionTimestamp propagation, finalizers handling, Deleted→Update path) and HasInitiated behavior.
    • pkg/cloudevents/clients/work/store/informer.go and pkg/cloudevents/clients/work/store/base.go: ensure generation-based comparisons and ignore/update logic are consistent.
    • pkg/cloudevents/clients/work/source/client/manifestwork.go and .../codec/manifestbundle.go: validate generation extraction, error propagation from getWorkResourceVersion, and Encode/Decode mapping.
    • Integration tests: confirm JSON merge-patch creation and agentWorkClient.Patch usage align with Kubernetes API expectations and StartWorkAgent signature changes are handled.

Possibly related PRs

Suggested labels

lgtm

Suggested reviewers

  • deads2k

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is entirely incomplete, containing only the repository template with empty sections and no actual summary or issue reference. Fill in the Summary section explaining the rationale for using local stores instead of informer-based stores, and complete the 'Fixes #' field with the relevant issue number.
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: replacing informer-based stores with local stores throughout the agent client implementation across multiple files.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@qiujian16
Copy link
Member Author

/hold

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
test/integration/cloudevents/agent/agent.go (1)

31-37: Remove the unreturned and unused informer.

The informer created at lines 36-37 is never attached to watcherStore (which doesn't have a SetInformer() method like SourceInformerWatcherStore does). The AgentInformerWatcherStore maintains its own independent cache and handles resources via HandleReceivedResource(). This informer is dead code from the refactoring and should be deleted to avoid unnecessary resource consumption.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 105041a and 72a4fae.

📒 Files selected for processing (13)
  • pkg/cloudevents/clients/addon/client_test.go (2 hunks)
  • pkg/cloudevents/clients/cluster/client_test.go (2 hunks)
  • pkg/cloudevents/clients/csr/client_test.go (1 hunks)
  • pkg/cloudevents/clients/csr/clientholder.go (0 hunks)
  • pkg/cloudevents/clients/store/informer.go (2 hunks)
  • pkg/cloudevents/clients/store/informer_test.go (3 hunks)
  • pkg/cloudevents/clients/store/lister_test.go (2 hunks)
  • pkg/cloudevents/clients/work/agent/client/manifestwork_test.go (6 hunks)
  • pkg/cloudevents/clients/work/store/informer.go (1 hunks)
  • pkg/cloudevents/clients/work/store/informer_test.go (1 hunks)
  • test/integration/cloudevents/agent/agent.go (3 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/manifestworkclients_reync_test.go (2 hunks)
💤 Files with no reviewable changes (1)
  • pkg/cloudevents/clients/csr/clientholder.go
🧰 Additional context used
🧬 Code graph analysis (7)
pkg/cloudevents/clients/work/store/informer.go (2)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
test/integration/cloudevents/agent/agent.go (2)
pkg/cloudevents/clients/store/informer.go (1)
  • AgentInformerWatcherStore (25-28)
pkg/cloudevents/clients/work/store/informer.go (1)
  • AgentInformerWatcherStore (92-96)
test/integration/cloudevents/manifestworkclients_reync_test.go (4)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
pkg/cloudevents/clients/utils/utils.go (1)
  • UID (222-225)
pkg/cloudevents/clients/common/common.go (3)
  • ManifestWorkGR (37-37)
  • CloudEventsOriginalSourceLabelKey (28-28)
  • CloudEventsDataTypeAnnotationKey (14-14)
test/integration/cloudevents/util/work.go (1)
  • NewManifestWorkWithStatus (202-206)
pkg/cloudevents/clients/store/informer.go (4)
pkg/cloudevents/generic/interface.go (1)
  • ResourceObject (20-38)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
pkg/cloudevents/clients/work/store/informer.go (2)
  • NewAgentInformerWatcherStore (130-140)
  • AgentInformerWatcherStore (92-96)
pkg/cloudevents/clients/store/lister_test.go (2)
pkg/cloudevents/clients/store/informer.go (1)
  • NewAgentInformerWatcherStore (30-37)
pkg/cloudevents/clients/common/common.go (1)
  • CloudEventsOriginalSourceLabelKey (28-28)
pkg/cloudevents/clients/store/informer_test.go (1)
pkg/cloudevents/clients/store/informer.go (1)
  • NewAgentInformerWatcherStore (30-37)
test/integration/cloudevents/cloudevents_resync_test.go (1)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: integration
  • GitHub Check: verify
🔇 Additional comments (17)
pkg/cloudevents/clients/work/store/informer_test.go (1)

514-516: LGTM - Clean simplification of test setup.

The change to seed data directly into store.Store eliminates the need for fake clients and informer factories, making the test more straightforward and aligned with the PR's goal of decoupling from informers.

pkg/cloudevents/clients/work/store/informer.go (1)

130-140: LGTM - Good architectural improvement.

The AgentInformerWatcherStore now initializes with its own cache.Store, eliminating the dependency on external informer setup. This makes the store self-contained and simplifies its lifecycle management.

pkg/cloudevents/clients/csr/client_test.go (1)

77-81: LGTM - Consistent test refactoring.

Switching to watcherStore.Store.Add for test data seeding aligns with the broader refactoring pattern and removes dependency on informer store initialization.

pkg/cloudevents/clients/addon/client_test.go (1)

81-83: LGTM - Simplified test data seeding.

Direct store population is cleaner and removes unnecessary informer dependencies.

pkg/cloudevents/clients/cluster/client_test.go (2)

74-77: LGTM - Test data seeding simplified.

Consistent with the PR's goal of using local store instead of informer-backed stores.


129-131: LGTM - Consistent pattern applied.

The same refactoring pattern is correctly applied in the patch test.

pkg/cloudevents/clients/store/lister_test.go (2)

16-27: LGTM - Clean test simplification.

Direct store population removes unnecessary fake clients and informers while maintaining test coverage.


42-53: LGTM - Consistent refactoring.

The same pattern is correctly applied for the source lister test.

pkg/cloudevents/clients/store/informer_test.go (3)

19-24: LGTM - Consistent test data setup.

Direct store population simplifies test initialization and removes informer dependencies.


64-80: LGTM - Pattern consistently applied.

All test cases in TestList correctly use the direct store approach.


135-140: LGTM - Watch test properly updated.

The watch test correctly seeds data via direct store access, maintaining test functionality.

test/integration/cloudevents/manifestworkclients_reync_test.go (2)

41-42: LGTM - Integration test adapted correctly.

The change to return watchStore from StartWorkAgent aligns with the broader refactoring and enables direct store manipulation in integration tests.


55-64: LGTM - Test data seeding updated consistently.

Both manifest works are correctly seeded using the new direct store approach, maintaining test functionality.

pkg/cloudevents/clients/work/agent/client/manifestwork_test.go (1)

167-167: LGTM! Direct store population simplifies unit tests.

The change from informer-based population to direct watcherStore.Store.Add() is appropriate for unit tests, making them more isolated and easier to maintain.

pkg/cloudevents/clients/store/informer.go (2)

30-37: Store initialization looks correct.

The store now initializes with its own cache.Store instance, eliminating the dependency on external informer setup. This is a cleaner architecture.


129-130: No action needed on HasInitiated() implementation.

The review comment conflates two distinct store implementations. AgentInformerWatcherStore has no SetInformer() method and no initialization tracking requirement—it manages resources directly through Add/Update/Delete and is ready immediately. This differs from SourceInformerWatcherStore, which requires external setup and conditionally checks Initiated && informer.HasSynced().

Returning true unconditionally is correct and consistent with SimpleStore[T], which follows the same pattern. The WaitForStoreInit() function correctly handles multiple implementations with different initialization models. No safety concern exists.

test/integration/cloudevents/cloudevents_resync_test.go (1)

125-136: Good improvement: patch-based status updates.

Using CreateMergePatch and agentWorkClient.Patch is more realistic than directly manipulating the in-memory store. This better reflects production behavior where status updates go through the API.

Signed-off-by: Jian Qiu <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/cloudevents/clients/work/store/informer.go (1)

192-205: Use consistent method for setting deletion timestamp.

Line 204 uses direct field assignment (updatedWork.DeletionTimestamp = work.DeletionTimestamp), while line 184 in the Modified case uses SetDeletionTimestamp(). For consistency and proper pointer handling, use the setter method.

Apply this diff:

 		// we should only update the deletionTimestamp or the local work
 		updatedWork := lastWork.DeepCopy()
-		updatedWork.DeletionTimestamp = work.DeletionTimestamp
+		updatedWork.SetDeletionTimestamp(work.DeletionTimestamp)
 		return s.Update(updatedWork)
🧹 Nitpick comments (1)
pkg/cloudevents/clients/work/store/informer.go (1)

182-185: Clarify misleading comment.

The comment states "prevent the work from being updated if it is deleting", but the code doesn't prevent the update—it proceeds to line 191. The logic actually preserves the deletion timestamp while allowing spec updates.

Consider revising the comment to:

-		// prevent the work from being updated if it is deleting
+		// preserve the deletion timestamp if the work is already deleting
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 72a4fae and 555357d.

⛔ Files ignored due to path filters (15)
  • vendor/modules.txt is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/addon/interface.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1/addondeploymentconfig.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1/addontemplate.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1/clustermanagementaddon.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1/interface.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1/managedclusteraddon.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/factory.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/generic.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/informers/externalversions/internalinterfaces/factory_interfaces.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/listers/addon/v1alpha1/addondeploymentconfig.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/listers/addon/v1alpha1/addontemplate.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/listers/addon/v1alpha1/clustermanagementaddon.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/listers/addon/v1alpha1/expansion_generated.go is excluded by !vendor/**
  • vendor/open-cluster-management.io/api/client/addon/listers/addon/v1alpha1/managedclusteraddon.go is excluded by !vendor/**
📒 Files selected for processing (3)
  • pkg/cloudevents/clients/work/source/codec/manifestbundle.go (2 hunks)
  • pkg/cloudevents/clients/work/store/informer.go (2 hunks)
  • pkg/cloudevents/generic/clients/baseclient.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/cloudevents/clients/work/source/codec/manifestbundle.go
  • pkg/cloudevents/generic/clients/baseclient.go
🧰 Additional context used
🧬 Code graph analysis (1)
pkg/cloudevents/clients/work/store/informer.go (3)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
vendor/open-cluster-management.io/api/work/v1/types.go (1)
  • ManifestWork (18-28)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
🔇 Additional comments (4)
pkg/cloudevents/clients/work/store/informer.go (4)

96-124: LGTM!

The versioner implementation correctly tracks per-resource versions with proper locking. This is necessary now that the store is no longer backed by an informer that would provide resource versions automatically.


140-165: LGTM!

The versioning logic correctly assigns incrementing resource versions to each Add/Update operation and cleans up on Delete. This compensates for the absence of informer-managed versions.


204-204: Add nil-check for DeletionTimestamp in Deleted action.

The work.DeletionTimestamp field might be nil in the Deleted action. Setting a nil deletion timestamp could lead to unexpected behavior.

Verify whether the source guarantees a non-nil DeletionTimestamp in the Deleted action, or add a defensive check:

if work.DeletionTimestamp == nil {
    return fmt.Errorf("received Deleted action for %s/%s with nil DeletionTimestamp", work.Namespace, work.Name)
}

128-138: The review comment is based on an incorrect assumption.

The HasInitiated() method is defined in the parent struct (pkg/cloudevents/clients/store/informer.go, lines 129-131) and is hardcoded to return true. It does not check the Initiated field. Therefore, whether Initiated defaults to false is irrelevant to the function's behavior, and no action is needed regarding this concern.

Likely an incorrect or invalid review comment.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
pkg/cloudevents/generic/clients/baseclient.go (2)

82-82: Fix typo in comment.

The comment contains "try agin" which should be "try again".

Apply this diff:

-					// failed to reconnect, try agin
+					// failed to reconnect, try again

146-146: Consider checking log level before evaluating evt.String().

The direct call to evt.String() is now evaluated eagerly, even when V(5) logging is disabled. Since CloudEvent serialization can be expensive for large events and V(5) is typically disabled in production, this may introduce unnecessary overhead in high-throughput scenarios.

Consider checking if the log level is enabled before evaluating:

For line 146:

-	logger.V(5).Info("Sending event", "event", evt.String())
+	if klog.V(5).Enabled() {
+		logger.V(5).Info("Sending event", "event", evt.String())
+	}

For line 177:

-						logger.V(5).Info("Received event", "event", evt.String())
+						if klog.V(5).Enabled() {
+							logger.V(5).Info("Received event", "event", evt.String())
+						}

Alternatively, if the previous closure-based approach was working well, consider reverting to that pattern to ensure lazy evaluation.

Also applies to: 177-177

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 555357d and a6ea87b.

📒 Files selected for processing (12)
  • pkg/cloudevents/clients/store/informer.go (3 hunks)
  • pkg/cloudevents/clients/work/source/client/manifestwork.go (4 hunks)
  • pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go (3 hunks)
  • pkg/cloudevents/clients/work/store/base.go (1 hunks)
  • pkg/cloudevents/clients/work/store/informer.go (2 hunks)
  • pkg/cloudevents/generic/clients/agentclient.go (1 hunks)
  • pkg/cloudevents/generic/clients/agentclient_test.go (1 hunks)
  • pkg/cloudevents/generic/clients/baseclient.go (2 hunks)
  • test/integration-test.mk (1 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/manifestworkclients_reync_test.go (1 hunks)
  • test/integration/cloudevents/util/work.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • test/integration/cloudevents/manifestworkclients_reync_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/clients/baseclient.go
🧬 Code graph analysis (6)
pkg/cloudevents/generic/clients/agentclient.go (1)
pkg/cloudevents/generic/types/types.go (1)
  • Modified (94-94)
pkg/cloudevents/clients/store/informer.go (4)
pkg/cloudevents/generic/interface.go (1)
  • ResourceObject (20-38)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
pkg/cloudevents/clients/work/store/informer.go (2)
  • NewAgentInformerWatcherStore (128-138)
  • AgentInformerWatcherStore (90-94)
pkg/cloudevents/clients/work/source/client/manifestwork.go (2)
vendor/open-cluster-management.io/api/work/v1/types.go (1)
  • ManifestWork (18-28)
pkg/cloudevents/clients/common/common.go (1)
  • CloudEventsResourceVersionAnnotationKey (20-20)
test/integration/cloudevents/cloudevents_resync_test.go (2)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
pkg/cloudevents/clients/utils/utils.go (1)
  • Patch (44-82)
pkg/cloudevents/generic/clients/agentclient_test.go (1)
pkg/cloudevents/clients/utils/utils.go (1)
  • UID (222-225)
pkg/cloudevents/clients/work/store/informer.go (2)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: verify
  • GitHub Check: integration
  • GitHub Check: unit
🔇 Additional comments (20)
pkg/cloudevents/clients/work/source/client/manifestwork.go (3)

7-7: LGTM!

The strconv import is correctly added to support parsing the generation value from the annotation string.


92-97: Proper error handling for multi-value return.

The code correctly handles the new signature of getWorkResourceVersion, propagates errors, and sets both Generation and ResourceVersion fields. However, there are critical bugs in getWorkResourceVersion itself (see comments on lines 318-341).


283-288: Consistent with Create function changes.

The implementation mirrors the Create function's handling of the new getWorkResourceVersion signature. Error propagation and field assignment are correct, but the same bugs in getWorkResourceVersion affect this code path (see comments on lines 318-341).

pkg/cloudevents/generic/clients/agentclient_test.go (1)

518-519: Original review comment is incorrect and should be disregarded.

The agent client's specAction() method implements Generation-based change detection: it compares incoming and stored resource generations at line 307-308 (if obj.GetGeneration() < lastObj.GetGeneration() { return evt, nil }). When the incoming event's generation is older than the stored resource's generation, it returns an empty action, which causes the receive() function to exit early without calling handlers.

The concern about inverted ResourceVersion ordering is unfounded—ResourceVersion is not used for change detection. The test correctly validates that older-generation events are ignored, regardless of ResourceVersion values.

test/integration-test.mk (1)

34-34: LGTM!

Adding verbose logging (-v=5) to the cloudevents integration test improves debugging capabilities without affecting functionality.

test/integration/cloudevents/util/work.go (1)

178-183: LGTM!

Adding Generation: 1 aligns with the PR's shift from ResourceVersion to Generation-based semantics for change detection. This is consistent with Kubernetes conventions where Generation tracks spec changes.

pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go (3)

207-208: LGTM!

Test expectations correctly updated to use Generation: 13 instead of ResourceVersion: "13", aligning with the PR's shift to Generation-based semantics.


228-234: LGTM!

Test data correctly reflects Generation-based semantics with appropriate field types (int64 for Generation).


293-297: LGTM!

Expected work object correctly uses Generation: 13 consistent with the updated codec implementation.

pkg/cloudevents/clients/work/store/base.go (1)

119-122: LGTM!

The replacement of ResourceVersion string parsing with direct Generation comparison simplifies the code and eliminates error handling for string-to-int conversion. This aligns with the PR's shift to Generation-based semantics.

pkg/cloudevents/clients/work/store/informer.go (2)

131-135: LGTM!

Initializing Store with cache.NewStore(cache.MetaNamespaceKeyFunc) properly establishes a local cache instead of relying on an external informer, consistent with the PR's goal to decouple from SharedIndexInformer.


180-189: LGTM!

The refactored logic efficiently creates updatedWork once via DeepCopy() and reuses it for both DeletionTimestamp preservation and Finalizers/Status restoration, improving code clarity and performance.

test/integration/cloudevents/cloudevents_resync_test.go (3)

83-86: LGTM!

The updated signature reflects the architectural shift: StartWorkAgent now returns watchStore instead of an informer, aligning with the PR's goal to use local stores rather than informer-backed stores.


90-97: LGTM!

Replacing informer-based lister.List() with watchStore.List() is consistent with the local store architecture and correctly accesses list.Items for the length check.


114-128: LGTM!

The migration to patch-based updates via agentWorkClient.Patch is more realistic and robust:

  • Uses standard JSON merge patch construction
  • Goes through the actual client interface
  • Follows Kubernetes patching conventions

This is a better testing approach than directly manipulating an in-memory store.

pkg/cloudevents/clients/store/informer.go (4)

19-20: LGTM!

Comment accurately reflects the architectural change to using a local store instead of an informer-backed store.


30-34: LGTM!

Proper initialization with a concrete cache.Store enables the local store architecture, eliminating the dependency on external informers.


62-77: LGTM!

The accessor-based approach for extracting namespace/name is more generic and correctly preserves the DeletionTimestamp when the resource is being deleted.


124-124: Review comment is incorrect and conflates different store types.

The review incorrectly attributes behavior from SourceInformerWatcherStore (work/store package) to AgentInformerWatcherStore (clients/store package). These are distinct types with different semantics:

  • SourceInformerWatcherStore (line 67, work/store): Returns s.Initiated && s.informer.HasSynced()—must wait for remote cache sync. The tests shown (informer_test.go:485–498) verify this behavior.
  • AgentInformerWatcherStore[T] (line 123, clients/store): Returns true unconditionally—uses local in-memory store, immediately usable upon creation.

The unconditional true return at line 124 is correct; agent stores don't depend on cache synchronization. Resync logic in options/generic.go (lines 164, 236) still executes as intended when WaitForStoreInit immediately succeeds.

Likely an incorrect or invalid review comment.

pkg/cloudevents/generic/clients/agentclient.go (1)

301-311: Verify the semantic change in generation comparison logic is intentional and document the impact.

The review concern is valid. The code shows a semantic behavior change where equal generations now return types.Modified instead of being skipped:

if obj.GetGeneration() < lastObj.GetGeneration() {
    return evt, nil  // skip if incoming is older
}
return types.Modified, nil  // process for equal OR newer generations

Key findings:

  • Behavior change confirmed: Equal generations now trigger modification handling (fall through to types.Modified)
  • No downstream safeguards: ResourceHandlers receive the action but have no built-in deduplication logic
  • Contradiction with comment: The "message broker guarantees order of messages" comment suggests equal generations should be skipped, not reprocessed
  • Missing test coverage: No explicit test found for equal generation scenario in agentclient_test.go

This violates the Kubernetes pattern where generation == previousGeneration means "no spec change, no work needed." Handlers relying on idempotency assumptions could duplicate processing if the same generation arrives twice.

Confirm this change was intentional (bug fix vs. regression), document why equal generations should trigger Modified, and ensure handlers account for repeated events of the same generation.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/cloudevents/generic/clients/agentclient.go (1)

303-311: The change from <= to < breaks deduplication for equal non-zero generations.

The code now returns "MODIFIED" (line 311) instead of empty string for equal non-zero generations. At lines 210-211, an empty action is explicitly ignored (if len(action) == 0), but non-empty actions invoke handlers at lines 216-218.

Old behavior (<=): Equal generations → empty action → handlers NOT invoked
New behavior (<): Equal generations → "MODIFIED" → handlers ARE invoked

In Kubernetes, generation only increments when spec changes. Equal generations indicate the spec is unchanged. Invoking handlers for identical specs violates deduplication semantics and will cause unnecessary handler invocations that expect actual modifications.

While the message broker guarantees ordering (per comment on lines 301-302), this change breaks the generation-based deduplication that prevents duplicate processing.

Either revert to <= or add explicit logic to handle equal generations: if obj.GetGeneration() == lastObj.GetGeneration() && obj.GetGeneration() != 0 { return evt, nil } before the < check.

♻️ Duplicate comments (1)
pkg/cloudevents/clients/work/source/client/manifestwork.go (1)

324-338: Fix generation parsing and resourceVersion fallback.

Two blockers here:

  • strconv.ParseInt(..., 16) truncates any generation above int16, so valid values overflow as soon as generation exceeds 32767.
  • The fallback block always overwrites an annotation-provided resourceVersion with "0", defeating the annotation entirely.

Please switch to 64-bit parsing and only fall back to the stored resourceVersion or "0" when the annotation is actually empty.

Apply this diff:

-		generation, err = strconv.ParseInt(resourceVersion, 10, 16)
+		generation, err = strconv.ParseInt(resourceVersion, 10, 64)
 		if err != nil {
 			return "", 0, errors.NewInternalError(err)
 		}
 	}

 	if generation == 0 {
 		generation = work.Generation
 	}

-	if len(resourceVersion) == 0 && len(work.ResourceVersion) != 0 {
-		resourceVersion = work.ResourceVersion
-	} else {
-		resourceVersion = "0"
-	}
+	if len(resourceVersion) == 0 {
+		if len(work.ResourceVersion) != 0 {
+			resourceVersion = work.ResourceVersion
+		} else {
+			resourceVersion = "0"
+		}
+	}
🧹 Nitpick comments (1)
pkg/cloudevents/generic/clients/agentclient_test.go (1)

518-519: Consider clarifying the test data or adding explanatory comments.

The cached resources have Generation 3 with ResourceVersion "1", while the incoming event has Generation 2 with ResourceVersion "2". This creates a counterintuitive state where the cached resource has a higher Generation but lower ResourceVersion, which wouldn't occur in real Kubernetes (ResourceVersion increases monotonically).

While this appears intentional to verify that Generation-based comparison takes precedence over ResourceVersion, the test data may confuse future maintainers. Additionally, both test1 and test2 have identical values, but only test1 is validated—test2's values seem unnecessary.

Consider one of the following:

Option 1: Use consistent increasing values

 resources: []*generictesting.MockResource{
-    {UID: kubetypes.UID("test1"), Generation: 3, ResourceVersion: "1", Namespace: "cluster1"},
-    {UID: kubetypes.UID("test2"), Generation: 3, ResourceVersion: "1", Namespace: "cluster1"},
+    {UID: kubetypes.UID("test1"), Generation: 3, ResourceVersion: "3", Namespace: "cluster1"},
+    {UID: kubetypes.UID("test2"), Generation: 2, ResourceVersion: "2", Namespace: "cluster1"},
 },

Option 2: Add a clarifying comment

+// test1 has higher Generation (3 > 2) than incoming to verify Generation-based staleness detection
+// ResourceVersion is intentionally lower to confirm it's not used for comparison
 resources: []*generictesting.MockResource{
     {UID: kubetypes.UID("test1"), Generation: 3, ResourceVersion: "1", Namespace: "cluster1"},
     {UID: kubetypes.UID("test2"), Generation: 3, ResourceVersion: "1", Namespace: "cluster1"},
 },
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a6ea87b and e7ee2c4.

📒 Files selected for processing (12)
  • pkg/cloudevents/clients/store/informer.go (4 hunks)
  • pkg/cloudevents/clients/work/source/client/manifestwork.go (4 hunks)
  • pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go (3 hunks)
  • pkg/cloudevents/clients/work/store/base.go (1 hunks)
  • pkg/cloudevents/clients/work/store/informer.go (3 hunks)
  • pkg/cloudevents/generic/clients/agentclient.go (1 hunks)
  • pkg/cloudevents/generic/clients/agentclient_test.go (1 hunks)
  • pkg/cloudevents/generic/clients/baseclient.go (2 hunks)
  • test/integration-test.mk (1 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/manifestworkclients_reync_test.go (1 hunks)
  • test/integration/cloudevents/util/work.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • pkg/cloudevents/generic/clients/baseclient.go
  • test/integration-test.mk
  • test/integration/cloudevents/manifestworkclients_reync_test.go
  • pkg/cloudevents/clients/work/store/informer.go
  • test/integration/cloudevents/util/work.go
🧰 Additional context used
🧬 Code graph analysis (5)
pkg/cloudevents/generic/clients/agentclient_test.go (1)
pkg/cloudevents/clients/utils/utils.go (1)
  • UID (222-225)
pkg/cloudevents/generic/clients/agentclient.go (1)
pkg/cloudevents/generic/types/types.go (1)
  • Modified (94-94)
pkg/cloudevents/clients/store/informer.go (4)
pkg/cloudevents/generic/interface.go (1)
  • ResourceObject (20-38)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
pkg/cloudevents/clients/work/store/informer.go (2)
  • NewAgentInformerWatcherStore (128-138)
  • AgentInformerWatcherStore (90-94)
test/integration/cloudevents/cloudevents_resync_test.go (2)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
pkg/cloudevents/clients/utils/utils.go (1)
  • Patch (44-82)
pkg/cloudevents/clients/work/source/client/manifestwork.go (1)
pkg/cloudevents/clients/common/common.go (1)
  • CloudEventsResourceVersionAnnotationKey (20-20)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (4)
pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go (3)

228-234: Metadata test data updated consistently.

The metadata JSON now uses Generation: 13 instead of ResourceVersion: "13", which is consistent with the broader migration. The metadata structure correctly includes all necessary fields (UID, Name, Namespace, Labels, Annotations, Finalizers).


293-297: Expected work metadata updated consistently.

The expected decoded work now uses Generation: 13 matching the metadata JSON setup at lines 228-234. The test expectations are internally consistent and align with the Generation-based approach.


207-208: No issues found - code correctly implements intentional design.

The codec implementation confirms the semantic mapping is intentional and working correctly:

  • Encode (line 42): WithResourceVersion(work.Generation) sets "resourceversion" extension to the Generation value
  • Decode (line 100-108): reads "resourceversion" extension and assigns it to metaObj.Generation

This pattern is consistent across the codebase and aligns with the test expectations. The test changes at lines 207-208, 228-234, and 293-297 correctly verify that decoded works receive the Generation field populated from the "resourceversion" extension. The design uses Generation (not ResourceVersion field) for ManifestWork metadata, which is semantically appropriate for this codec's purpose.

pkg/cloudevents/generic/clients/agentclient.go (1)

301-301: LGTM! Comment now accurately reflects generation-based semantics.

The updated comment correctly describes the generation-based comparison, aligning with the PR's shift from resource version to generation semantics.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

♻️ Duplicate comments (1)
pkg/cloudevents/clients/work/source/client/manifestwork.go (1)

334-338: Critical: Logic bug overwrites annotation value with "0".

Despite the past review comment claiming this was addressed, the logic still has a critical flaw. When the annotation contains a value (resourceVersion is not empty from line 322-328), the condition on line 334 evaluates to false, causing execution to fall through to line 337 which sets resourceVersion = "0", overwriting the annotation value.

Per the function comment (lines 313-317), the intended behavior is to prefer the annotation value, then fall back to work.ResourceVersion, then "0". The current code inverts this priority.

Apply this diff to fix the logic:

-	if len(resourceVersion) == 0 && len(work.ResourceVersion) != 0 {
-		resourceVersion = work.ResourceVersion
-	} else {
+	if len(resourceVersion) == 0 {
+		if len(work.ResourceVersion) != 0 {
+			resourceVersion = work.ResourceVersion
+		} else {
+			resourceVersion = "0"
+		}
+	}
+
-		resourceVersion = "0"
-	}
-
 	return resourceVersion, generation, nil
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e7ee2c4 and 02f0223.

📒 Files selected for processing (12)
  • pkg/cloudevents/clients/store/informer.go (4 hunks)
  • pkg/cloudevents/clients/work/source/client/manifestwork.go (4 hunks)
  • pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go (3 hunks)
  • pkg/cloudevents/clients/work/store/base.go (1 hunks)
  • pkg/cloudevents/clients/work/store/informer.go (3 hunks)
  • pkg/cloudevents/generic/clients/agentclient.go (1 hunks)
  • pkg/cloudevents/generic/clients/agentclient_test.go (1 hunks)
  • pkg/cloudevents/generic/clients/baseclient.go (2 hunks)
  • test/integration-test.mk (1 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/manifestworkclients_reync_test.go (1 hunks)
  • test/integration/cloudevents/util/work.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • test/integration/cloudevents/util/work.go
  • pkg/cloudevents/generic/clients/baseclient.go
  • pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go
  • test/integration-test.mk
  • pkg/cloudevents/generic/clients/agentclient.go
🧰 Additional context used
🧬 Code graph analysis (5)
pkg/cloudevents/clients/work/store/informer.go (1)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
test/integration/cloudevents/manifestworkclients_reync_test.go (5)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
pkg/cloudevents/clients/utils/utils.go (1)
  • UID (222-225)
pkg/cloudevents/clients/common/common.go (3)
  • ManifestWorkGR (37-37)
  • CloudEventsOriginalSourceLabelKey (28-28)
  • CloudEventsDataTypeAnnotationKey (14-14)
test/integration/cloudevents/util/work.go (1)
  • NewManifestWorkWithStatus (203-207)
pkg/cloudevents/clients/work/payload/manifestbundle.go (1)
  • ManifestBundleEventDataType (11-15)
pkg/cloudevents/clients/store/informer.go (4)
pkg/cloudevents/generic/interface.go (1)
  • ResourceObject (20-38)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
pkg/cloudevents/clients/work/store/informer.go (2)
  • NewAgentInformerWatcherStore (128-138)
  • AgentInformerWatcherStore (90-94)
pkg/cloudevents/clients/work/source/client/manifestwork.go (1)
pkg/cloudevents/clients/common/common.go (1)
  • CloudEventsResourceVersionAnnotationKey (20-20)
test/integration/cloudevents/cloudevents_resync_test.go (1)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (6)
pkg/cloudevents/clients/store/informer.go (4)

28-35: LGTM! Store initialization is now explicit and self-contained.

The constructor properly initializes the Store field with cache.NewStore(cache.MetaNamespaceKeyFunc), aligning with the decoupling from SharedIndexInformer.


76-78: Deletion timestamp preservation is correct.

The logic correctly preserves the deletion timestamp from the last stored object when processing Modified events, ensuring that once an object is marked for deletion, subsequent updates maintain that state.


109-120: Finalizer-aware deletion handling is well-designed.

The logic correctly handles the case where a deletion event is received but finalizers remain on the object. By triggering an Update event with the new generation, finalizers, and deletion timestamp, watchers are notified that the object is entering deletion state without prematurely removing it from the store.


132-134: No issues found with HasInitiated() change.

HasInitiated() unconditionally returning true is correct for AgentInformerWatcherStore[T]. Verification shows:

  • All existing tests create the store and seed it directly without checking HasInitiated() expectations
  • This pattern matches SimpleStore[T], which already returns true unconditionally
  • Callers in generic.go use HasInitiated() for resync timing, not strict sync guarantees
  • Unlike SourceInformerWatcherStore (which has external informer sync), AgentInformerWatcherStore initializes directly and needs no external synchronization
test/integration/cloudevents/manifestworkclients_reync_test.go (1)

41-64: LGTM!

The migration from informer-based store to direct watchStore usage is clean and consistent with the PR's objectives to decouple from SharedIndexInformer.

test/integration/cloudevents/cloudevents_resync_test.go (1)

83-128: LGTM!

The migration to watchStore and the addition of proper JSON merge patch logic for updates are both correct and align well with Kubernetes client patterns.

Comment on lines +518 to +519
{UID: kubetypes.UID("test1"), Generation: 3, ResourceVersion: "1", Namespace: "cluster1"},
{UID: kubetypes.UID("test2"), Generation: 3, ResourceVersion: "1", Namespace: "cluster1"},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify test data semantics and consider adding clarifying comments.

The test data has Generation: 3 with ResourceVersion: "1", which violates typical Kubernetes semantics where ResourceVersion should increase with every update (if Generation is 3, ResourceVersion should be ≥ 3). While this may be intentional to verify that Generation-based comparison takes priority over ResourceVersion (incoming Generation 2 < cached Generation 3, so no action despite incoming ResourceVersion "2" > cached "1"), the unrealistic combination could confuse future maintainers.

Additionally:

  1. The test name "no change resource" is misleading—it's actually testing "stale/older resource rejection based on Generation"
  2. The test2 resource appears unused in this scenario since only test1 is referenced in the incoming event

Consider:

  • Adding a comment explaining why Generation: 3 with ResourceVersion: "1" is used
  • Renaming the test to better reflect its purpose (e.g., "reject older generation resource")
  • Using more realistic test data unless the unrealistic combination is specifically required

Run the following script to verify how Generation is used for comparison in the implementation:


🏁 Script executed:

#!/bin/bash
# Description: Find the implementation that compares Generation values to understand the test scenario

# Search for Generation comparison logic in agent client implementation
rg -n -A5 -B5 'Generation' pkg/cloudevents/generic/clients/agentclient.go

# Search for how resources are compared to determine if updates should be applied
ast-grep --pattern $'func $_($$$ ) HandleReceivedResource($$$) {
  $$$
}'

Length of output: 950


🏁 Script executed:

#!/bin/bash
# Search for other test cases with Generation values to understand data patterns
rg -n 'Generation:' pkg/cloudevents/generic/clients/agentclient_test.go -A1 -B1

# Look at the full test function to see if test2 is referenced elsewhere
sed -n '497,530p' pkg/cloudevents/generic/clients/agentclient_test.go

# Check if there are comments about the test's purpose
sed -n '490,530p' pkg/cloudevents/generic/clients/agentclient_test.go

Length of output: 281


@qiujian16 qiujian16 force-pushed the client-local-store branch 2 times, most recently from 8d8887e to 3a565ed Compare November 11, 2025 14:07
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pkg/cloudevents/generic/clients/agentclient.go (1)

307-311: Revert comparison operator from < to <= at line 307.

The change from <= to < introduces a correctness bug. In Kubernetes semantics, the generation field increments only when the spec changes. Equal generation means the spec is unchanged and should not trigger the Modified action.

With the current code (<), when obj.GetGeneration() == lastObj.GetGeneration(), the condition is false and execution falls through to return types.Modified, causing handlers to be invoked unnecessarily. This results in:

  • Duplicate processing of unchanged objects
  • Unnecessary updates to the local store
  • Wasted handler invocations for redelivered messages

The original logic (<=) correctly returns an empty action for equal generations, which causes the handler loop to be skipped (line 206: if len(action) == 0 { return }).

Additionally, there are no test cases validating the behavior when generations are equal, which is a test coverage gap.

pkg/cloudevents/clients/store/informer.go (1)

86-122: Remove the generation update from the deletion finalizer logic.

When an object is being deleted with finalizers, Kubernetes does not update the metadata.generation field. Generation is only advanced by the API server for changes to the object's spec (or other type-specific rules); updating it during deletion contradicts Kubernetes semantics.

Line 118 should be removed:

accessor.SetDeletionTimestamp(newObj.GetDeletionTimestamp())
accessor.SetFinalizers(newObj.GetFinalizers())
// Remove: accessor.SetGeneration(newObj.GetGeneration())
return s.Update(deletingObj)
♻️ Duplicate comments (1)
pkg/cloudevents/clients/work/source/client/manifestwork.go (1)

334-338: Critical: Logic bug overwrites annotation value with "0" — issue persists.

This is the same critical bug previously flagged in past reviews (marked as addressed in commit e7ee2c4), but it still exists in the current code. When the annotation contains a value, line 334's condition evaluates to false (because len(resourceVersion) == 0 is false), causing execution to fall through to line 337 which sets resourceVersion = "0", overwriting the annotation value.

Per the function comment (lines 313-317), the intended priority is: annotation value → work.ResourceVersion → "0". The current code inverts this priority.

Apply this diff to fix the logic:

-	if len(resourceVersion) == 0 && len(work.ResourceVersion) != 0 {
-		resourceVersion = work.ResourceVersion
-	} else {
+	if len(resourceVersion) == 0 {
+		if len(work.ResourceVersion) != 0 {
+			resourceVersion = work.ResourceVersion
+		} else {
+			resourceVersion = "0"
+		}
+	}
+
-		resourceVersion = "0"
-	}
-
 	return resourceVersion, generation, nil
🧹 Nitpick comments (2)
pkg/cloudevents/generic/clients/baseclient.go (1)

146-146: Consider using lazy evaluation for expensive trace logging.

The logging statements now call evt.String() unconditionally, even when V(5) logging is disabled. Since evt.String() may be expensive (especially with large event payloads) and these are hot paths, consider using closures for lazy evaluation to avoid unnecessary computation when trace logging is disabled.

Example for line 146:

-logger.V(5).Info("Sending event", "event", evt.String())
+logger.V(5).InfoS("Sending event", "event", func() string { return evt.String() })

Note: Verify that the logging library supports closure-based lazy evaluation. If not using structured logging that supports this pattern, consider checking the log level before calling evt.String():

-logger.V(5).Info("Sending event", "event", evt.String())
+if logger.V(5).Enabled() {
+    logger.Info("Sending event", "event", evt.String())
+}

Apply the same pattern to line 177.

Also applies to: 177-177

test/integration/cloudevents/manifestworkclients_reync_test.go (1)

48-55: Inconsistent versioning approach between work1 and work2.

work1 uses ResourceVersion = "1" (line 52), while work2 uses Generation = 1 (line 61). This mixed approach may be intentional to test backward compatibility, but if the PR's goal is to migrate from ResourceVersion to Generation, consider using Generation consistently for both works.

If Generation is the new standard, apply this diff for consistency:

 work1.UID = apitypes.UID(work1UID)
-work1.ResourceVersion = "1"
+work1.Generation = 1
 work1.Labels = map[string]string{common.CloudEventsOriginalSourceLabelKey: sourceID}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 02f0223 and 8d8887e.

📒 Files selected for processing (12)
  • pkg/cloudevents/clients/store/informer.go (4 hunks)
  • pkg/cloudevents/clients/work/source/client/manifestwork.go (4 hunks)
  • pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go (3 hunks)
  • pkg/cloudevents/clients/work/store/base.go (1 hunks)
  • pkg/cloudevents/clients/work/store/informer.go (3 hunks)
  • pkg/cloudevents/generic/clients/agentclient.go (1 hunks)
  • pkg/cloudevents/generic/clients/agentclient_test.go (1 hunks)
  • pkg/cloudevents/generic/clients/baseclient.go (2 hunks)
  • test/integration-test.mk (1 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/manifestworkclients_reync_test.go (1 hunks)
  • test/integration/cloudevents/util/work.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • test/integration-test.mk
  • pkg/cloudevents/generic/clients/agentclient_test.go
  • test/integration/cloudevents/util/work.go
  • pkg/cloudevents/clients/work/store/informer.go
  • pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T13:27:36.291Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.291Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/clients/baseclient.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/clients/baseclient.go
🧬 Code graph analysis (5)
pkg/cloudevents/generic/clients/agentclient.go (1)
pkg/cloudevents/generic/types/types.go (1)
  • Modified (94-94)
test/integration/cloudevents/manifestworkclients_reync_test.go (5)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
pkg/cloudevents/clients/utils/utils.go (1)
  • UID (222-225)
pkg/cloudevents/clients/common/common.go (3)
  • ManifestWorkGR (37-37)
  • CloudEventsOriginalSourceLabelKey (28-28)
  • CloudEventsDataTypeAnnotationKey (14-14)
test/integration/cloudevents/util/work.go (1)
  • NewManifestWorkWithStatus (203-207)
pkg/cloudevents/clients/work/payload/manifestbundle.go (1)
  • ManifestBundleEventDataType (11-15)
test/integration/cloudevents/cloudevents_resync_test.go (1)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
pkg/cloudevents/clients/store/informer.go (4)
pkg/cloudevents/generic/interface.go (1)
  • ResourceObject (20-38)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
pkg/cloudevents/clients/work/store/informer.go (2)
  • NewAgentInformerWatcherStore (128-138)
  • AgentInformerWatcherStore (90-94)
pkg/cloudevents/clients/work/source/client/manifestwork.go (1)
pkg/cloudevents/clients/common/common.go (1)
  • CloudEventsResourceVersionAnnotationKey (20-20)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: verify
  • GitHub Check: integration
🔇 Additional comments (14)
pkg/cloudevents/clients/work/source/client/manifestwork.go (3)

7-7: LGTM!

The strconv import is correctly added to support the ParseInt call in getWorkResourceVersion.


92-97: LGTM!

The Create method correctly integrates with the updated getWorkResourceVersion function signature, properly handles errors, and sets both Generation and ResourceVersion fields on the new work object.


283-288: LGTM!

The Patch method correctly integrates with the updated getWorkResourceVersion function, with proper error handling and field assignments matching the Create method pattern.

pkg/cloudevents/clients/work/store/base.go (1)

119-121: LGTM! Past review feedback addressed.

The switch from ResourceVersion to Generation is the right approach for comparing spec versions. The typo has been corrected ("greater"), and the log labels now accurately reflect the values: localGeneration for the source's generation and remoteGeneration for the agent's generation. The logic correctly ignores status updates when the agent reports a generation newer than the source, maintaining the source as the source of truth.

pkg/cloudevents/generic/clients/agentclient.go (1)

301-305: LGTM: Comment correctly reflects generation-based semantics.

The updated comment accurately describes the generation 0 special case, which is appropriate when generation tracking may not be reliable or is not being used. This aligns well with the migration from ResourceVersion to Generation-based semantics.

test/integration/cloudevents/cloudevents_resync_test.go (3)

5-8: LGTM!

The new imports are necessary for the patch-based update implementation (JSON marshaling, merge patch creation, and patch types).


83-101: LGTM!

The migration from informer to watchStore is correctly implemented. The test now uses watchStore.List() to retrieve manifest works, and the assertion correctly checks list.Items.


104-133: The integration test code is correct; no issues found.

The agent client's Patch method is designed to accept status field changes directly in the patch data. The subresources parameter is optional and validates only explicit subresource specifications (e.g., rejecting unsupported ones). The test correctly applies a merge patch containing status changes without specifying a subresource parameter, which is the intended usage pattern in this codebase. The client broadcasts such patches with SubResource: types.SubResourceStatus, confirming proper status update handling.

test/integration/cloudevents/manifestworkclients_reync_test.go (2)

41-42: LGTM!

The test correctly captures watchStore from StartWorkAgent and appropriately discards the unused clientHolder.


57-64: LGTM!

work2 correctly uses Generation = 1 instead of ResourceVersion, aligning with the PR's migration to generation-based versioning. The watchStore.Add() call is correct.

pkg/cloudevents/clients/store/informer.go (4)

17-26: LGTM!

The updated comment accurately reflects the migration from informer-backed storage to a local cache store.


28-35: LGTM!

The initialization correctly creates a local cache store using cache.NewStore(cache.MetaNamespaceKeyFunc), which is appropriate for namespace-scoped resources. The Watcher is properly initialized for watch functionality.


61-85: LGTM!

The Modified case correctly:

  • Uses meta.Accessor to access metadata
  • Validates resource existence before modification (line 72)
  • Preserves deletion timestamps from the last known object (lines 75-78)
  • Converts to runtime.Object before updating the store

132-134: LGTM!

Returning true directly is correct since the local cache store doesn't require synchronization like an informer-backed store. The store is immediately ready after initialization.

Signed-off-by: Jian Qiu <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
pkg/cloudevents/clients/work/source/client/manifestwork.go (1)

318-342: Well-structured implementation with proper fallback logic.

The function correctly handles both generation and resource version with appropriate fallbacks. The logic properly addresses the issues from previous reviews (bitSize 64, correct fallback branching).

One minor edge case: if the annotation exists but contains an empty string, ParseInt will fail and return an internal error. While this strict validation is reasonable (enforcing that present annotations must be valid), you could optionally improve the user experience by checking for empty strings explicitly and providing a more descriptive error message.

Optional improvement for error clarity:

 	resourceVersion, ok := work.Annotations[common.CloudEventsResourceVersionAnnotationKey]
 	if ok {
+		if len(resourceVersion) == 0 {
+			return "", 0, fmt.Errorf("annotation %s is present but empty; either omit the annotation or provide a valid integer value", common.CloudEventsResourceVersionAnnotationKey)
+		}
 		generation, err = strconv.ParseInt(resourceVersion, 10, 64)
 		if err != nil {
-			return "", 0, errors.NewInternalError(err)
+			return "", 0, fmt.Errorf("failed to parse annotation %s as int64: %w", common.CloudEventsResourceVersionAnnotationKey, err)
 		}
 	}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3a565ed and 6f4f6e8.

📒 Files selected for processing (12)
  • pkg/cloudevents/clients/store/informer.go (4 hunks)
  • pkg/cloudevents/clients/work/source/client/manifestwork.go (4 hunks)
  • pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go (3 hunks)
  • pkg/cloudevents/clients/work/store/base.go (1 hunks)
  • pkg/cloudevents/clients/work/store/informer.go (3 hunks)
  • pkg/cloudevents/generic/clients/agentclient.go (1 hunks)
  • pkg/cloudevents/generic/clients/agentclient_test.go (1 hunks)
  • pkg/cloudevents/generic/clients/baseclient.go (2 hunks)
  • test/integration-test.mk (1 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/manifestworkclients_reync_test.go (1 hunks)
  • test/integration/cloudevents/util/work.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • test/integration-test.mk
  • pkg/cloudevents/generic/clients/agentclient_test.go
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T13:27:36.291Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.291Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/clients/baseclient.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/clients/baseclient.go
🧬 Code graph analysis (6)
pkg/cloudevents/clients/work/store/informer.go (2)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
test/integration/cloudevents/manifestworkclients_reync_test.go (5)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
pkg/cloudevents/clients/utils/utils.go (1)
  • UID (222-225)
pkg/cloudevents/clients/common/common.go (3)
  • ManifestWorkGR (37-37)
  • CloudEventsOriginalSourceLabelKey (28-28)
  • CloudEventsDataTypeAnnotationKey (14-14)
test/integration/cloudevents/util/work.go (1)
  • NewManifestWorkWithStatus (203-207)
pkg/cloudevents/clients/work/payload/manifestbundle.go (1)
  • ManifestBundleEventDataType (11-15)
test/integration/cloudevents/cloudevents_resync_test.go (2)
test/integration/cloudevents/agent/agent.go (1)
  • StartWorkAgent (16-40)
pkg/cloudevents/clients/utils/utils.go (1)
  • Patch (44-82)
pkg/cloudevents/clients/work/source/client/manifestwork.go (1)
pkg/cloudevents/clients/common/common.go (1)
  • CloudEventsResourceVersionAnnotationKey (20-20)
pkg/cloudevents/generic/clients/agentclient.go (1)
pkg/cloudevents/generic/types/types.go (1)
  • Modified (94-94)
pkg/cloudevents/clients/store/informer.go (4)
pkg/cloudevents/generic/interface.go (1)
  • ResourceObject (20-38)
pkg/cloudevents/clients/store/base.go (1)
  • BaseClientWatchStore (15-20)
pkg/cloudevents/clients/store/watcher.go (2)
  • Watcher (12-18)
  • NewWatcher (22-34)
pkg/cloudevents/clients/work/store/informer.go (2)
  • NewAgentInformerWatcherStore (128-138)
  • AgentInformerWatcherStore (90-94)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (7)
pkg/cloudevents/generic/clients/baseclient.go (1)

146-146: LGTM: Logging simplification is appropriate for V(5).

The change from lazy evaluation to direct evt.String() calls simplifies the code. While this means the string is always computed (even when V(5) logging is disabled), the performance impact is negligible since V(5) is very verbose and rarely enabled in production environments.

Also applies to: 177-177

pkg/cloudevents/clients/work/source/client/manifestwork.go (2)

92-97: LGTM! Correct integration of resource version and generation handling.

The Create path properly retrieves both resource version and generation from the work, handles errors appropriately, and sets both fields on the new work before encoding and validation.


283-288: LGTM! Patch method correctly handles generation and resource version.

The Patch path mirrors the Create implementation, correctly deriving resource version and generation from the patched work before validation. Using patchedWork as the source ensures any patch changes to the annotation are respected.

pkg/cloudevents/clients/store/informer.go (4)

28-35: LGTM! Clean initialization with local store.

The initialization correctly creates a local cache.Store with the standard Kubernetes namespace/name key function, eliminating the informer dependency as intended by this PR.


62-78: Accessor usage and deletion timestamp preservation look correct.

The use of meta.Accessor for namespace/name extraction is appropriate, and preserving the local deletion timestamp when an object is already deleting makes sense (deletion timestamps should be immutable once set).

One edge case to consider: if both the incoming resource and the local object have deletion timestamps set, this logic overwrites the incoming timestamp with the local one. Verify this is the intended behavior, particularly if timestamps could differ due to clock skew or out-of-order event processing.


109-121: Finalizer handling is intentional and already tested—no changes needed.

The script results confirm this behavior is already documented and tested. The integration test at pkg/cloudevents/clients/work/store/informer_test.go (lines 582–589) explicitly validates that HandleReceivedResource with types.Deleted triggers an Update event instead of immediate deletion, producing the expected event sequence: [watch.Added, watch.Modified, watch.Modified]. Downstream store implementations across the codebase correctly process this flow, and the generation propagation properly syncs metadata through the meta.Accessor updates. The implementation is correct.


133-133: HasInitiated change verified as correct.

The AgentInformerWatcherStore is a local-only store—its cache is initialized immediately in the constructor (line 31) with no remote informer to sync. Returning true unconditionally is the correct behavior. The pattern matches SimpleStore, which also returns true immediately.

This is not a breaking change. WaitForStoreInit will succeed on the first poll iteration, triggering the subsequent Resync call appropriately. No code depends on synchronization delays, and all usages in generic.go handle the immediate success correctly.

Comment on lines +307 to 309
if obj.GetGeneration() < lastObj.GetGeneration() {
return evt, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Keep filtering same-generation spec events

Switching the guard to < means we now emit types.Modified whenever the incoming object shares the exact same generation as what we already cached. During any replay/resync (for example after reconnecting or on a broker resend) we typically receive identical objects with unchanged generation; this change will push all of them through the spec handlers again, reapplying every ManifestWork even though nothing actually changed. That churn was previously avoided by treating non-increasing generations as a no-op. Please restore the <= guard (or add an equivalent duplicate check) so we keep ignoring duplicates.

-	if obj.GetGeneration() < lastObj.GetGeneration() {
+	if obj.GetGeneration() <= lastObj.GetGeneration() {
 		return evt, nil
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if obj.GetGeneration() < lastObj.GetGeneration() {
return evt, nil
}
if obj.GetGeneration() <= lastObj.GetGeneration() {
return evt, nil
}
🤖 Prompt for AI Agents
In pkg/cloudevents/generic/clients/agentclient.go around lines 307 to 309, the
current guard uses '<' which lets events with the same generation through and
causes duplicate Modified events on resyncs; change the condition back to '<='
(or add an equivalent duplicate-generation check) so that when
obj.GetGeneration() is less-than-or-equal-to lastObj.GetGeneration() the
function returns without emitting a Modified event, preserving the previous
no-op behavior for identical generations.

Comment on lines 127 to 129
// only update the status on the agent local part
store := informer.Informer().GetStore()
if err := store.Update(newWork); err != nil {
if _, err := agentWorkClient.Patch(context.Background(), workName, types.MergePatchType, patchData, metav1.PatchOptions{}); err != nil {
return err
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Patch status must target the status subresource.

ManifestWork exposes a status subresource; patching it via the main resource will be rejected and the status never updates. The helper in test/integration/cloudevents/util/work.go already calls Patch(..., "status") for this reason. Please target the status subresource here as well.

-				if _, err := agentWorkClient.Patch(context.Background(), workName, types.MergePatchType, patchData, metav1.PatchOptions{}); err != nil {
+				if _, err := agentWorkClient.Patch(context.Background(), workName, types.MergePatchType, patchData, metav1.PatchOptions{}, "status"); err != nil {
🤖 Prompt for AI Agents
In test/integration/cloudevents/cloudevents_resync_test.go around lines 127-129,
the code patches the ManifestWork without targeting the status subresource which
will be rejected; update the Patch call to include the "status" subresource
argument so the status subresource is patched (i.e., call Patch with the same
parameters plus "status" as the final variadic argument).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant