diff --git a/src/blockchain/any.rs b/src/blockchain/any.rs index 5ef1a3385..3138d0253 100644 --- a/src/blockchain/any.rs +++ b/src/blockchain/any.rs @@ -194,7 +194,7 @@ impl_from!(boxed rpc::RpcBlockchain, AnyBlockchain, Rpc, #[cfg(feature = "rpc")] /// ); /// # } /// ``` -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] #[serde(tag = "type", rename_all = "snake_case")] pub enum AnyBlockchainConfig { #[cfg(feature = "electrum")] diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs index 7ca78a2c3..9b47df9cf 100644 --- a/src/blockchain/compact_filters/mod.rs +++ b/src/blockchain/compact_filters/mod.rs @@ -479,7 +479,7 @@ impl WalletSync for CompactFiltersBlockchain { } /// Data to connect to a Bitcoin P2P peer -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct BitcoinPeerConfig { /// Peer address such as 127.0.0.1:18333 pub address: String, @@ -490,7 +490,7 @@ pub struct BitcoinPeerConfig { } /// Configuration for a [`CompactFiltersBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct CompactFiltersBlockchainConfig { /// List of peers to try to connect to for asking headers and filters pub peers: Vec, diff --git a/src/blockchain/electrum.rs b/src/blockchain/electrum.rs index fdb10b470..54381241a 100644 --- a/src/blockchain/electrum.rs +++ b/src/blockchain/electrum.rs @@ -296,7 +296,7 @@ impl<'a, 'b, D: Database> TxCache<'a, 'b, D> { } /// Configuration for an [`ElectrumBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct ElectrumBlockchainConfig { /// URL of the Electrum server (such as ElectrumX, Esplora, BWT) may start with `ssl://` or `tcp://` and include a port /// diff --git a/src/blockchain/esplora/mod.rs b/src/blockchain/esplora/mod.rs index 30d29d647..eecd8dfe8 100644 --- a/src/blockchain/esplora/mod.rs +++ b/src/blockchain/esplora/mod.rs @@ -97,7 +97,7 @@ impl fmt::Display for EsploraError { } /// Configuration for an [`EsploraBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct EsploraBlockchainConfig { /// Base URL of the esplora service /// diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index b2c64ba5a..eca63cd89 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -27,6 +27,7 @@ //! network: bdk::bitcoin::Network::Testnet, //! wallet_name: "wallet_name".to_string(), //! sync_params: None, +//! import_params: None, //! }; //! let blockchain = RpcBlockchain::from_config(&config); //! ``` @@ -47,8 +48,9 @@ use bitcoincore_rpc::json::{ use bitcoincore_rpc::jsonrpc::serde_json::{json, Value}; use bitcoincore_rpc::Auth as RpcAuth; use bitcoincore_rpc::{Client, RpcApi}; -use log::{debug, info}; +use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; +use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::ops::Deref; use std::path::PathBuf; @@ -66,6 +68,8 @@ pub struct RpcBlockchain { capabilities: HashSet, /// Sync parameters. sync_params: RpcSyncParams, + /// Import parameters. + import_params: RefCell, } impl Deref for RpcBlockchain { @@ -77,7 +81,7 @@ impl Deref for RpcBlockchain { } /// RpcBlockchain configuration options -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct RpcConfig { /// The bitcoin node url pub url: String, @@ -89,6 +93,10 @@ pub struct RpcConfig { pub wallet_name: String, /// Sync parameters pub sync_params: Option, + /// Initial `scriptPubKey` import parameters. [`RpcImportParams`] will be mutated in + /// [`RpcBlockchain`] for every sync to reflect what was imported. The updated params can be + /// obtained via [`RpcBlockchain::import_parameters`]. + pub import_params: Option, } /// Sync parameters for Bitcoin Core RPC. @@ -96,16 +104,19 @@ pub struct RpcConfig { /// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with /// `scriptPubKey`s imported in the Bitcoin Core Wallet. These parameters are used for determining /// how the `importdescriptors` RPC calls are to be made. -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct RpcSyncParams { - /// The minimum number of scripts to scan for on initial sync. + /// The minimum number of scripts to scan for on the first sync. pub start_script_count: usize, /// Time in unix seconds in which initial sync will start scanning from (0 to start from genesis). pub start_time: u64, - /// Forces every sync to use `start_time` as import timestamp. + /// Forces every sync to use [`crate::database::SyncTime`] as import timestamp. The default + /// behavior is to use the last `sync_time` as the import timestamp. pub force_start_time: bool, /// RPC poll rate (in seconds) to get state updates. pub poll_rate_sec: u64, + /// Page size for RPC calls (`importdescriptors`, `importmulti` and `listtransactions`). + pub page_size: usize, } impl Default for RpcSyncParams { @@ -114,11 +125,24 @@ impl Default for RpcSyncParams { start_script_count: 100, start_time: 0, force_start_time: false, - poll_rate_sec: 3, + poll_rate_sec: 2, + page_size: 200, } } } +/// `scriptPubKey` import parameters for Bitcoin Core RPC. +/// +/// This defines which derivation index(es) to start importing from so that BDK can avoid +/// re-importing `scriptPubKey`s into the Core wallet. +#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] +pub struct RpcImportParams { + /// External derivation index to start import from. + pub external_start_index: usize, + /// Internal derivation index to start import from. + pub internal_start_index: usize, +} + /// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize] /// To be removed once upstream equivalent is implementing Serialize (json serialization format /// should be the same), see [rust-bitcoincore-rpc/pull/181](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181) @@ -152,6 +176,18 @@ impl From for RpcAuth { } } +impl RpcBlockchain { + /// Returns the current import parameters. + pub fn get_import_parameters(&self) -> RpcImportParams { + *self.import_params.borrow() + } + + /// Replaces the import parameters and returns the old value. + pub fn replace_import_parameters(&self, params: RpcImportParams) -> RpcImportParams { + self.import_params.replace(params) + } +} + impl Blockchain for RpcBlockchain { fn get_capabilities(&self) -> HashSet { self.capabilities.clone() @@ -197,7 +233,11 @@ impl WalletSync for RpcBlockchain { D: BatchDatabase, { let batch = DbState::new(db, &self.sync_params, &*prog)? - .sync_with_core(&self.client, self.is_descriptors)? + .sync_with_core( + &self.client, + &mut *self.import_params.borrow_mut(), + self.is_descriptors, + )? .as_db_batch()?; db.commit_batch(batch) @@ -273,6 +313,7 @@ impl ConfigurableBlockchain for RpcBlockchain { capabilities, is_descriptors, sync_params: config.sync_params.clone().unwrap_or_default(), + import_params: RefCell::new(config.import_params.unwrap_or_default()), }) } } @@ -313,7 +354,11 @@ struct DbState<'a, D> { impl<'a, D: BatchDatabase> DbState<'a, D> { /// Obtain [DbState] from [crate::database::Database]. - fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result { + fn new( + db: &'a D, + sync_params: &'a RpcSyncParams, + prog: &'a dyn Progress, + ) -> Result { let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?; let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?; @@ -324,10 +369,10 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { // If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently // cached. - if has_derivable && last_count < params.start_script_count { + if has_derivable && last_count < sync_params.start_script_count { let inner_err = MissingCachedScripts { last_count, - missing_count: params.start_script_count - last_count, + missing_count: sync_params.start_script_count - last_count, }; debug!("requesting more spks with: {:?}", inner_err); return Err(Error::MissingCachedScripts(inner_err)); @@ -358,7 +403,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { Ok(Self { db, - params, + params: sync_params, prog, ext_spks, int_spks, @@ -373,7 +418,12 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { /// Sync states of [BatchDatabase] and Core wallet. /// First we import all `scriptPubKey`s from database into core wallet - fn sync_with_core(&mut self, client: &Client, is_descriptor: bool) -> Result<&mut Self, Error> { + fn sync_with_core( + &mut self, + client: &Client, + import_params: &mut RpcImportParams, + use_desc: bool, + ) -> Result<&mut Self, Error> { // this tells Core wallet where to sync from for imported scripts let start_epoch = if self.params.force_start_time { self.params.start_time @@ -383,23 +433,49 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { .map_or(self.params.start_time, |st| st.block_time.timestamp) }; - // sync scriptPubKeys from Database to Core wallet - let scripts_iter = self.ext_spks.iter().chain(&self.int_spks); - if is_descriptor { - import_descriptors(client, start_epoch, scripts_iter)?; - } else { - import_multi(client, start_epoch, scripts_iter)?; - } - - // wait for Core wallet to rescan (TODO: maybe make this async) - await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?; + // sync scriptPubKeys from Database into Core wallet, starting from derivation indexes + // defined in `import_params` + let scripts_iter = { + let ext_spks = self + .ext_spks + .iter() + .skip(import_params.external_start_index); + let int_spks = self + .int_spks + .iter() + .skip(import_params.internal_start_index); + ext_spks.chain(int_spks) + }; + paginated_import( + client, + use_desc, + start_epoch, + self.params.poll_rate_sec, + self.params.page_size, + scripts_iter, + self.prog, + )?; + // await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?; + + // update import_params + import_params.external_start_index = self.ext_spks.len(); + import_params.internal_start_index = self.int_spks.len(); // obtain iterator of pagenated `listtransactions` RPC calls - const LIST_TX_PAGE_SIZE: usize = 100; // item count per page - let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| { + let tx_iter = list_transactions(client, self.params.page_size)?.filter(|item| { // filter out conflicting transactions - only accept transactions that are already // confirmed, or exists in mempool - item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok() + let confirmed = item.info.confirmations > 0; + let in_mempool = client.get_mempool_entry(&item.info.txid).is_ok(); + + let keep = confirmed || in_mempool; + if !keep { + warn!( + "transaction {} is skipped: confirmed={}, in_mempool={}", + item.info.txid, confirmed, in_mempool + ); + } + keep }); // iterate through chronological results of `listtransactions` @@ -710,6 +786,39 @@ where Ok(()) } +fn paginated_import<'a, S>( + client: &Client, + use_desc: bool, + start_epoch: u64, + poll_rate_sec: u64, + page_size: usize, + scripts_iter: S, + progress: &dyn Progress, +) -> Result<(), Error> +where + S: Iterator + Clone, +{ + (0_usize..) + .step_by(page_size) + .map(|page_start| { + scripts_iter + .clone() + .skip(page_start) + .take(page_size) + .cloned() + .collect::>() + }) + .take_while(|scripts| !scripts.is_empty()) + .try_for_each(|scripts| { + if use_desc { + import_descriptors(client, start_epoch, scripts.iter())?; + } else { + import_multi(client, start_epoch, scripts.iter())?; + } + await_wallet_scan(client, poll_rate_sec, progress) + }) +} + /// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results /// in chronological order. /// @@ -864,6 +973,8 @@ impl BlockchainFactory for RpcBlockchainFactory { checksum ), sync_params: self.sync_params.clone(), + // TODO @evanlinjin: How can be set this individually for each build? + import_params: Default::default(), }) } } @@ -891,6 +1002,7 @@ mod test { network: Network::Regtest, wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ), sync_params: None, + import_params: None, }; RpcBlockchain::from_config(&config).unwrap() } diff --git a/src/testutils/blockchain_tests.rs b/src/testutils/blockchain_tests.rs index a3d7c2b17..2c18ac7f3 100644 --- a/src/testutils/blockchain_tests.rs +++ b/src/testutils/blockchain_tests.rs @@ -755,6 +755,12 @@ macro_rules! bdk_blockchain_tests { blockchain.broadcast(&tx1).expect("broadcasting first"); blockchain.broadcast(&tx2).expect("broadcasting replacement"); + + // TODO @evanlinjin: Core's `listtransactions` RPC call does not return conflicting + // unconfirmed transactions (unless we re-import associated scriptPubKey/descriptor) + #[cfg(feature = "rpc")] + blockchain.replace_import_parameters(Default::default()); + receiver_wallet.sync(&blockchain, SyncOptions::default()).expect("syncing receiver"); assert_eq!(receiver_wallet.get_balance().expect("balance").untrusted_pending, 49_000, "should have received coins once and only once"); } diff --git a/src/types.rs b/src/types.rs index bae86477f..d4bda5890 100644 --- a/src/types.rs +++ b/src/types.rs @@ -166,7 +166,7 @@ pub struct LocalUtxo { } /// A [`Utxo`] with its `satisfaction_weight`. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct WeightedUtxo { /// The weight of the witness data and `scriptSig` expressed in [weight units]. This is used to /// properly maintain the feerate when adding this input to a transaction during coin selection. @@ -177,7 +177,7 @@ pub struct WeightedUtxo { pub utxo: Utxo, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] /// An unspent transaction output (UTXO). pub enum Utxo { /// A UTXO owned by the local wallet. diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 3490fdfc3..68ac71619 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -143,7 +143,7 @@ pub enum AddressIndex { /// A derived address and the index it was found at /// For convenience this automatically derefs to `Address` -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct AddressInfo { /// Child index of this address pub index: u32, diff --git a/src/wallet/signer.rs b/src/wallet/signer.rs index 1704a9532..01458510b 100644 --- a/src/wallet/signer.rs +++ b/src/wallet/signer.rs @@ -720,7 +720,7 @@ pub struct SignOptions { } /// Customize which taproot script-path leaves the signer should sign. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum TapLeavesOptions { /// The signer will sign all the leaves it has a key for. All,