diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.cpp b/velox/experimental/cudf/exec/CudfHashAggregation.cpp index dfc03a1b2580..c8912faca25c 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.cpp +++ b/velox/experimental/cudf/exec/CudfHashAggregation.cpp @@ -392,8 +392,6 @@ std::unique_ptr createAggregator( uint32_t inputIndex, VectorPtr constant, bool isGlobal) { - // Companion function may be count_merge_extract or count_partial or others, - // so use this to map if (kind.rfind("sum", 0) == 0) { return std::make_unique( step, inputIndex, constant, isGlobal); @@ -414,6 +412,43 @@ std::unique_ptr createAggregator( } } +static const std::unordered_map + companionStep = { + {"_partial", core::AggregationNode::Step::kPartial}, + {"_merge", core::AggregationNode::Step::kIntermediate}, + {"_merge_extract", core::AggregationNode::Step::kFinal}}; + +/// \brief Convert companion function to step for the aggregation function +/// +/// Companion functions are functions that are registered in velox along with +/// their main aggregation functions. These are designed to always function +/// with a fixed `step`. This is to allow spark style planNodes where `step` is +/// the property of the aggregation function rather than the planNode. +/// Companion functions allow us to override the planNode's step and use +/// aggregations of different steps in the same planNode +core::AggregationNode::Step getCompanionStep( + std::string const& kind, + core::AggregationNode::Step step) { + for (const auto& [k, v] : companionStep) { + if (folly::StringPiece(kind).endsWith(k)) { + step = v; + } + } + return step; +} + +bool hasCompanionAggs( + std::vector const& aggregates) { + for (auto const& aggregate : aggregates) { + for (const auto& [k, v] : companionStep) { + if (folly::StringPiece(aggregate.call->name()).endsWith(k)) { + return true; + } + } + } + return false; +} + auto toAggregators( core::AggregationNode const& aggregationNode, exec::OperatorCtx const& operatorCtx) { @@ -453,8 +488,9 @@ auto toAggregators( auto const kind = aggregate.call->name(); auto const inputIndex = aggInputs[0]; auto const constant = aggConstants.empty() ? nullptr : aggConstants[0]; + auto const companionStep = getCompanionStep(kind, step); aggregators.push_back( - createAggregator(step, kind, inputIndex, constant, isGlobal)); + createAggregator(companionStep, kind, inputIndex, constant, isGlobal)); } return aggregators; } @@ -508,6 +544,7 @@ CudfHashAggregation::CudfHashAggregation( isPartialOutput_(exec::isPartialOutput(aggregationNode->step())), isGlobal_(aggregationNode->groupingKeys().empty()), isDistinct_(!isGlobal_ && aggregationNode->aggregates().empty()), + hasCompanionAggs_(hasCompanionAggs(aggregationNode->aggregates())), maxPartialAggregationMemoryUsage_( driverCtx->queryConfig().maxPartialAggregationMemoryUsage()) {} @@ -790,9 +827,10 @@ RowVectorPtr CudfHashAggregation::getOutput() { // Handle partial groupby. if (isPartialOutput_ && !isGlobal_) { - if (partialOutput_ && - partialOutput_->estimateFlatSize() > - maxPartialAggregationMemoryUsage_) { + if (hasCompanionAggs_ || + (partialOutput_ && + partialOutput_->estimateFlatSize() > + maxPartialAggregationMemoryUsage_)) { // This is basically a flush of the partial output. return releaseAndResetPartialOutput(); } diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.h b/velox/experimental/cudf/exec/CudfHashAggregation.h index 3b828e18e3cb..0df8c78287f7 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.h +++ b/velox/experimental/cudf/exec/CudfHashAggregation.h @@ -125,6 +125,7 @@ class CudfHashAggregation : public exec::Operator, public NvtxHelper { // Distinct means it's a count distinct on the groupby keys, without any // aggregations const bool isDistinct_; + const bool hasCompanionAggs_; // Maximum memory usage for partial aggregation. const int64_t maxPartialAggregationMemoryUsage_; diff --git a/velox/experimental/cudf/tests/AggregationTest.cpp b/velox/experimental/cudf/tests/AggregationTest.cpp index 2f405b672e2b..a55b7256aa2b 100644 --- a/velox/experimental/cudf/tests/AggregationTest.cpp +++ b/velox/experimental/cudf/tests/AggregationTest.cpp @@ -479,6 +479,33 @@ TEST_F(AggregationTest, countPartialFinalGlobal) { assertQuery(op, "SELECT count(*) FROM tmp"); } +/// Tests the spark scenario of having different types of aggs in the same +/// planNode Specific example being tested is +/// https://github.com/facebookincubator/velox/issues/12830#issuecomment-2783340233 +TEST_F(AggregationTest, CompanionAggs) { + std::vector keys0{1, 1, 1, 2, 1, 1, 2, 2}; + std::vector keys1{1, 2, 1, 2, 1, 2, 1, 2}; + std::vector values{1, 2, 3, 4, 5, 6, 7, 8}; + auto rowVector = makeRowVector( + {makeFlatVector(keys0), + makeFlatVector(keys1), + makeFlatVector(values)}); + + createDuckDbTable({rowVector}); + + auto op = + PlanBuilder() + .values({rowVector}) + .singleAggregation({"c2", "c0"}, {"count_partial(c1)"}) + .localPartition({"c2", "c0"}) + .singleAggregation({"c0"}, {"count_merge(a0)", "count_partial(c2)"}) + .localPartition({"c0"}) + .singleAggregation({"c0"}, {"count_merge(a0)", "count_merge(a1)"}) + .planNode(); + assertQuery( + op, "SELECT c0, count(c1), count(distinct c2) FROM tmp GROUP BY c0"); +} + TEST_F(AggregationTest, partialAggregationMemoryLimit) { auto vectors = { makeRowVector({makeFlatVector(