
Conversation

Contributor

@xxhZs xxhZs commented Sep 18, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

#23177

Previously, the changelog reused the row-id logic and generated vnodes in a loop, which produced out-of-order `_changelog_row_id` values. This PR avoids the problem by using the table's own vnodes instead.

This pull request introduces significant improvements to how changelog row IDs are generated and handled throughout the system, with a focus on per-vnode sequencing, distribution awareness, and better timestamp management. The changes affect the row ID generation logic, the optimizer and planner, and the stream executor, ensuring that changelog row IDs are unique per vnode and properly distributed. Below are the most important changes grouped by theme:

Row ID Generation Enhancements:

  • Added a new ChangelogRowIdGenerator struct in row_id.rs that generates unique changelog row IDs with per-vnode sequence management and improved timestamp handling via a shared TimestampManager. This ensures uniqueness and correct ordering for distributed changelog processing.
  • Refactored RowIdGenerator to use the new TimestampManager for consistent timestamp management, replacing direct timestamp logic and simplifying timestamp updates.
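The per-vnode sequencing idea above can be sketched as follows. This is a minimal, hypothetical simplification (names, bit widths, and the single shared timestamp field are illustrative stand-ins for the real `ChangelogRowIdGenerator` and `TimestampManager`, not the actual RisingWave implementation): each vnode keeps its own sequence counter, and all counters share one timestamp component, so IDs stay unique and strictly ordered within a vnode.

```rust
use std::collections::HashMap;

// Hypothetical sketch: one sequence counter per vnode, all sharing a
// single logical timestamp (stands in for TimestampManager).
struct ChangelogRowIdGen {
    timestamp: u64,               // shared timestamp component
    sequences: HashMap<u16, u16>, // per-vnode sequence counters
}

impl ChangelogRowIdGen {
    fn new(vnodes: impl IntoIterator<Item = u16>) -> Self {
        Self {
            timestamp: 1,
            sequences: vnodes.into_iter().map(|v| (v, 0)).collect(),
        }
    }

    // Compose (timestamp | vnode | sequence) into one id and bump the
    // per-vnode sequence, so ids within a vnode are strictly increasing.
    fn next(&mut self, vnode: u16) -> u64 {
        let seq = self
            .sequences
            .get_mut(&vnode)
            .expect("vnode not in generator");
        let id = (self.timestamp << 32) | ((vnode as u64) << 16) | (*seq as u64);
        *seq += 1;
        id
    }
}
```

Because the timestamp occupies the high bits and the vnode the middle bits, IDs remain comparable across vnodes while each vnode's stream is gap-free.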

Stream Plan and Executor Integration:

  • Updated the ChangeLogExecutor to use the new ChangelogRowIdGenerator, passing in the vnode bitmap and distribution keys, and generating row IDs per vnode for each chunk. Also handles vnode bitmap updates on barriers for dynamic partitioning.
  • Modified the optimizer's changelog planning logic to propagate distribution keys and ensure that changelog nodes are properly distributed, including adding a StreamExchange when necessary to enforce distribution.
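As a rough sketch of the per-chunk flow described above (the hash function and vnode count here are illustrative assumptions; RisingWave uses its own consistent-hash scheme and takes the vnode count from the table): the executor maps each row's distribution-key values to a vnode, then draws the next ID from that vnode's sequence.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative vnode count; the real value comes from the table catalog.
const VNODE_COUNT: u64 = 256;

// Map a row's distribution-key values to a vnode. DefaultHasher is a
// stand-in; the real system uses a stable consistent-hash function.
fn vnode_of(dist_key_values: &[i64]) -> u16 {
    let mut h = DefaultHasher::new();
    dist_key_values.hash(&mut h);
    (h.finish() % VNODE_COUNT) as u16
}
```

The important property is determinism: the same distribution-key values always land on the same vnode, so that vnode's sequence counter orders all changelog rows for those keys.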

Protobuf and Plan Node Changes:

  • Extended the ChangeLogNode protobuf definition to include a repeated distribution_keys field, and updated StreamChangeLog to carry and serialize these keys, ensuring distribution metadata is correctly propagated in the stream plan.

These changes collectively improve the reliability, scalability, and correctness of changelog row ID generation and distribution in the streaming system.

What's changed and what's your intention?

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • I have checked the Release Timeline and Currently Supported Versions to determine which release branches I need to cherry-pick this PR into.

Documentation

  • My PR needs documentation updates.
Release note

@xxhZs xxhZs changed the title from "fix(stream): fix changelog order" to "fix(stream): fix _changelog_row_id out of order" Sep 18, 2025
@github-actions github-actions bot added the type/fix label (Type: Bug fix. Only for pull requests.) Sep 18, 2025
@hzxa21 hzxa21 requested a review from Copilot September 18, 2025 04:49
Contributor

@Copilot Copilot AI left a comment


Pull Request Overview

This PR fixes an issue with the _changelog_row_id being generated out of order by implementing proper virtual node (vnode) aware row ID generation in the ChangeLog executor. The fix ensures that changelog row IDs are generated correctly using a new ChangelogRowIdGenerator that maintains separate sequences for each vnode.

  • Replaces the placeholder row ID generation with proper vnode-aware implementation
  • Introduces a new ChangelogRowIdGenerator that uses the snowflake algorithm with per-vnode sequences
  • Removes the separate StreamRowIdGen step by integrating row ID generation directly into the ChangeLog executor

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

Summary per file:

src/stream/src/from_proto/changelog.rs: Updates the builder to pass vnode information to the ChangeLog executor
src/stream/src/executor/changelog.rs: Implements proper row ID generation using ChangelogRowIdGenerator with vnode awareness
src/frontend/src/planner/changelog.rs: Adds vnode_count extraction from the base table catalog
src/frontend/src/optimizer/plan_node/stream_changelog.rs: Adds vnode_count to protobuf serialization
src/frontend/src/optimizer/plan_node/logical_changelog.rs: Removes StreamRowIdGen usage and threads the vnode_count parameter through
src/frontend/src/optimizer/plan_node/generic/changelog.rs: Adds the vnode_count field to the generic ChangeLog structure
src/common/src/util/row_id.rs: Implements the new ChangelogRowIdGenerator with per-vnode sequence management
proto/stream_plan.proto: Adds the vnode_count field to the ChangeLogNode protobuf definition

@BugenZhao
Member

Can you fill in the PR body first?

@xxhZs xxhZs requested review from BugenZhao and hzxa21 September 19, 2025 06:47
Comment on lines 151 to 154
let stream_key = input
    .stream_key()
    .ok_or_else(|| anyhow::anyhow!("changelog input must have a stream key"))?
    .to_vec();
Member

I guess we can assume the stream key must exist for a stream plan node? Then we can call expect_stream_key. cc @chenzl25

Comment on lines 145 to 149
let (distribution_keys, new_input) = match dist {
    Distribution::HashShard(distribution_keys)
    | Distribution::UpstreamHashShard(distribution_keys, _) => {
        (distribution_keys.clone(), input)
    }
Member

Can call dist_column_indices.

Contributor Author

It seems this method may panic, and it's unclear whether there are special cases here that could trigger it. This is likely the same problem as the one above, cc @chenzl25

Member

Oh, I guess you can call enforce_concrete_distribution then dist_column_indices.

Contributor

@chenzl25 chenzl25 Sep 23, 2025

I think the input of changelog is simply a table scan, so the distribution of scan could be UpstreamHashShard or Single. But it seems we don't handle the Single distribution here. For example the stream key of Single is empty, but we use it in HashShard.

Member

I think the input of changelog is simply a table scan, so the distribution of scan could be UpstreamHashShard or Single

True. But theoretically the upstream can be any plan node, so I'd suggest using a more general implementation here. 🤔

pub fn new_with_dist(
    core: generic::ChangeLog<PlanRef>,
    dist: Distribution,
    distribution_keys: Vec<u32>,
Member

I think we can obtain this from core.input.distribution().dist_column_indices()

Contributor Author

According to the comments above, the stream_key of our changelog operator should be _changelog_row_id. Therefore, we fix the dist_key here as _changelog_row_id instead of using the dist_key from the upstream.

Member

Got it.

I just realized this means that we changed the distribution key within a fragment without introducing an Exchange. Not sure if this will break any assumption...

@BugenZhao BugenZhao requested a review from chenzl25 September 22, 2025 09:19
Member

@BugenZhao BugenZhao left a comment

Surprised to see there are no planner tests for this feature. Could you add some in this PR?

Also, can you add the test case mentioned in #23177 as an e2e test?

Member

@BugenZhao BugenZhao left a comment

Generally LGTM. cc @chenzl25 Would you also take a look at the optimizer part before this getting into main?


@chenzl25
Contributor

I will take a look later today.

@BugenZhao
Member

Can you also add some planner tests for the new plan? stream_plan and stream_dist_plan would be nice.

Comment on lines +17 to +26
statement ok
create table wide (v1 int primary key, even int, odd int);

statement ok
create sink even into wide (v1, even) as select v1, v1*2 from t4;

statement ok
create sink odd into wide (v1, odd) as select v1, v1*2+1 from t4;

statement ok
Contributor

columns mismatch


@xxhZs xxhZs enabled auto-merge September 24, 2025 06:40
@xxhZs xxhZs added this pull request to the merge queue Sep 24, 2025
github-merge-queue bot pushed a commit that referenced this pull request Sep 24, 2025
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Sep 24, 2025
@xxhZs xxhZs enabled auto-merge September 24, 2025 08:48
Comment on lines 144 to 150
let distribution_keys = match dist {
    Distribution::HashShard(distribution_keys)
    | Distribution::UpstreamHashShard(distribution_keys, _) => distribution_keys.clone(),
    _ => {
        vec![]
    }
};
Contributor

Please enumerate all the distribution types explicitly; I think Broadcast and SomeShard should never appear in the input.
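The reviewer's suggestion can be sketched like this. The `Distribution` enum below is a simplified stand-in for RisingWave's real type, and the variant set is an assumption based on the snippets in this thread; the point is that enumerating every variant (instead of a catch-all `_`) makes unexpected inputs fail loudly.

```rust
// Simplified stand-in for the real Distribution enum.
#[derive(Debug)]
enum Distribution {
    Single,
    HashShard(Vec<usize>),
    UpstreamHashShard(Vec<usize>, u32),
    Broadcast,
    SomeShard,
}

fn changelog_dist_keys(dist: &Distribution) -> Vec<usize> {
    match dist {
        Distribution::HashShard(keys)
        | Distribution::UpstreamHashShard(keys, _) => keys.clone(),
        // A Single-distributed input has no distribution key.
        Distribution::Single => vec![],
        // These should never reach the changelog planner; fail loudly
        // rather than silently returning an empty key set.
        Distribution::Broadcast | Distribution::SomeShard => {
            unreachable!("unexpected input distribution: {:?}", dist)
        }
    }
}
```

Compared with the `_ => vec![]` fallback in the quoted diff, an exhaustive match also forces this site to be revisited if a new variant is ever added to the enum.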

Contributor

@chenzl25 chenzl25 left a comment

Rest LGTM

@xxhZs xxhZs added this pull request to the merge queue Sep 24, 2025
Merged via the queue into main with commit 9f528ed Sep 24, 2025
36 of 37 checks passed
@xxhZs xxhZs deleted the xxh/fix-changelog-order branch September 24, 2025 16:40
github-actions bot pushed a commit that referenced this pull request Sep 24, 2025
@github-actions
Contributor

✅ Cherry-pick PRs (or issues if encountered conflicts) have been created successfully to all target branches.

xxhZs added a commit that referenced this pull request Sep 25, 2025
@hzxa21
Collaborator

hzxa21 commented Sep 25, 2025

There is a compatibility issue in this PR. I created a MV with AS CHANGELOG in commit 5e9eb6b (one commit before this PR)

dev=> create table t(k int primary key, v int);
dev=> create materialized view mv2 as with sub as CHANGELOG FROM t select k,v,changelog_op, _changelog_row_id::bigint AS __row_id from sub;
dev=> insert into t values(1, 3);
INSERT 0 1
dev=> insert into t values(3, 3);
INSERT 0 1

Then I upgraded to this PR (ad6e6ab) and inserted more rows:

dev=> insert into t values(5, 3);
INSERT 0 1

CN panics:

thread 'rw-streaming' panicked at src/common/src/util/row_id.rs:306:13:
vnode VirtualNode(0) not in generator
stack backtrace:
   0: __rustc::rust_begin_unwind
             at /rustc/28f1c807911c63f08d98e7b468cfcf15a441e34b/library/std/src/panicking.rs:697:5
   1: core::panicking::panic_fmt
             at /rustc/28f1c807911c63f08d98e7b468cfcf15a441e34b/library/core/src/panicking.rs:75:14
   2: risingwave_common::util::row_id::ChangelogRowIdGenerator::next_changelog_row_id_in_current_timestamp
             at ./src/common/src/util/row_id.rs:306:13
   3: risingwave_common::util::row_id::ChangelogRowIdGenerator::next
             at ./src/common/src/util/row_id.rs:325:36
   4: risingwave_stream::executor::changelog::ChangeLogExecutor::execute_inner::{{closure}}::{{closure}}
             at ./src/stream/src/executor/changelog.rs:87:70
   5: core::iter::adapters::map::map_fold::{{closure}}
             at /home/patrick/.rustup/toolchains/nightly-2025-06-25-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/iter/adapters/map.rs:88:28
   6: <core::slice::iter::Iter<T> as core::iter::traits::iterator::Iterator>::fold
             at /home/patrick/.rustup/toolchains/nightly-2025-06-25-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/slice/iter/macros.rs:255:27
   7: <core::iter::adapters::map::Map<I,F> as core::iter::traits::iterator::Iterator>::fold
             at /home/patrick/.rustup/toolchains/nightly-2025-06-25-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/iter/adapters/map.rs:128:19
   8: <core::iter::adapters::map::Map<I,F> as core::iter::traits::iterator::Iterator>::fold

@BugenZhao
Member

BugenZhao commented Sep 26, 2025

vnode VirtualNode(0) not in generator

True. This is really an issue, because we are adding a new field to the plan node proto.
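One hedged sketch of a defensive alternative for the panicking lookup (this is illustrative only, not necessarily the fix the project adopted): lazily create a sequence counter for a vnode that was never registered, which tolerates plans serialized before the new proto field existed.

```rust
use std::collections::HashMap;

// Hypothetical defensive lookup: create a per-vnode sequence counter on
// first use instead of panicking with "vnode not in generator".
fn next_seq(sequences: &mut HashMap<u16, u16>, vnode: u16) -> u16 {
    let counter = sequences.entry(vnode).or_insert(0);
    let current = *counter;
    *counter += 1; // each vnode's sequence stays strictly increasing
    current
}
```

The trade-off is that silently accepting an unknown vnode can mask genuine plan corruption, which is presumably why the real code asserts; a proto migration or version check is the more principled fix for the compatibility issue.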

@xxhZs xxhZs mentioned this pull request Sep 26, 2025
8 tasks
github-merge-queue bot pushed a commit that referenced this pull request Sep 26, 2025
github-merge-queue bot pushed a commit that referenced this pull request Oct 16, 2025
Co-authored-by: Xinhao Xu <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Chengyou Liu <[email protected]>

Labels

need-cherry-pick-since-release-2.5, type/fix (Type: Bug fix. Only for pull requests.)
