diff --git a/src/bin/scanner.rs b/src/bin/scanner.rs index 03d3003..7f45db4 100644 --- a/src/bin/scanner.rs +++ b/src/bin/scanner.rs @@ -804,6 +804,10 @@ async fn main_unified() -> WalletResult<()> { }, } + // Wait for event emitter for finish all outstanding tasks + if let Some(event_emitter) = &mut wallet_scanner.config_mut().event_emitter { + event_emitter.flush().await; + } // Stop background writer gracefully #[cfg(all(feature = "storage", not(target_arch = "wasm32")))] if !storage_backend.is_memory_only { diff --git a/src/data_structures/block.rs b/src/data_structures/block.rs index 22a31bf..8a5d1d4 100644 --- a/src/data_structures/block.rs +++ b/src/data_structures/block.rs @@ -11,7 +11,7 @@ #[cfg(feature = "grpc")] use rayon::prelude::*; use tari_script::{Opcode, TariScript}; -use tari_utilities::{hex::Hex, ByteArray}; +use tari_utilities::ByteArray; #[cfg(feature = "grpc")] use crate::scanning::BlockInfo; @@ -154,7 +154,7 @@ impl Block { fn extract_script_key(&self, script: &Script) -> Option { let tari_script = TariScript::from_bytes(&script.bytes).ok()?; if let [Opcode::PushPubKey(pk)] = tari_script.as_slice() { - let compressed_pk = CompressedPublicKey::from_hex(&pk.to_hex()).ok()?; + let compressed_pk = CompressedPublicKey::from_canonical_bytes(&pk.as_bytes()).ok()?; return Some(compressed_pk); } None @@ -179,7 +179,8 @@ impl Block { // Handle coinbase outputs if is_coinbase { - if let Some(result) = self.try_coinbase_output_optimized(output_index, output, view_key, script_key.clone()) + if let Some(result) = + self.try_coinbase_output_optimized(output_index, output, view_key, script_key.as_ref()) { return Some(result); } @@ -192,17 +193,15 @@ impl Block { } // Try regular decryption first (most common case) - if let Some(script_key) = script_key { - if let Some(result) = self.try_regular_decryption_optimized( - output_index, - output, - view_key, - script_key, - TransactionStatus::MinedConfirmed, - true, - ) { - return Some(result); - } + if let Some(result) = self.try_regular_decryption_optimized( + output_index, + output, + view_key, + script_key.as_ref(), + TransactionStatus::MinedConfirmed, + true, + ) { + return Some(result); } // Try one-sided decryption only if sender offset key is present @@ -211,6 +210,7 @@ impl Block { output_index, output, view_key, + script_key.as_ref(), TransactionStatus::OneSidedConfirmed, true, ) { @@ -227,7 +227,7 @@ impl Block { output_index: usize, output: &TransactionOutput, view_key: &PrivateKey, - script_key: Option, + script_key: Option<&CompressedPublicKey>, ) -> Option { let coinbase_value = output.minimum_value_promise.as_u64(); if coinbase_value == 0 { @@ -242,19 +242,27 @@ impl Block { }; // Try regular decryption for ownership verification first (faster) - if let Some(sk) = script_key { - if let Some(result) = - self.try_regular_decryption_optimized(output_index, output, view_key, sk, transaction_status, is_mature) - { - return Some(result); - } + if let Some(result) = self.try_regular_decryption_optimized( + output_index, + output, + view_key, + script_key, + transaction_status, + is_mature, + ) { + return Some(result); } // Only try one-sided decryption if regular failed and sender offset key exists if !output.sender_offset_public_key.as_bytes().is_empty() { - if let Some(result) = - self.try_one_sided_decryption_optimized(output_index, output, view_key, transaction_status, is_mature) - { + if let Some(result) = self.try_one_sided_decryption_optimized( + output_index, + output, + view_key, + script_key, + transaction_status, + is_mature, + ) { return Some(result); } } @@ -269,7 +277,7 @@ impl Block { output_index: usize, output: &TransactionOutput, view_key: &PrivateKey, - script_key: CompressedPublicKey, + script_key: Option<&CompressedPublicKey>, transaction_status: TransactionStatus, is_mature: bool, ) -> Option { @@ -283,7 +291,7 @@ impl Block { transaction_status, is_mature, commitment_mask_private_key, - script_key: Some(script_key), + script_key: script_key.map(|k| k.clone()), }); } None @@ -295,6 +303,7 @@ impl Block { output_index: usize, output: &TransactionOutput, view_key: &PrivateKey, + script_key: Option<&CompressedPublicKey>, transaction_status: TransactionStatus, is_mature: bool, ) -> Option { @@ -311,7 +320,7 @@ impl Block { transaction_status, is_mature, commitment_mask_private_key, - script_key: None, + script_key: script_key.map(|k| k.clone()), }); } None diff --git a/src/events/listeners/database_storage.rs b/src/events/listeners/database_storage.rs index 3a6ed49..7d7612f 100644 --- a/src/events/listeners/database_storage.rs +++ b/src/events/listeners/database_storage.rs @@ -24,6 +24,8 @@ use tari_transaction_components::key_manager::{ #[cfg(all(feature = "storage", not(target_arch = "wasm32")))] use tokio::sync::{mpsc, oneshot}; +#[cfg(feature = "storage")] +use crate::data_structures::CompressedPublicKey; #[cfg(feature = "storage")] use crate::events::types::{AddressInfo, BlockInfo, OutputData, SpentOutputData, TransactionData}; #[cfg(feature = "storage")] @@ -45,6 +47,22 @@ use crate::{ }, }; +/// Represents a known key with its ID and compressed public key. +#[cfg(feature = "storage")] +#[derive(Debug, Clone)] +pub struct KnownKey { + pub key_id: TariKeyId, + pub pub_key: CompressedPublicKey, +} + +/// Holds a TransactionKeyManager and a vector of KnownKey instances. +#[cfg(feature = "storage")] +#[derive(Clone)] +pub struct KeyManagerWithKnownKeys { + pub key_manager: TransactionKeyManager, + pub known_keys: Vec, +} + /// Database storage listener that persists scan results to SQLite /// /// This listener handles all database operations needed during wallet scanning, @@ -102,7 +120,7 @@ pub struct DatabaseStorageListener { /// Whether to enable event auditing (stores events in wallet_events table) enable_event_auditing: bool, /// Cache for TransactionKeyManager instances - key_managers: HashMap>, + key_managers: HashMap>, /// Background writer for non-WASM32 architectures #[cfg(all(feature = "storage", not(target_arch = "wasm32")))] @@ -293,7 +311,8 @@ impl DatabaseStorageListener { async fn create_transaction_manager( &mut self, wallet_id: u32, - ) -> Result, Box> { + ) -> Result, Box> { + use tari_utilities::ByteArray; match self.key_managers.entry(wallet_id) { Entry::Occupied(entry) => Ok(entry.get().clone()), Entry::Vacant(entry) => { @@ -309,7 +328,26 @@ impl DatabaseStorageListener { ) .await?; - Ok(entry.insert(Arc::new(transaction_key_manager)).clone()) + // Initialize known keys + let mut known_keys = Vec::new(); + let comms_key_id = TariKeyId::Managed { + branch: KeyManagerBranch::Comms.get_branch_key(), + index: 0, + }; + let core_comms_key = transaction_key_manager.get_public_key_at_key_id(&comms_key_id).await?; + let comms_pub_key = CompressedPublicKey::from_canonical_bytes(core_comms_key.as_bytes())?; + + known_keys.push(KnownKey { + key_id: comms_key_id, + pub_key: comms_pub_key, + }); + + let key_manager_with_known_keys = Arc::new(KeyManagerWithKnownKeys { + key_manager: transaction_key_manager, + known_keys, + }); + + Ok(entry.insert(key_manager_with_known_keys).clone()) }, } } @@ -606,7 +644,7 @@ impl DatabaseStorageListener { block_info: &BlockInfo, _address_info: &AddressInfo, transaction_data: &crate::events::types::TransactionData, - key_manager: &Arc, + key_manager_with_known_keys: &Arc, ) -> Result> { // Parse commitment from hex string use crate::{data_structures::PaymentId, hex_utils::HexEncodable, wallet_scanner::extract_script_data}; @@ -634,19 +672,24 @@ impl DatabaseStorageListener { .ok_or(crate::WalletError::StorageError("No spending key found".to_string()))?; let core_commitment_mask_private_key = RistrettoSecretKey::try_from(commitment_mask_private_key) .map_err(|e| crate::WalletError::ConversionError(e.to_string()))?; - let commitment_mask_private_key = key_manager.import_key(core_commitment_mask_private_key).await?; + let commitment_mask_private_key = key_manager_with_known_keys + .key_manager + .import_key(core_commitment_mask_private_key) + .await?; + + let mut script_key_found = None; + if let Some(script_key) = &output_data.script_key { + for known_key in &key_manager_with_known_keys.known_keys { + if script_key == &known_key.pub_key { + script_key_found = Some(known_key.key_id.clone()); + break; + } + } + } + let script_key = script_key_found.unwrap_or_else(|| TariKeyId::Derived { + key: SerializedKeyString::from(commitment_mask_private_key.clone().to_string()), + }); - let script_key = match &output_data.script_key { - // UTXO of a normal transaction - Some(_) => TariKeyId::Managed { - branch: KeyManagerBranch::Comms.get_branch_key(), - index: 0, - }, - // UTXO of a stealth transaction - None => TariKeyId::Derived { - key: SerializedKeyString::from(commitment_mask_private_key.clone().to_string()), - }, - }; let payment_id = transaction_data .payment_id .as_deref() @@ -1062,7 +1105,8 @@ impl DatabaseStorageListener { address_info: &AddressInfo, transaction_data: &crate::events::types::TransactionData, ) -> Result, Box> { - let key_manager = self.create_transaction_manager(wallet_id).await?; + let key_manager_with_known_keys = self.create_transaction_manager(wallet_id).await?; + // Convert event data to StoredOutput let stored_output = self .convert_to_stored_output( @@ -1071,7 +1115,7 @@ impl DatabaseStorageListener { block_info, address_info, transaction_data, - &key_manager, + &key_manager_with_known_keys, ) .await?; diff --git a/src/scanning/event_emitter.rs b/src/scanning/event_emitter.rs index 33d56b5..0999be4 100644 --- a/src/scanning/event_emitter.rs +++ b/src/scanning/event_emitter.rs @@ -91,6 +91,15 @@ pub struct ScanEventEmitter { current_context: Option, /// Whether to use fire-and-forget mode for event emission (non-blocking) fire_and_forget: bool, + /// Bookkeeping for spawned background event tasks (only used if fire_and_forget) + #[cfg(not(target_arch = "wasm32"))] + spawned_tasks: Arc>>>, + + #[cfg(not(target_arch = "wasm32"))] + spawned_tasks_watcher: Option>, + + #[cfg(target_arch = "wasm32")] + pending_tasks: std::sync::Arc>, // simple counter for WASM } impl ScanEventEmitter { @@ -104,6 +113,12 @@ impl ScanEventEmitter { current_config: None, current_context: None, fire_and_forget: true, + #[cfg(not(target_arch = "wasm32"))] + spawned_tasks: Arc::new(tokio::sync::Mutex::new(Vec::new())), + #[cfg(not(target_arch = "wasm32"))] + spawned_tasks_watcher: None, + #[cfg(target_arch = "wasm32")] + pending_tasks: std::sync::Arc::new(tokio::sync::Mutex::new(0)), } } @@ -119,13 +134,31 @@ impl ScanEventEmitter { /// listeners to complete processing. This is critical for scanning performance /// when slow listeners (like database operations) are registered. pub fn with_fire_and_forget(mut self, enabled: bool) -> Self { - self.fire_and_forget = enabled; + self.set_fire_and_forget(enabled); self } /// Set fire-and-forget mode for non-blocking event emission pub fn set_fire_and_forget(&mut self, enabled: bool) { self.fire_and_forget = enabled; + #[cfg(not(target_arch = "wasm32"))] + { + if enabled && self.spawned_tasks_watcher.is_none() { + // Start watcher task to clean up finished spawned_tasks + let spawned_tasks = Arc::clone(&self.spawned_tasks); + let watcher = tokio::spawn(async move { + use tokio::time::{sleep, Duration}; + loop { + { + let mut tasks = spawned_tasks.lock().await; + tasks.retain(|handle| !handle.is_finished()); + } + sleep(Duration::from_millis(100)).await; + } + }); + self.spawned_tasks_watcher = Some(watcher); + } + } } /// Set the current scan configuration for reference in events @@ -478,19 +511,30 @@ impl ScanEventEmitter { #[cfg(not(target_arch = "wasm32"))] { // Spawn the dispatch in the background and don't wait for it - tokio::spawn(async move { + let handle = tokio::spawn(async move { let mut disp = dispatcher.lock().await; disp.dispatch(event).await; }); + let spawned_tasks = Arc::clone(&self.spawned_tasks); + tokio::spawn(async move { + spawned_tasks.lock().await.push(handle); + }); // Return immediately without waiting for the spawned task } #[cfg(target_arch = "wasm32")] { // Use spawn_local for WASM + let pending = self.pending_tasks.clone(); + { + let mut count = pending.lock().await; + *count += 1; + } wasm_bindgen_futures::spawn_local(async move { let mut disp = dispatcher.lock().await; disp.dispatch(event).await; + let mut count = pending.lock().await; + *count -= 1; }); // Return immediately without waiting for the spawned task } @@ -501,6 +545,33 @@ impl ScanEventEmitter { } } + /// Wait for all spawned background event tasks to finish (fire-and-forget mode) + pub async fn flush(&mut self) { + #[cfg(not(target_arch = "wasm32"))] + { + // Stop watcher task if running + if let Some(watcher) = self.spawned_tasks_watcher.take() { + watcher.abort(); + } + // Await all remaining tasks + let mut tasks = self.spawned_tasks.lock().await; + while let Some(handle) = tasks.pop() { + let _ = handle.await; + } + } + #[cfg(target_arch = "wasm32")] + { + use tokio::time::{sleep, Duration}; + loop { + let count = { *self.pending_tasks.lock().await }; + if count == 0 { + break; + } + sleep(Duration::from_millis(10)).await; + } + } + } + /// Create event metadata with consistent source and correlation ID fn create_metadata(&self) -> EventMetadata { match &self.correlation_id { diff --git a/src/signing/outgoing_tx_builder.rs b/src/signing/outgoing_tx_builder.rs index cd822ce..a6bfcf8 100644 --- a/src/signing/outgoing_tx_builder.rs +++ b/src/signing/outgoing_tx_builder.rs @@ -9,10 +9,16 @@ use tari_common_types::{ use tari_script::push_pubkey_script; use tari_transaction_components::{ consensus::ConsensusConstantsBuilder, - key_manager::TransactionKeyManagerInterface, + key_manager::{TariKeyId, TransactionKeyManagerInterface}, tari_amount::MicroMinotari, transaction_builder::FinalizedTransaction, - transaction_components::{memo_field::MemoField, OutputFeatures, TransactionError, WalletOutputBuilder}, + transaction_components::{ + memo_field::MemoField, + one_sided::{shared_secret_to_output_encryption_key, shared_secret_to_output_spending_key}, + OutputFeatures, + TransactionError, + WalletOutputBuilder, + }, TransactionBuilder, }; @@ -72,33 +78,45 @@ impl OutgoingTxBuilder { .await .map_err(|err| TransactionError::BuilderError(err.to_string()))?; - let (commitment_mask_key, script_key) = self - .transaction_key_manager - .as_interface() - .get_next_commitment_mask_and_script_key() - .await?; + let output_builder_key_manager = self.transaction_key_manager.clone().as_interface(); - let sender_offset = self - .transaction_key_manager - .get_next_key(TransactionKeyManagerBranch::SenderOffset.get_branch_key()) + let sender_offset = output_builder_key_manager + .get_next_key(TransactionKeyManagerBranch::OneSidedSenderOffset.get_branch_key()) .await .unwrap(); - let output_builder_key_manager = self.transaction_key_manager.clone().as_interface(); + let shared_secret = output_builder_key_manager + .get_diffie_hellman_shared_secret( + &sender_offset.key_id, + dest_address + .public_view_key() + .ok_or(WalletError::DataError("Missing addressee public view key".to_string()))?, + ) + .await?; + + let commitment_mask_key = shared_secret_to_output_spending_key(&shared_secret) + .map_err(|err| TransactionError::BuilderError(err.to_string()))?; + let commitment_mask_key_id = output_builder_key_manager + .import_key(commitment_mask_key.clone()) + .await?; + + let encryption_private_key = shared_secret_to_output_encryption_key(&shared_secret) + .map_err(|err| TransactionError::BuilderError(err.to_string()))?; + let encryption_key = output_builder_key_manager.import_key(encryption_private_key).await?; let script_spending_key = output_builder_key_manager - .stealth_address_script_spending_key(&commitment_mask_key.key_id, dest_address.public_spend_key()) + .stealth_address_script_spending_key(&commitment_mask_key_id, dest_address.public_spend_key()) .await?; let script = push_pubkey_script(&script_spending_key); - let recipient_output = WalletOutputBuilder::new(amount, commitment_mask_key.key_id) + let recipient_output = WalletOutputBuilder::new(amount, commitment_mask_key_id.clone()) .with_features(OutputFeatures::default()) .with_script(script) - .encrypt_data_for_recovery(&output_builder_key_manager, None, payment_id.clone()) + .encrypt_data_for_recovery(&output_builder_key_manager, Some(&encryption_key), payment_id.clone()) .await? .with_input_data(Default::default()) .with_sender_offset_public_key(sender_offset.pub_key) - .with_script_key(script_key.key_id) + .with_script_key(TariKeyId::Zero) .with_minimum_value_promise(0.into()) .sign_as_sender_and_receiver_verified(&output_builder_key_manager, &sender_offset.key_id, &dest_address) .await? diff --git a/src/signing/prepare/output_converter.rs b/src/signing/prepare/output_converter.rs index 68a54e6..4d3fb36 100644 --- a/src/signing/prepare/output_converter.rs +++ b/src/signing/prepare/output_converter.rs @@ -69,14 +69,16 @@ impl OutputConverter { "TODO: commitment_mask_key_id: {}, export_safe_script_key_id: {}", commitment_mask_key_id, export_safe_script_key_id ); - let rangeproof = o - .rangeproof - .map(|p| { - let hex = String::from_utf8(p).map_err(|e| WalletError::ConversionError(e.to_string()))?; + let rangeproof = match o.rangeproof { + Some(rp) if !rp.is_empty() => { + let hex = String::from_utf8(rp).map_err(|e| WalletError::ConversionError(e.to_string()))?; let binary = HexUtils::from_hex(&hex).map_err(|e| WalletError::ConversionError(e.to_string()))?; - BulletRangeProof::from_vec(&binary).map_err(|e| WalletError::ConversionError(e.to_string())) - }) - .transpose()?; + let result = + BulletRangeProof::from_vec(&binary).map_err(|e| WalletError::ConversionError(e.to_string()))?; + Some(result) + }, + Some(_) | None => None, + }; let wallet_output = WalletOutput::new_with_rangeproof( TransactionOutputVersion::get_current_version(),