diff --git a/crates/electrum/Cargo.toml b/crates/electrum/Cargo.toml
index a6b619eb1..c09bfcbac 100644
--- a/crates/electrum/Cargo.toml
+++ b/crates/electrum/Cargo.toml
@@ -15,10 +15,12 @@ workspace = true
 [dependencies]
 bdk_core = { path = "../core", version = "0.6.0" }
 electrum-client = { version = "0.23.1", features = [ "proxy" ], default-features = false }
+serde_json = "1.0"
 
 [dev-dependencies]
 bdk_testenv = { path = "../testenv" }
 bdk_chain = { path = "../chain" }
+criterion = { version = "0.2" }
 
 [features]
 default = ["use-rustls"]
@@ -29,3 +31,7 @@ use-openssl = ["electrum-client/use-openssl"]
 [[test]]
 name = "test_electrum"
 required-features = ["use-rustls"]
+
+[[bench]]
+name = "test_sync"
+harness = false
diff --git a/crates/electrum/benches/test_sync.rs b/crates/electrum/benches/test_sync.rs
new file mode 100644
index 000000000..063fdd629
--- /dev/null
+++ b/crates/electrum/benches/test_sync.rs
@@ -0,0 +1,118 @@
+use bdk_chain::bitcoin::{Address, Amount, ScriptBuf};
+use bdk_core::{
+    bitcoin::{
+        consensus::WriteExt,
+        hashes::Hash,
+        key::{Secp256k1, UntweakedPublicKey},
+        Network, TapNodeHash,
+    },
+    spk_client::SyncRequest,
+    CheckPoint,
+};
+use bdk_electrum::BdkElectrumClient;
+use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv};
+use criterion::{criterion_group, criterion_main, Criterion};
+use electrum_client::ElectrumApi;
+use std::{collections::BTreeSet, time::Duration};
+
+// Batch size for `sync_with_electrum`.
+const BATCH_SIZE: usize = 100;
+
+pub fn get_test_spk(i: usize) -> ScriptBuf {
+    const PK_BYTES: &[u8] = &[
+        12, 244, 72, 4, 163, 4, 211, 81, 159, 82, 153, 123, 125, 74, 142, 40, 55, 237, 191, 231,
+        31, 114, 89, 165, 83, 141, 8, 203, 93, 240, 53, 101,
+    ];
+    let secp = Secp256k1::new();
+    let pk = UntweakedPublicKey::from_slice(PK_BYTES).expect("Must be valid PK");
+    let mut engine = TapNodeHash::engine();
+    engine.emit_u64(i as u64).expect("must emit");
+    ScriptBuf::new_p2tr(&secp, pk, Some(TapNodeHash::from_engine(engine)))
+}
+
+fn sync_with_electrum<E: ElectrumApi>(
+    client: &BdkElectrumClient<E>,
+    spks: &[ScriptBuf],
+    chain_tip: &CheckPoint,
+) -> anyhow::Result<()> {
+    let update = client.sync(
+        SyncRequest::builder()
+            .chain_tip(chain_tip.clone())
+            .spks(spks.iter().cloned()),
+        BATCH_SIZE,
+        true,
+    )?;
+
+    assert!(
+        !update.tx_update.txs.is_empty(),
+        "expected some transactions from sync, but got none"
+    );
+
+    Ok(())
+}
+
+pub fn test_sync_performance(c: &mut Criterion) {
+    let env = TestEnv::new().unwrap();
+
+    const NUM_BLOCKS: usize = 100;
+    let mut spks = Vec::with_capacity(NUM_BLOCKS);
+
+    // Mine some blocks and send transactions.
+    env.mine_blocks(101, None).unwrap();
+
+    // Scatter UTXOs across many blocks.
+    for i in 0..NUM_BLOCKS {
+        let spk = get_test_spk(i);
+        let addr = Address::from_script(&spk, Network::Regtest).unwrap();
+        env.send(&addr, Amount::from_sat(10_000)).unwrap();
+        env.mine_blocks(1, None).unwrap();
+
+        spks.push(spk);
+    }
+    let _ = env.wait_until_electrum_sees_block(Duration::from_secs(6));
+    assert_eq!(
+        spks.iter().cloned().collect::<BTreeSet<_>>().len(),
+        spks.len(),
+        "all spks must be unique",
+    );
+
+    // Setup receiver.
+    let genesis_cp = CheckPoint::new(bdk_core::BlockId {
+        height: 0,
+        hash: env.bitcoind.client.get_block_hash(0).unwrap(),
+    });
+
+    {
+        let electrum_client =
+            electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap();
+        let spks = spks.clone();
+        let genesis_cp = genesis_cp.clone();
+        c.bench_function("sync_with_electrum", move |b| {
+            b.iter(|| {
+                sync_with_electrum(
+                    &BdkElectrumClient::new(&electrum_client),
+                    &spks,
+                    &genesis_cp,
+                )
+                .expect("must not error")
+            })
+        });
+    }
+
+    {
+        let client = BdkElectrumClient::new(
+            electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap(),
+        );
+        c.bench_function("sync_with_electrum_cached", move |b| {
+            b.iter(|| sync_with_electrum(&client, &spks, &genesis_cp).expect("must not error"))
+        });
+    }
+}
+
+criterion_group! {
+    name = benches;
+    config = Criterion::default()
+        .sample_size(10);
+    targets = test_sync_performance
+}
+criterion_main!(benches);
diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs
index e0eac5083..dde9efebd 100644
--- a/crates/electrum/src/bdk_electrum_client.rs
+++ b/crates/electrum/src/bdk_electrum_client.rs
@@ -22,6 +22,8 @@ pub struct BdkElectrumClient<E> {
     tx_cache: Mutex<HashMap<Txid, Arc<Transaction>>>,
     /// The header cache
     block_header_cache: Mutex<HashMap<u32, Header>>,
+    /// Cache of transaction anchors
+    anchor_cache: Mutex<HashMap<(Txid, BlockHash), ConfirmationBlockTime>>,
 }
 
 impl<E: ElectrumApi> BdkElectrumClient<E> {
@@ -31,6 +33,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
             inner: client,
             tx_cache: Default::default(),
             block_header_cache: Default::default(),
+            anchor_cache: Default::default(),
         }
     }
 
@@ -64,33 +67,6 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         Ok(tx)
     }
 
-    /// Fetch block header of given `height`.
-    ///
-    /// If it hits the cache it will return the cached version and avoid making the request.
-    fn fetch_header(&self, height: u32) -> Result<Header, Error> {
-        let block_header_cache = self.block_header_cache.lock().unwrap();
-
-        if let Some(header) = block_header_cache.get(&height) {
-            return Ok(*header);
-        }
-
-        drop(block_header_cache);
-
-        self.update_header(height)
-    }
-
-    /// Update a block header at given `height`. Returns the updated header.
-    fn update_header(&self, height: u32) -> Result<Header, Error> {
-        let header = self.inner.block_header(height as usize)?;
-
-        self.block_header_cache
-            .lock()
-            .unwrap()
-            .insert(height, header);
-
-        Ok(header)
-    }
-
     /// Broadcasts a transaction to the network.
     ///
     /// This is a re-export of [`ElectrumApi::transaction_broadcast`].
@@ -135,13 +111,19 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
         let mut last_active_indices = BTreeMap::<K, u32>::default();
+        let mut pending_anchors = Vec::new();
         for keychain in request.keychains() {
             let spks = request
                 .iter_spks(keychain.clone())
                 .map(|(spk_i, spk)| (spk_i, SpkWithExpectedTxids::from(spk)));
-            if let Some(last_active_index) =
-                self.populate_with_spks(start_time, &mut tx_update, spks, stop_gap, batch_size)?
-            {
+            if let Some(last_active_index) = self.populate_with_spks(
+                start_time,
+                &mut tx_update,
+                spks,
+                stop_gap,
+                batch_size,
+                &mut pending_anchors,
+            )? {
                 last_active_indices.insert(keychain, last_active_index);
             }
         }
@@ -151,6 +133,13 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
             self.fetch_prev_txout(&mut tx_update)?;
         }
 
+        if !pending_anchors.is_empty() {
+            let anchors = self.batch_fetch_anchors(&pending_anchors)?;
+            for (txid, anchor) in anchors {
+                tx_update.anchors.insert((anchor, txid));
+            }
+        }
+
         let chain_update = match tip_and_latest_blocks {
             Some((chain_tip, latest_blocks)) => Some(chain_update(
                 chain_tip,
@@ -204,6 +193,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         };
 
         let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
+        let mut pending_anchors = Vec::new();
         self.populate_with_spks(
             start_time,
             &mut tx_update,
@@ -213,15 +203,33 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
                 .map(|(i, spk)| (i as u32, spk)),
             usize::MAX,
             batch_size,
+            &mut pending_anchors,
+        )?;
+        self.populate_with_txids(
+            start_time,
+            &mut tx_update,
+            request.iter_txids(),
+            &mut pending_anchors,
+        )?;
+        self.populate_with_outpoints(
+            start_time,
+            &mut tx_update,
+            request.iter_outpoints(),
+            &mut pending_anchors,
         )?;
-        self.populate_with_txids(start_time, &mut tx_update, request.iter_txids())?;
-        self.populate_with_outpoints(start_time, &mut tx_update, request.iter_outpoints())?;
 
         // Fetch previous `TxOut`s for fee calculation if flag is enabled.
         if fetch_prev_txouts {
             self.fetch_prev_txout(&mut tx_update)?;
         }
 
+        if !pending_anchors.is_empty() {
+            let anchors = self.batch_fetch_anchors(&pending_anchors)?;
+            for (txid, anchor) in anchors {
+                tx_update.anchors.insert((anchor, txid));
+            }
+        }
+
         let chain_update = match tip_and_latest_blocks {
             Some((chain_tip, latest_blocks)) => Some(chain_update(
                 chain_tip,
@@ -249,6 +257,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         mut spks_with_expected_txids: impl Iterator<Item = (u32, SpkWithExpectedTxids)>,
         stop_gap: usize,
         batch_size: usize,
+        pending_anchors: &mut Vec<(Txid, usize)>,
     ) -> Result<Option<u32>, Error> {
         let mut unused_spk_count = 0_usize;
         let mut last_active_index = Option::<u32>::None;
@@ -267,10 +276,10 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
 
         for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) {
             if spk_history.is_empty() {
-                unused_spk_count = unused_spk_count.saturating_add(1);
-                if unused_spk_count >= stop_gap {
-                    return Ok(last_active_index);
-                }
+                match unused_spk_count.checked_add(1) {
+                    Some(i) if i < stop_gap => unused_spk_count = i,
+                    _ => return Ok(last_active_index),
+                };
             } else {
                 last_active_index = Some(spk_index);
                 unused_spk_count = 0;
@@ -292,7 +301,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
                 match tx_res.height.try_into() {
                     // Returned heights 0 & -1 are reserved for unconfirmed txs.
                     Ok(height) if height > 0 => {
-                        self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?;
+                        pending_anchors.push((tx_res.tx_hash, height));
                     }
                     _ => {
                         tx_update.seen_ats.insert((tx_res.tx_hash, start_time));
                     }
@@ -312,62 +321,82 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         start_time: u64,
         tx_update: &mut TxUpdate<ConfirmationBlockTime>,
         outpoints: impl IntoIterator<Item = OutPoint>,
+        pending_anchors: &mut Vec<(Txid, usize)>,
     ) -> Result<(), Error> {
-        for outpoint in outpoints {
-            let op_txid = outpoint.txid;
-            let op_tx = self.fetch_tx(op_txid)?;
-            let op_txout = match op_tx.output.get(outpoint.vout as usize) {
-                Some(txout) => txout,
-                None => continue,
-            };
-            debug_assert_eq!(op_tx.compute_txid(), op_txid);
-
-            // attempt to find the following transactions (alongside their chain positions), and
-            // add to our sparsechain `update`:
-            let mut has_residing = false; // tx in which the outpoint resides
-            let mut has_spending = false; // tx that spends the outpoint
-            for res in self.inner.script_get_history(&op_txout.script_pubkey)? {
-                if has_residing && has_spending {
-                    break;
+        // Collect valid outpoints with their corresponding `spk` and `tx`.
+        let mut ops_spks_txs = Vec::new();
+        for op in outpoints {
+            if let Ok(tx) = self.fetch_tx(op.txid) {
+                if let Some(txout) = tx.output.get(op.vout as usize) {
+                    ops_spks_txs.push((op, txout.script_pubkey.clone(), tx));
                 }
+            }
+        }
 
-                if !has_residing && res.tx_hash == op_txid {
-                    has_residing = true;
-                    tx_update.txs.push(Arc::clone(&op_tx));
-                    match res.height.try_into() {
-                        // Returned heights 0 & -1 are reserved for unconfirmed txs.
-                        Ok(height) if height > 0 => {
-                            self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
-                        }
-                        _ => {
-                            tx_update.seen_ats.insert((res.tx_hash, start_time));
-                        }
+        // Dedup `spk`s, batch-fetch all histories in one call, and store them in a map.
+        let unique_spks: Vec<_> = ops_spks_txs
+            .iter()
+            .map(|(_, spk, _)| spk.clone())
+            .collect::<BTreeSet<_>>()
+            .into_iter()
+            .collect();
+        let histories = self
+            .inner
+            .batch_script_get_history(unique_spks.iter().map(|spk| spk.as_script()))?;
+        let mut spk_map = HashMap::new();
+        for (spk, history) in unique_spks.into_iter().zip(histories.into_iter()) {
+            spk_map.insert(spk, history);
+        }
+
+        for (outpoint, spk, tx) in ops_spks_txs {
+            if let Some(spk_history) = spk_map.get(&spk) {
+                let mut has_residing = false; // tx in which the outpoint resides
+                let mut has_spending = false; // tx that spends the outpoint
+
+                for res in spk_history {
+                    if has_residing && has_spending {
+                        break;
                     }
-                }
 
-                if !has_spending && res.tx_hash != op_txid {
-                    let res_tx = self.fetch_tx(res.tx_hash)?;
-                    // we exclude txs/anchors that do not spend our specified outpoint(s)
-                    has_spending = res_tx
-                        .input
-                        .iter()
-                        .any(|txin| txin.previous_output == outpoint);
-                    if !has_spending {
-                        continue;
+                    if !has_residing && res.tx_hash == outpoint.txid {
+                        has_residing = true;
+                        tx_update.txs.push(Arc::clone(&tx));
+                        match res.height.try_into() {
+                            // Returned heights 0 & -1 are reserved for unconfirmed txs.
+                            Ok(height) if height > 0 => {
+                                pending_anchors.push((res.tx_hash, height));
+                            }
+                            _ => {
+                                tx_update.seen_ats.insert((res.tx_hash, start_time));
+                            }
+                        }
                     }
-                    tx_update.txs.push(Arc::clone(&res_tx));
-                    match res.height.try_into() {
-                        // Returned heights 0 & -1 are reserved for unconfirmed txs.
-                        Ok(height) if height > 0 => {
-                            self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+                    if !has_spending && res.tx_hash != outpoint.txid {
+                        let res_tx = self.fetch_tx(res.tx_hash)?;
+                        // we exclude txs/anchors that do not spend our specified outpoint(s)
+                        has_spending = res_tx
+                            .input
+                            .iter()
+                            .any(|txin| txin.previous_output == outpoint);
+                        if !has_spending {
+                            continue;
                         }
-                        _ => {
-                            tx_update.seen_ats.insert((res.tx_hash, start_time));
+                        tx_update.txs.push(Arc::clone(&res_tx));
+                        match res.height.try_into() {
+                            // Returned heights 0 & -1 are reserved for unconfirmed txs.
+                            Ok(height) if height > 0 => {
+                                pending_anchors.push((res.tx_hash, height));
+                            }
+                            _ => {
+                                tx_update.seen_ats.insert((res.tx_hash, start_time));
+                            }
                         }
                     }
                 }
             }
         }
+
         Ok(())
     }
 
@@ -377,88 +406,163 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         start_time: u64,
         tx_update: &mut TxUpdate<ConfirmationBlockTime>,
         txids: impl IntoIterator<Item = Txid>,
+        pending_anchors: &mut Vec<(Txid, usize)>,
     ) -> Result<(), Error> {
+        let mut txs = Vec::<(Txid, Arc<Transaction>)>::new();
+        let mut scripts = Vec::new();
         for txid in txids {
-            let tx = match self.fetch_tx(txid) {
-                Ok(tx) => tx,
-                Err(electrum_client::Error::Protocol(_)) => continue,
-                Err(other_err) => return Err(other_err),
-            };
+            match self.fetch_tx(txid) {
+                Ok(tx) => {
+                    let spk = tx
+                        .output
+                        .first()
+                        .map(|txo| &txo.script_pubkey)
+                        .expect("tx must have an output")
+                        .clone();
+                    txs.push((txid, tx));
+                    scripts.push(spk);
+                }
+                Err(electrum_client::Error::Protocol(_)) => {
+                    continue;
+                }
+                Err(e) => return Err(e),
+            }
+        }
 
-            let spk = tx
-                .output
-                .first()
-                .map(|txo| &txo.script_pubkey)
-                .expect("tx must have an output");
+        // because of restrictions of the Electrum API, we have to use the `script_get_history`
+        // call to get confirmation status of our transaction
+        let spk_histories = self
+            .inner
+            .batch_script_get_history(scripts.iter().map(|spk| spk.as_script()))?;
 
-            // because of restrictions of the Electrum API, we have to use the `script_get_history`
-            // call to get confirmation status of our transaction
-            if let Some(r) = self
-                .inner
-                .script_get_history(spk)?
-                .into_iter()
-                .find(|r| r.tx_hash == txid)
-            {
-                match r.height.try_into() {
+        for (tx, spk_history) in txs.into_iter().zip(spk_histories) {
+            if let Some(res) = spk_history.into_iter().find(|res| res.tx_hash == tx.0) {
+                match res.height.try_into() {
                     // Returned heights 0 & -1 are reserved for unconfirmed txs.
                     Ok(height) if height > 0 => {
-                        self.validate_merkle_for_anchor(tx_update, txid, height)?;
+                        pending_anchors.push((tx.0, height));
                     }
                     _ => {
-                        tx_update.seen_ats.insert((r.tx_hash, start_time));
+                        tx_update.seen_ats.insert((res.tx_hash, start_time));
                     }
                 }
             }
 
-            tx_update.txs.push(tx);
+            tx_update.txs.push(tx.1);
         }
+
         Ok(())
     }
 
-    // Helper function which checks if a transaction is confirmed by validating the merkle proof.
-    // An anchor is inserted if the transaction is validated to be in a confirmed block.
-    fn validate_merkle_for_anchor(
+    /// Batch validate Merkle proofs, cache each confirmation anchor, and return them.
+    fn batch_fetch_anchors(
         &self,
-        tx_update: &mut TxUpdate<ConfirmationBlockTime>,
-        txid: Txid,
-        confirmation_height: usize,
-    ) -> Result<(), Error> {
-        if let Ok(merkle_res) = self
-            .inner
-            .transaction_get_merkle(&txid, confirmation_height)
+        txs_with_heights: &[(Txid, usize)],
+    ) -> Result<Vec<(Txid, ConfirmationBlockTime)>, Error> {
+        let mut results = Vec::with_capacity(txs_with_heights.len());
+        let mut to_fetch = Vec::new();
+
+        // Figure out which block heights we need headers for.
+        let mut needed_heights: Vec<u32> =
+            txs_with_heights.iter().map(|&(_, h)| h as u32).collect();
+        needed_heights.sort_unstable();
+        needed_heights.dedup();
+
+        let mut height_to_hash = HashMap::with_capacity(needed_heights.len());
+
+        // Collect headers of missing heights, and build `height_to_hash` map.
         {
-            let mut header = self.fetch_header(merkle_res.block_height as u32)?;
-            let mut is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
-                &txid,
-                &header.merkle_root,
-                &merkle_res,
-            );
+            let mut cache = self.block_header_cache.lock().unwrap();
 
-            // Merkle validation will fail if the header in `block_header_cache` is outdated, so we
-            // want to check if there is a new header and validate against the new one.
-            if !is_confirmed_tx {
-                header = self.update_header(merkle_res.block_height as u32)?;
-                is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
+            let mut missing_heights = Vec::new();
+            for &height in &needed_heights {
+                if let Some(header) = cache.get(&height) {
+                    height_to_hash.insert(height, header.block_hash());
+                } else {
+                    missing_heights.push(height);
+                }
+            }
+
+            if !missing_heights.is_empty() {
+                let headers = self.inner.batch_block_header(missing_heights.clone())?;
+                for (height, header) in missing_heights.into_iter().zip(headers) {
+                    height_to_hash.insert(height, header.block_hash());
+                    cache.insert(height, header);
+                }
+            }
+        }
+
+        // Check our anchor cache and queue up any proofs we still need.
+        {
+            let anchor_cache = self.anchor_cache.lock().unwrap();
+            for &(txid, height) in txs_with_heights {
+                let h = height as u32;
+                let hash = height_to_hash[&h];
+                if let Some(anchor) = anchor_cache.get(&(txid, hash)) {
+                    results.push((txid, *anchor));
+                } else {
+                    to_fetch.push((txid, height, hash));
+                }
+            }
+        }
+
+        // Batch all get_merkle calls.
+        let mut batch = electrum_client::Batch::default();
+        for &(txid, height, _) in &to_fetch {
+            batch.raw(
+                "blockchain.transaction.get_merkle".into(),
+                vec![
+                    electrum_client::Param::String(format!("{:x}", txid)),
+                    electrum_client::Param::Usize(height),
+                ],
+            );
+        }
+        let resps = self.inner.batch_call(&batch)?;
+
+        // Validate each proof, retrying once for each stale header.
+        for ((txid, height, hash), resp) in to_fetch.into_iter().zip(resps.into_iter()) {
+            let proof: electrum_client::GetMerkleRes = serde_json::from_value(resp)?;
+
+            let mut header = {
+                let cache = self.block_header_cache.lock().unwrap();
+                cache
+                    .get(&(height as u32))
+                    .copied()
+                    .expect("header already fetched above")
+            };
+            let mut valid =
+                electrum_client::utils::validate_merkle_proof(&txid, &header.merkle_root, &proof);
+            if !valid {
+                header = self.inner.block_header(height)?;
+                self.block_header_cache
+                    .lock()
+                    .unwrap()
+                    .insert(height as u32, header);
+                valid = electrum_client::utils::validate_merkle_proof(
                     &txid,
                     &header.merkle_root,
-                    &merkle_res,
+                    &proof,
                 );
             }
 
-            if is_confirmed_tx {
-                tx_update.anchors.insert((
-                    ConfirmationBlockTime {
-                        confirmation_time: header.time as u64,
-                        block_id: BlockId {
-                            height: merkle_res.block_height as u32,
-                            hash: header.block_hash(),
-                        },
-                    },
-                    txid,
-                ));
+            // Build and cache the anchor if merkle proof is valid.
+            if valid {
+                let anchor = ConfirmationBlockTime {
+                    confirmation_time: header.time as u64,
+                    block_id: BlockId {
+                        height: height as u32,
+                        hash,
+                    },
+                };
+                self.anchor_cache
+                    .lock()
+                    .unwrap()
+                    .insert((txid, hash), anchor);
+                results.push((txid, anchor));
             }
         }
-        Ok(())
+
+        Ok(results)
     }
 
     // Helper function which fetches the `TxOut`s of our relevant transactions' previous