Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import threading
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Annotated, Any, ClassVar

from typing_extensions import deprecated

from daft import runners
from daft.daft import IOConfig, PyDaftContext, PyDaftExecutionConfig, PyDaftPlanningConfig
Expand Down Expand Up @@ -251,7 +253,9 @@ def set_execution_config(
high_cardinality_aggregation_threshold: float | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
default_morsel_size: int | None = None,
default_morsel_size: Annotated[
int | None, deprecated("Use morsel_size_lower_bound and morsel_size_upper_bound instead.")
] = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
flight_shuffle_dirs: list[str] | None = None,
Expand All @@ -262,6 +266,8 @@ def set_execution_config(
use_legacy_ray_runner: bool | None = None,
min_cpu_per_task: float | None = None,
actor_udf_ready_timeout: int | None = None,
morsel_size_lower_bound: int | None = None,
morsel_size_upper_bound: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution.

Expand Down Expand Up @@ -301,7 +307,7 @@ def set_execution_config(
high_cardinality_aggregation_threshold: Threshold selectivity for performing high cardinality aggregations on the Native Runner. Defaults to 0.8.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
default_morsel_size: DEPRECATED: Default size of morsels used for the new local executor. Defaults to 131072 rows.
shuffle_algorithm: The shuffle algorithm to use. Defaults to "auto", which will let Daft determine the algorithm. Options are "map_reduce" and "pre_shuffle_merge".
pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB
flight_shuffle_dirs: The directories to use for flight shuffle. Defaults to ["/tmp"].
Expand Down Expand Up @@ -349,6 +355,8 @@ def set_execution_config(
use_legacy_ray_runner=use_legacy_ray_runner,
min_cpu_per_task=min_cpu_per_task,
actor_udf_ready_timeout=actor_udf_ready_timeout,
morsel_size_lower_bound=morsel_size_lower_bound,
morsel_size_upper_bound=morsel_size_upper_bound,
)

ctx._ctx._daft_execution_config = new_daft_execution_config
Expand Down
6 changes: 6 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2022,6 +2022,8 @@ class PyDaftExecutionConfig:
use_legacy_ray_runner: bool | None = None,
min_cpu_per_task: float | None = None,
actor_udf_ready_timeout: int | None = None,
morsel_size_lower_bound: int | None = None,
morsel_size_upper_bound: int | None = None,
) -> PyDaftExecutionConfig: ...
@property
def scan_tasks_min_size_bytes(self) -> int: ...
Expand Down Expand Up @@ -2077,6 +2079,10 @@ class PyDaftExecutionConfig:
def actor_udf_ready_timeout(self) -> int: ...
@property
def scantask_max_parallel(self) -> int: ...
@property
def low_bound_morsel_size(self) -> int: ...
@property
def upper_bound_morsel_size(self) -> int: ...

class PyDaftPlanningConfig:
@staticmethod
Expand Down
24 changes: 24 additions & 0 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub struct DaftExecutionConfig {
pub use_legacy_ray_runner: bool,
pub min_cpu_per_task: f64,
pub actor_udf_ready_timeout: usize,
pub morsel_size_lower_bound: Option<usize>,
pub morsel_size_upper_bound: Option<usize>,
}

impl Default for DaftExecutionConfig {
Expand Down Expand Up @@ -112,6 +114,8 @@ impl Default for DaftExecutionConfig {
use_legacy_ray_runner: false,
min_cpu_per_task: 0.5,
actor_udf_ready_timeout: 120,
morsel_size_lower_bound: Some(0), // Minimum value is 0
morsel_size_upper_bound: Some(128 * 1024), // Same as default_morsel_size
}
}
}
Expand Down Expand Up @@ -174,6 +178,26 @@ impl DaftExecutionConfig {
),
}
}
let morsel_size_lower_bound_var = "DAFT_MORSEL_SIZE_LOWER_BOUND";
if let Ok(val) = std::env::var(morsel_size_lower_bound_var) {
match val.parse::<usize>() {
Ok(parsed) => cfg.morsel_size_lower_bound = Some(parsed),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider validating that parsed > 0 to prevent zero lower bound which could cause issues in buffer logic

Err(_) => eprintln!(
"Invalid {} value: {}, ignoring",
morsel_size_lower_bound_var, val
),
}
}
let morsel_size_upper_bound_var = "DAFT_MORSEL_SIZE_UPPER_BOUND";
if let Ok(val) = std::env::var(morsel_size_upper_bound_var) {
match val.parse::<usize>() {
Ok(parsed) => cfg.morsel_size_upper_bound = Some(parsed),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Should validate that parsed > 0 and potentially ensure upper bound is reasonable (not too large) to prevent memory issues

Err(_) => eprintln!(
"Invalid {} value: {}, ignoring",
morsel_size_upper_bound_var, val
),
}
}
cfg
}
}
Expand Down
35 changes: 35 additions & 0 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ impl PyDaftExecutionConfig {
use_legacy_ray_runner=None,
min_cpu_per_task=None,
actor_udf_ready_timeout=None,
morsel_size_lower_bound=None,
morsel_size_upper_bound=None,
))]
fn with_config_values(
&self,
Expand Down Expand Up @@ -152,6 +154,8 @@ impl PyDaftExecutionConfig {
use_legacy_ray_runner: Option<bool>,
min_cpu_per_task: Option<f64>,
actor_udf_ready_timeout: Option<usize>,
morsel_size_lower_bound: Option<usize>,
morsel_size_upper_bound: Option<usize>,
) -> PyResult<Self> {
let mut config = self.config.as_ref().clone();

Expand Down Expand Up @@ -221,6 +225,27 @@ impl PyDaftExecutionConfig {
if let Some(default_morsel_size) = default_morsel_size {
config.default_morsel_size = default_morsel_size;
}

if let Some(morsel_size_lower_bound) = morsel_size_lower_bound {
config.morsel_size_lower_bound = Some(morsel_size_lower_bound);
}

if let Some(morsel_size_upper_bound) = morsel_size_upper_bound {
config.morsel_size_upper_bound = Some(morsel_size_upper_bound);
}

// Validate morsel size bounds
if let (Some(lower), Some(upper)) = (
config.morsel_size_lower_bound,
config.morsel_size_upper_bound,
) && lower > upper
{
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"morsel_size_lower_bound ({}) cannot be greater than morsel_size_upper_bound ({})",
lower, upper
)));
}

if let Some(shuffle_algorithm) = shuffle_algorithm {
if !matches!(
shuffle_algorithm,
Expand Down Expand Up @@ -407,6 +432,16 @@ impl PyDaftExecutionConfig {
fn actor_udf_ready_timeout(&self) -> PyResult<usize> {
Ok(self.config.actor_udf_ready_timeout)
}

#[getter]
fn morsel_size_lower_bound(&self) -> PyResult<Option<usize>> {
Ok(self.config.morsel_size_lower_bound)
}

#[getter]
fn morsel_size_upper_bound(&self) -> PyResult<Option<usize>> {
Ok(self.config.morsel_size_upper_bound)
}
}

impl_bincode_py_state_serialization!(PyDaftExecutionConfig);
13 changes: 10 additions & 3 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use capitalize::Capitalize;
use common_daft_config::DaftExecutionConfig;
use common_display::tree::TreeDisplay;
use common_error::DaftResult;
use common_metrics::ops::{NodeCategory, NodeInfo, NodeType};
Expand Down Expand Up @@ -66,6 +67,7 @@ pub(crate) trait IntermediateOperator: Send + Sync {
&self,
morsel_size_requirement: MorselSizeRequirement,
maintain_order: bool,
_config: &DaftExecutionConfig,
) -> Arc<dyn DispatchSpawner> {
if maintain_order {
Arc::new(RoundRobinDispatcher::new(morsel_size_requirement))
Expand All @@ -82,6 +84,7 @@ pub struct IntermediateNode<Op: IntermediateOperator> {
plan_stats: StatsState,
morsel_size_requirement: MorselSizeRequirement,
node_info: Arc<NodeInfo>,
config: Arc<DaftExecutionConfig>,
}

impl<Op: IntermediateOperator + 'static> IntermediateNode<Op> {
Expand All @@ -90,6 +93,7 @@ impl<Op: IntermediateOperator + 'static> IntermediateNode<Op> {
children: Vec<Box<dyn PipelineNode>>,
plan_stats: StatsState,
ctx: &RuntimeContext,
config: Arc<DaftExecutionConfig>,
) -> Self {
let info = ctx.next_node_info(
Arc::from(intermediate_op.name()),
Expand All @@ -107,6 +111,7 @@ impl<Op: IntermediateOperator + 'static> IntermediateNode<Op> {
plan_stats,
morsel_size_requirement,
node_info: Arc::new(info),
config,
}
}

Expand Down Expand Up @@ -269,9 +274,11 @@ impl<Op: IntermediateOperator + 'static> PipelineNode for IntermediateNode<Op> {
let (destination_sender, destination_receiver) = create_channel(0);
let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone());

let dispatch_spawner = self
.intermediate_op
.dispatch_spawner(self.morsel_size_requirement, maintain_order);
let dispatch_spawner = self.intermediate_op.dispatch_spawner(
self.morsel_size_requirement,
maintain_order,
&self.config,
);
let spawned_dispatch_result = dispatch_spawner.spawn_dispatch(
child_result_receivers,
num_workers,
Expand Down
Loading
Loading