From b45e012e52b950456b00b422d6afd21088e99e12 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 13:10:44 +0300 Subject: [PATCH 01/20] New `Expression` type to handle VRL in one place --- Cargo.lock | 1 + lib/executor/src/executors/map.rs | 48 +++------- lib/executor/src/headers/compile.rs | 90 ++++-------------- lib/executor/src/headers/errors.rs | 14 +-- lib/executor/src/headers/plan.rs | 6 +- lib/executor/src/headers/request.rs | 20 +--- lib/executor/src/headers/response.rs | 20 +--- lib/router-config/Cargo.toml | 1 + lib/router-config/src/headers.rs | 4 +- lib/router-config/src/lib.rs | 6 +- .../src/override_subgraph_urls.rs | 8 +- .../src/primitives/expression.rs | 91 +++++++++++++++++++ lib/router-config/src/primitives/mod.rs | 1 + lib/router-config/src/traffic_shaping.rs | 4 +- 14 files changed, 147 insertions(+), 167 deletions(-) create mode 100644 lib/router-config/src/primitives/expression.rs diff --git a/Cargo.lock b/Cargo.lock index 360f7cd81..f12a6ff43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2064,6 +2064,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tracing", + "vrl", ] [[package]] diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index a3c297ad1..0c25544d3 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -6,7 +6,9 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; +use hive_router_config::{ + override_subgraph_urls::UrlOrExpression, primitives::expression::Expression, HiveRouterConfig, +}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -15,16 +17,7 @@ use hyper_util::{ }; use tokio::sync::{OnceCell, Semaphore}; use tracing::error; -use vrl::{ - compiler::compile as vrl_compile, - compiler::Program as VrlProgram, - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::Function as VrlFunction, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - stdlib::all as vrl_build_functions, - value::Secrets as VrlSecrets, -}; +use vrl::core::Value as VrlValue; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -44,7 +37,7 @@ type SubgraphEndpoint = String; type ExecutorsBySubgraphMap = DashMap>; type EndpointsBySubgraphMap = DashMap; -type ExpressionsBySubgraphMap = HashMap; +type ExpressionsBySubgraphMap = HashMap; pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, @@ -54,8 +47,6 @@ pub struct SubgraphExecutorMap { /// Mapping from subgraph name to VRL expression program expressions_by_subgraph: ExpressionsBySubgraphMap, config: Arc, - /// Precompiled VRL functions to be used in endpoint expressions. - vrl_functions: Vec>, client: Arc, semaphores_by_origin: DashMap>, max_connections_per_host: usize, @@ -80,7 +71,6 @@ impl SubgraphExecutorMap { static_endpoints_by_subgraph: Default::default(), expressions_by_subgraph: Default::default(), config, - vrl_functions: vrl_build_functions(), client: Arc::new(client), semaphores_by_origin: Default::default(), max_connections_per_host, @@ -101,7 +91,7 @@ impl SubgraphExecutorMap { let endpoint_str = match endpoint_str { Some(UrlOrExpression::Url(url)) => url, - Some(UrlOrExpression::Expression { expression }) => { + Some(UrlOrExpression::Expression(expression)) => { subgraph_executor_map.register_expression(&subgraph_name, expression)?; &original_endpoint_str } @@ -194,21 +184,13 @@ impl SubgraphExecutorMap { SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) })?, )); - let mut target = VrlTargetValue { - value: VrlValue::Object(BTreeMap::from([ - ("request".into(), client_request.into()), - ("original_url".into(), original_url_value), - ])), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + let value = VrlValue::Object(BTreeMap::from([ + ("request".into(), client_request.into()), + ("original_url".into(), original_url_value), + ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.resolve(&mut ctx).map_err(|err| { + let endpoint_result = expression.execute(value).map_err(|err| { SubgraphExecutorError::new_endpoint_expression_resolution_failure( subgraph_name.to_string(), err, @@ -267,14 +249,10 @@ impl SubgraphExecutorMap { fn register_expression( &mut self, subgraph_name: &str, - expression: &str, + expression: &Expression, ) -> Result<(), SubgraphExecutorError> { - let compilation_result = vrl_compile(expression, &self.vrl_functions).map_err(|e| { - SubgraphExecutorError::new_endpoint_expression_build(subgraph_name.to_string(), e) - })?; - self.expressions_by_subgraph - .insert(subgraph_name.to_string(), compilation_result.program); + .insert(subgraph_name.to_string(), expression.clone()); Ok(()) } diff --git a/lib/executor/src/headers/compile.rs b/lib/executor/src/headers/compile.rs index 5b1b14a9f..1cf29c8c9 100644 --- a/lib/executor/src/headers/compile.rs +++ b/lib/executor/src/headers/compile.rs @@ -12,43 +12,13 @@ use crate::headers::{ use hive_router_config::headers as config; use http::HeaderName; use regex_automata::{meta, util::syntax::Config as SyntaxConfig}; -use vrl::{ - compiler::compile as vrl_compile, prelude::Function as VrlFunction, - stdlib::all as vrl_build_functions, -}; - -pub struct HeaderRuleCompilerContext { - vrl_functions: Vec>, -} - -impl Default for HeaderRuleCompilerContext { - fn default() -> Self { - Self::new() - } -} - -impl HeaderRuleCompilerContext { - pub fn new() -> Self { - Self { - vrl_functions: vrl_build_functions(), - } - } -} pub trait HeaderRuleCompiler { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut A, - ) -> Result<(), HeaderRuleCompileError>; + fn compile(&self, actions: &mut A) -> Result<(), HeaderRuleCompileError>; } impl HeaderRuleCompiler> for config::RequestHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::RequestHeaderRule::Propagate(rule) => { let spec = materialize_match_spec( @@ -78,16 +48,11 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { value: build_header_value(&rule.name, value)?, })); } - config::InsertSource::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &ctx.vrl_functions).map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - + config::InsertSource::Expression(expression) => { actions.push(RequestHeaderRule::InsertExpression( RequestInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(expression.clone()), }, )); } @@ -112,11 +77,7 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { } impl HeaderRuleCompiler> for config::ResponseHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::ResponseHeaderRule::Propagate(rule) => { let aggregation_strategy = rule.algorithm.into(); @@ -153,21 +114,16 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule strategy: aggregation_strategy, })); } - config::InsertSource::Expression { expression } => { + config::InsertSource::Expression(expression) => { // NOTE: In case we ever need to improve performance and not pass the whole context // to VRL expressions, we can use: // - compilation_result.program.info().target_assignments // - compilation_result.program.info().target_queries // to determine what parts of the context are actually needed by the expression - let compilation_result = vrl_compile(expression, &ctx.vrl_functions) - .map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - actions.push(ResponseHeaderRule::InsertExpression( ResponseInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(expression.clone()), strategy: aggregation_strategy, }, )); @@ -196,19 +152,18 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule pub fn compile_headers_plan( cfg: &config::HeadersConfig, ) -> Result { - let ctx = HeaderRuleCompilerContext::new(); let mut request_plan = RequestHeaderRules::default(); let mut response_plan = ResponseHeaderRules::default(); if let Some(global_rules) = &cfg.all { - request_plan.global = compile_request_header_rules(&ctx, global_rules)?; - response_plan.global = compile_response_header_rules(&ctx, global_rules)?; + request_plan.global = compile_request_header_rules(global_rules)?; + response_plan.global = compile_response_header_rules(global_rules)?; } if let Some(subgraph_rules_map) = &cfg.subgraphs { for (subgraph_name, subgraph_rules) in subgraph_rules_map { - let request_actions = compile_request_header_rules(&ctx, subgraph_rules)?; - let response_actions = compile_response_header_rules(&ctx, subgraph_rules)?; + let request_actions = compile_request_header_rules(subgraph_rules)?; + let response_actions = compile_response_header_rules(subgraph_rules)?; request_plan .by_subgraph .insert(subgraph_name.clone(), request_actions); @@ -225,26 +180,24 @@ pub fn compile_headers_plan( } fn compile_request_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut request_actions = Vec::new(); if let Some(request_rule_entries) = &header_rules.request { for request_rule in request_rule_entries { - request_rule.compile(ctx, &mut request_actions)?; + request_rule.compile(&mut request_actions)?; } } Ok(request_actions) } fn compile_response_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut response_actions = Vec::new(); if let Some(response_rule_entries) = &header_rules.response { for response_rule in response_rule_entries { - response_rule.compile(ctx, &mut response_actions)?; + response_rule.compile(&mut response_actions)?; } } Ok(response_actions) @@ -358,7 +311,7 @@ mod tests { use http::HeaderName; use crate::headers::{ - compile::{build_header_value, HeaderRuleCompiler, HeaderRuleCompilerContext}, + compile::{build_header_value, HeaderRuleCompiler}, errors::HeaderRuleCompileError, plan::{HeaderAggregationStrategy, RequestHeaderRule, ResponseHeaderRule}, }; @@ -378,9 +331,8 @@ mod tests { rename: None, default: None, }); - let ctx = HeaderRuleCompilerContext::new(); let mut actions = Vec::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::PropagateNamed(data) => { @@ -401,8 +353,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::InsertStatic(data) => { @@ -423,8 +374,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::RemoveNamed(data) => { @@ -449,8 +399,7 @@ mod tests { default: Some("def".to_string()), }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - let err = rule.compile(&ctx, &mut actions).unwrap_err(); + let err = rule.compile(&mut actions).unwrap_err(); match err { HeaderRuleCompileError::InvalidDefault => {} _ => panic!("Expected InvalidDefault error"), @@ -470,8 +419,7 @@ mod tests { algorithm: config::AggregationAlgo::First, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { ResponseHeaderRule::PropagateNamed(data) => { diff --git a/lib/executor/src/headers/errors.rs b/lib/executor/src/headers/errors.rs index d53444877..6c2a806d2 100644 --- a/lib/executor/src/headers/errors.rs +++ b/lib/executor/src/headers/errors.rs @@ -1,6 +1,6 @@ use http::header::{InvalidHeaderName, InvalidHeaderValue}; use regex_automata::meta::BuildError; -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; #[derive(thiserror::Error, Debug)] pub enum HeaderRuleCompileError { @@ -27,16 +27,8 @@ pub enum HeaderRuleRuntimeError { } impl HeaderRuleCompileError { - pub fn new_expression_build(header_name: String, diagnostics: DiagnosticList) -> Self { - HeaderRuleCompileError::ExpressionBuild( - header_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) + pub fn new_expression_build(header_name: String, err: String) -> Self { + HeaderRuleCompileError::ExpressionBuild(header_name, err) } } diff --git a/lib/executor/src/headers/plan.rs b/lib/executor/src/headers/plan.rs index a1a3cf990..69e0edffd 100644 --- a/lib/executor/src/headers/plan.rs +++ b/lib/executor/src/headers/plan.rs @@ -1,7 +1,7 @@ use ahash::HashMap; +use hive_router_config::primitives::expression::Expression; use http::{HeaderName, HeaderValue}; use regex_automata::meta::Regex; -use vrl::compiler::Program as VrlProgram; #[derive(Clone)] pub struct HeaderRulesPlan { @@ -62,13 +62,13 @@ pub struct ResponseInsertStatic { #[derive(Clone)] pub struct RequestInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, } #[derive(Clone)] pub struct ResponseInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, pub strategy: HeaderAggregationStrategy, } diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index 637ab0d58..925789367 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -1,12 +1,4 @@ -use std::collections::BTreeMap; - use http::HeaderMap; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -174,17 +166,7 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { + let value = self.expression.execute(ctx.into()).map_err(|err| { HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) })?; diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 6a5c34444..5f7c895bd 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, iter::once}; +use std::iter::once; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -17,12 +17,6 @@ use crate::{ use super::sanitizer::is_never_join_header; use http::{header::InvalidHeaderValue, HeaderMap, HeaderName, HeaderValue}; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; pub fn apply_subgraph_response_headers( header_rule_plan: &HeaderRulesPlan, @@ -194,17 +188,7 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { + let value = self.expression.execute(ctx.into()).map_err(|err| { HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) })?; diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 93edcc3b5..56c6eb981 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -23,6 +23,7 @@ http = { workspace = true } jsonwebtoken = { workspace = true } retry-policies = { workspace = true} tracing = { workspace = true } +vrl = { workspace = true } schemars = "1.0.4" humantime-serde = "1.1.1" diff --git a/lib/router-config/src/headers.rs b/lib/router-config/src/headers.rs index bd0913f6f..8d30675b7 100644 --- a/lib/router-config/src/headers.rs +++ b/lib/router-config/src/headers.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::primitives::expression::Expression; + type HeaderName = String; type RegExp = String; @@ -248,7 +250,7 @@ pub enum InsertSource { /// name: x-auth-scheme /// expression: 'split(.request.headers.authorization, " ")[0] ?? "none"' /// ``` - Expression { expression: String }, + Expression(Expression), } /// Helper to allow `one` or `many` values for ergonomics (OR semantics). diff --git a/lib/router-config/src/lib.rs b/lib/router-config/src/lib.rs index 537244c9e..de74d754d 100644 --- a/lib/router-config/src/lib.rs +++ b/lib/router-config/src/lib.rs @@ -29,7 +29,7 @@ use crate::{ primitives::file_path::with_start_path, query_planner::QueryPlannerConfig, supergraph::SupergraphSource, - traffic_shaping::TrafficShapingExecutorConfig, + traffic_shaping::TrafficShapingConfig, }; #[derive(Debug, Deserialize, Serialize, JsonSchema)] @@ -62,9 +62,9 @@ pub struct HiveRouterConfig { #[serde(default)] pub http: HttpServerConfig, - /// Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. + /// Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. #[serde(default)] - pub traffic_shaping: TrafficShapingExecutorConfig, + pub traffic_shaping: TrafficShapingConfig, /// Configuration for the headers. #[serde(default)] diff --git a/lib/router-config/src/override_subgraph_urls.rs b/lib/router-config/src/override_subgraph_urls.rs index ddacfe044..dc32bb783 100644 --- a/lib/router-config/src/override_subgraph_urls.rs +++ b/lib/router-config/src/override_subgraph_urls.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::primitives::expression::Expression; + /// Configuration for how the Router should override subgraph URLs. /// This can be used to point to different subgraph endpoints based on environment, /// or to use dynamic expressions to determine the URL at runtime. @@ -52,7 +54,7 @@ pub enum UrlOrExpression { /// A static URL string. Url(String), /// A dynamic value computed by a VRL expression. - Expression { expression: String }, + Expression(Expression), } fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { @@ -75,9 +77,7 @@ fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { subgraphs.insert( "products".to_string(), PerSubgraphConfig { - url: UrlOrExpression::Expression { - expression: expression.to_string(), - }, + url: UrlOrExpression::Expression(expression.to_string().try_into().unwrap()), }, ); diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs new file mode 100644 index 000000000..2df64b768 --- /dev/null +++ b/lib/router-config/src/primitives/expression.rs @@ -0,0 +1,91 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use schemars::{JsonSchema, Schema, SchemaGenerator}; +use serde::{Deserialize, Serialize}; +use vrl::{ + compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + core::Value as VrlValue, + prelude::{ + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, + TimeZone as VrlTimeZone, + }, + stdlib::all as vrl_build_functions, + value::Secrets as VrlSecrets, +}; + +#[derive(Debug, Clone)] +pub struct Expression { + expression: String, + program: Box, +} + +impl Expression { + pub fn try_new(expression: String) -> Result { + let vrl_functions = vrl_build_functions(); + + let compilation_result = + vrl_compile(&expression, &vrl_functions).map_err(|diagnostics| { + diagnostics + .errors() + .into_iter() + .map(|d| d.code.to_string() + ": " + &d.message) + .collect::>() + .join(", ") + })?; + + Ok(Self { + expression, + program: Box::new(compilation_result.program), + }) + } + + pub fn execute(&self, value: VrlValue) -> Result { + let mut target = VrlTargetValue { + value, + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let timezone = VrlTimeZone::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + + self.program.resolve(&mut ctx) + } +} + +impl<'de> Deserialize<'de> for Expression { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let expression = String::deserialize(deserializer)?; + Expression::try_new(expression).map_err(serde::de::Error::custom) + } +} + +impl Serialize for Expression { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.expression) + } +} + +impl JsonSchema for Expression { + fn schema_name() -> Cow<'static, str> { + "Expression".into() + } + + fn json_schema(gen: &mut SchemaGenerator) -> Schema { + String::json_schema(gen) + } +} + +impl TryFrom for Expression { + type Error = String; + fn try_from(value: String) -> Result { + Expression::try_new(value) + } +} diff --git a/lib/router-config/src/primitives/mod.rs b/lib/router-config/src/primitives/mod.rs index 972582fc3..aa591928b 100644 --- a/lib/router-config/src/primitives/mod.rs +++ b/lib/router-config/src/primitives/mod.rs @@ -1,3 +1,4 @@ +pub mod expression; pub mod file_path; pub mod http_header; pub mod retry_policy; diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 02ed5ecdd..58dd7e049 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(deny_unknown_fields)] -pub struct TrafficShapingExecutorConfig { +pub struct TrafficShapingConfig { /// Limits the concurrent amount of requests/connections per host/subgraph. #[serde(default = "default_max_connections_per_host")] pub max_connections_per_host: usize, @@ -20,7 +20,7 @@ pub struct TrafficShapingExecutorConfig { pub dedupe_enabled: bool, } -impl Default for TrafficShapingExecutorConfig { +impl Default for TrafficShapingConfig { fn default() -> Self { Self { max_connections_per_host: default_max_connections_per_host(), From 65b7617936e3046cb0054873a44f8c2b0b0d24ad Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:24:59 +0300 Subject: [PATCH 02/20] Fix --- Cargo.lock | 1 + .../src/pipeline/progressive_override.rs | 29 +++----- lib/executor/src/executors/error.rs | 17 +---- lib/executor/src/executors/map.rs | 2 +- lib/executor/src/headers/request.rs | 12 +++- lib/executor/src/headers/response.rs | 9 ++- lib/router-config/Cargo.toml | 1 + lib/router-config/src/override_labels.rs | 7 +- .../src/primitives/expression.rs | 68 +++++++++++++++---- 9 files changed, 85 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f12a6ff43..9b55ebe70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2058,6 +2058,7 @@ dependencies = [ "http", "humantime-serde", "jsonwebtoken", + "once_cell", "retry-policies", "schemars 1.0.5", "serde", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index d0b09c183..eae790709 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -1,6 +1,9 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig}; +use hive_router_config::{ + override_labels::{LabelOverrideValue, OverrideLabelsConfig}, + primitives::expression::Expression, +}; use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails; use hive_router_query_planner::{ graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR}, @@ -8,13 +11,12 @@ use hive_router_query_planner::{ }; use rand::Rng; use vrl::{ - compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, prelude::{ state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, TimeZone as VrlTimeZone, }, - stdlib::all as vrl_build_functions, value::Secrets as VrlSecrets, }; @@ -117,7 +119,7 @@ impl StableOverrideContext { /// It's intended to be used as a shared state in the router. pub struct OverrideLabelsEvaluator { static_enabled_labels: HashSet, - expressions: HashMap, + expressions: HashMap, } impl OverrideLabelsEvaluator { @@ -126,27 +128,14 @@ impl OverrideLabelsEvaluator { ) -> Result { let mut static_enabled_labels = HashSet::new(); let mut expressions = HashMap::new(); - let vrl_functions = vrl_build_functions(); for (label, value) in override_labels_config.iter() { match value { LabelOverrideValue::Boolean(true) => { static_enabled_labels.insert(label.clone()); } - LabelOverrideValue::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &vrl_functions).map_err(|diagnostics| { - OverrideLabelsCompileError { - label: label.clone(), - error: diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - } - })?; - expressions.insert(label.clone(), compilation_result.program); + LabelOverrideValue::Expression(expression) => { + expressions.insert(label.clone(), expression.clone()); } _ => {} // Skip false booleans } @@ -179,7 +168,7 @@ impl OverrideLabelsEvaluator { let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); for (label, expression) in &self.expressions { - match expression.resolve(&mut ctx) { + match expression.execute_with_context(&mut ctx) { Ok(evaluated_value) => match evaluated_value { VrlValue::Boolean(true) => { active_flags.insert(label.clone()); diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 2234f524c..de69d47cb 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -1,4 +1,4 @@ -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; use crate::response::graphql_error::{GraphQLError, GraphQLErrorExtensions}; @@ -34,21 +34,6 @@ impl From for GraphQLError { } impl SubgraphExecutorError { - pub fn new_endpoint_expression_build( - subgraph_name: String, - diagnostics: DiagnosticList, - ) -> Self { - SubgraphExecutorError::EndpointExpressionBuild( - subgraph_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) - } - pub fn new_endpoint_expression_resolution_failure( subgraph_name: String, error: ExpressionError, diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 0c25544d3..e4e858d96 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -190,7 +190,7 @@ impl SubgraphExecutorMap { ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.execute(value).map_err(|err| { + let endpoint_result = expression.execute_with_value(value).map_err(|err| { SubgraphExecutorError::new_endpoint_expression_resolution_failure( subgraph_name.to_string(), err, diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index 925789367..aa3960c05 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -166,9 +166,15 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self.expression.execute(ctx.into()).map_err(|err| { - HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) - })?; + let value = self + .expression + .execute_with_value(ctx.into()) + .map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation( + self.name.to_string(), + Box::new(err), + ) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { if is_never_join_header(&self.name) { diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 5f7c895bd..3c2085644 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -188,9 +188,12 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self.expression.execute(ctx.into()).map_err(|err| { - HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) - })?; + let value = self + .expression + .execute_with_value(ctx.into()) + .map_err(|err| { + HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { let strategy = if is_never_join_header(&self.name) { diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 56c6eb981..9efaf9405 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -25,6 +25,7 @@ retry-policies = { workspace = true} tracing = { workspace = true } vrl = { workspace = true } +once_cell = "1.21.3" schemars = "1.0.4" humantime-serde = "1.1.1" config = { version = "0.15.14", features = ["yaml", "json", "json5"] } diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index b3dc01a75..a7006f8b1 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -2,6 +2,8 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use crate::primitives::expression::Expression; + /// A map of label names to their override configuration. pub type OverrideLabelsConfig = HashMap; @@ -15,10 +17,7 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression { - /// An expression that must evaluate to a boolean. If true, the label will be applied. - expression: String, - }, + Expression(Expression), } impl LabelOverrideValue { diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index 2df64b768..b2bd226ac 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,12 +1,12 @@ -use std::{borrow::Cow, collections::BTreeMap}; - +use once_cell::sync::Lazy; use schemars::{JsonSchema, Schema, SchemaGenerator}; use serde::{Deserialize, Serialize}; +use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, core::Value as VrlValue, prelude::{ - state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, TimeZone as VrlTimeZone, }, stdlib::all as vrl_build_functions, @@ -19,16 +19,17 @@ pub struct Expression { program: Box, } +static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); +static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); + impl Expression { pub fn try_new(expression: String) -> Result { - let vrl_functions = vrl_build_functions(); - let compilation_result = - vrl_compile(&expression, &vrl_functions).map_err(|diagnostics| { + vrl_compile(&expression, &VRL_FUNCTIONS).map_err(|diagnostics| { diagnostics .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) + .iter() + .map(|d| format!("{}: {}", d.code, d.message)) .collect::>() .join(", ") })?; @@ -39,7 +40,7 @@ impl Expression { }) } - pub fn execute(&self, value: VrlValue) -> Result { + pub fn execute_with_value(&self, value: VrlValue) -> Result { let mut target = VrlTargetValue { value, metadata: VrlValue::Object(BTreeMap::new()), @@ -47,10 +48,13 @@ impl Expression { }; let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); + + self.execute_with_context(&mut ctx) + } - self.program.resolve(&mut ctx) + pub fn execute_with_context(&self, ctx: &mut VrlContext) -> Result { + self.program.resolve(ctx) } } @@ -59,8 +63,44 @@ impl<'de> Deserialize<'de> for Expression { where D: serde::Deserializer<'de>, { - let expression = String::deserialize(deserializer)?; - Expression::try_new(expression).map_err(serde::de::Error::custom) + struct ExpressionVisitor; + impl<'de> serde::de::Visitor<'de> for ExpressionVisitor { + type Value = Expression; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a map for Expression") + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut expression_str: Option = None; + + while let Some(key) = map.next_key::()? { + match key.as_str() { + "expression" => { + if expression_str.is_some() { + return Err(serde::de::Error::duplicate_field("expression")); + } + expression_str = Some(map.next_value()?); + } + other_key => { + return Err(serde::de::Error::unknown_field( + other_key, + &["expression"], + )); + } + } + } + + let expression_str = + expression_str.ok_or_else(|| serde::de::Error::missing_field("expression"))?; + + Expression::try_new(expression_str).map_err(serde::de::Error::custom) + } + } + deserializer.deserialize_map(ExpressionVisitor) } } From c9ca4841221ca763ed7c26ae77d94cf209545159 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:36:56 +0300 Subject: [PATCH 03/20] Readme --- docs/README.md | 12 +++++----- .../src/primitives/expression.rs | 23 +++++++++++++++---- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/docs/README.md b/docs/README.md index fb474b9d2..1b0c8e40e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| **Additional Properties:** not allowed **Example** @@ -641,7 +641,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes|
@@ -863,7 +863,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1116,7 +1116,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1338,7 +1338,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1808,7 +1808,7 @@ Request timeout for the Hive Console CDN requests. ## traffic\_shaping: object -Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. +Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. **Properties** diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index b2bd226ac..44da62957 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,6 +1,6 @@ use once_cell::sync::Lazy; -use schemars::{JsonSchema, Schema, SchemaGenerator}; -use serde::{Deserialize, Serialize}; +use schemars::{JsonSchema, Schema, SchemaGenerator, json_schema}; +use serde::{Deserialize, Serialize, ser::SerializeStruct}; use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, @@ -109,7 +109,9 @@ impl Serialize for Expression { where S: serde::Serializer, { - serializer.serialize_str(&self.expression) + let mut state = serializer.serialize_struct("Expression", 1)?; + state.serialize_field("expression", &self.expression)?; + state.end() } } @@ -118,8 +120,19 @@ impl JsonSchema for Expression { "Expression".into() } - fn json_schema(gen: &mut SchemaGenerator) -> Schema { - String::json_schema(gen) + fn json_schema(_gen: &mut SchemaGenerator) -> Schema { + json_schema!({ + "type": "object", + "description": "A VRL expression used for dynamic evaluations.", + "properties": { + "expression": { + "type": "string", + "description": "The VRL expression string." + } + }, + "required": ["expression"], + "additionalProperties": false + }) } } From dae1e066a8f1efdd70d4b0ef8d7e3078cd8ecb1e Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:38:14 +0300 Subject: [PATCH 04/20] Format --- lib/router-config/src/primitives/expression.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index 44da62957..5b60113b7 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,6 +1,6 @@ use once_cell::sync::Lazy; -use schemars::{JsonSchema, Schema, SchemaGenerator, json_schema}; -use serde::{Deserialize, Serialize, ser::SerializeStruct}; +use schemars::{json_schema, JsonSchema, Schema, SchemaGenerator}; +use serde::{ser::SerializeStruct, Deserialize, Serialize}; use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, From cb637333168e704d9f12b1f07231de2780bf994d Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 18:01:37 +0300 Subject: [PATCH 05/20] Revert --- Cargo.lock | 4 +- .../src/pipeline/progressive_override.rs | 22 ++- docs/README.md | 8 +- lib/executor/Cargo.toml | 2 + lib/executor/src/executors/map.rs | 31 ++-- lib/executor/src/headers/compile.rs | 34 +++-- lib/executor/src/headers/plan.rs | 6 +- lib/executor/src/headers/request.rs | 13 +- lib/executor/src/headers/response.rs | 11 +- lib/executor/src/utils/expression.rs | 49 ++++++ lib/executor/src/utils/mod.rs | 1 + lib/router-config/Cargo.toml | 2 - lib/router-config/src/headers.rs | 4 +- lib/router-config/src/override_labels.rs | 4 +- .../src/override_subgraph_urls.rs | 8 +- .../src/primitives/expression.rs | 144 ------------------ lib/router-config/src/primitives/mod.rs | 1 - 17 files changed, 128 insertions(+), 216 deletions(-) create mode 100644 lib/executor/src/utils/expression.rs delete mode 100644 lib/router-config/src/primitives/expression.rs diff --git a/Cargo.lock b/Cargo.lock index 9b55ebe70..261fea428 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2058,14 +2058,12 @@ dependencies = [ "http", "humantime-serde", "jsonwebtoken", - "once_cell", "retry-policies", "schemars 1.0.5", "serde", "serde_json", "thiserror 2.0.17", "tracing", - "vrl", ] [[package]] @@ -2092,9 +2090,11 @@ dependencies = [ "insta", "itoa", "ntex-http", + "once_cell", "ordered-float", "regex-automata", "ryu", + "schemars 1.0.4", "serde", "sonic-rs", "strum 0.27.2", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index eae790709..0c2ad9c15 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -1,16 +1,16 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use hive_router_config::{ - override_labels::{LabelOverrideValue, OverrideLabelsConfig}, - primitives::expression::Expression, +use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig}; +use hive_router_plan_executor::{ + execution::client_request_details::ClientRequestDetails, utils::expression::compile_expression, }; -use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails; use hive_router_query_planner::{ graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR}, state::supergraph_state::SupergraphState, }; use rand::Rng; use vrl::{ + compiler::Program as VrlProgram, compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, prelude::{ @@ -119,7 +119,7 @@ impl StableOverrideContext { /// It's intended to be used as a shared state in the router. pub struct OverrideLabelsEvaluator { static_enabled_labels: HashSet, - expressions: HashMap, + expressions: HashMap, } impl OverrideLabelsEvaluator { @@ -134,8 +134,14 @@ impl OverrideLabelsEvaluator { LabelOverrideValue::Boolean(true) => { static_enabled_labels.insert(label.clone()); } - LabelOverrideValue::Expression(expression) => { - expressions.insert(label.clone(), expression.clone()); + LabelOverrideValue::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + OverrideLabelsCompileError { + label: label.clone(), + error: err.to_string(), + } + })?; + expressions.insert(label.clone(), program); } _ => {} // Skip false booleans } @@ -168,7 +174,7 @@ impl OverrideLabelsEvaluator { let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); for (label, expression) in &self.expressions { - match expression.execute_with_context(&mut ctx) { + match expression.resolve(&mut ctx) { Ok(evaluated_value) => match evaluated_value { VrlValue::Boolean(true) => { active_flags.insert(label.clone()); diff --git a/docs/README.md b/docs/README.md index 1b0c8e40e..a01d56293 100644 --- a/docs/README.md +++ b/docs/README.md @@ -641,7 +641,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -863,7 +863,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -1116,7 +1116,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -1338,7 +1338,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index c3f6f9117..06bcf2904 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -49,6 +49,8 @@ itoa = "1.0.15" ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" +once_cell = "1.21.3" +schemars = "1.0.4" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index e4e858d96..93247ec80 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -6,9 +6,7 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{ - override_subgraph_urls::UrlOrExpression, primitives::expression::Expression, HiveRouterConfig, -}; +use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -17,7 +15,7 @@ use hyper_util::{ }; use tokio::sync::{OnceCell, Semaphore}; use tracing::error; -use vrl::core::Value as VrlValue; +use vrl::{compiler::Program as VrlProgram, core::Value as VrlValue}; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -30,6 +28,7 @@ use crate::{ http::{HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, + utils::expression::{compile_expression, execute_expression_with_value}, }; type SubgraphName = String; @@ -37,7 +36,7 @@ type SubgraphEndpoint = String; type ExecutorsBySubgraphMap = DashMap>; type EndpointsBySubgraphMap = DashMap; -type ExpressionsBySubgraphMap = HashMap; +type ExpressionsBySubgraphMap = HashMap; pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, @@ -91,7 +90,7 @@ impl SubgraphExecutorMap { let endpoint_str = match endpoint_str { Some(UrlOrExpression::Url(url)) => url, - Some(UrlOrExpression::Expression(expression)) => { + Some(UrlOrExpression::Expression { expression }) => { subgraph_executor_map.register_expression(&subgraph_name, expression)?; &original_endpoint_str } @@ -190,12 +189,13 @@ impl SubgraphExecutorMap { ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.execute_with_value(value).map_err(|err| { - SubgraphExecutorError::new_endpoint_expression_resolution_failure( - subgraph_name.to_string(), - err, - ) - })?; + let endpoint_result = + execute_expression_with_value(expression, value).map_err(|err| { + SubgraphExecutorError::new_endpoint_expression_resolution_failure( + subgraph_name.to_string(), + err, + ) + })?; let endpoint_str = match endpoint_result.as_str() { Some(s) => s.to_string(), None => { @@ -249,10 +249,13 @@ impl SubgraphExecutorMap { fn register_expression( &mut self, subgraph_name: &str, - expression: &Expression, + expression: &str, ) -> Result<(), SubgraphExecutorError> { + let program = compile_expression(expression, None).map_err(|err| { + SubgraphExecutorError::EndpointExpressionBuild(subgraph_name.to_string(), err) + })?; self.expressions_by_subgraph - .insert(subgraph_name.to_string(), expression.clone()); + .insert(subgraph_name.to_string(), program); Ok(()) } diff --git a/lib/executor/src/headers/compile.rs b/lib/executor/src/headers/compile.rs index 1cf29c8c9..587cbee60 100644 --- a/lib/executor/src/headers/compile.rs +++ b/lib/executor/src/headers/compile.rs @@ -1,12 +1,16 @@ -use crate::headers::{ - errors::HeaderRuleCompileError, - plan::{ - HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, - RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, RequestPropagateRegex, - RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, ResponseHeaderRules, - ResponseInsertExpression, ResponseInsertStatic, ResponsePropagateNamed, - ResponsePropagateRegex, ResponseRemoveNamed, ResponseRemoveRegex, +use crate::{ + headers::{ + errors::HeaderRuleCompileError, + plan::{ + HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, + RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, + RequestPropagateRegex, RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, + ResponseHeaderRules, ResponseInsertExpression, ResponseInsertStatic, + ResponsePropagateNamed, ResponsePropagateRegex, ResponseRemoveNamed, + ResponseRemoveRegex, + }, }, + utils::expression::compile_expression, }; use hive_router_config::headers as config; @@ -48,11 +52,14 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { value: build_header_value(&rule.name, value)?, })); } - config::InsertSource::Expression(expression) => { + config::InsertSource::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(RequestHeaderRule::InsertExpression( RequestInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(expression.clone()), + expression: Box::new(program), }, )); } @@ -114,16 +121,19 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule strategy: aggregation_strategy, })); } - config::InsertSource::Expression(expression) => { + config::InsertSource::Expression { expression } => { // NOTE: In case we ever need to improve performance and not pass the whole context // to VRL expressions, we can use: // - compilation_result.program.info().target_assignments // - compilation_result.program.info().target_queries // to determine what parts of the context are actually needed by the expression + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(ResponseHeaderRule::InsertExpression( ResponseInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(expression.clone()), + expression: Box::new(program), strategy: aggregation_strategy, }, )); diff --git a/lib/executor/src/headers/plan.rs b/lib/executor/src/headers/plan.rs index 69e0edffd..a1a3cf990 100644 --- a/lib/executor/src/headers/plan.rs +++ b/lib/executor/src/headers/plan.rs @@ -1,7 +1,7 @@ use ahash::HashMap; -use hive_router_config::primitives::expression::Expression; use http::{HeaderName, HeaderValue}; use regex_automata::meta::Regex; +use vrl::compiler::Program as VrlProgram; #[derive(Clone)] pub struct HeaderRulesPlan { @@ -62,13 +62,13 @@ pub struct ResponseInsertStatic { #[derive(Clone)] pub struct RequestInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, } #[derive(Clone)] pub struct ResponseInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, pub strategy: HeaderAggregationStrategy, } diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index aa3960c05..ec22de509 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -11,6 +11,7 @@ use crate::{ }, sanitizer::{is_denied_header, is_never_join_header}, }, + utils::expression::execute_expression_with_value, }; pub fn modify_subgraph_request_headers( @@ -166,15 +167,9 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self - .expression - .execute_with_value(ctx.into()) - .map_err(|err| { - HeaderRuleRuntimeError::new_expression_evaluation( - self.name.to_string(), - Box::new(err), - ) - })?; + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { if is_never_join_header(&self.name) { diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 3c2085644..9cf45a768 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -13,6 +13,7 @@ use crate::{ }, sanitizer::is_denied_header, }, + utils::expression::execute_expression_with_value, }; use super::sanitizer::is_never_join_header; @@ -188,13 +189,9 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self - .expression - .execute_with_value(ctx.into()) - .map_err(|err| { - HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) - })?; - + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { let strategy = if is_never_join_header(&self.name) { HeaderAggregationStrategy::Append diff --git a/lib/executor/src/utils/expression.rs b/lib/executor/src/utils/expression.rs new file mode 100644 index 000000000..faee2dc3b --- /dev/null +++ b/lib/executor/src/utils/expression.rs @@ -0,0 +1,49 @@ +use once_cell::sync::Lazy; +use std::collections::BTreeMap; +use vrl::{ + compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + core::Value as VrlValue, + prelude::{ + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, + TimeZone as VrlTimeZone, + }, + stdlib::all as vrl_build_functions, + value::Secrets as VrlSecrets, +}; + +static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); +static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); + +pub fn compile_expression( + expression: &str, + functions: Option<&[Box]>, +) -> Result { + let functions = functions.unwrap_or(&VRL_FUNCTIONS); + + let compilation_result = vrl_compile(expression, functions).map_err(|diagnostics| { + diagnostics + .errors() + .iter() + .map(|d| format!("{}: {}", d.code, d.message)) + .collect::>() + .join(", ") + })?; + + Ok(compilation_result.program) +} + +pub fn execute_expression_with_value( + program: &VrlProgram, + value: VrlValue, +) -> Result { + let mut target = VrlTargetValue { + value, + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); + + program.resolve(&mut ctx) +} diff --git a/lib/executor/src/utils/mod.rs b/lib/executor/src/utils/mod.rs index fc4226984..0461bb8a8 100644 --- a/lib/executor/src/utils/mod.rs +++ b/lib/executor/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod consts; +pub mod expression; pub mod traverse; pub mod vrl; diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 9efaf9405..93edcc3b5 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -23,9 +23,7 @@ http = { workspace = true } jsonwebtoken = { workspace = true } retry-policies = { workspace = true} tracing = { workspace = true } -vrl = { workspace = true } -once_cell = "1.21.3" schemars = "1.0.4" humantime-serde = "1.1.1" config = { version = "0.15.14", features = ["yaml", "json", "json5"] } diff --git a/lib/router-config/src/headers.rs b/lib/router-config/src/headers.rs index 8d30675b7..bd0913f6f 100644 --- a/lib/router-config/src/headers.rs +++ b/lib/router-config/src/headers.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::primitives::expression::Expression; - type HeaderName = String; type RegExp = String; @@ -250,7 +248,7 @@ pub enum InsertSource { /// name: x-auth-scheme /// expression: 'split(.request.headers.authorization, " ")[0] ?? "none"' /// ``` - Expression(Expression), + Expression { expression: String }, } /// Helper to allow `one` or `many` values for ergonomics (OR semantics). diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index a7006f8b1..cbb7f28d1 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -2,8 +2,6 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use crate::primitives::expression::Expression; - /// A map of label names to their override configuration. pub type OverrideLabelsConfig = HashMap; @@ -17,7 +15,7 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression(Expression), + Expression { expression: String }, } impl LabelOverrideValue { diff --git a/lib/router-config/src/override_subgraph_urls.rs b/lib/router-config/src/override_subgraph_urls.rs index dc32bb783..ddacfe044 100644 --- a/lib/router-config/src/override_subgraph_urls.rs +++ b/lib/router-config/src/override_subgraph_urls.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::primitives::expression::Expression; - /// Configuration for how the Router should override subgraph URLs. /// This can be used to point to different subgraph endpoints based on environment, /// or to use dynamic expressions to determine the URL at runtime. @@ -54,7 +52,7 @@ pub enum UrlOrExpression { /// A static URL string. Url(String), /// A dynamic value computed by a VRL expression. - Expression(Expression), + Expression { expression: String }, } fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { @@ -77,7 +75,9 @@ fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { subgraphs.insert( "products".to_string(), PerSubgraphConfig { - url: UrlOrExpression::Expression(expression.to_string().try_into().unwrap()), + url: UrlOrExpression::Expression { + expression: expression.to_string(), + }, }, ); diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs deleted file mode 100644 index 5b60113b7..000000000 --- a/lib/router-config/src/primitives/expression.rs +++ /dev/null @@ -1,144 +0,0 @@ -use once_cell::sync::Lazy; -use schemars::{json_schema, JsonSchema, Schema, SchemaGenerator}; -use serde::{ser::SerializeStruct, Deserialize, Serialize}; -use std::{borrow::Cow, collections::BTreeMap}; -use vrl::{ - compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, - core::Value as VrlValue, - prelude::{ - state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, - TimeZone as VrlTimeZone, - }, - stdlib::all as vrl_build_functions, - value::Secrets as VrlSecrets, -}; - -#[derive(Debug, Clone)] -pub struct Expression { - expression: String, - program: Box, -} - -static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); -static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); - -impl Expression { - pub fn try_new(expression: String) -> Result { - let compilation_result = - vrl_compile(&expression, &VRL_FUNCTIONS).map_err(|diagnostics| { - diagnostics - .errors() - .iter() - .map(|d| format!("{}: {}", d.code, d.message)) - .collect::>() - .join(", ") - })?; - - Ok(Self { - expression, - program: Box::new(compilation_result.program), - }) - } - - pub fn execute_with_value(&self, value: VrlValue) -> Result { - let mut target = VrlTargetValue { - value, - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); - - self.execute_with_context(&mut ctx) - } - - pub fn execute_with_context(&self, ctx: &mut VrlContext) -> Result { - self.program.resolve(ctx) - } -} - -impl<'de> Deserialize<'de> for Expression { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct ExpressionVisitor; - impl<'de> serde::de::Visitor<'de> for ExpressionVisitor { - type Value = Expression; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a map for Expression") - } - - fn visit_map(self, mut map: A) -> Result - where - A: serde::de::MapAccess<'de>, - { - let mut expression_str: Option = None; - - while let Some(key) = map.next_key::()? { - match key.as_str() { - "expression" => { - if expression_str.is_some() { - return Err(serde::de::Error::duplicate_field("expression")); - } - expression_str = Some(map.next_value()?); - } - other_key => { - return Err(serde::de::Error::unknown_field( - other_key, - &["expression"], - )); - } - } - } - - let expression_str = - expression_str.ok_or_else(|| serde::de::Error::missing_field("expression"))?; - - Expression::try_new(expression_str).map_err(serde::de::Error::custom) - } - } - deserializer.deserialize_map(ExpressionVisitor) - } -} - -impl Serialize for Expression { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut state = serializer.serialize_struct("Expression", 1)?; - state.serialize_field("expression", &self.expression)?; - state.end() - } -} - -impl JsonSchema for Expression { - fn schema_name() -> Cow<'static, str> { - "Expression".into() - } - - fn json_schema(_gen: &mut SchemaGenerator) -> Schema { - json_schema!({ - "type": "object", - "description": "A VRL expression used for dynamic evaluations.", - "properties": { - "expression": { - "type": "string", - "description": "The VRL expression string." - } - }, - "required": ["expression"], - "additionalProperties": false - }) - } -} - -impl TryFrom for Expression { - type Error = String; - fn try_from(value: String) -> Result { - Expression::try_new(value) - } -} diff --git a/lib/router-config/src/primitives/mod.rs b/lib/router-config/src/primitives/mod.rs index aa591928b..972582fc3 100644 --- a/lib/router-config/src/primitives/mod.rs +++ b/lib/router-config/src/primitives/mod.rs @@ -1,4 +1,3 @@ -pub mod expression; pub mod file_path; pub mod http_header; pub mod retry_policy; From e1111dcb320e36de162a919a230b35e27e47c907 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 13:34:20 +0300 Subject: [PATCH 06/20] Remove extra dep and apply comments --- Cargo.lock | 1 - lib/executor/Cargo.toml | 1 - lib/router-config/src/override_labels.rs | 5 ++++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 261fea428..86befc23f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2094,7 +2094,6 @@ dependencies = [ "ordered-float", "regex-automata", "ryu", - "schemars 1.0.4", "serde", "sonic-rs", "strum 0.27.2", diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index 06bcf2904..a1da6754f 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -50,7 +50,6 @@ ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" once_cell = "1.21.3" -schemars = "1.0.4" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index cbb7f28d1..b3dc01a75 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -15,7 +15,10 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression { expression: String }, + Expression { + /// An expression that must evaluate to a boolean. If true, the label will be applied. + expression: String, + }, } impl LabelOverrideValue { From 766bd0edbf13ef049f0a508d088362d9fe06028e Mon Sep 17 00:00:00 2001 From: "knope-bot[bot]" <152252888+knope-bot[bot]@users.noreply.github.com> Date: Mon, 3 Nov 2025 10:40:23 +0000 Subject: [PATCH 07/20] Auto generate changeset --- .changeset/shared_utilities_to_handle_vrl_expressions.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/shared_utilities_to_handle_vrl_expressions.md diff --git a/.changeset/shared_utilities_to_handle_vrl_expressions.md b/.changeset/shared_utilities_to_handle_vrl_expressions.md new file mode 100644 index 000000000..f1e52b5c2 --- /dev/null +++ b/.changeset/shared_utilities_to_handle_vrl_expressions.md @@ -0,0 +1,9 @@ +--- +default: patch +--- + +# Shared utilities to handle VRL expressions + +#540 by @ardatan + + From 6b9ee81b8418d9e089cb9507e498a76033df41f0 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:12:46 +0300 Subject: [PATCH 08/20] Duration --- lib/executor/src/executors/map.rs | 5 +---- lib/router-config/src/traffic_shaping.rs | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 93247ec80..fc3d6905a 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -1,7 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, sync::Arc, - time::Duration, }; use bytes::{BufMut, Bytes, BytesMut}; @@ -57,9 +56,7 @@ impl SubgraphExecutorMap { let https = HttpsConnector::new(); let client: HttpClient = Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) - .pool_idle_timeout(Duration::from_secs( - config.traffic_shaping.pool_idle_timeout_seconds, - )) + .pool_idle_timeout(config.traffic_shaping.pool_idle_timeout) .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host) .build(https); diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 58dd7e049..95d824910 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -9,8 +11,13 @@ pub struct TrafficShapingConfig { pub max_connections_per_host: usize, /// Timeout for idle sockets being kept-alive. - #[serde(default = "default_pool_idle_timeout_seconds")] - pub pool_idle_timeout_seconds: u64, + #[serde( + default = "default_pool_idle_timeout", + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + #[schemars(with = "String")] + pub pool_idle_timeout: Duration, /// Enables/disables request deduplication to subgraphs. /// @@ -24,7 +31,7 @@ impl Default for TrafficShapingConfig { fn default() -> Self { Self { max_connections_per_host: default_max_connections_per_host(), - pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(), + pool_idle_timeout: default_pool_idle_timeout(), dedupe_enabled: default_dedupe_enabled(), } } @@ -34,8 +41,8 @@ fn default_max_connections_per_host() -> usize { 100 } -fn default_pool_idle_timeout_seconds() -> u64 { - 50 +fn default_pool_idle_timeout() -> Duration { + Duration::from_secs(50) } fn default_dedupe_enabled() -> bool { From 47f829784ebf955f24897f94ca7c7871ae992a4b Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:19:01 +0300 Subject: [PATCH 09/20] Fix config' --- docs/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/README.md b/docs/README.md index a01d56293..0a2fc6727 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout":"50s"}`
|| **Additional Properties:** not allowed **Example** @@ -109,7 +109,7 @@ supergraph: {} traffic_shaping: dedupe_enabled: true max_connections_per_host: 100 - pool_idle_timeout_seconds: 50 + pool_idle_timeout: 50s ``` @@ -1817,7 +1817,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations |----|----|-----------|--------| |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| -|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| +|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
Default: `"50s"`
|| **Additional Properties:** not allowed **Example** @@ -1825,7 +1825,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations ```yaml dedupe_enabled: true max_connections_per_host: 100 -pool_idle_timeout_seconds: 50 +pool_idle_timeout: 50s ``` From 7ebe2d95083d101dd428bd007fc876d38cffe870 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:52:58 +0300 Subject: [PATCH 10/20] Update traffic shaping configuration for VRL expressions Removed `pool_idle_timeout_seconds` from `traffic_shaping` and replaced it with `pool_idle_timeout` using duration format. --- .../shared_utilities_to_handle_vrl_expressions.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.changeset/shared_utilities_to_handle_vrl_expressions.md b/.changeset/shared_utilities_to_handle_vrl_expressions.md index f1e52b5c2..97c07aa3a 100644 --- a/.changeset/shared_utilities_to_handle_vrl_expressions.md +++ b/.changeset/shared_utilities_to_handle_vrl_expressions.md @@ -1,9 +1,15 @@ --- -default: patch +default: minor --- -# Shared utilities to handle VRL expressions +# Breaking -#540 by @ardatan +Removed `pool_idle_timeout_seconds` from `traffic_shaping`, instead use `pool_idle_timeout` with duration format. +```diff +traffic_shaping: +- pool_idle_timeout_seconds: 30 ++ pool_idle_timeout: 30s +``` +#540 by @ardatan From d2a309a4c4f70d9be69223bb0440640ab4a7dbb5 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:24:59 +0300 Subject: [PATCH 11/20] Fix --- Cargo.lock | 1 + bin/router/src/pipeline/progressive_override.rs | 4 ++-- lib/router-config/Cargo.toml | 1 + lib/router-config/src/override_labels.rs | 7 +++---- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86befc23f..c45d03f01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2058,6 +2058,7 @@ dependencies = [ "http", "humantime-serde", "jsonwebtoken", + "once_cell", "retry-policies", "schemars 1.0.5", "serde", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index 0c2ad9c15..f3b2aeb4c 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -119,7 +119,7 @@ impl StableOverrideContext { /// It's intended to be used as a shared state in the router. pub struct OverrideLabelsEvaluator { static_enabled_labels: HashSet, - expressions: HashMap, + expressions: HashMap, } impl OverrideLabelsEvaluator { @@ -174,7 +174,7 @@ impl OverrideLabelsEvaluator { let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); for (label, expression) in &self.expressions { - match expression.resolve(&mut ctx) { + match expression.execute_with_context(&mut ctx) { Ok(evaluated_value) => match evaluated_value { VrlValue::Boolean(true) => { active_flags.insert(label.clone()); diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 93edcc3b5..e7748a961 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -24,6 +24,7 @@ jsonwebtoken = { workspace = true } retry-policies = { workspace = true} tracing = { workspace = true } +once_cell = "1.21.3" schemars = "1.0.4" humantime-serde = "1.1.1" config = { version = "0.15.14", features = ["yaml", "json", "json5"] } diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index b3dc01a75..a7006f8b1 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -2,6 +2,8 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use crate::primitives::expression::Expression; + /// A map of label names to their override configuration. pub type OverrideLabelsConfig = HashMap; @@ -15,10 +17,7 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression { - /// An expression that must evaluate to a boolean. If true, the label will be applied. - expression: String, - }, + Expression(Expression), } impl LabelOverrideValue { From 792f8a1ca568a9d3823b25fb049137b01f39b3b2 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 18:01:37 +0300 Subject: [PATCH 12/20] Revert --- Cargo.lock | 2 +- bin/router/src/pipeline/progressive_override.rs | 4 ++-- lib/router-config/Cargo.toml | 1 - lib/router-config/src/override_labels.rs | 4 +--- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c45d03f01..261fea428 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2058,7 +2058,6 @@ dependencies = [ "http", "humantime-serde", "jsonwebtoken", - "once_cell", "retry-policies", "schemars 1.0.5", "serde", @@ -2095,6 +2094,7 @@ dependencies = [ "ordered-float", "regex-automata", "ryu", + "schemars 1.0.4", "serde", "sonic-rs", "strum 0.27.2", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index f3b2aeb4c..0c2ad9c15 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -119,7 +119,7 @@ impl StableOverrideContext { /// It's intended to be used as a shared state in the router. pub struct OverrideLabelsEvaluator { static_enabled_labels: HashSet, - expressions: HashMap, + expressions: HashMap, } impl OverrideLabelsEvaluator { @@ -174,7 +174,7 @@ impl OverrideLabelsEvaluator { let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); for (label, expression) in &self.expressions { - match expression.execute_with_context(&mut ctx) { + match expression.resolve(&mut ctx) { Ok(evaluated_value) => match evaluated_value { VrlValue::Boolean(true) => { active_flags.insert(label.clone()); diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index e7748a961..93edcc3b5 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -24,7 +24,6 @@ jsonwebtoken = { workspace = true } retry-policies = { workspace = true} tracing = { workspace = true } -once_cell = "1.21.3" schemars = "1.0.4" humantime-serde = "1.1.1" config = { version = "0.15.14", features = ["yaml", "json", "json5"] } diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index a7006f8b1..cbb7f28d1 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -2,8 +2,6 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use crate::primitives::expression::Expression; - /// A map of label names to their override configuration. pub type OverrideLabelsConfig = HashMap; @@ -17,7 +15,7 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression(Expression), + Expression { expression: String }, } impl LabelOverrideValue { From a603586610f4f63aae7ab1835008a8dce743cd7d Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 13:54:53 +0300 Subject: [PATCH 13/20] feat(router): Subgraph Timeout Configuration --- Cargo.lock | 2 +- docs/README.md | 65 +++++++++++- lib/executor/Cargo.toml | 1 + lib/executor/src/execution/plan.rs | 3 +- lib/executor/src/executors/common.rs | 17 +-- lib/executor/src/executors/error.rs | 13 +++ lib/executor/src/executors/http.rs | 115 ++++++++++++++++++--- lib/executor/src/executors/map.rs | 63 ++++++++++-- lib/router-config/src/traffic_shaping.rs | 125 ++++++++++++++++++++--- 9 files changed, 351 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 261fea428..2365117e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2083,6 +2083,7 @@ dependencies = [ "hive-router-query-planner", "http", "http-body-util", + "humantime", "hyper", "hyper-tls", "hyper-util", @@ -2094,7 +2095,6 @@ dependencies = [ "ordered-float", "regex-automata", "ryu", - "schemars 1.0.4", "serde", "sonic-rs", "strum 0.27.2", diff --git a/docs/README.md b/docs/README.md index 0a2fc6727..b54ec0592 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout":"50s"}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":{"Duration":"15s"}},"max_connections_per_host":100}`
|| **Additional Properties:** not allowed **Example** @@ -107,9 +107,12 @@ query_planner: timeout: 10s supergraph: {} traffic_shaping: - dedupe_enabled: true + all: + dedupe_enabled: true + pool_idle_timeout: 50s + request_timeout: + Duration: 15s max_connections_per_host: 100 - pool_idle_timeout: 50s ``` @@ -1815,18 +1818,70 @@ Configuration for the traffic-shaping of the executor. Use these configurations |Name|Type|Description|Required| |----|----|-----------|--------| -|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| +|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":{"Duration":"15s"}}`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| +|[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
|| + +**Additional Properties:** not allowed +**Example** + +```yaml +all: + dedupe_enabled: true + pool_idle_timeout: 50s + request_timeout: + Duration: 15s +max_connections_per_host: 100 + +``` + +
+### traffic\_shaping\.all: object + +The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration. + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
Default: `"50s"`
|| +|**request\_timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
"10s"
} else {
"15s"
}
```
Default: `{"Duration":"15s"}`
|| **Additional Properties:** not allowed **Example** ```yaml dedupe_enabled: true -max_connections_per_host: 100 pool_idle_timeout: 50s +request_timeout: + Duration: 15s ``` + +### traffic\_shaping\.subgraphs: object + +Optional per-subgraph configurations that will override the default configuration for specific subgraphs. + + +**Additional Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|[**Additional Properties**](#traffic_shapingsubgraphsadditionalproperties)|`object`||yes| + + +#### traffic\_shaping\.subgraphs\.additionalProperties: object + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**dedupe\_enabled**|`boolean`, `null`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
|no| +|**pool\_idle\_timeout\_seconds**|`string`|Timeout for idle sockets being kept-alive.
|yes| +|**request\_timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
"10s"
} else {
"15s"
}
```
|no| + +**Additional Properties:** not allowed diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index a1da6754f..d416ab28b 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -44,6 +44,7 @@ hyper-util = { version = "0.1.16", features = [ "http2", "tokio", ] } +humantime = "2.3.0" bytes = "1.10.1" itoa = "1.0.15" ryu = "1.0.20" diff --git a/lib/executor/src/execution/plan.rs b/lib/executor/src/execution/plan.rs index f86356312..90102542c 100644 --- a/lib/executor/src/execution/plan.rs +++ b/lib/executor/src/execution/plan.rs @@ -708,6 +708,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { representations, headers: headers_map, extensions: None, + client_request: self.client_request, }; if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan { @@ -722,7 +723,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { subgraph_name: node.service_name.clone(), response: self .executors - .execute(&node.service_name, subgraph_request, self.client_request) + .execute(&node.service_name, subgraph_request) .await .into(), })) diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs index bdcd4d819..44fc455d6 100644 --- a/lib/executor/src/executors/common.rs +++ b/lib/executor/src/executors/common.rs @@ -5,11 +5,13 @@ use bytes::Bytes; use http::HeaderMap; use sonic_rs::Value; +use crate::execution::client_request_details::ClientRequestDetails; + #[async_trait] pub trait SubgraphExecutor { - async fn execute<'a>( + async fn execute<'exec, 'req>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse; fn to_boxed_arc<'a>(self) -> Arc> @@ -26,18 +28,19 @@ pub type SubgraphExecutorBoxedArc = Arc>; pub type SubgraphRequestExtensions = HashMap; -pub struct HttpExecutionRequest<'a> { - pub query: &'a str, +pub struct HttpExecutionRequest<'exec, 'req> { + pub query: &'exec str, pub dedupe: bool, - pub operation_name: Option<&'a str>, + pub operation_name: Option<&'exec str>, // TODO: variables could be stringified before even executing the request - pub variables: Option>, + pub variables: Option>, pub headers: HeaderMap, pub representations: Option>, pub extensions: Option, + pub client_request: &'exec ClientRequestDetails<'exec, 'req>, } -impl HttpExecutionRequest<'_> { +impl HttpExecutionRequest<'_, '_> { pub fn add_request_extensions_field(&mut self, key: String, value: Value) { self.extensions .get_or_insert_with(HashMap::new) diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index de69d47cb..c3c7625f1 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -20,6 +20,12 @@ pub enum SubgraphExecutorError { RequestFailure(String, String), #[error("Failed to serialize variable \"{0}\": {1}")] VariablesSerializationFailure(String, String), + #[error("Failed to compile VRL expression for timeout for subgraph '{0}'. Please check your VRL expression for syntax errors. Diagnostic: {1}")] + RequestTimeoutExpressionBuild(String, String), + #[error("Failed to resolve VRL expression for timeout for subgraph '{0}'. Runtime error: {1}")] + TimeoutExpressionResolution(String, String), + #[error("Request to subgraph \"{0}\" timed out after {1} milliseconds")] + RequestTimeout(String, u64), } impl From for GraphQLError { @@ -61,6 +67,13 @@ impl SubgraphExecutorError { SubgraphExecutorError::VariablesSerializationFailure(_, _) => { "SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE" } + SubgraphExecutorError::TimeoutExpressionResolution(_, _) => { + "SUBGRAPH_TIMEOUT_EXPRESSION_RESOLUTION_FAILURE" + } + SubgraphExecutorError::RequestTimeout(_, _) => "SUBGRAPH_REQUEST_TIMEOUT", + SubgraphExecutorError::RequestTimeoutExpressionBuild(_, _) => { + "SUBGRAPH_TIMEOUT_EXPRESSION_BUILD_FAILURE" + } } } } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 29b392567..aa833edb8 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,9 +1,12 @@ +use std::collections::BTreeMap; use std::sync::Arc; +use std::time::Duration; +use crate::execution::client_request_details::ClientRequestDetails; use crate::executors::common::HttpExecutionResponse; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; +use crate::utils::expression::execute_expression_with_value; use dashmap::DashMap; -use hive_router_config::HiveRouterConfig; use tokio::sync::OnceCell; use async_trait::async_trait; @@ -18,6 +21,8 @@ use hyper_tls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client}; use tokio::sync::Semaphore; use tracing::debug; +use vrl::compiler::Program as VrlProgram; +use vrl::core::Value as VrlValue; use crate::executors::common::HttpExecutionRequest; use crate::executors::error::SubgraphExecutorError; @@ -28,6 +33,12 @@ use crate::utils::consts::COMMA; use crate::utils::consts::QUOTE; use crate::{executors::common::SubgraphExecutor, json_writer::write_and_escape_string}; +#[derive(Debug)] +pub enum DurationOrProgram { + Duration(Duration), + Program(Box), +} + #[derive(Debug)] pub struct HTTPSubgraphExecutor { pub subgraph_name: String, @@ -35,8 +46,9 @@ pub struct HTTPSubgraphExecutor { pub http_client: Arc, Full>>, pub header_map: HeaderMap, pub semaphore: Arc, - pub config: Arc, + pub dedupe_enabled: bool, pub in_flight_requests: Arc>, ABuildHasher>>, + pub timeout: DurationOrProgram, } const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{"; @@ -50,8 +62,9 @@ impl HTTPSubgraphExecutor { endpoint: http::Uri, http_client: Arc, semaphore: Arc, - config: Arc, + dedupe_enabled: bool, in_flight_requests: Arc>, ABuildHasher>>, + timeout: DurationOrProgram, ) -> Self { let mut header_map = HeaderMap::new(); header_map.insert( @@ -69,14 +82,15 @@ impl HTTPSubgraphExecutor { http_client, header_map, semaphore, - config, + dedupe_enabled, in_flight_requests, + timeout, } } - fn build_request_body<'a>( + fn build_request_body( &self, - execution_request: &HttpExecutionRequest<'a>, + execution_request: &HttpExecutionRequest<'_, '_>, ) -> Result, SubgraphExecutorError> { let mut body = Vec::with_capacity(4096); body.put(FIRST_QUOTE_STR); @@ -137,6 +151,7 @@ impl HTTPSubgraphExecutor { &self, body: Vec, headers: HeaderMap, + client_request: &ClientRequestDetails<'_, '_>, ) -> Result { let mut req = hyper::Request::builder() .method(http::Method::POST) @@ -151,9 +166,75 @@ impl HTTPSubgraphExecutor { debug!("making http request to {}", self.endpoint.to_string()); - let res = self.http_client.request(req).await.map_err(|e| { - SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - })?; + let timeout = match &self.timeout { + DurationOrProgram::Duration(dur) => *dur, + DurationOrProgram::Program(program) => { + let value = + VrlValue::Object(BTreeMap::from([("request".into(), client_request.into())])); + let result = execute_expression_with_value(program, value).map_err(|err| { + SubgraphExecutorError::TimeoutExpressionResolution( + self.subgraph_name.to_string(), + err.to_string(), + ) + })?; + match result { + VrlValue::Integer(i) => { + if i < 0 { + return Err(SubgraphExecutorError::TimeoutExpressionResolution( + self.subgraph_name.to_string(), + "Timeout expression resolved to a negative integer".to_string(), + )); + } + std::time::Duration::from_millis(i as u64) + } + VrlValue::Float(f) => { + let f = f.into_inner(); + if f < 0.0 { + return Err(SubgraphExecutorError::TimeoutExpressionResolution( + self.subgraph_name.to_string(), + "Timeout expression resolved to a negative float".to_string(), + )); + } + std::time::Duration::from_millis(f as u64) + } + VrlValue::Bytes(b) => { + let s = std::str::from_utf8(&b).map_err(|e| { + SubgraphExecutorError::TimeoutExpressionResolution( + self.subgraph_name.to_string(), + format!("Failed to parse duration string from bytes: {}", e), + ) + })?; + humantime::parse_duration(s).map_err(|e| { + SubgraphExecutorError::TimeoutExpressionResolution( + self.subgraph_name.to_string(), + format!("Failed to parse duration string '{}': {}", s, e), + ) + })? + } + other => { + return Err(SubgraphExecutorError::TimeoutExpressionResolution( + self.subgraph_name.to_string(), + format!( + "Timeout expression resolved to an unexpected type: {}. Expected a non-negative integer/float (ms) or a duration string.", + other.kind() + ), + )); + } + } + } + }; + + let res = tokio::time::timeout(timeout, self.http_client.request(req)) + .await + .map_err(|_| { + SubgraphExecutorError::RequestTimeout( + self.endpoint.to_string(), + timeout.as_millis() as u64, + ) + })? + .map_err(|e| { + SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) + })?; debug!( "http request to {} completed, status: {}", @@ -209,10 +290,10 @@ impl HTTPSubgraphExecutor { #[async_trait] impl SubgraphExecutor for HTTPSubgraphExecutor { - #[tracing::instrument(skip_all, fields(subgraph_name = self.subgraph_name))] - async fn execute<'a>( + #[tracing::instrument(skip_all, fields(subgraph_name = %self.subgraph_name))] + async fn execute<'exec, 'req>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse { let body = match self.build_request_body(&execution_request) { Ok(body) => body, @@ -230,11 +311,14 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { headers.insert(key, value.clone()); }); - if !self.config.traffic_shaping.dedupe_enabled || !execution_request.dedupe { + if !self.dedupe_enabled || !execution_request.dedupe { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - return match self._send_request(body, headers).await { + return match self + ._send_request(body, headers, execution_request.client_request) + .await + { Ok(shared_response) => HttpExecutionResponse { body: shared_response.body, headers: shared_response.headers, @@ -266,7 +350,8 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - self._send_request(body, headers).await + self._send_request(body, headers, execution_request.client_request) + .await }; // It's important to remove the entry from the map before returning the result. // This ensures that once the OnceCell is set, no future requests can join it. diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index fc3d6905a..9dddeb648 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -5,7 +5,10 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; +use hive_router_config::{ + override_subgraph_urls::UrlOrExpression, traffic_shaping::DurationOrExpression, + HiveRouterConfig, +}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -24,7 +27,7 @@ use crate::{ }, dedupe::{ABuildHasher, SharedResponse}, error::SubgraphExecutorError, - http::{HTTPSubgraphExecutor, HttpClient}, + http::{DurationOrProgram, HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, utils::expression::{compile_expression, execute_expression_with_value}, @@ -56,7 +59,7 @@ impl SubgraphExecutorMap { let https = HttpsConnector::new(); let client: HttpClient = Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) - .pool_idle_timeout(config.traffic_shaping.pool_idle_timeout) + .pool_idle_timeout(config.traffic_shaping.all.pool_idle_timeout) .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host) .build(https); @@ -101,13 +104,12 @@ impl SubgraphExecutorMap { Ok(subgraph_executor_map) } - pub async fn execute<'a, 'req>( + pub async fn execute<'exec, 'req>( &self, subgraph_name: &str, - execution_request: HttpExecutionRequest<'a>, - client_request: &ClientRequestDetails<'a, 'req>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse { - match self.get_or_create_executor(subgraph_name, client_request) { + match self.get_or_create_executor(subgraph_name, execution_request.client_request) { Ok(Some(executor)) => executor.execute(execution_request).await, Err(err) => { error!( @@ -295,13 +297,56 @@ impl SubgraphExecutorMap { .or_insert_with(|| Arc::new(Semaphore::new(self.max_connections_per_host))) .clone(); + let mut client = self.client.clone(); + let mut timeout_config = &self.config.traffic_shaping.all.request_timeout; + let pool_idle_timeout_seconds = self.config.traffic_shaping.all.pool_idle_timeout; + let mut dedupe_enabled = self.config.traffic_shaping.all.dedupe_enabled; + if let Some(subgraph_traffic_shaping_config) = + self.config.traffic_shaping.subgraphs.get(subgraph_name) + { + if subgraph_traffic_shaping_config.pool_idle_timeout_seconds.is_some() { + client = Arc::new( + Client::builder(TokioExecutor::new()) + .pool_timer(TokioTimer::new()) + .pool_idle_timeout( + subgraph_traffic_shaping_config + .pool_idle_timeout_seconds + .unwrap_or(pool_idle_timeout_seconds), + ) + .pool_max_idle_per_host(self.max_connections_per_host) + .build(HttpsConnector::new()), + ); + } + dedupe_enabled = subgraph_traffic_shaping_config + .dedupe_enabled + .unwrap_or(dedupe_enabled); + timeout_config = subgraph_traffic_shaping_config + .request_timeout + .as_ref() + .unwrap_or(timeout_config); + } + + let timeout_prog = match &timeout_config { + DurationOrExpression::Duration(dur) => DurationOrProgram::Duration(*dur), + DurationOrExpression::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + SubgraphExecutorError::RequestTimeoutExpressionBuild( + subgraph_name.to_string(), + err, + ) + })?; + DurationOrProgram::Program(Box::new(program)) + } + }; + let executor = HTTPSubgraphExecutor::new( subgraph_name.to_string(), endpoint_uri, - self.client.clone(), + client, semaphore, - self.config.clone(), + dedupe_enabled, self.in_flight_requests.clone(), + timeout_prog, ); self.executors_by_subgraph diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 95d824910..53f691237 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -6,10 +6,80 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(deny_unknown_fields)] pub struct TrafficShapingConfig { + /// The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration. + #[serde(default)] + pub all: TrafficShapingExecutorConfig, + /// Optional per-subgraph configurations that will override the default configuration for specific subgraphs. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub subgraphs: HashMap, /// Limits the concurrent amount of requests/connections per host/subgraph. #[serde(default = "default_max_connections_per_host")] pub max_connections_per_host: usize, +} + +impl Default for TrafficShapingConfig { + fn default() -> Self { + Self { + all: TrafficShapingExecutorConfig::default(), + subgraphs: HashMap::new(), + max_connections_per_host: default_max_connections_per_host(), + } + } +} + +fn default_max_connections_per_host() -> usize { + 100 +} + +fn default_pool_idle_timeout() -> Duration { + Duration::from_secs(50) +} +fn default_dedupe_enabled() -> bool { + true +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +#[serde(deny_unknown_fields)] +pub struct TrafficShapingExecutorSubgraphConfig { + /// Timeout for idle sockets being kept-alive. + #[serde( + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + #[schemars(with = "String")] + pub pool_idle_timeout_seconds: Option, + + /// Enables/disables request deduplication to subgraphs. + /// + /// When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will + /// be deduplicated by sharing the response of other in-flight requests. + pub dedupe_enabled: Option, + + /// Optional timeout configuration for requests to subgraphs. + /// + /// Example with a fixed duration: + /// ```yaml + /// timeout: + /// duration: 5s + /// ``` + /// + /// Or with a VRL expression that can return a duration based on the operation kind: + /// ```yaml + /// timeout: + /// expression: | + /// if (.request.operation.type == "mutation") { + /// "10s" + /// } else { + /// "15s" + /// } + /// ``` + pub request_timeout: Option, +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +#[serde(deny_unknown_fields)] +pub struct TrafficShapingExecutorConfig { /// Timeout for idle sockets being kept-alive. #[serde( default = "default_pool_idle_timeout", @@ -25,26 +95,51 @@ pub struct TrafficShapingConfig { /// be deduplicated by sharing the response of other in-flight requests. #[serde(default = "default_dedupe_enabled")] pub dedupe_enabled: bool, + + /// Optional timeout configuration for requests to subgraphs. + /// + /// Example with a fixed duration: + /// ```yaml + /// timeout: + /// duration: 5s + /// ``` + /// + /// Or with a VRL expression that can return a duration based on the operation kind: + /// ```yaml + /// timeout: + /// expression: | + /// if (.request.operation.type == "mutation") { + /// "10s" + /// } else { + /// "15s" + /// } + /// ``` + #[serde(default = "default_request_timeout")] + pub request_timeout: DurationOrExpression, } -impl Default for TrafficShapingConfig { +fn default_request_timeout() -> DurationOrExpression { + DurationOrExpression::Duration(Duration::from_secs(15)) +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +pub enum DurationOrExpression { + /// A fixed duration, e.g., "5s" or "100ms". + #[serde( + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + Duration(Duration), + /// A VRL expression that evaluates to a duration. The result can be an integer (milliseconds), a float (milliseconds), or a duration string (e.g. "5s"). + Expression { expression: String }, +} + +impl Default for TrafficShapingExecutorConfig { fn default() -> Self { Self { - max_connections_per_host: default_max_connections_per_host(), pool_idle_timeout: default_pool_idle_timeout(), dedupe_enabled: default_dedupe_enabled(), + request_timeout: default_request_timeout(), } } } - -fn default_max_connections_per_host() -> usize { - 100 -} - -fn default_pool_idle_timeout() -> Duration { - Duration::from_secs(50) -} - -fn default_dedupe_enabled() -> bool { - true -} From 4e03aa4b17538764c97a915117acf2389fcf7cc6 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:01:35 +0300 Subject: [PATCH 14/20] Timeout --- lib/executor/src/execution/plan.rs | 3 +- lib/executor/src/executors/common.rs | 17 ++-- lib/executor/src/executors/http.rs | 102 ++----------------- lib/executor/src/executors/map.rs | 145 +++++++++++++++++++++++---- 4 files changed, 142 insertions(+), 125 deletions(-) diff --git a/lib/executor/src/execution/plan.rs b/lib/executor/src/execution/plan.rs index 90102542c..f86356312 100644 --- a/lib/executor/src/execution/plan.rs +++ b/lib/executor/src/execution/plan.rs @@ -708,7 +708,6 @@ impl<'exec, 'req> Executor<'exec, 'req> { representations, headers: headers_map, extensions: None, - client_request: self.client_request, }; if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan { @@ -723,7 +722,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { subgraph_name: node.service_name.clone(), response: self .executors - .execute(&node.service_name, subgraph_request) + .execute(&node.service_name, subgraph_request, self.client_request) .await .into(), })) diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs index 44fc455d6..bdcd4d819 100644 --- a/lib/executor/src/executors/common.rs +++ b/lib/executor/src/executors/common.rs @@ -5,13 +5,11 @@ use bytes::Bytes; use http::HeaderMap; use sonic_rs::Value; -use crate::execution::client_request_details::ClientRequestDetails; - #[async_trait] pub trait SubgraphExecutor { - async fn execute<'exec, 'req>( + async fn execute<'a>( &self, - execution_request: HttpExecutionRequest<'exec, 'req>, + execution_request: HttpExecutionRequest<'a>, ) -> HttpExecutionResponse; fn to_boxed_arc<'a>(self) -> Arc> @@ -28,19 +26,18 @@ pub type SubgraphExecutorBoxedArc = Arc>; pub type SubgraphRequestExtensions = HashMap; -pub struct HttpExecutionRequest<'exec, 'req> { - pub query: &'exec str, +pub struct HttpExecutionRequest<'a> { + pub query: &'a str, pub dedupe: bool, - pub operation_name: Option<&'exec str>, + pub operation_name: Option<&'a str>, // TODO: variables could be stringified before even executing the request - pub variables: Option>, + pub variables: Option>, pub headers: HeaderMap, pub representations: Option>, pub extensions: Option, - pub client_request: &'exec ClientRequestDetails<'exec, 'req>, } -impl HttpExecutionRequest<'_, '_> { +impl HttpExecutionRequest<'_> { pub fn add_request_extensions_field(&mut self, key: String, value: Value) { self.extensions .get_or_insert_with(HashMap::new) diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index aa833edb8..fb1f6bed0 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,11 +1,7 @@ -use std::collections::BTreeMap; use std::sync::Arc; -use std::time::Duration; -use crate::execution::client_request_details::ClientRequestDetails; use crate::executors::common::HttpExecutionResponse; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; -use crate::utils::expression::execute_expression_with_value; use dashmap::DashMap; use tokio::sync::OnceCell; @@ -21,8 +17,6 @@ use hyper_tls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client}; use tokio::sync::Semaphore; use tracing::debug; -use vrl::compiler::Program as VrlProgram; -use vrl::core::Value as VrlValue; use crate::executors::common::HttpExecutionRequest; use crate::executors::error::SubgraphExecutorError; @@ -33,12 +27,6 @@ use crate::utils::consts::COMMA; use crate::utils::consts::QUOTE; use crate::{executors::common::SubgraphExecutor, json_writer::write_and_escape_string}; -#[derive(Debug)] -pub enum DurationOrProgram { - Duration(Duration), - Program(Box), -} - #[derive(Debug)] pub struct HTTPSubgraphExecutor { pub subgraph_name: String, @@ -48,7 +36,6 @@ pub struct HTTPSubgraphExecutor { pub semaphore: Arc, pub dedupe_enabled: bool, pub in_flight_requests: Arc>, ABuildHasher>>, - pub timeout: DurationOrProgram, } const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{"; @@ -64,7 +51,6 @@ impl HTTPSubgraphExecutor { semaphore: Arc, dedupe_enabled: bool, in_flight_requests: Arc>, ABuildHasher>>, - timeout: DurationOrProgram, ) -> Self { let mut header_map = HeaderMap::new(); header_map.insert( @@ -84,13 +70,12 @@ impl HTTPSubgraphExecutor { semaphore, dedupe_enabled, in_flight_requests, - timeout, } } fn build_request_body( &self, - execution_request: &HttpExecutionRequest<'_, '_>, + execution_request: &HttpExecutionRequest<'_>, ) -> Result, SubgraphExecutorError> { let mut body = Vec::with_capacity(4096); body.put(FIRST_QUOTE_STR); @@ -151,7 +136,6 @@ impl HTTPSubgraphExecutor { &self, body: Vec, headers: HeaderMap, - client_request: &ClientRequestDetails<'_, '_>, ) -> Result { let mut req = hyper::Request::builder() .method(http::Method::POST) @@ -166,75 +150,9 @@ impl HTTPSubgraphExecutor { debug!("making http request to {}", self.endpoint.to_string()); - let timeout = match &self.timeout { - DurationOrProgram::Duration(dur) => *dur, - DurationOrProgram::Program(program) => { - let value = - VrlValue::Object(BTreeMap::from([("request".into(), client_request.into())])); - let result = execute_expression_with_value(program, value).map_err(|err| { - SubgraphExecutorError::TimeoutExpressionResolution( - self.subgraph_name.to_string(), - err.to_string(), - ) - })?; - match result { - VrlValue::Integer(i) => { - if i < 0 { - return Err(SubgraphExecutorError::TimeoutExpressionResolution( - self.subgraph_name.to_string(), - "Timeout expression resolved to a negative integer".to_string(), - )); - } - std::time::Duration::from_millis(i as u64) - } - VrlValue::Float(f) => { - let f = f.into_inner(); - if f < 0.0 { - return Err(SubgraphExecutorError::TimeoutExpressionResolution( - self.subgraph_name.to_string(), - "Timeout expression resolved to a negative float".to_string(), - )); - } - std::time::Duration::from_millis(f as u64) - } - VrlValue::Bytes(b) => { - let s = std::str::from_utf8(&b).map_err(|e| { - SubgraphExecutorError::TimeoutExpressionResolution( - self.subgraph_name.to_string(), - format!("Failed to parse duration string from bytes: {}", e), - ) - })?; - humantime::parse_duration(s).map_err(|e| { - SubgraphExecutorError::TimeoutExpressionResolution( - self.subgraph_name.to_string(), - format!("Failed to parse duration string '{}': {}", s, e), - ) - })? - } - other => { - return Err(SubgraphExecutorError::TimeoutExpressionResolution( - self.subgraph_name.to_string(), - format!( - "Timeout expression resolved to an unexpected type: {}. Expected a non-negative integer/float (ms) or a duration string.", - other.kind() - ), - )); - } - } - } - }; - - let res = tokio::time::timeout(timeout, self.http_client.request(req)) - .await - .map_err(|_| { - SubgraphExecutorError::RequestTimeout( - self.endpoint.to_string(), - timeout.as_millis() as u64, - ) - })? - .map_err(|e| { - SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - })?; + let res = self.http_client.request(req).await.map_err(|e| { + SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) + })?; debug!( "http request to {} completed, status: {}", @@ -291,9 +209,9 @@ impl HTTPSubgraphExecutor { #[async_trait] impl SubgraphExecutor for HTTPSubgraphExecutor { #[tracing::instrument(skip_all, fields(subgraph_name = %self.subgraph_name))] - async fn execute<'exec, 'req>( + async fn execute<'a>( &self, - execution_request: HttpExecutionRequest<'exec, 'req>, + execution_request: HttpExecutionRequest<'a>, ) -> HttpExecutionResponse { let body = match self.build_request_body(&execution_request) { Ok(body) => body, @@ -315,10 +233,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - return match self - ._send_request(body, headers, execution_request.client_request) - .await - { + return match self._send_request(body, headers).await { Ok(shared_response) => HttpExecutionResponse { body: shared_response.body, headers: shared_response.headers, @@ -350,8 +265,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - self._send_request(body, headers, execution_request.client_request) - .await + self._send_request(body, headers).await }; // It's important to remove the entry from the map before returning the result. // This ensures that once the OnceCell is set, no future requests can join it. diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 9dddeb648..f916422f8 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -1,6 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, - sync::Arc, + sync::Arc, time::Duration, }; use bytes::{BufMut, Bytes, BytesMut}; @@ -27,7 +27,7 @@ use crate::{ }, dedupe::{ABuildHasher, SharedResponse}, error::SubgraphExecutorError, - http::{DurationOrProgram, HTTPSubgraphExecutor, HttpClient}, + http::{HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, utils::expression::{compile_expression, execute_expression_with_value}, @@ -40,6 +40,12 @@ type ExecutorsBySubgraphMap = type EndpointsBySubgraphMap = DashMap; type ExpressionsBySubgraphMap = HashMap; +#[derive(Debug)] +pub enum DurationOrProgram { + Duration(Duration), + Program(Box), +} + pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, /// Mapping from subgraph name to endpoint for quick lookup @@ -47,6 +53,7 @@ pub struct SubgraphExecutorMap { static_endpoints_by_subgraph: EndpointsBySubgraphMap, /// Mapping from subgraph name to VRL expression program expressions_by_subgraph: ExpressionsBySubgraphMap, + timeouts_by_subgraph: DashMap, config: Arc, client: Arc, semaphores_by_origin: DashMap>, @@ -74,6 +81,7 @@ impl SubgraphExecutorMap { semaphores_by_origin: Default::default(), max_connections_per_host, in_flight_requests: Arc::new(DashMap::with_hasher(ABuildHasher::default())), + timeouts_by_subgraph: Default::default(), } } @@ -104,13 +112,45 @@ impl SubgraphExecutorMap { Ok(subgraph_executor_map) } - pub async fn execute<'exec, 'req>( + pub async fn execute<'a, 'req>( &self, subgraph_name: &str, - execution_request: HttpExecutionRequest<'exec, 'req>, + execution_request: HttpExecutionRequest<'a>, + client_request: &ClientRequestDetails<'a, 'req>, ) -> HttpExecutionResponse { - match self.get_or_create_executor(subgraph_name, execution_request.client_request) { - Ok(Some(executor)) => executor.execute(execution_request).await, + match self.get_or_create_executor(subgraph_name, client_request) { + Ok(Some(executor)) => { + let timeout = self + .timeouts_by_subgraph + .get(subgraph_name) + .map(|t| resolve_duration_prog(t.value(), subgraph_name, client_request)); + match timeout { + Some(Ok(dur)) => tokio::time::timeout(dur, executor.execute(execution_request)) + .await + .unwrap_or_else(|_| { + error!( + "Request to subgraph '{}' timed out after {:?}", + subgraph_name, dur, + ); + self.internal_server_error_response( + SubgraphExecutorError::RequestTimeout( + subgraph_name.to_string(), + dur.as_millis() as u64, + ) + .into(), + subgraph_name, + ) + }), + Some(Err(err)) => { + error!( + "Timeout expression resolution error for subgraph '{}': {}", + subgraph_name, err, + ); + self.internal_server_error_response(err.into(), subgraph_name) + } + None => executor.execute(execution_request).await, + } + } Err(err) => { error!( "Subgraph executor error for subgraph '{}': {}", @@ -326,18 +366,22 @@ impl SubgraphExecutorMap { .unwrap_or(timeout_config); } - let timeout_prog = match &timeout_config { - DurationOrExpression::Duration(dur) => DurationOrProgram::Duration(*dur), - DurationOrExpression::Expression { expression } => { - let program = compile_expression(expression, None).map_err(|err| { - SubgraphExecutorError::RequestTimeoutExpressionBuild( - subgraph_name.to_string(), - err, - ) - })?; - DurationOrProgram::Program(Box::new(program)) - } - }; + if !self.timeouts_by_subgraph.contains_key(subgraph_name) { + let timeout = match &timeout_config { + DurationOrExpression::Duration(dur) => DurationOrProgram::Duration(*dur), + DurationOrExpression::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + SubgraphExecutorError::RequestTimeoutExpressionBuild( + subgraph_name.to_string(), + err, + ) + })?; + DurationOrProgram::Program(Box::new(program)) + } + }; + self.timeouts_by_subgraph + .insert(subgraph_name.to_string(), timeout); + } let executor = HTTPSubgraphExecutor::new( subgraph_name.to_string(), @@ -346,7 +390,6 @@ impl SubgraphExecutorMap { semaphore, dedupe_enabled, self.in_flight_requests.clone(), - timeout_prog, ); self.executors_by_subgraph @@ -357,3 +400,67 @@ impl SubgraphExecutorMap { Ok(()) } } + +fn resolve_duration_prog( + duration_or_program: &DurationOrProgram, + subgraph_name: &str, + client_request: &ClientRequestDetails<'_, '_>, +) -> Result { + match duration_or_program { + DurationOrProgram::Duration(dur) => Ok(*dur), + DurationOrProgram::Program(program) => { + let value = + VrlValue::Object(BTreeMap::from([("request".into(), client_request.into())])); + let result = execute_expression_with_value(program, value).map_err(|err| { + SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + err.to_string(), + ) + })?; + match result { + VrlValue::Integer(i) => { + if i < 0 { + return Err(SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + "Timeout expression resolved to a negative integer".to_string(), + )); + } + Ok(std::time::Duration::from_millis(i as u64)) + } + VrlValue::Float(f) => { + let f = f.into_inner(); + if f < 0.0 { + return Err(SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + "Timeout expression resolved to a negative float".to_string(), + )); + } + Ok(std::time::Duration::from_millis(f as u64)) + } + VrlValue::Bytes(b) => { + let s = std::str::from_utf8(&b).map_err(|e| { + SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + format!("Failed to parse duration string from bytes: {}", e), + ) + })?; + Ok(humantime::parse_duration(s).map_err(|e| { + SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + format!("Failed to parse duration string '{}': {}", s, e), + ) + })?) + } + other => { + Err(SubgraphExecutorError::TimeoutExpressionResolution( + subgraph_name.to_string(), + format!( + "Timeout expression resolved to an unexpected type: {}. Expected a non-negative integer/float (ms) or a duration string.", + other.kind() + ), + )) + } + } + } + } +} From 7cc103068f53fae0688df467b794d3864969e516 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:18:06 +0300 Subject: [PATCH 15/20] Format --- lib/executor/src/executors/map.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index f916422f8..d191d4b0a 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -1,6 +1,7 @@ use std::{ collections::{BTreeMap, HashMap}, - sync::Arc, time::Duration, + sync::Arc, + time::Duration, }; use bytes::{BufMut, Bytes, BytesMut}; @@ -344,7 +345,10 @@ impl SubgraphExecutorMap { if let Some(subgraph_traffic_shaping_config) = self.config.traffic_shaping.subgraphs.get(subgraph_name) { - if subgraph_traffic_shaping_config.pool_idle_timeout_seconds.is_some() { + if subgraph_traffic_shaping_config + .pool_idle_timeout_seconds + .is_some() + { client = Arc::new( Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) From 236b14efe5912dce3e47d5295f5a1655f8190518 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 16:32:01 +0300 Subject: [PATCH 16/20] Less diff --- lib/router-config/src/override_labels.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index cbb7f28d1..b3dc01a75 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -15,7 +15,10 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression { expression: String }, + Expression { + /// An expression that must evaluate to a boolean. If true, the label will be applied. + expression: String, + }, } impl LabelOverrideValue { From 5e8cef13ed7c2affbff701aeee913ec511f46e01 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 16:40:50 +0300 Subject: [PATCH 17/20] Lets go --- .../src/executors/duration_or_prog.rs | 24 +++++++++++++ lib/executor/src/executors/map.rs | 36 +++++++------------ lib/executor/src/executors/mod.rs | 1 + 3 files changed, 37 insertions(+), 24 deletions(-) create mode 100644 lib/executor/src/executors/duration_or_prog.rs diff --git a/lib/executor/src/executors/duration_or_prog.rs b/lib/executor/src/executors/duration_or_prog.rs new file mode 100644 index 000000000..a2f8fb7d4 --- /dev/null +++ b/lib/executor/src/executors/duration_or_prog.rs @@ -0,0 +1,24 @@ +use std::time::Duration; + +use hive_router_config::traffic_shaping::DurationOrExpression; +use vrl::{compiler::Program as VrlProgram, prelude::Function}; + +use crate::utils::expression::compile_expression; + +pub enum DurationOrProgram { + Duration(Duration), + Program(Box), +} + +pub fn compile_duration_expression( + duration_or_expr: &DurationOrExpression, + fns: Option<&[Box]>, +) -> Result { + match duration_or_expr { + DurationOrExpression::Duration(dur) => Ok(DurationOrProgram::Duration(*dur)), + DurationOrExpression::Expression { expression } => { + let program = compile_expression(expression, fns)?; + Ok(DurationOrProgram::Program(Box::new(program))) + } + } +} diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index d191d4b0a..027e6fa97 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -6,10 +6,7 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{ - override_subgraph_urls::UrlOrExpression, traffic_shaping::DurationOrExpression, - HiveRouterConfig, -}; +use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -27,6 +24,7 @@ use crate::{ HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc, }, dedupe::{ABuildHasher, SharedResponse}, + duration_or_prog::{compile_duration_expression, DurationOrProgram}, error::SubgraphExecutorError, http::{HTTPSubgraphExecutor, HttpClient}, }, @@ -40,12 +38,7 @@ type ExecutorsBySubgraphMap = DashMap>; type EndpointsBySubgraphMap = DashMap; type ExpressionsBySubgraphMap = HashMap; - -#[derive(Debug)] -pub enum DurationOrProgram { - Duration(Duration), - Program(Box), -} +type TimeoutsBySubgraph = DashMap; pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, @@ -54,7 +47,7 @@ pub struct SubgraphExecutorMap { static_endpoints_by_subgraph: EndpointsBySubgraphMap, /// Mapping from subgraph name to VRL expression program expressions_by_subgraph: ExpressionsBySubgraphMap, - timeouts_by_subgraph: DashMap, + timeouts_by_subgraph: TimeoutsBySubgraph, config: Arc, client: Arc, semaphores_by_origin: DashMap>, @@ -371,20 +364,15 @@ impl SubgraphExecutorMap { } if !self.timeouts_by_subgraph.contains_key(subgraph_name) { - let timeout = match &timeout_config { - DurationOrExpression::Duration(dur) => DurationOrProgram::Duration(*dur), - DurationOrExpression::Expression { expression } => { - let program = compile_expression(expression, None).map_err(|err| { - SubgraphExecutorError::RequestTimeoutExpressionBuild( - subgraph_name.to_string(), - err, - ) - })?; - DurationOrProgram::Program(Box::new(program)) - } - }; + let timeout_prog: DurationOrProgram = compile_duration_expression(timeout_config, None) + .map_err(|err| { + SubgraphExecutorError::RequestTimeoutExpressionBuild( + subgraph_name.to_string(), + err, + ) + })?; self.timeouts_by_subgraph - .insert(subgraph_name.to_string(), timeout); + .insert(subgraph_name.to_string(), timeout_prog); } let executor = HTTPSubgraphExecutor::new( diff --git a/lib/executor/src/executors/mod.rs b/lib/executor/src/executors/mod.rs index 520ff5f94..7a4ab272d 100644 --- a/lib/executor/src/executors/mod.rs +++ b/lib/executor/src/executors/mod.rs @@ -1,5 +1,6 @@ pub mod common; pub mod dedupe; +pub mod duration_or_prog; pub mod error; pub mod http; pub mod map; From d3e97f30b9c36ea9380186397ba9c44c10e3a11f Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 4 Nov 2025 15:05:05 +0300 Subject: [PATCH 18/20] Better impl --- lib/executor/src/execution/plan.rs | 4 ++-- lib/executor/src/executors/common.rs | 9 ++++---- lib/executor/src/executors/http.rs | 31 +++++++++++++++++++++------- lib/executor/src/executors/map.rs | 26 ++++++----------------- 4 files changed, 37 insertions(+), 33 deletions(-) diff --git a/lib/executor/src/execution/plan.rs b/lib/executor/src/execution/plan.rs index f86356312..563c0eb24 100644 --- a/lib/executor/src/execution/plan.rs +++ b/lib/executor/src/execution/plan.rs @@ -19,7 +19,7 @@ use crate::{ rewrites::FetchRewriteExt, }, executors::{ - common::{HttpExecutionRequest, HttpExecutionResponse}, + common::{HttpExecutionResponse, SubgraphExecutionRequest}, map::SubgraphExecutorMap, }, headers::{ @@ -700,7 +700,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { let variable_refs = select_fetch_variables(self.variable_values, node.variable_usages.as_ref()); - let mut subgraph_request = HttpExecutionRequest { + let mut subgraph_request = SubgraphExecutionRequest { query: node.operation.document_str.as_str(), dedupe: self.dedupe_subgraph_requests, operation_name: node.operation_name.as_deref(), diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs index bdcd4d819..27e7828c0 100644 --- a/lib/executor/src/executors/common.rs +++ b/lib/executor/src/executors/common.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use bytes::Bytes; @@ -9,7 +9,8 @@ use sonic_rs::Value; pub trait SubgraphExecutor { async fn execute<'a>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: SubgraphExecutionRequest<'a>, + timeout: Option, ) -> HttpExecutionResponse; fn to_boxed_arc<'a>(self) -> Arc> @@ -26,7 +27,7 @@ pub type SubgraphExecutorBoxedArc = Arc>; pub type SubgraphRequestExtensions = HashMap; -pub struct HttpExecutionRequest<'a> { +pub struct SubgraphExecutionRequest<'a> { pub query: &'a str, pub dedupe: bool, pub operation_name: Option<&'a str>, @@ -37,7 +38,7 @@ pub struct HttpExecutionRequest<'a> { pub extensions: Option, } -impl HttpExecutionRequest<'_> { +impl SubgraphExecutionRequest<'_> { pub fn add_request_extensions_field(&mut self, key: String, value: Value) { self.extensions .get_or_insert_with(HashMap::new) diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index fb1f6bed0..e7673a457 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,8 +1,10 @@ use std::sync::Arc; +use std::time::Duration; use crate::executors::common::HttpExecutionResponse; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; use dashmap::DashMap; +use futures::TryFutureExt; use tokio::sync::OnceCell; use async_trait::async_trait; @@ -18,7 +20,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client}; use tokio::sync::Semaphore; use tracing::debug; -use crate::executors::common::HttpExecutionRequest; +use crate::executors::common::SubgraphExecutionRequest; use crate::executors::error::SubgraphExecutorError; use crate::response::graphql_error::GraphQLError; use crate::utils::consts::CLOSE_BRACE; @@ -75,7 +77,7 @@ impl HTTPSubgraphExecutor { fn build_request_body( &self, - execution_request: &HttpExecutionRequest<'_>, + execution_request: &SubgraphExecutionRequest<'_>, ) -> Result, SubgraphExecutorError> { let mut body = Vec::with_capacity(4096); body.put(FIRST_QUOTE_STR); @@ -136,6 +138,7 @@ impl HTTPSubgraphExecutor { &self, body: Vec, headers: HeaderMap, + timeout: Option, ) -> Result { let mut req = hyper::Request::builder() .method(http::Method::POST) @@ -150,9 +153,22 @@ impl HTTPSubgraphExecutor { debug!("making http request to {}", self.endpoint.to_string()); - let res = self.http_client.request(req).await.map_err(|e| { + let res_fut = self.http_client.request(req).map_err(|e| { SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - })?; + }); + + let res = if let Some(timeout_duration) = timeout { + tokio::time::timeout(timeout_duration, res_fut) + .await + .map_err(|_| { + SubgraphExecutorError::RequestTimeout( + self.endpoint.to_string(), + timeout_duration.as_secs(), + ) + })? + } else { + res_fut.await + }?; debug!( "http request to {} completed, status: {}", @@ -211,7 +227,8 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { #[tracing::instrument(skip_all, fields(subgraph_name = %self.subgraph_name))] async fn execute<'a>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: SubgraphExecutionRequest<'a>, + timeout: Option, ) -> HttpExecutionResponse { let body = match self.build_request_body(&execution_request) { Ok(body) => body, @@ -233,7 +250,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - return match self._send_request(body, headers).await { + return match self._send_request(body, headers, timeout).await { Ok(shared_response) => HttpExecutionResponse { body: shared_response.body, headers: shared_response.headers, @@ -265,7 +282,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - self._send_request(body, headers).await + self._send_request(body, headers, timeout).await }; // It's important to remove the entry from the map before returning the result. // This ensures that once the OnceCell is set, no future requests can join it. diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 027e6fa97..66faceb88 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -21,7 +21,8 @@ use crate::{ execution::client_request_details::ClientRequestDetails, executors::{ common::{ - HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc, + HttpExecutionResponse, SubgraphExecutionRequest, SubgraphExecutor, + SubgraphExecutorBoxedArc, }, dedupe::{ABuildHasher, SharedResponse}, duration_or_prog::{compile_duration_expression, DurationOrProgram}, @@ -109,7 +110,7 @@ impl SubgraphExecutorMap { pub async fn execute<'a, 'req>( &self, subgraph_name: &str, - execution_request: HttpExecutionRequest<'a>, + execution_request: SubgraphExecutionRequest<'a>, client_request: &ClientRequestDetails<'a, 'req>, ) -> HttpExecutionResponse { match self.get_or_create_executor(subgraph_name, client_request) { @@ -119,30 +120,15 @@ impl SubgraphExecutorMap { .get(subgraph_name) .map(|t| resolve_duration_prog(t.value(), subgraph_name, client_request)); match timeout { - Some(Ok(dur)) => tokio::time::timeout(dur, executor.execute(execution_request)) - .await - .unwrap_or_else(|_| { - error!( - "Request to subgraph '{}' timed out after {:?}", - subgraph_name, dur, - ); - self.internal_server_error_response( - SubgraphExecutorError::RequestTimeout( - subgraph_name.to_string(), - dur.as_millis() as u64, - ) - .into(), - subgraph_name, - ) - }), + Some(Ok(dur)) => executor.execute(execution_request, Some(dur)).await, Some(Err(err)) => { error!( - "Timeout expression resolution error for subgraph '{}': {}", + "Failed to resolve timeout for subgraph '{}': {}", subgraph_name, err, ); self.internal_server_error_response(err.into(), subgraph_name) } - None => executor.execute(execution_request).await, + None => executor.execute(execution_request, None).await, } } Err(err) => { From 8c0549a3a5632d7495d2bff3eb8f9c368e15fe0b Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 4 Nov 2025 15:12:30 +0300 Subject: [PATCH 19/20] Default values --- lib/router-config/src/traffic_shaping.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 53f691237..7b4fccb41 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -119,7 +119,7 @@ pub struct TrafficShapingExecutorConfig { } fn default_request_timeout() -> DurationOrExpression { - DurationOrExpression::Duration(Duration::from_secs(15)) + DurationOrExpression::Duration(Duration::from_secs(30)) } #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] From d069f91a788c2905ece1985a2efa837892f5935d Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 4 Nov 2025 15:33:51 +0300 Subject: [PATCH 20/20] Fix --- docs/README.md | 15 ++++++--------- lib/router-config/src/traffic_shaping.rs | 1 + 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/docs/README.md b/docs/README.md index b54ec0592..d119d60e1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":{"Duration":"15s"}},"max_connections_per_host":100}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"},"max_connections_per_host":100}`
|| **Additional Properties:** not allowed **Example** @@ -110,8 +110,7 @@ traffic_shaping: all: dedupe_enabled: true pool_idle_timeout: 50s - request_timeout: - Duration: 15s + request_timeout: 30s max_connections_per_host: 100 ``` @@ -1818,7 +1817,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations |Name|Type|Description|Required| |----|----|-----------|--------| -|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":{"Duration":"15s"}}`
|| +|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"}`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
|| @@ -1829,8 +1828,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations all: dedupe_enabled: true pool_idle_timeout: 50s - request_timeout: - Duration: 15s + request_timeout: 30s max_connections_per_host: 100 ``` @@ -1847,7 +1845,7 @@ The default configuration that will be applied to all subgraphs, unless overridd |----|----|-----------|--------| |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
Default: `"50s"`
|| -|**request\_timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
"10s"
} else {
"15s"
}
```
Default: `{"Duration":"15s"}`
|| +|**request\_timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
"10s"
} else {
"15s"
}
```
Default: `"30s"`
|| **Additional Properties:** not allowed **Example** @@ -1855,8 +1853,7 @@ The default configuration that will be applied to all subgraphs, unless overridd ```yaml dedupe_enabled: true pool_idle_timeout: 50s -request_timeout: - Duration: 15s +request_timeout: 30s ``` diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 7b4fccb41..0693ac277 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -123,6 +123,7 @@ fn default_request_timeout() -> DurationOrExpression { } #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +#[serde(untagged)] pub enum DurationOrExpression { /// A fixed duration, e.g., "5s" or "100ms". #[serde(