
Conversation


@Jay-ju Jay-ju commented Sep 21, 2025

Changes Made

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@github-actions github-actions bot added the feat label Sep 21, 2025

@greptile-apps greptile-apps bot left a comment


Greptile Summary

This PR introduces configurable morsel size bounds to the Daft execution engine through two new optional configuration parameters: morsel_size_lower_bound and morsel_size_upper_bound. The implementation allows users to control the size of data morsels (chunks) during execution to prevent performance issues from overly large morsels (causing memory pressure) or overly small morsels (causing processing overhead).

The changes span multiple layers of the architecture:

  1. Configuration Layer: New fields added to DaftExecutionConfig in Rust with corresponding Python bindings, supporting both programmatic configuration and environment variables (DAFT_MORSEL_SIZE_LOWER_BOUND, DAFT_MORSEL_SIZE_UPPER_BOUND)

  2. Pipeline Integration: The execution configuration is systematically threaded through all pipeline nodes (BlockingSinkNode, StreamingSinkNode, IntermediateNode) via cfg.clone() calls to enable access to the new bounds

  3. Dispatcher Logic: The core get_morsel_size_bounds() helper function implements precedence logic where global config bounds override operator-specific MorselSizeRequirement settings when both are present

  4. Python API: The daft.context.set_execution_config() method now accepts the new parameters with validation to ensure lower bound doesn't exceed upper bound

The feature maintains backward compatibility by making both bounds optional (defaulting to None) and falling back to existing behavior when not configured. The implementation follows established patterns in the codebase for configuration management and includes comprehensive test coverage.
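
The precedence logic described in point 3 could be sketched as follows. This is a minimal illustration only: the `Bounds` struct and the function signature here are assumptions for demonstration, not Daft's actual `get_morsel_size_bounds` implementation.

```rust
// Illustrative sketch: global config bounds, when set, override the
// operator-specific morsel size requirement; otherwise the operator's
// own bounds are kept (the backward-compatible fallback).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Bounds {
    lower: usize,
    upper: usize,
}

fn get_morsel_size_bounds(
    global_lower: Option<usize>,
    global_upper: Option<usize>,
    operator_bounds: Bounds,
) -> Bounds {
    Bounds {
        lower: global_lower.unwrap_or(operator_bounds.lower),
        upper: global_upper.unwrap_or(operator_bounds.upper),
    }
}

fn main() {
    let op = Bounds { lower: 0, upper: 131_072 };
    // Global bounds take precedence when present.
    assert_eq!(
        get_morsel_size_bounds(Some(1_000), Some(10_000), op),
        Bounds { lower: 1_000, upper: 10_000 }
    );
    // Fall back to the operator's own requirement otherwise.
    assert_eq!(get_morsel_size_bounds(None, None, op), op);
}
```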

Confidence score: 3/5

  • This PR introduces significant complexity with potential performance implications due to extensive config cloning throughout the pipeline
  • Score reflects concerns about the hard-coded test assertion and incomplete PR description, though the core implementation appears sound
  • Pay close attention to the test file and dispatcher logic implementation for potential runtime issues

11 files reviewed, 5 comments


let morsel_size_lower_bound_var = "DAFT_MORSEL_SIZE_LOWER_BOUND";
if let Ok(val) = std::env::var(morsel_size_lower_bound_var) {
    match val.parse::<usize>() {
        Ok(parsed) => cfg.morsel_size_lower_bound = Some(parsed),
style: Consider validating that parsed > 0 to prevent zero lower bound which could cause issues in buffer logic

let morsel_size_upper_bound_var = "DAFT_MORSEL_SIZE_UPPER_BOUND";
if let Ok(val) = std::env::var(morsel_size_upper_bound_var) {
    match val.parse::<usize>() {
        Ok(parsed) => cfg.morsel_size_upper_bound = Some(parsed),

style: Should validate that parsed > 0 and potentially ensure upper bound is reasonable (not too large) to prevent memory issues
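
Both style comments could be addressed with a small validating parser along these lines. This is a sketch only: the helper name `parse_positive_bound` is hypothetical and does not appear in the PR.

```rust
// Illustrative helper: parse an env var value into a positive usize bound.
// Zero is rejected because a zero bound could break downstream buffer
// logic, as the review comments note; unparseable values are ignored.
fn parse_positive_bound(val: &str) -> Option<usize> {
    match val.parse::<usize>() {
        Ok(0) => None,       // reject zero bounds
        Ok(parsed) => Some(parsed),
        Err(_) => None,      // ignore values that are not valid integers
    }
}

fn main() {
    assert_eq!(parse_positive_bound("4096"), Some(4096));
    assert_eq!(parse_positive_bound("0"), None);
    assert_eq!(parse_positive_bound("not-a-number"), None);
}
```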

@Jay-ju Jay-ju force-pushed the morse_size branch 5 times, most recently from ecf1890 to 012dc75 on September 22, 2025 at 12:52

codecov bot commented Sep 22, 2025

Codecov Report

❌ Patch coverage is 84.65608% with 29 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.62%. Comparing base (10086b2) to head (a043353).
⚠️ Report is 1 commit behind head on main.

Files with missing lines | Patch % | Lines
src/daft-local-execution/src/pipeline.rs | 90.98% | 11 Missing ⚠️
src/common/daft-config/src/python.rs | 56.52% | 10 Missing ⚠️
src/common/daft-config/src/lib.rs | 50.00% | 8 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #5250      +/-   ##
==========================================
+ Coverage   74.24%   74.62%   +0.38%     
==========================================
  Files         973      973              
  Lines      125213   124658     -555     
==========================================
+ Hits        92959    93030      +71     
+ Misses      32254    31628     -626     
Files with missing lines | Coverage Δ
daft/context.py | 88.88% <ø> (ø)
src/daft-local-execution/src/dispatcher.rs | 93.90% <100.00%> (-2.90%) ⬇️
...-execution/src/intermediate_ops/intermediate_op.rs | 91.41% <100.00%> (+0.22%) ⬆️
...rc/daft-local-execution/src/sinks/blocking_sink.rs | 90.05% <100.00%> (+0.21%) ⬆️
src/daft-local-execution/src/sinks/write.rs | 90.69% <100.00%> (+0.07%) ⬆️
...rc/daft-local-execution/src/streaming_sink/base.rs | 79.80% <100.00%> (+0.39%) ⬆️
src/common/daft-config/src/lib.rs | 69.91% <50.00%> (-2.98%) ⬇️
src/common/daft-config/src/python.rs | 67.35% <56.52%> (-0.93%) ⬇️
src/daft-local-execution/src/pipeline.rs | 81.13% <90.98%> (+1.18%) ⬆️

... and 16 files with indirect coverage changes


@Jay-ju Jay-ju force-pushed the morse_size branch 3 times, most recently from 31df42f to a043353 on September 22, 2025 at 13:52

Jay-ju commented Sep 22, 2025

@colin-ho When you have time, please help review the changes in this part. The main issue is that the current sink operator cannot control the size of the written data, so this PR first implements a workaround: making the upper and lower bounds of the morsel size configurable.

@colin-ho

Have you tried using into_batches?


Jay-ju commented Sep 23, 2025

> Have you tried using into_batches?

@colin-ho Do you mean replacing repartition(1) with into_batches?

@colin-ho

I mean using into_batches to control the batch size. Since the main issue is that the current sink operator cannot control the size of the written data, perhaps into_batches can solve the problem.


Jay-ju commented Sep 23, 2025

It does work: into_batches indeed makes the batches within a partition larger. I want to confirm whether a shuffle is performed here, or if it's just a local exchange?

@colin-ho

into_batches does not shuffle


Jay-ju commented Sep 23, 2025

@colin-ho Thank you very much.
In that case, can the logic for adding these configuration options still stay? I originally wanted to set this configuration uniformly in pipeline.rs, with operator-specific requirements taking priority when present. However, I found that doing so affected the sorting of some window functions, so in the current PR only the morsel_size in write.rs has been modified.

@colin-ho

The way morsel sizing currently works is that there is a default morsel size range of (0, ~128k rows]. This requirement is propagated top down until an operator with a required morsel size is reached, e.g. a UDF with a batch size; that batch size then becomes the new morsel size range.

See #4894 for more details.

The benefit of this is memory: if the UDF requires a batch size of 100, then the upstream scan does not need to scan more than 100 rows at a time.

For this PR, let's keep the new configs for min/max morsel size that you have already added, and also add a deprecation warning for the existing default_morsel_size config in favor of the new min/max configs. However, we should not change the morsel size directly in write.rs or any other operator; propagate_morsel_size_requirement should take care of this.
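
The top-down propagation described here can be sketched as a toy model. The `MorselSizeRequirement::Flexible` shape is borrowed from the comment above; the `propagate` helper is an illustrative assumption, not Daft's actual `propagate_morsel_size_requirement`.

```rust
// Toy model of top-down morsel size propagation: a flexible (lower, upper]
// range flows from the sink toward the sources; an operator with its own
// requirement (e.g. a UDF batch size) replaces the range for everything
// upstream of it.
#[derive(Debug, Clone, Copy, PartialEq)]
enum MorselSizeRequirement {
    Flexible(usize, usize),
}

fn propagate(
    incoming: MorselSizeRequirement,
    operator_requirement: Option<MorselSizeRequirement>,
) -> MorselSizeRequirement {
    // The operator's own requirement, if any, wins and continues upstream.
    operator_requirement.unwrap_or(incoming)
}

fn main() {
    let default = MorselSizeRequirement::Flexible(0, 131_072);
    // A UDF with batch size 100 narrows the range upstream, so a scan
    // never needs to read more than 100 rows at a time.
    let udf = Some(MorselSizeRequirement::Flexible(100, 100));
    assert_eq!(propagate(default, udf), MorselSizeRequirement::Flexible(100, 100));
    assert_eq!(propagate(default, None), default);
}
```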

pipeline_node.propagate_morsel_size_requirement(
    MorselSizeRequirement::Flexible(0, cfg.default_morsel_size),
    MorselSizeRequirement::Flexible(0, cfg.default_morsel_size),
    MorselSizeRequirement::Flexible(
@Jay-ju Jay-ju Oct 11, 2025


@colin-ho I directly modified the default value here to make it configurable. However, I don't really understand the statement "we should not change morsel size directly in write.rs or any other operator". It mainly refers to https://github.com/Eventual-Inc/Daft/blob/main/src/daft-local-execution/src/sinks/write.rs#L220C2-L222C50 , which also looks like a fixed configuration. Why does the maximum value there use int::max instead of 128k rows? Can I modify it according to the current global configuration (even though I already know this can also be achieved through into_batches)?


@colin-ho bump

@colin-ho colin-ho Oct 21, 2025


> Why is the maximum value here using int::max instead of 128k rows?

int::max means that there won't be any buffering. We use it instead of 128k rows because the write sinks do their own buffering internally (for example, the parquet writer), so we don't need to pre-buffer.

> Can I modify it according to the current global configuration (even though I already know that this function can be achieved through into_batches)?

Let's not modify the batch size in write.rs directly based on global configuration. For writes, if users want to change configurations like batch size or row group size, they should be able to use into_batches or a parameter on the write operation itself.


Jay-ju commented Oct 11, 2025

> The way the current morsel sizing works right now is that there is a default morsel size range of (0, ~128k rows]. This requirement is propagated top down, until an operator with a required morsel size is reached, e.g. UDF with batch size, then this new batch size becomes the new morsel size range.
>
> See #4894 for more details.
>
> The benefit of this is memory, if the UDF requires batch size of 100, then the upstream scan does not need to scan more than 100 rows at a time.
>
> For this PR, lets keep the new configs for min / max morsel size that you have already added, and also add a deprecation warning for the existing default_morsel_size config, in favor of the new min / max configs. However, we should not change morsel size directly in write.rs or any other operator, the propagate_morsel_size_requirement should take care of this.

I have made modifications according to this, but I have submitted some questions to the corresponding code section. We can discuss them together.
