Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions examples/all_erc20/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

use std::{sync::Arc, time::Instant};

use hypersync_client::{Client, ClientConfig, ColumnMapping, DataType, StreamConfig};
use hypersync_client::{
Client, ClientConfig, ColumnMapping, DataType, SerializationFormat, StreamConfig,
};
use polars_arrow::{
array::{Array, Float64Array},
compute,
Expand All @@ -15,7 +17,13 @@ async fn main() {
env_logger::init().unwrap();

// create default client, uses eth mainnet
let client = Client::new(ClientConfig::default()).unwrap();
let client = Client::new(ClientConfig {
serialization_format: SerializationFormat::CapnProto {
should_cache_queries: true,
},
..Default::default()
})
.unwrap();

let query = serde_json::from_value(serde_json::json!( {
// start from block 10123123 and go to the end of the chain (we don't specify a toBlock).
Expand Down
4 changes: 2 additions & 2 deletions hypersync-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hypersync-client"
version = "0.20.0-rc.2"
version = "0.20.0-rc.3"
edition = "2021"
description = "client library for hypersync"
license = "MPL-2.0"
Expand Down Expand Up @@ -47,7 +47,7 @@ nohash-hasher = "0.2.0"
ethers = { version = "2.0.14", optional = true }
alloy-primitives = "1.1"

hypersync-net-types = { path = "../hypersync-net-types", version = "0.11.0-rc.2" }
hypersync-net-types = { path = "../hypersync-net-types", version = "0.11.0-rc.3" }
hypersync-format = { path = "../hypersync-format", version = "0.5.8" }
hypersync-schema = { path = "../hypersync-schema", version = "0.3" }

Expand Down
24 changes: 8 additions & 16 deletions hypersync-client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,16 @@ pub struct ClientConfig {
}

/// Determines query serialization format for HTTP requests.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize)]
pub enum SerializationFormat {
/// Use JSON serialization (default)
#[default]
Json,
/// Use Cap'n Proto binary serialization
CapnProto,
}

impl Default for SerializationFormat {
fn default() -> Self {
// Keep this the default until all hs instances are upgraded to use Cap'n Proto endpoint
Self::Json
}
CapnProto {
/// Whether to use query caching
should_cache_queries: bool,
},
}

/// Config for hypersync event streaming.
Expand Down Expand Up @@ -78,18 +75,13 @@ pub struct StreamConfig {
}

/// Determines format of Binary column
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize)]
pub enum HexOutput {
/// Binary column won't be formatted as hex
#[default]
NoEncode,
/// Binary column would be formatted as prefixed hex i.e. 0xdeadbeef
Prefixed,
/// Binary column would be formatted as non prefixed hex i.e. deadbeef
NonPrefixed,
}

impl Default for HexOutput {
fn default() -> Self {
Self::NoEncode
}
}
93 changes: 88 additions & 5 deletions hypersync-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{num::NonZeroU64, sync::Arc, time::Duration};

use anyhow::{anyhow, Context, Result};
use hypersync_net_types::{ArchiveHeight, ChainId, Query};
use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
use reqwest::Method;

Expand Down Expand Up @@ -40,6 +40,7 @@ pub use decode::Decoder;
pub use decode_call::CallDecoder;
pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};

use crate::parse_response::read_query_response;
use crate::simple_types::InternalEventJoinStrategy;

type ArrowChunk = Chunk<Box<dyn Array>>;
Expand Down Expand Up @@ -421,6 +422,15 @@ impl Client {
Ok((res, bytes.len().try_into().unwrap()))
}

fn should_cache_queries(&self) -> bool {
matches!(
self.serialization_format,
SerializationFormat::CapnProto {
should_cache_queries: true
}
)
}

/// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
async fn get_arrow_impl_capnp(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
let mut url = self.url.clone();
Expand All @@ -429,16 +439,89 @@ impl Client {
segments.push("arrow-ipc");
segments.push("capnp");
std::mem::drop(segments);
let mut req = self.http_client.request(Method::POST, url);

let should_cache = self.should_cache_queries();

if should_cache {
let query_with_id = {
let mut message = capnp::message::Builder::new_default();
let mut query_builder =
message.init_root::<hypersync_net_types_capnp::query::Builder>();

query_builder.build_query_id_from_query(query)?;
let mut query_with_id = Vec::new();
capnp::serialize_packed::write_message(&mut query_with_id, &message)?;
query_with_id
};

let mut req = self.http_client.request(Method::POST, url.clone());
req = req.header("content-type", "application/x-capnp");
req = req.header("x-hypersync-cache-queries", "true");
if let Some(bearer_token) = &self.bearer_token {
req = req.bearer_auth(bearer_token);
}

let res = req
.body(query_with_id)
.send()
.await
.context("execute http req")?;

let status = res.status();
if status.is_success() {
let bytes = res.bytes().await.context("read response body bytes")?;

let mut opts = capnp::message::ReaderOptions::new();
opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
.context("create message reader")?;
let query_response = message_reader
.get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
.context("get cached_query_response root")?;
match query_response.get_either().which()? {
hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
query_response,
) => {
let res = tokio::task::block_in_place(|| {
let res = query_response?;
read_query_response(&res).context("parse query response cached")
})?;
return Ok((res, bytes.len().try_into().unwrap()));
}
hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
log::trace!("query was not cached, retrying with full query");
}
}
} else {
let text = res.text().await.context("read text to see error")?;
log::error!(
"Failed cache query, will retry full query. {}, err body: {}",
status,
text
);
}
};

let full_query_bytes = {
let mut message = capnp::message::Builder::new_default();
let mut query_builder =
message.init_root::<hypersync_net_types_capnp::query::Builder>();

query_builder.build_full_query_from_query(query, should_cache)?;
let mut bytes = Vec::new();
capnp::serialize_packed::write_message(&mut bytes, &message)?;
bytes
};

let mut req = self.http_client.request(Method::POST, url);
req = req.header("content-type", "application/x-capnp");
if let Some(bearer_token) = &self.bearer_token {
req = req.bearer_auth(bearer_token);
}

let query_bytes = query.to_bytes().context("serialize query to bytes")?;
let res = req
.header("content-type", "application/x-capnp")
.body(query_bytes)
.body(full_query_bytes)
.send()
.await
.context("execute http req")?;
Expand Down Expand Up @@ -467,7 +550,7 @@ impl Client {
async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
match self.serialization_format {
SerializationFormat::Json => self.get_arrow_impl_json(query).await,
SerializationFormat::CapnProto => self.get_arrow_impl_capnp(query).await,
SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(query).await,
}
}

Expand Down
36 changes: 21 additions & 15 deletions hypersync-client/src/parse_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,9 @@ fn read_chunks(bytes: &[u8]) -> Result<Vec<ArrowBatch>> {
Ok(chunks)
}

pub fn parse_query_response(bytes: &[u8]) -> Result<ArrowResponse> {
let mut opts = capnp::message::ReaderOptions::new();
opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
let message_reader =
capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;

let query_response = message_reader
.get_root::<hypersync_net_types_capnp::query_response::Reader>()
.context("get root")?;

pub fn read_query_response(
query_response: &hypersync_net_types_capnp::query_response::Reader,
) -> Result<ArrowResponse> {
let archive_height = match query_response.get_archive_height() {
-1 => None,
h => Some(
Expand Down Expand Up @@ -70,12 +63,13 @@ pub fn parse_query_response(bytes: &[u8]) -> Result<ArrowResponse> {

let data = query_response.get_data().context("read data")?;

let blocks = read_chunks(data.get_blocks().context("get data")?).context("parse block data")?;
let transactions =
read_chunks(data.get_transactions().context("get data")?).context("parse tx data")?;
let logs = read_chunks(data.get_logs().context("get data")?).context("parse log data")?;
let blocks =
read_chunks(data.get_blocks().context("get block data")?).context("parse block data")?;
let transactions = read_chunks(data.get_transactions().context("get transaction data")?)
.context("parse tx data")?;
let logs = read_chunks(data.get_logs().context("get log data")?).context("parse log data")?;
let traces = if data.has_traces() {
read_chunks(data.get_traces().context("get data")?).context("parse traces data")?
read_chunks(data.get_traces().context("get trace data")?).context("parse traces data")?
} else {
Vec::new()
};
Expand All @@ -94,3 +88,15 @@ pub fn parse_query_response(bytes: &[u8]) -> Result<ArrowResponse> {
rollback_guard,
})
}

pub fn parse_query_response(bytes: &[u8]) -> Result<ArrowResponse> {
let mut opts = capnp::message::ReaderOptions::new();
opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
let message_reader =
capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;

let query_response = message_reader
.get_root::<hypersync_net_types_capnp::query_response::Reader>()
.context("get root")?;
read_query_response(&query_response).context("read query response")
}
12 changes: 2 additions & 10 deletions hypersync-client/src/to_ethers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
//! This module implement specification for Provider generic from ethers.
#![cfg(feature = "ethers")]

use crate::simple_types::{Block, Log, Trace, Transaction};
use ethers;
use ethers::prelude::{CallResult, CreateResult};
Expand All @@ -14,7 +12,7 @@ use ethers::types::{
use hypersync_format::{AccessList, Address, Data, Hash, Quantity};
use polars_arrow::array::ViewType;
use std::default::Default;
use std::fmt::{Display, Formatter, Write};
use std::fmt::{Display, Formatter};

/// Error happened during hypersync -> 3rd-party type conversion
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -774,13 +772,7 @@ impl TryFrom<Log> for EtherLog {
topics: value
.topics
.into_iter()
.filter_map(|topic| {
if let Some(topic) = topic {
Some(topic.into())
} else {
None
}
})
.filter_map(|topic| topic.map(H256::from))
.collect::<Vec<H256>>(),
data: Bytes::from_iter(
value
Expand Down
4 changes: 3 additions & 1 deletion hypersync-client/tests/api_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,9 @@ async fn test_api_capnp_client() {
let client = Arc::new(
Client::new(ClientConfig {
url: Some("http://localhost:1131".parse().unwrap()),
serialization_format: SerializationFormat::CapnProto,
serialization_format: SerializationFormat::CapnProto {
should_cache_queries: true,
},

..Default::default()
})
Expand Down
5 changes: 3 additions & 2 deletions hypersync-net-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hypersync-net-types"
version = "0.11.0-rc.2"
version = "0.11.0-rc.3"
edition = "2021"
description = "hypersync types for transport over network"
license = "MPL-2.0"
Expand All @@ -14,8 +14,8 @@ hypersync-format = { path = "../hypersync-format", version = "0.5.8" }
schemars = "1.0.4"
strum = "0.27.2"
strum_macros = "0.27.2"
zstd = "0.13.3"
anyhow = "1.0.100"
xxhash-rust = "0.8.15"

[dev-dependencies]
hypersync-schema = { path = "../hypersync-schema" }
Expand All @@ -24,6 +24,7 @@ pretty_assertions = "1"
sha3 = "0.10.8"
flate2 = "1.1.5"
lz4_flex = "0.11.5"
zstd = "0.13.3"
tabled = "0.20.0"

[[bench]]
Expand Down
Loading
Loading