From 07d9fe30884f2868703cacb08d6cf0e156baaa3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 18 Sep 2018 13:52:33 +0200 Subject: [PATCH 01/14] Graph transaction pool. --- Cargo.lock | 17 +++ Cargo.toml | 1 + node/transaction-graph/Cargo.toml | 17 +++ node/transaction-graph/src/future.rs | 0 node/transaction-graph/src/lib.rs | 40 ++++++ node/transaction-graph/src/pool.rs | 188 +++++++++++++++++++++++++++ node/transaction-graph/src/ready.rs | 177 +++++++++++++++++++++++++ 7 files changed, 440 insertions(+) create mode 100644 node/transaction-graph/Cargo.toml create mode 100644 node/transaction-graph/src/future.rs create mode 100644 node/transaction-graph/src/lib.rs create mode 100644 node/transaction-graph/src/pool.rs create mode 100644 node/transaction-graph/src/ready.rs diff --git a/Cargo.lock b/Cargo.lock index 3c7ba0cc7abc0..65d3b88e2dd59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1597,6 +1597,23 @@ dependencies = [ "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "node-transaction-graph" +version = "0.1.0" +dependencies = [ + "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "node-api 0.1.0", + "node-primitives 0.1.0", + "node-runtime 0.1.0", + "parity-codec 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 0.1.0", + "substrate-client 0.1.0", + "substrate-keyring 0.1.0", + "substrate-primitives 0.1.0", +] + [[package]] name = "node-transaction-pool" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d8b6e280b4a85..a562b73bf20fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ members = [ "node/runtime", "node/service", "node/transaction-pool", + "node/transaction-graph", "subkey", ] exclude = [ diff --git a/node/transaction-graph/Cargo.toml b/node/transaction-graph/Cargo.toml new file mode 100644 index 0000000000000..23f9362900a94 --- /dev/null +++ b/node/transaction-graph/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "node-transaction-graph" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +log = "0.3.0" +error-chain = "0.12" +parking_lot = "0.4" +node-api = { path = "../api" } +node-primitives = { path = "../primitives" } +node-runtime = { path = "../runtime" } +substrate-client = { path = "../../core/client" } +parity-codec = { version = "1.1" } +substrate-keyring = { path = "../../core/keyring" } +substrate-primitives = { path = "../../core/primitives" } +sr-primitives = { path = "../../core/sr-primitives" } diff --git a/node/transaction-graph/src/future.rs b/node/transaction-graph/src/future.rs new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/node/transaction-graph/src/lib.rs b/node/transaction-graph/src/lib.rs new file mode 100644 index 0000000000000..2eb67bb1ded23 --- /dev/null +++ b/node/transaction-graph/src/lib.rs @@ -0,0 +1,40 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + + +// #![warn(missing_docs)] +// #![warn(unused_crates)] + +extern crate substrate_client as client; +extern crate parity_codec as codec; +extern crate substrate_primitives; +extern crate sr_primitives; +extern crate node_runtime as runtime; +extern crate node_primitives as primitives; +extern crate node_api; +extern crate parking_lot; + +#[cfg(test)] +extern crate substrate_keyring; + +#[macro_use] +extern crate error_chain; + +#[macro_use] +extern crate log; + +mod pool; +mod ready; diff --git a/node/transaction-graph/src/pool.rs b/node/transaction-graph/src/pool.rs new file mode 100644 index 0000000000000..45c07d80355ba --- /dev/null +++ b/node/transaction-graph/src/pool.rs @@ -0,0 +1,188 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + collections::{HashMap, HashSet}, + fmt, + hash, +}; + +use primitives::UncheckedExtrinsic; +use sr_primitives::transaction_validity::{ + TransactionTag as Tag, + TransactionLongevity as Longevity, + TransactionPriority as Priority, +}; + +use ready::ReadyTransactions; + +pub type BlockNumber = u64; + +/// Immutable transaction +#[derive(Debug)] +pub struct Transaction { + pub ex: UncheckedExtrinsic, + pub priority: Priority, + pub longevity: Longevity, + pub requires: Vec, + pub provides: Vec, +} + +#[derive(Debug)] +pub struct WaitingTransaction { + pub transaction: Transaction, + pub missing_tags: HashSet, + pub hash: Hash, +} + +impl WaitingTransaction { + pub fn new(transaction: Transaction, hash: Hash, provided: &HashMap) -> Self { + let missing_tags = transaction.requires + .iter() + .filter(|tag| !provided.contains_key(&**tag)) + .cloned() + .collect(); + + WaitingTransaction { + transaction, + missing_tags, + hash, + } + } + + pub fn satisfy_tag(&mut self, tag: &Tag) { + self.missing_tags.remove(tag); + } + + pub fn is_ready(&self) -> bool { + self.missing_tags.is_empty() + } +} + +#[derive(Debug)] +pub struct FutureTransactions { + /// tags that are not yet provided by any transaction and we await for them + wanted_tags: HashMap>, + /// Transactions waiting for a particular other transaction + waiting: HashMap>, +} + +impl Default for FutureTransactions { + fn default() -> Self { + FutureTransactions { + wanted_tags: Default::default(), + waiting: Default::default(), + } + } +} + +impl FutureTransactions { + pub fn import(&mut self, tx: WaitingTransaction) { + unimplemented!() + } +} + +/// Transaction pool. +/// +/// Builds a dependency graph for all transactions in the pool and returns +/// the ones that are currently ready to be executed. +pub struct Pool { + future: FutureTransactions, + ready: ReadyTransactions, + insertion_id: u64, + hasher: Box Hash>, +} + +impl fmt::Debug for Pool { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Pool") + .field("future", &self.future) + .field("ready", &self.ready) + .field("insertion_id", &self.insertion_id) + .finish() + } +} + +impl Pool { + pub fn new(hasher: F) -> Self + where + F: Fn(&UncheckedExtrinsic) -> Hash + 'static, + { + Pool { + future: Default::default(), + ready: Default::default(), + insertion_id: Default::default(), + hasher: Box::new(hasher), + } + } + + pub fn import( + &mut self, + block_number: BlockNumber, + ex: UncheckedExtrinsic, + priority: Priority, + longevity: Longevity, + requires: Vec, + provides: Vec, + ) { + let hash = (self.hasher)(&ex); + let tx = Transaction { ex, priority, longevity, requires, provides }; + let tx = WaitingTransaction::new(tx, hash, self.ready.provided_tags()); + + // Tags provided from ready queue satisfy all requirements, so just add to ready. + if tx.is_ready() { + self.insertion_id += 1; + self.ready.import(tx, self.insertion_id, block_number); + // TODO [ToDr] Check what transactions from future are unlocked. + } else { + self.future.import(tx); + } + } + + pub fn ready(&self) -> Vec<()> { + self.ready.get().map(|_| ()).collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + type Hash = u64; + + fn pool() -> Pool { + Pool::new(|ex| ex.0.len() as u64) + } + + #[test] + fn should_import_transaction_to_ready() { + // given + let mut pool = pool(); + + // when + pool.import( + 1, + UncheckedExtrinsic(vec![1u8]), + 5u64, + 64u64, + vec![], + vec![vec![1]], + ); + + // then + assert_eq!(pool.ready().len(), 1); + } +} diff --git a/node/transaction-graph/src/ready.rs b/node/transaction-graph/src/ready.rs new file mode 100644 index 0000000000000..2c3adbfb9965d --- /dev/null +++ b/node/transaction-graph/src/ready.rs @@ -0,0 +1,177 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + collections::{HashMap, BTreeSet}, + cmp, + hash, + sync::Arc, +}; + +use sr_primitives::transaction_validity::{ + TransactionTag as Tag, + TransactionPriority as Priority, +}; + +use super::pool::BlockNumber; +use super::pool::{Transaction, WaitingTransaction}; + +#[derive(Debug, Clone)] +pub struct TransactionRef { + pub transaction: Arc, + pub hash: Hash, + pub valid_till: BlockNumber, + pub insertion_id: u64, +} + +// TODO [ToDr] Testme +impl Ord for TransactionRef { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.transaction.priority.cmp(&other.transaction.priority) + .then(self.valid_till.cmp(&other.valid_till)) + .then(self.insertion_id.cmp(&other.insertion_id)) + } +} + +impl PartialOrd for TransactionRef { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for TransactionRef { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == cmp::Ordering::Equal + } +} +impl Eq for TransactionRef {} + +#[derive(Debug)] +struct ReadyTx { + pub transaction: TransactionRef, + pub unlocks: Vec, +} + +#[derive(Debug)] +pub struct ReadyTransactions { + /// tags that are provided by ready transactions + provided_tags: HashMap, + /// Transactions that are ready (i.e. don't have any requirements external to the pool) + ready: HashMap>, + // ^^ TODO [ToDr] Consider wrapping this into `Arc>` and allow multiple concurrent iterators + /// Best transactions that are ready to be included to the block without any other previous transaction. + best: BTreeSet>, +} + +impl Default for ReadyTransactions { + fn default() -> Self { + ReadyTransactions { + provided_tags: Default::default(), + ready: Default::default(), + best: Default::default(), + } + } +} + +impl ReadyTransactions { + pub fn provided_tags(&self) -> &HashMap { + &self.provided_tags + } + + pub fn get<'a>(&'a self) -> impl Iterator> + 'a { + BestIterator { + ready: &self.ready, + best: self.best.clone(), + } + } + + pub fn import(&mut self, tx: WaitingTransaction, insertion_id: u64, block_number: BlockNumber) { + assert!(tx.is_ready(), "Only ready transactions can be imported."); + + let hash = tx.hash; + let tx = tx.transaction; + let mut goes_to_best = true; + // Add info what transactions unlock this one. + for tag in &tx.requires { + // Check if the transaction that satisfies the tag is still in the queue. + if let Some(other) = self.provided_tags.get(tag) { + let mut tx = self.ready.get_mut(other).expect("All hashes are present in ready; qed"); + tx.unlocks.push(hash.clone()); + // this transaction depends on some other, so it doesn't go to best directly. + goes_to_best = false; + } + } + + // update provided_tags + for tag in tx.provides.clone() { + self.provided_tags.insert(tag, hash.clone()); + } + + let transaction = TransactionRef { + insertion_id, + hash: hash.clone(), + valid_till: block_number + tx.longevity, + transaction: Arc::new(tx), + }; + + // insert to best if it doesn't require any other transaction to be included before it + if goes_to_best { + self.best.insert(transaction.clone()); + } + + // insert to ready + self.ready.insert(hash, ReadyTx { + transaction, + unlocks: vec![], + }); + } +} + +pub struct BestIterator<'a, Hash: 'a> { + ready: &'a HashMap>, + best: BTreeSet>, +} + +impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> { + type Item = Arc; + + fn next(&mut self) -> Option { + let best = self.best.iter().next()?.clone(); + let best = self.best.take(&best)?; + + let ready = self.ready.get(&best.hash)?; + + // Insert transactions that just got unlocked. + for hash in &ready.unlocks { + // TODO [ToDr] This is actually invalid. + // We need to check if all `requires` were already returned. + if let Some(other) = self.ready.get(hash) { + self.best.insert(other.transaction.clone()); + } + } + + Some(best.transaction.clone()) + } +} + +#[cfg(test)] +mod tests { + + #[test] + fn should_sort_best_transactions() { + assert_eq!(true, false); + } +} From a1ae961f0715307b3a90e6caf166e3c20ac0461b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 18 Sep 2018 17:43:17 +0200 Subject: [PATCH 02/14] Start future implementation. --- node/transaction-graph/src/future.rs | 89 +++++++++++++++ node/transaction-graph/src/lib.rs | 3 + node/transaction-graph/src/pool.rs | 141 +++++++++++------------- node/transaction-graph/src/ready.rs | 159 ++++++++++++++++++++++++--- 4 files changed, 301 insertions(+), 91 deletions(-) diff --git a/node/transaction-graph/src/future.rs b/node/transaction-graph/src/future.rs index e69de29bb2d1d..c1e7fb40966d1 100644 --- a/node/transaction-graph/src/future.rs +++ b/node/transaction-graph/src/future.rs @@ -0,0 +1,89 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + collections::{HashMap, HashSet}, + hash, +}; + +use sr_primitives::transaction_validity::{ + TransactionTag as Tag, +}; + +use pool::Transaction; + +#[derive(Debug)] +pub struct WaitingTransaction { + pub transaction: Transaction, + pub missing_tags: HashSet, + pub hash: Hash, +} + +impl WaitingTransaction { + pub fn new(transaction: Transaction, hash: Hash, provided: &HashMap) -> Self { + let missing_tags = transaction.requires + .iter() + .filter(|tag| !provided.contains_key(&**tag)) + .cloned() + .collect(); + + WaitingTransaction { + transaction, + missing_tags, + hash, + } + } + + pub fn satisfy_tag(&mut self, tag: &Tag) { + self.missing_tags.remove(tag); + } + + pub fn is_ready(&self) -> bool { + self.missing_tags.is_empty() + } +} + +#[derive(Debug)] +pub struct FutureTransactions { + /// tags that are not yet provided by any transaction and we await for them + wanted_tags: HashMap>, + /// Transactions waiting for a particular other transaction + waiting: HashMap>, +} + +impl Default for FutureTransactions { + fn default() -> Self { + FutureTransactions { + wanted_tags: Default::default(), + waiting: Default::default(), + } + } +} + +impl FutureTransactions { + pub fn import(&mut self, tx: WaitingTransaction) { + assert!(!self.waiting.contains_key(&tx.hash), "Transaction is already imported."); + + // Add all tags that are missing + for tag in &tx.missing_tags { + let mut entry = self.wanted_tags.entry(tag.clone()).or_insert_with(Vec::new); + entry.push(tx.hash.clone()); + } + + // Add the transaction to a by-hash waiting map + self.waiting.insert(tx.hash.clone(), tx); + } +} diff --git a/node/transaction-graph/src/lib.rs b/node/transaction-graph/src/lib.rs index 2eb67bb1ded23..3f8ecf172ffae 100644 --- a/node/transaction-graph/src/lib.rs +++ b/node/transaction-graph/src/lib.rs @@ -36,5 +36,8 @@ extern crate error_chain; #[macro_use] extern crate log; +mod future; mod pool; mod ready; + +pub use self::pool::{Transaction, Pool}; diff --git a/node/transaction-graph/src/pool.rs b/node/transaction-graph/src/pool.rs index 45c07d80355ba..3f685660fc0c9 100644 --- a/node/transaction-graph/src/pool.rs +++ b/node/transaction-graph/src/pool.rs @@ -15,7 +15,6 @@ // along with Substrate. If not, see . use std::{ - collections::{HashMap, HashSet}, fmt, hash, }; @@ -28,6 +27,7 @@ use sr_primitives::transaction_validity::{ }; use ready::ReadyTransactions; +use future::{FutureTransactions, WaitingTransaction}; pub type BlockNumber = u64; @@ -41,60 +41,6 @@ pub struct Transaction { pub provides: Vec, } -#[derive(Debug)] -pub struct WaitingTransaction { - pub transaction: Transaction, - pub missing_tags: HashSet, - pub hash: Hash, -} - -impl WaitingTransaction { - pub fn new(transaction: Transaction, hash: Hash, provided: &HashMap) -> Self { - let missing_tags = transaction.requires - .iter() - .filter(|tag| !provided.contains_key(&**tag)) - .cloned() - .collect(); - - WaitingTransaction { - transaction, - missing_tags, - hash, - } - } - - pub fn satisfy_tag(&mut self, tag: &Tag) { - self.missing_tags.remove(tag); - } - - pub fn is_ready(&self) -> bool { - self.missing_tags.is_empty() - } -} - -#[derive(Debug)] -pub struct FutureTransactions { - /// tags that are not yet provided by any transaction and we await for them - wanted_tags: HashMap>, - /// Transactions waiting for a particular other transaction - waiting: HashMap>, -} - -impl Default for FutureTransactions { - fn default() -> Self { - FutureTransactions { - wanted_tags: Default::default(), - waiting: Default::default(), - } - } -} - -impl FutureTransactions { - pub fn import(&mut self, tx: WaitingTransaction) { - unimplemented!() - } -} - /// Transaction pool. /// /// Builds a dependency graph for all transactions in the pool and returns @@ -102,7 +48,6 @@ impl FutureTransactions { pub struct Pool { future: FutureTransactions, ready: ReadyTransactions, - insertion_id: u64, hasher: Box Hash>, } @@ -111,7 +56,6 @@ impl fmt::Debug for Pool { fmt.debug_struct("Pool") .field("future", &self.future) .field("ready", &self.ready) - .field("insertion_id", &self.insertion_id) .finish() } } @@ -124,7 +68,6 @@ impl Pool { Pool { future: Default::default(), ready: Default::default(), - insertion_id: Default::default(), hasher: Box::new(hasher), } } @@ -132,20 +75,15 @@ impl Pool { pub fn import( &mut self, block_number: BlockNumber, - ex: UncheckedExtrinsic, - priority: Priority, - longevity: Longevity, - requires: Vec, - provides: Vec, + tx: Transaction, ) { - let hash = (self.hasher)(&ex); - let tx = Transaction { ex, priority, longevity, requires, provides }; + let hash = (self.hasher)(&tx.ex); + // TODO [ToDr] check if transaction is already imported let tx = WaitingTransaction::new(tx, hash, self.ready.provided_tags()); // Tags provided from ready queue satisfy all requirements, so just add to ready. if tx.is_ready() { - self.insertion_id += 1; - self.ready.import(tx, self.insertion_id, block_number); + self.ready.import(tx, block_number); // TODO [ToDr] Check what transactions from future are unlocked. } else { self.future.import(tx); @@ -164,7 +102,7 @@ mod tests { type Hash = u64; fn pool() -> Pool { - Pool::new(|ex| ex.0.len() as u64) + Pool::new(|ex| ex.0[0] as u64) } #[test] @@ -173,16 +111,67 @@ mod tests { let mut pool = pool(); // when - pool.import( - 1, - UncheckedExtrinsic(vec![1u8]), - 5u64, - 64u64, - vec![], - vec![vec![1]], - ); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![1]], + }); // then assert_eq!(pool.ready().len(), 1); } + + #[test] + fn should_not_import_same_transaction_twice() { + // given + let mut pool = pool(); + + // when + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![1]], + }); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![1]], + }); + + // then + assert_eq!(pool.ready().len(), 1); + } + + + #[test] + fn should_import_transaction_to_future_and_promote_it_later() { + // given + let mut pool = pool(); + + // when + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }); + assert_eq!(pool.ready().len(), 0); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![2u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![0]], + }); + + // then + assert_eq!(pool.ready().len(), 2); + } } diff --git a/node/transaction-graph/src/ready.rs b/node/transaction-graph/src/ready.rs index 2c3adbfb9965d..e466bd4668ea4 100644 --- a/node/transaction-graph/src/ready.rs +++ b/node/transaction-graph/src/ready.rs @@ -23,11 +23,10 @@ use std::{ use sr_primitives::transaction_validity::{ TransactionTag as Tag, - TransactionPriority as Priority, }; -use super::pool::BlockNumber; -use super::pool::{Transaction, WaitingTransaction}; +use future::WaitingTransaction; +use pool::{BlockNumber, Transaction}; #[derive(Debug, Clone)] pub struct TransactionRef { @@ -41,8 +40,8 @@ pub struct TransactionRef { impl Ord for TransactionRef { fn cmp(&self, other: &Self) -> cmp::Ordering { self.transaction.priority.cmp(&other.transaction.priority) - .then(self.valid_till.cmp(&other.valid_till)) - .then(self.insertion_id.cmp(&other.insertion_id)) + .then(other.valid_till.cmp(&self.valid_till)) + .then(other.insertion_id.cmp(&self.insertion_id)) } } @@ -67,6 +66,8 @@ struct ReadyTx { #[derive(Debug)] pub struct ReadyTransactions { + /// Insertion id + insertion_id: u64, /// tags that are provided by ready transactions provided_tags: HashMap, /// Transactions that are ready (i.e. don't have any requirements external to the pool) @@ -79,6 +80,7 @@ pub struct ReadyTransactions { impl Default for ReadyTransactions { fn default() -> Self { ReadyTransactions { + insertion_id: Default::default(), provided_tags: Default::default(), ready: Default::default(), best: Default::default(), @@ -93,14 +95,18 @@ impl ReadyTransactions { pub fn get<'a>(&'a self) -> impl Iterator> + 'a { BestIterator { - ready: &self.ready, + all: &self.ready, best: self.best.clone(), + awaiting: Default::default(), } } - pub fn import(&mut self, tx: WaitingTransaction, insertion_id: u64, block_number: BlockNumber) { + pub fn import(&mut self, tx: WaitingTransaction, block_number: BlockNumber) { assert!(tx.is_ready(), "Only ready transactions can be imported."); + assert!(!self.ready.contains_key(&tx.hash), "Transaction is already imported."); + self.insertion_id += 1; + let insertion_id = self.insertion_id; let hash = tx.hash; let tx = tx.transaction; let mut goes_to_best = true; @@ -117,7 +123,8 @@ impl ReadyTransactions { // update provided_tags for tag in tx.provides.clone() { - self.provided_tags.insert(tag, hash.clone()); + let prev = self.provided_tags.insert(tag, hash.clone()); + // TODO [ToDr] Only one extrinsic can provide a tag, so we need to remove the previous one! } let transaction = TransactionRef { @@ -141,10 +148,25 @@ impl ReadyTransactions { } pub struct BestIterator<'a, Hash: 'a> { - ready: &'a HashMap>, + all: &'a HashMap>, + awaiting: HashMap)>, best: BTreeSet>, } +impl<'a, Hash: 'a + hash:: Hash + Eq + Clone> BestIterator<'a, Hash> { + /// Depending on number of satisfied requirements insert given ref + /// either to awaiting set or to best set. + fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef) { + if satisfied == tx_ref.transaction.requires.len() { + // If we have satisfied all deps insert to best + self.best.insert(tx_ref); + } else { + // otherwise we're still awaiting for some deps + self.awaiting.insert(tx_ref.hash.clone(), (satisfied, tx_ref)); + } + } +} + impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> { type Item = Arc; @@ -152,14 +174,17 @@ impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> let best = self.best.iter().next()?.clone(); let best = self.best.take(&best)?; - let ready = self.ready.get(&best.hash)?; + let ready = self.all.get(&best.hash)?; // Insert transactions that just got unlocked. for hash in &ready.unlocks { - // TODO [ToDr] This is actually invalid. - // We need to check if all `requires` were already returned. - if let Some(other) = self.ready.get(hash) { - self.best.insert(other.transaction.clone()); + // first check local awaiting transactions + if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) { + satisfied += 1; + self.best_or_awaiting(satisfied, tx_ref); + // then get from the pool + } else if let Some(next) = self.all.get(hash) { + self.best_or_awaiting(1, next.transaction.clone()); } } @@ -169,9 +194,113 @@ impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> #[cfg(test)] mod tests { + use super::*; + use primitives::UncheckedExtrinsic; + + fn tx(id: u8) -> Transaction { + Transaction { + ex: UncheckedExtrinsic(vec![id]), + priority: 1, + longevity: 2, + requires: vec![vec![1], vec![2]], + provides: vec![vec![3], vec![4]], + } + } #[test] - fn should_sort_best_transactions() { + fn should_replace_transaction_that_provides_the_same_tag() { + // given + + // when + + // then assert_eq!(true, false); } + + + #[test] + fn should_return_best_transactions_in_correct_order() { + // given + let mut ready = ReadyTransactions::default(); + let mut tx1 = tx(1); + tx1.requires.clear(); + let mut tx2 = tx(2); + tx2.requires = tx1.provides.clone(); + tx2.provides = vec![vec![106]]; + let mut tx3 = tx(3); + tx3.requires = vec![tx1.provides[0].clone(), vec![106]]; + tx3.provides = vec![]; + let mut tx4 = tx(4); + tx4.requires = vec![tx1.provides[0].clone()]; + tx4.provides = vec![]; + let block_number = 1; + + // when + let x = WaitingTransaction::new(tx1, 101, &ready.provided_tags()); + ready.import(x, block_number); + let x = WaitingTransaction::new(tx2, 102, &ready.provided_tags()); + ready.import(x, block_number); + let x = WaitingTransaction::new(tx3, 103, &ready.provided_tags()); + ready.import(x, block_number); + let x = WaitingTransaction::new(tx4, 104, &ready.provided_tags()); + ready.import(x, block_number); + + // then + assert_eq!(ready.best.len(), 1); + + let mut it = ready.get().map(|tx| tx.ex.0[0]); + + assert_eq!(it.next(), Some(1)); + assert_eq!(it.next(), Some(4)); + assert_eq!(it.next(), Some(2)); + assert_eq!(it.next(), Some(3)); + assert_eq!(it.next(), None); + } + + #[test] + fn should_order_refs() { + let mut id = 1; + let mut with_priority = |priority| { + id += 1; + let mut tx = tx(id); + tx.priority = priority; + tx + }; + // higher priority = better + assert!(TransactionRef { + transaction: Arc::new(with_priority(3)), + hash: 3, + valid_till: 3, + insertion_id: 1, + } > TransactionRef { + transaction: Arc::new(with_priority(2)), + hash: 3, + valid_till: 3, + insertion_id: 2, + }); + // lower validity = better + assert!(TransactionRef { + transaction: Arc::new(with_priority(3)), + hash: 3, + valid_till: 2, + insertion_id: 1, + } > TransactionRef { + transaction: Arc::new(with_priority(3)), + hash: 3, + valid_till: 3, + insertion_id: 2, + }); + // lower insertion_id = better + assert!(TransactionRef { + transaction: Arc::new(with_priority(3)), + hash: 3, + valid_till: 3, + insertion_id: 1, + } > TransactionRef { + transaction: Arc::new(with_priority(3)), + hash: 3, + valid_till: 3, + insertion_id: 2, + }); + } } From f51c5d0ff564dd9039a56d41e0898a8c3641d1ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 19 Sep 2018 10:24:42 +0200 Subject: [PATCH 03/14] Future -> Ready promotions. --- node/transaction-graph/src/future.rs | 41 +++++++++++ node/transaction-graph/src/pool.rs | 102 ++++++++++++++++++++++++--- node/transaction-graph/src/ready.rs | 13 +++- 3 files changed, 144 insertions(+), 12 deletions(-) diff --git a/node/transaction-graph/src/future.rs b/node/transaction-graph/src/future.rs index c1e7fb40966d1..da5d4b6dc9ebb 100644 --- a/node/transaction-graph/src/future.rs +++ b/node/transaction-graph/src/future.rs @@ -74,7 +74,14 @@ impl Default for FutureTransactions { } impl FutureTransactions { + /// Import transaction to future queue. + /// + /// Only transactions that don't have all their tags satisfied should occupy + /// the future queue. + /// As soon as required tags are provided by some other transactions that are ready + /// we should remove the transactions from here and move them to the other queue. pub fn import(&mut self, tx: WaitingTransaction) { + assert!(!tx.is_ready(), "Transaction is ready."); assert!(!self.waiting.contains_key(&tx.hash), "Transaction is already imported."); // Add all tags that are missing @@ -86,4 +93,38 @@ impl FutureTransactions { // Add the transaction to a by-hash waiting map self.waiting.insert(tx.hash.clone(), tx); } + + /// Returns true if given hash is part of the queue. + pub fn contains(&self, hash: &Hash) -> bool { + self.waiting.contains_key(hash) + } + + /// Satisfies provided tags in transactions that are waiting for them. + /// + /// Returns (and removes) transactions that became ready after their last tag got + /// satisfied and now we can remove them from future and move to ready queue. + pub fn satisfy_tags(&mut self, tags: &[Tag]) -> Vec> { + let mut became_ready = vec![]; + + for tag in tags { + if let Some(hashes) = self.wanted_tags.remove(tag) { + for hash in hashes { + let is_ready = { + let mut tx = self.waiting.get_mut(&hash) + .expect("Every transaction in wanted_tags is present in waiting; qed"); + tx.satisfy_tag(tag); + tx.is_ready() + }; + + if is_ready { + let tx = self.waiting.remove(&hash) + .expect("We just get_mut the entry; qed"); + became_ready.push(tx); + } + } + } + } + + became_ready + } } diff --git a/node/transaction-graph/src/pool.rs b/node/transaction-graph/src/pool.rs index 3f685660fc0c9..29d958ef5724e 100644 --- a/node/transaction-graph/src/pool.rs +++ b/node/transaction-graph/src/pool.rs @@ -17,6 +17,7 @@ use std::{ fmt, hash, + sync::Arc, }; use primitives::UncheckedExtrinsic; @@ -72,26 +73,52 @@ impl Pool { } } + /// Imports transaction to the pool. + /// + /// The pool consists of two parts: future and ready. + /// The former contains transactions that require some tags that are not yet provided by + /// other transactions in the pool. + /// The latter contains transactions that have all the requirements satisfied and are + /// ready to be included in the block. pub fn import( &mut self, block_number: BlockNumber, tx: Transaction, - ) { + ) -> Result<(), ()> { let hash = (self.hasher)(&tx.ex); + if self.future.contains(&hash) || self.ready.contains(&hash) { + return Err(()) + } + // TODO [ToDr] check if transaction is already imported let tx = WaitingTransaction::new(tx, hash, self.ready.provided_tags()); // Tags provided from ready queue satisfy all requirements, so just add to ready. if tx.is_ready() { - self.ready.import(tx, block_number); - // TODO [ToDr] Check what transactions from future are unlocked. + let mut to_import = vec![tx]; + + loop { + // take first transaction from the list + let tx = match to_import.pop() { + Some(tx) => tx, + None => break, + }; + + // find future transactions that it unlocks + to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides)); + + // import this transaction + self.ready.import(tx, block_number); + } } else { self.future.import(tx); } + + Ok(()) } - pub fn ready(&self) -> Vec<()> { - self.ready.get().map(|_| ()).collect() + pub fn ready(&self) -> Vec> { + self.ready.get().collect() } } @@ -117,7 +144,7 @@ mod tests { longevity: 64u64, requires: vec![], provides: vec![vec![1]], - }); + }).unwrap(); // then assert_eq!(pool.ready().len(), 1); @@ -135,14 +162,14 @@ mod tests { longevity: 64u64, requires: vec![], provides: vec![vec![1]], - }); + }).unwrap(); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![1u8]), priority: 5u64, longevity: 64u64, requires: vec![], provides: vec![vec![1]], - }); + }).unwrap_err(); // then assert_eq!(pool.ready().len(), 1); @@ -161,7 +188,7 @@ mod tests { longevity: 64u64, requires: vec![vec![0]], provides: vec![vec![1]], - }); + }).unwrap(); assert_eq!(pool.ready().len(), 0); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![2u8]), @@ -169,9 +196,64 @@ mod tests { longevity: 64u64, requires: vec![], provides: vec![vec![0]], - }); + }).unwrap(); // then assert_eq!(pool.ready().len(), 2); } + + #[test] + fn should_promote_a_subgraph() { + // given + let mut pool = pool(); + + // when + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![3u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![2u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![3], vec![2]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![4u8]), + priority: 1_000u64, + longevity: 64u64, + requires: vec![vec![3], vec![4]], + provides: vec![], + }).unwrap(); + assert_eq!(pool.ready().len(), 0); + + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![5u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![0], vec![4]], + }).unwrap(); + + // then + let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + + assert_eq!(it.next(), Some(5)); + assert_eq!(it.next(), Some(1)); + assert_eq!(it.next(), Some(2)); + assert_eq!(it.next(), Some(4)); + assert_eq!(it.next(), Some(3)); + assert_eq!(it.next(), None); + } } diff --git a/node/transaction-graph/src/ready.rs b/node/transaction-graph/src/ready.rs index e466bd4668ea4..81ddaae516eda 100644 --- a/node/transaction-graph/src/ready.rs +++ b/node/transaction-graph/src/ready.rs @@ -145,6 +145,11 @@ impl ReadyTransactions { unlocks: vec![], }); } + + /// Returns true if given hash is part of the queue. + pub fn contains(&self, hash: &Hash) -> bool { + self.ready.contains_key(hash) + } } pub struct BestIterator<'a, Hash: 'a> { @@ -157,10 +162,14 @@ impl<'a, Hash: 'a + hash:: Hash + Eq + Clone> BestIterator<'a, Hash> { /// Depending on number of satisfied requirements insert given ref /// either to awaiting set or to best set. fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef) { + println!("[{}/{}] Considering: {:?}", satisfied, tx_ref.transaction.requires.len(), tx_ref.transaction); if satisfied == tx_ref.transaction.requires.len() { // If we have satisfied all deps insert to best self.best.insert(tx_ref); + + println!("Best: {:?}", self.best.iter().map(|tx| tx.transaction.clone()).collect::>()); } else { + println!(" Still awaiting"); // otherwise we're still awaiting for some deps self.awaiting.insert(tx_ref.hash.clone(), (satisfied, tx_ref)); } @@ -171,7 +180,7 @@ impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> type Item = Arc; fn next(&mut self) -> Option { - let best = self.best.iter().next()?.clone(); + let best = self.best.iter().next_back()?.clone(); let best = self.best.take(&best)?; let ready = self.all.get(&best.hash)?; @@ -251,9 +260,9 @@ mod tests { let mut it = ready.get().map(|tx| tx.ex.0[0]); assert_eq!(it.next(), Some(1)); - assert_eq!(it.next(), Some(4)); assert_eq!(it.next(), Some(2)); assert_eq!(it.next(), Some(3)); + assert_eq!(it.next(), Some(4)); assert_eq!(it.next(), None); } From b66ebc86b1c1833c67e27062dd7691478d595180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 19 Sep 2018 13:47:48 +0200 Subject: [PATCH 04/14] Replacement logic. --- node/transaction-graph/src/error.rs | 32 ++++++ node/transaction-graph/src/future.rs | 13 ++- node/transaction-graph/src/lib.rs | 1 + node/transaction-graph/src/pool.rs | 138 +++++++++++++++++++++--- node/transaction-graph/src/ready.rs | 150 +++++++++++++++++++++++---- 5 files changed, 299 insertions(+), 35 deletions(-) create mode 100644 node/transaction-graph/src/error.rs diff --git a/node/transaction-graph/src/error.rs b/node/transaction-graph/src/error.rs new file mode 100644 index 0000000000000..e6f3b2d3a1b42 --- /dev/null +++ b/node/transaction-graph/src/error.rs @@ -0,0 +1,32 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use sr_primitives::transaction_validity::TransactionPriority as Priority; + +error_chain! { + errors { + /// The transaction is already in the pool. + AlreadyImported { + description("Transaction is already in the pool."), + display("Already imported"), + } + /// The transaction cannot be imported cause it's a replacement and has too low priority. + TooLowPriority(old: Priority, new: Priority) { + description("The priority is too low to replace transactions already in the pool."), + display("Too low priority ({} > {})", old, new) + } + } +} diff --git a/node/transaction-graph/src/future.rs b/node/transaction-graph/src/future.rs index da5d4b6dc9ebb..1e41023407cd2 100644 --- a/node/transaction-graph/src/future.rs +++ b/node/transaction-graph/src/future.rs @@ -74,12 +74,12 @@ impl Default for FutureTransactions { } impl FutureTransactions { - /// Import transaction to future queue. + /// Import transaction to Future queue. /// /// Only transactions that don't have all their tags satisfied should occupy - /// the future queue. + /// the Future queue. /// As soon as required tags are provided by some other transactions that are ready - /// we should remove the transactions from here and move them to the other queue. + /// we should remove the transactions from here and move them to the Ready queue. pub fn import(&mut self, tx: WaitingTransaction) { assert!(!tx.is_ready(), "Transaction is ready."); assert!(!self.waiting.contains_key(&tx.hash), "Transaction is already imported."); @@ -102,7 +102,7 @@ impl FutureTransactions { /// Satisfies provided tags in transactions that are waiting for them. /// /// Returns (and removes) transactions that became ready after their last tag got - /// satisfied and now we can remove them from future and move to ready queue. + /// satisfied and now we can remove them from Future and move to Ready queue. pub fn satisfy_tags(&mut self, tags: &[Tag]) -> Vec> { let mut became_ready = vec![]; @@ -127,4 +127,9 @@ impl FutureTransactions { became_ready } + + /// Returns number of transactions in the Future queue. + pub fn len(&self) -> usize { + self.waiting.len() + } } diff --git a/node/transaction-graph/src/lib.rs b/node/transaction-graph/src/lib.rs index 3f8ecf172ffae..56a1bf5b73376 100644 --- a/node/transaction-graph/src/lib.rs +++ b/node/transaction-graph/src/lib.rs @@ -36,6 +36,7 @@ extern crate error_chain; #[macro_use] extern crate log; +mod error; mod future; mod pool; mod ready; diff --git a/node/transaction-graph/src/pool.rs b/node/transaction-graph/src/pool.rs index 29d958ef5724e..a25d0359b8739 100644 --- a/node/transaction-graph/src/pool.rs +++ b/node/transaction-graph/src/pool.rs @@ -27,13 +27,36 @@ use sr_primitives::transaction_validity::{ TransactionPriority as Priority, }; -use ready::ReadyTransactions; +use error; use future::{FutureTransactions, WaitingTransaction}; +use ready::ReadyTransactions; pub type BlockNumber = u64; +/// Successful import result. +#[derive(Debug, PartialEq, Eq)] +pub enum Imported { + /// Transaction was successfuly imported to Ready queue. + Ready { + /// Hash of transaction that was successfuly imported. + hash: Hash, + /// Transactions that got promoted from the Future queue. + promoted: Vec, + /// Transactions that failed to be promoted from the Future queue and are now discarded. + failed: Vec, + /// Transactions removed from the Ready pool (replaced). + removed: Vec>, + }, + /// Transaction was successfuly imported to Future queue. + Future { + /// Hash of transaction that was successfuly imported. + hash: Hash, + } +} + /// Immutable transaction -#[derive(Debug)] +#[cfg_attr(test, derive(Clone))] +#[derive(Debug, PartialEq, Eq)] pub struct Transaction { pub ex: UncheckedExtrinsic, pub priority: Priority, @@ -75,7 +98,7 @@ impl Pool { /// Imports transaction to the pool. /// - /// The pool consists of two parts: future and ready. + /// The pool consists of two parts: Future and Ready. /// The former contains transactions that require some tags that are not yet provided by /// other transactions in the pool. /// The latter contains transactions that have all the requirements satisfied and are @@ -84,17 +107,21 @@ impl Pool { &mut self, block_number: BlockNumber, tx: Transaction, - ) -> Result<(), ()> { + ) -> error::Result> { let hash = (self.hasher)(&tx.ex); if self.future.contains(&hash) || self.ready.contains(&hash) { - return Err(()) + bail!(error::ErrorKind::AlreadyImported) } - // TODO [ToDr] check if transaction is already imported - let tx = WaitingTransaction::new(tx, hash, self.ready.provided_tags()); + let tx = WaitingTransaction::new(tx, hash.clone(), self.ready.provided_tags()); - // Tags provided from ready queue satisfy all requirements, so just add to ready. + // Tags provided from Ready queue satisfy all requirements, so just add to Ready. if tx.is_ready() { + let mut promoted = vec![]; + let mut failed = vec![]; + let mut removed = vec![]; + + let mut first = true; let mut to_import = vec![tx]; loop { @@ -104,17 +131,39 @@ impl Pool { None => break, }; - // find future transactions that it unlocks + // find transactions in Future that it unlocks to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides)); // import this transaction - self.ready.import(tx, block_number); + let hash = tx.hash.clone(); + match self.ready.import(tx, block_number) { + Ok(mut replaced) => { + if !first { + promoted.push(hash); + } + // The transactions were removed from the ready pool. We might attempt to re-import them. + removed.append(&mut replaced); + }, + // transaction failed to be imported. + Err(e) => if first { + return Err(e) + } else { + failed.push(hash); + }, + } + first = false; } + + Ok(Imported::Ready { + hash, + promoted, + failed, + removed, + }) } else { self.future.import(tx); + Ok(Imported::Future { hash }) } - - Ok(()) } pub fn ready(&self) -> Vec> { @@ -238,7 +287,7 @@ mod tests { }).unwrap(); assert_eq!(pool.ready().len(), 0); - pool.import(1, Transaction { + let res = pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![5u8]), priority: 5u64, longevity: 64u64, @@ -255,5 +304,68 @@ mod tests { assert_eq!(it.next(), Some(4)); assert_eq!(it.next(), Some(3)); assert_eq!(it.next(), None); + assert_eq!(res, Imported::Ready { + hash: 5, + promoted: vec![1, 2, 3, 4], + failed: vec![], + removed: vec![], + }); + } + + #[test] + fn should_handle_a_cycle() { + // given + let mut pool = pool(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![3u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![2]], + }).unwrap(); + assert_eq!(pool.ready().len(), 0); + + // when + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![2u8]), + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![vec![0]], + }).unwrap(); + + // then + let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + assert_eq!(it.next(), None); + // all transactions occupy the Future queue - it's fine + assert_eq!(pool.future.len(), 3); + + // let's close the cycle with one additional transaction + let res = pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![4u8]), + priority: 50u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![0]], + }).unwrap(); + let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + assert_eq!(it.next(), Some(4)); + assert_eq!(it.next(), Some(1)); + assert_eq!(it.next(), Some(3)); + assert_eq!(it.next(), None); + assert_eq!(res, Imported::Ready { + hash: 4, + promoted: vec![1, 3], + failed: vec![2], + removed: vec![], + }); + assert_eq!(pool.future.len(), 0); } } diff --git a/node/transaction-graph/src/ready.rs b/node/transaction-graph/src/ready.rs index 81ddaae516eda..21d41d379e4e6 100644 --- a/node/transaction-graph/src/ready.rs +++ b/node/transaction-graph/src/ready.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use std::{ - collections::{HashMap, BTreeSet}, + collections::{HashMap, HashSet, BTreeSet}, cmp, hash, sync::Arc, @@ -25,6 +25,7 @@ use sr_primitives::transaction_validity::{ TransactionTag as Tag, }; +use error; use future::WaitingTransaction; use pool::{BlockNumber, Transaction}; @@ -36,7 +37,6 @@ pub struct TransactionRef { pub insertion_id: u64, } -// TODO [ToDr] Testme impl Ord for TransactionRef { fn cmp(&self, other: &Self) -> cmp::Ordering { self.transaction.priority.cmp(&other.transaction.priority) @@ -64,11 +64,13 @@ struct ReadyTx { pub unlocks: Vec, } +const HASH_READY: &str = "Every hash is in ready map; qed"; + #[derive(Debug)] pub struct ReadyTransactions { /// Insertion id insertion_id: u64, - /// tags that are provided by ready transactions + /// tags that are provided by Ready transactions provided_tags: HashMap, /// Transactions that are ready (i.e. don't have any requirements external to the pool) ready: HashMap>, @@ -89,10 +91,22 @@ impl Default for ReadyTransactions { } impl ReadyTransactions { + /// Borrows a map of tags that are provided by transactions in this queue. pub fn provided_tags(&self) -> &HashMap { &self.provided_tags } + /// Returns an iterator of ready transactions. + /// + /// Transactions are returned in order: + /// 1. First by the dependencies: + /// - never return transaction that requires a tag, which was not provided by one of the previously returned transactions + /// 2. Then by priority: + /// - If there are two transactions with all requirements satisfied the one with higher priority goes first. + /// 3. Then by the ttl that's left + /// - transactions that are valid for a shorter time go first + /// 4. Lastly we sort by the time in the queue + /// - transactions that are longer in the queue go first pub fn get<'a>(&'a self) -> impl Iterator> + 'a { BestIterator { all: &self.ready, @@ -101,7 +115,15 @@ impl ReadyTransactions { } } - pub fn import(&mut self, tx: WaitingTransaction, block_number: BlockNumber) { + /// Imports transactions to the pool of ready transactions. + /// + /// The transaction needs to have all tags satisfied (be ready) by transactions + /// that are in this queue. + pub fn import( + &mut self, + tx: WaitingTransaction, + block_number: BlockNumber, + ) -> error::Result>> { assert!(tx.is_ready(), "Only ready transactions can be imported."); assert!(!self.ready.contains_key(&tx.hash), "Transaction is already imported."); @@ -109,12 +131,15 @@ impl ReadyTransactions { let insertion_id = self.insertion_id; let hash = tx.hash; let tx = tx.transaction; + + let replaced = self.replace_previous(&tx)?; + let mut goes_to_best = true; - // Add info what transactions unlock this one. + // Add links to transactions that unlock the current one for tag in &tx.requires { // Check if the transaction that satisfies the tag is still in the queue. if let Some(other) = self.provided_tags.get(tag) { - let mut tx = self.ready.get_mut(other).expect("All hashes are present in ready; qed"); + let mut tx = self.ready.get_mut(other).expect(HASH_READY); tx.unlocks.push(hash.clone()); // this transaction depends on some other, so it doesn't go to best directly. goes_to_best = false; @@ -123,8 +148,7 @@ impl ReadyTransactions { // update provided_tags for tag in tx.provides.clone() { - let prev = self.provided_tags.insert(tag, hash.clone()); - // TODO [ToDr] Only one extrinsic can provide a tag, so we need to remove the previous one! + self.provided_tags.insert(tag, hash.clone()); } let transaction = TransactionRef { @@ -139,17 +163,83 @@ impl ReadyTransactions { self.best.insert(transaction.clone()); } - // insert to ready + // insert to Ready self.ready.insert(hash, ReadyTx { transaction, unlocks: vec![], }); + + Ok(replaced) } /// Returns true if given hash is part of the queue. pub fn contains(&self, hash: &Hash) -> bool { self.ready.contains_key(hash) } + + /// Checks if the transaction is providing the same tags as other transactions. + /// + /// In case that's true it determines if the priority of transactions that + /// we are about to replace is lower than the priority of the replacement transaction. + /// We remove/replace old transactions in case they have lower priority. + /// + /// In case replacement is succesful returns a list of removed transactions. + fn replace_previous(&mut self, tx: &Transaction) -> error::Result>> { + let mut to_remove = { + // check if we are replacing a transaction + let replace_hashes = tx.provides + .iter() + .filter_map(|tag| self.provided_tags.get(tag)) + .collect::>(); + + // early exit if we are not replacing anything. + if replace_hashes.is_empty() { + return Ok(vec![]); + } + + // now check if collective priority is lower than the replacement transaction. + let old_priority = replace_hashes + .iter() + .filter_map(|hash| self.ready.get(hash)) + .fold(0u64, |total, tx| total.saturating_add(tx.transaction.transaction.priority)); + + // bail - the transaction has too low priority to replace the old ones + if old_priority >= tx.priority { + bail!(error::ErrorKind::TooLowPriority(old_priority, tx.priority)) + } + + replace_hashes.into_iter().cloned().collect::>() + }; + + let new_provides = tx.provides.iter().cloned().collect::>(); + let mut removed = vec![]; + loop { + let hash = match to_remove.pop() { + Some(hash) => hash, + None => return Ok(removed), + }; + + let tx = self.ready.remove(&hash).expect(HASH_READY); + // check if this transaction provides stuff that is not provided by the new one. + let (mut unlocks, tx) = (tx.unlocks, tx.transaction.transaction); + { + let invalidated = tx.provides + .iter() + .filter(|tag| !new_provides.contains(&**tag)); + + for tag in invalidated { + // remove the tag since it's no longer provided by any transaction + self.provided_tags.remove(tag); + // add more transactions to remove + to_remove.append(&mut unlocks); + } + } + + removed.push(tx); + } + } + + } pub struct BestIterator<'a, Hash: 'a> { @@ -162,14 +252,11 @@ impl<'a, Hash: 'a + hash:: Hash + Eq + Clone> BestIterator<'a, Hash> { /// Depending on number of satisfied requirements insert given ref /// either to awaiting set or to best set. fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef) { - println!("[{}/{}] Considering: {:?}", satisfied, tx_ref.transaction.requires.len(), tx_ref.transaction); if satisfied == tx_ref.transaction.requires.len() { // If we have satisfied all deps insert to best self.best.insert(tx_ref); - println!("Best: {:?}", self.best.iter().map(|tx| tx.transaction.clone()).collect::>()); } else { - println!(" Still awaiting"); // otherwise we're still awaiting for some deps self.awaiting.insert(tx_ref.hash.clone(), (satisfied, tx_ref)); } @@ -183,7 +270,11 @@ impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> let best = self.best.iter().next_back()?.clone(); let best = self.best.take(&best)?; - let ready = self.all.get(&best.hash)?; + let ready = match self.all.get(&best.hash) { + Some(ready) => ready, + // The transaction is not in all, maybe it was removed in the meantime? + None => return self.next(), + }; // Insert transactions that just got unlocked. for hash in &ready.unlocks { @@ -219,11 +310,34 @@ mod tests { #[test] fn should_replace_transaction_that_provides_the_same_tag() { // given + let mut ready = ReadyTransactions::default(); + let block_number = 1; + let mut tx1 = tx(1); + tx1.requires.clear(); + let mut tx2 = tx(2); + tx2.requires.clear(); + tx2.provides = vec![vec![3]]; + let mut tx3 = tx(3); + tx3.requires.clear(); + tx3.provides = vec![vec![4]]; // when + let x = WaitingTransaction::new(tx2, 102, &ready.provided_tags()); + ready.import(x, block_number).unwrap(); + let x = WaitingTransaction::new(tx3, 103, &ready.provided_tags()); + ready.import(x, block_number).unwrap(); + assert_eq!(ready.get().count(), 2); + + // too low priority + let x = WaitingTransaction::new(tx1.clone(), 101, &ready.provided_tags()); + ready.import(x, block_number).unwrap_err(); + + tx1.priority = 10; + let x = WaitingTransaction::new(tx1.clone(), 101, &ready.provided_tags()); + ready.import(x, block_number).unwrap(); // then - assert_eq!(true, false); + assert_eq!(ready.get().count(), 1); } @@ -246,13 +360,13 @@ mod tests { // when let x = WaitingTransaction::new(tx1, 101, &ready.provided_tags()); - ready.import(x, block_number); + ready.import(x, block_number).unwrap(); let x = WaitingTransaction::new(tx2, 102, &ready.provided_tags()); - ready.import(x, block_number); + ready.import(x, block_number).unwrap(); let x = WaitingTransaction::new(tx3, 103, &ready.provided_tags()); - ready.import(x, block_number); + ready.import(x, block_number).unwrap(); let x = WaitingTransaction::new(tx4, 104, &ready.provided_tags()); - ready.import(x, block_number); + ready.import(x, block_number).unwrap(); // then assert_eq!(ready.best.len(), 1); From 83aa78daba2396d9c35b94c276d213c27826bf7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 19 Sep 2018 14:26:16 +0200 Subject: [PATCH 05/14] Clear extern crates, add docs. --- Cargo.lock | 7 ------- node/transaction-graph/Cargo.toml | 9 +-------- node/transaction-graph/src/future.rs | 1 + node/transaction-graph/src/lib.rs | 19 ++++++++---------- node/transaction-graph/src/pool.rs | 29 ++++++++++++++++++---------- 5 files changed, 29 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65d3b88e2dd59..94befc16424c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1603,15 +1603,8 @@ version = "0.1.0" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "node-api 0.1.0", "node-primitives 0.1.0", - "node-runtime 0.1.0", - "parity-codec 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", - "substrate-client 0.1.0", - "substrate-keyring 0.1.0", - "substrate-primitives 0.1.0", ] [[package]] diff --git a/node/transaction-graph/Cargo.toml b/node/transaction-graph/Cargo.toml index 23f9362900a94..482674ec90dfc 100644 --- a/node/transaction-graph/Cargo.toml +++ b/node/transaction-graph/Cargo.toml @@ -4,14 +4,7 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -log = "0.3.0" error-chain = "0.12" -parking_lot = "0.4" -node-api = { path = "../api" } +log = "0.3.0" node-primitives = { path = "../primitives" } -node-runtime = { path = "../runtime" } -substrate-client = { path = "../../core/client" } -parity-codec = { version = "1.1" } -substrate-keyring = { path = "../../core/keyring" } -substrate-primitives = { path = "../../core/primitives" } sr-primitives = { path = "../../core/sr-primitives" } diff --git a/node/transaction-graph/src/future.rs b/node/transaction-graph/src/future.rs index 1e41023407cd2..b90178030157e 100644 --- a/node/transaction-graph/src/future.rs +++ b/node/transaction-graph/src/future.rs @@ -129,6 +129,7 @@ impl FutureTransactions { } /// Returns number of transactions in the Future queue. + #[cfg(test)] pub fn len(&self) -> usize { self.waiting.len() } diff --git a/node/transaction-graph/src/lib.rs b/node/transaction-graph/src/lib.rs index 56a1bf5b73376..043b801d6d075 100644 --- a/node/transaction-graph/src/lib.rs +++ b/node/transaction-graph/src/lib.rs @@ -14,21 +14,18 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +//! Generic Transaction Pool +//! +//! The pool is based on dependency graph between transactions +//! and their priority. +//! The pool is able to return an iterator that traverses transaction +//! graph in the correct order taking into account priorities and dependencies. -// #![warn(missing_docs)] -// #![warn(unused_crates)] +#![warn(missing_docs)] +#![warn(unused_extern_crates)] -extern crate substrate_client as client; -extern crate parity_codec as codec; -extern crate substrate_primitives; extern crate sr_primitives; -extern crate node_runtime as runtime; extern crate node_primitives as primitives; -extern crate node_api; -extern crate parking_lot; - -#[cfg(test)] -extern crate substrate_keyring; #[macro_use] extern crate error_chain; diff --git a/node/transaction-graph/src/pool.rs b/node/transaction-graph/src/pool.rs index a25d0359b8739..5a721af3677be 100644 --- a/node/transaction-graph/src/pool.rs +++ b/node/transaction-graph/src/pool.rs @@ -58,10 +58,15 @@ pub enum Imported { #[cfg_attr(test, derive(Clone))] #[derive(Debug, PartialEq, Eq)] pub struct Transaction { + /// Raw extrinsic representing that transaction. pub ex: UncheckedExtrinsic, + /// Transaction priority (higher = better) pub priority: Priority, + /// How many blocks the transaction is valid for. pub longevity: Longevity, + /// Tags required by the transaction. pub requires: Vec, + /// Tags that this transaction provides. pub provides: Vec, } @@ -85,6 +90,7 @@ impl fmt::Debug for Pool { } impl Pool { + /// Creates new transaction pool with given hasher. pub fn new(hasher: F) -> Self where F: Fn(&UncheckedExtrinsic) -> Hash + 'static, @@ -166,8 +172,9 @@ impl Pool { } } - pub fn ready(&self) -> Vec> { - self.ready.get().collect() + /// Returns an iterator over ready transactions in the pool. + pub fn ready<'a>(&'a self) -> impl Iterator> + 'a { + self.ready.get() } } @@ -196,7 +203,7 @@ mod tests { }).unwrap(); // then - assert_eq!(pool.ready().len(), 1); + assert_eq!(pool.ready().count(), 1); } #[test] @@ -221,7 +228,7 @@ mod tests { }).unwrap_err(); // then - assert_eq!(pool.ready().len(), 1); + assert_eq!(pool.ready().count(), 1); } @@ -238,7 +245,7 @@ mod tests { requires: vec![vec![0]], provides: vec![vec![1]], }).unwrap(); - assert_eq!(pool.ready().len(), 0); + assert_eq!(pool.ready().count(), 0); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![2u8]), priority: 5u64, @@ -248,7 +255,7 @@ mod tests { }).unwrap(); // then - assert_eq!(pool.ready().len(), 2); + assert_eq!(pool.ready().count(), 2); } #[test] @@ -285,7 +292,7 @@ mod tests { requires: vec![vec![3], vec![4]], provides: vec![], }).unwrap(); - assert_eq!(pool.ready().len(), 0); + assert_eq!(pool.ready().count(), 0); let res = pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![5u8]), @@ -330,7 +337,7 @@ mod tests { requires: vec![vec![1]], provides: vec![vec![2]], }).unwrap(); - assert_eq!(pool.ready().len(), 0); + assert_eq!(pool.ready().count(), 0); // when pool.import(1, Transaction { @@ -342,8 +349,10 @@ mod tests { }).unwrap(); // then - let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); - assert_eq!(it.next(), None); + { + let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + assert_eq!(it.next(), None); + } // all transactions occupy the Future queue - it's fine assert_eq!(pool.future.len(), 3); From be73dee63bad1696508c041f765234f8baa8568f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 19 Sep 2018 14:38:17 +0200 Subject: [PATCH 06/14] Move hash externally. --- node/transaction-graph/src/future.rs | 12 +++--- node/transaction-graph/src/pool.rs | 61 +++++++++++++--------------- node/transaction-graph/src/ready.rs | 45 +++++++++----------- 3 files changed, 53 insertions(+), 65 deletions(-) diff --git a/node/transaction-graph/src/future.rs b/node/transaction-graph/src/future.rs index b90178030157e..0b7b20f034948 100644 --- a/node/transaction-graph/src/future.rs +++ b/node/transaction-graph/src/future.rs @@ -27,13 +27,12 @@ use pool::Transaction; #[derive(Debug)] pub struct WaitingTransaction { - pub transaction: Transaction, + pub transaction: Transaction, pub missing_tags: HashSet, - pub hash: Hash, } impl WaitingTransaction { - pub fn new(transaction: Transaction, hash: Hash, provided: &HashMap) -> Self { + pub fn new(transaction: Transaction, provided: &HashMap) -> Self { let missing_tags = transaction.requires .iter() .filter(|tag| !provided.contains_key(&**tag)) @@ -43,7 +42,6 @@ impl WaitingTransaction { WaitingTransaction { transaction, missing_tags, - hash, } } @@ -82,16 +80,16 @@ impl FutureTransactions { /// we should remove the transactions from here and move them to the Ready queue. pub fn import(&mut self, tx: WaitingTransaction) { assert!(!tx.is_ready(), "Transaction is ready."); - assert!(!self.waiting.contains_key(&tx.hash), "Transaction is already imported."); + assert!(!self.waiting.contains_key(&tx.transaction.hash), "Transaction is already imported."); // Add all tags that are missing for tag in &tx.missing_tags { let mut entry = self.wanted_tags.entry(tag.clone()).or_insert_with(Vec::new); - entry.push(tx.hash.clone()); + entry.push(tx.transaction.hash.clone()); } // Add the transaction to a by-hash waiting map - self.waiting.insert(tx.hash.clone(), tx); + self.waiting.insert(tx.transaction.hash.clone(), tx); } /// Returns true if given hash is part of the queue. diff --git a/node/transaction-graph/src/pool.rs b/node/transaction-graph/src/pool.rs index 5a721af3677be..3a2369a7fd78a 100644 --- a/node/transaction-graph/src/pool.rs +++ b/node/transaction-graph/src/pool.rs @@ -45,7 +45,7 @@ pub enum Imported { /// Transactions that failed to be promoted from the Future queue and are now discarded. failed: Vec, /// Transactions removed from the Ready pool (replaced). - removed: Vec>, + removed: Vec>>, }, /// Transaction was successfuly imported to Future queue. Future { @@ -57,9 +57,11 @@ pub enum Imported { /// Immutable transaction #[cfg_attr(test, derive(Clone))] #[derive(Debug, PartialEq, Eq)] -pub struct Transaction { +pub struct Transaction { /// Raw extrinsic representing that transaction. pub ex: UncheckedExtrinsic, + /// Transaction hash (unique) + pub hash: Hash, /// Transaction priority (higher = better) pub priority: Priority, /// How many blocks the transaction is valid for. @@ -74,34 +76,13 @@ pub struct Transaction { /// /// Builds a dependency graph for all transactions in the pool and returns /// the ones that are currently ready to be executed. +#[derive(Default, Debug)] pub struct Pool { future: FutureTransactions, ready: ReadyTransactions, - hasher: Box Hash>, } -impl fmt::Debug for Pool { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Pool") - .field("future", &self.future) - .field("ready", &self.ready) - .finish() - } -} - -impl Pool { - /// Creates new transaction pool with given hasher. - pub fn new(hasher: F) -> Self - where - F: Fn(&UncheckedExtrinsic) -> Hash + 'static, - { - Pool { - future: Default::default(), - ready: Default::default(), - hasher: Box::new(hasher), - } - } - +impl Pool { /// Imports transaction to the pool. /// /// The pool consists of two parts: Future and Ready. @@ -112,15 +93,16 @@ impl Pool { pub fn import( &mut self, block_number: BlockNumber, - tx: Transaction, + tx: Transaction, ) -> error::Result> { - let hash = (self.hasher)(&tx.ex); + let hash = tx.hash.clone(); if self.future.contains(&hash) || self.ready.contains(&hash) { bail!(error::ErrorKind::AlreadyImported) } - let tx = WaitingTransaction::new(tx, hash.clone(), self.ready.provided_tags()); - + let tx = WaitingTransaction::new(tx, self.ready.provided_tags()); + trace!(target: "txpool", "[{:?}] {:?}", hash, tx); + debug!(target: "txpool", "[{:?}] Importing to {}", hash, if tx.is_ready() { "ready" } else { "future" }); // Tags provided from Ready queue satisfy all requirements, so just add to Ready. if tx.is_ready() { let mut promoted = vec![]; @@ -141,7 +123,7 @@ impl Pool { to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides)); // import this transaction - let hash = tx.hash.clone(); + let hash = tx.transaction.hash.clone(); match self.ready.import(tx, block_number) { Ok(mut replaced) => { if !first { @@ -152,6 +134,7 @@ impl Pool { }, // transaction failed to be imported. Err(e) => if first { + debug!(target: "txpool", "[{:?}] Error importing: {:?}", hash, e); return Err(e) } else { failed.push(hash); @@ -173,7 +156,7 @@ impl Pool { } /// Returns an iterator over ready transactions in the pool. - pub fn ready<'a>(&'a self) -> impl Iterator> + 'a { + pub fn ready<'a>(&'a self) -> impl Iterator>> + 'a { self.ready.get() } } @@ -185,7 +168,7 @@ mod tests { type Hash = u64; fn pool() -> Pool { - Pool::new(|ex| ex.0[0] as u64) + Pool::default() } #[test] @@ -196,6 +179,7 @@ mod tests { // when pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![1u8]), + hash: 1u64, priority: 5u64, longevity: 64u64, requires: vec![], @@ -214,6 +198,7 @@ mod tests { // when pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![1u8]), + hash: 1, priority: 5u64, longevity: 64u64, requires: vec![], @@ -221,6 +206,7 @@ mod tests { }).unwrap(); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![1u8]), + hash: 1, priority: 5u64, longevity: 64u64, requires: vec![], @@ -240,6 +226,7 @@ mod tests { // when pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![1u8]), + hash: 1, priority: 5u64, longevity: 64u64, requires: vec![vec![0]], @@ -248,6 +235,7 @@ mod tests { assert_eq!(pool.ready().count(), 0); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![2u8]), + hash: 2, priority: 5u64, longevity: 64u64, requires: vec![], @@ -266,6 +254,7 @@ mod tests { // when pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![1u8]), + hash: 1, priority: 5u64, longevity: 64u64, requires: vec![vec![0]], @@ -273,6 +262,7 @@ mod tests { }).unwrap(); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![3u8]), + hash: 3, priority: 5u64, longevity: 64u64, requires: vec![vec![2]], @@ -280,6 +270,7 @@ mod tests { }).unwrap(); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![2u8]), + hash: 2, priority: 5u64, longevity: 64u64, requires: vec![vec![1]], @@ -287,6 +278,7 @@ mod tests { }).unwrap(); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![4u8]), + hash: 4, priority: 1_000u64, longevity: 64u64, requires: vec![vec![3], vec![4]], @@ -296,6 +288,7 @@ mod tests { let res = pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![5u8]), + hash: 5, priority: 5u64, longevity: 64u64, requires: vec![], @@ -325,6 +318,7 @@ mod tests { let mut pool = pool(); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![1u8]), + hash: 1, priority: 5u64, longevity: 64u64, requires: vec![vec![0]], @@ -332,6 +326,7 @@ mod tests { }).unwrap(); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![3u8]), + hash: 3, priority: 5u64, longevity: 64u64, requires: vec![vec![1]], @@ -342,6 +337,7 @@ mod tests { // when pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![2u8]), + hash: 2, priority: 5u64, longevity: 64u64, requires: vec![vec![2]], @@ -359,6 +355,7 @@ mod tests { // let's close the cycle with one additional transaction let res = pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![4u8]), + hash: 4, priority: 50u64, longevity: 64u64, requires: vec![], diff --git a/node/transaction-graph/src/ready.rs b/node/transaction-graph/src/ready.rs index 21d41d379e4e6..6dc44b3bcb610 100644 --- a/node/transaction-graph/src/ready.rs +++ b/node/transaction-graph/src/ready.rs @@ -31,8 +31,7 @@ use pool::{BlockNumber, Transaction}; #[derive(Debug, Clone)] pub struct TransactionRef { - pub transaction: Arc, - pub hash: Hash, + pub transaction: Arc>, pub valid_till: BlockNumber, pub insertion_id: u64, } @@ -107,7 +106,7 @@ impl ReadyTransactions { /// - transactions that are valid for a shorter time go first /// 4. Lastly we sort by the time in the queue /// - transactions that are longer in the queue go first - pub fn get<'a>(&'a self) -> impl Iterator> + 'a { + pub fn get<'a>(&'a self) -> impl Iterator>> + 'a { BestIterator { all: &self.ready, best: self.best.clone(), @@ -123,13 +122,13 @@ impl ReadyTransactions { &mut self, tx: WaitingTransaction, block_number: BlockNumber, - ) -> error::Result>> { + ) -> error::Result>>> { assert!(tx.is_ready(), "Only ready transactions can be imported."); - assert!(!self.ready.contains_key(&tx.hash), "Transaction is already imported."); + assert!(!self.ready.contains_key(&tx.transaction.hash), "Transaction is already imported."); self.insertion_id += 1; let insertion_id = self.insertion_id; - let hash = tx.hash; + let hash = tx.transaction.hash.clone(); let tx = tx.transaction; let replaced = self.replace_previous(&tx)?; @@ -153,7 +152,6 @@ impl ReadyTransactions { let transaction = TransactionRef { insertion_id, - hash: hash.clone(), valid_till: block_number + tx.longevity, transaction: Arc::new(tx), }; @@ -184,7 +182,7 @@ impl ReadyTransactions { /// We remove/replace old transactions in case they have lower priority. /// /// In case replacement is succesful returns a list of removed transactions. - fn replace_previous(&mut self, tx: &Transaction) -> error::Result>> { + fn replace_previous(&mut self, tx: &Transaction) -> error::Result>>> { let mut to_remove = { // check if we are replacing a transaction let replace_hashes = tx.provides @@ -258,19 +256,19 @@ impl<'a, Hash: 'a + hash:: Hash + Eq + Clone> BestIterator<'a, Hash> { } else { // otherwise we're still awaiting for some deps - self.awaiting.insert(tx_ref.hash.clone(), (satisfied, tx_ref)); + self.awaiting.insert(tx_ref.transaction.hash.clone(), (satisfied, tx_ref)); } } } impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> { - type Item = Arc; + type Item = Arc>; fn next(&mut self) -> Option { let best = self.best.iter().next_back()?.clone(); let best = self.best.take(&best)?; - let ready = match self.all.get(&best.hash) { + let ready = match self.all.get(&best.transaction.hash) { Some(ready) => ready, // The transaction is not in all, maybe it was removed in the meantime? None => return self.next(), @@ -297,9 +295,10 @@ mod tests { use super::*; use primitives::UncheckedExtrinsic; - fn tx(id: u8) -> Transaction { + fn tx(id: u8) -> Transaction { Transaction { ex: UncheckedExtrinsic(vec![id]), + hash: id as u64, priority: 1, longevity: 2, requires: vec![vec![1], vec![2]], @@ -322,18 +321,18 @@ mod tests { tx3.provides = vec![vec![4]]; // when - let x = WaitingTransaction::new(tx2, 102, &ready.provided_tags()); + let x = WaitingTransaction::new(tx2, &ready.provided_tags()); ready.import(x, block_number).unwrap(); - let x = WaitingTransaction::new(tx3, 103, &ready.provided_tags()); + let x = WaitingTransaction::new(tx3, &ready.provided_tags()); ready.import(x, block_number).unwrap(); assert_eq!(ready.get().count(), 2); // too low priority - let x = WaitingTransaction::new(tx1.clone(), 101, &ready.provided_tags()); + let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags()); ready.import(x, block_number).unwrap_err(); tx1.priority = 10; - let x = WaitingTransaction::new(tx1.clone(), 101, &ready.provided_tags()); + let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags()); ready.import(x, block_number).unwrap(); // then @@ -359,13 +358,13 @@ mod tests { let block_number = 1; // when - let x = WaitingTransaction::new(tx1, 101, &ready.provided_tags()); + let x = WaitingTransaction::new(tx1, &ready.provided_tags()); ready.import(x, block_number).unwrap(); - let x = WaitingTransaction::new(tx2, 102, &ready.provided_tags()); + let x = WaitingTransaction::new(tx2, &ready.provided_tags()); ready.import(x, block_number).unwrap(); - let x = WaitingTransaction::new(tx3, 103, &ready.provided_tags()); + let x = WaitingTransaction::new(tx3, &ready.provided_tags()); ready.import(x, block_number).unwrap(); - let x = WaitingTransaction::new(tx4, 104, &ready.provided_tags()); + let x = WaitingTransaction::new(tx4, &ready.provided_tags()); ready.import(x, block_number).unwrap(); // then @@ -392,36 +391,30 @@ mod tests { // higher priority = better assert!(TransactionRef { transaction: Arc::new(with_priority(3)), - hash: 3, valid_till: 3, insertion_id: 1, } > TransactionRef { transaction: Arc::new(with_priority(2)), - hash: 3, valid_till: 3, insertion_id: 2, }); // lower validity = better assert!(TransactionRef { transaction: Arc::new(with_priority(3)), - hash: 3, valid_till: 2, insertion_id: 1, } > TransactionRef { transaction: Arc::new(with_priority(3)), - hash: 3, valid_till: 3, insertion_id: 2, }); // lower insertion_id = better assert!(TransactionRef { transaction: Arc::new(with_priority(3)), - hash: 3, valid_till: 3, insertion_id: 1, } > TransactionRef { transaction: Arc::new(with_priority(3)), - hash: 3, valid_till: 3, insertion_id: 2, }); From 6776e9a035528b5a4b4732319d77c0c222413946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 21 Sep 2018 13:20:03 +0200 Subject: [PATCH 07/14] Implement remove_invalid --- node/transaction-graph/src/error.rs | 5 + node/transaction-graph/src/future.rs | 30 ++- node/transaction-graph/src/lib.rs | 6 + node/transaction-graph/src/pool.rs | 267 ++++++++++++++++++++++----- node/transaction-graph/src/ready.rs | 37 ++++ 5 files changed, 295 insertions(+), 50 deletions(-) diff --git a/node/transaction-graph/src/error.rs b/node/transaction-graph/src/error.rs index e6f3b2d3a1b42..bf25accac12a0 100644 --- a/node/transaction-graph/src/error.rs +++ b/node/transaction-graph/src/error.rs @@ -28,5 +28,10 @@ error_chain! { description("The priority is too low to replace transactions already in the pool."), display("Too low priority ({} > {})", old, new) } + /// Deps cycle detected and we couldn't import transaction. + CycleDetected { + description("Transaction was not imported because of detected cycle."), + display("Cycle Detected"), + } } } diff --git a/node/transaction-graph/src/future.rs b/node/transaction-graph/src/future.rs index 0b7b20f034948..75be9cce4b324 100644 --- a/node/transaction-graph/src/future.rs +++ b/node/transaction-graph/src/future.rs @@ -57,7 +57,7 @@ impl WaitingTransaction { #[derive(Debug)] pub struct FutureTransactions { /// tags that are not yet provided by any transaction and we await for them - wanted_tags: HashMap>, + wanted_tags: HashMap>, /// Transactions waiting for a particular other transaction waiting: HashMap>, } @@ -84,8 +84,8 @@ impl FutureTransactions { // Add all tags that are missing for tag in &tx.missing_tags { - let mut entry = self.wanted_tags.entry(tag.clone()).or_insert_with(Vec::new); - entry.push(tx.transaction.hash.clone()); + let mut entry = self.wanted_tags.entry(tag.clone()).or_insert_with(HashSet::new); + entry.insert(tx.transaction.hash.clone()); } // Add the transaction to a by-hash waiting map @@ -126,6 +126,30 @@ impl FutureTransactions { became_ready } + /// Removes transactions for given list of hashes. + /// + /// Returns a list of actually removed transactions. + pub fn remove(&mut self, hashes: &[Hash]) -> Vec> { + let mut removed = vec![]; + for hash in hashes { + if let Some(waiting_tx) = self.waiting.remove(hash) { + // remove from wanted_tags as well + for tag in waiting_tx.missing_tags { + let remove = if let Some(mut wanted) = self.wanted_tags.get_mut(&tag) { + wanted.remove(hash); + wanted.is_empty() + } else { false }; + if remove { + self.wanted_tags.remove(&tag); + } + } + // add to result + removed.push(waiting_tx.transaction) + } + } + removed + } + /// Returns number of transactions in the Future queue. #[cfg(test)] pub fn len(&self) -> usize { diff --git a/node/transaction-graph/src/lib.rs b/node/transaction-graph/src/lib.rs index 043b801d6d075..85466dada1185 100644 --- a/node/transaction-graph/src/lib.rs +++ b/node/transaction-graph/src/lib.rs @@ -20,6 +20,12 @@ //! and their priority. //! The pool is able to return an iterator that traverses transaction //! graph in the correct order taking into account priorities and dependencies. +//! +//! TODO [ToDr] +//! - [ ] Culling/draining +//! - [ ] Longevity handling (remove obsolete transactions) +//! - [ ] Banning / Future-rotation +//! - [ ] Multi-threading #![warn(missing_docs)] #![warn(unused_extern_crates)] diff --git a/node/transaction-graph/src/pool.rs b/node/transaction-graph/src/pool.rs index 3a2369a7fd78a..698a249cc9606 100644 --- a/node/transaction-graph/src/pool.rs +++ b/node/transaction-graph/src/pool.rs @@ -103,62 +103,103 @@ impl Pool { let tx = WaitingTransaction::new(tx, self.ready.provided_tags()); trace!(target: "txpool", "[{:?}] {:?}", hash, tx); debug!(target: "txpool", "[{:?}] Importing to {}", hash, if tx.is_ready() { "ready" } else { "future" }); - // Tags provided from Ready queue satisfy all requirements, so just add to Ready. - if tx.is_ready() { - let mut promoted = vec![]; - let mut failed = vec![]; - let mut removed = vec![]; - - let mut first = true; - let mut to_import = vec![tx]; - - loop { - // take first transaction from the list - let tx = match to_import.pop() { - Some(tx) => tx, - None => break, - }; - - // find transactions in Future that it unlocks - to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides)); - - // import this transaction - let hash = tx.transaction.hash.clone(); - match self.ready.import(tx, block_number) { - Ok(mut replaced) => { - if !first { - promoted.push(hash); - } - // The transactions were removed from the ready pool. We might attempt to re-import them. - removed.append(&mut replaced); - }, - // transaction failed to be imported. - Err(e) => if first { - debug!(target: "txpool", "[{:?}] Error importing: {:?}", hash, e); - return Err(e) - } else { - failed.push(hash); - }, - } - first = false; - } - Ok(Imported::Ready { - hash, - promoted, - failed, - removed, - }) - } else { + // If all tags are not satisfied import to future. + if !tx.is_ready() { self.future.import(tx); - Ok(Imported::Future { hash }) + return Ok(Imported::Future { hash }); + } + + let mut promoted = vec![]; + let mut failed = vec![]; + let mut removed = vec![]; + + let mut first = true; + let mut to_import = vec![tx]; + + loop { + // take first transaction from the list + let tx = match to_import.pop() { + Some(tx) => tx, + None => break, + }; + + // find transactions in Future that it unlocks + to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides)); + + // import this transaction + let current_hash = tx.transaction.hash.clone(); + match self.ready.import(tx, block_number) { + Ok(mut replaced) => { + if !first { + promoted.push(current_hash); + } + // The transactions were removed from the ready pool. We might attempt to re-import them. + removed.append(&mut replaced); + }, + // transaction failed to be imported. + Err(e) => if first { + debug!(target: "txpool", "[{:?}] Error importing: {:?}", current_hash, e); + return Err(e) + } else { + failed.push(current_hash); + }, + } + first = false; + } + + // An edge case when importing transaction caused + // some future transactions to be imported and that + // future transactions pushed out current transaction. + // This means that there is a cycle and the transactions should + // be moved back to future, since we can't resolve it. + if removed.iter().any(|tx| tx.hash == hash) { + // We still need to remove all transactions that we promoted + // since they depend on each other and will never get to the best iterator. + self.ready.remove_invalid(&promoted); + + debug!(target: "txpool", "[{:?}] Cycle detected, bailing.", hash); + bail!(error::ErrorKind::CycleDetected) } + + Ok(Imported::Ready { + hash, + promoted, + failed, + removed, + }) } /// Returns an iterator over ready transactions in the pool. pub fn ready<'a>(&'a self) -> impl Iterator>> + 'a { self.ready.get() } + + /// Removes all transactions represented by the hashes and all other transactions + /// that depend on them. + /// + /// Returns a list of actually removed transactions. + /// NOTE some transactions might still be valid, but were just removed because + /// they were part of a chain, you may attempt to re-import them later. + /// NOTE If you want to just mark ready transactions that were already used + /// and you no longer need to store them use `mark_used` method. + pub fn remove_invalid<'a>(&mut self, hashes: &[Hash]) -> Vec>> where + Hash: 'a, + { + let mut removed = self.ready.remove_invalid(hashes); + removed.extend(self.future.remove(hashes).into_iter().map(Arc::new)); + removed + } + + /// Satisfies given list of tags. + /// + /// This will cause all transactions that provide these tags to be removed from the pool, + /// but unlike `remove_invalid`, dependent transactions are not touched. + /// Additional transactions from future queue might be promoted to ready if you satisfy tags + /// that the pool didn't previously know about. + pub fn satisfy_tags(&mut self, _tags: impl Iterator) { + unimplemented!() + } } #[cfg(test)] @@ -188,6 +229,7 @@ mod tests { // then assert_eq!(pool.ready().count(), 1); + assert_eq!(pool.ready.len(), 1); } #[test] @@ -215,6 +257,7 @@ mod tests { // then assert_eq!(pool.ready().count(), 1); + assert_eq!(pool.ready.len(), 1); } @@ -233,6 +276,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); assert_eq!(pool.ready().count(), 0); + assert_eq!(pool.ready.len(), 0); pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![2u8]), hash: 2, @@ -244,6 +288,7 @@ mod tests { // then assert_eq!(pool.ready().count(), 2); + assert_eq!(pool.ready.len(), 2); } #[test] @@ -285,6 +330,7 @@ mod tests { provides: vec![], }).unwrap(); assert_eq!(pool.ready().count(), 0); + assert_eq!(pool.ready.len(), 0); let res = pool.import(1, Transaction { ex: UncheckedExtrinsic(vec![5u8]), @@ -333,6 +379,7 @@ mod tests { provides: vec![vec![2]], }).unwrap(); assert_eq!(pool.ready().count(), 0); + assert_eq!(pool.ready.len(), 0); // when pool.import(1, Transaction { @@ -374,4 +421,130 @@ mod tests { }); assert_eq!(pool.future.len(), 0); } + + + #[test] + fn should_handle_a_cycle_with_low_priority() { + // given + let mut pool = pool(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![3u8]), + hash: 3, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![2]], + }).unwrap(); + assert_eq!(pool.ready().count(), 0); + assert_eq!(pool.ready.len(), 0); + + // when + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![2u8]), + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![vec![0]], + }).unwrap(); + + // then + { + let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + assert_eq!(it.next(), None); + } + // all transactions occupy the Future queue - it's fine + assert_eq!(pool.future.len(), 3); + + // let's close the cycle with one additional transaction + let err = pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![4u8]), + hash: 4, + priority: 1u64, // lower priority than Tx(2) + longevity: 64u64, + requires: vec![], + provides: vec![vec![0]], + }).unwrap_err(); + let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + assert_eq!(it.next(), None); + assert_eq!(pool.ready.len(), 0); + assert_eq!(pool.future.len(), 0); + if let error::ErrorKind::CycleDetected = *err.kind() { + } else { + assert!(false, "Invalid error kind: {:?}", err.kind()); + } + } + + #[test] + fn should_remove_invalid_transactions() { + // given + let mut pool = pool(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![5u8]), + hash: 5, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![0], vec![4]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![3u8]), + hash: 3, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![2u8]), + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![3], vec![2]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![4u8]), + hash: 4, + priority: 1_000u64, + longevity: 64u64, + requires: vec![vec![3], vec![4]], + provides: vec![], + }).unwrap(); + // future + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![6u8]), + hash: 6, + priority: 1_000u64, + longevity: 64u64, + requires: vec![vec![11]], + provides: vec![], + }).unwrap(); + assert_eq!(pool.ready().count(), 5); + assert_eq!(pool.future.len(), 1); + + // when + pool.remove_invalid(&[6, 1]); + + // then + assert_eq!(pool.ready().count(), 1); + assert_eq!(pool.future.len(), 0); + + } } diff --git a/node/transaction-graph/src/ready.rs b/node/transaction-graph/src/ready.rs index 6dc44b3bcb610..e9697b37b009d 100644 --- a/node/transaction-graph/src/ready.rs +++ b/node/transaction-graph/src/ready.rs @@ -175,6 +175,38 @@ impl ReadyTransactions { self.ready.contains_key(hash) } + /// Removes invalid transactions from the ready pool. + /// + /// NOTE removing a transaction will also cause a removal of all transactions that depend on that one + /// (i.e. the entire subgraph that this transaction is a start of will be removed). + /// All removed transactions are returned. + pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { + let mut removed = vec![]; + let mut to_remove = hashes.iter().cloned().collect::>(); + + loop { + let hash = match to_remove.pop() { + Some(hash) => hash, + None => return removed, + }; + + if let Some(mut tx) = self.ready.remove(&hash) { + // remove entries from provided_tags + for tag in &tx.transaction.transaction.provides { + self.provided_tags.remove(tag); + } + // remove from best + self.best.remove(&tx.transaction); + + // remove all transactions that the current one unlocks + to_remove.append(&mut tx.unlocks); + + // add to removed + removed.push(tx.transaction.transaction); + } + } + } + /// Checks if the transaction is providing the same tags as other transactions. /// /// In case that's true it determines if the priority of transactions that @@ -237,6 +269,11 @@ impl ReadyTransactions { } } + /// Returns number of transactions in this queue. + #[cfg(test)] + pub fn len(&self) -> usize { + self.ready.len() + } } From d90876404f6da62a10d2fcb5474a06131ff5d6e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 21 Sep 2018 15:05:58 +0200 Subject: [PATCH 08/14] Implement ready transactions pruning. --- node/transaction-graph/src/future.rs | 9 +- node/transaction-graph/src/pool.rs | 141 ++++++++++++++++++++++++--- node/transaction-graph/src/ready.rs | 107 ++++++++++++++++++-- 3 files changed, 231 insertions(+), 26 deletions(-) diff --git a/node/transaction-graph/src/future.rs b/node/transaction-graph/src/future.rs index 75be9cce4b324..03e6f282222f1 100644 --- a/node/transaction-graph/src/future.rs +++ b/node/transaction-graph/src/future.rs @@ -101,22 +101,21 @@ impl FutureTransactions { /// /// Returns (and removes) transactions that became ready after their last tag got /// satisfied and now we can remove them from Future and move to Ready queue. - pub fn satisfy_tags(&mut self, tags: &[Tag]) -> Vec> { + pub fn satisfy_tags>(&mut self, tags: impl IntoIterator) -> Vec> { let mut became_ready = vec![]; for tag in tags { - if let Some(hashes) = self.wanted_tags.remove(tag) { + if let Some(hashes) = self.wanted_tags.remove(tag.as_ref()) { for hash in hashes { let is_ready = { let mut tx = self.waiting.get_mut(&hash) .expect("Every transaction in wanted_tags is present in waiting; qed"); - tx.satisfy_tag(tag); + tx.satisfy_tag(tag.as_ref()); tx.is_ready() }; if is_ready { - let tx = self.waiting.remove(&hash) - .expect("We just get_mut the entry; qed"); + let tx = self.waiting.remove(&hash).expect("We just get_mut the entry; qed"); became_ready.push(tx); } } diff --git a/node/transaction-graph/src/pool.rs b/node/transaction-graph/src/pool.rs index 698a249cc9606..b3e1300502707 100644 --- a/node/transaction-graph/src/pool.rs +++ b/node/transaction-graph/src/pool.rs @@ -54,6 +54,17 @@ pub enum Imported { } } +/// Status of pruning the queue. +#[derive(Debug)] +pub struct PruneStatus { + /// A list of imports that satisfying the tag triggered. + pub promoted: Vec>, + /// A list of transactions that failed to be promoted and now are discarded. + pub failed: Vec, + /// A list of transactions that got pruned from the ready queue. + pub pruned: Vec>>, +} + /// Immutable transaction #[cfg_attr(test, derive(Clone))] #[derive(Debug, PartialEq, Eq)] @@ -76,6 +87,12 @@ pub struct Transaction { /// /// Builds a dependency graph for all transactions in the pool and returns /// the ones that are currently ready to be executed. +/// +/// General note: +/// If function returns some transactions it usually means that importing them +/// as-is for the second time will fail or produce unwanted results. +/// Most likely it is required to revalidate them and recompute set of +/// required tags. #[derive(Default, Debug)] pub struct Pool { future: FutureTransactions, @@ -95,21 +112,29 @@ impl Pool { block_number: BlockNumber, tx: Transaction, ) -> error::Result> { - let hash = tx.hash.clone(); - if self.future.contains(&hash) || self.ready.contains(&hash) { + if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) { bail!(error::ErrorKind::AlreadyImported) } let tx = WaitingTransaction::new(tx, self.ready.provided_tags()); - trace!(target: "txpool", "[{:?}] {:?}", hash, tx); - debug!(target: "txpool", "[{:?}] Importing to {}", hash, if tx.is_ready() { "ready" } else { "future" }); + trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash, tx); + debug!(target: "txpool", "[{:?}] Importing to {}", tx.transaction.hash, if tx.is_ready() { "ready" } else { "future" }); // If all tags are not satisfied import to future. if !tx.is_ready() { + let hash = tx.transaction.hash.clone(); self.future.import(tx); return Ok(Imported::Future { hash }); } + self.import_to_ready(block_number, tx) + } + + /// Imports transaction to ready queue. + /// + /// NOTE the transaction has to have all requirements satisfied. + fn import_to_ready(&mut self, block_number: BlockNumber, tx: WaitingTransaction) -> error::Result> { + let hash = tx.transaction.hash.clone(); let mut promoted = vec![]; let mut failed = vec![]; let mut removed = vec![]; @@ -129,7 +154,7 @@ impl Pool { // import this transaction let current_hash = tx.transaction.hash.clone(); - match self.ready.import(tx, block_number) { + match self.ready.import(block_number, tx) { Ok(mut replaced) => { if !first { promoted.push(current_hash); @@ -183,22 +208,47 @@ impl Pool { /// they were part of a chain, you may attempt to re-import them later. /// NOTE If you want to just mark ready transactions that were already used /// and you no longer need to store them use `mark_used` method. - pub fn remove_invalid<'a>(&mut self, hashes: &[Hash]) -> Vec>> where - Hash: 'a, - { + pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { let mut removed = self.ready.remove_invalid(hashes); removed.extend(self.future.remove(hashes).into_iter().map(Arc::new)); removed } - /// Satisfies given list of tags. + /// Prunes transactions that provide given list of tags. /// /// This will cause all transactions that provide these tags to be removed from the pool, /// but unlike `remove_invalid`, dependent transactions are not touched. /// Additional transactions from future queue might be promoted to ready if you satisfy tags /// that the pool didn't previously know about. - pub fn satisfy_tags(&mut self, _tags: impl Iterator) { - unimplemented!() + pub fn prune_tags(&mut self, block_number: BlockNumber, tags: impl IntoIterator) -> PruneStatus { + let mut to_import = vec![]; + let mut pruned = vec![]; + + for tag in tags { + // make sure to promote any future transactions that could be unlocked + to_import.append(&mut self.future.satisfy_tags(::std::iter::once(&tag))); + // and actually prune transactions in ready queue + pruned.append(&mut self.ready.prune_tags(tag)); + } + + let mut promoted = vec![]; + let mut failed = vec![]; + for tx in to_import { + let hash = tx.transaction.hash.clone(); + match self.import_to_ready(block_number, tx) { + Ok(res) => promoted.push(res), + Err(e) => { + warn!(target: "txpool", "[{:?}] Failed to promote during pruning: {:?}", hash, e); + failed.push(hash) + }, + } + } + + PruneStatus { + pruned, + failed, + promoted, + } } } @@ -545,6 +595,75 @@ mod tests { // then assert_eq!(pool.ready().count(), 1); assert_eq!(pool.future.len(), 0); + } + + + #[test] + fn should_prune_ready_transactions() { + // given + let mut pool = pool(); + // future (waiting for 0) + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![5u8]), + hash: 5, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![0]], + provides: vec![vec![100]], + }).unwrap(); + // ready + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![1u8]), + hash: 1, + priority: 5u64, + longevity: 64u64, + requires: vec![], + provides: vec![vec![1]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![2u8]), + hash: 2, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![2]], + provides: vec![vec![3]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![3u8]), + hash: 3, + priority: 5u64, + longevity: 64u64, + requires: vec![vec![1]], + provides: vec![vec![2]], + }).unwrap(); + pool.import(1, Transaction { + ex: UncheckedExtrinsic(vec![4u8]), + hash: 4, + priority: 1_000u64, + longevity: 64u64, + requires: vec![vec![3], vec![2]], + provides: vec![vec![4]], + }).unwrap(); + + assert_eq!(pool.ready().count(), 4); + assert_eq!(pool.future.len(), 1); + // when + let result = pool.prune_tags(1, vec![vec![0], vec![2]]); + + // then + assert_eq!(result.pruned.len(), 2); + assert_eq!(result.failed.len(), 0); + assert_eq!(result.promoted[0], Imported::Ready { + hash: 5, + promoted: vec![], + failed: vec![], + removed: vec![], + }); + assert_eq!(result.promoted.len(), 1); + assert_eq!(pool.future.len(), 0); + assert_eq!(pool.ready.len(), 3); + assert_eq!(pool.ready().count(), 3); } + } diff --git a/node/transaction-graph/src/ready.rs b/node/transaction-graph/src/ready.rs index e9697b37b009d..71621210522e5 100644 --- a/node/transaction-graph/src/ready.rs +++ b/node/transaction-graph/src/ready.rs @@ -59,8 +59,15 @@ impl Eq for TransactionRef {} #[derive(Debug)] struct ReadyTx { + /// A reference to a transaction pub transaction: TransactionRef, + /// A list of transactions that get unlocked by this one pub unlocks: Vec, + /// How many required tags are provided inherently + /// + /// Some transactions might be already pruned from the queue, + /// so when we compute ready set we may consider this transactions ready earlier. + pub requires_offset: usize, } const HASH_READY: &str = "Every hash is in ready map; qed"; @@ -120,8 +127,8 @@ impl ReadyTransactions { /// that are in this queue. pub fn import( &mut self, - tx: WaitingTransaction, block_number: BlockNumber, + tx: WaitingTransaction, ) -> error::Result>>> { assert!(tx.is_ready(), "Only ready transactions can be imported."); assert!(!self.ready.contains_key(&tx.transaction.hash), "Transaction is already imported."); @@ -165,6 +172,7 @@ impl ReadyTransactions { self.ready.insert(hash, ReadyTx { transaction, unlocks: vec![], + requires_offset: 0, }); Ok(replaced) @@ -195,6 +203,9 @@ impl ReadyTransactions { for tag in &tx.transaction.transaction.provides { self.provided_tags.remove(tag); } + // remove from unlocks? + // TODO [ToDr] + // remove from best self.best.remove(&tx.transaction); @@ -207,6 +218,75 @@ impl ReadyTransactions { } } + /// Removes transactions that provide given tag. + /// + /// All transactions that lead to a transaction, which provides this tag + /// are going to be removed from the queue, but no other transactions are touched - + /// i.e. all other subgraphs starting from given tag are still considered valid & ready. + pub fn prune_tags(&mut self, tag: Tag) -> Vec>> { + let mut removed = vec![]; + let mut to_remove = vec![tag]; + + loop { + let tag = match to_remove.pop() { + Some(tag) => tag, + None => return removed, + }; + + let res = self.provided_tags.remove(&tag) + .and_then(|hash| self.ready.remove(&hash)); + + if let Some(tx) = res { + let unlocks = tx.unlocks; + let tx = tx.transaction.transaction; + + // prune previous transactions as well + { + let hash = &tx.hash; + let mut find_previous = |tag| -> Option> { + let prev_hash = self.provided_tags.get(tag)?; + let tx2 = self.ready.get_mut(&prev_hash)?; + remove_item(&mut tx2.unlocks, hash); + // We eagerly prune previous transactions as well. + // But it might not always be good. + // Possible edge case: + // - tx provides two tags + // - the second tag enables some subgraph we don't know of yet + // - we will prune the transaction + // - when we learn about the subgraph it will go to future + // - we will have to wait for re-propagation of that transaction + // Alternatively the caller may attempt to re-import these transactions. + if tx2.unlocks.is_empty() { + Some(tx2.transaction.transaction.provides.clone()) + } else { + None + } + }; + + // find previous transactions + for tag in &tx.requires { + if let Some(mut tags_to_remove) = find_previous(tag) { + to_remove.append(&mut tags_to_remove); + } + } + } + + // add the transactions that just got unlocked to `best` + for hash in unlocks { + if let Some(tx) = self.ready.get_mut(&hash) { + tx.requires_offset += 1; + // this transaction is ready + if tx.requires_offset == tx.transaction.transaction.requires.len() { + self.best.insert(tx.transaction.clone()); + } + } + } + + removed.push(tx); + } + } + } + /// Checks if the transaction is providing the same tags as other transactions. /// /// In case that's true it determines if the priority of transactions that @@ -319,7 +399,7 @@ impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> self.best_or_awaiting(satisfied, tx_ref); // then get from the pool } else if let Some(next) = self.all.get(hash) { - self.best_or_awaiting(1, next.transaction.clone()); + self.best_or_awaiting(next.requires_offset + 1, next.transaction.clone()); } } @@ -327,6 +407,13 @@ impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> } } +// See: https://github.com/rust-lang/rust/issues/40062 +fn remove_item(vec: &mut Vec, item: &T) { + if let Some(idx) = vec.iter().position(|i| i == item) { + vec.swap_remove(idx); + } +} + #[cfg(test)] mod tests { use super::*; @@ -359,18 +446,18 @@ mod tests { // when let x = WaitingTransaction::new(tx2, &ready.provided_tags()); - ready.import(x, block_number).unwrap(); + ready.import(block_number, x).unwrap(); let x = WaitingTransaction::new(tx3, &ready.provided_tags()); - ready.import(x, block_number).unwrap(); + ready.import(block_number, x).unwrap(); assert_eq!(ready.get().count(), 2); // too low priority let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags()); - ready.import(x, block_number).unwrap_err(); + ready.import(block_number, x).unwrap_err(); tx1.priority = 10; let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags()); - ready.import(x, block_number).unwrap(); + ready.import(block_number, x).unwrap(); // then assert_eq!(ready.get().count(), 1); @@ -396,13 +483,13 @@ mod tests { // when let x = WaitingTransaction::new(tx1, &ready.provided_tags()); - ready.import(x, block_number).unwrap(); + ready.import(block_number, x).unwrap(); let x = WaitingTransaction::new(tx2, &ready.provided_tags()); - ready.import(x, block_number).unwrap(); + ready.import(block_number, x).unwrap(); let x = WaitingTransaction::new(tx3, &ready.provided_tags()); - ready.import(x, block_number).unwrap(); + ready.import(block_number, x).unwrap(); let x = WaitingTransaction::new(tx4, &ready.provided_tags()); - ready.import(x, block_number).unwrap(); + ready.import(block_number, x).unwrap(); // then assert_eq!(ready.best.len(), 1); From f0908c994817cb4f49a2ce9e453fe606e8408d66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 21 Sep 2018 15:38:13 +0200 Subject: [PATCH 09/14] Move & rename. --- Cargo.toml | 4 ++-- {node => core}/transaction-graph/Cargo.toml | 2 +- {node => core}/transaction-graph/src/error.rs | 0 {node => core}/transaction-graph/src/future.rs | 0 {node => core}/transaction-graph/src/lib.rs | 7 +++---- {node => core}/transaction-graph/src/pool.rs | 0 {node => core}/transaction-graph/src/ready.rs | 0 7 files changed, 6 insertions(+), 7 deletions(-) rename {node => core}/transaction-graph/Cargo.toml (86%) rename {node => core}/transaction-graph/src/error.rs (100%) rename {node => core}/transaction-graph/src/future.rs (100%) rename {node => core}/transaction-graph/src/lib.rs (83%) rename {node => core}/transaction-graph/src/pool.rs (100%) rename {node => core}/transaction-graph/src/ready.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index af0724d6c71bd..3eb25679835f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ members = [ "core/client", "core/client/db", "core/executor", - "core/transaction-pool", "core/keyring", "core/misbehavior-check", "core/network", @@ -37,6 +36,8 @@ members = [ "core/sr-sandbox", "core/sr-std", "core/sr-version", + "core/transaction-graph", + "core/transaction-pool", "srml/support", "srml/balances", "srml/consensus", @@ -68,7 +69,6 @@ members = [ "node/runtime", "node/service", "node/transaction-pool", - "node/transaction-graph", "subkey", ] exclude = [ diff --git a/node/transaction-graph/Cargo.toml b/core/transaction-graph/Cargo.toml similarity index 86% rename from node/transaction-graph/Cargo.toml rename to core/transaction-graph/Cargo.toml index 482674ec90dfc..f3a6ebfc469b7 100644 --- a/node/transaction-graph/Cargo.toml +++ b/core/transaction-graph/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "node-transaction-graph" +name = "substrate-transaction-graph" version = "0.1.0" authors = ["Parity Technologies "] diff --git a/node/transaction-graph/src/error.rs b/core/transaction-graph/src/error.rs similarity index 100% rename from node/transaction-graph/src/error.rs rename to core/transaction-graph/src/error.rs diff --git a/node/transaction-graph/src/future.rs b/core/transaction-graph/src/future.rs similarity index 100% rename from node/transaction-graph/src/future.rs rename to core/transaction-graph/src/future.rs diff --git a/node/transaction-graph/src/lib.rs b/core/transaction-graph/src/lib.rs similarity index 83% rename from node/transaction-graph/src/lib.rs rename to core/transaction-graph/src/lib.rs index 85466dada1185..27e2eaa530758 100644 --- a/node/transaction-graph/src/lib.rs +++ b/core/transaction-graph/src/lib.rs @@ -22,10 +22,9 @@ //! graph in the correct order taking into account priorities and dependencies. //! //! TODO [ToDr] -//! - [ ] Culling/draining -//! - [ ] Longevity handling (remove obsolete transactions) -//! - [ ] Banning / Future-rotation -//! - [ ] Multi-threading +//! - [ ] Longevity handling (remove obsolete transactions periodically) +//! - [ ] Banning / Future-rotation (once rejected (as invalid) should not be accepted for some time) +//! - [ ] Multi-threading (getting ready transactions should not block the pool) #![warn(missing_docs)] #![warn(unused_extern_crates)] diff --git a/node/transaction-graph/src/pool.rs b/core/transaction-graph/src/pool.rs similarity index 100% rename from node/transaction-graph/src/pool.rs rename to core/transaction-graph/src/pool.rs diff --git a/node/transaction-graph/src/ready.rs b/core/transaction-graph/src/ready.rs similarity index 100% rename from node/transaction-graph/src/ready.rs rename to core/transaction-graph/src/ready.rs From 2ba8bba0474ae6070047d0829d5e3c99d85155f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 21 Sep 2018 15:42:40 +0200 Subject: [PATCH 10/14] Add some logs. --- Cargo.lock | 19 +++++++++---------- core/transaction-graph/Cargo.toml | 3 +-- core/transaction-graph/src/lib.rs | 1 - core/transaction-graph/src/ready.rs | 12 ++++++++++-- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a7742cef3887..7c421ad96947a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1597,16 +1597,6 @@ dependencies = [ "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "node-transaction-graph" -version = "0.1.0" -dependencies = [ - "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "node-primitives 0.1.0", - "sr-primitives 0.1.0", -] - [[package]] name = "node-transaction-pool" version = "0.1.0" @@ -3089,6 +3079,15 @@ dependencies = [ "substrate-primitives 0.1.0", ] +[[package]] +name = "substrate-transaction-graph" +version = "0.1.0" +dependencies = [ + "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 0.1.0", +] + [[package]] name = "substrate-transaction-pool" version = "0.1.0" diff --git a/core/transaction-graph/Cargo.toml b/core/transaction-graph/Cargo.toml index f3a6ebfc469b7..7b1f7b7246aa1 100644 --- a/core/transaction-graph/Cargo.toml +++ b/core/transaction-graph/Cargo.toml @@ -6,5 +6,4 @@ authors = ["Parity Technologies "] [dependencies] error-chain = "0.12" log = "0.3.0" -node-primitives = { path = "../primitives" } -sr-primitives = { path = "../../core/sr-primitives" } +sr-primitives = { path = "../sr-primitives" } diff --git a/core/transaction-graph/src/lib.rs b/core/transaction-graph/src/lib.rs index 27e2eaa530758..a33c42a693e90 100644 --- a/core/transaction-graph/src/lib.rs +++ b/core/transaction-graph/src/lib.rs @@ -30,7 +30,6 @@ #![warn(unused_extern_crates)] extern crate sr_primitives; -extern crate node_primitives as primitives; #[macro_use] extern crate error_chain; diff --git a/core/transaction-graph/src/ready.rs b/core/transaction-graph/src/ready.rs index 71621210522e5..e1eb48e5efbf9 100644 --- a/core/transaction-graph/src/ready.rs +++ b/core/transaction-graph/src/ready.rs @@ -203,8 +203,14 @@ impl ReadyTransactions { for tag in &tx.transaction.transaction.provides { self.provided_tags.remove(tag); } - // remove from unlocks? - // TODO [ToDr] + // remove from unlocks + for tag in &tx.transaction.transaction.requires { + if let Some(hash) = self.provided_tags.get(tag) { + if let Some(tx) = self.ready.get_mut(hash) { + remove_item(&mut tx.unlocks, &hash); + } + } + } // remove from best self.best.remove(&tx.transaction); @@ -213,6 +219,7 @@ impl ReadyTransactions { to_remove.append(&mut tx.unlocks); // add to removed + debug!(target: "txpool", "[{:?}] Removed as invalid: ", hash); removed.push(tx.transaction.transaction); } } @@ -282,6 +289,7 @@ impl ReadyTransactions { } } + debug!(target: "txpool", "[{:?}] Pruned.", hash); removed.push(tx); } } From 8726982b37de5dd9efc636524372bd75d88003ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 21 Sep 2018 15:46:40 +0200 Subject: [PATCH 11/14] Clean up deps. --- core/transaction-graph/src/pool.rs | 71 ++++++++++++++--------------- core/transaction-graph/src/ready.rs | 9 ++-- 2 files changed, 39 insertions(+), 41 deletions(-) diff --git a/core/transaction-graph/src/pool.rs b/core/transaction-graph/src/pool.rs index b3e1300502707..bdc5ab5a542b9 100644 --- a/core/transaction-graph/src/pool.rs +++ b/core/transaction-graph/src/pool.rs @@ -20,7 +20,6 @@ use std::{ sync::Arc, }; -use primitives::UncheckedExtrinsic; use sr_primitives::transaction_validity::{ TransactionTag as Tag, TransactionLongevity as Longevity, @@ -70,7 +69,7 @@ pub struct PruneStatus { #[derive(Debug, PartialEq, Eq)] pub struct Transaction { /// Raw extrinsic representing that transaction. - pub ex: UncheckedExtrinsic, + pub ex: Vec, /// Transaction hash (unique) pub hash: Hash, /// Transaction priority (higher = better) @@ -269,7 +268,7 @@ mod tests { // when pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1u64, priority: 5u64, longevity: 64u64, @@ -289,7 +288,7 @@ mod tests { // when pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -297,7 +296,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -318,7 +317,7 @@ mod tests { // when pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -328,7 +327,7 @@ mod tests { assert_eq!(pool.ready().count(), 0); assert_eq!(pool.ready.len(), 0); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![2u8]), + ex: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -348,7 +347,7 @@ mod tests { // when pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -356,7 +355,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![3u8]), + ex: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -364,7 +363,7 @@ mod tests { provides: vec![], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![2u8]), + ex: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -372,7 +371,7 @@ mod tests { provides: vec![vec![3], vec![2]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![4u8]), + ex: vec![4u8], hash: 4, priority: 1_000u64, longevity: 64u64, @@ -383,7 +382,7 @@ mod tests { assert_eq!(pool.ready.len(), 0); let res = pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![5u8]), + ex: vec![5u8], hash: 5, priority: 5u64, longevity: 64u64, @@ -392,7 +391,7 @@ mod tests { }).unwrap(); // then - let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); assert_eq!(it.next(), Some(5)); assert_eq!(it.next(), Some(1)); @@ -413,7 +412,7 @@ mod tests { // given let mut pool = pool(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -421,7 +420,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![3u8]), + ex: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -433,7 +432,7 @@ mod tests { // when pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![2u8]), + ex: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -443,7 +442,7 @@ mod tests { // then { - let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); assert_eq!(it.next(), None); } // all transactions occupy the Future queue - it's fine @@ -451,14 +450,14 @@ mod tests { // let's close the cycle with one additional transaction let res = pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![4u8]), + ex: vec![4u8], hash: 4, priority: 50u64, longevity: 64u64, requires: vec![], provides: vec![vec![0]], }).unwrap(); - let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); assert_eq!(it.next(), Some(4)); assert_eq!(it.next(), Some(1)); assert_eq!(it.next(), Some(3)); @@ -478,7 +477,7 @@ mod tests { // given let mut pool = pool(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -486,7 +485,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![3u8]), + ex: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -498,7 +497,7 @@ mod tests { // when pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![2u8]), + ex: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -508,7 +507,7 @@ mod tests { // then { - let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); assert_eq!(it.next(), None); } // all transactions occupy the Future queue - it's fine @@ -516,14 +515,14 @@ mod tests { // let's close the cycle with one additional transaction let err = pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![4u8]), + ex: vec![4u8], hash: 4, priority: 1u64, // lower priority than Tx(2) longevity: 64u64, requires: vec![], provides: vec![vec![0]], }).unwrap_err(); - let mut it = pool.ready().into_iter().map(|tx| tx.ex.0[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); assert_eq!(it.next(), None); assert_eq!(pool.ready.len(), 0); assert_eq!(pool.future.len(), 0); @@ -538,7 +537,7 @@ mod tests { // given let mut pool = pool(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![5u8]), + ex: vec![5u8], hash: 5, priority: 5u64, longevity: 64u64, @@ -546,7 +545,7 @@ mod tests { provides: vec![vec![0], vec![4]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -554,7 +553,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![3u8]), + ex: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -562,7 +561,7 @@ mod tests { provides: vec![], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![2u8]), + ex: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -570,7 +569,7 @@ mod tests { provides: vec![vec![3], vec![2]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![4u8]), + ex: vec![4u8], hash: 4, priority: 1_000u64, longevity: 64u64, @@ -579,7 +578,7 @@ mod tests { }).unwrap(); // future pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![6u8]), + ex: vec![6u8], hash: 6, priority: 1_000u64, longevity: 64u64, @@ -604,7 +603,7 @@ mod tests { let mut pool = pool(); // future (waiting for 0) pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![5u8]), + ex: vec![5u8], hash: 5, priority: 5u64, longevity: 64u64, @@ -613,7 +612,7 @@ mod tests { }).unwrap(); // ready pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![1u8]), + ex: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -621,7 +620,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![2u8]), + ex: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -629,7 +628,7 @@ mod tests { provides: vec![vec![3]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![3u8]), + ex: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -637,7 +636,7 @@ mod tests { provides: vec![vec![2]], }).unwrap(); pool.import(1, Transaction { - ex: UncheckedExtrinsic(vec![4u8]), + ex: vec![4u8], hash: 4, priority: 1_000u64, longevity: 64u64, diff --git a/core/transaction-graph/src/ready.rs b/core/transaction-graph/src/ready.rs index e1eb48e5efbf9..5f188ecb04752 100644 --- a/core/transaction-graph/src/ready.rs +++ b/core/transaction-graph/src/ready.rs @@ -96,7 +96,7 @@ impl Default for ReadyTransactions { } } -impl ReadyTransactions { +impl ReadyTransactions { /// Borrows a map of tags that are provided by transactions in this queue. pub fn provided_tags(&self) -> &HashMap { &self.provided_tags @@ -289,7 +289,7 @@ impl ReadyTransactions { } } - debug!(target: "txpool", "[{:?}] Pruned.", hash); + debug!(target: "txpool", "[{:?}] Pruned.", tx.hash); removed.push(tx); } } @@ -425,11 +425,10 @@ fn remove_item(vec: &mut Vec, item: &T) { #[cfg(test)] mod tests { use super::*; - use primitives::UncheckedExtrinsic; fn tx(id: u8) -> Transaction { Transaction { - ex: UncheckedExtrinsic(vec![id]), + ex: vec![id], hash: id as u64, priority: 1, longevity: 2, @@ -502,7 +501,7 @@ mod tests { // then assert_eq!(ready.best.len(), 1); - let mut it = ready.get().map(|tx| tx.ex.0[0]); + let mut it = ready.get().map(|tx| tx.ex[0]); assert_eq!(it.next(), Some(1)); assert_eq!(it.next(), Some(2)); From e9d3553b0cc7b1816193fce9e3bd75c60db89a4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 21 Sep 2018 15:52:44 +0200 Subject: [PATCH 12/14] Use Member trait. --- core/transaction-graph/src/pool.rs | 4 ++-- core/transaction-graph/src/ready.rs | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/transaction-graph/src/pool.rs b/core/transaction-graph/src/pool.rs index bdc5ab5a542b9..26509d94e98a6 100644 --- a/core/transaction-graph/src/pool.rs +++ b/core/transaction-graph/src/pool.rs @@ -15,11 +15,11 @@ // along with Substrate. If not, see . use std::{ - fmt, hash, sync::Arc, }; +use sr_primitives::traits::Member; use sr_primitives::transaction_validity::{ TransactionTag as Tag, TransactionLongevity as Longevity, @@ -98,7 +98,7 @@ pub struct Pool { ready: ReadyTransactions, } -impl Pool { +impl Pool { /// Imports transaction to the pool. /// /// The pool consists of two parts: Future and Ready. diff --git a/core/transaction-graph/src/ready.rs b/core/transaction-graph/src/ready.rs index 5f188ecb04752..5dc9b3dfa61fd 100644 --- a/core/transaction-graph/src/ready.rs +++ b/core/transaction-graph/src/ready.rs @@ -21,6 +21,7 @@ use std::{ sync::Arc, }; +use sr_primitives::traits::Member; use sr_primitives::transaction_validity::{ TransactionTag as Tag, }; @@ -96,7 +97,7 @@ impl Default for ReadyTransactions { } } -impl ReadyTransactions { +impl ReadyTransactions { /// Borrows a map of tags that are provided by transactions in this queue. pub fn provided_tags(&self) -> &HashMap { &self.provided_tags @@ -371,7 +372,7 @@ pub struct BestIterator<'a, Hash: 'a> { best: BTreeSet>, } -impl<'a, Hash: 'a + hash:: Hash + Eq + Clone> BestIterator<'a, Hash> { +impl<'a, Hash: 'a + hash::Hash + Member> BestIterator<'a, Hash> { /// Depending on number of satisfied requirements insert given ref /// either to awaiting set or to best set. fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef) { @@ -386,7 +387,7 @@ impl<'a, Hash: 'a + hash:: Hash + Eq + Clone> BestIterator<'a, Hash> { } } -impl<'a, Hash: 'a + hash::Hash + Eq + Clone> Iterator for BestIterator<'a, Hash> { +impl<'a, Hash: 'a + hash::Hash + Member> Iterator for BestIterator<'a, Hash> { type Item = Arc>; fn next(&mut self) -> Option { From 03e7fa2ead165b12e0410ecd415a12aeea924cb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 21 Sep 2018 16:15:58 +0200 Subject: [PATCH 13/14] Add missing docs, elaborate on the proof. --- core/transaction-graph/src/future.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/core/transaction-graph/src/future.rs b/core/transaction-graph/src/future.rs index 03e6f282222f1..e69358daa7075 100644 --- a/core/transaction-graph/src/future.rs +++ b/core/transaction-graph/src/future.rs @@ -25,13 +25,20 @@ use sr_primitives::transaction_validity::{ use pool::Transaction; +/// Transaction with partially satisfied dependencies. #[derive(Debug)] pub struct WaitingTransaction { + /// Transaction details. pub transaction: Transaction, + /// Tags that are required and have not been satisfied yet by other transactions in the pool. pub missing_tags: HashSet, } impl WaitingTransaction { + /// Creates a new `WaitingTransaction`. + /// + /// Computes the set of missing tags based on the requirements and tags that + /// are provided by all transactions in the ready queue. pub fn new(transaction: Transaction, provided: &HashMap) -> Self { let missing_tags = transaction.requires .iter() @@ -45,15 +52,21 @@ impl WaitingTransaction { } } + /// Marks the tag as satisfied. pub fn satisfy_tag(&mut self, tag: &Tag) { self.missing_tags.remove(tag); } + /// Returns true if transaction has all requirements satisfied. pub fn is_ready(&self) -> bool { self.missing_tags.is_empty() } } +/// A pool of transactions that are not yet ready to be included in the block. +/// +/// Contains transactions that are still awaiting for some other transactions that +/// could provide a tag that they require. #[derive(Debug)] pub struct FutureTransactions { /// tags that are not yet provided by any transaction and we await for them @@ -71,6 +84,13 @@ impl Default for FutureTransactions { } } +const WAITING_PROOF: &str = r"# +In import we always insert to `waiting` if we push to `wanted_tags`; +when removing from `waiting` we always clear `wanted_tags`; +every hash from `wanted_tags` is always present in `waiting`; +qed +#"; + impl FutureTransactions { /// Import transaction to Future queue. /// @@ -109,13 +129,13 @@ impl FutureTransactions { for hash in hashes { let is_ready = { let mut tx = self.waiting.get_mut(&hash) - .expect("Every transaction in wanted_tags is present in waiting; qed"); + .expect(WAITING_PROOF); tx.satisfy_tag(tag.as_ref()); tx.is_ready() }; if is_ready { - let tx = self.waiting.remove(&hash).expect("We just get_mut the entry; qed"); + let tx = self.waiting.remove(&hash).expect(WAITING_PROOF); became_ready.push(tx); } } From 508fd63451ae7be5201135d89ff980c0b330495e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 24 Sep 2018 09:17:11 +0200 Subject: [PATCH 14/14] Expand on docs and proofs. --- core/transaction-graph/src/pool.rs | 4 ++-- core/transaction-graph/src/ready.rs | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/core/transaction-graph/src/pool.rs b/core/transaction-graph/src/pool.rs index 26509d94e98a6..5dcb55e8f04fd 100644 --- a/core/transaction-graph/src/pool.rs +++ b/core/transaction-graph/src/pool.rs @@ -205,8 +205,8 @@ impl Pool { /// Returns a list of actually removed transactions. /// NOTE some transactions might still be valid, but were just removed because /// they were part of a chain, you may attempt to re-import them later. - /// NOTE If you want to just mark ready transactions that were already used - /// and you no longer need to store them use `mark_used` method. + /// NOTE If you want to remove ready transactions that were already used + /// and you don't want them to be stored in the pool use `prune_tags` method. pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { let mut removed = self.ready.remove_invalid(hashes); removed.extend(self.future.remove(hashes).into_iter().map(Arc::new)); diff --git a/core/transaction-graph/src/ready.rs b/core/transaction-graph/src/ready.rs index 5dc9b3dfa61fd..7c1296edfb799 100644 --- a/core/transaction-graph/src/ready.rs +++ b/core/transaction-graph/src/ready.rs @@ -71,7 +71,12 @@ struct ReadyTx { pub requires_offset: usize, } -const HASH_READY: &str = "Every hash is in ready map; qed"; +const HASH_READY: &str = r#" +Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`; +Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`; +Hence every hash retrieved from `provided_tags` is always present in `ready`; +qed +"#; #[derive(Debug)] pub struct ReadyTransactions {