1+ use std:: collections:: BTreeMap ;
12use std:: sync:: Arc ;
23
34use crate :: executors:: common:: HttpExecutionResponse ;
45use crate :: executors:: dedupe:: { request_fingerprint, ABuildHasher , SharedResponse } ;
6+ use crate :: utils:: expression:: execute_expression_with_value;
57use dashmap:: DashMap ;
68use hive_router_config:: HiveRouterConfig ;
79use tokio:: sync:: OnceCell ;
810
911use async_trait:: async_trait;
1012
1113use bytes:: { BufMut , Bytes , BytesMut } ;
14+ use hmac:: { Hmac , Mac } ;
1215use http:: HeaderMap ;
1316use http:: HeaderValue ;
1417use http_body_util:: BodyExt ;
1518use http_body_util:: Full ;
1619use hyper:: Version ;
1720use hyper_tls:: HttpsConnector ;
1821use hyper_util:: client:: legacy:: { connect:: HttpConnector , Client } ;
22+ use sha2:: Sha256 ;
1923use tokio:: sync:: Semaphore ;
2024use tracing:: debug;
25+ use vrl:: compiler:: Program as VrlProgram ;
2126
2227use crate :: executors:: common:: HttpExecutionRequest ;
2328use crate :: executors:: error:: SubgraphExecutorError ;
@@ -27,6 +32,7 @@ use crate::utils::consts::COLON;
2732use crate :: utils:: consts:: COMMA ;
2833use crate :: utils:: consts:: QUOTE ;
2934use crate :: { executors:: common:: SubgraphExecutor , json_writer:: write_and_escape_string} ;
35+ use vrl:: core:: Value as VrlValue ;
3036
3137#[ derive( Debug ) ]
3238pub struct HTTPSubgraphExecutor {
@@ -37,13 +43,23 @@ pub struct HTTPSubgraphExecutor {
3743 pub semaphore : Arc < Semaphore > ,
3844 pub config : Arc < HiveRouterConfig > ,
3945 pub in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
46+ pub should_sign_hmac : BooleanOrProgram ,
4047}
4148
4249const FIRST_VARIABLE_STR : & [ u8 ] = b",\" variables\" :{" ;
4350const FIRST_QUOTE_STR : & [ u8 ] = b"{\" query\" :" ;
51+ const FIRST_EXTENSION_STR : & [ u8 ] = b",\" extensions\" :{" ;
4452
4553pub type HttpClient = Client < HttpsConnector < HttpConnector > , Full < Bytes > > ;
4654
55+ type HmacSha256 = Hmac < Sha256 > ;
56+
57+ #[ derive( Debug ) ]
58+ pub enum BooleanOrProgram {
59+ Boolean ( bool ) ,
60+ Program ( Box < VrlProgram > ) ,
61+ }
62+
4763impl HTTPSubgraphExecutor {
4864 pub fn new (
4965 subgraph_name : String ,
@@ -52,6 +68,7 @@ impl HTTPSubgraphExecutor {
5268 semaphore : Arc < Semaphore > ,
5369 config : Arc < HiveRouterConfig > ,
5470 in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
71+ should_sign_hmac : BooleanOrProgram ,
5572 ) -> Self {
5673 let mut header_map = HeaderMap :: new ( ) ;
5774 header_map. insert (
@@ -71,12 +88,13 @@ impl HTTPSubgraphExecutor {
7188 semaphore,
7289 config,
7390 in_flight_requests,
91+ should_sign_hmac,
7492 }
7593 }
7694
77- fn build_request_body < ' a > (
95+ fn build_request_body < ' exec , ' req > (
7896 & self ,
79- execution_request : & HttpExecutionRequest < ' a > ,
97+ execution_request : & HttpExecutionRequest < ' exec , ' req > ,
8098 ) -> Result < Vec < u8 > , SubgraphExecutorError > {
8199 let mut body = Vec :: with_capacity ( 4096 ) ;
82100 body. put ( FIRST_QUOTE_STR ) ;
@@ -118,13 +136,89 @@ impl HTTPSubgraphExecutor {
118136 body. put ( CLOSE_BRACE ) ;
119137 }
120138
121- if let Some ( extensions) = & execution_request. extensions {
122- if !extensions. is_empty ( ) {
123- let as_value = sonic_rs:: to_value ( extensions) . unwrap ( ) ;
139+ let should_sign_hmac = match & self . should_sign_hmac {
140+ BooleanOrProgram :: Boolean ( b) => * b,
141+ BooleanOrProgram :: Program ( expr) => {
142+ // .subgraph
143+ let subgraph_value = VrlValue :: Object ( BTreeMap :: from ( [ (
144+ "name" . into ( ) ,
145+ VrlValue :: Bytes ( Bytes :: from ( self . subgraph_name . to_owned ( ) ) ) ,
146+ ) ] ) ) ;
147+ // .request
148+ let request_value: VrlValue = execution_request. client_request . into ( ) ;
149+ let target_value = VrlValue :: Object ( BTreeMap :: from ( [
150+ ( "subgraph" . into ( ) , subgraph_value) ,
151+ ( "request" . into ( ) , request_value) ,
152+ ] ) ) ;
153+ let result = execute_expression_with_value ( expr, target_value) ;
154+ match result {
155+ Ok ( VrlValue :: Boolean ( b) ) => b,
156+ Ok ( _) => {
157+ return Err ( SubgraphExecutorError :: HMACSignatureError (
158+ "HMAC signature expression did not evaluate to a boolean" . to_string ( ) ,
159+ ) ) ;
160+ }
161+ Err ( e) => {
162+ return Err ( SubgraphExecutorError :: HMACSignatureError ( format ! (
163+ "HMAC signature expression evaluation error: {}" ,
164+ e
165+ ) ) ) ;
166+ }
167+ }
168+ }
169+ } ;
124170
125- body. put ( COMMA ) ;
126- body. put ( "\" extensions\" :" . as_bytes ( ) ) ;
127- body. extend_from_slice ( as_value. to_string ( ) . as_bytes ( ) ) ;
171+ let hmac_signature_ext = if should_sign_hmac {
172+ let mut mac = HmacSha256 :: new_from_slice ( self . config . hmac_signature . secret . as_bytes ( ) )
173+ . map_err ( |e| {
174+ SubgraphExecutorError :: HMACSignatureError ( format ! (
175+ "Failed to create HMAC instance: {}" ,
176+ e
177+ ) )
178+ } ) ?;
179+ let mut body_without_extensions = body. clone ( ) ;
180+ body_without_extensions. put ( CLOSE_BRACE ) ;
181+ mac. update ( & body_without_extensions) ;
182+ let result = mac. finalize ( ) ;
183+ let result_bytes = result. into_bytes ( ) ;
184+ Some ( result_bytes)
185+ } else {
186+ None
187+ } ;
188+
189+ if let Some ( extensions) = & execution_request. extensions {
190+ let mut first = true ;
191+ if let Some ( hmac_bytes) = hmac_signature_ext {
192+ if first {
193+ body. put ( FIRST_EXTENSION_STR ) ;
194+ first = false ;
195+ } else {
196+ body. put ( COMMA ) ;
197+ }
198+ body. put ( self . config . hmac_signature . extension_name . as_bytes ( ) ) ;
199+ let hmac_hex = hex:: encode ( hmac_bytes) ;
200+ body. put ( QUOTE ) ;
201+ body. put ( hmac_hex. as_bytes ( ) ) ;
202+ body. put ( QUOTE ) ;
203+ }
204+ for ( extension_name, extension_value) in extensions {
205+ if first {
206+ body. put ( FIRST_EXTENSION_STR ) ;
207+ first = false ;
208+ } else {
209+ body. put ( COMMA ) ;
210+ }
211+ body. put ( QUOTE ) ;
212+ body. put ( extension_name. as_bytes ( ) ) ;
213+ body. put ( QUOTE ) ;
214+ body. put ( COLON ) ;
215+ let value_str = sonic_rs:: to_string ( extension_value) . map_err ( |err| {
216+ SubgraphExecutorError :: ExtensionSerializationFailure (
217+ extension_name. to_string ( ) ,
218+ err. to_string ( ) ,
219+ )
220+ } ) ?;
221+ body. put ( value_str. as_bytes ( ) ) ;
128222 }
129223 }
130224
@@ -210,9 +304,9 @@ impl HTTPSubgraphExecutor {
210304#[ async_trait]
211305impl SubgraphExecutor for HTTPSubgraphExecutor {
212306 #[ tracing:: instrument( skip_all, fields( subgraph_name = self . subgraph_name) ) ]
213- async fn execute < ' a > (
307+ async fn execute < ' exec , ' req > (
214308 & self ,
215- execution_request : HttpExecutionRequest < ' a > ,
309+ execution_request : HttpExecutionRequest < ' exec , ' req > ,
216310 ) -> HttpExecutionResponse {
217311 let body = match self . build_request_body ( & execution_request) {
218312 Ok ( body) => body,
0 commit comments