
Compatibility enhancements on AggregationNode #12830


Open
zhztheplayer opened this issue Mar 27, 2025 · 21 comments · May be fixed by #13024 or #12861
Labels
enhancement New feature or request

Comments

@zhztheplayer
Contributor

zhztheplayer commented Mar 27, 2025

Background

Velox's AggregationNode was likely designed following Presto's style, where the step is a property of the aggregation operator. By comparison, Spark's AggregateMode is bound to a specific AggregateFunction, making it possible for one Spark aggregate operator to contain aggregate functions with different modes (steps). There was also an old discussion around this difference.
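
For illustration, a rough side-by-side of the two models (the Velox enum below reflects the current core::AggregationNode::Step; Spark's modes live in Scala, so they are shown as comments):

// Velox today (Presto-style): one step per aggregation operator.
enum class Step {
  kPartial,      // raw input -> intermediate accumulator
  kIntermediate, // intermediate -> intermediate
  kFinal,        // intermediate -> final result
  kSingle        // raw input -> final result
};

// Spark, by comparison: AggregateMode (Partial, PartialMerge, Final, Complete)
// is attached to each AggregateExpression, so a single aggregate operator may
// carry functions running in different modes.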

Apache Gluten has been relying on some tricks to align Spark with Velox on this: the planner interprets Spark's aggregation modes into different types of Velox companion functions, then always assigns the single step to Velox's AggregationNode. By doing this we can get rid of a lot of issues caused by the mismatch. For example, we can naturally support spillable partial aggregation, which is unsupported by Velox. We can also avoid unwanted flushing that causes result mismatches for some specific query plans generated by Spark.

The solution basically worked as expected until we found that, for some functions, the intermediate / final companion functions are not resolvable given intermediate input types; those functions accept different types of input but use the same type of intermediate data. For example this one.

The issue is hard to fix completely because the companion functions are all treated as normal aggregation functions in Velox. Even though we have the input types of the original aggregation function from Spark, we can't always use them to find the intermediate / final companion functions accurately, because those are resolved with intermediate data types in Velox. Velox does provide suffixed versions of final companion functions to distinguish between them, but theoretically they're not reliable, since in the SQL world overloaded functions can only be distinguished by function name and input data types.
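
To make the ambiguity concrete, here is a hypothetical aggregate (name and types made up, written in the same signature-builder style as Velox's companion-function tests) whose two overloads share one intermediate type:

// Hypothetical aggregate "agg" with two overloads that share the varbinary
// intermediate type.
std::vector<std::shared_ptr<AggregateFunctionSignature>> signatures{
    AggregateFunctionSignatureBuilder()
        .returnType("bigint")
        .intermediateType("varbinary")
        .argumentType("bigint")
        .build(),
    AggregateFunctionSignatureBuilder()
        .returnType("double")
        .intermediateType("varbinary")
        .argumentType("double")
        .build()};
// Resolving a companion such as "agg_merge_extract(varbinary)" is ambiguous:
// the result could be bigint or double, and the raw input type that would
// disambiguate is not part of the companion call.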

An Example of a Mixed-Step Aggregation Operator in Spark

The following example shows how a mixed-step aggregation operator is generated in Spark:

The SQL SELECT sum(distinct value1) FROM agg2 in Spark will be planned as

AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(distinct value1#58)], output=[sum(DISTINCT value1)#61L])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=90]
      +- HashAggregate(keys=[], functions=[partial_sum(distinct value1#58)], output=[sum#66L])
         +- HashAggregate(keys=[value1#58], functions=[], output=[value1#58])
            +- Exchange hashpartitioning(value1#58, 5), ENSURE_REQUIREMENTS, [plan_id=86]
               +- HashAggregate(keys=[value1#58], functions=[], output=[value1#58])
                  +- FileScan parquet spark_catalog.default.agg2[value1#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/warehouse-3e3fcb4a-c364-44dc-84c8-ba32de1ba854/agg2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value1:int>

Note the query plan is a 3-stage job:

  1. Do a partial distinct on value1 in the scan stage;
  2. Do a final distinct on value1 + partial_sum(distinct value1) in the second stage, after the shuffle on (value1);
  3. Do the final sum(distinct value1) in the third stage, after the shuffle on ().

The second stage is important for the overall parallelism of the query: without it, all the sum(distinct) work would be done on a single node, which causes huge inefficiency.

Based on this SQL, assume we add another non-distinct aggregation function to it, for instance:

SELECT sum(distinct value1), avg(value2) FROM agg2 in Spark will be planned as

AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[avg(value2#59), sum(distinct value1#58)], output=[sum(DISTINCT value1)#62L, avg(value2)#63])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=90]
      +- HashAggregate(keys=[], functions=[merge_avg(value2#59), partial_sum(distinct value1#58)], output=[sum#69, count#70L, sum#73L])
         +- HashAggregate(keys=[value1#58], functions=[merge_avg(value2#59)], output=[value1#58, sum#69, count#70L])
            +- Exchange hashpartitioning(value1#58, 5), ENSURE_REQUIREMENTS, [plan_id=86]
               +- HashAggregate(keys=[value1#58], functions=[partial_avg(value2#59)], output=[value1#58, sum#69, count#70L])
                  +- FileScan parquet spark_catalog.default.agg2[value1#58,value2#59] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/warehouse-8f7bc201-2722-42c0-97c6-8053b7600458/agg2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value1:int,value2:int>

As shown, to fully utilize all three of the previous stages, avg(value2) is planned as partial_avg(value2) + merge_avg(value2) + avg(value2), so Spark can make sure that every stage reduces the amount of data the avg function has to process.

Because merge_avg(value2) is needed in the 2nd stage, it is mixed with partial_sum(distinct value1) to create a mixed-step aggregation operator.

Proposed Changes

  1. Move AggregationNode::Step to AggregationNode::Aggregate::Step and make it work.
  2. Add a flag to AggregationNode that lets the user explicitly disable flushing.

The changes will make Velox's aggregation API completely compatible with both Spark and Presto, and possibly with other databases, because the API becomes finer grained. In Presto we can just pass the same aggregation step to all aggregate functions in the operator, and in Spark we are now able to interpret Spark's AggregateMode into Velox's AggregationNode::Aggregate::Step for each aggregate function. Spark doesn't do flushing, so normally in Spark we can just disable flushing. We can also stop relying on companion functions.
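
For concreteness, a minimal sketch of what the proposed API could look like (names and member layout are tentative, not a final design):

// Sketch only: the step moves onto each aggregate, and the node carries an
// explicit flag to disable flushing of partial results.
class AggregationNode : public PlanNode {
 public:
  struct Aggregate {
    enum class Step { kPartial, kIntermediate, kFinal, kSingle };

    CallTypedExprPtr call;
    Step step; // proposed: per-aggregate step
    // ... existing fields (rawInputTypes, mask, sortingKeys, distinct, ...)
  };

  // Proposed: lets the planner (e.g. Gluten for Spark) turn off partial
  // aggregation flushing explicitly.
  bool isFlushDisabled() const {
    return flushDisabled_;
  }

 private:
  std::vector<Aggregate> aggregates_;
  bool flushDisabled_{false};
};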

@zhztheplayer zhztheplayer added the enhancement New feature or request label Mar 27, 2025
@zhztheplayer
Contributor Author

zhztheplayer commented Mar 27, 2025

@rui-mo @NEUpanning @kagamiori @liujiayi771 @jinchengchenghh What do you think? I have been considering this kind of enhancement in Velox for a long time.

cc @mbasmanova

@mbasmanova
Contributor

@zhztheplayer I'm not sure I understand the big picture of the proposal. Would you create a Google doc to describe the semantics of the new Aggregation node and provide use cases that showcase why these semantics make sense?

@zhztheplayer
Contributor Author

@zhztheplayer I'm not sure I understand the big picture of the proposal. Would you create a Google doc to describe the semantics of the new Aggregation node and provide use cases that showcase why these semantics make sense?

No problem. I will open a Google doc for this.

@zhztheplayer
Contributor Author

@mbasmanova
Contributor

@zhztheplayer Thank you for putting together a doc. I left some comments. Overall, it would be helpful to provide some examples that explain the benefit of mixing aggregation function steps in a single operator. Currently, it is not clear.

@zhztheplayer
Contributor Author

zhztheplayer commented Apr 7, 2025

@mbasmanova I've included an example in the document. The formatting in the Google doc is a bit strange, so I am also explaining it here:

Spark SQL:

SELECT count(l_quantity), count(distinct l_partkey) FROM lineitem

Spark query plan:

HashAggregate(keys=[], functions=[count(l_quantity#80), count(distinct l_partkey#77L)], output=[count(l_quantity)#159L, count(DISTINCT l_partkey)#160L])
   +- HashAggregate(keys=[], functions=[merge_count(l_quantity#80), partial_count(distinct l_partkey#77L)], output=[count#166L, count#169L])
      +- HashAggregate(keys=[l_partkey#77L], functions=[merge_count(l_quantity#80)], output=[l_partkey#77L, count#166L])
         +- HashAggregate(keys=[l_partkey#77L], functions=[partial_count(l_quantity#80)], output=[l_partkey#77L, count#166L])
            +- BatchScan parquet 

Note the last aggregation node:

HashAggregate(keys=[], functions=[merge_count(l_quantity#80), partial_count(distinct l_partkey#77L)], output=[count#166L, count#169L])

is the one that mixes use of the PartialMerge and Partial aggregation modes at the same time.

This is because the Spark query planner does a small optimization to reuse the result of merge_count(l_quantity#80) GROUP BY l_partkey to calculate both count(l_quantity) and count(distinct l_partkey#77L) at the same time.

@mbasmanova
Contributor

mbasmanova commented Apr 7, 2025

@zhztheplayer Thank you for providing an example. This is very helpful. Any chance you could give me editing permissions on the doc so I could also add some sections? Alternatively, let me do that here.

To make sure I understand fully, the benefit of having mixed aggregations is to optimize queries like:

SELECT count(a), count(distinct b) FROM t

or more generically

SELECT agg1(a1), agg2(a2),..., agg(distinct b) FROM t

SELECT count(a), count(distinct b) FROM t

In this query, we can first group by 'b' and compute partial counts of 'a' for each group of 'b'. Then, perform a global aggregation that counts the number of distinct groups and sums the partial counts of 'a'.

In other words, we rewrite the original SQL as

WITH x as (SELECT b, count(a) as cnt FROM t GROUP BY 1)
SELECT sum(cnt), count(b) FROM x 

and generate a plan like this:

- Agg(): merge_count(a), merge_count(b)
  - Agg(): merge_count(a), partial_count(b)
    - Agg(b): a := merge_count(a)
      - <shuffle on 'b'>
         - Agg(b): a := partial_count(a)

Is there any particular reason for having the 3rd aggregation here? I assume we need to shuffle the data on 'b' after the first aggregation. If that's the case, then we can have just 1 aggregation after shuffle, no?

Is this the only known shape that uses mixed aggregations? If not, would you share others?

I'm still trying to understand what is being optimized here. Would you clarify a bit by showing the difference between using this optimization and not using it?

More generically,

SELECT agg1(a), agg2(distinct b) FROM t

is optimized to

WITH x AS (SELECT b, agg1(a) as a_agg1 FROM t GROUP BY 1)
SELECT agg1_merge(a_agg1), agg2(b) FROM x

with plan

- Agg(): merge_agg1(a), merge_agg2(b)
  - Agg(): merge_agg1(a), partial_agg2(b)
    - Agg(b): a := merge_agg1(a)
      - <shuffle on 'b'>
         - Agg(b): a := partial_agg1(a)

@mbasmanova
Contributor

The solution basically worked as expected until we found that, for some functions, the intermediate / final companion functions are not resolvable given intermediate input types.

Where do we encounter an error? Is this during type resolution in Gluten (probably not) or Velox? If the latter, Velox receives a fully typed plan and therefore shouldn't need to perform type resolution on its own. Where does it try and fail to do that? Is this during creation of the Aggregate function instance or somewhere else? Please clarify.

@zhztheplayer
Contributor Author

Hi @mbasmanova, thank you for the detailed breakdown. I have edited the example query plan, which was missing the last aggregation operator's name, but I guess you already got the example.

Any chance you could give me editing permissions on the doc so I could also add some sections? Alternatively, let me do that here.

I've made the document editable at the same link in case you want to edit.

Is there any particular reason for having the 3rd aggregation here?

Aha. We can take another example that has an aggregation key:

SELECT c, count(a), count(distinct b) FROM t GROUP BY c

Then the query plan could be optimized to:

- Agg(c): merge_count(a), merge_count(b)
  - <shuffle on 'c'>
    - Agg(c): a := merge_count(a), b := partial_count(b)
      - <shuffle on 'b, c'>
        - Agg(b, c): a := partial_count(a)

In Spark, the second aggregation operator works on intermediate results distributed by hash(b, c), so it still needs to do a partial count on b and then rely on the last aggregation operator to do the final count.

@zhztheplayer
Contributor Author

Where do we encounter an error? Is this during type resolution in Gluten (probably not) or Velox?

It's during the conversion from the Gluten plan to the Velox plan. Gluten was failing to find the wanted merge-extract function in Velox's function registry.

Is this during creation of Aggregate function instance or somewhere else? Please, clarify.

This test case in Velox is the closest one to the case Gluten was encountering:

TEST_F(
    AggregateCompanionRegistryTest,
    resultTypeNotResolvableFromIntermediateType) {
  // We only register companion functions for original signatures whose result
  // type can be resolved from its intermediate type.
  std::vector<std::shared_ptr<AggregateFunctionSignature>> signatures{
      AggregateFunctionSignatureBuilder()
          .typeVariable("T")
          .returnType("array(T)")
          .intermediateType("varbinary")
          .argumentType("T")
          .build()};
  registerDummyAggregateFunction("aggregateFunc6", signatures);
  checkAggregateSignaturesCount("aggregateFunc6_partial", 0);
  checkAggregateSignaturesCount("aggregateFunc6_merge", 0);
  checkAggregateSignaturesCount("aggregateFunc6_merge_extract", 0);
  checkScalarSignaturesCount("aggregateFunc6_extract", 0);
}

You'll see that with only intermediate types provided, the merge-extract function cannot be resolved. The same happens when resolving decimal avg's merge-extract companion in Gluten.

@mbasmanova
Contributor

mbasmanova commented Apr 7, 2025

@zhztheplayer

I'm still trying to understand what is being optimized here. Would you clarify a bit by showing the difference between using this optimization and not using it?

Do you have something to say here?

Assuming we can confirm that this optimization is indeed useful, we'll need to find a way to translate Spark plans with mixed aggregations into Velox plans.

My thinking at this moment is that a cleaner change might be to extend the AggregationNode::Aggregate struct with a boolean requesting use of a 'merge' companion function, or to add logic to Velox that automatically checks whether the input is raw input or an intermediate result and then chooses whether to apply the original aggregate function or the 'merge' companion.

The reason I'm thinking in this direction, as opposed to actually moving 'step' from the plan node to Aggregate, is that doing so would require changing the flush and spill implementations, which adds quite a lot of complexity.
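
For concreteness, a minimal sketch of the first option, with a hypothetical field name:

struct Aggregate {
  CallTypedExprPtr call;

  // Hypothetical flag: when true, the operator applies the 'merge' companion
  // of the function named in 'call', i.e. this particular aggregate consumes
  // an intermediate accumulator rather than raw input. The node-level Step
  // stays where it is today.
  bool useMergeCompanion{false};

  // ... existing fields (rawInputTypes, mask, sortingKeys, distinct, ...)
};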

@liujiayi771
Contributor

liujiayi771 commented Apr 8, 2025

Hi, @mbasmanova. This discussion can provide a more detailed explanation of the issues encountered with Spark decimal average.

@zhztheplayer
Contributor Author

@mbasmanova Thanks for the suggestion, just to clarify a bit:

to add a boolean requesting to use a 'merge' companion function

How does this flag work with AggregationNode::Step? Say the step assigned to the aggregation node is kSingle; do you mean we would still look up the merge-extract function if the flag is true?

or to add logic to Velox to automatically check whether input is rawInput or intermediate result and then choose whether to apply the original aggregate function or 'merge' companion.

The same question there. Given there is already AggregationNode::Step, does it still make sense to have another mechanism to find the aggregation function with the wanted step?

The reason I'm thinking in this direction as opposed to actually moving 'step' from the plan node to Aggregate is that doing so would require changing flush and spill implementations that adds quite a lot of complexity.

Actually it's not much effort, based on my attempts. See #12861, which makes everything work and keeps backward compatibility for the PlanBuilder facilities.

In Presto we would also only need a trivial change to move onto the new API. We may only need to change several lines of code in https://github.com/prestodb/presto/blob/14a69e9b44c37f34caca6d09fd57d63cdedbf4ce/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp#L769-L805. I can help file a PR in Presto then.

Moreover, I think the proposal is not only a change for the Spark + Velox integration; it also makes Velox adaptable to more upper-layer software. For example, see this code from Hive, which shows that the aggregation step is also a property of a single aggregation function. Presto's design on this part is actually not that common.

@mbasmanova
Contributor

@zhztheplayer

I'm still trying to understand what is being optimized here. Would you clarify a bit by showing the difference between using this optimization and not using it?

Could you please help us resolve this question? We need to make sure the optimization is actually worth it before proceeding with the changes. At this point it is not obvious what the value of this optimization is. Would appreciate clarification.

@zhztheplayer
Contributor Author

zhztheplayer commented Apr 8, 2025

Hi @mbasmanova ,

From Apache Gluten's perspective, with the change, we will be able to:

  1. Freely use any aggregation function in Velox that is registered with generic input arguments, whenever needed;
  2. Remove this workaround from our own Velox fork;
  3. Stop relying on Velox companion functions, which add extra complexity to Gluten's planner;
  4. In code, have a clearer 1:1 mapping from Spark's aggregation modes to Velox's aggregation steps, plus an individual flag to control flushing (see the sketch below). Previously, we converted a flushable aggregation to Velox partial aggregation when flushable=true, or to Velox single aggregation when flushable=false, then found the wanted companion functions in the registry based on the aggregation modes. This code is not clean.
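
A rough sketch of that mapping (Gluten-side pseudocode; SparkAggregateMode is a stand-in for Spark's Scala-side AggregateMode, and Aggregate::Step is the proposed per-aggregate step, not an existing Velox API):

// Hypothetical helper showing the 1:1 translation; flushing would be
// controlled by a separate node-level flag instead of being implied by the step.
core::AggregationNode::Aggregate::Step toVeloxStep(SparkAggregateMode mode) {
  switch (mode) {
    case SparkAggregateMode::kPartial: // raw input -> intermediate
      return core::AggregationNode::Aggregate::Step::kPartial;
    case SparkAggregateMode::kPartialMerge: // intermediate -> intermediate
      return core::AggregationNode::Aggregate::Step::kIntermediate;
    case SparkAggregateMode::kFinal: // intermediate -> final
      return core::AggregationNode::Aggregate::Step::kFinal;
    case SparkAggregateMode::kComplete: // raw input -> final
      return core::AggregationNode::Aggregate::Step::kSingle;
  }
}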

@mbasmanova
Contributor

@zhztheplayer It sounds like the optimization in Spark that generates mixed aggregation is not very useful. If that's the case, a logical next question is why not disable it for Gluten?

@zhztheplayer
Contributor Author

zhztheplayer commented Apr 8, 2025

Hi @mbasmanova ,

@zhztheplayer It sounds like the optimization in Spark that generates mixed aggregation is not very useful. If that's the case, a logical next question is why not disable it for Gluten?

It's useful and cannot be disabled from Spark.

  3. Stop relying on Velox companion functions, which add extra complexity to Gluten's planner;

If the comment was based on this item, it may be a little misleading. What I meant is that we can stop relying on Velox companion functions but still assign different steps to Velox AggregationNode::Aggregate instances.

@liujiayi771
Contributor

@mbasmanova I think this modification is not intended to introduce the optimization of Spark's count(distinct) aggregation into Velox. Its main purpose is to make Velox's AggregationNode more generic, so that it can better adapt to plans generated by Spark or other SQL engines, while having almost no impact on Presto's compatibility.

@mbasmanova
Contributor

@zhztheplayer

It's useful and cannot be disabled from Spark.

Would you clarify a bit more? Are you saying that this is not an optimization in Spark, but rather the only implementation available for queries with distinct aggregations? I'd like to understand why it cannot be disabled. If it is an optimization, would you clarify what is being optimized? An example that illustrates the wins would be appreciated.

Its main purpose is to make Velox's AggregationNode more generic, so that it can better adapt to plans generated by Spark or other SQL engines

More generic implies higher complexity, a larger test matrix, and more maintenance. Hence, it is not something to embrace unless there is a large enough upside. What's the upside here? Which other SQL engines would benefit? What makes it a good idea for an optimizer to create mixed aggregations? I might be missing something obvious, hence would appreciate a clarification.

To evaluate #12861, it would be helpful to summarize how 'step' is currently used in the Aggregation operators and how these usages would change with the new design. For example, there is logic to abandon partial aggregation early. Will we enable it only if all aggregates are running in 'partial' mode?

Alternatively, I think it is worth considering changing the aggregations themselves.

I'm busy preparing for VeloxCon this week and will be traveling next week. I should have more bandwidth to help with the design the following week. It would be great to clear up the "Why?" above before then to make sure we have a clear understanding of the motivation for introducing the new design.

FYI, there is an in-flight optimization for aggregation: #12975

@zhztheplayer
Contributor Author

zhztheplayer commented Apr 10, 2025

Hi @mbasmanova ,

Would you clarify a bit more? Are you saying that this is not an optimization in Spark, but rather the only implementation available for queries with distinct aggregations?

Correct. It's simply how Spark plans aggregation queries. It's neither an additional feature nor a rule in the optimizer.

I'd like to understand why it cannot be disabled.

You can start from this Spark code in the Spark query planner for aggregation planning. The relevant logic is embedded in the core code and cannot be disabled through options or the like.

What makes it a good idea for an optimizer to create mixed aggregations?

I had to dig through Spark's commit history to answer this question, though it is closely related to how Spark plans distinct aggregation functions.

For example, the SQL SELECT sum(distinct value1) FROM agg2 in Spark will be planned as

AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(distinct value1#58)], output=[sum(DISTINCT value1)#61L])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=90]
      +- HashAggregate(keys=[], functions=[partial_sum(distinct value1#58)], output=[sum#66L])
         +- HashAggregate(keys=[value1#58], functions=[], output=[value1#58])
            +- Exchange hashpartitioning(value1#58, 5), ENSURE_REQUIREMENTS, [plan_id=86]
               +- HashAggregate(keys=[value1#58], functions=[], output=[value1#58])
                  +- FileScan parquet spark_catalog.default.agg2[value1#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/warehouse-3e3fcb4a-c364-44dc-84c8-ba32de1ba854/agg2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value1:int>

Note the query plan is a 3-stage job:

  1. Do a partial distinct on value1 in the scan stage;
  2. Do a final distinct on value1 + partial_sum(distinct value1) in the second stage, after the shuffle on (value1);
  3. Do the final sum(distinct value1) in the third stage, after the shuffle on ().

The second stage is important for the overall parallelism of the query: without it, all the sum(distinct) work would be done on a single node, which causes huge inefficiency.

Based on this SQL, assume we add another non-distinct aggregation function to it, for instance:

SELECT sum(distinct value1), avg(value2) FROM agg2 in Spark will be planned as

AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[avg(value2#59), sum(distinct value1#58)], output=[sum(DISTINCT value1)#62L, avg(value2)#63])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=90]
      +- HashAggregate(keys=[], functions=[merge_avg(value2#59), partial_sum(distinct value1#58)], output=[sum#69, count#70L, sum#73L])
         +- HashAggregate(keys=[value1#58], functions=[merge_avg(value2#59)], output=[value1#58, sum#69, count#70L])
            +- Exchange hashpartitioning(value1#58, 5), ENSURE_REQUIREMENTS, [plan_id=86]
               +- HashAggregate(keys=[value1#58], functions=[partial_avg(value2#59)], output=[value1#58, sum#69, count#70L])
                  +- FileScan parquet spark_catalog.default.agg2[value1#58,value2#59] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/warehouse-8f7bc201-2722-42c0-97c6-8053b7600458/agg2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value1:int,value2:int>

As shown, to fully utilize all three of the previous stages, avg(value2) is planned as partial_avg(value2) + merge_avg(value2) + avg(value2), so Spark can make sure that every stage reduces the amount of data the avg function has to process.

Because merge_avg(value2) is needed in the 2nd stage, it is mixed with partial_sum(distinct value1) to create a mixed-step aggregation operator.

Does this breakdown answer your major question? Please feel free to let me know if you have any further comments.

@mbasmanova
Contributor

@zhztheplayer Thank you for the additional clarification. I think I get it now. It would be great to summarize all of this discussion in a single doc or GitHub issue (perhaps just edit this issue's description) so that new readers can get up to speed quickly.
