Skip to content

refactor: SPO state to handle pools historical state #119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ pub struct EpochActivityMessage {
/// Total fees in this epoch
pub total_fees: u64,

/// Fees by VRF vkey hash
pub fees: Vec<(KeyHash, u64)>,

/// List of all VRF vkey hashes used on blocks (SPO indicator) and
/// number of blocks produced
pub vrf_vkey_hashes: Vec<(KeyHash, usize)>,
Expand Down
116 changes: 93 additions & 23 deletions modules/epoch_activity_counter/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,31 @@ use acropolis_common::{
messages::{CardanoMessage, EpochActivityMessage, Message},
BlockInfo, KeyHash,
};
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::Arc;
use tracing::info;
use tracing::{error, info};

pub struct State {
// Current epoch number
current_epoch: u64,

// Map of counts by VRF key hashes
vrf_vkey_hashes: HashMap<KeyHash, usize>,
blocks_minted: HashMap<KeyHash, usize>,

// Total blocks seen this epoch
total_blocks: usize,

// Maps of fees by vrf key hash
fees: HashMap<KeyHash, u64>,

// Total fees seen this epoch
total_fees: u64,

// Blocks seen this epoch
// removed when we calculate the fee of the block
// and cleared when we end the epoch
blocks: VecDeque<(u64, KeyHash)>,

// History of epochs (disabled by default)
epoch_history: Option<BTreeMap<u64, EpochActivityMessage>>,
}
Expand All @@ -31,9 +39,11 @@ impl State {
pub fn new(store_history: bool) -> Self {
Self {
current_epoch: 0,
vrf_vkey_hashes: HashMap::new(),
blocks_minted: HashMap::new(),
total_blocks: 0,
fees: HashMap::new(),
total_fees: 0,
blocks: VecDeque::new(),
epoch_history: if store_history {
Some(BTreeMap::new())
} else {
Expand All @@ -43,26 +53,43 @@ impl State {
}

// Handle a block minting, taking the SPO's VRF vkey
pub fn handle_mint(&mut self, _block: &BlockInfo, vrf_vkey: Option<&[u8]>) {
pub fn handle_mint(&mut self, block: &BlockInfo, vrf_vkey: Option<&[u8]>) {
self.total_blocks += 1;

if let Some(vrf_vkey) = vrf_vkey {
// Count one on this hash
*(self.vrf_vkey_hashes.entry(keyhash(vrf_vkey)).or_insert(0)) += 1;
*(self.blocks_minted.entry(keyhash(vrf_vkey)).or_insert(0)) += 1;

// Add the block to the queue
self.blocks.push_back((block.number, keyhash(vrf_vkey)));
}
}

// Handle block fees
pub fn handle_fees(&mut self, _block: &BlockInfo, total_fees: u64) {
self.total_fees += total_fees;
pub fn handle_fees(&mut self, block: &BlockInfo, block_fee: u64) {
self.total_fees += block_fee;

// find the block in the queue
loop {
let Some((front_number, vrf_key_hash)) = self.blocks.pop_front() else {
break;
};
if block.number > front_number {
// if CardanoMessage::BlockFees is received before CardanoMessage::BlockHeader.
error!("CardanoMessage::BlockFees is received before CardanoMessage::BlockHeader.");
} else if front_number == block.number {
// add this fee to fees for this vrf key hash
*(self.fees.entry(vrf_key_hash).or_insert(0)) += block_fee;
break;
}
}
}

// Handle end of epoch, returns message to be published
pub fn end_epoch(&mut self, block: &BlockInfo, epoch: u64) -> Arc<Message> {
info!(
epoch,
total_blocks = self.total_blocks,
unique_vrf_keys = self.vrf_vkey_hashes.len(),
unique_vrf_keys = self.blocks_minted.len(),
total_fees = self.total_fees,
"End of epoch"
);
Expand All @@ -80,8 +107,9 @@ impl State {

self.current_epoch = epoch + 1;
self.total_blocks = 0;
self.vrf_vkey_hashes.clear();
self.blocks_minted.clear();
self.total_fees = 0;
self.fees.clear();

message
}
Expand All @@ -91,7 +119,8 @@ impl State {
epoch: self.current_epoch,
total_blocks: self.total_blocks,
total_fees: self.total_fees,
vrf_vkey_hashes: self.vrf_vkey_hashes.iter().map(|(k, v)| (k.clone(), *v)).collect(),
fees: self.fees.iter().map(|(k, v)| (k.clone(), *v)).collect(),
vrf_vkey_hashes: self.blocks_minted.iter().map(|(k, v)| (k.clone(), *v)).collect(),
}
}

Expand All @@ -112,7 +141,7 @@ impl State {
pub fn get_blocks_minted_by_pools(&self, vrf_key_hashes: &Vec<KeyHash>) -> Vec<u64> {
vrf_key_hashes
.iter()
.map(|key_hash| self.vrf_vkey_hashes.get(key_hash).map(|v| *v as u64).unwrap_or(0))
.map(|key_hash| self.blocks_minted.get(key_hash).map(|v| *v as u64).unwrap_or(0))
.collect()
}
}
Expand Down Expand Up @@ -143,47 +172,81 @@ mod tests {
let state = State::new(false);
assert_eq!(state.total_blocks, 0);
assert_eq!(state.total_fees, 0);
assert!(state.vrf_vkey_hashes.is_empty());
assert!(state.blocks_minted.is_empty());
assert!(state.fees.is_empty());
}

#[test]
fn handle_mint_single_vrf_records_counts() {
let mut state = State::new(false);
let vrf = b"vrf_key";
let block = make_block(100);
let mut block = make_block(100);
state.handle_mint(&block, Some(vrf));
state.handle_fees(&block, 100);

block.number += 1;
state.handle_mint(&block, Some(vrf));
state.handle_fees(&block, 200);

assert_eq!(state.total_blocks, 2);
assert_eq!(state.vrf_vkey_hashes.len(), 1);
assert_eq!(state.vrf_vkey_hashes.get(&keyhash(vrf)), Some(&2));
assert_eq!(state.blocks_minted.len(), 1);
assert_eq!(state.blocks_minted.get(&keyhash(vrf)), Some(&2));
assert_eq!(state.fees.get(&keyhash(vrf)), Some(&300));
assert_eq!(state.blocks.len(), 0);
}

#[test]
fn handle_mint_multiple_vrf_records_counts() {
let mut state = State::new(false);
let block = make_block(100);
let mut block = make_block(100);
state.handle_mint(&block, Some(b"vrf_1"));
block.number += 1;
state.handle_mint(&block, Some(b"vrf_2"));
block.number += 1;
state.handle_mint(&block, Some(b"vrf_2"));

assert_eq!(state.total_blocks, 3);
assert_eq!(state.vrf_vkey_hashes.len(), 2);
assert_eq!(state.blocks_minted.len(), 2);
assert_eq!(state.blocks.len(), 3);
assert_eq!(
state.vrf_vkey_hashes.iter().find(|(k, _)| *k == &keyhash(b"vrf_1")).map(|(_, v)| *v),
state.blocks_minted.iter().find(|(k, _)| *k == &keyhash(b"vrf_1")).map(|(_, v)| *v),
Some(1)
);
assert_eq!(
state.vrf_vkey_hashes.iter().find(|(k, _)| *k == &keyhash(b"vrf_2")).map(|(_, v)| *v),
state.blocks_minted.iter().find(|(k, _)| *k == &keyhash(b"vrf_2")).map(|(_, v)| *v),
Some(2)
);

block = make_block(100);
state.handle_fees(&block, 100);
block.number += 1;
state.handle_fees(&block, 200);
block.number += 1;
state.handle_fees(&block, 300);

assert_eq!(state.blocks.len(), 0);
assert_eq!(state.fees.len(), 2);
assert_eq!(
state.fees.iter().find(|(k, _)| *k == &keyhash(b"vrf_1")).map(|(_, v)| *v),
Some(100)
);
assert_eq!(
state.fees.iter().find(|(k, _)| *k == &keyhash(b"vrf_2")).map(|(_, v)| *v),
Some(500)
);
}

#[test]
fn handle_fees_counts_fees() {
let mut state = State::new(false);
let block = make_block(100);
let mut block = make_block(100);

state.blocks = VecDeque::from([
(block.number, keyhash(b"vrf_1")),
(block.number + 1, keyhash(b"vrf_2")),
]);
state.handle_fees(&block, 100);
block.number += 1;
state.handle_fees(&block, 250);

assert_eq!(state.total_fees, 350);
Expand All @@ -205,13 +268,18 @@ mod tests {
assert_eq!(ea.total_blocks, 1);
assert_eq!(ea.total_fees, 123);
assert_eq!(ea.vrf_vkey_hashes.len(), 1);
assert_eq!(ea.fees.len(), 1);
assert_eq!(
ea.vrf_vkey_hashes
.iter()
.find(|(k, _)| k == &keyhash(b"vrf_1"))
.map(|(_, v)| *v),
Some(1)
);
assert_eq!(
ea.fees.iter().find(|(k, _)| k == &keyhash(b"vrf_1")).map(|(_, v)| *v),
Some(123)
);
}
_ => panic!("Expected EpochActivity message"),
}
Expand All @@ -220,7 +288,9 @@ mod tests {
assert_eq!(state.current_epoch, 1);
assert_eq!(state.total_blocks, 0);
assert_eq!(state.total_fees, 0);
assert!(state.vrf_vkey_hashes.is_empty());
assert!(state.blocks_minted.is_empty());
assert!(state.fees.is_empty());
assert!(state.blocks.is_empty());
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions modules/spo_state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ serde_json = "1.0.132"
serde_with = { version = "3.12.0", features = ["hex"] }
hex = "0.4.3"
imbl = { version = "5.0.0", features = ["serde"] }
dashmap = "6.1.0"
rayon = "1.11.0"

[lib]
path = "src/spo_state.rs"
26 changes: 12 additions & 14 deletions modules/spo_state/src/spo_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state";
const DEFAULT_SPDD_SUBSCRIBE_TOPIC: &str = "cardano.spo.distribution";
const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity";

const DEFAULT_STORE_HISTORY: (&str, bool) = ("store-history", false);

const POOLS_STATE_TOPIC: &str = "pools-state";
/// SPO State module
#[module(
Expand Down Expand Up @@ -152,6 +154,9 @@ impl SPOState {
.unwrap_or(DEFAULT_EPOCH_ACTIVITY_TOPIC.to_string());
info!("Creating subscriber on '{epoch_activity_topic}'");

let store_history =
config.get_bool(DEFAULT_STORE_HISTORY.0).unwrap_or(DEFAULT_STORE_HISTORY.1);

let maybe_snapshot_topic = config
.get_string("snapshot-topic")
.ok()
Expand All @@ -162,7 +167,7 @@ impl SPOState {
.unwrap_or(DEFAULT_SPO_STATE_TOPIC.to_string());
info!("Creating SPO state publisher on '{spo_state_topic}'");

let state = Arc::new(Mutex::new(State::new()));
let state = Arc::new(Mutex::new(State::new(store_history)));

// handle pools-state
let state_rest_blockfrost = state.clone();
Expand Down Expand Up @@ -195,19 +200,12 @@ impl SPOState {
pools_operators,
epoch,
} => {
if let Some((active_stakes, total_active_stake)) =
guard.get_pools_active_stakes(pools_operators, *epoch)
{
PoolsStateQueryResponse::PoolsActiveStakes(PoolsActiveStakes {
active_stakes,
total_active_stake,
})
} else {
PoolsStateQueryResponse::PoolsActiveStakes(PoolsActiveStakes {
active_stakes: vec![0; pools_operators.len()],
total_active_stake: 0,
})
}
let (active_stakes, total_active_stake) =
guard.get_pools_active_stakes(pools_operators, *epoch);
PoolsStateQueryResponse::PoolsActiveStakes(PoolsActiveStakes {
active_stakes,
total_active_stake,
})
}

PoolsStateQuery::GetPoolsTotalBlocksMinted { vrf_key_hashes } => {
Expand Down
Loading