Skip to content

Commit f56877a

Browse files
committed
vector-store: Move Usearch operations to a dedicated tasks
Usearch's API uses global locks. Operations like `add` and `remove` can hold these locks for extended periods. When these operations are executed on the Tokio runtime, they can block worker threads. This leads to thread starvation, effectively halting all asynchronous tasks, including handling HTTP requests, processing CDC data, and serving metrics. This commit moves the blocking `add` and `remove` calls to a dedicated Rayon thread pool. By isolating these operations, the Tokio runtime remains unblocked and responsive, ensuring the application's I/O continues to function correctly.
1 parent b066520 commit f56877a

File tree

5 files changed

+103
-18
lines changed

5 files changed

+103
-18
lines changed

Cargo.lock

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/vector-store/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ utoipa.workspace = true
5757
utoipa-axum.workspace = true
5858
utoipa-swagger-ui.workspace = true
5959
uuid.workspace = true
60+
rayon = "1.11.0"
6061

6162
[dev-dependencies]
6263
mockall.workspace = true

crates/vector-store/src/index/usearch.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use std::sync::atomic::Ordering;
3131
use std::time::Duration;
3232
use std::time::Instant;
3333
use tokio::sync::Notify;
34+
use tokio::sync::OwnedSemaphorePermit;
3435
use tokio::sync::Semaphore;
3536
use tokio::sync::mpsc;
3637
use tokio::sync::oneshot;
@@ -326,14 +327,13 @@ impl UsearchIndex for RwLock<Simulator> {
326327
sim.wait_search(start);
327328

328329
let keys = {
329-
let keys = sim.keys.read().unwrap();
330-
let len = keys.len() as u64;
330+
let len = sim.keys.read().unwrap().len() as u64;
331331
if len == 0 {
332332
Vec::new()
333333
} else {
334334
iter::repeat_with(|| rand::random_range(0..len))
335335
.map(Key)
336-
.filter(|key| keys.contains(key))
336+
.filter(|key| sim.keys.read().unwrap().contains(key))
337337
.take(limit.0.get())
338338
.collect()
339339
}
@@ -425,8 +425,7 @@ fn new(
425425
let usearch_key = Arc::clone(&usearch_key);
426426
async move {
427427
crate::move_to_the_end_of_async_runtime_queue().await;
428-
process(msg, dimensions, idx, keys, usearch_key);
429-
drop(permit);
428+
process(msg, dimensions, idx, keys, usearch_key, permit);
430429
}
431430
});
432431
}
@@ -445,24 +444,32 @@ fn new(
445444
fn process(
446445
msg: Index,
447446
dimensions: Dimensions,
448-
idx: Arc<impl UsearchIndex>,
447+
idx: Arc<impl UsearchIndex + Send + Sync + 'static>,
449448
keys: Arc<RwLock<BiMap<PrimaryKey, Key>>>,
450449
usearch_key: Arc<AtomicU64>,
450+
permit: OwnedSemaphorePermit,
451451
) {
452452
match msg {
453453
Index::AddOrReplace {
454454
primary_key,
455455
embedding,
456-
in_progress: _in_progress,
456+
in_progress,
457457
} => {
458-
add_or_replace(idx, keys, usearch_key, primary_key, embedding);
458+
rayon::spawn(move || {
459+
add_or_replace(idx, keys, usearch_key, primary_key, embedding);
460+
drop(in_progress);
461+
drop(permit);
462+
});
459463
}
460464

461465
Index::Remove {
462466
primary_key,
463467
in_progress: _in_progress,
464468
} => {
465-
remove(idx, keys, primary_key);
469+
rayon::spawn(move || {
470+
remove(idx, keys, primary_key);
471+
drop(permit);
472+
});
466473
}
467474

468475
Index::Ann {

crates/vector-store/tests/integration/status.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
use crate::usearch::{setup_store, setup_store_and_wait_for_index};
77
use crate::wait_for;
8+
use vector_store::Config;
89
use vector_store::httproutes::NodeStatus;
910

1011
#[tokio::test]
@@ -19,7 +20,7 @@ async fn status_is_serving_after_creation() {
1920
#[tokio::test]
2021
async fn status_is_bootstrapping_while_discovering_indexes() {
2122
crate::enable_tracing();
22-
let (run, _index, db, _node_state) = setup_store().await;
23+
let (run, _index, db, _node_state) = setup_store(Config::default()).await;
2324
db.simulate_endless_get_indexes_processing();
2425
let (client, _server, _config_tx) = run.await;
2526

crates/vector-store/tests/integration/usearch.rs

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
44
*/
55

6+
use crate::Duration;
67
use crate::db_basic;
78
use crate::db_basic::DbBasic;
89
use crate::db_basic::Index;
@@ -28,7 +29,9 @@ use vector_store::SpaceType;
2829
use vector_store::Vector;
2930
use vector_store::node_state::NodeState;
3031

31-
pub(crate) async fn setup_store() -> (
32+
pub(crate) async fn setup_store(
33+
config: Config,
34+
) -> (
3235
impl std::future::Future<Output = (HttpClient, impl Sized, impl Sized)>,
3336
IndexMetadata,
3437
DbBasic,
@@ -76,13 +79,12 @@ pub(crate) async fn setup_store() -> (
7679
)
7780
.unwrap();
7881

79-
let (_, rx) = watch::channel(Arc::new(Config::default()));
80-
let index_factory = vector_store::new_index_factory_usearch(rx).unwrap();
82+
let (config_tx, config_rx) = watch::channel(Arc::new(config));
83+
let index_factory = vector_store::new_index_factory_usearch(config_rx.clone()).unwrap();
8184

8285
let run = {
8386
let node_state = node_state.clone();
8487
async move {
85-
let (_config_tx, config_rx) = watch::channel(Arc::new(vector_store::Config::default()));
8688
let (server, addr) = vector_store::run(
8789
SocketAddr::from(([127, 0, 0, 1], 0)).into(),
8890
node_state,
@@ -93,7 +95,7 @@ pub(crate) async fn setup_store() -> (
9395
.await
9496
.unwrap();
9597

96-
(HttpClient::new(addr), server, _config_tx)
98+
(HttpClient::new(addr), server, config_tx)
9799
}
98100
};
99101

@@ -107,7 +109,7 @@ pub(crate) async fn setup_store_and_wait_for_index() -> (
107109
impl Sized,
108110
Sender<NodeState>,
109111
) {
110-
let (run, index, db, node_state) = setup_store().await;
112+
let (run, index, db, node_state) = setup_store(Config::default()).await;
111113
let (client, server, _config_tx) = run.await;
112114

113115
wait_for(
@@ -123,7 +125,7 @@ pub(crate) async fn setup_store_and_wait_for_index() -> (
123125
async fn simple_create_search_delete_index() {
124126
crate::enable_tracing();
125127

126-
let (run, index, db, _node_state) = setup_store().await;
128+
let (run, index, db, _node_state) = setup_store(Config::default()).await;
127129
let (client, _server, _config_tx) = run.await;
128130

129131
db.insert_values(
@@ -349,7 +351,7 @@ async fn ann_returns_bad_request_when_provided_vector_size_is_not_eq_index_dimen
349351
#[tokio::test]
350352
async fn ann_fail_while_building() {
351353
crate::enable_tracing();
352-
let (run, index, db, _node_state) = setup_store().await;
354+
let (run, index, db, _node_state) = setup_store(Config::default()).await;
353355
db.set_next_full_scan_progress(vector_store::Progress::InProgress(
354356
Percentage::try_from(33.333).unwrap(),
355357
));
@@ -393,3 +395,37 @@ async fn ann_works_with_embedding_field_name() {
393395

394396
assert_eq!(response.status(), StatusCode::OK);
395397
}
398+
399+
#[tokio::test]
400+
#[ntest::timeout(10_000)]
401+
async fn http_server_is_responsive_when_index_add_hangs() {
402+
crate::enable_tracing();
403+
let config = Config {
404+
usearch_simulator: Some(vec![
405+
Duration::from_secs(0),
406+
Duration::from_secs(20), // Simulate long add operation (longer than test timeout).
407+
Duration::from_secs(0),
408+
]),
409+
..Default::default()
410+
};
411+
let (run, index, db, _node_state) = setup_store(config).await;
412+
// Insert a value before starting the vector store. The mock DB does not support
413+
// adding embeddings while it's running, so it must be populated beforehand.
414+
db.insert_values(
415+
&index.keyspace_name,
416+
&index.table_name,
417+
&index.target_column,
418+
vec![(
419+
vec![CqlValue::Int(1), CqlValue::Text("one".to_string())].into(),
420+
Some(vec![1., 1., 1.].into()),
421+
OffsetDateTime::from_unix_timestamp(10).unwrap().into(),
422+
)],
423+
)
424+
.unwrap();
425+
let (client, _server, _config_tx) = run.await;
426+
427+
// Ensure the HTTP server stays responsive while the (simulated) embedding add is long-running.
428+
let status = client.status().await.unwrap();
429+
430+
assert_eq!(status, vector_store::httproutes::NodeStatus::Serving);
431+
}

0 commit comments

Comments
 (0)