From b4ab5075a1f375b3a05fb8512bcab19c3d926465 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Thu, 10 Apr 2025 10:44:46 -0700 Subject: [PATCH 1/2] feat: Add Aggregate::addRawClusteredInput and streaming_aggregation_eager_flush (#12975) Summary: Add fast path for streaming aggregation where we have input rows from same group located together. For certain functions, we can leverage this property to reduce the number of copy calls and create larger and fewer ranges for copy. This brings 3x improvements for a specific query shape common in data loading for AI training. We implement this optimization for `arbitrary` and `array_agg`. For `arbitrary`, if the input is clustered, we just keep a reference to the input vector and index that is selected; when we extract values from the container, we group all copies from same vector to one `copyRange` call so the cost is minimized. For `array_agg`, we do similar thing, only track the range (offset and size) where the input will be taken for each group, and do the copy in bulk when we extract value. There is another optimization to flush the streaming aggregation output whenever there is result available, via a new query config `streaming_aggregation_eager_flush`. This allows us to minimize the memory used by accumulators. Differential Revision: D72677410 --- velox/core/QueryConfig.h | 15 ++ velox/docs/configs.rst | 20 ++ velox/exec/Aggregate.h | 31 +++ velox/exec/StreamingAggregation.cpp | 52 +++-- velox/exec/StreamingAggregation.h | 6 + velox/exec/tests/StreamingAggregationTest.cpp | 178 ++++++++++-------- .../aggregates/ArbitraryAggregate.cpp | 146 +++++++++++--- .../aggregates/ArrayAggAggregate.cpp | 170 ++++++++++++++--- .../aggregates/tests/ArbitraryTest.cpp | 34 ++++ .../aggregates/tests/ArrayAggTest.cpp | 35 ++++ velox/vector/BaseVector.h | 6 + 11 files changed, 553 insertions(+), 140 deletions(-) diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index e69c7955171b..6a5342d56dbe 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -531,6 +531,17 @@ class QueryConfig { static constexpr const char* kRequestDataSizesMaxWaitSec = "request_data_sizes_max_wait_sec"; + /// If this is false (the default), in streaming aggregation, wait until we + /// have enough number of output rows to produce a batch of size specified by + /// Operator::outputBatchRows. + /// + /// If this is true, we put the rows in output batch, as soon as the + /// corresponding groups are fully aggregated. This is useful for reducing + /// memory consumption, if the downstream operators are not sensitive to small + /// batch size. + static constexpr const char* kStreamingAggregationEagerFlush = + "streaming_aggregation_eager_flush"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -976,6 +987,10 @@ class QueryConfig { return get(kThrowExceptionOnDuplicateMapKeys, false); } + bool streamingAggregationEagerFlush() const { + return get(kStreamingAggregationEagerFlush, false); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index d5f339a13a1e..07d4e4758fa6 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -399,6 +399,26 @@ Spilling - 0 - Percentage of aggregation or join input batches that will be forced to spill for testing. 0 means no extra spilling. +Aggregation +----------- +.. list-table:: + :widths: 20 10 10 70 + :header-rows: 1 + + * - Property Name + - Type + - Default Value + - Description + * - streaming_aggregation_eager_flush + - bool + - false + - If this is false (the default), in streaming aggregation, wait until we + have enough number of output rows to produce a batch of size specified by + Operator::outputBatchRows. If this is true, we put the rows in output + batch, as soon as the corresponding groups are fully aggregated. This is + useful for reducing memory consumption, if the downstream operators are + not sensitive to small batch size. + Table Scan ------------ .. list-table:: diff --git a/velox/exec/Aggregate.h b/velox/exec/Aggregate.h index 67f64e5b8234..833f7717bb9a 100644 --- a/velox/exec/Aggregate.h +++ b/velox/exec/Aggregate.h @@ -171,6 +171,35 @@ class Aggregate { const std::vector& args, bool mayPushdown) = 0; + /// Called by aggregation operator to set whether the input data is eligible + /// for clustered input optimization. This is turned off, in cases for + /// example if the input rows from same group are not contiguous, or the + /// aggregate is sorted or distinct. + void setClusteredInput(bool value) { + clusteredInput_ = value; + } + + /// Whether the function itself supports clustered input optimization. + /// + /// When this returns true, `addRawClusteredInput` should be implemented. + virtual bool supportsAddRawClusteredInput() const { + return false; + } + + /// Fast path for the function when the input rows from same group are + /// clustered together. `groups`, `rows`, and `args` are the same as + /// `addRawInput`, `groupBoundaries` is the indices into `groups` indicating + /// the row after last row of each group. + /// + /// Will only be called when `supportAddRawClusteredInput` returns true. + virtual void addRawClusteredInput( + char** /*groups*/, + const SelectivityVector& /*rows*/, + const std::vector& /*args*/, + const folly::Range& /*groupBoundaries*/) { + VELOX_NYI("Unimplemented: {} {}", typeid(*this).name(), __func__); + } + // Updates final accumulators from intermediate results. // @param groups Pointers to the start of the group rows. These are aligned // with the 'args', e.g. data in the i-th row of the 'args' goes to the i-th @@ -468,6 +497,8 @@ class Aggregate { std::vector pushdownCustomIndices_; bool validateIntermediateInputs_ = false; + + bool clusteredInput_ = false; }; using AggregateFunctionFactory = std::function( diff --git a/velox/exec/StreamingAggregation.cpp b/velox/exec/StreamingAggregation.cpp index e4b0aa598c8e..9e91c2ccf993 100644 --- a/velox/exec/StreamingAggregation.cpp +++ b/velox/exec/StreamingAggregation.cpp @@ -32,7 +32,10 @@ StreamingAggregation::StreamingAggregation( : "Aggregation"), outputBatchSize_{outputBatchRows()}, aggregationNode_{aggregationNode}, - step_{aggregationNode->step()} { + step_{aggregationNode->step()}, + eagerFlush_{operatorCtx_->driverCtx() + ->queryConfig() + .streamingAggregationEagerFlush()} { if (aggregationNode_->ignoreNullKeys()) { VELOX_UNSUPPORTED( "Streaming aggregation doesn't support ignoring null keys yet"); @@ -75,6 +78,15 @@ void StreamingAggregation::initialize() { } } + if (isRawInput(step_)) { + for (column_index_t i = 0; i < aggregates_.size(); ++i) { + if (aggregates_[i].sortingKeys.empty() && !aggregates_[i].distinct) { + // Must be set before we initialize row container, because it could + // change the type and size of accumulator. + aggregates_[i].function->setClusteredInput(true); + } + } + } masks_ = std::make_unique(extractMaskChannels(aggregates_)); rows_ = makeRowContainer(groupingKeyTypes); @@ -175,6 +187,9 @@ RowVectorPtr StreamingAggregation::createOutput(size_t numGroups) { } } + std::rotate(groups_.begin(), groups_.begin() + numGroups, groups_.end()); + numGroups_ -= numGroups; + return output; } @@ -215,6 +230,15 @@ void StreamingAggregation::assignGroups() { } } } + + groupBoundaries_.clear(); + for (vector_size_t i = 1; i < numInput; ++i) { + if (inputGroups_[i] != inputGroups_[i - 1]) { + groupBoundaries_.push_back(i); + } + } + VELOX_CHECK_GT(numInput, 0); + groupBoundaries_.push_back(numInput); } const SelectivityVector& StreamingAggregation::getSelectivityVector( @@ -256,7 +280,12 @@ void StreamingAggregation::evaluateAggregates() { } if (isRawInput(step_)) { - function->addRawInput(inputGroups_.data(), rows, args, false); + if (function->supportsAddRawClusteredInput()) { + function->addRawClusteredInput( + inputGroups_.data(), rows, args, groupBoundaries_); + } else { + function->addRawInput(inputGroups_.data(), rows, args, false); + } } else { function->addIntermediateResults(inputGroups_.data(), rows, args, false); } @@ -274,9 +303,7 @@ bool StreamingAggregation::isFinished() { RowVectorPtr StreamingAggregation::getOutput() { if (!input_) { if (noMoreInput_ && numGroups_ > 0) { - auto output = createOutput(numGroups_); - numGroups_ = 0; - return output; + return createOutput(numGroups_); } return nullptr; } @@ -294,19 +321,10 @@ RowVectorPtr StreamingAggregation::getOutput() { evaluateAggregates(); RowVectorPtr output; - if (numGroups_ > outputBatchSize_) { + if (eagerFlush_ && numGroups_ > 1) { + output = createOutput(numGroups_ - 1); + } else if (numGroups_ > outputBatchSize_) { output = createOutput(outputBatchSize_); - - // Rotate the entries in the groups_ vector to move the remaining groups to - // the beginning and place re-usable groups at the end. - std::vector copy(groups_.size()); - std::copy(groups_.begin() + outputBatchSize_, groups_.end(), copy.begin()); - std::copy( - groups_.begin(), - groups_.begin() + outputBatchSize_, - copy.begin() + groups_.size() - outputBatchSize_); - groups_ = std::move(copy); - numGroups_ -= outputBatchSize_; } prevInput_ = input_; diff --git a/velox/exec/StreamingAggregation.h b/velox/exec/StreamingAggregation.h index ee70a1e8e627..cea7667665ea 100644 --- a/velox/exec/StreamingAggregation.h +++ b/velox/exec/StreamingAggregation.h @@ -91,6 +91,8 @@ class StreamingAggregation : public Operator { const core::AggregationNode::Step step_; + const bool eagerFlush_; + std::vector groupingKeys_; std::vector aggregates_; std::unique_ptr sortedAggregations_; @@ -117,6 +119,10 @@ class StreamingAggregation : public Operator { // Pointers to groups for all input rows. std::vector inputGroups_; + // Indices into `groups` indicating the row after last row of each group. The + // last element of this is the total size of input. + std::vector groupBoundaries_; + // A subset of input rows to evaluate the aggregate function on. Rows // where aggregation mask is false are excluded. SelectivityVector inputRows_; diff --git a/velox/exec/tests/StreamingAggregationTest.cpp b/velox/exec/tests/StreamingAggregationTest.cpp index bf0a4cf5b0c7..b22e46fc0213 100644 --- a/velox/exec/tests/StreamingAggregationTest.cpp +++ b/velox/exec/tests/StreamingAggregationTest.cpp @@ -20,29 +20,39 @@ #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/SumNonPODAggregate.h" -using namespace facebook::velox; +namespace facebook::velox::exec { +namespace { + using namespace facebook::velox::exec::test; -class StreamingAggregationTest : public OperatorTestBase { +class StreamingAggregationTest : public OperatorTestBase, + public testing::WithParamInterface { protected: void SetUp() override { OperatorTestBase::SetUp(); registerSumNonPODAggregate("sumnonpod", 64); } + bool eagerFlush() { + return GetParam(); + } + + AssertQueryBuilder& config( + AssertQueryBuilder builder, + uint32_t outputBatchSize) { + return builder + .config( + core::QueryConfig::kPreferredOutputBatchRows, + std::to_string(outputBatchSize)) + .config( + core::QueryConfig::kStreamingAggregationEagerFlush, + std::to_string(eagerFlush())); + } + void testAggregation( const std::vector& keys, uint32_t outputBatchSize) { - std::vector data; - - vector_size_t totalSize = 0; - for (const auto& keyVector : keys) { - auto size = keyVector->size(); - auto payload = makeFlatVector( - size, [totalSize](auto row) { return totalSize + row; }); - data.push_back(makeRowVector({keyVector, payload})); - totalSize += size; - } + auto data = addPayload(keys, 1); createDuckDbTable(data); auto plan = PlanBuilder() @@ -59,10 +69,7 @@ class StreamingAggregationTest : public OperatorTestBase { .finalAggregation() .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, count(1), min(c1), max(c1), sum(c1), sum(1), sum(cast(NULL as INT))" " , approx_quantile(c1, 0.95) " @@ -80,10 +87,7 @@ class StreamingAggregationTest : public OperatorTestBase { .finalAggregation() .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, count(1), min(c1), max(c1), sum(c1), sum(1) FROM tmp GROUP BY 1"); @@ -101,10 +105,7 @@ class StreamingAggregationTest : public OperatorTestBase { .finalAggregation() .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, count(1), min(c1) filter (where c1 % 7 = 0), " "max(c1) filter (where c1 % 11 = 0), sum(c1) filter (where c1 % 7 = 0) " @@ -114,16 +115,7 @@ class StreamingAggregationTest : public OperatorTestBase { void testSortedAggregation( const std::vector& keys, uint32_t outputBatchSize) { - std::vector data; - - vector_size_t totalSize = 0; - for (const auto& keyVector : keys) { - auto size = keyVector->size(); - auto payload = makeFlatVector( - size, [totalSize](auto row) { return totalSize + row; }); - data.push_back(makeRowVector({keyVector, payload, payload})); - totalSize += size; - } + auto data = addPayload(keys, 2); createDuckDbTable(data); auto plan = PlanBuilder() @@ -138,10 +130,7 @@ class StreamingAggregationTest : public OperatorTestBase { false) .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, max(c1 order by c2), max(c1 order by c2 desc), array_agg(c1 order by c2) FROM tmp GROUP BY c0"); } @@ -149,16 +138,7 @@ class StreamingAggregationTest : public OperatorTestBase { void testDistinctAggregation( const std::vector& keys, uint32_t outputBatchSize) { - std::vector data; - - vector_size_t totalSize = 0; - for (const auto& keyVector : keys) { - auto size = keyVector->size(); - auto payload = makeFlatVector( - size, [totalSize](auto row) { return totalSize + row; }); - data.push_back(makeRowVector({keyVector, payload, payload})); - totalSize += size; - } + auto data = addPayload(keys, 2); createDuckDbTable(data); { @@ -175,10 +155,7 @@ class StreamingAggregationTest : public OperatorTestBase { false) .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, array_agg(distinct c1), array_agg(c1 order by c2), " "count(distinct c1), array_agg(c2) FROM tmp GROUP BY c0"); @@ -192,14 +169,31 @@ class StreamingAggregationTest : public OperatorTestBase { {"c0"}, {}, {}, core::AggregationNode::Step::kSingle, false) .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults("SELECT distinct c0 FROM tmp"); } } + std::vector addPayload( + const std::vector& keys, + int numPayloadColumns) { + std::vector data; + vector_size_t totalSize = 0; + for (const auto& keyVector : keys) { + auto size = keyVector->size(); + auto payload = makeFlatVector( + size, [totalSize](auto row) { return totalSize + row; }); + std::vector columns; + columns.push_back(keyVector); + for (int i = 0; i < numPayloadColumns; ++i) { + columns.push_back(payload); + } + data.push_back(makeRowVector(columns)); + totalSize += size; + } + return data; + } + std::vector addPayload(const std::vector& keys) { auto numKeys = keys[0]->type()->size(); @@ -263,10 +257,7 @@ class StreamingAggregationTest : public OperatorTestBase { keySql.str(), keySql.str()); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults(sql); EXPECT_EQ(NonPODInt64::constructed, NonPODInt64::destructed); @@ -309,10 +300,7 @@ class StreamingAggregationTest : public OperatorTestBase { keySql.str(), keySql.str()); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults(sql); EXPECT_EQ(NonPODInt64::constructed, NonPODInt64::destructed); @@ -338,16 +326,15 @@ class StreamingAggregationTest : public OperatorTestBase { const auto sql = fmt::format("SELECT distinct {} FROM tmp", keySql.str()); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults(sql); } } }; -TEST_F(StreamingAggregationTest, smallInputBatches) { +VELOX_INSTANTIATE_TEST_SUITE_P(, StreamingAggregationTest, testing::Bool()); + +TEST_P(StreamingAggregationTest, smallInputBatches) { // Use grouping keys that span one or more batches. std::vector keys = { makeNullableFlatVector({1, 1, std::nullopt, 2, 2}), @@ -363,7 +350,7 @@ TEST_F(StreamingAggregationTest, smallInputBatches) { testAggregation(keys, 3); } -TEST_F(StreamingAggregationTest, multipleKeys) { +TEST_P(StreamingAggregationTest, multipleKeys) { std::vector keys = { makeRowVector({ makeFlatVector({1, 1, 2, 2, 2}), @@ -385,7 +372,7 @@ TEST_F(StreamingAggregationTest, multipleKeys) { testMultiKeyAggregation(keys, 3); } -TEST_F(StreamingAggregationTest, regularSizeInputBatches) { +TEST_P(StreamingAggregationTest, regularSizeInputBatches) { auto size = 1'024; std::vector keys = { @@ -404,7 +391,7 @@ TEST_F(StreamingAggregationTest, regularSizeInputBatches) { testAggregation(keys, 100); } -TEST_F(StreamingAggregationTest, uniqueKeys) { +TEST_P(StreamingAggregationTest, uniqueKeys) { auto size = 1'024; std::vector keys = { @@ -421,7 +408,7 @@ TEST_F(StreamingAggregationTest, uniqueKeys) { testAggregation(keys, 100); } -TEST_F(StreamingAggregationTest, partialStreaming) { +TEST_P(StreamingAggregationTest, partialStreaming) { auto size = 1'024; // Generate 2 key columns. First key is clustered / pre-grouped. Second key is @@ -462,7 +449,7 @@ TEST_F(StreamingAggregationTest, partialStreaming) { // Test StreamingAggregation being closed without being initialized. Create a // pipeline with Project followed by StreamingAggregation. Make // Project::initialize fail by using non-existent function. -TEST_F(StreamingAggregationTest, closeUninitialized) { +TEST_P(StreamingAggregationTest, closeUninitialized) { auto data = makeRowVector({ makeFlatVector({1, 2, 3}), }); @@ -489,7 +476,7 @@ TEST_F(StreamingAggregationTest, closeUninitialized) { "Scalar function name not registered: do-not-exist"); } -TEST_F(StreamingAggregationTest, sortedAggregations) { +TEST_P(StreamingAggregationTest, sortedAggregations) { auto size = 1024; std::vector keys = { @@ -505,7 +492,7 @@ TEST_F(StreamingAggregationTest, sortedAggregations) { testSortedAggregation(keys, 32); } -TEST_F(StreamingAggregationTest, distinctAggregations) { +TEST_P(StreamingAggregationTest, distinctAggregations) { auto size = 1024; std::vector keys = { @@ -538,3 +525,42 @@ TEST_F(StreamingAggregationTest, distinctAggregations) { testMultiKeyDistinctAggregation(multiKeys, 1024); testMultiKeyDistinctAggregation(multiKeys, 3); } + +TEST_P(StreamingAggregationTest, clusteredInput) { + std::vector keys = { + makeNullableFlatVector({1, 1, std::nullopt, 2, 2}), + makeFlatVector({2, 3, 3, 4}), + makeFlatVector({5, 6, 6, 6}), + makeFlatVector({6, 6, 6, 6}), + makeFlatVector({6, 7, 8}), + }; + auto data = addPayload(keys, 1); + auto plan = PlanBuilder() + .values(data) + .partialStreamingAggregation( + {"c0"}, {"count(c1)", "arbitrary(c1)", "array_agg(c1)"}) + .finalAggregation() + .planNode(); + auto expected = makeRowVector({ + makeNullableFlatVector({1, std::nullopt, 2, 3, 4, 5, 6, 7, 8}), + makeFlatVector({2, 1, 3, 2, 1, 1, 8, 1, 1}), + makeFlatVector({0, 2, 3, 6, 8, 9, 10, 18, 19}), + makeArrayVector( + {{0, 1}, + {2}, + {3, 4, 5}, + {6, 7}, + {8}, + {9}, + {10, 11, 12, 13, 14, 15, 16, 17}, + {18}, + {19}}), + }); + for (auto batchSize : {3, 20}) { + SCOPED_TRACE(fmt::format("batchSize={}", batchSize)); + config(AssertQueryBuilder(plan), batchSize).assertResults(expected); + } +} + +} // namespace +} // namespace facebook::velox::exec diff --git a/velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp b/velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp index d530f1a14658..1db2401aa226 100644 --- a/velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp @@ -152,6 +152,12 @@ inline int32_t ArbitraryAggregate::accumulatorAlignmentSize() const { return static_cast(sizeof(int128_t)); } +// In case of clustered input, we just keep a reference to the input vector. +struct ClusteredNonNumericAccumulator { + VectorPtr vector; + vector_size_t index; +}; + // Arbitrary for non-numeric types. We always keep the first (non-NULL) element // seen. Arbitrary (x) will produce partial and final aggregations of type x. class NonNumericArbitrary : public exec::Aggregate { @@ -159,10 +165,13 @@ class NonNumericArbitrary : public exec::Aggregate { explicit NonNumericArbitrary(const TypePtr& resultType) : exec::Aggregate(resultType) {} - // We use singleValueAccumulator to save the results for each group. This - // struct will allow us to save variable-width value. int32_t accumulatorFixedWidthSize() const override { - return sizeof(SingleValueAccumulator); + return clusteredInput_ ? sizeof(ClusteredNonNumericAccumulator) + : sizeof(SingleValueAccumulator); + } + + bool accumulatorUsesExternalMemory() const override { + return true; } void extractValues(char** groups, int32_t numGroups, VectorPtr* result) @@ -172,14 +181,41 @@ class NonNumericArbitrary : public exec::Aggregate { auto* rawNulls = exec::Aggregate::getRawNulls(result->get()); - for (int32_t i = 0; i < numGroups; ++i) { - char* group = groups[i]; - auto accumulator = value(group); - if (!accumulator->hasValue()) { - (*result)->setNull(i, true); - } else { - exec::Aggregate::clearNull(rawNulls, i); - accumulator->read(*result, i); + if (clusteredInput_) { + VectorPtr* currentSource = nullptr; + VELOX_DCHECK(copyRanges_.empty()); + for (vector_size_t i = 0; i < numGroups; ++i) { + auto* accumulator = value(groups[i]); + if (!accumulator->vector) { + (*result)->setNull(i, true); + continue; + } + if (currentSource && + currentSource->get() != accumulator->vector.get()) { + result->get()->copyRanges(currentSource->get(), copyRanges_); + copyRanges_.clear(); + } + currentSource = &accumulator->vector; + BaseVector::CopyRange range = {accumulator->index, i, 1}; + if (!copyRanges_.empty() && copyRanges_.back().mergeable(range)) { + ++copyRanges_.back().count; + } else { + copyRanges_.push_back(range); + } + } + if (currentSource) { + result->get()->copyRanges(currentSource->get(), copyRanges_); + copyRanges_.clear(); + } + } else { + for (int32_t i = 0; i < numGroups; ++i) { + auto* accumulator = value(groups[i]); + if (!accumulator->hasValue()) { + (*result)->setNull(i, true); + } else { + exec::Aggregate::clearNull(rawNulls, i); + accumulator->read(*result, i); + } } } } @@ -194,16 +230,17 @@ class NonNumericArbitrary : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /*unused*/) override { - DecodedVector decoded(*args[0], rows, true); - if (decoded.isConstantMapping() && decoded.isNullAt(rows.begin())) { + VELOX_CHECK(!clusteredInput_); + decoded_.decode(*args[0], rows, true); + if (decoded_.isConstantMapping() && decoded_.isNullAt(rows.begin())) { // nothing to do; all values are nulls return; } - const auto* indices = decoded.indices(); - const auto* baseVector = decoded.base(); + const auto* indices = decoded_.indices(); + const auto* baseVector = decoded_.base(); rows.applyToSelected([&](vector_size_t i) { - if (decoded.isNullAt(i)) { + if (decoded_.isNullAt(i)) { return; } auto* accumulator = value(groups[i]); @@ -213,6 +250,47 @@ class NonNumericArbitrary : public exec::Aggregate { }); } + bool supportsAddRawClusteredInput() const override { + return clusteredInput_; + } + + void addRawClusteredInput( + char** groups, + const SelectivityVector& rows, + const std::vector& args, + const folly::Range& groupBoundaries) override { + VELOX_CHECK(clusteredInput_); + decoded_.decode(*args[0]); + VELOX_DCHECK(copyRanges_.empty()); + vector_size_t groupStart = 0; + auto forEachEmptyAccumulator = [&](auto func) { + for (auto groupEnd : groupBoundaries) { + auto* accumulator = + value(groups[groupEnd - 1]); + if (!accumulator->vector) { + func(groupEnd, accumulator); + } + groupStart = groupEnd; + } + }; + if (rows.isAllSelected() && !decoded_.mayHaveNulls()) { + forEachEmptyAccumulator([&](auto /*groupEnd*/, auto* accumulator) { + accumulator->vector = args[0]; + accumulator->index = groupStart; + }); + } else { + forEachEmptyAccumulator([&](auto groupEnd, auto* accumulator) { + for (auto i = groupStart; i < groupEnd; ++i) { + if (rows.isValid(i) && !decoded_.isNullAt(i)) { + accumulator->vector = args[0]; + accumulator->index = i; + break; + } + } + }); + } + } + void addIntermediateResults( char** groups, const SelectivityVector& rows, @@ -226,22 +304,23 @@ class NonNumericArbitrary : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /*unused*/) override { + VELOX_CHECK(!clusteredInput_); auto* accumulator = value(group); if (accumulator->hasValue()) { return; } - DecodedVector decoded(*args[0], rows, true); - if (decoded.isConstantMapping() && decoded.isNullAt(rows.begin())) { + decoded_.decode(*args[0], rows, true); + if (decoded_.isConstantMapping() && decoded_.isNullAt(rows.begin())) { // nothing to do; all values are nulls return; } - const auto* indices = decoded.indices(); - const auto* baseVector = decoded.base(); + const auto* indices = decoded_.indices(); + const auto* baseVector = decoded_.base(); // Find the first non-null value. rows.testSelected([&](vector_size_t i) { - if (!decoded.isNullAt(i)) { + if (!decoded_.isNullAt(i)) { accumulator->write(baseVector, indices[i], allocator_); return false; // Stop } @@ -258,23 +337,40 @@ class NonNumericArbitrary : public exec::Aggregate { } protected: - // Initialize each group, we will not use the null flags because - // SingleValueAccumulator has its own flag. + // Initialize each group, we will not use the null flags because the + // accumulator has its own flag. void initializeNewGroupsInternal( char** groups, folly::Range indices) override { for (auto i : indices) { - new (groups[i] + offset_) SingleValueAccumulator(); + if (clusteredInput_) { + new (groups[i] + offset_) ClusteredNonNumericAccumulator(); + } else { + new (groups[i] + offset_) SingleValueAccumulator(); + } } } void destroyInternal(folly::Range groups) override { for (auto group : groups) { if (isInitialized(group)) { - value(group)->destroy(allocator_); + if (clusteredInput_) { + auto* accumulator = value(group); + std::destroy_at(accumulator); + } else { + auto* accumulator = value(group); + accumulator->destroy(allocator_); + } } } } + + private: + // Decoded input vector. + DecodedVector decoded_; + + // Copy ranges used when extracting from ClusteredNonNumericAccumulator. + std::vector copyRanges_; }; } // namespace diff --git a/velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp b/velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp index 7bc41c4579a9..051e9e9cad8b 100644 --- a/velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ #include "velox/functions/prestosql/aggregates/ArrayAggAggregate.h" +#include "folly/container/small_vector.h" #include "velox/exec/ContainerRowSerde.h" #include "velox/expression/FunctionSignature.h" #include "velox/functions/lib/aggregates/ValueList.h" @@ -26,13 +27,28 @@ struct ArrayAccumulator { ValueList elements; }; +struct SourceRange { + VectorPtr vector; + vector_size_t offset; + vector_size_t size; +}; + +struct ClusteredAccumulator { + folly::small_vector sources; +}; + class ArrayAggAggregate : public exec::Aggregate { public: explicit ArrayAggAggregate(TypePtr resultType, bool ignoreNulls) : Aggregate(resultType), ignoreNulls_(ignoreNulls) {} int32_t accumulatorFixedWidthSize() const override { - return sizeof(ArrayAccumulator); + return clusteredInput_ ? sizeof(ClusteredAccumulator) + : sizeof(ArrayAccumulator); + } + + bool accumulatorUsesExternalMemory() const override { + return true; } bool isFixedSize() const override { @@ -97,26 +113,70 @@ class ArrayAggAggregate : public exec::Aggregate { auto vector = (*result)->as(); VELOX_CHECK(vector); vector->resize(numGroups); - - auto elements = vector->elements(); + uint64_t* rawNulls = getRawNulls(vector); + auto* resultOffsets = + vector->mutableOffsets(numGroups)->asMutable(); + auto* resultSizes = + vector->mutableSizes(numGroups)->asMutable(); + auto& elements = vector->elements(); elements->resize(countElements(groups, numGroups)); + vector_size_t arrayOffset = 0; + + if (clusteredInput_) { + VectorPtr* currentSource = nullptr; + std::vector ranges; + for (int32_t i = 0; i < numGroups; ++i) { + auto* accumulator = value(groups[i]); + resultOffsets[i] = arrayOffset; + vector_size_t arraySize = 0; + for (auto& source : accumulator->sources) { + if (currentSource && currentSource->get() != source.vector.get()) { + elements->copyRanges(currentSource->get(), ranges); + ranges.clear(); + } + if (!currentSource || currentSource->get() != source.vector.get()) { + currentSource = &source.vector; + ranges.push_back({source.offset, arrayOffset, source.size}); + } else if ( + ranges.back().sourceIndex + ranges.back().count == + source.offset) { + ranges.back().count += source.size; + } else { + VELOX_DCHECK_LT( + ranges.back().sourceIndex + ranges.back().count, source.offset); + ranges.push_back({source.offset, arrayOffset, source.size}); + } + arrayOffset += source.size; + arraySize += source.size; + } + resultSizes[i] = arraySize; + if (arraySize == 0) { + vector->setNull(i, true); + } else { + clearNull(rawNulls, i); + } + } + if (currentSource) { + VELOX_DCHECK(!ranges.empty()); + elements->copyRanges(currentSource->get(), ranges); + } - uint64_t* rawNulls = getRawNulls(vector); - vector_size_t offset = 0; - for (int32_t i = 0; i < numGroups; ++i) { - auto& values = value(groups[i])->elements; - auto arraySize = values.size(); - if (arraySize) { - clearNull(rawNulls, i); - - ValueListReader reader(values); - for (auto index = 0; index < arraySize; ++index) { - reader.next(*elements, offset + index); + } else { + for (int32_t i = 0; i < numGroups; ++i) { + auto& values = value(groups[i])->elements; + auto arraySize = values.size(); + if (arraySize) { + clearNull(rawNulls, i); + ValueListReader reader(values); + for (auto index = 0; index < arraySize; ++index) { + reader.next(*elements, arrayOffset + index); + } + resultOffsets[i] = arrayOffset; + resultSizes[i] = arraySize; + arrayOffset += arraySize; + } else { + vector->setNull(i, true); } - vector->setOffsetAndSize(i, offset, arraySize); - offset += arraySize; - } else { - vector->setNull(i, true); } } } @@ -131,6 +191,7 @@ class ArrayAggAggregate : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /*mayPushdown*/) override { + VELOX_CHECK(!clusteredInput_); decodedElements_.decode(*args[0], rows); rows.applyToSelected([&](vector_size_t row) { if (ignoreNulls_ && decodedElements_.isNullAt(row)) { @@ -143,6 +204,51 @@ class ArrayAggAggregate : public exec::Aggregate { }); } + bool supportsAddRawClusteredInput() const override { + return clusteredInput_; + } + + void addRawClusteredInput( + char** groups, + const SelectivityVector& rows, + const std::vector& args, + const folly::Range& groupBoundaries) override { + VELOX_CHECK(clusteredInput_); + decodedElements_.decode(*args[0]); + vector_size_t groupStart = 0; + auto forEachAccumulator = [&](auto func) { + for (auto groupEnd : groupBoundaries) { + auto* accumulator = value(groups[groupEnd - 1]); + func(groupEnd, accumulator); + groupStart = groupEnd; + } + }; + if (rows.isAllSelected() && + (!ignoreNulls_ || !decodedElements_.mayHaveNulls())) { + forEachAccumulator([&](auto groupEnd, auto* accumulator) { + accumulator->sources.push_back( + {args[0], groupStart, groupEnd - groupStart}); + }); + } else { + forEachAccumulator([&](auto groupEnd, auto* accumulator) { + for (auto i = groupStart; i < groupEnd; ++i) { + if (!rows.isValid(i) || + (ignoreNulls_ && decodedElements_.isNullAt(i))) { + if (i > groupStart) { + accumulator->sources.push_back( + {args[0], groupStart, i - groupStart}); + } + groupStart = i + 1; + } + } + if (groupEnd > groupStart) { + accumulator->sources.push_back( + {args[0], groupStart, groupEnd - groupStart}); + } + }); + } + } + void addIntermediateResults( char** groups, const SelectivityVector& rows, @@ -171,6 +277,7 @@ class ArrayAggAggregate : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /* mayPushdown */) override { + VELOX_CHECK(!clusteredInput_); auto& values = value(group)->elements; decodedElements_.decode(*args[0], rows); @@ -188,6 +295,7 @@ class ArrayAggAggregate : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /* mayPushdown */) override { + VELOX_CHECK(!clusteredInput_); decodedIntermediate_.decode(*args[0], rows); auto arrayVector = decodedIntermediate_.base()->as(); @@ -210,14 +318,23 @@ class ArrayAggAggregate : public exec::Aggregate { char** groups, folly::Range indices) override { for (auto index : indices) { - new (groups[index] + offset_) ArrayAccumulator(); + if (clusteredInput_) { + new (groups[index] + offset_) ClusteredAccumulator(); + } else { + new (groups[index] + offset_) ArrayAccumulator(); + } } } void destroyInternal(folly::Range groups) override { for (auto group : groups) { if (isInitialized(group)) { - value(group)->elements.free(allocator_); + if (clusteredInput_) { + auto* accumulator = value(group); + std::destroy_at(accumulator); + } else { + value(group)->elements.free(allocator_); + } } } } @@ -225,8 +342,17 @@ class ArrayAggAggregate : public exec::Aggregate { private: vector_size_t countElements(char** groups, int32_t numGroups) const { vector_size_t size = 0; - for (int32_t i = 0; i < numGroups; ++i) { - size += value(groups[i])->elements.size(); + if (clusteredInput_) { + for (int32_t i = 0; i < numGroups; ++i) { + auto* accumulator = value(groups[i]); + for (auto& source : accumulator->sources) { + size += source.size; + } + } + } else { + for (int32_t i = 0; i < numGroups; ++i) { + size += value(groups[i])->elements.size(); + } } return size; } diff --git a/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp b/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp index 29cbc6837ddf..cba1204bb41a 100644 --- a/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp +++ b/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp @@ -465,5 +465,39 @@ TEST_F(ArbitraryTest, spilling) { ::facebook::velox::test::assertEqualVectors(expected, result); } +TEST_F(ArbitraryTest, clusteredInput) { + constexpr int kSize = 1000; + auto data = makeRowVector({ + makeFlatVector(kSize, [](auto i) { return i / 13; }), + makeFlatVector( + kSize, [](auto i) { return std::to_string(i); }), + makeFlatVector(kSize, [](auto i) { return i % 11 == 0; }), + }); + createDuckDbTable({data}); + for (bool mask : {false, true}) { + auto builder = PlanBuilder().values({data}); + std::string expected; + if (mask) { + builder.partialStreamingAggregation({"c0"}, {"arbitrary(c1)"}, {"c2"}); + expected = "select c0, first(c1) filter (where c2) from tmp group by 1"; + } else { + builder.partialStreamingAggregation({"c0"}, {"arbitrary(c1)"}); + expected = "select c0, first(c1) from tmp group by 1"; + } + auto plan = builder.finalAggregation().planNode(); + for (int batchRows : {1024, 17}) { + for (bool eagerFlush : {false, true}) { + SCOPED_TRACE(fmt::format( + "mask={} batchRows={} eagerFlush={}", mask, batchRows, eagerFlush)); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchRows, batchRows) + .config( + core::QueryConfig::kStreamingAggregationEagerFlush, eagerFlush) + .assertResults(expected); + } + } + } +} + } // namespace } // namespace facebook::velox::aggregate::test diff --git a/velox/functions/prestosql/aggregates/tests/ArrayAggTest.cpp b/velox/functions/prestosql/aggregates/tests/ArrayAggTest.cpp index 56c94257ff5b..4906c6ac2f20 100644 --- a/velox/functions/prestosql/aggregates/tests/ArrayAggTest.cpp +++ b/velox/functions/prestosql/aggregates/tests/ArrayAggTest.cpp @@ -557,5 +557,40 @@ TEST_F(ArrayAggTest, mask) { testFunction("simple_array_agg"); } +TEST_F(ArrayAggTest, clusteredInput) { + constexpr int kSize = 1000; + auto data = makeRowVector({ + makeFlatVector(kSize, [](auto i) { return i / 13; }), + makeFlatVector( + kSize, [](auto i) { return i; }, [](auto i) { return i % 17 == 0; }), + makeFlatVector(kSize, [](auto i) { return i % 11 == 0; }), + }); + createDuckDbTable({data}); + for (bool mask : {false, true}) { + auto builder = PlanBuilder().values({data}); + std::string expected; + if (mask) { + builder.partialStreamingAggregation({"c0"}, {"array_agg(c1)"}, {"c2"}); + expected = + "select c0, array_agg(c1) filter (where c2) from tmp group by 1"; + } else { + builder.partialStreamingAggregation({"c0"}, {"array_agg(c1)"}); + expected = "select c0, array_agg(c1) from tmp group by 1"; + } + auto plan = builder.finalAggregation().planNode(); + for (int batchRows : {1024, 17}) { + for (bool eagerFlush : {false, true}) { + SCOPED_TRACE(fmt::format( + "mask={} batchRows={} eagerFlush={}", mask, batchRows, eagerFlush)); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchRows, batchRows) + .config( + core::QueryConfig::kStreamingAggregationEagerFlush, eagerFlush) + .assertResults(expected); + } + } + } +} + } // namespace } // namespace facebook::velox::aggregate::test diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index bf88943dc9b3..c9322ea3a6be 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -430,6 +430,12 @@ class BaseVector { vector_size_t sourceIndex; vector_size_t targetIndex; vector_size_t count; + + /// Whether `next` can be merged with this range to form a new range. + bool mergeable(const CopyRange& next) const { + return next.sourceIndex == sourceIndex + count && + next.targetIndex == targetIndex + count; + } }; /// Sets null flags for each row in 'ranges' to 'isNull'. From 41d95048352be43cd10154bfe08bd2bbee4bc41e Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Thu, 10 Apr 2025 10:44:46 -0700 Subject: [PATCH 2/2] fix: Optimize RowType::hashKind Summary: Avoid recurively rehash for `RowType`. This reduces the expression compilation time significantly for flatmap types commonly seen in ML workload. Differential Revision: D72803509 --- velox/type/Type.cpp | 8 ++++++++ velox/type/Type.h | 6 +++++- velox/type/tests/TypeTest.cpp | 23 +++++++++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/velox/type/Type.cpp b/velox/type/Type.cpp index 8caa26745cac..f7791200f2e5 100644 --- a/velox/type/Type.cpp +++ b/velox/type/Type.cpp @@ -488,6 +488,14 @@ bool RowType::equals(const Type& other) const { return true; } +size_t RowType::hashKind() const { + if (!hashKindComputed_.load(std::memory_order_relaxed)) { + hashKind_ = TypeBase::hashKind(); + hashKindComputed_ = true; + } + return hashKind_; +} + void RowType::printChildren(std::stringstream& ss, std::string_view delimiter) const { bool any = false; diff --git a/velox/type/Type.h b/velox/type/Type.h index 0fb455cd9305..2b272d3cc52d 100644 --- a/velox/type/Type.h +++ b/velox/type/Type.h @@ -535,7 +535,7 @@ class Type : public Tree, public velox::ISerializable { static void registerSerDe(); /// Recursive kind hashing (uses only TypeKind). - size_t hashKind() const; + virtual size_t hashKind() const; /// Recursive kind match (uses only TypeKind). bool kindEquals(const std::shared_ptr& other) const; @@ -1070,6 +1070,8 @@ class RowType : public TypeBase { return *parameters; } + size_t hashKind() const override; + protected: bool equals(const Type& other) const override; @@ -1079,6 +1081,8 @@ class RowType : public TypeBase { const std::vector names_; const std::vector> children_; mutable std::atomic*> parameters_{nullptr}; + mutable std::atomic_bool hashKindComputed_{false}; + mutable std::atomic_size_t hashKind_; }; using RowTypePtr = std::shared_ptr; diff --git a/velox/type/tests/TypeTest.cpp b/velox/type/tests/TypeTest.cpp index e321fa0bbcd8..f315c1490458 100644 --- a/velox/type/tests/TypeTest.cpp +++ b/velox/type/tests/TypeTest.cpp @@ -559,6 +559,29 @@ TEST(TypeTest, rowParametersMultiThreaded) { } } +TEST(TypeTest, rowHashKindMultiThreaded) { + std::vector names; + std::vector types; + for (int i = 0; i < 20'000; ++i) { + auto name = fmt::format("c{}", i); + names.push_back(name); + types.push_back(ROW({name}, {BIGINT()})); + } + auto type = ROW(std::move(names), std::move(types)); + constexpr int kNumThreads = 72; + size_t hashes[kNumThreads]; + std::vector threads; + for (int i = 0; i < kNumThreads; ++i) { + threads.emplace_back([&, i] { hashes[i] = type->hashKind(); }); + } + for (auto& thread : threads) { + thread.join(); + } + for (int i = 1; i < kNumThreads; ++i) { + ASSERT_TRUE(hashes[i] == hashes[0]); + } +} + class Foo {}; class Bar {};