Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/bin/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,10 @@ async fn main_unified() -> WalletResult<()> {
},
}

// Wait for event emitter for finish all outstanding tasks
if let Some(event_emitter) = &mut wallet_scanner.config_mut().event_emitter {
event_emitter.flush().await;
}
// Stop background writer gracefully
#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
if !storage_backend.is_memory_only {
Expand Down
63 changes: 36 additions & 27 deletions src/data_structures/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#[cfg(feature = "grpc")]
use rayon::prelude::*;
use tari_script::{Opcode, TariScript};
use tari_utilities::{hex::Hex, ByteArray};
use tari_utilities::ByteArray;

#[cfg(feature = "grpc")]
use crate::scanning::BlockInfo;
Expand Down Expand Up @@ -154,7 +154,7 @@ impl Block {
fn extract_script_key(&self, script: &Script) -> Option<CompressedPublicKey> {
let tari_script = TariScript::from_bytes(&script.bytes).ok()?;
if let [Opcode::PushPubKey(pk)] = tari_script.as_slice() {
let compressed_pk = CompressedPublicKey::from_hex(&pk.to_hex()).ok()?;
let compressed_pk = CompressedPublicKey::from_canonical_bytes(&pk.as_bytes()).ok()?;
return Some(compressed_pk);
}
None
Expand All @@ -179,7 +179,8 @@ impl Block {

// Handle coinbase outputs
if is_coinbase {
if let Some(result) = self.try_coinbase_output_optimized(output_index, output, view_key, script_key.clone())
if let Some(result) =
self.try_coinbase_output_optimized(output_index, output, view_key, script_key.as_ref())
{
return Some(result);
}
Expand All @@ -192,17 +193,15 @@ impl Block {
}

// Try regular decryption first (most common case)
if let Some(script_key) = script_key {
if let Some(result) = self.try_regular_decryption_optimized(
output_index,
output,
view_key,
script_key,
TransactionStatus::MinedConfirmed,
true,
) {
return Some(result);
}
if let Some(result) = self.try_regular_decryption_optimized(
output_index,
output,
view_key,
script_key.as_ref(),
TransactionStatus::MinedConfirmed,
true,
) {
return Some(result);
}

// Try one-sided decryption only if sender offset key is present
Expand All @@ -211,6 +210,7 @@ impl Block {
output_index,
output,
view_key,
script_key.as_ref(),
TransactionStatus::OneSidedConfirmed,
true,
) {
Expand All @@ -227,7 +227,7 @@ impl Block {
output_index: usize,
output: &TransactionOutput,
view_key: &PrivateKey,
script_key: Option<CompressedPublicKey>,
script_key: Option<&CompressedPublicKey>,
) -> Option<OutputProcessingResult> {
let coinbase_value = output.minimum_value_promise.as_u64();
if coinbase_value == 0 {
Expand All @@ -242,19 +242,27 @@ impl Block {
};

// Try regular decryption for ownership verification first (faster)
if let Some(sk) = script_key {
if let Some(result) =
self.try_regular_decryption_optimized(output_index, output, view_key, sk, transaction_status, is_mature)
{
return Some(result);
}
if let Some(result) = self.try_regular_decryption_optimized(
output_index,
output,
view_key,
script_key,
transaction_status,
is_mature,
) {
return Some(result);
}

// Only try one-sided decryption if regular failed and sender offset key exists
if !output.sender_offset_public_key.as_bytes().is_empty() {
if let Some(result) =
self.try_one_sided_decryption_optimized(output_index, output, view_key, transaction_status, is_mature)
{
if let Some(result) = self.try_one_sided_decryption_optimized(
output_index,
output,
view_key,
script_key,
transaction_status,
is_mature,
) {
return Some(result);
}
}
Expand All @@ -269,7 +277,7 @@ impl Block {
output_index: usize,
output: &TransactionOutput,
view_key: &PrivateKey,
script_key: CompressedPublicKey,
script_key: Option<&CompressedPublicKey>,
transaction_status: TransactionStatus,
is_mature: bool,
) -> Option<OutputProcessingResult> {
Expand All @@ -283,7 +291,7 @@ impl Block {
transaction_status,
is_mature,
commitment_mask_private_key,
script_key: Some(script_key),
script_key: script_key.map(|k| k.clone()),
});
}
None
Expand All @@ -295,6 +303,7 @@ impl Block {
output_index: usize,
output: &TransactionOutput,
view_key: &PrivateKey,
script_key: Option<&CompressedPublicKey>,
transaction_status: TransactionStatus,
is_mature: bool,
) -> Option<OutputProcessingResult> {
Expand All @@ -311,7 +320,7 @@ impl Block {
transaction_status,
is_mature,
commitment_mask_private_key,
script_key: None,
script_key: script_key.map(|k| k.clone()),
});
}
None
Expand Down
80 changes: 62 additions & 18 deletions src/events/listeners/database_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use tari_transaction_components::key_manager::{
#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
use tokio::sync::{mpsc, oneshot};

#[cfg(feature = "storage")]
use crate::data_structures::CompressedPublicKey;
#[cfg(feature = "storage")]
use crate::events::types::{AddressInfo, BlockInfo, OutputData, SpentOutputData, TransactionData};
#[cfg(feature = "storage")]
Expand All @@ -45,6 +47,22 @@ use crate::{
},
};

/// Represents a known key with its ID and compressed public key.
#[cfg(feature = "storage")]
#[derive(Debug, Clone)]
pub struct KnownKey {
pub key_id: TariKeyId,
pub pub_key: CompressedPublicKey,
}

/// Holds a TransactionKeyManager and a vector of KnownKey instances.
#[cfg(feature = "storage")]
#[derive(Clone)]
pub struct KeyManagerWithKnownKeys {
pub key_manager: TransactionKeyManager,
pub known_keys: Vec<KnownKey>,
}

/// Database storage listener that persists scan results to SQLite
///
/// This listener handles all database operations needed during wallet scanning,
Expand Down Expand Up @@ -102,7 +120,7 @@ pub struct DatabaseStorageListener {
/// Whether to enable event auditing (stores events in wallet_events table)
enable_event_auditing: bool,
/// Cache for TransactionKeyManager instances
key_managers: HashMap<u32, Arc<TransactionKeyManager>>,
key_managers: HashMap<u32, Arc<KeyManagerWithKnownKeys>>,

/// Background writer for non-WASM32 architectures
#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
Expand Down Expand Up @@ -293,7 +311,8 @@ impl DatabaseStorageListener {
async fn create_transaction_manager(
&mut self,
wallet_id: u32,
) -> Result<Arc<TransactionKeyManager>, Box<dyn Error + Send + Sync>> {
) -> Result<Arc<KeyManagerWithKnownKeys>, Box<dyn Error + Send + Sync>> {
use tari_utilities::ByteArray;
match self.key_managers.entry(wallet_id) {
Entry::Occupied(entry) => Ok(entry.get().clone()),
Entry::Vacant(entry) => {
Expand All @@ -309,7 +328,26 @@ impl DatabaseStorageListener {
)
.await?;

Ok(entry.insert(Arc::new(transaction_key_manager)).clone())
// Initialize known keys
let mut known_keys = Vec::new();
let comms_key_id = TariKeyId::Managed {
branch: KeyManagerBranch::Comms.get_branch_key(),
index: 0,
};
let core_comms_key = transaction_key_manager.get_public_key_at_key_id(&comms_key_id).await?;
let comms_pub_key = CompressedPublicKey::from_canonical_bytes(core_comms_key.as_bytes())?;

known_keys.push(KnownKey {
key_id: comms_key_id,
pub_key: comms_pub_key,
});

let key_manager_with_known_keys = Arc::new(KeyManagerWithKnownKeys {
key_manager: transaction_key_manager,
known_keys,
});

Ok(entry.insert(key_manager_with_known_keys).clone())
},
}
}
Expand Down Expand Up @@ -606,7 +644,7 @@ impl DatabaseStorageListener {
block_info: &BlockInfo,
_address_info: &AddressInfo,
transaction_data: &crate::events::types::TransactionData,
key_manager: &Arc<TransactionKeyManager>,
key_manager_with_known_keys: &Arc<KeyManagerWithKnownKeys>,
) -> Result<StoredOutput, Box<dyn Error + Send + Sync>> {
// Parse commitment from hex string
use crate::{data_structures::PaymentId, hex_utils::HexEncodable, wallet_scanner::extract_script_data};
Expand Down Expand Up @@ -634,19 +672,24 @@ impl DatabaseStorageListener {
.ok_or(crate::WalletError::StorageError("No spending key found".to_string()))?;
let core_commitment_mask_private_key = RistrettoSecretKey::try_from(commitment_mask_private_key)
.map_err(|e| crate::WalletError::ConversionError(e.to_string()))?;
let commitment_mask_private_key = key_manager.import_key(core_commitment_mask_private_key).await?;
let commitment_mask_private_key = key_manager_with_known_keys
.key_manager
.import_key(core_commitment_mask_private_key)
.await?;

let mut script_key_found = None;
if let Some(script_key) = &output_data.script_key {
for known_key in &key_manager_with_known_keys.known_keys {
if script_key == &known_key.pub_key {
script_key_found = Some(known_key.key_id.clone());
break;
}
}
}
let script_key = script_key_found.unwrap_or_else(|| TariKeyId::Derived {
key: SerializedKeyString::from(commitment_mask_private_key.clone().to_string()),
});

let script_key = match &output_data.script_key {
// UTXO of a normal transaction
Some(_) => TariKeyId::Managed {
branch: KeyManagerBranch::Comms.get_branch_key(),
index: 0,
},
// UTXO of a stealth transaction
None => TariKeyId::Derived {
key: SerializedKeyString::from(commitment_mask_private_key.clone().to_string()),
},
};
let payment_id = transaction_data
.payment_id
.as_deref()
Expand Down Expand Up @@ -1062,7 +1105,8 @@ impl DatabaseStorageListener {
address_info: &AddressInfo,
transaction_data: &crate::events::types::TransactionData,
) -> Result<Vec<u32>, Box<dyn Error + Send + Sync>> {
let key_manager = self.create_transaction_manager(wallet_id).await?;
let key_manager_with_known_keys = self.create_transaction_manager(wallet_id).await?;

// Convert event data to StoredOutput
let stored_output = self
.convert_to_stored_output(
Expand All @@ -1071,7 +1115,7 @@ impl DatabaseStorageListener {
block_info,
address_info,
transaction_data,
&key_manager,
&key_manager_with_known_keys,
)
.await?;

Expand Down
Loading