From b1461f2955d4dc212976da0da67ab38f3205f2b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 24 Jan 2025 15:31:41 +1100 Subject: [PATCH 01/16] feat(core)!: Make `TxUpdate` non-exhaustive This may, if we introduce new fields to `TxUdpate`, they can be minor non-breaking updates. --- crates/chain/benches/canonicalization.rs | 12 +-- crates/chain/src/tx_graph.rs | 26 ++--- crates/chain/tests/test_tx_graph.rs | 117 ++++++++++----------- crates/core/src/tx_update.rs | 15 +++ crates/electrum/src/bdk_electrum_client.rs | 6 +- crates/wallet/src/test_utils.rs | 23 ++-- 6 files changed, 98 insertions(+), 101 deletions(-) diff --git a/crates/chain/benches/canonicalization.rs b/crates/chain/benches/canonicalization.rs index 3002a7ca3..52cbf51d6 100644 --- a/crates/chain/benches/canonicalization.rs +++ b/crates/chain/benches/canonicalization.rs @@ -132,10 +132,8 @@ pub fn many_conflicting_unconfirmed(c: &mut Criterion) { }], ..new_tx(i) }; - let update = TxUpdate { - txs: vec![Arc::new(tx)], - ..Default::default() - }; + let mut update = TxUpdate::default(); + update.txs = vec![Arc::new(tx)]; let _ = tx_graph.apply_update_at(update, Some(i as u64)); } })); @@ -169,10 +167,8 @@ pub fn many_chained_unconfirmed(c: &mut Criterion) { ..new_tx(i) }; let txid = tx.compute_txid(); - let update = TxUpdate { - txs: vec![Arc::new(tx)], - ..Default::default() - }; + let mut update = TxUpdate::default(); + update.txs = vec![Arc::new(tx)]; let _ = tx_graph.apply_update_at(update, Some(i as u64)); // Store the next prevout. previous_output = OutPoint::new(txid, 0); diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index 2d512cfea..d0f7380a1 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -110,19 +110,19 @@ use core::{ impl From> for TxUpdate { fn from(graph: TxGraph) -> Self { - Self { - txs: graph.full_txs().map(|tx_node| tx_node.tx).collect(), - txouts: graph - .floating_txouts() - .map(|(op, txo)| (op, txo.clone())) - .collect(), - anchors: graph - .anchors - .into_iter() - .flat_map(|(txid, anchors)| anchors.into_iter().map(move |a| (a, txid))) - .collect(), - seen_ats: graph.last_seen.into_iter().collect(), - } + let mut tx_update = TxUpdate::default(); + tx_update.txs = graph.full_txs().map(|tx_node| tx_node.tx).collect(); + tx_update.txouts = graph + .floating_txouts() + .map(|(op, txo)| (op, txo.clone())) + .collect(); + tx_update.anchors = graph + .anchors + .into_iter() + .flat_map(|(txid, anchors)| anchors.into_iter().map(move |a| (a, txid))) + .collect(); + tx_update.seen_ats = graph.last_seen.into_iter().collect(); + tx_update } } diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index ef57ac15b..16d2e6c6d 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -1231,69 +1231,60 @@ fn tx_graph_update_conversion() { let test_cases: &[TestCase] = &[ ("empty_update", TxUpdate::default()), - ( - "single_tx", - TxUpdate { - txs: vec![make_tx(0).into()], - ..Default::default() - }, - ), - ( - "two_txs", - TxUpdate { - txs: vec![make_tx(0).into(), make_tx(1).into()], - ..Default::default() - }, - ), - ( - "with_floating_txouts", - TxUpdate { - txs: vec![make_tx(0).into(), make_tx(1).into()], - txouts: [ - (OutPoint::new(hash!("a"), 0), make_txout(0)), - (OutPoint::new(hash!("a"), 1), make_txout(1)), - (OutPoint::new(hash!("b"), 0), make_txout(2)), - ] - .into(), - ..Default::default() - }, - ), - ( - "with_anchors", - TxUpdate { - txs: vec![make_tx(0).into(), make_tx(1).into()], - txouts: [ - (OutPoint::new(hash!("a"), 0), make_txout(0)), - (OutPoint::new(hash!("a"), 1), make_txout(1)), - (OutPoint::new(hash!("b"), 0), make_txout(2)), - ] - .into(), - anchors: [ - (ConfirmationBlockTime::default(), hash!("a")), - (ConfirmationBlockTime::default(), hash!("b")), - ] - .into(), - ..Default::default() - }, - ), - ( - "with_seen_ats", - TxUpdate { - txs: vec![make_tx(0).into(), make_tx(1).into()], - txouts: [ - (OutPoint::new(hash!("a"), 0), make_txout(0)), - (OutPoint::new(hash!("a"), 1), make_txout(1)), - (OutPoint::new(hash!("d"), 0), make_txout(2)), - ] - .into(), - anchors: [ - (ConfirmationBlockTime::default(), hash!("a")), - (ConfirmationBlockTime::default(), hash!("b")), - ] - .into(), - seen_ats: [(hash!("c"), 12346)].into_iter().collect(), - }, - ), + ("single_tx", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into()]; + tx_update + }), + ("two_txs", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into(), make_tx(1).into()]; + tx_update + }), + ("with_floating_txouts", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into(), make_tx(1).into()]; + tx_update.txouts = [ + (OutPoint::new(hash!("a"), 0), make_txout(0)), + (OutPoint::new(hash!("a"), 1), make_txout(1)), + (OutPoint::new(hash!("b"), 0), make_txout(2)), + ] + .into(); + tx_update + }), + ("with_anchors", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into(), make_tx(1).into()]; + tx_update.txouts = [ + (OutPoint::new(hash!("a"), 0), make_txout(0)), + (OutPoint::new(hash!("a"), 1), make_txout(1)), + (OutPoint::new(hash!("b"), 0), make_txout(2)), + ] + .into(); + tx_update.anchors = [ + (ConfirmationBlockTime::default(), hash!("a")), + (ConfirmationBlockTime::default(), hash!("b")), + ] + .into(); + tx_update + }), + ("with_seen_ats", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into(), make_tx(1).into()]; + tx_update.txouts = [ + (OutPoint::new(hash!("a"), 0), make_txout(0)), + (OutPoint::new(hash!("a"), 1), make_txout(1)), + (OutPoint::new(hash!("d"), 0), make_txout(2)), + ] + .into(); + tx_update.anchors = [ + (ConfirmationBlockTime::default(), hash!("a")), + (ConfirmationBlockTime::default(), hash!("b")), + ] + .into(); + tx_update.seen_ats = [(hash!("c"), 12346)].into_iter().collect(); + tx_update + }), ]; for (test_name, update) in test_cases { diff --git a/crates/core/src/tx_update.rs b/crates/core/src/tx_update.rs index 7707578ee..5da2bff87 100644 --- a/crates/core/src/tx_update.rs +++ b/crates/core/src/tx_update.rs @@ -4,7 +4,22 @@ use bitcoin::{OutPoint, Transaction, TxOut, Txid}; /// Data object used to communicate updates about relevant transactions from some chain data source /// to the core model (usually a `bdk_chain::TxGraph`). +/// +/// ```rust +/// use bdk_core::TxUpdate; +/// # use std::sync::Arc; +/// # use bitcoin::{Transaction, transaction::Version, absolute::LockTime}; +/// # let version = Version::ONE; +/// # let lock_time = LockTime::ZERO; +/// # let tx = Arc::new(Transaction { input: vec![], output: vec![], version, lock_time }); +/// # let txid = tx.compute_txid(); +/// # let anchor = (); +/// let mut tx_update = TxUpdate::default(); +/// tx_update.txs.push(tx); +/// tx_update.anchors.insert((anchor, txid)); +/// ``` #[derive(Debug, Clone)] +#[non_exhaustive] pub struct TxUpdate { /// Full transactions. These are transactions that were determined to be relevant to the wallet /// given the request. diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index 621a69e11..e187bf366 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -571,10 +571,8 @@ mod test { // `fetch_prev_txout` on a coinbase transaction will trigger a `fetch_tx` on a transaction // with a txid of all zeros. If `fetch_prev_txout` attempts to fetch this transaction, this // assertion will fail. - let mut tx_update = TxUpdate { - txs: vec![Arc::new(coinbase_tx)], - ..Default::default() - }; + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![Arc::new(coinbase_tx)]; assert!(client.fetch_prev_txout(&mut tx_update).is_ok()); // Ensure that the txouts are empty. diff --git a/crates/wallet/src/test_utils.rs b/crates/wallet/src/test_utils.rs index 7e1778fab..14ff40ed4 100644 --- a/crates/wallet/src/test_utils.rs +++ b/crates/wallet/src/test_utils.rs @@ -4,7 +4,7 @@ use alloc::string::ToString; use alloc::sync::Arc; use core::str::FromStr; -use bdk_chain::{tx_graph, BlockId, ConfirmationBlockTime}; +use bdk_chain::{BlockId, ConfirmationBlockTime}; use bitcoin::{ absolute, hashes::Hash, transaction, Address, Amount, BlockHash, FeeRate, Network, OutPoint, Transaction, TxIn, TxOut, Txid, @@ -314,12 +314,11 @@ pub fn insert_checkpoint(wallet: &mut Wallet, block: BlockId) { /// Insert transaction pub fn insert_tx(wallet: &mut Wallet, tx: Transaction) { + let mut tx_update = bdk_chain::TxUpdate::default(); + tx_update.txs.push(Arc::new(tx)); wallet .apply_update(Update { - tx_update: bdk_chain::TxUpdate { - txs: vec![Arc::new(tx)], - ..Default::default() - }, + tx_update, ..Default::default() }) .unwrap(); @@ -329,12 +328,11 @@ pub fn insert_tx(wallet: &mut Wallet, tx: Transaction) { /// the given `anchor`. Note: to be considered confirmed the anchor block must exist in /// the current active chain. pub fn insert_anchor(wallet: &mut Wallet, txid: Txid, anchor: ConfirmationBlockTime) { + let mut tx_update = bdk_chain::TxUpdate::default(); + tx_update.anchors.insert((anchor, txid)); wallet .apply_update(Update { - tx_update: tx_graph::TxUpdate { - anchors: [(anchor, txid)].into(), - ..Default::default() - }, + tx_update, ..Default::default() }) .unwrap(); @@ -342,12 +340,11 @@ pub fn insert_anchor(wallet: &mut Wallet, txid: Txid, anchor: ConfirmationBlockT /// Marks the given `txid` seen as unconfirmed at `seen_at` pub fn insert_seen_at(wallet: &mut Wallet, txid: Txid, seen_at: u64) { + let mut tx_update = bdk_chain::TxUpdate::default(); + tx_update.seen_ats.insert(txid, seen_at); wallet .apply_update(crate::Update { - tx_update: tx_graph::TxUpdate { - seen_ats: [(txid, seen_at)].into_iter().collect(), - ..Default::default() - }, + tx_update, ..Default::default() }) .unwrap(); From ec25b10257f2d46e70783b3a7dce4c356877abf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 24 Jan 2025 17:51:51 +1100 Subject: [PATCH 02/16] feat(core): Add `TxUpdate::evicted` This is a set of txids evicted from the mempool. --- crates/core/src/tx_update.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/core/src/tx_update.rs b/crates/core/src/tx_update.rs index 5da2bff87..089008fd7 100644 --- a/crates/core/src/tx_update.rs +++ b/crates/core/src/tx_update.rs @@ -1,4 +1,4 @@ -use crate::collections::{BTreeMap, BTreeSet, HashMap}; +use crate::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use alloc::{sync::Arc, vec::Vec}; use bitcoin::{OutPoint, Transaction, TxOut, Txid}; @@ -34,6 +34,8 @@ pub struct TxUpdate { /// Seen at times for transactions. This records when a transaction was most recently seen in /// the user's mempool for the sake of tie-breaking other conflicting transactions. pub seen_ats: HashMap, + /// A set of txids missing from the mempool. + pub evicted: HashSet, } impl Default for TxUpdate { @@ -43,6 +45,7 @@ impl Default for TxUpdate { txouts: Default::default(), anchors: Default::default(), seen_ats: Default::default(), + evicted: Default::default(), } } } @@ -62,6 +65,7 @@ impl TxUpdate { .map(|(a, txid)| (map(a), txid)) .collect(), seen_ats: self.seen_ats, + evicted: self.evicted, } } @@ -71,5 +75,6 @@ impl TxUpdate { self.txouts.extend(other.txouts); self.anchors.extend(other.anchors); self.seen_ats.extend(other.seen_ats); + self.evicted.extend(other.evicted); } } From 4a0b920cbd052282d6d86833ec2c7a9d06571ed1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 24 Jan 2025 19:45:00 +1100 Subject: [PATCH 03/16] feat(chain)!: Change `TxGraph` to understand evicted-at & last-evicted The evicted-at and last-evicted timestamp informs the `TxGraph` when the transaction was last deemed as missing from the mempool. The canonicalization algorithm is changed to disregard transactions with a last-missing timestamp greater or equal to it's last-seen timestamp. The exception is when we have a canonical descendant due to rules of transitivity. --- crates/chain/src/tx_graph.rs | 70 +++++++++++++++++++++++++++-- crates/chain/tests/test_tx_graph.rs | 6 ++- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index d0f7380a1..43899238a 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -145,6 +145,7 @@ pub struct TxGraph { spends: BTreeMap>, anchors: HashMap>, last_seen: HashMap, + last_evicted: HashMap, txs_by_highest_conf_heights: BTreeSet<(u32, Txid)>, txs_by_last_seen: BTreeSet<(u64, Txid)>, @@ -162,6 +163,7 @@ impl Default for TxGraph { spends: Default::default(), anchors: Default::default(), last_seen: Default::default(), + last_evicted: Default::default(), txs_by_highest_conf_heights: Default::default(), txs_by_last_seen: Default::default(), empty_outspends: Default::default(), @@ -715,6 +717,34 @@ impl TxGraph { changeset } + /// Inserts the given `evicted_at` for `txid` into [`TxGraph`]. + /// + /// The `evicted_at` timestamp represents the last known time when the transaction was observed + /// to be missing from the mempool. If `txid` was previously recorded with an earlier + /// `evicted_at` value, it is updated only if the new value is greater. + pub fn insert_evicted_at(&mut self, txid: Txid, evicted_at: u64) -> ChangeSet { + let is_changed = match self.last_evicted.entry(txid) { + hash_map::Entry::Occupied(mut e) => { + let last_evicted = e.get_mut(); + let change = *last_evicted < evicted_at; + if change { + *last_evicted = evicted_at; + } + change + } + hash_map::Entry::Vacant(e) => { + e.insert(evicted_at); + true + } + }; + + let mut changeset = ChangeSet::::default(); + if is_changed { + changeset.last_evicted.insert(txid, evicted_at); + } + changeset + } + /// Extends this graph with the given `update`. /// /// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that @@ -737,6 +767,10 @@ impl TxGraph { /// transactions (where the transaction with the lower `last_seen` value is omitted from the /// canonical history). /// + /// `evicted_at` is used to track when a transaction was last observed in the mempool before + /// disappearing. It helps determine whether a transaction was potentially replaced, allowing + /// the graph to filter out missing transactions that should no longer be considered valid. + /// /// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will /// not be part of the canonical history of transactions. /// @@ -765,6 +799,14 @@ impl TxGraph { changeset.merge(self.insert_seen_at(txid, seen_at)); } } + for txid in update.evicted { + // We want the `evicted_at` value to override the `last_seen` value of the transaction. + // If there is no `last_seen`, there is no need for the `evicted_at` value since the + // transaction will not be canonical anyway. + if let Some(&evicted_at) = self.last_seen.get(&txid) { + changeset.merge(self.insert_evicted_at(txid, evicted_at)); + } + } changeset } @@ -782,6 +824,7 @@ impl TxGraph { .flat_map(|(txid, anchors)| anchors.iter().map(|a| (a.clone(), *txid))) .collect(), last_seen: self.last_seen.iter().map(|(&k, &v)| (k, v)).collect(), + last_evicted: self.last_evicted.iter().map(|(&k, &v)| (k, v)).collect(), } } @@ -799,6 +842,9 @@ impl TxGraph { for (txid, seen_at) in changeset.last_seen { let _ = self.insert_seen_at(txid, seen_at); } + for (txid, evicted_at) in changeset.last_evicted { + let _ = self.insert_evicted_at(txid, evicted_at); + } } } @@ -969,9 +1015,14 @@ impl TxGraph { /// List txids by descending last-seen order. /// - /// Transactions without last-seens are excluded. - pub fn txids_by_descending_last_seen(&self) -> impl ExactSizeIterator + '_ { - self.txs_by_last_seen.iter().copied().rev() + /// Transactions without last-seens are excluded. Transactions with a last-evicted timestamp + /// equal or higher than it's last-seen timestamp are excluded. + pub fn txids_by_descending_last_seen(&self) -> impl Iterator + '_ { + self.txs_by_last_seen + .iter() + .copied() + .rev() + .filter(|(last_seen, txid)| !matches!(self.last_evicted.get(txid), Some(last_evicted) if last_evicted >= last_seen)) } /// Returns a [`CanonicalIter`]. @@ -1139,6 +1190,8 @@ pub struct ChangeSet { pub anchors: BTreeSet<(A, Txid)>, /// Added last-seen unix timestamps of transactions. pub last_seen: BTreeMap, + /// Added timestamps of when a transaction is last evicted from the mempool. + pub last_evicted: BTreeMap, } impl Default for ChangeSet { @@ -1148,6 +1201,7 @@ impl Default for ChangeSet { txouts: Default::default(), anchors: Default::default(), last_seen: Default::default(), + last_evicted: Default::default(), } } } @@ -1202,6 +1256,14 @@ impl Merge for ChangeSet { .filter(|(txid, update_ls)| self.last_seen.get(txid) < Some(update_ls)) .collect::>(), ); + // last_evicted timestamps should only increase + self.last_evicted.extend( + other + .last_evicted + .into_iter() + .filter(|(txid, update_lm)| self.last_evicted.get(txid) < Some(update_lm)) + .collect::>(), + ); } fn is_empty(&self) -> bool { @@ -1209,6 +1271,7 @@ impl Merge for ChangeSet { && self.txouts.is_empty() && self.anchors.is_empty() && self.last_seen.is_empty() + && self.last_evicted.is_empty() } } @@ -1228,6 +1291,7 @@ impl ChangeSet { self.anchors.into_iter().map(|(a, txid)| (f(a), txid)), ), last_seen: self.last_seen, + last_evicted: self.last_evicted, } } } diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index 16d2e6c6d..c44e1d2af 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -115,7 +115,8 @@ fn insert_txouts() { txs: [Arc::new(update_tx.clone())].into(), txouts: update_ops.clone().into(), anchors: [(conf_anchor, update_tx.compute_txid()),].into(), - last_seen: [(hash!("tx2"), 1000000)].into() + last_seen: [(hash!("tx2"), 1000000)].into(), + last_evicted: [].into(), } ); @@ -168,7 +169,8 @@ fn insert_txouts() { txs: [Arc::new(update_tx.clone())].into(), txouts: update_ops.into_iter().chain(original_ops).collect(), anchors: [(conf_anchor, update_tx.compute_txid()),].into(), - last_seen: [(hash!("tx2"), 1000000)].into() + last_seen: [(hash!("tx2"), 1000000)].into(), + last_evicted: [].into(), } ); } From 40ceef1bc5d6f64c97d8b3c3ff758afe418eed26 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 24 Jan 2025 17:18:56 +0800 Subject: [PATCH 04/16] feat(core): add `SpkWithExpectedTxids` support for `SpkClient` --- crates/core/src/spk_client.rs | 49 ++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/crates/core/src/spk_client.rs b/crates/core/src/spk_client.rs index a5ec813c9..d23504bd9 100644 --- a/crates/core/src/spk_client.rs +++ b/crates/core/src/spk_client.rs @@ -1,7 +1,7 @@ //! Helper types for spk-based blockchain clients. use crate::{ alloc::{boxed::Box, collections::VecDeque, vec::Vec}, - collections::BTreeMap, + collections::{BTreeMap, HashMap, HashSet}, CheckPoint, ConfirmationBlockTime, Indexed, }; use bitcoin::{OutPoint, Script, ScriptBuf, Txid}; @@ -86,6 +86,14 @@ impl SyncProgress { } } +/// [`Script`] with corresponding [`Txid`] histories. +pub struct SpkWithExpectedTxids { + /// Script pubkey. + pub spk: ScriptBuf, + /// Txid history. + pub txids: HashSet, +} + /// Builds a [`SyncRequest`]. #[must_use] pub struct SyncRequestBuilder { @@ -213,6 +221,7 @@ pub struct SyncRequest { chain_tip: Option, spks: VecDeque<(I, ScriptBuf)>, spks_consumed: usize, + spk_histories: HashMap>, txids: VecDeque, txids_consumed: usize, outpoints: VecDeque, @@ -226,6 +235,7 @@ impl Default for SyncRequest { chain_tip: None, spks: VecDeque::new(), spks_consumed: 0, + spk_histories: HashMap::new(), txids: VecDeque::new(), txids_consumed: 0, outpoints: VecDeque::new(), @@ -276,6 +286,23 @@ impl SyncRequest { Some(spk) } + /// Advances the sync request and returns the next [`ScriptBuf`] with corresponding [`Txid`] + /// history. + /// + /// Returns [`None`] when there are no more scripts remaining in the request. + pub fn next_spk_with_history(&mut self) -> Option { + let next_spk = self.next_spk()?; + let spk_history = self + .spk_histories + .get(&next_spk) + .cloned() + .unwrap_or_default(); + Some(SpkWithExpectedTxids { + spk: next_spk, + txids: spk_history, + }) + } + /// Advances the sync request and returns the next [`Txid`]. /// /// Returns [`None`] when there are no more txids remaining in the request. @@ -301,6 +328,13 @@ impl SyncRequest { SyncIter::::new(self) } + /// Iterate over [`ScriptBuf`]s with corresponding [`Txid`] histories contained in this request. + pub fn iter_spks_with_expected_txids( + &mut self, + ) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) + } + /// Iterate over [`Txid`]s contained in this request. pub fn iter_txids(&mut self) -> impl ExactSizeIterator + '_ { SyncIter::::new(self) @@ -524,6 +558,19 @@ impl Iterator for SyncIter<'_, I, ScriptBuf> { } } +impl Iterator for SyncIter<'_, I, SpkWithExpectedTxids> { + type Item = SpkWithExpectedTxids; + + fn next(&mut self) -> Option { + self.request.next_spk_with_history() + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.request.spks.len(); + (remaining, Some(remaining)) + } +} + impl Iterator for SyncIter<'_, I, Txid> { type Item = Txid; From 5b7f851d4d10da6680563d9b17453a964f78928a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 24 Jan 2025 20:43:07 +1100 Subject: [PATCH 05/16] feat(chain): Update rusqlite_impl to persist `last_evicted` --- crates/chain/src/rusqlite_impl.rs | 80 ++++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/crates/chain/src/rusqlite_impl.rs b/crates/chain/src/rusqlite_impl.rs index 7b39f53c0..3bc105d0b 100644 --- a/crates/chain/src/rusqlite_impl.rs +++ b/crates/chain/src/rusqlite_impl.rs @@ -264,12 +264,20 @@ impl tx_graph::ChangeSet { format!("{add_confirmation_time_column}; {extract_confirmation_time_from_anchor_column}; {drop_anchor_column}") } + /// Get v2 of sqlite [tx_graph::ChangeSet] schema + pub fn schema_v2() -> String { + format!( + "ALTER TABLE {} ADD COLUMN last_evicted INTEGER", + Self::TXS_TABLE_NAME, + ) + } + /// Initialize sqlite tables. pub fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { migrate_schema( db_tx, Self::SCHEMA_NAME, - &[&Self::schema_v0(), &Self::schema_v1()], + &[&Self::schema_v0(), &Self::schema_v1(), &Self::schema_v2()], ) } @@ -280,7 +288,7 @@ impl tx_graph::ChangeSet { let mut changeset = Self::default(); let mut statement = db_tx.prepare(&format!( - "SELECT txid, raw_tx, last_seen FROM {}", + "SELECT txid, raw_tx, last_seen, last_evicted FROM {}", Self::TXS_TABLE_NAME, ))?; let row_iter = statement.query_map([], |row| { @@ -288,16 +296,20 @@ impl tx_graph::ChangeSet { row.get::<_, Impl>("txid")?, row.get::<_, Option>>("raw_tx")?, row.get::<_, Option>("last_seen")?, + row.get::<_, Option>("last_evicted")?, )) })?; for row in row_iter { - let (Impl(txid), tx, last_seen) = row?; + let (Impl(txid), tx, last_seen, last_evicted) = row?; if let Some(Impl(tx)) = tx { changeset.txs.insert(Arc::new(tx)); } if let Some(last_seen) = last_seen { changeset.last_seen.insert(txid, last_seen); } + if let Some(last_evicted) = last_evicted { + changeset.last_evicted.insert(txid, last_evicted); + } } let mut statement = db_tx.prepare(&format!( @@ -377,6 +389,19 @@ impl tx_graph::ChangeSet { })?; } + let mut statement = db_tx + .prepare_cached(&format!( + "INSERT INTO {}(txid, last_evicted) VALUES(:txid, :last_evicted) ON CONFLICT(txid) DO UPDATE SET last_evicted=:last_evicted", + Self::TXS_TABLE_NAME, + ))?; + for (&txid, &last_evicted) in &self.last_evicted { + let checked_time = last_evicted.to_sql()?; + statement.execute(named_params! { + ":txid": Impl(txid), + ":last_evicted": Some(checked_time), + })?; + } + let mut statement = db_tx.prepare_cached(&format!( "REPLACE INTO {}(txid, vout, value, script) VALUES(:txid, :vout, :value, :script)", Self::TXOUTS_TABLE_NAME, @@ -628,7 +653,7 @@ mod test { } #[test] - fn v0_to_v1_schema_migration_is_backward_compatible() -> anyhow::Result<()> { + fn v0_to_v2_schema_migration_is_backward_compatible() -> anyhow::Result<()> { type ChangeSet = tx_graph::ChangeSet; let mut conn = rusqlite::Connection::open_in_memory()?; @@ -697,13 +722,17 @@ mod test { } } - // Apply v1 sqlite schema to tables with data + // Apply v1 & v2 sqlite schema to tables with data { let db_tx = conn.transaction()?; migrate_schema( &db_tx, ChangeSet::SCHEMA_NAME, - &[&ChangeSet::schema_v0(), &ChangeSet::schema_v1()], + &[ + &ChangeSet::schema_v0(), + &ChangeSet::schema_v1(), + &ChangeSet::schema_v2(), + ], )?; db_tx.commit()?; } @@ -718,4 +747,43 @@ mod test { Ok(()) } + + #[test] + fn can_persist_last_evicted() -> anyhow::Result<()> { + use bitcoin::hashes::Hash; + + type ChangeSet = tx_graph::ChangeSet; + let mut conn = rusqlite::Connection::open_in_memory()?; + + // Init tables + { + let db_tx = conn.transaction()?; + ChangeSet::init_sqlite_tables(&db_tx)?; + db_tx.commit()?; + } + + let txid = bitcoin::Txid::all_zeros(); + let last_evicted = 100; + + // Persist `last_evicted` + { + let changeset = ChangeSet { + last_evicted: [(txid, last_evicted)].into(), + ..Default::default() + }; + let db_tx = conn.transaction()?; + changeset.persist_to_sqlite(&db_tx)?; + db_tx.commit()?; + } + + // Load from sqlite should succeed + { + let db_tx = conn.transaction()?; + let changeset = ChangeSet::from_sqlite(&db_tx)?; + db_tx.commit()?; + assert_eq!(changeset.last_evicted.get(&txid), Some(&last_evicted)); + } + + Ok(()) + } } From b3d17ad6524c1bbd41e7369b9b44aa91fef03a7c Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 24 Jan 2025 18:47:55 +0800 Subject: [PATCH 06/16] feat(electrum): add `SpkWithExpectedTxids` support --- crates/electrum/src/bdk_electrum_client.rs | 59 +++++++++++++++------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index e187bf366..9f3a666f2 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -1,14 +1,13 @@ use bdk_core::{ - bitcoin::{block::Header, BlockHash, OutPoint, ScriptBuf, Transaction, Txid}, - collections::{BTreeMap, HashMap}, - spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}, + bitcoin::{block::Header, BlockHash, OutPoint, Transaction, Txid}, + collections::{BTreeMap, HashMap, HashSet}, + spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, + }, BlockId, CheckPoint, ConfirmationBlockTime, TxUpdate, }; use electrum_client::{ElectrumApi, Error, HeaderNotification}; -use std::{ - collections::HashSet, - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; /// We include a chain suffix of a certain length for the purpose of robustness. const CHAIN_SUFFIX_LENGTH: u32 = 8; @@ -138,8 +137,17 @@ impl BdkElectrumClient { let mut last_active_indices = BTreeMap::::default(); for keychain in request.keychains() { let spks = request.iter_spks(keychain.clone()); + let spks_with_history = spks.into_iter().map(|(i, spk)| { + ( + i, + SpkWithExpectedTxids { + spk, + txids: HashSet::::new(), + }, + ) + }); if let Some(last_active_index) = - self.populate_with_spks(&mut tx_update, spks, stop_gap, batch_size)? + self.populate_with_spks(&mut tx_update, spks_with_history, stop_gap, batch_size)? { last_active_indices.insert(keychain, last_active_index); } @@ -206,7 +214,7 @@ impl BdkElectrumClient { self.populate_with_spks( &mut tx_update, request - .iter_spks() + .iter_spks_with_expected_txids() .enumerate() .map(|(i, spk)| (i as u32, spk)), usize::MAX, @@ -243,7 +251,7 @@ impl BdkElectrumClient { fn populate_with_spks( &self, tx_update: &mut TxUpdate, - mut spks: impl Iterator, + mut spks_with_history: impl Iterator, stop_gap: usize, batch_size: usize, ) -> Result, Error> { @@ -251,35 +259,48 @@ impl BdkElectrumClient { let mut last_active_index = Option::::None; loop { - let spks = (0..batch_size) - .map_while(|_| spks.next()) + let spks_with_history = (0..batch_size) + .map_while(|_| spks_with_history.next()) .collect::>(); - if spks.is_empty() { + if spks_with_history.is_empty() { return Ok(last_active_index); } - let spk_histories = self - .inner - .batch_script_get_history(spks.iter().map(|(_, s)| s.as_script()))?; + let spk_histories = self.inner.batch_script_get_history( + spks_with_history.iter().map(|(_, s)| s.spk.as_script()), + )?; - for ((spk_index, _spk), spk_history) in spks.into_iter().zip(spk_histories) { - if spk_history.is_empty() { + for ((spk_index, spk_with_history), history_res) in + spks_with_history.into_iter().zip(spk_histories) + { + if history_res.is_empty() { unused_spk_count = unused_spk_count.saturating_add(1); if unused_spk_count >= stop_gap { return Ok(last_active_index); } + tx_update.evicted.extend(spk_with_history.txids); continue; } else { last_active_index = Some(spk_index); unused_spk_count = 0; } - for tx_res in spk_history { + for tx_res in history_res { tx_update.txs.push(self.fetch_tx(tx_res.tx_hash)?); if let Ok(height) = tx_res.height.try_into() { self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?; } } + + let fetched_txids = tx_update + .txs + .iter() + .map(|tx| tx.compute_txid()) + .collect::>(); + + tx_update + .evicted + .extend(spk_with_history.txids.difference(&fetched_txids).cloned()); } } } From 241dd7d6ac620e7ae66f9ae516ac0767684c253c Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 24 Jan 2025 18:48:29 +0800 Subject: [PATCH 07/16] feat(esplora): add `SpkWithExpectedTxids` support --- crates/esplora/src/async_ext.rs | 46 ++++++++++++++++++++++-------- crates/esplora/src/blocking_ext.rs | 44 ++++++++++++++++++++-------- 2 files changed, 66 insertions(+), 24 deletions(-) diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 4c1bd0ad7..bb7e641df 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,8 +1,10 @@ use async_trait::async_trait; use bdk_core::collections::{BTreeMap, BTreeSet, HashSet}; -use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}; +use bdk_core::spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, +}; use bdk_core::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, + bitcoin::{BlockHash, OutPoint, Txid}, BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::Sleeper; @@ -77,10 +79,19 @@ where let mut last_active_indices = BTreeMap::::new(); for keychain in keychains { let keychain_spks = request.iter_spks(keychain.clone()); + let spks_with_history = keychain_spks.into_iter().map(|(i, spk)| { + ( + i, + SpkWithExpectedTxids { + spk, + txids: HashSet::::new(), + }, + ) + }); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, &mut inserted_txs, - keychain_spks, + spks_with_history, stop_gap, parallel_requests, ) @@ -125,7 +136,7 @@ where fetch_txs_with_spks( self, &mut inserted_txs, - request.iter_spks(), + request.iter_spks_with_expected_txids(), parallel_requests, ) .await?, @@ -279,12 +290,12 @@ async fn chain_update( async fn fetch_txs_with_keychain_spks( client: &esplora_client::AsyncClient, inserted_txs: &mut HashSet, - mut keychain_spks: I, + mut spks_with_history: I, stop_gap: usize, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> where - I: Iterator> + Send, + I: Iterator> + Send, S: Sleeper + Clone + Send + Sync, { type TxsOfSpkIndex = (u32, Vec); @@ -292,18 +303,22 @@ where let mut update = TxUpdate::::default(); let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut spk_txids = HashSet::new(); loop { - let handles = keychain_spks + let handles = spks_with_history .by_ref() .take(parallel_requests) - .map(|(spk_index, spk)| { + .map(|(spk_index, spk_with_history)| { + spk_txids.extend(&spk_with_history.txids); let client = client.clone(); async move { let mut last_seen = None; let mut spk_txs = Vec::new(); loop { - let txs = client.scripthash_txs(&spk, last_seen).await?; + let txs = client + .scripthash_txs(&spk_with_history.spk, last_seen) + .await?; let tx_count = txs.len(); last_seen = txs.last().map(|tx| tx.txid); spk_txs.extend(txs); @@ -344,6 +359,10 @@ where } } + update + .evicted + .extend(spk_txids.difference(inserted_txs).cloned()); + Ok((update, last_active_index)) } @@ -358,18 +377,21 @@ where async fn fetch_txs_with_spks( client: &esplora_client::AsyncClient, inserted_txs: &mut HashSet, - spks: I, + spks_with_history: I, parallel_requests: usize, ) -> Result, Error> where - I: IntoIterator + Send, + I: IntoIterator + Send, I::IntoIter: Send, S: Sleeper + Clone + Send + Sync, { fetch_txs_with_keychain_spks( client, inserted_txs, - spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), + spks_with_history + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), usize::MAX, parallel_requests, ) diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 655055b33..b7d170abc 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,7 +1,9 @@ use bdk_core::collections::{BTreeMap, BTreeSet, HashSet}; -use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}; +use bdk_core::spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, +}; use bdk_core::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, + bitcoin::{BlockHash, OutPoint, Txid}, BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::{OutputStatus, Tx}; @@ -67,10 +69,19 @@ impl EsploraExt for esplora_client::BlockingClient { let mut last_active_indices = BTreeMap::::new(); for keychain in request.keychains() { let keychain_spks = request.iter_spks(keychain.clone()); + let spks_with_history = keychain_spks.into_iter().map(|(i, spk)| { + ( + i, + SpkWithExpectedTxids { + spk, + txids: HashSet::::new(), + }, + ) + }); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, &mut inserted_txs, - keychain_spks, + spks_with_history, stop_gap, parallel_requests, )?; @@ -116,7 +127,7 @@ impl EsploraExt for esplora_client::BlockingClient { tx_update.extend(fetch_txs_with_spks( self, &mut inserted_txs, - request.iter_spks(), + request.iter_spks_with_expected_txids(), parallel_requests, )?); tx_update.extend(fetch_txs_with_txids( @@ -248,10 +259,10 @@ fn chain_update( Ok(tip) } -fn fetch_txs_with_keychain_spks>>( +fn fetch_txs_with_keychain_spks>>( client: &esplora_client::BlockingClient, inserted_txs: &mut HashSet, - mut keychain_spks: I, + mut spks_with_history: I, stop_gap: usize, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> { @@ -260,19 +271,21 @@ fn fetch_txs_with_keychain_spks>>( let mut update = TxUpdate::::default(); let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut spk_txids = HashSet::new(); loop { - let handles = keychain_spks + let handles = spks_with_history .by_ref() .take(parallel_requests) - .map(|(spk_index, spk)| { + .map(|(spk_index, spk_with_history)| { + spk_txids.extend(&spk_with_history.txids); std::thread::spawn({ let client = client.clone(); move || -> Result { let mut last_seen = None; let mut spk_txs = Vec::new(); loop { - let txs = client.scripthash_txs(&spk, last_seen)?; + let txs = client.scripthash_txs(&spk_with_history.spk, last_seen)?; let tx_count = txs.len(); last_seen = txs.last().map(|tx| tx.txid); spk_txs.extend(txs); @@ -315,6 +328,10 @@ fn fetch_txs_with_keychain_spks>>( } } + update + .evicted + .extend(spk_txids.difference(inserted_txs).cloned()); + Ok((update, last_active_index)) } @@ -326,16 +343,19 @@ fn fetch_txs_with_keychain_spks>>( /// requests to make in parallel. /// /// Refer to [crate-level docs](crate) for more. -fn fetch_txs_with_spks>( +fn fetch_txs_with_spks>( client: &esplora_client::BlockingClient, inserted_txs: &mut HashSet, - spks: I, + spks_with_history: I, parallel_requests: usize, ) -> Result, Error> { fetch_txs_with_keychain_spks( client, inserted_txs, - spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), + spks_with_history + .into_iter() + .enumerate() + .map(|(i, spk_with_history)| (i as u32, spk_with_history)), usize::MAX, parallel_requests, ) From bfaf5897492576a584076085d0bb2dce38081051 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 24 Jan 2025 21:47:46 +1100 Subject: [PATCH 08/16] feat(core): Add `SyncRequestBuilder::expected_txids_of_spk` This is for conveniently adding associations of txid <-> spk. We expect that these txids exist in the spk history. Otherwise, it means the tx is evicted from the mempool and we need to update the `missing_at` value in the sync response. --- crates/core/src/spk_client.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/crates/core/src/spk_client.rs b/crates/core/src/spk_client.rs index d23504bd9..97d6cca0e 100644 --- a/crates/core/src/spk_client.rs +++ b/crates/core/src/spk_client.rs @@ -167,6 +167,23 @@ impl SyncRequestBuilder { self } + /// Add transactions that are expected to exist under a given spk. + /// + /// This is useful for detecting a malicious replacement of an incoming transaction. + pub fn expected_txids_of_spk( + mut self, + txs: impl IntoIterator, + ) -> Self { + for (txid, spk) in txs { + self.inner + .spk_histories + .entry(spk) + .or_default() + .insert(txid); + } + self + } + /// Add [`Txid`]s that will be synced against. pub fn txids(mut self, txids: impl IntoIterator) -> Self { self.inner.txids.extend(txids); From 5e2f242d57ef1bae1011a5ce7941ba1bd0a282d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 24 Jan 2025 21:51:30 +1100 Subject: [PATCH 09/16] feat(chain): Add `SyncRequestExt::check_unconfirmed_statuses` This is a convenience method for adding unconfirmed txs alongside their associated spks the the sync request. This way, we will be able to detect evictions of these transactions from the mempool. --- crates/chain/src/indexer/keychain_txout.rs | 41 +++++++++++++++++++++- crates/chain/src/indexer/spk_txout.rs | 21 ++++++++++- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/crates/chain/src/indexer/keychain_txout.rs b/crates/chain/src/indexer/keychain_txout.rs index 4543027cc..555784d13 100644 --- a/crates/chain/src/indexer/keychain_txout.rs +++ b/crates/chain/src/indexer/keychain_txout.rs @@ -7,11 +7,13 @@ use crate::{ spk_client::{FullScanRequestBuilder, SyncRequestBuilder}, spk_iter::BIP32_MAX_INDEX, spk_txout::SpkTxOutIndex, - DescriptorExt, DescriptorId, Indexed, Indexer, KeychainIndexed, SpkIterator, + Anchor, CanonicalIter, CanonicalReason, ChainOracle, DescriptorExt, DescriptorId, Indexed, + Indexer, KeychainIndexed, SpkIterator, }; use alloc::{borrow::ToOwned, vec::Vec}; use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txid}; use core::{ + convert::Infallible, fmt::Debug, ops::{Bound, RangeBounds}, }; @@ -879,6 +881,20 @@ pub trait SyncRequestBuilderExt { /// Add [`Script`](bitcoin::Script)s that are revealed by the `indexer` but currently unused. fn unused_spks_from_indexer(self, indexer: &KeychainTxOutIndex) -> Self; + + /// Add unconfirmed txids and their associated spks. + /// + /// We expect that the chain source should include these txids in their spk histories. If not, + /// the transaction has been evicted for some reason and we will inform the receiving + /// structures in the response. + fn check_unconfirmed_statuses( + self, + indexer: &KeychainTxOutIndex, + canonical_iter: CanonicalIter, + ) -> Self + where + A: Anchor, + C: ChainOracle; } impl SyncRequestBuilderExt for SyncRequestBuilder<(K, u32)> { @@ -892,6 +908,29 @@ impl SyncRequestBuilderExt for SyncRequest fn unused_spks_from_indexer(self, indexer: &KeychainTxOutIndex) -> Self { self.spks_with_indexes(indexer.unused_spks()) } + + fn check_unconfirmed_statuses( + self, + indexer: &KeychainTxOutIndex, + canonical_iter: CanonicalIter, + ) -> Self + where + A: Anchor, + C: ChainOracle, + { + self.expected_txids_of_spk( + canonical_iter + .map(|res| res.expect("infallible")) + .filter(|(_, _, reason)| matches!(reason, CanonicalReason::ObservedIn { .. })) + .flat_map(|(txid, tx, _)| { + indexer + .inner + .relevant_spks_of_tx(tx.as_ref()) + .into_iter() + .map(move |spk| (txid, spk)) + }), + ) + } } /// Trait to extend [`FullScanRequestBuilder`]. diff --git a/crates/chain/src/indexer/spk_txout.rs b/crates/chain/src/indexer/spk_txout.rs index 286e5d2dc..fb45a939d 100644 --- a/crates/chain/src/indexer/spk_txout.rs +++ b/crates/chain/src/indexer/spk_txout.rs @@ -3,7 +3,7 @@ use core::ops::RangeBounds; use crate::{ - collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap}, + collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}, Indexer, }; use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txid}; @@ -334,4 +334,23 @@ impl SpkTxOutIndex { .any(|output| self.spk_indices.contains_key(&output.script_pubkey)); input_matches || output_matches } + + /// Find relevant script pubkeys associated with a transaction for tracking and validation. + /// + /// Returns a set of script pubkeys from [`SpkTxOutIndex`] that are relevant to the outputs and + /// previous outputs of a given transaction. Inputs are only considered relevant if the parent + /// transactions have been scanned. + pub fn relevant_spks_of_tx(&self, tx: &Transaction) -> HashSet { + let spks_from_inputs = tx.input.iter().filter_map(|txin| { + self.txouts + .get(&txin.previous_output) + .map(|(_, prev_txo)| prev_txo.script_pubkey.clone()) + }); + let spks_from_outputs = tx + .output + .iter() + .filter(|txout| self.spk_indices.contains_key(&txout.script_pubkey)) + .map(|txo| txo.script_pubkey.clone()); + spks_from_inputs.chain(spks_from_outputs).collect() + } } From 4b979c644d7f085b9fcf99db97fee55cbddcef18 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 24 Jan 2025 23:26:45 +0800 Subject: [PATCH 10/16] test(electrum): `detect_receive_tx_cancel` --- crates/electrum/tests/test_electrum.rs | 127 ++++++++++++++++++++++++- 1 file changed, 125 insertions(+), 2 deletions(-) diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index 8c89605e4..3be5173fe 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -1,6 +1,8 @@ use bdk_chain::{ - bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash}, + bitcoin::{hashes::Hash, secp256k1::Secp256k1, Address, Amount, ScriptBuf, WScriptHash}, + indexer::keychain_txout::KeychainTxOutIndex, local_chain::LocalChain, + miniscript::Descriptor, spk_client::{FullScanRequest, SyncRequest, SyncResponse}, spk_txout::SpkTxOutIndex, Balance, ConfirmationBlockTime, IndexedTxGraph, Indexer, Merge, TxGraph, @@ -14,7 +16,7 @@ use bdk_testenv::{ }; use core::time::Duration; use electrum_client::ElectrumApi; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; // Batch size for `sync_with_electrum`. @@ -60,6 +62,127 @@ where Ok(update) } +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Electrum chain source and the receiving structures properly track the +// replaced transaction as missing. +#[test] +pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + use bdk_chain::keychain_txout::SyncRequestBuilderExt; + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let electrum_client = electrum_client::Client::new(env.electrsd.electrum_url.as_str())?; + let client = BdkElectrumClient::new(electrum_client); + + let (receiver_desc, _) = Descriptor::parse_descriptor(&Secp256k1::signing_only(), "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)") + .expect("must be valid"); + let mut graph = IndexedTxGraph::::new(KeychainTxOutIndex::new(0)); + let _ = graph.index.insert_descriptor((), receiver_desc.clone())?; + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Derive the receiving address from the descriptor. + let ((_, receiver_spk), _) = graph.index.reveal_next_spk(()).unwrap(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .check_unconfirmed_statuses( + &graph.index, + graph.graph().canonical_iter(&chain, chain.tip().block_id()), + ); + let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .check_unconfirmed_statuses( + &graph.index, + graph.graph().canonical_iter(&chain, chain.tip().block_id()), + ); + let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; + assert!( + sync_response.tx_update.evicted.contains(&send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} + /// If an spk history contains a tx that spends another unconfirmed tx (chained mempool history), /// the Electrum API will return the tx with a negative height. This should succeed and not panic. #[test] From 3be6c2b745d8f0d2e95e65a893b4282d397904ad Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 24 Jan 2025 23:27:13 +0800 Subject: [PATCH 11/16] test(esplora): `detect_receive_tx_cancel` --- crates/esplora/tests/async_ext.rs | 140 ++++++++++++++++++++++++++- crates/esplora/tests/blocking_ext.rs | 140 ++++++++++++++++++++++++++- 2 files changed, 270 insertions(+), 10 deletions(-) diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index b535d2bfa..bac2a6303 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -1,14 +1,144 @@ -use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; -use bdk_chain::{ConfirmationBlockTime, TxGraph}; +use bdk_chain::{ + bitcoin::{secp256k1::Secp256k1, Address, Amount}, + indexer::keychain_txout::KeychainTxOutIndex, + local_chain::LocalChain, + miniscript::Descriptor, + spk_client::{FullScanRequest, SyncRequest}, + ConfirmationBlockTime, IndexedTxGraph, TxGraph, +}; use bdk_esplora::EsploraAsyncExt; use esplora_client::{self, Builder}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; -use bdk_chain::bitcoin::{Address, Amount}; -use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; +use bdk_testenv::{ + anyhow, + bitcoincore_rpc::{json::CreateRawTransactionInput, RawTx, RpcApi}, + TestEnv, +}; + +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Electrum chain source and the receiving structures properly track the +// replaced transaction as missing. +#[tokio::test] +pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + use bdk_chain::keychain_txout::SyncRequestBuilderExt; + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_async()?; + + let (receiver_desc, _) = Descriptor::parse_descriptor(&Secp256k1::signing_only(), "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)") + .expect("must be valid"); + let mut graph = IndexedTxGraph::::new(KeychainTxOutIndex::new(0)); + let _ = graph.index.insert_descriptor((), receiver_desc.clone())?; + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Derive the receiving address from the descriptor. + let ((_, receiver_spk), _) = graph.index.reveal_next_spk(()).unwrap(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .check_unconfirmed_statuses( + &graph.index, + graph.graph().canonical_iter(&chain, chain.tip().block_id()), + ); + let sync_response = client.sync(sync_request, 1).await?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .check_unconfirmed_statuses( + &graph.index, + graph.graph().canonical_iter(&chain, chain.tip().block_id()), + ); + let sync_response = client.sync(sync_request, 1).await?; + assert!( + sync_response.tx_update.evicted.contains(&send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} #[tokio::test] pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index d4191ceb0..cfb8bc526 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -1,14 +1,144 @@ -use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; -use bdk_chain::{ConfirmationBlockTime, TxGraph}; +use bdk_chain::{ + bitcoin::{secp256k1::Secp256k1, Address, Amount}, + indexer::keychain_txout::KeychainTxOutIndex, + local_chain::LocalChain, + miniscript::Descriptor, + spk_client::{FullScanRequest, SyncRequest}, + ConfirmationBlockTime, IndexedTxGraph, TxGraph, +}; use bdk_esplora::EsploraExt; use esplora_client::{self, Builder}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; -use bdk_chain::bitcoin::{Address, Amount}; -use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; +use bdk_testenv::{ + anyhow, + bitcoincore_rpc::{json::CreateRawTransactionInput, RawTx, RpcApi}, + TestEnv, +}; + +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Electrum chain source and the receiving structures properly track the +// replaced transaction as missing. +#[test] +pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + use bdk_chain::keychain_txout::SyncRequestBuilderExt; + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_blocking(); + + let (receiver_desc, _) = Descriptor::parse_descriptor(&Secp256k1::signing_only(), "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)") + .expect("must be valid"); + let mut graph = IndexedTxGraph::::new(KeychainTxOutIndex::new(0)); + let _ = graph.index.insert_descriptor((), receiver_desc.clone())?; + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Derive the receiving address from the descriptor. + let ((_, receiver_spk), _) = graph.index.reveal_next_spk(()).unwrap(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .check_unconfirmed_statuses( + &graph.index, + graph.graph().canonical_iter(&chain, chain.tip().block_id()), + ); + let sync_response = client.sync(sync_request, 1)?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .check_unconfirmed_statuses( + &graph.index, + graph.graph().canonical_iter(&chain, chain.tip().block_id()), + ); + let sync_response = client.sync(sync_request, 1)?; + assert!( + sync_response.tx_update.evicted.contains(&send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} #[test] pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { From 5f6dfe74ef9b6d589f80bb8852b65aac3b453f1d Mon Sep 17 00:00:00 2001 From: valued mammal Date: Sat, 1 Feb 2025 10:12:38 -0500 Subject: [PATCH 12/16] feat(chain): add method `iter_spks_with_expected_txids` --- crates/chain/src/indexed_tx_graph.rs | 46 ++++++++++++++++++++++++-- crates/chain/src/tx_graph.rs | 48 +++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 039924c92..dc5221556 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -1,13 +1,16 @@ //! Contains the [`IndexedTxGraph`] and associated types. Refer to the //! [`IndexedTxGraph`] documentation for more. -use core::fmt::Debug; + +use core::fmt; +use core::ops::RangeBounds; use alloc::{sync::Arc, vec::Vec}; -use bitcoin::{Block, OutPoint, Transaction, TxOut, Txid}; +use bitcoin::{Block, OutPoint, ScriptBuf, Transaction, TxOut, Txid}; use crate::{ + spk_txout::SpkTxOutIndex, tx_graph::{self, TxGraph}, - Anchor, BlockId, Indexer, Merge, TxPosInBlock, + Anchor, BlockId, ChainOracle, Indexer, Merge, TxPosInBlock, }; /// The [`IndexedTxGraph`] combines a [`TxGraph`] and an [`Indexer`] implementation. @@ -324,6 +327,43 @@ where indexer, } } + + /// Inserts the given `evicted_at` for `txid`. + /// + /// The `evicted_at` timestamp represents the last known time when the transaction was observed + /// to be missing from the mempool. If `txid` was previously recorded with an earlier + /// `evicted_at` value, it is updated only if the new value is greater. + pub fn insert_evicted_at(&mut self, txid: Txid, evicted_at: u64) -> ChangeSet { + let tx_graph = self.graph.insert_evicted_at(txid, evicted_at); + ChangeSet { + tx_graph, + ..Default::default() + } + } +} + +impl IndexedTxGraph> +where + A: Anchor, + I: fmt::Debug + Clone + Ord, +{ + /// Returns an iterator over unconfirmed transactions and their associated script pubkeys, + /// filtered within the specified `range`. + /// + /// This function delegates the transaction filtering to [`TxGraph::iter_spks_with_expected_txids`], + /// using the [`SpkTxOutIndex`] stored in [`IndexedTxGraph`]. The [`TxGraph`] internally scans + /// for unconfirmed transactions relevant to the indexed outputs. + pub fn iter_spks_with_expected_txids<'a, O>( + &'a self, + chain: &'a O, + range: impl RangeBounds + 'a, + ) -> impl Iterator + 'a + where + O: ChainOracle, + { + self.graph + .iter_spks_with_expected_txids(chain, &self.index, range) + } } impl AsRef> for IndexedTxGraph { diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index 43899238a..979bc01d2 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -91,9 +91,11 @@ //! [`insert_txout`]: TxGraph::insert_txout use crate::collections::*; +use crate::spk_txout::SpkTxOutIndex; use crate::BlockId; use crate::CanonicalIter; use crate::CanonicalReason; +use crate::Indexer; use crate::ObservedIn; use crate::{Anchor, Balance, ChainOracle, ChainPosition, FullTxOut, Merge}; use alloc::collections::vec_deque::VecDeque; @@ -105,7 +107,7 @@ use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txi use core::fmt::{self, Formatter}; use core::{ convert::Infallible, - ops::{Deref, RangeInclusive}, + ops::{Deref, RangeBounds, RangeInclusive}, }; impl From> for TxUpdate { @@ -1161,6 +1163,50 @@ impl TxGraph { self.try_balance(chain, chain_tip, outpoints, trust_predicate) .expect("oracle is infallible") } + + /// Returns an iterator over unconfirmed transactions and their associated script pubkeys, + /// filtered within the specified `range`. + /// + /// This function scans the transaction graph for unconfirmed transactions relevant to the + /// provided [`SpkTxOutIndex`], determining which transactions should be considered based on + /// indexed outputs. + pub fn iter_spks_with_expected_txids<'a, C, I>( + &'a self, + chain: &'a C, + indexer: &'a SpkTxOutIndex, + range: impl RangeBounds + 'a, + ) -> impl Iterator + 'a + where + C: ChainOracle, + I: fmt::Debug + Clone + Ord, + { + let chain_tip = chain.get_chain_tip().unwrap(); + + self.list_canonical_txs(chain, chain_tip) + .filter(|c| !c.chain_position.is_confirmed() && indexer.is_tx_relevant(&c.tx_node)) + .flat_map(move |c| { + let txid = c.tx_node.txid; + let spks = c + .tx_node + .input + .iter() + .map(|txin| txin.previous_output) + .chain( + c.tx_node + .output + .iter() + .enumerate() + .map(move |(vout, _)| OutPoint::new(txid, vout as u32)), + ) + .flat_map(|op| match indexer.txout(op) { + Some((i, txo)) if range.contains(i) => Some(txo.script_pubkey.clone()), + _ => None, + }) + .collect::>(); + + core::iter::repeat(txid).zip(spks) + }) + } } /// The [`ChangeSet`] represents changes to a [`TxGraph`]. From ef45337fa05ffeed3a42a723b0538d9e1c70121c Mon Sep 17 00:00:00 2001 From: valued mammal Date: Sat, 1 Feb 2025 10:15:55 -0500 Subject: [PATCH 13/16] test(rpc): add `test_expect_tx_evicted` --- crates/bitcoind_rpc/tests/test_emitter.rs | 103 ++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 14b0c9212..e7614b8cd 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -731,3 +731,106 @@ fn no_agreement_point() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn test_expect_tx_evicted() -> anyhow::Result<()> { + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin; + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoincore_rpc_json::CreateRawTransactionInput; + use bdk_chain::miniscript; + use bdk_chain::spk_txout::SpkTxOutIndex; + use bdk_chain::ConfirmationBlockTime; + use bitcoin::constants::genesis_block; + use bitcoin::secp256k1::Secp256k1; + use bitcoin::Network; + use std::collections::HashMap; + let env = TestEnv::new()?; + + let s = bdk_testenv::utils::DESCRIPTORS[0]; + let desc = miniscript::Descriptor::parse_descriptor(&Secp256k1::new(), s) + .unwrap() + .0; + let spk = desc.at_derivation_index(0)?.script_pubkey(); + + let chain = LocalChain::from_genesis_hash(genesis_block(Network::Regtest).block_hash()).0; + let chain_tip = chain.tip().block_id(); + + let mut index = SpkTxOutIndex::default(); + index.insert_spk(("external", 0u32), spk.clone()); + let mut graph = IndexedTxGraph::::new(index); + + // Receive tx1. + let _ = env.mine_blocks(100, None)?; + let txid_1 = env.send( + &Address::from_script(&spk, Network::Regtest)?, + Amount::ONE_BTC, + )?; + + let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1); + let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?); + assert!(changeset + .tx_graph + .txs + .iter() + .any(|tx| tx.compute_txid() == txid_1)); + let seen_at = graph + .graph() + .get_tx_node(txid_1) + .unwrap() + .last_seen_unconfirmed + .unwrap(); + + // Double spend tx1. + + // Get `prevout` from core. + let core = env.rpc_client(); + let tx1 = &core.get_raw_transaction(&txid_1, None)?; + let txin = &tx1.input[0]; + let op = txin.previous_output; + + // Create `tx1b` using the previous output from tx1. + let utxo = CreateRawTransactionInput { + txid: op.txid, + vout: op.vout, + sequence: None, + }; + let addr = core.get_new_address(None, None)?.assume_checked(); + let tx = core.create_raw_transaction( + &[utxo], + &HashMap::from([(addr.to_string(), Amount::from_btc(49.99)?)]), + None, + None, + )?; + let res = core.sign_raw_transaction_with_wallet(&tx, None, None)?; + let tx1b = res.transaction()?; + + // Send the tx. + let txid_2 = core.send_raw_transaction(&tx1b)?; + + // Retrieve the expected unconfirmed txids and spks from the graph. + let exp_spk_txids = graph.expected_unconfirmed_spk_txids(&chain, chain_tip, ..)?; + assert_eq!(exp_spk_txids, vec![(txid_1, spk)]); + + // Check that mempool emission contains evicted txid. + let mempool_event = emitter.mempool()?; + let unseen_txids: Vec = mempool_event + .emitted_txs + .iter() + .map(|(tx, _)| tx.compute_txid()) + .collect(); + assert!(unseen_txids.contains(&txid_2)); + + // Update graph with evicted tx. + let exp_txids = exp_spk_txids.into_iter().map(|(txid, _)| txid); + let evicted_txids = mempool_event.evicted_txids(exp_txids); + for txid in evicted_txids { + let _ = graph.insert_evicted_at(txid, seen_at); + } + + assert!(graph + .graph() + .list_canonical_txs(&chain, chain_tip) + .next() + .is_none()); + + Ok(()) +} From f2a2095a89492105b090820f26e588a0b8ef97bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 6 Feb 2025 19:19:15 +1100 Subject: [PATCH 14/16] feat(chain)!: `IndexedTxGraph::expected_unconfirmed_spk_txids` Make this method work when the indexer is `KeychainTxOutIndex`. We reintroduce the ability to get the internal `SpkTxOutIndex` from `KeychainTxOutIndex` so that `SpkTxOutIndex::relevant_spks_of_tx` is callable from `KeychainTxOutIndex`. This commit renames `iter_spks_with_expected_txids` to `expected_unconfirmed_spk_txids` for `TxGraph`, `IndexedTxGraph` and `SyncRequestBuilder`. Docs are also improved to explain how these methods are useful. Remove unused `SyncRequestBuilder` methods. --- crates/bitcoind_rpc/tests/test_emitter.rs | 17 +++--- crates/chain/src/indexed_tx_graph.rs | 47 +++++++++++---- crates/chain/src/indexer/keychain_txout.rs | 52 ++++------------ crates/chain/src/indexer/spk_txout.rs | 6 ++ crates/chain/src/tx_graph.rs | 69 +++++++++++----------- crates/core/src/spk_client.rs | 2 +- crates/electrum/tests/test_electrum.rs | 18 +++--- crates/esplora/tests/async_ext.rs | 18 +++--- crates/esplora/tests/blocking_ext.rs | 18 +++--- 9 files changed, 126 insertions(+), 121 deletions(-) diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index e7614b8cd..839943c49 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -806,15 +806,14 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { // Send the tx. let txid_2 = core.send_raw_transaction(&tx1b)?; - // Retrieve the expected unconfirmed txids and spks from the graph. - let exp_spk_txids = graph.expected_unconfirmed_spk_txids(&chain, chain_tip, ..)?; - assert_eq!(exp_spk_txids, vec![(txid_1, spk)]); - - // Check that mempool emission contains evicted txid. - let mempool_event = emitter.mempool()?; - let unseen_txids: Vec = mempool_event - .emitted_txs - .iter() + // We evict the expected txs that are missing from mempool. + let exp_txids = graph + .expected_unconfirmed_spk_txids(&chain, chain_tip, ..)? + .collect::>(); + assert_eq!(exp_txids, vec![(txid_1, spk)]); + let mempool = emitter + .mempool()? + .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect(); assert!(unseen_txids.contains(&txid_2)); diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index dc5221556..acbddb3c6 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -1,7 +1,6 @@ //! Contains the [`IndexedTxGraph`] and associated types. Refer to the //! [`IndexedTxGraph`] documentation for more. -use core::fmt; use core::ops::RangeBounds; use alloc::{sync::Arc, vec::Vec}; @@ -345,24 +344,50 @@ where impl IndexedTxGraph> where A: Anchor, - I: fmt::Debug + Clone + Ord, + I: core::fmt::Debug + Clone + Ord, { - /// Returns an iterator over unconfirmed transactions and their associated script pubkeys, - /// filtered within the specified `range`. + /// Iterate over unconfirmed txids that we expect to exist in a chain source's spk history + /// response. /// - /// This function delegates the transaction filtering to [`TxGraph::iter_spks_with_expected_txids`], - /// using the [`SpkTxOutIndex`] stored in [`IndexedTxGraph`]. The [`TxGraph`] internally scans - /// for unconfirmed transactions relevant to the indexed outputs. - pub fn iter_spks_with_expected_txids<'a, O>( + /// This is used to fill [`SyncRequestBuilder::expected_unconfirmed_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_unconfirmed_spk_txids). + /// + /// The spk range can be contrained with `range`. + pub fn expected_unconfirmed_spk_txids<'a, O>( &'a self, chain: &'a O, + chain_tip: BlockId, range: impl RangeBounds + 'a, - ) -> impl Iterator + 'a + ) -> Result, O::Error> + where + O: ChainOracle, + { + self.graph + .expected_unconfirmed_spk_txids(chain, chain_tip, &self.index, range) + } +} + +impl IndexedTxGraph> +where + A: Anchor, + K: core::fmt::Debug + Clone + Ord, +{ + /// Iterate over unconfirmed txids that we expect to exist in a chain source's spk history + /// response. + /// + /// This is used to fill [`SyncRequestBuilder::expected_unconfirmed_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_unconfirmed_spk_txids). + /// + /// The spk range can be contrained with `range`. + pub fn expected_unconfirmed_spk_txids<'a, O>( + &'a self, + chain: &'a O, + chain_tip: BlockId, + range: impl RangeBounds<(K, u32)> + 'a, + ) -> Result, O::Error> where - O: ChainOracle, + O: ChainOracle, { self.graph - .iter_spks_with_expected_txids(chain, &self.index, range) + .expected_unconfirmed_spk_txids(chain, chain_tip, &self.index, range) } } diff --git a/crates/chain/src/indexer/keychain_txout.rs b/crates/chain/src/indexer/keychain_txout.rs index 555784d13..8f159fb38 100644 --- a/crates/chain/src/indexer/keychain_txout.rs +++ b/crates/chain/src/indexer/keychain_txout.rs @@ -7,13 +7,11 @@ use crate::{ spk_client::{FullScanRequestBuilder, SyncRequestBuilder}, spk_iter::BIP32_MAX_INDEX, spk_txout::SpkTxOutIndex, - Anchor, CanonicalIter, CanonicalReason, ChainOracle, DescriptorExt, DescriptorId, Indexed, - Indexer, KeychainIndexed, SpkIterator, + DescriptorExt, DescriptorId, Indexed, Indexer, KeychainIndexed, SpkIterator, }; use alloc::{borrow::ToOwned, vec::Vec}; use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txid}; use core::{ - convert::Infallible, fmt::Debug, ops::{Bound, RangeBounds}, }; @@ -138,6 +136,12 @@ impl Default for KeychainTxOutIndex { } } +impl AsRef> for KeychainTxOutIndex { + fn as_ref(&self) -> &SpkTxOutIndex<(K, u32)> { + self.inner() + } +} + impl Indexer for KeychainTxOutIndex { type ChangeSet = ChangeSet; @@ -202,6 +206,11 @@ impl KeychainTxOutIndex { lookahead, } } + + /// Get a reference to the internal [`SpkTxOutIndex`]. + pub fn inner(&self) -> &SpkTxOutIndex<(K, u32)> { + &self.inner + } } /// Methods that are *re-exposed* from the internal [`SpkTxOutIndex`]. @@ -881,20 +890,6 @@ pub trait SyncRequestBuilderExt { /// Add [`Script`](bitcoin::Script)s that are revealed by the `indexer` but currently unused. fn unused_spks_from_indexer(self, indexer: &KeychainTxOutIndex) -> Self; - - /// Add unconfirmed txids and their associated spks. - /// - /// We expect that the chain source should include these txids in their spk histories. If not, - /// the transaction has been evicted for some reason and we will inform the receiving - /// structures in the response. - fn check_unconfirmed_statuses( - self, - indexer: &KeychainTxOutIndex, - canonical_iter: CanonicalIter, - ) -> Self - where - A: Anchor, - C: ChainOracle; } impl SyncRequestBuilderExt for SyncRequestBuilder<(K, u32)> { @@ -908,29 +903,6 @@ impl SyncRequestBuilderExt for SyncRequest fn unused_spks_from_indexer(self, indexer: &KeychainTxOutIndex) -> Self { self.spks_with_indexes(indexer.unused_spks()) } - - fn check_unconfirmed_statuses( - self, - indexer: &KeychainTxOutIndex, - canonical_iter: CanonicalIter, - ) -> Self - where - A: Anchor, - C: ChainOracle, - { - self.expected_txids_of_spk( - canonical_iter - .map(|res| res.expect("infallible")) - .filter(|(_, _, reason)| matches!(reason, CanonicalReason::ObservedIn { .. })) - .flat_map(|(txid, tx, _)| { - indexer - .inner - .relevant_spks_of_tx(tx.as_ref()) - .into_iter() - .map(move |spk| (txid, spk)) - }), - ) - } } /// Trait to extend [`FullScanRequestBuilder`]. diff --git a/crates/chain/src/indexer/spk_txout.rs b/crates/chain/src/indexer/spk_txout.rs index fb45a939d..c3b33dd3e 100644 --- a/crates/chain/src/indexer/spk_txout.rs +++ b/crates/chain/src/indexer/spk_txout.rs @@ -42,6 +42,12 @@ pub struct SpkTxOutIndex { spk_txouts: BTreeSet<(I, OutPoint)>, } +impl AsRef> for SpkTxOutIndex { + fn as_ref(&self) -> &SpkTxOutIndex { + self + } +} + impl Default for SpkTxOutIndex { fn default() -> Self { Self { diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index 979bc01d2..bc1f200fb 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -95,7 +95,6 @@ use crate::spk_txout::SpkTxOutIndex; use crate::BlockId; use crate::CanonicalIter; use crate::CanonicalReason; -use crate::Indexer; use crate::ObservedIn; use crate::{Anchor, Balance, ChainOracle, ChainPosition, FullTxOut, Merge}; use alloc::collections::vec_deque::VecDeque; @@ -1164,48 +1163,46 @@ impl TxGraph { .expect("oracle is infallible") } - /// Returns an iterator over unconfirmed transactions and their associated script pubkeys, - /// filtered within the specified `range`. + /// Iterate over unconfirmed txids that we expect to exist in a chain source's spk history + /// response. /// - /// This function scans the transaction graph for unconfirmed transactions relevant to the - /// provided [`SpkTxOutIndex`], determining which transactions should be considered based on - /// indexed outputs. - pub fn iter_spks_with_expected_txids<'a, C, I>( + /// This is used to fill [`SyncRequestBuilder::expected_unconfirmed_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_unconfirmed_spk_txids). + /// + /// The spk range can be contrained with `range`. + pub fn expected_unconfirmed_spk_txids<'a, C, I>( &'a self, chain: &'a C, - indexer: &'a SpkTxOutIndex, + chain_tip: BlockId, + indexer: &'a impl AsRef>, range: impl RangeBounds + 'a, - ) -> impl Iterator + 'a + ) -> Result, C::Error> where - C: ChainOracle, - I: fmt::Debug + Clone + Ord, + C: ChainOracle, + I: fmt::Debug + Clone + Ord + 'a, { - let chain_tip = chain.get_chain_tip().unwrap(); - - self.list_canonical_txs(chain, chain_tip) - .filter(|c| !c.chain_position.is_confirmed() && indexer.is_tx_relevant(&c.tx_node)) - .flat_map(move |c| { - let txid = c.tx_node.txid; - let spks = c - .tx_node - .input + let mut spk_txs = vec![]; + for res in self.try_list_canonical_txs(chain, chain_tip) { + let canonical_tx = res?; + if canonical_tx.chain_position.is_confirmed() { + continue; + } + let txid = canonical_tx.tx_node.txid; + let tx = canonical_tx.tx_node.tx; + let outpoints = tx.input.iter().map(|txin| txin.previous_output).chain( + tx.output .iter() - .map(|txin| txin.previous_output) - .chain( - c.tx_node - .output - .iter() - .enumerate() - .map(move |(vout, _)| OutPoint::new(txid, vout as u32)), - ) - .flat_map(|op| match indexer.txout(op) { - Some((i, txo)) if range.contains(i) => Some(txo.script_pubkey.clone()), - _ => None, - }) - .collect::>(); - - core::iter::repeat(txid).zip(spks) - }) + .enumerate() + .map(|(vout, _)| OutPoint::new(txid, vout as u32)), + ); + for op in outpoints { + if let Some((index, txo)) = indexer.as_ref().txout(op) { + if range.contains(index) { + spk_txs.push((txid, txo.script_pubkey.clone())); + } + } + } + } + Ok(spk_txs) } } diff --git a/crates/core/src/spk_client.rs b/crates/core/src/spk_client.rs index 97d6cca0e..728d9a6fd 100644 --- a/crates/core/src/spk_client.rs +++ b/crates/core/src/spk_client.rs @@ -170,7 +170,7 @@ impl SyncRequestBuilder { /// Add transactions that are expected to exist under a given spk. /// /// This is useful for detecting a malicious replacement of an incoming transaction. - pub fn expected_txids_of_spk( + pub fn expected_unconfirmed_spk_txids( mut self, txs: impl IntoIterator, ) -> Self { diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index 3be5173fe..d0db849b0 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -137,10 +137,11 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { let sync_request = SyncRequest::builder() .chain_tip(chain.tip()) .revealed_spks_from_indexer(&graph.index, ..) - .check_unconfirmed_statuses( - &graph.index, - graph.graph().canonical_iter(&chain, chain.tip().block_id()), - ); + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; assert!( sync_response @@ -165,10 +166,11 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { let sync_request = SyncRequest::builder() .chain_tip(chain.tip()) .revealed_spks_from_indexer(&graph.index, ..) - .check_unconfirmed_statuses( - &graph.index, - graph.graph().canonical_iter(&chain, chain.tip().block_id()), - ); + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; assert!( sync_response.tx_update.evicted.contains(&send_txid), diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index bac2a6303..f6b083fab 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -94,10 +94,11 @@ pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { let sync_request = SyncRequest::builder() .chain_tip(chain.tip()) .revealed_spks_from_indexer(&graph.index, ..) - .check_unconfirmed_statuses( - &graph.index, - graph.graph().canonical_iter(&chain, chain.tip().block_id()), - ); + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); let sync_response = client.sync(sync_request, 1).await?; assert!( sync_response @@ -122,10 +123,11 @@ pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { let sync_request = SyncRequest::builder() .chain_tip(chain.tip()) .revealed_spks_from_indexer(&graph.index, ..) - .check_unconfirmed_statuses( - &graph.index, - graph.graph().canonical_iter(&chain, chain.tip().block_id()), - ); + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); let sync_response = client.sync(sync_request, 1).await?; assert!( sync_response.tx_update.evicted.contains(&send_txid), diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index cfb8bc526..ac14968c0 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -94,10 +94,11 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { let sync_request = SyncRequest::builder() .chain_tip(chain.tip()) .revealed_spks_from_indexer(&graph.index, ..) - .check_unconfirmed_statuses( - &graph.index, - graph.graph().canonical_iter(&chain, chain.tip().block_id()), - ); + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); let sync_response = client.sync(sync_request, 1)?; assert!( sync_response @@ -122,10 +123,11 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { let sync_request = SyncRequest::builder() .chain_tip(chain.tip()) .revealed_spks_from_indexer(&graph.index, ..) - .check_unconfirmed_statuses( - &graph.index, - graph.graph().canonical_iter(&chain, chain.tip().block_id()), - ); + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); let sync_response = client.sync(sync_request, 1)?; assert!( sync_response.tx_update.evicted.contains(&send_txid), From dc452a1dcf4cec2bd7e028529d8bbd18c802e817 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 7 Feb 2025 20:00:38 +1100 Subject: [PATCH 15/16] docs(chain): Update `tx_graph` module docs * Remove duplicate paragraphs about `ChangeSet`s. * Add "Canonicalization" section which expands on methods that require canonicalization and the associated data types used in the canonicalization algorithm. --- crates/chain/src/tx_graph.rs | 51 ++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index bc1f200fb..7241a4454 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -16,23 +16,52 @@ //! documentation for more details), and the timestamp of the last time we saw the transaction as //! unconfirmed. //! -//! Conflicting transactions are allowed to coexist within a [`TxGraph`]. This is useful for -//! identifying and traversing conflicts and descendants of a given transaction. Some [`TxGraph`] -//! methods only consider transactions that are "canonical" (i.e., in the best chain or in mempool). -//! We decide which transactions are canonical based on the transaction's anchors and the -//! `last_seen` (as unconfirmed) timestamp. +//! # Canonicalization //! -//! The [`ChangeSet`] reports changes made to a [`TxGraph`]; it can be used to either save to -//! persistent storage, or to be applied to another [`TxGraph`]. +//! Conflicting transactions are allowed to coexist within a [`TxGraph`]. A process called +//! canonicalization is required to get a conflict-free history of transactions. +//! +//! * [`list_canonical_txs`](TxGraph::list_canonical_txs) lists canonical transactions. +//! * [`filter_chain_txouts`](TxGraph::filter_chain_txouts) filters out canonical outputs from a +//! list of outpoints. +//! * [`filter_chain_unspents`](TxGraph::filter_chain_unspents) filters out canonical unspent +//! outputs from a list of outpoints. +//! * [`balance`](TxGraph::balance) gets the total sum of unspent outputs filtered from a list of +//! outpoints. +//! * [`canonical_iter`](TxGraph::canonical_iter) returns the [`CanonicalIter`] which contains all +//! of the canonicalization logic. +//! +//! All these methods require a `chain` and `chain_tip` argument. The `chain` must be a +//! [`ChainOracle`] implementation (such as [`LocalChain`](crate::local_chain::LocalChain)) which +//! identifies which blocks exist under a given `chain_tip`. +//! +//! The canonicalization algorithm uses the following associated data to determine which +//! transactions have precedence over others: //! -//! Lastly, you can use [`TxAncestors`]/[`TxDescendants`] to traverse ancestors and descendants of -//! a given transaction, respectively. +//! * [`Anchor`] - This bit of data represents that a transaction is anchored in a given block. If +//! the transaction is anchored in chain of `chain_tip`, or is an ancestor of a transaction +//! anchored in chain of `chain_tip`, then the transaction must be canonical. +//! * `last_seen` - This is the timestamp of when a transaction is last-seen in the mempool. This +//! value is updated by [`insert_seen_at`](TxGraph::insert_seen_at) and +//! [`apply_update`](TxGraph::apply_update). Transactions that are seen later have higher +//! priority than those that are seen earlier. `last_seen` values are transitive. Meaning that +//! the actual `last_seen` value of a transaction is the max of all the `last_seen` values of +//! it's descendants. +//! * `last_evicted` - This is the timestamp of when a transaction is last-seen as evicted in the +//! mempool. If this value is equal to or higher than the transaction's `last_seen` value, then +//! it will not be considered canonical. +//! +//! # Graph traversal +//! +//! You can use [`TxAncestors`]/[`TxDescendants`] to traverse ancestors and descendants of a given +//! transaction, respectively. //! //! # Applying changes //! +//! The [`ChangeSet`] reports changes made to a [`TxGraph`]; it can be used to either save to +//! persistent storage, or to be applied to another [`TxGraph`]. +//! //! Methods that change the state of [`TxGraph`] will return [`ChangeSet`]s. -//! [`ChangeSet`]s can be applied back to a [`TxGraph`] or be used to inform persistent storage -//! of the changes to [`TxGraph`]. //! //! # Generics //! From 66fa256a3ac7b80da0ce33351081bf0b218327f3 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Thu, 13 Feb 2025 19:32:00 +0800 Subject: [PATCH 16/16] refactor(rpc)!: update `mempool` interface and test code --- crates/bitcoind_rpc/src/lib.rs | 57 ++++++++++++++++--- crates/bitcoind_rpc/tests/test_emitter.rs | 33 +++++++---- .../example_bitcoind_rpc_polling/src/main.rs | 25 +++++--- example-crates/example_wallet_rpc/src/main.rs | 6 +- 4 files changed, 91 insertions(+), 30 deletions(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 3fa17ef19..1fc35718f 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -10,8 +10,9 @@ #![warn(missing_docs)] use bdk_core::{BlockId, CheckPoint}; -use bitcoin::{block::Header, Block, BlockHash, Transaction}; +use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid}; use bitcoincore_rpc::bitcoincore_rpc_json; +use std::collections::HashSet; pub mod bip158; @@ -64,17 +65,19 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { } } - /// Emit mempool transactions, alongside their first-seen unix timestamps. + /// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s. /// - /// This method emits each transaction only once, unless we cannot guarantee the transaction's - /// ancestors are already emitted. + /// This method returns a [`MempoolEvent`] containing the full transactions (with their + /// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the + /// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the + /// transaction's ancestors are already emitted. /// /// To understand why, consider a receiver which filters transactions based on whether it /// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block /// at height `h`. - pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { + pub fn mempool(&mut self) -> Result { let client = self.client; // This is the emitted tip height during the last mempool emission. @@ -91,8 +94,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { let prev_mempool_time = self.last_mempool_time; let mut latest_time = prev_mempool_time; - let txs_to_emit = client - .get_raw_mempool_verbose()? + // Get the raw mempool result from the RPC client. + let raw_mempool = client.get_raw_mempool_verbose()?; + let raw_mempool_txids = raw_mempool.keys().copied().collect::>(); + + let emitted_txs = raw_mempool .into_iter() .filter_map({ let latest_time = &mut latest_time; @@ -128,7 +134,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { self.last_mempool_time = latest_time; self.last_mempool_tip = Some(self.last_cp.height()); - Ok(txs_to_emit) + Ok(MempoolEvent { + emitted_txs, + raw_mempool_txids, + last_seen: latest_time as u64, + }) } /// Emit the next block height and header (if any). @@ -144,6 +154,37 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { } } +/// A new emission from mempool. +#[derive(Debug)] +pub struct MempoolEvent { + /// Emitted mempool transactions with their first‐seen unix timestamps. + pub emitted_txs: Vec<(Transaction, u64)>, + + /// Set of all [`Txid`]s from the raw mempool result, including transactions that may have been + /// confirmed or evicted during processing. This is used to determine which expected + /// transactions are missing. + pub raw_mempool_txids: HashSet, + + /// The latest first-seen epoch of emitted mempool transactions. + pub last_seen: u64, +} + +impl MempoolEvent { + /// Given an iterator of expected [`Txid`]s, return those that are missing from the mempool. + pub fn evicted_txids( + &self, + expected_unconfirmed_txids: impl IntoIterator, + ) -> HashSet { + let expected_set = expected_unconfirmed_txids + .into_iter() + .collect::>(); + expected_set + .difference(&self.raw_mempool_txids) + .copied() + .collect() + } +} + /// A newly emitted block from [`Emitter`]. #[derive(Debug)] pub struct BlockEvent { diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 839943c49..2448b6483 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { assert!(emitter.next_block()?.is_none()); let mempool_txs = emitter.mempool()?; - let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs); + let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.emitted_txs); assert_eq!( indexed_additions .tx_graph @@ -437,6 +437,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { // the first emission should include all transactions let emitted_txids = emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -447,7 +448,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { // second emission should be empty assert!( - emitter.mempool()?.is_empty(), + emitter.mempool()?.emitted_txs.is_empty(), "second emission should be empty" ); @@ -457,7 +458,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { } while emitter.next_header()?.is_some() {} assert!( - emitter.mempool()?.is_empty(), + emitter.mempool()?.emitted_txs.is_empty(), "third emission, after chain tip is extended, should also be empty" ); @@ -506,6 +507,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() assert_eq!( emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -515,6 +517,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() assert_eq!( emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -535,6 +538,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() .collect::>(); let emitted_txids = emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -593,6 +597,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { assert_eq!( emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -628,6 +633,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { // include mempool txs introduced at reorg height or greater let mempool = emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -643,6 +649,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { let mempool = emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -766,7 +773,7 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { )?; let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1); - let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?); + let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.emitted_txs); assert!(changeset .tx_graph .txs @@ -806,14 +813,15 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { // Send the tx. let txid_2 = core.send_raw_transaction(&tx1b)?; - // We evict the expected txs that are missing from mempool. - let exp_txids = graph - .expected_unconfirmed_spk_txids(&chain, chain_tip, ..)? - .collect::>(); - assert_eq!(exp_txids, vec![(txid_1, spk)]); - let mempool = emitter - .mempool()? - .into_iter() + // Retrieve the expected unconfirmed txids and spks from the graph. + let exp_spk_txids = graph.expected_unconfirmed_spk_txids(&chain, chain_tip, ..)?; + assert_eq!(exp_spk_txids, vec![(txid_1, spk)]); + + // Check that mempool emission contains evicted txid. + let mempool_event = emitter.mempool()?; + let unseen_txids: Vec = mempool_event + .emitted_txs + .iter() .map(|(tx, _)| tx.compute_txid()) .collect(); assert!(unseen_txids.contains(&txid_2)); @@ -825,6 +833,7 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { let _ = graph.insert_evicted_at(txid, seen_at); } + // tx1 should no longer be canonical. assert!(graph .graph() .list_canonical_txs(&chain, chain_tip) diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 95c547967..33e5281c6 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -11,10 +11,7 @@ use bdk_bitcoind_rpc::{ bitcoincore_rpc::{Auth, Client, RpcApi}, Emitter, }; -use bdk_chain::{ - bitcoin::{Block, Transaction}, - local_chain, Merge, -}; +use bdk_chain::{bitcoin::Block, local_chain, Merge}; use example_cli::{ anyhow, clap::{self, Args, Subcommand}, @@ -36,7 +33,7 @@ const DB_COMMIT_DELAY: Duration = Duration::from_secs(60); #[derive(Debug)] enum Emission { Block(bdk_bitcoind_rpc::BlockEvent), - Mempool(Vec<(Transaction, u64)>), + Mempool(bdk_bitcoind_rpc::MempoolEvent), Tip(u32), } @@ -204,7 +201,7 @@ fn main() -> anyhow::Result<()> { let graph_changeset = graph .lock() .unwrap() - .batch_insert_relevant_unconfirmed(mempool_txs); + .batch_insert_relevant_unconfirmed(mempool_txs.emitted_txs); { let db = &mut *db.lock().unwrap(); db_stage.merge(ChangeSet { @@ -287,7 +284,21 @@ fn main() -> anyhow::Result<()> { (chain_changeset, graph_changeset) } Emission::Mempool(mempool_txs) => { - let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs); + let mut graph_changeset = graph + .batch_insert_relevant_unconfirmed(mempool_txs.emitted_txs.clone()); + let expected_txids = graph + .expected_unconfirmed_spk_txids( + &chain.clone(), + chain.tip().block_id(), + .., + )? + .into_iter() + .map(|(txid, _)| txid); + let evicted_txids = mempool_txs.evicted_txids(expected_txids); + for txid in evicted_txids { + graph_changeset + .merge(graph.insert_evicted_at(txid, mempool_txs.last_seen)); + } (local_chain::ChangeSet::default(), graph_changeset) } Emission::Tip(h) => { diff --git a/example-crates/example_wallet_rpc/src/main.rs b/example-crates/example_wallet_rpc/src/main.rs index 204224bcb..c3ac70f49 100644 --- a/example-crates/example_wallet_rpc/src/main.rs +++ b/example-crates/example_wallet_rpc/src/main.rs @@ -3,7 +3,7 @@ use bdk_bitcoind_rpc::{ Emitter, }; use bdk_wallet::{ - bitcoin::{Block, Network, Transaction}, + bitcoin::{Block, Network}, file_store::Store, KeychainKind, Wallet, }; @@ -73,7 +73,7 @@ impl Args { enum Emission { SigTerm, Block(bdk_bitcoind_rpc::BlockEvent), - Mempool(Vec<(Transaction, u64)>), + Mempool(bdk_bitcoind_rpc::MempoolEvent), } fn main() -> anyhow::Result<()> { @@ -157,7 +157,7 @@ fn main() -> anyhow::Result<()> { } Emission::Mempool(mempool_emission) => { let start_apply_mempool = Instant::now(); - wallet.apply_unconfirmed_txs(mempool_emission); + wallet.apply_unconfirmed_txs(mempool_emission.emitted_txs); wallet.persist(&mut db)?; println!( "Applied unconfirmed transactions in {}s",