@@ -73,22 +73,19 @@ impl VectorResultsProducer {
7373 let region: & ' static str = config. region . clone ( ) . leak ( ) ;
7474 let mut batch = Vec :: with_capacity ( config. vector_batch_size ) ;
7575
76- while let Some ( json) = receiver. recv ( ) . await {
77- let new_count = pending_items. fetch_sub ( 1 , Ordering :: Relaxed ) ;
78- metrics:: gauge!( "producer.pending_items" ) . set ( new_count as f64 ) ;
79-
80- batch. push ( json) ;
81-
82- if batch. len ( ) < config. vector_batch_size {
83- continue ;
84- }
76+ while receiver
77+ . recv_many ( & mut batch, config. vector_batch_size )
78+ . await
79+ > 0
80+ {
81+ let new_count = pending_items. fetch_sub ( batch. len ( ) , Ordering :: Relaxed ) ;
82+ metrics:: gauge!( "producer.pending_items" , "uptime_region" => region)
83+ . set ( new_count as f64 ) ;
8584
86- let batch_to_send =
87- std:: mem:: replace ( & mut batch, Vec :: with_capacity ( config. vector_batch_size ) ) ;
8885 if let Err ( e) = {
8986 let start = std:: time:: Instant :: now ( ) ;
9087 let result = send_batch (
91- batch_to_send ,
88+ & mut batch ,
9289 client. clone ( ) ,
9390 config. endpoint . clone ( ) ,
9491 config. retry_vector_errors_forever ,
@@ -104,7 +101,7 @@ impl VectorResultsProducer {
104101
105102 if !batch. is_empty ( ) {
106103 if let Err ( e) = send_batch (
107- batch,
104+ & mut batch,
108105 client,
109106 config. endpoint ,
110107 config. retry_vector_errors_forever ,
@@ -139,7 +136,7 @@ impl ResultsProducer for VectorResultsProducer {
139136}
140137
141138async fn send_batch (
142- batch : Vec < Vec < u8 > > ,
139+ batch : & mut Vec < Vec < u8 > > ,
143140 client : Client ,
144141 endpoint : String ,
145142 retry_vector_errors_forever : bool ,
@@ -149,11 +146,14 @@ async fn send_batch(
149146 return Ok ( ( ) ) ;
150147 }
151148
152- let body: String = batch
153- . iter ( )
154- . filter_map ( |json| String :: from_utf8 ( json. clone ( ) ) . ok ( ) )
155- . map ( |s| s + "\n " )
156- . collect ( ) ;
149+ let vec_size = batch. iter ( ) . fold ( 0 , |acc, b| acc + b. len ( ) + 1 ) ;
150+ let batch_len = batch. len ( ) ;
151+
152+ let mut body = Vec :: with_capacity ( vec_size) ;
153+ for s in batch. drain ( ..) {
154+ body. extend ( s) ;
155+ body. push ( b'\n' ) ;
156+ }
157157
158158 const BASE_DELAY_MS : u64 = 100 ;
159159 const MAX_DELAY_MS : u64 = 2000 ;
@@ -203,15 +203,15 @@ async fn send_batch(
203203 status = ?resp. status( ) ,
204204 retry = num_of_retries,
205205 delay_ms = ?delay. as_millis( ) ,
206- batch_size = batch . len ( ) ,
206+ batch_size = batch_len ,
207207 "request.failed_to_vector_retrying"
208208 ) ;
209209 }
210210 Err ( e) => {
211211 tracing:: warn!(
212212 error = ?e,
213213 retry = num_of_retries,
214- batch_size = batch . len ( ) ,
214+ batch_size = batch_len ,
215215 delay_ms = ?delay. as_millis( ) ,
216216 "request.failed_to_vector_retrying"
217217 ) ;
0 commit comments