diff --git a/Cargo.lock b/Cargo.lock index 272e4c10e18..f83fb747f73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7169,6 +7169,7 @@ dependencies = [ "chrono", "figment", "futures", + "prost-types 0.13.5", "s3heap", "serde", "serde_json", diff --git a/idl/chromadb/proto/heapservice.proto b/idl/chromadb/proto/heapservice.proto index e5237a840b5..3b840aec9b3 100644 --- a/idl/chromadb/proto/heapservice.proto +++ b/idl/chromadb/proto/heapservice.proto @@ -3,10 +3,116 @@ syntax = "proto3"; package chroma; import "chromadb/proto/chroma.proto"; +import "google/protobuf/timestamp.proto"; +// A task that can be scheduled and triggered in the heap. +message Triggerable { + string partitioning_uuid = 1; + string scheduling_uuid = 2; +} + +// A scheduled task with its next execution time and unique identifier. +message Schedule { + Triggerable triggerable = 1; + google.protobuf.Timestamp next_scheduled = 2; + string nonce = 3; +} + +// A heap item with its scheduled time. +message HeapItem { + Triggerable triggerable = 1; + string nonce = 2; + google.protobuf.Timestamp scheduled_time = 3; +} + +// Limits on range-scan-backed operations. +message Limits { + optional uint32 buckets_to_read = 1; + optional uint32 max_items = 2; + optional google.protobuf.Timestamp time_cut_off = 3; +} + +// Statistics from a pruning operation. +message PruneStats { + uint32 items_pruned = 1; + uint32 items_retained = 2; + uint32 buckets_deleted = 3; + uint32 buckets_updated = 4; +} + +// Filter criteria for querying heap items. +message FilterCriteria { + optional string partitioning_uuid = 1; + optional string scheduling_uuid = 2; +} + +// Request to manually add schedules to the heap. +message PushRequest { + repeated Schedule schedules = 1; +} + +// Response from pushing schedules. +message PushResponse { + uint32 schedules_added = 1; +} + +// Request to query heap items with filters. 
+message PeekRequest { + Limits limits = 1; + optional FilterCriteria filter = 2; +} + +// Response containing heap items. +message PeekResponse { + repeated HeapItem items = 1; +} + +// Request to prune completed tasks. +message PruneRequest { + Limits limits = 1; +} + +// Response from pruning operation. +message PruneResponse { + PruneStats stats = 1; +} + +// Request to prune a specific bucket. +message PruneBucketRequest { + google.protobuf.Timestamp bucket = 1; +} + +// Response from bucket pruning operation. +message PruneBucketResponse { + PruneStats stats = 1; +} + +// Request to list buckets in the heap. +message ListBucketsRequest { + optional uint32 max_buckets = 1; +} + +// Response containing bucket timestamps. +message ListBucketsResponse { + repeated google.protobuf.Timestamp buckets = 1; +} + +// Request for heap summary statistics. message HeapSummaryRequest {} -message HeapSummaryResponse {} + +// Response with heap summary statistics. +message HeapSummaryResponse { + uint32 total_items = 1; + optional google.protobuf.Timestamp oldest_bucket = 2; + optional google.protobuf.Timestamp newest_bucket = 3; + uint32 bucket_count = 4; +} service HeapTenderService { + rpc Push(PushRequest) returns (PushResponse) {} + rpc Peek(PeekRequest) returns (PeekResponse) {} + rpc Prune(PruneRequest) returns (PruneResponse) {} + rpc PruneBucket(PruneBucketRequest) returns (PruneBucketResponse) {} + rpc ListBuckets(ListBucketsRequest) returns (ListBucketsResponse) {} rpc Summary(HeapSummaryRequest) returns (HeapSummaryResponse) {} } diff --git a/rust/s3heap-service/Cargo.toml b/rust/s3heap-service/Cargo.toml index d67f59bcf29..314d21e844f 100644 --- a/rust/s3heap-service/Cargo.toml +++ b/rust/s3heap-service/Cargo.toml @@ -8,6 +8,7 @@ async-trait = { workspace = true } chrono = { workspace = true } figment = { workspace = true } futures = { workspace = true } +prost-types = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { 
workspace = true } diff --git a/rust/s3heap-service/src/lib.rs b/rust/s3heap-service/src/lib.rs index 5ebaba74187..827b2e4835d 100644 --- a/rust/s3heap-service/src/lib.rs +++ b/rust/s3heap-service/src/lib.rs @@ -20,7 +20,11 @@ use chroma_types::chroma_proto::heap_tender_service_server::{ }; use chroma_types::chroma_proto::{HeapSummaryRequest, HeapSummaryResponse}; use chroma_types::{dirty_log_path_from_hostname, CollectionUuid, DirtyMarker, ScheduleEntry}; -use s3heap::{heap_path_from_hostname, Configuration, HeapWriter, Schedule, Triggerable}; +use chrono::{DateTime, Utc}; +use s3heap::{ + heap_path_from_hostname, Configuration, HeapPruner, HeapReader, HeapWriter, Schedule, + Triggerable, +}; use wal3::{ Cursor, CursorName, CursorStore, CursorStoreOptions, LogPosition, LogReader, LogReaderOptions, Witness, @@ -30,6 +34,122 @@ mod scheduler; pub use scheduler::SysDbScheduler; +//////////////////////////////////////////// conversions /////////////////////////////////////////// + +/// Error type for conversion failures. +#[derive(Debug)] +pub struct ConversionError(pub String); + +impl std::fmt::Display for ConversionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "conversion error: {}", self.0) + } +} + +impl std::error::Error for ConversionError {} + +mod conversions { + use super::ConversionError; + use chroma_types::chroma_proto; + use chrono::{DateTime, Utc}; + use prost_types::Timestamp; + use s3heap::{HeapItem, Limits, PruneStats, Schedule, Triggerable}; + use uuid::Uuid; + + /// Convert proto Triggerable to s3heap Triggerable. 
+ pub fn triggerable_from_proto( + proto: chroma_proto::Triggerable, + ) -> Result { + let partitioning_uuid = Uuid::parse_str(&proto.partitioning_uuid) + .map_err(|e| ConversionError(format!("invalid partitioning_uuid: {}", e)))?; + let scheduling_uuid = Uuid::parse_str(&proto.scheduling_uuid) + .map_err(|e| ConversionError(format!("invalid scheduling_uuid: {}", e)))?; + Ok(Triggerable { + partitioning: partitioning_uuid.into(), + scheduling: scheduling_uuid.into(), + }) + } + + /// Convert s3heap Triggerable to proto Triggerable. + pub fn triggerable_to_proto(triggerable: Triggerable) -> chroma_proto::Triggerable { + chroma_proto::Triggerable { + partitioning_uuid: triggerable.partitioning.to_string(), + scheduling_uuid: triggerable.scheduling.to_string(), + } + } + + /// Convert proto Schedule to s3heap Schedule. + pub fn schedule_from_proto(proto: chroma_proto::Schedule) -> Result { + let triggerable = proto + .triggerable + .ok_or_else(|| ConversionError("missing triggerable".to_string())) + .and_then(triggerable_from_proto)?; + let next_scheduled = proto + .next_scheduled + .ok_or_else(|| ConversionError("missing next_scheduled".to_string()))?; + let next_scheduled = DateTime::from_timestamp( + next_scheduled.seconds, + next_scheduled.nanos.try_into().map_err(|_| { + ConversionError("invalid nanos value in next_scheduled".to_string()) + })?, + ) + .ok_or_else(|| ConversionError("invalid next_scheduled timestamp".to_string()))?; + let nonce = Uuid::parse_str(&proto.nonce) + .map_err(|e| ConversionError(format!("invalid nonce: {}", e)))?; + Ok(Schedule { + triggerable, + next_scheduled, + nonce, + }) + } + + /// Convert s3heap HeapItem with bucket time to proto HeapItem. 
+ pub fn heap_item_to_proto( + scheduled_time: DateTime, + item: HeapItem, + ) -> chroma_proto::HeapItem { + chroma_proto::HeapItem { + triggerable: Some(triggerable_to_proto(item.trigger)), + nonce: item.nonce.to_string(), + scheduled_time: Some(Timestamp { + seconds: scheduled_time.timestamp(), + nanos: scheduled_time.timestamp_subsec_nanos() as i32, + }), + } + } + + /// Convert proto Limits to s3heap Limits. + pub fn limits_from_proto(proto: chroma_proto::Limits) -> Result { + let buckets_to_read = proto.buckets_to_read.map(|v| v as usize); + let max_items = proto.max_items.map(|v| v as usize); + let time_cut_off = proto + .time_cut_off + .map(|ts| { + let nanos = ts.nanos.try_into().map_err(|_| { + ConversionError("invalid nanos value in time_cut_off".to_string()) + })?; + DateTime::from_timestamp(ts.seconds, nanos) + .ok_or_else(|| ConversionError("invalid time_cut_off timestamp".to_string())) + }) + .transpose()?; + Ok(Limits { + buckets_to_read, + max_items, + time_cut_off, + }) + } + + /// Convert s3heap PruneStats to proto PruneStats. + pub fn prune_stats_to_proto(stats: PruneStats) -> chroma_proto::PruneStats { + chroma_proto::PruneStats { + items_pruned: stats.items_pruned as u32, + items_retained: stats.items_retained as u32, + buckets_deleted: stats.buckets_deleted as u32, + buckets_updated: stats.buckets_updated as u32, + } + } +} + /////////////////////////////////////////////// Error ////////////////////////////////////////////// /// Custom error type that can handle errors from multiple sources. @@ -104,16 +224,27 @@ pub struct HeapTender { reader: LogReader, cursor: CursorStore, writer: HeapWriter, + heap_reader: HeapReader, + heap_pruner: HeapPruner, } impl HeapTender { /// Creates a new HeapTender. 
- pub fn new(sysdb: SysDb, reader: LogReader, cursor: CursorStore, writer: HeapWriter) -> Self { + pub fn new( + sysdb: SysDb, + reader: LogReader, + cursor: CursorStore, + writer: HeapWriter, + heap_reader: HeapReader, + heap_pruner: HeapPruner, + ) -> Self { Self { sysdb, reader, cursor, writer, + heap_reader, + heap_pruner, } } @@ -350,14 +481,22 @@ impl Configurable for HeapTenderServer { ); let heap_prefix = heap_path_from_hostname(&config.my_member_id); let scheduler = Arc::new(SysDbScheduler::new(sysdb.clone())) as _; - let writer = HeapWriter::new(storage, heap_prefix, Arc::clone(&scheduler)) + let writer = HeapWriter::new(storage.clone(), heap_prefix.clone(), Arc::clone(&scheduler)) .await .map_err(|e| -> Box { Box::new(e) })?; + let heap_reader = + HeapReader::new(storage.clone(), heap_prefix.clone(), Arc::clone(&scheduler)) + .await + .map_err(|e| -> Box { Box::new(e) })?; + let heap_pruner = HeapPruner::new(storage, heap_prefix, Arc::clone(&scheduler)) + .map_err(|e| -> Box { Box::new(e) })?; let tender = Arc::new(HeapTender { sysdb, reader, cursor, writer, + heap_reader, + heap_pruner, }); Ok(Self { config: config.clone(), @@ -368,11 +507,198 @@ impl Configurable for HeapTenderServer { #[async_trait::async_trait] impl HeapTenderService for HeapTenderServer { + async fn push( + &self, + request: Request, + ) -> Result, Status> { + let schedules: Vec = request + .into_inner() + .schedules + .into_iter() + .map(conversions::schedule_from_proto) + .collect::, ConversionError>>() + .map_err(|e| Status::invalid_argument(e.to_string()))?; + + let count = schedules.len(); + let count_u32 = count + .try_into() + .map_err(|_| Status::invalid_argument("too many schedules to push"))?; + self.tender + .writer + .push(&schedules) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(chroma_types::chroma_proto::PushResponse { + schedules_added: count_u32, + })) + } + + async fn peek( + &self, + request: Request, + ) -> Result, Status> { + 
let req = request.into_inner(); + let limits: s3heap::Limits = req + .limits + .ok_or_else(|| Status::invalid_argument("missing limits")) + .and_then(|l| { + conversions::limits_from_proto(l) + .map_err(|e| Status::invalid_argument(e.to_string())) + })?; + + let filter = req.filter; + let filter_fn = move |triggerable: &Triggerable, _: DateTime| { + if let Some(ref f) = filter { + if let Some(ref partitioning_uuid) = f.partitioning_uuid { + if triggerable.partitioning.to_string() != *partitioning_uuid { + return false; + } + } + if let Some(ref scheduling_uuid) = f.scheduling_uuid { + if triggerable.scheduling.to_string() != *scheduling_uuid { + return false; + } + } + } + true + }; + + let items = self + .tender + .heap_reader + .peek(filter_fn, limits) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let proto_items: Vec = items + .into_iter() + .map(|(dt, item)| conversions::heap_item_to_proto(dt, item)) + .collect(); + + Ok(Response::new(chroma_types::chroma_proto::PeekResponse { + items: proto_items, + })) + } + + async fn prune( + &self, + request: Request, + ) -> Result, Status> { + let limits: s3heap::Limits = request + .into_inner() + .limits + .ok_or_else(|| Status::invalid_argument("missing limits")) + .and_then(|l| { + conversions::limits_from_proto(l) + .map_err(|e| Status::invalid_argument(e.to_string())) + })?; + + let stats = self + .tender + .heap_pruner + .prune(limits) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(chroma_types::chroma_proto::PruneResponse { + stats: Some(conversions::prune_stats_to_proto(stats)), + })) + } + + async fn prune_bucket( + &self, + request: Request, + ) -> Result, Status> { + let timestamp = request + .into_inner() + .bucket + .ok_or_else(|| Status::invalid_argument("missing bucket timestamp"))?; + + let bucket = + DateTime::from_timestamp(timestamp.seconds, timestamp.nanos.try_into().unwrap_or(0)) + .ok_or_else(|| Status::invalid_argument("invalid bucket timestamp"))?; 
+ + let stats = self + .tender + .heap_pruner + .prune_bucket(bucket) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new( + chroma_types::chroma_proto::PruneBucketResponse { + stats: Some(conversions::prune_stats_to_proto(stats)), + }, + )) + } + + async fn list_buckets( + &self, + request: Request, + ) -> Result, Status> { + let max_buckets = request.into_inner().max_buckets.map(|v| v as usize); + + let buckets = self + .tender + .heap_reader + .list_buckets(max_buckets) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let proto_buckets: Vec = buckets + .into_iter() + .map(|dt| prost_types::Timestamp { + seconds: dt.timestamp(), + nanos: dt.timestamp_subsec_nanos() as i32, + }) + .collect(); + + Ok(Response::new( + chroma_types::chroma_proto::ListBucketsResponse { + buckets: proto_buckets, + }, + )) + } + async fn summary( &self, _request: Request, ) -> Result, Status> { - todo!(); + let buckets = self + .tender + .heap_reader + .list_buckets(None) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let bucket_count = buckets.len() as u32; + let oldest_bucket = buckets.first().map(|dt| prost_types::Timestamp { + seconds: dt.timestamp(), + nanos: dt.timestamp_subsec_nanos() as i32, + }); + let newest_bucket = buckets.last().map(|dt| prost_types::Timestamp { + seconds: dt.timestamp(), + nanos: dt.timestamp_subsec_nanos() as i32, + }); + + let items = self + .tender + .heap_reader + .peek( + |_, _| true, + s3heap::Limits::default().with_time_cut_off(Utc::now()), + ) + .await + .map_err(|e| Status::internal(e.to_string()))?; + let total_items = items.len() as u32; + + Ok(Response::new(HeapSummaryResponse { + total_items, + oldest_bucket, + newest_bucket, + bucket_count, + })) } } @@ -635,3 +961,156 @@ pub async fn entrypoint() { } } } + +/////////////////////////////////////////////// tests ////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + use super::*; + use chroma_types::chroma_proto; 
+ use chrono::TimeZone; + use s3heap::{HeapItem, Limits, PruneStats, Triggerable}; + use uuid::Uuid; + + #[test] + fn triggerable_round_trip() { + let partitioning_uuid = Uuid::new_v4(); + let scheduling_uuid = Uuid::new_v4(); + + let original = Triggerable { + partitioning: partitioning_uuid.into(), + scheduling: scheduling_uuid.into(), + }; + + let proto = conversions::triggerable_to_proto(original); + let recovered = conversions::triggerable_from_proto(proto).unwrap(); + + assert_eq!(original, recovered); + } + + #[test] + fn schedule_round_trip() { + let partitioning_uuid = Uuid::new_v4(); + let scheduling_uuid = Uuid::new_v4(); + let nonce = Uuid::new_v4(); + let next_scheduled = Utc.with_ymd_and_hms(2024, 3, 15, 14, 30, 0).unwrap(); + + let original = Schedule { + triggerable: Triggerable { + partitioning: partitioning_uuid.into(), + scheduling: scheduling_uuid.into(), + }, + next_scheduled, + nonce, + }; + + let proto = chroma_proto::Schedule { + triggerable: Some(conversions::triggerable_to_proto(original.triggerable)), + next_scheduled: Some(prost_types::Timestamp { + seconds: next_scheduled.timestamp(), + nanos: next_scheduled.timestamp_subsec_nanos() as i32, + }), + nonce: nonce.to_string(), + }; + let recovered = conversions::schedule_from_proto(proto).unwrap(); + + assert_eq!(original.triggerable, recovered.triggerable); + assert_eq!(original.nonce, recovered.nonce); + assert_eq!(original.next_scheduled, recovered.next_scheduled); + } + + #[test] + fn heap_item_round_trip() { + let partitioning_uuid = Uuid::new_v4(); + let scheduling_uuid = Uuid::new_v4(); + let nonce = Uuid::new_v4(); + let scheduled_time = Utc.with_ymd_and_hms(2024, 3, 15, 14, 30, 0).unwrap(); + + let original_item = HeapItem { + trigger: Triggerable { + partitioning: partitioning_uuid.into(), + scheduling: scheduling_uuid.into(), + }, + nonce, + }; + + let proto = conversions::heap_item_to_proto(scheduled_time, original_item.clone()); + + assert_eq!( + 
proto.triggerable.as_ref().unwrap().partitioning_uuid, + partitioning_uuid.to_string() + ); + assert_eq!( + proto.triggerable.as_ref().unwrap().scheduling_uuid, + scheduling_uuid.to_string() + ); + assert_eq!(proto.nonce, nonce.to_string()); + assert_eq!( + proto.scheduled_time.as_ref().unwrap().seconds, + scheduled_time.timestamp() + ); + assert_eq!( + proto.scheduled_time.as_ref().unwrap().nanos, + scheduled_time.timestamp_subsec_nanos() as i32 + ); + } + + #[test] + fn limits_round_trip() { + let original = Limits { + buckets_to_read: Some(100), + max_items: Some(50), + time_cut_off: Some(Utc.with_ymd_and_hms(2024, 3, 15, 14, 30, 0).unwrap()), + }; + + let proto = chroma_proto::Limits { + buckets_to_read: original.buckets_to_read.map(|v| v as u32), + max_items: original.max_items.map(|v| v as u32), + time_cut_off: original.time_cut_off.map(|dt| prost_types::Timestamp { + seconds: dt.timestamp(), + nanos: dt.timestamp_subsec_nanos() as i32, + }), + }; + let recovered = conversions::limits_from_proto(proto).unwrap(); + + assert_eq!(original.buckets_to_read, recovered.buckets_to_read); + assert_eq!(original.max_items, recovered.max_items); + assert_eq!(original.time_cut_off, recovered.time_cut_off); + } + + #[test] + fn limits_round_trip_with_none() { + let original = Limits { + buckets_to_read: None, + max_items: None, + time_cut_off: None, + }; + + let proto = chroma_proto::Limits { + buckets_to_read: None, + max_items: None, + time_cut_off: None, + }; + let recovered = conversions::limits_from_proto(proto).unwrap(); + + assert_eq!(original.buckets_to_read, recovered.buckets_to_read); + assert_eq!(original.max_items, recovered.max_items); + assert_eq!(original.time_cut_off, recovered.time_cut_off); + } + + #[test] + fn prune_stats_round_trip() { + let original = PruneStats { + items_pruned: 42, + items_retained: 100, + buckets_deleted: 5, + buckets_updated: 10, + }; + + let proto = conversions::prune_stats_to_proto(original.clone()); + 
assert_eq!(proto.items_pruned, 42); + assert_eq!(proto.items_retained, 100); + assert_eq!(proto.buckets_deleted, 5); + assert_eq!(proto.buckets_updated, 10); + } +} diff --git a/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs b/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs index ab1586c334c..4cbf26c7a54 100644 --- a/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs +++ b/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs @@ -5,7 +5,7 @@ use chroma_sysdb::{SysDb, TestSysDb}; use chroma_types::{CollectionUuid, DirtyMarker}; use wal3::{CursorStore, CursorStoreOptions, LogPosition, LogReader, LogReaderOptions}; -use s3heap::HeapWriter; +use s3heap::{HeapPruner, HeapReader, HeapWriter}; use s3heap_service::{HeapTender, HEAP_TENDER_CURSOR_NAME}; // Dummy scheduler for testing purposes @@ -52,10 +52,23 @@ async fn create_heap_tender( "test-tender".to_string(), ); let scheduler = Arc::new(DummyScheduler) as _; - let writer = HeapWriter::new(storage, heap_prefix.to_string(), Arc::clone(&scheduler)) - .await - .unwrap(); - HeapTender::new(sysdb, reader, cursor, writer) + let writer = HeapWriter::new( + storage.clone(), + heap_prefix.to_string(), + Arc::clone(&scheduler), + ) + .await + .unwrap(); + let heap_reader = HeapReader::new( + storage.clone(), + heap_prefix.to_string(), + Arc::clone(&scheduler), + ) + .await + .unwrap(); + let heap_pruner = + HeapPruner::new(storage, heap_prefix.to_string(), Arc::clone(&scheduler)).unwrap(); + HeapTender::new(sysdb, reader, cursor, writer, heap_reader, heap_pruner) } #[tokio::test] diff --git a/rust/s3heap/src/lib.rs b/rust/s3heap/src/lib.rs index 71adbe619a4..67d9d49ea3c 100644 --- a/rust/s3heap/src/lib.rs +++ b/rust/s3heap/src/lib.rs @@ -310,7 +310,7 @@ impl RetryConfig { /// // Create custom limits /// let custom_limits = Limits::default().with_buckets(100); /// ``` -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, Default, 
PartialEq, Eq)] pub struct Limits { /// Maximum number of buckets to read during a scan operation. /// If None, defaults to 1000 buckets. @@ -318,7 +318,8 @@ pub struct Limits { /// Maximum number of items to return. /// If None, returns all items found within bucket limits. pub max_items: Option, - /// Cut-off time: Do not read items after this cut-off. + /// Cut-off time for filtering items. + /// If Some, only items scheduled before this time will be processed. pub time_cut_off: Option>, } @@ -1260,6 +1261,52 @@ impl HeapReader { Ok(returns) } + + /// List time buckets in the heap. + /// + /// Returns up to max_buckets bucket timestamps in chronological order. + /// Each bucket corresponds to a one-minute window of scheduled tasks. + /// + /// # Arguments + /// + /// * `max_buckets` - Maximum number of buckets to return (default: 1000, max: 1000) + /// + /// # Returns + /// + /// A vector of bucket timestamps in chronological order + /// + /// # Errors + /// + /// - [`Error::Internal`] if max_buckets exceeds 1000 + /// - [`Error::Storage`] if S3 operations fail + /// - [`Error::Internal`] if bucket paths have unexpected format + /// - [`Error::ParseDate`] if bucket timestamps cannot be parsed + /// + /// # Examples + /// + /// ```ignore + /// use s3heap::HeapReader; + /// + /// // Get the first 100 buckets + /// let buckets = reader.list_buckets(Some(100)).await?; + /// + /// // Get all buckets (up to 1000) + /// let all_buckets = reader.list_buckets(None).await?; + /// ``` + pub async fn list_buckets( + &self, + max_buckets: Option, + ) -> Result>, Error> { + let limit = max_buckets.unwrap_or(1000); + if limit > 1000 { + return Err(Error::Internal(format!( + "max_buckets cannot exceed 1000, got {}", + limit + ))); + } + let all = self.internal.list_approx_first_1k_buckets().await?; + Ok(all.into_iter().take(limit).collect()) + } } /// Validate that a prefix meets the requirements for heap operations. 
diff --git a/rust/s3heap/tests/test_unit_tests.rs b/rust/s3heap/tests/test_unit_tests.rs index da3a131bfbb..f90c4b90d68 100644 --- a/rust/s3heap/tests/test_unit_tests.rs +++ b/rust/s3heap/tests/test_unit_tests.rs @@ -208,15 +208,15 @@ fn limits_equality() { } #[test] -fn limits_copy() { +fn limits_clone() { let original = Limits { buckets_to_read: Some(500), max_items: None, time_cut_off: None, }; - let copied = original; - assert_eq!(original, copied); - assert_eq!(copied.buckets_to_read, Some(500)); + let cloned = original.clone(); + assert_eq!(original, cloned); + assert_eq!(cloned.buckets_to_read, Some(500)); } // Tests for Triggerable diff --git a/rust/worker/src/compactor/tasks.rs b/rust/worker/src/compactor/tasks.rs index ecf4f48beac..f3145837f29 100644 --- a/rust/worker/src/compactor/tasks.rs +++ b/rust/worker/src/compactor/tasks.rs @@ -66,7 +66,7 @@ impl TaskHeapReader { } }; - match reader.peek(|_, _| true, limits).await { + match reader.peek(|_, _| true, limits.clone()).await { Ok(items) => { tracing::trace!("Found {} tasks in {}", items.len(), heap_prefix); for (bucket, item) in items {