From 9eb738b3a5b9f611e32bc4b52b37b44ef2313637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 1 Sep 2022 11:15:38 +0800 Subject: [PATCH 1/4] Make clippy happy --- src/blockchain/any.rs | 2 +- src/blockchain/electrum.rs | 2 +- src/blockchain/rpc.rs | 8 ++++---- src/types.rs | 4 ++-- src/wallet/mod.rs | 2 +- src/wallet/signer.rs | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/blockchain/any.rs b/src/blockchain/any.rs index 5ef1a3385..3138d0253 100644 --- a/src/blockchain/any.rs +++ b/src/blockchain/any.rs @@ -194,7 +194,7 @@ impl_from!(boxed rpc::RpcBlockchain, AnyBlockchain, Rpc, #[cfg(feature = "rpc")] /// ); /// # } /// ``` -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] #[serde(tag = "type", rename_all = "snake_case")] pub enum AnyBlockchainConfig { #[cfg(feature = "electrum")] diff --git a/src/blockchain/electrum.rs b/src/blockchain/electrum.rs index fdb10b470..54381241a 100644 --- a/src/blockchain/electrum.rs +++ b/src/blockchain/electrum.rs @@ -296,7 +296,7 @@ impl<'a, 'b, D: Database> TxCache<'a, 'b, D> { } /// Configuration for an [`ElectrumBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct ElectrumBlockchainConfig { /// URL of the Electrum server (such as ElectrumX, Esplora, BWT) may start with `ssl://` or `tcp://` and include a port /// diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index b2c64ba5a..a7909281a 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -77,7 +77,7 @@ impl Deref for RpcBlockchain { } /// RpcBlockchain configuration options -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct RpcConfig { /// The bitcoin node url pub url: String, @@ -96,7 +96,7 @@ pub struct RpcConfig { /// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with /// `scriptPubKey`s imported in the Bitcoin Core Wallet. These parameters are used for determining /// how the `importdescriptors` RPC calls are to be made. -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct RpcSyncParams { /// The minimum number of scripts to scan for on initial sync. pub start_script_count: usize, @@ -373,7 +373,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { /// Sync states of [BatchDatabase] and Core wallet. /// First we import all `scriptPubKey`s from database into core wallet - fn sync_with_core(&mut self, client: &Client, is_descriptor: bool) -> Result<&mut Self, Error> { + fn sync_with_core(&mut self, client: &Client, use_desc: bool) -> Result<&mut Self, Error> { // this tells Core wallet where to sync from for imported scripts let start_epoch = if self.params.force_start_time { self.params.start_time @@ -385,7 +385,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { // sync scriptPubKeys from Database to Core wallet let scripts_iter = self.ext_spks.iter().chain(&self.int_spks); - if is_descriptor { + if use_desc { import_descriptors(client, start_epoch, scripts_iter)?; } else { import_multi(client, start_epoch, scripts_iter)?; diff --git a/src/types.rs b/src/types.rs index bae86477f..d4bda5890 100644 --- a/src/types.rs +++ b/src/types.rs @@ -166,7 +166,7 @@ pub struct LocalUtxo { } /// A [`Utxo`] with its `satisfaction_weight`. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct WeightedUtxo { /// The weight of the witness data and `scriptSig` expressed in [weight units]. This is used to /// properly maintain the feerate when adding this input to a transaction during coin selection. @@ -177,7 +177,7 @@ pub struct WeightedUtxo { pub utxo: Utxo, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] /// An unspent transaction output (UTXO). pub enum Utxo { /// A UTXO owned by the local wallet. diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 3490fdfc3..68ac71619 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -143,7 +143,7 @@ pub enum AddressIndex { /// A derived address and the index it was found at /// For convenience this automatically derefs to `Address` -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct AddressInfo { /// Child index of this address pub index: u32, diff --git a/src/wallet/signer.rs b/src/wallet/signer.rs index 1704a9532..01458510b 100644 --- a/src/wallet/signer.rs +++ b/src/wallet/signer.rs @@ -720,7 +720,7 @@ pub struct SignOptions { } /// Customize which taproot script-path leaves the signer should sign. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum TapLeavesOptions { /// The signer will sign all the leaves it has a key for. All, From 6fd25bd86749b14cc8b4ea54b9002e25a4bd244b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 1 Sep 2022 11:30:33 +0800 Subject: [PATCH 2/4] Improve `RpcSyncParams` documentation --- src/blockchain/rpc.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index a7909281a..7fad78876 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -98,11 +98,12 @@ pub struct RpcConfig { /// how the `importdescriptors` RPC calls are to be made. #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct RpcSyncParams { - /// The minimum number of scripts to scan for on initial sync. + /// The minimum number of scripts to scan for on the first sync. pub start_script_count: usize, /// Time in unix seconds in which initial sync will start scanning from (0 to start from genesis). pub start_time: u64, - /// Forces every sync to use `start_time` as import timestamp. + /// Forces every sync to use [`crate::database::SyncTime`] as import timestamp. The default + /// behavior is to use the last `sync_time` as the import timestamp. pub force_start_time: bool, /// RPC poll rate (in seconds) to get state updates. pub poll_rate_sec: u64, From 53198168816367c7d25f41b6b4a48f9778ab3892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 1 Sep 2022 15:31:01 +0800 Subject: [PATCH 3/4] Introduce `RpcImportParams` to make rpc sync more efficient `RpcImportParams` keeps track of the scriptPubKey derivation index to start from for the next call to `importdescripts`/`importmulti`, thus avoiding re-importing into Bitcoin Core. --- src/blockchain/compact_filters/mod.rs | 4 +- src/blockchain/esplora/mod.rs | 2 +- src/blockchain/rpc.rs | 111 ++++++++++++++++++++++---- src/testutils/blockchain_tests.rs | 6 ++ 4 files changed, 105 insertions(+), 18 deletions(-) diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs index 7ca78a2c3..9b47df9cf 100644 --- a/src/blockchain/compact_filters/mod.rs +++ b/src/blockchain/compact_filters/mod.rs @@ -479,7 +479,7 @@ impl WalletSync for CompactFiltersBlockchain { } /// Data to connect to a Bitcoin P2P peer -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct BitcoinPeerConfig { /// Peer address such as 127.0.0.1:18333 pub address: String, @@ -490,7 +490,7 @@ pub struct BitcoinPeerConfig { } /// Configuration for a [`CompactFiltersBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct CompactFiltersBlockchainConfig { /// List of peers to try to connect to for asking headers and filters pub peers: Vec, diff --git a/src/blockchain/esplora/mod.rs b/src/blockchain/esplora/mod.rs index 30d29d647..eecd8dfe8 100644 --- a/src/blockchain/esplora/mod.rs +++ b/src/blockchain/esplora/mod.rs @@ -97,7 +97,7 @@ impl fmt::Display for EsploraError { } /// Configuration for an [`EsploraBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct EsploraBlockchainConfig { /// Base URL of the esplora service /// diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index 7fad78876..4c3538e63 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -27,6 +27,7 @@ //! network: bdk::bitcoin::Network::Testnet, //! wallet_name: "wallet_name".to_string(), //! sync_params: None, +//! import_params: None, //! }; //! let blockchain = RpcBlockchain::from_config(&config); //! ``` @@ -47,8 +48,9 @@ use bitcoincore_rpc::json::{ use bitcoincore_rpc::jsonrpc::serde_json::{json, Value}; use bitcoincore_rpc::Auth as RpcAuth; use bitcoincore_rpc::{Client, RpcApi}; -use log::{debug, info}; +use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; +use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::ops::Deref; use std::path::PathBuf; @@ -66,6 +68,8 @@ pub struct RpcBlockchain { capabilities: HashSet, /// Sync parameters. sync_params: RpcSyncParams, + /// Import parameters. + import_params: RefCell, } impl Deref for RpcBlockchain { @@ -89,6 +93,10 @@ pub struct RpcConfig { pub wallet_name: String, /// Sync parameters pub sync_params: Option, + /// Initial `scriptPubKey` import parameters. [`RpcImportParams`] will be mutated in + /// [`RpcBlockchain`] for every sync to reflect what was imported. The updated params can be + /// obtained via [`RpcBlockchain::import_parameters`]. + pub import_params: Option, } /// Sync parameters for Bitcoin Core RPC. @@ -120,6 +128,18 @@ impl Default for RpcSyncParams { } } +/// `scriptPubKey` import parameters for Bitcoin Core RPC. +/// +/// This defines which derivation index(es) to start importing from so that BDK can avoid +/// re-importing `scriptPubKey`s into the Core wallet. +#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] +pub struct RpcImportParams { + /// External derivation index to start import from. + pub external_start_index: usize, + /// Internal derivation index to start import from. + pub internal_start_index: usize, +} + /// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize] /// To be removed once upstream equivalent is implementing Serialize (json serialization format /// should be the same), see [rust-bitcoincore-rpc/pull/181](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181) @@ -153,6 +173,18 @@ impl From for RpcAuth { } } +impl RpcBlockchain { + /// Returns the current import parameters. + pub fn get_import_parameters(&self) -> RpcImportParams { + *self.import_params.borrow() + } + + /// Replaces the import parameters and returns the old value. + pub fn replace_import_parameters(&self, params: RpcImportParams) -> RpcImportParams { + self.import_params.replace(params) + } +} + impl Blockchain for RpcBlockchain { fn get_capabilities(&self) -> HashSet { self.capabilities.clone() @@ -198,7 +230,11 @@ impl WalletSync for RpcBlockchain { D: BatchDatabase, { let batch = DbState::new(db, &self.sync_params, &*prog)? - .sync_with_core(&self.client, self.is_descriptors)? + .sync_with_core( + &self.client, + &mut *self.import_params.borrow_mut(), + self.is_descriptors, + )? .as_db_batch()?; db.commit_batch(batch) @@ -274,6 +310,7 @@ impl ConfigurableBlockchain for RpcBlockchain { capabilities, is_descriptors, sync_params: config.sync_params.clone().unwrap_or_default(), + import_params: RefCell::new(config.import_params.unwrap_or_default()), }) } } @@ -314,7 +351,11 @@ struct DbState<'a, D> { impl<'a, D: BatchDatabase> DbState<'a, D> { /// Obtain [DbState] from [crate::database::Database]. - fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result { + fn new( + db: &'a D, + sync_params: &'a RpcSyncParams, + prog: &'a dyn Progress, + ) -> Result { let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?; let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?; @@ -325,10 +366,10 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { // If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently // cached. - if has_derivable && last_count < params.start_script_count { + if has_derivable && last_count < sync_params.start_script_count { let inner_err = MissingCachedScripts { last_count, - missing_count: params.start_script_count - last_count, + missing_count: sync_params.start_script_count - last_count, }; debug!("requesting more spks with: {:?}", inner_err); return Err(Error::MissingCachedScripts(inner_err)); @@ -359,7 +400,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { Ok(Self { db, - params, + params: sync_params, prog, ext_spks, int_spks, @@ -374,7 +415,12 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { /// Sync states of [BatchDatabase] and Core wallet. /// First we import all `scriptPubKey`s from database into core wallet - fn sync_with_core(&mut self, client: &Client, use_desc: bool) -> Result<&mut Self, Error> { + fn sync_with_core( + &mut self, + client: &Client, + import_params: &mut RpcImportParams, + use_desc: bool, + ) -> Result<&mut Self, Error> { // this tells Core wallet where to sync from for imported scripts let start_epoch = if self.params.force_start_time { self.params.start_time @@ -384,23 +430,55 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { .map_or(self.params.start_time, |st| st.block_time.timestamp) }; - // sync scriptPubKeys from Database to Core wallet - let scripts_iter = self.ext_spks.iter().chain(&self.int_spks); - if use_desc { - import_descriptors(client, start_epoch, scripts_iter)?; - } else { - import_multi(client, start_epoch, scripts_iter)?; + // sync scriptPubKeys from Database into Core wallet, starting from derivation indexes + // defined in `import_params` + let (scripts_count, scripts_iter) = { + let ext_spks = self + .ext_spks + .iter() + .skip(import_params.external_start_index); + let int_spks = self + .int_spks + .iter() + .skip(import_params.internal_start_index); + + let scripts_count = ext_spks.len() + int_spks.len(); + let scripts_iter = ext_spks.chain(int_spks); + println!("scripts count: {}", scripts_count); + + (scripts_count, scripts_iter) + }; + + if scripts_count > 0 { + if use_desc { + import_descriptors(client, start_epoch, scripts_iter)?; + } else { + import_multi(client, start_epoch, scripts_iter)?; + } } - // wait for Core wallet to rescan (TODO: maybe make this async) await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?; + // update import_params + import_params.external_start_index = self.ext_spks.len(); + import_params.internal_start_index = self.int_spks.len(); + // obtain iterator of pagenated `listtransactions` RPC calls const LIST_TX_PAGE_SIZE: usize = 100; // item count per page let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| { // filter out conflicting transactions - only accept transactions that are already // confirmed, or exists in mempool - item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok() + let confirmed = item.info.confirmations > 0; + let in_mempool = client.get_mempool_entry(&item.info.txid).is_ok(); + + let keep = confirmed || in_mempool; + if !keep { + warn!( + "transaction {} is skipped: confirmed={}, in_mempool={}", + item.info.txid, confirmed, in_mempool + ); + } + keep }); // iterate through chronological results of `listtransactions` @@ -865,6 +943,8 @@ impl BlockchainFactory for RpcBlockchainFactory { checksum ), sync_params: self.sync_params.clone(), + // TODO @evanlinjin: How can be set this individually for each build? + import_params: Default::default(), }) } } @@ -892,6 +972,7 @@ mod test { network: Network::Regtest, wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ), sync_params: None, + import_params: None, }; RpcBlockchain::from_config(&config).unwrap() } diff --git a/src/testutils/blockchain_tests.rs b/src/testutils/blockchain_tests.rs index a3d7c2b17..2c18ac7f3 100644 --- a/src/testutils/blockchain_tests.rs +++ b/src/testutils/blockchain_tests.rs @@ -755,6 +755,12 @@ macro_rules! bdk_blockchain_tests { blockchain.broadcast(&tx1).expect("broadcasting first"); blockchain.broadcast(&tx2).expect("broadcasting replacement"); + + // TODO @evanlinjin: Core's `listtransactions` RPC call does not return conflicting + // unconfirmed transactions (unless we re-import associated scriptPubKey/descriptor) + #[cfg(feature = "rpc")] + blockchain.replace_import_parameters(Default::default()); + receiver_wallet.sync(&blockchain, SyncOptions::default()).expect("syncing receiver"); assert_eq!(receiver_wallet.get_balance().expect("balance").untrusted_pending, 49_000, "should have received coins once and only once"); } From 83edbf0388f038f14b35e39115ccaa68dc09db28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 1 Sep 2022 17:10:37 +0800 Subject: [PATCH 4/4] Introduce pagenated import logic to `RpcBlockchain` * Add `RpcSyncParams::page_size` that restricts req/resp array count for various RPC calls. * Add `pagenated_import` function. --- src/blockchain/rpc.rs | 70 ++++++++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index 4c3538e63..eca63cd89 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -115,6 +115,8 @@ pub struct RpcSyncParams { pub force_start_time: bool, /// RPC poll rate (in seconds) to get state updates. pub poll_rate_sec: u64, + /// Page size for RPC calls (`importdescriptors`, `importmulti` and `listtransactions`). + pub page_size: usize, } impl Default for RpcSyncParams { @@ -123,7 +125,8 @@ impl Default for RpcSyncParams { start_script_count: 100, start_time: 0, force_start_time: false, - poll_rate_sec: 3, + poll_rate_sec: 2, + page_size: 200, } } } @@ -432,7 +435,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { // sync scriptPubKeys from Database into Core wallet, starting from derivation indexes // defined in `import_params` - let (scripts_count, scripts_iter) = { + let scripts_iter = { let ext_spks = self .ext_spks .iter() @@ -441,31 +444,25 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { .int_spks .iter() .skip(import_params.internal_start_index); - - let scripts_count = ext_spks.len() + int_spks.len(); - let scripts_iter = ext_spks.chain(int_spks); - println!("scripts count: {}", scripts_count); - - (scripts_count, scripts_iter) + ext_spks.chain(int_spks) }; - - if scripts_count > 0 { - if use_desc { - import_descriptors(client, start_epoch, scripts_iter)?; - } else { - import_multi(client, start_epoch, scripts_iter)?; - } - } - - await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?; + paginated_import( + client, + use_desc, + start_epoch, + self.params.poll_rate_sec, + self.params.page_size, + scripts_iter, + self.prog, + )?; + // await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?; // update import_params import_params.external_start_index = self.ext_spks.len(); import_params.internal_start_index = self.int_spks.len(); // obtain iterator of pagenated `listtransactions` RPC calls - const LIST_TX_PAGE_SIZE: usize = 100; // item count per page - let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| { + let tx_iter = list_transactions(client, self.params.page_size)?.filter(|item| { // filter out conflicting transactions - only accept transactions that are already // confirmed, or exists in mempool let confirmed = item.info.confirmations > 0; @@ -789,6 +786,39 @@ where Ok(()) } +fn paginated_import<'a, S>( + client: &Client, + use_desc: bool, + start_epoch: u64, + poll_rate_sec: u64, + page_size: usize, + scripts_iter: S, + progress: &dyn Progress, +) -> Result<(), Error> +where + S: Iterator + Clone, +{ + (0_usize..) + .step_by(page_size) + .map(|page_start| { + scripts_iter + .clone() + .skip(page_start) + .take(page_size) + .cloned() + .collect::>() + }) + .take_while(|scripts| !scripts.is_empty()) + .try_for_each(|scripts| { + if use_desc { + import_descriptors(client, start_epoch, scripts.iter())?; + } else { + import_multi(client, start_epoch, scripts.iter())?; + } + await_wallet_scan(client, poll_rate_sec, progress) + }) +} + /// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results /// in chronological order. ///