From 7a18cad68be6bda33deaeb50ec9a5307d1ce8f6d Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Sat, 22 Mar 2025 17:46:12 +0530 Subject: [PATCH 1/6] feat(electrum): optimize merkle proof validation with batching Co-authored-by: keerthi --- crates/electrum/src/bdk_electrum_client.rs | 181 ++++++++++++++++----- crates/electrum/tests/test_electrum.rs | 49 ++++++ 2 files changed, 185 insertions(+), 45 deletions(-) diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index e0eac5083..f4ab32b20 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -12,6 +12,9 @@ use std::sync::{Arc, Mutex}; /// We include a chain suffix of a certain length for the purpose of robustness. const CHAIN_SUFFIX_LENGTH: u32 = 8; +/// Maximum batch size for proof validation requests +const MAX_BATCH_SIZE: usize = 100; + /// Wrapper around an [`electrum_client::ElectrumApi`] which includes an internal in-memory /// transaction cache to avoid re-fetching already downloaded transactions. #[derive(Debug)] @@ -22,6 +25,8 @@ pub struct BdkElectrumClient { tx_cache: Mutex>>, /// The header cache block_header_cache: Mutex>, + /// Cache of transaction anchors + anchor_cache: Mutex>, } impl BdkElectrumClient { @@ -31,6 +36,7 @@ impl BdkElectrumClient { inner: client, tx_cache: Default::default(), block_header_cache: Default::default(), + anchor_cache: Default::default(), } } @@ -135,13 +141,19 @@ impl BdkElectrumClient { let mut tx_update = TxUpdate::::default(); let mut last_active_indices = BTreeMap::::default(); + let mut pending_anchors = Vec::new(); for keychain in request.keychains() { let spks = request .iter_spks(keychain.clone()) .map(|(spk_i, spk)| (spk_i, SpkWithExpectedTxids::from(spk))); - if let Some(last_active_index) = - self.populate_with_spks(start_time, &mut tx_update, spks, stop_gap, batch_size)? 
- { + if let Some(last_active_index) = self.populate_with_spks( + start_time, + &mut tx_update, + spks, + stop_gap, + batch_size, + &mut pending_anchors, + )? { last_active_indices.insert(keychain, last_active_index); } } @@ -151,6 +163,13 @@ impl BdkElectrumClient { self.fetch_prev_txout(&mut tx_update)?; } + if !pending_anchors.is_empty() { + let anchors = self.batch_fetch_anchors(&pending_anchors)?; + for (txid, anchor) in anchors { + tx_update.anchors.insert((anchor, txid)); + } + } + let chain_update = match tip_and_latest_blocks { Some((chain_tip, latest_blocks)) => Some(chain_update( chain_tip, @@ -204,6 +223,7 @@ impl BdkElectrumClient { }; let mut tx_update = TxUpdate::::default(); + let mut pending_anchors = Vec::new(); self.populate_with_spks( start_time, &mut tx_update, @@ -213,15 +233,33 @@ impl BdkElectrumClient { .map(|(i, spk)| (i as u32, spk)), usize::MAX, batch_size, + &mut pending_anchors, + )?; + self.populate_with_txids( + start_time, + &mut tx_update, + request.iter_txids(), + &mut pending_anchors, + )?; + self.populate_with_outpoints( + start_time, + &mut tx_update, + request.iter_outpoints(), + &mut pending_anchors, )?; - self.populate_with_txids(start_time, &mut tx_update, request.iter_txids())?; - self.populate_with_outpoints(start_time, &mut tx_update, request.iter_outpoints())?; // Fetch previous `TxOut`s for fee calculation if flag is enabled. 
if fetch_prev_txouts { self.fetch_prev_txout(&mut tx_update)?; } + if !pending_anchors.is_empty() { + let anchors = self.batch_fetch_anchors(&pending_anchors)?; + for (txid, anchor) in anchors { + tx_update.anchors.insert((anchor, txid)); + } + } + let chain_update = match tip_and_latest_blocks { Some((chain_tip, latest_blocks)) => Some(chain_update( chain_tip, @@ -249,16 +287,17 @@ impl BdkElectrumClient { mut spks_with_expected_txids: impl Iterator, stop_gap: usize, batch_size: usize, + pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result, Error> { - let mut unused_spk_count = 0_usize; - let mut last_active_index = Option::::None; + let mut unused_spk_count = 0; + let mut last_active_index = None; loop { let spks = (0..batch_size) .map_while(|_| spks_with_expected_txids.next()) .collect::>(); if spks.is_empty() { - return Ok(last_active_index); + break; } let spk_histories = self @@ -267,9 +306,9 @@ impl BdkElectrumClient { for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) { if spk_history.is_empty() { - unused_spk_count = unused_spk_count.saturating_add(1); + unused_spk_count += 1; if unused_spk_count >= stop_gap { - return Ok(last_active_index); + break; } } else { last_active_index = Some(spk_index); @@ -292,7 +331,7 @@ impl BdkElectrumClient { match tx_res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?; + pending_anchors.push((tx_res.tx_hash, height)); } _ => { tx_update.seen_ats.insert((tx_res.tx_hash, start_time)); @@ -301,6 +340,8 @@ impl BdkElectrumClient { } } } + + Ok(last_active_index) } /// Populate the `tx_update` with associated transactions/anchors of `outpoints`. 
@@ -312,6 +353,7 @@ impl BdkElectrumClient { start_time: u64, tx_update: &mut TxUpdate, outpoints: impl IntoIterator, + pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result<(), Error> { for outpoint in outpoints { let op_txid = outpoint.txid; @@ -337,7 +379,7 @@ impl BdkElectrumClient { match res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?; + pending_anchors.push((res.tx_hash, height)); } _ => { tx_update.seen_ats.insert((res.tx_hash, start_time)); @@ -359,7 +401,7 @@ impl BdkElectrumClient { match res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?; + pending_anchors.push((res.tx_hash, height)); } _ => { tx_update.seen_ats.insert((res.tx_hash, start_time)); @@ -368,6 +410,7 @@ impl BdkElectrumClient { } } } + Ok(()) } @@ -377,6 +420,7 @@ impl BdkElectrumClient { start_time: u64, tx_update: &mut TxUpdate, txids: impl IntoIterator, + pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result<(), Error> { for txid in txids { let tx = match self.fetch_tx(txid) { @@ -402,7 +446,7 @@ impl BdkElectrumClient { match r.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - self.validate_merkle_for_anchor(tx_update, txid, height)?; + pending_anchors.push((txid, height)); } _ => { tx_update.seen_ats.insert((r.tx_hash, start_time)); @@ -412,52 +456,99 @@ impl BdkElectrumClient { tx_update.txs.push(tx); } + Ok(()) } - // Helper function which checks if a transaction is confirmed by validating the merkle proof. - // An anchor is inserted if the transaction is validated to be in a confirmed block. - fn validate_merkle_for_anchor( + /// Batch validate Merkle proofs, cache each confirmation anchor, and return them. 
+ fn batch_fetch_anchors( &self, - tx_update: &mut TxUpdate, - txid: Txid, - confirmation_height: usize, - ) -> Result<(), Error> { - if let Ok(merkle_res) = self - .inner - .transaction_get_merkle(&txid, confirmation_height) + txs_with_heights: &[(Txid, usize)], + ) -> Result, Error> { + let mut results = Vec::with_capacity(txs_with_heights.len()); + let mut to_fetch = Vec::new(); + + // Build a map for height to block hash conversions. This is for obtaining block hash data + // with minimum `fetch_header` calls. + let mut height_to_hash: HashMap = HashMap::new(); + for &(_, height) in txs_with_heights { + let h = height as u32; + if !height_to_hash.contains_key(&h) { + // Try to obtain hash from the header cache, or fetch the header if absent. + let hash = self.fetch_header(h)?.block_hash(); + height_to_hash.insert(h, hash); + } + } + + // Check cache. { - let mut header = self.fetch_header(merkle_res.block_height as u32)?; - let mut is_confirmed_tx = electrum_client::utils::validate_merkle_proof( - &txid, - &header.merkle_root, - &merkle_res, - ); - - // Merkle validation will fail if the header in `block_header_cache` is outdated, so we - // want to check if there is a new header and validate against the new one. - if !is_confirmed_tx { - header = self.update_header(merkle_res.block_height as u32)?; - is_confirmed_tx = electrum_client::utils::validate_merkle_proof( + let anchor_cache = self.anchor_cache.lock().unwrap(); + for &(txid, height) in txs_with_heights { + let h = height as u32; + let hash = height_to_hash[&h]; + if let Some(anchor) = anchor_cache.get(&(txid, hash)) { + results.push((txid, *anchor)); + } else { + to_fetch.push((txid, height, hash)); + } + } + } + + // Fetch missing proofs in batches + for chunk in to_fetch.chunks(MAX_BATCH_SIZE) { + for &(txid, height, hash) in chunk { + // Fetch the raw proof. + let proof = self.inner.transaction_get_merkle(&txid, height)?; + + // Validate against header, retrying once on stale header. 
+ let mut header = self.fetch_header(height as u32)?; + let mut valid = electrum_client::utils::validate_merkle_proof( &txid, &header.merkle_root, - &merkle_res, + &proof, ); - } + if !valid { + header = self.update_header(height as u32)?; + valid = electrum_client::utils::validate_merkle_proof( + &txid, + &header.merkle_root, + &proof, + ); + } - if is_confirmed_tx { - tx_update.anchors.insert(( - ConfirmationBlockTime { + // Build and cache the anchor if merkle proof is valid. + if valid { + let anchor = ConfirmationBlockTime { confirmation_time: header.time as u64, block_id: BlockId { - height: merkle_res.block_height as u32, + height: height as u32, hash: header.block_hash(), }, - }, - txid, - )); + }; + self.anchor_cache + .lock() + .unwrap() + .insert((txid, hash), anchor); + results.push((txid, anchor)); + } } } + + Ok(results) + } + + /// Validate a single transaction’s Merkle proof, cache its confirmation anchor, and update. + #[allow(dead_code)] + fn validate_anchor_for_update( + &self, + tx_update: &mut TxUpdate, + txid: Txid, + confirmation_height: usize, + ) -> Result<(), Error> { + let anchors = self.batch_fetch_anchors(&[(txid, confirmation_height)])?; + for (txid, anchor) in anchors { + tx_update.anchors.insert((anchor, txid)); + } Ok(()) } diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index 5302e62f2..7b6a63cd8 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -20,6 +20,7 @@ use core::time::Duration; use electrum_client::ElectrumApi; use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; +use std::time::Instant; // Batch size for `sync_with_electrum`. 
const BATCH_SIZE: usize = 5; @@ -881,3 +882,51 @@ fn test_check_fee_calculation() -> anyhow::Result<()> { } Ok(()) } + +#[test] +pub fn test_sync_performance() -> anyhow::Result<()> { + const EXPECTED_MAX_SYNC_TIME: Duration = Duration::from_secs(5); + const NUM_ADDRESSES: usize = 1000; + + let env = TestEnv::new()?; + let electrum_client = electrum_client::Client::new(env.electrsd.electrum_url.as_str())?; + let client = BdkElectrumClient::new(electrum_client); + + // Generate test addresses. + let mut spks = Vec::with_capacity(NUM_ADDRESSES); + for _ in 0..NUM_ADDRESSES { + spks.push(get_test_spk()); + } + + // Mine some blocks and send transactions. + env.mine_blocks(101, None)?; + for spk in spks.iter().take(10) { + let addr = Address::from_script(spk, Network::Regtest)?; + env.send(&addr, Amount::from_sat(10_000))?; + } + env.mine_blocks(1, None)?; + + // Setup receiver. + let (mut recv_chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + let mut recv_graph = IndexedTxGraph::::new({ + let mut recv_index = SpkTxOutIndex::default(); + for spk in spks.iter() { + recv_index.insert_spk((), spk.clone()); + } + recv_index + }); + + // Measure sync time. 
+ let start = Instant::now(); + let _ = sync_with_electrum(&client, spks.clone(), &mut recv_chain, &mut recv_graph)?; + let sync_duration = start.elapsed(); + + assert!( + sync_duration <= EXPECTED_MAX_SYNC_TIME, + "Sync took {:?}, which is longer than expected {:?}", + sync_duration, + EXPECTED_MAX_SYNC_TIME + ); + + Ok(()) +} From b57768dd2bd25d4fc9d6acc3990b04fa8594e7b1 Mon Sep 17 00:00:00 2001 From: keerthi Date: Thu, 10 Apr 2025 14:51:26 +0530 Subject: [PATCH 2/6] fix(electrum): improve tx validation and gap limit scanning --- crates/electrum/src/bdk_electrum_client.rs | 4 ++++ crates/electrum/tests/test_electrum.rs | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index f4ab32b20..4e89fdb80 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -339,6 +339,10 @@ impl BdkElectrumClient { } } } + + if unused_spk_count >= stop_gap { + break; + } } Ok(last_active_index) diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index 7b6a63cd8..1973b456e 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -885,7 +885,7 @@ fn test_check_fee_calculation() -> anyhow::Result<()> { #[test] pub fn test_sync_performance() -> anyhow::Result<()> { - const EXPECTED_MAX_SYNC_TIME: Duration = Duration::from_secs(5); + const EXPECTED_MAX_SYNC_TIME: Duration = Duration::from_secs(15); const NUM_ADDRESSES: usize = 1000; let env = TestEnv::new()?; From f21a21d8c8dfc5b84957c854c1f1daee59bcc620 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Sun, 25 May 2025 18:28:19 +0000 Subject: [PATCH 3/6] test(electrum): add `criterion` benchmark for `sync` --- crates/electrum/Cargo.toml | 5 ++ crates/electrum/benches/test_sync.rs | 111 +++++++++++++++++++++++++ crates/electrum/tests/test_electrum.rs | 49 ----------- 3 files changed, 116 insertions(+), 49 
deletions(-) create mode 100644 crates/electrum/benches/test_sync.rs diff --git a/crates/electrum/Cargo.toml b/crates/electrum/Cargo.toml index a6b619eb1..95982eefa 100644 --- a/crates/electrum/Cargo.toml +++ b/crates/electrum/Cargo.toml @@ -19,6 +19,7 @@ electrum-client = { version = "0.23.1", features = [ "proxy" ], default-features [dev-dependencies] bdk_testenv = { path = "../testenv" } bdk_chain = { path = "../chain" } +criterion = { version = "0.2" } [features] default = ["use-rustls"] @@ -29,3 +30,7 @@ use-openssl = ["electrum-client/use-openssl"] [[test]] name = "test_electrum" required-features = ["use-rustls"] + +[[bench]] +name = "test_sync" +harness = false diff --git a/crates/electrum/benches/test_sync.rs b/crates/electrum/benches/test_sync.rs new file mode 100644 index 000000000..8a0191f2b --- /dev/null +++ b/crates/electrum/benches/test_sync.rs @@ -0,0 +1,111 @@ +use bdk_chain::{ + bitcoin::{Address, Amount, ScriptBuf}, + local_chain::LocalChain, + spk_client::{SyncRequest, SyncResponse}, + spk_txout::SpkTxOutIndex, + ConfirmationBlockTime, IndexedTxGraph, Indexer, Merge, +}; +use bdk_core::bitcoin::{ + key::{Secp256k1, UntweakedPublicKey}, + Network, +}; +use bdk_electrum::BdkElectrumClient; +use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; +use criterion::{criterion_group, criterion_main, Criterion}; +use std::time::Duration; + +// Batch size for `sync_with_electrum`. 
+const BATCH_SIZE: usize = 5; + +pub fn get_test_spk() -> ScriptBuf { + const PK_BYTES: &[u8] = &[ + 12, 244, 72, 4, 163, 4, 211, 81, 159, 82, 153, 123, 125, 74, 142, 40, 55, 237, 191, 231, + 31, 114, 89, 165, 83, 141, 8, 203, 93, 240, 53, 101, + ]; + let secp = Secp256k1::new(); + let pk = UntweakedPublicKey::from_slice(PK_BYTES).expect("Must be valid PK"); + ScriptBuf::new_p2tr(&secp, pk, None) +} + +fn sync_with_electrum( + client: &BdkElectrumClient, + spks: Spks, + chain: &mut LocalChain, + graph: &mut IndexedTxGraph, +) -> anyhow::Result +where + I: Indexer, + I::ChangeSet: Default + Merge, + Spks: IntoIterator, + Spks::IntoIter: ExactSizeIterator + Send + 'static, +{ + let update = client.sync( + SyncRequest::builder().chain_tip(chain.tip()).spks(spks), + BATCH_SIZE, + true, + )?; + + assert!( + !update.tx_update.txs.is_empty(), + "expected some transactions from sync, but got none" + ); + + if let Some(chain_update) = update.chain_update.clone() { + let _ = chain + .apply_update(chain_update) + .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?; + } + let _ = graph.apply_update(update.tx_update.clone()); + + Ok(update) +} + +pub fn test_sync_performance(c: &mut Criterion) { + let env = TestEnv::new().unwrap(); + let electrum_client = electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap(); + let client = BdkElectrumClient::new(electrum_client); + + const NUM_BLOCKS: usize = 100; + let mut spks = Vec::with_capacity(NUM_BLOCKS); + + // Mine some blocks and send transactions. + env.mine_blocks(101, None).unwrap(); + + // Scatter UTXOs across many blocks. + for _ in 0..NUM_BLOCKS { + let spk = get_test_spk(); + let addr = Address::from_script(&spk, Network::Regtest).unwrap(); + env.send(&addr, Amount::from_sat(10_000)).unwrap(); + env.mine_blocks(1, None).unwrap(); + + spks.push(spk); + } + let _ = env.wait_until_electrum_sees_block(Duration::from_secs(6)); + + // Setup receiver. 
+ let genesis = env.bitcoind.client.get_block_hash(0).unwrap(); + let (chain, _) = LocalChain::from_genesis_hash(genesis); + let graph = IndexedTxGraph::::new({ + let mut idx = SpkTxOutIndex::default(); + idx.insert_spk((), spks[0].clone()); + idx + }); + + c.bench_function("sync_with_electrum", move |b| { + b.iter(|| { + let spks = spks.clone(); + let mut recv_chain = chain.clone(); + let mut recv_graph = graph.clone(); + + let _ = sync_with_electrum(&client, spks, &mut recv_chain, &mut recv_graph); + }) + }); +} + +criterion_group! { + name = benches; + config = Criterion::default() + .sample_size(10); + targets = test_sync_performance +} +criterion_main!(benches); diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index 1973b456e..5302e62f2 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -20,7 +20,6 @@ use core::time::Duration; use electrum_client::ElectrumApi; use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; -use std::time::Instant; // Batch size for `sync_with_electrum`. const BATCH_SIZE: usize = 5; @@ -882,51 +881,3 @@ fn test_check_fee_calculation() -> anyhow::Result<()> { } Ok(()) } - -#[test] -pub fn test_sync_performance() -> anyhow::Result<()> { - const EXPECTED_MAX_SYNC_TIME: Duration = Duration::from_secs(15); - const NUM_ADDRESSES: usize = 1000; - - let env = TestEnv::new()?; - let electrum_client = electrum_client::Client::new(env.electrsd.electrum_url.as_str())?; - let client = BdkElectrumClient::new(electrum_client); - - // Generate test addresses. - let mut spks = Vec::with_capacity(NUM_ADDRESSES); - for _ in 0..NUM_ADDRESSES { - spks.push(get_test_spk()); - } - - // Mine some blocks and send transactions. - env.mine_blocks(101, None)?; - for spk in spks.iter().take(10) { - let addr = Address::from_script(spk, Network::Regtest)?; - env.send(&addr, Amount::from_sat(10_000))?; - } - env.mine_blocks(1, None)?; - - // Setup receiver. 
- let (mut recv_chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); - let mut recv_graph = IndexedTxGraph::::new({ - let mut recv_index = SpkTxOutIndex::default(); - for spk in spks.iter() { - recv_index.insert_spk((), spk.clone()); - } - recv_index - }); - - // Measure sync time. - let start = Instant::now(); - let _ = sync_with_electrum(&client, spks.clone(), &mut recv_chain, &mut recv_graph)?; - let sync_duration = start.elapsed(); - - assert!( - sync_duration <= EXPECTED_MAX_SYNC_TIME, - "Sync took {:?}, which is longer than expected {:?}", - sync_duration, - EXPECTED_MAX_SYNC_TIME - ); - - Ok(()) -} From ec4fd971c81e7d3bcf8fbdac15c6df63b934cd70 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Mon, 26 May 2025 23:18:07 +0000 Subject: [PATCH 4/6] feat(electrum): batched `Header`s and `script_get_history` --- crates/electrum/src/bdk_electrum_client.rs | 247 +++++++++++---------- 1 file changed, 133 insertions(+), 114 deletions(-) diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index 4e89fdb80..d68f62e1b 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -70,33 +70,6 @@ impl BdkElectrumClient { Ok(tx) } - /// Fetch block header of given `height`. - /// - /// If it hits the cache it will return the cached version and avoid making the request. - fn fetch_header(&self, height: u32) -> Result { - let block_header_cache = self.block_header_cache.lock().unwrap(); - - if let Some(header) = block_header_cache.get(&height) { - return Ok(*header); - } - - drop(block_header_cache); - - self.update_header(height) - } - - /// Update a block header at given `height`. Returns the updated header. - fn update_header(&self, height: u32) -> Result { - let header = self.inner.block_header(height as usize)?; - - self.block_header_cache - .lock() - .unwrap() - .insert(height, header); - - Ok(header) - } - /// Broadcasts a transaction to the network. 
/// /// This is a re-export of [`ElectrumApi::transaction_broadcast`]. @@ -292,7 +265,7 @@ impl BdkElectrumClient { let mut unused_spk_count = 0; let mut last_active_index = None; - loop { + 'batch_loop: loop { let spks = (0..batch_size) .map_while(|_| spks_with_expected_txids.next()) .collect::>(); @@ -308,7 +281,7 @@ impl BdkElectrumClient { if spk_history.is_empty() { unused_spk_count += 1; if unused_spk_count >= stop_gap { - break; + break 'batch_loop; } } else { last_active_index = Some(spk_index); @@ -339,13 +312,7 @@ impl BdkElectrumClient { } } } - - if unused_spk_count >= stop_gap { - break; - } } - - Ok(last_active_index) } /// Populate the `tx_update` with associated transactions/anchors of `outpoints`. @@ -359,56 +326,74 @@ impl BdkElectrumClient { outpoints: impl IntoIterator, pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result<(), Error> { - for outpoint in outpoints { - let op_txid = outpoint.txid; - let op_tx = self.fetch_tx(op_txid)?; - let op_txout = match op_tx.output.get(outpoint.vout as usize) { - Some(txout) => txout, - None => continue, - }; - debug_assert_eq!(op_tx.compute_txid(), op_txid); - - // attempt to find the following transactions (alongside their chain positions), and - // add to our sparsechain `update`: - let mut has_residing = false; // tx in which the outpoint resides - let mut has_spending = false; // tx that spends the outpoint - for res in self.inner.script_get_history(&op_txout.script_pubkey)? { - if has_residing && has_spending { - break; + // Collect valid outpoints with their corresponding `spk` and `tx`. 
+ let mut ops_spks_txs = Vec::new(); + for op in outpoints { + if let Ok(tx) = self.fetch_tx(op.txid) { + if let Some(txout) = tx.output.get(op.vout as usize) { + ops_spks_txs.push((op, txout.script_pubkey.clone(), tx)); } + } + } - if !has_residing && res.tx_hash == op_txid { - has_residing = true; - tx_update.txs.push(Arc::clone(&op_tx)); - match res.height.try_into() { - // Returned heights 0 & -1 are reserved for unconfirmed txs. - Ok(height) if height > 0 => { - pending_anchors.push((res.tx_hash, height)); - } - _ => { - tx_update.seen_ats.insert((res.tx_hash, start_time)); - } + // Dedup `spk`s, batch-fetch all histories in one call, and store them in a map. + let unique_spks: Vec<_> = ops_spks_txs + .iter() + .map(|(_, spk, _)| spk.clone()) + .collect::>() + .into_iter() + .collect(); + let histories = self + .inner + .batch_script_get_history(unique_spks.iter().map(|spk| spk.as_script()))?; + let mut spk_map = HashMap::new(); + for (spk, history) in unique_spks.into_iter().zip(histories.into_iter()) { + spk_map.insert(spk, history); + } + + for (outpoint, spk, tx) in ops_spks_txs { + if let Some(spk_history) = spk_map.get(&spk) { + let mut has_residing = false; // tx in which the outpoint resides + let mut has_spending = false; // tx that spends the outpoint + + for res in spk_history { + if has_residing && has_spending { + break; } - } - if !has_spending && res.tx_hash != op_txid { - let res_tx = self.fetch_tx(res.tx_hash)?; - // we exclude txs/anchors that do not spend our specified outpoint(s) - has_spending = res_tx - .input - .iter() - .any(|txin| txin.previous_output == outpoint); - if !has_spending { - continue; + if !has_residing && res.tx_hash == outpoint.txid { + has_residing = true; + tx_update.txs.push(Arc::clone(&tx)); + match res.height.try_into() { + // Returned heights 0 & -1 are reserved for unconfirmed txs. 
+ Ok(height) if height > 0 => { + pending_anchors.push((res.tx_hash, height)); + } + _ => { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } + } } - tx_update.txs.push(Arc::clone(&res_tx)); - match res.height.try_into() { - // Returned heights 0 & -1 are reserved for unconfirmed txs. - Ok(height) if height > 0 => { - pending_anchors.push((res.tx_hash, height)); + + if !has_spending && res.tx_hash != outpoint.txid { + let res_tx = self.fetch_tx(res.tx_hash)?; + // we exclude txs/anchors that do not spend our specified outpoint(s) + has_spending = res_tx + .input + .iter() + .any(|txin| txin.previous_output == outpoint); + if !has_spending { + continue; } - _ => { - tx_update.seen_ats.insert((res.tx_hash, start_time)); + tx_update.txs.push(Arc::clone(&res_tx)); + match res.height.try_into() { + // Returned heights 0 & -1 are reserved for unconfirmed txs. + Ok(height) if height > 0 => { + pending_anchors.push((res.tx_hash, height)); + } + _ => { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } } } } @@ -426,39 +411,47 @@ impl BdkElectrumClient { txids: impl IntoIterator, pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result<(), Error> { + let mut txs = Vec::<(Txid, Arc)>::new(); + let mut scripts = Vec::new(); for txid in txids { - let tx = match self.fetch_tx(txid) { - Ok(tx) => tx, - Err(electrum_client::Error::Protocol(_)) => continue, - Err(other_err) => return Err(other_err), - }; + match self.fetch_tx(txid) { + Ok(tx) => { + let spk = tx + .output + .first() + .map(|txo| &txo.script_pubkey) + .expect("tx must have an output") + .clone(); + txs.push((txid, tx)); + scripts.push(spk); + } + Err(electrum_client::Error::Protocol(_)) => { + continue; + } + Err(e) => return Err(e), + } + } - let spk = tx - .output - .first() - .map(|txo| &txo.script_pubkey) - .expect("tx must have an output"); + // because of restrictions of the Electrum API, we have to use the `script_get_history` + // call to get confirmation status of our transaction + let 
spk_histories = self + .inner + .batch_script_get_history(scripts.iter().map(|spk| spk.as_script()))?; - // because of restrictions of the Electrum API, we have to use the `script_get_history` - // call to get confirmation status of our transaction - if let Some(r) = self - .inner - .script_get_history(spk)? - .into_iter() - .find(|r| r.tx_hash == txid) - { - match r.height.try_into() { + for (tx, spk_history) in txs.into_iter().zip(spk_histories) { + if let Some(res) = spk_history.into_iter().find(|res| res.tx_hash == tx.0) { + match res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - pending_anchors.push((txid, height)); + pending_anchors.push((tx.0, height)); } _ => { - tx_update.seen_ats.insert((r.tx_hash, start_time)); + tx_update.seen_ats.insert((res.tx_hash, start_time)); } } } - tx_update.txs.push(tx); + tx_update.txs.push(tx.1); } Ok(()) @@ -472,19 +465,37 @@ impl BdkElectrumClient { let mut results = Vec::with_capacity(txs_with_heights.len()); let mut to_fetch = Vec::new(); - // Build a map for height to block hash conversions. This is for obtaining block hash data - // with minimum `fetch_header` calls. - let mut height_to_hash: HashMap = HashMap::new(); - for &(_, height) in txs_with_heights { - let h = height as u32; - if !height_to_hash.contains_key(&h) { - // Try to obtain hash from the header cache, or fetch the header if absent. - let hash = self.fetch_header(h)?.block_hash(); - height_to_hash.insert(h, hash); + // Figure out which block heights we need headers for. + let mut needed_heights: Vec = + txs_with_heights.iter().map(|&(_, h)| h as u32).collect(); + needed_heights.sort_unstable(); + needed_heights.dedup(); + + let mut height_to_hash = HashMap::with_capacity(needed_heights.len()); + + // Collect headers of missing heights, and build `height_to_hash` map. 
+ { + let mut cache = self.block_header_cache.lock().unwrap(); + + let mut missing_heights = Vec::new(); + for &height in &needed_heights { + if let Some(header) = cache.get(&height) { + height_to_hash.insert(height, header.block_hash()); + } else { + missing_heights.push(height); + } + } + + if !missing_heights.is_empty() { + let headers = self.inner.batch_block_header(missing_heights.clone())?; + for (height, header) in missing_heights.into_iter().zip(headers) { + height_to_hash.insert(height, header.block_hash()); + cache.insert(height, header); + } } } - // Check cache. + // Check our anchor cache and queue up any proofs we still need. { let anchor_cache = self.anchor_cache.lock().unwrap(); for &(txid, height) in txs_with_heights { @@ -505,14 +516,22 @@ impl BdkElectrumClient { let proof = self.inner.transaction_get_merkle(&txid, height)?; // Validate against header, retrying once on stale header. - let mut header = self.fetch_header(height as u32)?; + let mut header = { + let cache = self.block_header_cache.lock().unwrap(); + cache[&(height as u32)] + }; let mut valid = electrum_client::utils::validate_merkle_proof( &txid, &header.merkle_root, &proof, ); if !valid { - header = self.update_header(height as u32)?; + let new_header = self.inner.block_header(height)?; + self.block_header_cache + .lock() + .unwrap() + .insert(height as u32, new_header); + header = new_header; valid = electrum_client::utils::validate_merkle_proof( &txid, &header.merkle_root, @@ -526,7 +545,7 @@ impl BdkElectrumClient { confirmation_time: header.time as u64, block_id: BlockId { height: height as u32, - hash: header.block_hash(), + hash, }, }; self.anchor_cache From 4ea5ea6c490adb0652ad068bc816c2434c51da35 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Tue, 27 May 2025 18:15:44 +0000 Subject: [PATCH 5/6] feat(electrum): batch `transaction.get_merkle` calls via `batch_call` --- crates/electrum/Cargo.toml | 1 + crates/electrum/src/bdk_electrum_client.rs | 122 ++++++++++----------- 2 
files changed, 57 insertions(+), 66 deletions(-) diff --git a/crates/electrum/Cargo.toml b/crates/electrum/Cargo.toml index 95982eefa..c09bfcbac 100644 --- a/crates/electrum/Cargo.toml +++ b/crates/electrum/Cargo.toml @@ -15,6 +15,7 @@ workspace = true [dependencies] bdk_core = { path = "../core", version = "0.6.0" } electrum-client = { version = "0.23.1", features = [ "proxy" ], default-features = false } +serde_json = "1.0" [dev-dependencies] bdk_testenv = { path = "../testenv" } diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index d68f62e1b..dde9efebd 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -12,9 +12,6 @@ use std::sync::{Arc, Mutex}; /// We include a chain suffix of a certain length for the purpose of robustness. const CHAIN_SUFFIX_LENGTH: u32 = 8; -/// Maximum batch size for proof validation requests -const MAX_BATCH_SIZE: usize = 100; - /// Wrapper around an [`electrum_client::ElectrumApi`] which includes an internal in-memory /// transaction cache to avoid re-fetching already downloaded transactions. 
#[derive(Debug)] @@ -262,15 +259,15 @@ impl BdkElectrumClient { batch_size: usize, pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result, Error> { - let mut unused_spk_count = 0; - let mut last_active_index = None; + let mut unused_spk_count = 0_usize; + let mut last_active_index = Option::::None; - 'batch_loop: loop { + loop { let spks = (0..batch_size) .map_while(|_| spks_with_expected_txids.next()) .collect::>(); if spks.is_empty() { - break; + return Ok(last_active_index); } let spk_histories = self @@ -279,10 +276,10 @@ impl BdkElectrumClient { for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) { if spk_history.is_empty() { - unused_spk_count += 1; - if unused_spk_count >= stop_gap { - break 'batch_loop; - } + match unused_spk_count.checked_add(1) { + Some(i) if i < stop_gap => unused_spk_count = i, + _ => return Ok(last_active_index), + }; } else { last_active_index = Some(spk_index); unused_spk_count = 0; @@ -509,72 +506,65 @@ impl BdkElectrumClient { } } - // Fetch missing proofs in batches - for chunk in to_fetch.chunks(MAX_BATCH_SIZE) { - for &(txid, height, hash) in chunk { - // Fetch the raw proof. - let proof = self.inner.transaction_get_merkle(&txid, height)?; - - // Validate against header, retrying once on stale header. - let mut header = { - let cache = self.block_header_cache.lock().unwrap(); - cache[&(height as u32)] - }; - let mut valid = electrum_client::utils::validate_merkle_proof( + // Batch all get_merkle calls. + let mut batch = electrum_client::Batch::default(); + for &(txid, height, _) in &to_fetch { + batch.raw( + "blockchain.transaction.get_merkle".into(), + vec![ + electrum_client::Param::String(format!("{:x}", txid)), + electrum_client::Param::Usize(height), + ], + ); + } + let resps = self.inner.batch_call(&batch)?; + + // Validate each proof, retrying once for each stale header. 
+ for ((txid, height, hash), resp) in to_fetch.into_iter().zip(resps.into_iter()) { + let proof: electrum_client::GetMerkleRes = serde_json::from_value(resp)?; + + let mut header = { + let cache = self.block_header_cache.lock().unwrap(); + cache + .get(&(height as u32)) + .copied() + .expect("header already fetched above") + }; + let mut valid = + electrum_client::utils::validate_merkle_proof(&txid, &header.merkle_root, &proof); + if !valid { + header = self.inner.block_header(height)?; + self.block_header_cache + .lock() + .unwrap() + .insert(height as u32, header); + valid = electrum_client::utils::validate_merkle_proof( &txid, &header.merkle_root, &proof, ); - if !valid { - let new_header = self.inner.block_header(height)?; - self.block_header_cache - .lock() - .unwrap() - .insert(height as u32, new_header); - header = new_header; - valid = electrum_client::utils::validate_merkle_proof( - &txid, - &header.merkle_root, - &proof, - ); - } + } - // Build and cache the anchor if merkle proof is valid. - if valid { - let anchor = ConfirmationBlockTime { - confirmation_time: header.time as u64, - block_id: BlockId { - height: height as u32, - hash, - }, - }; - self.anchor_cache - .lock() - .unwrap() - .insert((txid, hash), anchor); - results.push((txid, anchor)); - } + // Build and cache the anchor if merkle proof is valid. + if valid { + let anchor = ConfirmationBlockTime { + confirmation_time: header.time as u64, + block_id: BlockId { + height: height as u32, + hash, + }, + }; + self.anchor_cache + .lock() + .unwrap() + .insert((txid, hash), anchor); + results.push((txid, anchor)); } } Ok(results) } - /// Validate a single transaction’s Merkle proof, cache its confirmation anchor, and update. 
- #[allow(dead_code)] - fn validate_anchor_for_update( - &self, - tx_update: &mut TxUpdate, - txid: Txid, - confirmation_height: usize, - ) -> Result<(), Error> { - let anchors = self.batch_fetch_anchors(&[(txid, confirmation_height)])?; - for (txid, anchor) in anchors { - tx_update.anchors.insert((anchor, txid)); - } - Ok(()) - } - // Helper function which fetches the `TxOut`s of our relevant transactions' previous // transactions, which we do not have by default. This data is needed to calculate the // transaction fee. From 156cbab67f4ff91276f9f03749944f4c46210f7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 25 Jun 2025 23:34:50 +0000 Subject: [PATCH 6/6] test(electrum): Improve benchmark * Actually use different spks * Do not benchmark applying updates (only fetching/constructing) * Have two benches: One with cache, one without. * Remove `black_box`. --- crates/electrum/benches/test_sync.rs | 113 ++++++++++++++------- 1 file changed, 60 insertions(+), 53 deletions(-) diff --git a/crates/electrum/benches/test_sync.rs b/crates/electrum/benches/test_sync.rs index 8a0191f2b..063fdd629 100644 --- a/crates/electrum/benches/test_sync.rs +++ b/crates/electrum/benches/test_sync.rs @@ -1,46 +1,44 @@ -use bdk_chain::{ - bitcoin::{Address, Amount, ScriptBuf}, - local_chain::LocalChain, - spk_client::{SyncRequest, SyncResponse}, - spk_txout::SpkTxOutIndex, - ConfirmationBlockTime, IndexedTxGraph, Indexer, Merge, -}; -use bdk_core::bitcoin::{ - key::{Secp256k1, UntweakedPublicKey}, - Network, +use bdk_chain::bitcoin::{Address, Amount, ScriptBuf}; +use bdk_core::{ + bitcoin::{ + consensus::WriteExt, + hashes::Hash, + key::{Secp256k1, UntweakedPublicKey}, + Network, TapNodeHash, + }, + spk_client::SyncRequest, + CheckPoint, }; use bdk_electrum::BdkElectrumClient; use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; use criterion::{criterion_group, criterion_main, Criterion}; -use std::time::Duration; +use electrum_client::ElectrumApi;
+use std::{collections::BTreeSet, time::Duration}; // Batch size for `sync_with_electrum`. -const BATCH_SIZE: usize = 5; +const BATCH_SIZE: usize = 100; -pub fn get_test_spk() -> ScriptBuf { +pub fn get_test_spk(i: usize) -> ScriptBuf { const PK_BYTES: &[u8] = &[ 12, 244, 72, 4, 163, 4, 211, 81, 159, 82, 153, 123, 125, 74, 142, 40, 55, 237, 191, 231, 31, 114, 89, 165, 83, 141, 8, 203, 93, 240, 53, 101, ]; let secp = Secp256k1::new(); let pk = UntweakedPublicKey::from_slice(PK_BYTES).expect("Must be valid PK"); - ScriptBuf::new_p2tr(&secp, pk, None) + let mut engine = TapNodeHash::engine(); + engine.emit_u64(i as u64).expect("must emit"); + ScriptBuf::new_p2tr(&secp, pk, Some(TapNodeHash::from_engine(engine))) } -fn sync_with_electrum( - client: &BdkElectrumClient, - spks: Spks, - chain: &mut LocalChain, - graph: &mut IndexedTxGraph, -) -> anyhow::Result -where - I: Indexer, - I::ChangeSet: Default + Merge, - Spks: IntoIterator, - Spks::IntoIter: ExactSizeIterator + Send + 'static, -{ +fn sync_with_electrum( + client: &BdkElectrumClient, + spks: &[ScriptBuf], + chain_tip: &CheckPoint, +) -> anyhow::Result<()> { let update = client.sync( - SyncRequest::builder().chain_tip(chain.tip()).spks(spks), + SyncRequest::builder() + .chain_tip(chain_tip.clone()) + .spks(spks.iter().cloned()), BATCH_SIZE, true, )?; @@ -50,20 +48,11 @@ where "expected some transactions from sync, but got none" ); - if let Some(chain_update) = update.chain_update.clone() { - let _ = chain - .apply_update(chain_update) - .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?; - } - let _ = graph.apply_update(update.tx_update.clone()); - - Ok(update) + Ok(()) } pub fn test_sync_performance(c: &mut Criterion) { let env = TestEnv::new().unwrap(); - let electrum_client = electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap(); - let client = BdkElectrumClient::new(electrum_client); const NUM_BLOCKS: usize = 100; let mut spks = Vec::with_capacity(NUM_BLOCKS); @@ -72,8 
+61,8 @@ pub fn test_sync_performance(c: &mut Criterion) { env.mine_blocks(101, None).unwrap(); // Scatter UTXOs across many blocks. - for _ in 0..NUM_BLOCKS { - let spk = get_test_spk(); + for i in 0..NUM_BLOCKS { + let spk = get_test_spk(i); let addr = Address::from_script(&spk, Network::Regtest).unwrap(); env.send(&addr, Amount::from_sat(10_000)).unwrap(); env.mine_blocks(1, None).unwrap(); @@ -81,25 +70,43 @@ pub fn test_sync_performance(c: &mut Criterion) { spks.push(spk); } let _ = env.wait_until_electrum_sees_block(Duration::from_secs(6)); + assert_eq!( + spks.iter().cloned().collect::>().len(), + spks.len(), + "all spks must be unique", + ); // Setup receiver. - let genesis = env.bitcoind.client.get_block_hash(0).unwrap(); - let (chain, _) = LocalChain::from_genesis_hash(genesis); - let graph = IndexedTxGraph::::new({ - let mut idx = SpkTxOutIndex::default(); - idx.insert_spk((), spks[0].clone()); - idx + let genesis_cp = CheckPoint::new(bdk_core::BlockId { + height: 0, + hash: env.bitcoind.client.get_block_hash(0).unwrap(), }); - c.bench_function("sync_with_electrum", move |b| { - b.iter(|| { - let spks = spks.clone(); - let mut recv_chain = chain.clone(); - let mut recv_graph = graph.clone(); + { + let electrum_client = + electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap(); + let spks = spks.clone(); + let genesis_cp = genesis_cp.clone(); + c.bench_function("sync_with_electrum", move |b| { + b.iter(|| { + sync_with_electrum( + &BdkElectrumClient::new(&electrum_client), + &spks, + &genesis_cp, + ) + .expect("must not error") + }) + }); + } - let _ = sync_with_electrum(&client, spks, &mut recv_chain, &mut recv_graph); - }) - }); + { + let client = BdkElectrumClient::new( + electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap(), + ); + c.bench_function("sync_with_electrum_cached", move |b| { + b.iter(|| sync_with_electrum(&client, &spks, &genesis_cp).expect("must not error")) + }); + } } criterion_group! {