diff --git a/src/producer/vector_producer.rs b/src/producer/vector_producer.rs index f2268b8..82cc17f 100644 --- a/src/producer/vector_producer.rs +++ b/src/producer/vector_producer.rs @@ -68,22 +68,19 @@ impl VectorResultsProducer { let region: &'static str = config.region.clone().leak(); let mut batch = Vec::with_capacity(config.vector_batch_size); - while let Some(json) = receiver.recv().await { - let new_count = pending_items.fetch_sub(1, Ordering::Relaxed); - metrics::gauge!("producer.pending_items").set(new_count as f64); - - batch.push(json); - - if batch.len() < config.vector_batch_size { - continue; - } + while receiver + .recv_many(&mut batch, config.vector_batch_size) + .await + > 0 + { + let new_count = pending_items.fetch_sub(batch.len(), Ordering::Relaxed); + metrics::gauge!("producer.pending_items", "uptime_region" => region) + .set(new_count as f64); - let batch_to_send = - std::mem::replace(&mut batch, Vec::with_capacity(config.vector_batch_size)); if let Err(e) = { let start = std::time::Instant::now(); let result = send_batch( - batch_to_send, + &mut batch, client.clone(), config.endpoint.clone(), config.retry_vector_errors_forever, @@ -99,7 +96,7 @@ impl VectorResultsProducer { if !batch.is_empty() { if let Err(e) = send_batch( - batch, + &mut batch, client, config.endpoint, config.retry_vector_errors_forever, @@ -134,7 +131,7 @@ impl ResultsProducer for VectorResultsProducer { } async fn send_batch( - batch: Vec>, + batch: &mut Vec>, client: Client, endpoint: String, retry_vector_errors_forever: bool, @@ -144,11 +141,14 @@ async fn send_batch( return Ok(()); } - let body: String = batch - .iter() - .filter_map(|json| String::from_utf8(json.clone()).ok()) - .map(|s| s + "\n") - .collect(); + let vec_size = batch.iter().fold(0, |acc, b| acc + b.len() + 1); + let batch_len = batch.len(); + + let mut body = Vec::with_capacity(vec_size); + for s in batch.drain(..) { + body.extend(s); + body.push(b'\n'); + } const BASE_DELAY_MS: u64 = 100; const MAX_DELAY_MS: u64 = 2000; @@ -198,7 +198,7 @@ async fn send_batch( status = ?resp.status(), retry = num_of_retries, delay_ms = ?delay.as_millis(), - batch_size = batch.len(), + batch_size = batch_len, "request.failed_to_vector_retrying" ); } @@ -206,7 +206,7 @@ async fn send_batch( tracing::warn!( error = ?e, retry = num_of_retries, - batch_size = batch.len(), + batch_size = batch_len, delay_ms = ?delay.as_millis(), "request.failed_to_vector_retrying" );