33use std:: { num:: NonZeroU64 , sync:: Arc , time:: Duration } ;
44
55use anyhow:: { anyhow, Context , Result } ;
6- use hypersync_net_types:: { ArchiveHeight , ChainId , Query } ;
6+ use hypersync_net_types:: { hypersync_net_types_capnp , ArchiveHeight , ChainId , Query } ;
77use polars_arrow:: { array:: Array , record_batch:: RecordBatchT as Chunk } ;
88use reqwest:: Method ;
99
@@ -40,6 +40,7 @@ pub use decode::Decoder;
4040pub use decode_call:: CallDecoder ;
4141pub use types:: { ArrowBatch , ArrowResponse , ArrowResponseData , QueryResponse } ;
4242
43+ use crate :: parse_response:: read_query_response;
4344use crate :: simple_types:: InternalEventJoinStrategy ;
4445
4546type ArrowChunk = Chunk < Box < dyn Array > > ;
@@ -421,6 +422,15 @@ impl Client {
421422 Ok ( ( res, bytes. len ( ) . try_into ( ) . unwrap ( ) ) )
422423 }
423424
425+ fn should_cache_queries ( & self ) -> bool {
426+ matches ! (
427+ self . serialization_format,
428+ SerializationFormat :: CapnProto {
429+ should_cache_queries: true
430+ }
431+ )
432+ }
433+
424434 /// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
425435 async fn get_arrow_impl_capnp ( & self , query : & Query ) -> Result < ( ArrowResponse , u64 ) > {
426436 let mut url = self . url . clone ( ) ;
@@ -429,16 +439,89 @@ impl Client {
429439 segments. push ( "arrow-ipc" ) ;
430440 segments. push ( "capnp" ) ;
431441 std:: mem:: drop ( segments) ;
432- let mut req = self . http_client . request ( Method :: POST , url) ;
433442
443+ let should_cache = self . should_cache_queries ( ) ;
444+
445+ if should_cache {
446+ let query_with_id = {
447+ let mut message = capnp:: message:: Builder :: new_default ( ) ;
448+ let mut query_builder =
449+ message. init_root :: < hypersync_net_types_capnp:: query:: Builder > ( ) ;
450+
451+ query_builder. build_query_id_from_query ( query) ?;
452+ let mut query_with_id = Vec :: new ( ) ;
453+ capnp:: serialize_packed:: write_message ( & mut query_with_id, & message) ?;
454+ query_with_id
455+ } ;
456+
457+ let mut req = self . http_client . request ( Method :: POST , url. clone ( ) ) ;
458+ req = req. header ( "content-type" , "application/x-capnp" ) ;
459+ req = req. header ( "x-hypersync-cache-queries" , "true" ) ;
460+ if let Some ( bearer_token) = & self . bearer_token {
461+ req = req. bearer_auth ( bearer_token) ;
462+ }
463+
464+ let res = req
465+ . body ( query_with_id)
466+ . send ( )
467+ . await
468+ . context ( "execute http req" ) ?;
469+
470+ let status = res. status ( ) ;
471+ if status. is_success ( ) {
472+ let bytes = res. bytes ( ) . await . context ( "read response body bytes" ) ?;
473+
474+ let mut opts = capnp:: message:: ReaderOptions :: new ( ) ;
475+ opts. nesting_limit ( i32:: MAX ) . traversal_limit_in_words ( None ) ;
476+ let message_reader = capnp:: serialize_packed:: read_message ( bytes. as_ref ( ) , opts)
477+ . context ( "create message reader" ) ?;
478+ let query_response = message_reader
479+ . get_root :: < hypersync_net_types_capnp:: cached_query_response:: Reader > ( )
480+ . context ( "get cached_query_response root" ) ?;
481+ match query_response. get_either ( ) . which ( ) ? {
482+ hypersync_net_types_capnp:: cached_query_response:: either:: Which :: QueryResponse (
483+ query_response,
484+ ) => {
485+ let res = tokio:: task:: block_in_place ( || {
486+ let res = query_response?;
487+ read_query_response ( & res) . context ( "parse query response cached" )
488+ } ) ?;
489+ return Ok ( ( res, bytes. len ( ) . try_into ( ) . unwrap ( ) ) ) ;
490+ }
491+ hypersync_net_types_capnp:: cached_query_response:: either:: Which :: NotCached ( ( ) ) => {
492+ log:: trace!( "query was not cached, retrying with full query" ) ;
493+ }
494+ }
495+ } else {
496+ let text = res. text ( ) . await . context ( "read text to see error" ) ?;
497+ log:: error!(
498+ "Failed cache query, will retry full query. {}, err body: {}" ,
499+ status,
500+ text
501+ ) ;
502+ }
503+ } ;
504+
505+ let full_query_bytes = {
506+ let mut message = capnp:: message:: Builder :: new_default ( ) ;
507+ let mut query_builder =
508+ message. init_root :: < hypersync_net_types_capnp:: query:: Builder > ( ) ;
509+
510+ query_builder. build_full_query_from_query ( query, should_cache) ?;
511+ let mut bytes = Vec :: new ( ) ;
512+ capnp:: serialize_packed:: write_message ( & mut bytes, & message) ?;
513+ bytes
514+ } ;
515+
516+ let mut req = self . http_client . request ( Method :: POST , url) ;
517+ req = req. header ( "content-type" , "application/x-capnp" ) ;
434518 if let Some ( bearer_token) = & self . bearer_token {
435519 req = req. bearer_auth ( bearer_token) ;
436520 }
437521
438- let query_bytes = query. to_bytes ( ) . context ( "serialize query to bytes" ) ?;
439522 let res = req
440523 . header ( "content-type" , "application/x-capnp" )
441- . body ( query_bytes )
524+ . body ( full_query_bytes )
442525 . send ( )
443526 . await
444527 . context ( "execute http req" ) ?;
@@ -467,7 +550,7 @@ impl Client {
467550 async fn get_arrow_impl ( & self , query : & Query ) -> Result < ( ArrowResponse , u64 ) > {
468551 match self . serialization_format {
469552 SerializationFormat :: Json => self . get_arrow_impl_json ( query) . await ,
470- SerializationFormat :: CapnProto => self . get_arrow_impl_capnp ( query) . await ,
553+ SerializationFormat :: CapnProto { .. } => self . get_arrow_impl_capnp ( query) . await ,
471554 }
472555 }
473556
0 commit comments