diff --git a/src/chainstate/stacks/db/accounts.rs b/src/chainstate/stacks/db/accounts.rs index 7a74949502..ef5eb8ed5c 100644 --- a/src/chainstate/stacks/db/accounts.rs +++ b/src/chainstate/stacks/db/accounts.rs @@ -153,6 +153,10 @@ impl StacksChainState { }) } + pub fn get_nonce(clarity_tx: &mut T, principal: &PrincipalData) -> u64 { + clarity_tx.with_clarity_db_readonly(|ref mut db| db.get_account_nonce(principal)) + } + pub fn get_account_ft( clarity_tx: &mut ClarityTx, contract_id: &QualifiedContractIdentifier, diff --git a/src/chainstate/stacks/miner.rs b/src/chainstate/stacks/miner.rs index f352b142eb..f7ceff13c5 100644 --- a/src/chainstate/stacks/miner.rs +++ b/src/chainstate/stacks/miner.rs @@ -918,7 +918,7 @@ impl<'a> StacksMicroblockBuilder<'a> { let deadline = get_epoch_time_ms() + (self.settings.max_miner_time_ms as u128); let mut block_limit_hit = BlockLimitFunction::NO_LIMIT_HIT; - mem_pool.reset_last_known_nonces()?; + mem_pool.reset_nonce_cache()?; let stacks_epoch_id = clarity_tx.get_epoch(); let block_limit = clarity_tx .block_limit() @@ -935,6 +935,7 @@ impl<'a> StacksMicroblockBuilder<'a> { let mut num_added = 0; intermediate_result = mem_pool.iterate_candidates( &mut clarity_tx, + &mut tx_events, self.anchor_block_height, mempool_settings.clone(), |clarity_tx, to_consider, estimator| { @@ -946,11 +947,12 @@ impl<'a> StacksMicroblockBuilder<'a> { "Microblock miner deadline exceeded ({} ms)", self.settings.max_miner_time_ms ); - return Ok(false); + return Ok(None); } if considered.contains(&mempool_tx.tx.txid()) { - return Ok(true); + return Ok(Some(TransactionResult::skipped( + &mempool_tx.tx, "Transaction already considered.".to_string()).convert_to_event())); } else { considered.insert(mempool_tx.tx.txid()); } @@ -963,7 +965,7 @@ impl<'a> StacksMicroblockBuilder<'a> { &block_limit_hit, ) { Ok(tx_result) => { - tx_events.push(tx_result.convert_to_event()); + let result_event = tx_result.convert_to_event(); match tx_result { 
TransactionResult::Success(TransactionSuccess { receipt, @@ -993,7 +995,7 @@ impl<'a> StacksMicroblockBuilder<'a> { num_txs += 1; num_added += 1; num_selected += 1; - Ok(true) + Ok(Some(result_event)) } TransactionResult::Skipped(TransactionSkipped { error, @@ -1019,7 +1021,7 @@ impl<'a> StacksMicroblockBuilder<'a> { debug!("Block budget exceeded while mining microblock"; "tx" => %mempool_tx.tx.txid(), "next_behavior" => "Stop mining microblock"); block_limit_hit = BlockLimitFunction::LIMIT_REACHED; - return Ok(false); + return Ok(None); } } Error::TransactionTooBigError => { @@ -1027,7 +1029,7 @@ impl<'a> StacksMicroblockBuilder<'a> { } _ => {} } - return Ok(true) + return Ok(Some(result_event)) } } } @@ -2077,7 +2079,7 @@ impl StacksBlockBuilder { .convert_to_event(), ); - mempool.reset_last_known_nonces()?; + mempool.reset_nonce_cache()?; mempool.estimate_tx_rates(100, &block_limit, &stacks_epoch_id)?; @@ -2101,6 +2103,7 @@ impl StacksBlockBuilder { let mut num_considered = 0; intermediate_result = mempool.iterate_candidates( &mut epoch_tx, + &mut tx_events, tip_height, mempool_settings.clone(), |epoch_tx, to_consider, estimator| { @@ -2108,28 +2111,53 @@ impl StacksBlockBuilder { let update_estimator = to_consider.update_estimate; if block_limit_hit == BlockLimitFunction::LIMIT_REACHED { - return Ok(false); + return Ok(None); } if get_epoch_time_ms() >= deadline { debug!("Miner mining time exceeded ({} ms)", max_miner_time_ms); - return Ok(false); + return Ok(None); } // skip transactions early if we can if considered.contains(&txinfo.tx.txid()) { - return Ok(true); + return Ok(Some( + TransactionResult::skipped( + &txinfo.tx, + "Transaction already considered.".to_string(), + ) + .convert_to_event(), + )); } if let Some(nonce) = mined_origin_nonces.get(&txinfo.tx.origin_address()) { if *nonce >= txinfo.tx.get_origin_nonce() { - return Ok(true); + return Ok(Some( + TransactionResult::skipped( + &txinfo.tx, + format!( + "Bad origin nonce, tx nonce {} versus 
{}.", + txinfo.tx.get_origin_nonce(), + *nonce + ), + ) + .convert_to_event(), + )); } } if let Some(sponsor_addr) = txinfo.tx.sponsor_address() { if let Some(nonce) = mined_sponsor_nonces.get(&sponsor_addr) { if let Some(sponsor_nonce) = txinfo.tx.get_sponsor_nonce() { if *nonce >= sponsor_nonce { - return Ok(true); + return Ok(Some( + TransactionResult::skipped( + &txinfo.tx, + format!( + "Bad sponsor nonce, tx nonce {} versus {}.", + sponsor_nonce, *nonce + ), + ) + .convert_to_event(), + )); } } } @@ -2144,8 +2172,8 @@ impl StacksBlockBuilder { txinfo.metadata.len, &block_limit_hit, ); - tx_events.push(tx_result.convert_to_event()); + let result_event = tx_result.convert_to_event(); match tx_result { TransactionResult::Success(TransactionSuccess { receipt, .. }) => { num_txs += 1; @@ -2192,7 +2220,7 @@ impl StacksBlockBuilder { "Stop mining anchored block due to limit exceeded" ); block_limit_hit = BlockLimitFunction::LIMIT_REACHED; - return Ok(false); + return Ok(None); } } Error::TransactionTooBigError => { @@ -2203,13 +2231,13 @@ impl StacksBlockBuilder { } e => { warn!("Failed to apply tx {}: {:?}", &txinfo.tx.txid(), &e); - return Ok(true); + return Ok(Some(result_event)); } } } } - Ok(true) + Ok(Some(result_event)) }, ); @@ -2535,11 +2563,14 @@ pub mod test { use crate::chainstate::stacks::db::blocks::test::store_staging_block; use crate::chainstate::stacks::db::test::*; use crate::chainstate::stacks::db::*; + use crate::chainstate::stacks::test::codec_all_transactions; use crate::chainstate::stacks::Error as ChainstateError; use crate::chainstate::stacks::C32_ADDRESS_VERSION_TESTNET_SINGLESIG; use crate::chainstate::stacks::*; + use crate::core::tests::make_block; use crate::net::test::*; use crate::util_lib::db::Error as db_error; + use clarity::vm::test_util::TEST_BURN_STATE_DB; use clarity::vm::types::*; use stacks_common::address::*; use stacks_common::util::sleep_ms; @@ -6721,6 +6752,368 @@ pub mod test { .0 } + #[test] + /// Test the situation in 
which the nonce order of transactions from a user. That is, + /// nonce 1 has a higher fee than nonce 0. + /// Want to see that both transactions can go into the same block, because the miner + /// should make multiple passes. + fn test_fee_order_mismatch_nonce_order() { + let privk = StacksPrivateKey::from_hex( + "42faca653724860da7a41bfcef7e6ba78db55146f6900de8cb2a9f760ffac70c01", + ) + .unwrap(); + let addr = StacksAddress::from_public_keys( + C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + &AddressHashMode::SerializeP2PKH, + 1, + &vec![StacksPublicKey::from_private(&privk)], + ) + .unwrap(); + + let mut peer_config = TestPeerConfig::new( + "test_build_anchored_blocks_stx_transfers_single", + 2002, + 2003, + ); + peer_config.initial_balances = vec![(addr.to_account_principal(), 1000000000)]; + + let mut peer = TestPeer::new(peer_config); + + let chainstate_path = peer.chainstate_path.clone(); + + let first_stacks_block_height = { + let sn = + SortitionDB::get_canonical_burn_chain_tip(&peer.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + }; + + let recipient_addr_str = "ST1RFD5Q2QPK3E0F08HG9XDX7SSC7CNRS0QR0SGEV"; + let recipient = StacksAddress::from_string(recipient_addr_str).unwrap(); + let sender_nonce = 0; + + let mut last_block = None; + // send transactions to the mempool + let tip = SortitionDB::get_canonical_burn_chain_tip(&peer.sortdb.as_ref().unwrap().conn()) + .unwrap(); + + let (burn_ops, stacks_block, microblocks) = peer.make_tenure( + |ref mut miner, + ref mut sortdb, + ref mut chainstate, + vrf_proof, + ref parent_opt, + ref parent_microblock_header_opt| { + let parent_tip = match parent_opt { + None => StacksChainState::get_genesis_header_info(chainstate.db()).unwrap(), + Some(block) => { + let ic = sortdb.index_conn(); + let snapshot = SortitionDB::get_block_snapshot_for_winning_stacks_block( + &ic, + &tip.sortition_id, + &block.block_hash(), + ) + .unwrap() + .unwrap(); // succeeds because we don't fork + 
StacksChainState::get_anchored_block_header_info( + chainstate.db(), + &snapshot.consensus_hash, + &snapshot.winning_stacks_block_hash, + ) + .unwrap() + .unwrap() + } + }; + + let parent_header_hash = parent_tip.anchored_header.block_hash(); + let parent_consensus_hash = parent_tip.consensus_hash.clone(); + + let mut mempool = + MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap(); + + let coinbase_tx = make_coinbase(miner, 0); + + let stx_transfer0 = + make_user_stacks_transfer(&privk, 0, 200, &recipient.to_account_principal(), 1); + let stx_transfer1 = + make_user_stacks_transfer(&privk, 1, 400, &recipient.to_account_principal(), 1); + + mempool + .submit( + chainstate, + &parent_consensus_hash, + &parent_header_hash, + &stx_transfer0, + None, + &ExecutionCost::max_value(), + &StacksEpochId::Epoch20, + ) + .unwrap(); + + mempool + .submit( + chainstate, + &parent_consensus_hash, + &parent_header_hash, + &stx_transfer1, + None, + &ExecutionCost::max_value(), + &StacksEpochId::Epoch20, + ) + .unwrap(); + + let anchored_block = StacksBlockBuilder::build_anchored_block( + chainstate, + &sortdb.index_conn(), + &mut mempool, + &parent_tip, + tip.total_burn, + vrf_proof, + Hash160([0 as u8; 20]), + &coinbase_tx, + BlockBuilderSettings::max_value(), + None, + ) + .unwrap(); + (anchored_block.0, vec![]) + }, + ); + + last_block = Some(stacks_block.clone()); + + peer.next_burnchain_block(burn_ops.clone()); + peer.process_stacks_epoch_at_tip(&stacks_block, µblocks); + + // Both user transactions and the coinbase should have been mined. 
+ assert_eq!(stacks_block.txs.len(), 3); + } + + #[test] + fn mempool_walk_test_users_1_rounds_10_cache_size_2_null_prob_0() { + paramaterized_mempool_walk_test(1, 10, 2, 0, 30000) + } + + #[test] + fn mempool_walk_test_users_10_rounds_3_cache_size_2_null_prob_0() { + paramaterized_mempool_walk_test(10, 3, 2, 0, 30000) + } + + #[test] + fn mempool_walk_test_users_1_rounds_10_cache_size_2_null_prob_50() { + paramaterized_mempool_walk_test(1, 10, 2, 50, 30000) + } + + #[test] + fn mempool_walk_test_users_10_rounds_3_cache_size_2_null_prob_50() { + paramaterized_mempool_walk_test(10, 3, 2, 50, 30000) + } + + #[test] + fn mempool_walk_test_users_1_rounds_10_cache_size_2_null_prob_100() { + paramaterized_mempool_walk_test(1, 10, 2, 100, 30000) + } + + #[test] + fn mempool_walk_test_users_10_rounds_3_cache_size_2_null_prob_100() { + paramaterized_mempool_walk_test(10, 3, 2, 100, 30000) + } + + #[test] + fn mempool_walk_test_users_10_rounds_3_cache_size_2000_null_prob_0() { + paramaterized_mempool_walk_test(10, 3, 2000, 0, 30000) + } + + #[test] + fn mempool_walk_test_users_10_rounds_3_cache_size_2000_null_prob_50() { + paramaterized_mempool_walk_test(10, 3, 2000, 50, 30000) + } + + #[test] + fn mempool_walk_test_users_10_rounds_3_cache_size_2000_null_prob_100() { + paramaterized_mempool_walk_test(10, 3, 2000, 100, 30000) + } + + /// With the parameters given, create `num_rounds` transactions per each user in `num_users`. + /// `nonce_and_candidate_cache_size` is the cache size used for both of the nonce cache + /// and the candidate cache. 
+ fn paramaterized_mempool_walk_test( + num_users: usize, + num_rounds: usize, + nonce_and_candidate_cache_size: u64, + consider_no_estimate_tx_prob: u8, + timeout_ms: u128, + ) { + let key_address_pairs: Vec<(Secp256k1PrivateKey, StacksAddress)> = (0..num_users) + .map(|_user_index| { + let privk = StacksPrivateKey::new(); + let addr = StacksAddress::from_public_keys( + C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + &AddressHashMode::SerializeP2PKH, + 1, + &vec![StacksPublicKey::from_private(&privk)], + ) + .unwrap(); + (privk, addr) + }) + .collect(); + + let test_name = format!( + "mempool_walk_test_users_{}_rounds_{}_cache_size_{}_null_prob_{}", + num_users, num_rounds, nonce_and_candidate_cache_size, consider_no_estimate_tx_prob + ); + let mut peer_config = TestPeerConfig::new(&test_name, 2002, 2003); + + peer_config.initial_balances = vec![]; + for (privk, addr) in &key_address_pairs { + peer_config + .initial_balances + .push((addr.to_account_principal(), 1000000000)); + } + + let recipient_addr_str = "ST1RFD5Q2QPK3E0F08HG9XDX7SSC7CNRS0QR0SGEV"; + let recipient = StacksAddress::from_string(recipient_addr_str).unwrap(); + + let mut chainstate = + instantiate_chainstate_with_balances(false, 0x80000000, &test_name, vec![]); + let chainstate_path = chainstate_path(&test_name); + let mut mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap(); + let b_1 = make_block( + &mut chainstate, + ConsensusHash([0x1; 20]), + &( + FIRST_BURNCHAIN_CONSENSUS_HASH.clone(), + FIRST_STACKS_BLOCK_HASH.clone(), + ), + 1, + 1, + ); + let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2); + + let mut mempool_settings = MemPoolWalkSettings::default(); + mempool_settings.min_tx_fee = 10; + let mut tx_events = Vec::new(); + + let txs = codec_all_transactions( + &TransactionVersion::Testnet, + 0x80000000, + &TransactionAnchorMode::Any, + &TransactionPostConditionMode::Allow, + ); + + let mut transaction_counter = 0; + for round_index in 0..num_rounds 
{ + for user_index in 0..num_users { + transaction_counter += 1; + let mut tx = make_user_stacks_transfer( + &key_address_pairs[user_index].0, + round_index as u64, + 200, + &recipient.to_account_principal(), + 1, + ); + + let mut mempool_tx = mempool.tx_begin().unwrap(); + + let origin_address = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_address = tx.sponsor_address().unwrap_or(origin_address); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + + tx.set_tx_fee(100); + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let tx_fee = tx.get_tx_fee(); + let height = 100; + + MemPoolDB::try_add_tx( + &mut mempool_tx, + &mut chainstate, + &b_1.0, + &b_1.1, + txid, + tx_bytes, + tx_fee, + height, + &origin_address, + round_index.try_into().unwrap(), + &sponsor_address, + round_index.try_into().unwrap(), + None, + ) + .unwrap(); + + if transaction_counter & 1 == 0 { + mempool_tx + .execute( + "UPDATE mempool SET fee_rate = ? WHERE txid = ?", + rusqlite::params![Some(123.0), &txid], + ) + .unwrap(); + } else { + let none: Option = None; + mempool_tx + .execute( + "UPDATE mempool SET fee_rate = ? 
WHERE txid = ?", + rusqlite::params![none, &txid], + ) + .unwrap(); + } + + mempool_tx.commit().unwrap(); + } + } + + mempool_settings.nonce_cache_size = nonce_and_candidate_cache_size; + mempool_settings.candidate_retry_cache_size = nonce_and_candidate_cache_size; + mempool_settings.consider_no_estimate_tx_prob = consider_no_estimate_tx_prob; + let deadline = get_epoch_time_ms() + timeout_ms; + chainstate.with_read_only_clarity_tx( + &TEST_BURN_STATE_DB, + &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1), + |clarity_conn| { + let mut count_txs = 0; + // When the candidate cache fills, one pass cannot process all transactions + loop { + if mempool + .iterate_candidates::<_, ChainstateError, _>( + clarity_conn, + &mut tx_events, + 2, + mempool_settings.clone(), + |_, available_tx, _| { + count_txs += 1; + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) + }, + ) + .unwrap() + == 0 + { + break; + } + assert!(get_epoch_time_ms() < deadline, "test timed out"); + } + assert_eq!( + count_txs, transaction_counter, + "Mempool should find all {} transactions", + transaction_counter + ); + }, + ); + } + static CONTRACT: &'static str = " (define-map my-map int int) (define-private (do (input bool)) diff --git a/src/core/mempool.rs b/src/core/mempool.rs index de8e6993cc..ae52823e9d 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
-use std::cmp; -use std::collections::HashSet; +use std::cmp::{self, Ordering}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fs; use std::hash::Hasher; use std::io::{Read, Write}; @@ -31,6 +31,7 @@ use rusqlite::Error as SqliteError; use rusqlite::OpenFlags; use rusqlite::OptionalExtension; use rusqlite::Row; +use rusqlite::Rows; use rusqlite::Transaction; use rusqlite::NO_PARAMS; @@ -236,6 +237,18 @@ pub struct MemPoolTxInfo { pub metadata: MemPoolTxMetadata, } +/// This class is a minimal version of `MemPoolTxInfo`. It contains +/// just enough information to 1) filter by nonce readiness, 2) sort by fee rate. +#[derive(Debug, Clone)] +pub struct MemPoolTxInfoPartial { + pub txid: Txid, + pub fee_rate: Option, + pub origin_address: StacksAddress, + pub origin_nonce: u64, + pub sponsor_address: StacksAddress, + pub sponsor_nonce: u64, +} + #[derive(Debug, PartialEq, Clone)] pub struct MemPoolTxMetadata { pub txid: Txid, @@ -253,6 +266,19 @@ pub struct MemPoolTxMetadata { pub accept_time: u64, } +impl MemPoolTxMetadata { + pub fn get_unknown_nonces(&self) -> Vec { + let mut needs_nonces = vec![]; + if self.last_known_origin_nonce.is_none() { + needs_nonces.push(self.origin_address); + } + if self.last_known_sponsor_nonce.is_none() { + needs_nonces.push(self.sponsor_address); + } + needs_nonces + } +} + #[derive(Debug, Clone)] pub struct MemPoolWalkSettings { /// Minimum transaction fee that will be considered @@ -264,6 +290,11 @@ pub struct MemPoolWalkSettings { /// That is, with x%, when picking the next transaction to include a block, select one that /// either failed to get a cost estimate or has not been estimated yet. pub consider_no_estimate_tx_prob: u8, + /// Size of the nonce cache. This avoids MARF look-ups. + pub nonce_cache_size: u64, + /// Size of the candidate cache. These are the candidates that will be retried after each + /// transaction is mined. 
+ pub candidate_retry_cache_size: u64, } impl MemPoolWalkSettings { @@ -272,6 +303,8 @@ impl MemPoolWalkSettings { min_tx_fee: 1, max_walk_time_ms: u64::max_value(), consider_no_estimate_tx_prob: 5, + nonce_cache_size: 1024 * 1024, + candidate_retry_cache_size: 64 * 1024, } } pub fn zero() -> MemPoolWalkSettings { @@ -279,6 +312,8 @@ impl MemPoolWalkSettings { min_tx_fee: 0, max_walk_time_ms: u64::max_value(), consider_no_estimate_tx_prob: 5, + nonce_cache_size: 1024 * 1024, + candidate_retry_cache_size: 64 * 1024, } } } @@ -341,6 +376,29 @@ impl FromRow for MemPoolTxInfo { } } +impl FromRow for MemPoolTxInfoPartial { + fn from_row<'a>(row: &'a Row) -> Result { + let txid = Txid::from_column(row, "txid")?; + let fee_rate: Option = match row.get("fee_rate") { + Ok(rate) => Some(rate), + Err(_) => None, + }; + let origin_address = StacksAddress::from_column(row, "origin_address")?; + let origin_nonce = u64::from_column(row, "origin_nonce")?; + let sponsor_address = StacksAddress::from_column(row, "sponsor_address")?; + let sponsor_nonce = u64::from_column(row, "sponsor_nonce")?; + + Ok(MemPoolTxInfoPartial { + txid, + fee_rate, + origin_address, + origin_nonce, + sponsor_address, + sponsor_nonce, + }) + } +} + impl FromRow<(u64, u64)> for (u64, u64) { fn from_row<'a>(row: &'a Row) -> Result<(u64, u64), db_error> { let t1: i64 = row.get_unwrap(0); @@ -381,6 +439,8 @@ const MEMPOOL_SCHEMA_2_COST_ESTIMATOR: &'static [&'static str] = &[ FOREIGN KEY (txid) REFERENCES mempool (txid) ON DELETE CASCADE ON UPDATE CASCADE ); "#, + // The `last_known_*_nonce` columns are no longer used, beginning in schema 6, + // in favor of a separate `nonces` table and an in-memory cache. 
r#" ALTER TABLE mempool ADD COLUMN last_known_origin_nonce INTEGER; "#, @@ -416,6 +476,73 @@ const MEMPOOL_SCHEMA_3_BLOOM_STATE: &'static [&'static str] = &[ "#, ]; +const MEMPOOL_SCHEMA_4_BLACKLIST: &'static [&'static str] = &[ + r#" + -- List of transactions that will never be stored to the mempool again, for as long as the rows exist. + -- `arrival_time` indicates when the entry was created. This is used to garbage-collect the list. + -- A transaction that is blacklisted may still be served from the mempool, but it will never be (re)submitted. + CREATE TABLE IF NOT EXISTS tx_blacklist( + txid TEXT PRIMARY KEY NOT NULL, + arrival_time INTEGER NOT NULL + ); + "#, + r#" + -- Count the number of entries in the blacklist + CREATE TABLE IF NOT EXISTS tx_blacklist_size( + size INTEGER NOT NULL + ); + "#, + r#" + -- Maintain a count of the size of the blacklist + CREATE TRIGGER IF NOT EXISTS tx_blacklist_size_inc + AFTER INSERT ON tx_blacklist + BEGIN + UPDATE tx_blacklist_size SET size = size + 1; + END + "#, + r#" + CREATE TRIGGER IF NOT EXISTS tx_blacklist_size_dec + AFTER DELETE ON tx_blacklist + BEGIN + UPDATE tx_blacklist_size SET size = size - 1; + END + "#, + r#" + INSERT INTO tx_blacklist_size (size) VALUES (0) + "#, + r#" + INSERT INTO schema_version (version) VALUES (4) + "#, +]; + +const MEMPOOL_SCHEMA_5: &'static [&'static str] = &[ + r#" + ALTER TABLE mempool ADD COLUMN fee_rate NUMBER; + "#, + r#" + CREATE INDEX IF NOT EXISTS by_fee_rate ON mempool(fee_rate); + "#, + r#" + UPDATE mempool + SET fee_rate = (SELECT f.fee_rate FROM fee_estimates as f WHERE f.txid = mempool.txid); + "#, + r#" + INSERT INTO schema_version (version) VALUES (5) + "#, +]; + +const MEMPOOL_SCHEMA_6_NONCES: &'static [&'static str] = &[ + r#" + CREATE TABLE nonces( + address TEXT PRIMARY KEY NOT NULL, + nonce INTEGER NOT NULL + ); + "#, + r#" + INSERT INTO schema_version (version) VALUES (6) + "#, +]; + const MEMPOOL_INDEXES: &'static [&'static str] = &[ "CREATE INDEX IF NOT EXISTS 
by_txid ON mempool(txid);", "CREATE INDEX IF NOT EXISTS by_height ON mempool(height);", @@ -427,6 +554,7 @@ const MEMPOOL_INDEXES: &'static [&'static str] = &[ "CREATE INDEX IF NOT EXISTS fee_by_txid ON fee_estimates(txid);", "CREATE INDEX IF NOT EXISTS by_ordered_hashed_txid ON randomized_txids(hashed_txid ASC);", "CREATE INDEX IF NOT EXISTS by_hashed_txid ON randomized_txids(txid,hashed_txid);", + "CREATE INDEX IF NOT EXISTS by_arrival_time_desc ON tx_blacklist(arrival_time DESC);", ]; pub struct MemPoolDB { @@ -644,6 +772,236 @@ impl MemPoolTxInfo { } } +/// Used to locally cache nonces to avoid repeatedly looking them up in the nonce. +struct NonceCache { + cache: HashMap, + /// The maximum size that this cache can be. + max_cache_size: usize, +} + +impl NonceCache { + fn new(nonce_cache_size: u64) -> Self { + let max_size: usize = nonce_cache_size + .try_into() + .expect("Could not cast `nonce_cache_size` as `usize`."); + Self { + cache: HashMap::new(), + max_cache_size: max_size, + } + } + + /// Get a nonce from the cache. + /// First, the RAM cache will be checked for this address. + /// If absent, then the `nonces` table will be queried for this address. + /// If absent, then the MARF will be queried for this address. + /// + /// If not in RAM, the nonce will be opportunistically stored to the `nonces` table. If that + /// fails due to lock contention, then the method will return `true` for its second tuple argument. + /// + /// Returns (nonce, should-try-store-again?) 
+ fn get( + &mut self, + address: &StacksAddress, + clarity_tx: &mut C, + mempool_db: &DBConn, + ) -> (u64, bool) + where + C: ClarityConnection, + { + #[cfg(test)] + assert!(self.cache.len() <= self.max_cache_size); + + // Check in-memory cache + match self.cache.get(address) { + Some(nonce) => (*nonce, false), + None => { + // Check sqlite cache + let opt_nonce = match db_get_nonce(mempool_db, address) { + Ok(opt_nonce) => opt_nonce, + Err(e) => { + warn!("error retrieving nonce from mempool db: {}", e); + None + } + }; + match opt_nonce { + Some(nonce) => { + // Copy this into the in-memory cache if there is space + if self.cache.len() < self.max_cache_size { + self.cache.insert(address.clone(), nonce); + } + (nonce, false) + } + None => { + let nonce = + StacksChainState::get_nonce(clarity_tx, &address.clone().into()); + + let should_store_again = match db_set_nonce(mempool_db, address, nonce) { + Ok(_) => false, + Err(e) => { + warn!("error caching nonce to sqlite: {}", e); + true + } + }; + + if self.cache.len() < self.max_cache_size { + self.cache.insert(address.clone(), nonce); + } + (nonce, should_store_again) + } + } + } + } + } + + /// Store the (address, nonce) pair to the `nonces` table. + /// If storage fails, return false. + /// Otherwise return true. 
+ fn update(&mut self, address: StacksAddress, value: u64, mempool_db: &DBConn) -> bool { + // Sqlite cache + let success = match db_set_nonce(mempool_db, &address, value) { + Ok(_) => true, + Err(e) => { + warn!("error caching nonce to sqlite: {}", e); + false + } + }; + + // In-memory cache + match self.cache.get_mut(&address) { + Some(nonce) => { + *nonce = value; + } + None => (), + } + + success + } +} + +fn db_set_nonce(conn: &DBConn, address: &StacksAddress, nonce: u64) -> Result<(), db_error> { + let addr_str = address.to_string(); + let nonce_i64 = u64_to_sql(nonce)?; + + let sql = "INSERT OR REPLACE INTO nonces (address, nonce) VALUES (?1, ?2)"; + conn.execute(sql, rusqlite::params![&addr_str, nonce_i64])?; + Ok(()) +} + +fn db_get_nonce(conn: &DBConn, address: &StacksAddress) -> Result, db_error> { + let addr_str = address.to_string(); + + let sql = "SELECT nonce FROM nonces WHERE address = ?"; + query_row(conn, sql, rusqlite::params![&addr_str]) +} + +#[cfg(test)] +pub fn db_get_all_nonces(conn: &DBConn) -> Result, db_error> { + let sql = "SELECT * FROM nonces"; + let mut stmt = conn.prepare(&sql).map_err(|e| db_error::SqliteError(e))?; + let mut iter = stmt + .query(NO_PARAMS) + .map_err(|e| db_error::SqliteError(e))?; + let mut ret = vec![]; + while let Ok(Some(row)) = iter.next() { + let addr = StacksAddress::from_column(row, "address")?; + let nonce = u64::from_column(row, "nonce")?; + ret.push((addr, nonce)); + } + Ok(ret) +} + +/// Cache potential candidate transactions for subsequent iterations. +/// While walking the mempool, transactions that have nonces that are too high +/// to process yet (but could be processed in the future) are added to `next`. +/// In the next pass, `next` is moved to `cache` and these transactions are +/// checked before reading more from the mempool DB. +struct CandidateCache { + cache: VecDeque, + next: VecDeque, + /// The maximum size that this cache can be. 
+ max_cache_size: usize, +} + +impl CandidateCache { + fn new(candidate_retry_cache_size: u64) -> Self { + let max_size: usize = candidate_retry_cache_size + .try_into() + .expect("Could not cast `candidate_retry_cache_size` as usize."); + Self { + cache: VecDeque::new(), + next: VecDeque::new(), + max_cache_size: max_size, + } + } + + /// Retrieve the next candidate transaction from the cache. + fn next(&mut self) -> Option { + self.cache.pop_front() + } + + /// Push a candidate to the cache for the next iteration. + fn push(&mut self, tx: MemPoolTxInfoPartial) { + if self.next.len() < self.max_cache_size { + self.next.push_back(tx); + } + + #[cfg(test)] + assert!(self.cache.len() + self.next.len() <= self.max_cache_size); + } + + /// Prepare for the next iteration, transferring transactions from `next` to `cache`. + fn reset(&mut self) { + // We do not need a size check here, because the cache can only grow in size + // after `cache` is empty. New transactions are not walked until the entire + // cache has been walked, so whenever we are adding brand new transactions to + // the cache, `cache` must, by definition, be empty. The size of `next` + // can grow beyond the previous iteration's cache, and that is limited inside + // the `push` method. + self.next.append(&mut self.cache); + self.cache = std::mem::take(&mut self.next); + + #[cfg(test)] + { + assert!(self.cache.len() <= self.max_cache_size + 1); + assert!(self.next.len() <= self.max_cache_size + 1); + } + } + + /// Total length of the cache. 
+ fn len(&self) -> usize { + self.cache.len() + self.next.len() + } +} + +/// Evaluates the pair of nonces, to determine an order +/// +/// Returns: +/// `Equal` if both origin and sponsor nonces match expected +/// `Less` if the origin nonce is less than expected, or the origin matches expected and the +/// sponsor nonce is less than expected +/// `Greater` if the origin nonce is greater than expected, or the origin matches expected +/// and the sponsor nonce is greater than expected +fn order_nonces( + origin_actual: u64, + origin_expected: u64, + sponsor_actual: u64, + sponsor_expected: u64, +) -> Ordering { + if origin_actual < origin_expected { + return Ordering::Less; + } else if origin_actual > origin_expected { + return Ordering::Greater; + } + + if sponsor_actual < sponsor_expected { + return Ordering::Less; + } else if sponsor_actual > sponsor_expected { + return Ordering::Greater; + } + + Ordering::Equal +} + impl MemPoolDB { fn instantiate_mempool_db(conn: &mut DBConn) -> Result<(), db_error> { let mut tx = tx_begin_immediate(conn)?; @@ -694,6 +1052,15 @@ impl MemPoolDB { MemPoolDB::instantiate_bloom_state(tx)?; } 3 => { + MemPoolDB::instantiate_tx_blacklist(tx)?; + } + 4 => { + MemPoolDB::denormalize_fee_rate(tx)?; + } + 5 => { + MemPoolDB::instantiate_nonces(tx)?; + } + 6 => { break; } _ => { @@ -738,6 +1105,33 @@ impl MemPoolDB { Ok(()) } + /// Denormalize fee rate schema 5 + fn denormalize_fee_rate(tx: &DBTx) -> Result<(), db_error> { + for sql_exec in MEMPOOL_SCHEMA_5 { + tx.execute_batch(sql_exec)?; + } + + Ok(()) + } + + /// Instantiate the tx blacklist schema + fn instantiate_tx_blacklist(tx: &DBTx) -> Result<(), db_error> { + for sql_exec in MEMPOOL_SCHEMA_4_BLACKLIST { + tx.execute_batch(sql_exec)?; + } + + Ok(()) + } + + /// Add the nonce table + fn instantiate_nonces(tx: &DBTx) -> Result<(), db_error> { + for sql_exec in MEMPOOL_SCHEMA_6_NONCES { + tx.execute_batch(sql_exec)?; + } + + Ok(()) + } + pub fn db_path(chainstate_root_path: &str) 
-> Result { let mut path = PathBuf::from(chainstate_root_path); @@ -820,117 +1214,12 @@ impl MemPoolDB { }) } - pub fn reset_last_known_nonces(&mut self) -> Result<(), db_error> { - let sql = - "UPDATE mempool SET last_known_origin_nonce = NULL, last_known_sponsor_nonce = NULL"; + pub fn reset_nonce_cache(&mut self) -> Result<(), db_error> { + let sql = "DELETE FROM nonces"; self.db.execute(sql, rusqlite::NO_PARAMS)?; Ok(()) } - fn bump_last_known_nonces(&self, address: &StacksAddress) -> Result<(), db_error> { - let query_by = address.to_string(); - - let sql = "UPDATE mempool SET last_known_origin_nonce = last_known_origin_nonce + 1 - WHERE origin_address = ? AND last_known_origin_nonce IS NOT NULL"; - self.db.execute(sql, &[&query_by])?; - - let sql = "UPDATE mempool SET last_known_sponsor_nonce = last_known_sponsor_nonce + 1 - WHERE sponsor_address = ? AND last_known_sponsor_nonce IS NOT NULL"; - self.db.execute(sql, &[&query_by])?; - Ok(()) - } - - fn update_last_known_nonces( - &self, - address: &StacksAddress, - nonce: u64, - ) -> Result<(), db_error> { - let addr_str = address.to_string(); - let nonce_i64 = u64_to_sql(nonce)?; - - let sql = "UPDATE mempool SET last_known_origin_nonce = ? WHERE origin_address = ?"; - self.db - .execute(sql, rusqlite::params![nonce_i64, &addr_str])?; - - let sql = "UPDATE mempool SET last_known_sponsor_nonce = ? WHERE sponsor_address = ?"; - self.db - .execute(sql, rusqlite::params![nonce_i64, &addr_str])?; - - Ok(()) - } - - /// Select the next TX to consider from the pool of transactions without cost estimates. - /// If a transaction is found, returns Some object containing the transaction and a boolean indicating - /// whether or not the miner should propagate transaction receipts back to the estimator. 
- fn get_next_tx_to_consider_no_estimate( - &self, - ) -> Result, db_error> { - let select_no_estimate = "SELECT * FROM mempool LEFT JOIN fee_estimates as f ON mempool.txid = f.txid WHERE - ((origin_nonce = last_known_origin_nonce AND - sponsor_nonce = last_known_sponsor_nonce) OR (last_known_origin_nonce is NULL) OR (last_known_sponsor_nonce is NULL)) - AND f.fee_rate IS NULL ORDER BY tx_fee DESC LIMIT 1"; - query_row(&self.db, select_no_estimate, rusqlite::NO_PARAMS) - .map(|opt_tx| opt_tx.map(|tx| (tx, true))) - } - - /// Select the next TX to consider from the pool of transactions with cost estimates. - /// If a transaction is found, returns Some object containing the transaction and a boolean indicating - /// whether or not the miner should propagate transaction receipts back to the estimator. - fn get_next_tx_to_consider_with_estimate( - &self, - ) -> Result, db_error> { - let select_estimate = "SELECT * FROM mempool LEFT OUTER JOIN fee_estimates as f ON mempool.txid = f.txid WHERE - ((origin_nonce = last_known_origin_nonce AND - sponsor_nonce = last_known_sponsor_nonce) OR (last_known_origin_nonce is NULL) OR (last_known_sponsor_nonce is NULL)) - AND f.fee_rate IS NOT NULL ORDER BY f.fee_rate DESC LIMIT 1"; - query_row(&self.db, select_estimate, rusqlite::NO_PARAMS) - .map(|opt_tx| opt_tx.map(|tx| (tx, false))) - } - - /// * `start_with_no_estimate` - Pass `true` to make this function - /// start by considering transactions without a cost - /// estimate, and if none are found, use transactions with a cost estimate. - /// Pass `false` for the opposite behavior. - fn get_next_tx_to_consider( - &self, - start_with_no_estimate: bool, - ) -> Result { - let (next_tx, update_estimate): (MemPoolTxInfo, bool) = if start_with_no_estimate { - match self.get_next_tx_to_consider_no_estimate()? { - Some(result) => result, - None => match self.get_next_tx_to_consider_with_estimate()? 
{ - Some(result) => result, - None => return Ok(ConsiderTransactionResult::NoTransactions), - }, - } - } else { - match self.get_next_tx_to_consider_with_estimate()? { - Some(result) => result, - None => match self.get_next_tx_to_consider_no_estimate()? { - Some(result) => result, - None => return Ok(ConsiderTransactionResult::NoTransactions), - }, - } - }; - - let mut needs_nonces = vec![]; - if next_tx.metadata.last_known_origin_nonce.is_none() { - needs_nonces.push(next_tx.metadata.origin_address); - } - if next_tx.metadata.last_known_sponsor_nonce.is_none() { - needs_nonces.push(next_tx.metadata.sponsor_address); - } - - if !needs_nonces.is_empty() { - Ok(ConsiderTransactionResult::UpdateNonces(needs_nonces)) - } else { - Ok(ConsiderTransactionResult::Consider(ConsiderTransaction { - tx: next_tx, - update_estimate, - })) - } - } - /// Find the origin addresses who have sent the highest-fee transactions fn find_origin_addresses_by_descending_fees( &self, @@ -966,8 +1255,7 @@ impl MemPoolDB { let sql_tx = tx_begin_immediate(&mut self.db)?; let txs: Vec = query_rows( &sql_tx, - "SELECT * FROM mempool as m LEFT OUTER JOIN fee_estimates as f ON - m.txid = f.txid WHERE f.fee_rate IS NULL LIMIT ?", + "SELECT * FROM mempool as m WHERE m.fee_rate IS NULL LIMIT ?", &[max_updates], )?; let mut updated = 0; @@ -992,8 +1280,8 @@ impl MemPoolDB { }; sql_tx.execute( - "INSERT OR REPLACE INTO fee_estimates(txid, fee_rate) VALUES (?, ?)", - rusqlite::params![&txid, fee_rate_f64], + "UPDATE mempool SET fee_rate = ? WHERE txid = ?", + rusqlite::params![fee_rate_f64, &txid], )?; updated += 1; } @@ -1003,25 +1291,65 @@ impl MemPoolDB { Ok(updated) } - /// + /// Helper method to record nonces to a retry-buffer. + /// This is needed for when we try to write-through a new (address, nonce) pair to the on-disk + /// `nonces` cache, but the write fails due to lock contention from another thread. The + /// retry-buffer will be used to later store this data in a single transaction. 
+ fn save_nonce_for_retry( + retry_store: &mut HashMap, + max_size: u64, + addr: StacksAddress, + new_nonce: u64, + ) { + if (retry_store.len() as u64) < max_size { + if let Some(nonce) = retry_store.get_mut(&addr) { + *nonce = cmp::max(new_nonce, *nonce); + } else { + retry_store.insert(addr, new_nonce); + } + } + } + /// Iterate over candidates in the mempool - /// `todo` will be called once for each transaction whose origin nonce is equal - /// to the origin account's nonce. At most one transaction per origin will be - /// considered by this method, and transactions will be considered in - /// highest-fee-first order. This method is interruptable -- in the `settings` struct, the - /// caller may choose how long to spend iterating before this method stops. + /// `todo` will be called once for each transaction that is a valid + /// candidate for inclusion in the next block, meaning its origin and + /// sponsor nonces are equal to the nonces of the corresponding accounts. + /// Best effort will be made to process the transactions in fee-rate order. + /// That is, transactions will be processed in fee-rate order until the + /// candidate cache is full, at which point, transactions with a lower + /// fee-rate may be considered before those with a higher fee-rate. + /// When the candidate cache fills, a subsequent call to + /// `iterate_candidates` will be needed to reconsider transactions which + /// were skipped on the first pass, but become valid after some lower + /// fee-rate transactions are considered. + /// + /// The size of the candidate cache and the nonce cache are configurable + /// in the settings struct. This method is interruptable -- in the + /// `settings` struct, the caller may choose how long to spend iterating + /// before this method stops. /// - /// `todo` returns a boolean representing whether or not to keep iterating. 
+ /// `todo` returns an option to a `TransactionEvent` representing the + /// outcome, or None to indicate that iteration through the mempool should + /// be halted. + /// + /// `output_events` is modified in place, adding all substantive + /// transaction events (success and error events, but not skipped) output + /// by `todo`. pub fn iterate_candidates( &mut self, clarity_tx: &mut C, + output_events: &mut Vec, _tip_height: u64, settings: MemPoolWalkSettings, mut todo: F, ) -> Result where C: ClarityConnection, - F: FnMut(&mut C, &ConsiderTransaction, &mut dyn CostEstimator) -> Result, + F: FnMut( + &mut C, + &ConsiderTransaction, + &mut dyn CostEstimator, + ) -> Result, E>, E: From + From, { let start_time = Instant::now(); @@ -1031,7 +1359,39 @@ impl MemPoolDB { let tx_consideration_sampler = Uniform::new(0, 100); let mut rng = rand::thread_rng(); - let mut remember_start_with_estimate = None; + let mut candidate_cache = CandidateCache::new(settings.candidate_retry_cache_size); + let mut nonce_cache = NonceCache::new(settings.nonce_cache_size); + + // set of (address, nonce) to store after the inner loop completes. This will be done in a + // single transaction. This cannot grow to more than `settings.nonce_cache_size` entries. 
+ let mut retry_store = HashMap::new(); + + let sql = " + SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate + FROM mempool + WHERE fee_rate IS NULL + "; + let mut query_stmt_null = self + .db + .prepare(&sql) + .map_err(|err| Error::SqliteError(err))?; + let mut null_iterator = query_stmt_null + .query(NO_PARAMS) + .map_err(|err| Error::SqliteError(err))?; + + let sql = " + SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate + FROM mempool + WHERE fee_rate IS NOT NULL + ORDER BY fee_rate DESC + "; + let mut query_stmt_fee = self + .db + .prepare(&sql) + .map_err(|err| Error::SqliteError(err))?; + let mut fee_iterator = query_stmt_fee + .query(NO_PARAMS) + .map_err(|err| Error::SqliteError(err))?; loop { if start_time.elapsed().as_millis() > settings.max_walk_time_ms as u128 { @@ -1040,58 +1400,209 @@ impl MemPoolDB { break; } - let start_with_no_estimate = remember_start_with_estimate.unwrap_or_else(|| { - tx_consideration_sampler.sample(&mut rng) < settings.consider_no_estimate_tx_prob - }); + let start_with_no_estimate = + tx_consideration_sampler.sample(&mut rng) < settings.consider_no_estimate_tx_prob; - match self.get_next_tx_to_consider(start_with_no_estimate)? 
{ - ConsiderTransactionResult::NoTransactions => { - debug!("No more transactions to consider in mempool"); - break; + // First, try to read from the retry list + let (candidate, update_estimate) = match candidate_cache.next() { + Some(tx) => { + let update_estimate = tx.fee_rate.is_none(); + (tx, update_estimate) } - ConsiderTransactionResult::UpdateNonces(addresses) => { - // if we need to update the nonce for the considered transaction, - // use the last value of start_with_no_estimate on the next loop - remember_start_with_estimate = Some(start_with_no_estimate); - let mut last_addr = None; - for address in addresses.into_iter() { - debug!("Update nonce"; "address" => %address); - // do not recheck nonces if the sponsor == origin - if last_addr.as_ref() == Some(&address) { - continue; + None => { + // When the retry list is empty, read from the mempool db, + // randomly selecting from either the null fee-rate transactions + // or those with fee-rate estimates. + let opt_tx = if start_with_no_estimate { + null_iterator + .next() + .map_err(|err| Error::SqliteError(err))? + } else { + fee_iterator.next().map_err(|err| Error::SqliteError(err))? + }; + match opt_tx { + Some(row) => (MemPoolTxInfoPartial::from_row(row)?, start_with_no_estimate), + None => { + // If the selected iterator is empty, check the other + match if start_with_no_estimate { + fee_iterator.next().map_err(|err| Error::SqliteError(err))? + } else { + null_iterator + .next() + .map_err(|err| Error::SqliteError(err))? 
+ } { + Some(row) => ( + MemPoolTxInfoPartial::from_row(row)?, + !start_with_no_estimate, + ), + None => { + debug!("No more transactions to consider in mempool"); + break; + } + } } - let min_nonce = - StacksChainState::get_account(clarity_tx, &address.clone().into()) - .nonce; - - self.update_last_known_nonces(&address, min_nonce)?; - last_addr = Some(address) } } - ConsiderTransactionResult::Consider(consider) => { - // if we actually consider the chosen transaction, - // compute a new start_with_no_estimate on the next loop - remember_start_with_estimate = None; - debug!("Consider mempool transaction"; + }; + + // Check the nonces. + let (expected_origin_nonce, retry_store_origin_nonce) = + nonce_cache.get(&candidate.origin_address, clarity_tx, self.conn()); + let (expected_sponsor_nonce, retry_store_sponsor_nonce) = + nonce_cache.get(&candidate.sponsor_address, clarity_tx, self.conn()); + + // Try storing these nonces later if we failed to do so here, e.g. due to some other + // thread holding the write-lock on the mempool DB. 
+ if retry_store_origin_nonce { + Self::save_nonce_for_retry( + &mut retry_store, + settings.nonce_cache_size, + candidate.origin_address.clone(), + expected_origin_nonce, + ); + } + if retry_store_sponsor_nonce { + Self::save_nonce_for_retry( + &mut retry_store, + settings.nonce_cache_size, + candidate.sponsor_address.clone(), + expected_sponsor_nonce, + ); + } + + match order_nonces( + candidate.origin_nonce, + expected_origin_nonce, + candidate.sponsor_nonce, + expected_sponsor_nonce, + ) { + Ordering::Less => { + debug!( + "Mempool: unexecutable: drop tx {}:{} ({})", + candidate.origin_address, + candidate.origin_nonce, + candidate.fee_rate.unwrap_or_default() + ); + // This transaction cannot execute in this pass, just drop it + continue; + } + Ordering::Greater => { + debug!( + "Mempool: nonces too high, cached for later {}:{} ({})", + candidate.origin_address, + candidate.origin_nonce, + candidate.fee_rate.unwrap_or_default() + ); + // This transaction could become runnable in this pass, save it for later + candidate_cache.push(candidate); + continue; + } + Ordering::Equal => { + // Candidate transaction: fall through + debug!("Mempool: fall through."); + } + }; + + // Read in and deserialize the transaction. + let tx_info_option = MemPoolDB::get_tx(&self.conn(), &candidate.txid)?; + let tx_info = match tx_info_option { + Some(tx) => tx, + None => { + // Note: Don't panic here because maybe the state has changed from garbage collection. 
+ warn!("Miner: could not find a tx for id {:?}", &candidate.txid); + continue; + } + }; + + let consider = ConsiderTransaction { + tx: tx_info, + update_estimate, + }; + debug!("Consider mempool transaction"; "txid" => %consider.tx.tx.txid(), "origin_addr" => %consider.tx.metadata.origin_address, + "origin_nonce" => candidate.origin_nonce, "sponsor_addr" => %consider.tx.metadata.sponsor_address, + "sponsor_nonce" => candidate.sponsor_nonce, "accept_time" => consider.tx.metadata.accept_time, "tx_fee" => consider.tx.metadata.tx_fee, + "fee_rate" => candidate.fee_rate, "size" => consider.tx.metadata.len); - total_considered += 1; - - if !todo(clarity_tx, &consider, self.cost_estimator.as_mut())? { - debug!("Mempool iteration early exit from iterator"); - break; - } - - self.bump_last_known_nonces(&consider.tx.metadata.origin_address)?; - if consider.tx.tx.auth.is_sponsored() { - self.bump_last_known_nonces(&consider.tx.metadata.sponsor_address)?; + total_considered += 1; + + // Run `todo` on the transaction. + match todo(clarity_tx, &consider, self.cost_estimator.as_mut())? 
{ + Some(tx_event) => { + match tx_event { + TransactionEvent::Success(_) => { + // Bump nonces in the cache for the executed transaction + let stored = nonce_cache.update( + consider.tx.metadata.origin_address, + expected_origin_nonce + 1, + self.conn(), + ); + if !stored { + Self::save_nonce_for_retry( + &mut retry_store, + settings.nonce_cache_size, + consider.tx.metadata.origin_address, + expected_origin_nonce + 1, + ); + } + + if consider.tx.tx.auth.is_sponsored() { + let stored = nonce_cache.update( + consider.tx.metadata.sponsor_address, + expected_sponsor_nonce + 1, + self.conn(), + ); + if !stored { + Self::save_nonce_for_retry( + &mut retry_store, + settings.nonce_cache_size, + consider.tx.metadata.sponsor_address, + expected_sponsor_nonce + 1, + ); + } + } + output_events.push(tx_event); + } + TransactionEvent::Skipped(_) => { + // don't push `Skipped` events to the observer + } + _ => { + output_events.push(tx_event); + } } } + None => { + debug!("Mempool iteration early exit from iterator"); + break; + } + } + + // Reset for finding the next transaction to process + debug!( + "Mempool: reset: retry list has {} entries", + candidate_cache.len() + ); + candidate_cache.reset(); + } + + // drop these rusqlite statements and queries, since their existence as immutable borrows on the + // connection prevents us from beginning a transaction below (which requires a mutable + // borrow). 
+ drop(null_iterator); + drop(fee_iterator); + drop(query_stmt_null); + drop(query_stmt_fee); + + if retry_store.len() > 0 { + let tx = self.tx_begin()?; + for (address, nonce) in retry_store.into_iter() { + nonce_cache.update(address, nonce, &tx); } + tx.commit()?; } debug!( @@ -1319,7 +1830,7 @@ impl MemPoolDB { } else { // there's a >= fee tx in this fork, cannot add info!("TX conflicts with sponsor/origin nonce in same fork with >= fee"; - "new_txid" => %txid, + "new_txid" => %txid, "old_txid" => %prior_tx.txid, "origin_addr" => %origin_address, "origin_nonce" => origin_nonce, @@ -1520,8 +2031,8 @@ impl MemPoolDB { mempool_tx .execute( - "INSERT OR REPLACE INTO fee_estimates(txid, fee_rate) VALUES (?, ?)", - rusqlite::params![&txid, fee_rate_estimate], + "UPDATE mempool SET fee_rate = ? WHERE txid = ?", + rusqlite::params![fee_rate_estimate, &txid], ) .map_err(db_error::from)?; diff --git a/src/core/tests/mod.rs b/src/core/tests/mod.rs index 0e8eacb71f..02e26fcf7e 100644 --- a/src/core/tests/mod.rs +++ b/src/core/tests/mod.rs @@ -25,6 +25,8 @@ use crate::chainstate::stacks::db::test::chainstate_path; use crate::chainstate::stacks::db::test::instantiate_chainstate; use crate::chainstate::stacks::db::test::instantiate_chainstate_with_balances; use crate::chainstate::stacks::db::StreamCursor; +use crate::chainstate::stacks::events::StacksTransactionReceipt; +use crate::chainstate::stacks::miner::TransactionResult; use crate::chainstate::stacks::test::codec_all_transactions; use crate::chainstate::stacks::{ db::blocks::MemPoolRejection, db::StacksChainState, index::MarfTrieId, CoinbasePayload, @@ -97,7 +99,7 @@ fn mempool_db_init() { } #[cfg(test)] -fn make_block( +pub fn make_block( chainstate: &mut StacksChainState, block_consensus: ConsensusHash, parent: &(ConsensusHash, BlockHeaderHash), @@ -279,7 +281,7 @@ fn mempool_walk_over_fork() { let mut mempool_settings = MemPoolWalkSettings::default(); mempool_settings.min_tx_fee = 10; - + let mut tx_events = 
Vec::new(); chainstate.with_read_only_clarity_tx( &TEST_BURN_STATE_DB, &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1), @@ -288,11 +290,25 @@ fn mempool_walk_over_fork() { mempool .iterate_candidates::<_, ChainstateError, _>( clarity_conn, + &mut tx_events, 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; - Ok(true) + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) }, ) .unwrap(); @@ -313,11 +329,25 @@ fn mempool_walk_over_fork() { mempool .iterate_candidates::<_, ChainstateError, _>( clarity_conn, + &mut tx_events, 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; - Ok(true) + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) }, ) .unwrap(); @@ -326,7 +356,7 @@ fn mempool_walk_over_fork() { ); mempool - .reset_last_known_nonces() + .reset_nonce_cache() .expect("Should be able to reset nonces"); chainstate.with_read_only_clarity_tx( @@ -337,11 +367,25 @@ fn mempool_walk_over_fork() { mempool .iterate_candidates::<_, ChainstateError, _>( clarity_conn, + &mut tx_events, 3, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; - Ok(true) + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) }, ) .unwrap(); @@ 
-353,7 +397,7 @@ fn mempool_walk_over_fork() { ); mempool - .reset_last_known_nonces() + .reset_nonce_cache() .expect("Should be able to reset nonces"); // The mempool iterator no longer does any consideration of what block accepted @@ -366,11 +410,25 @@ fn mempool_walk_over_fork() { mempool .iterate_candidates::<_, ChainstateError, _>( clarity_conn, + &mut tx_events, 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; - Ok(true) + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) }, ) .unwrap(); @@ -382,7 +440,7 @@ fn mempool_walk_over_fork() { ); mempool - .reset_last_known_nonces() + .reset_nonce_cache() .expect("Should be able to reset nonces"); chainstate.with_read_only_clarity_tx( @@ -393,11 +451,25 @@ fn mempool_walk_over_fork() { mempool .iterate_candidates::<_, ChainstateError, _>( clarity_conn, + &mut tx_events, 3, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; - Ok(true) + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) }, ) .unwrap(); @@ -409,7 +481,7 @@ fn mempool_walk_over_fork() { ); mempool - .reset_last_known_nonces() + .reset_nonce_cache() .expect("Should be able to reset nonces"); // let's test replace-across-fork while we're here. @@ -522,6 +594,442 @@ fn mempool_walk_over_fork() { ); } +#[test] +/// This test verifies that all transactions are visited, regardless of the +/// setting for `consider_no_estimate_tx_prob`. 
+fn test_iterate_candidates_consider_no_estimate_tx_prob() { + let mut chainstate = instantiate_chainstate_with_balances( + false, + 0x80000000, + "test_iterate_candidates_consider_no_estimate_tx_prob", + vec![], + ); + let chainstate_path = chainstate_path("test_iterate_candidates_consider_no_estimate_tx_prob"); + let mut mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap(); + let b_1 = make_block( + &mut chainstate, + ConsensusHash([0x1; 20]), + &( + FIRST_BURNCHAIN_CONSENSUS_HASH.clone(), + FIRST_STACKS_BLOCK_HASH.clone(), + ), + 1, + 1, + ); + let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2); + + let mut mempool_settings = MemPoolWalkSettings::default(); + mempool_settings.min_tx_fee = 10; + let mut tx_events = Vec::new(); + + let mut txs = codec_all_transactions( + &TransactionVersion::Testnet, + 0x80000000, + &TransactionAnchorMode::Any, + &TransactionPostConditionMode::Allow, + ); + + // Load 24 transactions into the mempool, alternating whether or not they have a fee-rate. + for nonce in 0..24 { + let mut tx = txs.pop().unwrap(); + let mut mempool_tx = mempool.tx_begin().unwrap(); + + let origin_address = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_address = tx.sponsor_address().unwrap_or(origin_address); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + + tx.set_tx_fee(100); + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let tx_fee = tx.get_tx_fee(); + let height = 100; + + MemPoolDB::try_add_tx( + &mut mempool_tx, + &mut chainstate, + &b_1.0, + &b_1.1, + txid, + tx_bytes, + tx_fee, + height, + &origin_address, + nonce, + &sponsor_address, + nonce, + None, + ) + .unwrap(); + + if nonce & 1 == 0 { + mempool_tx + .execute( + "UPDATE mempool SET fee_rate = ? WHERE txid = ?", + rusqlite::params![Some(123.0), &txid], + ) + .unwrap(); + } else { + let none: Option = None; + mempool_tx + .execute( + "UPDATE mempool SET fee_rate = ? 
WHERE txid = ?", + rusqlite::params![none, &txid], + ) + .unwrap(); + } + + mempool_tx.commit().unwrap(); + } + + // First, with default (5%) + chainstate.with_read_only_clarity_tx( + &TEST_BURN_STATE_DB, + &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1), + |clarity_conn| { + let mut count_txs = 0; + mempool + .iterate_candidates::<_, ChainstateError, _>( + clarity_conn, + &mut tx_events, + 2, + mempool_settings.clone(), + |_, available_tx, _| { + count_txs += 1; + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) + }, + ) + .unwrap(); + assert_eq!(count_txs, 24, "Mempool should find all 24 transactions"); + }, + ); + + // Next with 0% + let _ = mempool.reset_nonce_cache(); + mempool_settings.consider_no_estimate_tx_prob = 0; + + chainstate.with_read_only_clarity_tx( + &TEST_BURN_STATE_DB, + &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1), + |clarity_conn| { + let mut count_txs = 0; + mempool + .iterate_candidates::<_, ChainstateError, _>( + clarity_conn, + &mut tx_events, + 2, + mempool_settings.clone(), + |_, available_tx, _| { + count_txs += 1; + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) + }, + ) + .unwrap(); + assert_eq!(count_txs, 24, "Mempool should find all 24 transactions"); + }, + ); + + // Then with with 100% + let _ = mempool.reset_nonce_cache(); + mempool_settings.consider_no_estimate_tx_prob = 100; + + chainstate.with_read_only_clarity_tx( + &TEST_BURN_STATE_DB, + 
&StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1), + |clarity_conn| { + let mut count_txs = 0; + mempool + .iterate_candidates::<_, ChainstateError, _>( + clarity_conn, + &mut tx_events, + 2, + mempool_settings.clone(), + |_, available_tx, _| { + count_txs += 1; + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) + }, + ) + .unwrap(); + assert_eq!(count_txs, 24, "Mempool should find all 24 transactions"); + }, + ); +} + +#[test] +/// This test verifies that when a transaction is skipped, other transactions +/// from the same address with higher nonces are not considered for inclusion in a block. +fn test_iterate_candidates_skipped_transaction() { + let mut chainstate = instantiate_chainstate_with_balances( + false, + 0x80000000, + "test_iterate_candidates_skipped_transaction", + vec![], + ); + let chainstate_path = chainstate_path("test_iterate_candidates_skipped_transaction"); + let mut mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap(); + let b_1 = make_block( + &mut chainstate, + ConsensusHash([0x1; 20]), + &( + FIRST_BURNCHAIN_CONSENSUS_HASH.clone(), + FIRST_STACKS_BLOCK_HASH.clone(), + ), + 1, + 1, + ); + let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2); + + let mut mempool_settings = MemPoolWalkSettings::default(); + mempool_settings.min_tx_fee = 10; + let mut tx_events = Vec::new(); + + let mut txs = codec_all_transactions( + &TransactionVersion::Testnet, + 0x80000000, + &TransactionAnchorMode::Any, + &TransactionPostConditionMode::Allow, + ); + + // Load 3 transactions into the mempool + for nonce in 0..3 { + let mut tx = txs.pop().unwrap(); + let mut mempool_tx = mempool.tx_begin().unwrap(); + + let origin_address = 
tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_address = tx.sponsor_address().unwrap_or(origin_address); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + + tx.set_tx_fee(100); + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let tx_fee = tx.get_tx_fee(); + let height = 100; + + MemPoolDB::try_add_tx( + &mut mempool_tx, + &mut chainstate, + &b_1.0, + &b_1.1, + txid, + tx_bytes, + tx_fee, + height, + &origin_address, + nonce, + &sponsor_address, + nonce, + None, + ) + .unwrap(); + + mempool_tx.commit().unwrap(); + } + + chainstate.with_read_only_clarity_tx( + &TEST_BURN_STATE_DB, + &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1), + |clarity_conn| { + let mut count_txs = 0; + mempool + .iterate_candidates::<_, ChainstateError, _>( + clarity_conn, + &mut tx_events, + 2, + mempool_settings.clone(), + |_, available_tx, _| { + count_txs += 1; + // For the second transaction, return a `Skipped` result + let result = if count_txs == 2 { + TransactionResult::skipped( + &available_tx.tx.tx, + "event not relevant to test".to_string(), + ) + .convert_to_event() + } else { + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event() + }; + Ok(Some(result)) + }, + ) + .unwrap(); + assert_eq!( + count_txs, 2, + "Mempool iteration should not proceed past the skipped transaction" + ); + }, + ); +} + +#[test] +/// This test verifies that when a transaction reports a processing error, other transactions +/// from the same address with higher nonces are not considered for inclusion in a block. 
+fn test_iterate_candidates_processing_error_transaction() { + let mut chainstate = instantiate_chainstate_with_balances( + false, + 0x80000000, + "test_iterate_candidates_processing_error_transaction", + vec![], + ); + let chainstate_path = chainstate_path("test_iterate_candidates_processing_error_transaction"); + let mut mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap(); + let b_1 = make_block( + &mut chainstate, + ConsensusHash([0x1; 20]), + &( + FIRST_BURNCHAIN_CONSENSUS_HASH.clone(), + FIRST_STACKS_BLOCK_HASH.clone(), + ), + 1, + 1, + ); + let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2); + + let mut mempool_settings = MemPoolWalkSettings::default(); + mempool_settings.min_tx_fee = 10; + let mut tx_events = Vec::new(); + + let mut txs = codec_all_transactions( + &TransactionVersion::Testnet, + 0x80000000, + &TransactionAnchorMode::Any, + &TransactionPostConditionMode::Allow, + ); + + // Load 3 transactions into the mempool + for nonce in 0..3 { + let mut tx = txs.pop().unwrap(); + let mut mempool_tx = mempool.tx_begin().unwrap(); + + let origin_address = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_address = tx.sponsor_address().unwrap_or(origin_address); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + + tx.set_tx_fee(100); + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let tx_fee = tx.get_tx_fee(); + let height = 100; + + MemPoolDB::try_add_tx( + &mut mempool_tx, + &mut chainstate, + &b_1.0, + &b_1.1, + txid, + tx_bytes, + tx_fee, + height, + &origin_address, + nonce, + &sponsor_address, + nonce, + None, + ) + .unwrap(); + + mempool_tx.commit().unwrap(); + } + + chainstate.with_read_only_clarity_tx( + &TEST_BURN_STATE_DB, + &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1), + |clarity_conn| { + let mut count_txs = 0; + mempool + .iterate_candidates::<_, ChainstateError, _>( + clarity_conn, + &mut tx_events, + 2, + 
 mempool_settings.clone(), + |_, available_tx, _| { + count_txs += 1; + // For the second transaction, return an `Error` result + let result = if count_txs == 2 { + TransactionResult::error( + &available_tx.tx.tx, + crate::chainstate::stacks::Error::StacksTransactionSkipped( + "error for testing".to_string(), + ), + ) + .convert_to_event() + } else { + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event() + }; + Ok(Some(result)) + }, + ) + .unwrap(); + assert_eq!( + count_txs, 2, + "Mempool iteration should not proceed past the skipped transaction" + ); + }, + ); +} + #[test] fn mempool_do_not_replace_tx() { let mut chainstate = instantiate_chainstate_with_balances( diff --git a/testnet/stacks-node/src/config.rs b/testnet/stacks-node/src/config.rs index 987048e66c..d126ce121c 100644 --- a/testnet/stacks-node/src/config.rs +++ b/testnet/stacks-node/src/config.rs @@ -419,6 +419,12 @@ impl Config { probability_pick_no_estimate_tx: miner .probability_pick_no_estimate_tx .unwrap_or(miner_default_config.probability_pick_no_estimate_tx), + nonce_cache_size: miner + .nonce_cache_size + .unwrap_or(miner_default_config.nonce_cache_size), + candidate_retry_cache_size: miner + .candidate_retry_cache_size + .unwrap_or(miner_default_config.candidate_retry_cache_size), }, None => miner_default_config, }; @@ -781,6 +787,8 @@ impl Config { self.miner.subsequent_attempt_time_ms }, consider_no_estimate_tx_prob: self.miner.probability_pick_no_estimate_tx, + nonce_cache_size: self.miner.nonce_cache_size, + candidate_retry_cache_size: self.miner.candidate_retry_cache_size, }, } } @@ -1389,6 +1397,8 @@ pub struct MinerConfig { pub subsequent_attempt_time_ms: u64, pub microblock_attempt_time_ms: u64, pub probability_pick_no_estimate_tx: u8, + 
pub nonce_cache_size: u64, + pub candidate_retry_cache_size: u64, } impl MinerConfig { @@ -1399,6 +1409,8 @@ impl MinerConfig { subsequent_attempt_time_ms: 30_000, microblock_attempt_time_ms: 30_000, probability_pick_no_estimate_tx: 5, + nonce_cache_size: 10_000, + candidate_retry_cache_size: 10_000, } } } @@ -1505,6 +1517,8 @@ pub struct MinerConfigFile { pub subsequent_attempt_time_ms: Option, pub microblock_attempt_time_ms: Option, pub probability_pick_no_estimate_tx: Option, + pub nonce_cache_size: Option, + pub candidate_retry_cache_size: Option, } #[derive(Clone, Deserialize, Default)]