From d122b44b202f064d8eebec9822c7d32def7f1ab5 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Tue, 6 May 2025 13:51:32 +0100 Subject: [PATCH 1/4] fix the step --- velox/experimental/cudf/exec/CudfHashAggregation.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.cpp b/velox/experimental/cudf/exec/CudfHashAggregation.cpp index 5cb364256857..2c50e7d4f2ea 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.cpp +++ b/velox/experimental/cudf/exec/CudfHashAggregation.cpp @@ -369,6 +369,15 @@ std::unique_ptr createAggregator( bool isGlobal) { // Companion function may be count_merge_extract or count_partial or others, // so use this to map + std::unordered_map companionStep = { + {"_partial", core::AggregationNode::Step::kPartial}, + {"_merge", core::AggregationNode::Step::kFinal}, + {"_merge_extract", core::AggregationNode::Step::kIntermediate}}; + for (const auto& [k, v] : companionStep) { + if (folly::StringPiece(kind).endsWith(k)) { + step = v; + } + } if (kind.rfind("sum", 0) == 0) { return std::make_unique( step, inputIndex, constant, isGlobal); From 27e61130e4ee0d2b0f9e0ac91fcb9e6574d85e87 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 7 May 2025 09:35:59 +0100 Subject: [PATCH 2/4] fix --- velox/experimental/cudf/exec/CudfHashAggregation.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.cpp b/velox/experimental/cudf/exec/CudfHashAggregation.cpp index 2c50e7d4f2ea..040fc5e72cae 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.cpp +++ b/velox/experimental/cudf/exec/CudfHashAggregation.cpp @@ -371,8 +371,8 @@ std::unique_ptr createAggregator( // so use this to map std::unordered_map companionStep = { {"_partial", core::AggregationNode::Step::kPartial}, - {"_merge", core::AggregationNode::Step::kFinal}, - {"_merge_extract", core::AggregationNode::Step::kIntermediate}}; + {"_merge", core::AggregationNode::Step::kIntermediate}, + {"_merge_extract", core::AggregationNode::Step::kFinal}}; for (const auto& [k, v] : companionStep) { if (folly::StringPiece(kind).endsWith(k)) { step = v; From 5793a758ca2a470f14fdd15bca454e6668e764b2 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 12 May 2025 16:45:57 +0000 Subject: [PATCH 3/4] Changes to make companion aggs work post streaming aggs changes --- .../cudf/exec/CudfHashAggregation.cpp | 59 ++++++++++++++----- .../cudf/exec/CudfHashAggregation.h | 1 + 2 files changed, 45 insertions(+), 15 deletions(-) diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.cpp b/velox/experimental/cudf/exec/CudfHashAggregation.cpp index 79d71c9b3881..c8912faca25c 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.cpp +++ b/velox/experimental/cudf/exec/CudfHashAggregation.cpp @@ -392,17 +392,6 @@ std::unique_ptr createAggregator( uint32_t inputIndex, VectorPtr constant, bool isGlobal) { - // Companion function may be count_merge_extract or count_partial or others, - // so use this to map - std::unordered_map companionStep = { - {"_partial", core::AggregationNode::Step::kPartial}, - {"_merge", core::AggregationNode::Step::kIntermediate}, - {"_merge_extract", core::AggregationNode::Step::kFinal}}; - for (const auto& [k, v] : companionStep) { - if (folly::StringPiece(kind).endsWith(k)) { - step = v; - } - } if (kind.rfind("sum", 0) == 0) { return std::make_unique( step, inputIndex, constant, isGlobal); @@ -423,6 +412,43 @@ std::unique_ptr createAggregator( } } +static const std::unordered_map + companionStep = { + {"_partial", core::AggregationNode::Step::kPartial}, + {"_merge", core::AggregationNode::Step::kIntermediate}, + {"_merge_extract", core::AggregationNode::Step::kFinal}}; + +/// \brief Convert companion function to step for the aggregation function +/// +/// Companion functions are functions that are registered in velox along with +/// their main aggregation functions. These are designed to always function +/// with a fixed `step`. This is to allow spark style planNodes where `step` is +/// the property of the aggregation function rather than the planNode. +/// Companion functions allow us to override the planNode's step and use +/// aggregations of different steps in the same planNode +core::AggregationNode::Step getCompanionStep( + std::string const& kind, + core::AggregationNode::Step step) { + for (const auto& [k, v] : companionStep) { + if (folly::StringPiece(kind).endsWith(k)) { + step = v; + } + } + return step; +} + +bool hasCompanionAggs( + std::vector const& aggregates) { + for (auto const& aggregate : aggregates) { + for (const auto& [k, v] : companionStep) { + if (folly::StringPiece(aggregate.call->name()).endsWith(k)) { + return true; + } + } + } + return false; +} + auto toAggregators( core::AggregationNode const& aggregationNode, exec::OperatorCtx const& operatorCtx) { @@ -462,8 +488,9 @@ auto toAggregators( auto const kind = aggregate.call->name(); auto const inputIndex = aggInputs[0]; auto const constant = aggConstants.empty() ? nullptr : aggConstants[0]; + auto const companionStep = getCompanionStep(kind, step); aggregators.push_back( - createAggregator(step, kind, inputIndex, constant, isGlobal)); + createAggregator(companionStep, kind, inputIndex, constant, isGlobal)); } return aggregators; } @@ -517,6 +544,7 @@ CudfHashAggregation::CudfHashAggregation( isPartialOutput_(exec::isPartialOutput(aggregationNode->step())), isGlobal_(aggregationNode->groupingKeys().empty()), isDistinct_(!isGlobal_ && aggregationNode->aggregates().empty()), + hasCompanionAggs_(hasCompanionAggs(aggregationNode->aggregates())), maxPartialAggregationMemoryUsage_( driverCtx->queryConfig().maxPartialAggregationMemoryUsage()) {} @@ -799,9 +827,10 @@ RowVectorPtr CudfHashAggregation::getOutput() { // Handle partial groupby. if (isPartialOutput_ && !isGlobal_) { - if (partialOutput_ && - partialOutput_->estimateFlatSize() > - maxPartialAggregationMemoryUsage_) { + if (hasCompanionAggs_ || + (partialOutput_ && + partialOutput_->estimateFlatSize() > + maxPartialAggregationMemoryUsage_)) { // This is basically a flush of the partial output. return releaseAndResetPartialOutput(); } diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.h b/velox/experimental/cudf/exec/CudfHashAggregation.h index 3b828e18e3cb..0df8c78287f7 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.h +++ b/velox/experimental/cudf/exec/CudfHashAggregation.h @@ -125,6 +125,7 @@ class CudfHashAggregation : public exec::Operator, public NvtxHelper { // Distinct means it's a count distinct on the groupby keys, without any // aggregations const bool isDistinct_; + const bool hasCompanionAggs_; // Maximum memory usage for partial aggregation. const int64_t maxPartialAggregationMemoryUsage_; From 8ca78bd12ba479745e5ec0bfd79d8d69d8ab02ca Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Tue, 13 May 2025 07:25:31 +0000 Subject: [PATCH 4/4] working test --- .../cudf/tests/AggregationTest.cpp | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/velox/experimental/cudf/tests/AggregationTest.cpp b/velox/experimental/cudf/tests/AggregationTest.cpp index 2f405b672e2b..a55b7256aa2b 100644 --- a/velox/experimental/cudf/tests/AggregationTest.cpp +++ b/velox/experimental/cudf/tests/AggregationTest.cpp @@ -479,6 +479,33 @@ TEST_F(AggregationTest, countPartialFinalGlobal) { assertQuery(op, "SELECT count(*) FROM tmp"); } +/// Tests the spark scenario of having different types of aggs in the same +/// planNode Specific example being tested is +/// https://github.com/facebookincubator/velox/issues/12830#issuecomment-2783340233 +TEST_F(AggregationTest, CompanionAggs) { + std::vector keys0{1, 1, 1, 2, 1, 1, 2, 2}; + std::vector keys1{1, 2, 1, 2, 1, 2, 1, 2}; + std::vector values{1, 2, 3, 4, 5, 6, 7, 8}; + auto rowVector = makeRowVector( + {makeFlatVector(keys0), + makeFlatVector(keys1), + makeFlatVector(values)}); + + createDuckDbTable({rowVector}); + + auto op = + PlanBuilder() + .values({rowVector}) + .singleAggregation({"c2", "c0"}, {"count_partial(c1)"}) + .localPartition({"c2", "c0"}) + .singleAggregation({"c0"}, {"count_merge(a0)", "count_partial(c2)"}) + .localPartition({"c0"}) + .singleAggregation({"c0"}, {"count_merge(a0)", "count_merge(a1)"}) + .planNode(); + assertQuery( + op, "SELECT c0, count(c1), count(distinct c2) FROM tmp GROUP BY c0"); +} + TEST_F(AggregationTest, partialAggregationMemoryLimit) { auto vectors = { makeRowVector({makeFlatVector(