Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions rs/bitcoin/adapter/benches/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,12 @@ fn add_800k_block_headers(criterion: &mut Criterion) {
group.bench_function("add_headers", |bench| {
let rt = tokio::runtime::Runtime::new().unwrap();
bench.iter(|| {
let blockchain_state =
BlockchainState::new(Network::Bitcoin, &MetricsRegistry::default());
let blockchain_state = BlockchainState::new(
Network::Bitcoin,
None,
&MetricsRegistry::default(),
no_op_logger(),
);
// Headers are processed in chunks of at most MAX_HEADERS_SIZE entries
for chunk in bitcoin_headers_to_add.chunks(MAX_HEADERS_SIZE) {
let (added_headers, error) =
Expand Down
3 changes: 2 additions & 1 deletion rs/bitcoin/adapter/src/blockchainmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,8 @@ pub mod test {
where
BlockchainState<Network>: HeaderValidator<Network>,
{
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
let blockchain_state =
BlockchainState::new(network, None, &MetricsRegistry::default(), no_op_logger());
(
blockchain_state.genesis(),
BlockchainManager::new(
Expand Down
41 changes: 15 additions & 26 deletions rs/bitcoin/adapter/src/blockchainstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
use crate::common::HeaderValidator;
use crate::{
common::{BlockHeight, BlockchainBlock, BlockchainHeader, BlockchainNetwork},
header_cache::{
AddHeaderCacheError, AddHeaderResult, HeaderCache, HeaderNode, InMemoryHeaderCache,
LMDBHeaderCache, Tip,
},
header_cache::{AddHeaderCacheError, AddHeaderResult, HeaderNode, HybridHeaderCache, Tip},
metrics::BlockchainStateMetrics,
};
use bitcoin::{BlockHash, block::Header, consensus::Encodable, dogecoin::Header as DogecoinHeader};
Expand Down Expand Up @@ -60,7 +57,7 @@ pub type SerializedBlock = Vec<u8>;
/// The BlockChainState also maintains the child relationhips between the headers.
pub struct BlockchainState<Network: BlockchainNetwork> {
/// This field stores all the Bitcoin headers using a HashMap containing BlockHash and the corresponding header.
header_cache: Arc<dyn HeaderCache<Header = Network::Header> + Send>,
pub(crate) header_cache: Arc<HybridHeaderCache<Network::Header>>,

/// This field stores a hashmap containing BlockHash and the corresponding SerializedBlock.
block_cache: RwLock<HashMap<BlockHash, Arc<SerializedBlock>>>,
Expand Down Expand Up @@ -99,28 +96,15 @@ impl<Network: BlockchainNetwork> BlockchainState<Network>
where
Network::Header: Send + Sync,
{
/// Create a new BlockChainState object with in-memory cache.
pub fn new(network: Network, metrics_registry: &MetricsRegistry) -> Self {
let genesis_block_header = network.genesis_block_header();
let header_cache = Arc::new(InMemoryHeaderCache::new(genesis_block_header));
let block_cache = RwLock::new(HashMap::new());
BlockchainState {
header_cache,
block_cache,
network,
metrics: BlockchainStateMetrics::new(metrics_registry),
}
}

/// Create a new BlockChainState with on-disk cache.
pub fn new_with_cache_dir(
/// Create a new BlockChainState with an optional on-disk cache if cache_dir is specified.
pub fn new(
network: Network,
cache_dir: PathBuf,
cache_dir: Option<PathBuf>,
metrics_registry: &MetricsRegistry,
logger: ReplicaLogger,
) -> Self {
let genesis_block_header = network.genesis_block_header();
let header_cache = Arc::new(LMDBHeaderCache::new(
let header_cache = Arc::new(HybridHeaderCache::new(
genesis_block_header,
cache_dir,
logger,
Expand Down Expand Up @@ -387,17 +371,22 @@ mod test {
use std::collections::HashSet;

fn run_in_memory<R>(network: Network, test_fn: impl Fn(BlockchainState<Network>) -> R) -> R {
test_fn(BlockchainState::new(network, &MetricsRegistry::default()))
test_fn(BlockchainState::new(
network,
None,
&MetricsRegistry::default(),
no_op_logger(),
))
}

fn run_with_cache_dir<R>(
network: Network,
test_fn: impl Fn(BlockchainState<Network>) -> R,
) -> R {
let dir = tempdir().unwrap();
test_fn(BlockchainState::new_with_cache_dir(
test_fn(BlockchainState::new(
network,
dir.path().to_path_buf(),
Some(dir.path().to_path_buf()),
&MetricsRegistry::default(),
no_op_logger(),
))
Expand Down Expand Up @@ -555,7 +544,7 @@ mod test {
"unsuccessfully added fork chain: {maybe_err:?}"
);

let mut tips = state.header_cache.get_tips();
let mut tips = crate::header_cache::test::get_tips(&state.header_cache);
tips.sort_by(|x, y| y.work.cmp(&x.work));
assert_eq!(tips.len(), 2);
assert_eq!(tips[0].header.block_hash(), *last_fork_hash);
Expand Down
42 changes: 33 additions & 9 deletions rs/bitcoin/adapter/src/get_successors_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
sync::{Arc, Mutex},
};

use bitcoin::{BlockHash, consensus::Encodable};
Expand Down Expand Up @@ -90,9 +90,10 @@ pub struct GetSuccessorsHandler<Network: BlockchainNetwork> {
blockchain_manager_tx: Sender<BlockchainManagerRequest>,
network: Network,
metrics: GetSuccessorMetrics,
pruning_task_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
}

impl<Network: BlockchainNetwork> GetSuccessorsHandler<Network> {
impl<Network: BlockchainNetwork + Send + Sync> GetSuccessorsHandler<Network> {
/// Creates a GetSuccessorsHandler to be used to access the blockchain state
/// inside of the adapter when a `GetSuccessorsRequest` is received.
pub fn new(
Expand All @@ -106,6 +107,7 @@ impl<Network: BlockchainNetwork> GetSuccessorsHandler<Network> {
blockchain_manager_tx,
network,
metrics: GetSuccessorMetrics::new(metrics_registry),
pruning_task_handle: Mutex::new(None),
}
}

Expand All @@ -123,6 +125,21 @@ impl<Network: BlockchainNetwork> GetSuccessorsHandler<Network> {
.processed_block_hashes
.observe(request.processed_block_hashes.len() as f64);

// Spawn persist-to-disk task without waiting for it to finish, and make sure there
// is only one task running at a time.
let cache = self.state.header_cache.clone();
let mut handle = self.pruning_task_handle.lock().unwrap();
let is_finished = handle
.as_ref()
.map(|handle| handle.is_finished())
.unwrap_or(true);
if is_finished {
*handle = Some(tokio::task::spawn_blocking(move || {
// Error is ignored, since it is a background task
let _ = cache.persist_and_prune_headers_below_anchor(request.anchor);
}));
}

let (blocks, next, obsolete_blocks) = {
let anchor_height = self
.state
Expand Down Expand Up @@ -312,6 +329,7 @@ mod test {
use super::*;

use bitcoin::{Block, consensus::Decodable};
use ic_logger::no_op_logger;
use ic_metrics::MetricsRegistry;
use tokio::sync::mpsc::channel;

Expand All @@ -323,12 +341,18 @@ mod test {
Block::consensus_decode(&mut (*serialized_block).as_slice()).unwrap()
}

fn new_blockchain_state<Network: BlockchainNetwork>(
network: Network,
) -> BlockchainState<Network> {
BlockchainState::new(network, None, &MetricsRegistry::default(), no_op_logger())
}

/// This tests ensures that `BlockchainManager::get_successors(...)` will return relevant blocks
/// with the next headers of many forks and enqueue missing block hashes.
#[tokio::test]
async fn test_get_successors() {
let network = bitcoin::Network::Regtest;
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
let blockchain_state = new_blockchain_state(network);
let genesis = blockchain_state.genesis();
let genesis_hash = genesis.block_hash();
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
Expand Down Expand Up @@ -440,7 +464,7 @@ mod test {
#[tokio::test]
async fn test_get_successors_wait_header_sync_regtest() {
let network = bitcoin::Network::Regtest;
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
let blockchain_state = new_blockchain_state(network);
let genesis = blockchain_state.genesis();
let genesis_hash = genesis.block_hash();
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
Expand Down Expand Up @@ -490,7 +514,7 @@ mod test {
#[tokio::test]
async fn test_get_successors_multiple_blocks() {
let network = bitcoin::Network::Regtest;
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
let blockchain_state = new_blockchain_state(network);
let genesis = blockchain_state.genesis();
let genesis_hash = genesis.block_hash();
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
Expand Down Expand Up @@ -564,7 +588,7 @@ mod test {
#[tokio::test]
async fn test_get_successors_max_num_blocks() {
let network = bitcoin::Network::Regtest;
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
let blockchain_state = new_blockchain_state(network);
let genesis = blockchain_state.genesis();
let genesis_hash = genesis.block_hash();
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
Expand Down Expand Up @@ -600,7 +624,7 @@ mod test {
#[tokio::test]
async fn test_get_successors_multiple_blocks_out_of_order() {
let network = bitcoin::Network::Regtest;
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
let blockchain_state = new_blockchain_state(network);
let genesis = blockchain_state.genesis();
let genesis_hash = genesis.block_hash();
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
Expand Down Expand Up @@ -694,7 +718,7 @@ mod test {
#[tokio::test]
async fn test_get_successors_large_block() {
let network = bitcoin::Network::Regtest;
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
let blockchain_state = new_blockchain_state(network);
let genesis = blockchain_state.genesis();
let genesis_hash = genesis.block_hash();
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
Expand Down Expand Up @@ -760,7 +784,7 @@ mod test {
#[tokio::test]
async fn test_get_successors_many_blocks_until_size_cap_is_met() {
let network = bitcoin::Network::Regtest;
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
let blockchain_state = new_blockchain_state(network);
let genesis = blockchain_state.genesis();
let genesis_hash = genesis.block_hash();
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
Expand Down
Loading
Loading