feat: Add Aggregate::addRawClusteredInput and streaming_aggregation_eager_flush #12975

Closed
Yuhta wants to merge 1 commit into facebookincubator/velox from the export-D72677410 branch

Conversation

@Yuhta (Contributor) commented on Apr 9, 2025

Summary:
Add a fast path for streaming aggregation when input rows from the same group are located next to each other. For certain functions, we can leverage this property to reduce the number of copy calls by creating larger and fewer ranges to copy. This brings a 3x improvement for a specific query shape that is common in data loading for AI training.

We implement this optimization for `arbitrary` and `array_agg`. For `arbitrary`, if the input is clustered, we just keep a reference to the input vector and the selected index; when we extract values from the container, we group all copies from the same vector into one `copyRange` call so the cost is minimized. For `array_agg`, we do a similar thing, except that we only track the range (offset and size) of the input to be taken for each group and perform the copy in bulk when we extract values.
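The following is a minimal, self-contained sketch of the range-tracking idea behind the clustered fast path; it is not the actual Velox accumulator code, and names such as `SourceVector`, `ClusteredRange`, `addClusteredRows`, and `extract` are illustrative. The point it shows is that accumulation only records contiguous (source, offset, size) ranges, and extraction issues one bulk copy per range instead of one copy per row.

```cpp
#include <cstdint>
#include <vector>

// Illustrative stand-in for an input vector of int64_t values.
using SourceVector = std::vector<int64_t>;

// One contiguous run of rows taken from a single source vector for one group.
struct ClusteredRange {
  const SourceVector* source; // borrowed; must stay alive until extraction
  int32_t offset;             // first row index in the source
  int32_t size;               // number of consecutive rows
};

// Accumulation only records ranges; no values are copied yet.
void addClusteredRows(
    std::vector<ClusteredRange>& ranges,
    const SourceVector& source,
    int32_t offset,
    int32_t size) {
  // Extend the previous range when the new rows continue it in the same vector.
  if (!ranges.empty() && ranges.back().source == &source &&
      ranges.back().offset + ranges.back().size == offset) {
    ranges.back().size += size;
    return;
  }
  ranges.push_back({&source, offset, size});
}

// Extraction turns each range into a single bulk copy, which is what reduces
// the number of copy calls compared to copying row by row.
std::vector<int64_t> extract(const std::vector<ClusteredRange>& ranges) {
  std::vector<int64_t> out;
  for (const auto& range : ranges) {
    out.insert(
        out.end(),
        range.source->begin() + range.offset,
        range.source->begin() + range.offset + range.size);
  }
  return out;
}
```

In the real operator the sources are Velox vectors and the bulk copies are `copyRange` calls grouped by source vector, but the bookkeeping has the same shape.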

There is another optimization: flush the streaming aggregation output whenever a result is available, enabled via a new query config `streaming_aggregation_eager_flush`. This allows us to minimize the memory used by accumulators.
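As a rough sketch of what the eager-flush decision amounts to (the function name, the `completedGroups` notion, and the batching threshold are assumptions for illustration, not the actual StreamingAggregation code):

```cpp
#include <cstdint>

// With streaming_aggregation_eager_flush enabled, output is produced as soon
// as any group is complete, instead of waiting for a full output batch.
bool shouldFlush(
    bool eagerFlushEnabled,
    int32_t completedGroups,           // groups whose keys cannot reappear
    int32_t preferredOutputBatchRows)  // usual batching threshold
{
  if (eagerFlushEnabled) {
    // Emit whenever a result is available, keeping accumulator memory small.
    return completedGroups > 0;
  }
  return completedGroups >= preferredOutputBatchRows;
}
```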

Differential Revision: D72677410

@facebook-github-bot added the CLA Signed label on Apr 9, 2025
@facebook-github-bot (Contributor) commented

This pull request was exported from Phabricator. Differential Revision: D72677410

netlify bot commented on Apr 9, 2025

Deploy Preview for meta-velox canceled.

Latest commit: c2cedea
Latest deploy log: https://app.netlify.com/sites/meta-velox/deploys/67f849eb55091100082ee54c

@Yuhta changed the title from "feat: add Aggregate::addRawClusteredInput and streaming_aggregation_eager_flush" to "feat: Add Aggregate::addRawClusteredInput and streaming_aggregation_eager_flush" on Apr 9, 2025
@mbasmanova requested review from xiaoxmeng and rui-mo on April 9, 2025 15:45
@mbasmanova (Contributor) commented

@Yuhta Jimmy, thank you for the optimization. Would you update the PR description to share some findings about why this is a useful optimization? "3x improvements for a specific query shape common in data loading for AI training."

@rui-mo (Collaborator) left a comment

Thanks. Added two nits.

@Yuhta force-pushed the export-D72677410 branch from f639c66 to fa6d5ad on April 10, 2025 17:41
Yuhta added a commit to Yuhta/velox that referenced this pull request Apr 10, 2025
Yuhta added a commit to Yuhta/velox that referenced this pull request Apr 10, 2025
@mbasmanova (Contributor) left a comment


Thank you, Jimmy.

Yuhta added a commit to Yuhta/velox that referenced this pull request Apr 10, 2025
@Yuhta force-pushed the export-D72677410 branch from fa6d5ad to a987bc4 on April 10, 2025 21:27
Yuhta added a commit to Yuhta/velox that referenced this pull request Apr 10, 2025
@Yuhta force-pushed the export-D72677410 branch from a987bc4 to aec840e on April 10, 2025 22:26
Yuhta added a commit to Yuhta/velox that referenced this pull request Apr 10, 2025
@Yuhta force-pushed the export-D72677410 branch from aec840e to ed0ccd1 on April 10, 2025 22:34
@Yuhta force-pushed the export-D72677410 branch from ed0ccd1 to c2cedea on April 10, 2025 22:44
@facebook-github-bot (Contributor) commented

This pull request has been merged in afb236a.

@prestodb-ci commented

Rebase triggered for oap-project/velox.

zhanglistar pushed a commit to bigo-sg/velox that referenced this pull request Apr 22, 2025