Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/blockchain/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl_from!(boxed rpc::RpcBlockchain, AnyBlockchain, Rpc, #[cfg(feature = "rpc")]
/// );
/// # }
/// ```
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)]
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnyBlockchainConfig {
#[cfg(feature = "electrum")]
Expand Down
4 changes: 2 additions & 2 deletions src/blockchain/compact_filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl WalletSync for CompactFiltersBlockchain {
}

/// Data to connect to a Bitcoin P2P peer
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct BitcoinPeerConfig {
/// Peer address such as 127.0.0.1:18333
pub address: String,
Expand All @@ -490,7 +490,7 @@ pub struct BitcoinPeerConfig {
}

/// Configuration for a [`CompactFiltersBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct CompactFiltersBlockchainConfig {
/// List of peers to try to connect to for asking headers and filters
pub peers: Vec<BitcoinPeerConfig>,
Expand Down
2 changes: 1 addition & 1 deletion src/blockchain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl<'a, 'b, D: Database> TxCache<'a, 'b, D> {
}

/// Configuration for an [`ElectrumBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct ElectrumBlockchainConfig {
/// URL of the Electrum server (such as ElectrumX, Esplora, BWT) may start with `ssl://` or `tcp://` and include a port
///
Expand Down
2 changes: 1 addition & 1 deletion src/blockchain/esplora/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl fmt::Display for EsploraError {
}

/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct EsploraBlockchainConfig {
/// Base URL of the esplora service
///
Expand Down
162 changes: 137 additions & 25 deletions src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//! network: bdk::bitcoin::Network::Testnet,
//! wallet_name: "wallet_name".to_string(),
//! sync_params: None,
//! import_params: None,
//! };
//! let blockchain = RpcBlockchain::from_config(&config);
//! ```
Expand All @@ -47,8 +48,9 @@ use bitcoincore_rpc::json::{
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
use bitcoincore_rpc::Auth as RpcAuth;
use bitcoincore_rpc::{Client, RpcApi};
use log::{debug, info};
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::path::PathBuf;
Expand All @@ -66,6 +68,8 @@ pub struct RpcBlockchain {
capabilities: HashSet<Capability>,
/// Sync parameters.
sync_params: RpcSyncParams,
/// Import parameters.
import_params: RefCell<RpcImportParams>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be safe I would replace this with an Arc<RpcImportParams>. Then you can use Arc::make_mut() whenever you want to replace it

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work as the WalletSync is NOT &mut self. We can do Arc<Mutex<RpcImportParams>>? But I think this is too much. I feel like the user can just handle thread-safety elsewhere?

}

impl Deref for RpcBlockchain {
Expand All @@ -77,7 +81,7 @@ impl Deref for RpcBlockchain {
}

/// RpcBlockchain configuration options
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct RpcConfig {
/// The bitcoin node url
pub url: String,
Expand All @@ -89,23 +93,30 @@ pub struct RpcConfig {
pub wallet_name: String,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
/// Initial `scriptPubKey` import parameters. [`RpcImportParams`] will be mutated in
/// [`RpcBlockchain`] for every sync to reflect what was imported. The updated params can be
/// obtained via [`RpcBlockchain::import_parameters`].
pub import_params: Option<RpcImportParams>,
}

/// Sync parameters for Bitcoin Core RPC.
///
/// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with
/// `scriptPubKey`s imported in the Bitcoin Core Wallet. These parameters are used for determining
/// how the `importdescriptors` RPC calls are to be made.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct RpcSyncParams {
/// The minimum number of scripts to scan for on initial sync.
/// The minimum number of scripts to scan for on the first sync.
pub start_script_count: usize,
/// Time in unix seconds in which initial sync will start scanning from (0 to start from genesis).
pub start_time: u64,
/// Forces every sync to use `start_time` as import timestamp.
/// Forces every sync to use [`crate::database::SyncTime`] as import timestamp. The default
/// behavior is to use the last `sync_time` as the import timestamp.
pub force_start_time: bool,
/// RPC poll rate (in seconds) to get state updates.
pub poll_rate_sec: u64,
/// Page size for RPC calls (`importdescriptors`, `importmulti` and `listtransactions`).
pub page_size: usize,
}

impl Default for RpcSyncParams {
Expand All @@ -114,11 +125,24 @@ impl Default for RpcSyncParams {
start_script_count: 100,
start_time: 0,
force_start_time: false,
poll_rate_sec: 3,
poll_rate_sec: 2,
page_size: 200,
}
}
}

/// `scriptPubKey` import parameters for Bitcoin Core RPC.
///
/// This defines which derivation index(es) to start importing from so that BDK can avoid
/// re-importing `scriptPubKey`s into the Core wallet.
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct RpcImportParams {
/// External derivation index to start import from.
pub external_start_index: usize,
/// Internal derivation index to start import from.
pub internal_start_index: usize,
}

/// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize]
/// To be removed once upstream equivalent is implementing Serialize (json serialization format
/// should be the same), see [rust-bitcoincore-rpc/pull/181](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181)
Expand Down Expand Up @@ -152,6 +176,18 @@ impl From<Auth> for RpcAuth {
}
}

impl RpcBlockchain {
/// Returns the current import parameters.
pub fn get_import_parameters(&self) -> RpcImportParams {
*self.import_params.borrow()
}

/// Replaces the import parameters and returns the old value.
pub fn replace_import_parameters(&self, params: RpcImportParams) -> RpcImportParams {
self.import_params.replace(params)
}
}

impl Blockchain for RpcBlockchain {
fn get_capabilities(&self) -> HashSet<Capability> {
self.capabilities.clone()
Expand Down Expand Up @@ -197,7 +233,11 @@ impl WalletSync for RpcBlockchain {
D: BatchDatabase,
{
let batch = DbState::new(db, &self.sync_params, &*prog)?
.sync_with_core(&self.client, self.is_descriptors)?
.sync_with_core(
&self.client,
&mut *self.import_params.borrow_mut(),
self.is_descriptors,
)?
.as_db_batch()?;

db.commit_batch(batch)
Expand Down Expand Up @@ -273,6 +313,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
capabilities,
is_descriptors,
sync_params: config.sync_params.clone().unwrap_or_default(),
import_params: RefCell::new(config.import_params.unwrap_or_default()),
})
}
}
Expand Down Expand Up @@ -313,7 +354,11 @@ struct DbState<'a, D> {

impl<'a, D: BatchDatabase> DbState<'a, D> {
/// Obtain [DbState] from [crate::database::Database].
fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result<Self, Error> {
fn new(
db: &'a D,
sync_params: &'a RpcSyncParams,
prog: &'a dyn Progress,
) -> Result<Self, Error> {
let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?;
let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?;

Expand All @@ -324,10 +369,10 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {

// If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently
// cached.
if has_derivable && last_count < params.start_script_count {
if has_derivable && last_count < sync_params.start_script_count {
let inner_err = MissingCachedScripts {
last_count,
missing_count: params.start_script_count - last_count,
missing_count: sync_params.start_script_count - last_count,
};
debug!("requesting more spks with: {:?}", inner_err);
return Err(Error::MissingCachedScripts(inner_err));
Expand Down Expand Up @@ -358,7 +403,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {

Ok(Self {
db,
params,
params: sync_params,
prog,
ext_spks,
int_spks,
Expand All @@ -373,7 +418,12 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {

/// Sync states of [BatchDatabase] and Core wallet.
/// First we import all `scriptPubKey`s from database into core wallet
fn sync_with_core(&mut self, client: &Client, is_descriptor: bool) -> Result<&mut Self, Error> {
fn sync_with_core(
&mut self,
client: &Client,
import_params: &mut RpcImportParams,
use_desc: bool,
) -> Result<&mut Self, Error> {
// this tells Core wallet where to sync from for imported scripts
let start_epoch = if self.params.force_start_time {
self.params.start_time
Expand All @@ -383,23 +433,49 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
.map_or(self.params.start_time, |st| st.block_time.timestamp)
};

// sync scriptPubKeys from Database to Core wallet
let scripts_iter = self.ext_spks.iter().chain(&self.int_spks);
if is_descriptor {
import_descriptors(client, start_epoch, scripts_iter)?;
} else {
import_multi(client, start_epoch, scripts_iter)?;
}

// wait for Core wallet to rescan (TODO: maybe make this async)
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
// sync scriptPubKeys from Database into Core wallet, starting from derivation indexes
// defined in `import_params`
let scripts_iter = {
let ext_spks = self
.ext_spks
.iter()
.skip(import_params.external_start_index);
let int_spks = self
.int_spks
.iter()
.skip(import_params.internal_start_index);
ext_spks.chain(int_spks)
};
paginated_import(
client,
use_desc,
start_epoch,
self.params.poll_rate_sec,
self.params.page_size,
scripts_iter,
self.prog,
)?;
// await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;

// update import_params
import_params.external_start_index = self.ext_spks.len();
import_params.internal_start_index = self.int_spks.len();

// obtain iterator of pagenated `listtransactions` RPC calls
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
let tx_iter = list_transactions(client, self.params.page_size)?.filter(|item| {
// filter out conflicting transactions - only accept transactions that are already
// confirmed, or exists in mempool
item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
let confirmed = item.info.confirmations > 0;
let in_mempool = client.get_mempool_entry(&item.info.txid).is_ok();

let keep = confirmed || in_mempool;
if !keep {
warn!(
"transaction {} is skipped: confirmed={}, in_mempool={}",
item.info.txid, confirmed, in_mempool
);
}
keep
});

// iterate through chronological results of `listtransactions`
Expand Down Expand Up @@ -710,6 +786,39 @@ where
Ok(())
}

fn paginated_import<'a, S>(
client: &Client,
use_desc: bool,
start_epoch: u64,
poll_rate_sec: u64,
page_size: usize,
scripts_iter: S,
progress: &dyn Progress,
) -> Result<(), Error>
where
S: Iterator<Item = &'a Script> + Clone,
{
(0_usize..)
Comment thread
evanlinjin marked this conversation as resolved.
.step_by(page_size)
.map(|page_start| {
scripts_iter
.clone()
Comment thread
evanlinjin marked this conversation as resolved.
.skip(page_start)
.take(page_size)
.cloned()
.collect::<Vec<_>>()
})
.take_while(|scripts| !scripts.is_empty())
.try_for_each(|scripts| {
if use_desc {
import_descriptors(client, start_epoch, scripts.iter())?;
} else {
import_multi(client, start_epoch, scripts.iter())?;
}
await_wallet_scan(client, poll_rate_sec, progress)
})
}

/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
/// in chronological order.
///
Expand Down Expand Up @@ -864,6 +973,8 @@ impl BlockchainFactory for RpcBlockchainFactory {
checksum
),
sync_params: self.sync_params.clone(),
// TODO @evanlinjin: How can be set this individually for each build?
import_params: Default::default(),
})
}
}
Expand Down Expand Up @@ -891,6 +1002,7 @@ mod test {
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
sync_params: None,
import_params: None,
};
RpcBlockchain::from_config(&config).unwrap()
}
Expand Down
6 changes: 6 additions & 0 deletions src/testutils/blockchain_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,12 @@ macro_rules! bdk_blockchain_tests {

blockchain.broadcast(&tx1).expect("broadcasting first");
blockchain.broadcast(&tx2).expect("broadcasting replacement");

// TODO @evanlinjin: Core's `listtransactions` RPC call does not return conflicting
// unconfirmed transactions (unless we re-import associated scriptPubKey/descriptor)
#[cfg(feature = "rpc")]
blockchain.replace_import_parameters(Default::default());

receiver_wallet.sync(&blockchain, SyncOptions::default()).expect("syncing receiver");
assert_eq!(receiver_wallet.get_balance().expect("balance").untrusted_pending, 49_000, "should have received coins once and only once");
}
Expand Down
4 changes: 2 additions & 2 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub struct LocalUtxo {
}

/// A [`Utxo`] with its `satisfaction_weight`.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WeightedUtxo {
/// The weight of the witness data and `scriptSig` expressed in [weight units]. This is used to
/// properly maintain the feerate when adding this input to a transaction during coin selection.
Expand All @@ -177,7 +177,7 @@ pub struct WeightedUtxo {
pub utxo: Utxo,
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
/// An unspent transaction output (UTXO).
pub enum Utxo {
/// A UTXO owned by the local wallet.
Expand Down
2 changes: 1 addition & 1 deletion src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub enum AddressIndex {

/// A derived address and the index it was found at
/// For convenience this automatically derefs to `Address`
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Eq)]
pub struct AddressInfo {
/// Child index of this address
pub index: u32,
Expand Down
2 changes: 1 addition & 1 deletion src/wallet/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ pub struct SignOptions {
}

/// Customize which taproot script-path leaves the signer should sign.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TapLeavesOptions {
/// The signer will sign all the leaves it has a key for.
All,
Expand Down