11use std:: collections:: BTreeMap ;
22use std:: sync:: Arc ;
3+ use std:: time:: Duration ;
34
45use crate :: execution:: client_request_details:: ClientRequestDetails ;
56use crate :: executors:: common:: HttpExecutionResponse ;
67use crate :: executors:: dedupe:: { request_fingerprint, ABuildHasher , SharedResponse } ;
8+ use crate :: utils:: expression:: execute_expression_with_value;
79use dashmap:: DashMap ;
8- use hive_router_config:: traffic_shaping:: DurationOrExpression ;
910use tokio:: sync:: OnceCell ;
1011
1112use async_trait:: async_trait;
@@ -20,6 +21,7 @@ use hyper_tls::HttpsConnector;
2021use hyper_util:: client:: legacy:: { connect:: HttpConnector , Client } ;
2122use tokio:: sync:: Semaphore ;
2223use tracing:: debug;
24+ use vrl:: compiler:: Program as VrlProgram ;
2325use vrl:: core:: Value as VrlValue ;
2426
2527use crate :: executors:: common:: HttpExecutionRequest ;
@@ -31,6 +33,12 @@ use crate::utils::consts::COMMA;
3133use crate :: utils:: consts:: QUOTE ;
3234use crate :: { executors:: common:: SubgraphExecutor , json_writer:: write_and_escape_string} ;
3335
36+ #[ derive( Debug ) ]
37+ pub enum DurationOrProgram {
38+ Duration ( Duration ) ,
39+ Program ( Box < VrlProgram > ) ,
40+ }
41+
3442#[ derive( Debug ) ]
3543pub struct HTTPSubgraphExecutor {
3644 pub subgraph_name : String ,
@@ -40,7 +48,7 @@ pub struct HTTPSubgraphExecutor {
4048 pub semaphore : Arc < Semaphore > ,
4149 pub dedupe_enabled : bool ,
4250 pub in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
43- pub timeout : DurationOrExpression ,
51+ pub timeout : DurationOrProgram ,
4452}
4553
4654const FIRST_VARIABLE_STR : & [ u8 ] = b",\" variables\" :{" ;
@@ -56,7 +64,7 @@ impl HTTPSubgraphExecutor {
5664 semaphore : Arc < Semaphore > ,
5765 dedupe_enabled : bool ,
5866 in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
59- timeout : DurationOrExpression ,
67+ timeout : DurationOrProgram ,
6068 ) -> Self {
6169 let mut header_map = HeaderMap :: new ( ) ;
6270 header_map. insert (
@@ -159,17 +167,21 @@ impl HTTPSubgraphExecutor {
159167 debug ! ( "making http request to {}" , self . endpoint. to_string( ) ) ;
160168
161169 let timeout = match & self . timeout {
162- DurationOrExpression :: Duration ( dur) => * dur,
163- DurationOrExpression :: Expression ( expr ) => {
170+ DurationOrProgram :: Duration ( dur) => * dur,
171+ DurationOrProgram :: Program ( program ) => {
164172 let value =
165173 VrlValue :: Object ( BTreeMap :: from ( [ ( "request" . into ( ) , client_request. into ( ) ) ] ) ) ;
166- let result = expr. execute_with_value ( value) . map_err ( |err| {
167- SubgraphExecutorError :: TimeoutExpressionResolutionFailure ( err. to_string ( ) )
174+ let result = execute_expression_with_value ( program, value) . map_err ( |err| {
175+ SubgraphExecutorError :: TimeoutExpressionResolution (
176+ self . subgraph_name . to_string ( ) ,
177+ err. to_string ( ) ,
178+ )
168179 } ) ?;
169180 match result {
170181 VrlValue :: Integer ( i) => {
171182 if i < 0 {
172- return Err ( SubgraphExecutorError :: TimeoutExpressionResolutionFailure (
183+ return Err ( SubgraphExecutorError :: TimeoutExpressionResolution (
184+ self . subgraph_name . to_string ( ) ,
173185 "Timeout expression resolved to a negative integer" . to_string ( ) ,
174186 ) ) ;
175187 }
@@ -178,28 +190,30 @@ impl HTTPSubgraphExecutor {
178190 VrlValue :: Float ( f) => {
179191 let f = f. into_inner ( ) ;
180192 if f < 0.0 {
181- return Err ( SubgraphExecutorError :: TimeoutExpressionResolutionFailure (
193+ return Err ( SubgraphExecutorError :: TimeoutExpressionResolution (
194+ self . subgraph_name . to_string ( ) ,
182195 "Timeout expression resolved to a negative float" . to_string ( ) ,
183196 ) ) ;
184197 }
185198 std:: time:: Duration :: from_millis ( f as u64 )
186199 }
187200 VrlValue :: Bytes ( b) => {
188201 let s = std:: str:: from_utf8 ( & b) . map_err ( |e| {
189- SubgraphExecutorError :: TimeoutExpressionResolutionFailure ( format ! (
190- "Failed to parse duration string from bytes: {}" ,
191- e
192- ) )
202+ SubgraphExecutorError :: TimeoutExpressionResolution (
203+ self . subgraph_name . to_string ( ) ,
204+ format ! ( "Failed to parse duration string from bytes: {}" , e ) ,
205+ )
193206 } ) ?;
194207 humantime:: parse_duration ( s) . map_err ( |e| {
195- SubgraphExecutorError :: TimeoutExpressionResolutionFailure ( format ! (
196- "Failed to parse duration string '{}': {}" ,
197- s, e
198- ) )
208+ SubgraphExecutorError :: TimeoutExpressionResolution (
209+ self . subgraph_name . to_string ( ) ,
210+ format ! ( "Failed to parse duration string '{}': {}" , s, e) ,
211+ )
199212 } ) ?
200213 }
201214 other => {
202- return Err ( SubgraphExecutorError :: TimeoutExpressionResolutionFailure (
215+ return Err ( SubgraphExecutorError :: TimeoutExpressionResolution (
216+ self . subgraph_name . to_string ( ) ,
203217 format ! (
204218 "Timeout expression resolved to an unexpected type: {}. Expected a non-negative integer/float (ms) or a duration string." ,
205219 other. kind( )
0 commit comments