-
Notifications
You must be signed in to change notification settings - Fork 353
feat: Implement KVConfig multi-backend architecture #5170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Summary
This PR implements a comprehensive multi-backend KV (Key-Value) store architecture for Daft, enabling support for different storage backends through a unified configuration interface. The changes introduce:
Core Architecture Changes:
- New
KVConfigclass serving as a unified wrapper for different backend configurations (LanceConfigandLMDBConfig) - Session-based KV store management following the established pattern used for catalogs and providers
- Multi-backend support with Lance (optimized for AI/ML workloads) and LMDB (for high-performance caching)
API Extensions:
- Updated
kv_get,kv_batch_get, andkv_existsfunctions to acceptKVConfigparameters - New session management functions:
attach_kv,detach_kv,set_kv,get_kv,has_kv, andcurrent_kv - Factory methods for easy KV store creation (
load_kvfunction) - Backward compatibility maintained with legacy URI parameters
Implementation Details:
- Rust-Python integration through JSON serialization of configuration objects
- Column projection optimization for Lance backend to reduce data transfer
- Comprehensive error handling and validation for backend configurations
- Builder pattern methods for fluent configuration updates
- Integration with Daft's expression system and function registry
Dependency Updates:
- Apache Arrow dependencies upgraded from 54.2.1 to 55.2.0 across multiple crates
- Chrono updated to 0.4.39
- New dependencies added:
daft-ioandserde_jsonfor KV functionality
The architecture allows users to attach multiple KV stores with different backends to a session and switch between them dynamically, providing flexibility for different use cases. The implementation follows established Daft patterns for session-based resource management and maintains API consistency across the framework.
Confidence score: 2/5
- This PR introduces significant architectural changes but contains multiple critical issues that prevent safe production deployment
- Score reflects serious implementation gaps including placeholder code, extensive debug logging, missing function definitions, and incomplete integrations
- Pay close attention to all KV-related files, especially the Rust implementations in
src/daft-dsl/src/functions/kv/and Python bindings
Context used:
Rule - Import statements should be placed at the top of the file rather than inline within functions or methods. (link)
36 files reviewed, no comments
| let logical_type = match time_unit { | ||
| TimeUnit::Second => Some(PrimitiveLogicalType::Integer(IntegerType::Int64)), | ||
| TimeUnit::Millisecond => Some(PrimitiveLogicalType::Integer(IntegerType::Int64)), | ||
| TimeUnit::Microsecond => Some(PrimitiveLogicalType::Integer(IntegerType::Int64)), | ||
| TimeUnit::Nanosecond => Some(PrimitiveLogicalType::Integer(IntegerType::Int64)), | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: All Duration time units map to the same Integer(Int64) logical type, which may not preserve time unit information during deserialization. Consider using different logical type annotations or metadata to distinguish between Second, Millisecond, Microsecond, and Nanosecond units.
| // Try multiple debug approaches | ||
| std::fs::write( | ||
| "/tmp/rust_debug.log", | ||
| "DEBUG: KVGetWithConfig::call invoked\n", | ||
| ) | ||
| .ok(); | ||
| eprintln!("DEBUG: KVGetWithConfig::call invoked"); | ||
|
|
||
| // Also try writing to a different location | ||
| std::fs::write( | ||
| "/workspace/iris_59ecff5f-dd8f-4eb8-a92d-dca852586066/rust_debug.log", | ||
| "DEBUG: KVGetWithConfig::call invoked\n", | ||
| ) | ||
| .ok(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Debug logging writes to hardcoded filesystem paths including /tmp and workspace directories. This should be removed before production deployment as it creates security risks and filesystem pollution.
| if TYPE_CHECKING: | ||
| from daft.kv import KVConfig, LMDBConfig | ||
|
|
||
| from daft.kv import KVConfig, KVStore, LMDBConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: Remove redundant imports in TYPE_CHECKING block since the same classes are imported at runtime
| if TYPE_CHECKING: | |
| from daft.kv import KVConfig, LMDBConfig | |
| from daft.kv import KVConfig, KVStore, LMDBConfig | |
| from daft.kv import KVConfig, KVStore, LMDBConfig |
Context Used: Rule - Import statements should be placed at the top of the file rather than inline within functions or methods. (link)
daft/kv/lance.py
Outdated
| if TYPE_CHECKING: | ||
| from daft.io import IOConfig | ||
| from daft.kv import KVConfig, LanceConfig | ||
|
|
||
| from daft.kv import KVConfig, KVStore, LanceConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: imports are duplicated between TYPE_CHECKING block and regular imports - LanceConfig and KVConfig appear in both
| if TYPE_CHECKING: | |
| from daft.io import IOConfig | |
| from daft.kv import KVConfig, LanceConfig | |
| from daft.kv import KVConfig, KVStore, LanceConfig | |
| if TYPE_CHECKING: | |
| from daft.io import IOConfig | |
| from daft.kv import KVConfig, LanceConfig | |
| from daft.kv import KVStore |
src/daft-ir/src/proto/functions.rs
Outdated
| fn to_proto(&self) -> ProtoResult<Self::Message> { | ||
| not_implemented_err!("kv_expr") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: The protobuf serialization is not implemented yet. This will cause runtime errors if KV expressions are serialized before the implementation is completed.
| from daft.expressions import lit | ||
| from daft.session import current_kv, get_kv |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
syntax: Import statements should be at module top per style guide
Context Used: Rule - Import statements should be placed at the top of the file rather than inline within functions or methods. (link)
| def kv_get( | ||
| row_ids: Expression | str, | ||
| columns: list[str] | str | None = None, | ||
| on_error: Literal["raise", "null"] = "raise", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: on_error parameter accepted but never used in implementation
daft/kv/__init__.py
Outdated
| # TODO: Implement proper IOConfig serialization when needed | ||
| config_dict["lance"]["io_config"] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: IOConfig serialization is incomplete and will cause issues when io_config is needed
tests/kv_store/test_session_kv.py
Outdated
| """Test that KV functions fail gracefully when no KV store is attached.""" | ||
| from daft.functions.kv import kv_batch_get, kv_exists, kv_get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: This import placement violates the repository's style guide. Import statements should be at the top of the file rather than inline within functions.
| """Test that KV functions fail gracefully when no KV store is attached.""" | |
| from daft.functions.kv import kv_batch_get, kv_exists, kv_get | |
| """Test that KV functions fail gracefully when no KV store is attached.""" | |
| # Import moved to top of file as per style guide |
Context Used: Rule - Import statements should be placed at the top of the file rather than inline within functions or methods. (link)
src/daft-dsl/src/lib.rs
Outdated
| parent.add_function(wrap_pyfunction!(python::kv_get_with_config, parent)?)?; | ||
| parent.add_function(wrap_pyfunction!(python::kv_batch_get_with_config, parent)?)?; | ||
| parent.add_function(wrap_pyfunction!(python::kv_exists_with_config, parent)?)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: These Python function bindings reference functions that don't appear to be implemented in the python module. The python::kv_get_with_config, python::kv_batch_get_with_config, and python::kv_exists_with_config functions need to be defined in src/daft-dsl/src/python.rs or this will cause compilation errors.
b1e63a3 to
b0983f1
Compare
c499123 to
20ab0cc
Compare
| } | ||
| } | ||
|
|
||
| impl FunctionEvaluator for LanceKVExpr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use the newer ScalarFunction instead of FunctionEvaluator?
src/daft-dsl/src/functions/kv/mod.rs
Outdated
| } | ||
| } | ||
|
|
||
| impl FunctionEvaluator for KVExpr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here. We should use ScalarFunction trait instead of FunctionEvaluator.
Theres a more detailed writeup here on how we want contributors to add new expressions.
| parent.add_fn(KVGetWithConfig); | ||
| parent.add_fn(KVBatchGetWithConfig); | ||
| parent.add_fn(KVExistsWithConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since these are all *WithConfig, id suggest dropping the suffix.
| } | ||
|
|
||
| /// Initialize KV Store functions in the global function registry | ||
| pub fn register_kv_functions() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this appears unused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@universalmind303 thank you for your review, I'll check it later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also retagged @rchowell for review as he reviewed the initial PR.
429d845 to
017c08b
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #5170 +/- ##
==========================================
- Coverage 75.31% 70.10% -5.22%
==========================================
Files 990 1000 +10
Lines 124917 127276 +2359
==========================================
- Hits 94083 89226 -4857
- Misses 30834 38050 +7216
🚀 New features to boost your workflow:
|
daft/functions/kv.py
Outdated
| source: str | None = None, | ||
| ) -> Expression: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| source: str | None = None, | |
| ) -> Expression: | |
| * | |
| store: str | None = None, | |
| ) -> Expression: |
Please consistently update the 'store' argument to all kv functions to be a named argument after all other expressions. This would look like,
def kv_put(key: Expression | str, value: Expression | Any, *, store: KvStoreLike | None = None): ...
KvStoreLike = KvStore | str # kv store instance or referenceIf the instance/reference is none, then you resolve the KvStore from the session.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Here, both
StrandNoneare still retained. The current interaction method is to firstset_kv(kv_store), - and then if
store_name (str)is not passed duringkv_put, the defaultkv_storein the session will be used; - if it is passed, it will be obtained from the session instead of directly using the kvstore object here.
daft/functions/kv.py
Outdated
| # Memory backend: handle get through Python UDF and return Struct aggregated across attached stores | ||
| if hasattr(kv_store, "backend_type") and kv_store.backend_type == "memory": | ||
| from daft.session import list_kv |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to not have the implementation be aware of any particular implementation? I do not follow right now why the memory store is special-cased.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense, have removed it
daft/session.py
Outdated
| # Keep Python-side registry for aggregation use-cases | ||
| self._kv_registry[a] = k |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why specifically do we need special python registration of kv-stores?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's useless. Remove it
| // Create Lance KV expression with extracted parameters | ||
| let lance_expr = LanceKVExpr::batch_get( | ||
| uri_str, | ||
| columns_list, | ||
| final_batch_size, | ||
| on_error_str, | ||
| io_config_opt, | ||
| ); | ||
|
|
||
| // Directly call the Lance implementation | ||
| match &lance_expr { | ||
| LanceKVExpr::BatchGet { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean that kv_batch_get_with_config is hardcoded to Lance for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, lance kvstore will be brought up next pr.
This PR only uses a standalone version of memstore to verify the process.
09c0f35 to
0676ed1
Compare
0676ed1 to
0b0fc47
Compare
|
@rchowell I fixed your comment. Please take a look when you have time. |
|
@Jay-ju - Thanks for the work on this. Checking to see if you are able to push this through. |
This implements the multi-KV backend architecture discussed in PR feedback, enabling support for different storage backends (Lance for AI/ML workloads, LMDB for high-performance caching) through a unified configuration interface.
Changes Made
Related Issues
Checklist
docs/mkdocs.ymlnavigation