From 57112af45345b7419c226b274ee3ae89580d6b27 Mon Sep 17 00:00:00 2001 From: rajarshimaitra Date: Sat, 8 Oct 2022 22:52:42 +0530 Subject: [PATCH 1/6] Add nakamoto blockchain --- Cargo.toml | 7 +- src/blockchain/compact_filters/mod.rs | 2 + src/blockchain/compact_filters/nakamoto.rs | 689 +++++++++++++++++++++ src/error.rs | 13 + 4 files changed, 710 insertions(+), 1 deletion(-) create mode 100644 src/blockchain/compact_filters/nakamoto.rs diff --git a/Cargo.toml b/Cargo.toml index 18fcfef62..139d7a66c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ bitcoin = { version = "0.29.1", features = ["serde", "base64", "rand"] } serde = { version = "^1.0", features = ["derive"] } serde_json = { version = "^1.0" } rand = "^0.8" +thiserror = "1.0" # Optional dependencies sled = { version = "0.34", optional = true } @@ -39,6 +40,9 @@ bitcoinconsensus = { version = "0.19.0-3", optional = true } # Needed by bdk_blockchain_tests macro and the `rpc` feature bitcoincore-rpc = { version = "0.16", optional = true } +# Nakamoto crates for the CBF blockchain +nakamoto = { git = "https://github.com/cloudhead/nakamoto.git", rev = "14902d3d5cc9442cdda82419f0f24277d86b243b", optional = true } + # Platform-specific dependencies [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1", features = ["rt", "macros"] } @@ -55,7 +59,7 @@ verify = ["bitcoinconsensus"] default = ["key-value-db", "electrum"] sqlite = ["rusqlite", "ahash"] sqlite-bundled = ["sqlite", "rusqlite/bundled"] -compact_filters = ["rocksdb", "socks", "cc"] +compact_filters = ["rocksdb", "socks", "cc", "nakamoto"] key-value-db = ["sled"] all-keys = ["keys-bip39"] keys-bip39 = ["bip39"] @@ -93,6 +97,7 @@ reqwest-default-tls = ["esplora-client/async-https"] test-blockchains = ["bitcoincore-rpc", "electrum-client"] test-electrum = ["electrum", "electrsd/electrs_0_8_10", "electrsd/bitcoind_22_0", "test-blockchains"] test-rpc = ["rpc", "electrsd/electrs_0_8_10", "electrsd/bitcoind_22_0", "test-blockchains"] +test-cbf = ["compact_filters", "electrsd/electrs_0_8_10", "electrsd/bitcoind_22_0", "test-blockchains"] test-rpc-legacy = ["rpc", "electrsd/electrs_0_8_10", "electrsd/bitcoind_0_20_0", "test-blockchains"] test-esplora = ["electrsd/legacy", "electrsd/esplora_a33e97e1", "electrsd/bitcoind_22_0", "test-blockchains"] test-md-docs = ["electrum"] diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs index 7845d513d..d88d59620 100644 --- a/src/blockchain/compact_filters/mod.rs +++ b/src/blockchain/compact_filters/mod.rs @@ -68,6 +68,8 @@ mod peer; mod store; mod sync; +pub mod nakamoto; + use crate::blockchain::*; use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; use crate::error::Error; diff --git a/src/blockchain/compact_filters/nakamoto.rs b/src/blockchain/compact_filters/nakamoto.rs new file mode 100644 index 000000000..6bd544862 --- /dev/null +++ b/src/blockchain/compact_filters/nakamoto.rs @@ -0,0 +1,689 @@ +// Bitcoin Dev Kit +// Written in 2020 by Alekos Filini +// +// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! This is Compact Filter (BIP157) Blockchain implementation using the nakamoto crate. +//! https://github.com/cloudhead/nakamoto +//! +//! This module implements BDK's [crate::blockchain::Blockchain] trait over a nakamoto +//! client. Which is it's own state machine that maintains the compact filter database +//! and interface with the Bitcoin p2p network using the BIP157/BIP158 protocol. +#![allow(dead_code)] + +use crate::blockchain::{ + Blockchain, Capability, ConfigurableBlockchain, GetBlockHash, GetHeight, GetTx, WalletSync, +}; +use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; +use crate::{BlockTime, FeeRate, KeychainKind, LocalUtxo, TransactionDetails}; +use bitcoin::{Block, OutPoint, Script, Transaction, Txid}; +use log::{debug, info}; +use nakamoto::{ + client::{ + chan::Receiver, handle::Handle, protocol, protocol::fees::FeeEstimate, spv::TxStatus, + Client, Config, Event, Handle as ClientHandle, + }, + net::poll::Waker, +}; +use std::cell::Cell; +use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::time::Duration; +use std::{net::TcpStream, thread}; +use thiserror::Error; + +type Reactor = nakamoto::net::poll::Reactor; + +/// An Error that occurred during the compact block filter operation. +#[derive(Debug, Error)] +pub enum CbfError { + /// Global BDK error + #[error(transparent)] + Global(#[from] Box), + + /// Nakamoto client error + #[error(transparent)] + Nakamoto(#[from] nakamoto::client::error::Error), +} + +impl From for CbfError { + fn from(err: crate::error::Error) -> Self { + CbfError::Global(Box::new(err)) + } +} + +/// Process a transaction by looking for inputs that spend from a UTXO in the database or +/// outputs that send funds to a know script_pubkey. +fn add_tx( + database: &mut impl BatchDatabase, + tx: &Transaction, + height: Option, + timestamp: Option, + internal_max_deriv: &mut Option, + external_max_deriv: &mut Option, +) -> Result<(), CbfError> { + let mut updates = database.begin_batch(); + + let mut incoming: u64 = 0; + let mut outgoing: u64 = 0; + let mut inputs_sum: u64 = 0; + let mut outputs_sum: u64 = 0; + + // Sanity check for conflicts + let db_txs = database.iter_raw_txs()?; + let conflicts = db_txs + .iter() + .filter_map(|existing_tx| { + let existing_inputs = existing_tx + .input + .iter() + .map(|txin| txin.previous_output) + .collect::>(); + let current_inputs = tx + .input + .iter() + .map(|txin| txin.previous_output) + .collect::>(); + if !current_inputs + .intersection(&existing_inputs) + .collect::>() + .is_empty() + && existing_tx.txid() != tx.txid() + { + Some(existing_tx) + } else { + None + } + }) + .collect::>(); + + for conflict in conflicts { + debug!( + "{} Conflicts with existing tx {}", + tx.txid(), + conflict.txid() + ); + // TODO: Make more intelligent checking following RBF rules. + if conflict.is_explicitly_rbf() { + debug!("{} Replaces {} via RBF", tx.txid(), conflict.txid()); + // Assuming the latest RBF transaction is the right transaction to keep. + // Delete the existing one. + delete_tx(database, &conflict.txid())?; + } else { + debug!( + "{} is non-rbf and conflicts with {}. Rejected", + tx.txid(), + conflict.txid() + ); + return Ok(()); + } + } + + // Process transaction inputs + for input in tx.input.iter() { + if let Some(previous_output) = database.get_previous_output(&input.previous_output)? { + inputs_sum += previous_output.value; + + // this output is ours, we have a path to derive it + if let Some((keychain, _)) = + database.get_path_from_script_pubkey(&previous_output.script_pubkey)? + { + outgoing += previous_output.value; + + debug!("{} is mine, setting utxo as spent", input.previous_output); + updates.set_utxo(&LocalUtxo { + outpoint: input.previous_output, + txout: previous_output.clone(), + keychain, + is_spent: true, + })?; + } + } + } + + // Process transaction outputs + for (i, output) in tx.output.iter().enumerate() { + outputs_sum += output.value; + + // When the output is ours + if let Some((keychain, child)) = + database.get_path_from_script_pubkey(&output.script_pubkey)? + { + debug!("{} output #{} is mine, adding utxo", tx.txid(), i); + updates.set_utxo(&LocalUtxo { + outpoint: OutPoint::new(tx.txid(), i as u32), + txout: output.clone(), + keychain, + is_spent: false, + })?; + incoming += output.value; + + if keychain == KeychainKind::Internal + && (internal_max_deriv.is_none() || child > internal_max_deriv.unwrap_or(0)) + { + *internal_max_deriv = Some(child); + } else if keychain == KeychainKind::External + && (external_max_deriv.is_none() || child > external_max_deriv.unwrap_or(0)) + { + *external_max_deriv = Some(child); + } + } + } + + if incoming > 0 || outgoing > 0 { + let tx = TransactionDetails { + txid: tx.txid(), + transaction: Some(tx.clone()), + received: incoming, + sent: outgoing, + confirmation_time: BlockTime::new(height, timestamp), + fee: Some(inputs_sum.saturating_sub(outputs_sum)), + }; + + debug!("Saving tx {}", tx.txid); + updates.set_tx(&tx)?; + } + + database.commit_batch(updates)?; + + Ok(()) +} + +/// Mark as transaction as unconfirmed in the database. +/// This is called in block reorg cases, where previously confirmed transactions becomes unconfirmed. +fn unconfirm_tx( + database: &mut impl BatchDatabase, + txid: &Txid, +) -> Result { + let mut tx_details = database + .get_tx(&txid, false)? + .expect("We must have the transaction at this stage"); + tx_details.confirmation_time = None; + database.set_tx(&tx_details)?; + debug!("Existing transaction marked unconfirmed : {}", txid); + Ok(tx_details) +} + +/// Delete a Transaction from the database, updating related UTXO records. +/// This is called in a a RBF case. +fn delete_tx(database: &mut impl BatchDatabase, txid: &Txid) -> Result<(), CbfError> { + let mut updates = database.begin_batch(); + + if let Some(db_tx) = database.get_raw_tx(txid)? { + let txid = db_tx.txid(); + // Mark input utxos as unspent + for input in db_tx.input { + let prev_output = input.previous_output; + if let Some(mut existing_utxo) = database.get_utxo(&prev_output)? { + debug!("Setting UTXO : {} as `unspent`", existing_utxo.outpoint); + existing_utxo.is_spent = false; + updates.set_utxo(&existing_utxo)?; + } + } + + // Delete output utxos + for (i, _) in db_tx.output.iter().enumerate() { + let outpoint = OutPoint::new(txid, i as u32); + if let Some(deleted_utxo) = updates.del_utxo(&outpoint)? { + debug!("Deleting Existing UTXO {}", deleted_utxo.outpoint); + } + } + + // Delete the transaction data + let _ = database + .del_tx(&txid, true)? + .expect("We must have the transaction at this stage") + .txid; + debug!("Deleting existing transaction : txid: {}", txid); + } + + database.commit_batch(updates)?; + + Ok(()) +} + +/// Structure containing the nakamoto client handle and event receiver. +pub struct CbfBlockchain { + /// A Receiver of all client related events + receiver: Receiver, + /// The handler for the client + client_handle: ClientHandle, + /// Internal query timeout. Set by default at 60 secs. + timeout: Duration, + /// A store of blockheight and fee estimation + fee_data: Cell>, + /// A local cache of unconfirmed broadcasted transactions. + /// Used to sync wallet db with known unconfirmed txs. Cleared at each `wallet.sync()` call. + broadcasted_txs: Cell>, + /// Last Sync height + last_sync_height: Cell, + // A Hack to handle reorg sync cases. + #[cfg(test)] + break_sync_at: Cell>, +} + +/// Nakamoto CBF Client Configuration +#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +pub struct CBFBlockchainConfig { + /// Network for the CBF client + pub network: bitcoin::Network, + /// Optional custom datadir for CBF Client + pub datadir: Option, +} + +impl CbfBlockchain { + /// Create a new CBF node which runs at the background. + pub fn new(network: bitcoin::Network, datadir: Option) -> Result { + let root = if let Some(dir) = datadir { + dir + } else { + PathBuf::from(std::env::var("HOME").unwrap_or_default()) + }; + let cbf_client = Client::::new()?; + let client_cfg = Config { + listen: vec![], // Don't listen for incoming connections. + root, + protocol: protocol::Config { + network: network.into(), + ..protocol::Config::default() + }, + ..Config::default() + }; + + let client_handle = cbf_client.handle(); + thread::spawn(|| { + cbf_client.run(client_cfg).unwrap(); + }); + let receiver = client_handle.subscribe(); + + Ok(Self { + receiver, + client_handle, + timeout: Duration::from_secs(60), // This is nakamoto default client timeout + fee_data: Cell::new(HashMap::new()), + broadcasted_txs: Cell::new(Vec::new()), + last_sync_height: Cell::new(0u32), + #[cfg(test)] + break_sync_at: Cell::new(None), + }) + } + + /// Scan the filters from a height, for a given list of scripts + pub fn scan(&self, from: u32, scripts: Vec