Commit 91bcc9b

support enforcing locality on the requirement side
1 parent a49a912 commit 91bcc9b

File tree

9 files changed: +227 -120 lines changed

src/frontend/planner_test/tests/testdata/input/locality_backfill.yaml

Lines changed: 45 additions & 0 deletions
@@ -15,5 +15,50 @@
     set enable_locality_backfill = true;
     create table t (a int, b int, c int, primary key (b, a));
     select count(*) from t group by a, b;
+  expected_outputs:
+  - stream_plan
+- sql: |
+    set enable_locality_backfill = true;
+    create table t (a int, b int, c int);
+    select count(*) from t where c > 1 group by a, b;
+  expected_outputs:
+  - stream_plan
+- sql: |
+    set enable_locality_backfill = true;
+    create table t1 (a int, b int, c int);
+    create table t2 (a int, b int, c int);
+    select count(*) from t1 join t2 on t1.a = t2.a where t1.c > t2.c group by t1.b;
+  expected_outputs:
+  - stream_plan
+- sql: |
+    set enable_locality_backfill = true;
+    create table t (a int, b int, c int);
+    select RANK() OVER (PARTITION BY a ORDER BY b) as rank from t;
+  expected_outputs:
+  - stream_plan
+- name: enforce locality for temporal join for both sides.
+  sql: |
+    set enable_locality_backfill = true;
+    create table stream(id1 int, a1 int, b1 int);
+    create table version(id2 int, a2 int, b2 int, primary key (id2));
+    create index idx2 on version (a2, b2);
+    select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
+  expected_outputs:
+  - stream_plan
+- sql: |
+    set enable_locality_backfill = true;
+    create table t(a int, b int, c int) append only;
+    select distinct on(a) * from t ;
+  expected_outputs:
+  - stream_plan
+- sql: |
+    set enable_locality_backfill = true;
+    create table t(a int, b int, c int);
+    SELECT * FROM (
+      SELECT
+        *,
+        row_number() OVER (PARTITION BY a ORDER BY b) AS rank
+      FROM t
+    ) WHERE rank <= 1;
   expected_outputs:
   - stream_plan

src/frontend/planner_test/tests/testdata/output/locality_backfill.yaml

Lines changed: 92 additions & 0 deletions
@@ -41,3 +41,95 @@
         └─StreamLocalityProvider { locality_columns: [t.a, t.b] }
           └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t.b, t.a) }
             └─StreamTableScan { table: t, columns: [t.a, t.b], stream_scan_type: ArrangementBackfill, stream_key: [t.b, t.a], pk: [b, a], dist: UpstreamHashShard(t.b, t.a) }
+- sql: |
+    set enable_locality_backfill = true;
+    create table t (a int, b int, c int);
+    select count(*) from t where c > 1 group by a, b;
+  stream_plan: |-
+    StreamMaterialize { columns: [count, t.a(hidden), t.b(hidden)], stream_key: [t.a, t.b], pk_columns: [t.a, t.b], pk_conflict: NoCheck }
+    └─StreamProject { exprs: [count, t.a, t.b] }
+      └─StreamHashAgg { group_key: [t.a, t.b], aggs: [count] }
+        └─StreamLocalityProvider { locality_columns: [t.a, t.b] }
+          └─StreamExchange { dist: HashShard(t.a, t.b) }
+            └─StreamProject { exprs: [t.a, t.b, t._row_id] }
+              └─StreamFilter { predicate: (t.c > 1:Int32) }
+                └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id, t.c], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+- sql: |
+    set enable_locality_backfill = true;
+    create table t1 (a int, b int, c int);
+    create table t2 (a int, b int, c int);
+    select count(*) from t1 join t2 on t1.a = t2.a where t1.c > t2.c group by t1.b;
+  stream_plan: |-
+    StreamMaterialize { columns: [count, t1.b(hidden)], stream_key: [t1.b], pk_columns: [t1.b], pk_conflict: NoCheck }
+    └─StreamProject { exprs: [count, t1.b] }
+      └─StreamHashAgg { group_key: [t1.b], aggs: [count] }
+        └─StreamLocalityProvider { locality_columns: [t1.b] }
+          └─StreamExchange { dist: HashShard(t1.b) }
+            └─StreamProject { exprs: [t1.b, t1._row_id, t1.a, t2._row_id] }
+              └─StreamFilter { predicate: (t1.c > t2.c) }
+                └─StreamHashJoin { type: Inner, predicate: t1.a = t2.a, output: all }
+                  ├─StreamExchange { dist: HashShard(t1.a) }
+                  │ └─StreamLocalityProvider { locality_columns: [t1.a] }
+                  │   └─StreamExchange { dist: HashShard(t1.a) }
+                  │     └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
+                  └─StreamExchange { dist: HashShard(t2.a) }
+                    └─StreamLocalityProvider { locality_columns: [t2.a] }
+                      └─StreamExchange { dist: HashShard(t2.a) }
+                        └─StreamTableScan { table: t2, columns: [t2.a, t2.c, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
+- sql: |
+    set enable_locality_backfill = true;
+    create table t (a int, b int, c int);
+    select RANK() OVER (PARTITION BY a ORDER BY b) as rank from t;
+  stream_plan: |-
+    StreamMaterialize { columns: [rank, t._row_id(hidden), t.a(hidden)], stream_key: [t._row_id, t.a], pk_columns: [t._row_id, t.a], pk_conflict: NoCheck }
+    └─StreamProject { exprs: [rank, t._row_id, t.a] }
+      └─StreamOverWindow { window_functions: [rank() OVER(PARTITION BY t.a ORDER BY t.b ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
+        └─StreamLocalityProvider { locality_columns: [t.a] }
+          └─StreamExchange { dist: HashShard(t.a) }
+            └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+- name: enforce locality for temporal join for both sides.
+  sql: |
+    set enable_locality_backfill = true;
+    create table stream(id1 int, a1 int, b1 int);
+    create table version(id2 int, a2 int, b2 int, primary key (id2));
+    create index idx2 on version (a2, b2);
+    select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
+  stream_plan: |-
+    StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck }
+    └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) }
+      └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, nested_loop: false, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] }
+        ├─StreamExchange { dist: HashShard(stream.a1) }
+        │ └─StreamLocalityProvider { locality_columns: [stream.a1, stream.b1] }
+        │   └─StreamExchange { dist: HashShard(stream.a1, stream.b1) }
+        │     └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
+        └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) }
+          └─StreamTableScan { table: idx2, columns: [idx2.id2, idx2.a2, idx2.b2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx2.a2) }
+- sql: |
+    set enable_locality_backfill = true;
+    create table t(a int, b int, c int) append only;
+    select distinct on(a) * from t ;
+  stream_plan: |-
+    StreamMaterialize { columns: [a, b, c, t._row_id(hidden)], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck }
+    └─StreamAppendOnlyDedup { dedup_cols: [t.a] }
+      └─StreamExchange { dist: HashShard(t.a) }
+        └─StreamLocalityProvider { locality_columns: [t.a] }
+          └─StreamExchange { dist: HashShard(t.a) }
+            └─StreamTableScan { table: t, columns: [t.a, t.b, t.c, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+- sql: |
+    set enable_locality_backfill = true;
+    create table t(a int, b int, c int);
+    SELECT * FROM (
+      SELECT
+        *,
+        row_number() OVER (PARTITION BY a ORDER BY b) AS rank
+      FROM t
+    ) WHERE rank <= 1;
+  stream_plan: |-
+    StreamMaterialize { columns: [a, b, c, t._row_id(hidden), rank], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck }
+    └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.a ORDER BY t.b ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
+      └─StreamLocalityProvider { locality_columns: [t.a] }
+        └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t.a) }
+          └─StreamGroupTopN { order: [t.b ASC], limit: 1, offset: 0, group_key: [t.a] }
+            └─StreamLocalityProvider { locality_columns: [t.a] }
+              └─StreamExchange { dist: HashShard(t.a) }
+                └─StreamTableScan { table: t, columns: [t.a, t.b, t.c, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }

src/frontend/src/optimizer/plan_node/convert.rs

Lines changed: 16 additions & 0 deletions
@@ -61,6 +61,22 @@ pub trait ToStream {
     }
 }
 
+/// Try to enforce the locality requirement on the given columns.
+/// If a better plan can be found, return the better plan.
+/// If no better plan can be found, and locality backfill is enabled, wrap the plan
+/// with `LogicalLocalityProvider`.
+/// Otherwise, return the plan as is.
+pub fn try_enforce_locality_requirement(plan: LogicalPlanRef, columns: &[usize]) -> LogicalPlanRef {
+    assert!(!columns.is_empty());
+    if let Some(better_plan) = plan.try_better_locality(columns) {
+        better_plan
+    } else if plan.ctx().session_ctx().config().enable_locality_backfill() {
+        LogicalLocalityProvider::new(plan, columns.to_owned()).into()
+    } else {
+        plan
+    }
+}
+
 pub fn stream_enforce_eowc_requirement(
     ctx: OptimizerContextRef,
     plan: StreamPlanRef,
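
For readers skimming the diff: the helper above makes a three-way decision (better plan found / locality-backfill fallback / unchanged). Below is a minimal, self-contained sketch of that decision using toy stand-ins — `Plan`, the stub `try_better_locality`, and `backfill_enabled` are illustrative, not the real RisingWave optimizer API.

    // A toy model of `try_enforce_locality_requirement`; `Plan`, the stub
    // `try_better_locality`, and `backfill_enabled` are illustrative
    // stand-ins, not the real optimizer types.
    #[derive(Debug)]
    enum Plan {
        Scan,
        // Wraps a child plan and records the columns whose locality the
        // backfill phase must provide.
        LocalityProvider(Box<Plan>, Vec<usize>),
    }

    impl Plan {
        // The real method searches for an alternative plan (e.g. an index
        // scan) that already satisfies the locality requirement; this toy
        // scan never finds one.
        fn try_better_locality(&self, _columns: &[usize]) -> Option<Plan> {
            None
        }
    }

    fn try_enforce_locality_requirement(plan: Plan, columns: &[usize], backfill_enabled: bool) -> Plan {
        assert!(!columns.is_empty());
        if let Some(better) = plan.try_better_locality(columns) {
            better // 1) a better plan exists: use it
        } else if backfill_enabled {
            // 2) no better plan, but locality backfill is enabled: wrap it
            Plan::LocalityProvider(Box::new(plan), columns.to_owned())
        } else {
            plan // 3) otherwise, return the plan as is
        }
    }

    fn main() {
        let plan = try_enforce_locality_requirement(Plan::Scan, &[0, 1], true);
        println!("{plan:?}"); // prints: LocalityProvider(Scan, [0, 1])
    }

Centralizing the fallback here is what lets every call site below collapse the repeated `try_better_locality(...).unwrap_or_else(...)` pattern, plus the locality-backfill wrapping, into a single call.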

src/frontend/src/optimizer/plan_node/logical_agg.rs

Lines changed: 2 additions & 3 deletions
@@ -25,6 +25,7 @@ use super::{
     BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef,
     PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamPlanRef, StreamProject,
     StreamShare, StreamSimpleAgg, StreamStatelessSimpleAgg, ToBatch, ToStream,
+    try_enforce_locality_requirement,
 };
 use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{
@@ -1419,9 +1420,7 @@
         let input = if self.group_key().is_empty() {
             self.input()
         } else {
-            self.input()
-                .try_better_locality(&self.group_key().to_vec())
-                .unwrap_or_else(|| self.input())
+            try_enforce_locality_requirement(self.input(), &self.group_key().to_vec())
         };
 
         let stream_input = input.to_stream(ctx)?;

src/frontend/src/optimizer/plan_node/logical_dedup.rs

Lines changed: 2 additions & 5 deletions
@@ -22,7 +22,7 @@ use super::{
     BatchGroupTopN, BatchPlanRef, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
     LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
     PredicatePushdownContext, RewriteStreamContext, StreamDedup, StreamGroupTopN, ToBatch,
-    ToStream, ToStreamContext, gen_filter_and_pushdown, generic,
+    ToStream, ToStreamContext, gen_filter_and_pushdown, generic, try_enforce_locality_requirement,
 };
 use crate::error::Result;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
@@ -104,10 +104,7 @@ impl ToStream for LogicalDedup {
     ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
         use super::stream::prelude::*;
 
-        let logical_input = self
-            .input()
-            .try_better_locality(self.dedup_cols())
-            .unwrap_or_else(|| self.input());
+        let logical_input = try_enforce_locality_requirement(self.input(), self.dedup_cols());
         let input = logical_input.to_stream(ctx)?;
         let input = RequiredDist::hash_shard(self.dedup_cols())
             .streaming_enforce_if_not_satisfies(input)?;

src/frontend/src/optimizer/plan_node/logical_join.rs

Lines changed: 22 additions & 48 deletions
@@ -28,9 +28,9 @@ use super::generic::{
 };
 use super::utils::{Distill, childless_record};
 use super::{
-    BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalLocalityProvider,
-    LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin,
-    StreamPlanRef, StreamProject, ToBatch, ToStream, generic,
+    BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase,
+    PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin, StreamPlanRef, StreamProject, ToBatch,
+    ToStream, generic, try_enforce_locality_requirement,
 };
 use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{CollectInputRef, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, InputRef};
@@ -903,19 +903,12 @@
         let lhs_join_key_idx = self.eq_indexes().into_iter().map(|(l, _)| l).collect_vec();
         let rhs_join_key_idx = self.eq_indexes().into_iter().map(|(_, r)| r).collect_vec();
 
-        let logical_right = self
-            .right()
-            .try_better_locality(&rhs_join_key_idx)
-            .unwrap_or_else(|| self.right());
+        let logical_right = try_enforce_locality_requirement(self.right(), &rhs_join_key_idx);
         let mut right = logical_right.to_stream_with_dist_required(
             &RequiredDist::shard_by_key(self.right().schema().len(), &predicate.right_eq_indexes()),
             ctx,
         )?;
-        let logical_left = self
-            .left()
-            .try_better_locality(&lhs_join_key_idx)
-            .unwrap_or_else(|| self.left());
-
+        let logical_left = try_enforce_locality_requirement(self.left(), &lhs_join_key_idx);
         let r2l =
             predicate.r2l_eq_columns_mapping(logical_left.schema().len(), right.schema().len());
         let l2r =
@@ -1255,10 +1248,7 @@
             .into_iter()
             .map(|(l, _)| l)
             .collect_vec();
-        let logical_left = self
-            .left()
-            .try_better_locality(&lhs_join_key_idx)
-            .unwrap_or_else(|| self.left());
+        let logical_left = try_enforce_locality_requirement(self.left(), &lhs_join_key_idx);
         let left = logical_left.to_stream(ctx)?;
         // Enforce a shuffle for the temporal join LHS to let the scheduler be able to schedule the join fragment together with the RHS with a `no_shuffle` exchange.
         let left = required_dist.stream_enforce(left);
@@ -1541,26 +1531,6 @@
             .into()),
         }
     }
-
-    fn try_better_locality_inner(&self, columns: &[usize]) -> Option<PlanRef> {
-        let mut ctx = ToStreamContext::new(false);
-        // only pass through the locality information if it can be converted to dynamic filter
-        if let Ok(Some(_)) = self.to_stream_dynamic_filter(self.on().clone(), &mut ctx) {
-            // since dynamic filter only supports left input ref in the output indices, we can safely use o2i mapping to convert the required columns.
-            let o2i_mapping = self.core.o2i_col_mapping();
-            let left_input_columns = columns
-                .iter()
-                .map(|&col| o2i_mapping.try_map(col))
-                .collect::<Option<Vec<usize>>>()?;
-            if let Some(better_left_plan) = self.left().try_better_locality(&left_input_columns) {
-                return Some(
-                    self.clone_with_left_right(better_left_plan, self.right())
-                        .into(),
-                );
-            }
-        }
-        None
-    }
 }
 
 impl ToBatch for LogicalJoin {
@@ -1774,19 +1744,23 @@
     }
 
     fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
-        if let Some(better_plan) = self.try_better_locality_inner(columns) {
-            Some(better_plan)
-        } else if self.ctx().session_ctx().config().enable_locality_backfill() {
-            Some(
-                LogicalLocalityProvider::new(
-                    self.clone_with_left_right(self.left(), self.right()).into(),
-                    columns.to_owned(),
-                )
-                .into(),
-            )
-        } else {
-            None
+        let mut ctx = ToStreamContext::new(false);
+        // only pass through the locality information if it can be converted to dynamic filter
+        if let Ok(Some(_)) = self.to_stream_dynamic_filter(self.on().clone(), &mut ctx) {
+            // since dynamic filter only supports left input ref in the output indices, we can safely use o2i mapping to convert the required columns.
+            let o2i_mapping = self.core.o2i_col_mapping();
+            let left_input_columns = columns
+                .iter()
+                .map(|&col| o2i_mapping.try_map(col))
+                .collect::<Option<Vec<usize>>>()?;
+            if let Some(better_left_plan) = self.left().try_better_locality(&left_input_columns) {
+                return Some(
+                    self.clone_with_left_right(better_left_plan, self.right())
+                        .into(),
+                );
+            }
         }
+        None
     }
 }
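
The o2i-mapping comment in the hunk above is the subtle step: the requested locality columns are expressed against the join's output schema, so they must be translated back to left-input indices, and the whole translation must fail if any column is not a plain left-input reference. Here is a hedged, self-contained sketch of just that conversion — `O2IMapping` is a toy stand-in for the optimizer's real column-index mapping type, not its API.

    // Toy stand-in for the optimizer's output-to-input column mapping.
    struct O2IMapping {
        // map[output_col] = Some(input_col) when the output column is a
        // direct reference to a left-input column, None otherwise.
        map: Vec<Option<usize>>,
    }

    impl O2IMapping {
        fn try_map(&self, col: usize) -> Option<usize> {
            self.map.get(col).copied().flatten()
        }
    }

    fn main() {
        // Join output [left.a, left.b, right.x]: only the left columns map back.
        let o2i = O2IMapping { map: vec![Some(0), Some(1), None] };

        // Mirrors `columns.iter().map(|&col| o2i_mapping.try_map(col)).collect::<Option<Vec<usize>>>()`:
        // collecting Option<usize> items into Option<Vec<usize>> yields None
        // as soon as any single column fails to map.
        let ok: Option<Vec<usize>> = [0usize, 1].iter().map(|&c| o2i.try_map(c)).collect();
        let bad: Option<Vec<usize>> = [0usize, 2].iter().map(|&c| o2i.try_map(c)).collect();
        assert_eq!(ok, Some(vec![0, 1]));
        assert_eq!(bad, None); // column 2 (right.x) is not a left input ref
    }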

src/frontend/src/optimizer/plan_node/logical_over_window.rs

Lines changed: 3 additions & 6 deletions
@@ -26,7 +26,7 @@ use super::{
     BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter,
     LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
     StreamEowcOverWindow, StreamEowcSort, StreamOverWindow, ToBatch, ToStream,
-    gen_filter_and_pushdown,
+    gen_filter_and_pushdown, try_enforce_locality_requirement,
 };
 use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{
@@ -670,11 +670,8 @@
         if partition_key_indices.is_empty() {
             empty_partition_by_not_implemented!();
         }
-        let input = self
-            .core
-            .input
-            .try_better_locality(&partition_key_indices)
-            .unwrap_or_else(|| self.core.input.clone());
+
+        let input = try_enforce_locality_requirement(self.input(), &partition_key_indices);
         let stream_input = input.to_stream(ctx)?;
 
         if ctx.emit_on_window_close() {
