Skip to content

fix: Allow companion functions when result type is not resolvable given intermediate type #11999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

rui-mo
Copy link
Collaborator

@rui-mo rui-mo commented Jan 2, 2025

The registrations of partial and merge companion functions does not require the
result type being resolvable given intermediate type. This PR removes the
limitations for them. The registration of merge_extract companion function used
to depend on the resolving for all aggregate steps, and this PR uses the passed-
in result type rather than the resolved one for single and final steps.

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jan 2, 2025
Copy link

netlify bot commented Jan 2, 2025

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit ed5adc1
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/67ebbd676ee84d0008196759

@rui-mo rui-mo force-pushed the wip_merge_extract branch 2 times, most recently from cee024d to 9a4519f Compare January 2, 2025 07:12
@zhztheplayer
Copy link
Contributor

zhztheplayer commented Jan 2, 2025

@rui-mo Thank you for taking this on. I remember we needed to bypass this check as well to make the decimal aggregate functions work. Or we don't need that change anymore?

@Yuhta Yuhta requested a review from kagamiori January 2, 2025 15:13
@rui-mo rui-mo force-pushed the wip_merge_extract branch from 9a4519f to 9911e8f Compare January 8, 2025 09:39
@rui-mo
Copy link
Collaborator Author

rui-mo commented Jan 8, 2025

@zhztheplayer Thanks for the pointer! In Gluten we expect that all the names will end with "_merge_extract," however according to this reasoning, the names may have other suffixes. For instance, we would obtain the names for sum and average as follows. I'll investigate further to see how to incorporate this section with Gluten.

avg_merge_extract_DECIMAL, avg_merge_extract_double
sum_merge_extract_bigint, sum_merge_extract_DECIMAL, sum_merge_extract_double

@zhztheplayer
Copy link
Contributor

@zhztheplayer Thanks for the pointer! In Gluten we expect that all the names will end with "_merge_extract," however according to this reasoning, the names may have other suffixes. For instance, we would obtain the names for sum and average as follows. I'll investigate further to see how to incorporate this section with Gluten.

avg_merge_extract_DECIMAL, avg_merge_extract_double
sum_merge_extract_bigint, sum_merge_extract_DECIMAL, sum_merge_extract_double

Sounds reasonable. Look forward to a solution here. Thanks!

@rui-mo rui-mo force-pushed the wip_merge_extract branch from 9911e8f to 3d2f9e5 Compare January 9, 2025 03:26
@rui-mo rui-mo marked this pull request as ready for review January 9, 2025 03:27
if (auto func = getAggregateFunctionEntry(name)) {
auto fn = func->factory(
core::AggregationNode::Step::kFinal,
argTypes,
originalResultType,
resultType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @rui-mo, it's not universally correct to use resultType here. The reason is that resultType is an argument received by the factory of the merge-extract-function (i.e., the lambda starting at line 337). This factory is called in the HashAggregation constructor for individual aggregation nodes that can be the partial aggregation step or the intermediate aggregation step, etc. Suppose an aggregation node perform the intermediate aggregation step of the merge-extract-function, then both the argTypes and the resultType received by the factory at line 337 are the intermediate type of the original function. But when we do auto fn = func->factory(...), we're creating the original aggregation function, so the result type passed to this factory should be the result type of the original aggregation function.

(This change doesn't trigger any test error because the AggregationTestBase::testAggregationsWithCompanion() currently doesn't test the functions with the merge-extract companion function, which we should better add...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose an aggregation node perform the intermediate aggregation step of the merge-extract-function, then both the argTypes and the resultType received by the factory at line 337 are the intermediate type of the original function.

Can we break down the cases here? E.g., when step is single / final, we just skip the result type resolution?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we break down the cases here? E.g., when step is single / final, we just skip the result type resolution?

@kagamiori I apologize for the delayed response; I was on vacation. Do you believe that @zhztheplayer's suggestion above makes sense? We are proposing this change to allow more flexibility in the aggregate function registration especially for the Spark decimal average.

Copy link
Contributor

@NEUpanning NEUpanning Mar 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kagamiori @rui-mo I am curious why the original function factory is called withStep::kFinal. If the step is eithter partial or intermediate, can we use Step::kIntermediate instead? By doing this, we can avoid the need for result type resolutio. Here is the code:

bool registerAggregateFunction(
    const std::string& name,
    const std::string& mergeExtractFunctionName,
    const std::vector<std::shared_ptr<AggregateFunctionSignature>>&
        mergeExtractSignatures,
    bool overwrite) {
  return exec::registerAggregateFunction(
             mergeExtractFunctionName,
             std::move(mergeExtractSignatures),
             [name, mergeExtractFunctionName](
                 core::AggregationNode::Step step,
                 const std::vector<TypePtr>& argTypes,
                 const TypePtr& resultType,
                 const core::QueryConfig& config)
                 -> std::unique_ptr<Aggregate> {
               if (auto func = getAggregateFunctionEntry(name)) {
                 core::AggregationNode::Step factoryStep = core::AggregationNode::Step::kFinal;
                 if (isPartialOutput(step)) {
                   factoryStep = core::AggregationNode::Step::kIntermediate;
                 }
                 auto fn = func->factory(
                     factoryStep,
                     argTypes,
                     resultType,
                     config);
                 VELOX_CHECK_NOT_NULL(fn);
                 return std::make_unique<
                     AggregateCompanionAdapter::MergeExtractFunction>(
                     std::move(fn), resultType);
               }
               VELOX_FAIL(
                   "Original aggregation function {} not found: {}",
                   name,
                   mergeExtractFunctionName);
             },
             /*registerCompanionFunctions*/ false,
             overwrite)
      .mainFunction;
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NEUpanning I think using kIntermediate cannot ensure the result could be correctly extracted if the MergeExtractFunction::extractValues is called.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rui-mo I am not sure I follow. Could you elaborate on it? I assume that for partial and intermediate steps, the input and result types of merge_extract are both intermediate type, which maps to intermediate step of original aggregate function. Thanks!

Copy link
Collaborator Author

@rui-mo rui-mo Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NEUpanning With the change as you mentioned, it appears we are only registering a function accepting intermediate input and giving intermediate output for the partial/intermediate steps (this is the same with an ordinary intermediate aggregate function). But for a companion function, we need to register a function accepting intermediate input and can give final output for the partial/intermediate steps. @kagamiori Would you like to confirm? Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for a companion function, we need to register a function accepting intermediate input and can give final output for the partial/intermediate steps

@rui-mo Thanks for your clarification! For the partial/intermediate steps, I thought aggregate op would call extractAccumulators function to get results as stated in the document , and merge_extract does not override extractAccumulators to extractValues. Could you remind me how aggregate op gets the final output for the partial/intermediate steps?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got your point. By reviewing the design doc #4493 I'm not sure if merge_extract is designed to extract final result when step is partial//intermediate. Perhaps @kagamiori could help clarify. Thanks!

@NEUpanning
Copy link
Contributor

NEUpanning commented Jan 24, 2025

@rui-mo Thank you for working on this. I'm developing a new Spark collect_list with the signature T -> varbinary -> array(T), for more details see #12023. I suppose if this PR is merged, Velox would support this signature. So, I cherry-picked this PR into my branch and ran the test. I found four issues:

  1. CompanionSignatures::extractFunctionSignature still has limitations. It leads to register no extract functions.
  2. While creating the merge_extract signature, Velox checks if all type variables should appear in the inputs arguments. For this signature, T does not appear in varbinary. See code link
  3. exec::test::PlanBuilder::project uses resolveScalarFunctionType to resolve result type from intermediate type, but it cannot be resolved for this signature. Here is the error message:
C++ exception with description "Exception: VeloxUserError
Error Source: USER
Error Code: INVALID_ARGUMENT
Reason: Scalar function signature is not supported: spark_collect_list_extract(VARBINARY). Supported signatures: (varbinary) -> array(E).
Retriable: False
Function: resolveScalarFunctionType
File: /Users/panning/VSProjects/glutenNvelox_CE/velox2/velox/velox/parse/TypeResolver.cpp
Line: 102
  1. Gluten fails to validate function type for collect_list_merge_extract, because Velox cannot use actual intermediate type to resolve return type.
  Validation failed at file:SubstraitToVeloxPlanValidator.cc, line:1055, function:validateAggRelFunctionType, reason:Validation failed for function collect_list_merge_extract resolve type in AggregateRel.

For the first and second issues, perhaps we can simply remove the limitation and check? For the fourth issue, maybe we can use actual return type to resolve return type.

cc @zhztheplayer

@rui-mo
Copy link
Collaborator Author

rui-mo commented Feb 11, 2025

I'm developing a new Spark collect_list with the signature T -> varbinary -> array(T)

Hi @NEUpanning, could you elaborate more about the issue you met when developing the collect_list, and how the issue might be relevant to this PR perhaps in the issue #12023? Thanks.

@NEUpanning
Copy link
Contributor

@rui-mo To resolve issue #12023, we have to change the intermediate type of collect_list to varbinary. After this change the signature of collect_list will be T -> varbinary -> array(T) that leads to result type is not resolvable given intermediate type and generates no companion functions for collect_list. I thought this PR would allow companion functions for the new collect_list signature.

@kagamiori
Copy link
Contributor

Hi @rui-mo and @zhztheplayer, sorry for the delay.

Can we break down the cases here? E.g., when step is single / final, we just skip the result type resolution?

This could be one possibility, but I'd like to understand the problem a bit better. Below are the problems I got from the conversation here:

  1. @rui-mo needs a workaround to allow registering _merge_extract functions of decimal aggregation functions whose result types cannot be resolved from intermediate types.
  2. Similarly, @NEUpanning needs a way to create function signature and register _extract functions of aggregation functions whose result type cannot be resolved from intermediate types, such as T -> varbinary -> array(T).

These two problems are similar. Could you please help me understand how the _merge_extract and _extract function will be used in Gluten if the function signature contain unresolvable result type (in other words, where does Gluten get the correct result type)? For example, are these _merge_extract and _extract functions used directly in the user's queries? If so, how does Gluten determine the correct result type when constructing an aggregation or project plan node? Or does Gluten only allow user's queries to use the original aggregation function and the plan nodes of _merge_extract and _extract functions are created according to the plan node of the original aggregation function?

@rui-mo
Copy link
Collaborator Author

rui-mo commented Feb 24, 2025

@kagamiori Gluten uses Velox's final aggregation + companion functions (partial, merge, merge_extract) to map to different modes of Spark aggregation. Velox's single aggregation is also utilised for Spark's complete aggregation. The reasons to use companion functions can be referred at #9048 (reply in thread). This replacement is plan-level, so the user cannot use them in queries directly, instead Gluten will generate them according to the Spark plan.

The extract companion function is not used by Gluten, and for the merge_extract function, Gluten uses the result type from Spark planner. Therefore, Gluten only needs to enable the merge_extract function of final aggregate when result type is not resolvable given intermediate type. Do you think it makes sense? Thanks!

@rui-mo rui-mo force-pushed the wip_merge_extract branch 3 times, most recently from e7f4c97 to b30a368 Compare February 28, 2025 10:14
@rui-mo
Copy link
Collaborator Author

rui-mo commented Feb 28, 2025

@kagamiori I updated this PR to skip type resolving only for single and final steps, and added tests for the merge_extract companion function in testAggregationsWithCompanion. Would you help take another look? Thanks!

@kagamiori
Copy link
Contributor

@kagamiori Gluten uses Velox's final aggregation + companion functions (partial, merge, merge_extract) to map to different modes of Spark aggregation. Velox's single aggregation is also utilised for Spark's complete aggregation. The reasons to use companion functions can be referred at #9048 (reply in thread). This replacement is plan-level, so the user cannot use them in queries directly, instead Gluten will generate them according to the Spark plan.

The extract companion function is not used by Gluten, and for the merge_extract function, Gluten uses the result type from Spark planner. Therefore, Gluten only needs to enable the merge_extract function of final aggregate when result type is not resolvable given intermediate type. Do you think it makes sense? Thanks!

Hi @rui-mo, thank you for sharing the context! Since it is specific to Gluten that it only uses companion functions with the single and final aggregation steps and not using companion function signatures during query resolution, I think the condition should b relaxed only in Gluten. One possibility comes to my mind is for Gluten to implement its own CompanionFunctionsRegistrar that inherits the Velox one and override the registerMergeExtractFunction() method.

cc @mbasmanova for additional thoughts.

@rui-mo
Copy link
Collaborator Author

rui-mo commented Mar 4, 2025

One possibility comes to my mind is for Gluten to implement its own CompanionFunctionsRegistrar that inherits the Velox one and override the registerMergeExtractFunction() method.

@kagamiori Thanks for your suggestion! I assume we could allow a custom CompanionFunctionsRegistrar to be registered by providing a factory method in Velox, and Gluten can register its customized one. Similar customization may need for the CompanionSignatures which skips the partial/merge/merge_extract signatures when isResultTypeResolvableGivenIntermediateType returns false. By doing so, Spark aggregate functions whose return type cannot be resolved given intermediate type (like decimal_avg) can only be added in Velox without the registration of companion functions.

On the other hand, the change proposed in this PR only relax the restrictions for partial, merge, and merge_extract of single and final steps. The registrations of them does not depend on the resolvability in the first place, so I suppose this change is more like to remove the unnecessary limitation.

Could you please help provide more insights? Thanks.

@rui-mo rui-mo force-pushed the wip_merge_extract branch from b30a368 to 98b6e3d Compare March 10, 2025 10:13
@kagamiori
Copy link
Contributor

Hi @rui-mo, sorry for the delay. I looked at the code in this PR. One of my concern is about the removal of the checks isResultTypeResolvableGivenIntermediateType(signature) in AggregateCompanionSignatures.cpp. Previously if the UDAF function signature doesn't pass this check, it fails at registration time of its companion functions. If we remove this check, the registration would succeed but it will fail at query execution time (if merge- or merge-extract companion functions are used with the intermediate aggregation step). This seems to not be what we desire in general, so I would suggest try relaxing the check only in Gluten if that's possible.

@rui-mo
Copy link
Collaborator Author

rui-mo commented Mar 21, 2025

@kagamiori Thanks for helping review!

If we remove this check, the registration would succeed but it will fail at query execution time (if merge- or merge-extract companion functions are used with the intermediate aggregation step).

It looks to me the partial and merge companions do not depend on the resolving from intermediate type to result type. Please kindly correct me if that's wrong. For the merge_extract, we are not sure if it is designed to extract final result when step is partial or intermediate because it does not override the extractAccumulators and intermediate outputs will be produced, seeing details in #11999 (comment). If that's the case, its result type resolving could also be unnecessary. How do you think?

@rui-mo rui-mo force-pushed the wip_merge_extract branch from 98b6e3d to ed5adc1 Compare April 1, 2025 10:18
@rui-mo
Copy link
Collaborator Author

rui-mo commented Apr 1, 2025

Hi @mbasmanova @kagamiori, this is a summary on the scope of this PR. Could you spare some time to help review? Thanks!


Gluten uses Velox's final aggregation + companion functions (partial, merge, merge_extract) to map to different modes of Spark aggregation. While the required resolving from intermediate type to result type during companion functions' registration causes failure for Spark decimal_avg and collect_list aggregate functions, whose result type cannot be resolved from the intermediate type. This PR aims to relax the limitation when unnecessary.

PR Summary

  1. Based on the implementation, the 'partial' and 'merge' companion functions do not rely on the intermediate-type-to-result-type-resolving. We remove the requirement for them.
  2. For the 'merge_extract' companion function, when called with single or final step, the user-passed output type is the same with the original result type, so we could remove the resolving process for single and final steps.

With the above fixes, Gluten issue could be resolved without bringing side-effect to Velox functionality as we only remove the unnecessary resolving process.

Further thoughts

@NEUpanning finds the MergeExtractFunction does not override the extractAccumulators function, which means when called with 'partial' or 'intermediate' step, intermediate result is produced. This brings the question on whether the merge_extract function is designed to produce final result even for 'partial' and 'intermediate' steps. If not, Intermediate rather than kFinal shall be used during func->factory for 'partial' and 'intermediate' steps. If so, the intermediate-type-to-result-type-resolving could be removed completely for the merge_extract function.

auto fn = func->factory(
core::AggregationNode::Step::kFinal,
argTypes,
originalResultType,
config);
VELOX_CHECK_NOT_NULL(fn);
return std::make_unique<
AggregateCompanionAdapter::MergeExtractFunction>(
std::move(fn), resultType);

Please note that although this modification is proposed based on Gluten's reliance on companion functions, if the community agrees with the enhancement proposed by @zhztheplayer in #12830, it could also be considered as a simplification as unnecessary limitation during the companion function registration is removed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants