From e58dfb6c3348dd2152d626ec846eb3b30a78fcd4 Mon Sep 17 00:00:00 2001 From: matthiasdebernardini Date: Tue, 13 Aug 2024 13:36:29 +0200 Subject: [PATCH 1/2] added sqlx, removed rusqlite --- crates/chain/Cargo.toml | 8 +- crates/chain/src/lib.rs | 8 +- crates/chain/src/persist.rs | 11 +- crates/chain/src/sqlx_impl.rs | 331 ++++++++++++++++++ crates/wallet/Cargo.toml | 8 +- crates/wallet/src/lib.rs | 8 +- crates/wallet/src/wallet/changeset.rs | 242 +++++++++---- crates/wallet/src/wallet/params.rs | 3 + crates/wallet/src/wallet/persisted.rs | 196 ++++++++--- .../00_create_wallet_tables.sql | 7 + .../wallet_esplora_async/Cargo.toml | 2 +- .../wallet_esplora_async/src/main.rs | 6 +- 12 files changed, 685 insertions(+), 145 deletions(-) create mode 100644 crates/chain/src/sqlx_impl.rs create mode 100644 crates/wallet/src/wallet_migrations/00_create_wallet_tables.sql diff --git a/crates/chain/Cargo.toml b/crates/chain/Cargo.toml index 7261bdfa2..88d451e5f 100644 --- a/crates/chain/Cargo.toml +++ b/crates/chain/Cargo.toml @@ -21,7 +21,10 @@ hashbrown = { version = "0.9.1", optional = true, features = ["serde"] } miniscript = { version = "12.0.0", optional = true, default-features = false } # Feature dependencies -rusqlite = { version = "0.31.0", features = ["bundled"], optional = true } +#rusqlite = { version = "0.31.0", features = ["bundled"], optional = true } +sqlx = { version = "0.7.4", features = ["migrate", "runtime-tokio-rustls", "postgres", "json", "chrono", "uuid"] , optional = true} +async-trait = "0.1.81" + serde_json = {version = "1", optional = true } [dev-dependencies] @@ -32,4 +35,5 @@ proptest = "1.2.0" default = ["std", "miniscript"] std = ["bitcoin/std", "miniscript?/std"] serde = ["dep:serde", "bitcoin/serde", "miniscript?/serde"] -rusqlite = ["std", "dep:rusqlite", "serde", "serde_json"] +#rusqlite = ["std", "dep:rusqlite", "serde", "serde_json"] +sqlx = ["std", "dep:sqlx", "serde", "serde_json"] diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index 67e2aea15..fdd106ef5 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -55,15 +55,15 @@ mod spk_iter; pub use indexer::keychain_txout; #[cfg(feature = "miniscript")] pub use spk_iter::*; -#[cfg(feature = "rusqlite")] -pub mod rusqlite_impl; pub mod spk_client; +#[cfg(feature = "sqlx")] +pub mod sqlx_impl; #[allow(unused_imports)] #[macro_use] extern crate alloc; -#[cfg(feature = "rusqlite")] -pub extern crate rusqlite; +#[cfg(feature = "sqlx")] +pub extern crate sqlx; #[cfg(feature = "serde")] pub extern crate serde; diff --git a/crates/chain/src/persist.rs b/crates/chain/src/persist.rs index 2ec88f636..8ae4c9323 100644 --- a/crates/chain/src/persist.rs +++ b/crates/chain/src/persist.rs @@ -45,9 +45,10 @@ pub trait PersistWith: Staged + Sized { ) -> Result<(), Self::PersistError>; } -type FutureResult<'a, T, E> = Pin> + Send + 'a>>; +pub type FutureResult<'a, T, E> = Pin> + Send + 'a>>; /// Trait that persists the type with an async `Db`. +#[async_trait::async_trait] pub trait PersistAsyncWith: Staged + Sized { /// Parameters for [`PersistAsyncWith::create`]. type CreateParams; @@ -61,16 +62,16 @@ pub trait PersistAsyncWith: Staged + Sized { type PersistError; /// Initialize the `Db` and create `Self`. - fn create(db: &mut Db, params: Self::CreateParams) -> FutureResult; + async fn create(db: &mut Db, params: Self::CreateParams) -> Result; /// Initialize the `Db` and load a previously-persisted `Self`. - fn load(db: &mut Db, params: Self::LoadParams) -> FutureResult, Self::LoadError>; + async fn load(db: &mut Db, params: Self::LoadParams) -> Result, Self::LoadError>; /// Persist changes to the `Db`. - fn persist<'a>( + async fn persist<'a>( db: &'a mut Db, changeset: &'a ::ChangeSet, - ) -> FutureResult<'a, (), Self::PersistError>; + ) -> Result<(), Self::PersistError>; } /// Represents a persisted `T`. diff --git a/crates/chain/src/sqlx_impl.rs b/crates/chain/src/sqlx_impl.rs new file mode 100644 index 000000000..0bb8fa8ea --- /dev/null +++ b/crates/chain/src/sqlx_impl.rs @@ -0,0 +1,331 @@ +//! Module for stuff + +use crate::*; +use core::str::FromStr; + +use alloc::{string::ToString, sync::Arc, vec::Vec}; +use std::prelude::rust_2021::String; +use bitcoin::consensus::{Decodable, Encodable}; +// use rusqlite; +use sqlx; +use sqlx::{Acquire, migrate, Postgres, Row}; +use sqlx::migrate::MigrateError; +use sqlx::postgres::PgRow; +// use rusqlite::named_params; +// use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, ValueRef}; +// use rusqlite::OptionalExtension; +// use rusqlite::Transaction; + +/// Table name for schemas. +pub const SCHEMAS_TABLE_NAME: &str = "bdk_schemas"; + + +impl tx_graph::ChangeSet +where + A: Anchor + Clone + Ord + serde::Serialize + serde::de::DeserializeOwned, +{ + /// Schema name for [`tx_graph::ChangeSet`]. + pub const SCHEMA_NAME: &'static str = "bdk_txgraph"; + /// Name of table that stores full transactions and `last_seen` timestamps. + pub const TXS_TABLE_NAME: &'static str = "bdk_txs"; + /// Name of table that stores floating txouts. + pub const TXOUTS_TABLE_NAME: &'static str = "bdk_txouts"; + /// Name of table that stores [`Anchor`]s. + pub const ANCHORS_TABLE_NAME: &'static str = "bdk_anchors"; + + /// Initialize sqlite tables. + async fn init_postgres_tables(db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<(), MigrateError> { + // let schema_v0: &[&str] = &[ + // // full transactions + // &format!( + // "CREATE TABLE IF NOT EXISTS {} ( \ + // txid TEXT PRIMARY KEY NOT NULL, \ + // raw_tx BYTEA, \ + // last_seen BIGINT \ + // )", + // Self::TXS_TABLE_NAME, + // ), + // // floating txouts + // &format!( + // "CREATE TABLE IF NOT EXISTS {} ( \ + // txid TEXT NOT NULL, \ + // vout INTEGER NOT NULL, \ + // value BIGINT NOT NULL, \ + // script BYTEA NOT NULL, \ + // PRIMARY KEY (txid, vout) \ + // )", + // Self::TXOUTS_TABLE_NAME, + // ), + // // anchors + // &format!( + // "CREATE TABLE IF NOT EXISTS {} ( \ + // txid TEXT NOT NULL REFERENCES {} (txid), \ + // block_height INTEGER NOT NULL, \ + // block_hash TEXT NOT NULL, \ + // anchor JSONB NOT NULL, \ + // PRIMARY KEY (txid, block_height, block_hash) \ + // )", + // Self::ANCHORS_TABLE_NAME, + // Self::TXS_TABLE_NAME, + // ), + // ]; + // + // // migrate_schema(db_tx, Self::SCHEMA_NAME, &[schema_v0]) + // Ok(()) + // let db = db_tx.acquire().await.unwrap(); + migrate!("./tx_graph_migrations").run(db_tx).await + + + } + + pub async fn from_postgres(db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> sqlx::Result { + Self::init_postgres_tables(db_tx).await?; + + let mut changeset = Self::default(); + + let rows: Vec = sqlx::query(&format!( + "SELECT txid, raw_tx, last_seen FROM {}", + Self::TXS_TABLE_NAME, + )) + .fetch_all(&mut **db_tx) + .await?; + + for row in rows { + // let txid: bitcoin::Txid = roow.get("txid"); + let txid_str: String = row.get("txid"); + let txid = bitcoin::Txid::from_str(&txid_str).expect("Invalid txid"); let raw_tx: Option> = row.get("raw_tx"); + let last_seen: Option = row.get("last_seen"); + + if let Some(tx_bytes) = raw_tx { + if let Ok(tx) = bitcoin::Transaction::consensus_decode(&mut tx_bytes.as_slice()) { + changeset.txs.insert(Arc::new(tx)); + } + } + if let Some(last_seen) = last_seen { + changeset.last_seen.insert(txid, last_seen as u64); + } + } + + let rows: Vec = sqlx::query(&format!( + "SELECT txid, vout, value, script FROM {}", + Self::TXOUTS_TABLE_NAME, + )) + .fetch_all(&mut **db_tx) + .await?; + + for row in rows { + // let txid: bitcoin::Txid = roow.get("txid"); + let txid_str: String = row.get("txid"); + let txid = bitcoin::Txid::from_str(&txid_str).expect("Invalid txid"); + let vout: i32 = row.get("vout"); + let value: i64 = row.get("value"); + let script: Vec = row.get("script"); + + changeset.txouts.insert( + bitcoin::OutPoint { txid, vout: vout as u32 }, + bitcoin::TxOut { + value: bitcoin::Amount::from_sat(value as u64), + script_pubkey: bitcoin::ScriptBuf::from(script), + }, + ); + } + + let rows: Vec= sqlx::query(&format!( + "SELECT anchor, txid FROM {}", + Self::ANCHORS_TABLE_NAME, + )) + .fetch_all(&mut **db_tx) + .await?; + + for row in rows { + let anchor: serde_json::Value = row.get("anchor"); + // let txid: bitcoin::Txid = roow.get("txid"); + let txid_str: String = row.get("txid"); + let txid = bitcoin::Txid::from_str(&txid_str).expect("Invalid txid"); + + if let Ok(anchor) = serde_json::from_value::(anchor) { + changeset.anchors.insert((anchor, txid)); + } + } + + Ok(changeset) + } + + pub async fn persist_to_postgres(&self, db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> sqlx::Result<()> { + Self::init_postgres_tables(db_tx).await?; + + for tx in &self.txs { + sqlx::query(&format!( + "INSERT INTO {} (txid, raw_tx) VALUES ($1, $2) ON CONFLICT (txid) DO UPDATE SET raw_tx = $2", + Self::TXS_TABLE_NAME, + )) + .bind(tx.compute_txid().to_string()) + .bind(bitcoin::consensus::serialize(tx.as_ref())) + .execute(&mut **db_tx) + .await?; + } + + for (&txid, &last_seen) in &self.last_seen { + sqlx::query(&format!( + "INSERT INTO {} (txid, last_seen) VALUES ($1, $2) ON CONFLICT (txid) DO UPDATE SET last_seen = $2", + Self::TXS_TABLE_NAME, + )) + .bind(txid.to_string()) + .bind(last_seen as i64) + .execute(&mut **db_tx) + .await?; + } + + for (op, txo) in &self.txouts { + sqlx::query(&format!( + "INSERT INTO {} (txid, vout, value, script) VALUES ($1, $2, $3, $4) ON CONFLICT (txid, vout) DO UPDATE SET value = $3, script = $4", + Self::TXOUTS_TABLE_NAME, + )) + .bind(op.txid.to_string()) + .bind(op.vout as i32) + .bind(txo.value.to_sat() as i64) + .bind(txo.script_pubkey.as_bytes()) + .execute(&mut **db_tx) + .await?; + } + + for (anchor, txid) in &self.anchors { + let anchor_block = anchor.anchor_block(); + sqlx::query(&format!( + "INSERT INTO {} (txid, block_height, block_hash, anchor) VALUES ($1, $2, $3, $4) ON CONFLICT (txid, block_height, block_hash) DO UPDATE SET anchor = $4", + Self::ANCHORS_TABLE_NAME, + )) + .bind(txid.to_string()) + .bind(anchor_block.height as i32) + .bind(anchor_block.hash.to_string()) + .bind(serde_json::to_value(anchor).unwrap()) + .execute(&mut **db_tx) + .await?; + } + + Ok(()) + } +} + +impl local_chain::ChangeSet { + /// Schema name for the changeset. + pub const SCHEMA_NAME: &'static str = "bdk_localchain"; + /// Name of sqlite table that stores blocks of [`LocalChain`](local_chain::LocalChain). + pub const BLOCKS_TABLE_NAME: &'static str = "bdk_blocks"; + + /// Initialize sqlite tables for persisting [`local_chain::LocalChain`]. + async fn init_postgres_tables(db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<(), MigrateError> { + sqlx::migrate!("./local_chain_migrations").run(&mut **db_tx).await + } + + pub async fn from_postgres(db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> sqlx::Result { + Self::init_postgres_tables(db_tx).await?; + + let mut changeset = Self::default(); + + let rows = sqlx::query(&format!( + "SELECT block_height, block_hash FROM {}", + Self::BLOCKS_TABLE_NAME, + )) + .fetch_all(&mut **db_tx) + .await?; + + for row in rows { + let height: i32 = row.get("block_height"); + let hash: String = row.get("block_hash"); + if let Ok(block_hash) = bitcoin::BlockHash::from_str(&hash) { + changeset.blocks.insert(height as u32, Some(block_hash)); + } + } + + Ok(changeset) + } + + pub async fn persist_to_postgres(&self, db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> sqlx::Result<()> { + Self::init_postgres_tables(db_tx).await?; + + for (&height, &hash) in &self.blocks { + match hash { + Some(hash) => { + sqlx::query(&format!( + "INSERT INTO {} (block_height, block_hash) VALUES ($1, $2) ON CONFLICT (block_height) DO UPDATE SET block_hash = $2", + Self::BLOCKS_TABLE_NAME, + )) + .bind(height as i32) + .bind(hash.to_string()) + .execute(&mut **db_tx) + .await?; + }, + None => { + sqlx::query(&format!( + "DELETE FROM {} WHERE block_height = $1", + Self::BLOCKS_TABLE_NAME, + )) + .bind(height as i32) + .execute(&mut **db_tx) + .await?; + }, + } + } + + Ok(()) + } +} + +#[cfg(feature = "miniscript")] +impl keychain_txout::ChangeSet { + /// Schema name for the changeset. + pub const SCHEMA_NAME: &'static str = "bdk_keychaintxout"; + /// Name for table that stores last revealed indices per descriptor id. + pub const LAST_REVEALED_TABLE_NAME: &'static str = "bdk_descriptor_last_revealed"; + + /// Initialize PostgreSQL tables for persisting + /// [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex). + async fn init_postgres_tables(db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<(), MigrateError> { + sqlx::migrate!("./keychain_migrations").run(&mut **db_tx).await + + } + + /// Construct [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex) from PostgreSQL database + /// and given parameters. + pub async fn from_postgres(db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> sqlx::Result { + Self::init_postgres_tables(db_tx).await?; + + let mut changeset = Self::default(); + + let rows = sqlx::query(&format!( + "SELECT descriptor_id, last_revealed FROM {}", + Self::LAST_REVEALED_TABLE_NAME, + )) + .fetch_all(&mut **db_tx) + .await?; + + for row in rows { + let descriptor_id: String = row.get("descriptor_id"); + let last_revealed: i64 = row.get("last_revealed"); + + if let Ok(descriptor_id) = DescriptorId::from_str(&descriptor_id) { + changeset.last_revealed.insert(descriptor_id, last_revealed as u32); + } + } + + Ok(changeset) + } + /// Persist `changeset` to the PostgreSQL database. + pub async fn persist_to_postgres(&self, db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> sqlx::Result<()> { + Self::init_postgres_tables(db_tx).await?; + + for (&descriptor_id, &last_revealed) in &self.last_revealed { + sqlx::query(&format!( + "INSERT INTO {} (descriptor_id, last_revealed) VALUES ($1, $2) ON CONFLICT (descriptor_id) DO UPDATE SET last_revealed = $2", + Self::LAST_REVEALED_TABLE_NAME, + )) + .bind(descriptor_id.to_string()) + .bind(last_revealed as i64) + .execute(&mut **db_tx) + .await?; + } + + Ok(()) + } +} diff --git a/crates/wallet/Cargo.toml b/crates/wallet/Cargo.toml index 44314a968..5b98d6f7d 100644 --- a/crates/wallet/Cargo.toml +++ b/crates/wallet/Cargo.toml @@ -20,7 +20,7 @@ serde = { version = "^1.0", features = ["derive"] } serde_json = { version = "^1.0" } bdk_chain = { path = "../chain", version = "0.17.0", features = ["miniscript", "serde"], default-features = false } bdk_file_store = { path = "../file_store", version = "0.14.0", optional = true } - +async-trait = "0.1.81" # Optional dependencies bip39 = { version = "2.0", optional = true } @@ -30,15 +30,15 @@ std = ["bitcoin/std", "bitcoin/rand-std", "miniscript/std", "bdk_chain/std"] compiler = ["miniscript/compiler"] all-keys = ["keys-bip39"] keys-bip39 = ["bip39"] -rusqlite = ["bdk_chain/rusqlite"] +sqlx = ["bdk_chain/sqlx"] file_store = ["bdk_file_store"] [dev-dependencies] lazy_static = "1.4" assert_matches = "1.5.0" tempfile = "3" -bdk_chain = { path = "../chain", features = ["rusqlite"] } -bdk_wallet = { path = ".", features = ["rusqlite", "file_store"] } +bdk_chain = { path = "../chain", features = ["sqlx"] } +bdk_wallet = { path = ".", features = ["sqlx", "file_store"] } bdk_file_store = { path = "../file_store" } anyhow = "1" rand = "^0.8" diff --git a/crates/wallet/src/lib.rs b/crates/wallet/src/lib.rs index 40167a396..fb8660250 100644 --- a/crates/wallet/src/lib.rs +++ b/crates/wallet/src/lib.rs @@ -32,10 +32,10 @@ mod types; mod wallet; pub(crate) use bdk_chain::collections; -#[cfg(feature = "rusqlite")] -pub use bdk_chain::rusqlite; -#[cfg(feature = "rusqlite")] -pub use bdk_chain::rusqlite_impl; +#[cfg(feature = "sqlx")] +pub use bdk_chain::sqlx; +#[cfg(feature = "sqlx")] +pub use bdk_chain::sqlx_impl; pub use descriptor::template; pub use descriptor::HdKeyPaths; pub use signer; diff --git a/crates/wallet/src/wallet/changeset.rs b/crates/wallet/src/wallet/changeset.rs index 46b2f4321..5043e2b6d 100644 --- a/crates/wallet/src/wallet/changeset.rs +++ b/crates/wallet/src/wallet/changeset.rs @@ -1,7 +1,11 @@ +use core::str::FromStr; +use std::prelude::rust_2021::{String, ToString}; use bdk_chain::{ indexed_tx_graph, keychain_txout, local_chain, tx_graph, ConfirmationBlockTime, Merge, }; use miniscript::{Descriptor, DescriptorPublicKey}; +use chain::sqlx; +use chain::sqlx::{migrate, Row}; type IndexedTxGraphChangeSet = indexed_tx_graph::ChangeSet; @@ -65,112 +69,204 @@ impl Merge for ChangeSet { } } -#[cfg(feature = "rusqlite")] +#[cfg(feature = "sqlx")] impl ChangeSet { /// Schema name for wallet. pub const WALLET_SCHEMA_NAME: &'static str = "bdk_wallet"; /// Name of table to store wallet descriptors and network. pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet"; - /// Initialize sqlite tables for wallet schema & table. - fn init_wallet_sqlite_tables( - db_tx: &chain::rusqlite::Transaction, - ) -> chain::rusqlite::Result<()> { - let schema_v0: &[&str] = &[&format!( - "CREATE TABLE {} ( \ - id INTEGER PRIMARY KEY NOT NULL CHECK (id = 0), \ - descriptor TEXT, \ - change_descriptor TEXT, \ - network TEXT \ - ) STRICT;", - Self::WALLET_TABLE_NAME, - )]; - crate::rusqlite_impl::migrate_schema(db_tx, Self::WALLET_SCHEMA_NAME, &[schema_v0]) + /// Initialize PostgreSQL tables for wallet schema & table. + async fn init_wallet_postgres_tables( + db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> sqlx::Result<()> { + + sqlx::migrate!("/Users/matthiasdebernardini/Projects/bdk/crates/wallet/src/wallet_migrations").run(db_tx).await } - /// Recover a [`ChangeSet`] from sqlite database. - pub fn from_sqlite(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result { - Self::init_wallet_sqlite_tables(db_tx)?; - use chain::rusqlite::OptionalExtension; - use chain::Impl; + /// Recover a [`ChangeSet`] from PostgreSQL database. + pub async fn from_postgres(db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> sqlx::Result { + Self::init_wallet_postgres_tables(db_tx).await?; let mut changeset = Self::default(); - let mut wallet_statement = db_tx.prepare(&format!( + let row = sqlx::query(&format!( "SELECT descriptor, change_descriptor, network FROM {}", Self::WALLET_TABLE_NAME, - ))?; - let row = wallet_statement - .query_row([], |row| { - Ok(( - row.get::<_, Impl>>("descriptor")?, - row.get::<_, Impl>>("change_descriptor")?, - row.get::<_, Impl>("network")?, - )) - }) - .optional()?; - if let Some((Impl(desc), Impl(change_desc), Impl(network))) = row { - changeset.descriptor = Some(desc); - changeset.change_descriptor = Some(change_desc); - changeset.network = Some(network); + )) + .fetch_optional(&mut **db_tx) + .await?; + + if let Some(row) = row { + changeset.descriptor = row.get::, _>("descriptor") + .and_then(|s| Descriptor::::from_str(&s).ok()); + changeset.change_descriptor = row.get::, _>("change_descriptor") + .and_then(|s| Descriptor::::from_str(&s).ok()); + changeset.network = row.get::, _>("network") + .and_then(|s| bitcoin::Network::from_str(&s).ok()); } - changeset.local_chain = local_chain::ChangeSet::from_sqlite(db_tx)?; - changeset.tx_graph = tx_graph::ChangeSet::<_>::from_sqlite(db_tx)?; - changeset.indexer = keychain_txout::ChangeSet::from_sqlite(db_tx)?; + changeset.local_chain = local_chain::ChangeSet::from_postgres(db_tx).await?; + changeset.tx_graph = tx_graph::ChangeSet::<_>::from_postgres(db_tx).await?; + changeset.indexer = keychain_txout::ChangeSet::from_postgres(db_tx).await?; Ok(changeset) } - /// Persist [`ChangeSet`] to sqlite database. - pub fn persist_to_sqlite( + /// Persist [`ChangeSet`] to PostgreSQL database. + pub async fn persist_to_postgres( &self, - db_tx: &chain::rusqlite::Transaction, - ) -> chain::rusqlite::Result<()> { - Self::init_wallet_sqlite_tables(db_tx)?; - use chain::rusqlite::named_params; - use chain::Impl; - - let mut descriptor_statement = db_tx.prepare_cached(&format!( - "INSERT INTO {}(id, descriptor) VALUES(:id, :descriptor) ON CONFLICT(id) DO UPDATE SET descriptor=:descriptor", - Self::WALLET_TABLE_NAME, - ))?; + db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> sqlx::Result<()> { + Self::init_wallet_postgres_tables(db_tx).await?; + if let Some(descriptor) = &self.descriptor { - descriptor_statement.execute(named_params! { - ":id": 0, - ":descriptor": Impl(descriptor.clone()), - })?; + sqlx::query(&format!( + "INSERT INTO {} (id, descriptor) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET descriptor = $2", + Self::WALLET_TABLE_NAME, + )) + .bind(0) + .bind(descriptor.to_string()) + .execute(&mut **db_tx) + .await?; } - let mut change_descriptor_statement = db_tx.prepare_cached(&format!( - "INSERT INTO {}(id, change_descriptor) VALUES(:id, :change_descriptor) ON CONFLICT(id) DO UPDATE SET change_descriptor=:change_descriptor", - Self::WALLET_TABLE_NAME, - ))?; if let Some(change_descriptor) = &self.change_descriptor { - change_descriptor_statement.execute(named_params! { - ":id": 0, - ":change_descriptor": Impl(change_descriptor.clone()), - })?; + sqlx::query(&format!( + "INSERT INTO {} (id, change_descriptor) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET change_descriptor = $2", + Self::WALLET_TABLE_NAME, + )) + .bind(0) + .bind(change_descriptor.to_string()) + .execute(&mut **db_tx) + .await?; } - let mut network_statement = db_tx.prepare_cached(&format!( - "INSERT INTO {}(id, network) VALUES(:id, :network) ON CONFLICT(id) DO UPDATE SET network=:network", - Self::WALLET_TABLE_NAME, - ))?; if let Some(network) = self.network { - network_statement.execute(named_params! { - ":id": 0, - ":network": Impl(network), - })?; + sqlx::query(&format!( + "INSERT INTO {} (id, network) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET network = $2", + Self::WALLET_TABLE_NAME, + )) + .bind(0) + .bind(network.to_string()) + .execute(&mut **db_tx) + .await?; } - self.local_chain.persist_to_sqlite(db_tx)?; - self.tx_graph.persist_to_sqlite(db_tx)?; - self.indexer.persist_to_sqlite(db_tx)?; + self.local_chain.persist_to_postgres(db_tx).await?; + self.tx_graph.persist_to_postgres(db_tx).await?; + self.indexer.persist_to_postgres(db_tx).await?; Ok(()) } } + +// #[cfg(feature = "rusqlite")] +// impl ChangeSet { +// /// Schema name for wallet. +// pub const WALLET_SCHEMA_NAME: &'static str = "bdk_wallet"; +// /// Name of table to store wallet descriptors and network. +// pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet"; +// +// /// Initialize sqlite tables for wallet schema & table. +// fn init_wallet_sqlite_tables( +// db_tx: &chain::rusqlite::Transaction, +// ) -> chain::rusqlite::Result<()> { +// let schema_v0: &[&str] = &[&format!( +// "CREATE TABLE {} ( \ +// id INTEGER PRIMARY KEY NOT NULL CHECK (id = 0), \ +// descriptor TEXT, \ +// change_descriptor TEXT, \ +// network TEXT \ +// ) STRICT;", +// Self::WALLET_TABLE_NAME, +// )]; +// crate::rusqlite_impl::migrate_schema(db_tx, Self::WALLET_SCHEMA_NAME, &[schema_v0]) +// } +// +// /// Recover a [`ChangeSet`] from sqlite database. +// pub fn from_sqlite(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result { +// Self::init_wallet_sqlite_tables(db_tx)?; +// use chain::rusqlite::OptionalExtension; +// use chain::Impl; +// +// let mut changeset = Self::default(); +// +// let mut wallet_statement = db_tx.prepare(&format!( +// "SELECT descriptor, change_descriptor, network FROM {}", +// Self::WALLET_TABLE_NAME, +// ))?; +// let row = wallet_statement +// .query_row([], |row| { +// Ok(( +// row.get::<_, Impl>>("descriptor")?, +// row.get::<_, Impl>>("change_descriptor")?, +// row.get::<_, Impl>("network")?, +// )) +// }) +// .optional()?; +// if let Some((Impl(desc), Impl(change_desc), Impl(network))) = row { +// changeset.descriptor = Some(desc); +// changeset.change_descriptor = Some(change_desc); +// changeset.network = Some(network); +// } +// +// changeset.local_chain = local_chain::ChangeSet::from_sqlite(db_tx)?; +// changeset.tx_graph = tx_graph::ChangeSet::<_>::from_sqlite(db_tx)?; +// changeset.indexer = keychain_txout::ChangeSet::from_sqlite(db_tx)?; +// +// Ok(changeset) +// } +// +// /// Persist [`ChangeSet`] to sqlite database. +// pub fn persist_to_sqlite( +// &self, +// db_tx: &chain::rusqlite::Transaction, +// ) -> chain::rusqlite::Result<()> { +// Self::init_wallet_sqlite_tables(db_tx)?; +// use chain::rusqlite::named_params; +// use chain::Impl; +// +// let mut descriptor_statement = db_tx.prepare_cached(&format!( +// "INSERT INTO {}(id, descriptor) VALUES(:id, :descriptor) ON CONFLICT(id) DO UPDATE SET descriptor=:descriptor", +// Self::WALLET_TABLE_NAME, +// ))?; +// if let Some(descriptor) = &self.descriptor { +// descriptor_statement.execute(named_params! { +// ":id": 0, +// ":descriptor": Impl(descriptor.clone()), +// })?; +// } +// +// let mut change_descriptor_statement = db_tx.prepare_cached(&format!( +// "INSERT INTO {}(id, change_descriptor) VALUES(:id, :change_descriptor) ON CONFLICT(id) DO UPDATE SET change_descriptor=:change_descriptor", +// Self::WALLET_TABLE_NAME, +// ))?; +// if let Some(change_descriptor) = &self.change_descriptor { +// change_descriptor_statement.execute(named_params! { +// ":id": 0, +// ":change_descriptor": Impl(change_descriptor.clone()), +// })?; +// } +// +// let mut network_statement = db_tx.prepare_cached(&format!( +// "INSERT INTO {}(id, network) VALUES(:id, :network) ON CONFLICT(id) DO UPDATE SET network=:network", +// Self::WALLET_TABLE_NAME, +// ))?; +// if let Some(network) = self.network { +// network_statement.execute(named_params! { +// ":id": 0, +// ":network": Impl(network), +// })?; +// } +// +// self.local_chain.persist_to_sqlite(db_tx)?; +// self.tx_graph.persist_to_sqlite(db_tx)?; +// self.indexer.persist_to_sqlite(db_tx)?; +// Ok(()) +// } +// } + impl From for ChangeSet { fn from(chain: local_chain::ChangeSet) -> Self { Self { diff --git a/crates/wallet/src/wallet/params.rs b/crates/wallet/src/wallet/params.rs index 44d0d1db1..19f308ccd 100644 --- a/crates/wallet/src/wallet/params.rs +++ b/crates/wallet/src/wallet/params.rs @@ -111,6 +111,9 @@ impl CreateParams { } } +unsafe impl Send for CreateParams {} +unsafe impl Send for LoadParams {} + /// Parameters for [`Wallet::load`] or [`PersistedWallet::load`]. #[must_use] pub struct LoadParams { diff --git a/crates/wallet/src/wallet/persisted.rs b/crates/wallet/src/wallet/persisted.rs index cc9f267f4..04eedd562 100644 --- a/crates/wallet/src/wallet/persisted.rs +++ b/crates/wallet/src/wallet/persisted.rs @@ -1,92 +1,188 @@ use core::fmt; - +use std::prelude::rust_2021::Box; +use chain::{FutureResult, sqlx}; use crate::{descriptor::DescriptorError, Wallet}; /// Represents a persisted wallet. pub type PersistedWallet = bdk_chain::Persisted; -#[cfg(feature = "rusqlite")] -impl<'c> chain::PersistWith> for Wallet { +#[cfg(feature = "sqlx")] +impl<'c> chain::PersistAsyncWith> for Wallet { type CreateParams = crate::CreateParams; type LoadParams = crate::LoadParams; - type CreateError = CreateWithPersistError; - type LoadError = LoadWithPersistError; - type PersistError = bdk_chain::rusqlite::Error; + type CreateError = CreateWithPersistError; + type LoadError = LoadWithPersistError; + type PersistError = sqlx::Error; - fn create( - db: &mut bdk_chain::rusqlite::Transaction<'c>, + async fn create( + db: &mut sqlx::Transaction<'c, sqlx::Postgres>, params: Self::CreateParams, ) -> Result { - let mut wallet = - Self::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; - if let Some(changeset) = wallet.take_staged() { - changeset - .persist_to_sqlite(db) - .map_err(CreateWithPersistError::Persist)?; - } - Ok(wallet) + // Box::pin(async move { + let mut wallet = Self::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; + if let Some(changeset) = wallet.take_staged() { + changeset + .persist_to_postgres(db) + .await + .map_err(CreateWithPersistError::Persist)?; + } + Ok(wallet) + // }) } - fn load( - conn: &mut bdk_chain::rusqlite::Transaction<'c>, + async fn load( + conn: &mut sqlx::Transaction<'c, sqlx::Postgres>, params: Self::LoadParams, ) -> Result, Self::LoadError> { - let changeset = - crate::ChangeSet::from_sqlite(conn).map_err(LoadWithPersistError::Persist)?; - if chain::Merge::is_empty(&changeset) { - return Ok(None); - } - Self::load_with_params(changeset, params).map_err(LoadWithPersistError::InvalidChangeSet) + // Box::pin(async move { + let changeset = crate::ChangeSet::from_postgres(conn) + .await + .map_err(LoadWithPersistError::Persist)?; + if chain::Merge::is_empty(&changeset) { + return Ok(None); + } + Self::load_with_params(changeset, params).map_err(LoadWithPersistError::InvalidChangeSet) + // }) } - fn persist( - db: &mut bdk_chain::rusqlite::Transaction<'c>, + async fn persist( + db: &mut sqlx::Transaction<'c, sqlx::Postgres>, changeset: &::ChangeSet, ) -> Result<(), Self::PersistError> { - changeset.persist_to_sqlite(db) + // Box::pin(async move { + changeset.persist_to_postgres(&mut *db).await + // }) } } -#[cfg(feature = "rusqlite")] -impl chain::PersistWith for Wallet { +#[cfg(feature = "sqlx")] +impl chain::PersistAsyncWith for Wallet { type CreateParams = crate::CreateParams; type LoadParams = crate::LoadParams; - type CreateError = CreateWithPersistError; - type LoadError = LoadWithPersistError; - type PersistError = bdk_chain::rusqlite::Error; + type CreateError = CreateWithPersistError; + type LoadError = LoadWithPersistError; + type PersistError = sqlx::Error; - fn create( - db: &mut bdk_chain::rusqlite::Connection, + async fn create<'a>( + db: &'a mut sqlx::PgPool, params: Self::CreateParams, - ) -> Result { - let mut db_tx = db.transaction().map_err(CreateWithPersistError::Persist)?; - let wallet = chain::PersistWith::create(&mut db_tx, params)?; - db_tx.commit().map_err(CreateWithPersistError::Persist)?; - Ok(wallet) + ) -> FutureResult<'a, Self, Self::CreateError> { + Box::pin(async move { + let mut db_tx = db.begin().await.map_err(CreateWithPersistError::Persist)?; + let wallet = chain::PersistAsyncWith::create(&mut db_tx, params).await?; + db_tx.commit().await.map_err(CreateWithPersistError::Persist)?; + Ok(wallet) + }) } - fn load( - db: &mut bdk_chain::rusqlite::Connection, + async fn load( + db: &mut sqlx::PgPool, params: Self::LoadParams, ) -> Result, Self::LoadError> { - let mut db_tx = db.transaction().map_err(LoadWithPersistError::Persist)?; - let wallet_opt = chain::PersistWith::load(&mut db_tx, params)?; - db_tx.commit().map_err(LoadWithPersistError::Persist)?; - Ok(wallet_opt) + // Box::pin(async move { + let mut db_tx = db.begin().await.map_err(LoadWithPersistError::Persist)?; + let wallet_opt = chain::PersistAsyncWith::load(&mut db_tx, params).await?; + db_tx.commit().await.map_err(LoadWithPersistError::Persist)?; + Ok(wallet_opt) + // }) } - fn persist( - db: &mut bdk_chain::rusqlite::Connection, + async fn persist( + db: &mut sqlx::PgPool, changeset: &::ChangeSet, ) -> Result<(), Self::PersistError> { - let db_tx = db.transaction()?; - changeset.persist_to_sqlite(&db_tx)?; - db_tx.commit() + let mut db_tx = db.begin().await?; + changeset.persist_to_postgres(&mut db_tx).await?; + db_tx.commit().await?; + Ok(()) } } + +// #[cfg(feature = "rusqlite")] +// impl<'c> chain::PersistWith> for Wallet { +// type CreateParams = crate::CreateParams; +// type LoadParams = crate::LoadParams; +// +// type CreateError = CreateWithPersistError; +// type LoadError = LoadWithPersistError; +// type PersistError = bdk_chain::rusqlite::Error; +// +// fn create( +// db: &mut bdk_chain::rusqlite::Transaction<'c>, +// params: Self::CreateParams, +// ) -> Result { +// let mut wallet = +// Self::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; +// if let Some(changeset) = wallet.take_staged() { +// changeset +// .persist_to_sqlite(db) +// .map_err(CreateWithPersistError::Persist)?; +// } +// Ok(wallet) +// } +// +// fn load( +// conn: &mut bdk_chain::rusqlite::Transaction<'c>, +// params: Self::LoadParams, +// ) -> Result, Self::LoadError> { +// let changeset = +// crate::ChangeSet::from_sqlite(conn).map_err(LoadWithPersistError::Persist)?; +// if chain::Merge::is_empty(&changeset) { +// return Ok(None); +// } +// Self::load_with_params(changeset, params).map_err(LoadWithPersistError::InvalidChangeSet) +// } +// +// fn persist( +// db: &mut bdk_chain::rusqlite::Transaction<'c>, +// changeset: &::ChangeSet, +// ) -> Result<(), Self::PersistError> { +// changeset.persist_to_sqlite(db) +// } +// } +// +// #[cfg(feature = "rusqlite")] +// impl chain::PersistWith for Wallet { +// type CreateParams = crate::CreateParams; +// type LoadParams = crate::LoadParams; +// +// type CreateError = CreateWithPersistError; +// type LoadError = LoadWithPersistError; +// type PersistError = bdk_chain::rusqlite::Error; +// +// fn create( +// db: &mut bdk_chain::rusqlite::Connection, +// params: Self::CreateParams, +// ) -> Result { +// let mut db_tx = db.transaction().map_err(CreateWithPersistError::Persist)?; +// let wallet = chain::PersistWith::create(&mut db_tx, params)?; +// db_tx.commit().map_err(CreateWithPersistError::Persist)?; +// Ok(wallet) +// } +// +// fn load( +// db: &mut bdk_chain::rusqlite::Connection, +// params: Self::LoadParams, +// ) -> Result, Self::LoadError> { +// let mut db_tx = db.transaction().map_err(LoadWithPersistError::Persist)?; +// let wallet_opt = chain::PersistWith::load(&mut db_tx, params)?; +// db_tx.commit().map_err(LoadWithPersistError::Persist)?; +// Ok(wallet_opt) +// } +// +// fn persist( +// db: &mut bdk_chain::rusqlite::Connection, +// changeset: &::ChangeSet, +// ) -> Result<(), Self::PersistError> { +// let db_tx = db.transaction()?; +// changeset.persist_to_sqlite(&db_tx)?; +// db_tx.commit() +// } +// } + #[cfg(feature = "file_store")] impl chain::PersistWith> for Wallet { type CreateParams = crate::CreateParams; diff --git a/crates/wallet/src/wallet_migrations/00_create_wallet_tables.sql b/crates/wallet/src/wallet_migrations/00_create_wallet_tables.sql new file mode 100644 index 000000000..1336639d3 --- /dev/null +++ b/crates/wallet/src/wallet_migrations/00_create_wallet_tables.sql @@ -0,0 +1,7 @@ +-- Create bdk_txs table +CREATE TABLE bdk_wallet ( \ + id INTEGER PRIMARY KEY NOT NULL CHECK (id = 0), \ + descriptor TEXT, \ + change_descriptor TEXT, \ + network TEXT \ + ) \ No newline at end of file diff --git a/example-crates/wallet_esplora_async/Cargo.toml b/example-crates/wallet_esplora_async/Cargo.toml index aa18a5e9d..d53a6f499 100644 --- a/example-crates/wallet_esplora_async/Cargo.toml +++ b/example-crates/wallet_esplora_async/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -bdk_wallet = { path = "../../crates/wallet", features = ["rusqlite"] } +bdk_wallet = { path = "../../crates/wallet", features = ["sqlx"] } bdk_esplora = { path = "../../crates/esplora", features = ["async-https"] } tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } anyhow = "1" diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index cccd83395..0e4a2ba35 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -4,9 +4,10 @@ use anyhow::Ok; use bdk_esplora::{esplora_client, EsploraAsyncExt}; use bdk_wallet::{ bitcoin::{Amount, Network}, - rusqlite::Connection, + // rusqlite::Connection, KeychainKind, SignOptions, Wallet, }; +use bdk_wallet::chain::sqlx; const SEND_AMOUNT: Amount = Amount::from_sat(5000); const STOP_GAP: usize = 5; @@ -20,7 +21,8 @@ const ESPLORA_URL: &str = "http://signet.bitcoindevkit.net"; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let mut conn = Connection::open(DB_PATH)?; + let mut conn = sqlx::Connection::connect("postgresql://sirendb_owner:2vDJjo9pGKiP@ep-sweet-shape-a5ouj5s7.us-east-2.aws.neon.tech/sirendb?sslmode=require").await.unwrap(); + // Connection::open(DB_PATH)?; let wallet_opt = Wallet::load() .descriptors(EXTERNAL_DESC, INTERNAL_DESC) From b309c2fd873c6682002242b0cd81932eabb9b2e6 Mon Sep 17 00:00:00 2001 From: matthiasdebernardini Date: Wed, 14 Aug 2024 15:11:37 +0200 Subject: [PATCH 2/2] lifetime issues again --- .../00_create_keychain_txout_table.sql | 5 ++ .../00_create_local_chain_table.sql | 5 ++ crates/chain/src/persist.rs | 8 +- .../00_create_tx_graph_tables.sql | 24 +++++ crates/wallet/src/wallet/changeset.rs | 3 +- crates/wallet/src/wallet/persisted.rs | 87 ++++++++++++------- 6 files changed, 98 insertions(+), 34 deletions(-) create mode 100644 crates/chain/keychain_migrations/00_create_keychain_txout_table.sql create mode 100644 crates/chain/local_chain_migrations/00_create_local_chain_table.sql create mode 100644 crates/chain/tx_graph_migrations/00_create_tx_graph_tables.sql diff --git a/crates/chain/keychain_migrations/00_create_keychain_txout_table.sql b/crates/chain/keychain_migrations/00_create_keychain_txout_table.sql new file mode 100644 index 000000000..4031a76df --- /dev/null +++ b/crates/chain/keychain_migrations/00_create_keychain_txout_table.sql @@ -0,0 +1,5 @@ +-- Create bdk_descriptor_last_revealed table +CREATE TABLE bdk_descriptor_last_revealed ( + descriptor_id TEXT PRIMARY KEY NOT NULL, + last_revealed BIGINT NOT NULL +); \ No newline at end of file diff --git a/crates/chain/local_chain_migrations/00_create_local_chain_table.sql b/crates/chain/local_chain_migrations/00_create_local_chain_table.sql new file mode 100644 index 000000000..5ca25bd54 --- /dev/null +++ b/crates/chain/local_chain_migrations/00_create_local_chain_table.sql @@ -0,0 +1,5 @@ +-- Create bdk_blocks table +CREATE TABLE bdk_blocks ( + block_height INTEGER PRIMARY KEY NOT NULL, + block_hash TEXT NOT NULL +); \ No newline at end of file diff --git a/crates/chain/src/persist.rs b/crates/chain/src/persist.rs index 8ae4c9323..c5328f101 100644 --- a/crates/chain/src/persist.rs +++ b/crates/chain/src/persist.rs @@ -62,16 +62,16 @@ pub trait PersistAsyncWith: Staged + Sized { type PersistError; /// Initialize the `Db` and create `Self`. - async fn create(db: &mut Db, params: Self::CreateParams) -> Result; + fn create(db: &mut Db, params: Self::CreateParams) -> FutureResult; /// Initialize the `Db` and load a previously-persisted `Self`. - async fn load(db: &mut Db, params: Self::LoadParams) -> Result, Self::LoadError>; + fn load(db: &mut Db, params: Self::LoadParams) -> FutureResult, Self::LoadError>; /// Persist changes to the `Db`. - async fn persist<'a>( + fn persist<'a>( db: &'a mut Db, changeset: &'a ::ChangeSet, - ) -> Result<(), Self::PersistError>; + ) -> FutureResult<'a, (), Self::PersistError>; } /// Represents a persisted `T`. diff --git a/crates/chain/tx_graph_migrations/00_create_tx_graph_tables.sql b/crates/chain/tx_graph_migrations/00_create_tx_graph_tables.sql new file mode 100644 index 000000000..757f03053 --- /dev/null +++ b/crates/chain/tx_graph_migrations/00_create_tx_graph_tables.sql @@ -0,0 +1,24 @@ +-- Create bdk_txs table +CREATE TABLE bdk_txs ( + txid TEXT PRIMARY KEY NOT NULL, + raw_tx BYTEA, + last_seen BIGINT +); + +-- Create bdk_txouts table +CREATE TABLE bdk_txouts ( + txid TEXT NOT NULL, + vout INTEGER NOT NULL, + value BIGINT NOT NULL, + script BYTEA NOT NULL, + PRIMARY KEY (txid, vout) +); + +-- Create bdk_anchors table +CREATE TABLE bdk_anchors ( + txid TEXT NOT NULL REFERENCES bdk_txs (txid), + block_height INTEGER NOT NULL, + block_hash TEXT NOT NULL, + anchor JSONB NOT NULL, + PRIMARY KEY (txid, block_height, block_hash) +); \ No newline at end of file diff --git a/crates/wallet/src/wallet/changeset.rs b/crates/wallet/src/wallet/changeset.rs index 5043e2b6d..258a7a901 100644 --- a/crates/wallet/src/wallet/changeset.rs +++ b/crates/wallet/src/wallet/changeset.rs @@ -81,7 +81,8 @@ impl ChangeSet { db_tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> sqlx::Result<()> { - sqlx::migrate!("/Users/matthiasdebernardini/Projects/bdk/crates/wallet/src/wallet_migrations").run(db_tx).await + // sqlx::migrate!("/Users/matthiasdebernardini/Projects/bdk/crates/wallet/src/wallet_migrations").run(db_tx).await + Ok(()) } /// Recover a [`ChangeSet`] from PostgreSQL database. diff --git a/crates/wallet/src/wallet/persisted.rs b/crates/wallet/src/wallet/persisted.rs index 04eedd562..919be46f5 100644 --- a/crates/wallet/src/wallet/persisted.rs +++ b/crates/wallet/src/wallet/persisted.rs @@ -1,7 +1,8 @@ use core::fmt; use std::prelude::rust_2021::Box; use chain::{FutureResult, sqlx}; -use crate::{descriptor::DescriptorError, Wallet}; +use chain::sqlx::{Pool, Postgres}; +use crate::{descriptor::DescriptorError, wallet::Wallet}; /// Represents a persisted wallet. pub type PersistedWallet = bdk_chain::Persisted; @@ -15,12 +16,15 @@ impl<'c> chain::PersistAsyncWith> for Wall type LoadError = LoadWithPersistError; type PersistError = sqlx::Error; - async fn create( + fn create( db: &mut sqlx::Transaction<'c, sqlx::Postgres>, params: Self::CreateParams, - ) -> Result { - // Box::pin(async move { - let mut wallet = Self::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; + ) -> FutureResult<'c, Self, Self::CreateError> { + async fn create( + db: &mut sqlx::Transaction<'_, sqlx::Postgres>, + params: >>::CreateParams, + ) -> Result>>::CreateError> { + let mut wallet = crate::Wallet::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; if let Some(changeset) = wallet.take_staged() { changeset .persist_to_postgres(db) @@ -28,31 +32,41 @@ impl<'c> chain::PersistAsyncWith> for Wall .map_err(CreateWithPersistError::Persist)?; } Ok(wallet) - // }) + } + Box::pin(create(db, params)) + } - async fn load( + fn load( conn: &mut sqlx::Transaction<'c, sqlx::Postgres>, params: Self::LoadParams, - ) -> Result, Self::LoadError> { - // Box::pin(async move { + ) -> FutureResult<'c, Option, Self::LoadError> { + async fn load( + conn: &mut sqlx::Transaction<'_, sqlx::Postgres>, + params: >>::LoadParams, + ) -> Result, >>::LoadError> { let changeset = crate::ChangeSet::from_postgres(conn) .await .map_err(LoadWithPersistError::Persist)?; if chain::Merge::is_empty(&changeset) { return Ok(None); } - Self::load_with_params(changeset, params).map_err(LoadWithPersistError::InvalidChangeSet) - // }) + crate::wallet::Wallet::load_with_params(changeset, params).map_err(LoadWithPersistError::InvalidChangeSet) + } + Box::pin(load(conn,params)) } - async fn persist( + fn persist( db: &mut sqlx::Transaction<'c, sqlx::Postgres>, changeset: &::ChangeSet, - ) -> Result<(), Self::PersistError> { - // Box::pin(async move { + ) -> FutureResult<'c, (), Self::PersistError> { + async fn persist( + db: &mut sqlx::Transaction<'_, sqlx::Postgres>, + changeset: &::ChangeSet, + ) -> Result<(), >>::PersistError> { changeset.persist_to_postgres(&mut *db).await - // }) + } + Box::pin(persist(db, changeset)) } } @@ -65,38 +79,53 @@ impl chain::PersistAsyncWith for Wallet { type LoadError = LoadWithPersistError; type PersistError = sqlx::Error; - async fn create<'a>( - db: &'a mut sqlx::PgPool, + fn create( + db: &mut sqlx::PgPool, params: Self::CreateParams, - ) -> FutureResult<'a, Self, Self::CreateError> { - Box::pin(async move { + ) -> FutureResult { + async fn create( + db: &mut sqlx::PgPool, + params: >>::CreateParams + // Self::CreateParams, + ) -> Result>>::CreateError> { let mut db_tx = db.begin().await.map_err(CreateWithPersistError::Persist)?; let wallet = chain::PersistAsyncWith::create(&mut db_tx, params).await?; db_tx.commit().await.map_err(CreateWithPersistError::Persist)?; Ok(wallet) - }) + } + Box::pin(create(db, params)) } - async fn load( + fn load( db: &mut sqlx::PgPool, params: Self::LoadParams, - ) -> Result, Self::LoadError> { - // Box::pin(async move { + ) -> FutureResult, Self::LoadError> { + async fn load( + db: &mut sqlx::PgPool, + params: >>::LoadParams, + ) -> Result, >>::LoadError> { let mut db_tx = db.begin().await.map_err(LoadWithPersistError::Persist)?; let wallet_opt = chain::PersistAsyncWith::load(&mut db_tx, params).await?; db_tx.commit().await.map_err(LoadWithPersistError::Persist)?; Ok(wallet_opt) - // }) + } + Box::pin(load(db, params)) } - async fn persist( - db: &mut sqlx::PgPool, - changeset: &::ChangeSet, - ) -> Result<(), Self::PersistError> { + fn persist<'a>( + db: &'a mut sqlx::PgPool, + changeset: &'a ::ChangeSet, + ) -> FutureResult<'a, (), Self::PersistError> { + async fn persist( + db: &mut sqlx::PgPool, + changeset: &::ChangeSet, + ) -> Result<(), >>::PersistError> { let mut db_tx = db.begin().await?; changeset.persist_to_postgres(&mut db_tx).await?; db_tx.commit().await?; - Ok(()) + Ok(()) + } + Box::pin(persist(db, changeset)) } }