Skip to content

Support reduction_fator metric (for EXPLAIN ANALYZE) in AggregateExec #18410

@2010YOUY01

Description

@2010YOUY01

Is your feature request related to a problem or challenge?

In the pre-aggregation stage (In datafusion's implementation, AggregateExec with Partial mode), a reduction_factor can be calculated as output_rows (after partial aggregation) / input_rows in AggregateExec

Background for 'partial aggregation':

/// Partial Aggregation [batch_size = 2] (max memory = 3 rows)

Example with datafusion-cli

> set datafusion.explain.analyze_level = summary;

0 row(s) fetched.
Elapsed 0.000 seconds.

> create table t1(a int, b int);
0 row(s) fetched.
Elapsed 0.007 seconds.

> insert into t1 values (1,10), (1, 20), (2,10), (2,30);
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched.
Elapsed 0.011 seconds.

> explain analyze select a, sum(b)
from t1
group by a;
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                              |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[sum(t1.b)], metrics=[output_rows=2, elapsed_compute=4.67121ms, output_bytes=1088.0 B] |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=17.875µs, output_bytes=192.0 KB]                           |
|                   |     RepartitionExec: partitioning=Hash([a@0], 14), input_partitions=14, metrics=[]                                                                |
|                   |       RepartitionExec: partitioning=RoundRobinBatch(14), input_partitions=1, metrics=[]                                                           |
|                   |         AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[sum(t1.b)], metrics=[output_rows=2, elapsed_compute=1.931542ms, output_bytes=544.0 B]  |
|                   |           DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]                                                                           |
|                   |                                                                                                                                                   |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.018 seconds.

You can see for the bottom AggregateExec with Partial mode, it has 4 input rows, and after the pre-aggregation, the output size has been reduced to 2 rows, so a reduction_factor metric can be calculated as 4/2 => 2, indicating on average each group has 2 origianl rows.

Describe the solution you'd like

Add reduction_factor metric for AggregateExec with Partial mode.

Reference PR: #18406

Describe alternatives you've considered

No response

Additional context

No response

Metadata

Metadata

Assignees

Labels

enhancementNew feature or request

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions