Skip to content

Commit e38d39f

Browse files
committed
Finish impl
1 parent 7615e47 commit e38d39f

File tree

9 files changed

+261
-574
lines changed

9 files changed

+261
-574
lines changed

Cargo.lock

Lines changed: 166 additions & 508 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/executor/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ itoa = "1.0.15"
4949
ryu = "1.0.20"
5050
indexmap = "2.10.0"
5151
bumpalo = "3.19.0"
52-
aws-config = "1.8.8"
53-
aws-sigv4 = "1.3.5"
54-
aws-credential-types = "1.2.8"
52+
reqsign-aws-v4 = "2.0.1"
53+
reqsign-core = "2.0.1"
54+
reqsign-file-read-tokio = "2.0.1"
55+
reqsign-http-send-reqwest = "2.0.1"
5556

5657
[dev-dependencies]
5758
subgraphs = { path = "../../bench/subgraphs" }
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use hive_router_config::aws_sig_v4::AwsSigV4SubgraphConfig;
2+
use reqsign_aws_v4::{
3+
Credential, DefaultCredentialProvider, ProfileCredentialProvider, RequestSigner,
4+
StaticCredentialProvider,
5+
};
6+
use reqsign_core::{Context, OsEnv, ProvideCredentialChain, Signer};
7+
use reqsign_file_read_tokio::TokioFileRead;
8+
use reqsign_http_send_reqwest::ReqwestHttpSend;
9+
10+
pub fn create_awssigv4_signer(config: &AwsSigV4SubgraphConfig) -> Signer<Credential> {
11+
let ctx = Context::new()
12+
.with_file_read(TokioFileRead)
13+
.with_http_send(ReqwestHttpSend::default())
14+
.with_env(OsEnv);
15+
let mut loader = ProvideCredentialChain::new();
16+
match config {
17+
AwsSigV4SubgraphConfig::DefaultChain(default_chain) => {
18+
loader = loader.push(DefaultCredentialProvider::new());
19+
if let Some(profile_name) = &default_chain.profile_name {
20+
loader = loader.push(ProfileCredentialProvider::new().with_profile(profile_name));
21+
}
22+
}
23+
AwsSigV4SubgraphConfig::HardCoded(hard_coded) => {
24+
let mut provider = StaticCredentialProvider::new(
25+
&hard_coded.access_key_id,
26+
&hard_coded.secret_access_key,
27+
);
28+
if let Some(session_token) = &hard_coded.session_token {
29+
provider = provider.with_session_token(session_token);
30+
}
31+
loader = loader.push(provider);
32+
}
33+
}
34+
let service: &str = match config {
35+
AwsSigV4SubgraphConfig::DefaultChain(default_chain) => {
36+
default_chain.service.as_ref().map_or("s3", |v| v)
37+
}
38+
AwsSigV4SubgraphConfig::HardCoded(hard_coded) => hard_coded.service_name.as_str(),
39+
};
40+
let region: &str = match config {
41+
AwsSigV4SubgraphConfig::DefaultChain(default_chain) => {
42+
default_chain.region.as_ref().map_or("us-east-1", |v| v)
43+
}
44+
AwsSigV4SubgraphConfig::HardCoded(hard_coded) => hard_coded.region.as_str(),
45+
};
46+
let builder = RequestSigner::new(service, region);
47+
48+
Signer::new(ctx, loader, builder)
49+
}

lib/executor/src/execution/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod awssigv4;
12
pub mod client_request_details;
23
pub mod error;
34
pub mod jwt_forward;

lib/executor/src/executors/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl SubgraphExecutorError {
7777
SubgraphExecutorError::RequestFailure(_, _) => "SUBGRAPH_REQUEST_FAILURE",
7878
SubgraphExecutorError::VariablesSerializationFailure(_, _) => {
7979
"SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE"
80-
},
80+
}
8181
SubgraphExecutorError::AwsSigV4SigningFailure(_, _) => {
8282
"SUBGRAPH_AWS_SIGV4_SIGNING_FAILURE"
8383
}

lib/executor/src/executors/http.rs

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@ use std::sync::Arc;
33
use crate::executors::common::HttpExecutionResponse;
44
use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse};
55
use async_trait::async_trait;
6-
use aws_sigv4::http_request::{
7-
sign, SignableBody, SignableRequest, SigningParams,
8-
};
96
use dashmap::DashMap;
107
use hive_router_config::HiveRouterConfig;
8+
use reqsign_aws_v4::Credential;
9+
use reqsign_core::Signer;
1110
use tokio::sync::OnceCell;
1211

1312
use bytes::{BufMut, Bytes, BytesMut};
@@ -39,7 +38,7 @@ pub struct HTTPSubgraphExecutor {
3938
pub semaphore: Arc<Semaphore>,
4039
pub config: Arc<HiveRouterConfig>,
4140
pub in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
42-
pub aws_signing_params: Option<Arc<SigningParams<'static>>>,
41+
pub aws_sigv4_signer: Option<Signer<Credential>>,
4342
}
4443

4544
const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{";
@@ -55,7 +54,7 @@ impl HTTPSubgraphExecutor {
5554
semaphore: Arc<Semaphore>,
5655
config: Arc<HiveRouterConfig>,
5756
in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
58-
aws_signing_params: Option<Arc<SigningParams<'static>>>,
57+
aws_sigv4_signer: Option<Signer<Credential>>,
5958
) -> Self {
6059
let mut header_map = HeaderMap::new();
6160
header_map.insert(
@@ -75,7 +74,7 @@ impl HTTPSubgraphExecutor {
7574
semaphore,
7675
config,
7776
in_flight_requests,
78-
aws_signing_params,
77+
aws_sigv4_signer,
7978
}
8079
}
8180

@@ -138,51 +137,6 @@ impl HTTPSubgraphExecutor {
138137
Ok(body)
139138
}
140139

141-
async fn sign_awssigv4<'a>(
142-
&self,
143-
req: &'a mut http::Request<Full<Bytes>>,
144-
body: &'a [u8],
145-
) -> Result<(), SubgraphExecutorError> {
146-
if let Some(signing_params) = &self.aws_signing_params {
147-
let signable_request = SignableRequest::new(
148-
req.method().as_str(),
149-
req.uri().to_string(),
150-
req.headers().iter().map(|(k, v)| {
151-
(
152-
k.as_str(),
153-
str::from_utf8(v.as_bytes())
154-
.map_err(|err| {
155-
SubgraphExecutorError::AwsSigV4SigningFailure(
156-
self.subgraph_name.to_string(),
157-
err.to_string(),
158-
)
159-
})
160-
.unwrap(),
161-
)
162-
}),
163-
SignableBody::Bytes(body),
164-
)
165-
.map_err(|err| {
166-
SubgraphExecutorError::AwsSigV4SigningFailure(
167-
self.subgraph_name.to_string(),
168-
err.to_string(),
169-
)
170-
})?;
171-
172-
let (signing_instructions, _) = sign(signable_request, &signing_params)
173-
.map_err(|err| {
174-
SubgraphExecutorError::AwsSigV4SigningFailure(
175-
self.subgraph_name.to_string(),
176-
err.to_string(),
177-
)
178-
})?
179-
.into_parts();
180-
181-
signing_instructions.apply_to_request_http1x(req);
182-
}
183-
Ok(())
184-
}
185-
186140
async fn _send_request(
187141
&self,
188142
body: Vec<u8>,
@@ -192,19 +146,26 @@ impl HTTPSubgraphExecutor {
192146
.method(http::Method::POST)
193147
.uri(&self.endpoint)
194148
.version(Version::HTTP_11)
195-
.body(Default::default())
149+
.body(Full::new(Bytes::from(body)))
196150
.map_err(|e| {
197151
SubgraphExecutorError::RequestBuildFailure(self.endpoint.to_string(), e.to_string())
198152
})?;
199153

200154
*req.headers_mut() = headers;
201155

202-
self.sign_awssigv4(&mut req, &body).await?;
203-
204-
*req.body_mut() = Full::new(Bytes::from(body));
205-
206156
debug!("making http request to {}", self.endpoint.to_string());
207157

158+
if let Some(aws_sigv4_signer) = &self.aws_sigv4_signer {
159+
let (mut parts, body) = req.into_parts();
160+
aws_sigv4_signer.sign(&mut parts, None).await.map_err(|e| {
161+
SubgraphExecutorError::AwsSigV4SigningFailure(
162+
self.endpoint.to_string(),
163+
e.to_string(),
164+
)
165+
})?;
166+
req = http::Request::from_parts(parts, body);
167+
}
168+
208169
let res = self.http_client.request(req).await.map_err(|e| {
209170
SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string())
210171
})?;

lib/executor/src/executors/map.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use vrl::{
2727
};
2828

2929
use crate::{
30-
execution::client_request_details::ClientRequestDetails,
30+
execution::{awssigv4::create_awssigv4_signer, client_request_details::ClientRequestDetails},
3131
executors::{
3232
common::{
3333
HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
@@ -317,14 +317,27 @@ impl SubgraphExecutorMap {
317317
.or_insert_with(|| Arc::new(Semaphore::new(self.max_connections_per_host)))
318318
.clone();
319319

320+
let aws_sigv4_signer = if self.config.aws_sig_v4.is_disabled() {
321+
None
322+
} else {
323+
let aws_sigv4_subgraph_config = self
324+
.config
325+
.aws_sig_v4
326+
.subgraphs
327+
.get(subgraph_name)
328+
.unwrap_or(&self.config.aws_sig_v4.all);
329+
330+
Some(create_awssigv4_signer(aws_sigv4_subgraph_config))
331+
};
332+
320333
let executor = HTTPSubgraphExecutor::new(
321334
subgraph_name.to_string(),
322335
endpoint_uri,
323336
self.client.clone(),
324337
semaphore,
325338
self.config.clone(),
326339
self.in_flight_requests.clone(),
327-
None,
340+
aws_sigv4_signer,
328341
);
329342

330343
self.executors_by_subgraph

lib/router-config/src/aws_sig_v4.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub struct AwsSigV4Config {
1313

1414
// configuration that will apply to all subgraphs
1515
pub all: AwsSigV4SubgraphConfig,
16-
16+
1717
// per-subgraph configuration overrides
1818
#[serde(default)]
1919
pub subgraphs: HashMap<String, AwsSigV4SubgraphConfig>,
@@ -66,6 +66,7 @@ pub struct HardCodedConfig {
6666
pub secret_access_key: String,
6767
pub region: String,
6868
pub service_name: String,
69+
pub session_token: Option<String>,
6970
}
7071

7172
impl AwsSigV4Config {
@@ -83,4 +84,4 @@ pub struct AssumeRoleConfig {
8384
pub role_arn: String,
8485
pub session_name: Option<String>,
8586
pub external_id: Option<String>,
86-
}
87+
}

lib/router-config/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod aws_sig_v4;
12
pub mod cors;
23
pub mod csrf;
34
mod env_overrides;
@@ -12,7 +13,6 @@ pub mod primitives;
1213
pub mod query_planner;
1314
pub mod supergraph;
1415
pub mod traffic_shaping;
15-
pub mod aws_sig_v4;
1616

1717
use config::{Config, File, FileFormat, FileSourceFile};
1818
use envconfig::Envconfig;
@@ -95,7 +95,10 @@ pub struct HiveRouterConfig {
9595
pub override_labels: OverrideLabelsConfig,
9696

9797
/// Configuration for AWS SigV4 signing of requests to subgraphs.
98-
#[serde(default, skip_serializing_if = "aws_sig_v4::AwsSigV4Config::is_disabled")]
98+
#[serde(
99+
default,
100+
skip_serializing_if = "aws_sig_v4::AwsSigV4Config::is_disabled"
101+
)]
99102
pub aws_sig_v4: aws_sig_v4::AwsSigV4Config,
100103
}
101104

0 commit comments

Comments
 (0)