diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index e69c7955171b..6a5342d56dbe 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -531,6 +531,17 @@ class QueryConfig { static constexpr const char* kRequestDataSizesMaxWaitSec = "request_data_sizes_max_wait_sec"; + /// If this is false (the default), in streaming aggregation, wait until we + /// have enough number of output rows to produce a batch of size specified by + /// Operator::outputBatchRows. + /// + /// If this is true, we put the rows in output batch, as soon as the + /// corresponding groups are fully aggregated. This is useful for reducing + /// memory consumption, if the downstream operators are not sensitive to small + /// batch size. + static constexpr const char* kStreamingAggregationEagerFlush = + "streaming_aggregation_eager_flush"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -976,6 +987,10 @@ class QueryConfig { return get(kThrowExceptionOnDuplicateMapKeys, false); } + bool streamingAggregationEagerFlush() const { + return get(kStreamingAggregationEagerFlush, false); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index d5f339a13a1e..8f62f2425f83 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -31,14 +31,6 @@ Generic Configuration - integer - 5000 - TableScan operator will exit getOutput() method after this many milliseconds even if it has no data to return yet. Zero means 'no time limit'. - * - abandon_partial_aggregation_min_rows - - integer - - 100,000 - - Number of input rows to receive before starting to check whether to abandon partial aggregation. - * - abandon_partial_aggregation_min_pct - - integer - - 80 - - Abandons partial aggregation if number of groups equals or exceeds this percentage of the number of input rows. * - abandon_partial_topn_row_number_min_rows - integer - 100,000 @@ -399,6 +391,34 @@ Spilling - 0 - Percentage of aggregation or join input batches that will be forced to spill for testing. 0 means no extra spilling. +Aggregation +----------- +.. list-table:: + :widths: 20 10 10 70 + :header-rows: 1 + + * - Property Name + - Type + - Default Value + - Description + * - abandon_partial_aggregation_min_rows + - integer + - 100,000 + - Number of input rows to receive before starting to check whether to abandon partial aggregation. + * - abandon_partial_aggregation_min_pct + - integer + - 80 + - Abandons partial aggregation if number of groups equals or exceeds this percentage of the number of input rows. + * - streaming_aggregation_eager_flush + - bool + - false + - If this is false (the default), in streaming aggregation, wait until we + have enough number of output rows to produce a batch of size specified by + Operator::outputBatchRows. If this is true, we put the rows in output + batch, as soon as the corresponding groups are fully aggregated. This is + useful for reducing memory consumption, if the downstream operators are + not sensitive to small batch size. + Table Scan ------------ .. list-table:: diff --git a/velox/exec/Aggregate.h b/velox/exec/Aggregate.h index 67f64e5b8234..17035b8a34f5 100644 --- a/velox/exec/Aggregate.h +++ b/velox/exec/Aggregate.h @@ -171,6 +171,35 @@ class Aggregate { const std::vector& args, bool mayPushdown) = 0; + /// Called by aggregation operator to set whether the input data is eligible + /// for clustered input optimization. This is turned off, in cases for + /// example if the input rows from same group are not contiguous, or the + /// aggregate is sorted or distinct. + void setClusteredInput(bool value) { + clusteredInput_ = value; + } + + /// Whether the function itself supports clustered input optimization. + /// + /// When this returns true, `addRawClusteredInput` should be implemented. + virtual bool supportsAddRawClusteredInput() const { + return false; + } + + /// Fast path for the function when the input rows from same group are + /// clustered together. `groups`, `rows`, and `args` are the same as + /// `addRawInput`, `groupBoundaries` is the indices into `groups` indicating + /// the row after last row of each group. + /// + /// Will only be called when `supportsAddRawClusteredInput` returns true. + virtual void addRawClusteredInput( + char** /*groups*/, + const SelectivityVector& /*rows*/, + const std::vector& /*args*/, + const folly::Range& /*groupBoundaries*/) { + VELOX_NYI("Unimplemented: {} {}", typeid(*this).name(), __func__); + } + // Updates final accumulators from intermediate results. // @param groups Pointers to the start of the group rows. These are aligned // with the 'args', e.g. data in the i-th row of the 'args' goes to the i-th @@ -468,6 +497,8 @@ class Aggregate { std::vector pushdownCustomIndices_; bool validateIntermediateInputs_ = false; + + bool clusteredInput_ = false; }; using AggregateFunctionFactory = std::function( diff --git a/velox/exec/StreamingAggregation.cpp b/velox/exec/StreamingAggregation.cpp index e4b0aa598c8e..9e91c2ccf993 100644 --- a/velox/exec/StreamingAggregation.cpp +++ b/velox/exec/StreamingAggregation.cpp @@ -32,7 +32,10 @@ StreamingAggregation::StreamingAggregation( : "Aggregation"), outputBatchSize_{outputBatchRows()}, aggregationNode_{aggregationNode}, - step_{aggregationNode->step()} { + step_{aggregationNode->step()}, + eagerFlush_{operatorCtx_->driverCtx() + ->queryConfig() + .streamingAggregationEagerFlush()} { if (aggregationNode_->ignoreNullKeys()) { VELOX_UNSUPPORTED( "Streaming aggregation doesn't support ignoring null keys yet"); @@ -75,6 +78,15 @@ void StreamingAggregation::initialize() { } } + if (isRawInput(step_)) { + for (column_index_t i = 0; i < aggregates_.size(); ++i) { + if (aggregates_[i].sortingKeys.empty() && !aggregates_[i].distinct) { + // Must be set before we initialize row container, because it could + // change the type and size of accumulator. + aggregates_[i].function->setClusteredInput(true); + } + } + } masks_ = std::make_unique(extractMaskChannels(aggregates_)); rows_ = makeRowContainer(groupingKeyTypes); @@ -175,6 +187,9 @@ RowVectorPtr StreamingAggregation::createOutput(size_t numGroups) { } } + std::rotate(groups_.begin(), groups_.begin() + numGroups, groups_.end()); + numGroups_ -= numGroups; + return output; } @@ -215,6 +230,15 @@ void StreamingAggregation::assignGroups() { } } } + + groupBoundaries_.clear(); + for (vector_size_t i = 1; i < numInput; ++i) { + if (inputGroups_[i] != inputGroups_[i - 1]) { + groupBoundaries_.push_back(i); + } + } + VELOX_CHECK_GT(numInput, 0); + groupBoundaries_.push_back(numInput); } const SelectivityVector& StreamingAggregation::getSelectivityVector( @@ -256,7 +280,12 @@ void StreamingAggregation::evaluateAggregates() { } if (isRawInput(step_)) { - function->addRawInput(inputGroups_.data(), rows, args, false); + if (function->supportsAddRawClusteredInput()) { + function->addRawClusteredInput( + inputGroups_.data(), rows, args, groupBoundaries_); + } else { + function->addRawInput(inputGroups_.data(), rows, args, false); + } } else { function->addIntermediateResults(inputGroups_.data(), rows, args, false); } @@ -274,9 +303,7 @@ bool StreamingAggregation::isFinished() { RowVectorPtr StreamingAggregation::getOutput() { if (!input_) { if (noMoreInput_ && numGroups_ > 0) { - auto output = createOutput(numGroups_); - numGroups_ = 0; - return output; + return createOutput(numGroups_); } return nullptr; } @@ -294,19 +321,10 @@ RowVectorPtr StreamingAggregation::getOutput() { evaluateAggregates(); RowVectorPtr output; - if (numGroups_ > outputBatchSize_) { + if (eagerFlush_ && numGroups_ > 1) { + output = createOutput(numGroups_ - 1); + } else if (numGroups_ > outputBatchSize_) { output = createOutput(outputBatchSize_); - - // Rotate the entries in the groups_ vector to move the remaining groups to - // the beginning and place re-usable groups at the end. - std::vector copy(groups_.size()); - std::copy(groups_.begin() + outputBatchSize_, groups_.end(), copy.begin()); - std::copy( - groups_.begin(), - groups_.begin() + outputBatchSize_, - copy.begin() + groups_.size() - outputBatchSize_); - groups_ = std::move(copy); - numGroups_ -= outputBatchSize_; } prevInput_ = input_; diff --git a/velox/exec/StreamingAggregation.h b/velox/exec/StreamingAggregation.h index ee70a1e8e627..cea7667665ea 100644 --- a/velox/exec/StreamingAggregation.h +++ b/velox/exec/StreamingAggregation.h @@ -91,6 +91,8 @@ class StreamingAggregation : public Operator { const core::AggregationNode::Step step_; + const bool eagerFlush_; + std::vector groupingKeys_; std::vector aggregates_; std::unique_ptr sortedAggregations_; @@ -117,6 +119,10 @@ class StreamingAggregation : public Operator { // Pointers to groups for all input rows. std::vector inputGroups_; + // Indices into `groups` indicating the row after last row of each group. The + // last element of this is the total size of input. + std::vector groupBoundaries_; + // A subset of input rows to evaluate the aggregate function on. Rows // where aggregation mask is false are excluded. SelectivityVector inputRows_; diff --git a/velox/exec/tests/StreamingAggregationTest.cpp b/velox/exec/tests/StreamingAggregationTest.cpp index bf0a4cf5b0c7..b22e46fc0213 100644 --- a/velox/exec/tests/StreamingAggregationTest.cpp +++ b/velox/exec/tests/StreamingAggregationTest.cpp @@ -20,29 +20,39 @@ #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/SumNonPODAggregate.h" -using namespace facebook::velox; +namespace facebook::velox::exec { +namespace { + using namespace facebook::velox::exec::test; -class StreamingAggregationTest : public OperatorTestBase { +class StreamingAggregationTest : public OperatorTestBase, + public testing::WithParamInterface { protected: void SetUp() override { OperatorTestBase::SetUp(); registerSumNonPODAggregate("sumnonpod", 64); } + bool eagerFlush() { + return GetParam(); + } + + AssertQueryBuilder& config( + AssertQueryBuilder builder, + uint32_t outputBatchSize) { + return builder + .config( + core::QueryConfig::kPreferredOutputBatchRows, + std::to_string(outputBatchSize)) + .config( + core::QueryConfig::kStreamingAggregationEagerFlush, + std::to_string(eagerFlush())); + } + void testAggregation( const std::vector& keys, uint32_t outputBatchSize) { - std::vector data; - - vector_size_t totalSize = 0; - for (const auto& keyVector : keys) { - auto size = keyVector->size(); - auto payload = makeFlatVector( - size, [totalSize](auto row) { return totalSize + row; }); - data.push_back(makeRowVector({keyVector, payload})); - totalSize += size; - } + auto data = addPayload(keys, 1); createDuckDbTable(data); auto plan = PlanBuilder() @@ -59,10 +69,7 @@ class StreamingAggregationTest : public OperatorTestBase { .finalAggregation() .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, count(1), min(c1), max(c1), sum(c1), sum(1), sum(cast(NULL as INT))" " , approx_quantile(c1, 0.95) " @@ -80,10 +87,7 @@ class StreamingAggregationTest : public OperatorTestBase { .finalAggregation() .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, count(1), min(c1), max(c1), sum(c1), sum(1) FROM tmp GROUP BY 1"); @@ -101,10 +105,7 @@ class StreamingAggregationTest : public OperatorTestBase { .finalAggregation() .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, count(1), min(c1) filter (where c1 % 7 = 0), " "max(c1) filter (where c1 % 11 = 0), sum(c1) filter (where c1 % 7 = 0) " @@ -114,16 +115,7 @@ class StreamingAggregationTest : public OperatorTestBase { void testSortedAggregation( const std::vector& keys, uint32_t outputBatchSize) { - std::vector data; - - vector_size_t totalSize = 0; - for (const auto& keyVector : keys) { - auto size = keyVector->size(); - auto payload = makeFlatVector( - size, [totalSize](auto row) { return totalSize + row; }); - data.push_back(makeRowVector({keyVector, payload, payload})); - totalSize += size; - } + auto data = addPayload(keys, 2); createDuckDbTable(data); auto plan = PlanBuilder() @@ -138,10 +130,7 @@ class StreamingAggregationTest : public OperatorTestBase { false) .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, max(c1 order by c2), max(c1 order by c2 desc), array_agg(c1 order by c2) FROM tmp GROUP BY c0"); } @@ -149,16 +138,7 @@ class StreamingAggregationTest : public OperatorTestBase { void testDistinctAggregation( const std::vector& keys, uint32_t outputBatchSize) { - std::vector data; - - vector_size_t totalSize = 0; - for (const auto& keyVector : keys) { - auto size = keyVector->size(); - auto payload = makeFlatVector( - size, [totalSize](auto row) { return totalSize + row; }); - data.push_back(makeRowVector({keyVector, payload, payload})); - totalSize += size; - } + auto data = addPayload(keys, 2); createDuckDbTable(data); { @@ -175,10 +155,7 @@ class StreamingAggregationTest : public OperatorTestBase { false) .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults( "SELECT c0, array_agg(distinct c1), array_agg(c1 order by c2), " "count(distinct c1), array_agg(c2) FROM tmp GROUP BY c0"); @@ -192,14 +169,31 @@ class StreamingAggregationTest : public OperatorTestBase { {"c0"}, {}, {}, core::AggregationNode::Step::kSingle, false) .planNode(); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults("SELECT distinct c0 FROM tmp"); } } + std::vector addPayload( + const std::vector& keys, + int numPayloadColumns) { + std::vector data; + vector_size_t totalSize = 0; + for (const auto& keyVector : keys) { + auto size = keyVector->size(); + auto payload = makeFlatVector( + size, [totalSize](auto row) { return totalSize + row; }); + std::vector columns; + columns.push_back(keyVector); + for (int i = 0; i < numPayloadColumns; ++i) { + columns.push_back(payload); + } + data.push_back(makeRowVector(columns)); + totalSize += size; + } + return data; + } + std::vector addPayload(const std::vector& keys) { auto numKeys = keys[0]->type()->size(); @@ -263,10 +257,7 @@ class StreamingAggregationTest : public OperatorTestBase { keySql.str(), keySql.str()); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults(sql); EXPECT_EQ(NonPODInt64::constructed, NonPODInt64::destructed); @@ -309,10 +300,7 @@ class StreamingAggregationTest : public OperatorTestBase { keySql.str(), keySql.str()); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults(sql); EXPECT_EQ(NonPODInt64::constructed, NonPODInt64::destructed); @@ -338,16 +326,15 @@ class StreamingAggregationTest : public OperatorTestBase { const auto sql = fmt::format("SELECT distinct {} FROM tmp", keySql.str()); - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchRows, - std::to_string(outputBatchSize)) + config(AssertQueryBuilder(plan, duckDbQueryRunner_), outputBatchSize) .assertResults(sql); } } }; -TEST_F(StreamingAggregationTest, smallInputBatches) { +VELOX_INSTANTIATE_TEST_SUITE_P(, StreamingAggregationTest, testing::Bool()); + +TEST_P(StreamingAggregationTest, smallInputBatches) { // Use grouping keys that span one or more batches. std::vector keys = { makeNullableFlatVector({1, 1, std::nullopt, 2, 2}), @@ -363,7 +350,7 @@ TEST_F(StreamingAggregationTest, smallInputBatches) { testAggregation(keys, 3); } -TEST_F(StreamingAggregationTest, multipleKeys) { +TEST_P(StreamingAggregationTest, multipleKeys) { std::vector keys = { makeRowVector({ makeFlatVector({1, 1, 2, 2, 2}), @@ -385,7 +372,7 @@ TEST_F(StreamingAggregationTest, multipleKeys) { testMultiKeyAggregation(keys, 3); } -TEST_F(StreamingAggregationTest, regularSizeInputBatches) { +TEST_P(StreamingAggregationTest, regularSizeInputBatches) { auto size = 1'024; std::vector keys = { @@ -404,7 +391,7 @@ TEST_F(StreamingAggregationTest, regularSizeInputBatches) { testAggregation(keys, 100); } -TEST_F(StreamingAggregationTest, uniqueKeys) { +TEST_P(StreamingAggregationTest, uniqueKeys) { auto size = 1'024; std::vector keys = { @@ -421,7 +408,7 @@ TEST_F(StreamingAggregationTest, uniqueKeys) { testAggregation(keys, 100); } -TEST_F(StreamingAggregationTest, partialStreaming) { +TEST_P(StreamingAggregationTest, partialStreaming) { auto size = 1'024; // Generate 2 key columns. First key is clustered / pre-grouped. Second key is @@ -462,7 +449,7 @@ TEST_F(StreamingAggregationTest, partialStreaming) { // Test StreamingAggregation being closed without being initialized. Create a // pipeline with Project followed by StreamingAggregation. Make // Project::initialize fail by using non-existent function. -TEST_F(StreamingAggregationTest, closeUninitialized) { +TEST_P(StreamingAggregationTest, closeUninitialized) { auto data = makeRowVector({ makeFlatVector({1, 2, 3}), }); @@ -489,7 +476,7 @@ TEST_F(StreamingAggregationTest, closeUninitialized) { "Scalar function name not registered: do-not-exist"); } -TEST_F(StreamingAggregationTest, sortedAggregations) { +TEST_P(StreamingAggregationTest, sortedAggregations) { auto size = 1024; std::vector keys = { @@ -505,7 +492,7 @@ TEST_F(StreamingAggregationTest, sortedAggregations) { testSortedAggregation(keys, 32); } -TEST_F(StreamingAggregationTest, distinctAggregations) { +TEST_P(StreamingAggregationTest, distinctAggregations) { auto size = 1024; std::vector keys = { @@ -538,3 +525,42 @@ TEST_F(StreamingAggregationTest, distinctAggregations) { testMultiKeyDistinctAggregation(multiKeys, 1024); testMultiKeyDistinctAggregation(multiKeys, 3); } + +TEST_P(StreamingAggregationTest, clusteredInput) { + std::vector keys = { + makeNullableFlatVector({1, 1, std::nullopt, 2, 2}), + makeFlatVector({2, 3, 3, 4}), + makeFlatVector({5, 6, 6, 6}), + makeFlatVector({6, 6, 6, 6}), + makeFlatVector({6, 7, 8}), + }; + auto data = addPayload(keys, 1); + auto plan = PlanBuilder() + .values(data) + .partialStreamingAggregation( + {"c0"}, {"count(c1)", "arbitrary(c1)", "array_agg(c1)"}) + .finalAggregation() + .planNode(); + auto expected = makeRowVector({ + makeNullableFlatVector({1, std::nullopt, 2, 3, 4, 5, 6, 7, 8}), + makeFlatVector({2, 1, 3, 2, 1, 1, 8, 1, 1}), + makeFlatVector({0, 2, 3, 6, 8, 9, 10, 18, 19}), + makeArrayVector( + {{0, 1}, + {2}, + {3, 4, 5}, + {6, 7}, + {8}, + {9}, + {10, 11, 12, 13, 14, 15, 16, 17}, + {18}, + {19}}), + }); + for (auto batchSize : {3, 20}) { + SCOPED_TRACE(fmt::format("batchSize={}", batchSize)); + config(AssertQueryBuilder(plan), batchSize).assertResults(expected); + } +} + +} // namespace +} // namespace facebook::velox::exec diff --git a/velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp b/velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp index d530f1a14658..1a57bcac0734 100644 --- a/velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp @@ -152,6 +152,12 @@ inline int32_t ArbitraryAggregate::accumulatorAlignmentSize() const { return static_cast(sizeof(int128_t)); } +// In case of clustered input, we just keep a reference to the input vector. +struct ClusteredNonNumericAccumulator { + VectorPtr vector; + vector_size_t index; +}; + // Arbitrary for non-numeric types. We always keep the first (non-NULL) element // seen. Arbitrary (x) will produce partial and final aggregations of type x. class NonNumericArbitrary : public exec::Aggregate { @@ -159,10 +165,13 @@ class NonNumericArbitrary : public exec::Aggregate { explicit NonNumericArbitrary(const TypePtr& resultType) : exec::Aggregate(resultType) {} - // We use singleValueAccumulator to save the results for each group. This - // struct will allow us to save variable-width value. int32_t accumulatorFixedWidthSize() const override { - return sizeof(SingleValueAccumulator); + return clusteredInput_ ? sizeof(ClusteredNonNumericAccumulator) + : sizeof(SingleValueAccumulator); + } + + bool accumulatorUsesExternalMemory() const override { + return true; } void extractValues(char** groups, int32_t numGroups, VectorPtr* result) @@ -172,14 +181,41 @@ class NonNumericArbitrary : public exec::Aggregate { auto* rawNulls = exec::Aggregate::getRawNulls(result->get()); - for (int32_t i = 0; i < numGroups; ++i) { - char* group = groups[i]; - auto accumulator = value(group); - if (!accumulator->hasValue()) { - (*result)->setNull(i, true); - } else { - exec::Aggregate::clearNull(rawNulls, i); - accumulator->read(*result, i); + if (clusteredInput_) { + VectorPtr* currentSource = nullptr; + VELOX_DCHECK(copyRanges_.empty()); + for (vector_size_t i = 0; i < numGroups; ++i) { + auto* accumulator = value(groups[i]); + if (!accumulator->vector) { + (*result)->setNull(i, true); + continue; + } + if (currentSource && + currentSource->get() != accumulator->vector.get()) { + result->get()->copyRanges(currentSource->get(), copyRanges_); + copyRanges_.clear(); + } + currentSource = &accumulator->vector; + BaseVector::CopyRange range = {accumulator->index, i, 1}; + if (!copyRanges_.empty() && copyRanges_.back().mergeable(range)) { + ++copyRanges_.back().count; + } else { + copyRanges_.push_back(range); + } + } + if (currentSource) { + result->get()->copyRanges(currentSource->get(), copyRanges_); + copyRanges_.clear(); + } + } else { + for (int32_t i = 0; i < numGroups; ++i) { + auto* accumulator = value(groups[i]); + if (!accumulator->hasValue()) { + (*result)->setNull(i, true); + } else { + exec::Aggregate::clearNull(rawNulls, i); + accumulator->read(*result, i); + } } } } @@ -194,16 +230,17 @@ class NonNumericArbitrary : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /*unused*/) override { - DecodedVector decoded(*args[0], rows, true); - if (decoded.isConstantMapping() && decoded.isNullAt(rows.begin())) { + VELOX_CHECK(!clusteredInput_); + decoded_.decode(*args[0], rows, true); + if (decoded_.isConstantMapping() && decoded_.isNullAt(rows.begin())) { // nothing to do; all values are nulls return; } - const auto* indices = decoded.indices(); - const auto* baseVector = decoded.base(); + const auto* indices = decoded_.indices(); + const auto* baseVector = decoded_.base(); rows.applyToSelected([&](vector_size_t i) { - if (decoded.isNullAt(i)) { + if (decoded_.isNullAt(i)) { return; } auto* accumulator = value(groups[i]); @@ -213,6 +250,48 @@ class NonNumericArbitrary : public exec::Aggregate { }); } + bool supportsAddRawClusteredInput() const override { + return clusteredInput_; + } + + void addRawClusteredInput( + char** groups, + const SelectivityVector& rows, + const std::vector& args, + const folly::Range& groupBoundaries) override { + VELOX_CHECK(clusteredInput_); + decoded_.decode(*args[0]); + vector_size_t groupStart = 0; + auto forEachEmptyAccumulator = [&](auto func) { + for (auto groupEnd : groupBoundaries) { + auto* accumulator = + value(groups[groupEnd - 1]); + // When the vector is already set, it means the same group is also + // present in previous input batch. + if (!accumulator->vector) { + func(groupEnd, accumulator); + } + groupStart = groupEnd; + } + }; + if (rows.isAllSelected() && !decoded_.mayHaveNulls()) { + forEachEmptyAccumulator([&](auto /*groupEnd*/, auto* accumulator) { + accumulator->vector = args[0]; + accumulator->index = groupStart; + }); + } else { + forEachEmptyAccumulator([&](auto groupEnd, auto* accumulator) { + for (auto i = groupStart; i < groupEnd; ++i) { + if (rows.isValid(i) && !decoded_.isNullAt(i)) { + accumulator->vector = args[0]; + accumulator->index = i; + break; + } + } + }); + } + } + void addIntermediateResults( char** groups, const SelectivityVector& rows, @@ -226,22 +305,23 @@ class NonNumericArbitrary : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /*unused*/) override { + VELOX_CHECK(!clusteredInput_); auto* accumulator = value(group); if (accumulator->hasValue()) { return; } - DecodedVector decoded(*args[0], rows, true); - if (decoded.isConstantMapping() && decoded.isNullAt(rows.begin())) { + decoded_.decode(*args[0], rows, true); + if (decoded_.isConstantMapping() && decoded_.isNullAt(rows.begin())) { // nothing to do; all values are nulls return; } - const auto* indices = decoded.indices(); - const auto* baseVector = decoded.base(); + const auto* indices = decoded_.indices(); + const auto* baseVector = decoded_.base(); // Find the first non-null value. rows.testSelected([&](vector_size_t i) { - if (!decoded.isNullAt(i)) { + if (!decoded_.isNullAt(i)) { accumulator->write(baseVector, indices[i], allocator_); return false; // Stop } @@ -258,23 +338,40 @@ class NonNumericArbitrary : public exec::Aggregate { } protected: - // Initialize each group, we will not use the null flags because - // SingleValueAccumulator has its own flag. + // Initialize each group, we will not use the null flags because the + // accumulator has its own flag. void initializeNewGroupsInternal( char** groups, folly::Range indices) override { for (auto i : indices) { - new (groups[i] + offset_) SingleValueAccumulator(); + if (clusteredInput_) { + new (groups[i] + offset_) ClusteredNonNumericAccumulator(); + } else { + new (groups[i] + offset_) SingleValueAccumulator(); + } } } void destroyInternal(folly::Range groups) override { for (auto group : groups) { if (isInitialized(group)) { - value(group)->destroy(allocator_); + if (clusteredInput_) { + auto* accumulator = value(group); + std::destroy_at(accumulator); + } else { + auto* accumulator = value(group); + accumulator->destroy(allocator_); + } } } } + + private: + // Decoded input vector. + DecodedVector decoded_; + + // Copy ranges used when extracting from ClusteredNonNumericAccumulator. + std::vector copyRanges_; }; } // namespace diff --git a/velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp b/velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp index 7bc41c4579a9..8ec496dd425c 100644 --- a/velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ #include "velox/functions/prestosql/aggregates/ArrayAggAggregate.h" +#include "folly/container/small_vector.h" #include "velox/exec/ContainerRowSerde.h" #include "velox/expression/FunctionSignature.h" #include "velox/functions/lib/aggregates/ValueList.h" @@ -26,13 +27,30 @@ struct ArrayAccumulator { ValueList elements; }; +struct SourceRange { + VectorPtr vector; + vector_size_t offset; + vector_size_t size; +}; + +// Accumulator for clustered input. We keep the ranges from different source +// vectors that should be collected in this group. +struct ClusteredAccumulator { + folly::small_vector sources; +}; + class ArrayAggAggregate : public exec::Aggregate { public: explicit ArrayAggAggregate(TypePtr resultType, bool ignoreNulls) : Aggregate(resultType), ignoreNulls_(ignoreNulls) {} int32_t accumulatorFixedWidthSize() const override { - return sizeof(ArrayAccumulator); + return clusteredInput_ ? sizeof(ClusteredAccumulator) + : sizeof(ArrayAccumulator); + } + + bool accumulatorUsesExternalMemory() const override { + return true; } bool isFixedSize() const override { @@ -97,26 +115,70 @@ class ArrayAggAggregate : public exec::Aggregate { auto vector = (*result)->as(); VELOX_CHECK(vector); vector->resize(numGroups); - - auto elements = vector->elements(); + uint64_t* rawNulls = getRawNulls(vector); + auto* resultOffsets = + vector->mutableOffsets(numGroups)->asMutable(); + auto* resultSizes = + vector->mutableSizes(numGroups)->asMutable(); + auto& elements = vector->elements(); elements->resize(countElements(groups, numGroups)); + vector_size_t arrayOffset = 0; + + if (clusteredInput_) { + VectorPtr* currentSource = nullptr; + std::vector ranges; + for (int32_t i = 0; i < numGroups; ++i) { + auto* accumulator = value(groups[i]); + resultOffsets[i] = arrayOffset; + vector_size_t arraySize = 0; + for (auto& source : accumulator->sources) { + if (currentSource && currentSource->get() != source.vector.get()) { + elements->copyRanges(currentSource->get(), ranges); + ranges.clear(); + } + if (!currentSource || currentSource->get() != source.vector.get()) { + currentSource = &source.vector; + ranges.push_back({source.offset, arrayOffset, source.size}); + } else if ( + ranges.back().sourceIndex + ranges.back().count == + source.offset) { + ranges.back().count += source.size; + } else { + VELOX_DCHECK_LT( + ranges.back().sourceIndex + ranges.back().count, source.offset); + ranges.push_back({source.offset, arrayOffset, source.size}); + } + arrayOffset += source.size; + arraySize += source.size; + } + resultSizes[i] = arraySize; + if (arraySize == 0) { + vector->setNull(i, true); + } else { + clearNull(rawNulls, i); + } + } + if (currentSource) { + VELOX_DCHECK(!ranges.empty()); + elements->copyRanges(currentSource->get(), ranges); + } - uint64_t* rawNulls = getRawNulls(vector); - vector_size_t offset = 0; - for (int32_t i = 0; i < numGroups; ++i) { - auto& values = value(groups[i])->elements; - auto arraySize = values.size(); - if (arraySize) { - clearNull(rawNulls, i); - - ValueListReader reader(values); - for (auto index = 0; index < arraySize; ++index) { - reader.next(*elements, offset + index); + } else { + for (int32_t i = 0; i < numGroups; ++i) { + auto& values = value(groups[i])->elements; + auto arraySize = values.size(); + if (arraySize) { + clearNull(rawNulls, i); + ValueListReader reader(values); + for (auto index = 0; index < arraySize; ++index) { + reader.next(*elements, arrayOffset + index); + } + resultOffsets[i] = arrayOffset; + resultSizes[i] = arraySize; + arrayOffset += arraySize; + } else { + vector->setNull(i, true); } - vector->setOffsetAndSize(i, offset, arraySize); - offset += arraySize; - } else { - vector->setNull(i, true); } } } @@ -131,6 +193,7 @@ class ArrayAggAggregate : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /*mayPushdown*/) override { + VELOX_CHECK(!clusteredInput_); decodedElements_.decode(*args[0], rows); rows.applyToSelected([&](vector_size_t row) { if (ignoreNulls_ && decodedElements_.isNullAt(row)) { @@ -143,6 +206,51 @@ class ArrayAggAggregate : public exec::Aggregate { }); } + bool supportsAddRawClusteredInput() const override { + return clusteredInput_; + } + + void addRawClusteredInput( + char** groups, + const SelectivityVector& rows, + const std::vector& args, + const folly::Range& groupBoundaries) override { + VELOX_CHECK(clusteredInput_); + decodedElements_.decode(*args[0]); + vector_size_t groupStart = 0; + auto forEachAccumulator = [&](auto func) { + for (auto groupEnd : groupBoundaries) { + auto* accumulator = value(groups[groupEnd - 1]); + func(groupEnd, accumulator); + groupStart = groupEnd; + } + }; + if (rows.isAllSelected() && + (!ignoreNulls_ || !decodedElements_.mayHaveNulls())) { + forEachAccumulator([&](auto groupEnd, auto* accumulator) { + accumulator->sources.push_back( + {args[0], groupStart, groupEnd - groupStart}); + }); + } else { + forEachAccumulator([&](auto groupEnd, auto* accumulator) { + for (auto i = groupStart; i < groupEnd; ++i) { + if (!rows.isValid(i) || + (ignoreNulls_ && decodedElements_.isNullAt(i))) { + if (i > groupStart) { + accumulator->sources.push_back( + {args[0], groupStart, i - groupStart}); + } + groupStart = i + 1; + } + } + if (groupEnd > groupStart) { + accumulator->sources.push_back( + {args[0], groupStart, groupEnd - groupStart}); + } + }); + } + } + void addIntermediateResults( char** groups, const SelectivityVector& rows, @@ -171,6 +279,7 @@ class ArrayAggAggregate : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /* mayPushdown */) override { + VELOX_CHECK(!clusteredInput_); auto& values = value(group)->elements; decodedElements_.decode(*args[0], rows); @@ -188,6 +297,7 @@ class ArrayAggAggregate : public exec::Aggregate { const SelectivityVector& rows, const std::vector& args, bool /* mayPushdown */) override { + VELOX_CHECK(!clusteredInput_); decodedIntermediate_.decode(*args[0], rows); auto arrayVector = decodedIntermediate_.base()->as(); @@ -210,14 +320,23 @@ class ArrayAggAggregate : public exec::Aggregate { char** groups, folly::Range indices) override { for (auto index : indices) { - new (groups[index] + offset_) ArrayAccumulator(); + if (clusteredInput_) { + new (groups[index] + offset_) ClusteredAccumulator(); + } else { + new (groups[index] + offset_) ArrayAccumulator(); + } } } void destroyInternal(folly::Range groups) override { for (auto group : groups) { if (isInitialized(group)) { - value(group)->elements.free(allocator_); + if (clusteredInput_) { + auto* accumulator = value(group); + std::destroy_at(accumulator); + } else { + value(group)->elements.free(allocator_); + } } } } @@ -225,8 +344,17 @@ class ArrayAggAggregate : public exec::Aggregate { private: vector_size_t countElements(char** groups, int32_t numGroups) const { vector_size_t size = 0; - for (int32_t i = 0; i < numGroups; ++i) { - size += value(groups[i])->elements.size(); + if (clusteredInput_) { + for (int32_t i = 0; i < numGroups; ++i) { + auto* accumulator = value(groups[i]); + for (auto& source : accumulator->sources) { + size += source.size; + } + } + } else { + for (int32_t i = 0; i < numGroups; ++i) { + size += value(groups[i])->elements.size(); + } } return size; } diff --git a/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp b/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp index 29cbc6837ddf..f3efd78966cf 100644 --- a/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp +++ b/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp @@ -465,5 +465,43 @@ TEST_F(ArbitraryTest, spilling) { ::facebook::velox::test::assertEqualVectors(expected, result); } +TEST_F(ArbitraryTest, clusteredInput) { + constexpr int kSize = 1000; + for (int batchRows : {kSize, 13}) { + std::vector data; + for (int i = 0; i < kSize; i += batchRows) { + auto size = std::min(batchRows, kSize - i); + data.push_back(makeRowVector({ + makeFlatVector(size, [&](auto j) { return (i + j) / 17; }), + makeFlatVector( + size, [&](auto j) { return std::to_string(i + j); }), + makeFlatVector(size, [&](auto j) { return (i + j) % 11 == 0; }), + })); + } + createDuckDbTable(data); + for (bool mask : {false, true}) { + auto builder = PlanBuilder().values(data); + std::string expected; + if (mask) { + builder.partialStreamingAggregation({"c0"}, {"arbitrary(c1)"}, {"c2"}); + expected = "select c0, first(c1) filter (where c2) from tmp group by 1"; + } else { + builder.partialStreamingAggregation({"c0"}, {"arbitrary(c1)"}); + expected = "select c0, first(c1) from tmp group by 1"; + } + auto plan = builder.finalAggregation().planNode(); + for (bool eagerFlush : {false, true}) { + SCOPED_TRACE(fmt::format( + "mask={} batchRows={} eagerFlush={}", mask, batchRows, eagerFlush)); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchRows, batchRows) + .config( + core::QueryConfig::kStreamingAggregationEagerFlush, eagerFlush) + .assertResults(expected); + } + } + } +} + } // namespace } // namespace facebook::velox::aggregate::test diff --git a/velox/functions/prestosql/aggregates/tests/ArrayAggTest.cpp b/velox/functions/prestosql/aggregates/tests/ArrayAggTest.cpp index 56c94257ff5b..e94da3d2bde0 100644 --- a/velox/functions/prestosql/aggregates/tests/ArrayAggTest.cpp +++ b/velox/functions/prestosql/aggregates/tests/ArrayAggTest.cpp @@ -557,5 +557,46 @@ TEST_F(ArrayAggTest, mask) { testFunction("simple_array_agg"); } +TEST_F(ArrayAggTest, clusteredInput) { + constexpr int kSize = 1000; + for (int batchRows : {kSize, 13}) { + std::vector data; + for (int i = 0; i < kSize; i += batchRows) { + auto size = std::min(batchRows, kSize - i); + data.push_back(makeRowVector({ + makeFlatVector(size, [&](auto j) { return (i + j) / 17; }), + makeFlatVector( + size, + [&](auto j) { return i + j; }, + [&](auto j) { return (i + j) % 19 == 0; }), + makeFlatVector(size, [&](auto j) { return (i + j) % 11 == 0; }), + })); + } + createDuckDbTable(data); + for (bool mask : {false, true}) { + auto builder = PlanBuilder().values(data); + std::string expected; + if (mask) { + builder.partialStreamingAggregation({"c0"}, {"array_agg(c1)"}, {"c2"}); + expected = + "select c0, array_agg(c1) filter (where c2) from tmp group by 1"; + } else { + builder.partialStreamingAggregation({"c0"}, {"array_agg(c1)"}); + expected = "select c0, array_agg(c1) from tmp group by 1"; + } + auto plan = builder.finalAggregation().planNode(); + for (bool eagerFlush : {false, true}) { + SCOPED_TRACE(fmt::format( + "mask={} batchRows={} eagerFlush={}", mask, batchRows, eagerFlush)); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchRows, batchRows) + .config( + core::QueryConfig::kStreamingAggregationEagerFlush, eagerFlush) + .assertResults(expected); + } + } + } +} + } // namespace } // namespace facebook::velox::aggregate::test diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index bf88943dc9b3..c9322ea3a6be 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -430,6 +430,12 @@ class BaseVector { vector_size_t sourceIndex; vector_size_t targetIndex; vector_size_t count; + + /// Whether `next` can be merged with this range to form a new range. + bool mergeable(const CopyRange& next) const { + return next.sourceIndex == sourceIndex + count && + next.targetIndex == targetIndex + count; + } }; /// Sets null flags for each row in 'ranges' to 'isNull'.