diff --git a/.changeset/shared_utilities_to_handle_vrl_expressions.md b/.changeset/shared_utilities_to_handle_vrl_expressions.md new file mode 100644 index 000000000..97c07aa3a --- /dev/null +++ b/.changeset/shared_utilities_to_handle_vrl_expressions.md @@ -0,0 +1,15 @@ +--- +default: minor +--- + +# Breaking + +Removed `pool_idle_timeout_seconds` from `traffic_shaping`, instead use `pool_idle_timeout` with duration format. + +```diff +traffic_shaping: +- pool_idle_timeout_seconds: 30 ++ pool_idle_timeout: 30s +``` + +#540 by @ardatan diff --git a/Cargo.lock b/Cargo.lock index 360f7cd81..2365117e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2083,6 +2083,7 @@ dependencies = [ "hive-router-query-planner", "http", "http-body-util", + "humantime", "hyper", "hyper-tls", "hyper-util", @@ -2090,6 +2091,7 @@ dependencies = [ "insta", "itoa", "ntex-http", + "once_cell", "ordered-float", "regex-automata", "ryu", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index d0b09c183..0c2ad9c15 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -1,20 +1,22 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig}; -use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails; +use hive_router_plan_executor::{ + execution::client_request_details::ClientRequestDetails, utils::expression::compile_expression, +}; use hive_router_query_planner::{ graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR}, state::supergraph_state::SupergraphState, }; use rand::Rng; use vrl::{ - compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + compiler::Program as VrlProgram, + compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, prelude::{ state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, TimeZone 
as VrlTimeZone, }, - stdlib::all as vrl_build_functions, value::Secrets as VrlSecrets, }; @@ -126,7 +128,6 @@ impl OverrideLabelsEvaluator { ) -> Result { let mut static_enabled_labels = HashSet::new(); let mut expressions = HashMap::new(); - let vrl_functions = vrl_build_functions(); for (label, value) in override_labels_config.iter() { match value { @@ -134,19 +135,13 @@ impl OverrideLabelsEvaluator { static_enabled_labels.insert(label.clone()); } LabelOverrideValue::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &vrl_functions).map_err(|diagnostics| { - OverrideLabelsCompileError { - label: label.clone(), - error: diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - } - })?; - expressions.insert(label.clone(), compilation_result.program); + let program = compile_expression(expression, None).map_err(|err| { + OverrideLabelsCompileError { + label: label.clone(), + error: err.to_string(), + } + })?; + expressions.insert(label.clone(), program); } _ => {} // Skip false booleans } diff --git a/docs/README.md b/docs/README.md index fb474b9d2..d119d60e1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"},"max_connections_per_host":100}`
|| **Additional Properties:** not allowed **Example** @@ -107,9 +107,11 @@ query_planner: timeout: 10s supergraph: {} traffic_shaping: - dedupe_enabled: true + all: + dedupe_enabled: true + pool_idle_timeout: 50s + request_timeout: 30s max_connections_per_host: 100 - pool_idle_timeout_seconds: 50 ``` @@ -1808,25 +1810,75 @@ Request timeout for the Hive Console CDN requests. ## traffic\_shaping: object -Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. +Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. **Properties** |Name|Type|Description|Required| |----|----|-----------|--------| -|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| +|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"}`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| -|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| +|[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
|| **Additional Properties:** not allowed **Example** ```yaml -dedupe_enabled: true +all: + dedupe_enabled: true + pool_idle_timeout: 50s + request_timeout: 30s max_connections_per_host: 100 -pool_idle_timeout_seconds: 50 ``` + +### traffic\_shaping\.all: object + +The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration. + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly match the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| +|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
Default: `"50s"`
|| +|**request\_timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
request_timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
request_timeout:
expression: \|
if (.request.operation.type == "mutation") {
"10s"
} else {
"15s"
}
```
Default: `"30s"`
|| + +**Additional Properties:** not allowed +**Example** + +```yaml +dedupe_enabled: true +pool_idle_timeout: 50s +request_timeout: 30s + +``` + + +### traffic\_shaping\.subgraphs: object + +Optional per-subgraph configurations that will override the default configuration for specific subgraphs. + + +**Additional Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|[**Additional Properties**](#traffic_shapingsubgraphsadditionalproperties)|`object`||yes| + + +#### traffic\_shaping\.subgraphs\.additionalProperties: object + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**dedupe\_enabled**|`boolean`, `null`|Enables/disables request deduplication to subgraphs.

When requests exactly match the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
|no| +|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
|yes| +|**request\_timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
request_timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
request_timeout:
expression: \|
if (.request.operation.type == "mutation") {
"10s"
} else {
"15s"
}
```
|no| + +**Additional Properties:** not allowed diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index c3f6f9117..d416ab28b 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -44,11 +44,13 @@ hyper-util = { version = "0.1.16", features = [ "http2", "tokio", ] } +humantime = "2.3.0" bytes = "1.10.1" itoa = "1.0.15" ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" +once_cell = "1.21.3" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/executor/src/execution/plan.rs b/lib/executor/src/execution/plan.rs index f86356312..563c0eb24 100644 --- a/lib/executor/src/execution/plan.rs +++ b/lib/executor/src/execution/plan.rs @@ -19,7 +19,7 @@ use crate::{ rewrites::FetchRewriteExt, }, executors::{ - common::{HttpExecutionRequest, HttpExecutionResponse}, + common::{HttpExecutionResponse, SubgraphExecutionRequest}, map::SubgraphExecutorMap, }, headers::{ @@ -700,7 +700,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { let variable_refs = select_fetch_variables(self.variable_values, node.variable_usages.as_ref()); - let mut subgraph_request = HttpExecutionRequest { + let mut subgraph_request = SubgraphExecutionRequest { query: node.operation.document_str.as_str(), dedupe: self.dedupe_subgraph_requests, operation_name: node.operation_name.as_deref(), diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs index bdcd4d819..27e7828c0 100644 --- a/lib/executor/src/executors/common.rs +++ b/lib/executor/src/executors/common.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use bytes::Bytes; @@ -9,7 +9,8 @@ use sonic_rs::Value; pub trait SubgraphExecutor { async fn execute<'a>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: SubgraphExecutionRequest<'a>, + timeout: Option, ) -> HttpExecutionResponse; fn to_boxed_arc<'a>(self) -> Arc> @@ -26,7 +27,7 @@ pub 
type SubgraphExecutorBoxedArc = Arc>; pub type SubgraphRequestExtensions = HashMap; -pub struct HttpExecutionRequest<'a> { +pub struct SubgraphExecutionRequest<'a> { pub query: &'a str, pub dedupe: bool, pub operation_name: Option<&'a str>, @@ -37,7 +38,7 @@ pub struct HttpExecutionRequest<'a> { pub extensions: Option, } -impl HttpExecutionRequest<'_> { +impl SubgraphExecutionRequest<'_> { pub fn add_request_extensions_field(&mut self, key: String, value: Value) { self.extensions .get_or_insert_with(HashMap::new) diff --git a/lib/executor/src/executors/duration_or_prog.rs b/lib/executor/src/executors/duration_or_prog.rs new file mode 100644 index 000000000..a2f8fb7d4 --- /dev/null +++ b/lib/executor/src/executors/duration_or_prog.rs @@ -0,0 +1,24 @@ +use std::time::Duration; + +use hive_router_config::traffic_shaping::DurationOrExpression; +use vrl::{compiler::Program as VrlProgram, prelude::Function}; + +use crate::utils::expression::compile_expression; + +pub enum DurationOrProgram { + Duration(Duration), + Program(Box), +} + +pub fn compile_duration_expression( + duration_or_expr: &DurationOrExpression, + fns: Option<&[Box]>, +) -> Result { + match duration_or_expr { + DurationOrExpression::Duration(dur) => Ok(DurationOrProgram::Duration(*dur)), + DurationOrExpression::Expression { expression } => { + let program = compile_expression(expression, fns)?; + Ok(DurationOrProgram::Program(Box::new(program))) + } + } +} diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 2234f524c..c3c7625f1 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -1,4 +1,4 @@ -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; use crate::response::graphql_error::{GraphQLError, GraphQLErrorExtensions}; @@ -20,6 +20,12 @@ pub enum SubgraphExecutorError { RequestFailure(String, String), #[error("Failed to serialize variable \"{0}\": {1}")] 
VariablesSerializationFailure(String, String), + #[error("Failed to compile VRL expression for timeout for subgraph '{0}'. Please check your VRL expression for syntax errors. Diagnostic: {1}")] + RequestTimeoutExpressionBuild(String, String), + #[error("Failed to resolve VRL expression for timeout for subgraph '{0}'. Runtime error: {1}")] + TimeoutExpressionResolution(String, String), + #[error("Request to subgraph \"{0}\" timed out after {1} seconds")] + RequestTimeout(String, u64), } impl From for GraphQLError { @@ -34,21 +40,6 @@ impl From for GraphQLError { } impl SubgraphExecutorError { - pub fn new_endpoint_expression_build( - subgraph_name: String, - diagnostics: DiagnosticList, - ) -> Self { - SubgraphExecutorError::EndpointExpressionBuild( - subgraph_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) - } - pub fn new_endpoint_expression_resolution_failure( subgraph_name: String, error: ExpressionError, @@ -76,6 +67,13 @@ impl SubgraphExecutorError { SubgraphExecutorError::VariablesSerializationFailure(_, _) => { "SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE" } + SubgraphExecutorError::TimeoutExpressionResolution(_, _) => { + "SUBGRAPH_TIMEOUT_EXPRESSION_RESOLUTION_FAILURE" + } + SubgraphExecutorError::RequestTimeout(_, _) => "SUBGRAPH_REQUEST_TIMEOUT", + SubgraphExecutorError::RequestTimeoutExpressionBuild(_, _) => { + "SUBGRAPH_TIMEOUT_EXPRESSION_BUILD_FAILURE" + } } } } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 29b392567..e7673a457 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,9 +1,10 @@ use std::sync::Arc; +use std::time::Duration; use crate::executors::common::HttpExecutionResponse; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; use dashmap::DashMap; -use hive_router_config::HiveRouterConfig; +use futures::TryFutureExt; use
tokio::sync::OnceCell; use async_trait::async_trait; @@ -19,7 +20,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client}; use tokio::sync::Semaphore; use tracing::debug; -use crate::executors::common::HttpExecutionRequest; +use crate::executors::common::SubgraphExecutionRequest; use crate::executors::error::SubgraphExecutorError; use crate::response::graphql_error::GraphQLError; use crate::utils::consts::CLOSE_BRACE; @@ -35,7 +36,7 @@ pub struct HTTPSubgraphExecutor { pub http_client: Arc, Full>>, pub header_map: HeaderMap, pub semaphore: Arc, - pub config: Arc, + pub dedupe_enabled: bool, pub in_flight_requests: Arc>, ABuildHasher>>, } @@ -50,7 +51,7 @@ impl HTTPSubgraphExecutor { endpoint: http::Uri, http_client: Arc, semaphore: Arc, - config: Arc, + dedupe_enabled: bool, in_flight_requests: Arc>, ABuildHasher>>, ) -> Self { let mut header_map = HeaderMap::new(); @@ -69,14 +70,14 @@ impl HTTPSubgraphExecutor { http_client, header_map, semaphore, - config, + dedupe_enabled, in_flight_requests, } } - fn build_request_body<'a>( + fn build_request_body( &self, - execution_request: &HttpExecutionRequest<'a>, + execution_request: &SubgraphExecutionRequest<'_>, ) -> Result, SubgraphExecutorError> { let mut body = Vec::with_capacity(4096); body.put(FIRST_QUOTE_STR); @@ -137,6 +138,7 @@ impl HTTPSubgraphExecutor { &self, body: Vec, headers: HeaderMap, + timeout: Option, ) -> Result { let mut req = hyper::Request::builder() .method(http::Method::POST) @@ -151,9 +153,22 @@ impl HTTPSubgraphExecutor { debug!("making http request to {}", self.endpoint.to_string()); - let res = self.http_client.request(req).await.map_err(|e| { + let res_fut = self.http_client.request(req).map_err(|e| { SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - })?; + }); + + let res = if let Some(timeout_duration) = timeout { + tokio::time::timeout(timeout_duration, res_fut) + .await + .map_err(|_| { + SubgraphExecutorError::RequestTimeout( + 
self.endpoint.to_string(), + timeout_duration.as_secs(), + ) + })? + } else { + res_fut.await + }?; debug!( "http request to {} completed, status: {}", @@ -209,10 +224,11 @@ impl HTTPSubgraphExecutor { #[async_trait] impl SubgraphExecutor for HTTPSubgraphExecutor { - #[tracing::instrument(skip_all, fields(subgraph_name = self.subgraph_name))] + #[tracing::instrument(skip_all, fields(subgraph_name = %self.subgraph_name))] async fn execute<'a>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: SubgraphExecutionRequest<'a>, + timeout: Option, ) -> HttpExecutionResponse { let body = match self.build_request_body(&execution_request) { Ok(body) => body, @@ -230,11 +246,11 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { headers.insert(key, value.clone()); }); - if !self.config.traffic_shaping.dedupe_enabled || !execution_request.dedupe { + if !self.dedupe_enabled || !execution_request.dedupe { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - return match self._send_request(body, headers).await { + return match self._send_request(body, headers, timeout).await { Ok(shared_response) => HttpExecutionResponse { body: shared_response.body, headers: shared_response.headers, @@ -266,7 +282,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - self._send_request(body, headers).await + self._send_request(body, headers, timeout).await }; // It's important to remove the entry from the map before returning the result. // This ensures that once the OnceCell is set, no future requests can join it. 
diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index a3c297ad1..66faceb88 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -15,28 +15,22 @@ use hyper_util::{ }; use tokio::sync::{OnceCell, Semaphore}; use tracing::error; -use vrl::{ - compiler::compile as vrl_compile, - compiler::Program as VrlProgram, - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::Function as VrlFunction, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - stdlib::all as vrl_build_functions, - value::Secrets as VrlSecrets, -}; +use vrl::{compiler::Program as VrlProgram, core::Value as VrlValue}; use crate::{ execution::client_request_details::ClientRequestDetails, executors::{ common::{ - HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc, + HttpExecutionResponse, SubgraphExecutionRequest, SubgraphExecutor, + SubgraphExecutorBoxedArc, }, dedupe::{ABuildHasher, SharedResponse}, + duration_or_prog::{compile_duration_expression, DurationOrProgram}, error::SubgraphExecutorError, http::{HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, + utils::expression::{compile_expression, execute_expression_with_value}, }; type SubgraphName = String; @@ -45,6 +39,7 @@ type ExecutorsBySubgraphMap = DashMap>; type EndpointsBySubgraphMap = DashMap; type ExpressionsBySubgraphMap = HashMap; +type TimeoutsBySubgraph = DashMap; pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, @@ -53,9 +48,8 @@ pub struct SubgraphExecutorMap { static_endpoints_by_subgraph: EndpointsBySubgraphMap, /// Mapping from subgraph name to VRL expression program expressions_by_subgraph: ExpressionsBySubgraphMap, + timeouts_by_subgraph: TimeoutsBySubgraph, config: Arc, - /// Precompiled VRL functions to be used in endpoint expressions. 
- vrl_functions: Vec>, client: Arc, semaphores_by_origin: DashMap>, max_connections_per_host: usize, @@ -67,9 +61,7 @@ impl SubgraphExecutorMap { let https = HttpsConnector::new(); let client: HttpClient = Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) - .pool_idle_timeout(Duration::from_secs( - config.traffic_shaping.pool_idle_timeout_seconds, - )) + .pool_idle_timeout(config.traffic_shaping.all.pool_idle_timeout) .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host) .build(https); @@ -80,11 +72,11 @@ impl SubgraphExecutorMap { static_endpoints_by_subgraph: Default::default(), expressions_by_subgraph: Default::default(), config, - vrl_functions: vrl_build_functions(), client: Arc::new(client), semaphores_by_origin: Default::default(), max_connections_per_host, in_flight_requests: Arc::new(DashMap::with_hasher(ABuildHasher::default())), + timeouts_by_subgraph: Default::default(), } } @@ -118,11 +110,27 @@ impl SubgraphExecutorMap { pub async fn execute<'a, 'req>( &self, subgraph_name: &str, - execution_request: HttpExecutionRequest<'a>, + execution_request: SubgraphExecutionRequest<'a>, client_request: &ClientRequestDetails<'a, 'req>, ) -> HttpExecutionResponse { match self.get_or_create_executor(subgraph_name, client_request) { - Ok(Some(executor)) => executor.execute(execution_request).await, + Ok(Some(executor)) => { + let timeout = self + .timeouts_by_subgraph + .get(subgraph_name) + .map(|t| resolve_duration_prog(t.value(), subgraph_name, client_request)); + match timeout { + Some(Ok(dur)) => executor.execute(execution_request, Some(dur)).await, + Some(Err(err)) => { + error!( + "Failed to resolve timeout for subgraph '{}': {}", + subgraph_name, err, + ); + self.internal_server_error_response(err.into(), subgraph_name) + } + None => executor.execute(execution_request, None).await, + } + } Err(err) => { error!( "Subgraph executor error for subgraph '{}': {}", @@ -194,26 +202,19 @@ impl SubgraphExecutorMap { 
SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) })?, )); - let mut target = VrlTargetValue { - value: VrlValue::Object(BTreeMap::from([ - ("request".into(), client_request.into()), - ("original_url".into(), original_url_value), - ])), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + let value = VrlValue::Object(BTreeMap::from([ + ("request".into(), client_request.into()), + ("original_url".into(), original_url_value), + ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.resolve(&mut ctx).map_err(|err| { - SubgraphExecutorError::new_endpoint_expression_resolution_failure( - subgraph_name.to_string(), - err, - ) - })?; + let endpoint_result = + execute_expression_with_value(expression, value).map_err(|err| { + SubgraphExecutorError::new_endpoint_expression_resolution_failure( + subgraph_name.to_string(), + err, + ) + })?; let endpoint_str = match endpoint_result.as_str() { Some(s) => s.to_string(), None => { @@ -269,12 +270,11 @@ impl SubgraphExecutorMap { subgraph_name: &str, expression: &str, ) -> Result<(), SubgraphExecutorError> { - let compilation_result = vrl_compile(expression, &self.vrl_functions).map_err(|e| { - SubgraphExecutorError::new_endpoint_expression_build(subgraph_name.to_string(), e) + let program = compile_expression(expression, None).map_err(|err| { + SubgraphExecutorError::EndpointExpressionBuild(subgraph_name.to_string(), err) })?; - self.expressions_by_subgraph - .insert(subgraph_name.to_string(), compilation_result.program); + .insert(subgraph_name.to_string(), program); Ok(()) } @@ -317,12 +317,56 @@ impl SubgraphExecutorMap { .or_insert_with(|| Arc::new(Semaphore::new(self.max_connections_per_host))) .clone(); + let mut client = self.client.clone(); + let mut timeout_config = 
&self.config.traffic_shaping.all.request_timeout; + let pool_idle_timeout_seconds = self.config.traffic_shaping.all.pool_idle_timeout; + let mut dedupe_enabled = self.config.traffic_shaping.all.dedupe_enabled; + if let Some(subgraph_traffic_shaping_config) = + self.config.traffic_shaping.subgraphs.get(subgraph_name) + { + if subgraph_traffic_shaping_config + .pool_idle_timeout_seconds + .is_some() + { + client = Arc::new( + Client::builder(TokioExecutor::new()) + .pool_timer(TokioTimer::new()) + .pool_idle_timeout( + subgraph_traffic_shaping_config + .pool_idle_timeout_seconds + .unwrap_or(pool_idle_timeout_seconds), + ) + .pool_max_idle_per_host(self.max_connections_per_host) + .build(HttpsConnector::new()), + ); + } + dedupe_enabled = subgraph_traffic_shaping_config + .dedupe_enabled + .unwrap_or(dedupe_enabled); + timeout_config = subgraph_traffic_shaping_config + .request_timeout + .as_ref() + .unwrap_or(timeout_config); + } + + if !self.timeouts_by_subgraph.contains_key(subgraph_name) { + let timeout_prog: DurationOrProgram = compile_duration_expression(timeout_config, None) + .map_err(|err| { + SubgraphExecutorError::RequestTimeoutExpressionBuild( + subgraph_name.to_string(), + err, + ) + })?; + self.timeouts_by_subgraph + .insert(subgraph_name.to_string(), timeout_prog); + } + let executor = HTTPSubgraphExecutor::new( subgraph_name.to_string(), endpoint_uri, - self.client.clone(), + client, semaphore, - self.config.clone(), + dedupe_enabled, self.in_flight_requests.clone(), ); @@ -334,3 +378,67 @@ impl SubgraphExecutorMap { Ok(()) } } + +fn resolve_duration_prog( + duration_or_program: &DurationOrProgram, + subgraph_name: &str, + client_request: &ClientRequestDetails<'_, '_>, +) -> Result { + match duration_or_program { + DurationOrProgram::Duration(dur) => Ok(*dur), + DurationOrProgram::Program(program) => { + let value = + VrlValue::Object(BTreeMap::from([("request".into(), client_request.into())])); + let result = execute_expression_with_value(program, 
value).map_err(|err| { + SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + err.to_string(), + ) + })?; + match result { + VrlValue::Integer(i) => { + if i < 0 { + return Err(SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + "Timeout expression resolved to a negative integer".to_string(), + )); + } + Ok(std::time::Duration::from_millis(i as u64)) + } + VrlValue::Float(f) => { + let f = f.into_inner(); + if f < 0.0 { + return Err(SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + "Timeout expression resolved to a negative float".to_string(), + )); + } + Ok(std::time::Duration::from_millis(f as u64)) + } + VrlValue::Bytes(b) => { + let s = std::str::from_utf8(&b).map_err(|e| { + SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + format!("Failed to parse duration string from bytes: {}", e), + ) + })?; + Ok(humantime::parse_duration(s).map_err(|e| { + SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + format!("Failed to parse duration string '{}': {}", s, e), + ) + })?) + } + other => { + Err(SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + format!( + "Timeout expression resolved to an unexpected type: {}. 
Expected a non-negative integer/float (ms) or a duration string.", + other.kind() + ), + )) + } + } + } + } +} diff --git a/lib/executor/src/executors/mod.rs b/lib/executor/src/executors/mod.rs index 520ff5f94..7a4ab272d 100644 --- a/lib/executor/src/executors/mod.rs +++ b/lib/executor/src/executors/mod.rs @@ -1,5 +1,6 @@ pub mod common; pub mod dedupe; +pub mod duration_or_prog; pub mod error; pub mod http; pub mod map; diff --git a/lib/executor/src/headers/compile.rs b/lib/executor/src/headers/compile.rs index 5b1b14a9f..587cbee60 100644 --- a/lib/executor/src/headers/compile.rs +++ b/lib/executor/src/headers/compile.rs @@ -1,54 +1,28 @@ -use crate::headers::{ - errors::HeaderRuleCompileError, - plan::{ - HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, - RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, RequestPropagateRegex, - RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, ResponseHeaderRules, - ResponseInsertExpression, ResponseInsertStatic, ResponsePropagateNamed, - ResponsePropagateRegex, ResponseRemoveNamed, ResponseRemoveRegex, +use crate::{ + headers::{ + errors::HeaderRuleCompileError, + plan::{ + HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, + RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, + RequestPropagateRegex, RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, + ResponseHeaderRules, ResponseInsertExpression, ResponseInsertStatic, + ResponsePropagateNamed, ResponsePropagateRegex, ResponseRemoveNamed, + ResponseRemoveRegex, + }, }, + utils::expression::compile_expression, }; use hive_router_config::headers as config; use http::HeaderName; use regex_automata::{meta, util::syntax::Config as SyntaxConfig}; -use vrl::{ - compiler::compile as vrl_compile, prelude::Function as VrlFunction, - stdlib::all as vrl_build_functions, -}; - -pub struct HeaderRuleCompilerContext { - vrl_functions: Vec>, -} - -impl Default for 
HeaderRuleCompilerContext { - fn default() -> Self { - Self::new() - } -} - -impl HeaderRuleCompilerContext { - pub fn new() -> Self { - Self { - vrl_functions: vrl_build_functions(), - } - } -} pub trait HeaderRuleCompiler { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut A, - ) -> Result<(), HeaderRuleCompileError>; + fn compile(&self, actions: &mut A) -> Result<(), HeaderRuleCompileError>; } impl HeaderRuleCompiler> for config::RequestHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::RequestHeaderRule::Propagate(rule) => { let spec = materialize_match_spec( @@ -79,15 +53,13 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { })); } config::InsertSource::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &ctx.vrl_functions).map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(RequestHeaderRule::InsertExpression( RequestInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(program), }, )); } @@ -112,11 +84,7 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { } impl HeaderRuleCompiler> for config::ResponseHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::ResponseHeaderRule::Propagate(rule) => { let aggregation_strategy = rule.algorithm.into(); @@ -159,15 +127,13 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule // - 
compilation_result.program.info().target_assignments // - compilation_result.program.info().target_queries // to determine what parts of the context are actually needed by the expression - let compilation_result = vrl_compile(expression, &ctx.vrl_functions) - .map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(ResponseHeaderRule::InsertExpression( ResponseInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(program), strategy: aggregation_strategy, }, )); @@ -196,19 +162,18 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule pub fn compile_headers_plan( cfg: &config::HeadersConfig, ) -> Result { - let ctx = HeaderRuleCompilerContext::new(); let mut request_plan = RequestHeaderRules::default(); let mut response_plan = ResponseHeaderRules::default(); if let Some(global_rules) = &cfg.all { - request_plan.global = compile_request_header_rules(&ctx, global_rules)?; - response_plan.global = compile_response_header_rules(&ctx, global_rules)?; + request_plan.global = compile_request_header_rules(global_rules)?; + response_plan.global = compile_response_header_rules(global_rules)?; } if let Some(subgraph_rules_map) = &cfg.subgraphs { for (subgraph_name, subgraph_rules) in subgraph_rules_map { - let request_actions = compile_request_header_rules(&ctx, subgraph_rules)?; - let response_actions = compile_response_header_rules(&ctx, subgraph_rules)?; + let request_actions = compile_request_header_rules(subgraph_rules)?; + let response_actions = compile_response_header_rules(subgraph_rules)?; request_plan .by_subgraph .insert(subgraph_name.clone(), request_actions); @@ -225,26 +190,24 @@ pub fn compile_headers_plan( } fn compile_request_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: 
&config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut request_actions = Vec::new(); if let Some(request_rule_entries) = &header_rules.request { for request_rule in request_rule_entries { - request_rule.compile(ctx, &mut request_actions)?; + request_rule.compile(&mut request_actions)?; } } Ok(request_actions) } fn compile_response_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut response_actions = Vec::new(); if let Some(response_rule_entries) = &header_rules.response { for response_rule in response_rule_entries { - response_rule.compile(ctx, &mut response_actions)?; + response_rule.compile(&mut response_actions)?; } } Ok(response_actions) @@ -358,7 +321,7 @@ mod tests { use http::HeaderName; use crate::headers::{ - compile::{build_header_value, HeaderRuleCompiler, HeaderRuleCompilerContext}, + compile::{build_header_value, HeaderRuleCompiler}, errors::HeaderRuleCompileError, plan::{HeaderAggregationStrategy, RequestHeaderRule, ResponseHeaderRule}, }; @@ -378,9 +341,8 @@ mod tests { rename: None, default: None, }); - let ctx = HeaderRuleCompilerContext::new(); let mut actions = Vec::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::PropagateNamed(data) => { @@ -401,8 +363,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::InsertStatic(data) => { @@ -423,8 +384,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::RemoveNamed(data) => { @@ -449,8 +409,7 @@ mod tests { 
default: Some("def".to_string()), }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - let err = rule.compile(&ctx, &mut actions).unwrap_err(); + let err = rule.compile(&mut actions).unwrap_err(); match err { HeaderRuleCompileError::InvalidDefault => {} _ => panic!("Expected InvalidDefault error"), @@ -470,8 +429,7 @@ mod tests { algorithm: config::AggregationAlgo::First, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { ResponseHeaderRule::PropagateNamed(data) => { diff --git a/lib/executor/src/headers/errors.rs b/lib/executor/src/headers/errors.rs index d53444877..6c2a806d2 100644 --- a/lib/executor/src/headers/errors.rs +++ b/lib/executor/src/headers/errors.rs @@ -1,6 +1,6 @@ use http::header::{InvalidHeaderName, InvalidHeaderValue}; use regex_automata::meta::BuildError; -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; #[derive(thiserror::Error, Debug)] pub enum HeaderRuleCompileError { @@ -27,16 +27,8 @@ pub enum HeaderRuleRuntimeError { } impl HeaderRuleCompileError { - pub fn new_expression_build(header_name: String, diagnostics: DiagnosticList) -> Self { - HeaderRuleCompileError::ExpressionBuild( - header_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) + pub fn new_expression_build(header_name: String, err: String) -> Self { + HeaderRuleCompileError::ExpressionBuild(header_name, err) } } diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index 637ab0d58..ec22de509 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -1,12 +1,4 @@ -use std::collections::BTreeMap; - use http::HeaderMap; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as 
VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -19,6 +11,7 @@ use crate::{ }, sanitizer::{is_denied_header, is_never_join_header}, }, + utils::expression::execute_expression_with_value, }; pub fn modify_subgraph_request_headers( @@ -174,17 +167,7 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) })?; diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 6a5c34444..9cf45a768 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, iter::once}; +use std::iter::once; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -13,16 +13,11 @@ use crate::{ }, sanitizer::is_denied_header, }, + utils::expression::execute_expression_with_value, }; use super::sanitizer::is_never_join_header; use http::{header::InvalidHeaderValue, HeaderMap, HeaderName, HeaderValue}; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; pub fn apply_subgraph_response_headers( header_rule_plan: &HeaderRulesPlan, @@ -194,20 +189,9 @@ impl 
ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { - HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) })?; - if let Some(header_value) = vrl_value_to_header_value(value) { let strategy = if is_never_join_header(&self.name) { HeaderAggregationStrategy::Append diff --git a/lib/executor/src/utils/expression.rs b/lib/executor/src/utils/expression.rs new file mode 100644 index 000000000..faee2dc3b --- /dev/null +++ b/lib/executor/src/utils/expression.rs @@ -0,0 +1,49 @@ +use once_cell::sync::Lazy; +use std::collections::BTreeMap; +use vrl::{ + compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + core::Value as VrlValue, + prelude::{ + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, + TimeZone as VrlTimeZone, + }, + stdlib::all as vrl_build_functions, + value::Secrets as VrlSecrets, +}; + +static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); +static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); + +pub fn compile_expression( + expression: &str, + functions: Option<&[Box]>, +) -> Result { + let functions = functions.unwrap_or(&VRL_FUNCTIONS); + + let compilation_result = vrl_compile(expression, functions).map_err(|diagnostics| { + diagnostics + .errors() + .iter() + .map(|d| format!("{}: {}", d.code, d.message)) + .collect::>() + .join(", ") + })?; + + 
Ok(compilation_result.program) +} + +pub fn execute_expression_with_value( + program: &VrlProgram, + value: VrlValue, +) -> Result { + let mut target = VrlTargetValue { + value, + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); + + program.resolve(&mut ctx) +} diff --git a/lib/executor/src/utils/mod.rs b/lib/executor/src/utils/mod.rs index fc4226984..0461bb8a8 100644 --- a/lib/executor/src/utils/mod.rs +++ b/lib/executor/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod consts; +pub mod expression; pub mod traverse; pub mod vrl; diff --git a/lib/router-config/src/lib.rs b/lib/router-config/src/lib.rs index 537244c9e..de74d754d 100644 --- a/lib/router-config/src/lib.rs +++ b/lib/router-config/src/lib.rs @@ -29,7 +29,7 @@ use crate::{ primitives::file_path::with_start_path, query_planner::QueryPlannerConfig, supergraph::SupergraphSource, - traffic_shaping::TrafficShapingExecutorConfig, + traffic_shaping::TrafficShapingConfig, }; #[derive(Debug, Deserialize, Serialize, JsonSchema)] @@ -62,9 +62,9 @@ pub struct HiveRouterConfig { #[serde(default)] pub http: HttpServerConfig, - /// Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. + /// Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. #[serde(default)] - pub traffic_shaping: TrafficShapingExecutorConfig, + pub traffic_shaping: TrafficShapingConfig, /// Configuration for the headers. 
#[serde(default)] diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 02ed5ecdd..0693ac277 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -1,31 +1,28 @@ +use std::{collections::HashMap, time::Duration}; + use schemars::JsonSchema; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(deny_unknown_fields)] -pub struct TrafficShapingExecutorConfig { +pub struct TrafficShapingConfig { + /// The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration. + #[serde(default)] + pub all: TrafficShapingExecutorConfig, + /// Optional per-subgraph configurations that will override the default configuration for specific subgraphs. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub subgraphs: HashMap<String, TrafficShapingExecutorSubgraphConfig>, /// Limits the concurrent amount of requests/connections per host/subgraph. #[serde(default = "default_max_connections_per_host")] pub max_connections_per_host: usize, - - /// Timeout for idle sockets being kept-alive. - #[serde(default = "default_pool_idle_timeout_seconds")] - pub pool_idle_timeout_seconds: u64, - - /// Enables/disables request deduplication to subgraphs. - /// - /// When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will - /// be deduplicated by sharing the response of other in-flight requests. 
- #[serde(default = "default_dedupe_enabled")] - pub dedupe_enabled: bool, } -impl Default for TrafficShapingExecutorConfig { +impl Default for TrafficShapingConfig { fn default() -> Self { Self { + all: TrafficShapingExecutorConfig::default(), + subgraphs: HashMap::new(), max_connections_per_host: default_max_connections_per_host(), - pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(), - dedupe_enabled: default_dedupe_enabled(), } } } @@ -34,10 +31,116 @@ fn default_max_connections_per_host() -> usize { 100 } -fn default_pool_idle_timeout_seconds() -> u64 { - 50 +fn default_pool_idle_timeout() -> Duration { + Duration::from_secs(50) } fn default_dedupe_enabled() -> bool { true } + +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +#[serde(deny_unknown_fields)] +pub struct TrafficShapingExecutorSubgraphConfig { + /// Timeout for idle sockets being kept-alive. + #[serde( + default, + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + #[schemars(with = "String")] + pub pool_idle_timeout: Option<Duration>, + + /// Enables/disables request deduplication to subgraphs. + /// + /// When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will + /// be deduplicated by sharing the response of other in-flight requests. + #[serde(default)] + pub dedupe_enabled: Option<bool>, + + /// Optional timeout configuration for requests to subgraphs. 
+ /// + /// Example with a fixed duration: + /// ```yaml + /// request_timeout: 5s + /// ``` + /// + /// Or with a VRL expression that can return a duration based on the operation kind: + /// ```yaml + /// request_timeout: + /// expression: | + /// if (.request.operation.type == "mutation") { + /// "10s" + /// } else { + /// "15s" + /// } + /// ``` + #[serde(default)] + pub request_timeout: Option<DurationOrExpression>, +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +#[serde(deny_unknown_fields)] +pub struct TrafficShapingExecutorConfig { + /// Timeout for idle sockets being kept-alive. + #[serde( + default = "default_pool_idle_timeout", + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + #[schemars(with = "String")] + pub pool_idle_timeout: Duration, + + /// Enables/disables request deduplication to subgraphs. + /// + /// When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will + /// be deduplicated by sharing the response of other in-flight requests. + #[serde(default = "default_dedupe_enabled")] + pub dedupe_enabled: bool, + + /// Optional timeout configuration for requests to subgraphs. + /// + /// Example with a fixed duration: + /// ```yaml + /// request_timeout: 5s + /// ``` + /// + /// Or with a VRL expression that can return a duration based on the operation kind: + /// ```yaml + /// request_timeout: + /// expression: | + /// if (.request.operation.type == "mutation") { + /// "10s" + /// } else { + /// "15s" + /// } + /// ``` + #[serde(default = "default_request_timeout")] + pub request_timeout: DurationOrExpression, +} + +fn default_request_timeout() -> DurationOrExpression { + DurationOrExpression::Duration(Duration::from_secs(30)) +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +#[serde(untagged)] +pub enum DurationOrExpression { + /// A fixed duration, e.g., "5s" or "100ms". 
+ #[serde( + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + Duration(Duration), + /// A VRL expression that evaluates to a duration. The result can be an integer (milliseconds), a float (milliseconds), or a duration string (e.g. "5s"). + Expression { expression: String }, +} + +impl Default for TrafficShapingExecutorConfig { + fn default() -> Self { + Self { + pool_idle_timeout: default_pool_idle_timeout(), + dedupe_enabled: default_dedupe_enabled(), + request_timeout: default_request_timeout(), + } + } +}