Merged: changes from all commits (21 commits)
8 changes: 8 additions & 0 deletions MODULE.bazel
@@ -541,6 +541,14 @@ http_file(
url = "https://download.dfinity.systems/testdata/mainnet_headers_800k.json.gz",
)

# Contains the first 800_000 headers of the Dogecoin mainnet blockchain with auxiliary proof-of-work.
http_file(
name = "doge_headers_800k_mainnet_auxpow",
downloaded_file_path = "doge_headers_800k_mainnet_auxpow.json.gz",
sha256 = "e138d7c59d237d0eea70943b60038b8755d9aa1f04bc94f315a74775cf8bfc2d",
url = "http://download.dfinity.systems/testdata/doge/doge_headers_800k_mainnet_auxpow.json.gz",
)
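For reference, a pinned artifact like the one above can be re-checked locally against its sha256. A minimal standalone Rust sketch (not part of this change; it assumes the sha2 and hex crates and uses an example local path):

// Standalone sketch: verify a downloaded test-data artifact against the
// pinned digest above. Assumes the sha2 and hex crates are available.
use sha2::{Digest, Sha256};

fn main() -> std::io::Result<()> {
    // Example path; point this at wherever the artifact was downloaded.
    let bytes = std::fs::read("doge_headers_800k_mainnet_auxpow.json.gz")?;
    let digest = hex::encode(Sha256::digest(&bytes));
    assert_eq!(
        digest,
        "e138d7c59d237d0eea70943b60038b8755d9aa1f04bc94f315a74775cf8bfc2d"
    );
    Ok(())
}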

# Contains blocks 350_990 to 350_999 (inclusive) of the Bitcoin mainnet blockchain.
http_file(
name = "bitcoin_adapter_mainnet_blocks",
2 changes: 2 additions & 0 deletions rs/bitcoin/adapter/BUILD.bazel
@@ -164,9 +164,11 @@ rust_bench(
data = [
# Keep sorted.
"@bitcoin_adapter_mainnet_headers//file",
"@doge_headers_800k_mainnet_auxpow//file",
],
env = {
"BITCOIN_MAINNET_HEADERS_DATA_PATH": "$(rootpath @bitcoin_adapter_mainnet_headers//file)",
"DOGECOIN_MAINNET_HEADERS_DATA_PATH": "$(rootpath @doge_headers_800k_mainnet_auxpow//file)",
},
deps = [
# Keep sorted.
134 changes: 102 additions & 32 deletions rs/bitcoin/adapter/benches/e2e.rs
@@ -1,8 +1,9 @@
use bitcoin::{BlockHash, Network, block::Header as BlockHeader};
use criterion::measurement::Measurement;
use criterion::{BenchmarkGroup, Criterion, criterion_group, criterion_main};
use criterion::{BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main};
use ic_btc_adapter::{
BlockchainNetwork, BlockchainState, Config, IncomingSource, MAX_HEADERS_SIZE, start_server,
BlockchainHeader, BlockchainNetwork, BlockchainState, Config, HeaderValidator, IncomingSource,
MAX_HEADERS_SIZE, start_server,
};
use ic_btc_adapter_client::setup_bitcoin_adapter_clients;
use ic_btc_adapter_test_utils::generate_headers;
Expand All @@ -17,9 +18,9 @@ use ic_logger::replica_logger::no_op_logger;
use ic_metrics::MetricsRegistry;
use rand::{CryptoRng, Rng};
use sha2::Digest;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::LazyLock;
use tempfile::Builder;
use tempfile::{Builder, tempdir};

type BitcoinAdapterClient = Box<
dyn RpcAdapterClient<BitcoinAdapterRequestWrapper, Response = BitcoinAdapterResponseWrapper>,
@@ -65,6 +66,11 @@ fn prepare(
}
}

// This simulation constructs a blockchain comprising four forks, each of 2000 blocks.
// For an extended BFS execution, the initial 1975 blocks of every branch are marked in
// the request as already processed, with the aim of receiving the last 25 blocks of each fork.
// Performance metrics are captured from the moment the request is sent until the
// response is received and deserialised.
fn e2e(criterion: &mut Criterion) {
let network = Network::Regtest;
let mut processed_block_hashes = vec![];
Expand Down Expand Up @@ -173,36 +179,105 @@ fn random_header<const N: usize, R: Rng + CryptoRng>(rng: &mut R) -> [u8; N] {
}
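For orientation, the fork layout described in the comment above fn e2e reduces to this self-contained sketch (plain integers stand in for block hashes; the real benchmark builds actual headers with generate_headers):

// Standalone sketch of the e2e setup: four forks of 2000 blocks each, the
// first 1975 of every fork marked as processed, leaving 25 blocks per fork
// (100 in total) for the adapter to return.
fn main() {
    const FORKS: usize = 4;
    const FORK_LEN: usize = 2000;
    const PROCESSED_PER_FORK: usize = 1975;

    // Each fork is a chain of placeholder "hashes".
    let forks: Vec<Vec<u64>> = (0..FORKS as u64)
        .map(|f| (0..FORK_LEN as u64).map(|i| f * 1_000_000 + i).collect())
        .collect();

    // The request lists the first 1975 entries of every fork as processed...
    let processed: Vec<u64> = forks
        .iter()
        .flat_map(|fork| fork[..PROCESSED_PER_FORK].iter().copied())
        .collect();
    assert_eq!(processed.len(), FORKS * PROCESSED_PER_FORK);

    // ...so the expected response is the remaining 25 blocks per fork.
    assert_eq!(FORKS * (FORK_LEN - PROCESSED_PER_FORK), 100);
}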

fn add_800k_block_headers(criterion: &mut Criterion) {
static BITCOIN_HEADERS: LazyLock<Vec<bitcoin::block::Header>> = LazyLock::new(|| {
let headers_data_path = PathBuf::from(
std::env::var("BITCOIN_MAINNET_HEADERS_DATA_PATH")
.expect("Failed to get test data path env variable"),
);
retrieve_headers::<bitcoin::Network>(&headers_data_path)
});
// Call BITCOIN_HEADERS once before benchmarking to avoid biasing the first sample (lazy instantiation).
add_block_headers_for(
criterion,
bitcoin::Network::Bitcoin,
"BITCOIN_MAINNET_HEADERS_DATA_PATH",
800_000,
);
add_block_headers_for(
criterion,
bitcoin::dogecoin::Network::Dogecoin,
"DOGECOIN_MAINNET_HEADERS_DATA_PATH",
800_000,
);
}

fn add_block_headers_for<Network: BlockchainNetwork + fmt::Display>(
criterion: &mut Criterion,
network: Network,
headers_data_env: &str,
expected_num_headers_to_add: usize,
) where
Network::Header: for<'de> serde::Deserialize<'de>,
BlockchainState<Network>: HeaderValidator<Network>,
{
let headers_data_path = PathBuf::from(
std::env::var(headers_data_env).expect("Failed to get test data path env variable"),
);
let headers = retrieve_headers::<Network>(&headers_data_path);
// Genesis block header is automatically added when instantiating BlockchainState
let bitcoin_headers_to_add = &BITCOIN_HEADERS.as_slice()[1..];
assert_eq!(bitcoin_headers_to_add.len(), 800_000);
let mut group = criterion.benchmark_group("bitcoin_800k");
let headers_to_add = &headers.as_slice()[1..];
assert_eq!(headers_to_add.len(), expected_num_headers_to_add);
let mut group = criterion.benchmark_group(format!("{network}_{expected_num_headers_to_add}"));
group.sample_size(10);

group.bench_function("add_headers", |bench| {
let rt = tokio::runtime::Runtime::new().unwrap();
bench_add_headers(&mut group, network, headers_to_add);
}

fn bench_add_headers<M: Measurement, Network: BlockchainNetwork>(
group: &mut BenchmarkGroup<'_, M>,
network: Network,
headers: &[Network::Header],
) where
BlockchainState<Network>: HeaderValidator<Network>,
{
fn add_headers<Network: BlockchainNetwork>(
blockchain_state: &mut BlockchainState<Network>,
headers: &[Network::Header],
expect_pruning: bool,
runtime: &tokio::runtime::Runtime,
) where
BlockchainState<Network>: HeaderValidator<Network>,
{
// Genesis block header is automatically added when instantiating BlockchainState
let mut num_added_headers = 1;
// Headers are processed in chunks of at most MAX_HEADERS_SIZE entries
for chunk in headers.chunks(MAX_HEADERS_SIZE) {
let (added_headers, error) =
runtime.block_on(async { blockchain_state.add_headers(chunk).await });
assert!(error.is_none(), "Failed to add headers: {}", error.unwrap());
assert_eq!(added_headers.len(), chunk.len());
num_added_headers += added_headers.len();

runtime
.block_on(async {
blockchain_state
.persist_and_prune_headers_below_anchor(chunk.last().unwrap().block_hash())
.await
})
.unwrap();
let (num_headers_disk, num_headers_memory) = blockchain_state.num_headers().unwrap();
if expect_pruning {
assert_eq!(num_headers_disk, num_added_headers);
assert_eq!(num_headers_memory, 1);
} else {
assert_eq!(num_headers_disk, 0);
assert_eq!(num_headers_memory, num_added_headers);
}
}
}

let rt = tokio::runtime::Runtime::new().unwrap();

group.bench_function(BenchmarkId::new("add_headers", "in_memory"), |bench| {
bench.iter(|| {
let mut blockchain_state =
BlockchainState::new(network, None, &MetricsRegistry::default(), no_op_logger());
add_headers(&mut blockchain_state, headers, false, &rt);
})
});

group.bench_function(BenchmarkId::new("add_headers", "lmdb"), |bench| {
bench.iter(|| {
let blockchain_state = BlockchainState::new(
Network::Bitcoin,
None,
let dir = tempdir().unwrap();
let mut blockchain_state = BlockchainState::new(
network,
Some(dir.path().to_path_buf()),
&MetricsRegistry::default(),
no_op_logger(),
);
// Headers are processed in chunks of at most MAX_HEADERS_SIZE entries
for chunk in bitcoin_headers_to_add.chunks(MAX_HEADERS_SIZE) {
let (added_headers, error) =
rt.block_on(async { blockchain_state.add_headers(chunk).await });
assert!(error.is_none(), "Failed to add headers: {}", error.unwrap());
assert_eq!(added_headers.len(), chunk.len())
}
add_headers(&mut blockchain_state, headers, true, &rt);
})
});
}
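As an aside, the group/BenchmarkId pattern used by bench_add_headers boils down to this minimal, self-contained criterion sketch (the group name, variant labels, and trivial closure are placeholders, not the benchmark's real workload):

// Minimal criterion sketch of the grouping pattern above: one benchmark
// group, variants keyed by BenchmarkId, and a reduced sample size for
// long-running benches.
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};

fn bench_variants(criterion: &mut Criterion) {
    let mut group = criterion.benchmark_group("example_group");
    group.sample_size(10);
    for variant in ["in_memory", "lmdb"] {
        group.bench_function(BenchmarkId::new("add_headers", variant), |bench| {
            bench.iter(|| std::hint::black_box(1 + 1))
        });
    }
    group.finish();
}

criterion_group!(benches, bench_variants);
criterion_main!(benches);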
@@ -232,11 +307,6 @@ fn decompress<P: AsRef<Path>>(location: P) -> Vec<u8> {
decompressed
}

// This simulation constructs a blockchain comprising four forks, each of 2000 blocks.
// For an extended BFS execution, the initial 1975 blocks of every branch are marked in
// the request as already processed, with the aim of receiving the last 25 blocks of each fork.
// Performance metrics are captured from the moment the request is sent until the
// response is received and deserialised.
criterion_group!(benches, e2e, hash_block_header, add_800k_block_headers);

// The benchmark can be run using:
25 changes: 25 additions & 0 deletions rs/bitcoin/adapter/src/blockchainstate.rs
@@ -203,6 +203,19 @@ where
Ok(result)
}

/// Background task to persist headers below the anchor (as headers) and the anchor (as tip) onto disk, and
/// prune headers below the anchor from the in-memory cache.
pub fn persist_and_prune_headers_below_anchor(
&self,
anchor: BlockHash,
) -> tokio::task::JoinHandle<()> {
let header_cache = self.header_cache.clone();
tokio::task::spawn_blocking(move || {
// Error is ignored, since it is a background task
let _ = header_cache.persist_and_prune_headers_below_anchor(anchor);
})
}

/// This method adds a new block to the `block_cache`
pub async fn add_block(
&self,
@@ -333,6 +346,18 @@ where
.map(|block| block.len())
.sum()
}

/// Number of headers stored.
///
/// Return a pair where
/// 1. the first element is the number of headers stored on disk, and
/// 2. the second element is the number of headers stored in memory.
pub fn num_headers(&self) -> Result<(usize, usize), String> {
self.header_cache
.get_num_headers()
// do not expose internal error type
.map_err(|e| e.to_string())
}
}

impl<Network: BlockchainNetwork> HeaderStore for BlockchainState<Network> {
9 changes: 4 additions & 5 deletions rs/bitcoin/adapter/src/get_successors_handler.rs
@@ -127,17 +127,16 @@ impl<Network: BlockchainNetwork + Send + Sync> GetSuccessorsHandler<Network> {

// Spawn persist-to-disk task without waiting for it to finish, and make sure there
// is only one task running at a time.
let cache = self.state.header_cache.clone();
let mut handle = self.pruning_task_handle.lock().unwrap();
let is_finished = handle
.as_ref()
.map(|handle| handle.is_finished())
.unwrap_or(true);
if is_finished {
*handle = Some(tokio::task::spawn_blocking(move || {
// Error is ignored, since it is a background task
let _ = cache.persist_and_prune_headers_below_anchor(request.anchor);
}));
*handle = Some(
self.state
.persist_and_prune_headers_below_anchor(request.anchor),
);
}

let (blocks, next, obsolete_blocks) = {
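The handler above spawns at most one pruning task at a time, respawning only once the previous handle reports finished. That single-flight pattern, reduced to a self-contained sketch (PruneState is a hypothetical name; spawn_blocking must be called from within a Tokio runtime):

// Standalone sketch of the single-flight background-task pattern above:
// keep at most one JoinHandle alive, spawning new work only when the
// previous task has finished. Requires tokio with the rt-multi-thread feature.
use std::sync::Mutex;
use tokio::task::JoinHandle;

struct PruneState {
    handle: Mutex<Option<JoinHandle<()>>>,
}

impl PruneState {
    fn maybe_spawn(&self, work: impl FnOnce() + Send + 'static) {
        let mut handle = self.handle.lock().unwrap();
        let is_finished = handle
            .as_ref()
            .map(|handle| handle.is_finished())
            .unwrap_or(true);
        if is_finished {
            *handle = Some(tokio::task::spawn_blocking(work));
        }
    }
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let state = PruneState { handle: Mutex::new(None) };
    rt.block_on(async {
        state.maybe_spawn(|| { /* persist and prune headers here */ });
    });
}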
40 changes: 40 additions & 0 deletions rs/bitcoin/adapter/src/header_cache.rs
@@ -146,6 +146,9 @@ pub trait HeaderCache: Send + Sync {
/// Return the number of tips.
fn get_num_tips(&self) -> usize;

/// Return the number of headers.
fn get_num_headers(&self) -> usize;

/// Return the ancestors from the given block hash to the current anchor in the
/// in-memory cache as a chain of headers, where each element is the only child
/// of the next, and the first element (tip) has no child.
@@ -227,6 +230,10 @@ impl<Header: BlockchainHeader + Send + Sync> HeaderCache for RwLock<InMemoryHead
self.read().unwrap().tips.len()
}

fn get_num_headers(&self) -> usize {
self.read().unwrap().cache.len()
}

fn get_ancestor_chain(&self, from: BlockHash) -> Vec<(BlockHash, HeaderNode<Header>)> {
let mut hash = from;
let mut to_persist = Vec::new();
@@ -362,6 +369,18 @@ impl LMDBHeaderCache {
Ok(node)
}

fn tx_get_num_headers<Tx: Transaction>(&self, tx: &Tx) -> Result<usize, LMDBCacheError> {
let num = tx
.stat(self.headers)
.map(|stat| stat.entries())
.map_err(LMDBCacheError::Lmdb)?;
assert!(
num > 0,
"BUG: LMDBHeaderCache::new_with_genesis adds the tip header key '{TIP_KEY}'"
);
Ok(num - 1)
}

fn tx_add_header<Header: BlockchainHeader>(
&self,
tx: &mut RwTransaction,
@@ -528,6 +547,27 @@ impl<Header: BlockchainHeader + Send + Sync + 'static> HybridHeaderCache<Header>
})
}

/// Number of headers stored.
///
/// Return a pair where
/// 1. the first element is the number of headers stored on disk, and
/// 2. the second element is the number of headers stored in memory.
pub fn get_num_headers(&self) -> Result<(usize, usize), LMDBCacheError> {
let num_headers_in_memory = self.in_memory.get_num_headers();
if self.on_disk.is_none() {
return Ok((0, num_headers_in_memory));
}

let cache = self.on_disk.as_ref().unwrap();
let num_headers_on_disk = log_err!(
cache.run_ro_txn(|tx| cache.tx_get_num_headers(tx)),
cache.log,
"get_num_headers"
)?;

Ok((num_headers_on_disk, num_headers_in_memory))
}

/// Get a header by hash.
pub fn get_header(&self, hash: BlockHash) -> Option<HeaderNode<Header>> {
self.in_memory
2 changes: 1 addition & 1 deletion rs/bitcoin/adapter/src/lib.rs
@@ -80,7 +80,7 @@ enum ProcessNetworkMessageError {
InvalidMessage,
}

/// This enum is used to represent errors that
/// Error returned by `Channel::send`.
#[derive(Debug)]
enum ChannelError {}

2 changes: 1 addition & 1 deletion rs/bitcoin/validation/src/header.rs
@@ -13,7 +13,7 @@ use crate::{
};

/// An error thrown when trying to validate a header.
#[derive(Debug, PartialEq)]
#[derive(Debug, Eq, PartialEq)]
pub enum ValidateHeaderError {
/// Used when the timestamp in the header is lower than
/// the median of timestamps of past 11 headers.