Skip to content

Commit 0e0994a

Browse files
[ENH][log] Cache the manifest and etag. (#5368)
## Description of changes

Prior to this commit, the code cached just the manifest. By caching the ETag alongside it, we can issue a HEAD request to check whether the manifest has changed instead of fetching it again, which gives some latency improvements for scout logs.

## Test plan

CI

## Migration plan

I changed the cache key (from `::MANIFEST` to `::MANIFEST/ETAG`), so even if we had a disk cache, entries written in the old format would never be read under the new key; the change is therefore backwards-compatible.

## Observability plan

N/A

## Documentation Changes

N/A

---------

Co-authored-by: propel-code-bot[bot] <203372662+propel-code-bot[bot]@users.noreply.github.com>
1 parent ed55f72 commit 0e0994a

File tree

5 files changed

+33
-25
lines changed

5 files changed

+33
-25
lines changed

rust/log-service/src/lib.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use uuid::Uuid;
4747
use wal3::{
4848
Cursor, CursorName, CursorStore, CursorStoreOptions, Fragment, GarbageCollectionOptions,
4949
Limits, LogPosition, LogReader, LogReaderOptions, LogWriter, LogWriterOptions, Manifest,
50-
MarkDirty as MarkDirtyTrait, Witness,
50+
ManifestAndETag, MarkDirty as MarkDirtyTrait, Witness,
5151
};
5252

5353
pub mod state_hash_table;
@@ -289,10 +289,10 @@ async fn get_log_from_handle_with_mutex_held<'a>(
289289
})
290290
}
291291

292-
////////////////////////////////////// cache_key_for_manifest //////////////////////////////////////
292+
////////////////////////////////////////// cache_key_for_* /////////////////////////////////////////
293293

294-
fn cache_key_for_manifest(collection_id: CollectionUuid) -> String {
295-
format!("{collection_id}::MANIFEST")
294+
fn cache_key_for_manifest_and_etag(collection_id: CollectionUuid) -> String {
295+
format!("{collection_id}::MANIFEST/ETAG")
296296
}
297297

298298
fn cache_key_for_cursor(collection_id: CollectionUuid, name: &CursorName) -> String {
@@ -1352,11 +1352,11 @@ impl LogServer {
13521352
Err(err) => return Err(Status::new(err.code().into(), err.to_string())),
13531353
};
13541354
if let Some(cache) = self.cache.as_ref() {
1355-
let cache_key = cache_key_for_manifest(collection_id);
1356-
if let Some(manifest) = log.manifest() {
1357-
if let Ok(manifest_bytes) = serde_json::to_vec(&manifest) {
1355+
let cache_key = cache_key_for_manifest_and_etag(collection_id);
1356+
if let Some(manifest_and_etag) = log.manifest_and_etag() {
1357+
if let Ok(manifest_and_etag_bytes) = serde_json::to_vec(&manifest_and_etag) {
13581358
let cache_value = CachedBytes {
1359-
bytes: manifest_bytes,
1359+
bytes: manifest_and_etag_bytes,
13601360
};
13611361
cache.insert(cache_key, cache_value).await;
13621362
}
@@ -1427,16 +1427,17 @@ impl LogServer {
14271427
pull_logs: &PullLogsRequest,
14281428
) -> Option<Vec<Fragment>> {
14291429
if let Some(cache) = self.cache.as_ref() {
1430-
let cache_key = cache_key_for_manifest(collection_id);
1430+
let cache_key = cache_key_for_manifest_and_etag(collection_id);
14311431
let cached_bytes = cache.get(&cache_key).await.ok().flatten()?;
1432-
let manifest: Manifest = serde_json::from_slice(&cached_bytes.bytes).ok()?;
1432+
let manifest_and_etag: ManifestAndETag =
1433+
serde_json::from_slice(&cached_bytes.bytes).ok()?;
14331434
let limits = Limits {
14341435
max_files: Some(pull_logs.batch_size as u64 + 1),
14351436
max_bytes: None,
14361437
max_records: Some(pull_logs.batch_size as u64),
14371438
};
14381439
LogReader::scan_from_manifest(
1439-
&manifest,
1440+
&manifest_and_etag.manifest,
14401441
LogPosition::from_offset(pull_logs.start_from_offset as u64),
14411442
limits,
14421443
)
@@ -2027,7 +2028,7 @@ impl LogServer {
20272028
let collection_id = Uuid::parse_str(&x)
20282029
.map(CollectionUuid)
20292030
.map_err(|_| Status::invalid_argument("Failed to parse collection id"))?;
2030-
Some(cache_key_for_manifest(collection_id))
2031+
Some(cache_key_for_manifest_and_etag(collection_id))
20312032
}
20322033
Some(EntryToEvict::Fragment(f)) => {
20332034
let collection_id = Uuid::parse_str(&f.collection_id)

rust/wal3/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ pub use copy::copy;
2323
pub use cursors::{Cursor, CursorName, CursorStore, Witness};
2424
pub use destroy::destroy;
2525
pub use gc::{Garbage, GarbageCollector};
26-
pub use manifest::{unprefixed_snapshot_path, Manifest, Snapshot, SnapshotPointer};
26+
pub use manifest::{
27+
unprefixed_snapshot_path, Manifest, ManifestAndETag, Snapshot, SnapshotPointer,
28+
};
2729
pub use manifest_manager::ManifestManager;
2830
pub use reader::{Limits, LogReader};
2931
pub use snapshot_cache::SnapshotCache;

rust/wal3/src/manifest.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,14 @@ impl Manifest {
925925
}
926926
}
927927

928+
////////////////////////////////////////// ManifestAndETag /////////////////////////////////////////
929+
930+
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
931+
pub struct ManifestAndETag {
932+
pub manifest: Manifest,
933+
pub e_tag: ETag,
934+
}
935+
928936
/////////////////////////////////////////////// tests //////////////////////////////////////////////
929937

930938
#[cfg(test)]

rust/wal3/src/manifest_manager.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,14 @@ use std::time::Instant;
44
use chroma_storage::{ETag, Storage};
55

66
use crate::gc::Garbage;
7-
use crate::manifest::{Manifest, Snapshot};
7+
use crate::manifest::{Manifest, ManifestAndETag, Snapshot};
88
use crate::reader::read_fragment;
99
use crate::writer::MarkDirty;
1010
use crate::{
1111
unprefixed_fragment_path, Error, Fragment, FragmentSeqNo, GarbageCollectionOptions,
1212
LogPosition, SnapshotCache, SnapshotOptions, SnapshotPointerOrFragmentSeqNo, ThrottleOptions,
1313
};
1414

15-
////////////////////////////////////////// ManifestAndETag /////////////////////////////////////////
16-
17-
#[derive(Debug)]
18-
struct ManifestAndETag {
19-
manifest: Manifest,
20-
e_tag: ETag,
21-
}
22-
2315
////////////////////////////////////////////// Staging /////////////////////////////////////////////
2416

2517
#[derive(Debug)]
@@ -270,9 +262,9 @@ impl ManifestManager {
270262
}
271263

272264
/// Return the latest stable manifest
273-
pub fn latest(&self) -> Manifest {
265+
pub fn latest(&self) -> ManifestAndETag {
274266
let staging = self.staging.lock().unwrap();
275-
staging.stable.manifest.clone()
267+
staging.stable.clone()
276268
}
277269

278270
/// Recover from a fault in writing. It is possible that fragments have been written that are

rust/wal3/src/writer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
1818
use crate::{
1919
unprefixed_fragment_path, BatchManager, CursorStore, CursorStoreOptions, Error,
2020
ExponentialBackoff, Fragment, FragmentSeqNo, Garbage, GarbageCollectionOptions, LogPosition,
21-
LogReader, LogReaderOptions, LogWriterOptions, Manifest, ManifestManager, ThrottleOptions,
21+
LogReader, LogReaderOptions, LogWriterOptions, Manifest, ManifestAndETag, ManifestManager,
22+
ThrottleOptions,
2223
};
2324

2425
/// The epoch writer is a counting writer. Every epoch exists. An epoch goes
@@ -275,6 +276,10 @@ impl LogWriter {
275276
}
276277

277278
pub fn manifest(&self) -> Option<Manifest> {
279+
self.manifest_and_etag().map(|m| m.manifest)
280+
}
281+
282+
pub fn manifest_and_etag(&self) -> Option<ManifestAndETag> {
278283
// SAFETY(rescrv): Mutex poisoning.
279284
let inner = self.inner.lock().unwrap();
280285
inner

0 commit comments

Comments
 (0)