feat: Add Aggregate::addRawClusteredInput and streaming_aggregation_eager_flush #12975

Closed · wants to merge 1 commit
15 changes: 15 additions & 0 deletions velox/core/QueryConfig.h
@@ -531,6 +531,17 @@ class QueryConfig {
static constexpr const char* kRequestDataSizesMaxWaitSec =
"request_data_sizes_max_wait_sec";

/// If this is false (the default), streaming aggregation waits until there
/// are enough output rows to produce a batch of the size specified by
/// Operator::outputBatchRows.
///
/// If this is true, rows are added to the output batch as soon as the
/// corresponding groups are fully aggregated. This is useful for reducing
/// memory consumption when the downstream operators are not sensitive to
/// small batch sizes.
static constexpr const char* kStreamingAggregationEagerFlush =
"streaming_aggregation_eager_flush";

bool selectiveNimbleReaderEnabled() const {
return get<bool>(kSelectiveNimbleReaderEnabled, false);
}
@@ -976,6 +987,10 @@ class QueryConfig {
return get<bool>(kThrowExceptionOnDuplicateMapKeys, false);
}

bool streamingAggregationEagerFlush() const {
return get<bool>(kStreamingAggregationEagerFlush, false);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
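For context, a minimal sketch of how the new flag could be enabled from a test, assuming the exec::test::AssertQueryBuilder utility and an already-built `plan`; the builder calls here are illustrative and not part of this change:

// Sketch only: enable eager flush for a streaming aggregation test query,
// assuming AssertQueryBuilder and a test fixture providing pool().
auto result =
    exec::test::AssertQueryBuilder(plan)
        .config(core::QueryConfig::kStreamingAggregationEagerFlush, "true")
        .copyResults(pool());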
36 changes: 28 additions & 8 deletions velox/docs/configs.rst
@@ -31,14 +31,6 @@ Generic Configuration
- integer
- 5000
- TableScan operator will exit getOutput() method after this many milliseconds even if it has no data to return yet. Zero means 'no time limit'.
* - abandon_partial_aggregation_min_rows
- integer
- 100,000
- Number of input rows to receive before starting to check whether to abandon partial aggregation.
* - abandon_partial_aggregation_min_pct
- integer
- 80
- Abandons partial aggregation if number of groups equals or exceeds this percentage of the number of input rows.
* - abandon_partial_topn_row_number_min_rows
- integer
- 100,000
@@ -399,6 +391,34 @@ Spilling
- 0
- Percentage of aggregation or join input batches that will be forced to spill for testing. 0 means no extra spilling.

Aggregation
-----------
.. list-table::
:widths: 20 10 10 70
:header-rows: 1

* - Property Name
- Type
- Default Value
- Description
* - abandon_partial_aggregation_min_rows
- integer
- 100,000
- Number of input rows to receive before starting to check whether to abandon partial aggregation.
* - abandon_partial_aggregation_min_pct
- integer
- 80
- Abandons partial aggregation if number of groups equals or exceeds this percentage of the number of input rows.
* - streaming_aggregation_eager_flush
- bool
- false
- If this is false (the default), streaming aggregation waits until there
  are enough output rows to produce a batch of the size specified by
  Operator::outputBatchRows. If this is true, rows are added to the output
  batch as soon as the corresponding groups are fully aggregated. This is
  useful for reducing memory consumption when the downstream operators are
  not sensitive to small batch sizes.

Table Scan
------------
.. list-table::
31 changes: 31 additions & 0 deletions velox/exec/Aggregate.h
@@ -171,6 +171,35 @@ class Aggregate {
const std::vector<VectorPtr>& args,
bool mayPushdown) = 0;

/// Called by the aggregation operator to set whether the input data is
/// eligible for the clustered input optimization. This is turned off, for
/// example, when the input rows from the same group are not contiguous, or
/// when the aggregate is sorted or distinct.
void setClusteredInput(bool value) {
clusteredInput_ = value;
}

/// Whether the function itself supports clustered input optimization.
///
/// When this returns true, `addRawClusteredInput` should be implemented.
virtual bool supportsAddRawClusteredInput() const {
return false;
}

/// Fast path for the function when input rows from the same group are
/// clustered together. `groups`, `rows`, and `args` are the same as in
/// `addRawInput`; `groupBoundaries` holds indices into `groups`, each
/// indicating the position one past the last row of a group.
///
/// Will only be called when `supportsAddRawClusteredInput` returns true.
virtual void addRawClusteredInput(
char** /*groups*/,
const SelectivityVector& /*rows*/,
const std::vector<VectorPtr>& /*args*/,
const folly::Range<const vector_size_t*>& /*groupBoundaries*/) {
VELOX_NYI("Unimplemented: {} {}", typeid(*this).name(), __func__);
}

// Updates final accumulators from intermediate results.
// @param groups Pointers to the start of the group rows. These are aligned
// with the 'args', e.g. data in the i-th row of the 'args' goes to the i-th
@@ -468,6 +497,8 @@ class Aggregate {
std::vector<vector_size_t> pushdownCustomIndices_;

bool validateIntermediateInputs_ = false;

bool clusteredInput_ = false;
};

using AggregateFunctionFactory = std::function<std::unique_ptr<Aggregate>(
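To illustrate the contract above, a hedged sketch of how a sum-style aggregate could override the new hook, updating each accumulator once per run of clustered rows. `decodedValue_` and the int64_t accumulator layout are illustrative and not part of this patch; `value<T>()` and `rows.isValid()` are existing Aggregate and SelectivityVector helpers:

// Illustrative only: consume clustered raw input for a sum-style aggregate.
void addRawClusteredInput(
    char** groups,
    const SelectivityVector& rows,
    const std::vector<VectorPtr>& args,
    const folly::Range<const vector_size_t*>& groupBoundaries) override {
  decodedValue_.decode(*args[0], rows);
  vector_size_t start = 0;
  for (auto end : groupBoundaries) {
    // Rows [start, end) all belong to the group anchored at groups[start],
    // so the accumulator is located once per run instead of once per row.
    auto* sum = value<int64_t>(groups[start]);
    for (auto i = start; i < end; ++i) {
      if (rows.isValid(i) && !decodedValue_.isNullAt(i)) {
        *sum += decodedValue_.valueAt<int64_t>(i);
      }
    }
    start = end;
  }
}

bool supportsAddRawClusteredInput() const override {
  return true;
}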
52 changes: 35 additions & 17 deletions velox/exec/StreamingAggregation.cpp
@@ -32,7 +32,10 @@ StreamingAggregation::StreamingAggregation(
: "Aggregation"),
outputBatchSize_{outputBatchRows()},
aggregationNode_{aggregationNode},
step_{aggregationNode->step()} {
step_{aggregationNode->step()},
eagerFlush_{operatorCtx_->driverCtx()
->queryConfig()
.streamingAggregationEagerFlush()} {
if (aggregationNode_->ignoreNullKeys()) {
VELOX_UNSUPPORTED(
"Streaming aggregation doesn't support ignoring null keys yet");
@@ -75,6 +78,15 @@ void StreamingAggregation::initialize() {
}
}

if (isRawInput(step_)) {
for (column_index_t i = 0; i < aggregates_.size(); ++i) {
if (aggregates_[i].sortingKeys.empty() && !aggregates_[i].distinct) {
// Must be set before we initialize the row container, because it could
// change the type and size of the accumulator.
aggregates_[i].function->setClusteredInput(true);
}
}
}
masks_ = std::make_unique<AggregationMasks>(extractMaskChannels(aggregates_));
rows_ = makeRowContainer(groupingKeyTypes);

@@ -175,6 +187,9 @@ RowVectorPtr StreamingAggregation::createOutput(size_t numGroups) {
}
}

std::rotate(groups_.begin(), groups_.begin() + numGroups, groups_.end());
numGroups_ -= numGroups;

return output;
}

@@ -215,6 +230,15 @@ void StreamingAggregation::assignGroups() {
}
}
}

groupBoundaries_.clear();
for (vector_size_t i = 1; i < numInput; ++i) {
if (inputGroups_[i] != inputGroups_[i - 1]) {
groupBoundaries_.push_back(i);
}
}
VELOX_CHECK_GT(numInput, 0);
groupBoundaries_.push_back(numInput);
}

const SelectivityVector& StreamingAggregation::getSelectivityVector(
@@ -256,7 +280,12 @@ void StreamingAggregation::evaluateAggregates() {
}

if (isRawInput(step_)) {
function->addRawInput(inputGroups_.data(), rows, args, false);
if (function->supportsAddRawClusteredInput()) {
function->addRawClusteredInput(
inputGroups_.data(), rows, args, groupBoundaries_);
} else {
function->addRawInput(inputGroups_.data(), rows, args, false);
}
} else {
function->addIntermediateResults(inputGroups_.data(), rows, args, false);
}
@@ -274,9 +303,7 @@ bool StreamingAggregation::isFinished() {
RowVectorPtr StreamingAggregation::getOutput() {
if (!input_) {
if (noMoreInput_ && numGroups_ > 0) {
auto output = createOutput(numGroups_);
numGroups_ = 0;
return output;
return createOutput(numGroups_);
}
return nullptr;
}
@@ -294,19 +321,10 @@ RowVectorPtr StreamingAggregation::getOutput() {
evaluateAggregates();

RowVectorPtr output;
if (numGroups_ > outputBatchSize_) {
if (eagerFlush_ && numGroups_ > 1) {
output = createOutput(numGroups_ - 1);
} else if (numGroups_ > outputBatchSize_) {
output = createOutput(outputBatchSize_);

// Rotate the entries in the groups_ vector to move the remaining groups to
// the beginning and place re-usable groups at the end.
std::vector<char*> copy(groups_.size());
std::copy(groups_.begin() + outputBatchSize_, groups_.end(), copy.begin());
std::copy(
groups_.begin(),
groups_.begin() + outputBatchSize_,
copy.begin() + groups_.size() - outputBatchSize_);
groups_ = std::move(copy);
numGroups_ -= outputBatchSize_;
}

prevInput_ = input_;
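A short worked example of the new bookkeeping, assuming a single grouping key whose sorted values in one batch are a, a, b, b, b, c:

// inputGroups_:      g0 g0 g1 g1 g1 g2   (numInput = 6)
// groupBoundaries_:  {2, 5, 6}           // one past the last row of each group

Each boundary is one past the last row of a group, and the final entry equals the batch size, which is why assignGroups() checks numInput > 0 before pushing it. With streaming_aggregation_eager_flush enabled, getOutput() emits numGroups_ - 1 groups (a and b here) and keeps only the last, still-open group c, because the next batch may continue it; createOutput() then rotates groups_ so the retained group moves to the front and the flushed entries become reusable at the end.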
6 changes: 6 additions & 0 deletions velox/exec/StreamingAggregation.h
@@ -91,6 +91,8 @@ class StreamingAggregation : public Operator {

const core::AggregationNode::Step step_;

const bool eagerFlush_;

std::vector<column_index_t> groupingKeys_;
std::vector<AggregateInfo> aggregates_;
std::unique_ptr<SortedAggregations> sortedAggregations_;
@@ -117,6 +119,10 @@ class StreamingAggregation : public Operator {
// Pointers to groups for all input rows.
std::vector<char*> inputGroups_;

// Indices into `groups`, each indicating the position one past the last row
// of a group. The last element is the total number of input rows.
std::vector<vector_size_t> groupBoundaries_;

// A subset of input rows to evaluate the aggregate function on. Rows
// where aggregation mask is false are excluded.
SelectivityVector inputRows_;