[ENH][s3heap] implement gRPC endpoints for heap operations #5624
Conversation
Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving:
- Testing, Bugs, Errors, Logs, Documentation
- System Compatibility
- Quality
```rust
/// Convert proto Triggerable to s3heap Triggerable.
pub fn triggerable_from_proto(
    proto: chroma_proto::Triggerable,
) -> Result<Triggerable, ConversionError> {
    let partitioning_uuid = Uuid::parse_str(&proto.partitioning_uuid)
        .map_err(|e| ConversionError(format!("invalid partitioning_uuid: {}", e)))?;
    let scheduling_uuid = Uuid::parse_str(&proto.scheduling_uuid)
        .map_err(|e| ConversionError(format!("invalid scheduling_uuid: {}", e)))?;
    Ok(Triggerable {
        partitioning: partitioning_uuid.into(),
        scheduling: scheduling_uuid.into(),
    })
}
```
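The error-propagation shape of this conversion can be sketched with a std-only stand-in for `Uuid::parse_str` (everything here is hypothetical illustration; the real code uses the `uuid` crate):

```rust
#[derive(Debug, PartialEq)]
struct ConversionError(String);

// Crude stand-in for uuid::Uuid::parse_str: accepts only the canonical
// 8-4-4-4-12 hyphenated hex form, mirroring how the real parser rejects
// malformed partitioning_uuid / scheduling_uuid strings.
fn parse_uuid_like(s: &str) -> Result<String, ConversionError> {
    let ok = s.len() == 36
        && s.bytes().enumerate().all(|(i, b)| match i {
            8 | 13 | 18 | 23 => b == b'-',
            _ => b.is_ascii_hexdigit(),
        });
    if ok {
        Ok(s.to_ascii_lowercase())
    } else {
        Err(ConversionError(format!("invalid uuid: {}", s)))
    }
}

fn main() {
    // Well-formed input converts; malformed input surfaces a ConversionError
    // instead of panicking, matching the `?`-propagation in the PR.
    assert!(parse_uuid_like("550e8400-e29b-41d4-a716-446655440000").is_ok());
    assert!(parse_uuid_like("not-a-uuid").is_err());
    println!("ok");
}
```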
```rust
/// Convert s3heap Triggerable to proto Triggerable.
pub fn triggerable_to_proto(triggerable: Triggerable) -> chroma_proto::Triggerable {
    chroma_proto::Triggerable {
        partitioning_uuid: triggerable.partitioning.to_string(),
        scheduling_uuid: triggerable.scheduling.to_string(),
    }
}
```
```rust
/// Convert proto Schedule to s3heap Schedule.
pub fn schedule_from_proto(proto: chroma_proto::Schedule) -> Result<Schedule, ConversionError> {
    let triggerable = proto
        .triggerable
        .ok_or_else(|| ConversionError("missing triggerable".to_string()))
        .and_then(triggerable_from_proto)?;
    let next_scheduled = proto
        .next_scheduled
        .ok_or_else(|| ConversionError("missing next_scheduled".to_string()))?;
    let next_scheduled = DateTime::from_timestamp(
        next_scheduled.seconds,
        next_scheduled.nanos.try_into().unwrap_or(0),
    )
    .ok_or_else(|| ConversionError("invalid next_scheduled timestamp".to_string()))?;
    let nonce = Uuid::parse_str(&proto.nonce)
```
[CriticalError]
The same timestamp-parsing issue appears multiple times throughout the code. The pattern `timestamp.nanos.try_into().unwrap_or(0)` silently converts invalid nanosecond values to 0, which could cause:
- Scheduling tasks at wrong times
- Pruning wrong time buckets
- Inconsistent time comparisons
This affects lines 59, 95, 110, 608, 168, 176 in the conversions and service methods.
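The silent-zero behavior the reviewer describes can be reproduced with plain std integer conversions, independent of any chroma or chrono types (a minimal sketch):

```rust
fn main() {
    // A protobuf Timestamp carries nanos as i32; chrono's constructor wants u32.
    // try_into() fails for any negative value, and unwrap_or(0) then
    // silently discards the sub-second component.
    let nanos: i32 = -500_000_000;
    let converted: u32 = nanos.try_into().unwrap_or(0);
    assert_eq!(converted, 0); // the fractional second vanished without an error

    // A valid non-negative value passes through unchanged.
    let ok: u32 = 250_000_000i32.try_into().unwrap_or(0);
    assert_eq!(ok, 250_000_000);
    println!("ok");
}
```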
File: rust/s3heap-service/src/lib.rs
Line: 95
Force-pushed from 49a8ece to 89dbc51
Add gRPC APIs and Structural Enhancements for Heap Operations
This PR introduces comprehensive gRPC endpoints to manage heap operations in the …
Key Changes
• Expanded …
Affected Areas
• …
This summary was automatically generated by @propel-code-bot
```rust
let limits: s3heap::Limits = request
    .into_inner()
    .limits
    .ok_or_else(|| Status::invalid_argument("missing limits"))
    .and_then(|l| {
```
[CriticalError]
Integer overflow in type conversion: `timestamp.nanos.try_into().unwrap_or(0)` could cause data corruption if `nanos` is negative (which is valid for timestamps before 1970). The `try_into()` will fail for negative i32 values when converting to u32, silently defaulting to 0 nanoseconds.

```rust
// Fix: Handle negative nanos properly
let nanos = if timestamp.nanos < 0 {
    0u32
} else {
    timestamp.nanos as u32
};
let bucket = DateTime::from_timestamp(timestamp.seconds, nanos)
```
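A stricter alternative to the clamping fix above, sketched here with std only and hypothetical names, is to reject out-of-range nanos so the caller sees an error instead of a silently shifted timestamp (protobuf's Timestamp type requires nanos in [0, 999999999]):

```rust
#[derive(Debug, PartialEq)]
struct ConversionError(String);

// Reject instead of clamp: anything outside the protobuf-mandated
// nanos range is a malformed message, not a value worth guessing at.
fn nanos_checked(nanos: i32) -> Result<u32, ConversionError> {
    if (0..=999_999_999).contains(&nanos) {
        Ok(nanos as u32)
    } else {
        Err(ConversionError(format!("nanos out of range: {}", nanos)))
    }
}

fn main() {
    assert_eq!(nanos_checked(123), Ok(123));          // valid fraction passes through
    assert!(nanos_checked(-1).is_err());              // pre-1970 encoding quirk rejected
    assert!(nanos_checked(1_000_000_000).is_err());   // more than a second rejected
    println!("ok");
}
```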
File: rust/s3heap-service/src/lib.rs
Line: 584

```rust
pub fn prune_stats_to_proto(stats: PruneStats) -> chroma_proto::PruneStats {
    chroma_proto::PruneStats {
        items_pruned: stats.items_pruned as u32,
        items_retained: stats.items_retained as u32,
        buckets_deleted: stats.buckets_deleted as u32,
        buckets_updated: stats.buckets_updated as u32,
    }
}
```
[BestPractice]
The `as u32` casts for `PruneStats` fields can cause silent truncation on 64-bit systems if the number of items exceeds `u32::MAX`. This could lead to clients receiving incorrect statistics without any error. To prevent this data loss, it's safer to use `try_into()` and return a `Result`.

Suggested Change:

```rust
pub fn prune_stats_to_proto(stats: PruneStats) -> Result<chroma_proto::PruneStats, ConversionError> {
    Ok(chroma_proto::PruneStats {
        items_pruned: stats.items_pruned.try_into().map_err(|e| ConversionError(format!("items_pruned overflow: {}", e)))?,
        items_retained: stats.items_retained.try_into().map_err(|e| ConversionError(format!("items_retained overflow: {}", e)))?,
        buckets_deleted: stats.buckets_deleted.try_into().map_err(|e| ConversionError(format!("buckets_deleted overflow: {}", e)))?,
        buckets_updated: stats.buckets_updated.try_into().map_err(|e| ConversionError(format!("buckets_updated overflow: {}", e)))?,
    })
}
```

You will also need to update the call sites in `prune` and `prune_bucket` to handle the new `Result` type.

⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
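The wrap-around this comment warns about can be demonstrated with std conversions alone (a sketch; `items_pruned` here is just a local `u64`):

```rust
fn main() {
    // `as u32` wraps silently once a count exceeds u32::MAX...
    let items_pruned: u64 = u32::MAX as u64 + 1;
    assert_eq!(items_pruned as u32, 0); // a client would see 0 items pruned

    // ...whereas try_into() surfaces the overflow as an error the
    // gRPC layer can turn into a proper Status.
    let checked: Result<u32, _> = items_pruned.try_into();
    assert!(checked.is_err());
    println!("ok");
}
```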
File: rust/s3heap-service/src/lib.rs
Line: 150
Description of changes
Add comprehensive gRPC API for heap management operations:
- Push: manually add schedules to the heap
- Peek: query heap items with filtering by UUID and time
- Prune: remove completed tasks with configurable limits
- PruneBucket: prune a specific time bucket
- ListBuckets: enumerate time buckets in the heap
- Summary: retrieve heap statistics
Extend heapservice.proto with message types for all operations,
including Triggerable, Schedule, HeapItem, Limits, PruneStats,
and FilterCriteria definitions.
Implement conversion functions between proto messages and s3heap
types with proper error handling for UUID parsing and timestamp
conversion. Add comprehensive unit tests for all conversions.
Update HeapTender to include HeapReader and HeapPruner alongside
existing HeapWriter to support read and pruning operations.
Add time_cut_off field to Limits struct to enable time-based
filtering. Implement list_buckets() method on HeapReader with
configurable limit (default 1000).
Fix Limits to use Clone instead of Copy since it now contains
non-Copy DateTime field.
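Why the Copy derive had to go can be sketched with std types only (field names are illustrative, and `String` stands in for the non-Copy field mentioned above): Rust only allows `#[derive(Copy)]` when every field is itself Copy.

```rust
// With only Copy fields, `#[derive(Copy, Clone)]` would compile.
// Adding a non-Copy field means only Clone can be derived, so
// callers must clone explicitly instead of copying implicitly.
#[derive(Clone, Debug, PartialEq)]
struct Limits {
    max_items: usize,
    time_cut_off: Option<String>, // stand-in for a non-Copy payload
}

fn main() {
    let a = Limits {
        max_items: 1000,
        time_cut_off: Some("2024-01-01T00:00:00Z".into()),
    };
    let b = a.clone(); // explicit clone required; `let b = a;` would move `a`
    assert_eq!(a, b);
    println!("ok");
}
```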
🤖 Generated with Claude Code
Test plan
Tests added => CI
Migration plan
N/A
Observability plan
N/A
Documentation Changes
N/A
Co-authored-by: Claude [email protected]