From c2696eb87994d823979cec372c2576c9b3ca64f9 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 11 Aug 2025 16:31:53 +0200 Subject: [PATCH 1/5] refactor: add cargo fmt check to github CI. - prettify all files which need it. --- .../workflows/run-tests-on-push-to-main.yml | 10 +- common/src/rational_number.rs | 2 +- modules/accounts_state/src/accounts_state.rs | 77 +++-- modules/accounts_state/src/monetary.rs | 90 ++--- modules/accounts_state/src/rest.rs | 4 +- modules/accounts_state/src/rewards.rs | 85 ++--- modules/accounts_state/src/snapshot.rs | 131 +++++--- .../src/spo_distribution_publisher.rs | 6 +- modules/accounts_state/src/state.rs | 312 ++++++++++-------- modules/block_unpacker/src/block_unpacker.rs | 6 +- modules/drep_state/src/drep_state.rs | 8 +- .../src/epoch_activity_counter.rs | 18 +- .../src/genesis_bootstrapper.rs | 13 +- .../governance_state/src/governance_state.rs | 4 +- modules/governance_state/src/state.rs | 58 ++-- modules/governance_state/src/voting_state.rs | 31 +- modules/parameters_state/build.rs | 25 +- .../parameters_state/src/alonzo_genesis.rs | 25 +- .../parameters_state/src/genesis_params.rs | 100 ++++-- .../parameters_state/src/parameters_state.rs | 6 +- .../src/snapshot_bootstrapper.rs | 4 +- modules/spo_state/src/spo_state.rs | 8 +- modules/spo_state/src/state.rs | 41 ++- .../src/stake_delta_filter.rs | 31 +- modules/tx_unpacker/src/tx_unpacker.rs | 272 ++++++++------- .../src/body_fetcher.rs | 36 +- .../src/upstream_cache.rs | 110 +++--- .../src/upstream_chain_fetcher.rs | 64 ++-- modules/upstream_chain_fetcher/src/utils.rs | 53 +-- .../src/fjall_async_immutable_utxo_store.rs | 5 +- modules/utxo_state/src/utxo_state.rs | 8 +- processes/omnibus/src/main.rs | 4 +- processes/replayer/src/main.rs | 2 +- processes/replayer/src/playback.rs | 124 +++---- processes/replayer/src/recorder.rs | 39 ++- processes/replayer/src/replayer_config.rs | 23 +- 36 files changed, 1081 insertions(+), 754 deletions(-) diff --git 
a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 636b544..c39605c 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -14,13 +14,17 @@ env: jobs: build: - runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v4 - - name: Build + + - name: Run Format + run: cargo fmt --all -- --check + + - name: Run Build run: cargo build --verbose - - name: Run tests + + - name: Run Tests run: cargo test --verbose diff --git a/common/src/rational_number.rs b/common/src/rational_number.rs index 1e01b0f..059703d 100644 --- a/common/src/rational_number.rs +++ b/common/src/rational_number.rs @@ -9,8 +9,8 @@ pub fn rational_number_from_f32(f: f32) -> Result { #[cfg(test)] mod tests { - use crate::rational_number::RationalNumber; use crate::rational_number::rational_number_from_f32; + use crate::rational_number::RationalNumber; #[test] fn test_fractions() -> Result<(), anyhow::Error> { diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 5afa8ed..4dce61a 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -20,10 +20,10 @@ mod spo_distribution_publisher; use spo_distribution_publisher::SPODistributionPublisher; mod state; use state::State; -mod snapshot; -mod rewards; mod monetary; mod rest; +mod rewards; +mod snapshot; use acropolis_common::queries::accounts::{ AccountInfo, AccountsStateQuery, AccountsStateQueryResponse, }; @@ -124,7 +124,7 @@ impl AccountsState { current_block = Some(block_info.clone()); block_info.new_epoch && block_info.epoch > 0 } - _ => false + _ => false, }; // Read from epoch-boundary messages only when it's a new epoch @@ -138,8 +138,10 @@ impl AccountsState { let (_, message) = dreps_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::DRepState(dreps_msg))) => { - let 
span = info_span!("account_state.handle_drep_state", - block = block_info.number); + let span = info_span!( + "account_state.handle_drep_state", + block = block_info.number + ); async { Self::check_sync(¤t_block, &block_info); state.handle_drep_state(&dreps_msg); @@ -148,7 +150,9 @@ impl AccountsState { if let Err(e) = drep_publisher.publish_drdd(block_info, drdd).await { error!("Error publishing drep voting stake distribution: {e:#}") } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -158,8 +162,8 @@ impl AccountsState { let (_, message) = spos_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::SPOState(spo_msg))) => { - let span = info_span!("account_state.handle_spo_state", - block = block_info.number); + let span = + info_span!("account_state.handle_spo_state", block = block_info.number); async { Self::check_sync(¤t_block, &block_info); state @@ -171,7 +175,9 @@ impl AccountsState { if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await { error!("Error publishing SPO stake distribution: {e:#}") } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -181,8 +187,10 @@ impl AccountsState { let (_, message) = ea_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::EpochActivity(ea_msg))) => { - let span = info_span!("account_state.handle_epoch_activity", - block = block_info.number); + let span = info_span!( + "account_state.handle_epoch_activity", + block = block_info.number + ); async { Self::check_sync(¤t_block, &block_info); state @@ -190,7 +198,9 @@ impl AccountsState { .await .inspect_err(|e| error!("EpochActivity handling error: {e:#}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -201,8 +211,10 @@ impl AccountsState { let (_, message) = 
params_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::ProtocolParams(params_msg))) => { - let span = info_span!("account_state.handle_parameters", - block = block_info.number); + let span = info_span!( + "account_state.handle_parameters", + block = block_info.number + ); async { Self::check_sync(¤t_block, &block_info); if let Some(ref block) = current_block { @@ -219,7 +231,9 @@ impl AccountsState { .handle_parameters(params_msg) .inspect_err(|e| error!("Messaging handling error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -236,8 +250,10 @@ impl AccountsState { .handle_tx_certificates(tx_certs_msg) .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) .ok(); - }.instrument(span).await; - } + } + .instrument(span) + .await; + } _ => error!("Unexpected message type: {certs_message:?}"), } @@ -246,15 +262,19 @@ impl AccountsState { let (_, message) = withdrawals_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::Withdrawals(withdrawals_msg))) => { - let span = info_span!("account_state.handle_withdrawals", - block = block_info.number); + let span = info_span!( + "account_state.handle_withdrawals", + block = block_info.number + ); async { Self::check_sync(¤t_block, &block_info); state .handle_withdrawals(withdrawals_msg) .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -264,15 +284,19 @@ impl AccountsState { let (_, message) = stake_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::StakeAddressDeltas(deltas_msg))) => { - let span = info_span!("account_state.handle_stake_deltas", - block = block_info.number); + let span = info_span!( + "account_state.handle_stake_deltas", + block = block_info.number + ); async { 
Self::check_sync(¤t_block, &block_info); state .handle_stake_deltas(deltas_msg) .inspect_err(|e| error!("StakeAddressDeltas handling error: {e:#}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -292,7 +316,8 @@ impl AccountsState { error!( expected = block.number, actual = actual.number, - "Messages out of sync"); + "Messages out of sync" + ); } } } @@ -443,7 +468,9 @@ impl AccountsState { if let Some(state) = history_tick.lock().await.current() { state.tick().await.inspect_err(|e| error!("Tick error: {e}")).ok(); } - }.instrument(span).await; + } + .instrument(span) + .await; } } } diff --git a/modules/accounts_state/src/monetary.rs b/modules/accounts_state/src/monetary.rs index a83c5b5..d3027c3 100644 --- a/modules/accounts_state/src/monetary.rs +++ b/modules/accounts_state/src/monetary.rs @@ -1,19 +1,15 @@ //! Acropolis AccountsState: monetary (reserves, treasury) calculations -use acropolis_common::{ - Lovelace, ShelleyParams, - rational_number::RationalNumber, -}; use crate::state::Pots; -use anyhow::{Result, anyhow}; -use tracing::info; -use bigdecimal::{BigDecimal, ToPrimitive, One}; +use acropolis_common::{rational_number::RationalNumber, Lovelace, ShelleyParams}; +use anyhow::{anyhow, Result}; +use bigdecimal::{BigDecimal, One, ToPrimitive}; use std::str::FromStr; +use tracing::info; /// Result of monetary calculation #[derive(Debug, Default, Clone)] pub struct MonetaryResult { - /// Updated pots pub pots: Pots, @@ -23,10 +19,12 @@ pub struct MonetaryResult { /// Calculate monetary change at the start of an epoch, returning updated pots and total /// available for stake rewards -pub fn calculate_monetary_change(params: &ShelleyParams, - old_pots: &Pots, - total_fees_last_epoch: Lovelace, - total_non_obft_blocks: usize) -> Result { +pub fn calculate_monetary_change( + params: &ShelleyParams, + old_pots: &Pots, + total_fees_last_epoch: Lovelace, + total_non_obft_blocks: usize, 
+) -> Result { let mut new_pots = old_pots.clone(); // Add fees to reserves to start with - they will get allocated to treasury and stake @@ -44,14 +42,12 @@ pub fn calculate_monetary_change(params: &ShelleyParams, // Top-slice some for treasury let treasury_cut = RationalNumber::new(2, 10); // TODO odd values again! ¶ms.protocol_params.treasury_cut; // Tau - let treasury_increase = (&total_reward_pot - * BigDecimal::from(treasury_cut.numer()) - / BigDecimal::from(treasury_cut.denom())) - .with_scale(0); + let treasury_increase = (&total_reward_pot * BigDecimal::from(treasury_cut.numer()) + / BigDecimal::from(treasury_cut.denom())) + .with_scale(0); - let treasury_increase_u64 = treasury_increase - .to_u64() - .ok_or(anyhow!("Can't calculate integral treasury cut"))?; + let treasury_increase_u64 = + treasury_increase.to_u64().ok_or(anyhow!("Can't calculate integral treasury cut"))?; new_pots.treasury += treasury_increase_u64; new_pots.reserves -= treasury_increase_u64; @@ -74,29 +70,32 @@ fn calculate_eta(params: &ShelleyParams, total_non_obft_blocks: usize) -> Result let active_slots_coeff = BigDecimal::from_str(¶ms.active_slots_coeff.to_string())?; let epoch_length = BigDecimal::from(params.epoch_length); - let eta = if decentralisation >= &RationalNumber::new(8,10) { + let eta = if decentralisation >= &RationalNumber::new(8, 10) { BigDecimal::one() } else { - let expected_blocks = epoch_length * active_slots_coeff * - (BigDecimal::one() - BigDecimal::from(decentralisation.numer()) - / BigDecimal::from(decentralisation.denom())); + let expected_blocks = epoch_length + * active_slots_coeff + * (BigDecimal::one() + - BigDecimal::from(decentralisation.numer()) + / BigDecimal::from(decentralisation.denom())); - (BigDecimal::from(total_non_obft_blocks as u64) / expected_blocks) - .min(BigDecimal::one()) + (BigDecimal::from(total_non_obft_blocks as u64) / expected_blocks).min(BigDecimal::one()) }; Ok(eta) } // Calculate monetary expansion based on current reserves -fn 
calculate_monetary_expansion(params: &ShelleyParams, reserves: Lovelace, eta: &BigDecimal) - -> BigDecimal { +fn calculate_monetary_expansion( + params: &ShelleyParams, + reserves: Lovelace, + eta: &BigDecimal, +) -> BigDecimal { let monetary_expansion_factor = RationalNumber::new(3, 1000); // TODO odd values coming in! ¶ms.protocol_params.monetary_expansion; // Rho - let monetary_expansion = (BigDecimal::from(reserves) - * eta - * BigDecimal::from(monetary_expansion_factor.numer()) - / BigDecimal::from(monetary_expansion_factor.denom())) + let monetary_expansion = + (BigDecimal::from(reserves) * eta * BigDecimal::from(monetary_expansion_factor.numer()) + / BigDecimal::from(monetary_expansion_factor.denom())) .with_scale(0); info!(eta=%eta, rho=%monetary_expansion_factor, %monetary_expansion, "Monetary:"); @@ -108,7 +107,9 @@ fn calculate_monetary_expansion(params: &ShelleyParams, reserves: Lovelace, eta: #[cfg(test)] mod tests { use super::*; - use acropolis_common::{ NetworkId, Nonce, NonceVariant, ProtocolVersion, ShelleyProtocolParams }; + use acropolis_common::{ + NetworkId, Nonce, NonceVariant, ProtocolVersion, ShelleyProtocolParams, + }; use chrono::{DateTime, Utc}; // Known values at start of Shelley - from Java reference and DBSync @@ -122,7 +123,7 @@ mod tests { const EPOCH_210_RESERVES: Lovelace = 13_278_197_552_770_393; const EPOCH_210_TREASURY: Lovelace = 16_306_644_182_013; - const EPOCH_210_REFUNDS_TO_TREASURY: Lovelace = 500_000_000; // 1 SPO with unknown reward + const EPOCH_210_REFUNDS_TO_TREASURY: Lovelace = 500_000_000; // 1 SPO with unknown reward const EPOCH_211_RESERVES: Lovelace = 13_270_236_767_315_870; const EPOCH_211_TREASURY: Lovelace = 24_275_595_982_960; @@ -136,10 +137,7 @@ mod tests { network_id: NetworkId::Mainnet, network_magic: 76482407, protocol_params: ShelleyProtocolParams { - protocol_version: ProtocolVersion { - major: 2, - minor: 0, - }, + protocol_version: ProtocolVersion { major: 2, minor: 0 }, max_tx_size: 16384, 
max_block_body_size: 65536, max_block_header_size: 1100, @@ -153,12 +151,12 @@ mod tests { pool_retire_max_epoch: 18, extra_entropy: Nonce { tag: NonceVariant::NeutralNonce, - hash: None + hash: None, }, decentralisation_param: RationalNumber::new(1, 1), monetary_expansion: RationalNumber::new(3, 1000), treasury_cut: RationalNumber::new(2, 10), - pool_pledge_influence: RationalNumber::new(3, 10) + pool_pledge_influence: RationalNumber::new(3, 10), }, security_param: 2160, slot_length: 1, @@ -170,7 +168,6 @@ mod tests { #[test] fn epoch_208_monetary_change() { - let params = shelley_params(); let pots = Pots { reserves: EPOCH_208_RESERVES, @@ -182,14 +179,16 @@ mod tests { let result = calculate_monetary_change(¶ms, &pots, 0, 0).unwrap(); // Epoch 209 reserves - all goes to treasury - assert_eq!(result.pots.reserves, EPOCH_208_RESERVES - EPOCH_209_TREASURY); + assert_eq!( + result.pots.reserves, + EPOCH_208_RESERVES - EPOCH_209_TREASURY + ); assert_eq!(result.pots.reserves - EPOCH_208_MIRS, EPOCH_209_RESERVES); assert_eq!(result.pots.treasury, EPOCH_209_TREASURY); } #[test] fn epoch_209_monetary_change() { - let params = shelley_params(); let pots = Pots { reserves: EPOCH_209_RESERVES, @@ -207,7 +206,6 @@ mod tests { #[test] fn epoch_210_monetary_change() { - let params = shelley_params(); let pots = Pots { reserves: EPOCH_210_RESERVES, @@ -220,7 +218,9 @@ mod tests { // Epoch 211 reserves assert_eq!(result.pots.reserves, EPOCH_211_RESERVES); - assert_eq!(result.pots.treasury + EPOCH_210_REFUNDS_TO_TREASURY, EPOCH_211_TREASURY); + assert_eq!( + result.pots.treasury + EPOCH_210_REFUNDS_TO_TREASURY, + EPOCH_211_TREASURY + ); } - } diff --git a/modules/accounts_state/src/rest.rs b/modules/accounts_state/src/rest.rs index 3a7c628..00bcf0d 100644 --- a/modules/accounts_state/src/rest.rs +++ b/modules/accounts_state/src/rest.rs @@ -8,9 +8,7 @@ use tokio::sync::Mutex; use crate::state::State; use acropolis_common::state_history::StateHistory; -use acropolis_common::{ - 
messages::RESTResponse, DelegatedStake, Lovelace, -}; +use acropolis_common::{messages::RESTResponse, DelegatedStake, Lovelace}; /// REST response structure for /accounts/{stake_address} #[derive(serde::Serialize)] diff --git a/modules/accounts_state/src/rewards.rs b/modules/accounts_state/src/rewards.rs index 42b7a50..6606cbd 100644 --- a/modules/accounts_state/src/rewards.rs +++ b/modules/accounts_state/src/rewards.rs @@ -1,16 +1,15 @@ //! Acropolis AccountsState: rewards calculations +use crate::snapshot::Snapshot; use acropolis_common::{ - KeyHash, Lovelace, ShelleyParams, RewardAccount, - rational_number::RationalNumber, + rational_number::RationalNumber, KeyHash, Lovelace, RewardAccount, ShelleyParams, }; -use crate::snapshot::Snapshot; -use std::sync::Arc; -use anyhow::{Result, bail}; -use tracing::{debug, info, warn}; -use bigdecimal::{BigDecimal, ToPrimitive, Zero, One}; +use anyhow::{bail, Result}; +use bigdecimal::{BigDecimal, One, ToPrimitive, Zero}; use std::cmp::min; use std::collections::HashMap; +use std::sync::Arc; +use tracing::{debug, info, warn}; /// Result of a rewards calculation #[derive(Debug, Default)] @@ -25,7 +24,6 @@ pub struct RewardsResult { /// State for rewards calculation #[derive(Debug, Default, Clone)] pub struct RewardsState { - /// Latest snapshot (epoch i) (if any) pub mark: Arc, @@ -48,9 +46,13 @@ impl RewardsState { /// The epoch is the one we are now entering - we assume the snapshot for this has already been /// taken. 
/// Note immutable - only state change allowed is to push a new snapshot - pub fn calculate_rewards(&self, epoch: u64, params: &ShelleyParams, - total_blocks: usize, - stake_rewards: BigDecimal) -> Result { + pub fn calculate_rewards( + &self, + epoch: u64, + params: &ShelleyParams, + total_blocks: usize, + stake_rewards: BigDecimal, + ) -> Result { let mut result = RewardsResult::default(); // Calculate total supply (total in circulation + treasury) or @@ -77,7 +79,6 @@ impl RewardsState { // from epoch (i-2) "Go" let mut total_paid_to_pools: Lovelace = 0; for (operator_id, spo) in self.go.spos.iter() { - // Actual blocks produced as proportion of epoch (Beta) let relative_blocks = BigDecimal::from(spo.blocks_produced as u64) / BigDecimal::from(total_blocks as u64); @@ -90,13 +91,17 @@ impl RewardsState { } // Get the stake actually delegated by the owners accounts to this SPO - let pool_owner_stake = self.go.get_stake_delegated_to_spo_by_addresses( - &operator_id, &spo.pool_owners); + let pool_owner_stake = + self.go.get_stake_delegated_to_spo_by_addresses(&operator_id, &spo.pool_owners); // If they haven't met their pledge, no dice if pool_owner_stake < spo.pledge { - warn!("SPO {} has owner stake {} less than pledge {} - skipping", - hex::encode(&operator_id), pool_owner_stake, spo.pledge); + warn!( + "SPO {} has owner stake {} less than pledge {} - skipping", + hex::encode(&operator_id), + pool_owner_stake, + spo.pledge + ); continue; } @@ -104,35 +109,32 @@ impl RewardsState { // Relative stake as fraction of total supply (sigma), and capped with 1/k (sigma') let relative_pool_stake = &pool_stake / &total_supply; - let capped_relative_pool_stake = min(&relative_pool_stake, - &relative_pool_saturation_size); + let capped_relative_pool_stake = + min(&relative_pool_stake, &relative_pool_saturation_size); // Stake pledged by operator (s) and capped with 1/k (s') let relative_pool_pledge = &pool_pledge / &total_supply; - let capped_relative_pool_pledge = 
min(&relative_pool_pledge, - &relative_pool_saturation_size); + let capped_relative_pool_pledge = + min(&relative_pool_pledge, &relative_pool_saturation_size); // Get the optimum reward for this pool - let optimum_rewards = ( - (&stake_rewards / (BigDecimal::one() + &pledge_influence_factor)) - * - ( - capped_relative_pool_stake + ( - capped_relative_pool_pledge * &pledge_influence_factor * ( - capped_relative_pool_stake - ( - capped_relative_pool_pledge * ( - (&relative_pool_saturation_size - capped_relative_pool_stake) - / &relative_pool_saturation_size) - ) - ) - ) / &relative_pool_saturation_size - ) - ).with_scale(0); + let optimum_rewards = ((&stake_rewards + / (BigDecimal::one() + &pledge_influence_factor)) + * (capped_relative_pool_stake + + (capped_relative_pool_pledge + * &pledge_influence_factor + * (capped_relative_pool_stake + - (capped_relative_pool_pledge + * ((&relative_pool_saturation_size + - capped_relative_pool_stake) + / &relative_pool_saturation_size)))) + / &relative_pool_saturation_size)) + .with_scale(0); // If decentralisation_param >= 0.8 => performance = 1 // Shelley Delegation Spec 3.8.3 let decentralisation = ¶ms.protocol_params.decentralisation_param; - let pool_performance = if decentralisation >= &RationalNumber::new(8,10) { + let pool_performance = if decentralisation >= &RationalNumber::new(8, 10) { BigDecimal::one() } else { relative_blocks.clone() / relative_pool_stake.clone() @@ -157,7 +159,7 @@ impl RewardsState { let margin = ((&pool_rewards - &fixed_cost) * BigDecimal::from(spo.margin.numerator) // TODO use RationalNumber / BigDecimal::from(spo.margin.denominator)) - .with_scale(0); + .with_scale(0); let costs = &fixed_cost + &margin; let remainder = &pool_rewards - &costs; @@ -207,8 +209,13 @@ impl RewardsState { } }); - info!(num_delegators_paid, total_paid_to_delegators, total_paid_to_pools, - total=result.total_paid, "Rewards actually paid:"); + info!( + num_delegators_paid, + total_paid_to_delegators, + 
total_paid_to_pools, + total = result.total_paid, + "Rewards actually paid:" + ); Ok(result) } diff --git a/modules/accounts_state/src/snapshot.rs b/modules/accounts_state/src/snapshot.rs index e9930a6..ba456eb 100644 --- a/modules/accounts_state/src/snapshot.rs +++ b/modules/accounts_state/src/snapshot.rs @@ -1,10 +1,10 @@ //! Acropolis AccountsState: snapshot for rewards calculations +use crate::state::{Pots, StakeAddressState}; +use acropolis_common::{KeyHash, Lovelace, PoolRegistration, Ratio, RewardAccount}; +use imbl::OrdMap; use std::collections::HashMap; -use acropolis_common::{Lovelace, KeyHash, PoolRegistration, Ratio, RewardAccount}; -use crate::state::{StakeAddressState, Pots}; use tracing::info; -use imbl::OrdMap; /// SPO data for stake snapshot #[derive(Debug, Default)] @@ -51,13 +51,15 @@ pub struct Snapshot { } impl Snapshot { - /// Get a stake snapshot based the current stake addresses - pub fn new(epoch: u64, stake_addresses: &HashMap, - spos: &OrdMap, - spo_block_counts: &HashMap, - pots: &Pots, - fees: Lovelace) -> Self { + pub fn new( + epoch: u64, + stake_addresses: &HashMap, + spos: &OrdMap, + spo_block_counts: &HashMap, + pots: &Pots, + fees: Lovelace, + ) -> Self { let mut snapshot = Self { _epoch: epoch, pots: pots.clone(), @@ -84,16 +86,19 @@ impl Snapshot { // See how many blocks produced let blocks_produced = spo_block_counts.get(spo_id).copied().unwrap_or(0); - snapshot.spos.insert(spo_id.clone(), SnapshotSPO { - delegators: vec![(hash.clone(), sas.utxo_value)], - total_stake: sas.utxo_value, - pledge: spo.pledge, - fixed_cost: spo.cost, - margin: spo.margin.clone(), - blocks_produced, - pool_owners: spo.pool_owners.clone(), - reward_account: spo.reward_account.clone(), - }); + snapshot.spos.insert( + spo_id.clone(), + SnapshotSPO { + delegators: vec![(hash.clone(), sas.utxo_value)], + total_stake: sas.utxo_value, + pledge: spo.pledge, + fixed_cost: spo.cost, + margin: spo.margin.clone(), + blocks_produced, + pool_owners: 
spo.pool_owners.clone(), + reward_account: spo.reward_account.clone(), + }, + ); } } total_stake += sas.utxo_value; @@ -101,22 +106,30 @@ impl Snapshot { } // Calculate the total rewards just for logging & comparison - let total_rewards: u64 = stake_addresses - .values() - .map(|sas| sas.rewards) - .sum(); + let total_rewards: u64 = stake_addresses.values().map(|sas| sas.rewards).sum(); // Log to be comparable with DBSync ada_pots table - info!(epoch, treasury=pots.treasury, reserves=pots.reserves, rewards=total_rewards, - deposits=pots.deposits, total_stake, spos=snapshot.spos.len(), fees, - "Snapshot"); + info!( + epoch, + treasury = pots.treasury, + reserves = pots.reserves, + rewards = total_rewards, + deposits = pots.deposits, + total_stake, + spos = snapshot.spos.len(), + fees, + "Snapshot" + ); snapshot } /// Get the total stake held by a vector of stake addresses for a particular SPO (by ID) - pub fn get_stake_delegated_to_spo_by_addresses(&self, spo: &KeyHash, - addresses: &[KeyHash]) -> Lovelace { + pub fn get_stake_delegated_to_spo_by_addresses( + &self, + spo: &KeyHash, + addresses: &[KeyHash], + ) -> Lovelace { let Some(snapshot_spo) = self.spos.get(spo) else { return 0; }; @@ -151,33 +164,51 @@ mod tests { let addr4: KeyHash = vec![0x14]; let mut stake_addresses: HashMap = HashMap::new(); - stake_addresses.insert(addr1.clone(), StakeAddressState { - utxo_value: 42, - delegated_spo: Some(spo1.clone()), - .. StakeAddressState::default() - }); - stake_addresses.insert(addr2.clone(), StakeAddressState { - utxo_value: 99, - delegated_spo: Some(spo2.clone()), - .. StakeAddressState::default() - }); - stake_addresses.insert(addr3.clone(), StakeAddressState { - utxo_value: 0, - delegated_spo: Some(spo1.clone()), - .. StakeAddressState::default() - }); - stake_addresses.insert(addr4.clone(), StakeAddressState { - utxo_value: 1000000, - delegated_spo: None, - .. 
StakeAddressState::default() - }); + stake_addresses.insert( + addr1.clone(), + StakeAddressState { + utxo_value: 42, + delegated_spo: Some(spo1.clone()), + ..StakeAddressState::default() + }, + ); + stake_addresses.insert( + addr2.clone(), + StakeAddressState { + utxo_value: 99, + delegated_spo: Some(spo2.clone()), + ..StakeAddressState::default() + }, + ); + stake_addresses.insert( + addr3.clone(), + StakeAddressState { + utxo_value: 0, + delegated_spo: Some(spo1.clone()), + ..StakeAddressState::default() + }, + ); + stake_addresses.insert( + addr4.clone(), + StakeAddressState { + utxo_value: 1000000, + delegated_spo: None, + ..StakeAddressState::default() + }, + ); let mut spos: OrdMap = OrdMap::new(); spos.insert(spo1.clone(), PoolRegistration::default()); spos.insert(spo2.clone(), PoolRegistration::default()); let spo_block_counts: HashMap = HashMap::new(); - let snapshot = Snapshot::new(42, &stake_addresses, &spos, &spo_block_counts, - &Pots::default(), 0); + let snapshot = Snapshot::new( + 42, + &stake_addresses, + &spos, + &spo_block_counts, + &Pots::default(), + 0, + ); assert_eq!(snapshot.spos.len(), 2); diff --git a/modules/accounts_state/src/spo_distribution_publisher.rs b/modules/accounts_state/src/spo_distribution_publisher.rs index c40c6f0..7c963bc 100644 --- a/modules/accounts_state/src/spo_distribution_publisher.rs +++ b/modules/accounts_state/src/spo_distribution_publisher.rs @@ -1,5 +1,5 @@ -use acropolis_common::messages::{CardanoMessage, SPOStakeDistributionMessage, Message}; -use acropolis_common::{KeyHash, BlockInfo, DelegatedStake}; +use acropolis_common::messages::{CardanoMessage, Message, SPOStakeDistributionMessage}; +use acropolis_common::{BlockInfo, DelegatedStake, KeyHash}; use caryatid_sdk::Context; use std::collections::BTreeMap; use std::sync::Arc; @@ -32,7 +32,7 @@ impl SPODistributionPublisher { Arc::new(Message::Cardano(( block.clone(), CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage { - epoch: block.epoch-1, // 
End of previous epoch + epoch: block.epoch - 1, // End of previous epoch spos: spos.into_iter().collect(), }), ))), diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 846aa5c..7623f23 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -1,27 +1,26 @@ //! Acropolis AccountsState: State storage +use crate::monetary::calculate_monetary_change; +use crate::rewards::{RewardsResult, RewardsState}; +use crate::snapshot::Snapshot; use acropolis_common::{ messages::{ DRepStateMessage, EpochActivityMessage, PotDeltasMessage, ProtocolParamsMessage, SPOStateMessage, StakeAddressDeltasMessage, TxCertificatesMessage, WithdrawalsMessage, }, - DelegatedStake, - DRepChoice, DRepCredential, InstantaneousRewardSource, InstantaneousRewardTarget, KeyHash, - Lovelace, MoveInstantaneousReward, PoolRegistration, Pot, ProtocolParams, - StakeAddress, StakeCredential, TxCertificate, + DRepChoice, DRepCredential, DelegatedStake, InstantaneousRewardSource, + InstantaneousRewardTarget, KeyHash, Lovelace, MoveInstantaneousReward, PoolRegistration, Pot, + ProtocolParams, StakeAddress, StakeCredential, TxCertificate, }; -use crate::snapshot::Snapshot; -use crate::rewards::{RewardsResult, RewardsState}; -use crate::monetary::calculate_monetary_change; use anyhow::{bail, Result}; use dashmap::DashMap; use imbl::OrdMap; -use std::collections::{HashMap, BTreeMap, HashSet}; use rayon::prelude::*; use serde_with::{hex::Hex, serde_as}; -use std::sync::{atomic::AtomicU64, Arc, Mutex}; -use tracing::{debug, error, info, warn}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::mem::take; +use std::sync::{atomic::AtomicU64, Arc, Mutex}; use tokio::task::{spawn_blocking, JoinHandle}; +use tracing::{debug, error, info, warn}; const DEFAULT_KEY_DEPOSIT: u64 = 2_000_000; const DEFAULT_POOL_DEPOSIT: u64 = 500_000_000; @@ -129,9 +128,12 @@ impl State { /// total_fees: Total fees taken in previous epoch /// 
spo_block_counts: Count of blocks minted by operator ID in previous epoch // Follows the general scheme in https://docs.cardano.org/about-cardano/learn/pledging-rewards - fn enter_epoch(&mut self, epoch: u64, total_fees: u64, - spo_block_counts: HashMap) -> Result<()> { - + fn enter_epoch( + &mut self, + epoch: u64, + total_fees: u64, + spo_block_counts: HashMap, + ) -> Result<()> { // TODO HACK! Investigate why this differs to our calculated reserves after AVVM // 13,887,515,255 - as we enter 208 (Shelley) if epoch == 208 { @@ -139,18 +141,23 @@ impl State { // https://github.com/cardano-foundation/cf-java-rewards-calculation/blob/b05eddf495af6dc12d96c49718f27c34fa2042b1/calculation/src/main/java/org/cardanofoundation/rewards/calculation/config/NetworkConfig.java#L45C57-L45C74 let old_reserves = self.pots.reserves; self.pots.reserves = 13_888_022_852_926_644; - warn!(new=self.pots.reserves, old=old_reserves, diff=self.pots.reserves-old_reserves, - "Fixed reserves"); + warn!( + new = self.pots.reserves, + old = old_reserves, + diff = self.pots.reserves - old_reserves, + "Fixed reserves" + ); } // Get Shelley parameters, silently return if too early in the chain so no // rewards to calculate let shelley_params = match &self.protocol_parameters { - Some(ProtocolParams { shelley: Some(sp), .. }) => sp, - _ => { - return Ok(()) - } - }.clone(); + Some(ProtocolParams { + shelley: Some(sp), .. + }) => sp, + _ => return Ok(()), + } + .clone(); // Filter the block counts for SPOs that are registered - treating any we don't know // as 'OBFT' style (the legacy nodes) @@ -166,9 +173,12 @@ impl State { // Update the reserves and treasury (monetary.rs) // TODO note using last-but-one epoch's fees for reward pot - why? 
- let monetary_change = calculate_monetary_change(&shelley_params, &self.pots, - self.rewards_state.mark.fees, - total_non_obft_blocks)?; + let monetary_change = calculate_monetary_change( + &shelley_params, + &self.pots, + self.rewards_state.mark.fees, + total_non_obft_blocks, + )?; self.pots = monetary_change.pots; // Pay the refunds and MIRs @@ -177,8 +187,14 @@ impl State { self.pay_mirs(); // Capture a new snapshot and push it to state - let snapshot = Snapshot::new(epoch, &self.stake_addresses.lock().unwrap(), - &self.spos, &spo_block_counts, &self.pots, total_fees); + let snapshot = Snapshot::new( + epoch, + &self.stake_addresses.lock().unwrap(), + &self.spos, + &spo_block_counts, + &self.pots, + total_fees, + ); self.rewards_state.push(snapshot); // Stop here if no blocks to pay out on @@ -189,7 +205,12 @@ impl State { let rs = self.rewards_state.clone(); self.epoch_rewards_task = Arc::new(Mutex::new(Some(spawn_blocking(move || { // Calculate reward payouts - rs.calculate_rewards(epoch, &shelley_params, total_blocks, monetary_change.stake_rewards) + rs.calculate_rewards( + epoch, + &shelley_params, + total_blocks, + monetary_change.stake_rewards, + ) })))); Ok(()) @@ -198,7 +219,8 @@ impl State { /// Pay pool refunds fn pay_pool_refunds(&mut self) { // Get pool deposit amount from parameters, or default - let deposit = self.protocol_parameters + let deposit = self + .protocol_parameters .as_ref() .and_then(|pp| pp.shelley.as_ref()) .map(|sp| sp.protocol_params.pool_deposit) @@ -206,8 +228,11 @@ impl State { let refunds = take(&mut self.pool_refunds); if !refunds.is_empty() { - info!("{} retiring SPOs, total refunds {}", refunds.len(), - (refunds.len() as u64) * deposit); + info!( + "{} retiring SPOs, total refunds {}", + refunds.len(), + (refunds.len() as u64) * deposit + ); } // Send them their deposits back @@ -217,13 +242,15 @@ impl State { let stake_addresses = self.stake_addresses.lock().unwrap(); match stake_addresses.get(&keyhash) { Some(sas) => 
sas.registered, - None => false + None => false, } } { self.add_to_reward(&keyhash, deposit); } else { - warn!("SPO reward account {} deregistered - paying refund to treasury", - hex::encode(keyhash)); + warn!( + "SPO reward account {} deregistered - paying refund to treasury", + hex::encode(keyhash) + ); self.pots.treasury += deposit; } @@ -235,8 +262,11 @@ impl State { fn pay_stake_refunds(&mut self) { let refunds = take(&mut self.stake_refunds); if !refunds.is_empty() { - info!("{} deregistered stake addresses, total refunds {}", refunds.len(), - refunds.iter().map(|(_, n)| n).sum::()); + info!( + "{} deregistered stake addresses, total refunds {}", + refunds.len(), + refunds.iter().map(|(_, n)| n).sum::() + ); } // Send them their deposits back @@ -289,8 +319,10 @@ impl State { let _ = Self::update_value_with_delta(&mut total_value, *value); } - info!("MIR of {total_value} to {} stake addresses from {source_name}", - deltas.len()); + info!( + "MIR of {total_value} to {} stake addresses from {source_name}", + deltas.len() + ); } InstantaneousRewardTarget::OtherAccountingPot(value) => { @@ -342,8 +374,7 @@ impl State { let sas_data: Vec<(KeyHash, (u64, u64))> = stake_addresses .values() .filter_map(|sas| { - sas.delegated_spo.as_ref() - .map(|spo| (spo.clone(), (sas.utxo_value, sas.rewards))) + sas.delegated_spo.as_ref().map(|spo| (spo.clone(), (sas.utxo_value, sas.rewards))) }) .collect(); @@ -353,13 +384,15 @@ impl State { .for_each_init( || Arc::clone(&spo_stakes), |map, (spo, (utxo_value, rewards))| { - map.entry(spo.clone()).and_modify(|v| { - v.active += *utxo_value; - v.live += *utxo_value + *rewards; - }).or_insert(DelegatedStake { - active: *utxo_value, - live: *utxo_value + *rewards - }); + map.entry(spo.clone()) + .and_modify(|v| { + v.active += *utxo_value; + v.live += *utxo_value + *rewards; + }) + .or_insert(DelegatedStake { + active: *utxo_value, + live: *utxo_value + *rewards, + }); }, ); @@ -377,13 +410,8 @@ impl State { .iter() .map(|(cred, 
deposit)| (cred.clone(), AtomicU64::new(*deposit))) .collect::>(); - self.stake_addresses - .lock() - .unwrap() - .values() - .collect::>() - .par_iter() - .for_each(|state| { + self.stake_addresses.lock().unwrap().values().collect::>().par_iter().for_each( + |state| { let Some(drep) = state.delegated_drep.clone() else { return; }; @@ -409,7 +437,8 @@ impl State { }; let stake = state.utxo_value + state.rewards; total.fetch_add(stake, std::sync::atomic::Ordering::Relaxed); - }); + }, + ); let abstain = abstain.load(std::sync::atomic::Ordering::Relaxed); let no_confidence = no_confidence.load(std::sync::atomic::Ordering::Relaxed); let dreps = dreps @@ -426,10 +455,9 @@ impl State { /// Handle an ProtocolParamsMessage with the latest parameters at the start of a new /// epoch pub fn handle_parameters(&mut self, params_msg: &ProtocolParamsMessage) -> Result<()> { - let different = match &self.protocol_parameters { Some(old_params) => old_params != ¶ms_msg.params, - None => true + None => true, }; if different { @@ -443,16 +471,13 @@ impl State { /// Handle an EpochActivityMessage giving total fees and block counts by VRF key for /// the just-ended epoch pub async fn handle_epoch_activity(&mut self, ea_msg: &EpochActivityMessage) -> Result<()> { - // Reverse map of VRF key to SPO operator ID - let vrf_to_operator: HashMap = self.spos - .iter() - .map(|(id, spo)| (spo.vrf_key_hash.clone(), id.clone())) - .collect(); + let vrf_to_operator: HashMap = + self.spos.iter().map(|(id, spo)| (spo.vrf_key_hash.clone(), id.clone())).collect(); // Create a map of operator ID to block count - let spo_block_counts: HashMap = - ea_msg.vrf_vkey_hashes + let spo_block_counts: HashMap = ea_msg + .vrf_vkey_hashes .iter() .filter_map(|(vrf, count)| { vrf_to_operator.get(vrf).map(|operator| (operator.clone(), *count)) @@ -466,7 +491,7 @@ impl State { Err(_) => { error!("Failed to lock epoch rewards task"); None - }, + } } }; // If rewards have been calculated, save the results @@ -485,33 
+510,26 @@ impl State { } }; // Enter epoch - note the message specifies the epoch that has just *ended* - self.enter_epoch(ea_msg.epoch+1, ea_msg.total_fees, spo_block_counts) + self.enter_epoch(ea_msg.epoch + 1, ea_msg.total_fees, spo_block_counts) } /// Handle an SPOStateMessage with the full set of SPOs valid at the end of the last /// epoch pub fn handle_spo_state(&mut self, spo_msg: &SPOStateMessage) -> Result<()> { - // Capture current SPOs, mapped by operator ID - let mut new_spos: OrdMap = spo_msg - .spos - .iter() - .cloned() - .map(|spo| (spo.operator.clone(), spo)) - .collect(); + let mut new_spos: OrdMap = + spo_msg.spos.iter().cloned().map(|spo| (spo.operator.clone(), spo)).collect(); // Get pool deposit amount from parameters, or default - let deposit = self.protocol_parameters + let deposit = self + .protocol_parameters .as_ref() .and_then(|pp| pp.shelley.as_ref()) .map(|sp| sp.protocol_params.pool_deposit) .unwrap_or(DEFAULT_POOL_DEPOSIT); // Check for how many new SPOs - let new_count = new_spos - .keys() - .filter(|id| !self.spos.contains_key(*id)) - .count(); + let new_count = new_spos.keys().filter(|id| !self.spos.contains_key(*id)).count(); // They've each paid their deposit, so increment that (the UTXO spend is taken // care of in UTXOState) @@ -529,11 +547,14 @@ impl State { match StakeAddress::from_binary(&retired_spo.reward_account) { Ok(stake_address) => { let keyhash = stake_address.get_hash(); - debug!("SPO {} has retired - refunding their deposit to {}", - hex::encode(id), hex::encode(keyhash)); + debug!( + "SPO {} has retired - refunding their deposit to {}", + hex::encode(id), + hex::encode(keyhash) + ); self.pool_refunds.push(keyhash.to_vec()); } - Err(e) => error!("Error repaying SPO deposit: {e}") + Err(e) => error!("Error repaying SPO deposit: {e}"), } // Remove from our list @@ -547,15 +568,17 @@ impl State { } /// Register a stake address, with specified deposit if known - fn register_stake_address(&mut self, credential: 
&StakeCredential, - deposit: Option) { + fn register_stake_address(&mut self, credential: &StakeCredential, deposit: Option) { let hash = credential.get_hash(); // Stake addresses can be registered after being used in UTXOs let mut stake_addresses = self.stake_addresses.lock().unwrap(); let sas = stake_addresses.entry(hash.clone()).or_default(); if sas.registered { - error!("Stake address hash {} registered when already registered", hex::encode(&hash)); + error!( + "Stake address hash {} registered when already registered", + hex::encode(&hash) + ); } else { sas.registered = true; @@ -577,14 +600,12 @@ impl State { } /// Deregister a stake address, with specified refund if known - fn deregister_stake_address(&mut self, credential: &StakeCredential, - refund: Option) { + fn deregister_stake_address(&mut self, credential: &StakeCredential, refund: Option) { let hash = credential.get_hash(); // Check if it existed let mut stake_addresses = self.stake_addresses.lock().unwrap(); if let Some(sas) = stake_addresses.get_mut(&hash) { - if sas.registered { // Account for the deposit, if registered before let deposit = match refund { @@ -601,10 +622,16 @@ impl State { self.pots.deposits -= deposit; sas.registered = false; } else { - error!("Deregistration of unregistered stake address hash {}", hex::encode(hash)); + error!( + "Deregistration of unregistered stake address hash {}", + hex::encode(hash) + ); } } else { - error!("Deregistration of unknown stake address hash {}", hex::encode(hash)); + error!( + "Deregistration of unknown stake address hash {}", + hex::encode(hash) + ); } } @@ -622,10 +649,16 @@ impl State { if sas.registered { sas.delegated_spo = Some(spo.clone()); } else { - error!("Unregistered stake address in stake delegation: {}", hex::encode(hash)); + error!( + "Unregistered stake address in stake delegation: {}", + hex::encode(hash) + ); } } else { - error!("Unknown stake address in stake delegation: {}", hex::encode(hash)); + error!( + "Unknown stake 
address in stake delegation: {}", + hex::encode(hash) + ); } } @@ -659,10 +692,16 @@ impl State { if sas.registered { sas.delegated_drep = Some(drep.clone()); } else { - error!("Unregistered stake address in DRep delegation: {}", hex::encode(hash)); + error!( + "Unregistered stake address in DRep delegation: {}", + hex::encode(hash) + ); } } else { - error!("Unknown stake address in stake delegation: {}", hex::encode(hash)); + error!( + "Unknown stake address in stake delegation: {}", + hex::encode(hash) + ); } } @@ -735,15 +774,17 @@ impl State { // Get old stake address state - which must exist let mut stake_addresses = self.stake_addresses.lock().unwrap(); if let Some(sas) = stake_addresses.get(hash) { - // Zero withdrawals are expected, as a way to validate stake addresses (per Pi) if withdrawal.value != 0 { let mut sas = sas.clone(); - if let Err(e) = Self::update_value_with_delta(&mut sas.rewards, - -(withdrawal.value as i64)) { - error!("Withdrawing from stake address {} hash {}: {e}", - withdrawal.address.to_string().unwrap_or("???".to_string()), - hex::encode(hash)); + if let Err(e) = + Self::update_value_with_delta(&mut sas.rewards, -(withdrawal.value as i64)) + { + error!( + "Withdrawing from stake address {} hash {}: {e}", + withdrawal.address.to_string().unwrap_or("???".to_string()), + hex::encode(hash) + ); continue; } else { // Update the stake address @@ -751,8 +792,10 @@ impl State { } } } else { - error!("Unknown stake address in withdrawal: {}", - withdrawal.address.to_string().unwrap_or("???".to_string())); + error!( + "Unknown stake address in withdrawal: {}", + withdrawal.address.to_string().unwrap_or("???".to_string()) + ); } } @@ -771,7 +814,10 @@ impl State { if let Err(e) = Self::update_value_with_delta(pot, pot_delta.delta) { error!("Applying pot delta {pot_delta:?}: {e}"); } else { - info!("Pot delta for {:?} {} => {}", pot_delta.pot, pot_delta.delta, *pot); + info!( + "Pot delta for {:?} {} => {}", + pot_delta.pot, pot_delta.delta, *pot 
+ ); } } @@ -881,34 +927,42 @@ mod tests { let spo2: KeyHash = vec![0x02]; // Create the SPOs - state.handle_spo_state(&SPOStateMessage { - epoch: 1, - spos: vec![ - PoolRegistration { - operator: spo1.clone(), - vrf_key_hash: spo1.clone(), - pledge: 26, - cost: 0, - margin: Ratio { numerator: 1, denominator: 20 }, - reward_account: Vec::new(), - pool_owners: Vec::new(), - relays: Vec::new(), - pool_metadata: None - }, - PoolRegistration { - operator: spo2.clone(), - vrf_key_hash: spo2.clone(), - pledge: 47, - cost: 10, - margin: Ratio { numerator: 1, denominator: 10 }, - reward_account: Vec::new(), - pool_owners: Vec::new(), - relays: Vec::new(), - pool_metadata: None - }, - ], - retired_spos: vec![], - }).unwrap(); + state + .handle_spo_state(&SPOStateMessage { + epoch: 1, + spos: vec![ + PoolRegistration { + operator: spo1.clone(), + vrf_key_hash: spo1.clone(), + pledge: 26, + cost: 0, + margin: Ratio { + numerator: 1, + denominator: 20, + }, + reward_account: Vec::new(), + pool_owners: Vec::new(), + relays: Vec::new(), + pool_metadata: None, + }, + PoolRegistration { + operator: spo2.clone(), + vrf_key_hash: spo2.clone(), + pledge: 47, + cost: 10, + margin: Ratio { + numerator: 1, + denominator: 10, + }, + reward_account: Vec::new(), + pool_owners: Vec::new(), + relays: Vec::new(), + pool_metadata: None, + }, + ], + retired_spos: vec![], + }) + .unwrap(); // Delegate let addr1: KeyHash = vec![0x11]; @@ -1059,7 +1113,7 @@ mod tests { state.pay_mirs(); assert_eq!(state.pots.reserves, 58); assert_eq!(state.pots.treasury, 0); - assert_eq!(state.pots.deposits, 2_000_000); // Paid deposit + assert_eq!(state.pots.deposits, 2_000_000); // Paid deposit let stake_addresses = state.stake_addresses.lock().unwrap(); let sas = stake_addresses.get(&STAKE_KEY_HASH.to_vec()).unwrap(); diff --git a/modules/block_unpacker/src/block_unpacker.rs b/modules/block_unpacker/src/block_unpacker.rs index 56b4c27..f3b51a5 100644 --- a/modules/block_unpacker/src/block_unpacker.rs +++ 
b/modules/block_unpacker/src/block_unpacker.rs @@ -7,7 +7,7 @@ use caryatid_sdk::{module, Context, Module}; use config::Config; use pallas::ledger::traverse::MultiEraBlock; use std::sync::Arc; -use tracing::{debug, error, info, Instrument, info_span}; +use tracing::{debug, error, info, info_span, Instrument}; const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.block.body"; const DEFAULT_PUBLISH_TOPIC: &str = "cardano.txs"; @@ -72,7 +72,9 @@ impl BlockUnpacker { .publish(&publish_topic, Arc::new(message_enum)) .await .unwrap_or_else(|e| error!("Failed to publish: {e}")); - }.instrument(span).await; + } + .instrument(span) + .await; } Err(e) => error!("Can't decode block {}: {e}", block_info.number), diff --git a/modules/drep_state/src/drep_state.rs b/modules/drep_state/src/drep_state.rs index a197374..7b201ae 100644 --- a/modules/drep_state/src/drep_state.rs +++ b/modules/drep_state/src/drep_state.rs @@ -77,7 +77,9 @@ impl DRepState { .await .unwrap_or_else(|e| error!("Failed to publish: {e}")); } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -143,7 +145,9 @@ impl DRepState { .await .inspect_err(|e| error!("Tick error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } } } diff --git a/modules/epoch_activity_counter/src/epoch_activity_counter.rs b/modules/epoch_activity_counter/src/epoch_activity_counter.rs index df86005..424aeac 100644 --- a/modules/epoch_activity_counter/src/epoch_activity_counter.rs +++ b/modules/epoch_activity_counter/src/epoch_activity_counter.rs @@ -57,7 +57,10 @@ impl EpochActivityCounter { let (_, message) = headers_message_f.await?; match message.as_ref() { Message::Cardano((block, CardanoMessage::BlockHeader(header_msg))) => { - let span = info_span!("epoch_activity_counter.handle_block_header", block = block.number); + let span = info_span!( + "epoch_activity_counter.handle_block_header", + block = block.number + ); async { // End of epoch? 
if block.new_epoch && block.epoch > 0 { @@ -93,7 +96,9 @@ impl EpochActivityCounter { Err(e) => error!("Can't decode header {}: {e}", block.slot), } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -103,11 +108,16 @@ impl EpochActivityCounter { let (_, message) = fees_message_f.await?; match message.as_ref() { Message::Cardano((block, CardanoMessage::BlockFees(fees_msg))) => { - let span = info_span!("epoch_activity_counter.handle_block_fees", block = block.number); + let span = info_span!( + "epoch_activity_counter.handle_block_fees", + block = block.number + ); async { let mut state = state.lock().await; state.handle_fees(&block, fees_msg.total_fees); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), diff --git a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs index 77055c8..f0c1d92 100644 --- a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs +++ b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs @@ -67,9 +67,8 @@ impl GenesisBootstrapper { .unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); info!("Completing with '{completion_topic}'"); - let network_name = config - .get_string("network-name") - .unwrap_or(DEFAULT_NETWORK_NAME.to_string()); + let network_name = + config.get_string("network-name").unwrap_or(DEFAULT_NETWORK_NAME.to_string()); let genesis = match network_name.as_ref() { "mainnet" => MAINNET_BYRON_GENESIS, @@ -82,8 +81,8 @@ impl GenesisBootstrapper { info!("Reading genesis for '{network_name}'"); // Read genesis data - let genesis: byron::GenesisFile = serde_json::from_slice(genesis) - .expect("Invalid JSON in BYRON_GENESIS file"); + let genesis: byron::GenesisFile = + serde_json::from_slice(genesis).expect("Invalid JSON in BYRON_GENESIS file"); // Construct messages let block_info = BlockInfo { @@ -156,7 +155,9 @@ impl 
GenesisBootstrapper { .publish(&completion_topic, Arc::new(message_enum)) .await .unwrap_or_else(|e| error!("Failed to publish: {e}")); - }.instrument(span).await; + } + .instrument(span) + .await; }); Ok(()) diff --git a/modules/governance_state/src/governance_state.rs b/modules/governance_state/src/governance_state.rs index 8e16d48..e7f35dc 100644 --- a/modules/governance_state/src/governance_state.rs +++ b/modules/governance_state/src/governance_state.rs @@ -152,7 +152,9 @@ impl GovernanceState { .await .inspect_err(|e| error!("Tick error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } } } diff --git a/modules/governance_state/src/state.rs b/modules/governance_state/src/state.rs index c0740ab..bf53ccf 100644 --- a/modules/governance_state/src/state.rs +++ b/modules/governance_state/src/state.rs @@ -1,18 +1,16 @@ //! Acropolis Governance State: State storage +use crate::VotingRegistrationState; use acropolis_common::{ messages::{ - CardanoMessage, DRepStakeDistributionMessage, SPOStakeDistributionMessage, - GovernanceOutcomesMessage, - GovernanceProceduresMessage, Message, ProtocolParamsMessage, + CardanoMessage, DRepStakeDistributionMessage, GovernanceOutcomesMessage, + GovernanceProceduresMessage, Message, ProtocolParamsMessage, SPOStakeDistributionMessage, }, - BlockInfo, ConwayParams, DelegatedStake, - DRepCredential, DataHash, EnactStateElem, Era, GovActionId, - GovernanceAction, GovernanceOutcome, GovernanceOutcomeVariant, KeyHash, Lovelace, - ProposalProcedure, SingleVoterVotes, - TreasuryWithdrawalsAction, Voter, VotesCount, VotingOutcome, VotingProcedure, + BlockInfo, ConwayParams, DRepCredential, DataHash, DelegatedStake, EnactStateElem, Era, + GovActionId, GovernanceAction, GovernanceOutcome, GovernanceOutcomeVariant, KeyHash, Lovelace, + ProposalProcedure, SingleVoterVotes, TreasuryWithdrawalsAction, Voter, VotesCount, + VotingOutcome, VotingProcedure, }; -use crate::VotingRegistrationState; use anyhow::{anyhow, bail, 
Result}; use caryatid_sdk::Context; use hex::ToHex; @@ -78,7 +76,7 @@ impl State { pub async fn handle_drep_stake( &mut self, drep_message: &DRepStakeDistributionMessage, - spo_message: &SPOStakeDistributionMessage + spo_message: &SPOStakeDistributionMessage, ) -> Result<()> { self.drep_stake_messages_count += 1; self.drep_stake = HashMap::from_iter(drep_message.dreps.iter().cloned()); @@ -94,12 +92,12 @@ impl State { governance_message: &GovernanceProceduresMessage, ) -> Result<()> { if block.era < Era::Conway { - if !(governance_message.proposal_procedures.is_empty() && - governance_message.voting_procedures.is_empty()) + if !(governance_message.proposal_procedures.is_empty() + && governance_message.voting_procedures.is_empty()) { bail!("Non-empty governance message for pre-conway block {block:?}"); } - return Ok(()) + return Ok(()); } for pproc in &governance_message.proposal_procedures { @@ -176,7 +174,11 @@ impl State { } /// Checks whether action_id can be considered finally accepted - fn is_finally_accepted(&self, voting_state: &VotingRegistrationState, action_id: &GovActionId) -> Result { + fn is_finally_accepted( + &self, + voting_state: &VotingRegistrationState, + action_id: &GovActionId, + ) -> Result { let (_epoch, proposal) = self .proposals .get(action_id) @@ -295,24 +297,29 @@ impl State { } fn recalculate_voting_state(&self) -> Result { - let drep_stake = self.drep_stake.iter().map(|(_dr,lov)| lov).sum(); + let drep_stake = self.drep_stake.iter().map(|(_dr, lov)| lov).sum(); let committee_usize = self.get_conway_params()?.committee.members.len(); - let committee = committee_usize.try_into().or_else( - |e| Err(anyhow!("Commitee size: conversion usize -> u64 failed, {e}")) - )?; + let committee = committee_usize.try_into().or_else(|e| { + Err(anyhow!( + "Commitee size: conversion usize -> u64 failed, {e}" + )) + })?; - let spo_stake = self.spo_stake.iter().map(|(_sp,ds)| ds.live).sum(); + let spo_stake = self.spo_stake.iter().map(|(_sp, ds)| 
ds.live).sum(); - Ok(VotingRegistrationState::new(spo_stake, spo_stake, drep_stake, committee)) + Ok(VotingRegistrationState::new( + spo_stake, spo_stake, drep_stake, committee, + )) } /// Loops through all actions and checks their status for the new_epoch /// All incoming data (parameters for the epoch, drep distribution, etc) /// should already be actual at this moment. - pub fn process_new_epoch(&mut self, new_block: &BlockInfo) - -> Result - { + pub fn process_new_epoch( + &mut self, + new_block: &BlockInfo, + ) -> Result { let mut output = GovernanceOutcomesMessage::default(); if self.current_era < Era::Conway { // Processes new epoch acts on old events. @@ -361,7 +368,10 @@ impl State { info!( "Epoch {} ({}): {}, total {} actions, {ens} enacts, {wdr} withdrawals, {rej} rejected", - voting_state, new_block.epoch, new_block.era, output.outcomes.len() + voting_state, + new_block.epoch, + new_block.era, + output.outcomes.len() ); return Ok(output); } diff --git a/modules/governance_state/src/voting_state.rs b/modules/governance_state/src/voting_state.rs index c43bbcf..4675417 100644 --- a/modules/governance_state/src/voting_state.rs +++ b/modules/governance_state/src/voting_state.rs @@ -1,19 +1,18 @@ use acropolis_common::{ - GovernanceAction, ConwayParams, VotesCount, - ProtocolParamType, ProposalProcedure, ProtocolParamUpdate, - rational_number::RationalNumber + rational_number::RationalNumber, ConwayParams, GovernanceAction, ProposalProcedure, + ProtocolParamType, ProtocolParamUpdate, VotesCount, }; -use std::{cmp::max, fmt}; use anyhow::Result; +use std::{cmp::max, fmt}; #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] pub struct VotingRegistrationState { /// Total stake in all SPOs. This parameter is used for Hard Fork initiation voting, /// see CIP-1694: - /// ... The SPO vote threshold which must be met as a certain threshold of the total - /// active voting stake, excepting Hard Fork Governance Actions. 
Due to the need for - /// robust consensus around Hard Fork initiations, these votes must be met as a percentage - /// of the stake held by all stake pools. + /// ... The SPO vote threshold which must be met as a certain threshold of the total + /// active voting stake, excepting Hard Fork Governance Actions. Due to the need for + /// robust consensus around Hard Fork initiations, these votes must be met as a percentage + /// of the stake held by all stake pools. total_spos: u64, /// Total stake in active voting SPOs stake @@ -29,7 +28,9 @@ pub struct VotingRegistrationState { impl fmt::Display for VotingRegistrationState { fn fmt(&self, res: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - write!(res, "spos total {}/reg. {}, dreps {}, committee {}", + write!( + res, + "spos total {}/reg. {}, dreps {}, committee {}", self.total_spos, self.registered_spos, self.registered_dreps, self.committee_size ) } @@ -48,9 +49,17 @@ impl VotingRegistrationState { } pub fn new( - total_spos: u64, registered_spos: u64, registered_dreps: u64, committee_size: u64 + total_spos: u64, + registered_spos: u64, + registered_dreps: u64, + committee_size: u64, ) -> Self { - Self { total_spos, registered_spos, registered_dreps, committee_size } + Self { + total_spos, + registered_spos, + registered_dreps, + committee_size, + } } fn proportional_count_drep_comm( diff --git a/modules/parameters_state/build.rs b/modules/parameters_state/build.rs index be482e6..b6bb61d 100644 --- a/modules/parameters_state/build.rs +++ b/modules/parameters_state/build.rs @@ -7,13 +7,13 @@ use std::path::Path; const OUTPUT_DIR: &str = "downloads"; /// Download a URL to a file in OUTPUT_DIR -fn download(url_base: &str, epoch: &str, filename: &str, rename: &Vec<(&str,&str)>) { +fn download(url_base: &str, epoch: &str, filename: &str, rename: &Vec<(&str, &str)>) { let url = format!("{}/{}-genesis.json", url_base, epoch); let response = get(url).expect("Failed to fetch {url}"); let mut data = 
response.text().expect("Failed to read response"); - for (what,with) in rename.iter() { - data = data.replace(&format!("\"{what}\""),&format!("\"{with}\"")); + for (what, with) in rename.iter() { + data = data.replace(&format!("\"{what}\""), &format!("\"{with}\"")); } let output_path = Path::new(OUTPUT_DIR); @@ -30,19 +30,30 @@ fn main() { println!("cargo:rerun-if-changed=build.rs"); // Ensure the script runs if modified let shelley_fix = vec![ - ("slotsPerKESPeriod","slotsPerKesPeriod"),("maxKESEvolutions","maxKesEvolutions") + ("slotsPerKESPeriod", "slotsPerKesPeriod"), + ("maxKESEvolutions", "maxKesEvolutions"), ]; let main = "https://book.world.dev.cardano.org/environments/mainnet"; download(main, "byron", "mainnet-byron-genesis.json", &vec![]); - download(main, "shelley", "mainnet-shelley-genesis.json", &shelley_fix); + download( + main, + "shelley", + "mainnet-shelley-genesis.json", + &shelley_fix, + ); download(main, "alonzo", "mainnet-alonzo-genesis.json", &vec![]); download(main, "conway", "mainnet-conway-genesis.json", &vec![]); - let sancho = + let sancho = "https://raw.githubusercontent.com/Hornan7/SanchoNet-Tutorials/refs/heads/main/genesis"; download(sancho, "byron", "sanchonet-byron-genesis.json", &vec![]); - download(sancho, "shelley", "sanchonet-shelley-genesis.json", &shelley_fix); + download( + sancho, + "shelley", + "sanchonet-shelley-genesis.json", + &shelley_fix, + ); download(sancho, "alonzo", "sanchonet-alonzo-genesis.json", &vec![]); download(sancho, "conway", "sanchonet-conway-genesis.json", &vec![]); } diff --git a/modules/parameters_state/src/alonzo_genesis.rs b/modules/parameters_state/src/alonzo_genesis.rs index 03dbe4c..694bbc7 100644 --- a/modules/parameters_state/src/alonzo_genesis.rs +++ b/modules/parameters_state/src/alonzo_genesis.rs @@ -3,8 +3,8 @@ //! 
is incompatible with SanchoNet genesis) use acropolis_common::{ - rational_number::{RationalNumber, rational_number_from_f32}, - AlonzoParams, ExUnits, ExUnitPrices + rational_number::{rational_number_from_f32, RationalNumber}, + AlonzoParams, ExUnitPrices, ExUnits, }; use anyhow::{bail, Result}; use serde::Deserialize; @@ -14,18 +14,19 @@ use std::collections::HashMap; #[serde(untagged)] pub enum CostModel { Map(HashMap), - Vector(Vec) + Vector(Vec), } impl CostModel { pub fn to_vec(&self) -> Vec { match self { CostModel::Map(hm) => { - let mut keys = hm.iter().map(|(k,v)| (k.as_str(),*v)).collect::>(); + let mut keys = + hm.iter().map(|(k, v)| (k.as_str(), *v)).collect::>(); keys.sort(); - keys.into_iter().map(|(_,n)| n).collect::>() + keys.into_iter().map(|(_, n)| n).collect::>() } - CostModel::Vector(v) => v.clone() + CostModel::Vector(v) => v.clone(), } } } @@ -42,7 +43,7 @@ pub struct CostModelPerLanguage(HashMap); impl CostModelPerLanguage { fn get_plutus_v1(&self) -> Result>> { let mut res = None; - for (k,v) in self.0.iter() { + for (k, v) in self.0.iter() { if *k != Language::PlutusV1 { bail!("Only PlutusV1 language cost model is allowed in Alonzo Genesis!") } @@ -56,15 +57,17 @@ impl CostModelPerLanguage { #[serde(untagged)] pub enum AlonzoFraction { Float(f32), - Fraction {numerator: u64, denominator: u64} + Fraction { numerator: u64, denominator: u64 }, } impl AlonzoFraction { fn get_rational(&self) -> Result { match self { - AlonzoFraction::Fraction { numerator: n, denominator: d } => - Ok(RationalNumber::new(*n, *d)), - AlonzoFraction::Float(v) => rational_number_from_f32(*v) + AlonzoFraction::Fraction { + numerator: n, + denominator: d, + } => Ok(RationalNumber::new(*n, *d)), + AlonzoFraction::Float(v) => rational_number_from_f32(*v), } } } diff --git a/modules/parameters_state/src/genesis_params.rs b/modules/parameters_state/src/genesis_params.rs index 18e2c00..63a5f52 100644 --- a/modules/parameters_state/src/genesis_params.rs +++ 
b/modules/parameters_state/src/genesis_params.rs @@ -1,27 +1,58 @@ +use crate::alonzo_genesis; use acropolis_common::{ - rational_number::{RationalNumber, rational_number_from_f32}, - AlonzoParams, Anchor, BlockVersionData, ByronParams, - Committee, Constitution, ConwayParams, Credential, DRepVotingThresholds, Era, - NetworkId, Nonce, NonceVariant, PoolVotingThresholds, ProtocolConsts, ProtocolVersion, - ShelleyParams, ShelleyProtocolParams, SoftForkRule, TxFeePolicy, + rational_number::{rational_number_from_f32, RationalNumber}, + AlonzoParams, Anchor, BlockVersionData, ByronParams, Committee, Constitution, ConwayParams, + Credential, DRepVotingThresholds, Era, NetworkId, Nonce, NonceVariant, PoolVotingThresholds, + ProtocolConsts, ProtocolVersion, ShelleyParams, ShelleyProtocolParams, SoftForkRule, + TxFeePolicy, }; use anyhow::{anyhow, bail, Result}; use hex::decode; use pallas::ledger::{configs::*, primitives}; use serde::Deserialize; -use crate::alonzo_genesis; use std::collections::HashMap; const PREDEFINED_GENESIS: [(&str, Era, &[u8]); 8] = [ - ("sanchonet", Era::Byron, include_bytes!("../downloads/sanchonet-byron-genesis.json")), - ("sanchonet", Era::Shelley, include_bytes!("../downloads/sanchonet-shelley-genesis.json")), - ("sanchonet", Era::Alonzo, include_bytes!("../downloads/sanchonet-alonzo-genesis.json")), - ("sanchonet", Era::Conway, include_bytes!("../downloads/sanchonet-conway-genesis.json")), - - ("mainnet", Era::Byron, include_bytes!("../downloads/mainnet-byron-genesis.json")), - ("mainnet", Era::Shelley, include_bytes!("../downloads/mainnet-shelley-genesis.json")), - ("mainnet", Era::Alonzo, include_bytes!("../downloads/mainnet-alonzo-genesis.json")), - ("mainnet", Era::Conway, include_bytes!("../downloads/mainnet-conway-genesis.json")) + ( + "sanchonet", + Era::Byron, + include_bytes!("../downloads/sanchonet-byron-genesis.json"), + ), + ( + "sanchonet", + Era::Shelley, + include_bytes!("../downloads/sanchonet-shelley-genesis.json"), + ), + ( + 
"sanchonet", + Era::Alonzo, + include_bytes!("../downloads/sanchonet-alonzo-genesis.json"), + ), + ( + "sanchonet", + Era::Conway, + include_bytes!("../downloads/sanchonet-conway-genesis.json"), + ), + ( + "mainnet", + Era::Byron, + include_bytes!("../downloads/mainnet-byron-genesis.json"), + ), + ( + "mainnet", + Era::Shelley, + include_bytes!("../downloads/mainnet-shelley-genesis.json"), + ), + ( + "mainnet", + Era::Alonzo, + include_bytes!("../downloads/mainnet-alonzo-genesis.json"), + ), + ( + "mainnet", + Era::Conway, + include_bytes!("../downloads/mainnet-conway-genesis.json"), + ), ]; fn decode_hex_string(s: &str, len: usize) -> Result> { @@ -160,9 +191,7 @@ fn map_shelley_protocol_params(p: &shelley::ProtocolParams) -> Result(p: &Option, n: &str) -> Result { - p.as_ref().ok_or_else( - || anyhow!("Empty parameter {n}, invalidating shelley genesis") - ).cloned() + p.as_ref().ok_or_else(|| anyhow!("Empty parameter {n}, invalidating shelley genesis")).cloned() } fn map_shelley(genesis: &shelley::GenesisFile) -> Result { @@ -172,7 +201,8 @@ fn map_shelley(genesis: &shelley::GenesisFile) -> Result { max_kes_evolutions: unw(&genesis.max_kes_evolutions, "max_kes_evolutions")?, max_lovelace_supply: unw(&genesis.max_lovelace_supply, "max_lovelace_supply")?, network_id: unw( - &genesis.network_id.as_deref().map(map_network_id).transpose()?, "network_id" + &genesis.network_id.as_deref().map(map_network_id).transpose()?, + "network_id", )?, network_magic: unw(&genesis.network_magic, "network_magic")?, protocol_params: map_shelley_protocol_params(&genesis.protocol_params)?, @@ -180,7 +210,8 @@ fn map_shelley(genesis: &shelley::GenesisFile) -> Result { slot_length: unw(&genesis.slot_length, "slot_length")?, slots_per_kes_period: unw(&genesis.slots_per_kes_period, "slots_per_kes_period")?, system_start: unw( - &genesis.system_start.as_ref().map(|s| s.parse()).transpose()?, "system_start" + &genesis.system_start.as_ref().map(|s| s.parse()).transpose()?, + "system_start", 
)?, update_quorum: unw(&genesis.update_quorum, "update_quorum")?, }) @@ -231,35 +262,38 @@ fn map_byron(genesis: &byron::GenesisFile) -> Result { } fn read_pdef_genesis<'a, PallasStruct: Deserialize<'a>, OurStruct>( - network: &str, era: Era, map: impl Fn(&PallasStruct) -> Result + network: &str, + era: Era, + map: impl Fn(&PallasStruct) -> Result, ) -> Result { - let (_net,_era,genesis) = match PREDEFINED_GENESIS.iter().find( - |(n,e,_g)| *n == network && *e == era - ) { - Some(eg) => eg, - None => bail!("Genesis for {era} not defined"), - }; + let (_net, _era, genesis) = + match PREDEFINED_GENESIS.iter().find(|(n, e, _g)| *n == network && *e == era) { + Some(eg) => eg, + None => bail!("Genesis for {era} not defined"), + }; match &serde_json::from_slice(genesis) { Ok(decoded) => map(decoded), - Err(e) => bail!("Cannot read JSON for {network} {era} genesis: {e}") + Err(e) => bail!("Cannot read JSON for {network} {era} genesis: {e}"), } } pub fn read_byron_genesis(network: &str) -> Result { - read_pdef_genesis:: (network, Era::Byron, map_byron) + read_pdef_genesis::(network, Era::Byron, map_byron) } pub fn read_shelley_genesis(network: &str) -> Result { - read_pdef_genesis:: (network, Era::Shelley, map_shelley) + read_pdef_genesis::(network, Era::Shelley, map_shelley) } pub fn read_alonzo_genesis(network: &str) -> Result { - read_pdef_genesis:: ( - network, Era::Alonzo, alonzo_genesis::map_alonzo + read_pdef_genesis::( + network, + Era::Alonzo, + alonzo_genesis::map_alonzo, ) } pub fn read_conway_genesis(network: &str) -> Result { - read_pdef_genesis:: (network, Era::Conway, map_conway) + read_pdef_genesis::(network, Era::Conway, map_conway) } diff --git a/modules/parameters_state/src/parameters_state.rs b/modules/parameters_state/src/parameters_state.rs index 50ab94f..8b6a63e 100644 --- a/modules/parameters_state/src/parameters_state.rs +++ b/modules/parameters_state/src/parameters_state.rs @@ -13,11 +13,11 @@ use std::sync::Arc; use tokio::sync::Mutex; use 
tracing::{error, info, info_span, Instrument}; +mod alonzo_genesis; mod genesis_params; mod parameters_updater; mod rest; mod state; -mod alonzo_genesis; use parameters_updater::ParametersUpdater; use rest::handle_current; @@ -118,7 +118,9 @@ impl ParametersState { let new_params = locked.handle_enact_state(&block, &gov).await?; Self::publish_update(&config, &block, new_params)?; Ok::<(), anyhow::Error>(()) - }.instrument(span).await?; + } + .instrument(span) + .await?; } msg => error!("Unexpected message {msg:?} for enact state topic"), } diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 06ba02b..b9e61b2 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -52,7 +52,9 @@ impl SnapshotBootstrapper { .publish(&snapshot_topic, Arc::new(spo_state_message)) .await .unwrap_or_else(|e| error!("failed to publish: {e}")); - }.instrument(span).await; + } + .instrument(span) + .await; }); Ok(()) diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index 87145b9..afeec72 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -135,7 +135,9 @@ impl SPOState { .await .unwrap_or_else(|e| error!("Failed to publish: {e}")); } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), } @@ -173,7 +175,9 @@ impl SPOState { .await .inspect_err(|e| error!("Tick error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } } } diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 36cab12..6a140e2 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -12,7 +12,7 @@ use imbl::HashMap; use serde_with::{hex::Hex, serde_as}; use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use tracing::{debug, 
info, error}; +use tracing::{debug, error, info}; #[serde_as] #[derive(Debug, Clone, serde::Serialize)] @@ -185,8 +185,8 @@ impl State { pub fn handle_tx_certs( &mut self, block: &BlockInfo, - tx_certs_msg: &TxCertificatesMessage) -> Result>> { - + tx_certs_msg: &TxCertificatesMessage, + ) -> Result>> { let mut message: Option> = None; let mut current = self.get_previous_state(block.number); current.block = block.number; @@ -213,7 +213,7 @@ impl State { "Retirement requested for unregistered SPO {}", hex::encode(&dr) ), - _ => retired_spos.push(dr.clone()) + _ => retired_spos.push(dr.clone()), }; } } @@ -234,24 +234,34 @@ impl State { for tx_cert in tx_certs_msg.certificates.iter() { match tx_cert { TxCertificate::PoolRegistration(reg) => { - debug!(block=block.number, "Registering SPO {}", hex::encode(®.operator)); + debug!( + block = block.number, + "Registering SPO {}", + hex::encode(®.operator) + ); current.spos.insert(reg.operator.clone(), reg.clone()); // Remove any existing queued deregistrations - for (epoch, deregistrations) in - &mut current.pending_deregistrations.iter_mut() + for (epoch, deregistrations) in &mut current.pending_deregistrations.iter_mut() { let old_len = deregistrations.len(); deregistrations.retain(|d| *d != reg.operator); if deregistrations.len() != old_len { - debug!("Removed pending deregistration of SPO {} from epoch {}", - hex::encode(®.operator), epoch); + debug!( + "Removed pending deregistration of SPO {} from epoch {}", + hex::encode(®.operator), + epoch + ); } } } TxCertificate::PoolRetirement(ret) => { - debug!("SPO {} wants to retire at the end of epoch {} (cert in block number {})", - hex::encode(&ret.operator), ret.epoch, block.number); + debug!( + "SPO {} wants to retire at the end of epoch {} (cert in block number {})", + hex::encode(&ret.operator), + ret.epoch, + block.number + ); if ret.epoch <= current.epoch { error!( "SPO retirement received for current or past epoch {} for SPO {}", @@ -269,8 +279,11 @@ impl State { 
let old_len = deregistrations.len(); deregistrations.retain(|d| *d != ret.operator); if deregistrations.len() != old_len { - debug!("Replaced pending deregistration of SPO {} from epoch {}", - hex::encode(&ret.operator), epoch); + debug!( + "Replaced pending deregistration of SPO {} from epoch {}", + hex::encode(&ret.operator), + epoch + ); } } current @@ -500,7 +513,7 @@ pub mod tests { assert!(state.handle_tx_certs(&block, &msg).is_ok()); let msg = new_msg(); block.number = 2; - block.epoch = 1; // SPO get retired at the start of the epoch it requests + block.epoch = 1; // SPO get retired at the start of the epoch it requests assert!(state.handle_tx_certs(&block, &msg).is_ok()); let current = state.current(); assert!(!current.is_none()); diff --git a/modules/stake_delta_filter/src/stake_delta_filter.rs b/modules/stake_delta_filter/src/stake_delta_filter.rs index 32c90ab..199647b 100644 --- a/modules/stake_delta_filter/src/stake_delta_filter.rs +++ b/modules/stake_delta_filter/src/stake_delta_filter.rs @@ -170,14 +170,19 @@ impl StakeDeltaFilter { }; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::AddressDeltas(delta))) => { - let span = info_span!("stake_delta_filter_stateless.handle_deltas", block = block_info.number); + let span = info_span!( + "stake_delta_filter_stateless.handle_deltas", + block = block_info.number + ); async { let msg = process_message(&cache, &delta, &block_info, None); publisher .publish(&block_info, msg) .await .unwrap_or_else(|e| error!("Publish error: {e}")) - }.instrument(span).await; + } + .instrument(span) + .await; } msg => error!( @@ -208,7 +213,10 @@ impl StakeDeltaFilter { }; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::TxCertificates(tx_cert_msg))) => { - let span = info_span!("stake_delta_filter.handle_certs", block = block_info.number); + let span = info_span!( + "stake_delta_filter.handle_certs", + block = block_info.number + ); async { let mut state = state_certs.lock().await; 
state @@ -216,7 +224,9 @@ impl StakeDeltaFilter { .await .inspect_err(|e| error!("Messaging handling error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -235,7 +245,10 @@ impl StakeDeltaFilter { }; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::AddressDeltas(deltas))) => { - let span = info_span!("stake_delta_filter.handle_deltas", block = block_info.number); + let span = info_span!( + "stake_delta_filter.handle_deltas", + block = block_info.number + ); async { let mut state = state_deltas.lock().await; state @@ -243,7 +256,9 @@ impl StakeDeltaFilter { .await .inspect_err(|e| error!("Messaging handling error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type for {}: {message:?}", &topic), @@ -270,7 +285,9 @@ impl StakeDeltaFilter { .await .inspect_err(|e| error!("Tick error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } } } diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index c537257..cf4fe3f 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -2,11 +2,11 @@ //! Unpacks transaction bodies into UTXO events use acropolis_common::{ - rational_number::RationalNumber, messages::{ BlockFeesMessage, CardanoMessage, GovernanceProceduresMessage, Message, TxCertificatesMessage, UTXODeltasMessage, WithdrawalsMessage, }, + rational_number::RationalNumber, *, }; use caryatid_sdk::{module, Context, Module}; @@ -315,158 +315,154 @@ impl TxUnpacker { }, // Now repeated for a different type! 
- MultiEraCert::Conway(cert) => { - match cert.as_ref().as_ref() { - conway::Certificate::StakeRegistration(cred) => { - Ok(TxCertificate::StakeRegistration(StakeCredentialWithPos { - stake_credential: Self::map_stake_credential(cred), - tx_index: tx_index.try_into().unwrap(), - cert_index: cert_index.try_into().unwrap(), - })) - } - conway::Certificate::StakeDeregistration(cred) => Ok( - TxCertificate::StakeDeregistration(Self::map_stake_credential(cred)), - ), - conway::Certificate::StakeDelegation(cred, pool_key_hash) => { - Ok(TxCertificate::StakeDelegation(StakeDelegation { - credential: Self::map_stake_credential(cred), - operator: pool_key_hash.to_vec(), - })) - } - conway::Certificate::PoolRegistration { - operator, - vrf_keyhash, - pledge, - cost, - margin, - reward_account, - pool_owners, - relays, - pool_metadata, - } => Ok(TxCertificate::PoolRegistration(PoolRegistration { - operator: operator.to_vec(), - vrf_key_hash: vrf_keyhash.to_vec(), - pledge: *pledge, - cost: *cost, - margin: Ratio { - numerator: margin.numerator, - denominator: margin.denominator, - }, - reward_account: reward_account.to_vec(), - pool_owners: pool_owners.into_iter().map(|v| v.to_vec()).collect(), - relays: relays.into_iter().map(|relay| Self::map_relay(relay)).collect(), - pool_metadata: match pool_metadata { - Nullable::Some(md) => Some(PoolMetadata { - url: md.url.clone(), - hash: md.hash.to_vec(), - }), - _ => None, - }, - })), - conway::Certificate::PoolRetirement(pool_key_hash, epoch) => { - Ok(TxCertificate::PoolRetirement(PoolRetirement { - operator: pool_key_hash.to_vec(), - epoch: *epoch, - })) - } + MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { + conway::Certificate::StakeRegistration(cred) => { + Ok(TxCertificate::StakeRegistration(StakeCredentialWithPos { + stake_credential: Self::map_stake_credential(cred), + tx_index: tx_index.try_into().unwrap(), + cert_index: cert_index.try_into().unwrap(), + })) + } + 
conway::Certificate::StakeDeregistration(cred) => Ok( + TxCertificate::StakeDeregistration(Self::map_stake_credential(cred)), + ), + conway::Certificate::StakeDelegation(cred, pool_key_hash) => { + Ok(TxCertificate::StakeDelegation(StakeDelegation { + credential: Self::map_stake_credential(cred), + operator: pool_key_hash.to_vec(), + })) + } + conway::Certificate::PoolRegistration { + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + } => Ok(TxCertificate::PoolRegistration(PoolRegistration { + operator: operator.to_vec(), + vrf_key_hash: vrf_keyhash.to_vec(), + pledge: *pledge, + cost: *cost, + margin: Ratio { + numerator: margin.numerator, + denominator: margin.denominator, + }, + reward_account: reward_account.to_vec(), + pool_owners: pool_owners.into_iter().map(|v| v.to_vec()).collect(), + relays: relays.into_iter().map(|relay| Self::map_relay(relay)).collect(), + pool_metadata: match pool_metadata { + Nullable::Some(md) => Some(PoolMetadata { + url: md.url.clone(), + hash: md.hash.to_vec(), + }), + _ => None, + }, + })), + conway::Certificate::PoolRetirement(pool_key_hash, epoch) => { + Ok(TxCertificate::PoolRetirement(PoolRetirement { + operator: pool_key_hash.to_vec(), + epoch: *epoch, + })) + } - conway::Certificate::Reg(cred, coin) => { - Ok(TxCertificate::Registration(Registration { - credential: Self::map_stake_credential(cred), - deposit: *coin, - })) - } + conway::Certificate::Reg(cred, coin) => { + Ok(TxCertificate::Registration(Registration { + credential: Self::map_stake_credential(cred), + deposit: *coin, + })) + } - conway::Certificate::UnReg(cred, coin) => { - Ok(TxCertificate::Deregistration(Deregistration { - credential: Self::map_stake_credential(cred), - refund: *coin, - })) - } + conway::Certificate::UnReg(cred, coin) => { + Ok(TxCertificate::Deregistration(Deregistration { + credential: Self::map_stake_credential(cred), + refund: *coin, + })) + } + + 
conway::Certificate::VoteDeleg(cred, drep) => { + Ok(TxCertificate::VoteDelegation(VoteDelegation { + credential: Self::map_stake_credential(cred), + drep: Self::map_drep(drep), + })) + } + + conway::Certificate::StakeVoteDeleg(cred, pool_key_hash, drep) => Ok( + TxCertificate::StakeAndVoteDelegation(StakeAndVoteDelegation { + credential: Self::map_stake_credential(cred), + operator: pool_key_hash.to_vec(), + drep: Self::map_drep(drep), + }), + ), + + conway::Certificate::StakeRegDeleg(cred, pool_key_hash, coin) => Ok( + TxCertificate::StakeRegistrationAndDelegation(StakeRegistrationAndDelegation { + credential: Self::map_stake_credential(cred), + operator: pool_key_hash.to_vec(), + deposit: *coin, + }), + ), - conway::Certificate::VoteDeleg(cred, drep) => { - Ok(TxCertificate::VoteDelegation(VoteDelegation { + conway::Certificate::VoteRegDeleg(cred, drep, coin) => { + Ok(TxCertificate::StakeRegistrationAndVoteDelegation( + StakeRegistrationAndVoteDelegation { credential: Self::map_stake_credential(cred), drep: Self::map_drep(drep), - })) - } + deposit: *coin, + }, + )) + } - conway::Certificate::StakeVoteDeleg(cred, pool_key_hash, drep) => Ok( - TxCertificate::StakeAndVoteDelegation(StakeAndVoteDelegation { + conway::Certificate::StakeVoteRegDeleg(cred, pool_key_hash, drep, coin) => { + Ok(TxCertificate::StakeRegistrationAndStakeAndVoteDelegation( + StakeRegistrationAndStakeAndVoteDelegation { credential: Self::map_stake_credential(cred), operator: pool_key_hash.to_vec(), drep: Self::map_drep(drep), - }), - ), - - conway::Certificate::StakeRegDeleg(cred, pool_key_hash, coin) => { - Ok(TxCertificate::StakeRegistrationAndDelegation( - StakeRegistrationAndDelegation { - credential: Self::map_stake_credential(cred), - operator: pool_key_hash.to_vec(), - deposit: *coin, - }, - )) - } - - conway::Certificate::VoteRegDeleg(cred, drep, coin) => { - Ok(TxCertificate::StakeRegistrationAndVoteDelegation( - StakeRegistrationAndVoteDelegation { - credential: 
Self::map_stake_credential(cred), - drep: Self::map_drep(drep), - deposit: *coin, - }, - )) - } - - conway::Certificate::StakeVoteRegDeleg(cred, pool_key_hash, drep, coin) => { - Ok(TxCertificate::StakeRegistrationAndStakeAndVoteDelegation( - StakeRegistrationAndStakeAndVoteDelegation { - credential: Self::map_stake_credential(cred), - operator: pool_key_hash.to_vec(), - drep: Self::map_drep(drep), - deposit: *coin, - }, - )) - } + deposit: *coin, + }, + )) + } - conway::Certificate::AuthCommitteeHot(cold_cred, hot_cred) => { - Ok(TxCertificate::AuthCommitteeHot(AuthCommitteeHot { - cold_credential: Self::map_stake_credential(cold_cred), - hot_credential: Self::map_stake_credential(hot_cred), - })) - } + conway::Certificate::AuthCommitteeHot(cold_cred, hot_cred) => { + Ok(TxCertificate::AuthCommitteeHot(AuthCommitteeHot { + cold_credential: Self::map_stake_credential(cold_cred), + hot_credential: Self::map_stake_credential(hot_cred), + })) + } - conway::Certificate::ResignCommitteeCold(cold_cred, anchor) => { - Ok(TxCertificate::ResignCommitteeCold(ResignCommitteeCold { - cold_credential: Self::map_stake_credential(cold_cred), - anchor: Self::map_nullable_anchor(&anchor), - })) - } + conway::Certificate::ResignCommitteeCold(cold_cred, anchor) => { + Ok(TxCertificate::ResignCommitteeCold(ResignCommitteeCold { + cold_credential: Self::map_stake_credential(cold_cred), + anchor: Self::map_nullable_anchor(&anchor), + })) + } - conway::Certificate::RegDRepCert(cred, coin, anchor) => { - Ok(TxCertificate::DRepRegistration(DRepRegistration { - credential: Self::map_stake_credential(cred), - deposit: *coin, - anchor: Self::map_nullable_anchor(&anchor), - })) - } + conway::Certificate::RegDRepCert(cred, coin, anchor) => { + Ok(TxCertificate::DRepRegistration(DRepRegistration { + credential: Self::map_stake_credential(cred), + deposit: *coin, + anchor: Self::map_nullable_anchor(&anchor), + })) + } - conway::Certificate::UnRegDRepCert(cred, coin) => { - 
Ok(TxCertificate::DRepDeregistration(DRepDeregistration { - credential: Self::map_stake_credential(cred), - refund: *coin, - })) - } + conway::Certificate::UnRegDRepCert(cred, coin) => { + Ok(TxCertificate::DRepDeregistration(DRepDeregistration { + credential: Self::map_stake_credential(cred), + refund: *coin, + })) + } - conway::Certificate::UpdateDRepCert(cred, anchor) => { - Ok(TxCertificate::DRepUpdate(DRepUpdate { - credential: Self::map_stake_credential(cred), - anchor: Self::map_nullable_anchor(&anchor), - })) - } + conway::Certificate::UpdateDRepCert(cred, anchor) => { + Ok(TxCertificate::DRepUpdate(DRepUpdate { + credential: Self::map_stake_credential(cred), + anchor: Self::map_nullable_anchor(&anchor), + })) } - } + }, _ => Err(anyhow!("Unknown certificate era {:?} ignored", cert)), } diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs index b7a24f8..795f2db 100644 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/body_fetcher.rs @@ -7,14 +7,14 @@ use acropolis_common::{ BlockInfo, BlockStatus, Era, }; use anyhow::{bail, Result}; +use crossbeam::channel::{Receiver, TryRecvError}; use pallas::{ ledger::traverse::MultiEraHeader, network::{ facades::PeerClient, - miniprotocols::{chainsync::HeaderContent, Point} + miniprotocols::{chainsync::HeaderContent, Point}, }, }; -use crossbeam::channel::{Receiver, TryRecvError}; use std::{sync::Arc, time::Duration}; use tokio::time::sleep; use tracing::{debug, info}; @@ -31,20 +31,24 @@ pub struct BodyFetcher { } impl BodyFetcher { - async fn new(cfg: Arc, cache: Option, last_epoch: Option) - -> Result - { + async fn new( + cfg: Arc, + cache: Option, + last_epoch: Option, + ) -> Result { Ok(BodyFetcher { cfg: cfg.clone(), peer: utils::peer_connect(cfg.clone(), "body fetcher").await?, cache, - last_epoch + last_epoch, }) } - async fn fetch_block(&mut self, point: Point, block_info: &BlockInfo) - -> Result> - 
{ + async fn fetch_block( + &mut self, + point: Point, + block_info: &BlockInfo, + ) -> Result> { // Fetch the block body debug!("Requesting single block {point:?}"); let body = self.peer.blockfetch().fetch_single(point.clone()).await; @@ -128,7 +132,7 @@ impl BodyFetcher { let record = UpstreamCacheRecord { id: block_info.clone(), hdr: msg_hdr.clone(), - body: msg_body.clone() + body: msg_body.clone(), }; self.cache.as_mut().map(|c| c.write_record(&record)).transpose()?; @@ -145,15 +149,15 @@ impl BodyFetcher { cfg: Arc, cache: Option, last_epoch: Option, - receiver: Receiver<(bool, HeaderContent)> + receiver: Receiver<(bool, HeaderContent)>, ) -> Result<()> { let mut fetcher = Self::new(cfg, cache, last_epoch).await?; loop { - match receiver.try_recv() { - Ok((rolled_back, header)) => fetcher.process_message(rolled_back, header).await?, - Err(TryRecvError::Disconnected) => return Ok(()), - Err(TryRecvError::Empty) => sleep(Duration::from_millis(1)).await - } + match receiver.try_recv() { + Ok((rolled_back, header)) => fetcher.process_message(rolled_back, header).await?, + Err(TryRecvError::Disconnected) => return Ok(()), + Err(TryRecvError::Empty) => sleep(Duration::from_millis(1)).await, + } } } } diff --git a/modules/upstream_chain_fetcher/src/upstream_cache.rs b/modules/upstream_chain_fetcher/src/upstream_cache.rs index 152bf1e..df06d06 100644 --- a/modules/upstream_chain_fetcher/src/upstream_cache.rs +++ b/modules/upstream_chain_fetcher/src/upstream_cache.rs @@ -1,15 +1,20 @@ -use anyhow::{anyhow, bail, Result}; -use std::{path::Path, fs::File, io::{BufReader, Write}, sync::Arc}; use acropolis_common::{ - messages::{BlockHeaderMessage, BlockBodyMessage}, - BlockInfo + messages::{BlockBodyMessage, BlockHeaderMessage}, + BlockInfo, +}; +use anyhow::{anyhow, bail, Result}; +use std::{ + fs::File, + io::{BufReader, Write}, + path::Path, + sync::Arc, }; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct UpstreamCacheRecord { pub id: 
BlockInfo, pub hdr: Arc, - pub body: Arc + pub body: Arc, } pub trait Storage { @@ -23,7 +28,9 @@ pub struct FileStorage { impl FileStorage { pub fn new(path: &str) -> Self { - Self { path: path.to_string() } + Self { + path: path.to_string(), + } } fn get_file_name(&self, chunk_no: usize) -> String { @@ -49,18 +56,23 @@ pub struct UpstreamCacheImpl { // If current_record < density and current_record points outside of chunk_cached, // then we're at the first empty record after cached records. - current_chunk: usize, current_record: usize, chunk_cached: Vec, // Reader/writer functions --- to abstract actual struct encoder/storage from chunk logic - storage: S + storage: S, } impl UpstreamCacheImpl { pub fn new_impl(storage: S) -> Self { - Self { storage, density: 1000, current_chunk: 0, current_record: 0, chunk_cached: vec![] } + Self { + storage, + density: 1000, + current_chunk: 0, + current_record: 0, + chunk_cached: vec![], + } } pub fn start_reading(&mut self) -> Result<()> { @@ -84,8 +96,10 @@ impl UpstreamCacheImpl { if self.current_record >= self.density { if self.chunk_cached.len() > self.density { - bail!("Full chunk actual length {}, expected {}", - self.chunk_cached.len(), self.density + bail!( + "Full chunk actual length {}, expected {}", + self.chunk_cached.len(), + self.density ); } @@ -100,10 +114,11 @@ impl UpstreamCacheImpl { pub fn read_record(&mut self) -> Result> { if self.has_record() { - let record = self.chunk_cached.get(self.current_record) - .ok_or(anyhow!("Error reading {}:{}", - self.current_chunk, self.current_record - ))?; + let record = self.chunk_cached.get(self.current_record).ok_or(anyhow!( + "Error reading {}:{}", + self.current_chunk, + self.current_record + ))?; return Ok(Some(record.clone())); }; @@ -136,7 +151,8 @@ impl Storage for FileStorage { let file = File::open(&name)?; let reader = BufReader::new(file); - match serde_json::from_reader::, Vec>(reader) { + match serde_json::from_reader::, Vec>(reader) + { Ok(res) => 
Ok(res.clone()), Err(err) => Err(anyhow!( "Error reading upstream cache chunk JSON from {name}: '{err}'" @@ -153,35 +169,41 @@ impl Storage for FileStorage { #[cfg(test)] mod test { - use anyhow::{anyhow, bail, Result}; - use std::{collections::HashMap, path::Path, sync::Arc}; + use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord}; use acropolis_common::{ - messages::{BlockHeaderMessage, BlockBodyMessage}, - BlockInfo, BlockStatus, Era + messages::{BlockBodyMessage, BlockHeaderMessage}, + BlockInfo, BlockStatus, Era, }; - use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord}; + use anyhow::{anyhow, bail, Result}; + use std::{collections::HashMap, path::Path, sync::Arc}; - fn blk(n: u64) -> BlockInfo { BlockInfo { - status: BlockStatus::Volatile, - slot: n, - number: n, - hash: vec![], - epoch: 0, - new_epoch: false, - era: Era::default(), - }} + fn blk(n: u64) -> BlockInfo { + BlockInfo { + status: BlockStatus::Volatile, + slot: n, + number: n, + hash: vec![], + epoch: 0, + new_epoch: false, + era: Era::default(), + } + } fn ucr(n: u64, hdr: usize, body: usize) -> UpstreamCacheRecord { UpstreamCacheRecord { id: blk(n), - hdr: Arc::new(BlockHeaderMessage { raw: vec![hdr as u8] }), - body: Arc::new(BlockBodyMessage { raw: vec![body as u8] }) + hdr: Arc::new(BlockHeaderMessage { + raw: vec![hdr as u8], + }), + body: Arc::new(BlockBodyMessage { + raw: vec![body as u8], + }), } } #[derive(Default)] struct TestStorage { - rec: HashMap> + rec: HashMap>, } impl Storage for TestStorage { @@ -207,22 +229,28 @@ mod test { fn test_write_read() -> Result<()> { let mut cache = UpstreamCacheImpl::::new_impl(TestStorage::default()); cache.density = 3; - let perm: [u64; 11] = [3,1,4,1,5,9,2,6,5,3,5]; + let perm: [u64; 11] = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]; for n in 0..11 { - cache.write_record(&ucr(perm[n], n, n+100))?; + cache.write_record(&ucr(perm[n], n, n + 100))?; } assert_eq!(cache.storage.rec.len(), 4); for ch in 0..3 { let 
chunk = cache.storage.rec.get(&ch).unwrap(); - assert_eq!(chunk.get(0).unwrap().id.number, perm[ch*3]); - assert_eq!(chunk.get(1).unwrap().id.number, perm[ch*3+1]); - assert_eq!(chunk.get(2).unwrap().id.number, perm[ch*3+2]); + assert_eq!(chunk.get(0).unwrap().id.number, perm[ch * 3]); + assert_eq!(chunk.get(1).unwrap().id.number, perm[ch * 3 + 1]); + assert_eq!(chunk.get(2).unwrap().id.number, perm[ch * 3 + 2]); assert_eq!(chunk.len(), 3); } - assert_eq!(cache.storage.rec.get(&3).unwrap().get(0).unwrap().id.number, perm[9]); - assert_eq!(cache.storage.rec.get(&3).unwrap().get(1).unwrap().id.number, perm[10]); + assert_eq!( + cache.storage.rec.get(&3).unwrap().get(0).unwrap().id.number, + perm[9] + ); + assert_eq!( + cache.storage.rec.get(&3).unwrap().get(1).unwrap().id.number, + perm[10] + ); assert_eq!(cache.storage.rec.get(&3).unwrap().len(), 2); cache.start_reading()?; @@ -230,7 +258,7 @@ mod test { let record = cache.read_record()?.unwrap(); assert_eq!(record.id.number, perm[n]); assert_eq!(record.hdr.raw, vec![n as u8]); - assert_eq!(record.body.raw, vec![(n+100) as u8]); + assert_eq!(record.body.raw, vec![(n + 100) as u8]); cache.next_record()?; } diff --git a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs index 44ed529..023f367 100644 --- a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs @@ -4,16 +4,16 @@ use acropolis_common::{ calculations::slot_to_epoch, messages::{CardanoMessage, Message}, - BlockInfo, + BlockInfo, }; use anyhow::{anyhow, bail, Result}; use caryatid_sdk::{module, Context, Module, Subscription}; -use crossbeam::channel::{TrySendError, bounded}; use config::Config; +use crossbeam::channel::{bounded, TrySendError}; use pallas::{ ledger::traverse::MultiEraHeader, network::{ - facades::PeerClient, + facades::PeerClient, miniprotocols::{ chainsync::{NextResponse, Tip}, Point, @@ -24,12 
+24,12 @@ use std::{sync::Arc, time::Duration}; use tokio::{sync::Mutex, time::sleep}; use tracing::{debug, error, info}; -mod upstream_cache; mod body_fetcher; +mod upstream_cache; mod utils; -use upstream_cache::{UpstreamCache, UpstreamCacheRecord}; use body_fetcher::BodyFetcher; +use upstream_cache::{UpstreamCache, UpstreamCacheRecord}; use utils::{FetcherConfig, SyncPoint}; const MAX_BODY_FETCHER_CHANNEL_LENGTH: usize = 100; @@ -49,7 +49,7 @@ impl UpstreamChainFetcher { cfg: Arc, peer: Arc>, cache: Option, - start: Point + start: Point, ) -> Result<()> { // Find intersect to given point let slot = start.slot_or_default(); @@ -102,13 +102,13 @@ impl UpstreamChainFetcher { for_send = match sender.try_send(for_send) { Ok(()) => break 'sender, Err(TrySendError::Full(fs)) => fs, - Err(e) => bail!("Cannot send message to BodyFetcher: {e}") + Err(e) => bail!("Cannot send message to BodyFetcher: {e}"), }; sleep(Duration::from_millis(100)).await; } rolled_back = false; - }, + } // TODO The first message after sync start always comes with 'RollBackward'. 
// Here we suppress this status (since it says nothing about actual rollbacks, @@ -126,9 +126,10 @@ impl UpstreamChainFetcher { } } - async fn read_cache(cfg: Arc, cache: &mut UpstreamCache) - -> Result> - { + async fn read_cache( + cfg: Arc, + cache: &mut UpstreamCache, + ) -> Result> { let mut last_block = None; cache.start_reading()?; @@ -141,33 +142,33 @@ impl UpstreamChainFetcher { Ok(last_block) } - async fn wait_snapshot_completion(subscription: &mut Box>) - -> Result> - { + async fn wait_snapshot_completion( + subscription: &mut Box>, + ) -> Result> { let Ok((_, message)) = subscription.read().await else { return Ok(None); }; match message.as_ref() { Message::Cardano((blk, CardanoMessage::SnapshotComplete)) => Ok(Some(blk.clone())), - msg => bail!("Unexpected message in completion topic: {msg:?}") + msg => bail!("Unexpected message in completion topic: {msg:?}"), } } async fn run_chain_sync( cfg: Arc, - snapshot_complete: &mut Option>> + snapshot_complete: &mut Option>>, ) -> Result<()> { - let peer = Arc::new(Mutex::new(utils::peer_connect(cfg.clone(), "header fetcher").await?)); + let peer = Arc::new(Mutex::new( + utils::peer_connect(cfg.clone(), "header fetcher").await?, + )); match cfg.sync_point { SyncPoint::Tip => { // Ask for origin but get the tip as well let mut my_peer = peer.lock().await; - let (_, Tip(point, _)) = my_peer - .chainsync() - .find_intersect(vec![Point::Origin]) - .await?; + let (_, Tip(point, _)) = + my_peer.chainsync().find_intersect(vec![Point::Origin]).await?; Self::sync_to_point(cfg, peer.clone(), None, point).await?; } SyncPoint::Origin => { @@ -183,9 +184,12 @@ impl UpstreamChainFetcher { Self::sync_to_point(cfg, peer.clone(), Some(upstream_cache), point).await?; } SyncPoint::Snapshot => { - info!("Waiting for snapshot completion on {}", cfg.snapshot_completion_topic); - let mut completion_subscription = snapshot_complete.as_mut() - .ok_or_else(|| anyhow!("Snapshot topic missing"))?; + info!( + "Waiting for snapshot 
completion on {}", + cfg.snapshot_completion_topic + ); + let mut completion_subscription = + snapshot_complete.as_mut().ok_or_else(|| anyhow!("Snapshot topic missing"))?; match Self::wait_snapshot_completion(&mut completion_subscription).await? { Some(block) => { @@ -196,7 +200,7 @@ impl UpstreamChainFetcher { let point = Point::Specific(block.slot, block.hash.clone()); Self::sync_to_point(cfg, peer, None, point).await?; } - None => info!("Completion not received. Exiting ...") + None => info!("Completion not received. Exiting ..."), } } } @@ -207,13 +211,15 @@ impl UpstreamChainFetcher { pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { let cfg = FetcherConfig::new(context.clone(), config)?; let mut subscription = match cfg.sync_point { - SyncPoint::Snapshot => - Some(cfg.context.subscribe(&cfg.snapshot_completion_topic).await?), - _ => None + SyncPoint::Snapshot => { + Some(cfg.context.subscribe(&cfg.snapshot_completion_topic).await?) + } + _ => None, }; context.clone().run(async move { - Self::run_chain_sync(cfg, &mut subscription).await + Self::run_chain_sync(cfg, &mut subscription) + .await .unwrap_or_else(|e| error!("Chain sync failed: {e}")); }); diff --git a/modules/upstream_chain_fetcher/src/utils.rs b/modules/upstream_chain_fetcher/src/utils.rs index 534ede7..40258e8 100644 --- a/modules/upstream_chain_fetcher/src/utils.rs +++ b/modules/upstream_chain_fetcher/src/utils.rs @@ -1,3 +1,4 @@ +use crate::UpstreamCacheRecord; use acropolis_common::messages::{CardanoMessage, Message}; use anyhow::{anyhow, bail, Result}; use caryatid_sdk::Context; @@ -6,18 +7,17 @@ use pallas::network::facades::PeerClient; use serde::Deserialize; use std::sync::Arc; use tracing::info; -use crate::UpstreamCacheRecord; -const DEFAULT_HEADER_TOPIC: (&str,&str) = ("header-topic", "cardano.block.header"); -const DEFAULT_BODY_TOPIC: (&str,&str) = ("body-topic", "cardano.block.body"); -const DEFAULT_SNAPSHOT_COMPLETION_TOPIC: (&str,&str) = - 
("snapshot-complietion-topic","cardano.snapshot.complete"); +const DEFAULT_HEADER_TOPIC: (&str, &str) = ("header-topic", "cardano.block.header"); +const DEFAULT_BODY_TOPIC: (&str, &str) = ("body-topic", "cardano.block.body"); +const DEFAULT_SNAPSHOT_COMPLETION_TOPIC: (&str, &str) = + ("snapshot-complietion-topic", "cardano.snapshot.complete"); -const DEFAULT_NODE_ADDRESS: (&str,&str) = ("node-address", "backbone.cardano.iog.io:3001"); -const DEFAULT_MAGIC_NUMBER: (&str,u64) = ("magic-number", 764824073); +const DEFAULT_NODE_ADDRESS: (&str, &str) = ("node-address", "backbone.cardano.iog.io:3001"); +const DEFAULT_MAGIC_NUMBER: (&str, u64) = ("magic-number", 764824073); -const DEFAULT_SYNC_POINT: (&str,SyncPoint) = ("sync-point", SyncPoint::Snapshot); -const DEFAULT_CACHE_DIR: (&str,&str) = ("cache-dir", "upstream-cache"); +const DEFAULT_SYNC_POINT: (&str, SyncPoint) = ("sync-point", SyncPoint::Snapshot); +const DEFAULT_CACHE_DIR: (&str, &str) = ("cache-dir", "upstream-cache"); #[derive(Clone, Debug, serde::Deserialize, PartialEq)] pub enum SyncPoint { @@ -28,7 +28,7 @@ pub enum SyncPoint { #[serde(rename = "cache")] Cache, #[serde(rename = "snapshot")] - Snapshot + Snapshot, } pub struct FetcherConfig { @@ -39,7 +39,7 @@ pub struct FetcherConfig { pub snapshot_completion_topic: String, pub node_address: String, pub magic_number: u64, - pub cache_dir: String + pub cache_dir: String, } impl FetcherConfig { @@ -49,7 +49,10 @@ impl FetcherConfig { actual } - fn conf_enum<'a, T: Deserialize<'a> + std::fmt::Debug>(config: &Arc, keydef: (&str, T)) -> Result { + fn conf_enum<'a, T: Deserialize<'a> + std::fmt::Debug>( + config: &Arc, + keydef: (&str, T), + ) -> Result { let actual = if config.get_string(keydef.0).is_ok() { config .get::(keydef.0) @@ -68,23 +71,24 @@ impl FetcherConfig { body_topic: Self::conf(&config, DEFAULT_BODY_TOPIC), snapshot_completion_topic: Self::conf(&config, DEFAULT_SNAPSHOT_COMPLETION_TOPIC), sync_point: Self::conf_enum::(&config, 
DEFAULT_SYNC_POINT)?, - magic_number: config.get::(DEFAULT_MAGIC_NUMBER.0) - .unwrap_or(DEFAULT_MAGIC_NUMBER.1), + magic_number: config + .get::(DEFAULT_MAGIC_NUMBER.0) + .unwrap_or(DEFAULT_MAGIC_NUMBER.1), node_address: Self::conf(&config, DEFAULT_NODE_ADDRESS), - cache_dir: Self::conf(&config, DEFAULT_CACHE_DIR) + cache_dir: Self::conf(&config, DEFAULT_CACHE_DIR), })) } } pub async fn publish_message(cfg: Arc, record: &UpstreamCacheRecord) -> Result<()> { let header_msg = Arc::new(Message::Cardano(( - record.id.clone(), - CardanoMessage::BlockHeader((*record.hdr).clone()) + record.id.clone(), + CardanoMessage::BlockHeader((*record.hdr).clone()), ))); let body_msg = Arc::new(Message::Cardano(( - record.id.clone(), - CardanoMessage::BlockBody((*record.body).clone()) + record.id.clone(), + CardanoMessage::BlockBody((*record.body).clone()), ))); cfg.context.message_bus.publish(&cfg.header_topic, header_msg).await?; @@ -92,7 +96,10 @@ pub async fn publish_message(cfg: Arc, record: &UpstreamCacheReco } pub async fn peer_connect(cfg: Arc, role: &str) -> Result { - info!("Connecting {role} to {} ({}) ...", cfg.node_address, cfg.magic_number); + info!( + "Connecting {role} to {} ({}) ...", + cfg.node_address, cfg.magic_number + ); match PeerClient::connect(cfg.node_address.clone(), cfg.magic_number).await { Ok(peer) => { @@ -100,7 +107,9 @@ pub async fn peer_connect(cfg: Arc, role: &str) -> Result bail!( - "Cannot connect {role} to {} ({}): {e}", cfg.node_address, cfg.magic_number - ) + "Cannot connect {role} to {} ({}): {e}", + cfg.node_address, + cfg.magic_number + ), } } diff --git a/modules/utxo_state/src/fjall_async_immutable_utxo_store.rs b/modules/utxo_state/src/fjall_async_immutable_utxo_store.rs index 54ea326..260dc1e 100644 --- a/modules/utxo_state/src/fjall_async_immutable_utxo_store.rs +++ b/modules/utxo_state/src/fjall_async_immutable_utxo_store.rs @@ -118,6 +118,9 @@ impl ImmutableUTXOStore for FjallAsyncImmutableUTXOStore { async fn len(&self) -> Result { 
let partition = self.partition.clone(); - Ok(task::spawn_blocking(move || Ok::<_, anyhow::Error>(partition.approximate_len())).await??) + Ok( + task::spawn_blocking(move || Ok::<_, anyhow::Error>(partition.approximate_len())) + .await??, + ) } } diff --git a/modules/utxo_state/src/utxo_state.rs b/modules/utxo_state/src/utxo_state.rs index ed5151b..5ef68ed 100644 --- a/modules/utxo_state/src/utxo_state.rs +++ b/modules/utxo_state/src/utxo_state.rs @@ -100,7 +100,9 @@ impl UTXOState { .await .inspect_err(|e| error!("Messaging handling error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -127,7 +129,9 @@ impl UTXOState { .await .inspect_err(|e| error!("Tick error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } } } diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index fa6c7c4..29bc101 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -59,7 +59,9 @@ pub async fn main() -> Result<()> { .build() .tracer("rust-otel-otlp"); let otel_layer = OpenTelemetryLayer::new(otel_tracer) - .with_filter(EnvFilter::from_default_env().add_directive(filter::LevelFilter::INFO.into())) + .with_filter( + EnvFilter::from_default_env().add_directive(filter::LevelFilter::INFO.into()), + ) .with_filter(filter::filter_fn(|meta| meta.is_span())); Registry::default().with(fmt_layer).with(otel_layer).init(); } else { diff --git a/processes/replayer/src/main.rs b/processes/replayer/src/main.rs index ac2bd22..a09955e 100644 --- a/processes/replayer/src/main.rs +++ b/processes/replayer/src/main.rs @@ -4,10 +4,10 @@ use acropolis_common::messages::Message; use anyhow::Result; use caryatid_process::Process; use caryatid_sdk::ModuleRegistry; +use config::{Config, Environment, File}; use std::{env, sync::Arc}; use tracing::info; use tracing_subscriber; -use config::{Config, Environment, File}; // External modules use 
acropolis_module_accounts_state::AccountsState; diff --git a/processes/replayer/src/playback.rs b/processes/replayer/src/playback.rs index 3895aaf..326d4a2 100644 --- a/processes/replayer/src/playback.rs +++ b/processes/replayer/src/playback.rs @@ -1,15 +1,14 @@ //! Caryatid Playback module -use anyhow::{anyhow, bail, ensure, Result}; -use caryatid_sdk::{module, Context, Module}; use acropolis_common::{ messages::{ - CardanoMessage, GovernanceProceduresMessage, - DRepStakeDistributionMessage, SPOStakeDistributionMessage, - Message - }, - BlockInfo + CardanoMessage, DRepStakeDistributionMessage, GovernanceProceduresMessage, Message, + SPOStakeDistributionMessage, + }, + BlockInfo, }; +use anyhow::{anyhow, bail, ensure, Result}; +use caryatid_sdk::{module, Context, Module}; use config::Config; use std::collections::HashMap; use std::fs::read_to_string; @@ -21,7 +20,11 @@ use crate::replayer_config::ReplayerConfig; /// Playback module /// Parameterised by the outer message enum used on the bus -#[module(message_type(Message), name = "gov-playback", description = "Governance messages playback")] +#[module( + message_type(Message), + name = "gov-playback", + description = "Governance messages playback" +)] pub struct Playback; struct PlaybackRunner { @@ -37,16 +40,14 @@ struct PlaybackRunner { } impl PlaybackRunner { - fn new( - context: Arc>, cfg: Arc - ) -> Self { + fn new(context: Arc>, cfg: Arc) -> Self { Self { context, cfg: cfg.clone(), topics: cfg.get_topics_vec(), current_file: HashMap::new(), next: HashMap::new(), - empty_message: HashMap::new() + empty_message: HashMap::new(), } } } @@ -56,9 +57,7 @@ impl Playback { let cfg = ReplayerConfig::new(&config); let mut playback_runner = PlaybackRunner::new(context.clone(), cfg); - context.run(async move { - playback_runner.run().await - }); + context.run(async move { playback_runner.run().await }); Ok(()) } @@ -67,24 +66,23 @@ impl Playback { impl PlaybackRunner { fn empty_message(msg: &CardanoMessage) -> Result> { 
match msg { - CardanoMessage::GovernanceProcedures(_) => - Ok(Arc::new(CardanoMessage::GovernanceProcedures( - GovernanceProceduresMessage::default())) - ), - CardanoMessage::DRepStakeDistribution(_) => - Ok(Arc::new(CardanoMessage::DRepStakeDistribution( - DRepStakeDistributionMessage::default())) - ), - CardanoMessage::SPOStakeDistribution(_) => - Ok(Arc::new(CardanoMessage::SPOStakeDistribution( - SPOStakeDistributionMessage::default())) - ), - m => bail!("Cannot empty message {m:?}") + CardanoMessage::GovernanceProcedures(_) => Ok(Arc::new( + CardanoMessage::GovernanceProcedures(GovernanceProceduresMessage::default()), + )), + CardanoMessage::DRepStakeDistribution(_) => Ok(Arc::new( + CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage::default()), + )), + CardanoMessage::SPOStakeDistribution(_) => Ok(Arc::new( + CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage::default()), + )), + m => bail!("Cannot empty message {m:?}"), } } fn take_message( - &mut self, topic: String, prefix: String + &mut self, + topic: String, + prefix: String, ) -> Result>> { let num = self.current_file.get(&topic).unwrap_or(&0); @@ -97,27 +95,28 @@ impl PlaybackRunner { } let (id, message) = match read_to_string(&filename) { - Ok(file) => - match serde_json::from_str::(&file) { - Ok(Message::Cardano((id, cardano_message))) => (id, Arc::new(cardano_message)), - Ok(m) => bail!("Expected CardanoMessage, found {m:?}"), - Err(error) => bail!("Failed to parse message from file {filename:?}: {error}") - }, - - Err(error) => bail!("Failed to read file {filename:?}: {error}. Aborting playback") + Ok(file) => match serde_json::from_str::(&file) { + Ok(Message::Cardano((id, cardano_message))) => (id, Arc::new(cardano_message)), + Ok(m) => bail!("Expected CardanoMessage, found {m:?}"), + Err(error) => bail!("Failed to parse message from file {filename:?}: {error}"), + }, + + Err(error) => bail!("Failed to read file {filename:?}: {error}. 
Aborting playback"), }; - self.current_file.insert(topic.clone(), num+1); + self.current_file.insert(topic.clone(), num + 1); self.next.insert(topic, (id, message.clone())); Ok(Some(message)) } fn get_earliest_available_block(&self) -> Option { - self.next.values().map(|(blk,_msg)| blk).min().map(|x| (*x).clone()) + self.next.values().map(|(blk, _msg)| blk).min().map(|x| (*x).clone()) } fn gen_block_info( - curr_block_num: u64, prev_blk: &BlockInfo, pending_blk: &BlockInfo + curr_block_num: u64, + prev_blk: &BlockInfo, + pending_blk: &BlockInfo, ) -> Result { let mut curr_blk = prev_blk.clone(); curr_blk.slot += curr_block_num - prev_blk.number; @@ -127,8 +126,9 @@ impl PlaybackRunner { ensure!(curr_blk.slot < pending_blk.slot); ensure!(curr_blk.number < pending_blk.number); - ensure!(curr_blk.epoch == pending_blk.epoch - || (curr_blk.epoch + 1 == pending_blk.epoch && pending_blk.new_epoch) + ensure!( + curr_blk.epoch == pending_blk.epoch + || (curr_blk.epoch + 1 == pending_blk.epoch && pending_blk.new_epoch) ); Ok(curr_blk) @@ -144,17 +144,21 @@ impl PlaybackRunner { for (topic, _prefix, epoch_bound, skip_zero) in self.topics.iter() { if (!skip_zero || curr_blk.epoch != 0) && (!epoch_bound || curr_blk.new_epoch) { let msg = match self.next.get(topic) { - Some((blk,msg)) if blk == curr_blk => msg.clone(), + Some((blk, msg)) if blk == curr_blk => msg.clone(), - Some((blk,_)) if blk.number == curr_blk.number => - bail!("{blk:?} != {curr_blk:?}"), + Some((blk, _)) if blk.number == curr_blk.number => { + bail!("{blk:?} != {curr_blk:?}") + } - Some((blk,_)) if blk.number < curr_blk.number => - bail!("{blk:?} < {curr_blk:?}"), + Some((blk, _)) if blk.number < curr_blk.number => { + bail!("{blk:?} < {curr_blk:?}") + } - Some(_) | None => self.empty_message.get(topic).ok_or_else( - || anyhow!("No empty message for {topic}") - )?.clone() + Some(_) | None => self + .empty_message + .get(topic) + .ok_or_else(|| anyhow!("No empty message for {topic}"))? 
+ .clone(), }; self.send_message(topic, curr_blk, &msg).await?; } @@ -164,11 +168,10 @@ impl PlaybackRunner { fn step_forward(&mut self, current_step: &BlockInfo) -> Result<()> { for (topic, prefix, _epoch_bound, _skip_zero) in self.topics.clone().iter() { - if let Some((blk,_msg)) = self.next.get(topic) { + if let Some((blk, _msg)) = self.next.get(topic) { if blk == current_step { self.take_message(topic.to_string(), prefix.to_string())?; - } - else if blk.number <= current_step.number { + } else if blk.number <= current_step.number { bail!("Impossible next block info for {topic}: {blk:?} < {current_step:?}"); } } @@ -177,16 +180,19 @@ impl PlaybackRunner { } fn dump_state(&self) { - let stats = self.next.iter().map( - |(topic, (blk, _msg))| format!("{topic}: {}:{} ", blk.epoch, blk.number) - ).collect::(); + let stats = self + .next + .iter() + .map(|(topic, (blk, _msg))| format!("{topic}: {}:{} ", blk.epoch, blk.number)) + .collect::(); info!("Current replay state: {stats}"); } async fn run(&mut self) -> Result<()> { // Initializing message status for (topic, prefix, _epoch_bound, _skip_zero) in self.topics.clone().iter() { - let msg = self.take_message(topic.to_string(), prefix.to_string())? + let msg = self + .take_message(topic.to_string(), prefix.to_string())? .ok_or_else(|| anyhow!("Topic {topic} may not be empty"))?; self.empty_message.insert(topic.to_string(), Self::empty_message(&msg)?); @@ -194,7 +200,7 @@ impl PlaybackRunner { let mut prev_blk = match self.get_earliest_available_block() { Some(minimal_blk) => minimal_blk, - None => bail!("At least one real block is required for replay") + None => bail!("At least one real block is required for replay"), }; if prev_blk.number != 1 { @@ -208,7 +214,7 @@ impl PlaybackRunner { granularity += 1; } - for curr_block_num in prev_blk.number .. 
pending_blk.number { + for curr_block_num in prev_blk.number..pending_blk.number { let cur_blk = Self::gen_block_info(curr_block_num, &prev_blk, &pending_blk)?; self.send_messages_to_all(&cur_blk).await?; } diff --git a/processes/replayer/src/recorder.rs b/processes/replayer/src/recorder.rs index 1dd3937..e44d630 100644 --- a/processes/replayer/src/recorder.rs +++ b/processes/replayer/src/recorder.rs @@ -1,13 +1,14 @@ //! Governance recorder module -use anyhow::{anyhow, Result}; -use caryatid_sdk::{module, Context, Module, Subscription}; -use acropolis_common::{BlockInfo, +use acropolis_common::{ messages::{ - Message, CardanoMessage, GovernanceProceduresMessage, - DRepStakeDistributionMessage, SPOStakeDistributionMessage - } + CardanoMessage, DRepStakeDistributionMessage, GovernanceProceduresMessage, Message, + SPOStakeDistributionMessage, + }, + BlockInfo, }; +use anyhow::{anyhow, Result}; +use caryatid_sdk::{module, Context, Module, Subscription}; use config::Config; use std::{fs::File, io::Write, sync::Arc}; use tracing::{error, info}; @@ -15,18 +16,26 @@ use tracing::{error, info}; use crate::replayer_config::ReplayerConfig; /// Recorder module -#[module(message_type(Message), name = "gov-recorder", description = "Governance messages recorder")] +#[module( + message_type(Message), + name = "gov-recorder", + description = "Governance messages recorder" +)] pub struct Recorder; struct BlockRecorder { cfg: Arc, prefix: String, - num: usize + num: usize, } impl BlockRecorder { pub fn new(cfg: Arc, prefix: &str) -> Self { - Self { cfg, prefix: prefix.to_string(), num: 0 } + Self { + cfg, + prefix: prefix.to_string(), + num: 0, + } } pub fn write(&mut self, block: &BlockInfo, info: CardanoMessage) { @@ -79,7 +88,6 @@ impl Recorder { } } - async fn run( cfg: Arc, mut governance_s: Box>, @@ -93,10 +101,9 @@ impl Recorder { loop { let (blk_g, gov_procs) = Self::read_governance(&mut governance_s).await?; - let gov_procs_empty = - gov_procs.proposal_procedures.is_empty() 
&& - gov_procs.voting_procedures.is_empty() && - !blk_g.new_epoch; + let gov_procs_empty = gov_procs.proposal_procedures.is_empty() + && gov_procs.voting_procedures.is_empty() + && !blk_g.new_epoch; if !gov_procs_empty { gov_recorder.write(&blk_g, CardanoMessage::GovernanceProcedures(gov_procs)); @@ -113,7 +120,9 @@ impl Recorder { info!("Waiting spo..."); let (blk_spo, d_spo) = Self::read_spo(&mut spo_s).await?; if blk_g != blk_spo { - error!("Governance {blk_g:?} and SPO distribution {blk_spo:?} are out of sync"); + error!( + "Governance {blk_g:?} and SPO distribution {blk_spo:?} are out of sync" + ); } drep_recorder.write(&blk_g, CardanoMessage::DRepStakeDistribution(d_drep)); diff --git a/processes/replayer/src/replayer_config.rs b/processes/replayer/src/replayer_config.rs index 9110a26..9cb2965 100644 --- a/processes/replayer/src/replayer_config.rs +++ b/processes/replayer/src/replayer_config.rs @@ -33,11 +33,26 @@ impl ReplayerConfig { } /// Returns of subscription topics: topic, prefix, is-epoch-wise, do-skip-epoch-0 - pub fn get_topics_vec(&self) -> Arc> { + pub fn get_topics_vec(&self) -> Arc> { Arc::new(vec![ - (self.subscribe_topic.to_string(), "gov".to_string(), false, false), - (self.drep_distribution_topic.to_string(), "drep".to_string(), true, true), - (self.spo_distribution_topic.to_string(), "spo".to_string(), true, true) + ( + self.subscribe_topic.to_string(), + "gov".to_string(), + false, + false, + ), + ( + self.drep_distribution_topic.to_string(), + "drep".to_string(), + true, + true, + ), + ( + self.spo_distribution_topic.to_string(), + "spo".to_string(), + true, + true, + ), ]) } } From 881a274ee23421c32a9fb8a2006b14e530483c2a Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 11 Aug 2025 16:49:43 +0200 Subject: [PATCH 2/5] feat: add github pre push hook for cargo fmt --- .github/pre-push | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100755 .github/pre-push diff --git a/.github/pre-push b/.github/pre-push new file mode 100755 index 
0000000..e1ee896 --- /dev/null +++ b/.github/pre-push @@ -0,0 +1,6 @@ +#!/bin/sh +echo "Running cargo fmt check..." +if ! cargo fmt --all -- --check; then + echo "❌ Code is not formatted. Run 'cargo fmt --all' to fix." + exit 1 +fi \ No newline at end of file From 723987af19f5bc05c8d5d6ca6b213352dc525d71 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 11 Aug 2025 16:51:10 +0200 Subject: [PATCH 3/5] revert: git pre push hook --- .github/pre-push | 6 ------ 1 file changed, 6 deletions(-) delete mode 100755 .github/pre-push diff --git a/.github/pre-push b/.github/pre-push deleted file mode 100755 index e1ee896..0000000 --- a/.github/pre-push +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/sh -echo "Running cargo fmt check..." -if ! cargo fmt --all -- --check; then - echo "❌ Code is not formatted. Run 'cargo fmt --all' to fix." - exit 1 -fi \ No newline at end of file From 62defc057cc718de528f7f7ce232203311fe9e09 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 17 Aug 2025 08:59:21 +0200 Subject: [PATCH 4/5] revert: omnibus toml file --- processes/omnibus/omnibus.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 1b1ec07..a2deeda 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -8,7 +8,7 @@ genesis-key = "5b3139312c36362c3134302c3138352c3133382c31312c3233372c3230372c323 # Download max age in hours. E.g. 8 means 8 hours (if there isn't any snapshot within this time range download from Mithril) download-max-age = "never" # Pause constraint E.g.
"epoch:100", "block:1200" -pause = "epoch:211" +pause = "none" [module.upstream-chain-fetcher] sync-point = "snapshot" @@ -77,4 +77,4 @@ bulk-resume-capacity = 75 # Message routing [[message-router.route]] # Everything is internal only pattern = "#" -bus = "internal" \ No newline at end of file +bus = "internal" From bbc9874b036f452207edb719a159b800cb141605 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 17 Aug 2025 09:01:20 +0200 Subject: [PATCH 5/5] chore: format files --- common/src/queries/pools.rs | 41 ++++++++++++++++++++++++++++--------- common/src/queries/utils.rs | 2 +- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/common/src/queries/pools.rs b/common/src/queries/pools.rs index cb3c8f2..ff00316 100644 --- a/common/src/queries/pools.rs +++ b/common/src/queries/pools.rs @@ -6,17 +6,38 @@ pub enum PoolsStateQuery { GetPoolsListWithInfo, GetPoolsRetiredList, GetPoolsRetiringList, - GetPoolsActiveStakes { pools_operators: Vec>, epoch: u64 }, + GetPoolsActiveStakes { + pools_operators: Vec>, + epoch: u64, + }, // Get total blocks minted for each vrf vkey hashes (not included current epoch's blocks minted) - GetPoolsTotalBlocksMinted { vrf_key_hashes: Vec>}, - GetPoolInfo { pool_id: Vec }, - GetPoolHistory { pool_id: Vec }, - GetPoolMetadata { pool_id: Vec }, - GetPoolRelays { pool_id: Vec }, - GetPoolDelegators { pool_id: Vec }, - GetPoolBlocks { pool_id: Vec }, - GetPoolUpdates { pool_id: Vec }, - GetPoolVotes { pool_id: Vec }, + GetPoolsTotalBlocksMinted { + vrf_key_hashes: Vec>, + }, + GetPoolInfo { + pool_id: Vec, + }, + GetPoolHistory { + pool_id: Vec, + }, + GetPoolMetadata { + pool_id: Vec, + }, + GetPoolRelays { + pool_id: Vec, + }, + GetPoolDelegators { + pool_id: Vec, + }, + GetPoolBlocks { + pool_id: Vec, + }, + GetPoolUpdates { + pool_id: Vec, + }, + GetPoolVotes { + pool_id: Vec, + }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/common/src/queries/utils.rs b/common/src/queries/utils.rs index 
a7f62e6..43ef193 100644 --- a/common/src/queries/utils.rs +++ b/common/src/queries/utils.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; use anyhow::Result; use caryatid_sdk::Context; +use std::sync::Arc; use crate::messages::Message;