Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,7 @@ class ScanTask:
size_bytes: int | None,
pushdowns: PyPushdowns | None,
stats: PyRecordBatch | None,
source_type: str | None = None,
) -> ScanTask:
"""Create a Python factory function Scan Task."""
...
Expand Down
3 changes: 2 additions & 1 deletion daft/io/__shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def name(self) -> str:
return self._source.name

def display_name(self) -> str:
return f"DataSource({self.name()})"
return f"{self.name()}(Python)"

def partitioning_keys(self) -> list[PyPartitionField]:
return [pf._partition_field for pf in self._source.get_partition_fields()]
Expand Down Expand Up @@ -66,6 +66,7 @@ def to_scan_tasks(self, pushdowns: PyPushdowns) -> Iterator[ScanTask]:
size_bytes=None,
pushdowns=pushdowns,
stats=None,
source_type=self.name(),
)

def as_pushdown_filter(self) -> SupportsPushdownFilters | None:
Expand Down
1 change: 1 addition & 0 deletions daft/io/_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,5 @@ def to_scan_tasks(self, pushdowns: PyPushdowns) -> Iterator[ScanTask]:
size_bytes=None,
pushdowns=pushdowns,
stats=None,
source_type=self.name(),
)
4 changes: 4 additions & 0 deletions daft/io/lance/lance_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def to_scan_tasks(self, pushdowns: PyPushdowns) -> Iterator[ScanTask]:
size_bytes=None,
pushdowns=pushdowns,
stats=None,
source_type=self.name(),
)
# Check if there is a limit pushdown and no filters
elif pushdowns.limit is not None and self._pushed_filters is None:
Expand Down Expand Up @@ -197,6 +198,7 @@ def _create_scan_tasks_with_limit_and_no_filters(
size_bytes=None,
pushdowns=pushdowns,
stats=None,
source_type=self.name(),
)

def _create_regular_scan_tasks(
Expand Down Expand Up @@ -224,6 +226,7 @@ def _create_regular_scan_tasks(
size_bytes=size_bytes,
pushdowns=pushdowns,
stats=stats,
source_type=self.name(),
)
else:
# Group fragments
Expand Down Expand Up @@ -258,6 +261,7 @@ def _create_regular_scan_tasks(
size_bytes=size_bytes,
pushdowns=pushdowns,
stats=stats,
source_type=self.name(),
)

def _combine_filters_to_arrow(self) -> Optional["pa.compute.Expression"]:
Expand Down
35 changes: 26 additions & 9 deletions src/common/file-formats/src/file_format_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ pub enum FileFormatConfig {
#[cfg(feature = "python")]
Database(DatabaseSourceConfig),
#[cfg(feature = "python")]
PythonFunction,
PythonFunction {
source_type: Option<String>,
module_name: Option<String>,
function_name: Option<String>,
},
}

impl FileFormatConfig {
Expand All @@ -32,16 +36,29 @@ impl FileFormatConfig {
}

#[must_use]
pub fn var_name(&self) -> &'static str {
pub fn var_name(&self) -> String {
match self {
Self::Parquet(_) => "Parquet",
Self::Csv(_) => "Csv",
Self::Json(_) => "Json",
Self::Warc(_) => "Warc",
Self::Parquet(_) => "Parquet".to_string(),
Self::Csv(_) => "Csv".to_string(),
Self::Json(_) => "Json".to_string(),
Self::Warc(_) => "Warc".to_string(),
#[cfg(feature = "python")]
Self::Database(_) => "Database",
Self::Database(_) => "Database".to_string(),
#[cfg(feature = "python")]
Self::PythonFunction => "PythonFunction",
Self::PythonFunction {
source_type,
module_name,
function_name: _,
} => {
if let Some(source_type) = source_type {
format!("{}(Python)", source_type)
} else if let Some(module_name) = module_name {
// Infer type from module name
format!("{}(Python)", module_name)
} else {
"PythonFunction".to_string()
}
}
}
}

Expand All @@ -55,7 +72,7 @@ impl FileFormatConfig {
#[cfg(feature = "python")]
Self::Database(source) => source.multiline_display(),
#[cfg(feature = "python")]
Self::PythonFunction => vec![],
Self::PythonFunction { .. } => vec![],
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/common/file-formats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ impl From<&FileFormatConfig> for FileFormat {
#[cfg(feature = "python")]
FileFormatConfig::Database(_) => Self::Database,
#[cfg(feature = "python")]
FileFormatConfig::PythonFunction => Self::Python,
FileFormatConfig::PythonFunction {
source_type: _,
module_name: _,
function_name: _,
} => Self::Python,
}
}
}
6 changes: 5 additions & 1 deletion src/common/file-formats/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ impl PyFileFormatConfig {
.clone()
.into_pyobject(py)
.map(|c| c.unbind().into_any()),
FileFormatConfig::PythonFunction => Ok(py.None()),
FileFormatConfig::PythonFunction {
source_type: _,
module_name: _,
function_name: _,
} => Ok(py.None()),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/sources/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ async fn stream_scan_task(
Box::pin(futures::stream::once(async { Ok(table) }))
}
#[cfg(feature = "python")]
FileFormatConfig::PythonFunction => {
FileFormatConfig::PythonFunction { .. } => {
let iter = daft_micropartition::python::read_pyfunc_into_table_iter(scan_task.clone())?;
let stream = futures::stream::iter(iter.map(|r| r.map_err(|e| e.into())));
Box::pin(stream)
Expand Down
6 changes: 5 additions & 1 deletion src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,11 @@ fn materialize_scan_task(
})?
}
#[cfg(feature = "python")]
FileFormatConfig::PythonFunction => {
FileFormatConfig::PythonFunction {
source_type: _,
module_name: _,
function_name: _,
} => {
let tables = crate::python::read_pyfunc_into_table_iter(scan_task.clone())?;
tables.collect::<crate::Result<Vec<_>>>()?
}
Expand Down
6 changes: 5 additions & 1 deletion src/daft-scan/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,11 @@ impl GlobScanOperator {
));
}
#[cfg(feature = "python")]
FileFormatConfig::PythonFunction => {
FileFormatConfig::PythonFunction {
source_type: _,
module_name: _,
function_name: _,
} => {
return Err(DaftError::ValueError(
"Cannot glob a PythonFunction source".to_string(),
));
Expand Down
6 changes: 5 additions & 1 deletion src/daft-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,11 @@ impl ScanTask {
#[cfg(feature = "python")]
FileFormatConfig::Database(_) => 1.0,
#[cfg(feature = "python")]
FileFormatConfig::PythonFunction => 1.0,
FileFormatConfig::PythonFunction {
source_type: _,
module_name: _,
function_name: _,
} => 1.0,
};
let in_mem_size: f64 = (file_size as f64) * inflation_factor;
let read_row_size = if self.is_warc() {
Expand Down
17 changes: 13 additions & 4 deletions src/daft-scan/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ pub mod pylib {
num_rows=None,
size_bytes=None,
pushdowns=None,
stats=None
stats=None,
source_type=None
))]
pub fn python_factory_func_scan_task(
module: String,
Expand All @@ -581,13 +582,14 @@ pub mod pylib {
size_bytes: Option<u64>,
pushdowns: Option<PyPushdowns>,
stats: Option<PyRecordBatch>,
source_type: Option<String>,
) -> PyResult<Self> {
let statistics = stats
.map(|s| TableStatistics::from_stats_table(&s.record_batch))
.transpose()?;
let data_source = DataSource::PythonFactoryFunction {
module,
func_name,
module: module.clone(),
func_name: func_name.clone(),
func_args: PythonTablesFactoryArgs::new(
func_args.into_iter().map(Arc::new).collect(),
),
Expand All @@ -599,9 +601,16 @@ pub mod pylib {
partition_spec: None,
};

// Create enhanced FileFormatConfig with context information
let file_format_config = Arc::new(FileFormatConfig::PythonFunction {
source_type,
module_name: Some(module),
function_name: Some(func_name),
});

let scan_task = ScanTask::new(
vec![data_source],
Arc::new(FileFormatConfig::PythonFunction),
file_format_config,
schema.schema,
// HACK: StorageConfig isn't used when running the Python function but this is a non-optional arg for
// ScanTask creation, so we just put in a placeholder here
Expand Down
Loading