diff --git a/.changeset/shared_utilities_to_handle_vrl_expressions.md b/.changeset/shared_utilities_to_handle_vrl_expressions.md new file mode 100644 index 000000000..97c07aa3a --- /dev/null +++ b/.changeset/shared_utilities_to_handle_vrl_expressions.md @@ -0,0 +1,15 @@ +--- +default: minor +--- + +# Breaking + +Removed `pool_idle_timeout_seconds` from `traffic_shaping`, instead use `pool_idle_timeout` with duration format. + +```diff +traffic_shaping: +- pool_idle_timeout_seconds: 30 ++ pool_idle_timeout: 30s +``` + +#540 by @ardatan diff --git a/Cargo.lock b/Cargo.lock index 360f7cd81..c03250a47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1371,14 +1371,17 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" name = "e2e" version = "0.0.1" dependencies = [ + "hex", "hive-router", "hive-router-config", + "hmac", "insta", "jsonwebtoken", "lazy_static", "mockito", "ntex", "reqwest", + "sha2", "sonic-rs", "subgraphs", "tempfile", @@ -2079,8 +2082,10 @@ dependencies = [ "futures", "graphql-parser", "graphql-tools", + "hex", "hive-router-config", "hive-router-query-planner", + "hmac", "http", "http-body-util", "hyper", @@ -2090,10 +2095,12 @@ dependencies = [ "insta", "itoa", "ntex-http", + "once_cell", "ordered-float", "regex-automata", "ryu", "serde", + "sha2", "sonic-rs", "strum 0.27.2", "subgraphs", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index d0b09c183..0c2ad9c15 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -1,20 +1,22 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig}; -use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails; +use hive_router_plan_executor::{ + execution::client_request_details::ClientRequestDetails, utils::expression::compile_expression, +}; use hive_router_query_planner::{ graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR}, state::supergraph_state::SupergraphState, }; use rand::Rng; use vrl::{ - compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + compiler::Program as VrlProgram, + compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, prelude::{ state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, TimeZone as VrlTimeZone, }, - stdlib::all as vrl_build_functions, value::Secrets as VrlSecrets, }; @@ -126,7 +128,6 @@ impl OverrideLabelsEvaluator { ) -> Result { let mut static_enabled_labels = HashSet::new(); let mut expressions = HashMap::new(); - let vrl_functions = vrl_build_functions(); for (label, value) in override_labels_config.iter() { match value { @@ -134,19 +135,13 @@ impl OverrideLabelsEvaluator { static_enabled_labels.insert(label.clone()); } LabelOverrideValue::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &vrl_functions).map_err(|diagnostics| { - OverrideLabelsCompileError { - label: label.clone(), - error: diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - } - })?; - expressions.insert(label.clone(), compilation_result.program); + let program = compile_expression(expression, None).map_err(|err| { + OverrideLabelsCompileError { + label: label.clone(), + error: err.to_string(), + } + })?; + expressions.insert(label.clone(), program); } _ => {} // Skip false booleans } diff --git a/docs/README.md b/docs/README.md index fb474b9d2..548a983e1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -8,6 +8,7 @@ |[**csrf**](#csrf)|`object`|Configuration for CSRF prevention.
Default: `{"enabled":false,"required_headers":[]}`
|| |[**graphiql**](#graphiql)|`object`|Configuration for the GraphiQL interface.
Default: `{"enabled":true}`
|| |[**headers**](#headers)|`object`|Configuration for the headers.
Default: `{}`
|| +|[**hmac\_signature**](#hmac_signature)|`object`||yes| |[**http**](#http)|`object`|Configuration for the HTTP server/listener.
Default: `{"host":"0.0.0.0","port":4000}`
|| |[**jwt**](#jwt)|`object`|Configuration for JWT authentication plugin.
|yes| |[**log**](#log)|`object`|The router logger configuration.
Default: `{"filter":null,"format":"json","level":"info"}`
|| @@ -15,7 +16,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout":"50s"}`
|| **Additional Properties:** not allowed **Example** @@ -57,6 +58,9 @@ headers: default: unknown named: x-tenant-id rename: x-acct-tenant +hmac_signature: + enabled: false + extension_name: hmac-signature http: host: 0.0.0.0 port: 4000 @@ -109,7 +113,7 @@ supergraph: {} traffic_shaping: dedupe_enabled: true max_connections_per_host: 100 - pool_idle_timeout_seconds: 50 + pool_idle_timeout: 50s ``` @@ -1341,6 +1345,25 @@ For more information on the available functions and syntax, see the |**expression**|`string`||yes| + +## hmac\_signature: object + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**enabled**||Default: `false`
|no| +|**extension\_name**|`string`|Default: `"hmac-signature"`
|no| +|**secret**|`string`||yes| + +**Example** + +```yaml +enabled: false +extension_name: hmac-signature + +``` + ## http: object @@ -1808,7 +1831,7 @@ Request timeout for the Hive Console CDN requests. ## traffic\_shaping: object -Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. +Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. **Properties** @@ -1817,7 +1840,7 @@ Configuration for the traffic-shaper executor. Use these configurations to contr |----|----|-----------|--------| |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| -|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| +|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
Default: `"50s"`
|| **Additional Properties:** not allowed **Example** @@ -1825,7 +1848,7 @@ Configuration for the traffic-shaper executor. Use these configurations to contr ```yaml dedupe_enabled: true max_connections_per_host: 100 -pool_idle_timeout_seconds: 50 +pool_idle_timeout: 50s ``` diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 5a604afc1..f26730317 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -24,3 +24,7 @@ subgraphs = { path = "../bench/subgraphs" } mockito = "1.7.0" tempfile = "3.23.0" + +hmac = "0.12.1" +sha2 = "0.10.9" +hex = "0.4.3" diff --git a/e2e/configs/hmac_forward.router.yaml b/e2e/configs/hmac_forward.router.yaml new file mode 100644 index 000000000..f029385d6 --- /dev/null +++ b/e2e/configs/hmac_forward.router.yaml @@ -0,0 +1,3 @@ +hmac_signature: + enabled: true + secret: VERY_SECRET \ No newline at end of file diff --git a/e2e/src/hmac.rs b/e2e/src/hmac.rs new file mode 100644 index 000000000..1487f674e --- /dev/null +++ b/e2e/src/hmac.rs @@ -0,0 +1,56 @@ +#[cfg(test)] +mod hmac_e2e_tests { + use crate::testkit::{ + init_graphql_request, init_router_from_config_file, wait_for_readiness, SubgraphsServer, + }; + + use ntex::web::test; + use sonic_rs::JsonValueTrait; + + fn create_hmac_signature(secret: &str, query: &str) -> String { + use hex; + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + type HmacSha256 = Hmac; + + let mut mac = + HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); + let message = format!("{{\"query\":\"{}\"}}", query); + mac.update(message.as_bytes()); + let result = mac.finalize(); + let code_bytes = result.into_bytes(); + hex::encode(code_bytes) + } + + #[ntex::test] + async fn should_forward_hmac_signature_to_subgraph_via_extensions() { + let subgraphs_server = SubgraphsServer::start().await; + let app = init_router_from_config_file("configs/hmac_forward.router.yaml") + .await + .unwrap(); + wait_for_readiness(&app.app).await; + let query = "query{users{id}}"; + let req = init_graphql_request(query, None); + let resp: ntex::web::WebResponse = test::call_service(&app.app, req.to_request()).await; + + assert!(resp.status().is_success(), "Expected 200 OK"); + + let subgraph_requests = subgraphs_server + .get_subgraph_requests_log("accounts") + .await + .expect("expected requests sent to accounts subgraph"); + assert_eq!( + subgraph_requests.len(), + 1, + "expected 1 request to accounts subgraph" + ); + let extensions = subgraph_requests[0].request_body.get("extensions").unwrap(); + + let expected_signature = create_hmac_signature("VERY_SECRET", query); + assert_eq!( + extensions.get("hmac-signature").unwrap(), + &expected_signature + ); + } +} diff --git a/e2e/src/lib.rs b/e2e/src/lib.rs index 9086e01f4..ca4cd588c 100644 --- a/e2e/src/lib.rs +++ b/e2e/src/lib.rs @@ -3,6 +3,8 @@ mod file_supergraph; #[cfg(test)] mod hive_cdn_supergraph; #[cfg(test)] +mod hmac; +#[cfg(test)] mod jwt; #[cfg(test)] mod override_subgraph_urls; diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index c3f6f9117..c38f491fe 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -49,6 +49,10 @@ itoa = "1.0.15" ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" +once_cell = "1.21.3" +hmac = "0.12.1" +sha2 = "0.10.9" +hex = "0.4.3" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/executor/src/execution/hmac.rs b/lib/executor/src/execution/hmac.rs new file mode 100644 index 000000000..6e708f7e2 --- /dev/null +++ b/lib/executor/src/execution/hmac.rs @@ -0,0 +1,111 @@ +use std::collections::BTreeMap; + +use bytes::{BufMut, Bytes}; +use hive_router_config::hmac_signature::{BooleanOrExpression, HMACSignatureConfig}; +use hmac::{Hmac, Mac}; +use sha2::Sha256; +use vrl::{compiler::Program as VrlProgram, core::Value as VrlValue}; + +use crate::{ + execution::client_request_details::ClientRequestDetails, + executors::{error::SubgraphExecutorError, http::FIRST_EXTENSION_STR}, + utils::{ + consts::{CLOSE_BRACE, COLON, COMMA, QUOTE}, + expression::{compile_expression, execute_expression_with_value}, + }, +}; + +#[derive(Debug)] +pub enum BooleanOrProgram { + Boolean(bool), + Program(Box), +} + +pub fn compile_hmac_config( + config: &HMACSignatureConfig, +) -> Result { + match &config.enabled { + BooleanOrExpression::Boolean(b) => Ok(BooleanOrProgram::Boolean(*b)), + BooleanOrExpression::Expression { expression } => { + let program = compile_expression(expression, None) + .map_err(SubgraphExecutorError::HMACExpressionBuild)?; + Ok(BooleanOrProgram::Program(Box::new(program))) + } + } +} +type HmacSha256 = Hmac; + +pub fn sign_hmac( + hmac_program: &BooleanOrProgram, + hmac_config: &HMACSignatureConfig, + subgraph_name: &str, + client_request: &ClientRequestDetails, + first_extension: &mut bool, + body: &mut Vec, +) -> Result<(), SubgraphExecutorError> { + let should_sign_hmac = match &hmac_program { + BooleanOrProgram::Boolean(b) => *b, + BooleanOrProgram::Program(expr) => { + // .subgraph + let subgraph_value = VrlValue::Object(BTreeMap::from([( + "name".into(), + VrlValue::Bytes(Bytes::from(subgraph_name.to_owned())), + )])); + // .request + let request_value: VrlValue = client_request.into(); + let target_value = VrlValue::Object(BTreeMap::from([ + ("subgraph".into(), subgraph_value), + ("request".into(), request_value), + ])); + let result = execute_expression_with_value(expr, target_value); + match result { + Ok(VrlValue::Boolean(b)) => b, + Ok(_) => { + return Err(SubgraphExecutorError::HMACSignatureError( + "HMAC signature expression did not evaluate to a boolean".to_string(), + )); + } + Err(e) => { + return Err(SubgraphExecutorError::HMACSignatureError(format!( + "HMAC signature expression evaluation error: {}", + e + ))); + } + } + } + }; + + if should_sign_hmac { + if hmac_config.secret.is_empty() { + return Err(SubgraphExecutorError::HMACSignatureError( + "HMAC signature secret is empty".to_string(), + )); + } + let mut mac = HmacSha256::new_from_slice(hmac_config.secret.as_bytes()).map_err(|e| { + SubgraphExecutorError::HMACSignatureError(format!( + "Failed to create HMAC instance: {}", + e + )) + })?; + let mut body_without_extensions = body.clone(); + body_without_extensions.put(CLOSE_BRACE); + mac.update(&body_without_extensions); + let result = mac.finalize(); + let result_bytes = result.into_bytes(); + if *first_extension { + body.put(FIRST_EXTENSION_STR); + *first_extension = false; + } else { + body.put(COMMA); + } + body.put(QUOTE); + body.put(hmac_config.extension_name.as_bytes()); + body.put(QUOTE); + body.put(COLON); + let hmac_hex = hex::encode(result_bytes); + body.put(QUOTE); + body.put(hmac_hex.as_bytes()); + body.put(QUOTE); + } + Ok(()) +} diff --git a/lib/executor/src/execution/mod.rs b/lib/executor/src/execution/mod.rs index 52dc59506..e66ab00d6 100644 --- a/lib/executor/src/execution/mod.rs +++ b/lib/executor/src/execution/mod.rs @@ -1,5 +1,6 @@ pub mod client_request_details; pub mod error; +pub mod hmac; pub mod jwt_forward; pub mod plan; pub mod rewrites; diff --git a/lib/executor/src/execution/plan.rs b/lib/executor/src/execution/plan.rs index f86356312..90102542c 100644 --- a/lib/executor/src/execution/plan.rs +++ b/lib/executor/src/execution/plan.rs @@ -708,6 +708,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { representations, headers: headers_map, extensions: None, + client_request: self.client_request, }; if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan { @@ -722,7 +723,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { subgraph_name: node.service_name.clone(), response: self .executors - .execute(&node.service_name, subgraph_request, self.client_request) + .execute(&node.service_name, subgraph_request) .await .into(), })) diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs index bdcd4d819..44fc455d6 100644 --- a/lib/executor/src/executors/common.rs +++ b/lib/executor/src/executors/common.rs @@ -5,11 +5,13 @@ use bytes::Bytes; use http::HeaderMap; use sonic_rs::Value; +use crate::execution::client_request_details::ClientRequestDetails; + #[async_trait] pub trait SubgraphExecutor { - async fn execute<'a>( + async fn execute<'exec, 'req>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse; fn to_boxed_arc<'a>(self) -> Arc> @@ -26,18 +28,19 @@ pub type SubgraphExecutorBoxedArc = Arc>; pub type SubgraphRequestExtensions = HashMap; -pub struct HttpExecutionRequest<'a> { - pub query: &'a str, +pub struct HttpExecutionRequest<'exec, 'req> { + pub query: &'exec str, pub dedupe: bool, - pub operation_name: Option<&'a str>, + pub operation_name: Option<&'exec str>, // TODO: variables could be stringified before even executing the request - pub variables: Option>, + pub variables: Option>, pub headers: HeaderMap, pub representations: Option>, pub extensions: Option, + pub client_request: &'exec ClientRequestDetails<'exec, 'req>, } -impl HttpExecutionRequest<'_> { +impl HttpExecutionRequest<'_, '_> { pub fn add_request_extensions_field(&mut self, key: String, value: Value) { self.extensions .get_or_insert_with(HashMap::new) diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 2234f524c..972448237 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -1,4 +1,4 @@ -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; use crate::response::graphql_error::{GraphQLError, GraphQLErrorExtensions}; @@ -20,6 +20,12 @@ pub enum SubgraphExecutorError { RequestFailure(String, String), #[error("Failed to serialize variable \"{0}\": {1}")] VariablesSerializationFailure(String, String), + #[error("Failed to serialize extension \"{0}\": {1}")] + ExtensionSerializationFailure(String, String), + #[error("Failed to build HMAC expression. Please check your VRL expression for syntax errors. Diagnostic: {0}")] + HMACExpressionBuild(String), + #[error("HMAC signature error: {0}")] + HMACSignatureError(String), } impl From for GraphQLError { @@ -34,21 +40,6 @@ impl From for GraphQLError { } impl SubgraphExecutorError { - pub fn new_endpoint_expression_build( - subgraph_name: String, - diagnostics: DiagnosticList, - ) -> Self { - SubgraphExecutorError::EndpointExpressionBuild( - subgraph_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) - } - pub fn new_endpoint_expression_resolution_failure( subgraph_name: String, error: ExpressionError, @@ -76,6 +67,13 @@ impl SubgraphExecutorError { SubgraphExecutorError::VariablesSerializationFailure(_, _) => { "SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE" } + SubgraphExecutorError::ExtensionSerializationFailure(_, _) => { + "SUBGRAPH_EXTENSION_SERIALIZATION_FAILURE" + } + SubgraphExecutorError::HMACSignatureError(_) => "SUBGRAPH_HMAC_SIGNATURE_ERROR", + SubgraphExecutorError::HMACExpressionBuild(_) => { + "SUBGRAPH_HMAC_EXPRESSION_BUILD_FAILURE" + } } } } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 29b392567..79bf186d9 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::execution::hmac::{sign_hmac, BooleanOrProgram}; use crate::executors::common::HttpExecutionResponse; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; use dashmap::DashMap; @@ -37,10 +38,12 @@ pub struct HTTPSubgraphExecutor { pub semaphore: Arc, pub config: Arc, pub in_flight_requests: Arc>, ABuildHasher>>, + pub should_sign_hmac: Arc, } const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{"; const FIRST_QUOTE_STR: &[u8] = b"{\"query\":"; +pub const FIRST_EXTENSION_STR: &[u8] = b",\"extensions\":{"; pub type HttpClient = Client, Full>; @@ -52,6 +55,7 @@ impl HTTPSubgraphExecutor { semaphore: Arc, config: Arc, in_flight_requests: Arc>, ABuildHasher>>, + should_sign_hmac: Arc, ) -> Self { let mut header_map = HeaderMap::new(); header_map.insert( @@ -71,12 +75,13 @@ impl HTTPSubgraphExecutor { semaphore, config, in_flight_requests, + should_sign_hmac, } } - fn build_request_body<'a>( + fn build_request_body<'exec, 'req>( &self, - execution_request: &HttpExecutionRequest<'a>, + execution_request: &HttpExecutionRequest<'exec, 'req>, ) -> Result, SubgraphExecutorError> { let mut body = Vec::with_capacity(4096); body.put(FIRST_QUOTE_STR); @@ -118,16 +123,45 @@ impl HTTPSubgraphExecutor { body.put(CLOSE_BRACE); } - if let Some(extensions) = &execution_request.extensions { - if !extensions.is_empty() { - let as_value = sonic_rs::to_value(extensions).unwrap(); + let mut first_extension = true; + + if !self.config.hmac_signature.is_disabled() { + sign_hmac( + &self.should_sign_hmac, + &self.config.hmac_signature, + &self.subgraph_name, + execution_request.client_request, + &mut first_extension, + &mut body, + )?; + } - body.put(COMMA); - body.put("\"extensions\":".as_bytes()); - body.extend_from_slice(as_value.to_string().as_bytes()); + if let Some(extensions) = &execution_request.extensions { + for (extension_name, extension_value) in extensions { + if first_extension { + body.put(FIRST_EXTENSION_STR); + first_extension = false; + } else { + body.put(COMMA); + } + body.put(QUOTE); + body.put(extension_name.as_bytes()); + body.put(QUOTE); + body.put(COLON); + let value_str = sonic_rs::to_string(extension_value).map_err(|err| { + SubgraphExecutorError::ExtensionSerializationFailure( + extension_name.to_string(), + err.to_string(), + ) + })?; + body.put(value_str.as_bytes()); } } + if !first_extension { + body.put(CLOSE_BRACE); + } + body.put(CLOSE_BRACE); Ok(body) @@ -210,9 +244,9 @@ impl HTTPSubgraphExecutor { #[async_trait] impl SubgraphExecutor for HTTPSubgraphExecutor { #[tracing::instrument(skip_all, fields(subgraph_name = self.subgraph_name))] - async fn execute<'a>( + async fn execute<'exec, 'req>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse { let body = match self.build_request_body(&execution_request) { Ok(body) => body, diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index a3c297ad1..16bcf46a6 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -1,7 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, sync::Arc, - time::Duration, }; use bytes::{BufMut, Bytes, BytesMut}; @@ -15,19 +14,13 @@ use hyper_util::{ }; use tokio::sync::{OnceCell, Semaphore}; use tracing::error; -use vrl::{ - compiler::compile as vrl_compile, - compiler::Program as VrlProgram, - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::Function as VrlFunction, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - stdlib::all as vrl_build_functions, - value::Secrets as VrlSecrets, -}; +use vrl::{compiler::Program as VrlProgram, core::Value as VrlValue}; use crate::{ - execution::client_request_details::ClientRequestDetails, + execution::{ + client_request_details::ClientRequestDetails, + hmac::{compile_hmac_config, BooleanOrProgram}, + }, executors::{ common::{ HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc, @@ -37,6 +30,7 @@ use crate::{ http::{HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, + utils::expression::{compile_expression, execute_expression_with_value}, }; type SubgraphName = String; @@ -54,45 +48,44 @@ pub struct SubgraphExecutorMap { /// Mapping from subgraph name to VRL expression program expressions_by_subgraph: ExpressionsBySubgraphMap, config: Arc, - /// Precompiled VRL functions to be used in endpoint expressions. - vrl_functions: Vec>, client: Arc, semaphores_by_origin: DashMap>, max_connections_per_host: usize, in_flight_requests: Arc>, ABuildHasher>>, + should_sign_hmac: Arc, } impl SubgraphExecutorMap { - pub fn new(config: Arc) -> Self { + pub fn try_new(config: Arc) -> Result { let https = HttpsConnector::new(); let client: HttpClient = Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) - .pool_idle_timeout(Duration::from_secs( - config.traffic_shaping.pool_idle_timeout_seconds, - )) + .pool_idle_timeout(config.traffic_shaping.pool_idle_timeout) .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host) .build(https); let max_connections_per_host = config.traffic_shaping.max_connections_per_host; - SubgraphExecutorMap { + let should_sign_hmac = compile_hmac_config(&config.hmac_signature)?; + + Ok(SubgraphExecutorMap { executors_by_subgraph: Default::default(), static_endpoints_by_subgraph: Default::default(), expressions_by_subgraph: Default::default(), config, - vrl_functions: vrl_build_functions(), client: Arc::new(client), semaphores_by_origin: Default::default(), max_connections_per_host, in_flight_requests: Arc::new(DashMap::with_hasher(ABuildHasher::default())), - } + should_sign_hmac: Arc::new(should_sign_hmac), + }) } pub fn from_http_endpoint_map( subgraph_endpoint_map: HashMap, config: Arc, ) -> Result { - let mut subgraph_executor_map = SubgraphExecutorMap::new(config.clone()); + let mut subgraph_executor_map = SubgraphExecutorMap::try_new(config.clone())?; for (subgraph_name, original_endpoint_str) in subgraph_endpoint_map.into_iter() { let endpoint_str = config @@ -115,14 +108,13 @@ impl SubgraphExecutorMap { Ok(subgraph_executor_map) } - pub async fn execute<'a, 'req>( + pub async fn execute<'exec, 'req>( &self, subgraph_name: &str, - execution_request: HttpExecutionRequest<'a>, - client_request: &ClientRequestDetails<'a, 'req>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse { - match self.get_or_create_executor(subgraph_name, client_request) { - Ok(Some(executor)) => executor.execute(execution_request).await, + match self.get_or_create_executor(subgraph_name, execution_request.client_request) { + Ok(executor) => executor.execute(execution_request).await, Err(err) => { error!( "Subgraph executor error for subgraph '{}': {}", @@ -130,13 +122,6 @@ impl SubgraphExecutorMap { ); self.internal_server_error_response(err.into(), subgraph_name) } - Ok(None) => { - error!( - "Subgraph executor not found for subgraph '{}'", - subgraph_name - ); - self.internal_server_error_response("Internal server error".into(), subgraph_name) - } } } @@ -165,15 +150,17 @@ impl SubgraphExecutorMap { &self, subgraph_name: &str, client_request: &ClientRequestDetails<'_, '_>, - ) -> Result, SubgraphExecutorError> { - let from_expression = - self.get_or_create_executor_from_expression(subgraph_name, client_request)?; - - if from_expression.is_some() { - return Ok(from_expression); - } - - Ok(self.get_executor_from_static_endpoint(subgraph_name)) + ) -> Result { + self.expressions_by_subgraph + .get(subgraph_name) + .map(|expression| { + self.get_or_create_executor_from_expression( + subgraph_name, + expression, + client_request, + ) + }) + .unwrap_or_else(|| self.get_executor_from_static_endpoint(subgraph_name)) } /// Looks up a subgraph executor, @@ -183,74 +170,50 @@ impl SubgraphExecutorMap { fn get_or_create_executor_from_expression( &self, subgraph_name: &str, + expression: &VrlProgram, client_request: &ClientRequestDetails<'_, '_>, - ) -> Result, SubgraphExecutorError> { - if let Some(expression) = self.expressions_by_subgraph.get(subgraph_name) { - let original_url_value = VrlValue::Bytes(Bytes::from( - self.static_endpoints_by_subgraph - .get(subgraph_name) - .map(|endpoint| endpoint.value().clone()) - .ok_or_else(|| { - SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) - })?, - )); - let mut target = VrlTargetValue { - value: VrlValue::Object(BTreeMap::from([ - ("request".into(), client_request.into()), - ("original_url".into(), original_url_value), - ])), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - - // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.resolve(&mut ctx).map_err(|err| { - SubgraphExecutorError::new_endpoint_expression_resolution_failure( - subgraph_name.to_string(), - err, - ) - })?; - let endpoint_str = match endpoint_result.as_str() { - Some(s) => s.to_string(), - None => { - return Err(SubgraphExecutorError::EndpointExpressionWrongType( - subgraph_name.to_string(), - )); - } - }; - - // Check if an executor for this endpoint already exists. - let existing_executor = self - .executors_by_subgraph + ) -> Result { + let original_url_value = VrlValue::Bytes(Bytes::from( + self.static_endpoints_by_subgraph .get(subgraph_name) - .and_then(|endpoints| endpoints.get(&endpoint_str).map(|e| e.clone())); - - if let Some(executor) = existing_executor { - return Ok(Some(executor)); - } - + .map(|endpoint| endpoint.value().clone()) + .ok_or_else(|| { + SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) + })?, + )); + let value = VrlValue::Object(BTreeMap::from([ + ("request".into(), client_request.into()), + ("original_url".into(), original_url_value), + ])); + + // Resolve the expression to get an endpoint URL. + let endpoint_result = execute_expression_with_value(expression, value).map_err(|err| { + SubgraphExecutorError::new_endpoint_expression_resolution_failure( + subgraph_name.to_string(), + err, + ) + })?; + let endpoint_str = match endpoint_result.as_str() { + Some(s) => Ok(s), + None => Err(SubgraphExecutorError::EndpointExpressionWrongType( + subgraph_name.to_string(), + )), + }?; + + // Check if an executor for this endpoint already exists. + self.executors_by_subgraph + .get(subgraph_name) + .and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())) + .map(Ok) // If not, create and register a new one. - self.register_executor(subgraph_name, &endpoint_str)?; - - let endpoints = self - .executors_by_subgraph - .get(subgraph_name) - .expect("Executor was just registered, should be present"); - return Ok(endpoints.get(&endpoint_str).map(|e| e.clone())); - } - - Ok(None) + .unwrap_or_else(|| self.register_executor(subgraph_name, endpoint_str.as_ref())) } /// Looks up a subgraph executor based on a static endpoint URL. fn get_executor_from_static_endpoint( &self, subgraph_name: &str, - ) -> Option { + ) -> Result { self.static_endpoints_by_subgraph .get(subgraph_name) .and_then(|endpoint_ref| { @@ -259,6 +222,7 @@ impl SubgraphExecutorMap { .get(subgraph_name) .and_then(|endpoints| endpoints.get(endpoint_str).map(|e| e.clone())) }) + .ok_or_else(|| SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())) } /// Registers a VRL expression for the given subgraph name. @@ -269,12 +233,11 @@ impl SubgraphExecutorMap { subgraph_name: &str, expression: &str, ) -> Result<(), SubgraphExecutorError> { - let compilation_result = vrl_compile(expression, &self.vrl_functions).map_err(|e| { - SubgraphExecutorError::new_endpoint_expression_build(subgraph_name.to_string(), e) + let program = compile_expression(expression, None).map_err(|err| { + SubgraphExecutorError::EndpointExpressionBuild(subgraph_name.to_string(), err) })?; - self.expressions_by_subgraph - .insert(subgraph_name.to_string(), compilation_result.program); + .insert(subgraph_name.to_string(), program); Ok(()) } @@ -293,7 +256,7 @@ impl SubgraphExecutorMap { &self, subgraph_name: &str, endpoint_str: &str, - ) -> Result<(), SubgraphExecutorError> { + ) -> Result { let endpoint_uri = endpoint_str.parse::().map_err(|e| { SubgraphExecutorError::EndpointParseFailure(endpoint_str.to_string(), e.to_string()) })?; @@ -324,13 +287,16 @@ impl SubgraphExecutorMap { semaphore, self.config.clone(), self.in_flight_requests.clone(), + self.should_sign_hmac.clone(), ); + let executor_arc = executor.to_boxed_arc(); + self.executors_by_subgraph .entry(subgraph_name.to_string()) .or_default() - .insert(endpoint_str.to_string(), executor.to_boxed_arc()); + .insert(endpoint_str.to_string(), executor_arc.clone()); - Ok(()) + Ok(executor_arc) } } diff --git a/lib/executor/src/headers/compile.rs b/lib/executor/src/headers/compile.rs index 5b1b14a9f..587cbee60 100644 --- a/lib/executor/src/headers/compile.rs +++ b/lib/executor/src/headers/compile.rs @@ -1,54 +1,28 @@ -use crate::headers::{ - errors::HeaderRuleCompileError, - plan::{ - HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, - RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, RequestPropagateRegex, - RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, ResponseHeaderRules, - ResponseInsertExpression, ResponseInsertStatic, ResponsePropagateNamed, - ResponsePropagateRegex, ResponseRemoveNamed, ResponseRemoveRegex, +use crate::{ + headers::{ + errors::HeaderRuleCompileError, + plan::{ + HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, + RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, + RequestPropagateRegex, RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, + ResponseHeaderRules, ResponseInsertExpression, ResponseInsertStatic, + ResponsePropagateNamed, ResponsePropagateRegex, ResponseRemoveNamed, + ResponseRemoveRegex, + }, }, + utils::expression::compile_expression, }; use hive_router_config::headers as config; use http::HeaderName; use regex_automata::{meta, util::syntax::Config as SyntaxConfig}; -use vrl::{ - compiler::compile as vrl_compile, prelude::Function as VrlFunction, - stdlib::all as vrl_build_functions, -}; - -pub struct HeaderRuleCompilerContext { - vrl_functions: Vec>, -} - -impl Default for HeaderRuleCompilerContext { - fn default() -> Self { - Self::new() - } -} - -impl HeaderRuleCompilerContext { - pub fn new() -> Self { - Self { - vrl_functions: vrl_build_functions(), - } - } -} pub trait HeaderRuleCompiler { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut A, - ) -> Result<(), HeaderRuleCompileError>; + fn compile(&self, actions: &mut A) -> Result<(), HeaderRuleCompileError>; } impl HeaderRuleCompiler> for config::RequestHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::RequestHeaderRule::Propagate(rule) => { let spec = materialize_match_spec( @@ -79,15 +53,13 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { })); } config::InsertSource::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &ctx.vrl_functions).map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(RequestHeaderRule::InsertExpression( RequestInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(program), }, )); } @@ -112,11 +84,7 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { } impl HeaderRuleCompiler> for config::ResponseHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::ResponseHeaderRule::Propagate(rule) => { let aggregation_strategy = rule.algorithm.into(); @@ -159,15 +127,13 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule // - compilation_result.program.info().target_assignments // - compilation_result.program.info().target_queries // to determine what parts of the context are actually needed by the expression - let compilation_result = vrl_compile(expression, &ctx.vrl_functions) - .map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(ResponseHeaderRule::InsertExpression( ResponseInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(program), strategy: aggregation_strategy, }, )); @@ -196,19 +162,18 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule pub fn compile_headers_plan( cfg: &config::HeadersConfig, ) -> Result { - let ctx = HeaderRuleCompilerContext::new(); let mut request_plan = RequestHeaderRules::default(); let mut response_plan = ResponseHeaderRules::default(); if let Some(global_rules) = &cfg.all { - request_plan.global = compile_request_header_rules(&ctx, global_rules)?; - response_plan.global = compile_response_header_rules(&ctx, global_rules)?; + request_plan.global = compile_request_header_rules(global_rules)?; + response_plan.global = compile_response_header_rules(global_rules)?; } if let Some(subgraph_rules_map) = &cfg.subgraphs { for (subgraph_name, subgraph_rules) in subgraph_rules_map { - let request_actions = compile_request_header_rules(&ctx, subgraph_rules)?; - let response_actions = compile_response_header_rules(&ctx, subgraph_rules)?; + let request_actions = compile_request_header_rules(subgraph_rules)?; + let response_actions = compile_response_header_rules(subgraph_rules)?; request_plan .by_subgraph .insert(subgraph_name.clone(), request_actions); @@ -225,26 +190,24 @@ pub fn compile_headers_plan( } fn compile_request_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut request_actions = Vec::new(); if let Some(request_rule_entries) = &header_rules.request { for request_rule in request_rule_entries { - request_rule.compile(ctx, &mut request_actions)?; + request_rule.compile(&mut request_actions)?; } } Ok(request_actions) } fn compile_response_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut response_actions = Vec::new(); if let Some(response_rule_entries) = &header_rules.response { for response_rule in response_rule_entries { - response_rule.compile(ctx, &mut response_actions)?; + response_rule.compile(&mut response_actions)?; } } Ok(response_actions) @@ -358,7 +321,7 @@ mod tests { use http::HeaderName; use crate::headers::{ - compile::{build_header_value, HeaderRuleCompiler, HeaderRuleCompilerContext}, + compile::{build_header_value, HeaderRuleCompiler}, errors::HeaderRuleCompileError, plan::{HeaderAggregationStrategy, RequestHeaderRule, ResponseHeaderRule}, }; @@ -378,9 +341,8 @@ mod tests { rename: None, default: None, }); - let ctx = HeaderRuleCompilerContext::new(); let mut actions = Vec::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::PropagateNamed(data) => { @@ -401,8 +363,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::InsertStatic(data) => { @@ -423,8 +384,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::RemoveNamed(data) => { @@ -449,8 +409,7 @@ mod tests { default: Some("def".to_string()), }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - let err = rule.compile(&ctx, &mut actions).unwrap_err(); + let err = rule.compile(&mut actions).unwrap_err(); match err { HeaderRuleCompileError::InvalidDefault => {} _ => panic!("Expected InvalidDefault error"), @@ -470,8 +429,7 @@ mod tests { algorithm: config::AggregationAlgo::First, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { ResponseHeaderRule::PropagateNamed(data) => { diff --git a/lib/executor/src/headers/errors.rs b/lib/executor/src/headers/errors.rs index d53444877..6c2a806d2 100644 --- a/lib/executor/src/headers/errors.rs +++ b/lib/executor/src/headers/errors.rs @@ -1,6 +1,6 @@ use http::header::{InvalidHeaderName, InvalidHeaderValue}; use regex_automata::meta::BuildError; -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; #[derive(thiserror::Error, Debug)] pub enum HeaderRuleCompileError { @@ -27,16 +27,8 @@ pub enum HeaderRuleRuntimeError { } impl HeaderRuleCompileError { - pub fn new_expression_build(header_name: String, diagnostics: DiagnosticList) -> Self { - HeaderRuleCompileError::ExpressionBuild( - header_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) + pub fn new_expression_build(header_name: String, err: String) -> Self { + HeaderRuleCompileError::ExpressionBuild(header_name, err) } } diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index 637ab0d58..ec22de509 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -1,12 +1,4 @@ -use std::collections::BTreeMap; - use http::HeaderMap; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -19,6 +11,7 @@ use crate::{ }, sanitizer::{is_denied_header, is_never_join_header}, }, + utils::expression::execute_expression_with_value, }; pub fn modify_subgraph_request_headers( @@ -174,17 +167,7 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) })?; diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 6a5c34444..9cf45a768 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, iter::once}; +use std::iter::once; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -13,16 +13,11 @@ use crate::{ }, sanitizer::is_denied_header, }, + utils::expression::execute_expression_with_value, }; use super::sanitizer::is_never_join_header; use http::{header::InvalidHeaderValue, HeaderMap, HeaderName, HeaderValue}; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; pub fn apply_subgraph_response_headers( header_rule_plan: &HeaderRulesPlan, @@ -194,20 +189,9 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { - HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) })?; - if let Some(header_value) = vrl_value_to_header_value(value) { let strategy = if is_never_join_header(&self.name) { HeaderAggregationStrategy::Append diff --git a/lib/executor/src/utils/expression.rs b/lib/executor/src/utils/expression.rs new file mode 100644 index 000000000..faee2dc3b --- /dev/null +++ b/lib/executor/src/utils/expression.rs @@ -0,0 +1,49 @@ +use once_cell::sync::Lazy; +use std::collections::BTreeMap; +use vrl::{ + compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + core::Value as VrlValue, + prelude::{ + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, + TimeZone as VrlTimeZone, + }, + stdlib::all as vrl_build_functions, + value::Secrets as VrlSecrets, +}; + +static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); +static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); + +pub fn compile_expression( + expression: &str, + functions: Option<&[Box]>, +) -> Result { + let functions = functions.unwrap_or(&VRL_FUNCTIONS); + + let compilation_result = vrl_compile(expression, functions).map_err(|diagnostics| { + diagnostics + .errors() + .iter() + .map(|d| format!("{}: {}", d.code, d.message)) + .collect::>() + .join(", ") + })?; + + Ok(compilation_result.program) +} + +pub fn execute_expression_with_value( + program: &VrlProgram, + value: VrlValue, +) -> Result { + let mut target = VrlTargetValue { + value, + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); + + program.resolve(&mut ctx) +} diff --git a/lib/executor/src/utils/mod.rs b/lib/executor/src/utils/mod.rs index fc4226984..0461bb8a8 100644 --- a/lib/executor/src/utils/mod.rs +++ b/lib/executor/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod consts; +pub mod expression; pub mod traverse; pub mod vrl; diff --git a/lib/router-config/src/hmac_signature.rs b/lib/router-config/src/hmac_signature.rs new file mode 100644 index 000000000..dc393b1bd --- /dev/null +++ b/lib/router-config/src/hmac_signature.rs @@ -0,0 +1,62 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema)] +pub struct HMACSignatureConfig { + // Whether to sign outgoing requests with HMAC signatures. + // Can be a boolean or a VRL expression that evaluates to a boolean. + // Example: + // hmac_signature: + // enabled: true + // or enable it conditionally based on the subgraph name: + // hmac_signature: + // enabled: | + // if .subgraph.name == "users" { + // true + // } else { + // false + // } + #[serde(default = "default_hmac_signature_enabled")] + pub enabled: BooleanOrExpression, + + // The secret key used for HMAC signing and verification. + // It should be a random, opaque string shared between the Hive Router and the subgraph services. + pub secret: String, + + // The key name used in the extensions field of the outgoing requests to store the HMAC signature. + #[serde(default = "default_extension_name")] + pub extension_name: String, +} + +fn default_extension_name() -> String { + "hmac-signature".to_string() +} + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[serde(untagged)] +pub enum BooleanOrExpression { + Boolean(bool), + Expression { expression: String }, +} + +impl Default for BooleanOrExpression { + fn default() -> Self { + BooleanOrExpression::Boolean(false) + } +} + +impl HMACSignatureConfig { + pub fn is_disabled(&self) -> bool { + match &self.enabled { + BooleanOrExpression::Boolean(b) => !*b, + BooleanOrExpression::Expression { expression: _ } => { + // If it's an expression, we consider it enabled for the purpose of this check. + false + } + } + } +} + +fn default_hmac_signature_enabled() -> BooleanOrExpression { + BooleanOrExpression::Boolean(false) +} diff --git a/lib/router-config/src/lib.rs b/lib/router-config/src/lib.rs index 537244c9e..b2af09431 100644 --- a/lib/router-config/src/lib.rs +++ b/lib/router-config/src/lib.rs @@ -3,6 +3,7 @@ pub mod csrf; mod env_overrides; pub mod graphiql; pub mod headers; +pub mod hmac_signature; pub mod http_server; pub mod jwt_auth; pub mod log; @@ -29,7 +30,7 @@ use crate::{ primitives::file_path::with_start_path, query_planner::QueryPlannerConfig, supergraph::SupergraphSource, - traffic_shaping::TrafficShapingExecutorConfig, + traffic_shaping::TrafficShapingConfig, }; #[derive(Debug, Deserialize, Serialize, JsonSchema)] @@ -62,9 +63,9 @@ pub struct HiveRouterConfig { #[serde(default)] pub http: HttpServerConfig, - /// Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. + /// Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. #[serde(default)] - pub traffic_shaping: TrafficShapingExecutorConfig, + pub traffic_shaping: TrafficShapingConfig, /// Configuration for the headers. #[serde(default)] @@ -92,6 +93,12 @@ pub struct HiveRouterConfig { /// Configuration for overriding labels. #[serde(default, skip_serializing_if = "HashMap::is_empty")] pub override_labels: OverrideLabelsConfig, + + #[serde( + default, + skip_serializing_if = "hmac_signature::HMACSignatureConfig::is_disabled" + )] + pub hmac_signature: hmac_signature::HMACSignatureConfig, } #[derive(Debug, thiserror::Error)] diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 02ed5ecdd..95d824910 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -1,16 +1,23 @@ +use std::time::Duration; + use schemars::JsonSchema; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(deny_unknown_fields)] -pub struct TrafficShapingExecutorConfig { +pub struct TrafficShapingConfig { /// Limits the concurrent amount of requests/connections per host/subgraph. #[serde(default = "default_max_connections_per_host")] pub max_connections_per_host: usize, /// Timeout for idle sockets being kept-alive. - #[serde(default = "default_pool_idle_timeout_seconds")] - pub pool_idle_timeout_seconds: u64, + #[serde( + default = "default_pool_idle_timeout", + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + #[schemars(with = "String")] + pub pool_idle_timeout: Duration, /// Enables/disables request deduplication to subgraphs. /// @@ -20,11 +27,11 @@ pub struct TrafficShapingExecutorConfig { pub dedupe_enabled: bool, } -impl Default for TrafficShapingExecutorConfig { +impl Default for TrafficShapingConfig { fn default() -> Self { Self { max_connections_per_host: default_max_connections_per_host(), - pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(), + pool_idle_timeout: default_pool_idle_timeout(), dedupe_enabled: default_dedupe_enabled(), } } @@ -34,8 +41,8 @@ fn default_max_connections_per_host() -> usize { 100 } -fn default_pool_idle_timeout_seconds() -> u64 { - 50 +fn default_pool_idle_timeout() -> Duration { + Duration::from_secs(50) } fn default_dedupe_enabled() -> bool {