diff --git a/Cargo.lock b/Cargo.lock index b32bac0..07c0438 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,8 +212,10 @@ dependencies = [ "anyhow", "caryatid_sdk", "config", + "dashmap", "hex", "imbl", + "rayon", "serde", "serde_json", "serde_with 3.14.0", @@ -4366,9 +4368,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" dependencies = [ "either", "rayon-core", @@ -4376,9 +4378,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.12.1" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" dependencies = [ "crossbeam-deque", "crossbeam-utils", diff --git a/common/src/messages.rs b/common/src/messages.rs index 8ec26c2..a5f784b 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -118,6 +118,9 @@ pub struct EpochActivityMessage { /// Total fees in this epoch pub total_fees: u64, + /// Fees by VRF vkey hash + pub fees: Vec<(KeyHash, u64)>, + /// List of all VRF vkey hashes used on blocks (SPO indicator) and /// number of blocks produced pub vrf_vkey_hashes: Vec<(KeyHash, usize)>, diff --git a/modules/epoch_activity_counter/src/state.rs b/modules/epoch_activity_counter/src/state.rs index 5cc21a9..4146f15 100644 --- a/modules/epoch_activity_counter/src/state.rs +++ b/modules/epoch_activity_counter/src/state.rs @@ -5,23 +5,31 @@ use acropolis_common::{ messages::{CardanoMessage, EpochActivityMessage, Message}, BlockInfo, KeyHash, }; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::sync::Arc; -use tracing::info; +use tracing::{error, info}; pub struct State { // Current epoch number current_epoch: u64, // Map of counts by VRF key hashes - vrf_vkey_hashes: HashMap, + blocks_minted: HashMap, // Total blocks seen this epoch total_blocks: usize, + // Maps of fees by vrf key hash + fees: HashMap, + // Total fees seen this epoch total_fees: u64, + // Blocks seen this epoch + // removed when we calculate the fee of the block + // and cleared when we end the epoch + blocks: VecDeque<(u64, KeyHash)>, + // History of epochs (disabled by default) epoch_history: Option>, } @@ -31,9 +39,11 @@ impl State { pub fn new(store_history: bool) -> Self { Self { current_epoch: 0, - vrf_vkey_hashes: HashMap::new(), + blocks_minted: HashMap::new(), total_blocks: 0, + fees: HashMap::new(), total_fees: 0, + blocks: VecDeque::new(), epoch_history: if store_history { Some(BTreeMap::new()) } else { @@ -43,18 +53,35 @@ impl State { } // Handle a block minting, taking the SPO's VRF vkey - pub fn handle_mint(&mut self, _block: &BlockInfo, vrf_vkey: Option<&[u8]>) { + pub fn handle_mint(&mut self, block: &BlockInfo, vrf_vkey: Option<&[u8]>) { self.total_blocks += 1; - if let Some(vrf_vkey) = vrf_vkey { // Count one on this hash - *(self.vrf_vkey_hashes.entry(keyhash(vrf_vkey)).or_insert(0)) += 1; + *(self.blocks_minted.entry(keyhash(vrf_vkey)).or_insert(0)) += 1; + + // Add the block to the queue + self.blocks.push_back((block.number, keyhash(vrf_vkey))); } } // Handle block fees - pub fn handle_fees(&mut self, _block: &BlockInfo, total_fees: u64) { - self.total_fees += total_fees; + pub fn handle_fees(&mut self, block: &BlockInfo, block_fee: u64) { + self.total_fees += block_fee; + + // find the block in the queue + loop { + let Some((front_number, vrf_key_hash)) = self.blocks.pop_front() else { + break; + }; + if block.number > front_number { + // if CardanoMessage::BlockFees is received before CardanoMessage::BlockHeader. + error!("CardanoMessage::BlockFees is received before CardanoMessage::BlockHeader."); + } else if front_number == block.number { + // add this fee to fees for this vrf key hash + *(self.fees.entry(vrf_key_hash).or_insert(0)) += block_fee; + break; + } + } } // Handle end of epoch, returns message to be published @@ -62,7 +89,7 @@ impl State { info!( epoch, total_blocks = self.total_blocks, - unique_vrf_keys = self.vrf_vkey_hashes.len(), + unique_vrf_keys = self.blocks_minted.len(), total_fees = self.total_fees, "End of epoch" ); @@ -80,8 +107,9 @@ impl State { self.current_epoch = epoch + 1; self.total_blocks = 0; - self.vrf_vkey_hashes.clear(); + self.blocks_minted.clear(); self.total_fees = 0; + self.fees.clear(); message } @@ -91,7 +119,8 @@ impl State { epoch: self.current_epoch, total_blocks: self.total_blocks, total_fees: self.total_fees, - vrf_vkey_hashes: self.vrf_vkey_hashes.iter().map(|(k, v)| (k.clone(), *v)).collect(), + fees: self.fees.iter().map(|(k, v)| (k.clone(), *v)).collect(), + vrf_vkey_hashes: self.blocks_minted.iter().map(|(k, v)| (k.clone(), *v)).collect(), } } @@ -112,7 +141,7 @@ impl State { pub fn get_blocks_minted_by_pools(&self, vrf_key_hashes: &Vec) -> Vec { vrf_key_hashes .iter() - .map(|key_hash| self.vrf_vkey_hashes.get(key_hash).map(|v| *v as u64).unwrap_or(0)) + .map(|key_hash| self.blocks_minted.get(key_hash).map(|v| *v as u64).unwrap_or(0)) .collect() } } @@ -143,47 +172,81 @@ mod tests { let state = State::new(false); assert_eq!(state.total_blocks, 0); assert_eq!(state.total_fees, 0); - assert!(state.vrf_vkey_hashes.is_empty()); + assert!(state.blocks_minted.is_empty()); + assert!(state.fees.is_empty()); } #[test] fn handle_mint_single_vrf_records_counts() { let mut state = State::new(false); let vrf = b"vrf_key"; - let block = make_block(100); + let mut block = make_block(100); state.handle_mint(&block, Some(vrf)); + state.handle_fees(&block, 100); + + block.number += 1; state.handle_mint(&block, Some(vrf)); + state.handle_fees(&block, 200); assert_eq!(state.total_blocks, 2); - assert_eq!(state.vrf_vkey_hashes.len(), 1); - assert_eq!(state.vrf_vkey_hashes.get(&keyhash(vrf)), Some(&2)); + assert_eq!(state.blocks_minted.len(), 1); + assert_eq!(state.blocks_minted.get(&keyhash(vrf)), Some(&2)); + assert_eq!(state.fees.get(&keyhash(vrf)), Some(&300)); + assert_eq!(state.blocks.len(), 0); } #[test] fn handle_mint_multiple_vrf_records_counts() { let mut state = State::new(false); - let block = make_block(100); + let mut block = make_block(100); state.handle_mint(&block, Some(b"vrf_1")); + block.number += 1; state.handle_mint(&block, Some(b"vrf_2")); + block.number += 1; state.handle_mint(&block, Some(b"vrf_2")); assert_eq!(state.total_blocks, 3); - assert_eq!(state.vrf_vkey_hashes.len(), 2); + assert_eq!(state.blocks_minted.len(), 2); + assert_eq!(state.blocks.len(), 3); assert_eq!( - state.vrf_vkey_hashes.iter().find(|(k, _)| *k == &keyhash(b"vrf_1")).map(|(_, v)| *v), + state.blocks_minted.iter().find(|(k, _)| *k == &keyhash(b"vrf_1")).map(|(_, v)| *v), Some(1) ); assert_eq!( - state.vrf_vkey_hashes.iter().find(|(k, _)| *k == &keyhash(b"vrf_2")).map(|(_, v)| *v), + state.blocks_minted.iter().find(|(k, _)| *k == &keyhash(b"vrf_2")).map(|(_, v)| *v), Some(2) ); + + block = make_block(100); + state.handle_fees(&block, 100); + block.number += 1; + state.handle_fees(&block, 200); + block.number += 1; + state.handle_fees(&block, 300); + + assert_eq!(state.blocks.len(), 0); + assert_eq!(state.fees.len(), 2); + assert_eq!( + state.fees.iter().find(|(k, _)| *k == &keyhash(b"vrf_1")).map(|(_, v)| *v), + Some(100) + ); + assert_eq!( + state.fees.iter().find(|(k, _)| *k == &keyhash(b"vrf_2")).map(|(_, v)| *v), + Some(500) + ); } #[test] fn handle_fees_counts_fees() { let mut state = State::new(false); - let block = make_block(100); + let mut block = make_block(100); + + state.blocks = VecDeque::from([ + (block.number, keyhash(b"vrf_1")), + (block.number + 1, keyhash(b"vrf_2")), + ]); state.handle_fees(&block, 100); + block.number += 1; state.handle_fees(&block, 250); assert_eq!(state.total_fees, 350); @@ -205,6 +268,7 @@ mod tests { assert_eq!(ea.total_blocks, 1); assert_eq!(ea.total_fees, 123); assert_eq!(ea.vrf_vkey_hashes.len(), 1); + assert_eq!(ea.fees.len(), 1); assert_eq!( ea.vrf_vkey_hashes .iter() @@ -212,6 +276,10 @@ mod tests { .map(|(_, v)| *v), Some(1) ); + assert_eq!( + ea.fees.iter().find(|(k, _)| k == &keyhash(b"vrf_1")).map(|(_, v)| *v), + Some(123) + ); } _ => panic!("Expected EpochActivity message"), } @@ -220,7 +288,9 @@ mod tests { assert_eq!(state.current_epoch, 1); assert_eq!(state.total_blocks, 0); assert_eq!(state.total_fees, 0); - assert!(state.vrf_vkey_hashes.is_empty()); + assert!(state.blocks_minted.is_empty()); + assert!(state.fees.is_empty()); + assert!(state.blocks.is_empty()); } #[test] diff --git a/modules/spo_state/Cargo.toml b/modules/spo_state/Cargo.toml index 13f68bd..997f0d3 100644 --- a/modules/spo_state/Cargo.toml +++ b/modules/spo_state/Cargo.toml @@ -20,6 +20,8 @@ serde_json = "1.0.132" serde_with = { version = "3.12.0", features = ["hex"] } hex = "0.4.3" imbl = { version = "5.0.0", features = ["serde"] } +dashmap = "6.1.0" +rayon = "1.11.0" [lib] path = "src/spo_state.rs" diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index 5dfc840..ec80835 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -30,6 +30,8 @@ const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state"; const DEFAULT_SPDD_SUBSCRIBE_TOPIC: &str = "cardano.spo.distribution"; const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity"; +const DEFAULT_STORE_HISTORY: (&str, bool) = ("store-history", false); + const POOLS_STATE_TOPIC: &str = "pools-state"; /// SPO State module #[module( @@ -152,6 +154,9 @@ impl SPOState { .unwrap_or(DEFAULT_EPOCH_ACTIVITY_TOPIC.to_string()); info!("Creating subscriber on '{epoch_activity_topic}'"); + let store_history = + config.get_bool(DEFAULT_STORE_HISTORY.0).unwrap_or(DEFAULT_STORE_HISTORY.1); + let maybe_snapshot_topic = config .get_string("snapshot-topic") .ok() @@ -162,7 +167,7 @@ impl SPOState { .unwrap_or(DEFAULT_SPO_STATE_TOPIC.to_string()); info!("Creating SPO state publisher on '{spo_state_topic}'"); - let state = Arc::new(Mutex::new(State::new())); + let state = Arc::new(Mutex::new(State::new(store_history))); // handle pools-state let state_rest_blockfrost = state.clone(); @@ -195,19 +200,12 @@ impl SPOState { pools_operators, epoch, } => { - if let Some((active_stakes, total_active_stake)) = - guard.get_pools_active_stakes(pools_operators, *epoch) - { - PoolsStateQueryResponse::PoolsActiveStakes(PoolsActiveStakes { - active_stakes, - total_active_stake, - }) - } else { - PoolsStateQueryResponse::PoolsActiveStakes(PoolsActiveStakes { - active_stakes: vec![0; pools_operators.len()], - total_active_stake: 0, - }) - } + let (active_stakes, total_active_stake) = + guard.get_pools_active_stakes(pools_operators, *epoch); + PoolsStateQueryResponse::PoolsActiveStakes(PoolsActiveStakes { + active_stakes, + total_active_stake, + }) } PoolsStateQuery::GetPoolsTotalBlocksMinted { vrf_key_hashes } => { diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index e0246e9..6a5b70d 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -11,7 +11,9 @@ use acropolis_common::{ BlockInfo, KeyHash, PoolRegistration, PoolRetirement, TxCertificate, }; use anyhow::Result; +use dashmap::DashMap; use imbl::HashMap; +use rayon::prelude::*; use serde_with::{hex::Hex, serde_as}; use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; @@ -87,26 +89,35 @@ impl From<&BlockState> for SPOState { } } +/// Epoch State +/// Store active_stakes, blocks_minted, delegators_count, rewards, fees +/// #[serde_as] -#[derive(Debug, Clone, serde::Serialize)] -pub struct ActiveStakesState { - /// Snapshot is taken at Epoch N - 1 to N boundary - /// the first block of Epoch N - block: u64, - /// Epoch N (snapshot is Mark) +#[derive(Default, Debug, Clone, serde::Serialize)] +pub struct EpochState { + /// epoch number N epoch: u64, - /// active stakes for each pool operator for each epoch - /// Epoch in key is (Epoch N + 1 when snapshot becomes Set) - #[serde_as(as = "SerializeMapAs<_, SerializeMapAs>")] - active_stakes: HashMap>, + /// blocks minted during the epoch + blocks_minted: Option, + /// active stakes of the epoch (taken boundary from epoch N-2 to N-1) + active_stakes: Option, + /// delegators count by the end of the epoch + delegators_count: Option, + /// rewards of the epoch + rewards: Option, + /// fees of the epoch + fees: Option, } -impl ActiveStakesState { - pub fn new() -> Self { +impl EpochState { + pub fn new(epoch: u64) -> Self { Self { - block: 0, - epoch: 0, - active_stakes: HashMap::new(), + epoch, + blocks_minted: None, + active_stakes: None, + delegators_count: None, + rewards: None, + fees: None, } } } @@ -139,8 +150,13 @@ pub struct State { /// Volatile states, one per volatile block history: VecDeque, - /// Volatile active stakes state, one per epoch (in case new epoch block is rolled back) - active_stakes_history: VecDeque, + /// Epoch History for each pool operator + epochs_history: Option>>>, + + /// Active stakes for each pool operator + /// (epoch number, active stake) + /// Pop on first element when epoch number is greater than the epoch number + pub active_stakes: DashMap>, /// Volatile total blocks minted state, one per epoch (in case new epoch block is rolled back) total_blocks_minted_history: VecDeque, @@ -148,10 +164,15 @@ pub struct State { impl State { // Construct with optional publisher - pub fn new() -> Self { + pub fn new(store_history: bool) -> Self { Self { history: VecDeque::::new(), - active_stakes_history: VecDeque::::new(), + epochs_history: if store_history { + Some(Arc::new(DashMap::new())) + } else { + None + }, + active_stakes: DashMap::new(), total_blocks_minted_history: VecDeque::::new(), } } @@ -160,10 +181,6 @@ impl State { self.history.back() } - pub fn current_active_stakes_state(&self) -> Option<&ActiveStakesState> { - self.active_stakes_history.back() - } - pub fn current_total_blocks_minted_state(&self) -> Option<&TotalBlocksMintedState> { self.total_blocks_minted_history.back() } @@ -194,19 +211,37 @@ impl State { /// * `pools_operators` - A vector of pool operator hashes /// * `epoch` - The epoch to get the active stakes for /// ## Returns - /// `Option<(Vec, u64)>` - a vector of active stakes for each pool operator and the total active stake. + /// `(Vec, u64)` - a vector of active stakes for each pool operator and the total active stake. pub fn get_pools_active_stakes( &self, pools_operators: &Vec, epoch: u64, - ) -> Option<(Vec, u64)> { - let current = self.current_active_stakes_state()?; - current.active_stakes.get(&epoch).map(|stakes| { - let total_active_stake = stakes.values().sum(); - let pools_active_stakes = - pools_operators.iter().map(|spo| stakes.get(spo).cloned().unwrap_or(0)).collect(); - (pools_active_stakes, total_active_stake) - }) + ) -> (Vec, u64) { + let active_stakes = pools_operators + .par_iter() + .map(|spo| { + self.active_stakes + .get(spo) + .map(|stakes| { + stakes + .iter() + .find(|(e, _)| *e == epoch) + .map(|(_, stake)| *stake) + .or_else(|| { + error!( + "Active stakes for pool {} at epoch {} not found", + hex::encode(spo), + epoch + ); + None + }) + .unwrap_or(0) + }) + .unwrap_or(0) + }) + .collect::>(); + let total_active_stake = active_stakes.iter().sum(); + (active_stakes, total_active_stake) } /// Get total blocks minted for each pool operator @@ -280,30 +315,6 @@ impl State { } } - fn get_previous_active_stakes_state(&mut self, block_number: u64) -> ActiveStakesState { - loop { - match self.active_stakes_history.back() { - Some(state) => { - if state.block >= block_number { - info!( - "Rolling back SPO active stakes state for block {}", - state.block - ); - self.active_stakes_history.pop_back(); - } else { - break; - } - } - _ => break, - } - } - if let Some(current) = self.active_stakes_history.back() { - current.clone() - } else { - ActiveStakesState::new() - } - } - fn get_previous_total_blocks_minted_state( &mut self, block_number: u64, @@ -477,26 +488,37 @@ impl State { spdd_message.epoch, block.number, block.epoch ); let SPOStakeDistributionMessage { spos, .. } = spdd_message; - let current = self.get_previous_active_stakes_state(block.number); - let mut active_stakes = current.active_stakes.clone(); - active_stakes.insert( - block.epoch + 1, - HashMap::from_iter(spos.iter().map(|(key, value)| (key.clone(), value.active))), - ); - let new_state = ActiveStakesState { - block: block.number, - epoch: block.epoch, - active_stakes, - }; + spos.par_iter().for_each(|(key, value)| { + { + // update active stakes + let mut active_stakes = + self.active_stakes.entry(key.clone()).or_insert(VecDeque::new()); - // Prune old history which can not be rolled back to - if let Some(front) = self.active_stakes_history.front() { - if current.block > front.block + SECURITY_PARAMETER_K as u64 { - self.active_stakes_history.pop_front(); + // pop active stake of epoch which is less than current epoch + loop { + let Some((front_epoch, _)) = active_stakes.front() else { + break; + }; + if *front_epoch < block.epoch { + active_stakes.pop_front(); + } else { + break; + } + } + active_stakes.push_back((block.epoch + 1, value.active)); } - } - self.active_stakes_history.push_back(new_state); + { + // update epochs history if enabled + if let Some(epochs_history) = &self.epochs_history { + let mut current = epochs_history.entry(key.clone()).or_insert(BTreeMap::new()); + current + .entry(block.epoch + 1) + .or_insert(EpochState::new(block.epoch + 1)) + .active_stakes = Some(value.active); + } + } + }); } /// Handle Epoch Activity @@ -510,20 +532,25 @@ impl State { epoch_activity_message.epoch, block.number, block.epoch ); let EpochActivityMessage { - vrf_vkey_hashes, .. + vrf_vkey_hashes, + fees, + .. } = epoch_activity_message; + let current = self.get_previous_total_blocks_minted_state(block.number); let mut total_blocks_minted = current.total_blocks_minted.clone(); vrf_vkey_hashes.iter().for_each(|(vrf_vkey_hash, amount)| { - let Some(v) = total_blocks_minted.get_mut(vrf_vkey_hash) else { - total_blocks_minted.insert(vrf_vkey_hash.clone(), *amount as u64); - return; - }; - *v += *amount as u64; + *(total_blocks_minted.entry(vrf_vkey_hash.clone()).or_insert(0)) += *amount as u64; + + // update epochs history if enabled + if let Some(epochs_history) = &self.epochs_history { + // since we have vrf_key_hash, and key is pool_operator. + // need to think about it. + } }); - let new_state = TotalBlocksMintedState { + let new_total_blocks_minted_state = TotalBlocksMintedState { block: block.number, epoch: block.epoch, total_blocks_minted, @@ -535,7 +562,7 @@ impl State { self.total_blocks_minted_history.pop_front(); } } - self.total_blocks_minted_history.push_back(new_state); + self.total_blocks_minted_history.push_back(new_total_blocks_minted_state); } pub fn bootstrap(&mut self, state: SPOState) { @@ -558,16 +585,14 @@ pub mod tests { #[tokio::test] async fn new_state_is_empty() { - let state = State::new(); + let state = State::new(false); assert_eq!(0, state.history.len()); - assert_eq!(0, state.active_stakes_history.len()); } #[tokio::test] async fn current_on_new_state_returns_none() { - let state = State::new(); + let state = State::new(false); assert!(state.current().is_none()); - assert!(state.current_active_stakes_state().is_none()); } fn new_msg() -> TxCertificatesMessage { @@ -597,7 +622,7 @@ pub mod tests { #[tokio::test] async fn state_is_not_empty_after_handle_tx_certs() { - let mut state = State::new(); + let mut state = State::new(false); let msg = new_msg(); let block = new_block(); assert!(state.handle_tx_certs(&block, &msg).is_ok()); @@ -606,16 +631,15 @@ pub mod tests { #[tokio::test] async fn active_stakes_state_is_not_empty_after_handle_spdd() { - let mut state = State::new(); + let mut state = State::new(false); let msg = new_spdd_message(); let block = new_block(); state.handle_spdd(&block, &msg); - assert_eq!(1, state.active_stakes_history.len()); } #[tokio::test] async fn spo_gets_registered() { - let mut state = State::new(); + let mut state = State::new(false); let mut msg = new_msg(); msg.certificates.push(TxCertificate::PoolRegistration(PoolRegistration { operator: vec![0], @@ -644,7 +668,7 @@ pub mod tests { #[tokio::test] async fn pending_deregistration_gets_queued() { - let mut state = State::new(); + let mut state = State::new(false); let mut msg = new_msg(); msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { operator: vec![0], @@ -667,7 +691,7 @@ pub mod tests { #[tokio::test] async fn second_pending_deregistration_gets_queued() { - let mut state = State::new(); + let mut state = State::new(false); let mut msg = new_msg(); msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { operator: vec![0], @@ -698,7 +722,7 @@ pub mod tests { #[tokio::test] async fn rollback_removes_second_pending_deregistration() { - let mut state = State::new(); + let mut state = State::new(false); let mut msg = new_msg(); msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { operator: vec![0], @@ -731,7 +755,7 @@ pub mod tests { #[tokio::test] async fn spo_gets_deregistered() { - let mut state = State::new(); + let mut state = State::new(false); let mut msg = new_msg(); msg.certificates.push(TxCertificate::PoolRegistration(PoolRegistration { operator: vec![0], @@ -775,41 +799,67 @@ pub mod tests { } #[tokio::test] - async fn get_pools_active_stakes_returns_none_when_state_is_new() { - let state = State::new(); - assert!(state.get_pools_active_stakes(&vec![vec![1], vec![2]], 0).is_none()); + async fn get_pools_active_stakes_returns_zeros_when_state_is_new() { + let state = State::new(false); + assert_eq!( + (vec![0, 0], 0), + state.get_pools_active_stakes(&vec![vec![1], vec![2]], 0) + ); } #[tokio::test] - async fn get_pools_active_stakes_returns_none_when_epoch_is_not_found() { - let mut state = State::new(); + async fn get_pools_active_stakes_returns_zeros_when_not_found() { + let mut state = State::new(false); let mut msg = new_spdd_message(); msg.epoch = 1; let mut block = new_block(); block.epoch = 1; state.handle_spdd(&block, &msg); - assert!(state.get_pools_active_stakes(&vec![vec![1], vec![2]], block.epoch).is_none()); + assert_eq!( + (vec![0, 0], 0), + state.get_pools_active_stakes(&vec![vec![1], vec![2]], block.epoch) + ); + assert_eq!( + (vec![0, 0], 0), + state.get_pools_active_stakes(&vec![vec![1], vec![2]], block.epoch + 1) + ); } #[tokio::test] - async fn get_pools_active_stakes_returns_zero_when_active_stakes_not_found() { - let mut state = State::new(); + async fn get_pools_active_stakes_returns_data() { + let mut state = State::new(false); let mut msg = new_spdd_message(); + msg.spos = vec![ + ( + vec![1], + DelegatedStake { + active: 10, + live: 10, + }, + ), + ( + vec![2], + DelegatedStake { + active: 20, + live: 20, + }, + ), + ]; msg.epoch = 1; let mut block = new_block(); - block.epoch = 1; + block.number = 11; + block.epoch = 2; state.handle_spdd(&block, &msg); - let (active_stakes, total) = - state.get_pools_active_stakes(&vec![vec![1], vec![2]], block.epoch + 1).unwrap(); - assert_eq!(2, active_stakes.len()); - assert_eq!(0, active_stakes[0]); - assert_eq!(0, active_stakes[1]); - assert_eq!(0, total); + + assert_eq!( + (vec![10, 20], 30), + state.get_pools_active_stakes(&vec![vec![1], vec![2]], block.epoch + 1) + ); } #[tokio::test] - async fn get_pools_active_stakes_returns_data() { - let mut state = State::new(); + async fn front_active_stakes_removed() { + let mut state = State::new(false); let mut msg = new_spdd_message(); msg.spos = vec![ ( @@ -832,18 +882,29 @@ pub mod tests { block.number = 11; block.epoch = 2; state.handle_spdd(&block, &msg); + assert_eq!(1, state.active_stakes.get(&vec![1]).unwrap().len()); - let (active_stakes, total) = - state.get_pools_active_stakes(&vec![vec![1], vec![2]], block.epoch + 1).unwrap(); - assert_eq!(2, active_stakes.len()); - assert_eq!(10, active_stakes[0]); - assert_eq!(20, active_stakes[1]); - assert_eq!(30, total); + msg.epoch = 2; + block.number = 21; + block.epoch = 3; + state.handle_spdd(&block, &msg); + assert_eq!(2, state.active_stakes.get(&vec![1]).unwrap().len()); + + msg.epoch = 3; + block.number = 31; + block.epoch = 4; + state.handle_spdd(&block, &msg); + // one is popped from the front + assert_eq!(2, state.active_stakes.get(&vec![1]).unwrap().len()); + assert_eq!( + (vec![10, 20], 30), + state.get_pools_active_stakes(&vec![vec![1], vec![2]], block.epoch + 1) + ); } #[tokio::test] async fn spo_gets_restored_on_rollback() { - let mut state = State::new(); + let mut state = State::new(false); let mut msg = new_msg(); msg.certificates.push(TxCertificate::PoolRegistration(PoolRegistration { operator: vec![0], @@ -901,140 +962,15 @@ pub mod tests { }; } - #[tokio::test] - async fn active_stakes_get_restored_on_rollback() { - let mut state = State::new(); - let mut msg = new_spdd_message(); - msg.spos = vec![ - ( - vec![1], - DelegatedStake { - active: 10, - live: 10, - }, - ), - ( - vec![2], - DelegatedStake { - active: 20, - live: 20, - }, - ), - ( - vec![3], - DelegatedStake { - active: 30, - live: 30, - }, - ), - ]; - msg.epoch = 1; - let mut block = new_block(); - block.number = 11; - block.epoch = 2; - state.handle_spdd(&block, &msg); - println!( - "{}", - serde_json::to_string_pretty(&state.active_stakes_history).unwrap() - ); - - let current = state.current_active_stakes_state(); - assert!(!current.is_none()); - if let Some(current) = current { - assert_eq!(1, current.active_stakes.len()); - assert_eq!( - 10, - *current.active_stakes.get(&(block.epoch + 1)).unwrap().get(&vec![1]).unwrap() - ); - assert_eq!( - 20, - *current.active_stakes.get(&(block.epoch + 1)).unwrap().get(&vec![2]).unwrap() - ); - assert_eq!( - 30, - *current.active_stakes.get(&(block.epoch + 1)).unwrap().get(&vec![3]).unwrap() - ); - }; - - msg.spos = vec![ - ( - vec![1], - DelegatedStake { - active: 30, - live: 30, - }, - ), - ( - vec![2], - DelegatedStake { - active: 40, - live: 40, - }, - ), - ]; - msg.epoch = 2; - block.number = 21; - block.epoch = 3; - state.handle_spdd(&block, &msg); - println!( - "{}", - serde_json::to_string_pretty(&state.active_stakes_history).unwrap() - ); - - let current = state.current_active_stakes_state(); - assert!(!current.is_none()); - if let Some(current) = current { - assert_eq!(2, current.active_stakes.len()); - assert_eq!( - 30, - *current.active_stakes.get(&(block.epoch + 1)).unwrap().get(&vec![1]).unwrap() - ); - assert_eq!( - 40, - *current.active_stakes.get(&(block.epoch + 1)).unwrap().get(&vec![2]).unwrap() - ); - }; - - let mut msg = new_spdd_message(); - msg.epoch = 2; - block.number = 21; - block.epoch = 3; - state.handle_spdd(&block, &msg); - println!( - "{}", - serde_json::to_string_pretty(&state.active_stakes_history).unwrap() - ); - - let current = state.current_active_stakes_state(); - assert!(!current.is_none()); - if let Some(current) = current { - assert_eq!(2, current.active_stakes.len()); - assert_eq!( - 10, - *current.active_stakes.get(&block.epoch).unwrap().get(&vec![1]).unwrap() - ); - assert_eq!( - 20, - *current.active_stakes.get(&block.epoch).unwrap().get(&vec![2]).unwrap() - ); - assert_eq!( - 30, - *current.active_stakes.get(&block.epoch).unwrap().get(&vec![3]).unwrap() - ); - - assert!(current.active_stakes.get(&(block.epoch + 1)).unwrap().is_empty()); - }; - } - #[tokio::test] async fn get_retiring_pools_returns_empty_when_state_is_new() { - let state = State::new(); + let state = State::new(false); assert!(state.get_retiring_pools().is_empty()); } #[tokio::test] async fn get_retiring_pools_returns_pools() { - let mut state = State::new(); + let mut state = State::new(false); let mut msg = new_msg(); msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { operator: vec![0],