Skip to content

Commit accd419

Browse files
ninegua and mducroux authored
feat(btc-adapter): implement a hybrid header cache (#6797)
Implement a hybrid header cache that persists the anchor header and its ancestors to disk. This leads to a simpler on-disk cache implementation, since it only has to persist a single chain of headers. The downside is that headers added after the anchor point are lost if the adapter is restarted, in which case it has to sync more headers in order to catch up. --------- Co-authored-by: mducroux <[email protected]>
1 parent ca94383 commit accd419

File tree

6 files changed

+400
-273
lines changed

6 files changed

+400
-273
lines changed

rs/bitcoin/adapter/benches/e2e.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,12 @@ fn add_800k_block_headers(criterion: &mut Criterion) {
189189
group.bench_function("add_headers", |bench| {
190190
let rt = tokio::runtime::Runtime::new().unwrap();
191191
bench.iter(|| {
192-
let blockchain_state =
193-
BlockchainState::new(Network::Bitcoin, &MetricsRegistry::default());
192+
let blockchain_state = BlockchainState::new(
193+
Network::Bitcoin,
194+
None,
195+
&MetricsRegistry::default(),
196+
no_op_logger(),
197+
);
194198
// Headers are processed in chunks of at most MAX_HEADERS_SIZE entries
195199
for chunk in bitcoin_headers_to_add.chunks(MAX_HEADERS_SIZE) {
196200
let (added_headers, error) =

rs/bitcoin/adapter/src/blockchainmanager.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,8 @@ pub mod test {
801801
where
802802
BlockchainState<Network>: HeaderValidator<Network>,
803803
{
804-
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
804+
let blockchain_state =
805+
BlockchainState::new(network, None, &MetricsRegistry::default(), no_op_logger());
805806
(
806807
blockchain_state.genesis(),
807808
BlockchainManager::new(

rs/bitcoin/adapter/src/blockchainstate.rs

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
use crate::common::HeaderValidator;
44
use crate::{
55
common::{BlockHeight, BlockchainBlock, BlockchainHeader, BlockchainNetwork},
6-
header_cache::{
7-
AddHeaderCacheError, AddHeaderResult, HeaderCache, HeaderNode, InMemoryHeaderCache,
8-
LMDBHeaderCache, Tip,
9-
},
6+
header_cache::{AddHeaderCacheError, AddHeaderResult, HeaderNode, HybridHeaderCache, Tip},
107
metrics::BlockchainStateMetrics,
118
};
129
use bitcoin::{BlockHash, block::Header, consensus::Encodable, dogecoin::Header as DogecoinHeader};
@@ -60,7 +57,7 @@ pub type SerializedBlock = Vec<u8>;
6057
/// The BlockChainState also maintains the child relationships between the headers.
6158
pub struct BlockchainState<Network: BlockchainNetwork> {
6259
/// This field stores all the Bitcoin headers using a HashMap containing BlockHash and the corresponding header.
63-
header_cache: Arc<dyn HeaderCache<Header = Network::Header> + Send>,
60+
pub(crate) header_cache: Arc<HybridHeaderCache<Network::Header>>,
6461

6562
/// This field stores a hashmap containing BlockHash and the corresponding SerializedBlock.
6663
block_cache: RwLock<HashMap<BlockHash, Arc<SerializedBlock>>>,
@@ -99,28 +96,15 @@ impl<Network: BlockchainNetwork> BlockchainState<Network>
9996
where
10097
Network::Header: Send + Sync,
10198
{
102-
/// Create a new BlockChainState object with in-memory cache.
103-
pub fn new(network: Network, metrics_registry: &MetricsRegistry) -> Self {
104-
let genesis_block_header = network.genesis_block_header();
105-
let header_cache = Arc::new(InMemoryHeaderCache::new(genesis_block_header));
106-
let block_cache = RwLock::new(HashMap::new());
107-
BlockchainState {
108-
header_cache,
109-
block_cache,
110-
network,
111-
metrics: BlockchainStateMetrics::new(metrics_registry),
112-
}
113-
}
114-
115-
/// Create a new BlockChainState with on-disk cache.
116-
pub fn new_with_cache_dir(
99+
/// Create a new BlockChainState with an optional on-disk cache if cache_dir is specified.
100+
pub fn new(
117101
network: Network,
118-
cache_dir: PathBuf,
102+
cache_dir: Option<PathBuf>,
119103
metrics_registry: &MetricsRegistry,
120104
logger: ReplicaLogger,
121105
) -> Self {
122106
let genesis_block_header = network.genesis_block_header();
123-
let header_cache = Arc::new(LMDBHeaderCache::new(
107+
let header_cache = Arc::new(HybridHeaderCache::new(
124108
genesis_block_header,
125109
cache_dir,
126110
logger,
@@ -387,17 +371,22 @@ mod test {
387371
use std::collections::HashSet;
388372

389373
fn run_in_memory<R>(network: Network, test_fn: impl Fn(BlockchainState<Network>) -> R) -> R {
390-
test_fn(BlockchainState::new(network, &MetricsRegistry::default()))
374+
test_fn(BlockchainState::new(
375+
network,
376+
None,
377+
&MetricsRegistry::default(),
378+
no_op_logger(),
379+
))
391380
}
392381

393382
fn run_with_cache_dir<R>(
394383
network: Network,
395384
test_fn: impl Fn(BlockchainState<Network>) -> R,
396385
) -> R {
397386
let dir = tempdir().unwrap();
398-
test_fn(BlockchainState::new_with_cache_dir(
387+
test_fn(BlockchainState::new(
399388
network,
400-
dir.path().to_path_buf(),
389+
Some(dir.path().to_path_buf()),
401390
&MetricsRegistry::default(),
402391
no_op_logger(),
403392
))
@@ -555,7 +544,7 @@ mod test {
555544
"unsuccessfully added fork chain: {maybe_err:?}"
556545
);
557546

558-
let mut tips = state.header_cache.get_tips();
547+
let mut tips = crate::header_cache::test::get_tips(&state.header_cache);
559548
tips.sort_by(|x, y| y.work.cmp(&x.work));
560549
assert_eq!(tips.len(), 2);
561550
assert_eq!(tips[0].header.block_hash(), *last_fork_hash);

rs/bitcoin/adapter/src/get_successors_handler.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
collections::{HashSet, VecDeque},
3-
sync::Arc,
3+
sync::{Arc, Mutex},
44
};
55

66
use bitcoin::{BlockHash, consensus::Encodable};
@@ -90,9 +90,10 @@ pub struct GetSuccessorsHandler<Network: BlockchainNetwork> {
9090
blockchain_manager_tx: Sender<BlockchainManagerRequest>,
9191
network: Network,
9292
metrics: GetSuccessorMetrics,
93+
pruning_task_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
9394
}
9495

95-
impl<Network: BlockchainNetwork> GetSuccessorsHandler<Network> {
96+
impl<Network: BlockchainNetwork + Send + Sync> GetSuccessorsHandler<Network> {
9697
/// Creates a GetSuccessorsHandler to be used to access the blockchain state
9798
/// inside of the adapter when a `GetSuccessorsRequest` is received.
9899
pub fn new(
@@ -106,6 +107,7 @@ impl<Network: BlockchainNetwork> GetSuccessorsHandler<Network> {
106107
blockchain_manager_tx,
107108
network,
108109
metrics: GetSuccessorMetrics::new(metrics_registry),
110+
pruning_task_handle: Mutex::new(None),
109111
}
110112
}
111113

@@ -123,6 +125,21 @@ impl<Network: BlockchainNetwork> GetSuccessorsHandler<Network> {
123125
.processed_block_hashes
124126
.observe(request.processed_block_hashes.len() as f64);
125127

128+
// Spawn persist-to-disk task without waiting for it to finish, and make sure there
129+
// is only one task running at a time.
130+
let cache = self.state.header_cache.clone();
131+
let mut handle = self.pruning_task_handle.lock().unwrap();
132+
let is_finished = handle
133+
.as_ref()
134+
.map(|handle| handle.is_finished())
135+
.unwrap_or(true);
136+
if is_finished {
137+
*handle = Some(tokio::task::spawn_blocking(move || {
138+
// Error is ignored, since it is a background task
139+
let _ = cache.persist_and_prune_headers_below_anchor(request.anchor);
140+
}));
141+
}
142+
126143
let (blocks, next, obsolete_blocks) = {
127144
let anchor_height = self
128145
.state
@@ -312,6 +329,7 @@ mod test {
312329
use super::*;
313330

314331
use bitcoin::{Block, consensus::Decodable};
332+
use ic_logger::no_op_logger;
315333
use ic_metrics::MetricsRegistry;
316334
use tokio::sync::mpsc::channel;
317335

@@ -323,12 +341,18 @@ mod test {
323341
Block::consensus_decode(&mut (*serialized_block).as_slice()).unwrap()
324342
}
325343

344+
fn new_blockchain_state<Network: BlockchainNetwork>(
345+
network: Network,
346+
) -> BlockchainState<Network> {
347+
BlockchainState::new(network, None, &MetricsRegistry::default(), no_op_logger())
348+
}
349+
326350
/// This test ensures that `BlockchainManager::get_successors(...)` will return relevant blocks
327351
/// with the next headers of many forks and enqueue missing block hashes.
328352
#[tokio::test]
329353
async fn test_get_successors() {
330354
let network = bitcoin::Network::Regtest;
331-
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
355+
let blockchain_state = new_blockchain_state(network);
332356
let genesis = blockchain_state.genesis();
333357
let genesis_hash = genesis.block_hash();
334358
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
@@ -440,7 +464,7 @@ mod test {
440464
#[tokio::test]
441465
async fn test_get_successors_wait_header_sync_regtest() {
442466
let network = bitcoin::Network::Regtest;
443-
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
467+
let blockchain_state = new_blockchain_state(network);
444468
let genesis = blockchain_state.genesis();
445469
let genesis_hash = genesis.block_hash();
446470
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
@@ -490,7 +514,7 @@ mod test {
490514
#[tokio::test]
491515
async fn test_get_successors_multiple_blocks() {
492516
let network = bitcoin::Network::Regtest;
493-
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
517+
let blockchain_state = new_blockchain_state(network);
494518
let genesis = blockchain_state.genesis();
495519
let genesis_hash = genesis.block_hash();
496520
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
@@ -564,7 +588,7 @@ mod test {
564588
#[tokio::test]
565589
async fn test_get_successors_max_num_blocks() {
566590
let network = bitcoin::Network::Regtest;
567-
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
591+
let blockchain_state = new_blockchain_state(network);
568592
let genesis = blockchain_state.genesis();
569593
let genesis_hash = genesis.block_hash();
570594
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
@@ -600,7 +624,7 @@ mod test {
600624
#[tokio::test]
601625
async fn test_get_successors_multiple_blocks_out_of_order() {
602626
let network = bitcoin::Network::Regtest;
603-
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
627+
let blockchain_state = new_blockchain_state(network);
604628
let genesis = blockchain_state.genesis();
605629
let genesis_hash = genesis.block_hash();
606630
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
@@ -694,7 +718,7 @@ mod test {
694718
#[tokio::test]
695719
async fn test_get_successors_large_block() {
696720
let network = bitcoin::Network::Regtest;
697-
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
721+
let blockchain_state = new_blockchain_state(network);
698722
let genesis = blockchain_state.genesis();
699723
let genesis_hash = genesis.block_hash();
700724
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);
@@ -760,7 +784,7 @@ mod test {
760784
#[tokio::test]
761785
async fn test_get_successors_many_blocks_until_size_cap_is_met() {
762786
let network = bitcoin::Network::Regtest;
763-
let blockchain_state = BlockchainState::new(network, &MetricsRegistry::default());
787+
let blockchain_state = new_blockchain_state(network);
764788
let genesis = blockchain_state.genesis();
765789
let genesis_hash = genesis.block_hash();
766790
let (blockchain_manager_tx, _blockchain_manager_rx) = channel(10);

0 commit comments

Comments
 (0)