Skip to content

Commit aae3b05

Browse files
authored
fix(tesseract): Fix rolling window external pre-aggregation (#9625)
1 parent 81a1ae9 commit aae3b05

File tree

24 files changed

+650
-324
lines changed

24 files changed

+650
-324
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,9 @@ export class BaseQuery {
332332
this.useNativeSqlPlanner = this.options.useNativeSqlPlanner ?? getEnv('nativeSqlPlanner');
333333
this.canUseNativeSqlPlannerPreAggregation = false;
334334
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
335-
const hasMultiStageMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true }).multiStageMembers.length > 0;
336-
this.canUseNativeSqlPlannerPreAggregation = hasMultiStageMeasures;
335+
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });
336+
337+
this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0 || fullAggregateMeasures.cumulativeMeasures.length > 0;
337338
}
338339
this.queryLevelJoinHints = this.options.joinHints ?? [];
339340
this.prebuildJoin();
@@ -775,6 +776,13 @@ export class BaseQuery {
775776
);
776777
}
777778

779+
/**
 * Selects the query object whose driver-level helpers should be used.
 *
 * @param {boolean} external - when truthy, the external query is preferred if one is available.
 * @returns the result of `this.externalQuery()` when `external` is truthy,
 *   `this.options.disableExternalPreAggregations` is falsy and `this.externalQueryClass`
 *   is set; otherwise this query itself.
 */
driverTools(external) {
780+
if (external && !this.options.disableExternalPreAggregations && this.externalQueryClass) {
781+
return this.externalQuery();
782+
}
783+
// Fall back to this query when no external query applies.
return this;
784+
}
785+
778786
buildSqlAndParamsRust(exportAnnotatedSql) {
779787
const order = this.options.order && R.pipe(
780788
R.map((hash) => ((!hash || !hash.id) ? null : hash)),

packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,4 +334,13 @@ export class CubeStoreQuery extends BaseQuery {
334334
}
335335
);
336336
}
337+
338+
/**
 * Returns the parent class's SQL templates with `statements.time_series_select` replaced.
 *
 * The replacement template emits one `select ... date_from, ... date_to` row per entry of
 * `seria` and joins the rows with `UNION ALL`.
 */
public sqlTemplates() {
339+
const templates = super.sqlTemplates();
340+
// For each item, time_item[0] is rendered as date_from and time_item[1] as date_to,
// both wrapped in to_timestamp('...').
templates.statements.time_series_select = '{% for time_item in seria %}' +
341+
'select to_timestamp(\'{{ time_item[0] }}\') date_from, to_timestamp(\'{{ time_item[1] }}\') date_to \n' +
342+
// UNION ALL is emitted between rows only, never after the last one.
'{% if not loop.last %} UNION ALL\n{% endif %}' +
343+
'{% endfor %}';
344+
return templates;
345+
}
337346
}

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::base_query_options::FilterItem;
2+
use super::driver_tools::{DriverTools, NativeDriverTools};
23
use super::filter_group::{FilterGroup, NativeFilterGroup};
34
use super::filter_params::{FilterParams, NativeFilterParams};
45
use super::pre_aggregation_obj::{NativePreAggregationObj, PreAggregationObj};
@@ -16,12 +17,7 @@ use std::rc::Rc;
1617

1718
#[nativebridge::native_bridge]
1819
pub trait BaseTools {
19-
fn convert_tz(&self, field: String) -> Result<String, CubeError>;
20-
fn time_grouped_column(
21-
&self,
22-
granularity: String,
23-
dimension: String,
24-
) -> Result<String, CubeError>;
20+
fn driver_tools(&self, external: bool) -> Result<Rc<dyn DriverTools>, CubeError>;
2521
fn sql_templates(&self) -> Result<Rc<dyn SqlTemplatesRender>, CubeError>;
2622
fn security_context_for_rust(&self) -> Result<Rc<dyn SecurityContext>, CubeError>;
2723
fn sql_utils_for_rust(&self) -> Result<Rc<dyn SqlUtils>, CubeError>;
@@ -33,10 +29,6 @@ pub trait BaseTools {
3329
&self,
3430
used_filters: Option<Vec<FilterItem>>,
3531
) -> Result<Rc<dyn FilterGroup>, CubeError>;
36-
fn timestamp_precision(&self) -> Result<u32, CubeError>;
37-
fn time_stamp_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
38-
fn date_time_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
39-
fn in_db_time_zone(&self, date: String) -> Result<String, CubeError>;
4032
fn generate_time_series(
4133
&self,
4234
granularity: String,
@@ -49,23 +41,8 @@ pub trait BaseTools {
4941
origin: String,
5042
) -> Result<Vec<Vec<String>>, CubeError>;
5143
fn get_allocated_params(&self) -> Result<Vec<String>, CubeError>;
52-
fn subtract_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
53-
fn add_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
54-
fn add_timestamp_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
5544
fn all_cube_members(&self, path: String) -> Result<Vec<String>, CubeError>;
5645
fn interval_and_minimal_time_unit(&self, interval: String) -> Result<Vec<String>, CubeError>;
57-
//===== TODO Move to templates
58-
fn hll_init(&self, sql: String) -> Result<String, CubeError>;
59-
fn hll_merge(&self, sql: String) -> Result<String, CubeError>;
60-
fn hll_cardinality_merge(&self, sql: String) -> Result<String, CubeError>;
61-
fn count_distinct_approx(&self, sql: String) -> Result<String, CubeError>;
62-
fn date_bin(
63-
&self,
64-
interval: String,
65-
source: String,
66-
origin: String,
67-
) -> Result<String, CubeError>;
68-
6946
fn get_pre_aggregation_by_name(
7047
&self,
7148
cube_name: String,

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/driver_tools.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use super::sql_templates_render::{NativeSqlTemplatesRender, SqlTemplatesRender};
2+
use cubenativeutils::wrappers::serializer::{
3+
NativeDeserialize, NativeDeserializer, NativeSerialize,
4+
};
5+
use cubenativeutils::wrappers::NativeContextHolder;
6+
use cubenativeutils::wrappers::NativeObjectHandle;
7+
use cubenativeutils::CubeError;
8+
use std::any::Any;
9+
use std::rc::Rc;
10+
11+
// Native-bridge trait that groups the driver-level SQL helper methods.
// Every method is fallible and reports failures as `CubeError`; string-producing
// helpers take owned `String` arguments and return an owned `String`.
// NOTE(review): presumably implemented by the JS object returned from
// `driverTools(external)` on the query class — confirm against the bridge wiring.
#[nativebridge::native_bridge]
12+
pub trait DriverTools {
13+
fn convert_tz(&self, field: String) -> Result<String, CubeError>;
14+
fn time_grouped_column(
15+
&self,
16+
granularity: String,
17+
dimension: String,
18+
) -> Result<String, CubeError>;
19+
// Returns the template renderer as a shared trait object.
fn sql_templates(&self) -> Result<Rc<dyn SqlTemplatesRender>, CubeError>;
20+
fn timestamp_precision(&self) -> Result<u32, CubeError>;
21+
fn time_stamp_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
22+
fn date_time_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
23+
fn in_db_time_zone(&self, date: String) -> Result<String, CubeError>;
24+
fn get_allocated_params(&self) -> Result<Vec<String>, CubeError>;
25+
// Interval helpers: each takes a date expression and an interval, both as strings.
fn subtract_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
26+
fn add_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
27+
fn add_timestamp_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
28+
// NOTE(review): the time-series code rejects a result whose length is not 2 when it
// calls the same-named method on the templates wrapper — presumably that forwards
// here; confirm.
fn interval_and_minimal_time_unit(&self, interval: String) -> Result<Vec<String>, CubeError>;
29+
// Helpers that wrap a SQL fragment passed as `sql`.
fn hll_init(&self, sql: String) -> Result<String, CubeError>;
30+
fn hll_merge(&self, sql: String) -> Result<String, CubeError>;
31+
fn hll_cardinality_merge(&self, sql: String) -> Result<String, CubeError>;
32+
fn count_distinct_approx(&self, sql: String) -> Result<String, CubeError>;
33+
fn date_bin(
34+
&self,
35+
interval: String,
36+
source: String,
37+
origin: String,
38+
) -> Result<String, CubeError>;
39+
}

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod case_item;
66
pub mod case_label;
77
pub mod cube_definition;
88
pub mod dimension_definition;
9+
pub mod driver_tools;
910
pub mod evaluator;
1011
pub mod filter_group;
1112
pub mod filter_params;

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use super::*;
2-
use crate::cube_bridge::pre_aggregation_obj::PreAggregationObj;
32
use crate::logical_plan::*;
43
use crate::plan::FilterItem;
54
use crate::planner::query_tools::QueryTools;
@@ -29,7 +28,7 @@ impl MatchState {
2928

3029
pub struct PreAggregationOptimizer {
3130
query_tools: Rc<QueryTools>,
32-
used_pre_aggregations: HashMap<(String, String), Rc<dyn PreAggregationObj>>,
31+
used_pre_aggregations: HashMap<(String, String), Rc<PreAggregation>>,
3332
}
3433

3534
impl PreAggregationOptimizer {
@@ -71,7 +70,7 @@ impl PreAggregationOptimizer {
7170
Ok(None)
7271
}
7372

74-
pub fn get_used_pre_aggregations(&self) -> Vec<Rc<dyn PreAggregationObj>> {
73+
pub fn get_used_pre_aggregations(&self) -> Vec<Rc<PreAggregation>> {
7574
self.used_pre_aggregations.values().cloned().collect()
7675
}
7776

@@ -445,15 +444,14 @@ impl PreAggregationOptimizer {
445444
granularity: pre_aggregation.granularity.clone(),
446445
table_name: table_name.clone(),
447446
cube_name: pre_aggregation.cube_name.clone(),
447+
pre_aggregation_obj,
448448
};
449+
let result = Rc::new(pre_aggregation);
449450
self.used_pre_aggregations.insert(
450-
(
451-
pre_aggregation.cube_name.clone(),
452-
pre_aggregation.name.clone(),
453-
),
454-
pre_aggregation_obj.clone(),
451+
(result.cube_name.clone(), result.name.clone()),
452+
result.clone(),
455453
);
456-
Ok(Rc::new(pre_aggregation))
454+
Ok(result)
457455
} else {
458456
Err(CubeError::internal(format!(
459457
"Cannot find pre aggregation object for cube {} and name {}",

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::*;
2+
use crate::cube_bridge::pre_aggregation_obj::PreAggregationObj;
23
use crate::planner::sql_evaluator::MemberSymbol;
34
use itertools::Itertools;
45
use std::rc::Rc;
@@ -13,6 +14,7 @@ pub struct PreAggregation {
1314
pub granularity: Option<String>,
1415
pub table_name: String,
1516
pub cube_name: String,
17+
pub pre_aggregation_obj: Rc<dyn PreAggregationObj>,
1618
}
1719

1820
impl PrettyPrint for PreAggregation {

rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,14 @@ impl PhysicalPlanBuilderContext {
4141

4242
pub struct PhysicalPlanBuilder {
4343
query_tools: Rc<QueryTools>,
44-
_plan_sql_templates: PlanSqlTemplates,
44+
plan_sql_templates: PlanSqlTemplates,
4545
}
4646

4747
impl PhysicalPlanBuilder {
48-
pub fn new(query_tools: Rc<QueryTools>) -> Self {
49-
let plan_sql_templates = query_tools.plan_sql_templates();
48+
pub fn new(query_tools: Rc<QueryTools>, plan_sql_templates: PlanSqlTemplates) -> Self {
5049
Self {
5150
query_tools,
52-
_plan_sql_templates: plan_sql_templates,
51+
plan_sql_templates,
5352
}
5453
}
5554

@@ -102,6 +101,7 @@ impl PhysicalPlanBuilder {
102101
) -> Result<Rc<Select>, CubeError> {
103102
let mut render_references = HashMap::new();
104103
let mut measure_references = HashMap::new();
104+
let mut dimensions_references = HashMap::new();
105105
let mut context_factory = context.make_sql_nodes_factory();
106106
let from = match &logical_plan.source {
107107
SimpleQuerySource::LogicalJoin(join) => self.process_logical_join(
@@ -114,8 +114,8 @@ impl PhysicalPlanBuilder {
114114
let res = self.process_pre_aggregation(
115115
pre_aggregation,
116116
context,
117-
&mut render_references,
118117
&mut measure_references,
118+
&mut dimensions_references,
119119
)?;
120120
for member in logical_plan.schema.time_dimensions.iter() {
121121
context_factory.add_dimensions_with_ignored_timezone(member.full_name());
@@ -128,6 +128,7 @@ impl PhysicalPlanBuilder {
128128
let mut select_builder = SelectBuilder::new(from);
129129
context_factory.set_ungrouped(logical_plan.ungrouped);
130130
context_factory.set_pre_aggregation_measures_references(measure_references);
131+
context_factory.set_pre_aggregation_dimensions_references(dimensions_references);
131132

132133
let mut group_by = Vec::new();
133134
for member in logical_plan.schema.dimensions.iter() {
@@ -185,8 +186,8 @@ impl PhysicalPlanBuilder {
185186
&self,
186187
pre_aggregation: &Rc<PreAggregation>,
187188
_context: &PhysicalPlanBuilderContext,
188-
render_references: &mut HashMap<String, QualifiedColumnName>,
189189
measure_references: &mut HashMap<String, QualifiedColumnName>,
190+
dimensions_references: &mut HashMap<String, QualifiedColumnName>,
190191
) -> Result<Rc<From>, CubeError> {
191192
let mut pre_aggregation_schema = Schema::empty();
192193
let pre_aggregation_alias = PlanSqlTemplates::memeber_alias_name(
@@ -201,7 +202,7 @@ impl PhysicalPlanBuilder {
201202
&dim.alias_suffix(),
202203
self.query_tools.clone(),
203204
)?;
204-
render_references.insert(
205+
dimensions_references.insert(
205206
dim.full_name(),
206207
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
207208
);
@@ -214,16 +215,10 @@ impl PhysicalPlanBuilder {
214215
granularity,
215216
self.query_tools.clone(),
216217
)?;
217-
render_references.insert(
218+
dimensions_references.insert(
218219
dim.full_name(),
219220
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
220221
);
221-
if let Some(granularity) = &granularity {
222-
render_references.insert(
223-
format!("{}_{}", dim.full_name(), granularity),
224-
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
225-
);
226-
}
227222
pre_aggregation_schema.add_column(SchemaColumn::new(alias, Some(dim.full_name())));
228223
}
229224
for meas in pre_aggregation.measures.iter() {
@@ -970,9 +965,7 @@ impl PhysicalPlanBuilder {
970965
));
971966
};
972967

973-
let templates = self.query_tools.plan_sql_templates();
974-
975-
let ts_date_range = if templates.supports_generated_time_series()
968+
let ts_date_range = if self.plan_sql_templates.supports_generated_time_series()
976969
&& granularity_obj.is_predefined_granularity()
977970
{
978971
if let Some(date_range) = time_dimension_symbol

rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,7 @@ impl RegularRollingWindowJoinCondition {
5252
};
5353

5454
let trailing_start = if let Some(trailing_interval) = &self.trailing_interval {
55-
templates
56-
.base_tools()
57-
.subtract_interval(start_date, trailing_interval.clone())?
55+
templates.subtract_interval(start_date, trailing_interval.clone())?
5856
} else {
5957
start_date
6058
};
@@ -72,9 +70,7 @@ impl RegularRollingWindowJoinCondition {
7270
};
7371

7472
let leading_end = if let Some(leading_interval) = &self.leading_interval {
75-
templates
76-
.base_tools()
77-
.add_interval(end_date, leading_interval.clone())?
73+
templates.add_interval(end_date, leading_interval.clone())?
7874
} else {
7975
end_date
8076
};
@@ -121,7 +117,7 @@ pub struct ToDateRollingWindowJoinCondition {
121117
time_series_source: String,
122118
granularity: String,
123119
time_dimension: Expr,
124-
query_tools: Rc<QueryTools>,
120+
_query_tools: Rc<QueryTools>,
125121
}
126122

127123
impl ToDateRollingWindowJoinCondition {
@@ -135,7 +131,7 @@ impl ToDateRollingWindowJoinCondition {
135131
time_series_source,
136132
granularity,
137133
time_dimension,
138-
query_tools,
134+
_query_tools: query_tools,
139135
}
140136
}
141137

@@ -150,10 +146,7 @@ impl ToDateRollingWindowJoinCondition {
150146
templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?;
151147
let date_to =
152148
templates.column_reference(&Some(self.time_series_source.clone()), "date_from")?;
153-
let grouped_from = self
154-
.query_tools
155-
.base_tools()
156-
.time_grouped_column(self.granularity.clone(), date_from)?;
149+
let grouped_from = templates.time_grouped_column(self.granularity.clone(), date_from)?;
157150
let result = format!("{date_column} >= {grouped_from} and {date_column} <= {date_to}");
158151
Ok(result)
159152
}

rust/cubesqlplanner/cubesqlplanner/src/plan/time_series.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ impl TimeSeries {
4444
&& self.granularity.is_predefined_granularity()
4545
{
4646
let interval_description = templates
47-
.base_tools()
4847
.interval_and_minimal_time_unit(self.granularity.granularity_interval().clone())?;
4948
if interval_description.len() != 2 {
5049
return Err(CubeError::internal(

0 commit comments

Comments
 (0)