refactor(lance): implement Lance DataSink batching mechanism with row-based flush control #5246
Conversation
Greptile Summary
This PR introduces a comprehensive batching mechanism for Lance DataSink to improve write performance by combining multiple micropartitions before writing to Lance datasets. The implementation addresses a key performance issue where writing many small micropartitions individually results in fragmented Lance files, which negatively impacts both write and read performance.
The core changes include:
API Extensions: New parameters batch_size (default: 1) and max_batch_rows (default: 100,000) are added to the write_lance() method in dataframe.py. These parameters provide dual flush control: batching flushes either by micropartition count or by total row count, whichever limit is reached first.
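As a rough usage sketch (assuming the parameter names and defaults described in this PR; they are not part of released Daft):

```python
import daft

df = daft.from_pydict({"id": list(range(1_000)), "value": [i * 2 for i in range(1_000)]})

# Buffer up to 8 micropartitions, but flush earlier once 100,000 rows
# have accumulated, whichever limit is hit first. Both parameters are
# additions proposed by this PR.
df.write_lance(
    "/tmp/batched_dataset.lance",
    batch_size=8,
    max_batch_rows=100_000,
)
```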
DataSink Implementation: The LanceDataSink class in lance_data_sink.py has been extensively modified to support stateful batching. It now maintains instance-level state with _batch_tables, _batch_row_count, and _batch_count to accumulate micropartitions across multiple write() calls. The implementation includes flush logic that triggers when either batching condition is met.
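A simplified, self-contained sketch of that accumulate-and-flush pattern (the attribute names mirror the PR summary; the class, the _flush() helper, and the Lance write call are illustrative assumptions, not the actual LanceDataSink code):

```python
import pyarrow as pa


class BatchingSinkSketch:
    def __init__(self, uri: str, batch_size: int = 1, max_batch_rows: int = 100_000):
        self._uri = uri
        self._batch_size = batch_size
        self._max_batch_rows = max_batch_rows
        # Instance-level batching state, as described in the PR summary.
        self._batch_tables: list[pa.Table] = []
        self._batch_row_count = 0
        self._batch_count = 0

    def write(self, table: pa.Table) -> None:
        # Accumulate the incoming micropartition.
        self._batch_tables.append(table)
        self._batch_row_count += table.num_rows
        self._batch_count += 1
        # Dual flush condition: flush when either limit is reached.
        if self._batch_count >= self._batch_size or self._batch_row_count >= self._max_batch_rows:
            self._flush()

    def _flush(self) -> None:
        if not self._batch_tables:
            return
        combined = pa.concat_tables(self._batch_tables)
        # A real sink would write `combined` to the Lance dataset here,
        # producing one larger file instead of many small fragments.
        self._batch_tables.clear()
        self._batch_row_count = 0
        self._batch_count = 0
```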
Query Optimization: A separate optimization in lance_scan.py adds fragment filtering to skip empty fragments during scanning, reducing unnecessary scan task creation.
Comprehensive Testing: New test suite test_write_lance_batch_size.py validates the batching mechanism with scenarios testing parameter acceptance, data integrity, fragment reduction effectiveness, and dual flush conditions.
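A hedged sketch of the kind of round-trip integrity check such a test might contain (batch_size is the parameter added by this PR; the test name and data are illustrative, not taken from the actual test suite):

```python
import daft


def test_write_lance_batch_size_roundtrip(tmp_path):
    uri = str(tmp_path / "batched.lance")
    df = daft.from_pydict({"id": list(range(500)), "value": [i * 3 for i in range(500)]})

    # batch_size is the new parameter from this PR; the written data should be
    # identical regardless of how micropartitions are batched before writing.
    df.write_lance(uri, batch_size=4)

    result = daft.read_lance(uri).sort("id").to_pydict()
    assert result["id"] == list(range(500))
    assert result["value"] == [i * 3 for i in range(500)]
```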
The batching mechanism maintains full backward compatibility since batch_size=1 (default) preserves existing behavior. When batching is enabled, multiple micropartitions are accumulated in memory and written together as larger Lance files, reducing fragmentation and improving performance. The dual-condition approach provides flexibility for users to control batching based on their specific memory constraints and performance requirements.
Confidence score: 2/5
- This PR requires careful review due to production code issues and potential thread safety concerns
- Score lowered due to debug print statements left in production code, complex error handling that may mask issues, and instance-level state management that could have thread safety implications
- Pay close attention to daft/io/lance/lance_data_sink.py for the debug prints and error handling logic
4 files reviewed, 5 comments
if fragment.count_rows(pushed_expr) == 0:
    logger.debug("Skipping fragment with fragment_id %s with 0 rows", fragment.fragment_id)
    continue
style: This optimization adds I/O overhead by calling count_rows() with filters for every fragment. Consider measuring the performance impact - for datasets with many small fragments or complex filters, this could be slower than just processing empty fragments downstream.
def test_batch_size_reduces_fragments():
    """Test that batch_size parameter is accepted and maintains data integrity."""
    test_batch_size_parameter_acceptance()
style: This function just calls another test function without adding value. Consider removing this wrapper or consolidating the test logic.
from daft.datatype import DataType
from daft.io.lance.lance_data_sink import LanceDataSink
from daft.schema import Schema
style: Import statements should be placed at the top of the file rather than within the test function.
Context Used: Rule - Import statements should be placed at the top of the file rather than inline within functions or methods. (link)
…-based flush control

This commit introduces a comprehensive batching mechanism for Lance DataSink to improve write performance and merge small files by combining multiple micropartitions before writing, creating larger Lance files and reducing fragmentation.

Key Features:
- Add batch_size parameter to control number of micropartitions to batch (default: 1)
- Add max_batch_rows parameter for row-based flush control (default: 100,000)
- Implement intelligent batching logic with dual flush conditions:
  * Flush when batch_size micropartitions are accumulated
  * Flush when max_batch_rows total rows are reached
- Maintain full backward compatibility with existing code
This commit introduces a comprehensive batching mechanism for Lance DataSink to improve write performance by combining multiple micropartitions before writing, creating larger Lance files and reducing fragmentation.