@@ -15,5 +15,50 @@
set enable_locality_backfill = true;
create table t (a int, b int, c int, primary key (b, a));
select count(*) from t group by a, b;
expected_outputs:
- stream_plan
- sql: |
set enable_locality_backfill = true;
create table t (a int, b int, c int);
select count(*) from t where c > 1 group by a, b;
expected_outputs:
- stream_plan
- sql: |
set enable_locality_backfill = true;
create table t1 (a int, b int, c int);
create table t2 (a int, b int, c int);
select count(*) from t1 join t2 on t1.a = t2.a where t1.c > t2.c group by t1.b;
expected_outputs:
- stream_plan
- sql: |
set enable_locality_backfill = true;
create table t (a int, b int, c int);
select RANK() OVER (PARTITION BY a ORDER BY b) as rank from t;
expected_outputs:
- stream_plan
- name: enforce locality for temporal join for both sides.
sql: |
set enable_locality_backfill = true;
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2);
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
expected_outputs:
- stream_plan
- sql: |
set enable_locality_backfill = true;
create table t(a int, b int, c int) append only;
select distinct on(a) * from t ;
expected_outputs:
- stream_plan
- sql: |
set enable_locality_backfill = true;
create table t(a int, b int, c int);
SELECT * FROM (
SELECT
*,
row_number() OVER (PARTITION BY a ORDER BY b) AS rank
FROM t
) WHERE rank <= 1;
expected_outputs:
- stream_plan
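(For orientation: each case in this planner-test input file only declares which artifacts the test harness should generate; the generated output file — next in the diff — repeats the SQL and carries the actual plan under the matching key. A minimal sketch of the pairing, using a hypothetical query, not one of the cases above:

input file:
- sql: |
    select a from t;
  expected_outputs:
  - stream_plan

generated output file:
- sql: |
    select a from t;
  stream_plan: |-
    StreamMaterialize { ... })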
@@ -41,3 +41,95 @@
└─StreamLocalityProvider { locality_columns: [t.a, t.b] }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t.b, t.a) }
└─StreamTableScan { table: t, columns: [t.a, t.b], stream_scan_type: ArrangementBackfill, stream_key: [t.b, t.a], pk: [b, a], dist: UpstreamHashShard(t.b, t.a) }
- sql: |
set enable_locality_backfill = true;
create table t (a int, b int, c int);
select count(*) from t where c > 1 group by a, b;
stream_plan: |-
StreamMaterialize { columns: [count, t.a(hidden), t.b(hidden)], stream_key: [t.a, t.b], pk_columns: [t.a, t.b], pk_conflict: NoCheck }
└─StreamProject { exprs: [count, t.a, t.b] }
└─StreamHashAgg { group_key: [t.a, t.b], aggs: [count] }
└─StreamLocalityProvider { locality_columns: [t.a, t.b] }
└─StreamExchange { dist: HashShard(t.a, t.b) }
└─StreamProject { exprs: [t.a, t.b, t._row_id] }
└─StreamFilter { predicate: (t.c > 1:Int32) }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id, t.c], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
set enable_locality_backfill = true;
create table t1 (a int, b int, c int);
create table t2 (a int, b int, c int);
select count(*) from t1 join t2 on t1.a = t2.a where t1.c > t2.c group by t1.b;
stream_plan: |-
StreamMaterialize { columns: [count, t1.b(hidden)], stream_key: [t1.b], pk_columns: [t1.b], pk_conflict: NoCheck }
└─StreamProject { exprs: [count, t1.b] }
└─StreamHashAgg { group_key: [t1.b], aggs: [count] }
└─StreamLocalityProvider { locality_columns: [t1.b] }
└─StreamExchange { dist: HashShard(t1.b) }
└─StreamProject { exprs: [t1.b, t1._row_id, t1.a, t2._row_id] }
└─StreamFilter { predicate: (t1.c > t2.c) }
└─StreamHashJoin { type: Inner, predicate: t1.a = t2.a, output: all }
├─StreamExchange { dist: HashShard(t1.a) }
│ └─StreamLocalityProvider { locality_columns: [t1.a] }
│ └─StreamExchange { dist: HashShard(t1.a) }
│ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.a) }
└─StreamLocalityProvider { locality_columns: [t2.a] }
└─StreamExchange { dist: HashShard(t2.a) }
└─StreamTableScan { table: t2, columns: [t2.a, t2.c, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
- sql: |
set enable_locality_backfill = true;
create table t (a int, b int, c int);
select RANK() OVER (PARTITION BY a ORDER BY b) as rank from t;
stream_plan: |-
StreamMaterialize { columns: [rank, t._row_id(hidden), t.a(hidden)], stream_key: [t._row_id, t.a], pk_columns: [t._row_id, t.a], pk_conflict: NoCheck }
└─StreamProject { exprs: [rank, t._row_id, t.a] }
└─StreamOverWindow { window_functions: [rank() OVER(PARTITION BY t.a ORDER BY t.b ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamLocalityProvider { locality_columns: [t.a] }
└─StreamExchange { dist: HashShard(t.a) }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: enforce locality for temporal join for both sides.
sql: |
set enable_locality_backfill = true;
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2);
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) }
└─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, nested_loop: false, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] }
├─StreamExchange { dist: HashShard(stream.a1) }
│ └─StreamLocalityProvider { locality_columns: [stream.a1, stream.b1] }
│ └─StreamExchange { dist: HashShard(stream.a1, stream.b1) }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) }
└─StreamTableScan { table: idx2, columns: [idx2.id2, idx2.a2, idx2.b2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx2.a2) }
- sql: |
set enable_locality_backfill = true;
create table t(a int, b int, c int) append only;
select distinct on(a) * from t ;
stream_plan: |-
StreamMaterialize { columns: [a, b, c, t._row_id(hidden)], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck }
└─StreamAppendOnlyDedup { dedup_cols: [t.a] }
└─StreamExchange { dist: HashShard(t.a) }
└─StreamLocalityProvider { locality_columns: [t.a] }
└─StreamExchange { dist: HashShard(t.a) }
└─StreamTableScan { table: t, columns: [t.a, t.b, t.c, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
set enable_locality_backfill = true;
create table t(a int, b int, c int);
SELECT * FROM (
SELECT
*,
row_number() OVER (PARTITION BY a ORDER BY b) AS rank
FROM t
) WHERE rank <= 1;
stream_plan: |-
StreamMaterialize { columns: [a, b, c, t._row_id(hidden), rank], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck }
└─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.a ORDER BY t.b ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamLocalityProvider { locality_columns: [t.a] }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t.a) }
└─StreamGroupTopN { order: [t.b ASC], limit: 1, offset: 0, group_key: [t.a] }
└─StreamLocalityProvider { locality_columns: [t.a] }
└─StreamExchange { dist: HashShard(t.a) }
└─StreamTableScan { table: t, columns: [t.a, t.b, t.c, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
16 changes: 16 additions & 0 deletions src/frontend/src/optimizer/plan_node/convert.rs
@@ -61,6 +61,22 @@ pub trait ToStream {
}
}

/// Try to enforce the locality requirement on the given columns.
/// If a better plan can be found, return the better plan.
/// If no better plan can be found, and locality backfill is enabled, wrap the plan
/// with `LogicalLocalityProvider`.
/// Otherwise, return the plan as is.
pub fn try_enforce_locality_requirement(plan: LogicalPlanRef, columns: &[usize]) -> LogicalPlanRef {
assert!(!columns.is_empty());
if let Some(better_plan) = plan.try_better_locality(columns) {
better_plan
} else if plan.ctx().session_ctx().config().enable_locality_backfill() {
LogicalLocalityProvider::new(plan, columns.to_owned()).into()
} else {
plan
}
}

pub fn stream_enforce_eowc_requirement(
ctx: OptimizerContextRef,
plan: StreamPlanRef,
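(As a standalone illustration of the fallback order the new helper encodes — prefer a plan that already provides the required locality, otherwise wrap it with a locality provider when backfill is enabled, otherwise keep the plan unchanged — here is a minimal runnable sketch with toy types, not the real planner API:

// Toy model of the three-way fallback in `try_enforce_locality_requirement`.
#[derive(Debug, PartialEq)]
enum Plan {
    Scan,
    LocalityProvider(Box<Plan>, Vec<usize>),
}

// Stand-in for `try_better_locality`; pretend no better plan exists.
fn try_better_locality(_plan: &Plan, _columns: &[usize]) -> Option<Plan> {
    None
}

fn try_enforce_locality(plan: Plan, columns: &[usize], backfill_enabled: bool) -> Plan {
    assert!(!columns.is_empty());
    if let Some(better) = try_better_locality(&plan, columns) {
        better
    } else if backfill_enabled {
        Plan::LocalityProvider(Box::new(plan), columns.to_vec())
    } else {
        plan
    }
}

fn main() {
    assert_eq!(
        try_enforce_locality(Plan::Scan, &[0, 1], true),
        Plan::LocalityProvider(Box::new(Plan::Scan), vec![0, 1])
    );
    assert_eq!(try_enforce_locality(Plan::Scan, &[0], false), Plan::Scan);
}

Centralizing this decision in one helper is what lets the call sites below shrink to one line each.)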
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
@@ -25,6 +25,7 @@ use super::{
BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef,
PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamPlanRef, StreamProject,
StreamShare, StreamSimpleAgg, StreamStatelessSimpleAgg, ToBatch, ToStream,
try_enforce_locality_requirement,
};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{
@@ -1419,9 +1420,7 @@ impl ToStream for LogicalAgg {
let input = if self.group_key().is_empty() {
self.input()
} else {
self.input()
.try_better_locality(&self.group_key().to_vec())
.unwrap_or_else(|| self.input())
try_enforce_locality_requirement(self.input(), &self.group_key().to_vec())
};

let stream_input = input.to_stream(ctx)?;
7 changes: 2 additions & 5 deletions src/frontend/src/optimizer/plan_node/logical_dedup.rs
@@ -22,7 +22,7 @@ use super::{
BatchGroupTopN, BatchPlanRef, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
PredicatePushdownContext, RewriteStreamContext, StreamDedup, StreamGroupTopN, ToBatch,
ToStream, ToStreamContext, gen_filter_and_pushdown, generic,
ToStream, ToStreamContext, gen_filter_and_pushdown, generic, try_enforce_locality_requirement,
};
use crate::error::Result;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
@@ -104,10 +104,7 @@ impl ToStream for LogicalDedup {
) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
use super::stream::prelude::*;

let logical_input = self
.input()
.try_better_locality(self.dedup_cols())
.unwrap_or_else(|| self.input());
let logical_input = try_enforce_locality_requirement(self.input(), self.dedup_cols());
let input = logical_input.to_stream(ctx)?;
let input = RequiredDist::hash_shard(self.dedup_cols())
.streaming_enforce_if_not_satisfies(input)?;
70 changes: 22 additions & 48 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
@@ -28,9 +28,9 @@ use super::generic::{
};
use super::utils::{Distill, childless_record};
use super::{
BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalLocalityProvider,
LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin,
StreamPlanRef, StreamProject, ToBatch, ToStream, generic,
BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase,
PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin, StreamPlanRef, StreamProject, ToBatch,
ToStream, generic, try_enforce_locality_requirement,
};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{CollectInputRef, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, InputRef};
@@ -903,19 +903,12 @@ impl LogicalJoin {
let lhs_join_key_idx = self.eq_indexes().into_iter().map(|(l, _)| l).collect_vec();
let rhs_join_key_idx = self.eq_indexes().into_iter().map(|(_, r)| r).collect_vec();

let logical_right = self
.right()
.try_better_locality(&rhs_join_key_idx)
.unwrap_or_else(|| self.right());
let logical_right = try_enforce_locality_requirement(self.right(), &rhs_join_key_idx);
let mut right = logical_right.to_stream_with_dist_required(
&RequiredDist::shard_by_key(self.right().schema().len(), &predicate.right_eq_indexes()),
ctx,
)?;
let logical_left = self
.left()
.try_better_locality(&lhs_join_key_idx)
.unwrap_or_else(|| self.left());

let logical_left = try_enforce_locality_requirement(self.left(), &lhs_join_key_idx);
let r2l =
predicate.r2l_eq_columns_mapping(logical_left.schema().len(), right.schema().len());
let l2r =
@@ -1255,10 +1248,7 @@ impl LogicalJoin {
.into_iter()
.map(|(l, _)| l)
.collect_vec();
let logical_left = self
.left()
.try_better_locality(&lhs_join_key_idx)
.unwrap_or_else(|| self.left());
let logical_left = try_enforce_locality_requirement(self.left(), &lhs_join_key_idx);
let left = logical_left.to_stream(ctx)?;
// Enforce a shuffle for the temporal join LHS to let the scheduler be able to schedule the join fragment together with the RHS with a `no_shuffle` exchange.
let left = required_dist.stream_enforce(left);
@@ -1541,26 +1531,6 @@ impl LogicalJoin {
.into()),
}
}

fn try_better_locality_inner(&self, columns: &[usize]) -> Option<PlanRef> {
[Review comment (author): Just move the inner function to the outside, no changes.]

let mut ctx = ToStreamContext::new(false);
// only pass through the locality information if it can be converted to dynamic filter
if let Ok(Some(_)) = self.to_stream_dynamic_filter(self.on().clone(), &mut ctx) {
// since dynamic filter only supports left input ref in the output indices, we can safely use o2i mapping to convert the required columns.
let o2i_mapping = self.core.o2i_col_mapping();
let left_input_columns = columns
.iter()
.map(|&col| o2i_mapping.try_map(col))
.collect::<Option<Vec<usize>>>()?;
if let Some(better_left_plan) = self.left().try_better_locality(&left_input_columns) {
return Some(
self.clone_with_left_right(better_left_plan, self.right())
.into(),
);
}
}
None
}
}

impl ToBatch for LogicalJoin {
@@ -1774,19 +1744,23 @@ impl ToStream for LogicalJoin {
}

fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
if let Some(better_plan) = self.try_better_locality_inner(columns) {
Some(better_plan)
} else if self.ctx().session_ctx().config().enable_locality_backfill() {
Some(
LogicalLocalityProvider::new(
self.clone_with_left_right(self.left(), self.right()).into(),
columns.to_owned(),
)
.into(),
)
} else {
None
let mut ctx = ToStreamContext::new(false);
// only pass through the locality information if it can be converted to dynamic filter
if let Ok(Some(_)) = self.to_stream_dynamic_filter(self.on().clone(), &mut ctx) {
// since dynamic filter only supports left input ref in the output indices, we can safely use o2i mapping to convert the required columns.
let o2i_mapping = self.core.o2i_col_mapping();
let left_input_columns = columns
.iter()
.map(|&col| o2i_mapping.try_map(col))
.collect::<Option<Vec<usize>>>()?;
if let Some(better_left_plan) = self.left().try_better_locality(&left_input_columns) {
return Some(
self.clone_with_left_right(better_left_plan, self.right())
.into(),
);
}
}
None
}
}

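(The column-mapping step in `try_better_locality` above hinges on collecting `Option`s into an `Option<Vec<_>>`, which bails out as soon as any required output column has no left-input counterpart. A self-contained sketch of that pattern, with a hypothetical `o2i` lookup table rather than the real `ColIndexMapping` API:

// `o2i[c]` is the left-input column backing output column `c`,
// or `None` if `c` is not a left input ref.
fn map_to_left_input(o2i: &[Option<usize>], columns: &[usize]) -> Option<Vec<usize>> {
    columns
        .iter()
        .map(|&c| o2i.get(c).copied().flatten())
        .collect() // a single `None` makes the whole mapping fail
}

fn main() {
    let o2i = [Some(0), Some(2), None];
    assert_eq!(map_to_left_input(&o2i, &[0, 1]), Some(vec![0, 2]));
    // Output column 2 has no left-input counterpart, so the locality
    // requirement cannot be passed through to the left side.
    assert_eq!(map_to_left_input(&o2i, &[2]), None);
})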
9 changes: 3 additions & 6 deletions src/frontend/src/optimizer/plan_node/logical_over_window.rs
@@ -26,7 +26,7 @@ use super::{
BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter,
LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
StreamEowcOverWindow, StreamEowcSort, StreamOverWindow, ToBatch, ToStream,
gen_filter_and_pushdown,
gen_filter_and_pushdown, try_enforce_locality_requirement,
};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{
@@ -670,11 +670,8 @@ impl ToStream for LogicalOverWindow {
if partition_key_indices.is_empty() {
empty_partition_by_not_implemented!();
}
let input = self
.core
.input
.try_better_locality(&partition_key_indices)
.unwrap_or_else(|| self.core.input.clone());

let input = try_enforce_locality_requirement(self.input(), &partition_key_indices);
let stream_input = input.to_stream(ctx)?;

if ctx.emit_on_window_close() {