diff --git a/Cargo.lock b/Cargo.lock index 0c315da5de285..7c421ad96947a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3079,6 +3079,15 @@ dependencies = [ "substrate-primitives 0.1.0", ] +[[package]] +name = "substrate-transaction-graph" +version = "0.1.0" +dependencies = [ + "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 0.1.0", +] + [[package]] name = "substrate-transaction-pool" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cabd3e22dc641..3eb25679835f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ members = [ "core/client", "core/client/db", "core/executor", - "core/transaction-pool", "core/keyring", "core/misbehavior-check", "core/network", @@ -37,6 +36,8 @@ members = [ "core/sr-sandbox", "core/sr-std", "core/sr-version", + "core/transaction-graph", + "core/transaction-pool", "srml/support", "srml/balances", "srml/consensus", diff --git a/core/transaction-graph/Cargo.toml b/core/transaction-graph/Cargo.toml new file mode 100644 index 0000000000000..7b1f7b7246aa1 --- /dev/null +++ b/core/transaction-graph/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "substrate-transaction-graph" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +error-chain = "0.12" +log = "0.3.0" +sr-primitives = { path = "../sr-primitives" } diff --git a/core/transaction-graph/src/error.rs b/core/transaction-graph/src/error.rs new file mode 100644 index 0000000000000..bf25accac12a0 --- /dev/null +++ b/core/transaction-graph/src/error.rs @@ -0,0 +1,37 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use sr_primitives::transaction_validity::TransactionPriority as Priority; + +error_chain! { + errors { + /// The transaction is already in the pool. + AlreadyImported { + description("Transaction is already in the pool."), + display("Already imported"), + } + /// The transaction cannot be imported cause it's a replacement and has too low priority. + TooLowPriority(old: Priority, new: Priority) { + description("The priority is too low to replace transactions already in the pool."), + display("Too low priority ({} > {})", old, new) + } + /// Deps cycle detected and we couldn't import transaction. + CycleDetected { + description("Transaction was not imported because of detected cycle."), + display("Cycle Detected"), + } + } +} diff --git a/core/transaction-graph/src/future.rs b/core/transaction-graph/src/future.rs new file mode 100644 index 0000000000000..e69358daa7075 --- /dev/null +++ b/core/transaction-graph/src/future.rs @@ -0,0 +1,177 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + collections::{HashMap, HashSet}, + hash, +}; + +use sr_primitives::transaction_validity::{ + TransactionTag as Tag, +}; + +use pool::Transaction; + +/// Transaction with partially satisfied dependencies. +#[derive(Debug)] +pub struct WaitingTransaction { + /// Transaction details. + pub transaction: Transaction, + /// Tags that are required and have not been satisfied yet by other transactions in the pool. + pub missing_tags: HashSet, +} + +impl WaitingTransaction { + /// Creates a new `WaitingTransaction`. + /// + /// Computes the set of missing tags based on the requirements and tags that + /// are provided by all transactions in the ready queue. + pub fn new(transaction: Transaction, provided: &HashMap) -> Self { + let missing_tags = transaction.requires + .iter() + .filter(|tag| !provided.contains_key(&**tag)) + .cloned() + .collect(); + + WaitingTransaction { + transaction, + missing_tags, + } + } + + /// Marks the tag as satisfied. + pub fn satisfy_tag(&mut self, tag: &Tag) { + self.missing_tags.remove(tag); + } + + /// Returns true if transaction has all requirements satisfied. + pub fn is_ready(&self) -> bool { + self.missing_tags.is_empty() + } +} + +/// A pool of transactions that are not yet ready to be included in the block. +/// +/// Contains transactions that are still awaiting for some other transactions that +/// could provide a tag that they require. +#[derive(Debug)] +pub struct FutureTransactions { + /// tags that are not yet provided by any transaction and we await for them + wanted_tags: HashMap>, + /// Transactions waiting for a particular other transaction + waiting: HashMap>, +} + +impl Default for FutureTransactions { + fn default() -> Self { + FutureTransactions { + wanted_tags: Default::default(), + waiting: Default::default(), + } + } +} + +const WAITING_PROOF: &str = r"# +In import we always insert to `waiting` if we push to `wanted_tags`; +when removing from `waiting` we always clear `wanted_tags`; +every hash from `wanted_tags` is always present in `waiting`; +qed +#"; + +impl FutureTransactions { + /// Import transaction to Future queue. + /// + /// Only transactions that don't have all their tags satisfied should occupy + /// the Future queue. + /// As soon as required tags are provided by some other transactions that are ready + /// we should remove the transactions from here and move them to the Ready queue. + pub fn import(&mut self, tx: WaitingTransaction) { + assert!(!tx.is_ready(), "Transaction is ready."); + assert!(!self.waiting.contains_key(&tx.transaction.hash), "Transaction is already imported."); + + // Add all tags that are missing + for tag in &tx.missing_tags { + let mut entry = self.wanted_tags.entry(tag.clone()).or_insert_with(HashSet::new); + entry.insert(tx.transaction.hash.clone()); + } + + // Add the transaction to a by-hash waiting map + self.waiting.insert(tx.transaction.hash.clone(), tx); + } + + /// Returns true if given hash is part of the queue. + pub fn contains(&self, hash: &Hash) -> bool { + self.waiting.contains_key(hash) + } + + /// Satisfies provided tags in transactions that are waiting for them. + /// + /// Returns (and removes) transactions that became ready after their last tag got + /// satisfied and now we can remove them from Future and move to Ready queue. + pub fn satisfy_tags>(&mut self, tags: impl IntoIterator) -> Vec> { + let mut became_ready = vec![]; + + for tag in tags { + if let Some(hashes) = self.wanted_tags.remove(tag.as_ref()) { + for hash in hashes { + let is_ready = { + let mut tx = self.waiting.get_mut(&hash) + .expect(WAITING_PROOF); + tx.satisfy_tag(tag.as_ref()); + tx.is_ready() + }; + + if is_ready { + let tx = self.waiting.remove(&hash).expect(WAITING_PROOF); + became_ready.push(tx); + } + } + } + } + + became_ready + } + + /// Removes transactions for given list of hashes. + /// + /// Returns a list of actually removed transactions. + pub fn remove(&mut self, hashes: &[Hash]) -> Vec> { + let mut removed = vec![]; + for hash in hashes { + if let Some(waiting_tx) = self.waiting.remove(hash) { + // remove from wanted_tags as well + for tag in waiting_tx.missing_tags { + let remove = if let Some(mut wanted) = self.wanted_tags.get_mut(&tag) { + wanted.remove(hash); + wanted.is_empty() + } else { false }; + if remove { + self.wanted_tags.remove(&tag); + } + } + // add to result + removed.push(waiting_tx.transaction) + } + } + removed + } + + /// Returns number of transactions in the Future queue. + #[cfg(test)] + pub fn len(&self) -> usize { + self.waiting.len() + } +} diff --git a/core/transaction-graph/src/lib.rs b/core/transaction-graph/src/lib.rs new file mode 100644 index 0000000000000..a33c42a693e90 --- /dev/null +++ b/core/transaction-graph/src/lib.rs @@ -0,0 +1,45 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Generic Transaction Pool +//! +//! The pool is based on dependency graph between transactions +//! and their priority. +//! The pool is able to return an iterator that traverses transaction +//! graph in the correct order taking into account priorities and dependencies. +//! +//! TODO [ToDr] +//! - [ ] Longevity handling (remove obsolete transactions periodically) +//! - [ ] Banning / Future-rotation (once rejected (as invalid) should not be accepted for some time) +//! - [ ] Multi-threading (getting ready transactions should not block the pool) + +#![warn(missing_docs)] +#![warn(unused_extern_crates)] + +extern crate sr_primitives; + +#[macro_use] +extern crate error_chain; + +#[macro_use] +extern crate log; + +mod error; +mod future; +mod pool; +mod ready; + +pub use self::pool::{Transaction, Pool}; diff --git a/core/transaction-graph/src/pool.rs b/core/transaction-graph/src/pool.rs new file mode 100644 index 0000000000000..5dcb55e8f04fd --- /dev/null +++ b/core/transaction-graph/src/pool.rs @@ -0,0 +1,668 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + hash, + sync::Arc, +}; + +use sr_primitives::traits::Member; +use sr_primitives::transaction_validity::{ + TransactionTag as Tag, + TransactionLongevity as Longevity, + TransactionPriority as Priority, +}; + +use error; +use future::{FutureTransactions, WaitingTransaction}; +use ready::ReadyTransactions; + +pub type BlockNumber = u64; + +/// Successful import result. +#[derive(Debug, PartialEq, Eq)] +pub enum Imported { + /// Transaction was successfuly imported to Ready queue. + Ready { + /// Hash of transaction that was successfuly imported. + hash: Hash, + /// Transactions that got promoted from the Future queue. + promoted: Vec, + /// Transactions that failed to be promoted from the Future queue and are now discarded. + failed: Vec, + /// Transactions removed from the Ready pool (replaced). + removed: Vec>>, + }, + /// Transaction was successfuly imported to Future queue. + Future { + /// Hash of transaction that was successfuly imported. + hash: Hash, + } +} + +/// Status of pruning the queue. +#[derive(Debug)] +pub struct PruneStatus { + /// A list of imports that satisfying the tag triggered. + pub promoted: Vec>, + /// A list of transactions that failed to be promoted and now are discarded. + pub failed: Vec, + /// A list of transactions that got pruned from the ready queue. + pub pruned: Vec>>, +} + +/// Immutable transaction +#[cfg_attr(test, derive(Clone))] +#[derive(Debug, PartialEq, Eq)] +pub struct Transaction { + /// Raw extrinsic representing that transaction. + pub ex: Vec, + /// Transaction hash (unique) + pub hash: Hash, + /// Transaction priority (higher = better) + pub priority: Priority, + /// How many blocks the transaction is valid for. + pub longevity: Longevity, + /// Tags required by the transaction. + pub requires: Vec, + /// Tags that this transaction provides. + pub provides: Vec, +} + +/// Transaction pool. +/// +/// Builds a dependency graph for all transactions in the pool and returns +/// the ones that are currently ready to be executed. +/// +/// General note: +/// If function returns some transactions it usually means that importing them +/// as-is for the second time will fail or produce unwanted results. +/// Most likely it is required to revalidate them and recompute set of +/// required tags. +#[derive(Default, Debug)] +pub struct Pool { + future: FutureTransactions, + ready: ReadyTransactions, +} + +impl Pool { + /// Imports transaction to the pool. + /// + /// The pool consists of two parts: Future and Ready. + /// The former contains transactions that require some tags that are not yet provided by + /// other transactions in the pool. + /// The latter contains transactions that have all the requirements satisfied and are + /// ready to be included in the block. + pub fn import( + &mut self, + block_number: BlockNumber, + tx: Transaction, + ) -> error::Result> { + if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) { + bail!(error::ErrorKind::AlreadyImported) + } + + let tx = WaitingTransaction::new(tx, self.ready.provided_tags()); + trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash, tx); + debug!(target: "txpool", "[{:?}] Importing to {}", tx.transaction.hash, if tx.is_ready() { "ready" } else { "future" }); + + // If all tags are not satisfied import to future. + if !tx.is_ready() { + let hash = tx.transaction.hash.clone(); + self.future.import(tx); + return Ok(Imported::Future { hash }); + } + + self.import_to_ready(block_number, tx) + } + + /// Imports transaction to ready queue. + /// + /// NOTE the transaction has to have all requirements satisfied. + fn import_to_ready(&mut self, block_number: BlockNumber, tx: WaitingTransaction) -> error::Result> { + let hash = tx.transaction.hash.clone(); + let mut promoted = vec![]; + let mut failed = vec![]; + let mut removed = vec![]; + + let mut first = true; + let mut to_import = vec![tx]; + + loop { + // take first transaction from the list + let tx = match to_import.pop() { + Some(tx) => tx, + None => break, + }; + + // find transactions in Future that it unlocks + to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides)); + + // import this transaction + let current_hash = tx.transaction.hash.clone(); + match self.ready.import(block_number, tx) { + Ok(mut replaced) => { + if !first { + promoted.push(current_hash); + } + // The transactions were removed from the ready pool. We might attempt to re-import them. + removed.append(&mut replaced); + }, + // transaction failed to be imported. + Err(e) => if first { + debug!(target: "txpool", "[{:?}] Error importing: {:?}", current_hash, e); + return Err(e) + } else { + failed.push(current_hash); + }, + } + first = false; + } + + // An edge case when importing transaction caused + // some future transactions to be imported and that + // future transactions pushed out current transaction. + // This means that there is a cycle and the transactions should + // be moved back to future, since we can't resolve it. + if removed.iter().any(|tx| tx.hash == hash) { + // We still need to remove all transactions that we promoted + // since they depend on each other and will never get to the best iterator. + self.ready.remove_invalid(&promoted); + + debug!(target: "txpool", "[{:?}] Cycle detected, bailing.", hash); + bail!(error::ErrorKind::CycleDetected) + } + + Ok(Imported::Ready { + hash, + promoted, + failed, + removed, + }) + } + + /// Returns an iterator over ready transactions in the pool. + pub fn ready<'a>(&'a self) -> impl Iterator>> + 'a { + self.ready.get() + } + + /// Removes all transactions represented by the hashes and all other transactions + /// that depend on them. + /// + /// Returns a list of actually removed transactions. + /// NOTE some transactions might still be valid, but were just removed because + /// they were part of a chain, you may attempt to re-import them later. + /// NOTE If you want to remove ready transactions that were already used + /// and you don't want them to be stored in the pool use `prune_tags` method. + pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { + let mut removed = self.ready.remove_invalid(hashes); + removed.extend(self.future.remove(hashes).into_iter().map(Arc::new)); + removed + } + + /// Prunes transactions that provide given list of tags. + /// + /// This will cause all transactions that provide these tags to be removed from the pool, + /// but unlike `remove_invalid`, dependent transactions are not touched. + /// Additional transactions from future queue might be promoted to ready if you satisfy tags + /// that the pool didn't previously know about. + pub fn prune_tags(&mut self, block_number: BlockNumber, tags: impl IntoIterator) -> PruneStatus { + let mut to_import = vec![]; + let mut pruned = vec![]; + + for tag in tags { + // make sure to promote any future transactions that could be unlocked + to_import.append(&mut self.future.satisfy_tags(::std::iter::once(&tag))); + // and actually prune transactions in ready queue + pruned.append(&mut self.ready.prune_tags(tag)); + } + + let mut promoted = vec![]; + let mut failed = vec![]; + for tx in to_import { + let hash = tx.transaction.hash.clone(); + match self.import_to_ready(block_number, tx) { + Ok(res) => promoted.push(res), + Err(e) => { + warn!(target: "txpool", "[{:?}] Failed to promote during pruning: {:?}", hash, e); + failed.push(hash) + }, + } + } + + PruneStatus { + pruned, + failed, + promoted, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + type Hash = u64; + + fn pool() -> Pool { + Pool::default() + } + + #[test] + fn should_import_transaction_to_ready() { + // given + let mut pool = pool(); + + // when + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1u64, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![1]], + }).unwrap(); + + // then + assert_eq!(pool.ready().count(), 1); + assert_eq!(pool.ready.len(), 1); + } + + #[test] + fn should_not_import_same_transaction_twice() { + // given + let mut pool = pool(); + + // when + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![1]], + }).unwrap_err(); + + // then + assert_eq!(pool.ready().count(), 1); + assert_eq!(pool.ready.len(), 1); + } + + + #[test] + fn should_import_transaction_to_future_and_promote_it_later() { + // given + let mut pool = pool(); + + // when + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + assert_eq!(pool.ready().count(), 0); + assert_eq!(pool.ready.len(), 0); + pool.import(1, Transaction { + ex: vec![2u8], + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![0]], + }).unwrap(); + + // then + assert_eq!(pool.ready().count(), 2); + assert_eq!(pool.ready.len(), 2); + } + + #[test] + fn should_promote_a_subgraph() { + // given + let mut pool = pool(); + + // when + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![3u8], + hash: 3, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![2u8], + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![3], vec![2]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![4u8], + hash: 4, + priority: 1_000u64, + longevity: 64u64, + requires: vec![vec![3], vec![4]], + provides: vec![], + }).unwrap(); + assert_eq!(pool.ready().count(), 0); + assert_eq!(pool.ready.len(), 0); + + let res = pool.import(1, Transaction { + ex: vec![5u8], + hash: 5, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![0], vec![4]], + }).unwrap(); + + // then + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + + assert_eq!(it.next(), Some(5)); + assert_eq!(it.next(), Some(1)); + assert_eq!(it.next(), Some(2)); + assert_eq!(it.next(), Some(4)); + assert_eq!(it.next(), Some(3)); + assert_eq!(it.next(), None); + assert_eq!(res, Imported::Ready { + hash: 5, + promoted: vec![1, 2, 3, 4], + failed: vec![], + removed: vec![], + }); + } + + #[test] + fn should_handle_a_cycle() { + // given + let mut pool = pool(); + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![3u8], + hash: 3, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![2]], + }).unwrap(); + assert_eq!(pool.ready().count(), 0); + assert_eq!(pool.ready.len(), 0); + + // when + pool.import(1, Transaction { + ex: vec![2u8], + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![vec![0]], + }).unwrap(); + + // then + { + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + assert_eq!(it.next(), None); + } + // all transactions occupy the Future queue - it's fine + assert_eq!(pool.future.len(), 3); + + // let's close the cycle with one additional transaction + let res = pool.import(1, Transaction { + ex: vec![4u8], + hash: 4, + priority: 50u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![0]], + }).unwrap(); + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + assert_eq!(it.next(), Some(4)); + assert_eq!(it.next(), Some(1)); + assert_eq!(it.next(), Some(3)); + assert_eq!(it.next(), None); + assert_eq!(res, Imported::Ready { + hash: 4, + promoted: vec![1, 3], + failed: vec![2], + removed: vec![], + }); + assert_eq!(pool.future.len(), 0); + } + + + #[test] + fn should_handle_a_cycle_with_low_priority() { + // given + let mut pool = pool(); + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![3u8], + hash: 3, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![2]], + }).unwrap(); + assert_eq!(pool.ready().count(), 0); + assert_eq!(pool.ready.len(), 0); + + // when + pool.import(1, Transaction { + ex: vec![2u8], + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![vec![0]], + }).unwrap(); + + // then + { + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + assert_eq!(it.next(), None); + } + // all transactions occupy the Future queue - it's fine + assert_eq!(pool.future.len(), 3); + + // let's close the cycle with one additional transaction + let err = pool.import(1, Transaction { + ex: vec![4u8], + hash: 4, + priority: 1u64, // lower priority than Tx(2) + longevity: 64u64, + requires: vec![], + provides: vec![vec![0]], + }).unwrap_err(); + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + assert_eq!(it.next(), None); + assert_eq!(pool.ready.len(), 0); + assert_eq!(pool.future.len(), 0); + if let error::ErrorKind::CycleDetected = *err.kind() { + } else { + assert!(false, "Invalid error kind: {:?}", err.kind()); + } + } + + #[test] + fn should_remove_invalid_transactions() { + // given + let mut pool = pool(); + pool.import(1, Transaction { + ex: vec![5u8], + hash: 5, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![0], vec![4]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![3u8], + hash: 3, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![2u8], + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![3], vec![2]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![4u8], + hash: 4, + priority: 1_000u64, + longevity: 64u64, + requires: vec![vec![3], vec![4]], + provides: vec![], + }).unwrap(); + // future + pool.import(1, Transaction { + ex: vec![6u8], + hash: 6, + priority: 1_000u64, + longevity: 64u64, + requires: vec![vec![11]], + provides: vec![], + }).unwrap(); + assert_eq!(pool.ready().count(), 5); + assert_eq!(pool.future.len(), 1); + + // when + pool.remove_invalid(&[6, 1]); + + // then + assert_eq!(pool.ready().count(), 1); + assert_eq!(pool.future.len(), 0); + } + + + #[test] + fn should_prune_ready_transactions() { + // given + let mut pool = pool(); + // future (waiting for 0) + pool.import(1, Transaction { + ex: vec![5u8], + hash: 5, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![100]], + }).unwrap(); + // ready + pool.import(1, Transaction { + ex: vec![1u8], + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![2u8], + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![vec![3]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![3u8], + hash: 3, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![2]], + }).unwrap(); + pool.import(1, Transaction { + ex: vec![4u8], + hash: 4, + priority: 1_000u64, + longevity: 64u64, + requires: vec![vec![3], vec![2]], + provides: vec![vec![4]], + }).unwrap(); + + assert_eq!(pool.ready().count(), 4); + assert_eq!(pool.future.len(), 1); + + // when + let result = pool.prune_tags(1, vec![vec![0], vec![2]]); + + // then + assert_eq!(result.pruned.len(), 2); + assert_eq!(result.failed.len(), 0); + assert_eq!(result.promoted[0], Imported::Ready { + hash: 5, + promoted: vec![], + failed: vec![], + removed: vec![], + }); + assert_eq!(result.promoted.len(), 1); + assert_eq!(pool.future.len(), 0); + assert_eq!(pool.ready.len(), 3); + assert_eq!(pool.ready().count(), 3); + } + +} diff --git a/core/transaction-graph/src/ready.rs b/core/transaction-graph/src/ready.rs new file mode 100644 index 0000000000000..7c1296edfb799 --- /dev/null +++ b/core/transaction-graph/src/ready.rs @@ -0,0 +1,559 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + collections::{HashMap, HashSet, BTreeSet}, + cmp, + hash, + sync::Arc, +}; + +use sr_primitives::traits::Member; +use sr_primitives::transaction_validity::{ + TransactionTag as Tag, +}; + +use error; +use future::WaitingTransaction; +use pool::{BlockNumber, Transaction}; + +#[derive(Debug, Clone)] +pub struct TransactionRef { + pub transaction: Arc>, + pub valid_till: BlockNumber, + pub insertion_id: u64, +} + +impl Ord for TransactionRef { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.transaction.priority.cmp(&other.transaction.priority) + .then(other.valid_till.cmp(&self.valid_till)) + .then(other.insertion_id.cmp(&self.insertion_id)) + } +} + +impl PartialOrd for TransactionRef { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for TransactionRef { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == cmp::Ordering::Equal + } +} +impl Eq for TransactionRef {} + +#[derive(Debug)] +struct ReadyTx { + /// A reference to a transaction + pub transaction: TransactionRef, + /// A list of transactions that get unlocked by this one + pub unlocks: Vec, + /// How many required tags are provided inherently + /// + /// Some transactions might be already pruned from the queue, + /// so when we compute ready set we may consider this transactions ready earlier. + pub requires_offset: usize, +} + +const HASH_READY: &str = r#" +Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`; +Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`; +Hence every hash retrieved from `provided_tags` is always present in `ready`; +qed +"#; + +#[derive(Debug)] +pub struct ReadyTransactions { + /// Insertion id + insertion_id: u64, + /// tags that are provided by Ready transactions + provided_tags: HashMap, + /// Transactions that are ready (i.e. don't have any requirements external to the pool) + ready: HashMap>, + // ^^ TODO [ToDr] Consider wrapping this into `Arc>` and allow multiple concurrent iterators + /// Best transactions that are ready to be included to the block without any other previous transaction. + best: BTreeSet>, +} + +impl Default for ReadyTransactions { + fn default() -> Self { + ReadyTransactions { + insertion_id: Default::default(), + provided_tags: Default::default(), + ready: Default::default(), + best: Default::default(), + } + } +} + +impl ReadyTransactions { + /// Borrows a map of tags that are provided by transactions in this queue. + pub fn provided_tags(&self) -> &HashMap { + &self.provided_tags + } + + /// Returns an iterator of ready transactions. + /// + /// Transactions are returned in order: + /// 1. First by the dependencies: + /// - never return transaction that requires a tag, which was not provided by one of the previously returned transactions + /// 2. Then by priority: + /// - If there are two transactions with all requirements satisfied the one with higher priority goes first. + /// 3. Then by the ttl that's left + /// - transactions that are valid for a shorter time go first + /// 4. Lastly we sort by the time in the queue + /// - transactions that are longer in the queue go first + pub fn get<'a>(&'a self) -> impl Iterator>> + 'a { + BestIterator { + all: &self.ready, + best: self.best.clone(), + awaiting: Default::default(), + } + } + + /// Imports transactions to the pool of ready transactions. + /// + /// The transaction needs to have all tags satisfied (be ready) by transactions + /// that are in this queue. + pub fn import( + &mut self, + block_number: BlockNumber, + tx: WaitingTransaction, + ) -> error::Result>>> { + assert!(tx.is_ready(), "Only ready transactions can be imported."); + assert!(!self.ready.contains_key(&tx.transaction.hash), "Transaction is already imported."); + + self.insertion_id += 1; + let insertion_id = self.insertion_id; + let hash = tx.transaction.hash.clone(); + let tx = tx.transaction; + + let replaced = self.replace_previous(&tx)?; + + let mut goes_to_best = true; + // Add links to transactions that unlock the current one + for tag in &tx.requires { + // Check if the transaction that satisfies the tag is still in the queue. + if let Some(other) = self.provided_tags.get(tag) { + let mut tx = self.ready.get_mut(other).expect(HASH_READY); + tx.unlocks.push(hash.clone()); + // this transaction depends on some other, so it doesn't go to best directly. + goes_to_best = false; + } + } + + // update provided_tags + for tag in tx.provides.clone() { + self.provided_tags.insert(tag, hash.clone()); + } + + let transaction = TransactionRef { + insertion_id, + valid_till: block_number + tx.longevity, + transaction: Arc::new(tx), + }; + + // insert to best if it doesn't require any other transaction to be included before it + if goes_to_best { + self.best.insert(transaction.clone()); + } + + // insert to Ready + self.ready.insert(hash, ReadyTx { + transaction, + unlocks: vec![], + requires_offset: 0, + }); + + Ok(replaced) + } + + /// Returns true if given hash is part of the queue. + pub fn contains(&self, hash: &Hash) -> bool { + self.ready.contains_key(hash) + } + + /// Removes invalid transactions from the ready pool. + /// + /// NOTE removing a transaction will also cause a removal of all transactions that depend on that one + /// (i.e. the entire subgraph that this transaction is a start of will be removed). + /// All removed transactions are returned. + pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { + let mut removed = vec![]; + let mut to_remove = hashes.iter().cloned().collect::>(); + + loop { + let hash = match to_remove.pop() { + Some(hash) => hash, + None => return removed, + }; + + if let Some(mut tx) = self.ready.remove(&hash) { + // remove entries from provided_tags + for tag in &tx.transaction.transaction.provides { + self.provided_tags.remove(tag); + } + // remove from unlocks + for tag in &tx.transaction.transaction.requires { + if let Some(hash) = self.provided_tags.get(tag) { + if let Some(tx) = self.ready.get_mut(hash) { + remove_item(&mut tx.unlocks, &hash); + } + } + } + + // remove from best + self.best.remove(&tx.transaction); + + // remove all transactions that the current one unlocks + to_remove.append(&mut tx.unlocks); + + // add to removed + debug!(target: "txpool", "[{:?}] Removed as invalid: ", hash); + removed.push(tx.transaction.transaction); + } + } + } + + /// Removes transactions that provide given tag. + /// + /// All transactions that lead to a transaction, which provides this tag + /// are going to be removed from the queue, but no other transactions are touched - + /// i.e. all other subgraphs starting from given tag are still considered valid & ready. + pub fn prune_tags(&mut self, tag: Tag) -> Vec>> { + let mut removed = vec![]; + let mut to_remove = vec![tag]; + + loop { + let tag = match to_remove.pop() { + Some(tag) => tag, + None => return removed, + }; + + let res = self.provided_tags.remove(&tag) + .and_then(|hash| self.ready.remove(&hash)); + + if let Some(tx) = res { + let unlocks = tx.unlocks; + let tx = tx.transaction.transaction; + + // prune previous transactions as well + { + let hash = &tx.hash; + let mut find_previous = |tag| -> Option> { + let prev_hash = self.provided_tags.get(tag)?; + let tx2 = self.ready.get_mut(&prev_hash)?; + remove_item(&mut tx2.unlocks, hash); + // We eagerly prune previous transactions as well. + // But it might not always be good. + // Possible edge case: + // - tx provides two tags + // - the second tag enables some subgraph we don't know of yet + // - we will prune the transaction + // - when we learn about the subgraph it will go to future + // - we will have to wait for re-propagation of that transaction + // Alternatively the caller may attempt to re-import these transactions. + if tx2.unlocks.is_empty() { + Some(tx2.transaction.transaction.provides.clone()) + } else { + None + } + }; + + // find previous transactions + for tag in &tx.requires { + if let Some(mut tags_to_remove) = find_previous(tag) { + to_remove.append(&mut tags_to_remove); + } + } + } + + // add the transactions that just got unlocked to `best` + for hash in unlocks { + if let Some(tx) = self.ready.get_mut(&hash) { + tx.requires_offset += 1; + // this transaction is ready + if tx.requires_offset == tx.transaction.transaction.requires.len() { + self.best.insert(tx.transaction.clone()); + } + } + } + + debug!(target: "txpool", "[{:?}] Pruned.", tx.hash); + removed.push(tx); + } + } + } + + /// Checks if the transaction is providing the same tags as other transactions. + /// + /// In case that's true it determines if the priority of transactions that + /// we are about to replace is lower than the priority of the replacement transaction. + /// We remove/replace old transactions in case they have lower priority. + /// + /// In case replacement is succesful returns a list of removed transactions. + fn replace_previous(&mut self, tx: &Transaction) -> error::Result>>> { + let mut to_remove = { + // check if we are replacing a transaction + let replace_hashes = tx.provides + .iter() + .filter_map(|tag| self.provided_tags.get(tag)) + .collect::>(); + + // early exit if we are not replacing anything. + if replace_hashes.is_empty() { + return Ok(vec![]); + } + + // now check if collective priority is lower than the replacement transaction. + let old_priority = replace_hashes + .iter() + .filter_map(|hash| self.ready.get(hash)) + .fold(0u64, |total, tx| total.saturating_add(tx.transaction.transaction.priority)); + + // bail - the transaction has too low priority to replace the old ones + if old_priority >= tx.priority { + bail!(error::ErrorKind::TooLowPriority(old_priority, tx.priority)) + } + + replace_hashes.into_iter().cloned().collect::>() + }; + + let new_provides = tx.provides.iter().cloned().collect::>(); + let mut removed = vec![]; + loop { + let hash = match to_remove.pop() { + Some(hash) => hash, + None => return Ok(removed), + }; + + let tx = self.ready.remove(&hash).expect(HASH_READY); + // check if this transaction provides stuff that is not provided by the new one. + let (mut unlocks, tx) = (tx.unlocks, tx.transaction.transaction); + { + let invalidated = tx.provides + .iter() + .filter(|tag| !new_provides.contains(&**tag)); + + for tag in invalidated { + // remove the tag since it's no longer provided by any transaction + self.provided_tags.remove(tag); + // add more transactions to remove + to_remove.append(&mut unlocks); + } + } + + removed.push(tx); + } + } + + /// Returns number of transactions in this queue. + #[cfg(test)] + pub fn len(&self) -> usize { + self.ready.len() + } + +} + +pub struct BestIterator<'a, Hash: 'a> { + all: &'a HashMap>, + awaiting: HashMap)>, + best: BTreeSet>, +} + +impl<'a, Hash: 'a + hash::Hash + Member> BestIterator<'a, Hash> { + /// Depending on number of satisfied requirements insert given ref + /// either to awaiting set or to best set. + fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef) { + if satisfied == tx_ref.transaction.requires.len() { + // If we have satisfied all deps insert to best + self.best.insert(tx_ref); + + } else { + // otherwise we're still awaiting for some deps + self.awaiting.insert(tx_ref.transaction.hash.clone(), (satisfied, tx_ref)); + } + } +} + +impl<'a, Hash: 'a + hash::Hash + Member> Iterator for BestIterator<'a, Hash> { + type Item = Arc>; + + fn next(&mut self) -> Option { + let best = self.best.iter().next_back()?.clone(); + let best = self.best.take(&best)?; + + let ready = match self.all.get(&best.transaction.hash) { + Some(ready) => ready, + // The transaction is not in all, maybe it was removed in the meantime? + None => return self.next(), + }; + + // Insert transactions that just got unlocked. + for hash in &ready.unlocks { + // first check local awaiting transactions + if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) { + satisfied += 1; + self.best_or_awaiting(satisfied, tx_ref); + // then get from the pool + } else if let Some(next) = self.all.get(hash) { + self.best_or_awaiting(next.requires_offset + 1, next.transaction.clone()); + } + } + + Some(best.transaction.clone()) + } +} + +// See: https://github.com/rust-lang/rust/issues/40062 +fn remove_item(vec: &mut Vec, item: &T) { + if let Some(idx) = vec.iter().position(|i| i == item) { + vec.swap_remove(idx); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn tx(id: u8) -> Transaction { + Transaction { + ex: vec![id], + hash: id as u64, + priority: 1, + longevity: 2, + requires: vec![vec![1], vec![2]], + provides: vec![vec![3], vec![4]], + } + } + + #[test] + fn should_replace_transaction_that_provides_the_same_tag() { + // given + let mut ready = ReadyTransactions::default(); + let block_number = 1; + let mut tx1 = tx(1); + tx1.requires.clear(); + let mut tx2 = tx(2); + tx2.requires.clear(); + tx2.provides = vec![vec![3]]; + let mut tx3 = tx(3); + tx3.requires.clear(); + tx3.provides = vec![vec![4]]; + + // when + let x = WaitingTransaction::new(tx2, &ready.provided_tags()); + ready.import(block_number, x).unwrap(); + let x = WaitingTransaction::new(tx3, &ready.provided_tags()); + ready.import(block_number, x).unwrap(); + assert_eq!(ready.get().count(), 2); + + // too low priority + let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags()); + ready.import(block_number, x).unwrap_err(); + + tx1.priority = 10; + let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags()); + ready.import(block_number, x).unwrap(); + + // then + assert_eq!(ready.get().count(), 1); + } + + + #[test] + fn should_return_best_transactions_in_correct_order() { + // given + let mut ready = ReadyTransactions::default(); + let mut tx1 = tx(1); + tx1.requires.clear(); + let mut tx2 = tx(2); + tx2.requires = tx1.provides.clone(); + tx2.provides = vec![vec![106]]; + let mut tx3 = tx(3); + tx3.requires = vec![tx1.provides[0].clone(), vec![106]]; + tx3.provides = vec![]; + let mut tx4 = tx(4); + tx4.requires = vec![tx1.provides[0].clone()]; + tx4.provides = vec![]; + let block_number = 1; + + // when + let x = WaitingTransaction::new(tx1, &ready.provided_tags()); + ready.import(block_number, x).unwrap(); + let x = WaitingTransaction::new(tx2, &ready.provided_tags()); + ready.import(block_number, x).unwrap(); + let x = WaitingTransaction::new(tx3, &ready.provided_tags()); + ready.import(block_number, x).unwrap(); + let x = WaitingTransaction::new(tx4, &ready.provided_tags()); + ready.import(block_number, x).unwrap(); + + // then + assert_eq!(ready.best.len(), 1); + + let mut it = ready.get().map(|tx| tx.ex[0]); + + assert_eq!(it.next(), Some(1)); + assert_eq!(it.next(), Some(2)); + assert_eq!(it.next(), Some(3)); + assert_eq!(it.next(), Some(4)); + assert_eq!(it.next(), None); + } + + #[test] + fn should_order_refs() { + let mut id = 1; + let mut with_priority = |priority| { + id += 1; + let mut tx = tx(id); + tx.priority = priority; + tx + }; + // higher priority = better + assert!(TransactionRef { + transaction: Arc::new(with_priority(3)), + valid_till: 3, + insertion_id: 1, + } > TransactionRef { + transaction: Arc::new(with_priority(2)), + valid_till: 3, + insertion_id: 2, + }); + // lower validity = better + assert!(TransactionRef { + transaction: Arc::new(with_priority(3)), + valid_till: 2, + insertion_id: 1, + } > TransactionRef { + transaction: Arc::new(with_priority(3)), + valid_till: 3, + insertion_id: 2, + }); + // lower insertion_id = better + assert!(TransactionRef { + transaction: Arc::new(with_priority(3)), + valid_till: 3, + insertion_id: 1, + } > TransactionRef { + transaction: Arc::new(with_priority(3)), + valid_till: 3, + insertion_id: 2, + }); + } +}