Skip to content

Commit d7de2d3

Browse files
authored
Capnp encoding for queries (#75)
* Initial impl of capnp serialization of query * Refactor net types into separate modules * Add field types * Add tests for using all fields in schema * Update field types and tests to use strum for ordering * Compiling with new field types * Move capnp logic relevant modules * Use serialize packed for capnp * Wip add capnp deserializer * Wip add deserialization for individual filters * Refactor and ensure filters get set * Add test and benchmark for default query * Passing base query * Add tests for block serde * Add test for transaction serde * Fix transaction serde * Fix api test * Add test for logs serde * Add test for trace serde * Add benchmark for large payload and test with bincode * Add moderate payload test * Fix description of config * Bump versions * Add display and from_str traits to fields * Change default serialization * Change serialized query structure * Working api_test * Add summary * Add new test * Add commands * Add benchmarks to presentation * Add future improvements * Backmerge main with stubs * Implement new capnp structures * Remove breaking bincode benchmarks * Bump versions * Include capnp generated code in src so that there is no dependency on capnpc * Small serialization speed improvement * Remove hack presentation MD file * Run cargo fmt on generated file * Ignore capnp api test * Ignore clippy warnings for generated code * Use full name flag for capnp compile output * Testing compression * Compression tests * Use zstd compression on query * Fix warnings in benchmark * Remove Readme deps on capnp compiler * Fix typo * Apply code rabbit make file suggestion * Fix bugs in return type * Implement code rabbit suggestions
1 parent f91c16a commit d7de2d3

File tree

23 files changed

+11969
-350
lines changed

23 files changed

+11969
-350
lines changed

README.md

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,10 @@
22

33
[![CI](https://github.com/enviodev/hypersync-client-rust/actions/workflows/ci.yaml/badge.svg?branch=main)](https://github.com/enviodev/hypersync-client-rust/actions/workflows/ci.yaml)
44
<a href="https://crates.io/crates/hypersync-client">
5-
<img src="https://img.shields.io/crates/v/hypersync-client.svg?style=flat-square"
5+
<img src="https://img.shields.io/crates/v/hypersync-client.svg?style=flat-square"
66
alt="Crates.io version" />
77
</a>
88

99
Rust crate for [Envio's](https://envio.dev/) HyperSync client.
1010

1111
[Documentation Page](https://docs.envio.dev/docs/hypersync-clients)
12-
13-
### Dependencies
14-
15-
Need to install capnproto tool in order to build the library.
16-
17-
#### Linux
18-
19-
```bash
20-
apt-get install -y capnproto libcapnp-dev
21-
```
22-
23-
#### Windows
24-
25-
```bash
26-
choco install capnproto
27-
```
28-
29-
#### MacOS
30-
31-
```bash
32-
brew install capnp
33-
```

hypersync-client/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hypersync-client"
3-
version = "0.20.0-rc.1"
3+
version = "0.20.0-rc.2"
44
edition = "2021"
55
description = "client library for hypersync"
66
license = "MPL-2.0"
@@ -47,7 +47,7 @@ nohash-hasher = "0.2.0"
4747
ethers = { version = "2.0.14", optional = true }
4848
alloy-primitives = "1.1"
4949

50-
hypersync-net-types = { path = "../hypersync-net-types", version = "0.11.0-rc.1" }
50+
hypersync-net-types = { path = "../hypersync-net-types", version = "0.11.0-rc.2" }
5151
hypersync-format = { path = "../hypersync-format", version = "0.5" }
5252
hypersync-schema = { path = "../hypersync-schema", version = "0.3" }
5353

hypersync-client/src/config.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,25 @@ pub struct ClientConfig {
2121
pub retry_base_ms: Option<u64>,
2222
/// Ceiling time for request backoff.
2323
pub retry_ceiling_ms: Option<u64>,
24+
/// Query serialization format to use for HTTP requests.
25+
#[serde(default)]
26+
pub serialization_format: SerializationFormat,
27+
}
28+
29+
/// Determines query serialization format for HTTP requests.
30+
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
31+
pub enum SerializationFormat {
32+
/// Use JSON serialization (default)
33+
Json,
34+
/// Use Cap'n Proto binary serialization
35+
CapnProto,
36+
}
37+
38+
impl Default for SerializationFormat {
39+
fn default() -> Self {
40+
// Keep this the default until all hs instances are upgraded to use Cap'n Proto endpoint
41+
Self::Json
42+
}
2443
}
2544

2645
/// Config for hypersync event streaming.

hypersync-client/src/lib.rs

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use url::Url;
3535

3636
pub use column_mapping::{ColumnMapping, DataType};
3737
pub use config::HexOutput;
38-
pub use config::{ClientConfig, StreamConfig};
38+
pub use config::{ClientConfig, SerializationFormat, StreamConfig};
3939
pub use decode::Decoder;
4040
pub use decode_call::CallDecoder;
4141
pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
@@ -61,6 +61,8 @@ pub struct Client {
6161
retry_base_ms: u64,
6262
/// Ceiling time for request backoff.
6363
retry_ceiling_ms: u64,
64+
/// Query serialization format to use for HTTP requests.
65+
serialization_format: SerializationFormat,
6466
}
6567

6668
impl Client {
@@ -86,6 +88,7 @@ impl Client {
8688
retry_backoff_ms: cfg.retry_backoff_ms.unwrap_or(500),
8789
retry_base_ms: cfg.retry_base_ms.unwrap_or(200),
8890
retry_ceiling_ms: cfg.retry_ceiling_ms.unwrap_or(5_000),
91+
serialization_format: cfg.serialization_format,
8992
})
9093
}
9194

@@ -383,8 +386,8 @@ impl Client {
383386
))
384387
}
385388

386-
/// Executes query once and returns the result in (Arrow, size) format.
387-
async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
389+
/// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
390+
async fn get_arrow_impl_json(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
388391
let mut url = self.url.clone();
389392
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
390393
segments.push("query");
@@ -418,6 +421,56 @@ impl Client {
418421
Ok((res, bytes.len().try_into().unwrap()))
419422
}
420423

424+
/// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
425+
async fn get_arrow_impl_capnp(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
426+
let mut url = self.url.clone();
427+
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
428+
segments.push("query");
429+
segments.push("arrow-ipc");
430+
segments.push("capnp");
431+
std::mem::drop(segments);
432+
let mut req = self.http_client.request(Method::POST, url);
433+
434+
if let Some(bearer_token) = &self.bearer_token {
435+
req = req.bearer_auth(bearer_token);
436+
}
437+
438+
let query_bytes = query.to_bytes().context("serialize query to bytes")?;
439+
let res = req
440+
.header("content-type", "application/x-capnp")
441+
.body(query_bytes)
442+
.send()
443+
.await
444+
.context("execute http req")?;
445+
446+
let status = res.status();
447+
if !status.is_success() {
448+
let text = res.text().await.context("read text to see error")?;
449+
450+
return Err(anyhow!(
451+
"http response status code {}, err body: {}",
452+
status,
453+
text
454+
));
455+
}
456+
457+
let bytes = res.bytes().await.context("read response body bytes")?;
458+
459+
let res = tokio::task::block_in_place(|| {
460+
parse_query_response(&bytes).context("parse query response")
461+
})?;
462+
463+
Ok((res, bytes.len().try_into().unwrap()))
464+
}
465+
466+
/// Executes query once and returns the result in (Arrow, size) format.
467+
async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
468+
match self.serialization_format {
469+
SerializationFormat::Json => self.get_arrow_impl_json(query).await,
470+
SerializationFormat::CapnProto => self.get_arrow_impl_capnp(query).await,
471+
}
472+
}
473+
421474
/// Executes query with retries and returns the response in Arrow format.
422475
pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
423476
self.get_arrow_with_size(query).await.map(|res| res.0)

hypersync-client/src/preset_query.rs

Lines changed: 13 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ use std::collections::BTreeSet;
33

44
use arrayvec::ArrayVec;
55
use hypersync_format::{Address, LogArgument};
6+
use hypersync_net_types::block::BlockField;
7+
use hypersync_net_types::log::LogField;
8+
use hypersync_net_types::transaction::TransactionField;
69
use hypersync_net_types::{
710
FieldSelection, LogFilter, LogSelection, Query, TransactionFilter, TransactionSelection,
811
};
@@ -12,17 +15,8 @@ use hypersync_net_types::{
1215
/// Note: this is only for quickstart purposes. For the best performance, create a custom query
1316
/// that only includes the fields you'll use in `field_selection`.
1417
pub fn blocks_and_transactions(from_block: u64, to_block: Option<u64>) -> Query {
15-
let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
16-
.fields
17-
.iter()
18-
.map(|x| x.name.clone())
19-
.collect();
20-
21-
let all_tx_fields: BTreeSet<String> = hypersync_schema::transaction()
22-
.fields
23-
.iter()
24-
.map(|x| x.name.clone())
25-
.collect();
18+
let all_block_fields = BlockField::all();
19+
let all_tx_fields = TransactionField::all();
2620

2721
Query {
2822
from_block,
@@ -45,15 +39,11 @@ pub fn blocks_and_transactions(from_block: u64, to_block: Option<u64>) -> Query
4539
/// that only includes the fields you'll use in `field_selection`.
4640
pub fn blocks_and_transaction_hashes(from_block: u64, to_block: Option<u64>) -> Query {
4741
let mut tx_field_selection = BTreeSet::new();
48-
tx_field_selection.insert("block_hash".to_owned());
49-
tx_field_selection.insert("block_number".to_owned());
50-
tx_field_selection.insert("hash".to_owned());
42+
tx_field_selection.insert(TransactionField::BlockHash);
43+
tx_field_selection.insert(TransactionField::BlockNumber);
44+
tx_field_selection.insert(TransactionField::Hash);
5145

52-
let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
53-
.fields
54-
.iter()
55-
.map(|x| x.name.clone())
56-
.collect();
46+
let all_block_fields = BlockField::all();
5747

5848
Query {
5949
from_block,
@@ -74,11 +64,7 @@ pub fn blocks_and_transaction_hashes(from_block: u64, to_block: Option<u64>) ->
7464
/// Note: this is only for quickstart purposes. For the best performance, create a custom query
7565
/// that only includes the fields you'll use in `field_selection`.
7666
pub fn logs(from_block: u64, to_block: Option<u64>, contract_address: Address) -> Query {
77-
let all_log_fields: BTreeSet<String> = hypersync_schema::log()
78-
.fields
79-
.iter()
80-
.map(|x| x.name.clone())
81-
.collect();
67+
let all_log_fields = LogField::all();
8268

8369
Query {
8470
from_block,
@@ -109,11 +95,7 @@ pub fn logs_of_event(
10995
let mut topics = ArrayVec::<Vec<LogArgument>, 4>::new();
11096
topics.insert(0, vec![topic0]);
11197

112-
let all_log_fields: BTreeSet<String> = hypersync_schema::log()
113-
.fields
114-
.iter()
115-
.map(|x| x.name.clone())
116-
.collect();
98+
let all_log_fields = LogField::all();
11799

118100
Query {
119101
from_block,
@@ -136,11 +118,7 @@ pub fn logs_of_event(
136118
/// Note: this is only for quickstart purposes. For the best performance, create a custom query
137119
/// that only includes the fields you'll use in `field_selection`.
138120
pub fn transactions(from_block: u64, to_block: Option<u64>) -> Query {
139-
let all_txn_fields: BTreeSet<String> = hypersync_schema::transaction()
140-
.fields
141-
.iter()
142-
.map(|x| x.name.clone())
143-
.collect();
121+
let all_txn_fields = TransactionField::all();
144122

145123
Query {
146124
from_block,
@@ -163,11 +141,7 @@ pub fn transactions_from_address(
163141
to_block: Option<u64>,
164142
address: Address,
165143
) -> Query {
166-
let all_txn_fields: BTreeSet<String> = hypersync_schema::transaction()
167-
.fields
168-
.iter()
169-
.map(|x| x.name.clone())
170-
.collect();
144+
let all_txn_fields = TransactionField::all();
171145

172146
Query {
173147
from_block,

hypersync-client/src/simple_types.rs

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use hypersync_format::{
66
AccessList, Address, Authorization, BlockNumber, BloomFilter, Data, Hash, LogArgument,
77
LogIndex, Nonce, Quantity, TransactionIndex, TransactionStatus, TransactionType, Withdrawal,
88
};
9-
use hypersync_net_types::FieldSelection;
9+
use hypersync_net_types::{
10+
block::BlockField, log::LogField, transaction::TransactionField, FieldSelection,
11+
};
1012
use nohash_hasher::IntMap;
1113
use serde::{Deserialize, Serialize};
1214
use xxhash_rust::xxh3::Xxh3Builder;
@@ -26,10 +28,10 @@ pub struct Event {
2628

2729
// Field lists for implementing event based API, these fields are used for joining
2830
// so they should always be added to the field selection.
29-
const BLOCK_JOIN_FIELD: &str = "number";
30-
const TX_JOIN_FIELD: &str = "hash";
31-
const LOG_JOIN_FIELD_WITH_TX: &str = "transaction_hash";
32-
const LOG_JOIN_FIELD_WITH_BLOCK: &str = "block_number";
31+
const BLOCK_JOIN_FIELD: BlockField = BlockField::Number;
32+
const TX_JOIN_FIELD: TransactionField = TransactionField::Hash;
33+
const LOG_JOIN_FIELD_WITH_TX: LogField = LogField::TransactionHash;
34+
const LOG_JOIN_FIELD_WITH_BLOCK: LogField = LogField::BlockNumber;
3335

3436
enum InternalJoinStrategy {
3537
NotSelected,
@@ -51,15 +53,15 @@ impl From<&FieldSelection> for InternalEventJoinStrategy {
5153
Self {
5254
block: if block_fields_num == 0 {
5355
InternalJoinStrategy::NotSelected
54-
} else if block_fields_num == 1 && field_selection.block.contains(BLOCK_JOIN_FIELD) {
56+
} else if block_fields_num == 1 && field_selection.block.contains(&BLOCK_JOIN_FIELD) {
5557
InternalJoinStrategy::OnlyLogJoinField
5658
} else {
5759
InternalJoinStrategy::FullJoin
5860
},
5961
transaction: if transaction_fields_num == 0 {
6062
InternalJoinStrategy::NotSelected
6163
} else if transaction_fields_num == 1
62-
&& field_selection.transaction.contains(TX_JOIN_FIELD)
64+
&& field_selection.transaction.contains(&TX_JOIN_FIELD)
6365
{
6466
InternalJoinStrategy::OnlyLogJoinField
6567
} else {
@@ -75,34 +77,24 @@ impl InternalEventJoinStrategy {
7577
match self.block {
7678
InternalJoinStrategy::NotSelected => (),
7779
InternalJoinStrategy::OnlyLogJoinField => {
78-
field_selection
79-
.log
80-
.insert(LOG_JOIN_FIELD_WITH_BLOCK.to_string());
81-
field_selection.block.remove(BLOCK_JOIN_FIELD);
80+
field_selection.log.insert(LOG_JOIN_FIELD_WITH_BLOCK);
81+
field_selection.block.remove(&BLOCK_JOIN_FIELD);
8282
}
8383
InternalJoinStrategy::FullJoin => {
84-
field_selection
85-
.log
86-
.insert(LOG_JOIN_FIELD_WITH_BLOCK.to_string());
87-
field_selection.block.insert(BLOCK_JOIN_FIELD.to_string());
84+
field_selection.log.insert(LOG_JOIN_FIELD_WITH_BLOCK);
85+
field_selection.block.insert(BLOCK_JOIN_FIELD);
8886
}
8987
}
9088

9189
match self.transaction {
9290
InternalJoinStrategy::NotSelected => (),
9391
InternalJoinStrategy::OnlyLogJoinField => {
94-
field_selection
95-
.log
96-
.insert(LOG_JOIN_FIELD_WITH_TX.to_string());
97-
field_selection.transaction.remove(TX_JOIN_FIELD);
92+
field_selection.log.insert(LOG_JOIN_FIELD_WITH_TX);
93+
field_selection.transaction.remove(&TX_JOIN_FIELD);
9894
}
9995
InternalJoinStrategy::FullJoin => {
100-
field_selection
101-
.log
102-
.insert(LOG_JOIN_FIELD_WITH_TX.to_string());
103-
field_selection
104-
.transaction
105-
.insert(TX_JOIN_FIELD.to_string());
96+
field_selection.log.insert(LOG_JOIN_FIELD_WITH_TX);
97+
field_selection.transaction.insert(TX_JOIN_FIELD);
10698
}
10799
}
108100
}

0 commit comments

Comments
 (0)