Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
71a86b4
planner
chenzl25 Sep 18, 2025
b8a76df
fix locality state
chenzl25 Sep 18, 2025
6885136
fix
chenzl25 Sep 19, 2025
58fe6d8
add backfill
chenzl25 Sep 19, 2025
9b36d47
fix
chenzl25 Sep 19, 2025
97252b0
add back buffering stage
chenzl25 Sep 19, 2025
dc18e6c
fix
chenzl25 Sep 19, 2025
027d22f
fmt
chenzl25 Sep 19, 2025
f52a660
change locality provider distribution to upstreamhash and ensure its …
chenzl25 Sep 19, 2025
2c4bb95
add fragment dependency
chenzl25 Sep 19, 2025
2f50b59
report backfill progress
chenzl25 Sep 19, 2025
124e8c8
backfill track locality provider
chenzl25 Sep 19, 2025
c806a4d
add backfill ordering for locality provider
chenzl25 Sep 19, 2025
3d5fc58
fix progress states
chenzl25 Sep 19, 2025
0d97562
improve locality provider ordering
chenzl25 Sep 19, 2025
f145ed1
add session variable enable_locality_backfill
chenzl25 Sep 19, 2025
00f7f2b
fix
chenzl25 Sep 22, 2025
60bc6f7
fix locality backfill metrics
chenzl25 Sep 25, 2025
7bae65c
use chunk builder
chenzl25 Sep 25, 2025
171e124
fmt
chenzl25 Sep 25, 2025
69c7afb
Merge branch 'main' into dylan/support_locality_enforcement
chenzl25 Sep 25, 2025
4a86f92
fix
chenzl25 Sep 25, 2025
6c6e2f0
fix
chenzl25 Sep 25, 2025
1151be1
fix
chenzl25 Sep 25, 2025
714defc
fix
chenzl25 Sep 26, 2025
c348fe2
remove deadcode
chenzl25 Sep 26, 2025
8b01b5a
fmt
chenzl25 Sep 29, 2025
deb2ccb
set enable_locality_backfill = true to test e2e backfill test
chenzl25 Sep 29, 2025
1d7564f
fix
chenzl25 Sep 29, 2025
22bc8b8
Revert "set enable_locality_backfill = true to test e2e backfill test"
chenzl25 Sep 29, 2025
16f6b9c
fmt
chenzl25 Sep 29, 2025
e44a12f
add test
chenzl25 Sep 29, 2025
9dabe6a
add tests
chenzl25 Sep 29, 2025
3dd2398
add locality backfill test to run backfill tests
chenzl25 Sep 29, 2025
6857478
resolve conflicts
chenzl25 Sep 30, 2025
679e2b1
fix fields_pretty
chenzl25 Oct 9, 2025
a7176c0
expect_stream_key
chenzl25 Oct 9, 2025
a49a912
add no shuffle test
chenzl25 Oct 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,16 @@ test_cross_db_snapshot_backfill() {
kill_cluster
}

test_locality_backfill() {
echo "--- e2e, locality backfill test, $RUNTIME_CLUSTER_PROFILE"

risedev ci-start $RUNTIME_CLUSTER_PROFILE

sqllogictest -p 4566 -d dev 'e2e_test/backfill/locality_backfill/basic.slt'

kill_cluster
}

main() {
set -euo pipefail
test_snapshot_and_upstream_read
Expand All @@ -433,6 +443,7 @@ main() {
test_scale_in

test_cross_db_snapshot_backfill
test_locality_backfill

# Only if profile is "ci-release", run it.
if [[ ${profile:-} == "ci-release" ]]; then
Expand Down
35 changes: 35 additions & 0 deletions e2e_test/backfill/locality_backfill/basic.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
statement ok
set enable_locality_backfill=true;

statement ok
create table t1(a int, b int);

statement ok
create table t2(a int, b int);

statement ok
insert into t1 select i, 123 from generate_series(1, 1000, 1) i;

statement ok
insert into t2 select i, 123 from generate_series(1, 1000, 1) i ;

statement ok
flush;

statement ok
create materialized view mv as select count(*) from t1 join t2 on t1.a = t2.a group by t1.b;


query ?
select * from mv;
----
1000

statement ok
drop materialized view mv;

statement ok
drop table t1;

statement ok
drop table t2;
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ user disable_purify_definition
user dml_rate_limit
user enable_index_selection
user enable_join_ordering
user enable_locality_backfill
user enable_share_plan
user enable_two_phase_agg
user extra_float_digits
Expand Down
10 changes: 10 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,15 @@ message UpstreamSinkUnionNode {
repeated UpstreamSinkInfo init_upstreams = 1;
}

message LocalityProviderNode {
// Column indices that define locality
repeated uint32 locality_columns = 1;
// State table for buffering input data
optional catalog.Table state_table = 2;
// Progress table for tracking backfill progress
optional catalog.Table progress_table = 3;
}

message StreamNode {
// This field used to be a `bool append_only`.
// Enum variants are ordered for backwards compatibility.
Expand Down Expand Up @@ -1060,6 +1069,7 @@ message StreamNode {
MaterializedExprsNode materialized_exprs = 149;
VectorIndexWriteNode vector_index_write = 150;
UpstreamSinkUnionNode upstream_sink_union = 151;
LocalityProviderNode locality_provider = 152;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
8 changes: 7 additions & 1 deletion src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,8 @@ macro_rules! for_all_fragment_type_flags {
CrossDbSnapshotBackfillStreamScan,
StreamCdcScan,
VectorIndexWrite,
UpstreamSinkUnion
UpstreamSinkUnion,
LocalityProvider
},
{},
0
Expand Down Expand Up @@ -892,6 +893,11 @@ mod tests {
65536,
"UPSTREAM_SINK_UNION",
),
(
LocalityProvider,
131072,
"LOCALITY_PROVIDER",
),
]
"#]]
.assert_debug_eq(
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ pub struct SessionConfig {
/// Enable index selection for queries
#[parameter(default = true)]
enable_index_selection: bool,

/// Enable locality backfill for streaming queries. Defaults to false.
#[parameter(default = false)]
enable_locality_backfill: bool,
}

fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ pub fn visit_stream_node_tables_inner<F>(
always!(node.table, "StreamVectorIndexWrite");
}

NodeBody::LocalityProvider(node) => {
always!(node.state_table, "LocalityProviderState");
always!(node.progress_table, "LocalityProviderProgress");
}
_ => {}
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
- sql: |
set enable_locality_backfill = true;
create table t (a int, b int, c int);
select count(*) from t group by b;
expected_outputs:
- stream_plan
- sql: |
set enable_locality_backfill = true;
create table t1 (a int, b int, c int);
create table t2 (a int, b int, c int);
select count(*) from t1 join t2 on t1.a = t2.a group by t1.b;
expected_outputs:
- stream_plan
- sql: |
set enable_locality_backfill = true;
create table t (a int, b int, c int, primary key (b, a));
select count(*) from t group by a, b;
expected_outputs:
- stream_plan
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
set enable_locality_backfill = true;
create table t (a int, b int, c int);
select count(*) from t group by b;
stream_plan: |-
StreamMaterialize { columns: [count, t.b(hidden)], stream_key: [t.b], pk_columns: [t.b], pk_conflict: NoCheck }
└─StreamProject { exprs: [count, t.b] }
└─StreamHashAgg { group_key: [t.b], aggs: [count] }
└─StreamLocalityProvider { locality_columns: [t.b] }
└─StreamExchange { dist: HashShard(t.b) }
└─StreamTableScan { table: t, columns: [t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
set enable_locality_backfill = true;
create table t1 (a int, b int, c int);
create table t2 (a int, b int, c int);
select count(*) from t1 join t2 on t1.a = t2.a group by t1.b;
stream_plan: |-
StreamMaterialize { columns: [count, t1.b(hidden)], stream_key: [t1.b], pk_columns: [t1.b], pk_conflict: NoCheck }
└─StreamProject { exprs: [count, t1.b] }
└─StreamHashAgg { group_key: [t1.b], aggs: [count] }
└─StreamLocalityProvider { locality_columns: [t1.b] }
└─StreamExchange { dist: HashShard(t1.b) }
└─StreamHashJoin { type: Inner, predicate: t1.a = t2.a, output: [t1.b, t1._row_id, t1.a, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.a) }
│ └─StreamLocalityProvider { locality_columns: [t1.a] }
│ └─StreamExchange { dist: HashShard(t1.a) }
│ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.a) }
└─StreamLocalityProvider { locality_columns: [t2.a] }
└─StreamExchange { dist: HashShard(t2.a) }
└─StreamTableScan { table: t2, columns: [t2.a, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
- sql: |
set enable_locality_backfill = true;
create table t (a int, b int, c int, primary key (b, a));
select count(*) from t group by a, b;
stream_plan: |-
StreamMaterialize { columns: [count, t.a(hidden), t.b(hidden)], stream_key: [t.a, t.b], pk_columns: [t.a, t.b], pk_conflict: NoCheck }
└─StreamProject { exprs: [count, t.a, t.b] }
└─StreamHashAgg { group_key: [t.a, t.b], aggs: [count] }
└─StreamLocalityProvider { locality_columns: [t.a, t.b] }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t.b, t.a) }
└─StreamTableScan { table: t, columns: [t.a, t.b], stream_scan_type: ArrangementBackfill, stream_key: [t.b, t.a], pk: [b, a], dist: UpstreamHashShard(t.b, t.a) }
78 changes: 78 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/locality_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::Pretty;
use risingwave_common::catalog::{FieldDisplay, Schema};

use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
use crate::expr::ExprRewriter;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

/// `LocalityProvider` provides locality for operators during backfilling.
/// It buffers input data into a state table using locality columns as primary key prefix.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LocalityProvider<PlanRef> {
pub input: PlanRef,
/// Columns that define the locality
pub locality_columns: Vec<usize>,
}

impl<PlanRef: GenericPlanRef> LocalityProvider<PlanRef> {
pub fn new(input: PlanRef, locality_columns: Vec<usize>) -> Self {
Self {
input,
locality_columns,
}
}

pub fn fields_pretty<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> {
let locality_columns_display = self
.locality_columns
.iter()
.map(|&i| Pretty::display(&FieldDisplay(self.input.schema().fields.get(i).unwrap())))
.collect();
vec![("locality_columns", Pretty::Array(locality_columns_display))]
}
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for LocalityProvider<PlanRef> {
fn schema(&self) -> Schema {
self.input.schema().clone()
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
self.input.functional_dependency().clone()
}
}

impl<PlanRef: GenericPlanRef> LocalityProvider<PlanRef> {
pub fn rewrite_exprs(&mut self, _r: &mut dyn ExprRewriter) {
// LocalityProvider doesn't contain expressions to rewrite
}

pub fn visit_exprs(&self, _v: &mut dyn crate::expr::ExprVisitor) {
// LocalityProvider doesn't contain expressions to visit
}
}

impl_distill_unit_from_fields!(LocalityProvider, GenericPlanRef);
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ pub use postgres_query::*;
mod mysql_query;
pub use mysql_query::*;

mod locality_provider;
pub use locality_provider::*;

pub trait DistillUnit {
fn distill_with_name<'a>(&self, name: impl Into<Cow<'a, str>>) -> XmlNode<'a>;
}
Expand Down
12 changes: 8 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1416,10 +1416,14 @@ impl ToStream for LogicalAgg {
use super::stream::prelude::*;

let eowc = ctx.emit_on_window_close();
let input = self
.input()
.try_better_locality(&self.group_key().to_vec())
.unwrap_or_else(|| self.input());
let input = if self.group_key().is_empty() {
self.input()
} else {
self.input()
.try_better_locality(&self.group_key().to_vec())
.unwrap_or_else(|| self.input())
};

let stream_input = input.to_stream(ctx)?;

// Use Dedup operator, if possible.
Expand Down
54 changes: 35 additions & 19 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use super::generic::{
};
use super::utils::{Distill, childless_record};
use super::{
BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase,
PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin, StreamPlanRef, StreamProject, ToBatch,
ToStream, generic,
BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalLocalityProvider,
LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin,
StreamPlanRef, StreamProject, ToBatch, ToStream, generic,
};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{CollectInputRef, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, InputRef};
Expand Down Expand Up @@ -1541,6 +1541,26 @@ impl LogicalJoin {
.into()),
}
}

fn try_better_locality_inner(&self, columns: &[usize]) -> Option<PlanRef> {
let mut ctx = ToStreamContext::new(false);
// only pass through the locality information if it can be converted to dynamic filter
if let Ok(Some(_)) = self.to_stream_dynamic_filter(self.on().clone(), &mut ctx) {
// since dynamic filter only supports left input ref in the output indices, we can safely use o2i mapping to convert the required columns.
let o2i_mapping = self.core.o2i_col_mapping();
let left_input_columns = columns
.iter()
.map(|&col| o2i_mapping.try_map(col))
.collect::<Option<Vec<usize>>>()?;
if let Some(better_left_plan) = self.left().try_better_locality(&left_input_columns) {
return Some(
self.clone_with_left_right(better_left_plan, self.right())
.into(),
);
}
}
None
}
}

impl ToBatch for LogicalJoin {
Expand Down Expand Up @@ -1754,23 +1774,19 @@ impl ToStream for LogicalJoin {
}

fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
let mut ctx = ToStreamContext::new(false);
// only pass through the locality information if it can be converted to dynamic filter
if let Ok(Some(_)) = self.to_stream_dynamic_filter(self.on().clone(), &mut ctx) {
// since dynamic filter only supports left input ref in the output indices, we can safely use o2i mapping to convert the required columns.
let o2i_mapping = self.core.o2i_col_mapping();
let left_input_columns = columns
.iter()
.map(|&col| o2i_mapping.try_map(col))
.collect::<Option<Vec<usize>>>()?;
if let Some(better_left_plan) = self.left().try_better_locality(&left_input_columns) {
return Some(
self.clone_with_left_right(better_left_plan, self.right())
.into(),
);
}
if let Some(better_plan) = self.try_better_locality_inner(columns) {
Some(better_plan)
} else if self.ctx().session_ctx().config().enable_locality_backfill() {
Some(
LogicalLocalityProvider::new(
self.clone_with_left_right(self.left(), self.right()).into(),
columns.to_owned(),
)
.into(),
)
Comment on lines +1780 to +1786
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized that we generate a LocalityProvider only until we encounter an upstream with different locality. If I'm understanding correctly, we will generate a plan like

Join -> LocalityProvider -> Filter -> Project -> Agg

instead of

Join -> Filter -> Project -> LocalityProvider -> Agg

Does it mean that we need to buffer unnecessary data that could have been filtered out by those stateless executors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good insight. To resolve this issue, I think we should add the LocalityProvider to the requirement side instead of the provider side.

} else {
None
}
None
}
}

Expand Down
Loading
Loading