Skip to content

Commit 7f4d345

Browse files
committed
add GovernedMap cached DataSource implementation
1 parent a88d0e3 commit 7f4d345

File tree

6 files changed

+331
-9
lines changed

6 files changed

+331
-9
lines changed

demo/node/src/data_sources.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use authority_selection_inherents::authority_selection_inputs::AuthoritySelectio
22
use pallet_sidechain_rpc::SidechainRpcDataSource;
33
use partner_chains_db_sync_data_sources::{
44
block::BlockDataSourceImpl, candidates::CandidatesDataSourceImpl,
5-
governed_map::GovernedMapDataSourceImpl, mc_hash::McHashDataSourceImpl,
5+
governed_map::GovernedMapDataSourceCachedImpl, mc_hash::McHashDataSourceImpl,
66
metrics::McFollowerMetrics, native_token::NativeTokenManagementDataSourceImpl,
77
sidechain_rpc::SidechainRpcDataSourceImpl, stake_distribution::StakeDistributionDataSourceImpl,
88
};
@@ -69,7 +69,6 @@ pub fn create_mock_data_sources()
6969

7070
pub const CANDIDATES_FOR_EPOCH_CACHE_SIZE: usize = 64;
7171
pub const STAKE_CACHE_SIZE: usize = 100;
72-
7372
pub async fn create_cached_db_sync_data_sources(
7473
metrics_opt: Option<McFollowerMetrics>,
7574
) -> Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
@@ -96,6 +95,9 @@ pub async fn create_cached_db_sync_data_sources(
9695
metrics_opt.clone(),
9796
STAKE_CACHE_SIZE,
9897
)),
99-
governed_map: Arc::new(GovernedMapDataSourceImpl::new(pool, metrics_opt)),
98+
governed_map: Arc::new(GovernedMapDataSourceCachedImpl::new_from_env(
99+
pool,
100+
metrics_opt.clone(),
101+
)?),
100102
})
101103
}

toolkit/data-sources/db-sync/src/db_model.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ pub(crate) struct DatumOutput {
114114
pub datum: DbDatum,
115115
}
116116

117+
/// Row returned by [`get_changes`]: a Governed Map datum together with the
/// chain position at which the change took effect and the kind of change.
#[derive(Debug, Clone, sqlx::FromRow, PartialEq)]
pub(crate) struct DatumChangeOutput {
    // Plutus datum attached to the UTXO that was created or spent.
    pub datum: DbDatum,
    // Block in which the change became effective.
    pub block_no: BlockNumber,
    // Index of the transaction within that block (used for ordering changes
    // that land in the same block).
    pub block_index: TxIndexInBlock,
    // Either "upsert" (output created) or "remove" (output spent) — literal
    // values produced by the SQL in [`get_changes`].
    pub action: String,
}
124+
117125
#[derive(Debug, Clone, PartialEq)]
118126
pub(crate) struct NativeTokenAmount(pub u128);
119127
impl From<NativeTokenAmount> for sidechain_domain::NativeTokenAmount {
@@ -367,6 +375,53 @@ pub(crate) async fn get_token_utxo_for_epoch(
367375
.await?)
368376
}
369377

378+
#[cfg(feature = "governed-map")]
/// Returns every Governed Map datum change at `address` for UTXOs carrying
/// `asset` within the block range `(last_block, new_block]` (lower bound
/// optional), ordered by ascending block number.
pub(crate) async fn get_changes(
    pool: &Pool<Postgres>,
    address: &Address,
    last_block: Option<BlockNumber>,
    new_block: BlockNumber,
    asset: Asset,
) -> Result<Vec<DatumChangeOutput>, SqlxError> {
    // UNION of two sub-queries over the same token-bearing outputs at `address`:
    //  * 'upsert': the output (and its datum) was created in a block within the range;
    //  * 'remove': the output was spent in a block within the range. The LEFT JOINs
    //    to the consuming tx, combined with `consuming_tx_in.id IS NOT NULL`,
    //    keep only outputs that really were consumed.
    // `$2 IS NULL` makes the lower bound optional so the full history up to $3
    // can be requested.
    let query = "
	(SELECT
		datum.value as datum, origin_block.block_no as block_no, origin_tx.block_index as block_index, 'upsert' as action
	FROM tx_out
	INNER JOIN tx origin_tx          ON tx_out.tx_id = origin_tx.id
	INNER JOIN block origin_block    ON origin_tx.block_id = origin_block.id
	INNER JOIN datum                 ON tx_out.data_hash = datum.hash
	INNER JOIN ma_tx_out             ON tx_out.id = ma_tx_out.tx_out_id
	INNER JOIN multi_asset           ON multi_asset.id = ma_tx_out.ident
	WHERE
		tx_out.address = $1 AND ($2 IS NULL OR origin_block.block_no > $2) AND origin_block.block_no <= $3
		AND multi_asset.policy = $4
		AND multi_asset.name = $5)
	UNION
	(SELECT
		datum.value as datum, consuming_block.block_no as block_no, consuming_tx.block_index as block_index, 'remove' as action
	FROM tx_out
	LEFT JOIN tx_in consuming_tx_in  ON tx_out.tx_id = consuming_tx_in.tx_out_id AND tx_out.index = consuming_tx_in.tx_out_index
	LEFT JOIN tx consuming_tx        ON consuming_tx_in.tx_in_id = consuming_tx.id
	LEFT JOIN block consuming_block  ON consuming_tx.block_id = consuming_block.id
	INNER JOIN datum                 ON tx_out.data_hash = datum.hash
	INNER JOIN ma_tx_out             ON tx_out.id = ma_tx_out.tx_out_id
	INNER JOIN multi_asset           ON multi_asset.id = ma_tx_out.ident
	WHERE
		tx_out.address = $1
		AND (consuming_tx_in.id IS NOT NULL AND ($2 IS NULL OR consuming_block.block_no > $2) AND consuming_block.block_no <= $3)
		AND multi_asset.policy = $4
		AND multi_asset.name = $5)
	ORDER BY block_no ASC";
    Ok(sqlx::query_as::<_, DatumChangeOutput>(query)
        .bind(&address.0)
        .bind(last_block)
        .bind(new_block)
        .bind(&asset.policy_id.0)
        .bind(&asset.asset_name.0)
        .fetch_all(pool)
        .await?)
}
424+
370425
#[cfg(feature = "governed-map")]
371426
pub(crate) async fn get_datums_at_address_with_token(
372427
pool: &Pool<Postgres>,

toolkit/data-sources/db-sync/src/governed_map/mod.rs

Lines changed: 211 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1-
use crate::DataSourceError::ExpectedDataNotFound;
1+
use crate::DataSourceError::{self, ExpectedDataNotFound};
22
use crate::Result;
3+
use crate::db_model::Block;
34
use crate::{metrics::McFollowerMetrics, observed_async_trait};
5+
use db_sync_sqlx::{BlockNumber, TxIndexInBlock};
46
use derive_new::new;
57
use log::warn;
68
use partner_chains_plutus_data::governed_map::GovernedMapDatum;
79
use sidechain_domain::byte_string::ByteString;
810
use sidechain_domain::*;
911
use sp_governed_map::{GovernedMapDataSource, MainChainScriptsV1};
1012
use sqlx::PgPool;
13+
use std::cmp::{max, min};
1114
use std::collections::HashMap;
15+
use std::sync::{Arc, Mutex};
1216

1317
#[cfg(test)]
1418
pub mod tests;
@@ -87,3 +91,209 @@ impl GovernedMapDataSourceImpl {
8791
Ok(mappings)
8892
}
8993
}
94+
95+
/// Db-Sync implementation of [`GovernedMapDataSource`] that keeps an in-memory
/// cache of observed changes so repeated queries over overlapping block ranges
/// do not hit the database each time.
#[derive(new)]
pub struct GovernedMapDataSourceCachedImpl {
    // Postgres connection pool for the Db-Sync database.
    pub pool: PgPool,
    // Optional metrics collector used by `observed_async_trait!`; `None`
    // disables call metrics.
    pub metrics_opt: Option<McFollowerMetrics>,
    // Cardano security parameter: only blocks at least this far below the tip
    // are considered stable enough to cache.
    security_parameter: u32,
    // Read-ahead horizon: how many blocks past the requested range to
    // pre-fetch into the cache.
    cache_size: u16,
    // Shared mutable cache of changes; starts empty (`#[new(default)]`).
    #[new(default)]
    cache: Arc<Mutex<Cache>>,
}
104+
105+
observed_async_trait!(
impl GovernedMapDataSource for GovernedMapDataSourceCachedImpl {
    // Returns the net Governed Map changes between `since_mc_block`
    // (exclusive, optional) and `up_to_mc_block` (inclusive): one entry per
    // key, `Some(value)` for an upsert, `None` for a removal.
    async fn get_mapping_changes(
        &self,
        since_mc_block: Option<McBlockHash>,
        up_to_mc_block: McBlockHash,
        scripts: MainChainScriptsV1,
    ) -> std::result::Result<
        Vec<(String, Option<ByteString>)>,
        Box<dyn std::error::Error + Send + Sync>,
    > {
        // Resolve both block hashes to block numbers; a hash unknown to
        // Db-Sync is an error because the caller expects these blocks to exist.
        let since_block_number = match since_mc_block {
            Some(hash) =>
                Some(crate::db_model::get_block_by_hash(&self.pool, hash.clone())
                    .await?
                    .ok_or_else(|| Box::new(ExpectedDataNotFound(format!("Block hash: {hash}"))) as Box<dyn std::error::Error + Send + Sync>)?
                    .block_no),
            None => None,
        };

        let Some(up_to_block) =
            crate::db_model::get_block_by_hash(&self.pool, up_to_mc_block.clone()).await?
        else {
            return Err(Box::new(ExpectedDataNotFound(format!("Block hash: {up_to_mc_block}"))));
        };

        // Fast path: serve the whole range from the cache when it is covered.
        if let Some(cached_changes) = self.get_changes_from_cache(since_block_number, up_to_block.block_no).await? {
            return Ok(cached_changes);
        }

        let latest_block = get_latest_block(&self.pool).await?;

        // Only cache blocks at least `security_parameter` below the tip —
        // anything younger may still be rolled back.
        let latest_stable_block = BlockNumber(latest_block.block_no.0.saturating_sub(self.security_parameter));

        // Read-ahead horizon: `cache_size` blocks past the start of the range.
        let since_block_plus = BlockNumber(since_block_number.unwrap_or(BlockNumber(0)).0 + self.cache_size as u32);

        // Query up to the requested block or the read-ahead horizon, whichever
        // is further, but never past the stable tip.
        let max_search_block = min(latest_stable_block, max(up_to_block.block_no, since_block_plus));

        let changes = self.get_changes_in_range_to_cache(since_block_number, max_search_block, scripts).await?;

        // Best effort: a poisoned lock just means the cache is not updated.
        if let Ok(mut cache) = self.cache.lock() {
            cache.add_changes(changes.clone());
        }

        // The query may have returned more than requested (read-ahead); trim
        // to the requested range and collapse to one entry per key.
        Ok(deduplicate_changes(filter_changes_in_range(changes, since_block_number, up_to_block.block_no)))
    }
}
);
153+
154+
impl GovernedMapDataSourceCachedImpl {
155+
pub fn new_from_env(
156+
pool: PgPool,
157+
metrics_opt: Option<McFollowerMetrics>,
158+
) -> std::result::Result<Self, &'static str> {
159+
let security_parameter: u32 = std::env::var("CARDANO_SECURITY_PARAMETER")
160+
.ok()
161+
.and_then(|s| s.parse().ok())
162+
.ok_or("Couldn't read env variable CARDANO_SECURITY_PARAMETER as u32")?;
163+
Ok(Self {
164+
pool,
165+
metrics_opt,
166+
security_parameter,
167+
cache_size: 1000,
168+
cache: Default::default(),
169+
})
170+
}
171+
172+
async fn get_changes_from_cache(
173+
&self,
174+
since_block: Option<BlockNumber>,
175+
up_to_block: BlockNumber,
176+
) -> Result<Option<Vec<(String, Option<ByteString>)>>> {
177+
if let Ok(cache) = self.cache.lock() {
178+
if let Some(changes) = cache.get_changes_in_range(since_block, up_to_block) {
179+
return Ok(Some(deduplicate_changes(changes)));
180+
}
181+
};
182+
return Ok(None);
183+
}
184+
185+
async fn get_changes_in_range_to_cache(
186+
&self,
187+
since_block: Option<BlockNumber>,
188+
up_to_block: BlockNumber,
189+
scripts: MainChainScriptsV1,
190+
) -> Result<Vec<(String, Option<ByteString>, BlockNumber, TxIndexInBlock)>> {
191+
let latest_block_number = {
192+
let cache = self.cache.lock().expect("Failed to lock cache");
193+
194+
cache.newest_block_number.or(since_block)
195+
};
196+
let changes = crate::db_model::get_changes(
197+
&self.pool,
198+
&scripts.validator_address.into(),
199+
latest_block_number,
200+
up_to_block,
201+
scripts.asset.into(),
202+
)
203+
.await?;
204+
205+
let mut result = Vec::new();
206+
207+
for change in changes {
208+
match GovernedMapDatum::try_from(change.datum.0) {
209+
Ok(GovernedMapDatum { key, value }) => match change.action.as_str() {
210+
"remove" => result.push((key, None, change.block_no, change.block_index)),
211+
"upsert" => {
212+
result.push((key, Some(value), change.block_no, change.block_index))
213+
},
214+
_ => warn!("Unknown action: {}", change.action),
215+
},
216+
Err(err) => warn!("Failed decoding map entry: {err}"),
217+
}
218+
}
219+
Ok(result)
220+
}
221+
}
222+
223+
/// In-memory store of Governed Map changes observed so far.
#[derive(Default)]
pub(crate) struct Cache {
    // Highest block number covered by `changes`; `None` until the first
    // `add_changes` call, meaning the cache cannot answer any query yet.
    newest_block_number: Option<BlockNumber>,
    // All cached changes as (key, value-or-removal, block number, tx index).
    changes: Vec<(String, Option<ByteString>, BlockNumber, TxIndexInBlock)>,
}
228+
229+
fn deduplicate_changes(
230+
mut changes: Vec<(String, Option<ByteString>, BlockNumber, TxIndexInBlock)>,
231+
) -> Vec<(String, Option<ByteString>)> {
232+
let mut result = HashMap::new();
233+
234+
changes.sort_by_key(|(_, _, block_number, block_index)| (*block_number, *block_index));
235+
236+
for (key, value, _, _) in changes {
237+
if result.contains_key(&key) {
238+
result.remove(&key);
239+
}
240+
result.insert(key, value);
241+
}
242+
243+
result.into_iter().collect()
244+
}
245+
246+
/// Keeps only the changes whose block number lies in `(since_block, up_to_block]`;
/// a `None` lower bound means "from genesis".
fn filter_changes_in_range(
    changes: Vec<(String, Option<ByteString>, BlockNumber, TxIndexInBlock)>,
    since_block: Option<BlockNumber>,
    up_to_block: BlockNumber,
) -> Vec<(String, Option<ByteString>, BlockNumber, TxIndexInBlock)> {
    // The vector is owned, so consume it with `into_iter` instead of the
    // original borrow-then-`.map(|x| x.clone())`, which cloned every
    // surviving element for no benefit.
    changes
        .into_iter()
        .filter(|(_, _, block_number, _)| {
            block_number.0 <= up_to_block.0
                && since_block.map(|b| block_number.0 > b.0).unwrap_or(true)
        })
        .collect()
}
260+
261+
async fn get_latest_block(pool: &PgPool) -> Result<Block> {
262+
crate::db_model::get_latest_block_info(pool).await?.ok_or(
263+
DataSourceError::ExpectedDataNotFound(
264+
"The latest block not found when querying for native token transfers".to_string(),
265+
),
266+
)
267+
}
268+
269+
impl Cache {
270+
pub fn get_changes_in_range(
271+
&self,
272+
since_block: Option<BlockNumber>,
273+
up_to_block: BlockNumber,
274+
) -> Option<Vec<(String, Option<ByteString>, BlockNumber, TxIndexInBlock)>> {
275+
let Some(newest_block_number) = self.newest_block_number else {
276+
return None;
277+
};
278+
279+
if newest_block_number.0 < up_to_block.0 {
280+
return None;
281+
}
282+
283+
Some(filter_changes_in_range(self.changes.clone(), since_block, up_to_block))
284+
}
285+
286+
pub fn add_changes(
287+
&mut self,
288+
changes: Vec<(String, Option<ByteString>, BlockNumber, TxIndexInBlock)>,
289+
) {
290+
self.changes.extend(changes);
291+
self.newest_block_number = Some(
292+
self.changes
293+
.iter()
294+
.max_by_key(|(_, _, block_number, _)| block_number.0)
295+
.map(|(_, _, block_number, _)| *block_number)
296+
.unwrap_or(BlockNumber(0)),
297+
);
298+
}
299+
}

0 commit comments

Comments
 (0)