From 20fbdbd1a5c4845f8510048e64b27e2efa053cff Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 10 Oct 2025 13:30:37 +0800 Subject: [PATCH] support enforcing locality on the requirement side --- .../testdata/input/locality_backfill.yaml | 45 +++++++++ .../testdata/output/locality_backfill.yaml | 92 ++++++++++++++++++ .../src/optimizer/plan_node/convert.rs | 16 ++++ .../src/optimizer/plan_node/logical_agg.rs | 5 +- .../src/optimizer/plan_node/logical_dedup.rs | 7 +- .../src/optimizer/plan_node/logical_join.rs | 70 +++++--------- .../plan_node/logical_over_window.rs | 9 +- .../src/optimizer/plan_node/logical_scan.rs | 96 +++++++++---------- .../src/optimizer/plan_node/logical_topn.rs | 7 +- 9 files changed, 227 insertions(+), 120 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/locality_backfill.yaml b/src/frontend/planner_test/tests/testdata/input/locality_backfill.yaml index 6b197d70b7e9e..4388369852e05 100644 --- a/src/frontend/planner_test/tests/testdata/input/locality_backfill.yaml +++ b/src/frontend/planner_test/tests/testdata/input/locality_backfill.yaml @@ -15,5 +15,50 @@ set enable_locality_backfill = true; create table t (a int, b int, c int, primary key (b, a)); select count(*) from t group by a, b; + expected_outputs: + - stream_plan +- sql: | + set enable_locality_backfill = true; + create table t (a int, b int, c int); + select count(*) from t where c > 1 group by a, b; + expected_outputs: + - stream_plan +- sql: | + set enable_locality_backfill = true; + create table t1 (a int, b int, c int); + create table t2 (a int, b int, c int); + select count(*) from t1 join t2 on t1.a = t2.a where t1.c > t2.c group by t1.b; + expected_outputs: + - stream_plan +- sql: | + set enable_locality_backfill = true; + create table t (a int, b int, c int); + select RANK() OVER (PARTITION BY a ORDER BY b) as rank from t; + expected_outputs: + - stream_plan +- name: enforce locality for temporal join for both sides. 
+ sql: | + set enable_locality_backfill = true; + create table stream(id1 int, a1 int, b1 int); + create table version(id2 int, a2 int, b2 int, primary key (id2)); + create index idx2 on version (a2, b2); + select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; + expected_outputs: + - stream_plan +- sql: | + set enable_locality_backfill = true; + create table t(a int, b int, c int) append only; + select distinct on(a) * from t ; + expected_outputs: + - stream_plan +- sql: | + set enable_locality_backfill = true; + create table t(a int, b int, c int); + SELECT * FROM ( + SELECT + *, + row_number() OVER (PARTITION BY a ORDER BY b) AS rank + FROM t + ) WHERE rank <= 1; expected_outputs: - stream_plan \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/locality_backfill.yaml b/src/frontend/planner_test/tests/testdata/output/locality_backfill.yaml index 2c72134309bec..b7720755e6b1d 100644 --- a/src/frontend/planner_test/tests/testdata/output/locality_backfill.yaml +++ b/src/frontend/planner_test/tests/testdata/output/locality_backfill.yaml @@ -41,3 +41,95 @@ └─StreamLocalityProvider { locality_columns: [t.a, t.b] } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t.b, t.a) } └─StreamTableScan { table: t, columns: [t.a, t.b], stream_scan_type: ArrangementBackfill, stream_key: [t.b, t.a], pk: [b, a], dist: UpstreamHashShard(t.b, t.a) } +- sql: | + set enable_locality_backfill = true; + create table t (a int, b int, c int); + select count(*) from t where c > 1 group by a, b; + stream_plan: |- + StreamMaterialize { columns: [count, t.a(hidden), t.b(hidden)], stream_key: [t.a, t.b], pk_columns: [t.a, t.b], pk_conflict: NoCheck } + └─StreamProject { exprs: [count, t.a, t.b] } + └─StreamHashAgg { group_key: [t.a, t.b], aggs: [count] } + └─StreamLocalityProvider { locality_columns: [t.a, t.b] } + └─StreamExchange { dist: HashShard(t.a, t.b) } + └─StreamProject { exprs: [t.a, t.b, 
t._row_id] } + └─StreamFilter { predicate: (t.c > 1:Int32) } + └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id, t.c], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + set enable_locality_backfill = true; + create table t1 (a int, b int, c int); + create table t2 (a int, b int, c int); + select count(*) from t1 join t2 on t1.a = t2.a where t1.c > t2.c group by t1.b; + stream_plan: |- + StreamMaterialize { columns: [count, t1.b(hidden)], stream_key: [t1.b], pk_columns: [t1.b], pk_conflict: NoCheck } + └─StreamProject { exprs: [count, t1.b] } + └─StreamHashAgg { group_key: [t1.b], aggs: [count] } + └─StreamLocalityProvider { locality_columns: [t1.b] } + └─StreamExchange { dist: HashShard(t1.b) } + └─StreamProject { exprs: [t1.b, t1._row_id, t1.a, t2._row_id] } + └─StreamFilter { predicate: (t1.c > t2.c) } + └─StreamHashJoin { type: Inner, predicate: t1.a = t2.a, output: all } + ├─StreamExchange { dist: HashShard(t1.a) } + │ └─StreamLocalityProvider { locality_columns: [t1.a] } + │ └─StreamExchange { dist: HashShard(t1.a) } + │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamExchange { dist: HashShard(t2.a) } + └─StreamLocalityProvider { locality_columns: [t2.a] } + └─StreamExchange { dist: HashShard(t2.a) } + └─StreamTableScan { table: t2, columns: [t2.a, t2.c, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) } +- sql: | + set enable_locality_backfill = true; + create table t (a int, b int, c int); + select RANK() OVER (PARTITION BY a ORDER BY b) as rank from t; + stream_plan: |- + StreamMaterialize { columns: [rank, t._row_id(hidden), t.a(hidden)], stream_key: [t._row_id, t.a], pk_columns: [t._row_id, t.a], pk_conflict: NoCheck } + └─StreamProject { 
exprs: [rank, t._row_id, t.a] } + └─StreamOverWindow { window_functions: [rank() OVER(PARTITION BY t.a ORDER BY t.b ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamLocalityProvider { locality_columns: [t.a] } + └─StreamExchange { dist: HashShard(t.a) } + └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } +- name: enforce locality for temporal join for both sides. + sql: | + set enable_locality_backfill = true; + create table stream(id1 int, a1 int, b1 int); + create table version(id2 int, a2 int, b2 int, primary key (id2)); + create index idx2 on version (a2, b2); + select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; + stream_plan: |- + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, nested_loop: false, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + ├─StreamExchange { dist: HashShard(stream.a1) } + │ └─StreamLocalityProvider { locality_columns: [stream.a1, stream.b1] } + │ └─StreamExchange { dist: HashShard(stream.a1, stream.b1) } + │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } + └─StreamTableScan { table: idx2, columns: [idx2.id2, idx2.a2, idx2.b2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, 
id2], dist: UpstreamHashShard(idx2.a2) } +- sql: | + set enable_locality_backfill = true; + create table t(a int, b int, c int) append only; + select distinct on(a) * from t ; + stream_plan: |- + StreamMaterialize { columns: [a, b, c, t._row_id(hidden)], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } + └─StreamAppendOnlyDedup { dedup_cols: [t.a] } + └─StreamExchange { dist: HashShard(t.a) } + └─StreamLocalityProvider { locality_columns: [t.a] } + └─StreamExchange { dist: HashShard(t.a) } + └─StreamTableScan { table: t, columns: [t.a, t.b, t.c, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + set enable_locality_backfill = true; + create table t(a int, b int, c int); + SELECT * FROM ( + SELECT + *, + row_number() OVER (PARTITION BY a ORDER BY b) AS rank + FROM t + ) WHERE rank <= 1; + stream_plan: |- + StreamMaterialize { columns: [a, b, c, t._row_id(hidden), rank], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.a ORDER BY t.b ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamLocalityProvider { locality_columns: [t.a] } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t.a) } + └─StreamGroupTopN { order: [t.b ASC], limit: 1, offset: 0, group_key: [t.a] } + └─StreamLocalityProvider { locality_columns: [t.a] } + └─StreamExchange { dist: HashShard(t.a) } + └─StreamTableScan { table: t, columns: [t.a, t.b, t.c, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index 2b14382f072f9..b19b6c489ff50 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -61,6 +61,22 @@ pub trait ToStream { } } +/// Try to enforce the 
locality requirement on the given columns. +/// If a better plan can be found, return the better plan. +/// If no better plan can be found, and locality backfill is enabled, wrap the plan +/// with `LogicalLocalityProvider`. +/// Otherwise, return the plan as is. +pub fn try_enforce_locality_requirement(plan: LogicalPlanRef, columns: &[usize]) -> LogicalPlanRef { + assert!(!columns.is_empty()); + if let Some(better_plan) = plan.try_better_locality(columns) { + better_plan + } else if plan.ctx().session_ctx().config().enable_locality_backfill() { + LogicalLocalityProvider::new(plan, columns.to_owned()).into() + } else { + plan + } +} + pub fn stream_enforce_eowc_requirement( ctx: OptimizerContextRef, plan: StreamPlanRef, diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 3238f71936577..61d9efca4dc9f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -25,6 +25,7 @@ use super::{ BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamPlanRef, StreamProject, StreamShare, StreamSimpleAgg, StreamStatelessSimpleAgg, ToBatch, ToStream, + try_enforce_locality_requirement, }; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ @@ -1419,9 +1420,7 @@ impl ToStream for LogicalAgg { let input = if self.group_key().is_empty() { self.input() } else { - self.input() - .try_better_locality(&self.group_key().to_vec()) - .unwrap_or_else(|| self.input()) + try_enforce_locality_requirement(self.input(), &self.group_key().to_vec()) }; let stream_input = input.to_stream(ctx)?; diff --git a/src/frontend/src/optimizer/plan_node/logical_dedup.rs b/src/frontend/src/optimizer/plan_node/logical_dedup.rs index 71e35086afd13..e0a8e38880c69 100644 --- a/src/frontend/src/optimizer/plan_node/logical_dedup.rs +++ 
b/src/frontend/src/optimizer/plan_node/logical_dedup.rs @@ -22,7 +22,7 @@ use super::{ BatchGroupTopN, BatchPlanRef, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, StreamDedup, StreamGroupTopN, ToBatch, - ToStream, ToStreamContext, gen_filter_and_pushdown, generic, + ToStream, ToStreamContext, gen_filter_and_pushdown, generic, try_enforce_locality_requirement, }; use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; @@ -104,10 +104,7 @@ impl ToStream for LogicalDedup { ) -> Result { use super::stream::prelude::*; - let logical_input = self - .input() - .try_better_locality(self.dedup_cols()) - .unwrap_or_else(|| self.input()); + let logical_input = try_enforce_locality_requirement(self.input(), self.dedup_cols()); let input = logical_input.to_stream(ctx)?; let input = RequiredDist::hash_shard(self.dedup_cols()) .streaming_enforce_if_not_satisfies(input)?; diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index ad1b4c68d860d..ffe3184d85085 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -28,9 +28,9 @@ use super::generic::{ }; use super::utils::{Distill, childless_record}; use super::{ - BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalLocalityProvider, - LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin, - StreamPlanRef, StreamProject, ToBatch, ToStream, generic, + BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, + PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin, StreamPlanRef, StreamProject, ToBatch, + ToStream, generic, try_enforce_locality_requirement, }; use crate::error::{ErrorCode, Result, RwError}; use 
crate::expr::{CollectInputRef, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, InputRef}; @@ -903,19 +903,12 @@ impl LogicalJoin { let lhs_join_key_idx = self.eq_indexes().into_iter().map(|(l, _)| l).collect_vec(); let rhs_join_key_idx = self.eq_indexes().into_iter().map(|(_, r)| r).collect_vec(); - let logical_right = self - .right() - .try_better_locality(&rhs_join_key_idx) - .unwrap_or_else(|| self.right()); + let logical_right = try_enforce_locality_requirement(self.right(), &rhs_join_key_idx); let mut right = logical_right.to_stream_with_dist_required( &RequiredDist::shard_by_key(self.right().schema().len(), &predicate.right_eq_indexes()), ctx, )?; - let logical_left = self - .left() - .try_better_locality(&lhs_join_key_idx) - .unwrap_or_else(|| self.left()); - + let logical_left = try_enforce_locality_requirement(self.left(), &lhs_join_key_idx); let r2l = predicate.r2l_eq_columns_mapping(logical_left.schema().len(), right.schema().len()); let l2r = @@ -1255,10 +1248,7 @@ impl LogicalJoin { .into_iter() .map(|(l, _)| l) .collect_vec(); - let logical_left = self - .left() - .try_better_locality(&lhs_join_key_idx) - .unwrap_or_else(|| self.left()); + let logical_left = try_enforce_locality_requirement(self.left(), &lhs_join_key_idx); let left = logical_left.to_stream(ctx)?; // Enforce a shuffle for the temporal join LHS to let the scheduler be able to schedule the join fragment together with the RHS with a `no_shuffle` exchange. let left = required_dist.stream_enforce(left); @@ -1541,26 +1531,6 @@ impl LogicalJoin { .into()), } } - - fn try_better_locality_inner(&self, columns: &[usize]) -> Option { - let mut ctx = ToStreamContext::new(false); - // only pass through the locality information if it can be converted to dynamic filter - if let Ok(Some(_)) = self.to_stream_dynamic_filter(self.on().clone(), &mut ctx) { - // since dynamic filter only supports left input ref in the output indices, we can safely use o2i mapping to convert the required columns. 
- let o2i_mapping = self.core.o2i_col_mapping(); - let left_input_columns = columns - .iter() - .map(|&col| o2i_mapping.try_map(col)) - .collect::>>()?; - if let Some(better_left_plan) = self.left().try_better_locality(&left_input_columns) { - return Some( - self.clone_with_left_right(better_left_plan, self.right()) - .into(), - ); - } - } - None - } } impl ToBatch for LogicalJoin { @@ -1774,19 +1744,23 @@ impl ToStream for LogicalJoin { } fn try_better_locality(&self, columns: &[usize]) -> Option { - if let Some(better_plan) = self.try_better_locality_inner(columns) { - Some(better_plan) - } else if self.ctx().session_ctx().config().enable_locality_backfill() { - Some( - LogicalLocalityProvider::new( - self.clone_with_left_right(self.left(), self.right()).into(), - columns.to_owned(), - ) - .into(), - ) - } else { - None + let mut ctx = ToStreamContext::new(false); + // only pass through the locality information if it can be converted to dynamic filter + if let Ok(Some(_)) = self.to_stream_dynamic_filter(self.on().clone(), &mut ctx) { + // since dynamic filter only supports left input ref in the output indices, we can safely use o2i mapping to convert the required columns. 
+ let o2i_mapping = self.core.o2i_col_mapping(); + let left_input_columns = columns + .iter() + .map(|&col| o2i_mapping.try_map(col)) + .collect::>>()?; + if let Some(better_left_plan) = self.left().try_better_locality(&left_input_columns) { + return Some( + self.clone_with_left_right(better_left_plan, self.right()) + .into(), + ); + } } + None } } diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index 49cc0fbd465b8..bb5e980c28a5a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -26,7 +26,7 @@ use super::{ BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, StreamEowcSort, StreamOverWindow, ToBatch, ToStream, - gen_filter_and_pushdown, + gen_filter_and_pushdown, try_enforce_locality_requirement, }; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ @@ -670,11 +670,8 @@ impl ToStream for LogicalOverWindow { if partition_key_indices.is_empty() { empty_partition_by_not_implemented!(); } - let input = self - .core - .input - .try_better_locality(&partition_key_indices) - .unwrap_or_else(|| self.core.input.clone()); + + let input = try_enforce_locality_requirement(self.input(), &partition_key_indices); let stream_input = input.to_stream(ctx)?; if ctx.emit_on_window_close() { diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 06f542b9a696b..1041986a4c5e5 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -26,8 +26,8 @@ use super::generic::{GenericPlanNode, GenericPlanRef}; use super::utils::{Distill, childless_record}; use super::{ BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, 
Logical, - LogicalLocalityProvider, LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, - StreamTableScan, ToBatch, ToStream, generic, + LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamTableScan, ToBatch, + ToStream, generic, }; use crate::TableCatalog; use crate::binder::BoundBaseTable; @@ -565,52 +565,6 @@ impl LogicalScan { None } - - fn try_better_locality_inner(&self, columns: &[usize]) -> Option { - if !self - .core - .ctx() - .session_ctx() - .config() - .enable_index_selection() - { - return None; - } - if columns.is_empty() { - return None; - } - if self.table_indexes().is_empty() { - return None; - } - let orders = if columns.len() <= 3 { - OrderType::all() - } else { - // Limit the number of order type combinations to avoid explosion. - // For more than 3 columns, we only consider ascending nulls last and descending. - // Since by default, indexes are created with ascending nulls last. - // This is a heuristic to reduce the search space. 
- vec![OrderType::ascending_nulls_last(), OrderType::descending()] - }; - for order_type_combo in columns - .iter() - .map(|&col| orders.iter().map(move |ot| ColumnOrder::new(col, *ot))) - .multi_cartesian_product() - .take(256) - // limit the number of combinations - { - let required_order = Order { - column_orders: order_type_combo, - }; - - let order_satisfied_index = self.indexes_satisfy_order(&required_order); - for index in order_satisfied_index { - if let Some(index_scan) = self.to_index_scan_if_index_covered(index) { - return Some(index_scan.into()); - } - } - } - None - } } impl ToBatch for LogicalScan { @@ -726,12 +680,48 @@ impl ToStream for LogicalScan { } fn try_better_locality(&self, columns: &[usize]) -> Option { - if let Some(better_plan) = self.try_better_locality_inner(columns) { - Some(better_plan) - } else if self.ctx().session_ctx().config().enable_locality_backfill() { - Some(LogicalLocalityProvider::new(self.clone().into(), columns.to_owned()).into()) + if !self + .core + .ctx() + .session_ctx() + .config() + .enable_index_selection() + { + return None; + } + if columns.is_empty() { + return None; + } + if self.table_indexes().is_empty() { + return None; + } + let orders = if columns.len() <= 3 { + OrderType::all() } else { - None + // Limit the number of order type combinations to avoid explosion. + // For more than 3 columns, we only consider ascending nulls last and descending. + // Since by default, indexes are created with ascending nulls last. + // This is a heuristic to reduce the search space. 
+ vec![OrderType::ascending_nulls_last(), OrderType::descending()] + }; + for order_type_combo in columns + .iter() + .map(|&col| orders.iter().map(move |ot| ColumnOrder::new(col, *ot))) + .multi_cartesian_product() + .take(256) + // limit the number of combinations + { + let required_order = Order { + column_orders: order_type_combo, + }; + + let order_satisfied_index = self.indexes_satisfy_order(&required_order); + for index in order_satisfied_index { + if let Some(index_scan) = self.to_index_scan_if_index_covered(index) { + return Some(index_scan.into()); + } + } } + None } } diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index b03d6dace4f1f..7de4315bda3b8 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -22,7 +22,7 @@ use super::utils::impl_distill_by_unit; use super::{ BatchGroupTopN, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamGroupTopN, StreamPlanRef, StreamProject, ToBatch, - ToStream, gen_filter_and_pushdown, generic, + ToStream, gen_filter_and_pushdown, generic, try_enforce_locality_requirement, }; use crate::error::{ErrorCode, Result, RwError}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; @@ -323,10 +323,7 @@ impl ToStream for LogicalTopN { ))); } Ok(if !self.group_key().is_empty() { - let logical_input = self - .input() - .try_better_locality(self.group_key()) - .unwrap_or_else(|| self.input()); + let logical_input = try_enforce_locality_requirement(self.input(), self.group_key()); let input = logical_input.to_stream(ctx)?; let input = RequiredDist::shard_by_key(self.input().schema().len(), self.group_key()) .streaming_enforce_if_not_satisfies(input)?;