diff --git a/Cargo.toml b/Cargo.toml index 18fcfef62..752b36447 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ bitcoin = { version = "0.29.1", features = ["serde", "base64", "rand"] } serde = { version = "^1.0", features = ["derive"] } serde_json = { version = "^1.0" } rand = "^0.8" +thiserror = "1.0" # Optional dependencies sled = { version = "0.34", optional = true } @@ -39,6 +40,9 @@ bitcoinconsensus = { version = "0.19.0-3", optional = true } # Needed by bdk_blockchain_tests macro and the `rpc` feature bitcoincore-rpc = { version = "0.16", optional = true } +# Nakamoto crates for the CBF blockchain +# nakamoto = { git = "https://github.com/cloudhead/nakamoto.git", rev = "14902d3d5cc9442cdda82419f0f24277d86b243b", optional = true } +nakamoto = { version = "0.4.0", optional = true } # Platform-specific dependencies [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1", features = ["rt", "macros"] } @@ -55,7 +59,7 @@ verify = ["bitcoinconsensus"] default = ["key-value-db", "electrum"] sqlite = ["rusqlite", "ahash"] sqlite-bundled = ["sqlite", "rusqlite/bundled"] -compact_filters = ["rocksdb", "socks", "cc"] +compact_filters = ["nakamoto"] key-value-db = ["sled"] all-keys = ["keys-bip39"] keys-bip39 = ["bip39"] @@ -93,6 +97,7 @@ reqwest-default-tls = ["esplora-client/async-https"] test-blockchains = ["bitcoincore-rpc", "electrum-client"] test-electrum = ["electrum", "electrsd/electrs_0_8_10", "electrsd/bitcoind_22_0", "test-blockchains"] test-rpc = ["rpc", "electrsd/electrs_0_8_10", "electrsd/bitcoind_22_0", "test-blockchains"] +test-cbf = ["compact_filters", "electrsd/electrs_0_8_10", "electrsd/bitcoind_22_0", "test-blockchains"] test-rpc-legacy = ["rpc", "electrsd/electrs_0_8_10", "electrsd/bitcoind_0_20_0", "test-blockchains"] test-esplora = ["electrsd/legacy", "electrsd/esplora_a33e97e1", "electrsd/bitcoind_22_0", "test-blockchains"] test-md-docs = ["electrum"] diff --git a/examples/compact_filters_balance.rs 
b/examples/compact_filters_balance.rs index ce875b4d5..cb534d0f0 100644 --- a/examples/compact_filters_balance.rs +++ b/examples/compact_filters_balance.rs @@ -1,41 +1,44 @@ -// Bitcoin Dev Kit -// Written in 2020 by Alekos Filini -// -// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers -// -// This file is licensed under the Apache License, Version 2.0 or the MIT license -// , at your option. -// You may not use this file except in accordance with one or both of these -// licenses. +// // Bitcoin Dev Kit +// // Written in 2020 by Alekos Filini +// // +// // Copyright (c) 2020-2021 Bitcoin Dev Kit Developers +// // +// // This file is licensed under the Apache License, Version 2.0 or the MIT license +// // , at your option. +// // You may not use this file except in accordance with one or both of these +// // licenses. -use bdk::blockchain::compact_filters::*; -use bdk::database::MemoryDatabase; -use bdk::*; -use bitcoin::*; -use blockchain::compact_filters::CompactFiltersBlockchain; -use blockchain::compact_filters::CompactFiltersError; -use log::info; -use std::sync::Arc; +// use bdk::blockchain::compact_filters::*; +// use bdk::database::MemoryDatabase; +// use bdk::*; +// use bitcoin::*; +// use blockchain::compact_filters::CompactFiltersBlockchain; +// use blockchain::compact_filters::CompactFiltersError; +// use log::info; +// use std::sync::Arc; -/// This will return wallet balance using compact filters -/// Requires a synced local bitcoin node 0.21 running on testnet with blockfilterindex=1 and peerblockfilters=1 -fn main() -> Result<(), CompactFiltersError> { - env_logger::init(); - info!("start"); +// /// This will return wallet balance using compact filters +// /// Requires a synced local bitcoin node 0.21 running on testnet with blockfilterindex=1 and peerblockfilters=1 +// fn main() -> Result<(), CompactFiltersError> { +// env_logger::init(); +// info!("start"); - let num_threads = 4; - let mempool = Arc::new(Mempool::default()); - let peers = 
(0..num_threads) - .map(|_| Peer::connect("localhost:18333", Arc::clone(&mempool), Network::Testnet)) - .collect::>()?; - let blockchain = CompactFiltersBlockchain::new(peers, "./wallet-filters", Some(500_000))?; - info!("done {:?}", blockchain); - let descriptor = "wpkh(tpubD6NzVbkrYhZ4X2yy78HWrr1M9NT8dKeWfzNiQqDdMqqa9UmmGztGGz6TaLFGsLfdft5iu32gxq1T4eMNxExNNWzVCpf9Y6JZi5TnqoC9wJq/*)"; +// let num_threads = 4; +// let mempool = Arc::new(Mempool::default()); +// let peers = (0..num_threads) +// .map(|_| Peer::connect("localhost:18333", Arc::clone(&mempool), Network::Testnet)) +// .collect::>()?; +// let blockchain = CompactFiltersBlockchain::new(peers, "./wallet-filters", Some(500_000))?; +// info!("done {:?}", blockchain); +// let descriptor = "wpkh(tpubD6NzVbkrYhZ4X2yy78HWrr1M9NT8dKeWfzNiQqDdMqqa9UmmGztGGz6TaLFGsLfdft5iu32gxq1T4eMNxExNNWzVCpf9Y6JZi5TnqoC9wJq/*)"; - let database = MemoryDatabase::default(); - let wallet = Arc::new(Wallet::new(descriptor, None, Network::Testnet, database).unwrap()); - wallet.sync(&blockchain, SyncOptions::default()).unwrap(); - info!("balance: {}", wallet.get_balance()?); - Ok(()) -} +// let database = MemoryDatabase::default(); +// let wallet = Arc::new(Wallet::new(descriptor, None, Network::Testnet, database).unwrap()); +// wallet.sync(&blockchain, SyncOptions::default()).unwrap(); +// info!("balance: {}", wallet.get_balance()?); +// Ok(()) +// } + +// TODO make a nakamoto example +fn main() {} diff --git a/src/blockchain/any.rs b/src/blockchain/any.rs index 38b5f117f..082f9e38a 100644 --- a/src/blockchain/any.rs +++ b/src/blockchain/any.rs @@ -84,7 +84,7 @@ pub enum AnyBlockchain { #[cfg(feature = "compact_filters")] #[cfg_attr(docsrs, doc(cfg(feature = "compact_filters")))] /// Compact filters client - CompactFilters(Box), + CompactFilters(Box), #[cfg(feature = "rpc")] #[cfg_attr(docsrs, doc(cfg(feature = "rpc")))] /// RPC client @@ -158,7 +158,7 @@ impl WalletSync for AnyBlockchain { impl_from!(boxed 
electrum::ElectrumBlockchain, AnyBlockchain, Electrum, #[cfg(feature = "electrum")]); impl_from!(boxed esplora::EsploraBlockchain, AnyBlockchain, Esplora, #[cfg(feature = "esplora")]); -impl_from!(boxed compact_filters::CompactFiltersBlockchain, AnyBlockchain, CompactFilters, #[cfg(feature = "compact_filters")]); +impl_from!(boxed compact_filters::nakamoto::CbfBlockchain, AnyBlockchain, CompactFilters, #[cfg(feature = "compact_filters")]); impl_from!(boxed rpc::RpcBlockchain, AnyBlockchain, Rpc, #[cfg(feature = "rpc")]); /// Type that can contain any of the blockchain configurations defined by the library @@ -210,7 +210,7 @@ pub enum AnyBlockchainConfig { #[cfg(feature = "compact_filters")] #[cfg_attr(docsrs, doc(cfg(feature = "compact_filters")))] /// Compact filters client - CompactFilters(compact_filters::CompactFiltersBlockchainConfig), + CompactFilters(compact_filters::nakamoto::CbfBlockchainConfig), #[cfg(feature = "rpc")] #[cfg_attr(docsrs, doc(cfg(feature = "rpc")))] /// RPC client configuration @@ -232,7 +232,7 @@ impl ConfigurableBlockchain for AnyBlockchain { } #[cfg(feature = "compact_filters")] AnyBlockchainConfig::CompactFilters(inner) => AnyBlockchain::CompactFilters(Box::new( - compact_filters::CompactFiltersBlockchain::from_config(inner)?, + compact_filters::nakamoto::CbfBlockchain::from_config(inner)?, )), #[cfg(feature = "rpc")] AnyBlockchainConfig::Rpc(inner) => { @@ -244,5 +244,5 @@ impl ConfigurableBlockchain for AnyBlockchain { impl_from!(electrum::ElectrumBlockchainConfig, AnyBlockchainConfig, Electrum, #[cfg(feature = "electrum")]); impl_from!(esplora::EsploraBlockchainConfig, AnyBlockchainConfig, Esplora, #[cfg(feature = "esplora")]); -impl_from!(compact_filters::CompactFiltersBlockchainConfig, AnyBlockchainConfig, CompactFilters, #[cfg(feature = "compact_filters")]); +impl_from!(compact_filters::nakamoto::CbfBlockchainConfig, AnyBlockchainConfig, CompactFilters, #[cfg(feature = "compact_filters")]); impl_from!(rpc::RpcConfig, 
AnyBlockchainConfig, Rpc, #[cfg(feature = "rpc")]); diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs index 7845d513d..02f52c993 100644 --- a/src/blockchain/compact_filters/mod.rs +++ b/src/blockchain/compact_filters/mod.rs @@ -26,593 +26,595 @@ //! //! This is an **EXPERIMENTAL** feature, API and other major changes are expected. //! -//! ## Example -//! -//! ```no_run -//! # use std::sync::Arc; -//! # use bitcoin::*; -//! # use bdk::*; -//! # use bdk::blockchain::compact_filters::*; -//! let num_threads = 4; -//! -//! let mempool = Arc::new(Mempool::default()); -//! let peers = (0..num_threads) -//! .map(|_| { -//! Peer::connect( -//! "btcd-mainnet.lightning.computer:8333", -//! Arc::clone(&mempool), -//! Network::Bitcoin, -//! ) -//! }) -//! .collect::>()?; -//! let blockchain = CompactFiltersBlockchain::new(peers, "./wallet-filters", Some(500_000))?; -//! # Ok::<(), CompactFiltersError>(()) -//! ``` - -use std::collections::HashSet; -use std::fmt; -use std::ops::DerefMut; -use std::path::Path; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; - -#[allow(unused_imports)] -use log::{debug, error, info, trace}; - -use bitcoin::network::message_blockdata::Inventory; -use bitcoin::{Network, OutPoint, Transaction, Txid}; - -use rocksdb::{Options, SliceTransform, DB}; - -mod peer; -mod store; -mod sync; - -use crate::blockchain::*; -use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; -use crate::error::Error; -use crate::types::{KeychainKind, LocalUtxo, TransactionDetails}; -use crate::{BlockTime, FeeRate}; - -use peer::*; -use store::*; -use sync::*; - -pub use peer::{Mempool, Peer}; - -const SYNC_HEADERS_COST: f32 = 1.0; -const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0; -const PROCESS_BLOCKS_COST: f32 = 20_000.0; - -/// Structure implementing the required blockchain traits -/// -/// ## Example -/// See the [`blockchain::compact_filters`](crate::blockchain::compact_filters) module 
for a usage example. -#[derive(Debug)] -pub struct CompactFiltersBlockchain { - peers: Vec>, - headers: Arc>, - skip_blocks: Option, -} - -impl CompactFiltersBlockchain { - /// Construct a new instance given a list of peers, a path to store headers and block - /// filters downloaded during the sync and optionally a number of blocks to ignore starting - /// from the genesis while scanning for the wallet's outputs. - /// - /// For each [`Peer`] specified a new thread will be spawned to download and verify the filters - /// in parallel. It's currently recommended to only connect to a single peer to avoid - /// inconsistencies in the data returned, optionally with multiple connections in parallel to - /// speed-up the sync process. - pub fn new>( - peers: Vec, - storage_dir: P, - skip_blocks: Option, - ) -> Result { - if peers.is_empty() { - return Err(CompactFiltersError::NoPeers); - } - - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16)); - - let network = peers[0].get_network(); - - let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or_else(|_| vec!["default".to_string()]); - let db = DB::open_cf(&opts, &storage_dir, &cfs)?; - let headers = Arc::new(ChainStore::new(db, network)?); - - // try to recover partial snapshots - for cf_name in &cfs { - if !cf_name.starts_with("_headers:") { - continue; - } - - info!("Trying to recover: {:?}", cf_name); - headers.recover_snapshot(cf_name)?; - } - - Ok(CompactFiltersBlockchain { - peers: peers.into_iter().map(Arc::new).collect(), - headers, - skip_blocks, - }) - } - - /// Process a transaction by looking for inputs that spend from a UTXO in the database or - /// outputs that send funds to a know script_pubkey. 
- fn process_tx( - &self, - database: &mut D, - tx: &Transaction, - height: Option, - timestamp: Option, - internal_max_deriv: &mut Option, - external_max_deriv: &mut Option, - ) -> Result<(), Error> { - let mut updates = database.begin_batch(); - - let mut incoming: u64 = 0; - let mut outgoing: u64 = 0; - - let mut inputs_sum: u64 = 0; - let mut outputs_sum: u64 = 0; - - // look for our own inputs - for (i, input) in tx.input.iter().enumerate() { - if let Some(previous_output) = database.get_previous_output(&input.previous_output)? { - inputs_sum += previous_output.value; - - // this output is ours, we have a path to derive it - if let Some((keychain, _)) = - database.get_path_from_script_pubkey(&previous_output.script_pubkey)? - { - outgoing += previous_output.value; - - debug!("{} input #{} is mine, setting utxo as spent", tx.txid(), i); - updates.set_utxo(&LocalUtxo { - outpoint: input.previous_output, - txout: previous_output.clone(), - keychain, - is_spent: true, - })?; - } - } - } - - for (i, output) in tx.output.iter().enumerate() { - // to compute the fees later - outputs_sum += output.value; - - // this output is ours, we have a path to derive it - if let Some((keychain, child)) = - database.get_path_from_script_pubkey(&output.script_pubkey)? 
- { - debug!("{} output #{} is mine, adding utxo", tx.txid(), i); - updates.set_utxo(&LocalUtxo { - outpoint: OutPoint::new(tx.txid(), i as u32), - txout: output.clone(), - keychain, - is_spent: false, - })?; - incoming += output.value; - - if keychain == KeychainKind::Internal - && (internal_max_deriv.is_none() || child > internal_max_deriv.unwrap_or(0)) - { - *internal_max_deriv = Some(child); - } else if keychain == KeychainKind::External - && (external_max_deriv.is_none() || child > external_max_deriv.unwrap_or(0)) - { - *external_max_deriv = Some(child); - } - } - } - - if incoming > 0 || outgoing > 0 { - let tx = TransactionDetails { - txid: tx.txid(), - transaction: Some(tx.clone()), - received: incoming, - sent: outgoing, - confirmation_time: BlockTime::new(height, timestamp), - fee: Some(inputs_sum.saturating_sub(outputs_sum)), - }; - - info!("Saving tx {}", tx.txid); - updates.set_tx(&tx)?; - } - - database.commit_batch(updates)?; - - Ok(()) - } -} - -impl Blockchain for CompactFiltersBlockchain { - fn get_capabilities(&self) -> HashSet { - vec![Capability::FullHistory].into_iter().collect() - } - - fn broadcast(&self, tx: &Transaction) -> Result<(), Error> { - self.peers[0].broadcast_tx(tx.clone())?; - - Ok(()) - } - - fn estimate_fee(&self, _target: usize) -> Result { - // TODO - Ok(FeeRate::default()) - } -} - -impl GetHeight for CompactFiltersBlockchain { - fn get_height(&self) -> Result { - Ok(self.headers.get_height()? as u32) - } -} - -impl GetTx for CompactFiltersBlockchain { - fn get_tx(&self, txid: &Txid) -> Result, Error> { - Ok(self.peers[0] - .get_mempool() - .get_tx(&Inventory::Transaction(*txid))) - } -} - -impl GetBlockHash for CompactFiltersBlockchain { - fn get_block_hash(&self, height: u64) -> Result { - self.headers - .get_block_hash(height as usize)? 
- .ok_or(Error::CompactFilters( - CompactFiltersError::BlockHashNotFound, - )) - } -} - -impl WalletSync for CompactFiltersBlockchain { - #[allow(clippy::mutex_atomic)] // Mutex is easier to understand than a CAS loop. - fn wallet_setup( - &self, - database: &RefCell, - progress_update: Box, - ) -> Result<(), Error> { - let first_peer = &self.peers[0]; - - let skip_blocks = self.skip_blocks.unwrap_or(0); - - let cf_sync = Arc::new(CfSync::new(Arc::clone(&self.headers), skip_blocks, 0x00)?); - - let initial_height = self.headers.get_height()?; - let total_bundles = (first_peer.get_version().start_height as usize) - .checked_sub(skip_blocks) - .map(|x| x / 1000) - .unwrap_or(0) - + 1; - let expected_bundles_to_sync = total_bundles.saturating_sub(cf_sync.pruned_bundles()?); - - let headers_cost = (first_peer.get_version().start_height as usize) - .saturating_sub(initial_height) as f32 - * SYNC_HEADERS_COST; - let filters_cost = expected_bundles_to_sync as f32 * SYNC_FILTERS_COST; - - let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST; - - if let Some(snapshot) = sync::sync_headers( - Arc::clone(first_peer), - Arc::clone(&self.headers), - |new_height| { - let local_headers_cost = - new_height.saturating_sub(initial_height) as f32 * SYNC_HEADERS_COST; - progress_update.update( - local_headers_cost / total_cost * 100.0, - Some(format!("Synced headers to {}", new_height)), - ) - }, - )? { - if snapshot.work()? > self.headers.work()? { - info!("Applying snapshot with work: {}", snapshot.work()?); - self.headers.apply_snapshot(snapshot)?; - } - } - - let synced_height = self.headers.get_height()?; - let buried_height = synced_height.saturating_sub(sync::BURIED_CONFIRMATIONS); - info!("Synced headers to height: {}", synced_height); - - cf_sync.prepare_sync(Arc::clone(first_peer))?; - - let mut database = database.borrow_mut(); - let database = database.deref_mut(); - - let all_scripts = Arc::new( - database - .iter_script_pubkeys(None)? 
- .into_iter() - .map(|s| s.to_bytes()) - .collect::>(), - ); - - #[allow(clippy::mutex_atomic)] - let last_synced_block = Arc::new(Mutex::new(synced_height)); - - let synced_bundles = Arc::new(AtomicUsize::new(0)); - let progress_update = Arc::new(Mutex::new(progress_update)); - - let mut threads = Vec::with_capacity(self.peers.len()); - for peer in &self.peers { - let cf_sync = Arc::clone(&cf_sync); - let peer = Arc::clone(peer); - let headers = Arc::clone(&self.headers); - let all_scripts = Arc::clone(&all_scripts); - let last_synced_block = Arc::clone(&last_synced_block); - let progress_update = Arc::clone(&progress_update); - let synced_bundles = Arc::clone(&synced_bundles); - - let thread = std::thread::spawn(move || { - cf_sync.capture_thread_for_sync( - peer, - |block_hash, filter| { - if !filter - .match_any(block_hash, &mut all_scripts.iter().map(AsRef::as_ref))? - { - return Ok(false); - } - - let block_height = headers.get_height_for(block_hash)?.unwrap_or(0); - let saved_correct_block = matches!(headers.get_full_block(block_height)?, Some(block) if &block.block_hash() == block_hash); - - if saved_correct_block { - Ok(false) - } else { - let mut last_synced_block = last_synced_block.lock().unwrap(); - - // If we download a block older than `last_synced_block`, we update it so that - // we know to delete and re-process all txs starting from that height - if block_height < *last_synced_block { - *last_synced_block = block_height; - } - - Ok(true) - } - }, - |index| { - let synced_bundles = synced_bundles.fetch_add(1, Ordering::SeqCst); - let local_filters_cost = synced_bundles as f32 * SYNC_FILTERS_COST; - progress_update.lock().unwrap().update( - (headers_cost + local_filters_cost) / total_cost * 100.0, - Some(format!( - "Synced filters {} - {}", - index * 1000 + 1, - (index + 1) * 1000 - )), - ) - }, - ) - }); - - threads.push(thread); - } - - for t in threads { - t.join().unwrap()?; - } - - progress_update.lock().unwrap().update( - (headers_cost + 
filters_cost) / total_cost * 100.0, - Some("Processing downloaded blocks and mempool".into()), - )?; - - // delete all txs newer than last_synced_block - let last_synced_block = *last_synced_block.lock().unwrap(); - log::debug!( - "Dropping transactions newer than `last_synced_block` = {}", - last_synced_block - ); - let mut updates = database.begin_batch(); - for details in database.iter_txs(false)? { - match details.confirmation_time { - Some(c) if (c.height as usize) < last_synced_block => continue, - _ => updates.del_tx(&details.txid, false)?, - }; - } - database.commit_batch(updates)?; - - match first_peer.ask_for_mempool() { - Err(CompactFiltersError::PeerBloomDisabled) => { - log::warn!("Peer has BLOOM disabled, we can't ask for the mempool") - } - e => e?, - }; - - let mut internal_max_deriv = None; - let mut external_max_deriv = None; - - for (height, block) in self.headers.iter_full_blocks()? { - for tx in &block.txdata { - self.process_tx( - database, - tx, - Some(height as u32), - None, - &mut internal_max_deriv, - &mut external_max_deriv, - )?; - } - } - for tx in first_peer.get_mempool().iter_txs().iter() { - self.process_tx( - database, - tx, - None, - None, - &mut internal_max_deriv, - &mut external_max_deriv, - )?; - } - - let current_ext = database - .get_last_index(KeychainKind::External)? - .unwrap_or(0); - let first_ext_new = external_max_deriv.map(|x| x + 1).unwrap_or(0); - if first_ext_new > current_ext { - info!("Setting external index to {}", first_ext_new); - database.set_last_index(KeychainKind::External, first_ext_new)?; - } - - let current_int = database - .get_last_index(KeychainKind::Internal)? 
- .unwrap_or(0); - let first_int_new = internal_max_deriv.map(|x| x + 1).unwrap_or(0); - if first_int_new > current_int { - info!("Setting internal index to {}", first_int_new); - database.set_last_index(KeychainKind::Internal, first_int_new)?; - } - - info!("Dropping blocks until {}", buried_height); - self.headers.delete_blocks_until(buried_height)?; - - progress_update - .lock() - .unwrap() - .update(100.0, Some("Done".into()))?; - - Ok(()) - } -} - -/// Data to connect to a Bitcoin P2P peer -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] -pub struct BitcoinPeerConfig { - /// Peer address such as 127.0.0.1:18333 - pub address: String, - /// Optional socks5 proxy - pub socks5: Option, - /// Optional socks5 proxy credentials - pub socks5_credentials: Option<(String, String)>, -} - -/// Configuration for a [`CompactFiltersBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] -pub struct CompactFiltersBlockchainConfig { - /// List of peers to try to connect to for asking headers and filters - pub peers: Vec, - /// Network used - pub network: Network, - /// Storage dir to save partially downloaded headers and full blocks. Should be a separate directory per descriptor. Consider using [crate::wallet::wallet_name_from_descriptor] for this. 
- pub storage_dir: String, - /// Optionally skip initial `skip_blocks` blocks (default: 0) - pub skip_blocks: Option, -} - -impl ConfigurableBlockchain for CompactFiltersBlockchain { - type Config = CompactFiltersBlockchainConfig; - - fn from_config(config: &Self::Config) -> Result { - let mempool = Arc::new(Mempool::default()); - let peers = config - .peers - .iter() - .map(|peer_conf| match &peer_conf.socks5 { - None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network), - Some(proxy) => Peer::connect_proxy( - peer_conf.address.as_str(), - proxy, - peer_conf - .socks5_credentials - .as_ref() - .map(|(a, b)| (a.as_str(), b.as_str())), - Arc::clone(&mempool), - config.network, - ), - }) - .collect::>()?; - - Ok(CompactFiltersBlockchain::new( - peers, - &config.storage_dir, - config.skip_blocks, - )?) - } -} - -/// An error that can occur during sync with a [`CompactFiltersBlockchain`] -#[derive(Debug)] -pub enum CompactFiltersError { - /// A peer sent an invalid or unexpected response - InvalidResponse, - /// The headers returned are invalid - InvalidHeaders, - /// The compact filter headers returned are invalid - InvalidFilterHeader, - /// The compact filter returned is invalid - InvalidFilter, - /// The peer is missing a block in the valid chain - MissingBlock, - /// Block hash at specified height not found - BlockHashNotFound, - /// The data stored in the block filters storage are corrupted - DataCorruption, - - /// A peer is not connected - NotConnected, - /// A peer took too long to reply to one of our messages - Timeout, - /// The peer doesn't advertise the [`BLOOM`](bitcoin::network::constants::ServiceFlags::BLOOM) service flag - PeerBloomDisabled, - - /// No peers have been specified - NoPeers, - - /// Internal database error - Db(rocksdb::Error), - /// Internal I/O error - Io(std::io::Error), - /// Invalid BIP158 filter - Bip158(bitcoin::util::bip158::Error), - /// Internal system time error - Time(std::time::SystemTimeError), - - /// 
Wrapper for [`crate::error::Error`] - Global(Box), -} - -impl fmt::Display for CompactFiltersError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::InvalidResponse => write!(f, "A peer sent an invalid or unexpected response"), - Self::InvalidHeaders => write!(f, "Invalid headers"), - Self::InvalidFilterHeader => write!(f, "Invalid filter header"), - Self::InvalidFilter => write!(f, "Invalid filters"), - Self::MissingBlock => write!(f, "The peer is missing a block in the valid chain"), - Self::BlockHashNotFound => write!(f, "Block hash not found"), - Self::DataCorruption => write!( - f, - "The data stored in the block filters storage are corrupted" - ), - Self::NotConnected => write!(f, "A peer is not connected"), - Self::Timeout => write!(f, "A peer took too long to reply to one of our messages"), - Self::PeerBloomDisabled => write!(f, "Peer doesn't advertise the BLOOM service flag"), - Self::NoPeers => write!(f, "No peers have been specified"), - Self::Db(err) => write!(f, "Internal database error: {}", err), - Self::Io(err) => write!(f, "Internal I/O error: {}", err), - Self::Bip158(err) => write!(f, "Invalid BIP158 filter: {}", err), - Self::Time(err) => write!(f, "Invalid system time: {}", err), - Self::Global(err) => write!(f, "Generic error: {}", err), - } - } -} - -impl std::error::Error for CompactFiltersError {} - -impl_error!(rocksdb::Error, Db, CompactFiltersError); -impl_error!(std::io::Error, Io, CompactFiltersError); -impl_error!(bitcoin::util::bip158::Error, Bip158, CompactFiltersError); -impl_error!(std::time::SystemTimeError, Time, CompactFiltersError); - -impl From for CompactFiltersError { - fn from(err: crate::error::Error) -> Self { - CompactFiltersError::Global(Box::new(err)) - } -} +// //! ## Example +// //! +// //! ```no_run +// //! # use std::sync::Arc; +// //! # use bitcoin::*; +// //! # use bdk::*; +// //! # use bdk::blockchain::compact_filters::*; +// //! let num_threads = 4; +// //! +// //! 
let mempool = Arc::new(Mempool::default()); +// //! let peers = (0..num_threads) +// //! .map(|_| { +// //! Peer::connect( +// //! "btcd-mainnet.lightning.computer:8333", +// //! Arc::clone(&mempool), +// //! Network::Bitcoin, +// //! ) +// //! }) +// //! .collect::>()?; +// //! let blockchain = CompactFiltersBlockchain::new(peers, "./wallet-filters", Some(500_000))?; +// //! # Ok::<(), CompactFiltersError>(()) +// //! ``` + +// use std::collections::HashSet; +// use std::fmt; +// use std::ops::DerefMut; +// use std::path::Path; +// use std::sync::atomic::{AtomicUsize, Ordering}; +// use std::sync::{Arc, Mutex}; + +// // #[allow(unused_imports)] +// // use log::{debug, error, info, trace}; + +// // use bitcoin::network::message_blockdata::Inventory; +// // use bitcoin::{Network, OutPoint, Transaction, Txid}; + +// // use rocksdb::{Options, SliceTransform, DB}; + +// // mod peer; +// // mod store; +// // mod sync; + +pub mod nakamoto; + +// use crate::blockchain::*; +// use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; +// use crate::error::Error; +// use crate::types::{KeychainKind, LocalUtxo, TransactionDetails}; +// use crate::{BlockTime, FeeRate}; + +// use peer::*; +// use store::*; +// use sync::*; + +// pub use peer::{Mempool, Peer}; + +// const SYNC_HEADERS_COST: f32 = 1.0; +// const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0; +// const PROCESS_BLOCKS_COST: f32 = 20_000.0; + +// /// Structure implementing the required blockchain traits +// /// +// /// ## Example +// /// See the [`blockchain::compact_filters`](crate::blockchain::compact_filters) module for a usage example. 
+// #[derive(Debug)] +// pub struct CompactFiltersBlockchain { +// peers: Vec>, +// headers: Arc>, +// skip_blocks: Option, +// } + +// impl CompactFiltersBlockchain { +// /// Construct a new instance given a list of peers, a path to store headers and block +// /// filters downloaded during the sync and optionally a number of blocks to ignore starting +// /// from the genesis while scanning for the wallet's outputs. +// /// +// /// For each [`Peer`] specified a new thread will be spawned to download and verify the filters +// /// in parallel. It's currently recommended to only connect to a single peer to avoid +// /// inconsistencies in the data returned, optionally with multiple connections in parallel to +// /// speed-up the sync process. +// pub fn new>( +// peers: Vec, +// storage_dir: P, +// skip_blocks: Option, +// ) -> Result { +// if peers.is_empty() { +// return Err(CompactFiltersError::NoPeers); +// } + +// let mut opts = Options::default(); +// opts.create_if_missing(true); +// opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16)); + +// let network = peers[0].get_network(); + +// let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or_else(|_| vec!["default".to_string()]); +// let db = DB::open_cf(&opts, &storage_dir, &cfs)?; +// let headers = Arc::new(ChainStore::new(db, network)?); + +// // try to recover partial snapshots +// for cf_name in &cfs { +// if !cf_name.starts_with("_headers:") { +// continue; +// } + +// info!("Trying to recover: {:?}", cf_name); +// headers.recover_snapshot(cf_name)?; +// } + +// Ok(CompactFiltersBlockchain { +// peers: peers.into_iter().map(Arc::new).collect(), +// headers, +// skip_blocks, +// }) +// } + +// /// Process a transaction by looking for inputs that spend from a UTXO in the database or +// /// outputs that send funds to a know script_pubkey. 
+// fn process_tx( +// &self, +// database: &mut D, +// tx: &Transaction, +// height: Option, +// timestamp: Option, +// internal_max_deriv: &mut Option, +// external_max_deriv: &mut Option, +// ) -> Result<(), Error> { +// let mut updates = database.begin_batch(); + +// let mut incoming: u64 = 0; +// let mut outgoing: u64 = 0; + +// let mut inputs_sum: u64 = 0; +// let mut outputs_sum: u64 = 0; + +// // look for our own inputs +// for (i, input) in tx.input.iter().enumerate() { +// if let Some(previous_output) = database.get_previous_output(&input.previous_output)? { +// inputs_sum += previous_output.value; + +// // this output is ours, we have a path to derive it +// if let Some((keychain, _)) = +// database.get_path_from_script_pubkey(&previous_output.script_pubkey)? +// { +// outgoing += previous_output.value; + +// debug!("{} input #{} is mine, setting utxo as spent", tx.txid(), i); +// updates.set_utxo(&LocalUtxo { +// outpoint: input.previous_output, +// txout: previous_output.clone(), +// keychain, +// is_spent: true, +// })?; +// } +// } +// } + +// for (i, output) in tx.output.iter().enumerate() { +// // to compute the fees later +// outputs_sum += output.value; + +// // this output is ours, we have a path to derive it +// if let Some((keychain, child)) = +// database.get_path_from_script_pubkey(&output.script_pubkey)? 
+// { +// debug!("{} output #{} is mine, adding utxo", tx.txid(), i); +// updates.set_utxo(&LocalUtxo { +// outpoint: OutPoint::new(tx.txid(), i as u32), +// txout: output.clone(), +// keychain, +// is_spent: false, +// })?; +// incoming += output.value; + +// if keychain == KeychainKind::Internal +// && (internal_max_deriv.is_none() || child > internal_max_deriv.unwrap_or(0)) +// { +// *internal_max_deriv = Some(child); +// } else if keychain == KeychainKind::External +// && (external_max_deriv.is_none() || child > external_max_deriv.unwrap_or(0)) +// { +// *external_max_deriv = Some(child); +// } +// } +// } + +// if incoming > 0 || outgoing > 0 { +// let tx = TransactionDetails { +// txid: tx.txid(), +// transaction: Some(tx.clone()), +// received: incoming, +// sent: outgoing, +// confirmation_time: BlockTime::new(height, timestamp), +// fee: Some(inputs_sum.saturating_sub(outputs_sum)), +// }; + +// info!("Saving tx {}", tx.txid); +// updates.set_tx(&tx)?; +// } + +// database.commit_batch(updates)?; + +// Ok(()) +// } +// } + +// impl Blockchain for CompactFiltersBlockchain { +// fn get_capabilities(&self) -> HashSet { +// vec![Capability::FullHistory].into_iter().collect() +// } + +// fn broadcast(&self, tx: &Transaction) -> Result<(), Error> { +// self.peers[0].broadcast_tx(tx.clone())?; + +// Ok(()) +// } + +// fn estimate_fee(&self, _target: usize) -> Result { +// // TODO +// Ok(FeeRate::default()) +// } +// } + +// impl GetHeight for CompactFiltersBlockchain { +// fn get_height(&self) -> Result { +// Ok(self.headers.get_height()? as u32) +// } +// } + +// impl GetTx for CompactFiltersBlockchain { +// fn get_tx(&self, txid: &Txid) -> Result, Error> { +// Ok(self.peers[0] +// .get_mempool() +// .get_tx(&Inventory::Transaction(*txid))) +// } +// } + +// impl GetBlockHash for CompactFiltersBlockchain { +// fn get_block_hash(&self, height: u64) -> Result { +// self.headers +// .get_block_hash(height as usize)? 
+// .ok_or(Error::CompactFilters( +// CompactFiltersError::BlockHashNotFound, +// )) +// } +// } + +// impl WalletSync for CompactFiltersBlockchain { +// #[allow(clippy::mutex_atomic)] // Mutex is easier to understand than a CAS loop. +// fn wallet_setup( +// &self, +// database: &RefCell, +// progress_update: Box, +// ) -> Result<(), Error> { +// let first_peer = &self.peers[0]; + +// let skip_blocks = self.skip_blocks.unwrap_or(0); + +// let cf_sync = Arc::new(CfSync::new(Arc::clone(&self.headers), skip_blocks, 0x00)?); + +// let initial_height = self.headers.get_height()?; +// let total_bundles = (first_peer.get_version().start_height as usize) +// .checked_sub(skip_blocks) +// .map(|x| x / 1000) +// .unwrap_or(0) +// + 1; +// let expected_bundles_to_sync = total_bundles.saturating_sub(cf_sync.pruned_bundles()?); + +// let headers_cost = (first_peer.get_version().start_height as usize) +// .saturating_sub(initial_height) as f32 +// * SYNC_HEADERS_COST; +// let filters_cost = expected_bundles_to_sync as f32 * SYNC_FILTERS_COST; + +// let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST; + +// if let Some(snapshot) = sync::sync_headers( +// Arc::clone(first_peer), +// Arc::clone(&self.headers), +// |new_height| { +// let local_headers_cost = +// new_height.saturating_sub(initial_height) as f32 * SYNC_HEADERS_COST; +// progress_update.update( +// local_headers_cost / total_cost * 100.0, +// Some(format!("Synced headers to {}", new_height)), +// ) +// }, +// )? { +// if snapshot.work()? > self.headers.work()? 
{ +// info!("Applying snapshot with work: {}", snapshot.work()?); +// self.headers.apply_snapshot(snapshot)?; +// } +// } + +// let synced_height = self.headers.get_height()?; +// let buried_height = synced_height.saturating_sub(sync::BURIED_CONFIRMATIONS); +// info!("Synced headers to height: {}", synced_height); + +// cf_sync.prepare_sync(Arc::clone(first_peer))?; + +// let mut database = database.borrow_mut(); +// let database = database.deref_mut(); + +// let all_scripts = Arc::new( +// database +// .iter_script_pubkeys(None)? +// .into_iter() +// .map(|s| s.to_bytes()) +// .collect::>(), +// ); + +// #[allow(clippy::mutex_atomic)] +// let last_synced_block = Arc::new(Mutex::new(synced_height)); + +// let synced_bundles = Arc::new(AtomicUsize::new(0)); +// let progress_update = Arc::new(Mutex::new(progress_update)); + +// let mut threads = Vec::with_capacity(self.peers.len()); +// for peer in &self.peers { +// let cf_sync = Arc::clone(&cf_sync); +// let peer = Arc::clone(peer); +// let headers = Arc::clone(&self.headers); +// let all_scripts = Arc::clone(&all_scripts); +// let last_synced_block = Arc::clone(&last_synced_block); +// let progress_update = Arc::clone(&progress_update); +// let synced_bundles = Arc::clone(&synced_bundles); + +// let thread = std::thread::spawn(move || { +// cf_sync.capture_thread_for_sync( +// peer, +// |block_hash, filter| { +// if !filter +// .match_any(block_hash, &mut all_scripts.iter().map(AsRef::as_ref))? 
+// { +// return Ok(false); +// } + +// let block_height = headers.get_height_for(block_hash)?.unwrap_or(0); +// let saved_correct_block = matches!(headers.get_full_block(block_height)?, Some(block) if &block.block_hash() == block_hash); + +// if saved_correct_block { +// Ok(false) +// } else { +// let mut last_synced_block = last_synced_block.lock().unwrap(); + +// // If we download a block older than `last_synced_block`, we update it so that +// // we know to delete and re-process all txs starting from that height +// if block_height < *last_synced_block { +// *last_synced_block = block_height; +// } + +// Ok(true) +// } +// }, +// |index| { +// let synced_bundles = synced_bundles.fetch_add(1, Ordering::SeqCst); +// let local_filters_cost = synced_bundles as f32 * SYNC_FILTERS_COST; +// progress_update.lock().unwrap().update( +// (headers_cost + local_filters_cost) / total_cost * 100.0, +// Some(format!( +// "Synced filters {} - {}", +// index * 1000 + 1, +// (index + 1) * 1000 +// )), +// ) +// }, +// ) +// }); + +// threads.push(thread); +// } + +// for t in threads { +// t.join().unwrap()?; +// } + +// progress_update.lock().unwrap().update( +// (headers_cost + filters_cost) / total_cost * 100.0, +// Some("Processing downloaded blocks and mempool".into()), +// )?; + +// // delete all txs newer than last_synced_block +// let last_synced_block = *last_synced_block.lock().unwrap(); +// log::debug!( +// "Dropping transactions newer than `last_synced_block` = {}", +// last_synced_block +// ); +// let mut updates = database.begin_batch(); +// for details in database.iter_txs(false)? 
{ +// match details.confirmation_time { +// Some(c) if (c.height as usize) < last_synced_block => continue, +// _ => updates.del_tx(&details.txid, false)?, +// }; +// } +// database.commit_batch(updates)?; + +// match first_peer.ask_for_mempool() { +// Err(CompactFiltersError::PeerBloomDisabled) => { +// log::warn!("Peer has BLOOM disabled, we can't ask for the mempool") +// } +// e => e?, +// }; + +// let mut internal_max_deriv = None; +// let mut external_max_deriv = None; + +// for (height, block) in self.headers.iter_full_blocks()? { +// for tx in &block.txdata { +// self.process_tx( +// database, +// tx, +// Some(height as u32), +// None, +// &mut internal_max_deriv, +// &mut external_max_deriv, +// )?; +// } +// } +// for tx in first_peer.get_mempool().iter_txs().iter() { +// self.process_tx( +// database, +// tx, +// None, +// None, +// &mut internal_max_deriv, +// &mut external_max_deriv, +// )?; +// } + +// let current_ext = database +// .get_last_index(KeychainKind::External)? +// .unwrap_or(0); +// let first_ext_new = external_max_deriv.map(|x| x + 1).unwrap_or(0); +// if first_ext_new > current_ext { +// info!("Setting external index to {}", first_ext_new); +// database.set_last_index(KeychainKind::External, first_ext_new)?; +// } + +// let current_int = database +// .get_last_index(KeychainKind::Internal)? 
+// .unwrap_or(0); +// let first_int_new = internal_max_deriv.map(|x| x + 1).unwrap_or(0); +// if first_int_new > current_int { +// info!("Setting internal index to {}", first_int_new); +// database.set_last_index(KeychainKind::Internal, first_int_new)?; +// } + +// info!("Dropping blocks until {}", buried_height); +// self.headers.delete_blocks_until(buried_height)?; + +// progress_update +// .lock() +// .unwrap() +// .update(100.0, Some("Done".into()))?; + +// Ok(()) +// } +// } + +// /// Data to connect to a Bitcoin P2P peer +// #[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] +// pub struct BitcoinPeerConfig { +// /// Peer address such as 127.0.0.1:18333 +// pub address: String, +// /// Optional socks5 proxy +// pub socks5: Option, +// /// Optional socks5 proxy credentials +// pub socks5_credentials: Option<(String, String)>, +// } + +// /// Configuration for a [`CompactFiltersBlockchain`] +// #[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] +// pub struct CompactFiltersBlockchainConfig { +// /// List of peers to try to connect to for asking headers and filters +// pub peers: Vec, +// /// Network used +// pub network: Network, +// /// Storage dir to save partially downloaded headers and full blocks. Should be a separate directory per descriptor. Consider using [crate::wallet::wallet_name_from_descriptor] for this. 
+// pub storage_dir: String, +// /// Optionally skip initial `skip_blocks` blocks (default: 0) +// pub skip_blocks: Option, +// } + +// impl ConfigurableBlockchain for CompactFiltersBlockchain { +// type Config = CompactFiltersBlockchainConfig; + +// fn from_config(config: &Self::Config) -> Result { +// let mempool = Arc::new(Mempool::default()); +// let peers = config +// .peers +// .iter() +// .map(|peer_conf| match &peer_conf.socks5 { +// None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network), +// Some(proxy) => Peer::connect_proxy( +// peer_conf.address.as_str(), +// proxy, +// peer_conf +// .socks5_credentials +// .as_ref() +// .map(|(a, b)| (a.as_str(), b.as_str())), +// Arc::clone(&mempool), +// config.network, +// ), +// }) +// .collect::>()?; + +// Ok(CompactFiltersBlockchain::new( +// peers, +// &config.storage_dir, +// config.skip_blocks, +// )?) +// } +// } + +// /// An error that can occur during sync with a [`CompactFiltersBlockchain`] +// #[derive(Debug)] +// pub enum CompactFiltersError { +// /// A peer sent an invalid or unexpected response +// InvalidResponse, +// /// The headers returned are invalid +// InvalidHeaders, +// /// The compact filter headers returned are invalid +// InvalidFilterHeader, +// /// The compact filter returned is invalid +// InvalidFilter, +// /// The peer is missing a block in the valid chain +// MissingBlock, +// /// Block hash at specified height not found +// BlockHashNotFound, +// /// The data stored in the block filters storage are corrupted +// DataCorruption, + +// /// A peer is not connected +// NotConnected, +// /// A peer took too long to reply to one of our messages +// Timeout, +// /// The peer doesn't advertise the [`BLOOM`](bitcoin::network::constants::ServiceFlags::BLOOM) service flag +// PeerBloomDisabled, + +// /// No peers have been specified +// NoPeers, + +// /// Internal database error +// Db(rocksdb::Error), +// /// Internal I/O error +// Io(std::io::Error), +// /// Invalid 
BIP158 filter +// Bip158(bitcoin::util::bip158::Error), +// /// Internal system time error +// Time(std::time::SystemTimeError), + +// /// Wrapper for [`crate::error::Error`] +// Global(Box), +// } + +// impl fmt::Display for CompactFiltersError { +// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +// match self { +// Self::InvalidResponse => write!(f, "A peer sent an invalid or unexpected response"), +// Self::InvalidHeaders => write!(f, "Invalid headers"), +// Self::InvalidFilterHeader => write!(f, "Invalid filter header"), +// Self::InvalidFilter => write!(f, "Invalid filters"), +// Self::MissingBlock => write!(f, "The peer is missing a block in the valid chain"), +// Self::BlockHashNotFound => write!(f, "Block hash not found"), +// Self::DataCorruption => write!( +// f, +// "The data stored in the block filters storage are corrupted" +// ), +// Self::NotConnected => write!(f, "A peer is not connected"), +// Self::Timeout => write!(f, "A peer took too long to reply to one of our messages"), +// Self::PeerBloomDisabled => write!(f, "Peer doesn't advertise the BLOOM service flag"), +// Self::NoPeers => write!(f, "No peers have been specified"), +// Self::Db(err) => write!(f, "Internal database error: {}", err), +// Self::Io(err) => write!(f, "Internal I/O error: {}", err), +// Self::Bip158(err) => write!(f, "Invalid BIP158 filter: {}", err), +// Self::Time(err) => write!(f, "Invalid system time: {}", err), +// Self::Global(err) => write!(f, "Generic error: {}", err), +// } +// } +// } + +// impl std::error::Error for CompactFiltersError {} + +// impl_error!(rocksdb::Error, Db, CompactFiltersError); +// impl_error!(std::io::Error, Io, CompactFiltersError); +// impl_error!(bitcoin::util::bip158::Error, Bip158, CompactFiltersError); +// impl_error!(std::time::SystemTimeError, Time, CompactFiltersError); + +// impl From for CompactFiltersError { +// fn from(err: crate::error::Error) -> Self { +// CompactFiltersError::Global(Box::new(err)) +// } +// } diff 
--git a/src/blockchain/compact_filters/nakamoto.rs b/src/blockchain/compact_filters/nakamoto.rs
new file mode 100644
index 000000000..3b9f11c93
--- /dev/null
+++ b/src/blockchain/compact_filters/nakamoto.rs
@@ -0,0 +1,717 @@
+// Bitcoin Dev Kit
+// Written in 2020 by Alekos Filini
+//
+// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
+//
+// This file is licensed under the Apache License, Version 2.0 or the MIT license
+// , at your option.
+// You may not use this file except in accordance with one or both of these
+// licenses.
+
+//! This is the Compact Filter (BIP157) Blockchain implementation using the nakamoto crate.
+//! <https://github.com/cloudhead/nakamoto>
+//!
+//! This module implements BDK's [crate::blockchain::Blockchain] trait over a nakamoto
+//! client, which is its own state machine that maintains the compact filter database
+//! and interfaces with the Bitcoin p2p network using the BIP157/BIP158 protocol.
+#![allow(dead_code)]
+
+use crate::blockchain::{
+    Blockchain, Capability, ConfigurableBlockchain, GetBlockHash, GetHeight, GetTx, WalletSync,
+};
+use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
+use crate::{BlockTime, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
+use bitcoin::{Block, OutPoint, Script, Transaction, Txid};
+use log::{debug, info};
+use nakamoto::p2p::fsm::fees::FeeEstimate;
+use nakamoto::{
+    client::{
+        chan::Receiver,
+        handle::Handle,
+        Client,
+        Config,
+        Event,
+        Handle as ClientHandle,
+        //event::TxStatus
+    },
+    net::poll::Waker,
+};
+use std::cell::{Cell, RefCell};
+use std::collections::{HashMap, HashSet};
+use std::net::SocketAddr;
+use std::ops::DerefMut;
+use std::path::PathBuf;
+use std::time::Duration;
+use std::{net::TcpStream, thread};
+use thiserror::Error;
+
+type Reactor = nakamoto::net::poll::Reactor;
+
+/// An error that occurred during a compact block filter operation.
+#[derive(Debug, Error)] +pub enum CbfError { + /// Global BDK error + #[error(transparent)] + Global(#[from] Box), + + /// Nakamoto client error + #[error(transparent)] + Nakamoto(#[from] nakamoto::client::Error), +} + +impl From for CbfError { + fn from(err: crate::error::Error) -> Self { + CbfError::Global(Box::new(err)) + } +} + +/// Process a transaction by looking for inputs that spend from a UTXO in the database or +/// outputs that send funds to a know script_pubkey. +fn add_tx( + database: &mut impl BatchDatabase, + tx: &Transaction, + height: Option, + timestamp: Option, + internal_max_deriv: &mut Option, + external_max_deriv: &mut Option, +) -> Result<(), CbfError> { + let mut updates = database.begin_batch(); + + let mut incoming: u64 = 0; + let mut outgoing: u64 = 0; + let mut inputs_sum: u64 = 0; + let mut outputs_sum: u64 = 0; + + // Sanity check for conflicts + let db_txs = database.iter_raw_txs()?; + let conflicts = db_txs + .iter() + .filter_map(|existing_tx| { + let existing_inputs = existing_tx + .input + .iter() + .map(|txin| txin.previous_output) + .collect::>(); + let current_inputs = tx + .input + .iter() + .map(|txin| txin.previous_output) + .collect::>(); + if !current_inputs + .intersection(&existing_inputs) + .collect::>() + .is_empty() + && existing_tx.txid() != tx.txid() + { + Some(existing_tx) + } else { + None + } + }) + .collect::>(); + + for conflict in conflicts { + debug!( + "{} Conflicts with existing tx {}", + tx.txid(), + conflict.txid() + ); + // TODO: Make more intelligent checking following RBF rules. + if conflict.is_explicitly_rbf() { + debug!("{} Replaces {} via RBF", tx.txid(), conflict.txid()); + // Assuming the latest RBF transaction is the right transaction to keep. + // Delete the existing one. + delete_tx(database, &conflict.txid())?; + } else { + debug!( + "{} is non-rbf and conflicts with {}. 
Rejected", + tx.txid(), + conflict.txid() + ); + return Ok(()); + } + } + + // Process transaction inputs + for input in tx.input.iter() { + if let Some(previous_output) = database.get_previous_output(&input.previous_output)? { + inputs_sum += previous_output.value; + + // this output is ours, we have a path to derive it + if let Some((keychain, _)) = + database.get_path_from_script_pubkey(&previous_output.script_pubkey)? + { + outgoing += previous_output.value; + + debug!("{} is mine, setting utxo as spent", input.previous_output); + updates.set_utxo(&LocalUtxo { + outpoint: input.previous_output, + txout: previous_output.clone(), + keychain, + is_spent: true, + })?; + } + } + } + + // Process transaction outputs + for (i, output) in tx.output.iter().enumerate() { + outputs_sum += output.value; + + // When the output is ours + if let Some((keychain, child)) = + database.get_path_from_script_pubkey(&output.script_pubkey)? + { + debug!("{} output #{} is mine, adding utxo", tx.txid(), i); + updates.set_utxo(&LocalUtxo { + outpoint: OutPoint::new(tx.txid(), i as u32), + txout: output.clone(), + keychain, + is_spent: false, + })?; + incoming += output.value; + + if keychain == KeychainKind::Internal + && (internal_max_deriv.is_none() || child > internal_max_deriv.unwrap_or(0)) + { + *internal_max_deriv = Some(child); + } else if keychain == KeychainKind::External + && (external_max_deriv.is_none() || child > external_max_deriv.unwrap_or(0)) + { + *external_max_deriv = Some(child); + } + } + } + + if incoming > 0 || outgoing > 0 { + let tx = TransactionDetails { + txid: tx.txid(), + transaction: Some(tx.clone()), + received: incoming, + sent: outgoing, + confirmation_time: BlockTime::new(height, timestamp), + fee: Some(inputs_sum.saturating_sub(outputs_sum)), + }; + + debug!("Saving tx {}", tx.txid); + updates.set_tx(&tx)?; + } + + database.commit_batch(updates)?; + + Ok(()) +} + +/// Mark as transaction as unconfirmed in the database. 
+/// This is called in block reorg cases, where previously confirmed transactions becomes unconfirmed. +fn unconfirm_tx( + database: &mut impl BatchDatabase, + txid: &Txid, +) -> Result { + let mut tx_details = database + .get_tx(&txid, false)? + .expect("We must have the transaction at this stage"); + tx_details.confirmation_time = None; + database.set_tx(&tx_details)?; + debug!("Existing transaction marked unconfirmed : {}", txid); + Ok(tx_details) +} + +/// Delete a Transaction from the database, updating related UTXO records. +/// This is called in a a RBF case. +fn delete_tx(database: &mut impl BatchDatabase, txid: &Txid) -> Result<(), CbfError> { + let mut updates = database.begin_batch(); + + if let Some(db_tx) = database.get_raw_tx(txid)? { + let txid = db_tx.txid(); + // Mark input utxos as unspent + for input in db_tx.input { + let prev_output = input.previous_output; + if let Some(mut existing_utxo) = database.get_utxo(&prev_output)? { + debug!("Setting UTXO : {} as `unspent`", existing_utxo.outpoint); + existing_utxo.is_spent = false; + updates.set_utxo(&existing_utxo)?; + } + } + + // Delete output utxos + for (i, _) in db_tx.output.iter().enumerate() { + let outpoint = OutPoint::new(txid, i as u32); + if let Some(deleted_utxo) = updates.del_utxo(&outpoint)? { + debug!("Deleting Existing UTXO {}", deleted_utxo.outpoint); + } + } + + // Delete the transaction data + let _ = database + .del_tx(&txid, true)? + .expect("We must have the transaction at this stage") + .txid; + debug!("Deleting existing transaction : txid: {}", txid); + } + + database.commit_batch(updates)?; + + Ok(()) +} + +/// Structure containing the nakamoto client handle and event receiver. +pub struct CbfBlockchain { + /// A Receiver of all client related events + receiver: Receiver, + /// The handler for the client + client_handle: ClientHandle, + /// Internal query timeout. Set by default at 60 secs. 
+ timeout: Duration, + /// A store of blockheight and fee estimation + fee_data: Cell>, + /// A local cache of unconfirmed broadcasted transactions. + /// Used to sync wallet db with known unconfirmed txs. Cleared at each `wallet.sync()` call. + broadcasted_txs: Cell>, + /// Last Sync height + last_sync_height: Cell, + // A Hack to handle reorg sync cases. + #[cfg(test)] + break_sync_at: Cell>, +} + +/// Nakamoto CBF Client Configuration +#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +pub struct CBFBlockchainConfig { + /// Network for the CBF client + pub network: bitcoin::Network, + /// Optional custom datadir for CBF Client + pub datadir: Option, + /// Initial Peer List + pub peers: Vec, +} + +impl CbfBlockchain { + /// Create a new CBF node which runs at the background. + pub fn new( + network: bitcoin::Network, + datadir: Option, + peers: Vec, + ) -> Result { + let root = if let Some(dir) = datadir { + dir + } else { + PathBuf::from(std::env::var("HOME").unwrap_or_default()) + }; + let cbf_client = Client::::new()?; + let client_cfg = Config { + network: network.into(), + listen: vec![], // Don't listen for incoming connections. + root, + ..Config::default() + }; + + let client_handle = cbf_client.handle(); + thread::spawn(|| { + cbf_client.run(client_cfg).unwrap(); + }); + let receiver = client_handle.events(); + + for peer in peers { + let peer_link = client_handle + .connect(peer) + .map_err(nakamoto::client::Error::from)?; + debug!("New peer connected : {:?}", peer_link); + } + + Ok(Self { + receiver, + client_handle, + timeout: Duration::from_secs(60), // This is nakamoto default client timeout + fee_data: Cell::new(HashMap::new()), + broadcasted_txs: Cell::new(Vec::new()), + last_sync_height: Cell::new(0u32), + #[cfg(test)] + break_sync_at: Cell::new(None), + }) + } + + /// Scan the filters from a height, for a given list of scripts + pub fn scan(&self, from: u32, scripts: Vec