Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/blockchain/esplora/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Esplora by way of `reqwest` HTTP client.

use std::collections::{HashMap, HashSet};
use std::ops::Deref;

use bitcoin::consensus::{deserialize, serialize};
use bitcoin::hashes::hex::{FromHex, ToHex};
Expand Down Expand Up @@ -46,7 +47,7 @@ struct UrlClient {
/// See the [`blockchain::esplora`](crate::blockchain::esplora) module for a usage example.
#[derive(Debug)]
pub struct EsploraBlockchain {
url_client: UrlClient,
pub url_client: UrlClient,
stop_gap: usize,
}

Expand Down Expand Up @@ -101,6 +102,14 @@ impl Blockchain for EsploraBlockchain {
}
}

impl Deref for EsploraBlockchain {
type Target = UrlClient;

fn deref(&self) -> &Self::Target {
&self.url_client
}
}

impl StatelessBlockchain for EsploraBlockchain {}

#[maybe_async]
Expand Down
11 changes: 10 additions & 1 deletion src/blockchain/esplora/ureq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use std::collections::{HashMap, HashSet};
use std::io;
use std::io::Read;
use std::ops::Deref;
use std::time::Duration;

#[allow(unused_imports)]
Expand Down Expand Up @@ -45,7 +46,7 @@ struct UrlClient {
/// See the [`blockchain::esplora`](crate::blockchain::esplora) module for a usage example.
#[derive(Debug)]
pub struct EsploraBlockchain {
url_client: UrlClient,
pub url_client: UrlClient,
stop_gap: usize,
concurrency: u8,
}
Expand Down Expand Up @@ -76,6 +77,14 @@ impl EsploraBlockchain {
}
}

impl Deref for EsploraBlockchain {
type Target = UrlClient;

fn deref(&self) -> &Self::Target {
&self.url_client
}
}

impl Blockchain for EsploraBlockchain {
fn get_capabilities(&self) -> HashSet<Capability> {
vec![
Expand Down
217 changes: 130 additions & 87 deletions src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,27 +201,26 @@ impl ConfigurableBlockchain for RpcBlockchain {
/// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum
/// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let wallet_name = config.wallet_name.clone();
let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
debug!("connecting to {} auth:{:?}", wallet_url, config.auth);
let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);

let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let rpc_version = client.version()?;

let loaded_wallets = client.list_wallets()?;
if loaded_wallets.contains(&wallet_name) {
debug!("wallet already loaded {:?}", wallet_name);
} else if list_wallet_dir(&client)?.contains(&wallet_name) {
client.load_wallet(&wallet_name)?;
debug!("wallet loaded {:?}", wallet_name);
info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);

if client.list_wallets()?.contains(&config.wallet_name) {
info!("wallet already loaded: {}", config.wallet_name);
} else if list_wallet_dir(&client)?.contains(&config.wallet_name) {
client.load_wallet(&config.wallet_name)?;
info!("wallet loaded: {}", config.wallet_name);
} else {
// pre-0.21 use legacy wallets
if rpc_version < 210_000 {
client.create_wallet(&wallet_name, Some(true), None, None, None)?;
client.create_wallet(&config.wallet_name, Some(true), None, None, None)?;
} else {
// TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed
let args = [
Value::String(wallet_name.clone()),
Value::String(config.wallet_name.clone()),
Value::Bool(true),
Value::Bool(false),
Value::Null,
Expand All @@ -231,7 +230,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
let _: Value = client.call("createwallet", &args)?;
}

debug!("wallet created {:?}", wallet_name);
info!("wallet created: {}", config.wallet_name);
}

let is_descriptors = is_wallet_descriptor(&client)?;
Expand Down Expand Up @@ -386,9 +385,16 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
// wait for Core wallet to rescan (TODO: maybe make this async)
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;

// loop through results of Core RPC method `listtransactions`
for tx_res in CoreTxIter::new(client, 100) {
let tx_res = tx_res?;
// obtain iterator of pagenated `listtransactions` RPC calls
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
// filter out conflicting transactions - only accept transactions that are already
// confirmed, or exists in mempool
item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
});

// iterate through chronological results of `listtransactions`
for tx_res in tx_iter {
let mut updated = false;

let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
Expand Down Expand Up @@ -695,81 +701,53 @@ where
Ok(())
}

/// Iterates through results of multiple `listtransactions` calls.
struct CoreTxIter<'a> {
client: &'a Client,
/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
/// in chronological order.
///
/// `page_size` cannot be less than 1 and cannot be greater than 1000.
fn list_transactions(
client: &Client,
page_size: usize,
page_index: usize,

stack: Vec<ListTransactionResult>,
done: bool,
}

impl<'a> CoreTxIter<'a> {
fn new(client: &'a Client, mut page_size: usize) -> Self {
if page_size > 1000 {
page_size = 1000;
}

Self {
client,
page_size,
page_index: 0,
stack: Vec::with_capacity(page_size),
done: false,
}
}

/// We want to filter out conflicting transactions.
/// Only accept transactions that are already confirmed, or existing in mempool.
fn keep_tx(&self, item: &ListTransactionResult) -> bool {
item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
) -> Result<impl Iterator<Item = ListTransactionResult>, Error> {
if !(1..=1000).contains(&page_size) {
return Err(Error::Generic(format!(
"Core RPC method `listtransactions` must have `page_size` in range [1 to 1000]: got {}",
page_size
)));
}
}

impl<'a> Iterator for CoreTxIter<'a> {
type Item = Result<ListTransactionResult, Error>;

fn next(&mut self) -> Option<Self::Item> {
loop {
if self.done {
return None;
}

if let Some(item) = self.stack.pop() {
if self.keep_tx(&item) {
return Some(Ok(item));
}
}

let res = self
.client
.list_transactions(
None,
Some(self.page_size),
Some(self.page_size * self.page_index),
Some(true),
)
.map_err(Error::Rpc);

self.page_index += 1;

let list = match res {
Ok(list) => list,
Err(err) => {
self.done = true;
return Some(Err(err));
}
};

if list.is_empty() {
self.done = true;
return None;
// `.take_while` helper to obtain the first error (TODO: remove when we can use `.map_while`)
let mut got_err = false;

// obtain results in batches (of `page_size`)
let nested_list = (0_usize..)
.map(|page_index| {
client.list_transactions(
None,
Some(page_size),
Some(page_size * page_index),
Some(true),
)
})
// take until returned rpc call is empty or until error
// TODO: replace with the following when MSRV is 1.57.0:
// `.map_while(|res| res.map(|l| if l.is_empty() { None } else { Some(l) }).transpose())`
.take_while(|res| {
if got_err || matches!(res, Ok(list) if list.is_empty()) {
// break if last iteration was an error, or if the current result is empty
false
} else {
// record whether result is error or not
got_err = res.is_err();
// continue on non-empty result or first error
true
}
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::Rpc)?;

self.stack = list;
}
}
// reverse here to have txs in chronological order
Ok(nested_list.into_iter().rev().flatten())
}

fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
Expand Down Expand Up @@ -885,10 +863,16 @@ impl BlockchainFactory for RpcBlockchainFactory {
#[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))]
mod test {
use super::*;
use crate::testutils::blockchain_tests::TestClient;
use crate::{
descriptor::{into_wallet_descriptor_checked, AsDerived},
testutils::blockchain_tests::TestClient,
wallet::utils::SecpCtx,
};

use bitcoin::Network;
use bitcoin::{Address, Network};
use bitcoincore_rpc::RpcApi;
use log::LevelFilter;
use miniscript::DescriptorTrait;

crate::bdk_blockchain_tests! {
fn test_instance(test_client: &TestClient) -> RpcBlockchain {
Expand Down Expand Up @@ -942,4 +926,63 @@ mod test {
"prefix-bbbbbb"
);
}

/// This test ensures that [list_transactions] always iterates through transactions in
/// chronological order, independent of the `page_size`.
#[test]
fn test_list_transactions() {
let _ = env_logger::builder()
.filter_level(LevelFilter::Info)
.default_format()
.try_init();

const DESC: &'static str = "wpkh(tpubD9zMNV59kgbWgKK55SHJugmKKSt6wQXczxpucGYqNKwGmJp1x7Ar2nrLUXYHDdCctXmyDoSCn2JVMzMUDfib3FaDhwxCEMUELoq19xLSx66/*)";
const AMOUNT_PER_TX: u64 = 10_000;
const TX_COUNT: u32 = 50;

let secp = SecpCtx::default();
let network = Network::Regtest;
let (desc, ..) = into_wallet_descriptor_checked(DESC, &secp, network).unwrap();

let (mut test_client, factory) = get_factory();
let bc = factory.build("itertest", None).unwrap();

// generate scripts (1 tx per script)
let scripts = (0..TX_COUNT)
.map(|index| desc.as_derived(index, &secp).script_pubkey())
.collect::<Vec<_>>();

// import scripts and wait
if bc.is_descriptors {
import_descriptors(&bc.client, 0, scripts.iter()).unwrap();
} else {
import_multi(&bc.client, 0, scripts.iter()).unwrap();
}
await_wallet_scan(&bc.client, 2, &NoopProgress).unwrap();

// create and broadcast txs
let expected_txids = scripts
.iter()
.map(|script| {
let addr = Address::from_script(script, network).unwrap();
let txid =
test_client.receive(testutils! { @tx ( (@addr addr) => AMOUNT_PER_TX ) });
test_client.generate(1, None);
txid
})
.collect::<Vec<_>>();

// iterate through different page sizes - should always return txs in chronological order
[1000, 1, 2, 6, 25, 49, 50].iter().for_each(|page_size| {
println!("trying with page_size: {}", page_size);

let txids = list_transactions(&bc.client, *page_size)
.unwrap()
.map(|res| res.info.txid)
.collect::<Vec<_>>();

assert_eq!(txids.len(), expected_txids.len());
assert_eq!(txids, expected_txids);
});
}
}