
Conversation

chenzl25
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

  • Previously we supported index selection for backfilling, but index selection only improves the locality of the leaf level of the DAG. To extend the idea of locality, we introduce locality enforcement for the non-leaf nodes of the DAG, i.e. a new type of backfilling -- locality backfill. With locality backfill, we can provide good locality for intermediate results during backfilling. When backfilling a large amount of historical data with limited memory, this can significantly improve performance.
  • In the optimizer, we follow the try_better_locality method and enforce locality for an operator that cannot provide it by itself, using a new operator called LocalityProvider. Currently we only generate this operator for LogicalJoin and LogicalScan. With this ability, users don't even need to create indexes by themselves, but if they know their workload well, they can still create indexes to share them across different jobs.
  • The LocalityProvider has 2 state tables: one is used to buffer data during backfilling and provide data locality; the other is a progress table, like the one in the normal backfill operator, that tracks its own backfilling progress.
  • We use the locality columns as the prefix of the state table's primary key to provide the locality, so the table is implicitly sorted asynchronously by compactors (a minimal sketch of this key layout follows this list).
  • Once we introduce locality backfill, the dependencies between scan backfill and intermediate locality backfill become important: we need to start the scan backfill first, and only then the intermediate locality backfill. Among the intermediate locality backfills, we also need to respect the backfilling dependencies, since intermediate LocalityProvider operators can depend on each other. We extend the dependency ordering control introduced in feat(meta,frontend,streaming): support fixed backfill order control #20967.
  • TODO: The state table could be truncated after backfilling.
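
A minimal, self-contained Rust sketch of the primary-key layout described above. ColumnOrder and build_buffer_table_pk are simplified stand-ins for illustration only, not the actual catalog-builder API:

#[derive(Debug, Clone, PartialEq)]
struct ColumnOrder {
    column_index: usize,
    ascending: bool,
}

fn build_buffer_table_pk(locality_columns: &[usize], stream_key: &[usize]) -> Vec<ColumnOrder> {
    let mut pk: Vec<ColumnOrder> = Vec::new();
    // Locality columns come first, so the buffer table is clustered (and, via
    // compaction, asynchronously sorted) by them.
    for &col in locality_columns {
        pk.push(ColumnOrder { column_index: col, ascending: true });
    }
    // The input's stream key is appended to keep the primary key unique,
    // skipping columns already covered by the locality prefix.
    for &col in stream_key {
        if !pk.iter().any(|o| o.column_index == col) {
            pk.push(ColumnOrder { column_index: col, ascending: true });
        }
    }
    pk
}

fn main() {
    // e.g. locality on o_custkey (column 1) with stream key o_orderkey (column 0)
    let pk = build_buffer_table_pk(&[1], &[0]);
    println!("{pk:?}");
}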

Performance

TPC-H Q18 with scale = 1 GB
We limit the compute node memory to 2 GB

Backfill throughput

With locality backfill, the backfill finishes in 7 minutes, while without it the job is too slow to finish.

[chart: backfill throughput]

Cache miss ratio

With locality backfill, our cache miss ratio is much lower than without it.

[chart: cache miss ratio]
set enable_locality_backfill = true;
explain create materialized view q18 as    select
      c_name,
      c_custkey,
      o_orderkey,
      o_orderdate,
      o_totalprice,
      sum(l_quantity) quantity
    from
      customer,
      orders,
      lineitem
    where
      o_orderkey in (
        select
          l_orderkey
        from
          lineitem
        group by
          l_orderkey
        having
          sum(l_quantity) > 1
      )
      and c_custkey = o_custkey
      and o_orderkey = l_orderkey
    group by
      c_name,
      c_custkey,
      o_orderkey,
      o_orderdate,
      o_totalprice
    order by
      o_totalprice desc,
      o_orderdate
    LIMIT 100;

 StreamMaterialize { columns: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, quantity], stream_key: [c_custkey, c_name, o_orderkey, o_totalprice, o_orderdate], pk_columns: [o_totalprice, o_orderdate, c_custkey, c_name, o_orderkey], pk_conflict: NoCheck }
 └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity)] }
   └─StreamTopN { order: [orders.o_totalprice DESC, orders.o_orderdate ASC], limit: 100, offset: 0 }
     └─StreamExchange { dist: Single }
       └─StreamGroupTopN { order: [orders.o_totalprice DESC, orders.o_orderdate ASC], limit: 100, offset: 0, group_key: [$expr1] }
         └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity), Vnode(orders.o_orderkey) as $expr1] }
           └─StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate], aggs: [sum(lineitem.l_quantity), count] }
             └─StreamLocalityProvider { locality_columns: [0, 1, 2, 3, 4] }
               └─StreamExchange [no_shuffle] { dist: HashShard(orders.o_orderkey) }
                 └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey }
                   ├─StreamExchange { dist: HashShard(orders.o_orderkey) }
                   │ └─StreamLocalityProvider { locality_columns: [2] }
                   │   └─StreamExchange [no_shuffle] { dist: HashShard(orders.o_orderkey) }
                   │     └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey }
                   │       ├─StreamExchange { dist: HashShard(orders.o_orderkey) }
                   │       │ └─StreamLocalityProvider { locality_columns: [2] }
                   │       │   └─StreamExchange { dist: HashShard(orders.o_orderkey) }
                   │       │     └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey }
                   │       │       ├─StreamExchange { dist: HashShard(customer.c_custkey) }
                   │       │       │ └─StreamLocalityProvider { locality_columns: [0] }
                   │       │       │   └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(customer.c_custkey) }
                   │       │       │     └─StreamTableScan { table: customer, columns: [c_custkey, c_name] }
                   │       │       └─StreamExchange { dist: HashShard(orders.o_custkey) }
                   │       │         └─StreamLocalityProvider { locality_columns: [1] }
                   │       │           └─StreamExchange { dist: HashShard(orders.o_custkey) }
                   │       │             └─StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate] }
                   │       └─StreamExchange { dist: HashShard(lineitem.l_orderkey) }
                   │         └─StreamLocalityProvider { locality_columns: [0] }
                   │           └─StreamExchange { dist: HashShard(lineitem.l_orderkey) }
                   │             └─StreamTableScan { table: lineitem, columns: [l_orderkey, l_quantity, l_linenumber] }
                   └─StreamExchange { dist: HashShard(lineitem.l_orderkey) }
                     └─StreamProject { exprs: [lineitem.l_orderkey] }
                       └─StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Decimal) }
                         └─StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] }
                           └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] }
                             └─StreamLocalityProvider { locality_columns: [0] }
                               └─StreamExchange { dist: HashShard(lineitem.l_orderkey) }
                                 └─StreamTableScan { table: lineitem, columns: [l_orderkey, l_quantity, l_linenumber] }
(38 rows)

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • I have checked the Release Timeline and Currently Supported Versions to determine which release branches I need to cherry-pick this PR into.

Documentation

  • My PR needs documentation updates.
Release note

Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR introduces locality enforcement and locality backfill to RisingWave's streaming engine, extending the existing index selection functionality to improve performance during large historical data backfilling in memory-limited scenarios.

Key changes include:

  • Implementation of LocalityProvider operators that buffer data with locality column ordering during backfill
  • Extension of dependency ordering to ensure proper sequencing between scan backfill and locality backfill operations
  • Addition of session configuration to enable/disable locality backfill functionality

Reviewed Changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 2 comments.

Summary per file:

  • src/stream/src/executor/locality_provider.rs: Core executor implementation for locality-aware backfilling with proper state management
  • src/stream/src/from_proto/locality_provider.rs: Protocol buffer conversion and executor builder for LocalityProvider
  • src/frontend/src/optimizer/plan_node/stream_locality_provider.rs: Stream plan node implementation with state and progress table catalog building
  • src/frontend/src/optimizer/plan_node/logical_locality_provider.rs: Logical plan node with column pruning and predicate pushdown support
  • src/meta/src/stream/stream_graph/fragment.rs: Fragment dependency analysis for LocalityProvider ordering
  • src/meta/src/model/stream.rs: Backfill upstream type classification for LocalityProvider
  • src/common/src/session_config/mod.rs: Session configuration parameter for enabling locality backfill
  • proto/stream_plan.proto: Protocol buffer definition for LocalityProviderNode
Comments suppressed due to low confidence (1)

src/stream/src/executor/locality_provider.rs:1

  • The magic number 1024 should be defined as a named constant or made configurable to improve maintainability and allow tuning.
// Copyright 2025 RisingWave Labs

}
}

// TODO: truncate the state table after backfill.
Copilot AI Sep 23, 2025

This TODO comment indicates incomplete functionality. State table truncation after backfill should be implemented or tracked in a proper issue management system.

Suggested change
// TODO: truncate the state table after backfill.
// Truncate the state table after backfill to free resources.
state_table.truncate().await?;


@yuhao-su yuhao-su self-requested a review September 25, 2025 07:15
@BugenZhao BugenZhao self-requested a review September 26, 2025 12:12
Contributor Author

chenzl25 commented Sep 26, 2025

@chenzl25 chenzl25 requested review from kwannoel and wenym1 September 29, 2025 09:32
@chenzl25
Contributor Author

chenzl25 commented Sep 29, 2025

This PR is ready to review.
Basically the PR contains 3 parts:

  • optimizer: introduce LocalityProvider and use it together with the try_better_locality method.
  • backfill ordering: handle the dependencies among scan and locality backfill fragments.
  • executor: buffer chunks until StartFragmentBackfill, then backfill from the buffered state table.
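
A rough, self-contained sketch of this two-phase executor behavior. This is not the actual LocalityProviderExecutor: a BTreeMap stands in for the state table whose primary key is prefixed by the locality columns, and barrier and progress handling are omitted.

use std::collections::BTreeMap;

type Row = Vec<String>;

struct LocalityBuffer {
    locality_columns: Vec<usize>,
    // BTreeMap keeps entries ordered by the locality key, mimicking a state table
    // sorted by its locality-column prefix.
    buffer: BTreeMap<Vec<String>, Vec<Row>>,
}

impl LocalityBuffer {
    fn new(locality_columns: Vec<usize>) -> Self {
        Self { locality_columns, buffer: BTreeMap::new() }
    }

    // Phase 1: before the start-backfill signal, every upstream row is buffered,
    // grouped by its locality key.
    fn buffer_row(&mut self, row: Row) {
        let key: Vec<String> = self.locality_columns.iter().map(|&i| row[i].clone()).collect();
        self.buffer.entry(key).or_default().push(row);
    }

    // Phase 2: once backfill starts, drain the buffered rows in locality order so
    // downstream stateful operators (HashAgg, HashJoin, ...) see good locality.
    fn drain_in_locality_order(&mut self) -> Vec<Row> {
        std::mem::take(&mut self.buffer).into_values().flatten().collect()
    }
}

fn main() {
    let mut buf = LocalityBuffer::new(vec![0]);
    buf.buffer_row(vec!["k2".into(), "b".into()]);
    buf.buffer_row(vec!["k1".into(), "a".into()]);
    buf.buffer_row(vec!["k2".into(), "c".into()]);
    // Rows come out grouped by the locality column: the "k1" row first, then both "k2" rows.
    for row in buf.drain_in_locality_order() {
        println!("{row:?}");
    }
}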

@chenzl25 chenzl25 mentioned this pull request Sep 29, 2025
@kwannoel
Contributor

This PR is ready to review.
Basically the PR contains 3 parts:

  • optimizer: introduce LocalityProvider and use it together with the try_better_locality method.
  • backfill ordering: handle the dependencies among scan and locality backfill fragments.
  • executor: buffer chunks until StartFragmentBackfill, then backfill from the buffered state table.

Very cool, I'll take a look tomorrow!

@kwannoel
Contributor

This pull request introduces a new "locality backfill" feature for streaming queries, which enables more efficient backfilling by grouping and buffering input data using specified locality columns. The changes span the SQL test suite, session configuration, planner, protobuf definitions, and catalog metadata to support this feature end-to-end. The most important changes are grouped below:

Locality Backfill Feature Implementation

  • Added the LocalityProvider plan node (src/frontend/src/optimizer/plan_node/generic/locality_provider.rs) and integrated it into the planner logic, enabling operators to buffer input data by locality columns during backfill. This includes changes to the planner's aggregation and join logic to use the new node when the feature is enabled. [1] [2] [3] [4] [5] [6]
  • Added the LocalityProviderNode protobuf message and registered it as a fragment type, allowing the streaming engine to recognize and process locality backfill nodes. [1] [2] [3] [4]

Configuration and Test Coverage

  • Introduced the enable_locality_backfill session config parameter (default: false), making the feature opt-in and allowing users to control its activation. [1] [2]
  • Added new SQL logic tests and planner test cases to verify locality backfill behavior, including a dedicated end-to-end test and planner output validation. [1] [2] [3]
  • Updated the CI test script to include the locality backfill test in the automated pipeline. [1] [2]

Catalog and Utility Updates

  • Registered the new fragment type in the catalog and updated related utilities and test cases to support the LocalityProvider node. [1] [2] [3]

These changes collectively enable more granular and efficient backfill operations in streaming queries, controlled by a session-level configuration and fully integrated into the query planner and execution engine.

@BugenZhao
Member

This PR is ready to review.

I'm wondering if it's possible to split this PR into smaller ones to make the review process easier?

Member

@BugenZhao BugenZhao left a comment

Looks great to me! Reviewed all files except stream.

kill_cluster
}

test_locality_backfill() {
Member

Theoretically, if we push this into a larger scope or even enable this by default, then

  • we can run all e2e tests under this mode
  • all existing planner tests need to be updated

May I know your plan for this?

Comment on lines +51 to +54
pub fn fields_pretty<'a>(&self) -> Vec<(&'a str, pretty_xmlish::Pretty<'a>)> {
let locality_columns_str = format!("{:?}", self.locality_columns);
vec![("locality_columns", locality_columns_str.into())]
}
Member

Shall we show the field name?

Comment on lines +1780 to +1786
Some(
LogicalLocalityProvider::new(
self.clone_with_left_right(self.left(), self.right()).into(),
columns.to_owned(),
)
.into(),
)
Member

Just realized that we generate a LocalityProvider only when we encounter an upstream with a different locality. If I'm understanding correctly, we will generate a plan like

Join -> LocalityProvider -> Filter -> Project -> Agg

instead of

Join -> Filter -> Project -> LocalityProvider -> Agg

Does it mean that we need to buffer unnecessary data that could have been filtered out by those stateless executors?

Comment on lines +729 to +732
if let Some(better_plan) = self.try_better_locality_inner(columns) {
Some(better_plan)
} else if self.ctx().session_ctx().config().enable_locality_backfill() {
Some(LogicalLocalityProvider::new(self.clone().into(), columns.to_owned()).into())
Member

This pattern seems to appear multiple times. Is there any way to extract it?
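
One possible way to factor out this repeated fallback, sketched with simplified stand-in types. Plan and locality_or_provider here are illustrative only, not the actual optimizer API:

#[derive(Debug, Clone)]
enum Plan {
    Scan(String),
    LocalityProvider { input: Box<Plan>, locality_columns: Vec<usize> },
}

// Try the operator-specific rewrite first; if it yields nothing and the session
// flag is on, wrap the original plan in a LocalityProvider as the generic fallback.
fn locality_or_provider(
    plan: &Plan,
    columns: &[usize],
    enable_locality_backfill: bool,
    try_inner: impl FnOnce(&Plan, &[usize]) -> Option<Plan>,
) -> Option<Plan> {
    try_inner(plan, columns).or_else(|| {
        enable_locality_backfill.then(|| Plan::LocalityProvider {
            input: Box::new(plan.clone()),
            locality_columns: columns.to_vec(),
        })
    })
}

fn main() {
    let scan = Plan::Scan("orders".into());
    // A node that cannot provide locality on column 1 falls back to a LocalityProvider.
    let plan = locality_or_provider(&scan, &[1], true, |_, _| None);
    println!("{plan:?}");
}

In the real code this helper would presumably live on the plan-node side so that each operator only supplies its try_better_locality_inner part.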

Comment on lines +47 to +48
// If the input is hash-distributed, we make it a UpstreamHashShard distribution
// just like a normal table scan. It is used to ensure locality provider is in its own fragment.
Member

Does it mean that this is actually a workaround for the backfill order control to work correctly? I guess we can keep the HashShard and omit one shuffle if it's solely for correctness.

catalog_builder.add_order_column(*locality_col_idx, OrderType::ascending());
}
// add streaming key of the input as the rest of the primary key
if let Some(stream_key) = input.stream_key() {
Member

Suggested change
if let Some(stream_key) = input.stream_key() {
input.expect_stream_key()


// Set locality columns as primary key.
for locality_col_idx in self.locality_columns() {
catalog_builder.add_order_column(*locality_col_idx, OrderType::ascending());
Member

Just realized that ideally we should also have Order as a requirement for "better locality".

Comment on lines +162 to +164
/// Schema: | vnode | pk(locality columns + input stream keys) | `backfill_finished` | `row_count` |
/// Key: | vnode | pk(locality columns + input stream keys) |
fn build_progress_catalog(&self, state: &mut BuildFragmentGraphState) -> TableCatalog {
Member

Can we unify this with other backfill nodes?

}
}

// 2. Add dependencies: all backfill fragments should run before LocalityProvider fragments
Member

Is it necessary, or just for the sake of simplicity? For the example below, it seems that we can backfill Locality and Fact simultaneously.

Dim  -> Locality -\
                   +-> Join
Fact -------------/

BackfillUpstreamType::MView => mv_count += 1,
BackfillUpstreamType::Source => source_count += 1,
BackfillUpstreamType::Values => (),
BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
Member

Wondering if there's (or we should add) a way to inspect the progress of all backfill (and locality provider) nodes separately? This seems to benefit the observability of backfill order control a lot, especially since we will now "generate" or "manipulate" the backfill order to also handle the locality provider nodes.

Contributor

We currently have fragment level backfill observability, in the form of rw_fragment_backfill_progress. Because LocalityProvider is also dedicated to a single fragment, we can reuse this mechanism to track the progress, provided LocalityProvider also has a progress state table. We just need to update the system catalog.

@@ -0,0 +1,32 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
Contributor

Can we add a test for NoShuffleExchange? We should make sure that the LocalityProvider lies in a different fragment if a no-shuffle exchange is used.

// Force a no shuffle exchange to ensure locality provider is in its own fragment.
// This is important to ensure the backfill ordering can recognize and build
// the dependency graph among different backfill-needed fragments.
StreamExchange::new_no_shuffle(input).into()
Contributor

We need to test this.

Contributor

@kwannoel kwannoel left a comment

Executor part looks good to me.
