@colin-ho colin-ho commented Sep 18, 2025

Changes Made

Integrate from_glob_path into dataframe execution instead of immediately materializing.

This code used to hang (I killed it after more than 5 minutes); it now takes 5s:

import daft
from daft.expressions import col

df = (
    daft.from_glob_path("s3://eventual-data-test-bucket/digitalcorpora/pdfs/")  # 8M PDFs
    .with_column("image", col("path").url.download())
)

df.show()

glob_path go brr

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@github-actions github-actions bot added the feat label Sep 18, 2025
@colin-ho colin-ho changed the title from "feat: Lazy 'from_glob_path'" to "feat: Lazy from_glob_path" Sep 18, 2025
&self,
_maintain_order: bool,
io_stats: IOStatsRef,
chunk_size: usize,
Contributor Author

If there is no URL download, into-batches operation, or explicit batch size, this falls back to the default morsel size of ~130k rows.

I wonder whether that is too large for a glob scan.

Contributor

I think that's too big in general, so let's worry about that later.

codecov bot commented Oct 4, 2025

Codecov Report

❌ Patch coverage is 66.57143% with 117 lines in your changes missing coverage. Please review.
✅ Project coverage is 75.25%. Comparing base (6cb34b3) to head (07102fe).
⚠️ Report is 4 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...-distributed/src/pipeline_node/glob_scan_source.rs | 0.00% | 81 Missing ⚠️ |
| src/daft-local-execution/src/sources/glob_scan.rs | 87.09% | 16 Missing ⚠️ |
| ...rc/daft-distributed/src/pipeline_node/translate.rs | 0.00% | 9 Missing ⚠️ |
| ...cal-plan/src/optimization/rules/push_down_shard.rs | 0.00% | 3 Missing ⚠️ |
| ...ft-physical-plan/src/physical_planner/translate.rs | 0.00% | 3 Missing ⚠️ |
| src/daft-local-plan/src/plan.rs | 89.47% | 2 Missing ⚠️ |
| ...an/src/optimization/rules/push_down_aggregation.rs | 0.00% | 1 Missing ⚠️ |
| ...al-plan/src/optimization/rules/push_down_filter.rs | 0.00% | 1 Missing ⚠️ |
| ...lan/src/optimization/rules/push_down_projection.rs | 0.00% | 1 Missing ⚠️ |

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #5235      +/-   ##
==========================================
+ Coverage   74.58%   75.25%   +0.66%     
==========================================
  Files         985      988       +3     
  Lines      124804   124261     -543     
==========================================
+ Hits        93090    93517     +427     
+ Misses      31714    30744     -970     

| Files with missing lines | Coverage Δ |
|---|---|
| daft/io/file_path.py | 100.00% <100.00%> (ø) |
| daft/logical/builder.py | 92.57% <100.00%> (+0.18%) ⬆️ |
| src/common/metrics/src/ops.rs | 0.00% <ø> (ø) |
| src/daft-distributed/src/pipeline_node/mod.rs | 34.64% <ø> (ø) |
| src/daft-local-execution/src/channel.rs | 98.24% <100.00%> (ø) |
| src/daft-local-execution/src/pipeline.rs | 80.17% <100.00%> (+0.27%) ⬆️ |
| src/daft-local-execution/src/sources/empty_scan.rs | 59.09% <ø> (ø) |
| src/daft-local-execution/src/sources/in_memory.rs | 87.09% <ø> (ø) |
| src/daft-local-execution/src/sources/scan_task.rs | 76.94% <100.00%> (ø) |
| src/daft-local-execution/src/sources/source.rs | 83.20% <100.00%> (ø) |

... and 14 more

... and 39 files with indirect coverage changes

// Iterate over the unique glob paths and stream out the record batches
let unique_glob_paths = glob_paths.iter().unique().collect::<Vec<_>>();
// Only need to keep track of seen paths if there are multiple glob paths
let mut seen_paths = if unique_glob_paths.len() > 1 {
Contributor Author

This matches the current behavior.

If there are multiple unique source glob paths, we dedupe the results. I do wonder whether we should keep this approach in the future, because the dedupe can be expensive. Perhaps we can rethink it and, at the same time, make the behavior clear to users.
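
As a rough illustration of the dedupe behavior described in this comment, here is a minimal Python sketch. It does not use Daft or the PR's Rust code: `FILES`, `glob_one`, and `glob_many` are hypothetical names, and the listing is simulated with `fnmatch` over an in-memory list. It assumes, as the code comment above suggests, that a set of seen paths is only needed when more than one unique pattern is given.

```python
from fnmatch import fnmatchcase
from typing import Iterable, Iterator, Optional, Set

# Hypothetical in-memory "file system" standing in for a real listing.
FILES = ["data/a.pdf", "data/b.pdf", "data/b.txt", "logs/c.txt"]


def glob_one(pattern: str) -> Iterator[str]:
    """Yield the simulated files that match a single pattern."""
    return (f for f in FILES if fnmatchcase(f, pattern))


def glob_many(patterns: Iterable[str]) -> Iterator[str]:
    """Stream matches for several patterns, skipping paths already emitted."""
    # Drop repeated patterns while keeping their original order.
    unique_patterns = list(dict.fromkeys(patterns))
    # Only pay for the seen-set when patterns could overlap.
    seen: Optional[Set[str]] = set() if len(unique_patterns) > 1 else None
    for pattern in unique_patterns:
        for path in glob_one(pattern):
            if seen is not None:
                if path in seen:
                    continue
                seen.add(path)
            yield path


result = list(glob_many(["data/*.pdf", "data/b.*", "data/*.pdf"]))
```

The seen-set grows with the number of matched paths, which is the cost this comment is worried about for very large listings.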

}
// If no files were found, return an error
if !has_results {
return Err(common_error::DaftError::FileNotFound {
Contributor Author

This is also the current behavior: we return a file-not-found error only if there are no results at all.

One idea I have is to expose a parameter that defines the file-not-found behavior, e.g. on_error: Literal["raise", "warn", "ignore"].
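
A minimal sketch of what such a parameter could look like, written in plain Python. This is only an illustration of the idea floated above, not an existing Daft API: the function name `handle_no_matches`, the `OnError` alias, and the error messages are all hypothetical.

```python
import warnings
from typing import List, Literal

# Hypothetical policy type mirroring the values suggested in the comment.
OnError = Literal["raise", "warn", "ignore"]


def handle_no_matches(
    pattern: str, matches: List[str], on_error: OnError = "raise"
) -> List[str]:
    """Apply the chosen policy when a glob pattern matched nothing."""
    if matches:
        return matches
    message = f"No files found for glob pattern: {pattern}"
    if on_error == "raise":
        raise FileNotFoundError(message)
    if on_error == "warn":
        warnings.warn(message)
        return []
    if on_error == "ignore":
        return []
    raise ValueError(f"Unknown on_error value: {on_error!r}")
```

Defaulting to "raise" keeps the behavior this comment describes as current; the other two values would be opt-in.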

@colin-ho colin-ho requested a review from srilman October 4, 2025 00:22
@colin-ho colin-ho marked this pull request as ready for review October 4, 2025 00:22
@greptile-apps greptile-apps bot left a comment

Greptile Overview

Summary

This PR implements lazy evaluation for `from_glob_path` operations by integrating glob path scanning directly into the dataframe execution pipeline instead of immediately materializing file metadata. The change addresses a significant performance issue where operations on large glob patterns (like 8M PDFs) would hang for over 5 minutes and now complete in approximately 5 seconds.

The implementation introduces a new GlobScan source type that is integrated throughout the execution system:

  • Logical Plan Layer: Added GlobScanInfo to SourceInfo enum with predefined schema (path, size, num_rows) and support for pushdown operations
  • Physical Plan Layer: Added GlobScan variant to LocalPhysicalPlan with proper constructor and pattern matching
  • Execution Layer: Implemented GlobScanSource that streams file metadata asynchronously using channels and chunked processing
  • Optimization Layer: Updated all optimization rules (filter, projection, aggregation, limit pushdown) to handle GlobScan appropriately
  • Distributed Execution: Added GlobScanSourceNode for distributed pipeline execution

The key architectural change is moving from eager evaluation (where from_glob_path immediately calls runner_io.glob_paths_details() and materializes all file metadata) to lazy evaluation (where a GlobScan logical plan node is created and executed only when needed). The implementation maintains backward compatibility with the legacy Ray runner while enabling the performance benefits for newer execution engines.
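
The eager-versus-lazy distinction in this paragraph can be illustrated with a small Python sketch. It does not use Daft: `LazyGlob`, `fake_list_files`, and the `calls` counter are hypothetical names invented for the example. The only point shown is that building the plan does no listing work until `collect()` is called.

```python
from typing import Callable, List

# Counts how often the (simulated) expensive listing runs.
calls = {"count": 0}


def fake_list_files(pattern: str) -> List[str]:
    """Hypothetical stand-in for an expensive object-store listing."""
    calls["count"] += 1
    prefix = pattern.rstrip("*")
    return [f"{prefix}file_{i}.pdf" for i in range(3)]


class LazyGlob:
    """Records what to list; performs the listing only on collect()."""

    def __init__(self, pattern: str, lister: Callable[[str], List[str]]):
        self.pattern = pattern
        self.lister = lister

    def collect(self) -> List[str]:
        return self.lister(self.pattern)


plan = LazyGlob("s3://bucket/pdfs/*", fake_list_files)
calls_before_collect = calls["count"]  # plan built, nothing listed yet
paths = plan.collect()
calls_after_collect = calls["count"]  # listing ran exactly once
```

An eager version would call the lister inside `__init__`, which is the shape of the old behavior this paragraph describes.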

The GlobScanSource implementation includes deduplication logic for overlapping glob patterns, proper limit handling with early termination, comprehensive error handling for missing files, and integration with the existing Source trait pattern. The streaming approach processes files in chunks and uses async channels to avoid memory pressure when dealing with millions of files.
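
To make the chunked streaming idea concrete, here is a minimal Python sketch using generators. It is an analogy rather than the PR's implementation, which is written in Rust and uses async channels. The names `stream_in_chunks`, `fake_listing`, and `produced` are hypothetical, and the bucket path is made up.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def stream_in_chunks(paths: Iterable[str], chunk_size: int) -> Iterator[List[str]]:
    """Yield lists of at most chunk_size paths without materializing the input."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    it = iter(paths)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


# Records how many entries the simulated listing actually produced.
produced: List[int] = []


def fake_listing() -> Iterator[str]:
    """Hypothetical lazy listing of a very large prefix."""
    for i in range(1_000_000):
        produced.append(i)
        yield f"s3://bucket/pdfs/{i}.pdf"


# Pull only the first chunk; the rest of the listing is never generated.
first_chunk = next(stream_in_chunks(fake_listing(), chunk_size=4))
```

Because the consumer pulls one chunk at a time, a downstream limit can stop the listing early instead of waiting for all entries.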

Important Files Changed

Changed Files
| Filename | Score | Overview |
|---|---|---|
| src/daft-local-execution/src/sources/glob_scan.rs | 4/5 | Added new GlobScanSource implementation with streaming, deduplication, and limit handling |
| src/daft-distributed/src/pipeline_node/glob_scan_source.rs | 4/5 | Added GlobScanSourceNode for distributed execution with proper task generation |
| src/daft-logical-plan/src/source_info.rs | 4/5 | Added GlobScanInfo variant to SourceInfo enum with predefined schema |
| src/daft-local-plan/src/plan.rs | 5/5 | Added GlobScan variant to LocalPhysicalPlan with constructor and pattern matching |
| daft/io/file_path.py | 4/5 | Refactored from_glob_path to use lazy evaluation with conditional execution paths |
| src/daft-logical-plan/src/builder/mod.rs | 5/5 | Added from_glob_scan method to LogicalPlanBuilder |
| daft/logical/builder.py | 4/5 | Added Python wrapper for from_glob_scan method |
| src/daft-local-execution/src/sources/source.rs | 4/5 | Changed Source trait chunk_size parameter from Option to usize |
| src/daft-local-execution/src/pipeline.rs | 4/5 | Added GlobScan support in pipeline translation |
| src/daft-logical-plan/src/optimization/rules/push_down_limit.rs | 4/5 | Added limit pushdown optimization for GlobScan sources |
| src/daft-logical-plan/src/ops/source.rs | 4/5 | Added stats and display logic for GlobScan source type |
| src/daft-local-plan/src/translate.rs | 4/5 | Added translation from GlobScan LogicalSource to LocalPhysicalPlan |
| src/daft-logical-plan/src/optimization/rules/push_down_filter.rs | 5/5 | Added guard clause to prevent filter pushdown for GlobScan |
| src/daft-logical-plan/src/optimization/rules/push_down_projection.rs | 5/5 | Added no-op case for GlobScan in projection pushdown |
| src/daft-logical-plan/src/optimization/rules/push_down_aggregation.rs | 4/5 | Added GlobScan to list of sources that don't support aggregation pushdown |
| src/daft-logical-plan/src/optimization/rules/push_down_shard.rs | 5/5 | Added error handling to prevent shard pushdown into GlobScan |
| src/daft-physical-plan/src/physical_planner/translate.rs | 4/5 | Added NotImplemented error for GlobScan in physical plan translation |
| tests/cookbook/test_dataloading.py | 4/5 | Updated tests to add collect() calls and test lazy evaluation with limits |
| src/daft-local-execution/src/sources/scan_task.rs | 4/5 | Changed chunk_size parameter from Option to usize |
| src/daft-distributed/src/pipeline_node/translate.rs | 4/5 | Added GlobScanSourceNode support in pipeline translation |
| src/daft-ir/src/proto/rel.rs | 5/5 | Added proto conversion handling for GlobScan with not_optimized_err |
| src/daft-local-execution/src/sources/empty_scan.rs | 4/5 | Updated chunk_size parameter from Option to usize |
| src/daft-local-execution/src/sources/in_memory.rs | 5/5 | Updated chunk_size parameter from Option to usize |
| src/daft-local-execution/src/sources/mod.rs | 4/5 | Added glob_scan module export |
| src/daft-distributed/src/pipeline_node/mod.rs | 5/5 | Added glob_scan_source module import |
| src/daft-local-execution/src/channel.rs | 4/5 | Removed Clone trait bound from create_channel function |
| daft/daft/__init__.pyi | 5/5 | Added from_glob_scan method signature to LogicalPlanBuilder |
| src/common/metrics/src/ops.rs | 5/5 | Added GlobScan node type to metrics system |
| src/daft-local-plan/src/lib.rs | 5/5 | Added GlobScan to public exports |
| src/daft-local-execution/Cargo.toml | 5/5 | Added common-io-config dependency |
| src/daft-local-plan/Cargo.toml | 5/5 | Added common-io-config dependency |
| src/daft-distributed/Cargo.toml | 5/5 | Added common-io-config dependency |

Confidence score: 4/5

  • This PR implements a significant architectural change that successfully addresses a critical performance issue by moving from eager to lazy evaluation of glob operations
  • The score reflects comprehensive integration across all layers of the system with proper error handling and optimization rule updates
  • Pay close attention to the new GlobScanSource implementation and its interaction with the broader execution pipeline, particularly around memory management and streaming behavior

Sequence Diagram

sequenceDiagram
    participant User
    participant DataFrame
    participant LogicalPlanBuilder
    participant LogicalPlan
    participant ExecutionEngine
    participant GlobScanSource
    participant IOClient
    participant FileSystem

    User->>DataFrame: from_glob_path(glob_paths, io_config)
    DataFrame->>LogicalPlanBuilder: from_glob_scan(glob_paths, io_config)
    LogicalPlanBuilder->>LogicalPlan: Source(SourceInfo::GlobScan)
    LogicalPlan-->>LogicalPlanBuilder: LogicalPlan node
    LogicalPlanBuilder-->>DataFrame: DataFrame with lazy plan
    DataFrame-->>User: DataFrame (lazy)

    Note over User,FileSystem: Execution only happens when .show() or .collect() is called

    User->>DataFrame: .show() or .collect()
    DataFrame->>ExecutionEngine: execute plan
    ExecutionEngine->>GlobScanSource: create source node
    GlobScanSource->>IOClient: get_io_client()
    IOClient-->>GlobScanSource: io_client
    GlobScanSource->>IOClient: get_source_and_path()
    IOClient->>FileSystem: glob(path, limit, io_stats)
    FileSystem-->>IOClient: file_metadata stream
    IOClient-->>GlobScanSource: file results
    GlobScanSource->>GlobScanSource: process file chunks
    GlobScanSource-->>ExecutionEngine: MicroPartition stream
    ExecutionEngine-->>DataFrame: results
    DataFrame-->>User: materialized data

32 files reviewed, 2 comments

(_, Some(0)) => futures::future::ready(Ok(false)),
// Limit has not yet been met, update remaining_rows and continue.
(table, Some(rows_left)) => {
remaining_rows = Some(rows_left.saturating_sub(table.len()));
Contributor

logic: the closure captures `remaining_rows` by value, so updates made inside it do not mutate the outer variable; the limit logic may not work correctly
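
For reference, the limit bookkeeping under discussion can be sketched in Python. This is not the PR's Rust code and does not reproduce the capture semantics the comment is about (a Python generator keeps its counter as ordinary local state); it only models the intended counter behavior visible in the snippet above. The function name `take_until_limit` is hypothetical, and treating a missing limit as "pass everything through" is an assumption.

```python
from typing import Iterable, Iterator, List, Optional


def take_until_limit(
    batches: Iterable[List[str]], limit: Optional[int]
) -> Iterator[List[str]]:
    """Pass batches through until the row limit has been used up."""
    remaining = limit
    for batch in batches:
        if remaining is None:
            # Assumption: no limit means every batch is emitted.
            yield batch
            continue
        if remaining == 0:
            # Limit already met: stop pulling from the source.
            return
        # Saturating subtraction: never go below zero.
        remaining = max(remaining - len(batch), 0)
        yield batch


batches = [["a", "b"], ["c", "d"], ["e"]]
limited = list(take_until_limit(batches, limit=3))
unlimited = list(take_until_limit(batches, limit=None))
```

Like the snippet above, this sketch emits whole batches, so the final batch can overshoot the limit; whether the real pipeline trims it elsewhere is not visible in this excerpt.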

Path: src/daft-local-execution/src/sources/glob_scan.rs, line 157

use super::source::Source;
use crate::{channel::create_channel, pipeline::NodeName, sources::source::SourceStream};

#[allow(dead_code)]
Contributor

style: #[allow(dead_code)] should be removed if this struct is actually being used

Path: src/daft-local-execution/src/sources/glob_scan.rs, line 21

);
}
Self::GlobScan(_) => {
not_optimized_err!("GlobScan is not yet implemented in proto conversion.");
Contributor

I think the error should be different?

Contributor Author

Implemented the protos for this and for IOConfig.

@colin-ho colin-ho enabled auto-merge (squash) October 4, 2025 00:52
@colin-ho colin-ho merged commit 2ff3501 into main Oct 6, 2025
43 checks passed
@colin-ho colin-ho deleted the colin/lazy-from-glob-path branch October 6, 2025 18:47