Merged
53 commits
0e4ed51
Initial impl of capnp serialization of query
JonoPrest Aug 28, 2025
da2c59a
Refactor net types into separate modules
JonoPrest Aug 28, 2025
54763e2
Add field types
JonoPrest Aug 28, 2025
96cfd1f
Add tests for using all fields in schema
JonoPrest Aug 28, 2025
3946859
Update field types and tests to use strum for ordering
JonoPrest Aug 28, 2025
879f29d
Compiling with new field types
JonoPrest Aug 28, 2025
7fa5b8d
Move capnp logic to relevant modules
JonoPrest Aug 28, 2025
f451e4c
Use serialize packed for capnp
JonoPrest Aug 28, 2025
0682db8
Wip add capnp deserializer
JonoPrest Aug 28, 2025
0f720fe
Wip add deserialization for individual filters
JonoPrest Aug 28, 2025
17059e9
Refactor and ensure filters get set
JonoPrest Aug 28, 2025
499a487
Add test and benchmark for default query
JonoPrest Aug 28, 2025
bc6af71
Passing base query
JonoPrest Aug 28, 2025
47de3f8
Add tests for block serde
JonoPrest Aug 28, 2025
89a1f1d
Add test for transaction serde
JonoPrest Aug 28, 2025
6d82d8c
Fix transaction serde
JonoPrest Aug 28, 2025
e55ae52
Fix api test
JonoPrest Aug 28, 2025
c1c64ea
Add test for logs serde
JonoPrest Aug 28, 2025
9a0167a
Add test for trace serde
JonoPrest Aug 28, 2025
198388f
Add benchmark for large payload and test with bincode
JonoPrest Aug 28, 2025
c7e022c
Add moderate payload test
JonoPrest Aug 28, 2025
0ae9bfb
Fix description of config
JonoPrest Aug 28, 2025
85778ff
Bump versions
JonoPrest Aug 28, 2025
69c4a64
Add display and from_str traits to fields
JonoPrest Aug 28, 2025
fb838e8
Change default serialization
JonoPrest Aug 28, 2025
8a7d392
Change serialized query structure
JonoPrest Aug 29, 2025
220d681
Working api_test
JonoPrest Aug 29, 2025
56a46c4
Add summary
JonoPrest Aug 29, 2025
04e3f2c
Add new test
JonoPrest Aug 29, 2025
05230b9
Add commands
JonoPrest Aug 29, 2025
111786a
Add benchmarks to presentation
JonoPrest Aug 29, 2025
3321ccd
Add future improvements
JonoPrest Aug 29, 2025
cfdbcba
Backmerge main with stubs
JonoPrest Oct 24, 2025
a3ed025
Implement new capnp structures
JonoPrest Oct 24, 2025
7cd5150
Remove breaking bincode benchmarks
JonoPrest Oct 24, 2025
91d2fc4
Bump versions
JonoPrest Oct 24, 2025
e957404
Squashed commit of the following:
JonoPrest Oct 24, 2025
219edf8
Include capnp generated code in src so that there is no dependency on…
JonoPrest Oct 27, 2025
baebe9f
Small serialization speed improvement
JonoPrest Oct 27, 2025
c3726a8
Remove hack presentation MD file
JonoPrest Oct 27, 2025
3ce176c
Run cargo fmt on generated file
JonoPrest Oct 27, 2025
fe5874c
Ignore capnp api test
JonoPrest Oct 27, 2025
9d1e912
Ignore clippy warnings for generated code
JonoPrest Oct 27, 2025
ae9ee15
Use full name flag for capnp compile output
JonoPrest Oct 27, 2025
8e76dc6
Testing compression
JonoPrest Oct 27, 2025
5d00d16
Compression tests
JonoPrest Oct 28, 2025
5f7ccee
Use zstd compression on query
JonoPrest Oct 28, 2025
7b3bc54
Fix warnings in benchmark
JonoPrest Oct 29, 2025
8e4e85a
Remove Readme deps on capnp compiler
JonoPrest Oct 29, 2025
cb33efe
Fix typo
JonoPrest Oct 29, 2025
df66e70
Apply code rabbit make file suggestion
JonoPrest Oct 29, 2025
abcee8a
Fix bugs in return type
JonoPrest Oct 29, 2025
4d866f3
Implement code rabbit suggestions
JonoPrest Oct 29, 2025
24 changes: 1 addition & 23 deletions README.md
@@ -2,32 +2,10 @@

[![CI](https://github.com/enviodev/hypersync-client-rust/actions/workflows/ci.yaml/badge.svg?branch=main)](https://github.com/enviodev/hypersync-client-rust/actions/workflows/ci.yaml)
<a href="https://crates.io/crates/hypersync-client">
<img src="https://img.shields.io/crates/v/hypersync-client.svg?style=flat-square"
<img src="https://img.shields.io/crates/v/hypersync-client.svg?style=flat-square"
alt="Crates.io version" />
</a>

Rust crate for [Envio's](https://envio.dev/) HyperSync client.

[Documentation Page](https://docs.envio.dev/docs/hypersync-clients)

### Dependencies

Need to install capnproto tool in order to build the library.

#### Linux

```bash
apt-get install -y capnproto libcapnp-dev
```

#### Windows

```bash
choco install capnproto
```

#### MacOS

```bash
brew install capnp
```
4 changes: 2 additions & 2 deletions hypersync-client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "hypersync-client"
version = "0.20.0-rc.1"
version = "0.20.0-rc.2"
edition = "2021"
description = "client library for hypersync"
license = "MPL-2.0"
@@ -47,7 +47,7 @@ nohash-hasher = "0.2.0"
ethers = { version = "2.0.14", optional = true }
alloy-primitives = "1.1"

hypersync-net-types = { path = "../hypersync-net-types", version = "0.11.0-rc.1" }
hypersync-net-types = { path = "../hypersync-net-types", version = "0.11.0-rc.2" }
hypersync-format = { path = "../hypersync-format", version = "0.5" }
hypersync-schema = { path = "../hypersync-schema", version = "0.3" }

19 changes: 19 additions & 0 deletions hypersync-client/src/config.rs
@@ -21,6 +21,25 @@ pub struct ClientConfig {
pub retry_base_ms: Option<u64>,
/// Ceiling time for request backoff.
pub retry_ceiling_ms: Option<u64>,
/// Query serialization format to use for HTTP requests.
#[serde(default)]
pub serialization_format: SerializationFormat,
}

/// Determines query serialization format for HTTP requests.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum SerializationFormat {
/// Use JSON serialization (default)
Json,
/// Use Cap'n Proto binary serialization
CapnProto,
}

impl Default for SerializationFormat {
fn default() -> Self {
// Keep this the default until all HyperSync instances are upgraded to use the Cap'n Proto endpoint
Self::Json
}
}

/// Config for hypersync event streaming.
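As a usage sketch of the new option (the `url` field, `ClientConfig`'s `Default` impl, and the constructor name are assumed from the rest of the crate rather than shown in this hunk):

```rust
use hypersync_client::{Client, ClientConfig, SerializationFormat};

fn capnp_client() -> anyhow::Result<Client> {
    let config = ClientConfig {
        // Illustrative endpoint; any HyperSync instance that already exposes
        // the Cap'n Proto query endpoint would work here.
        url: Some("https://eth.hypersync.xyz".parse()?),
        // Opt into binary query serialization; JSON remains the default.
        serialization_format: SerializationFormat::CapnProto,
        ..Default::default()
    };
    Client::new(config)
}
```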
59 changes: 56 additions & 3 deletions hypersync-client/src/lib.rs
@@ -35,7 +35,7 @@ use url::Url;

pub use column_mapping::{ColumnMapping, DataType};
pub use config::HexOutput;
pub use config::{ClientConfig, StreamConfig};
pub use config::{ClientConfig, SerializationFormat, StreamConfig};
pub use decode::Decoder;
pub use decode_call::CallDecoder;
pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
@@ -61,6 +61,8 @@ pub struct Client {
retry_base_ms: u64,
/// Ceiling time for request backoff.
retry_ceiling_ms: u64,
/// Query serialization format to use for HTTP requests.
serialization_format: SerializationFormat,
}

impl Client {
@@ -86,6 +88,7 @@ impl Client {
retry_backoff_ms: cfg.retry_backoff_ms.unwrap_or(500),
retry_base_ms: cfg.retry_base_ms.unwrap_or(200),
retry_ceiling_ms: cfg.retry_ceiling_ms.unwrap_or(5_000),
serialization_format: cfg.serialization_format,
})
}

@@ -383,8 +386,8 @@ impl Client {
))
}

/// Executes query once and returns the result in (Arrow, size) format.
async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
/// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
async fn get_arrow_impl_json(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
let mut url = self.url.clone();
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
segments.push("query");
@@ -418,6 +421,56 @@ impl Client {
Ok((res, bytes.len().try_into().unwrap()))
}

/// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
async fn get_arrow_impl_capnp(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
let mut url = self.url.clone();
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
segments.push("query");
segments.push("arrow-ipc");
segments.push("capnp");
std::mem::drop(segments);
let mut req = self.http_client.request(Method::POST, url);

if let Some(bearer_token) = &self.bearer_token {
req = req.bearer_auth(bearer_token);
}

let query_bytes = query.to_bytes().context("serialize query to bytes")?;
let res = req
.header("content-type", "application/x-capnp")
.body(query_bytes)
.send()
.await
.context("execute http req")?;

let status = res.status();
if !status.is_success() {
let text = res.text().await.context("read text to see error")?;

return Err(anyhow!(
"http response status code {}, err body: {}",
status,
text
));
}

let bytes = res.bytes().await.context("read response body bytes")?;

let res = tokio::task::block_in_place(|| {
parse_query_response(&bytes).context("parse query response")
})?;

Ok((res, bytes.len().try_into().unwrap()))
}

/// Executes query once and returns the result in (Arrow, size) format.
async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
match self.serialization_format {
SerializationFormat::Json => self.get_arrow_impl_json(query).await,
SerializationFormat::CapnProto => self.get_arrow_impl_capnp(query).await,
}
}

/// Executes query with retries and returns the response in Arrow format.
pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
self.get_arrow_with_size(query).await.map(|res| res.0)
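Since the format dispatch lives inside `get_arrow_impl`, callers keep using `get_arrow` unchanged whichever format is configured. A minimal sketch (the `preset_query` module path and the block range are illustrative assumptions):

```rust
use hypersync_client::{preset_query, Client};

async fn fetch_blocks_and_transactions(client: &Client) -> anyhow::Result<()> {
    // The query itself is identical for both formats; only the on-the-wire
    // encoding of the request body differs when CapnProto is configured.
    let query = preset_query::blocks_and_transactions(20_000_000, Some(20_000_100));
    let _response = client.get_arrow(&query).await?;
    Ok(())
}
```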
52 changes: 13 additions & 39 deletions hypersync-client/src/preset_query.rs
@@ -3,6 +3,9 @@ use std::collections::BTreeSet;

use arrayvec::ArrayVec;
use hypersync_format::{Address, LogArgument};
use hypersync_net_types::block::BlockField;
use hypersync_net_types::log::LogField;
use hypersync_net_types::transaction::TransactionField;
use hypersync_net_types::{
FieldSelection, LogFilter, LogSelection, Query, TransactionFilter, TransactionSelection,
};
@@ -12,17 +15,8 @@ use hypersync_net_types::{
/// Note: this is only for quickstart purposes. For the best performance, create a custom query
/// that only includes the fields you'll use in `field_selection`.
pub fn blocks_and_transactions(from_block: u64, to_block: Option<u64>) -> Query {
let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
.fields
.iter()
.map(|x| x.name.clone())
.collect();

let all_tx_fields: BTreeSet<String> = hypersync_schema::transaction()
.fields
.iter()
.map(|x| x.name.clone())
.collect();
let all_block_fields = BlockField::all();
let all_tx_fields = TransactionField::all();

Query {
from_block,
@@ -45,15 +39,11 @@
/// that only includes the fields you'll use in `field_selection`.
pub fn blocks_and_transaction_hashes(from_block: u64, to_block: Option<u64>) -> Query {
let mut tx_field_selection = BTreeSet::new();
tx_field_selection.insert("block_hash".to_owned());
tx_field_selection.insert("block_number".to_owned());
tx_field_selection.insert("hash".to_owned());
tx_field_selection.insert(TransactionField::BlockHash);
tx_field_selection.insert(TransactionField::BlockNumber);
tx_field_selection.insert(TransactionField::Hash);

let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
.fields
.iter()
.map(|x| x.name.clone())
.collect();
let all_block_fields = BlockField::all();

Query {
from_block,
@@ -74,11 +64,7 @@
/// Note: this is only for quickstart purposes. For the best performance, create a custom query
/// that only includes the fields you'll use in `field_selection`.
pub fn logs(from_block: u64, to_block: Option<u64>, contract_address: Address) -> Query {
let all_log_fields: BTreeSet<String> = hypersync_schema::log()
.fields
.iter()
.map(|x| x.name.clone())
.collect();
let all_log_fields = LogField::all();

Query {
from_block,
@@ -109,11 +95,7 @@
let mut topics = ArrayVec::<Vec<LogArgument>, 4>::new();
topics.insert(0, vec![topic0]);

let all_log_fields: BTreeSet<String> = hypersync_schema::log()
.fields
.iter()
.map(|x| x.name.clone())
.collect();
let all_log_fields = LogField::all();

Query {
from_block,
@@ -136,11 +118,7 @@
/// Note: this is only for quickstart purposes. For the best performance, create a custom query
/// that only includes the fields you'll use in `field_selection`.
pub fn transactions(from_block: u64, to_block: Option<u64>) -> Query {
let all_txn_fields: BTreeSet<String> = hypersync_schema::transaction()
.fields
.iter()
.map(|x| x.name.clone())
.collect();
let all_txn_fields = TransactionField::all();

Query {
from_block,
@@ -163,11 +141,7 @@
to_block: Option<u64>,
address: Address,
) -> Query {
let all_txn_fields: BTreeSet<String> = hypersync_schema::transaction()
.fields
.iter()
.map(|x| x.name.clone())
.collect();
let all_txn_fields = TransactionField::all();

Query {
from_block,
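With the typed field enums, custom field selections no longer go through strings. A minimal sketch using only variants visible in this diff (`Default` impls for `Query` and `FieldSelection` are assumed):

```rust
use std::collections::BTreeSet;

use hypersync_net_types::{
    block::BlockField, transaction::TransactionField, FieldSelection, Query,
};

fn minimal_tx_hash_query(from_block: u64, to_block: Option<u64>) -> Query {
    Query {
        from_block,
        to_block,
        field_selection: FieldSelection {
            // Only the fields actually needed, expressed as enum variants
            // instead of strings.
            block: BTreeSet::from([BlockField::Number]),
            transaction: BTreeSet::from([
                TransactionField::BlockNumber,
                TransactionField::Hash,
            ]),
            ..Default::default()
        },
        ..Default::default()
    }
}
```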
42 changes: 17 additions & 25 deletions hypersync-client/src/simple_types.rs
@@ -6,7 +6,9 @@ use hypersync_format::{
AccessList, Address, Authorization, BlockNumber, BloomFilter, Data, Hash, LogArgument,
LogIndex, Nonce, Quantity, TransactionIndex, TransactionStatus, TransactionType, Withdrawal,
};
use hypersync_net_types::FieldSelection;
use hypersync_net_types::{
block::BlockField, log::LogField, transaction::TransactionField, FieldSelection,
};
use nohash_hasher::IntMap;
use serde::{Deserialize, Serialize};
use xxhash_rust::xxh3::Xxh3Builder;
@@ -26,10 +28,10 @@

// Field lists for implementing event based API, these fields are used for joining
// so they should always be added to the field selection.
const BLOCK_JOIN_FIELD: &str = "number";
const TX_JOIN_FIELD: &str = "hash";
const LOG_JOIN_FIELD_WITH_TX: &str = "transaction_hash";
const LOG_JOIN_FIELD_WITH_BLOCK: &str = "block_number";
const BLOCK_JOIN_FIELD: BlockField = BlockField::Number;
const TX_JOIN_FIELD: TransactionField = TransactionField::Hash;
const LOG_JOIN_FIELD_WITH_TX: LogField = LogField::TransactionHash;
const LOG_JOIN_FIELD_WITH_BLOCK: LogField = LogField::BlockNumber;

enum InternalJoinStrategy {
NotSelected,
@@ -51,15 +53,15 @@
Self {
block: if block_fields_num == 0 {
InternalJoinStrategy::NotSelected
} else if block_fields_num == 1 && field_selection.block.contains(BLOCK_JOIN_FIELD) {
} else if block_fields_num == 1 && field_selection.block.contains(&BLOCK_JOIN_FIELD) {
InternalJoinStrategy::OnlyLogJoinField
} else {
InternalJoinStrategy::FullJoin
},
transaction: if transaction_fields_num == 0 {
InternalJoinStrategy::NotSelected
} else if transaction_fields_num == 1
&& field_selection.transaction.contains(TX_JOIN_FIELD)
&& field_selection.transaction.contains(&TX_JOIN_FIELD)
{
InternalJoinStrategy::OnlyLogJoinField
} else {
@@ -75,34 +77,24 @@
match self.block {
InternalJoinStrategy::NotSelected => (),
InternalJoinStrategy::OnlyLogJoinField => {
field_selection
.log
.insert(LOG_JOIN_FIELD_WITH_BLOCK.to_string());
field_selection.block.remove(BLOCK_JOIN_FIELD);
field_selection.log.insert(LOG_JOIN_FIELD_WITH_BLOCK);
field_selection.block.remove(&BLOCK_JOIN_FIELD);
}
InternalJoinStrategy::FullJoin => {
field_selection
.log
.insert(LOG_JOIN_FIELD_WITH_BLOCK.to_string());
field_selection.block.insert(BLOCK_JOIN_FIELD.to_string());
field_selection.log.insert(LOG_JOIN_FIELD_WITH_BLOCK);
field_selection.block.insert(BLOCK_JOIN_FIELD);
}
}

match self.transaction {
InternalJoinStrategy::NotSelected => (),
InternalJoinStrategy::OnlyLogJoinField => {
field_selection
.log
.insert(LOG_JOIN_FIELD_WITH_TX.to_string());
field_selection.transaction.remove(TX_JOIN_FIELD);
field_selection.log.insert(LOG_JOIN_FIELD_WITH_TX);
field_selection.transaction.remove(&TX_JOIN_FIELD);
}
InternalJoinStrategy::FullJoin => {
field_selection
.log
.insert(LOG_JOIN_FIELD_WITH_TX.to_string());
field_selection
.transaction
.insert(TX_JOIN_FIELD.to_string());
field_selection.log.insert(LOG_JOIN_FIELD_WITH_TX);
field_selection.transaction.insert(TX_JOIN_FIELD);
}
}
}
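A test-style sketch of how the join strategy resolves with the typed fields; it would have to live inside `simple_types.rs` because the strategy types are private, and `FieldSelection::default()` is assumed:

```rust
#[cfg(test)]
mod join_strategy_sketch {
    use super::*;
    use hypersync_net_types::{block::BlockField, FieldSelection};

    #[test]
    fn block_number_only_needs_no_full_block_join() {
        // Selecting only the block join field (`BlockField::Number`) should
        // resolve to OnlyLogJoinField: the block number can be read off the
        // log itself instead of joining full block data.
        let mut selection = FieldSelection::default();
        selection.block.insert(BlockField::Number);

        let strategy = InternalEventJoinStrategy::from(&selection);
        assert!(matches!(
            strategy.block,
            InternalJoinStrategy::OnlyLogJoinField
        ));
    }
}
```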