From 1b74df1879ebefdbac79424113a8d752c25aa31a Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 19 Sep 2019 14:14:34 +0300 Subject: [PATCH 01/12] async txpool API --- Cargo.lock | 3 + core/basic-authorship/src/basic_authorship.rs | 6 +- core/basic-authorship/src/lib.rs | 2 +- core/network/src/protocol.rs | 13 + core/offchain/src/api.rs | 19 +- core/offchain/src/lib.rs | 2 +- core/rpc/api/src/author/error.rs | 3 + core/rpc/api/src/author/mod.rs | 4 +- core/rpc/src/author/mod.rs | 64 +- core/rpc/src/author/tests.rs | 41 +- core/service/src/builder.rs | 80 ++- core/service/src/lib.rs | 18 +- core/service/test/Cargo.toml | 1 + core/service/test/src/lib.rs | 2 +- core/transaction-pool/Cargo.toml | 1 + core/transaction-pool/graph/src/pool.rs | 638 +++++++++++------- core/transaction-pool/src/api.rs | 17 +- core/transaction-pool/src/lib.rs | 2 +- core/transaction-pool/src/tests.rs | 32 +- node-template/src/service.rs | 4 +- node/cli/src/service.rs | 4 +- node/rpc/Cargo.toml | 1 + node/rpc/src/accounts.rs | 7 +- 23 files changed, 591 insertions(+), 373 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ae6d38280f73..23dcddb4f8d90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2415,6 +2415,7 @@ name = "node-rpc" version = "2.0.0" dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 13.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client 13.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-derive 13.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5291,6 +5292,7 @@ dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 2.0.0", "substrate-client 2.0.0", @@ -5447,6 +5449,7 @@ name = "substrate-transaction-pool" version = "2.0.0" dependencies = [ "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/basic-authorship/src/basic_authorship.rs b/core/basic-authorship/src/basic_authorship.rs index 59b12ba1e40b4..c3332670149bc 100644 --- a/core/basic-authorship/src/basic_authorship.rs +++ b/core/basic-authorship/src/basic_authorship.rs @@ -247,10 +247,12 @@ mod tests { fn should_cease_building_block_when_deadline_is_reached() { // given let client = Arc::new(test_client::new()); - let chain_api = transaction_pool::ChainApi::new(client.clone()); + let chain_api = transaction_pool::FullChainApi::new(client.clone()); let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api)); - txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap(); + futures::executor::block_on( + txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap() + ); let mut proposer_factory = ProposerFactory { client: client.clone(), diff --git a/core/basic-authorship/src/lib.rs b/core/basic-authorship/src/lib.rs index 71c9e2792248f..7961e4fe9e9df 100644 --- a/core/basic-authorship/src/lib.rs +++ b/core/basic-authorship/src/lib.rs @@ -26,7 +26,7 @@ //! # use test_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring}; //! # use transaction_pool::txpool::{self, Pool as TransactionPool}; //! # let client = Arc::new(test_client::new()); -//! # let chain_api = transaction_pool::ChainApi::new(client.clone()); +//! # let chain_api = transaction_pool::FullChainApi::new(client.clone()); //! # let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api)); //! // The first step is to create a `ProposerFactory`. //! let mut proposer_factory = ProposerFactory { diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index ea993bdce0b38..201c826ee2cde 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -953,6 +953,14 @@ impl, H: ExHashT> Protocol { who: PeerId, extrinsics: message::Transactions ) { + // sending extrinsic to light node is considered a bad behavior + if !self.config.roles.is_full() { + trace!(target: "sync", "Peer {} is trying to send extrinsic to the light node", who); + self.behaviour.disconnect_peer(&who); + self.peerset_handle.report_peer(who, i32::min_value()); + return; + } + // Accept extrinsics only when fully synced if self.sync.status().state != SyncState::Idle { trace!(target: "sync", "{} Ignoring extrinsics while syncing", who); @@ -985,6 +993,11 @@ impl, H: ExHashT> Protocol { let extrinsics = self.transaction_pool.transactions(); let mut propagated_to = HashMap::new(); for (who, peer) in self.context_data.peers.iter_mut() { + // never send extrinsics to the light node + if !peer.info.roles.is_full() { + continue; + } + let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics .iter() .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone())) diff --git a/core/offchain/src/api.rs b/core/offchain/src/api.rs index 0057dfd273b8b..9f748e308434c 100644 --- a/core/offchain/src/api.rs +++ b/core/offchain/src/api.rs @@ -302,29 +302,28 @@ impl AsyncApi { match msg { ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext), } - future::ready(()) }); future::join(extrinsics, http) .map(|((), ())| ()) } - fn submit_extrinsic(&mut self, ext: Vec) { + fn submit_extrinsic(&mut self, ext: Vec) -> impl Future { let xt = match ::Extrinsic::decode(&mut &*ext) { Ok(xt) => xt, Err(e) => { warn!("Unable to decode extrinsic: {:?}: {}", ext, e.what()); - return + return future::Either::Left(future::ready(())) }, }; info!("Submitting to the pool: {:?} (isSigned: {:?})", xt, xt.is_signed()); - match self.transaction_pool.submit_one(&self.at, xt.clone()) { - Ok(hash) => debug!("[{:?}] Offchain transaction added to the pool.", hash), - Err(e) => { - debug!("Couldn't submit transaction: {:?}", e); - }, - } + future::Either::Right(self.transaction_pool + .submit_one(&self.at, xt.clone()) + .map(|result| match result { + Ok(hash) => { debug!("[{:?}] Offchain transaction added to the pool.", hash); }, + Err(e) => { debug!("Couldn't submit transaction: {:?}", e); }, + })) } } @@ -354,7 +353,7 @@ mod tests { let db = LocalStorage::new_test(); let client = Arc::new(test_client::new()); let pool = Arc::new( - Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone())) + Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone())) ); let mock = Arc::new(MockNetworkStateInfo()); diff --git a/core/offchain/src/lib.rs b/core/offchain/src/lib.rs index 9b785ec8bada1..79c6df04ea109 100644 --- a/core/offchain/src/lib.rs +++ b/core/offchain/src/lib.rs @@ -171,7 +171,7 @@ mod tests { // given let _ = env_logger::try_init(); let client = Arc::new(test_client::new()); - let pool = Arc::new(Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone()))); let db = client_db::offchain::LocalStorage::new_test(); let network_state = Arc::new(MockNetworkStateInfo()); diff --git a/core/rpc/api/src/author/error.rs b/core/rpc/api/src/author/error.rs index 727b58bd210f4..8e4f8877682b3 100644 --- a/core/rpc/api/src/author/error.rs +++ b/core/rpc/api/src/author/error.rs @@ -22,6 +22,9 @@ use jsonrpc_core as rpc; /// Author RPC Result type. pub type Result = std::result::Result; +/// Author RPC future Result type. +pub type FutureResult = Box + Send>; + /// Author RPC errors. #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { diff --git a/core/rpc/api/src/author/mod.rs b/core/rpc/api/src/author/mod.rs index 5cde56995aad9..4ea96cb3c6122 100644 --- a/core/rpc/api/src/author/mod.rs +++ b/core/rpc/api/src/author/mod.rs @@ -24,7 +24,7 @@ use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use primitives::{ Bytes }; -use self::error::Result; +use self::error::{FutureResult, Result}; use txpool::watcher::Status; pub use self::gen_client::Client as AuthorClient; @@ -37,7 +37,7 @@ pub trait AuthorApi { /// Submit hex-encoded extrinsic for inclusion in block. #[rpc(name = "author_submitExtrinsic")] - fn submit_extrinsic(&self, extrinsic: Bytes) -> Result; + fn submit_extrinsic(&self, extrinsic: Bytes) -> FutureResult; /// Insert a key into the keystore. #[rpc(name = "author_insertKey")] diff --git a/core/rpc/src/author/mod.rs b/core/rpc/src/author/mod.rs index 50b5e30d578a3..1465c24fb1ad1 100644 --- a/core/rpc/src/author/mod.rs +++ b/core/rpc/src/author/mod.rs @@ -20,13 +20,17 @@ mod tests; use std::{sync::Arc, convert::TryInto}; +use futures03::future::{FutureExt, TryFutureExt}; +use log::{error, warn}; use client::{self, Client}; -use rpc::futures::{Sink, Future}; +use rpc::futures::{ + Sink, Future, + future::{Executor, result}, +}; use futures03::{StreamExt as _, compat::Compat}; use api::Subscriptions; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; -use log::warn; use codec::{Encode, Decode}; use primitives::{Bytes, Blake2Hasher, H256, traits::BareCryptoStorePtr}; use sr_primitives::{generic, traits::{self, ProvideRuntimeApi}}; @@ -44,10 +48,12 @@ use session::SessionKeys; /// Re-export the API for backward compatibility. pub use api::author::*; -use self::error::{Error, Result}; +use self::error::{Error, FutureResult, Result}; /// Authoring API pub struct Author where P: PoolChainApi + Sync + Send + 'static { + /// Futures executor. + executor: Arc + Send>> + Send + Sync>, /// Substrate client client: Arc::Block, RA>>, /// Transactions pool @@ -61,12 +67,14 @@ pub struct Author where P: PoolChainApi + Sync + Send + 'static { impl Author where P: PoolChainApi + Sync + Send + 'static { /// Create new instance of Authoring API. pub fn new( + executor: Arc + Send>> + Send + Sync>, client: Arc::Block, RA>>, pool: Arc>, subscriptions: Subscriptions, keystore: BareCryptoStorePtr, ) -> Self { Author { + executor, client, pool, subscriptions, @@ -108,15 +116,20 @@ impl AuthorApi, BlockHash

> for Author whe ).map(Into::into).map_err(|e| Error::Client(Box::new(e))) } - fn submit_extrinsic(&self, ext: Bytes) -> Result> { - let xt = Decode::decode(&mut &ext[..])?; + fn submit_extrinsic(&self, ext: Bytes) -> FutureResult> { + let xt = match Decode::decode(&mut &ext[..]) { + Ok(xt) => xt, + Err(err) => return Box::new(result(Err(err.into()))), + }; let best_block_hash = self.client.info().chain.best_hash; - self.pool + Box::new(self.pool .submit_one(&generic::BlockId::hash(best_block_hash), xt) + .boxed() + .compat() .map_err(|e| e.into_pool_error() .map(Into::into) - .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) - ) + .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())) + ) } fn pending_extrinsics(&self) -> Result> { @@ -151,17 +164,20 @@ impl AuthorApi, BlockHash

> for Author whe ) { let submit = || -> Result<_> { let best_block_hash = self.client.info().chain.best_hash; - let dxt = <

::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])?; - self.pool + let dxt = <

::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]) + .map_err(|e| error::Error::from(e))?; + Ok(self.pool .submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt) + .boxed() + .compat() .map_err(|e| e.into_pool_error() - .map(Into::into) + .map(|e| error::Error::from(e)) .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) - ) + )) }; - let watcher = match submit() { - Ok(watcher) => watcher, + let future_watcher = match submit() { + Ok(future_watcher) => future_watcher, Err(err) => { // reject the subscriber (ignore errors - we don't care if subscriber is no longer there). let _ = subscriber.reject(err.into()); @@ -169,12 +185,20 @@ impl AuthorApi, BlockHash

> for Author whe }, }; - self.subscriptions.add(subscriber, move |sink| { - sink - .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))) - .map(|_| ()) - }) + let subscriptions = self.subscriptions.clone(); + let subscription_future = future_watcher + .map_err(|err| { warn!("Failed to submit extrinsic: {}", err); }) + .map(move |watcher| subscriptions.add(subscriber, move |sink| { + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))) + .map(|_| ()) + })); + + + if self.executor.execute(Box::new(subscription_future)).is_err() { + error!("Failed to spawn watch extrinsic task"); + } } fn unwatch_extrinsic(&self, _metadata: Option, id: SubscriptionId) -> Result { diff --git a/core/rpc/src/author/tests.rs b/core/rpc/src/author/tests.rs index 861b65bfe75ca..ddd77ee753b7e 100644 --- a/core/rpc/src/author/tests.rs +++ b/core/rpc/src/author/tests.rs @@ -21,7 +21,7 @@ use assert_matches::assert_matches; use codec::Encode; use transaction_pool::{ txpool::Pool, - ChainApi, + FullChainApi, }; use futures::Stream; use primitives::{ @@ -50,8 +50,9 @@ fn submit_transaction_should_not_cause_error() { let client = Arc::new(test_client::new()); let keystore = KeyStore::new(); let p = Author { + executor: Arc::new(runtime.executor()), client: client.clone(), - pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client))), + pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; @@ -59,11 +60,11 @@ fn submit_transaction_should_not_cause_error() { let h: H256 = blake2_256(&xt).into(); assert_matches!( - AuthorApi::submit_extrinsic(&p, xt.clone().into()), + AuthorApi::submit_extrinsic(&p, xt.clone().into()).wait(), Ok(h2) if h == h2 ); assert!( - AuthorApi::submit_extrinsic(&p, xt.into()).is_err() + AuthorApi::submit_extrinsic(&p, xt.into()).wait().is_err() ); } @@ -73,8 +74,9 @@ fn submit_rich_transaction_should_not_cause_error() { let client = Arc::new(test_client::new()); let keystore = KeyStore::new(); let p = Author { + executor: Arc::new(runtime.executor()), client: client.clone(), - pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))), + pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; @@ -82,11 +84,11 @@ fn submit_rich_transaction_should_not_cause_error() { let h: H256 = blake2_256(&xt).into(); assert_matches!( - AuthorApi::submit_extrinsic(&p, xt.clone().into()), + AuthorApi::submit_extrinsic(&p, xt.clone().into()).wait(), Ok(h2) if h == h2 ); assert!( - AuthorApi::submit_extrinsic(&p, xt.into()).is_err() + AuthorApi::submit_extrinsic(&p, xt.into()).wait().is_err() ); } @@ -95,9 +97,10 @@ fn should_watch_extrinsic() { //given let mut runtime = runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let pool = Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { + executor: Arc::new(runtime.executor()), client, pool: pool.clone(), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), @@ -120,7 +123,7 @@ fn should_watch_extrinsic() { }; tx.into_signed_tx() }; - AuthorApi::submit_extrinsic(&p, replacement.encode().into()).unwrap(); + AuthorApi::submit_extrinsic(&p, replacement.encode().into()).wait().unwrap(); let (res, data) = runtime.block_on(data.into_future()).unwrap(); assert_eq!( res, @@ -137,16 +140,17 @@ fn should_watch_extrinsic() { fn should_return_pending_extrinsics() { let runtime = runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let pool = Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { + executor: Arc::new(runtime.executor()), client, pool: pool.clone(), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; let ex = uxt(AccountKeyring::Alice, 0); - AuthorApi::submit_extrinsic(&p, ex.encode().into()).unwrap(); + AuthorApi::submit_extrinsic(&p, ex.encode().into()).wait().unwrap(); assert_matches!( p.pending_extrinsics(), Ok(ref expected) if *expected == vec![Bytes(ex.encode())] @@ -157,20 +161,21 @@ fn should_return_pending_extrinsics() { fn should_remove_extrinsics() { let runtime = runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let pool = Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { + executor: Arc::new(runtime.executor()), client, pool: pool.clone(), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; let ex1 = uxt(AccountKeyring::Alice, 0); - p.submit_extrinsic(ex1.encode().into()).unwrap(); + p.submit_extrinsic(ex1.encode().into()).wait().unwrap(); let ex2 = uxt(AccountKeyring::Alice, 1); - p.submit_extrinsic(ex2.encode().into()).unwrap(); + p.submit_extrinsic(ex2.encode().into()).wait().unwrap(); let ex3 = uxt(AccountKeyring::Bob, 0); - let hash3 = p.submit_extrinsic(ex3.encode().into()).unwrap(); + let hash3 = p.submit_extrinsic(ex3.encode().into()).wait().unwrap(); assert_eq!(pool.status().ready, 3); // now remove all 3 @@ -189,8 +194,9 @@ fn should_insert_key() { let client = Arc::new(test_client::new()); let keystore = KeyStore::new(); let p = Author { + executor: Arc::new(runtime.executor()), client: client.clone(), - pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client))), + pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; @@ -215,8 +221,9 @@ fn should_rotate_keys() { let keystore = KeyStore::new(); let client = Arc::new(test_client::TestClientBuilder::new().set_keystore(keystore.clone()).build()); let p = Author { + executor: Arc::new(runtime.executor()), client: client.clone(), - pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client))), + pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; diff --git a/core/service/src/builder.rs b/core/service/src/builder.rs index e6d2cdc89acff..6ca6b440196a2 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -24,8 +24,13 @@ use client::{ }; use codec::{Decode, Encode, IoReader}; use consensus_common::import_queue::ImportQueue; -use futures::{prelude::*, sync::mpsc}; -use futures03::{FutureExt as _, compat::Compat, StreamExt as _, TryStreamExt as _}; +use futures::{prelude::*, sync::mpsc, future::{result, Either}}; +use futures03::{ + compat::Compat, + future::ready, + FutureExt as _, TryFutureExt as _, + StreamExt as _, TryStreamExt as _, +}; use keystore::{Store as Keystore, KeyStorePtr}; use log::{info, warn}; use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent}; @@ -896,6 +901,7 @@ where let chain = rpc_builder.build_chain(subscriptions.clone()); let state = rpc_builder.build_state(subscriptions.clone()); let author = rpc::author::Author::new( + task_executor.clone(), client, transaction_pool, subscriptions, @@ -914,39 +920,54 @@ where pub(crate) fn maintain_transaction_pool( id: &BlockId, - client: &Client, + client: &Arc>, transaction_pool: &TransactionPool, retracted: &[Block::Hash], -) -> error::Result<()> where +) -> error::Result + Send>> where Block: BlockT::Out>, - Backend: client::backend::Backend, + Backend: 'static + client::backend::Backend, Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::TaggedTransactionQueue, - Executor: client::CallExecutor, - PoolApi: txpool::ChainApi, + Executor: 'static + client::CallExecutor, + PoolApi: 'static + txpool::ChainApi, + Api: 'static, { // Put transactions from retracted blocks back into the pool. - for r in retracted { - if let Some(block) = client.block(&BlockId::hash(*r))? { - let extrinsics = block.block.extrinsics(); - if let Err(e) = transaction_pool.submit_at(id, extrinsics.iter().cloned(), true) { - warn!("Error re-submitting transactions: {:?}", e); - } + let client_copy = client.clone(); + let retracted_transactions = retracted.to_vec().into_iter() + .filter_map(move |hash| client_copy.block(&BlockId::hash(hash)).ok().unwrap_or(None)) + .flat_map(|block| block.block.deconstruct().1.into_iter()); + let resubmit_future = match transaction_pool.submit_at(id, retracted_transactions, true) { + Ok(resubmit_future) => Either::A(resubmit_future + .then(|_| ready(Ok(()))) + .boxed() + .compat() + ), + Err(e) => { + warn!("Error re-submitting transactions: {:?}", e); + Either::B(result(Ok(()))) } - } + }; // Avoid calling into runtime if there is nothing to prune from the pool anyway. if transaction_pool.status().is_empty() { - return Ok(()) - } - - if let Some(block) = client.block(id)? { - let parent_id = BlockId::hash(*block.block.header().parent_hash()); - let extrinsics = block.block.extrinsics(); - transaction_pool.prune(id, &parent_id, extrinsics).map_err(|e| format!("{:?}", e))?; + return Ok(Box::new(resubmit_future)) } - Ok(()) + let block = client.block(id)?; + Ok(match block { + Some(block) => { + let parent_id = BlockId::hash(*block.block.header().parent_hash()); + let prune_future = transaction_pool + .prune(id, &parent_id, block.block.extrinsics()) + .boxed() + .compat() + .map_err(|e| { format!("{:?}", e); }); + + Box::new(resubmit_future.and_then(|_| prune_future)) + }, + None => Box::new(resubmit_future), + }) } pub(crate) fn offchain_workers( @@ -978,6 +999,7 @@ where #[cfg(test)] mod tests { use super::*; + use futures03::executor::block_on; use consensus_common::{BlockOrigin, SelectChain}; use substrate_test_runtime_client::{prelude::*, runtime::Transfer}; @@ -985,7 +1007,7 @@ mod tests { fn should_remove_transactions_from_the_pool() { let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); let client = Arc::new(client); - let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone())); + let pool = TransactionPool::new(Default::default(), ::transaction_pool::FullChainApi::new(client.clone())); let transaction = Transfer { amount: 5, nonce: 0, @@ -995,7 +1017,7 @@ mod tests { let best = longest_chain.best_chain().unwrap(); // store the transaction in the pool - pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap(); + block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); // import the block let mut builder = client.new_block(Default::default()).unwrap(); @@ -1011,7 +1033,7 @@ mod tests { &client, &pool, &[] - ).unwrap(); + ).unwrap().wait().unwrap(); // then assert_eq!(pool.status().ready, 0); @@ -1022,7 +1044,7 @@ mod tests { fn should_add_reverted_transactions_to_the_pool() { let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); let client = Arc::new(client); - let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone())); + let pool = TransactionPool::new(Default::default(), ::transaction_pool::FullChainApi::new(client.clone())); let transaction = Transfer { amount: 5, nonce: 0, @@ -1032,7 +1054,7 @@ mod tests { let best = longest_chain.best_chain().unwrap(); // store the transaction in the pool - pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap(); + block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); // import the block let mut builder = client.new_block(Default::default()).unwrap(); @@ -1049,7 +1071,7 @@ mod tests { &client, &pool, &[] - ).unwrap(); + ).unwrap().wait().unwrap(); // then assert_eq!(pool.status().ready, 0); @@ -1067,7 +1089,7 @@ mod tests { &client, &pool, &[block1_hash] - ).unwrap(); + ).unwrap().wait().unwrap(); // then assert_eq!(pool.status().ready, 1); diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 431776b31e9b4..72d7b6671bbb5 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -38,6 +38,7 @@ use parking_lot::Mutex; use client::{runtime_api::BlockT, Client}; use exit_future::Signal; use futures::prelude::*; +use futures03::executor::block_on; use futures03::stream::{StreamExt as _, TryStreamExt as _}; use network::{NetworkService, NetworkState, specialization::NetworkSpecialization, Event, DhtEvent}; use log::{log, warn, debug, error, Level}; @@ -236,12 +237,13 @@ macro_rules! new_impl { let txpool = txpool.upgrade(); if let (Some(txpool), Some(client)) = (txpool.as_ref(), wclient.upgrade()) { - $maintain_transaction_pool( + let future = $maintain_transaction_pool( &BlockId::hash(notification.hash), - &*client, + &client, &*txpool, ¬ification.retracted, ).map_err(|e| warn!("Pool error processing new block: {:?}", e))?; + let _ = to_spawn_tx_.unbounded_send(future); } let offchain = offchain.as_ref().and_then(|o| o.upgrade()); @@ -920,7 +922,10 @@ where match Decode::decode(&mut &encoded[..]) { Ok(uxt) => { let best_block_id = BlockId::hash(self.client.info().chain.best_hash); - match self.pool.submit_one(&best_block_id, uxt) { + let import_future = self.pool.submit_one(&best_block_id, uxt); + let import_result = block_on(import_future); + + match import_result { Ok(hash) => Some(hash), Err(e) => match e.into_pool_error() { Ok(txpool::error::Error::AlreadyImported(hash)) => { @@ -953,6 +958,7 @@ where #[cfg(test)] mod tests { use super::*; + use futures03::executor::block_on; use consensus_common::SelectChain; use sr_primitives::traits::BlindCheckable; use substrate_test_runtime_client::{prelude::*, runtime::{Extrinsic, Transfer}}; @@ -964,7 +970,7 @@ mod tests { let client = Arc::new(client); let pool = Arc::new(TransactionPool::new( Default::default(), - transaction_pool::ChainApi::new(client.clone()) + transaction_pool::FullChainApi::new(client.clone()) )); let best = longest_chain.best_chain().unwrap(); let transaction = Transfer { @@ -973,8 +979,8 @@ mod tests { from: AccountKeyring::Alice.into(), to: Default::default(), }.into_signed_tx(); - pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap(); - pool.submit_one(&BlockId::hash(best.hash()), Extrinsic::IncludeData(vec![1])).unwrap(); + block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); + block_on(pool.submit_one(&BlockId::hash(best.hash()), Extrinsic::IncludeData(vec![1]))).unwrap(); assert_eq!(pool.status().ready, 2); // when diff --git a/core/service/test/Cargo.toml b/core/service/test/Cargo.toml index aa3dddfc1851e..f937be8611a08 100644 --- a/core/service/test/Cargo.toml +++ b/core/service/test/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" tempdir = "0.3" tokio = "0.1.7" futures = "0.1" +futures03 = { package = "futures-preview", version = "=0.3.0-alpha.17", features = ["compat"] } log = "0.4" env_logger = "0.6" fdlimit = "0.1" diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs index 870f287bff8f2..d7627e353b87b 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -424,7 +424,7 @@ pub fn sync( let first_user_data = &network.full_nodes[0].2; let best_block = BlockId::number(first_service.get().client().info().chain.best_number); let extrinsic = extrinsic_factory(&first_service.get(), first_user_data); - first_service.get().transaction_pool().submit_one(&best_block, extrinsic).unwrap(); + futures03::executor::block_on(first_service.get().transaction_pool().submit_one(&best_block, extrinsic)).unwrap(); network.run_until_all_full( |_index, service| service.get().transaction_pool().ready().count() == 1, |_index, _service| true, diff --git a/core/transaction-pool/Cargo.toml b/core/transaction-pool/Cargo.toml index 5e9973b6dc190..43758add8701b 100644 --- a/core/transaction-pool/Cargo.toml +++ b/core/transaction-pool/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] derive_more = "0.14.0" +futures-preview = "=0.3.0-alpha.17" log = "0.4" codec = { package = "parity-scale-codec", version = "1.0.0" } parking_lot = "0.9.0" diff --git a/core/transaction-pool/graph/src/pool.rs b/core/transaction-pool/graph/src/pool.rs index 389892101ee42..3f7248cf7759a 100644 --- a/core/transaction-pool/graph/src/pool.rs +++ b/core/transaction-pool/graph/src/pool.rs @@ -29,7 +29,11 @@ use crate::watcher::Watcher; use serde::Serialize; use log::debug; -use futures::channel::mpsc; +use futures::{ + Future, FutureExt, + channel::mpsc, + future::{Either, ready, join_all}, +}; use parking_lot::{Mutex, RwLock}; use sr_primitives::{ generic::BlockId, @@ -61,9 +65,15 @@ pub trait ChainApi: Send + Sync { type Hash: hash::Hash + Eq + traits::Member + Serialize; /// Error type. type Error: From + error::IntoPoolError; + /// Validate transaction future. + type ValidationFuture: Future> + Send; /// Verify extrinsic at given block. - fn validate_transaction(&self, at: &BlockId, uxt: ExtrinsicFor) -> Result; + fn validate_transaction( + &self, + at: &BlockId, + uxt: ExtrinsicFor, + ) -> Self::ValidationFuture; /// Returns a block number given the block id. fn block_id_to_number(&self, at: &BlockId) -> Result>, Self::Error>; @@ -101,6 +111,14 @@ impl Default for Options { /// Extrinsics pool. pub struct Pool { + data: SharedPoolData, +} + +/// Extrinsics pool shared data. +struct SharedPoolData(Arc>); + +/// Extrinsics pool data. +struct PoolData { api: B, options: Options, listener: RwLock, BlockHash>>, @@ -115,109 +133,39 @@ pub struct Pool { impl Pool { /// Imports a bunch of unverified extrinsics to the pool pub fn submit_at(&self, at: &BlockId, xts: T, force: bool) - -> Result, B::Error>>, B::Error> + -> Result, B::Error>>>, B::Error> where T: IntoIterator> { - let block_number = self.api.block_id_to_number(at)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; - - let results = xts - .into_iter() - .map(|xt| -> Result<_, B::Error> { - let (hash, bytes) = self.api.hash_and_length(&xt); - if !force && self.rotator.is_banned(&hash) { - return Err(error::Error::TemporarilyBanned.into()) - } - - match self.api.validate_transaction(at, xt.clone())? { - Ok(validity) => if validity.provides.is_empty() { - Err(error::Error::NoTagsProvided.into()) - } else { - Ok(base::Transaction { - data: xt, - bytes, - hash, - priority: validity.priority, - requires: validity.requires, - provides: validity.provides, - propagate: validity.propagate, - valid_till: block_number - .saturated_into::() - .saturating_add(validity.longevity), - }) - }, - Err(TransactionValidityError::Invalid(e)) => { - Err(error::Error::InvalidTransaction(e).into()) - }, - Err(TransactionValidityError::Unknown(e)) => { - self.listener.write().invalid(&hash); - Err(error::Error::UnknownTransaction(e).into()) - }, - } - }) - .map(|tx| { - let imported = self.pool.write().import(tx?)?; - - if let base::Imported::Ready { .. } = imported { - self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); - } - - let mut listener = self.listener.write(); - fire_events(&mut *listener, &imported); - Ok(imported.hash().clone()) - }) - .collect::>(); - - let removed = self.enforce_limits(); - - Ok(results.into_iter().map(|res| match res { - Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()), - other => other, - }).collect()) - } - - fn enforce_limits(&self) -> HashSet> { - let status = self.pool.read().status(); - let ready_limit = &self.options.ready; - let future_limit = &self.options.future; - - debug!(target: "txpool", "Pool Status: {:?}", status); - - if ready_limit.is_exceeded(status.ready, status.ready_bytes) - || future_limit.is_exceeded(status.future, status.future_bytes) { - // clean up the pool - let removed = { - let mut pool = self.pool.write(); - let removed = pool.enforce_limits(ready_limit, future_limit) - .into_iter().map(|x| x.hash.clone()).collect::>(); - // ban all removed transactions - self.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone())); - removed - }; - // run notifications - let mut listener = self.listener.write(); - for h in &removed { - listener.dropped(h, None); - } - - removed - } else { - Default::default() - } + self.data.submit_at(at, xts, force) } /// Imports one unverified extrinsic to the pool - pub fn submit_one(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, B::Error> { - Ok(self.submit_at(at, ::std::iter::once(xt), false)?.pop().expect("One extrinsic passed; one result returned; qed")?) + pub fn submit_one( + &self, + at: &BlockId, + xt: ExtrinsicFor, + ) -> impl Future, B::Error>> { + match self.submit_at(at, ::std::iter::once(xt), false) { + Ok(import_future) => Either::Left(import_future + .map(|mut import_result| import_result + .pop() + .expect("One extrinsic passed; one result returned; qed") + ) + ), + Err(error) => Either::Right(ready(Err(error))), + } } /// Import a single extrinsic and starts to watch their progress in the pool. - pub fn submit_and_watch(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, BlockHash>, B::Error> { - let hash = self.api.hash_and_length(&xt).0; - let watcher = self.listener.write().create_watcher(hash); - self.submit_one(at, xt)?; - Ok(watcher) + pub fn submit_and_watch( + &self, + at: &BlockId, + xt: ExtrinsicFor, + ) -> impl Future, BlockHash>, B::Error>> { + let hash = self.data.0.api.hash_and_length(&xt).0; + let watcher = self.data.0.listener.write().create_watcher(hash); + self.submit_one(at, xt).map(move |r| r.map(|_| watcher)) } /// Prunes ready transactions. @@ -226,41 +174,51 @@ impl Pool { /// To perform pruning we need the tags that each extrinsic provides and to avoid calling /// into runtime too often we first lookup all extrinsics that are in the pool and get /// their provided tags from there. Otherwise we query the runtime at the `parent` block. - pub fn prune(&self, at: &BlockId, parent: &BlockId, extrinsics: &[ExtrinsicFor]) -> Result<(), B::Error> { - let mut tags = Vec::with_capacity(extrinsics.len()); + pub fn prune( + &self, + at: &BlockId, + parent: &BlockId, + extrinsics: &[ExtrinsicFor], + ) -> impl Future> { // Get details of all extrinsics that are already in the pool - let hashes = extrinsics.iter().map(|extrinsic| self.api.hash_and_length(extrinsic).0).collect::>(); - let in_pool = self.pool.read().by_hash(&hashes); - { - // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option)`) - let all = extrinsics.iter().zip(in_pool.iter()); + let hashes = extrinsics.iter() + .map(|extrinsic| self.data.0.api.hash_and_length(extrinsic).0) + .collect::>(); + let in_pool = self.data.0.pool.read().by_hash(&hashes); + + // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option)`) + let all = extrinsics.iter().zip(in_pool.iter()); - for (extrinsic, existing_in_pool) in all { + // Prepare future that collect tags for all extrinsics + let future_tags = join_all(all + .map(|(extrinsic, existing_in_pool)| match *existing_in_pool { // reuse the tags for extrinsics that were found in the pool - Some(ref transaction) => { - tags.extend(transaction.provides.iter().cloned()); - }, + Some(ref transaction) => Either::Left( + ready(transaction.provides.clone()) + ), // if it's not found in the pool query the runtime at parent block // to get validity info and tags that the extrinsic provides. - None => { - let validity = self.api.validate_transaction(parent, extrinsic.clone()); - match validity { - Ok(Ok(mut validity)) => { - tags.append(&mut validity.provides); - }, + None => Either::Right(self.data.0.api.validate_transaction(parent, extrinsic.clone()) + .then(|validity| ready(match validity { + Ok(Ok(validity)) => validity.provides, // silently ignore invalid extrinsics, // cause they might just be inherent - _ => {} - } - }, + _ => Vec::new(), + }))), } - } - } - - self.prune_tags(at, tags, in_pool.into_iter().filter_map(|x| x).map(|x| x.hash.clone()))?; - - Ok(()) + )); + + // Prune all transactions that provide collected tags + let data = self.data.clone(); + let at_clone = at.clone(); + future_tags.then(move |tags| + data.prune_tags( + &at_clone, + tags.into_iter().flat_map(|t| t), + in_pool.into_iter().filter_map(|x| x).map(|x| x.hash.clone()), + ) + ) } /// Prunes ready transactions that provide given list of tags. @@ -286,13 +244,204 @@ impl Pool { at: &BlockId, tags: impl IntoIterator, known_imported_hashes: impl IntoIterator> + Clone, - ) -> Result<(), B::Error> { + ) -> impl Future> { + self.data.prune_tags(at, tags, known_imported_hashes) + } + + /// Removes stale transactions from the pool. + /// + /// Stale transactions are transaction beyond their longevity period. + /// Note this function does not remove transactions that are already included in the chain. + /// See `prune_tags` if you want this. + pub fn clear_stale(&self, at: &BlockId) -> Result<(), B::Error> { + self.data.clear_stale(at) + } + + /// Create a new transaction pool. + pub fn new(options: Options, api: B) -> Self { + Pool { + data: SharedPoolData(Arc::new(PoolData { + api, + options, + listener: Default::default(), + pool: Default::default(), + import_notification_sinks: Default::default(), + rotator: Default::default(), + })), + } + } + + /// Return an event stream of transactions imported to the pool. + pub fn import_notification_stream(&self) -> EventStream { + let (sink, stream) = mpsc::unbounded(); + self.data.0.import_notification_sinks.lock().push(sink); + stream + } + + /// Invoked when extrinsics are broadcasted. + pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { + let mut listener = self.data.0.listener.write(); + for (hash, peers) in propagated.into_iter() { + listener.broadcasted(&hash, peers); + } + } + + /// Remove from the pool. + pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { + self.data.remove_invalid(hashes) + } + + /// Get an iterator for ready transactions ordered by priority + pub fn ready(&self) -> impl Iterator> { + self.data.0.pool.read().ready() + } + + /// Returns pool status. + pub fn status(&self) -> base::Status { + self.data.0.pool.read().status() + } + + /// Returns transaction hash + pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { + self.data.0.api.hash_and_length(xt).0 + } +} + +impl Clone for SharedPoolData { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl SharedPoolData { + /// Imports a bunch of unverified extrinsics to the pool + pub fn submit_at(&self, at: &BlockId, xts: T, force: bool) + -> Result, B::Error>>>, B::Error> + where + T: IntoIterator> + { + let block_number = self.0.api.block_id_to_number(at)? + .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; + + // for each xt, prepare a validation future + let validation_futures = xts.into_iter().map(move |xt| { + let (hash, bytes) = self.0.api.hash_and_length(&xt); + if !force && self.0.rotator.is_banned(&hash) { + return Either::Left(ready(Err(error::Error::TemporarilyBanned.into()))) + } + + let self_clone = self.clone(); + Either::Right(self.0.api.validate_transaction(at, xt.clone()) + .then(move |validation_result| ready(validation_result.and_then(|validity| + match validity { + Ok(validity) => if validity.provides.is_empty() { + Err(error::Error::NoTagsProvided.into()) + } else { + Ok(base::Transaction { + data: xt, + bytes, + hash, + priority: validity.priority, + requires: validity.requires, + provides: validity.provides, + propagate: validity.propagate, + valid_till: block_number + .saturated_into::() + .saturating_add(validity.longevity), + }) + }, + Err(TransactionValidityError::Invalid(e)) => { + Err(error::Error::InvalidTransaction(e).into()) + }, + Err(TransactionValidityError::Unknown(e)) => { + self_clone.0.listener.write().invalid(&hash); + Err(error::Error::UnknownTransaction(e).into()) + }, + } + ))) + ) + }); + + // make single validation future that waits all until all extrinsics are validated + let validation_future = join_all(validation_futures); + + // and then import all successfully validated extrinsics at once + enforce limits + let self_clone = self.clone(); + let import_future = validation_future + .then(move |validation_results| ready({ + let import_results = validation_results + .into_iter() + .map(|validation_result| { + let imported = self_clone.0.pool.write().import(validation_result?)?; + + if let base::Imported::Ready { .. } = imported { + self_clone.0.import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(()).is_ok()); + } + + let mut listener = self_clone.0.listener.write(); + fire_events(&mut *listener, &imported); + Ok(imported.hash().clone()) + }) + .collect::>(); + + let removed = self_clone.enforce_limits(); + + import_results.into_iter().map(|res| match res { + Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()), + other => other, + }).collect::, B::Error>>>() + })); + + Ok(import_future) + } + + /// Remove extrinsics that ary beyond pool limits. + fn enforce_limits(&self) -> HashSet> { + let status = self.0.pool.read().status(); + let ready_limit = &self.0.options.ready; + let future_limit = &self.0.options.future; + + debug!(target: "txpool", "Pool Status: {:?}", status); + + if ready_limit.is_exceeded(status.ready, status.ready_bytes) + || future_limit.is_exceeded(status.future, status.future_bytes) { + // clean up the pool + let removed = { + let mut pool = self.0.pool.write(); + let removed = pool.enforce_limits(ready_limit, future_limit) + .into_iter().map(|x| x.hash.clone()).collect::>(); + // ban all removed transactions + self.0.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone())); + removed + }; + // run notifications + let mut listener = self.0.listener.write(); + for h in &removed { + listener.dropped(h, None); + } + + removed + } else { + Default::default() + } + } + + + /// Prunes ready transactions that provide given list of tags. + fn prune_tags( + &self, + at: &BlockId, + tags: impl IntoIterator, + known_imported_hashes: impl IntoIterator> + Clone, + ) -> impl Future> { // Perform tag-based pruning in the base pool - let status = self.pool.write().prune_tags(tags); + let status = self.0.pool.write().prune_tags(tags); // Notify event listeners of all transactions // that were promoted to `Ready` or were dropped. { - let mut listener = self.listener.write(); + let mut listener = self.0.listener.write(); for promoted in &status.promoted { fire_events(&mut *listener, promoted); } @@ -303,56 +452,75 @@ impl Pool { // make sure that we don't revalidate extrinsics that were part of the recently // imported block. This is especially important for UTXO-like chains cause the // inputs are pruned so such transaction would go to future again. - self.rotator.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); + self.0.rotator.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); // try to re-submit pruned transactions since some of them might be still valid. // note that `known_imported_hashes` will be rejected here due to temporary ban. let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::>(); - let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()), false)?; + let results_future = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()), false); + let results_future = match results_future { + Ok(results_future) => results_future, + Err(err) => return Either::Left(ready(Err(err))), + }; // Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned). - let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { - Err(Ok(error::Error::InvalidTransaction(_))) => Some(hashes[idx].clone()), - _ => None, - }); + let hashes_future = results_future.then(move |results| ready(results + .into_iter() + .enumerate() + .filter_map(move |(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { + Err(Ok(error::Error::InvalidTransaction(_))) => Some(hashes[idx].clone()), + _ => None, + }) + )); + // Fire `pruned` notifications for collected hashes and make sure to include // `known_imported_hashes` since they were just imported as part of the block. - let hashes = hashes.chain(known_imported_hashes.into_iter()); - { - let header_hash = self.api.block_id_to_hash(at)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; - let mut listener = self.listener.write(); - for h in hashes { - listener.pruned(header_hash, &h); - } - } + let self_clone = self.clone(); + let at_clone = at.clone(); + let hashes_future = hashes_future.then(move |hashes| { + let notify = || { + let hashes = hashes.chain(known_imported_hashes.into_iter()); + + let header_hash = self_clone.0.api.block_id_to_hash(&at_clone)? + .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at_clone)).into())?; + let mut listener = self_clone.0.listener.write(); + for h in hashes { + listener.pruned(header_hash, &h); + } + + Ok(()) + }; + + ready(notify()) + }); + // perform regular cleanup of old transactions in the pool // and update temporary bans. - self.clear_stale(at)?; - Ok(()) + let self_clone = self.clone(); + let at_clone = at.clone(); + Either::Right(hashes_future.then(move |r| ready(match r { + Ok(()) => self_clone.clear_stale(&at_clone), + Err(err) => Err(err), + }))) } /// Removes stale transactions from the pool. - /// - /// Stale transactions are transaction beyond their longevity period. - /// Note this function does not remove transactions that are already included in the chain. - /// See `prune_tags` if you want this. - pub fn clear_stale(&self, at: &BlockId) -> Result<(), B::Error> { - let block_number = self.api.block_id_to_number(at)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())? - .saturated_into::(); + fn clear_stale(&self, at: &BlockId) -> Result<(), B::Error> { + let block_number = self.0.api.block_id_to_number(at)? + .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())? + .saturated_into::(); let now = time::Instant::now(); let to_remove = { - self.ready() - .filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx)) + self.0.pool.read().ready() + .filter(|tx| self.0.rotator.ban_if_stale(&now, block_number, &tx)) .map(|tx| tx.hash.clone()) .collect::>() }; let futures_to_remove: Vec> = { - let p = self.pool.read(); + let p = self.0.pool.read(); let mut hashes = Vec::new(); for tx in p.futures() { - if self.rotator.ban_if_stale(&now, block_number, &tx) { + if self.0.rotator.ban_if_stale(&now, block_number, &tx) { hashes.push(tx.hash.clone()); } } @@ -362,68 +530,26 @@ impl Pool { self.remove_invalid(&to_remove); self.remove_invalid(&futures_to_remove); // clear banned transactions timeouts - self.rotator.clear_timeouts(&now); + self.0.rotator.clear_timeouts(&now); Ok(()) } - /// Create a new transaction pool. - pub fn new(options: Options, api: B) -> Self { - Pool { - api, - options, - listener: Default::default(), - pool: Default::default(), - import_notification_sinks: Default::default(), - rotator: Default::default(), - } - } - - /// Return an event stream of transactions imported to the pool. - pub fn import_notification_stream(&self) -> EventStream { - let (sink, stream) = mpsc::unbounded(); - self.import_notification_sinks.lock().push(sink); - stream - } - - /// Invoked when extrinsics are broadcasted. - pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { - let mut listener = self.listener.write(); - for (hash, peers) in propagated.into_iter() { - listener.broadcasted(&hash, peers); - } - } - /// Remove from the pool. - pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { + fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { // temporarily ban invalid transactions debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes); - self.rotator.ban(&time::Instant::now(), hashes.iter().cloned()); + self.0.rotator.ban(&time::Instant::now(), hashes.iter().cloned()); - let invalid = self.pool.write().remove_invalid(hashes); + let invalid = self.0.pool.write().remove_invalid(hashes); - let mut listener = self.listener.write(); + let mut listener = self.0.listener.write(); for tx in &invalid { listener.invalid(&tx.hash); } invalid } - - /// Get an iterator for ready transactions ordered by priority - pub fn ready(&self) -> impl Iterator> { - self.pool.read().ready() - } - - /// Returns pool status. - pub fn status(&self) -> base::Status { - self.pool.read().status() - } - - /// Returns transaction hash - pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { - self.api.hash_and_length(xt).0 - } } fn fire_events( @@ -454,6 +580,7 @@ fn fire_events( #[cfg(test)] mod tests { + use futures::executor::block_on; use super::*; use sr_primitives::transaction_validity::{ValidTransaction, InvalidTransaction}; use codec::Encode; @@ -472,14 +599,15 @@ mod tests { type Block = Block; type Hash = u64; type Error = error::Error; + type ValidationFuture = futures::future::Ready>; /// Verify extrinsic at given block. fn validate_transaction( &self, at: &BlockId, uxt: ExtrinsicFor, - ) -> Result { - let block_number = self.block_id_to_number(at)?.unwrap(); + ) -> Self::ValidationFuture { + let block_number = self.block_id_to_number(at).unwrap().unwrap(); let nonce = uxt.transfer().nonce; // This is used to control the test flow. @@ -492,7 +620,7 @@ mod tests { } } - if nonce < block_number { + futures::future::ready(if nonce < block_number { Ok(InvalidTransaction::Stale.into()) } else { Ok(Ok(ValidTransaction { @@ -502,7 +630,7 @@ mod tests { longevity: 3, propagate: true, })) - } + }) } /// Returns a block number given the block id. @@ -546,12 +674,12 @@ mod tests { let pool = pool(); // when - let hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); // then assert_eq!(pool.ready().map(|v| v.hash).collect::>(), vec![hash]); @@ -569,8 +697,8 @@ mod tests { }); // when - pool.rotator.ban(&time::Instant::now(), vec![pool.hash_of(&uxt)]); - let res = pool.submit_one(&BlockId::Number(0), uxt); + pool.data.0.rotator.ban(&time::Instant::now(), vec![pool.hash_of(&uxt)]); + let res = block_on(pool.submit_one(&BlockId::Number(0), uxt)); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -586,25 +714,25 @@ mod tests { let stream = pool.import_notification_stream(); // when - let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let _hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); - let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + }))).unwrap(); + let _hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap(); + }))).unwrap(); // future doesn't count - let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let _hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 3, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 2); assert_eq!(pool.status().future, 1); @@ -622,24 +750,24 @@ mod tests { fn should_clear_stale_transactions() { // given let pool = pool(); - let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); - let hash2 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + }))).unwrap(); + let hash2 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap(); - let hash3 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + }))).unwrap(); + let hash3 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 3, - })).unwrap(); + }))).unwrap(); // when pool.clear_stale(&BlockId::Number(5)).unwrap(); @@ -649,27 +777,27 @@ mod tests { assert_eq!(pool.status().future, 0); assert_eq!(pool.status().ready, 0); // make sure they are temporarily banned as well - assert!(pool.rotator.is_banned(&hash1)); - assert!(pool.rotator.is_banned(&hash2)); - assert!(pool.rotator.is_banned(&hash3)); + assert!(pool.data.0.rotator.is_banned(&hash1)); + assert!(pool.data.0.rotator.is_banned(&hash2)); + assert!(pool.data.0.rotator.is_banned(&hash3)); } #[test] fn should_ban_mined_transactions() { // given let pool = pool(); - let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); // when - pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()]).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()])).unwrap(); // then - assert!(pool.rotator.is_banned(&hash1)); + assert!(pool.data.0.rotator.is_banned(&hash1)); } #[test] @@ -684,26 +812,26 @@ mod tests { future: limit.clone(), }, TestApi::default()); - let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().future, 1); // when - let hash2 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash2 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(2)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 10, - })).unwrap(); + }))).unwrap(); // then assert_eq!(pool.status().future, 1); - assert!(pool.rotator.is_banned(&hash1)); - assert!(!pool.rotator.is_banned(&hash2)); + assert!(pool.data.0.rotator.is_banned(&hash1)); + assert!(!pool.data.0.rotator.is_banned(&hash2)); } #[test] @@ -719,12 +847,12 @@ mod tests { }, TestApi::default()); // when - pool.submit_one(&BlockId::Number(0), uxt(Transfer { + block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap_err(); + }))).unwrap_err(); // then assert_eq!(pool.status().ready, 0); @@ -737,12 +865,12 @@ mod tests { let pool = pool(); // when - let err = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let err = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: INVALID_NONCE, - })).unwrap_err(); + }))).unwrap_err(); // then assert_eq!(pool.status().ready, 0); @@ -757,17 +885,17 @@ mod tests { fn should_trigger_ready_and_finalized() { // given let pool = pool(); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 1); assert_eq!(pool.status().future, 0); // when - pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![]).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -782,17 +910,17 @@ mod tests { fn should_trigger_ready_and_finalized_when_pruning_via_hash() { // given let pool = pool(); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 1); assert_eq!(pool.status().future, 0); // when - pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64]).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -807,22 +935,22 @@ mod tests { fn should_trigger_future_and_ready_after_promoted() { // given let pool = pool(); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 1); // when - pool.submit_one(&BlockId::Number(0), uxt(Transfer { + block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 2); // then @@ -841,7 +969,7 @@ mod tests { amount: 5, nonce: 0, }); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt).unwrap(); + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt)).unwrap(); assert_eq!(pool.status().ready, 1); // when @@ -865,7 +993,7 @@ mod tests { amount: 5, nonce: 0, }); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt).unwrap(); + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt)).unwrap(); assert_eq!(pool.status().ready, 1); // when @@ -899,7 +1027,7 @@ mod tests { amount: 5, nonce: 0, }); - let watcher = pool.submit_and_watch(&BlockId::Number(0), xt).unwrap(); + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), xt)).unwrap(); assert_eq!(pool.status().ready, 1); // when @@ -909,7 +1037,7 @@ mod tests { amount: 4, nonce: 1, }); - pool.submit_one(&BlockId::Number(1), xt).unwrap(); + block_on(pool.submit_one(&BlockId::Number(1), xt)).unwrap(); assert_eq!(pool.status().ready, 1); // then @@ -939,7 +1067,7 @@ mod tests { // This transaction should go to future, since we use `nonce: 1` let pool2 = pool.clone(); std::thread::spawn(move || { - pool2.submit_one(&BlockId::Number(0), xt).unwrap(); + block_on(pool2.submit_one(&BlockId::Number(0), xt)).unwrap(); ready.send(()).unwrap(); }); @@ -953,11 +1081,11 @@ mod tests { }); // The tag the above transaction provides (TestApi is using just nonce as u8) let provides = vec![0_u8]; - pool.submit_one(&BlockId::Number(0), xt).unwrap(); + block_on(pool.submit_one(&BlockId::Number(0), xt)).unwrap(); assert_eq!(pool.status().ready, 1); // Now block import happens before the second transaction is able to finish verification. - pool.prune_tags(&BlockId::Number(1), vec![provides], vec![]).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap(); assert_eq!(pool.status().ready, 0); diff --git a/core/transaction-pool/src/api.rs b/core/transaction-pool/src/api.rs index c0c4c787a5012..96403bd3f876f 100644 --- a/core/transaction-pool/src/api.rs +++ b/core/transaction-pool/src/api.rs @@ -37,24 +37,24 @@ use sr_primitives::{ use crate::error; /// The transaction pool logic -pub struct ChainApi { +pub struct FullChainApi { client: Arc, _marker: PhantomData, } -impl ChainApi where +impl FullChainApi where Block: traits::Block, T: traits::ProvideRuntimeApi + HeaderBackend { /// Create new transaction pool logic. pub fn new(client: Arc) -> Self { - ChainApi { + FullChainApi { client, _marker: Default::default() } } } -impl txpool::ChainApi for ChainApi where +impl txpool::ChainApi for FullChainApi where Block: traits::Block, T: traits::ProvideRuntimeApi + HeaderBackend, T::Api: TaggedTransactionQueue @@ -62,9 +62,14 @@ impl txpool::ChainApi for ChainApi where type Block = Block; type Hash = H256; type Error = error::Error; + type ValidationFuture = futures::future::Ready>; - fn validate_transaction(&self, at: &BlockId, uxt: txpool::ExtrinsicFor) -> error::Result { - Ok(self.client.runtime_api().validate_transaction(at, uxt)?) + fn validate_transaction( + &self, + at: &BlockId, + uxt: txpool::ExtrinsicFor, + ) -> Self::ValidationFuture { + futures::future::ready(self.client.runtime_api().validate_transaction(at, uxt).map_err(Into::into)) } fn block_id_to_number(&self, at: &BlockId) -> error::Result>> { diff --git a/core/transaction-pool/src/lib.rs b/core/transaction-pool/src/lib.rs index 1899c601b2fdb..6938166299d85 100644 --- a/core/transaction-pool/src/lib.rs +++ b/core/transaction-pool/src/lib.rs @@ -25,5 +25,5 @@ mod tests; pub mod error; -pub use api::ChainApi; +pub use api::FullChainApi; pub use txpool; diff --git a/core/transaction-pool/src/tests.rs b/core/transaction-pool/src/tests.rs index 1661b7108b9f8..d1ad27dd260f0 100644 --- a/core/transaction-pool/src/tests.rs +++ b/core/transaction-pool/src/tests.rs @@ -18,6 +18,7 @@ use super::*; use codec::Encode; +use futures::executor::block_on; use txpool::{self, Pool}; use test_client::{runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}, AccountKeyring::{self, *}}; use sr_primitives::{ @@ -38,12 +39,13 @@ impl txpool::ChainApi for TestApi { type Block = Block; type Hash = Hash; type Error = error::Error; + type ValidationFuture = futures::future::Ready>; fn validate_transaction( &self, at: &BlockId, uxt: txpool::ExtrinsicFor, - ) -> error::Result { + ) -> Self::ValidationFuture { let expected = index(at); let requires = if expected == uxt.transfer().nonce { vec![] @@ -52,7 +54,7 @@ impl txpool::ChainApi for TestApi { }; let provides = vec![vec![uxt.transfer().nonce as u8]]; - Ok( + futures::future::ready(Ok( Ok(ValidTransaction { priority: 1, requires, @@ -60,7 +62,7 @@ impl txpool::ChainApi for TestApi { longevity: 64, propagate: true, }) - ) + )) } fn block_id_to_number(&self, at: &BlockId) -> error::Result>> { @@ -111,7 +113,7 @@ fn pool() -> Pool { fn submission_should_work() { let pool = pool(); assert_eq!(209, index(&BlockId::number(0))); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209]); @@ -120,8 +122,8 @@ fn submission_should_work() { #[test] fn multiple_submission_should_work() { let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); @@ -130,7 +132,7 @@ fn multiple_submission_should_work() { #[test] fn early_nonce_should_be_culled() { let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 208))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::::new()); @@ -140,11 +142,11 @@ fn early_nonce_should_be_culled() { fn late_nonce_should_be_queued() { let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::::new()); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); } @@ -152,13 +154,13 @@ fn late_nonce_should_be_queued() { #[test] fn prune_tags_should_work() { let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); - pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![]).unwrap(); + block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![210]); @@ -168,14 +170,14 @@ fn prune_tags_should_work() { fn should_ban_invalid_transactions() { let pool = pool(); let uxt = uxt(Alice, 209); - let hash = pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap(); + let hash = block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap(); pool.remove_invalid(&[hash]); - pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err(); + block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); // when let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::::new()); // then - pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err(); + block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); } diff --git a/node-template/src/service.rs b/node-template/src/service.rs index f4ab3f40000e2..6f69be63e5dab 100644 --- a/node-template/src/service.rs +++ b/node-template/src/service.rs @@ -43,7 +43,7 @@ macro_rules! new_full_start { Ok(substrate_client::LongestChain::new(backend.clone())) })? .with_transaction_pool(|config, client| - Ok(transaction_pool::txpool::Pool::new(config, transaction_pool::ChainApi::new(client))) + Ok(transaction_pool::txpool::Pool::new(config, transaction_pool::FullChainApi::new(client))) )? .with_import_queue(|_config, client, mut select_chain, transaction_pool| { let select_chain = select_chain.take() @@ -195,7 +195,7 @@ pub fn new_light(config: Configuration(config: Configuration Date: Thu, 19 Sep 2019 18:29:45 +0300 Subject: [PATCH 02/12] Update core/rpc/src/author/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Tomasz DrwiÄ™ga --- core/rpc/src/author/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/rpc/src/author/mod.rs b/core/rpc/src/author/mod.rs index 1465c24fb1ad1..faf0d8e5c527f 100644 --- a/core/rpc/src/author/mod.rs +++ b/core/rpc/src/author/mod.rs @@ -165,7 +165,7 @@ impl AuthorApi, BlockHash

> for Author whe let submit = || -> Result<_> { let best_block_hash = self.client.info().chain.best_hash; let dxt = <

::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]) - .map_err(|e| error::Error::from(e))?; + .map_err(error::Error::from)?; Ok(self.pool .submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt) .boxed() From 79812911c244182c006cdee4e5dc62f86c833906 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 19 Sep 2019 18:40:31 +0300 Subject: [PATCH 03/12] Update core/transaction-pool/graph/src/pool.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Tomasz Drwięga --- core/transaction-pool/graph/src/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/transaction-pool/graph/src/pool.rs b/core/transaction-pool/graph/src/pool.rs index 3f7248cf7759a..0f4524d1d95dd 100644 --- a/core/transaction-pool/graph/src/pool.rs +++ b/core/transaction-pool/graph/src/pool.rs @@ -146,7 +146,7 @@ impl Pool { at: &BlockId, xt: ExtrinsicFor, ) -> impl Future, B::Error>> { - match self.submit_at(at, ::std::iter::once(xt), false) { + match self.submit_at(at, std::iter::once(xt), false) { Ok(import_future) => Either::Left(import_future .map(|mut import_result| import_result .pop() From 760c7cf61eb9baf58058c50c434338dac58935db Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 24 Sep 2019 13:47:38 +0300 Subject: [PATCH 04/12] Pool -> Pool + ValidatedPool --- core/basic-authorship/src/basic_authorship.rs | 4 +- core/service/src/builder.rs | 23 +- core/transaction-pool/graph/src/lib.rs | 7 +- core/transaction-pool/graph/src/pool.rs | 576 +++++++----------- .../graph/src/validated_pool.rs | 371 +++++++++++ 5 files changed, 594 insertions(+), 387 deletions(-) create mode 100644 core/transaction-pool/graph/src/validated_pool.rs diff --git a/core/basic-authorship/src/basic_authorship.rs b/core/basic-authorship/src/basic_authorship.rs index c3332670149bc..7f8b343f6512c 100644 --- a/core/basic-authorship/src/basic_authorship.rs +++ b/core/basic-authorship/src/basic_authorship.rs @@ -251,8 +251,8 @@ mod tests { let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api)); futures::executor::block_on( - txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap() - ); + txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false) + ).unwrap(); let mut proposer_factory = ProposerFactory { client: client.clone(), diff --git a/core/service/src/builder.rs b/core/service/src/builder.rs index 6ca6b440196a2..372814596388c 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -24,7 +24,7 @@ use client::{ }; use codec::{Decode, Encode, IoReader}; use consensus_common::import_queue::ImportQueue; -use futures::{prelude::*, sync::mpsc, future::{result, Either}}; +use futures::{prelude::*, sync::mpsc}; use futures03::{ compat::Compat, future::ready, @@ -937,17 +937,16 @@ pub(crate) fn maintain_transaction_pool( let retracted_transactions = retracted.to_vec().into_iter() .filter_map(move |hash| client_copy.block(&BlockId::hash(hash)).ok().unwrap_or(None)) .flat_map(|block| block.block.deconstruct().1.into_iter()); - let resubmit_future = match transaction_pool.submit_at(id, retracted_transactions, true) { - Ok(resubmit_future) => Either::A(resubmit_future - .then(|_| ready(Ok(()))) - .boxed() - .compat() - ), - Err(e) => { - warn!("Error re-submitting transactions: {:?}", e); - Either::B(result(Ok(()))) - } - }; + let resubmit_future = transaction_pool + .submit_at(id, retracted_transactions, true) + .then(|resubmit_result| ready(match resubmit_result { + Ok(_) => Ok(()), + Err(e) => { + warn!("Error re-submitting transactions: {:?}", e); + Ok(()) + } + })) + .compat(); // Avoid calling into runtime if there is nothing to prune from the pool anyway. if transaction_pool.status().is_empty() { diff --git a/core/transaction-pool/graph/src/lib.rs b/core/transaction-pool/graph/src/lib.rs index ea890a5cd0f21..715e60874be95 100644 --- a/core/transaction-pool/graph/src/lib.rs +++ b/core/transaction-pool/graph/src/lib.rs @@ -29,6 +29,7 @@ mod listener; mod pool; mod ready; mod rotator; +mod validated_pool; pub mod base_pool; pub mod error; @@ -36,4 +37,8 @@ pub mod watcher; pub use self::error::IntoPoolError; pub use self::base_pool::{Transaction, Status}; -pub use self::pool::{Pool, Options, ChainApi, EventStream, ExtrinsicFor, BlockHash, ExHash, NumberFor, TransactionFor}; +pub use self::pool::{ + Pool, + Options, ChainApi, EventStream, ExtrinsicFor, + BlockHash, ExHash, NumberFor, TransactionFor, +}; diff --git a/core/transaction-pool/graph/src/pool.rs b/core/transaction-pool/graph/src/pool.rs index 0f4524d1d95dd..479b6e7495593 100644 --- a/core/transaction-pool/graph/src/pool.rs +++ b/core/transaction-pool/graph/src/pool.rs @@ -15,33 +15,27 @@ // along with Substrate. If not, see . use std::{ - collections::{HashSet, HashMap}, hash, + collections::HashMap, sync::Arc, - time, }; use crate::base_pool as base; use crate::error; -use crate::listener::Listener; -use crate::rotator::PoolRotator; use crate::watcher::Watcher; use serde::Serialize; -use log::debug; use futures::{ Future, FutureExt, channel::mpsc, future::{Either, ready, join_all}, }; -use parking_lot::{Mutex, RwLock}; use sr_primitives::{ generic::BlockId, traits::{self, SaturatedConversion}, transaction_validity::{TransactionValidity, TransactionTag as Tag, TransactionValidityError}, }; - -pub use crate::base_pool::Limit; +use crate::validated_pool::{ValidatedPool, ValidatedTransaction}; /// Modification notification event stream type; pub type EventStream = mpsc::UnboundedReceiver<()>; @@ -56,6 +50,12 @@ pub type ExtrinsicFor = <::Block as traits::Block>::Extrinsic; pub type NumberFor = traits::NumberFor<::Block>; /// A type of transaction stored in the pool pub type TransactionFor = Arc, ExtrinsicFor>>; +/// A type of validated transaction stored in the pool. +pub type ValidatedTransactionFor = ValidatedTransaction< + ExHash, + ExtrinsicFor, + ::Error, +>; /// Concrete extrinsic validation and query logic. pub trait ChainApi: Send + Sync { @@ -89,19 +89,19 @@ pub trait ChainApi: Send + Sync { #[derive(Debug, Clone)] pub struct Options { /// Ready queue limits. - pub ready: Limit, + pub ready: base::Limit, /// Future queue limits. - pub future: Limit, + pub future: base::Limit, } impl Default for Options { fn default() -> Self { Options { - ready: Limit { + ready: base::Limit { count: 512, total_bytes: 10 * 1024 * 1024, }, - future: Limit { + future: base::Limit { count: 128, total_bytes: 1 * 1024 * 1024, }, @@ -109,35 +109,33 @@ impl Default for Options { } } -/// Extrinsics pool. +/// Extrinsics pool that performs validation. pub struct Pool { - data: SharedPoolData, + validated_pool: SharedValidatedPool, } -/// Extrinsics pool shared data. -struct SharedPoolData(Arc>); - -/// Extrinsics pool data. -struct PoolData { - api: B, - options: Options, - listener: RwLock, BlockHash>>, - pool: RwLock, - ExtrinsicFor, - >>, - import_notification_sinks: Mutex>>, - rotator: PoolRotator>, -} +/// Shared validated extrinsics pool. +struct SharedValidatedPool(Arc>); impl Pool { + /// Create a new transaction pool. + pub fn new(options: Options, api: B) -> Self { + Pool { + validated_pool: SharedValidatedPool(Arc::new(ValidatedPool::new(options, api))), + } + } + /// Imports a bunch of unverified extrinsics to the pool pub fn submit_at(&self, at: &BlockId, xts: T, force: bool) - -> Result, B::Error>>>, B::Error> + -> impl Future, B::Error>>, B::Error>> where T: IntoIterator> { - self.data.submit_at(at, xts, force) + let validated_pool = self.validated_pool.clone(); + self.validated_pool + .verify(at, xts, force) + .map(move |validated_transactions| validated_transactions + .map(|validated_transactions| validated_pool.0.submit(validated_transactions))) } /// Imports one unverified extrinsic to the pool @@ -146,15 +144,11 @@ impl Pool { at: &BlockId, xt: ExtrinsicFor, ) -> impl Future, B::Error>> { - match self.submit_at(at, std::iter::once(xt), false) { - Ok(import_future) => Either::Left(import_future - .map(|mut import_result| import_result - .pop() - .expect("One extrinsic passed; one result returned; qed") - ) - ), - Err(error) => Either::Right(ready(Err(error))), - } + self.submit_at(at, std::iter::once(xt), false) + .map(|import_result| import_result.and_then(|mut import_result| import_result + .pop() + .expect("One extrinsic passed; one result returned; qed") + )) } /// Import a single extrinsic and starts to watch their progress in the pool. @@ -163,9 +157,16 @@ impl Pool { at: &BlockId, xt: ExtrinsicFor, ) -> impl Future, BlockHash>, B::Error>> { - let hash = self.data.0.api.hash_and_length(&xt).0; - let watcher = self.data.0.listener.write().create_watcher(hash); - self.submit_one(at, xt).map(move |r| r.map(|_| watcher)) + let block_number = match self.validated_pool.resolve_block_number(at) { + Ok(block_number) => block_number, + Err(err) => return Either::Left(ready(Err(err))) + }; + + let validated_pool = self.validated_pool.clone(); + Either::Right( + self.validated_pool.verify_one(at, block_number, xt, false) + .map(move |validated_transactions| validated_pool.0.submit_and_watch(validated_transactions)) + ) } /// Prunes ready transactions. @@ -180,45 +181,7 @@ impl Pool { parent: &BlockId, extrinsics: &[ExtrinsicFor], ) -> impl Future> { - // Get details of all extrinsics that are already in the pool - let hashes = extrinsics.iter() - .map(|extrinsic| self.data.0.api.hash_and_length(extrinsic).0) - .collect::>(); - let in_pool = self.data.0.pool.read().by_hash(&hashes); - - // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option)`) - let all = extrinsics.iter().zip(in_pool.iter()); - - // Prepare future that collect tags for all extrinsics - let future_tags = join_all(all - .map(|(extrinsic, existing_in_pool)| - match *existing_in_pool { - // reuse the tags for extrinsics that were found in the pool - Some(ref transaction) => Either::Left( - ready(transaction.provides.clone()) - ), - // if it's not found in the pool query the runtime at parent block - // to get validity info and tags that the extrinsic provides. - None => Either::Right(self.data.0.api.validate_transaction(parent, extrinsic.clone()) - .then(|validity| ready(match validity { - Ok(Ok(validity)) => validity.provides, - // silently ignore invalid extrinsics, - // cause they might just be inherent - _ => Vec::new(), - }))), - } - )); - - // Prune all transactions that provide collected tags - let data = self.data.clone(); - let at_clone = at.clone(); - future_tags.then(move |tags| - data.prune_tags( - &at_clone, - tags.into_iter().flat_map(|t| t), - in_pool.into_iter().filter_map(|x| x).map(|x| x.hash.clone()), - ) - ) + self.validated_pool.prune(at, parent, extrinsics) } /// Prunes ready transactions that provide given list of tags. @@ -231,6 +194,9 @@ impl Pool { /// 1. Provide that tag directly /// 2. Are a dependency of pruned transaction. /// + /// Returns transactions that have been removed from the pool and must be reverified + /// before reinserting to the pool. + /// /// By removing predecessor transactions as well we might actually end up /// pruning too much, so all removed transactions are reverified against /// the runtime (`validate_transaction`) to make sure they are invalid. @@ -245,190 +211,90 @@ impl Pool { tags: impl IntoIterator, known_imported_hashes: impl IntoIterator> + Clone, ) -> impl Future> { - self.data.prune_tags(at, tags, known_imported_hashes) - } - - /// Removes stale transactions from the pool. - /// - /// Stale transactions are transaction beyond their longevity period. - /// Note this function does not remove transactions that are already included in the chain. - /// See `prune_tags` if you want this. - pub fn clear_stale(&self, at: &BlockId) -> Result<(), B::Error> { - self.data.clear_stale(at) - } - - /// Create a new transaction pool. - pub fn new(options: Options, api: B) -> Self { - Pool { - data: SharedPoolData(Arc::new(PoolData { - api, - options, - listener: Default::default(), - pool: Default::default(), - import_notification_sinks: Default::default(), - rotator: Default::default(), - })), - } + self.validated_pool.prune_tags(at, tags, known_imported_hashes) } /// Return an event stream of transactions imported to the pool. pub fn import_notification_stream(&self) -> EventStream { - let (sink, stream) = mpsc::unbounded(); - self.data.0.import_notification_sinks.lock().push(sink); - stream + self.validated_pool.0.import_notification_stream() } /// Invoked when extrinsics are broadcasted. pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { - let mut listener = self.data.0.listener.write(); - for (hash, peers) in propagated.into_iter() { - listener.broadcasted(&hash, peers); - } + self.validated_pool.0.on_broadcasted(propagated) } /// Remove from the pool. pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { - self.data.remove_invalid(hashes) + self.validated_pool.0.remove_invalid(hashes) } /// Get an iterator for ready transactions ordered by priority pub fn ready(&self) -> impl Iterator> { - self.data.0.pool.read().ready() + self.validated_pool.0.ready() } /// Returns pool status. pub fn status(&self) -> base::Status { - self.data.0.pool.read().status() + self.validated_pool.0.status() } /// Returns transaction hash pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { - self.data.0.api.hash_and_length(xt).0 + self.validated_pool.0.api().hash_and_length(xt).0 } } -impl Clone for SharedPoolData { +impl Clone for SharedValidatedPool { fn clone(&self) -> Self { Self(self.0.clone()) } } -impl SharedPoolData { - /// Imports a bunch of unverified extrinsics to the pool - pub fn submit_at(&self, at: &BlockId, xts: T, force: bool) - -> Result, B::Error>>>, B::Error> - where - T: IntoIterator> - { - let block_number = self.0.api.block_id_to_number(at)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; - - // for each xt, prepare a validation future - let validation_futures = xts.into_iter().map(move |xt| { - let (hash, bytes) = self.0.api.hash_and_length(&xt); - if !force && self.0.rotator.is_banned(&hash) { - return Either::Left(ready(Err(error::Error::TemporarilyBanned.into()))) - } - - let self_clone = self.clone(); - Either::Right(self.0.api.validate_transaction(at, xt.clone()) - .then(move |validation_result| ready(validation_result.and_then(|validity| - match validity { - Ok(validity) => if validity.provides.is_empty() { - Err(error::Error::NoTagsProvided.into()) - } else { - Ok(base::Transaction { - data: xt, - bytes, - hash, - priority: validity.priority, - requires: validity.requires, - provides: validity.provides, - propagate: validity.propagate, - valid_till: block_number - .saturated_into::() - .saturating_add(validity.longevity), - }) - }, - Err(TransactionValidityError::Invalid(e)) => { - Err(error::Error::InvalidTransaction(e).into()) - }, - Err(TransactionValidityError::Unknown(e)) => { - self_clone.0.listener.write().invalid(&hash); - Err(error::Error::UnknownTransaction(e).into()) - }, - } - ))) - ) - }); +impl SharedValidatedPool { + /// Prunes ready transactions. + fn prune( + &self, + at: &BlockId, + parent: &BlockId, + extrinsics: &[ExtrinsicFor], + ) -> impl Future> { + // Get details of all extrinsics that are already in the pool + let (in_pool_hashes, in_pool_tags) = self.0.extrinsics_tags(extrinsics); - // make single validation future that waits all until all extrinsics are validated - let validation_future = join_all(validation_futures); - - // and then import all successfully validated extrinsics at once + enforce limits - let self_clone = self.clone(); - let import_future = validation_future - .then(move |validation_results| ready({ - let import_results = validation_results - .into_iter() - .map(|validation_result| { - let imported = self_clone.0.pool.write().import(validation_result?)?; - - if let base::Imported::Ready { .. } = imported { - self_clone.0.import_notification_sinks - .lock() - .retain(|sink| sink.unbounded_send(()).is_ok()); - } - - let mut listener = self_clone.0.listener.write(); - fire_events(&mut *listener, &imported); - Ok(imported.hash().clone()) - }) - .collect::>(); - - let removed = self_clone.enforce_limits(); - - import_results.into_iter().map(|res| match res { - Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()), - other => other, - }).collect::, B::Error>>>() - })); - - Ok(import_future) - } + // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option>)`) + let all = extrinsics.iter().zip(in_pool_tags.into_iter()); - /// Remove extrinsics that ary beyond pool limits. - fn enforce_limits(&self) -> HashSet> { - let status = self.0.pool.read().status(); - let ready_limit = &self.0.options.ready; - let future_limit = &self.0.options.future; - - debug!(target: "txpool", "Pool Status: {:?}", status); - - if ready_limit.is_exceeded(status.ready, status.ready_bytes) - || future_limit.is_exceeded(status.future, status.future_bytes) { - // clean up the pool - let removed = { - let mut pool = self.0.pool.write(); - let removed = pool.enforce_limits(ready_limit, future_limit) - .into_iter().map(|x| x.hash.clone()).collect::>(); - // ban all removed transactions - self.0.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone())); - removed - }; - // run notifications - let mut listener = self.0.listener.write(); - for h in &removed { - listener.dropped(h, None); - } + // Prepare future that collect tags for all extrinsics + let future_tags = join_all(all + .map(|(extrinsic, in_pool_tags)| + match in_pool_tags { + // reuse the tags for extrinsics that were found in the pool + Some(tags) => Either::Left( + ready(tags) + ), + // if it's not found in the pool query the runtime at parent block + // to get validity info and tags that the extrinsic provides. + None => Either::Right(self.0.api().validate_transaction(parent, extrinsic.clone()) + .then(|validity| ready(match validity { + Ok(Ok(validity)) => validity.provides, + // silently ignore invalid extrinsics, + // cause they might just be inherent + _ => Vec::new(), + }))), + } + )); - removed - } else { - Default::default() - } + // Prune transactions by tags + let at = at.clone(); + let validated_pool = self.clone(); + future_tags.then(move |tags| validated_pool.prune_tags( + &at, + tags.into_iter().flat_map(|tags| tags), + in_pool_hashes, + )) } - /// Prunes ready transactions that provide given list of tags. fn prune_tags( &self, @@ -436,163 +302,129 @@ impl SharedPoolData { tags: impl IntoIterator, known_imported_hashes: impl IntoIterator> + Clone, ) -> impl Future> { - // Perform tag-based pruning in the base pool - let status = self.0.pool.write().prune_tags(tags); - // Notify event listeners of all transactions - // that were promoted to `Ready` or were dropped. - { - let mut listener = self.0.listener.write(); - for promoted in &status.promoted { - fire_events(&mut *listener, promoted); - } - for f in &status.failed { - listener.dropped(f, None); - } - } - // make sure that we don't revalidate extrinsics that were part of the recently + // Prune all transactions that provide given tags + let prune_status = match self.0.prune_tags(tags) { + Ok(prune_status) => prune_status, + Err(e) => return Either::Left(ready(Err(e))), + }; + + // Make sure that we don't revalidate extrinsics that were part of the recently // imported block. This is especially important for UTXO-like chains cause the // inputs are pruned so such transaction would go to future again. - self.0.rotator.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); + self.0.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); - // try to re-submit pruned transactions since some of them might be still valid. + // Try to re-validate pruned transactions since some of them might be still valid. // note that `known_imported_hashes` will be rejected here due to temporary ban. - let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::>(); - let results_future = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()), false); - let results_future = match results_future { - Ok(results_future) => results_future, + let pruned_hashes = prune_status.pruned.iter().map(|tx| tx.hash.clone()).collect::>(); + let pruned_transactions = prune_status.pruned.into_iter().map(|tx| tx.data.clone()); + let reverify_future = self.verify(at, pruned_transactions, false); + + // And finally - submit reverified transactions back to the pool + let at = at.clone(); + let validated_pool = self.clone(); + Either::Right(reverify_future.then(move |reverified_transactions| + ready(reverified_transactions.and_then(|reverified_transactions| + validated_pool.0.resubmit_pruned( + &at, + known_imported_hashes, + pruned_hashes, + reverified_transactions, + )) + ))) + } + + /// Resolves block number by id. + fn resolve_block_number(&self, at: &BlockId) -> Result, B::Error> { + self.0.api().block_id_to_number(at) + .and_then(|number| number.ok_or_else(|| + error::Error::InvalidBlockId(format!("{:?}", at)).into())) + } + + /// Returns future that validates a bunch of transactions at given block. + fn verify( + &self, + at: &BlockId, + xts: impl IntoIterator>, + force: bool, + ) -> impl Future>, B::Error>> { + // we need a block number to compute tx validity + let block_number = match self.resolve_block_number(at) { + Ok(block_number) => block_number, Err(err) => return Either::Left(ready(Err(err))), }; - // Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned). - let hashes_future = results_future.then(move |results| ready(results - .into_iter() - .enumerate() - .filter_map(move |(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { - Err(Ok(error::Error::InvalidTransaction(_))) => Some(hashes[idx].clone()), - _ => None, - }) - )); - - // Fire `pruned` notifications for collected hashes and make sure to include - // `known_imported_hashes` since they were just imported as part of the block. - let self_clone = self.clone(); - let at_clone = at.clone(); - let hashes_future = hashes_future.then(move |hashes| { - let notify = || { - let hashes = hashes.chain(known_imported_hashes.into_iter()); - - let header_hash = self_clone.0.api.block_id_to_hash(&at_clone)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at_clone)).into())?; - let mut listener = self_clone.0.listener.write(); - for h in hashes { - listener.pruned(header_hash, &h); - } - - Ok(()) - }; - - ready(notify()) - }); - - // perform regular cleanup of old transactions in the pool - // and update temporary bans. - let self_clone = self.clone(); - let at_clone = at.clone(); - Either::Right(hashes_future.then(move |r| ready(match r { - Ok(()) => self_clone.clear_stale(&at_clone), - Err(err) => Err(err), - }))) - } - - /// Removes stale transactions from the pool. - fn clear_stale(&self, at: &BlockId) -> Result<(), B::Error> { - let block_number = self.0.api.block_id_to_number(at)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())? - .saturated_into::(); - let now = time::Instant::now(); - let to_remove = { - self.0.pool.read().ready() - .filter(|tx| self.0.rotator.ban_if_stale(&now, block_number, &tx)) - .map(|tx| tx.hash.clone()) - .collect::>() - }; - let futures_to_remove: Vec> = { - let p = self.0.pool.read(); - let mut hashes = Vec::new(); - for tx in p.futures() { - if self.0.rotator.ban_if_stale(&now, block_number, &tx) { - hashes.push(tx.hash.clone()); - } - } - hashes - }; - // removing old transactions - self.remove_invalid(&to_remove); - self.remove_invalid(&futures_to_remove); - // clear banned transactions timeouts - self.0.rotator.clear_timeouts(&now); + // for each xt, prepare a validation future + let validation_futures = xts.into_iter().map(move |xt| + self.verify_one(at, block_number, xt, force) + ); - Ok(()) + // make single validation future that waits all until all extrinsics are validated + Either::Right(join_all(validation_futures).then(|x| ready(Ok(x)))) } - /// Remove from the pool. - fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { - // temporarily ban invalid transactions - debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes); - self.0.rotator.ban(&time::Instant::now(), hashes.iter().cloned()); - - let invalid = self.0.pool.write().remove_invalid(hashes); - - let mut listener = self.0.listener.write(); - for tx in &invalid { - listener.invalid(&tx.hash); + /// Returns future that validates single transaction at given block. + fn verify_one( + &self, + block_id: &BlockId, + block_number: NumberFor, + xt: ExtrinsicFor, + force: bool, + ) -> impl Future> { + let (hash, bytes) = self.0.api().hash_and_length(&xt); + if !force && self.0.is_banned(&hash) { + return Either::Left(ready(ValidatedTransaction::Invalid(error::Error::TemporarilyBanned.into()))) } - invalid - } -} - -fn fire_events( - listener: &mut Listener, - imported: &base::Imported, -) where - H: hash::Hash + Eq + traits::Member + Serialize, - H2: Clone, -{ - match *imported { - base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { - listener.ready(hash, None); - for f in failed { - listener.invalid(f); - } - for r in removed { - listener.dropped(&r.hash, Some(hash)); - } - for p in promoted { - listener.ready(p, None); - } - }, - base::Imported::Future { ref hash } => { - listener.future(hash) - }, + Either::Right(self.0.api().validate_transaction(block_id, xt.clone()) + .then(move |validation_result| ready(match validation_result { + Ok(validity) => match validity { + Ok(validity) => if validity.provides.is_empty() { + ValidatedTransaction::Invalid(error::Error::NoTagsProvided.into()) + } else { + ValidatedTransaction::Valid(base::Transaction { + data: xt, + bytes, + hash, + priority: validity.priority, + requires: validity.requires, + provides: validity.provides, + propagate: validity.propagate, + valid_till: block_number + .saturated_into::() + .saturating_add(validity.longevity), + }) + }, + Err(TransactionValidityError::Invalid(e)) => + ValidatedTransaction::Invalid(error::Error::InvalidTransaction(e).into()), + Err(TransactionValidityError::Unknown(e)) => + ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()), + }, + Err(e) => ValidatedTransaction::Invalid(e), + }))) } } #[cfg(test)] mod tests { + use std::{ + collections::HashMap, + time::Instant, + }; + use parking_lot::Mutex; use futures::executor::block_on; use super::*; use sr_primitives::transaction_validity::{ValidTransaction, InvalidTransaction}; use codec::Encode; use test_runtime::{Block, Extrinsic, Transfer, H256, AccountId}; use assert_matches::assert_matches; + use crate::base_pool::Limit; use crate::watcher; const INVALID_NONCE: u64 = 254; - #[derive(Debug, Default)] + #[derive(Clone, Debug, Default)] struct TestApi { - delay: Mutex>>, + delay: Arc>>>, } impl ChainApi for TestApi { @@ -697,7 +529,7 @@ mod tests { }); // when - pool.data.0.rotator.ban(&time::Instant::now(), vec![pool.hash_of(&uxt)]); + pool.validated_pool.0.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]); let res = block_on(pool.submit_one(&BlockId::Number(0), uxt)); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -770,16 +602,16 @@ mod tests { }))).unwrap(); // when - pool.clear_stale(&BlockId::Number(5)).unwrap(); + pool.validated_pool.0.clear_stale(&BlockId::Number(5)).unwrap(); // then assert_eq!(pool.ready().count(), 0); assert_eq!(pool.status().future, 0); assert_eq!(pool.status().ready, 0); // make sure they are temporarily banned as well - assert!(pool.data.0.rotator.is_banned(&hash1)); - assert!(pool.data.0.rotator.is_banned(&hash2)); - assert!(pool.data.0.rotator.is_banned(&hash3)); + assert!(pool.validated_pool.0.rotator().is_banned(&hash1)); + assert!(pool.validated_pool.0.rotator().is_banned(&hash2)); + assert!(pool.validated_pool.0.rotator().is_banned(&hash3)); } #[test] @@ -794,10 +626,10 @@ mod tests { }))).unwrap(); // when - block_on(pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()])).unwrap(); + block_on(pool.validated_pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()])).unwrap(); // then - assert!(pool.data.0.rotator.is_banned(&hash1)); + assert!(pool.validated_pool.0.rotator().is_banned(&hash1)); } #[test] @@ -830,8 +662,8 @@ mod tests { // then assert_eq!(pool.status().future, 1); - assert!(pool.data.0.rotator.is_banned(&hash1)); - assert!(!pool.data.0.rotator.is_banned(&hash2)); + assert!(pool.validated_pool.0.rotator().is_banned(&hash1)); + assert!(!pool.validated_pool.0.rotator().is_banned(&hash2)); } #[test] @@ -895,7 +727,7 @@ mod tests { assert_eq!(pool.status().future, 0); // when - block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap(); + block_on(pool.validated_pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -920,7 +752,7 @@ mod tests { assert_eq!(pool.status().future, 0); // when - block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap(); + block_on(pool.validated_pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -973,7 +805,7 @@ mod tests { assert_eq!(pool.status().ready, 1); // when - pool.remove_invalid(&[*watcher.hash()]); + pool.validated_pool.0.remove_invalid(&[*watcher.hash()]); // then @@ -1053,7 +885,7 @@ mod tests { let (ready, is_ready) = std::sync::mpsc::sync_channel(0); let (tx, rx) = std::sync::mpsc::sync_channel(1); let mut api = TestApi::default(); - api.delay = Mutex::new(rx.into()); + api.delay = Arc::new(Mutex::new(rx.into())); let pool = Arc::new(Pool::new(Default::default(), api)); // when @@ -1085,7 +917,7 @@ mod tests { assert_eq!(pool.status().ready, 1); // Now block import happens before the second transaction is able to finish verification. - block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap(); + block_on(pool.validated_pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap(); assert_eq!(pool.status().ready, 0); diff --git a/core/transaction-pool/graph/src/validated_pool.rs b/core/transaction-pool/graph/src/validated_pool.rs new file mode 100644 index 0000000000000..9bf1012628645 --- /dev/null +++ b/core/transaction-pool/graph/src/validated_pool.rs @@ -0,0 +1,371 @@ +// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + collections::{HashSet, HashMap}, + hash, + time, +}; + +use crate::base_pool as base; +use crate::error; +use crate::listener::Listener; +use crate::rotator::PoolRotator; +use crate::watcher::Watcher; +use serde::Serialize; +use log::debug; + +use futures::channel::mpsc; +use parking_lot::{Mutex, RwLock}; +use sr_primitives::{ + generic::BlockId, + traits::{self, SaturatedConversion}, + transaction_validity::TransactionTag as Tag, +}; + +use crate::base_pool::PruneStatus; +use crate::pool::{EventStream, Options, ChainApi, BlockHash, ExHash, ExtrinsicFor, TransactionFor}; + +/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum. +#[derive(Debug)] +pub enum ValidatedTransaction { + /// Transaction that has been validated successfully. + Valid(base::Transaction), + /// Transaction that is invalid. + Invalid(Error), + /// Transaction which validity can't be determined. + /// + /// We're notifying watchers about failure, if 'unknown' transaction is submitted. + Unknown(Hash, Error), +} + +/// A type of validated transaction stored in the pool. +pub type ValidatedTransactionFor = ValidatedTransaction< + ExHash, + ExtrinsicFor, + ::Error, +>; + +/// Pool that deals with validated transactions. +pub(crate) struct ValidatedPool { + api: B, + options: Options, + listener: RwLock, BlockHash>>, + pool: RwLock, + ExtrinsicFor, + >>, + import_notification_sinks: Mutex>>, + rotator: PoolRotator>, +} + +impl ValidatedPool { + /// Create a new transaction pool. + pub fn new(options: Options, api: B) -> Self { + ValidatedPool { + api, + options, + listener: Default::default(), + pool: Default::default(), + import_notification_sinks: Default::default(), + rotator: Default::default(), + } + } + + /// Bans given set of hashes. + pub fn ban(&self, now: &std::time::Instant, hashes: impl IntoIterator>) { + self.rotator.ban(now, hashes) + } + + /// Returns true if transaction with given hash is currently banned from the pool. + pub fn is_banned(&self, hash: &ExHash) -> bool { + self.rotator.is_banned(hash) + } + + /// Imports a bunch of pre-validated transactions to the pool. + pub fn submit(&self, txs: T) -> Vec, B::Error>> where + T: IntoIterator> + { + let results = txs.into_iter() + .map(|validated_tx| self.submit_one(validated_tx)) + .collect::>(); + + let removed = self.enforce_limits(); + + results.into_iter().map(|res| match res { + Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()), + other => other, + }).collect() + } + + /// Submit single pre-validated transaction to the pool. + fn submit_one(&self, tx: ValidatedTransactionFor) -> Result, B::Error> { + match tx { + ValidatedTransaction::Valid(tx) => { + let imported = self.pool.write().import(tx)?; + + if let base::Imported::Ready { .. } = imported { + self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); + } + + let mut listener = self.listener.write(); + fire_events(&mut *listener, &imported); + Ok(imported.hash().clone()) + } + ValidatedTransaction::Invalid(err) => Err(err.into()), + ValidatedTransaction::Unknown(hash, err) => { + self.listener.write().invalid(&hash); + Err(err.into()) + } + } + } + + fn enforce_limits(&self) -> HashSet> { + let status = self.pool.read().status(); + let ready_limit = &self.options.ready; + let future_limit = &self.options.future; + + debug!(target: "txpool", "Pool Status: {:?}", status); + + if ready_limit.is_exceeded(status.ready, status.ready_bytes) + || future_limit.is_exceeded(status.future, status.future_bytes) { + // clean up the pool + let removed = { + let mut pool = self.pool.write(); + let removed = pool.enforce_limits(ready_limit, future_limit) + .into_iter().map(|x| x.hash.clone()).collect::>(); + // ban all removed transactions + self.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone())); + removed + }; + // run notifications + let mut listener = self.listener.write(); + for h in &removed { + listener.dropped(h, None); + } + + removed + } else { + Default::default() + } + } + + /// Import a single extrinsic and starts to watch their progress in the pool. + pub fn submit_and_watch( + &self, + tx: ValidatedTransactionFor, + ) -> Result, BlockHash>, B::Error> { + match tx { + ValidatedTransaction::Valid(tx) => { + let hash = self.api.hash_and_length(&tx.data).0; + let watcher = self.listener.write().create_watcher(hash); + self.submit(std::iter::once(ValidatedTransaction::Valid(tx))) + .pop() + .expect("One extrinsic passed; one result returned; qed") + .map(|_| watcher) + }, + ValidatedTransaction::Invalid(err) => Err(err.into()), + ValidatedTransaction::Unknown(_, err) => Err(err.into()), + } + } + + /// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown). + pub fn extrinsics_tags(&self, extrinsics: &[ExtrinsicFor]) -> (Vec>, Vec>>) { + let hashes = extrinsics.iter().map(|extrinsic| self.api.hash_and_length(extrinsic).0).collect::>(); + let in_pool = self.pool.read().by_hash(&hashes); + ( + hashes, + in_pool.into_iter() + .map(|existing_in_pool| existing_in_pool + .map(|transaction| transaction.provides.iter().cloned() + .collect())) + .collect(), + ) + } + + /// Prunes ready transactions that provide given list of tags. + pub fn prune_tags( + &self, + tags: impl IntoIterator, + ) -> Result, ExtrinsicFor>, B::Error> { + // Perform tag-based pruning in the base pool + let status = self.pool.write().prune_tags(tags); + // Notify event listeners of all transactions + // that were promoted to `Ready` or were dropped. + { + let mut listener = self.listener.write(); + for promoted in &status.promoted { + fire_events(&mut *listener, promoted); + } + for f in &status.failed { + listener.dropped(f, None); + } + } + + Ok(status) + } + + /// Resubmit transactions that have been revalidated after prune_tags call. + pub fn resubmit_pruned( + &self, + at: &BlockId, + known_imported_hashes: impl IntoIterator> + Clone, + pruned_hashes: Vec>, + pruned_xts: Vec>, + ) -> Result<(), B::Error> { + debug_assert_eq!(pruned_hashes.len(), pruned_xts.len()); + + // Resubmit pruned transactions + let results = self.submit(pruned_xts); + + // Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned). + let hashes = results + .into_iter() + .enumerate() + .filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { + Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx].clone()), + _ => None, + }); + // Fire `pruned` notifications for collected hashes and make sure to include + // `known_imported_hashes` since they were just imported as part of the block. + let hashes = hashes.chain(known_imported_hashes.into_iter()); + { + let header_hash = self.api.block_id_to_hash(at)? + .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; + let mut listener = self.listener.write(); + for h in hashes { + listener.pruned(header_hash, &h); + } + } + // perform regular cleanup of old transactions in the pool + // and update temporary bans. + self.clear_stale(at)?; + Ok(()) + } + + /// Removes stale transactions from the pool. + /// + /// Stale transactions are transaction beyond their longevity period. + /// Note this function does not remove transactions that are already included in the chain. + /// See `prune_tags` if you want this. + pub fn clear_stale(&self, at: &BlockId) -> Result<(), B::Error> { + let block_number = self.api.block_id_to_number(at)? + .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())? + .saturated_into::(); + let now = time::Instant::now(); + let to_remove = { + self.ready() + .filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx)) + .map(|tx| tx.hash.clone()) + .collect::>() + }; + let futures_to_remove: Vec> = { + let p = self.pool.read(); + let mut hashes = Vec::new(); + for tx in p.futures() { + if self.rotator.ban_if_stale(&now, block_number, &tx) { + hashes.push(tx.hash.clone()); + } + } + hashes + }; + // removing old transactions + self.remove_invalid(&to_remove); + self.remove_invalid(&futures_to_remove); + // clear banned transactions timeouts + self.rotator.clear_timeouts(&now); + + Ok(()) + } + + /// Get rotator reference. + #[cfg(test)] + pub fn rotator(&self) -> &PoolRotator> { + &self.rotator + } + + /// Get api reference. + pub fn api(&self) -> &B { + &self.api + } + + /// Return an event stream of transactions imported to the pool. + pub fn import_notification_stream(&self) -> EventStream { + let (sink, stream) = mpsc::unbounded(); + self.import_notification_sinks.lock().push(sink); + stream + } + + /// Invoked when extrinsics are broadcasted. + pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { + let mut listener = self.listener.write(); + for (hash, peers) in propagated.into_iter() { + listener.broadcasted(&hash, peers); + } + } + + /// Remove from the pool. + pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { + // temporarily ban invalid transactions + debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes); + self.rotator.ban(&time::Instant::now(), hashes.iter().cloned()); + + let invalid = self.pool.write().remove_invalid(hashes); + + let mut listener = self.listener.write(); + for tx in &invalid { + listener.invalid(&tx.hash); + } + + invalid + } + + /// Get an iterator for ready transactions ordered by priority + pub fn ready(&self) -> impl Iterator> { + self.pool.read().ready() + } + + /// Returns pool status. + pub fn status(&self) -> base::Status { + self.pool.read().status() + } +} + +fn fire_events( + listener: &mut Listener, + imported: &base::Imported, +) where + H: hash::Hash + Eq + traits::Member + Serialize, + H2: Clone, +{ + match *imported { + base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { + listener.ready(hash, None); + for f in failed { + listener.invalid(f); + } + for r in removed { + listener.dropped(&r.hash, Some(hash)); + } + for p in promoted { + listener.ready(p, None); + } + }, + base::Imported::Future { ref hash } => { + listener.future(hash) + }, + } +} From 0d4f8421f8cc59d617bf854091bc417ad4617f7b Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 24 Sep 2019 13:25:54 +0300 Subject: [PATCH 05/12] removed lost block_on when importing xt from network --- core/network/src/protocol.rs | 15 +++++---- core/network/src/service.rs | 18 ++++++++++- core/network/src/test/mod.rs | 8 +++-- core/service/src/lib.rs | 62 ++++++++++++++++++++---------------- 4 files changed, 65 insertions(+), 38 deletions(-) diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 201c826ee2cde..9202e830f1a4c 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -969,12 +969,15 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who); if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { - if let Some(hash) = self.transaction_pool.import(&t) { - self.peerset_handle.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE); - peer.known_extrinsics.insert(hash); - } else { - trace!(target: "sync", "Extrinsic rejected"); - } + let hash = self.transaction_pool.hash_of(&t); + peer.known_extrinsics.insert(hash); + + self.transaction_pool.import( + self.peerset_handle.clone().into(), + who.clone(), + NEW_EXTRINSIC_REPUTATION_CHANGE, + t, + ); } } } diff --git a/core/network/src/service.rs b/core/network/src/service.rs index a1cba0395e360..4e4ee24bce14b 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -65,8 +65,18 @@ impl ExHashT for T where pub trait TransactionPool: Send + Sync { /// Get transactions from the pool that are ready to be propagated. fn transactions(&self) -> Vec<(H, B::Extrinsic)>; + /// Get hash of transaction. + fn hash_of(&self, transaction: &B::Extrinsic) -> H; /// Import a transaction into the pool. - fn import(&self, transaction: &B::Extrinsic) -> Option; + /// + /// Peer reputation is changed by reputation_change if transaction is accepted by the pool. + fn import( + &self, + report_handle: ReportHandle, + who: PeerId, + reputation_change: i32, + transaction: B::Extrinsic, + ); /// Notify the pool about transactions broadcast. fn on_broadcasted(&self, propagations: HashMap>); } @@ -77,6 +87,12 @@ pub struct ReportHandle { inner: PeersetHandle, // wraps it so we don't have to worry about breaking API. } +impl From for ReportHandle { + fn from(peerset_handle: PeersetHandle) -> Self { + ReportHandle { inner: peerset_handle } + } +} + impl ReportHandle { /// Report a given peer as either beneficial (+) or costly (-) according to the /// given scalar. diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index 5fd67acd1b625..29e158ec2c08a 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -44,7 +44,7 @@ use consensus::{Error as ConsensusError, well_known_cache_keys::{self, Id as Cac use consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, JustificationImport}; use futures::prelude::*; use futures03::{StreamExt as _, TryStreamExt as _}; -use crate::{NetworkWorker, NetworkService, config::ProtocolId}; +use crate::{NetworkWorker, NetworkService, ReportHandle, config::ProtocolId}; use crate::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder}; use libp2p::PeerId; use parking_lot::Mutex; @@ -387,10 +387,12 @@ impl TransactionPool for EmptyTransactionPool { Vec::new() } - fn import(&self, _transaction: &Extrinsic) -> Option { - None + fn hash_of(&self, _transaction: &Extrinsic) -> Hash { + Hash::default() } + fn import(&self, _report_handle: ReportHandle, _who: PeerId, _rep_change: i32, _transaction: Extrinsic) {} + fn on_broadcasted(&self, _: HashMap>) {} } diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 72d7b6671bbb5..d9e13c2dbbe0b 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -38,9 +38,14 @@ use parking_lot::Mutex; use client::{runtime_api::BlockT, Client}; use exit_future::Signal; use futures::prelude::*; -use futures03::executor::block_on; -use futures03::stream::{StreamExt as _, TryStreamExt as _}; -use network::{NetworkService, NetworkState, specialization::NetworkSpecialization, Event, DhtEvent}; +use futures03::{ + future::{ready, FutureExt as _, TryFutureExt as _}, + stream::{StreamExt as _, TryStreamExt as _}, +}; +use network::{ + NetworkService, NetworkState, specialization::NetworkSpecialization, + Event, DhtEvent, PeerId, ReportHandle, +}; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; use primitives::{Blake2Hasher, H256}; @@ -176,6 +181,7 @@ macro_rules! new_impl { imports_external_transactions: !$config.roles.is_light(), pool: transaction_pool.clone(), client: client.clone(), + executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }), }); let protocol_id = { @@ -876,6 +882,7 @@ pub struct TransactionPoolAdapter { imports_external_transactions: bool, pool: Arc

, client: Arc, + executor: TaskExecutor, } /// Get transactions for propagation. @@ -903,7 +910,7 @@ impl network::TransactionPool for TransactionPoolAdapter> where C: network::ClientHandle + Send + Sync, - PoolApi: ChainApi, + PoolApi: 'static + ChainApi, B: BlockT, H: std::hash::Hash + Eq + sr_primitives::traits::Member + serde::Serialize, E: txpool::error::IntoPoolError + From, @@ -912,10 +919,14 @@ where transactions_to_propagate(&self.pool) } - fn import(&self, transaction: &::Extrinsic) -> Option { + fn hash_of(&self, transaction: &B::Extrinsic) -> H { + self.pool.hash_of(transaction) + } + + fn import(&self, report_handle: ReportHandle, who: PeerId, reputation_change: i32, transaction: B::Extrinsic) { if !self.imports_external_transactions { debug!("Transaction rejected"); - return None; + return; } let encoded = transaction.encode(); @@ -923,30 +934,25 @@ where Ok(uxt) => { let best_block_id = BlockId::hash(self.client.info().chain.best_hash); let import_future = self.pool.submit_one(&best_block_id, uxt); - let import_result = block_on(import_future); - - match import_result { - Ok(hash) => Some(hash), - Err(e) => match e.into_pool_error() { - Ok(txpool::error::Error::AlreadyImported(hash)) => { - hash.downcast::().ok() - .map(|x| x.as_ref().clone()) - }, - Ok(e) => { - debug!("Error adding transaction to the pool: {:?}", e); - None - }, - Err(e) => { - debug!("Error converting pool error: {:?}", e); - None - }, - } + let import_future = import_future + .then(move |import_result| { + match import_result { + Ok(_) => report_handle.report_peer(who, reputation_change), + Err(e) => match e.into_pool_error() { + Ok(txpool::error::Error::AlreadyImported(_)) => (), + Ok(e) => debug!("Error adding transaction to the pool: {:?}", e), + Err(e) => debug!("Error converting pool error: {:?}", e), + } + } + ready(Ok(())) + }) + .compat(); + + if let Err(e) = self.executor.execute(Box::new(import_future)) { + warn!("Error scheduling extrinsic import: {:?}", e); } } - Err(e) => { - debug!("Error decoding transaction {}", e); - None - } + Err(e) => debug!("Error decoding transaction {}", e), } } From 8449a172dd98d43e116957f1346595a9c0079bc1 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 24 Sep 2019 13:49:57 +0300 Subject: [PATCH 06/12] fix grumbles --- core/rpc/src/author/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/rpc/src/author/mod.rs b/core/rpc/src/author/mod.rs index faf0d8e5c527f..04918f04c2d25 100644 --- a/core/rpc/src/author/mod.rs +++ b/core/rpc/src/author/mod.rs @@ -171,7 +171,7 @@ impl AuthorApi, BlockHash

> for Author whe .boxed() .compat() .map_err(|e| e.into_pool_error() - .map(|e| error::Error::from(e)) + .map(error::Error::from) .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) )) }; @@ -195,7 +195,6 @@ impl AuthorApi, BlockHash

> for Author whe .map(|_| ()) })); - if self.executor.execute(Box::new(subscription_future)).is_err() { error!("Failed to spawn watch extrinsic task"); } From c88b42bd4ddee95f13ec4697425c14d709fbd1df Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 30 Sep 2019 09:22:39 +0300 Subject: [PATCH 07/12] alias for future::Executor in rpc --- core/rpc/api/src/lib.rs | 2 +- core/rpc/api/src/subscriptions.rs | 7 +++++-- core/rpc/src/author/mod.rs | 9 ++++----- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/core/rpc/api/src/lib.rs b/core/rpc/api/src/lib.rs index 78fa58f14af10..12db07633dcc4 100644 --- a/core/rpc/api/src/lib.rs +++ b/core/rpc/api/src/lib.rs @@ -25,7 +25,7 @@ mod helpers; mod subscriptions; pub use jsonrpc_core::IoHandlerExtension as RpcExtension; -pub use subscriptions::Subscriptions; +pub use subscriptions::{Subscriptions, TaskExecutor}; pub use helpers::Receiver; pub mod author; diff --git a/core/rpc/api/src/subscriptions.rs b/core/rpc/api/src/subscriptions.rs index f284e0ef5299d..bff184cadeac0 100644 --- a/core/rpc/api/src/subscriptions.rs +++ b/core/rpc/api/src/subscriptions.rs @@ -25,6 +25,9 @@ use jsonrpc_core::futures::{Future, future}; type Id = u64; +/// Alias for a an implementation of `futures::future::Executor`. +pub type TaskExecutor = Arc + Send>> + Send + Sync>; + /// Generate unique ids for subscriptions. #[derive(Clone, Debug)] pub struct IdProvider { @@ -53,12 +56,12 @@ impl IdProvider { pub struct Subscriptions { next_id: IdProvider, active_subscriptions: Arc>>>, - executor: Arc + Send>> + Send + Sync>, + executor: TaskExecutor, } impl Subscriptions { /// Creates new `Subscriptions` object. - pub fn new(executor: Arc + Send>> + Send + Sync>) -> Self { + pub fn new(executor: TaskExecutor) -> Self { Subscriptions { next_id: Default::default(), active_subscriptions: Default::default(), diff --git a/core/rpc/src/author/mod.rs b/core/rpc/src/author/mod.rs index 04918f04c2d25..6ea5aec1ece34 100644 --- a/core/rpc/src/author/mod.rs +++ b/core/rpc/src/author/mod.rs @@ -26,10 +26,10 @@ use log::{error, warn}; use client::{self, Client}; use rpc::futures::{ Sink, Future, - future::{Executor, result}, + future::result, }; use futures03::{StreamExt as _, compat::Compat}; -use api::Subscriptions; +use api::{Subscriptions, TaskExecutor}; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use codec::{Encode, Decode}; use primitives::{Bytes, Blake2Hasher, H256, traits::BareCryptoStorePtr}; @@ -53,7 +53,7 @@ use self::error::{Error, FutureResult, Result}; /// Authoring API pub struct Author where P: PoolChainApi + Sync + Send + 'static { /// Futures executor. - executor: Arc + Send>> + Send + Sync>, + executor: TaskExecutor, /// Substrate client client: Arc::Block, RA>>, /// Transactions pool @@ -67,7 +67,7 @@ pub struct Author where P: PoolChainApi + Sync + Send + 'static { impl Author where P: PoolChainApi + Sync + Send + 'static { /// Create new instance of Authoring API. pub fn new( - executor: Arc + Send>> + Send + Sync>, + executor: TaskExecutor, client: Arc::Block, RA>>, pool: Arc>, subscriptions: Subscriptions, @@ -124,7 +124,6 @@ impl AuthorApi, BlockHash

> for Author whe let best_block_hash = self.client.info().chain.best_hash; Box::new(self.pool .submit_one(&generic::BlockId::hash(best_block_hash), xt) - .boxed() .compat() .map_err(|e| e.into_pool_error() .map(Into::into) From 6044471a4c2a944f8451f53f09bf086b187848cc Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 30 Sep 2019 10:01:56 +0300 Subject: [PATCH 08/12] removed executor from Author RPCs --- core/rpc/src/author/mod.rs | 37 ++++++++++++++++++------------------ core/rpc/src/author/tests.rs | 8 -------- core/service/src/builder.rs | 3 +-- 3 files changed, 20 insertions(+), 28 deletions(-) diff --git a/core/rpc/src/author/mod.rs b/core/rpc/src/author/mod.rs index 6ea5aec1ece34..9a978f22f717a 100644 --- a/core/rpc/src/author/mod.rs +++ b/core/rpc/src/author/mod.rs @@ -21,15 +21,16 @@ mod tests; use std::{sync::Arc, convert::TryInto}; use futures03::future::{FutureExt, TryFutureExt}; -use log::{error, warn}; +use log::warn; use client::{self, Client}; use rpc::futures::{ Sink, Future, + stream::Stream as _, future::result, }; use futures03::{StreamExt as _, compat::Compat}; -use api::{Subscriptions, TaskExecutor}; +use api::Subscriptions; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use codec::{Encode, Decode}; use primitives::{Bytes, Blake2Hasher, H256, traits::BareCryptoStorePtr}; @@ -52,8 +53,6 @@ use self::error::{Error, FutureResult, Result}; /// Authoring API pub struct Author where P: PoolChainApi + Sync + Send + 'static { - /// Futures executor. - executor: TaskExecutor, /// Substrate client client: Arc::Block, RA>>, /// Transactions pool @@ -67,14 +66,12 @@ pub struct Author where P: PoolChainApi + Sync + Send + 'static { impl Author where P: PoolChainApi + Sync + Send + 'static { /// Create new instance of Authoring API. pub fn new( - executor: TaskExecutor, client: Arc::Block, RA>>, pool: Arc>, subscriptions: Subscriptions, keystore: BareCryptoStorePtr, ) -> Self { Author { - executor, client, pool, subscriptions, @@ -184,19 +181,23 @@ impl AuthorApi, BlockHash

> for Author whe }, }; - let subscriptions = self.subscriptions.clone(); - let subscription_future = future_watcher + // make 'future' watcher be a future with output = stream of watcher events + let future_watcher = future_watcher .map_err(|err| { warn!("Failed to submit extrinsic: {}", err); }) - .map(move |watcher| subscriptions.add(subscriber, move |sink| { - sink - .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))) - .map(|_| ()) - })); - - if self.executor.execute(Box::new(subscription_future)).is_err() { - error!("Failed to spawn watch extrinsic task"); - } + .map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))); + + // convert a 'future' watcher into the stream with single element = stream of watcher events + let watcher_stream = future_watcher.into_stream(); + + // and now flatten the 'watcher_stream' so that we'll have the stream with watcher events + let watcher_stream = watcher_stream.flatten(); + + self.subscriptions.add(subscriber, move |sink| { + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(watcher_stream) + .map(|_| ()) + }); } fn unwatch_extrinsic(&self, _metadata: Option, id: SubscriptionId) -> Result { diff --git a/core/rpc/src/author/tests.rs b/core/rpc/src/author/tests.rs index be2bac0f4fb61..57d3929d928f7 100644 --- a/core/rpc/src/author/tests.rs +++ b/core/rpc/src/author/tests.rs @@ -23,7 +23,6 @@ use transaction_pool::{ txpool::Pool, FullChainApi, }; -use futures::Stream; use primitives::{ H256, blake2_256, hexdisplay::HexDisplay, traits::BareCryptoStore, testing::{ED25519, SR25519, KeyStore}, ed25519, crypto::Pair @@ -50,7 +49,6 @@ fn submit_transaction_should_not_cause_error() { let client = Arc::new(test_client::new()); let keystore = KeyStore::new(); let p = Author { - executor: Arc::new(runtime.executor()), client: client.clone(), pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), @@ -74,7 +72,6 @@ fn submit_rich_transaction_should_not_cause_error() { let client = Arc::new(test_client::new()); let keystore = KeyStore::new(); let p = Author { - executor: Arc::new(runtime.executor()), client: client.clone(), pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), @@ -100,7 +97,6 @@ fn should_watch_extrinsic() { let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { - executor: Arc::new(runtime.executor()), client, pool: pool.clone(), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), @@ -143,7 +139,6 @@ fn should_return_pending_extrinsics() { let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { - executor: Arc::new(runtime.executor()), client, pool: pool.clone(), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), @@ -164,7 +159,6 @@ fn should_remove_extrinsics() { let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { - executor: Arc::new(runtime.executor()), client, pool: pool.clone(), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), @@ -194,7 +188,6 @@ fn should_insert_key() { let client = Arc::new(test_client::new()); let keystore = KeyStore::new(); let p = Author { - executor: Arc::new(runtime.executor()), client: client.clone(), pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), @@ -221,7 +214,6 @@ fn should_rotate_keys() { let keystore = KeyStore::new(); let client = Arc::new(test_client::TestClientBuilder::new().set_keystore(keystore.clone()).build()); let p = Author { - executor: Arc::new(runtime.executor()), client: client.clone(), pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), diff --git a/core/service/src/builder.rs b/core/service/src/builder.rs index e599aa6a8d4e0..a50e939174312 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -899,11 +899,10 @@ where RpcB: RpcBuilder, { use rpc::{chain, state, author, system}; - let subscriptions = rpc::Subscriptions::new(task_executor.clone()); + let subscriptions = rpc::Subscriptions::new(task_executor); let chain = rpc_builder.build_chain(subscriptions.clone()); let state = rpc_builder.build_state(subscriptions.clone()); let author = rpc::author::Author::new( - task_executor.clone(), client, transaction_pool, subscriptions, From 00e602bdc5df29998d1e98704df275f53c2a5078 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 30 Sep 2019 10:37:37 +0300 Subject: [PATCH 09/12] Pool + SharedValidatedPool -> Pool --- core/transaction-pool/graph/src/pool.rs | 206 +++++++++++------------- 1 file changed, 91 insertions(+), 115 deletions(-) diff --git a/core/transaction-pool/graph/src/pool.rs b/core/transaction-pool/graph/src/pool.rs index 479b6e7495593..53b2a62cbee89 100644 --- a/core/transaction-pool/graph/src/pool.rs +++ b/core/transaction-pool/graph/src/pool.rs @@ -111,17 +111,14 @@ impl Default for Options { /// Extrinsics pool that performs validation. pub struct Pool { - validated_pool: SharedValidatedPool, + validated_pool: Arc>, } -/// Shared validated extrinsics pool. -struct SharedValidatedPool(Arc>); - impl Pool { /// Create a new transaction pool. pub fn new(options: Options, api: B) -> Self { Pool { - validated_pool: SharedValidatedPool(Arc::new(ValidatedPool::new(options, api))), + validated_pool: Arc::new(ValidatedPool::new(options, api)), } } @@ -132,10 +129,9 @@ impl Pool { T: IntoIterator> { let validated_pool = self.validated_pool.clone(); - self.validated_pool - .verify(at, xts, force) + self.verify(at, xts, force) .map(move |validated_transactions| validated_transactions - .map(|validated_transactions| validated_pool.0.submit(validated_transactions))) + .map(|validated_transactions| validated_pool.submit(validated_transactions))) } /// Imports one unverified extrinsic to the pool @@ -157,15 +153,15 @@ impl Pool { at: &BlockId, xt: ExtrinsicFor, ) -> impl Future, BlockHash>, B::Error>> { - let block_number = match self.validated_pool.resolve_block_number(at) { + let block_number = match self.resolve_block_number(at) { Ok(block_number) => block_number, Err(err) => return Either::Left(ready(Err(err))) }; let validated_pool = self.validated_pool.clone(); Either::Right( - self.validated_pool.verify_one(at, block_number, xt, false) - .map(move |validated_transactions| validated_pool.0.submit_and_watch(validated_transactions)) + self.verify_one(at, block_number, xt, false) + .map(move |validated_transactions| validated_pool.submit_and_watch(validated_transactions)) ) } @@ -180,87 +176,9 @@ impl Pool { at: &BlockId, parent: &BlockId, extrinsics: &[ExtrinsicFor], - ) -> impl Future> { - self.validated_pool.prune(at, parent, extrinsics) - } - - /// Prunes ready transactions that provide given list of tags. - /// - /// Given tags are assumed to be always provided now, so all transactions - /// in the Future Queue that require that particular tag (and have other - /// requirements satisfied) are promoted to Ready Queue. - /// - /// Moreover for each provided tag we remove transactions in the pool that: - /// 1. Provide that tag directly - /// 2. Are a dependency of pruned transaction. - /// - /// Returns transactions that have been removed from the pool and must be reverified - /// before reinserting to the pool. - /// - /// By removing predecessor transactions as well we might actually end up - /// pruning too much, so all removed transactions are reverified against - /// the runtime (`validate_transaction`) to make sure they are invalid. - /// - /// However we avoid revalidating transactions that are contained within - /// the second parameter of `known_imported_hashes`. These transactions - /// (if pruned) are not revalidated and become temporarily banned to - /// prevent importing them in the (near) future. - pub fn prune_tags( - &self, - at: &BlockId, - tags: impl IntoIterator, - known_imported_hashes: impl IntoIterator> + Clone, - ) -> impl Future> { - self.validated_pool.prune_tags(at, tags, known_imported_hashes) - } - - /// Return an event stream of transactions imported to the pool. - pub fn import_notification_stream(&self) -> EventStream { - self.validated_pool.0.import_notification_stream() - } - - /// Invoked when extrinsics are broadcasted. - pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { - self.validated_pool.0.on_broadcasted(propagated) - } - - /// Remove from the pool. - pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { - self.validated_pool.0.remove_invalid(hashes) - } - - /// Get an iterator for ready transactions ordered by priority - pub fn ready(&self) -> impl Iterator> { - self.validated_pool.0.ready() - } - - /// Returns pool status. - pub fn status(&self) -> base::Status { - self.validated_pool.0.status() - } - - /// Returns transaction hash - pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { - self.validated_pool.0.api().hash_and_length(xt).0 - } -} - -impl Clone for SharedValidatedPool { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl SharedValidatedPool { - /// Prunes ready transactions. - fn prune( - &self, - at: &BlockId, - parent: &BlockId, - extrinsics: &[ExtrinsicFor], ) -> impl Future> { // Get details of all extrinsics that are already in the pool - let (in_pool_hashes, in_pool_tags) = self.0.extrinsics_tags(extrinsics); + let (in_pool_hashes, in_pool_tags) = self.validated_pool.extrinsics_tags(extrinsics); // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option>)`) let all = extrinsics.iter().zip(in_pool_tags.into_iter()); @@ -275,7 +193,7 @@ impl SharedValidatedPool { ), // if it's not found in the pool query the runtime at parent block // to get validity info and tags that the extrinsic provides. - None => Either::Right(self.0.api().validate_transaction(parent, extrinsic.clone()) + None => Either::Right(self.validated_pool.api().validate_transaction(parent, extrinsic.clone()) .then(|validity| ready(match validity { Ok(Ok(validity)) => validity.provides, // silently ignore invalid extrinsics, @@ -287,8 +205,8 @@ impl SharedValidatedPool { // Prune transactions by tags let at = at.clone(); - let validated_pool = self.clone(); - future_tags.then(move |tags| validated_pool.prune_tags( + let self_clone = self.clone(); + future_tags.then(move |tags| self_clone.prune_tags( &at, tags.into_iter().flat_map(|tags| tags), in_pool_hashes, @@ -296,14 +214,34 @@ impl SharedValidatedPool { } /// Prunes ready transactions that provide given list of tags. - fn prune_tags( + /// + /// Given tags are assumed to be always provided now, so all transactions + /// in the Future Queue that require that particular tag (and have other + /// requirements satisfied) are promoted to Ready Queue. + /// + /// Moreover for each provided tag we remove transactions in the pool that: + /// 1. Provide that tag directly + /// 2. Are a dependency of pruned transaction. + /// + /// Returns transactions that have been removed from the pool and must be reverified + /// before reinserting to the pool. + /// + /// By removing predecessor transactions as well we might actually end up + /// pruning too much, so all removed transactions are reverified against + /// the runtime (`validate_transaction`) to make sure they are invalid. + /// + /// However we avoid revalidating transactions that are contained within + /// the second parameter of `known_imported_hashes`. These transactions + /// (if pruned) are not revalidated and become temporarily banned to + /// prevent importing them in the (near) future. + pub fn prune_tags( &self, at: &BlockId, tags: impl IntoIterator, known_imported_hashes: impl IntoIterator> + Clone, ) -> impl Future> { // Prune all transactions that provide given tags - let prune_status = match self.0.prune_tags(tags) { + let prune_status = match self.validated_pool.prune_tags(tags) { Ok(prune_status) => prune_status, Err(e) => return Either::Left(ready(Err(e))), }; @@ -311,7 +249,7 @@ impl SharedValidatedPool { // Make sure that we don't revalidate extrinsics that were part of the recently // imported block. This is especially important for UTXO-like chains cause the // inputs are pruned so such transaction would go to future again. - self.0.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); + self.validated_pool.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); // Try to re-validate pruned transactions since some of them might be still valid. // note that `known_imported_hashes` will be rejected here due to temporary ban. @@ -321,10 +259,10 @@ impl SharedValidatedPool { // And finally - submit reverified transactions back to the pool let at = at.clone(); - let validated_pool = self.clone(); + let validated_pool = self.validated_pool.clone(); Either::Right(reverify_future.then(move |reverified_transactions| ready(reverified_transactions.and_then(|reverified_transactions| - validated_pool.0.resubmit_pruned( + validated_pool.resubmit_pruned( &at, known_imported_hashes, pruned_hashes, @@ -333,9 +271,39 @@ impl SharedValidatedPool { ))) } + /// Return an event stream of transactions imported to the pool. + pub fn import_notification_stream(&self) -> EventStream { + self.validated_pool.import_notification_stream() + } + + /// Invoked when extrinsics are broadcasted. + pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { + self.validated_pool.on_broadcasted(propagated) + } + + /// Remove from the pool. + pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { + self.validated_pool.remove_invalid(hashes) + } + + /// Get an iterator for ready transactions ordered by priority + pub fn ready(&self) -> impl Iterator> { + self.validated_pool.ready() + } + + /// Returns pool status. + pub fn status(&self) -> base::Status { + self.validated_pool.status() + } + + /// Returns transaction hash + pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { + self.validated_pool.api().hash_and_length(xt).0 + } + /// Resolves block number by id. fn resolve_block_number(&self, at: &BlockId) -> Result, B::Error> { - self.0.api().block_id_to_number(at) + self.validated_pool.api().block_id_to_number(at) .and_then(|number| number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())) } @@ -370,12 +338,12 @@ impl SharedValidatedPool { xt: ExtrinsicFor, force: bool, ) -> impl Future> { - let (hash, bytes) = self.0.api().hash_and_length(&xt); - if !force && self.0.is_banned(&hash) { + let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt); + if !force && self.validated_pool.is_banned(&hash) { return Either::Left(ready(ValidatedTransaction::Invalid(error::Error::TemporarilyBanned.into()))) } - Either::Right(self.0.api().validate_transaction(block_id, xt.clone()) + Either::Right(self.validated_pool.api().validate_transaction(block_id, xt.clone()) .then(move |validation_result| ready(match validation_result { Ok(validity) => match validity { Ok(validity) => if validity.provides.is_empty() { @@ -404,6 +372,14 @@ impl SharedValidatedPool { } } +impl Clone for Pool { + fn clone(&self) -> Self { + Self { + validated_pool: self.validated_pool.clone(), + } + } +} + #[cfg(test)] mod tests { use std::{ @@ -529,7 +505,7 @@ mod tests { }); // when - pool.validated_pool.0.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]); + pool.validated_pool.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]); let res = block_on(pool.submit_one(&BlockId::Number(0), uxt)); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -602,16 +578,16 @@ mod tests { }))).unwrap(); // when - pool.validated_pool.0.clear_stale(&BlockId::Number(5)).unwrap(); + pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap(); // then assert_eq!(pool.ready().count(), 0); assert_eq!(pool.status().future, 0); assert_eq!(pool.status().ready, 0); // make sure they are temporarily banned as well - assert!(pool.validated_pool.0.rotator().is_banned(&hash1)); - assert!(pool.validated_pool.0.rotator().is_banned(&hash2)); - assert!(pool.validated_pool.0.rotator().is_banned(&hash3)); + assert!(pool.validated_pool.rotator().is_banned(&hash1)); + assert!(pool.validated_pool.rotator().is_banned(&hash2)); + assert!(pool.validated_pool.rotator().is_banned(&hash3)); } #[test] @@ -626,10 +602,10 @@ mod tests { }))).unwrap(); // when - block_on(pool.validated_pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()])).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()])).unwrap(); // then - assert!(pool.validated_pool.0.rotator().is_banned(&hash1)); + assert!(pool.validated_pool.rotator().is_banned(&hash1)); } #[test] @@ -662,8 +638,8 @@ mod tests { // then assert_eq!(pool.status().future, 1); - assert!(pool.validated_pool.0.rotator().is_banned(&hash1)); - assert!(!pool.validated_pool.0.rotator().is_banned(&hash2)); + assert!(pool.validated_pool.rotator().is_banned(&hash1)); + assert!(!pool.validated_pool.rotator().is_banned(&hash2)); } #[test] @@ -727,7 +703,7 @@ mod tests { assert_eq!(pool.status().future, 0); // when - block_on(pool.validated_pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -752,7 +728,7 @@ mod tests { assert_eq!(pool.status().future, 0); // when - block_on(pool.validated_pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -805,7 +781,7 @@ mod tests { assert_eq!(pool.status().ready, 1); // when - pool.validated_pool.0.remove_invalid(&[*watcher.hash()]); + pool.validated_pool.remove_invalid(&[*watcher.hash()]); // then @@ -917,7 +893,7 @@ mod tests { assert_eq!(pool.status().ready, 1); // Now block import happens before the second transaction is able to finish verification. - block_on(pool.validated_pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap(); assert_eq!(pool.status().ready, 0); From b795e3c49b5f0d2c80fb3c56efcafd173fdde796 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 1 Oct 2019 10:29:42 +0300 Subject: [PATCH 10/12] fix compilation after merge --- node/cli/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index c4deb444a4cae..17a54131850b6 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -40,7 +40,7 @@ use sr_primitives::traits::Block as BlockT; use node_executor::NativeExecutor; use network::NetworkService; use offchain::OffchainWorkers; -use transaction_pool::ChainApi; +use transaction_pool::FullChainApi; use primitives::Blake2Hasher; construct_simple_protocol! { From c63f9b263cb1578865ab9d4600b3a0494a5bfe60 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 1 Oct 2019 10:50:06 +0300 Subject: [PATCH 11/12] another fix --- node/cli/src/service.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 17a54131850b6..d9b850e3139f8 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -65,7 +65,7 @@ macro_rules! new_full_start { Ok(client::LongestChain::new(backend.clone())) })? .with_transaction_pool(|config, client| - Ok(transaction_pool::txpool::Pool::new(config, transaction_pool::FullChainApi::new(client))) + Ok(transaction_pool::txpool::Pool::new(config, FullChainApi::new(client))) )? .with_import_queue(|_config, client, mut select_chain, _transaction_pool| { let select_chain = select_chain.take() @@ -251,7 +251,7 @@ pub fn new_full(config: NodeConfiguration) LongestChain, NetworkStatus, NetworkService::Hash>, - TransactionPool>, + TransactionPool>, OffchainWorkers< ConcreteClient, >::OffchainStorage, @@ -275,7 +275,7 @@ pub fn new_light(config: NodeConfiguration) Ok(LongestChain::new(backend.clone())) })? .with_transaction_pool(|config, client| - Ok(TransactionPool::new(config, transaction_pool::FullChainApi::new(client))) + Ok(TransactionPool::new(config, FullChainApi::new(client))) )? .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { let fetch_checker = fetcher From eeb47174c6fc1b6fbd4ef1051ad9465b631d5eec Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 1 Oct 2019 11:44:38 +0300 Subject: [PATCH 12/12] another fix --- node/cli/src/service.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index d9b850e3139f8..7012a3d6ce0ad 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -40,7 +40,6 @@ use sr_primitives::traits::Block as BlockT; use node_executor::NativeExecutor; use network::NetworkService; use offchain::OffchainWorkers; -use transaction_pool::FullChainApi; use primitives::Blake2Hasher; construct_simple_protocol! { @@ -65,7 +64,7 @@ macro_rules! new_full_start { Ok(client::LongestChain::new(backend.clone())) })? .with_transaction_pool(|config, client| - Ok(transaction_pool::txpool::Pool::new(config, FullChainApi::new(client))) + Ok(transaction_pool::txpool::Pool::new(config, transaction_pool::FullChainApi::new(client))) )? .with_import_queue(|_config, client, mut select_chain, _transaction_pool| { let select_chain = select_chain.take() @@ -251,7 +250,7 @@ pub fn new_full(config: NodeConfiguration) LongestChain, NetworkStatus, NetworkService::Hash>, - TransactionPool>, + TransactionPool>, OffchainWorkers< ConcreteClient, >::OffchainStorage, @@ -275,7 +274,7 @@ pub fn new_light(config: NodeConfiguration) Ok(LongestChain::new(backend.clone())) })? .with_transaction_pool(|config, client| - Ok(TransactionPool::new(config, FullChainApi::new(client))) + Ok(TransactionPool::new(config, transaction_pool::FullChainApi::new(client))) )? .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { let fetch_checker = fetcher