Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .changeset/shared_utilities_to_handle_vrl_expressions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
default: minor
---

# Breaking

Removed `pool_idle_timeout_seconds` from `traffic_shaping`, instead use `pool_idle_timeout` with duration format.

```diff
traffic_shaping:
- pool_idle_timeout_seconds: 30
+ pool_idle_timeout: 30s
```

#540 by @ardatan
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 12 additions & 17 deletions bin/router/src/pipeline/progressive_override.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use std::collections::{BTreeMap, HashMap, HashSet};

use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig};
use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails;
use hive_router_plan_executor::{
execution::client_request_details::ClientRequestDetails, utils::expression::compile_expression,
};
use hive_router_query_planner::{
graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR},
state::supergraph_state::SupergraphState,
};
use rand::Rng;
use vrl::{
compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue},
compiler::Program as VrlProgram,
compiler::TargetValue as VrlTargetValue,
core::Value as VrlValue,
prelude::{
state::RuntimeState as VrlState, Context as VrlContext, ExpressionError,
TimeZone as VrlTimeZone,
},
stdlib::all as vrl_build_functions,
value::Secrets as VrlSecrets,
};

Expand Down Expand Up @@ -126,27 +128,20 @@ impl OverrideLabelsEvaluator {
) -> Result<Self, OverrideLabelsCompileError> {
let mut static_enabled_labels = HashSet::new();
let mut expressions = HashMap::new();
let vrl_functions = vrl_build_functions();

for (label, value) in override_labels_config.iter() {
match value {
LabelOverrideValue::Boolean(true) => {
static_enabled_labels.insert(label.clone());
}
LabelOverrideValue::Expression { expression } => {
let compilation_result =
vrl_compile(expression, &vrl_functions).map_err(|diagnostics| {
OverrideLabelsCompileError {
label: label.clone(),
error: diagnostics
.errors()
.into_iter()
.map(|d| d.code.to_string() + ": " + &d.message)
.collect::<Vec<_>>()
.join(", "),
}
})?;
expressions.insert(label.clone(), compilation_result.program);
let program = compile_expression(expression, None).map_err(|err| {
OverrideLabelsCompileError {
label: label.clone(),
error: err.to_string(),
}
})?;
expressions.insert(label.clone(), program);
}
_ => {} // Skip false booleans
}
Expand Down
68 changes: 60 additions & 8 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
|[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.<br/>Default: `{}`<br/>||
|[**query\_planner**](#query_planner)|`object`|Query planning configuration.<br/>Default: `{"allow_expose":false,"timeout":"10s"}`<br/>||
|[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).<br/>||
|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.<br/>Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`<br/>||
|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.<br/>Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"},"max_connections_per_host":100}`<br/>||

**Additional Properties:** not allowed
**Example**
Expand Down Expand Up @@ -107,9 +107,11 @@ query_planner:
timeout: 10s
supergraph: {}
traffic_shaping:
dedupe_enabled: true
all:
dedupe_enabled: true
pool_idle_timeout: 50s
request_timeout: 30s
max_connections_per_host: 100
pool_idle_timeout_seconds: 50

```

Expand Down Expand Up @@ -1808,25 +1810,75 @@ Request timeout for the Hive Console CDN requests.
<a name="traffic_shaping"></a>
## traffic\_shaping: object

Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.


**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>Default: `true`<br/>||
|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.<br/>Default: `{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"}`<br/>||
|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.<br/>Default: `100`<br/>Format: `"uint"`<br/>Minimum: `0`<br/>||
|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.<br/>Default: `50`<br/>Format: `"uint64"`<br/>Minimum: `0`<br/>||
|[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.<br/>||

**Additional Properties:** not allowed
**Example**

```yaml
dedupe_enabled: true
all:
dedupe_enabled: true
pool_idle_timeout: 50s
request_timeout: 30s
max_connections_per_host: 100
pool_idle_timeout_seconds: 50

```

<a name="traffic_shapingall"></a>
### traffic\_shaping\.all: object

The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.


**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>Default: `true`<br/>||
|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.<br/>Default: `"50s"`<br/>||
|**request\_timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> "10s"<br/> } else {<br/> "15s"<br/> }<br/>```<br/>Default: `"30s"`<br/>||

**Additional Properties:** not allowed
**Example**

```yaml
dedupe_enabled: true
pool_idle_timeout: 50s
request_timeout: 30s

```

<a name="traffic_shapingsubgraphs"></a>
### traffic\_shaping\.subgraphs: object

Optional per-subgraph configurations that will override the default configuration for specific subgraphs.


**Additional Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|[**Additional Properties**](#traffic_shapingsubgraphsadditionalproperties)|`object`||yes|

<a name="traffic_shapingsubgraphsadditionalproperties"></a>
#### traffic\_shaping\.subgraphs\.additionalProperties: object

**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**dedupe\_enabled**|`boolean`, `null`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>|no|
|**pool\_idle\_timeout\_seconds**|`string`|Timeout for idle sockets being kept-alive.<br/>|yes|
|**request\_timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> "10s"<br/> } else {<br/> "15s"<br/> }<br/>```<br/>|no|

**Additional Properties:** not allowed

2 changes: 2 additions & 0 deletions lib/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ hyper-util = { version = "0.1.16", features = [
"http2",
"tokio",
] }
humantime = "2.3.0"
bytes = "1.10.1"
itoa = "1.0.15"
ryu = "1.0.20"
indexmap = "2.10.0"
bumpalo = "3.19.0"
once_cell = "1.21.3"

[dev-dependencies]
subgraphs = { path = "../../bench/subgraphs" }
Expand Down
4 changes: 2 additions & 2 deletions lib/executor/src/execution/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
rewrites::FetchRewriteExt,
},
executors::{
common::{HttpExecutionRequest, HttpExecutionResponse},
common::{HttpExecutionResponse, SubgraphExecutionRequest},
map::SubgraphExecutorMap,
},
headers::{
Expand Down Expand Up @@ -700,7 +700,7 @@ impl<'exec, 'req> Executor<'exec, 'req> {
let variable_refs =
select_fetch_variables(self.variable_values, node.variable_usages.as_ref());

let mut subgraph_request = HttpExecutionRequest {
let mut subgraph_request = SubgraphExecutionRequest {
query: node.operation.document_str.as_str(),
dedupe: self.dedupe_subgraph_requests,
operation_name: node.operation_name.as_deref(),
Expand Down
9 changes: 5 additions & 4 deletions lib/executor/src/executors/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -9,7 +9,8 @@ use sonic_rs::Value;
pub trait SubgraphExecutor {
async fn execute<'a>(
&self,
execution_request: HttpExecutionRequest<'a>,
execution_request: SubgraphExecutionRequest<'a>,
timeout: Option<Duration>,
) -> HttpExecutionResponse;

fn to_boxed_arc<'a>(self) -> Arc<Box<dyn SubgraphExecutor + Send + Sync + 'a>>
Expand All @@ -26,7 +27,7 @@ pub type SubgraphExecutorBoxedArc = Arc<Box<SubgraphExecutorType>>;

pub type SubgraphRequestExtensions = HashMap<String, Value>;

pub struct HttpExecutionRequest<'a> {
pub struct SubgraphExecutionRequest<'a> {
pub query: &'a str,
pub dedupe: bool,
pub operation_name: Option<&'a str>,
Expand All @@ -37,7 +38,7 @@ pub struct HttpExecutionRequest<'a> {
pub extensions: Option<SubgraphRequestExtensions>,
}

impl HttpExecutionRequest<'_> {
impl SubgraphExecutionRequest<'_> {
pub fn add_request_extensions_field(&mut self, key: String, value: Value) {
self.extensions
.get_or_insert_with(HashMap::new)
Expand Down
24 changes: 24 additions & 0 deletions lib/executor/src/executors/duration_or_prog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::time::Duration;

use hive_router_config::traffic_shaping::DurationOrExpression;
use vrl::{compiler::Program as VrlProgram, prelude::Function};

use crate::utils::expression::compile_expression;

pub enum DurationOrProgram {
Duration(Duration),
Program(Box<VrlProgram>),
}

pub fn compile_duration_expression(
duration_or_expr: &DurationOrExpression,
fns: Option<&[Box<dyn Function>]>,
) -> Result<DurationOrProgram, String> {
match duration_or_expr {
DurationOrExpression::Duration(dur) => Ok(DurationOrProgram::Duration(*dur)),
DurationOrExpression::Expression { expression } => {
let program = compile_expression(expression, fns)?;
Ok(DurationOrProgram::Program(Box::new(program)))
}
}
}
30 changes: 14 additions & 16 deletions lib/executor/src/executors/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError};
use vrl::prelude::ExpressionError;

use crate::response::graphql_error::{GraphQLError, GraphQLErrorExtensions};

Expand All @@ -20,6 +20,12 @@ pub enum SubgraphExecutorError {
RequestFailure(String, String),
#[error("Failed to serialize variable \"{0}\": {1}")]
VariablesSerializationFailure(String, String),
#[error("Failed to compile VRL expression for timeout for subgraph '{0}'. Please check your VRL expression for syntax errors. Diagnostic: {1}")]
RequestTimeoutExpressionBuild(String, String),
#[error("Failed to resolve VRL expression for timeout for subgraph '{0}'. Runtime error: {1}")]
TimeoutExpressionResolution(String, String),
#[error("Request to subgraph \"{0}\" timed out after {1} milliseconds")]
RequestTimeout(String, u64),
}

impl From<SubgraphExecutorError> for GraphQLError {
Expand All @@ -34,21 +40,6 @@ impl From<SubgraphExecutorError> for GraphQLError {
}

impl SubgraphExecutorError {
pub fn new_endpoint_expression_build(
subgraph_name: String,
diagnostics: DiagnosticList,
) -> Self {
SubgraphExecutorError::EndpointExpressionBuild(
subgraph_name,
diagnostics
.errors()
.into_iter()
.map(|d| d.code.to_string() + ": " + &d.message)
.collect::<Vec<_>>()
.join(", "),
)
}

pub fn new_endpoint_expression_resolution_failure(
subgraph_name: String,
error: ExpressionError,
Expand Down Expand Up @@ -76,6 +67,13 @@ impl SubgraphExecutorError {
SubgraphExecutorError::VariablesSerializationFailure(_, _) => {
"SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE"
}
SubgraphExecutorError::TimeoutExpressionResolution(_, _) => {
"SUBGRAPH_TIMEOUT_EXPRESSION_RESOLUTION_FAILURE"
}
SubgraphExecutorError::RequestTimeout(_, _) => "SUBGRAPH_REQUEST_TIMEOUT",
SubgraphExecutorError::RequestTimeoutExpressionBuild(_, _) => {
"SUBGRAPH_TIMEOUT_EXPRESSION_BUILD_FAILURE"
}
}
}
}
Loading
Loading