From ca284b248ea6569f0750ed610a83eed81e71dd44 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 13 Jan 2020 19:35:52 +0300 Subject: [PATCH 01/12] Reduction. --- bin/node-template/src/service.rs | 8 +- bin/node/cli/src/service.rs | 20 ++--- client/service/src/builder.rs | 6 +- client/service/src/lib.rs | 8 +- client/transaction-pool/src/lib.rs | 9 +- primitives/transaction-pool/src/pool.rs | 110 +----------------------- 6 files changed, 22 insertions(+), 139 deletions(-) diff --git a/bin/node-template/src/service.rs b/bin/node-template/src/service.rs index ed2299e30f73e..acfa78f5bfefa 100644 --- a/bin/node-template/src/service.rs +++ b/bin/node-template/src/service.rs @@ -43,9 +43,7 @@ macro_rules! new_full_start { .with_transaction_pool(|config, client, _fetcher| { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); let pool = sc_transaction_pool::BasicPool::new(config, pool_api); - let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client); - let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer); - Ok(maintainable_pool) + Ok(pool) })? .with_import_queue(|_config, client, mut select_chain, transaction_pool| { let select_chain = select_chain.take() @@ -209,9 +207,7 @@ pub fn new_light(config: Configuration; #[allow(dead_code)] -type ConcreteTransactionPool = sp_transaction_pool::MaintainableTransactionPool< - sc_transaction_pool::BasicPool< - sc_transaction_pool::FullChainApi, - ConcreteBlock - >, - sc_transaction_pool::FullBasicPoolMaintainer< - ConcreteClient, - sc_transaction_pool::FullChainApi - > +type ConcreteTransactionPool = sc_transaction_pool::BasicPool< + sc_transaction_pool::FullChainApi, + ConcreteBlock >; /// A specialized configuration object for setting up the node.. @@ -323,9 +315,7 @@ pub fn new_light(config: NodeConfiguration) .ok_or_else(|| "Trying to start light transaction pool without active fetcher")?; let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone()); let pool = sc_transaction_pool::BasicPool::new(config, pool_api); - let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher); - let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer); - Ok(maintainable_pool) + Ok(pool) })? .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { let fetch_checker = fetcher diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 044798701c6e1..5e07b24840c60 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -49,7 +49,7 @@ use std::{ }; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use sc_telemetry::{telemetry, SUBSTRATE_INFO}; -use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer}; +use sp_transaction_pool::TransactionPool; use sp_blockchain; use grafana_data_source::{self, record_metrics}; @@ -740,9 +740,7 @@ ServiceBuilder< TSc: Clone, TImpQu: 'static + ImportQueue, TNetP: NetworkSpecialization, - TExPool: 'static - + TransactionPool::Hash> - + TransactionPoolMaintainer::Hash>, + TExPool: TransactionPool::Hash> + 'static, TRpc: sc_rpc::RpcExtension + Clone, { diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index c1b87e4491904..1b2e7bcd3cc71 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -61,7 +61,7 @@ pub use self::builder::{ }; pub use config::{Configuration, Roles, PruningMode}; pub use sc_chain_spec::{ChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension}; -pub use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer, InPoolTransaction, error::IntoPoolError}; +pub use sp_transaction_pool::{TransactionPool, InPoolTransaction, error::IntoPoolError}; pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions; pub use sc_client::FinalityNotifications; pub use sc_rpc::Metadata as RpcMetadata; @@ -148,8 +148,7 @@ pub trait AbstractService: 'static + Future> + /// Chain selection algorithm. type SelectChain: sp_consensus::SelectChain; /// Transaction pool. - type TransactionPool: TransactionPool - + TransactionPoolMaintainer; + type TransactionPool: TransactionPool; /// Network specialization. type NetworkSpecialization: NetworkSpecialization; @@ -213,8 +212,7 @@ where TExec: 'static + sc_client::CallExecutor + Send + Sync + Clone, TRtApi: 'static + Send + Sync, TSc: sp_consensus::SelectChain + 'static + Clone + Send + Unpin, - TExPool: 'static + TransactionPool - + TransactionPoolMaintainer, + TExPool: 'static + TransactionPool, TOc: 'static + Send + Sync, TNetSpec: NetworkSpecialization, { diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 4d71307c0abd0..7d182009a66ff 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -20,7 +20,7 @@ #![warn(unused_extern_crates)] mod api; -mod maintainer; +//mod maintainer; pub mod error; #[cfg(test)] @@ -28,7 +28,7 @@ mod tests; pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; -pub use crate::maintainer::{FullBasicPoolMaintainer, LightBasicPoolMaintainer}; +//pub use crate::maintainer::{FullBasicPoolMaintainer, LightBasicPoolMaintainer}; use std::{collections::HashMap, sync::Arc}; use futures::{Future, FutureExt}; @@ -39,7 +39,7 @@ use sp_runtime::{ }; use sp_transaction_pool::{ TransactionPool, PoolStatus, ImportNotificationStream, - TxHash, TransactionFor, TransactionStatusStreamFor, + TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash, }; /// Basic implementation of transaction pool that can be customized by providing PoolApi. @@ -129,4 +129,7 @@ impl TransactionPool for BasicPool fn on_broadcasted(&self, propagations: HashMap, Vec>) { self.pool.on_broadcasted(propagations) } + + fn maintain(&self, block: &BlockId, retracted: &[BlockHash]) { + } } diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index e67a9890755d7..97f611e622306 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -223,6 +223,9 @@ pub trait TransactionPool: Send + Sync { /// Returns transaction hash fn hash_of(&self, xt: &TransactionFor) -> TxHash; + + /// Perform maintaince + fn maintain(&self, block: &BlockId, retracted: &[BlockHash]); } /// An abstraction for transaction pool. @@ -264,109 +267,4 @@ impl OffchainSubmitTransaction for TPool { e )) } -} - -/// Transaction pool maintainer interface. -pub trait TransactionPoolMaintainer: Send + Sync { - /// Block type. - type Block: BlockT; - /// Transaction Hash type. - type Hash: Hash + Eq + Member + Serialize; - - /// Returns a future that performs maintenance procedures on the pool when - /// with given hash is imported. - fn maintain( - &self, - id: &BlockId, - retracted: &[Self::Hash], - ) -> Box + Send + Unpin>; -} - -/// Maintainable pool implementation. -pub struct MaintainableTransactionPool { - pool: Pool, - maintainer: Maintainer, -} - -impl MaintainableTransactionPool { - /// Create new maintainable pool using underlying pool and maintainer. - pub fn new(pool: Pool, maintainer: Maintainer) -> Self { - MaintainableTransactionPool { pool, maintainer } - } -} - -impl TransactionPool for MaintainableTransactionPool - where - Pool: TransactionPool, - Maintainer: Send + Sync, -{ - type Block = Pool::Block; - type Hash = Pool::Hash; - type InPoolTransaction = Pool::InPoolTransaction; - type Error = Pool::Error; - - fn submit_at( - &self, - at: &BlockId, - xts: impl IntoIterator> + 'static, - ) -> Box, Self::Error>>, Self::Error>> + Send + Unpin> { - self.pool.submit_at(at, xts) - } - - fn submit_one( - &self, - at: &BlockId, - xt: TransactionFor, - ) -> Box, Self::Error>> + Send + Unpin> { - self.pool.submit_one(at, xt) - } - - fn submit_and_watch( - &self, - at: &BlockId, - xt: TransactionFor, - ) -> Box>, Self::Error>> + Send + Unpin> { - self.pool.submit_and_watch(at, xt) - } - - fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { - self.pool.remove_invalid(hashes) - } - - fn status(&self) -> PoolStatus { - self.pool.status() - } - - fn ready(&self) -> Box>> { - self.pool.ready() - } - - fn import_notification_stream(&self) -> ImportNotificationStream { - self.pool.import_notification_stream() - } - - fn hash_of(&self, xt: &TransactionFor) -> TxHash { - self.pool.hash_of(xt) - } - - fn on_broadcasted(&self, propagations: HashMap, Vec>) { - self.pool.on_broadcasted(propagations) - } -} - -impl TransactionPoolMaintainer for MaintainableTransactionPool - where - Pool: Send + Sync, - Maintainer: TransactionPoolMaintainer -{ - type Block = Maintainer::Block; - type Hash = Maintainer::Hash; - - fn maintain( - &self, - id: &BlockId, - retracted: &[Self::Hash], - ) -> Box + Send + Unpin> { - self.maintainer.maintain(id, retracted) - } -} +} \ No newline at end of file From 38c9207e01c2e3c0bc331a0ce8c5538c34744c80 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 14 Jan 2020 19:09:34 +0300 Subject: [PATCH 02/12] Reformation. --- client/transaction-pool/graph/src/pool.rs | 10 +++- .../graph/src/validated_pool.rs | 4 +- client/transaction-pool/src/api.rs | 18 +++++++ client/transaction-pool/src/lib.rs | 54 +++++++++++++++++-- client/transaction-pool/src/tests.rs | 20 +++++-- primitives/transaction-pool/src/pool.rs | 6 ++- 6 files changed, 100 insertions(+), 12 deletions(-) diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index 629bd0a9a93a8..885c69a6359e6 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -68,6 +68,8 @@ pub trait ChainApi: Send + Sync { type Error: From + error::IntoPoolError; /// Validate transaction future. type ValidationFuture: Future> + Send + Unpin; + /// Body future (since block body might be remote) + type BodyFuture: Future::Extrinsic>, Self::Error>> + Unpin + Send + 'static; /// Verify extrinsic at given block. fn validate_transaction( @@ -84,6 +86,12 @@ pub trait ChainApi: Send + Sync { /// Returns hash and encoding length of the extrinsic. fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize); + + /// Return header + fn block_header(&self, at: &BlockId) -> Result::Header>, Self::Error>; + + /// Return body + fn block_body(&self, at: &BlockId) -> Self::BodyFuture; } /// Pool configuration options. @@ -120,7 +128,7 @@ pub struct Pool { impl Pool { /// Create a new transaction pool. - pub fn new(options: Options, api: B) -> Self { + pub fn new(options: Options, api: Arc) -> Self { Pool { validated_pool: Arc::new(ValidatedPool::new(options, api)), } diff --git a/client/transaction-pool/graph/src/validated_pool.rs b/client/transaction-pool/graph/src/validated_pool.rs index 29f82fb894ac0..34f34d580680c 100644 --- a/client/transaction-pool/graph/src/validated_pool.rs +++ b/client/transaction-pool/graph/src/validated_pool.rs @@ -63,7 +63,7 @@ pub type ValidatedTransactionFor = ValidatedTransaction< /// Pool that deals with validated transactions. pub(crate) struct ValidatedPool { - api: B, + api: Arc, options: Options, listener: RwLock, BlockHash>>, pool: RwLock { impl ValidatedPool { /// Create a new transaction pool. - pub fn new(options: Options, api: B) -> Self { + pub fn new(options: Options, api: Arc) -> Self { let base_pool = base::BasePool::new(options.reject_future_transactions); ValidatedPool { api, diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 8495b8b65f17b..48b95695f5e11 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -71,6 +71,15 @@ impl sc_transaction_graph::ChainApi for FullChainApi> + Send>>; + type BodyFuture = Pin::Extrinsic>>> + Send>>; + + fn block_header(&self, at: &BlockId) -> Result::Header>, Self::Error> { + Ok(None) + } + + fn block_body(&self, at: &BlockId) -> Self::BodyFuture { + Box::pin(ready(Ok(Vec::new()))) + } fn validate_transaction( &self, @@ -149,6 +158,7 @@ impl sc_transaction_graph::ChainApi for LightChainApi> + Send + Unpin>; + type BodyFuture = Pin::Extrinsic>>> + Send>>; fn validate_transaction( &self, @@ -197,4 +207,12 @@ impl sc_transaction_graph::ChainApi for LightChainApi::Hashing as HashT>::hash(x), x.len()) }) } + + fn block_header(&self, at: &BlockId) -> Result::Header>, Self::Error> { + Ok(None) + } + + fn block_body(&self, at: &BlockId) -> Self::BodyFuture { + Box::pin(ready(Ok(Vec::new()))) + } } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 7d182009a66ff..bd7dad573e2b2 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -30,8 +30,8 @@ pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; //pub use crate::maintainer::{FullBasicPoolMaintainer, LightBasicPoolMaintainer}; -use std::{collections::HashMap, sync::Arc}; -use futures::{Future, FutureExt}; +use std::{collections::HashMap, sync::Arc, pin::Pin}; +use futures::{Future, FutureExt, future::{ready, join}}; use sp_runtime::{ generic::BlockId, @@ -40,6 +40,7 @@ use sp_runtime::{ use sp_transaction_pool::{ TransactionPool, PoolStatus, ImportNotificationStream, TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash, + MaintainedTransactionPool, }; /// Basic implementation of transaction pool that can be customized by providing PoolApi. @@ -49,6 +50,7 @@ pub struct BasicPool PoolApi: sc_transaction_graph::ChainApi, { pool: Arc>, + api: Arc, } impl BasicPool @@ -58,8 +60,11 @@ impl BasicPool { /// Create new basic transaction pool with provided api. pub fn new(options: sc_transaction_graph::Options, pool_api: PoolApi) -> Self { + let api = Arc::new(pool_api); + let cloned_api = api.clone(); BasicPool { - pool: Arc::new(sc_transaction_graph::Pool::new(options, pool_api)), + api: cloned_api, + pool: Arc::new(sc_transaction_graph::Pool::new(options, api)), } } @@ -129,7 +134,48 @@ impl TransactionPool for BasicPool fn on_broadcasted(&self, propagations: HashMap, Vec>) { self.pool.on_broadcasted(propagations) } +} - fn maintain(&self, block: &BlockId, retracted: &[BlockHash]) { +impl MaintainedTransactionPool for BasicPool +where + Block: BlockT, + PoolApi: 'static + sc_transaction_graph::ChainApi, +{ + fn maintain(&self, id: &BlockId, retracted: &[BlockHash]) -> Pin + Send>> { + // basic pool revalidates everything in place (TODO: only if certain time has passed) + let header = self.api.block_header(id) + .and_then(|h| h.ok_or(error::Error::Blockchain(sp_blockchain::Error::UnknownBlock(format!("{}", id))))); + let header = match header { + Ok(header) => header, + Err(err) => { + log::warn!("Failed to maintain basic tx pool - no header in chain! {:?}", err); + return Box::pin(ready(())) + } + }; + + let id = id.clone(); + let pool = self.pool.clone(); + let api = self.api.clone(); + + async move { + let double_pool = pool.clone(); + let hashes = api.block_body(&id).await + .unwrap_or_else(|e| { + log::warn!("Prune known transactions: error request {:?}!", e); + vec![] + }) + .into_iter() + .map(|tx| pool.hash_of(&tx)) + .collect::>(); + + if let Err(e) = pool.prune_known(&id, &hashes) { + log::warn!("Cannot prune known in the pool {:?}!", e); + } + + if let Err(e) = double_pool.revalidate_ready(&id, None).await { + log::warn!("revalidate ready failed {:?}", e); + } + }.boxed() } } + diff --git a/client/transaction-pool/src/tests.rs b/client/transaction-pool/src/tests.rs index 1199e41cf8740..172ff06391051 100644 --- a/client/transaction-pool/src/tests.rs +++ b/client/transaction-pool/src/tests.rs @@ -20,9 +20,9 @@ use super::*; use codec::Encode; use futures::executor::block_on; use sc_transaction_graph::{self, Pool}; -use substrate_test_runtime_client::{runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}, AccountKeyring::{self, *}}; +use substrate_test_runtime_client::{runtime::{AccountId, Block, Hash, Header, Index, Extrinsic, Transfer}, AccountKeyring::{self, *}}; use sp_runtime::{ - generic::{self, BlockId}, + generic::{self, BlockId, Block as BlockT}, traits::{Hash as HashT, BlakeTwo256}, transaction_validity::{TransactionValidity, ValidTransaction}, }; @@ -44,6 +44,7 @@ impl sc_transaction_graph::ChainApi for TestApi { type Hash = Hash; type Error = error::Error; type ValidationFuture = futures::future::Ready>; + type BodyFuture = futures::future::Ready>>; fn validate_transaction( &self, @@ -89,6 +90,13 @@ impl sc_transaction_graph::ChainApi for TestApi { (BlakeTwo256::hash(&encoded), encoded.len()) } + fn block_header(&self, id: &BlockId) -> error::Result> { + Ok(None) + } + + fn block_body(&self, id: &BlockId) -> Self::BodyFuture { + futures::future::ready(Ok(vec![])) + } } fn index(at: &BlockId) -> u64 { @@ -114,7 +122,11 @@ fn uxt(who: AccountKeyring, nonce: Index) -> Extrinsic { } fn pool() -> Pool { - Pool::new(Default::default(), TestApi::default()) + Pool::new(Default::default(), TestApi::default().into()) +} + +fn pool_api_with_modifier(modifier: Box) -> Pool { + Pool::new(Default::default(), TestApi { modifier }.into()) } #[test] @@ -196,7 +208,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { api.modifier = Box::new(|v: &mut ValidTransaction| { v.provides.push(vec![155]); }); - let pool = Pool::new(Default::default(), api); + let pool = Pool::new(Default::default(), Arc::new(api)); let xt = uxt(Alice, 209); block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index 97f611e622306..3fc19445c15d1 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -20,6 +20,7 @@ use std::{ collections::HashMap, hash::Hash, sync::Arc, + pin::Pin, }; use futures::{ Future, Stream, @@ -223,9 +224,12 @@ pub trait TransactionPool: Send + Sync { /// Returns transaction hash fn hash_of(&self, xt: &TransactionFor) -> TxHash; +} +pub trait MaintainedTransactionPool : TransactionPool { /// Perform maintaince - fn maintain(&self, block: &BlockId, retracted: &[BlockHash]); + fn maintain(&self, block: &BlockId, retracted: &[BlockHash]) + -> Pin + Send>>; } /// An abstraction for transaction pool. From 55a605f5eb2598847544a5511f499bb1ed6aa882 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 14 Jan 2020 19:42:26 +0300 Subject: [PATCH 03/12] add locked timer stuff --- client/transaction-pool/src/lib.rs | 65 ++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index bd7dad573e2b2..a07bd0e1a2d46 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -30,12 +30,13 @@ pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; //pub use crate::maintainer::{FullBasicPoolMaintainer, LightBasicPoolMaintainer}; -use std::{collections::HashMap, sync::Arc, pin::Pin}; +use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant}; use futures::{Future, FutureExt, future::{ready, join}}; +use parking_lot::Mutex; use sp_runtime::{ generic::BlockId, - traits::Block as BlockT, + traits::{Block as BlockT, Extrinsic, Header, NumberFor, SimpleArithmetic}, }; use sp_transaction_pool::{ TransactionPool, PoolStatus, ImportNotificationStream, @@ -51,6 +52,7 @@ pub struct BasicPool { pool: Arc>, api: Arc, + revalidation_status: Arc>>>, } impl BasicPool @@ -65,6 +67,7 @@ impl BasicPool BasicPool { api: cloned_api, pool: Arc::new(sc_transaction_graph::Pool::new(options, api)), + revalidation_status: Arc::new(Mutex::new(TxPoolRevalidationStatus::NotScheduled)), } } @@ -136,6 +139,50 @@ impl TransactionPool for BasicPool } } +#[cfg_attr(test, derive(Debug))] +enum TxPoolRevalidationStatus { + /// The revalidation has never been completed. + NotScheduled, + /// The revalidation is scheduled. + Scheduled(Option, Option), + /// The revalidation is in progress. + InProgress, +} + +impl TxPoolRevalidationStatus { + /// Called when revalidation is completed. + pub fn clear(&mut self) { + *self = TxPoolRevalidationStatus::NotScheduled; + } + + /// Returns true if revalidation is required. + pub fn is_required( + &mut self, + block: N, + revalidate_time_period: Option, + revalidate_block_period: Option, + ) -> bool { + match *self { + TxPoolRevalidationStatus::NotScheduled => { + *self = TxPoolRevalidationStatus::Scheduled( + revalidate_time_period.map(|period| Instant::now() + period), + revalidate_block_period.map(|period| block + period), + ); + false + }, + TxPoolRevalidationStatus::Scheduled(revalidate_at_time, revalidate_at_block) => { + let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) + || revalidate_at_block.map(|at| block >= at).unwrap_or(false); + if is_required { + *self = TxPoolRevalidationStatus::InProgress; + } + is_required + }, + TxPoolRevalidationStatus::InProgress => false, + } + } +} + impl MaintainedTransactionPool for BasicPool where Block: BlockT, @@ -156,6 +203,12 @@ where let id = id.clone(); let pool = self.pool.clone(); let api = self.api.clone(); + let is_revalidation_required = self.revalidation_status.lock().is_required( + *header.number(), + Some(std::time::Duration::from_secs(60)), + Some(20.into()), + ); + let revalidation_status = self.revalidation_status.clone(); async move { let double_pool = pool.clone(); @@ -172,9 +225,13 @@ where log::warn!("Cannot prune known in the pool {:?}!", e); } - if let Err(e) = double_pool.revalidate_ready(&id, None).await { - log::warn!("revalidate ready failed {:?}", e); + if is_revalidation_required { + if let Err(e) = double_pool.revalidate_ready(&id, None).await { + log::warn!("revalidate ready failed {:?}", e); + } } + + revalidation_status.lock().clear(); }.boxed() } } From d3770802baff8cb78e1e829eeb18cb65acaa72c4 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 14 Jan 2020 19:53:07 +0300 Subject: [PATCH 04/12] fix issues and introduce full pool --- client/transaction-pool/src/full_pool.rs | 165 +++++++++++++++++++++++ client/transaction-pool/src/lib.rs | 10 +- 2 files changed, 169 insertions(+), 6 deletions(-) create mode 100644 client/transaction-pool/src/full_pool.rs diff --git a/client/transaction-pool/src/full_pool.rs b/client/transaction-pool/src/full_pool.rs new file mode 100644 index 0000000000000..e54b507e72fcc --- /dev/null +++ b/client/transaction-pool/src/full_pool.rs @@ -0,0 +1,165 @@ +// Copyright 2018-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Substrate transaction pool implementation. + +use sc_transaction_graph as txpool; +use crate::api::{FullChainApi, LightChainApi}; + +use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant}; +use futures::{Future, FutureExt, future::{ready, join}}; +use parking_lot::Mutex; + +use sp_runtime::{ + generic::BlockId, + traits::{Block as BlockT, Extrinsic, Header, NumberFor, SimpleArithmetic}, +}; +use sp_transaction_pool::{ + TransactionPool, PoolStatus, ImportNotificationStream, + TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash, + MaintainedTransactionPool, +}; +use crate::error; + +/// Basic implementation of transaction pool that can be customized by providing PoolApi. +pub struct FullPool + where + Block: BlockT, + PoolApi: sc_transaction_graph::ChainApi, +{ + pool: Arc>, + api: Arc, +} + +impl FullPool + where + Block: BlockT, + PoolApi: sc_transaction_graph::ChainApi, +{ + /// Create new basic transaction pool with provided api. + pub fn new(options: sc_transaction_graph::Options, pool_api: PoolApi) -> Self { + let api = Arc::new(pool_api); + let cloned_api = api.clone(); + FullPool { + api: cloned_api, + pool: Arc::new(sc_transaction_graph::Pool::new(options, api)), + } + } +} + +impl TransactionPool for FullPool +where + Block: BlockT, + PoolApi: 'static + sc_transaction_graph::ChainApi, +{ + type Block = PoolApi::Block; + type Hash = sc_transaction_graph::ExHash; + type InPoolTransaction = sc_transaction_graph::base_pool::Transaction, TransactionFor>; + type Error = error::Error; + + fn submit_at( + &self, + at: &BlockId, + xts: impl IntoIterator> + 'static, + ) -> Box, Self::Error>>, Self::Error>> + Send + Unpin> { + Box::new(self.pool.submit_at(at, xts, false)) + } + + fn submit_one( + &self, + at: &BlockId, + xt: TransactionFor, + ) -> Box, Self::Error>> + Send + Unpin> { + Box::new(self.pool.submit_one(at, xt)) + } + + fn submit_and_watch( + &self, + at: &BlockId, + xt: TransactionFor, + ) -> Box>, Self::Error>> + Send + Unpin> { + Box::new( + self.pool.submit_and_watch(at, xt) + .map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _)) + ) + } + + fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { + self.pool.remove_invalid(hashes) + } + + fn status(&self) -> PoolStatus { + self.pool.status() + } + + fn ready(&self) -> Box>> { + Box::new(self.pool.ready()) + } + + fn import_notification_stream(&self) -> ImportNotificationStream { + self.pool.import_notification_stream() + } + + fn hash_of(&self, xt: &TransactionFor) -> TxHash { + self.pool.hash_of(xt) + } + + fn on_broadcasted(&self, propagations: HashMap, Vec>) { + self.pool.on_broadcasted(propagations) + } +} + +impl MaintainedTransactionPool for FullPool +where + Block: BlockT, + PoolApi: 'static + sc_transaction_graph::ChainApi, +{ + fn maintain(&self, id: &BlockId, retracted: &[BlockHash]) -> Pin + Send>> { + // basic pool revalidates everything in place (TODO: only if certain time has passed) + let header = self.api.block_header(id) + .and_then(|h| h.ok_or(error::Error::Blockchain(sp_blockchain::Error::UnknownBlock(format!("{}", id))))); + let header = match header { + Ok(header) => header, + Err(err) => { + log::warn!("Failed to maintain basic tx pool - no header in chain! {:?}", err); + return Box::pin(ready(())) + } + }; + + let id = id.clone(); + let pool = self.pool.clone(); + let api = self.api.clone(); + + async move { + let hashes = api.block_body(&id).await + .unwrap_or_else(|e| { + log::warn!("Prune known transactions: error request {:?}!", e); + vec![] + }) + .into_iter() + .map(|tx| pool.hash_of(&tx)) + .collect::>(); + + if let Err(e) = pool.prune_known(&id, &hashes) { + log::warn!("Cannot prune known in the pool {:?}!", e); + } + + if let Err(e) = pool.revalidate_ready(&id, None).await { + log::warn!("revalidate ready failed {:?}", e); + } + }.boxed() + } +} diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index a07bd0e1a2d46..123d3d450cfb7 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -20,15 +20,15 @@ #![warn(unused_extern_crates)] mod api; -//mod maintainer; - +mod full_pool; pub mod error; + #[cfg(test)] mod tests; +pub use full_pool::FullPool; pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; -//pub use crate::maintainer::{FullBasicPoolMaintainer, LightBasicPoolMaintainer}; use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant}; use futures::{Future, FutureExt, future::{ready, join}}; @@ -211,7 +211,6 @@ where let revalidation_status = self.revalidation_status.clone(); async move { - let double_pool = pool.clone(); let hashes = api.block_body(&id).await .unwrap_or_else(|e| { log::warn!("Prune known transactions: error request {:?}!", e); @@ -226,7 +225,7 @@ where } if is_revalidation_required { - if let Err(e) = double_pool.revalidate_ready(&id, None).await { + if let Err(e) = pool.revalidate_ready(&id, None).await { log::warn!("revalidate ready failed {:?}", e); } } @@ -235,4 +234,3 @@ where }.boxed() } } - From b5b6f5da900388026980ef4377c7cbda8295c2f1 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 15 Jan 2020 16:01:16 +0300 Subject: [PATCH 05/12] arrange together --- bin/node-template/src/service.rs | 5 +- client/service/src/builder.rs | 4 +- client/transaction-pool/graph/src/pool.rs | 20 +- client/transaction-pool/src/api.rs | 45 +- client/transaction-pool/src/full_pool.rs | 165 ------ client/transaction-pool/src/lib.rs | 164 ++++-- client/transaction-pool/src/maintainer.rs | 645 ---------------------- client/transaction-pool/src/tests.rs | 346 +++++++----- primitives/transaction-pool/src/pool.rs | 1 + 9 files changed, 376 insertions(+), 1019 deletions(-) delete mode 100644 client/transaction-pool/src/full_pool.rs delete mode 100644 client/transaction-pool/src/maintainer.rs diff --git a/bin/node-template/src/service.rs b/bin/node-template/src/service.rs index acfa78f5bfefa..458656d836d47 100644 --- a/bin/node-template/src/service.rs +++ b/bin/node-template/src/service.rs @@ -205,8 +205,11 @@ pub fn new_light(config: Configuration, TNetP: NetworkSpecialization, - TExPool: TransactionPool::Hash> + 'static, + TExPool: MaintainedTransactionPool::Hash> + 'static, TRpc: sc_rpc::RpcExtension + Clone, { diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index 885c69a6359e6..26ff57cdc5ba8 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -69,7 +69,7 @@ pub trait ChainApi: Send + Sync { /// Validate transaction future. type ValidationFuture: Future> + Send + Unpin; /// Body future (since block body might be remote) - type BodyFuture: Future::Extrinsic>, Self::Error>> + Unpin + Send + 'static; + type BodyFuture: Future::Extrinsic>>, Self::Error>> + Unpin + Send + 'static; /// Verify extrinsic at given block. fn validate_transaction( @@ -87,9 +87,6 @@ pub trait ChainApi: Send + Sync { /// Returns hash and encoding length of the extrinsic. fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize); - /// Return header - fn block_header(&self, at: &BlockId) -> Result::Header>, Self::Error>; - /// Return body fn block_body(&self, at: &BlockId) -> Self::BodyFuture; } @@ -496,6 +493,7 @@ mod tests { type Hash = u64; type Error = error::Error; type ValidationFuture = futures::future::Ready>; + type BodyFuture = futures::future::Ready>>>; /// Verify extrinsic at given block. fn validate_transaction( @@ -568,6 +566,10 @@ mod tests { len ) } + + fn block_body(&self, _id: &BlockId) -> Self::BodyFuture { + futures::future::ready(Ok(None)) + } } fn uxt(transfer: Transfer) -> Extrinsic { @@ -575,7 +577,7 @@ mod tests { } fn pool() -> Pool { - Pool::new(Default::default(), TestApi::default()) + Pool::new(Default::default(), TestApi::default().into()) } #[test] @@ -721,7 +723,7 @@ mod tests { ready: limit.clone(), future: limit.clone(), ..Default::default() - }, TestApi::default()); + }, TestApi::default().into()); let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), @@ -756,7 +758,7 @@ mod tests { ready: limit.clone(), future: limit.clone(), ..Default::default() - }, TestApi::default()); + }, TestApi::default().into()); // when block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { @@ -932,7 +934,7 @@ mod tests { ready: limit.clone(), future: limit.clone(), ..Default::default() - }, TestApi::default()); + }, TestApi::default().into()); let xt = uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), @@ -966,7 +968,7 @@ mod tests { let (tx, rx) = std::sync::mpsc::sync_channel(1); let mut api = TestApi::default(); api.delay = Arc::new(Mutex::new(rx.into())); - let pool = Arc::new(Pool::new(Default::default(), api)); + let pool = Arc::new(Pool::new(Default::default(), api.into())); // when let xt = uxt(Transfer { diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 48b95695f5e11..6f7e418ab11c1 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -24,7 +24,8 @@ use futures::{ use sc_client_api::{ blockchain::HeaderBackend, - light::{Fetcher, RemoteCallRequest} + light::{Fetcher, RemoteCallRequest, RemoteBodyRequest}, + BlockBody, }; use sp_core::Hasher; use sp_runtime::{ @@ -63,7 +64,7 @@ impl FullChainApi where impl sc_transaction_graph::ChainApi for FullChainApi where Block: BlockT, - Client: ProvideRuntimeApi + BlockIdTo + 'static + Send + Sync, + Client: ProvideRuntimeApi + BlockBody + BlockIdTo + 'static + Send + Sync, Client::Api: TaggedTransactionQueue, sp_api::ApiErrorFor: Send, { @@ -71,14 +72,12 @@ impl sc_transaction_graph::ChainApi for FullChainApi> + Send>>; - type BodyFuture = Pin::Extrinsic>>> + Send>>; + type BodyFuture = Pin::Extrinsic>>>> + Send>>; - fn block_header(&self, at: &BlockId) -> Result::Header>, Self::Error> { - Ok(None) - } - - fn block_body(&self, at: &BlockId) -> Self::BodyFuture { - Box::pin(ready(Ok(Vec::new()))) + fn block_body(&self, id: &BlockId) -> Self::BodyFuture { + Box::pin(ready( + self.client.block_body(&id).map_err(|e| error::Error::from(e)) + )) } fn validate_transaction( @@ -158,7 +157,7 @@ impl sc_transaction_graph::ChainApi for LightChainApi> + Send + Unpin>; - type BodyFuture = Pin::Extrinsic>>> + Send>>; + type BodyFuture = Pin::Extrinsic>>>> + Send>>; fn validate_transaction( &self, @@ -208,11 +207,27 @@ impl sc_transaction_graph::ChainApi for LightChainApi) -> Result::Header>, Self::Error> { - Ok(None) - } + fn block_body(&self, id: &BlockId) -> Self::BodyFuture { + let header = self.client.header(*id) + .and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id)))); + let header = match header { + Ok(header) => header, + Err(err) => { + log::warn!(target: "txpool", "Failed to query header: {:?}", err); + return Box::pin(ready(Ok(None))); + } + }; - fn block_body(&self, at: &BlockId) -> Self::BodyFuture { - Box::pin(ready(Ok(Vec::new()))) + let fetcher = self.fetcher.clone(); + async move { + let transactions = fetcher.remote_body({ + RemoteBodyRequest { + header, + retry_count: None, + } + }).await; + + Ok(Some(transactions.unwrap_or(Vec::new()))) + }.boxed() } } diff --git a/client/transaction-pool/src/full_pool.rs b/client/transaction-pool/src/full_pool.rs deleted file mode 100644 index e54b507e72fcc..0000000000000 --- a/client/transaction-pool/src/full_pool.rs +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright 2018-2020 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Substrate transaction pool implementation. - -use sc_transaction_graph as txpool; -use crate::api::{FullChainApi, LightChainApi}; - -use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant}; -use futures::{Future, FutureExt, future::{ready, join}}; -use parking_lot::Mutex; - -use sp_runtime::{ - generic::BlockId, - traits::{Block as BlockT, Extrinsic, Header, NumberFor, SimpleArithmetic}, -}; -use sp_transaction_pool::{ - TransactionPool, PoolStatus, ImportNotificationStream, - TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash, - MaintainedTransactionPool, -}; -use crate::error; - -/// Basic implementation of transaction pool that can be customized by providing PoolApi. -pub struct FullPool - where - Block: BlockT, - PoolApi: sc_transaction_graph::ChainApi, -{ - pool: Arc>, - api: Arc, -} - -impl FullPool - where - Block: BlockT, - PoolApi: sc_transaction_graph::ChainApi, -{ - /// Create new basic transaction pool with provided api. - pub fn new(options: sc_transaction_graph::Options, pool_api: PoolApi) -> Self { - let api = Arc::new(pool_api); - let cloned_api = api.clone(); - FullPool { - api: cloned_api, - pool: Arc::new(sc_transaction_graph::Pool::new(options, api)), - } - } -} - -impl TransactionPool for FullPool -where - Block: BlockT, - PoolApi: 'static + sc_transaction_graph::ChainApi, -{ - type Block = PoolApi::Block; - type Hash = sc_transaction_graph::ExHash; - type InPoolTransaction = sc_transaction_graph::base_pool::Transaction, TransactionFor>; - type Error = error::Error; - - fn submit_at( - &self, - at: &BlockId, - xts: impl IntoIterator> + 'static, - ) -> Box, Self::Error>>, Self::Error>> + Send + Unpin> { - Box::new(self.pool.submit_at(at, xts, false)) - } - - fn submit_one( - &self, - at: &BlockId, - xt: TransactionFor, - ) -> Box, Self::Error>> + Send + Unpin> { - Box::new(self.pool.submit_one(at, xt)) - } - - fn submit_and_watch( - &self, - at: &BlockId, - xt: TransactionFor, - ) -> Box>, Self::Error>> + Send + Unpin> { - Box::new( - self.pool.submit_and_watch(at, xt) - .map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _)) - ) - } - - fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { - self.pool.remove_invalid(hashes) - } - - fn status(&self) -> PoolStatus { - self.pool.status() - } - - fn ready(&self) -> Box>> { - Box::new(self.pool.ready()) - } - - fn import_notification_stream(&self) -> ImportNotificationStream { - self.pool.import_notification_stream() - } - - fn hash_of(&self, xt: &TransactionFor) -> TxHash { - self.pool.hash_of(xt) - } - - fn on_broadcasted(&self, propagations: HashMap, Vec>) { - self.pool.on_broadcasted(propagations) - } -} - -impl MaintainedTransactionPool for FullPool -where - Block: BlockT, - PoolApi: 'static + sc_transaction_graph::ChainApi, -{ - fn maintain(&self, id: &BlockId, retracted: &[BlockHash]) -> Pin + Send>> { - // basic pool revalidates everything in place (TODO: only if certain time has passed) - let header = self.api.block_header(id) - .and_then(|h| h.ok_or(error::Error::Blockchain(sp_blockchain::Error::UnknownBlock(format!("{}", id))))); - let header = match header { - Ok(header) => header, - Err(err) => { - log::warn!("Failed to maintain basic tx pool - no header in chain! {:?}", err); - return Box::pin(ready(())) - } - }; - - let id = id.clone(); - let pool = self.pool.clone(); - let api = self.api.clone(); - - async move { - let hashes = api.block_body(&id).await - .unwrap_or_else(|e| { - log::warn!("Prune known transactions: error request {:?}!", e); - vec![] - }) - .into_iter() - .map(|tx| pool.hash_of(&tx)) - .collect::>(); - - if let Err(e) = pool.prune_known(&id, &hashes) { - log::warn!("Cannot prune known in the pool {:?}!", e); - } - - if let Err(e) = pool.revalidate_ready(&id, None).await { - log::warn!("revalidate ready failed {:?}", e); - } - }.boxed() - } -} diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 123d3d450cfb7..59d69e99a5894 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -20,23 +20,21 @@ #![warn(unused_extern_crates)] mod api; -mod full_pool; pub mod error; #[cfg(test)] mod tests; -pub use full_pool::FullPool; pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant}; -use futures::{Future, FutureExt, future::{ready, join}}; +use futures::{Future, FutureExt, future::ready}; use parking_lot::Mutex; use sp_runtime::{ generic::BlockId, - traits::{Block as BlockT, Extrinsic, Header, NumberFor, SimpleArithmetic}, + traits::{Block as BlockT, NumberFor, SimpleArithmetic}, }; use sp_transaction_pool::{ TransactionPool, PoolStatus, ImportNotificationStream, @@ -52,7 +50,22 @@ pub struct BasicPool { pool: Arc>, api: Arc, - revalidation_status: Arc>>>, + revalidation_strategy: Arc>>>, +} + +/// Type of revalidation. +pub enum RevalidationType { + /// Light revalidation type. + /// + /// During maintaince, transaction pool makes periodic revalidation + /// depending on number of blocks or time passed. + Light, + + /// Full revalidation type. + /// + /// During maintaince, transaction pool revalidates some transactions + /// from the pool of valid transactions. + Full, } impl BasicPool @@ -61,20 +74,44 @@ impl BasicPool PoolApi: sc_transaction_graph::ChainApi, { /// Create new basic transaction pool with provided api. - pub fn new(options: sc_transaction_graph::Options, pool_api: PoolApi) -> Self { + pub fn new( + options: sc_transaction_graph::Options, + pool_api: PoolApi, + ) -> Self { + Self::with_revalidation_type(options, pool_api, RevalidationType::Full) + } + + /// Create new basic transaction pool with provided api and custom + /// revalidation type. + pub fn with_revalidation_type( + options: sc_transaction_graph::Options, + pool_api: PoolApi, + revalidation_type: RevalidationType, + ) -> Self { let api = Arc::new(pool_api); let cloned_api = api.clone(); BasicPool { api: cloned_api, pool: Arc::new(sc_transaction_graph::Pool::new(options, api)), - revalidation_status: Arc::new(Mutex::new(TxPoolRevalidationStatus::NotScheduled)), + revalidation_strategy: Arc::new(Mutex::new( + match revalidation_type { + RevalidationType::Light => RevalidationStrategy::Light(RevalidationStatus::NotScheduled), + RevalidationType::Full => RevalidationStrategy::Always, + } + )), } + } /// Gets shared reference to the underlying pool. pub fn pool(&self) -> &Arc> { &self.pool } + + #[cfg(test)] + pub fn api(&self) -> &Arc { + &self.api + } } impl TransactionPool for BasicPool @@ -140,7 +177,7 @@ impl TransactionPool for BasicPool } #[cfg_attr(test, derive(Debug))] -enum TxPoolRevalidationStatus { +enum RevalidationStatus { /// The revalidation has never been completed. NotScheduled, /// The revalidation is scheduled. @@ -149,10 +186,43 @@ enum TxPoolRevalidationStatus { InProgress, } -impl TxPoolRevalidationStatus { +enum RevalidationStrategy { + Always, + Light(RevalidationStatus) +} + +impl RevalidationStrategy { + pub fn clear(&mut self) { + if let Self::Light(status) = self { status.clear() } + } + + pub fn resubmit_required(&mut self) -> bool { + if let Self::Light(_) = self { return false } else { return true } + } + + pub fn is_required( + &mut self, + block: N, + revalidate_time_period: Option, + revalidate_block_period: Option, + ) -> bool { + if let Self::Light(status) = self { + status.is_required(block, revalidate_time_period, revalidate_block_period) + } else { true } + } + + pub fn amount(&self) -> Option { + match self { + Self::Light(_) => None, + Self::Always => Some(16), + } + } +} + +impl RevalidationStatus { /// Called when revalidation is completed. pub fn clear(&mut self) { - *self = TxPoolRevalidationStatus::NotScheduled; + *self = Self::NotScheduled; } /// Returns true if revalidation is required. @@ -163,22 +233,22 @@ impl TxPoolRevalidationStatus { revalidate_block_period: Option, ) -> bool { match *self { - TxPoolRevalidationStatus::NotScheduled => { - *self = TxPoolRevalidationStatus::Scheduled( + Self::NotScheduled => { + *self = Self::Scheduled( revalidate_time_period.map(|period| Instant::now() + period), revalidate_block_period.map(|period| block + period), ); false }, - TxPoolRevalidationStatus::Scheduled(revalidate_at_time, revalidate_at_block) => { + Self::Scheduled(revalidate_at_time, revalidate_at_block) => { let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) || revalidate_at_block.map(|at| block >= at).unwrap_or(false); if is_required { - *self = TxPoolRevalidationStatus::InProgress; + *self = Self::InProgress; } is_required }, - TxPoolRevalidationStatus::InProgress => false, + Self::InProgress => false, } } } @@ -189,33 +259,41 @@ where PoolApi: 'static + sc_transaction_graph::ChainApi, { fn maintain(&self, id: &BlockId, retracted: &[BlockHash]) -> Pin + Send>> { - // basic pool revalidates everything in place (TODO: only if certain time has passed) - let header = self.api.block_header(id) - .and_then(|h| h.ok_or(error::Error::Blockchain(sp_blockchain::Error::UnknownBlock(format!("{}", id))))); - let header = match header { - Ok(header) => header, - Err(err) => { - log::warn!("Failed to maintain basic tx pool - no header in chain! {:?}", err); - return Box::pin(ready(())) - } - }; - let id = id.clone(); let pool = self.pool.clone(); let api = self.api.clone(); - let is_revalidation_required = self.revalidation_status.lock().is_required( - *header.number(), - Some(std::time::Duration::from_secs(60)), - Some(20.into()), - ); - let revalidation_status = self.revalidation_status.clone(); + + let block_number = match api.block_id_to_number(&id) { + Ok(Some(number)) => number, + _ => { + log::trace!(target: "txqueue", "Skipping chain event - no numbrer for that block {:?}", id); + return Box::pin(ready(())); + } + }; + + let (is_revalidation_required, is_resubmit_required) = { + let mut lock = self.revalidation_strategy.lock(); + ( + lock.is_required( + block_number, + Some(std::time::Duration::from_secs(60)), + Some(20.into()), + ), + lock.resubmit_required() + ) + }; + + let revalidation_status = self.revalidation_strategy.clone(); + let revalidation_amount = revalidation_status.lock().amount(); + let retracted = retracted.to_vec(); async move { let hashes = api.block_body(&id).await .unwrap_or_else(|e| { log::warn!("Prune known transactions: error request {:?}!", e); - vec![] + None }) + .unwrap_or(Vec::new()) .into_iter() .map(|tx| pool.hash_of(&tx)) .collect::>(); @@ -224,8 +302,26 @@ where log::warn!("Cannot prune known in the pool {:?}!", e); } + if is_resubmit_required { + let mut resubmit_transactions = Vec::new(); + + for retracted_hash in retracted.into_iter() { + let txes = api.block_body(&BlockId::hash(retracted_hash.clone())).await + .unwrap_or(None) + .unwrap_or(Vec::new()); + for tx in txes { + resubmit_transactions.push(tx) + } + } + if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await { + log::debug!(target: "txpool", + "[{:?}] Error re-submitting transactions: {:?}", id, e + ) + } + } + if is_revalidation_required { - if let Err(e) = pool.revalidate_ready(&id, None).await { + if let Err(e) = pool.revalidate_ready(&id, revalidation_amount).await { log::warn!("revalidate ready failed {:?}", e); } } diff --git a/client/transaction-pool/src/maintainer.rs b/client/transaction-pool/src/maintainer.rs deleted file mode 100644 index 97dc7e10a6f11..0000000000000 --- a/client/transaction-pool/src/maintainer.rs +++ /dev/null @@ -1,645 +0,0 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use std::{ - marker::{PhantomData, Unpin}, - sync::Arc, - time::Instant, -}; -use futures::{ - Future, FutureExt, - future::{Either, join, ready}, -}; -use log::{warn, debug, trace}; -use parking_lot::Mutex; - -use sc_client_api::{ - client::BlockBody, - light::{Fetcher, RemoteBodyRequest}, -}; -use sp_runtime::{ - generic::BlockId, - traits::{Block as BlockT, Extrinsic, Header, NumberFor, SimpleArithmetic}, -}; -use sp_blockchain::HeaderBackend; -use sp_transaction_pool::{TransactionPoolMaintainer, runtime_api::TaggedTransactionQueue}; -use sp_api::ProvideRuntimeApi; - -use sc_transaction_graph::{self, ChainApi}; - -/// Basic transaction pool maintainer for full clients. -pub struct FullBasicPoolMaintainer { - pool: Arc>, - client: Arc, -} - -impl FullBasicPoolMaintainer { - /// Create new basic full pool maintainer. - pub fn new( - pool: Arc>, - client: Arc, - ) -> Self { - FullBasicPoolMaintainer { pool, client } - } -} - -impl TransactionPoolMaintainer -for - FullBasicPoolMaintainer -where - Block: BlockT, - Client: ProvideRuntimeApi + HeaderBackend + BlockBody + 'static, - Client::Api: TaggedTransactionQueue, - PoolApi: ChainApi + 'static, -{ - type Block = Block; - type Hash = Block::Hash; - - fn maintain( - &self, - id: &BlockId, - retracted: &[Block::Hash], - ) -> Box + Send + Unpin> { - let now = std::time::Instant::now(); - let took = move || format!("Took {} ms", now.elapsed().as_millis()); - - let id = *id; - trace!(target: "txpool", "[{:?}] Starting pool maintainance", id); - // Put transactions from retracted blocks back into the pool. - let client_copy = self.client.clone(); - let retracted_transactions = retracted.to_vec().into_iter() - .filter_map(move |hash| client_copy.block_body(&BlockId::hash(hash)).ok().unwrap_or(None)) - .flat_map(|block| block.into_iter()) - // if signed information is not present, attempt to resubmit anyway. - .filter(|tx| tx.is_signed().unwrap_or(true)); - let resubmit_future = self.pool - .submit_at(&id, retracted_transactions, true) - .then(move |resubmit_result| ready(match resubmit_result { - Ok(_) => trace!(target: "txpool", - "[{:?}] Re-submitting retracted done. {}", id, took() - ), - Err(e) => debug!(target: "txpool", - "[{:?}] Error re-submitting transactions: {:?}", id, e - ), - })); - - // Avoid calling into runtime if there is nothing to prune from the pool anyway. - if self.pool.status().is_empty() { - return Box::new(resubmit_future) - } - - let block = (self.client.header(id), self.client.block_body(&id)); - let prune_future = match block { - (Ok(Some(header)), Ok(Some(extrinsics))) => { - let parent_id = BlockId::hash(*header.parent_hash()); - let prune_future = self.pool - .prune(&id, &parent_id, &extrinsics) - .then(move |prune_result| ready(match prune_result { - Ok(_) => trace!(target: "txpool", - "[{:?}] Pruning done. {}", id, took() - ), - Err(e) => warn!(target: "txpool", - "[{:?}] Error pruning transactions: {:?}", id, e - ), - })); - - Either::Left(resubmit_future.then(|_| prune_future)) - }, - (Ok(_), Ok(_)) => Either::Right(resubmit_future), - err => { - warn!(target: "txpool", "[{:?}] Error reading block: {:?}", id, err); - Either::Right(resubmit_future) - }, - }; - - let revalidate_future = self.pool - .revalidate_ready(&id, Some(16)) - .then(move |result| ready(match result { - Ok(_) => debug!(target: "txpool", - "[{:?}] Revalidation done: {}", id, took() - ), - Err(e) => warn!(target: "txpool", - "[{:?}] Encountered errors while revalidating transactions: {:?}", id, e - ), - })); - - Box::new(prune_future.then(|_| revalidate_future)) - } -} - -/// Basic transaction pool maintainer for light clients. -pub struct LightBasicPoolMaintainer { - pool: Arc>, - client: Arc, - fetcher: Arc, - revalidate_time_period: Option, - revalidate_block_period: Option>, - revalidation_status: Arc>>>, - _phantom: PhantomData, -} - -impl LightBasicPoolMaintainer - where - Block: BlockT, - Client: ProvideRuntimeApi + HeaderBackend + BlockBody + 'static, - Client::Api: TaggedTransactionQueue, - PoolApi: ChainApi + 'static, - F: Fetcher + 'static, -{ - /// Create light pool maintainer with default constants. - /// - /// Default constants are: revalidate every 60 seconds or every 20 blocks - /// (whatever happens first). - pub fn with_defaults( - pool: Arc>, - client: Arc, - fetcher: Arc, - ) -> Self { - Self::new( - pool, - client, - fetcher, - Some(std::time::Duration::from_secs(60)), - Some(20.into()), - ) - } - - /// Create light pool maintainer with passed constants. - pub fn new( - pool: Arc>, - client: Arc, - fetcher: Arc, - revalidate_time_period: Option, - revalidate_block_period: Option>, - ) -> Self { - Self { - pool, - client, - fetcher, - revalidate_time_period, - revalidate_block_period, - revalidation_status: Arc::new(Mutex::new(TxPoolRevalidationStatus::NotScheduled)), - _phantom: Default::default(), - } - } - - /// Returns future that prunes block transactions from the pool. - fn prune( - &self, - id: &BlockId, - header: &Block::Header, - ) -> impl std::future::Future { - // fetch transactions (possible future optimization: proofs of inclusion) that - // have been included into new block and prune these from the pool - let id = id.clone(); - let pool = self.pool.clone(); - self.fetcher.remote_body(RemoteBodyRequest { - header: header.clone(), - retry_count: None, - }) - .then(move |transactions| ready( - transactions - .map_err(|e| format!("{}", e)) - .and_then(|transactions| { - let hashes = transactions - .into_iter() - .map(|tx| pool.hash_of(&tx)) - .collect::>(); - pool.prune_known(&id, &hashes) - .map_err(|e| format!("{}", e)) - }) - )) - .then(|r| { - if let Err(e) = r { - warn!("Error pruning known transactions: {}", e) - } - ready(()) - }) - } - - /// Returns future that performs in-pool transations revalidation, if required. - fn revalidate( - &self, - id: &BlockId, - header: &Block::Header, - ) -> impl std::future::Future { - // to determine whether ready transaction is still valid, we perform periodic revalidaton - // of ready transactions - let is_revalidation_required = self.revalidation_status.lock().is_required( - *header.number(), - self.revalidate_time_period, - self.revalidate_block_period, - ); - match is_revalidation_required { - true => { - let revalidation_status = self.revalidation_status.clone(); - Either::Left(self.pool - .revalidate_ready(id, None) - .map(|r| r.map_err(|e| warn!("Error revalidating known transactions: {}", e))) - .map(move |_| revalidation_status.lock().clear())) - }, - false => Either::Right(ready(())), - } - } -} - -impl TransactionPoolMaintainer -for - LightBasicPoolMaintainer -where - Block: BlockT, - Client: ProvideRuntimeApi + HeaderBackend + BlockBody + 'static, - Client::Api: TaggedTransactionQueue, - PoolApi: ChainApi + 'static, - F: Fetcher + 'static, -{ - type Block = Block; - type Hash = Block::Hash; - - fn maintain( - &self, - id: &BlockId, - _retracted: &[Block::Hash], - ) -> Box + Send + Unpin> { - // Do nothing if transaction pool is empty. - if self.pool.status().is_empty() { - self.revalidation_status.lock().clear(); - return Box::new(ready(())); - } - let header = self.client.header(*id) - .and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id)))); - let header = match header { - Ok(header) => header, - Err(err) => { - println!("Failed to maintain light tx pool: {:?}", err); - return Box::new(ready(())); - } - }; - - // else prune block transactions from the pool - let prune_future = self.prune(id, &header); - - // and then (optionally) revalidate in-pool transactions - let revalidate_future = self.revalidate(id, &header); - - let maintain_future = join( - prune_future, - revalidate_future, - ).map(|_| ()); - - Box::new(maintain_future) - } -} - -/// The status of transactions revalidation at light tx pool. -#[cfg_attr(test, derive(Debug))] -enum TxPoolRevalidationStatus { - /// The revalidation has never been completed. - NotScheduled, - /// The revalidation is scheduled. - Scheduled(Option, Option), - /// The revalidation is in progress. - InProgress, -} - -impl TxPoolRevalidationStatus { - /// Called when revalidation is completed. - pub fn clear(&mut self) { - *self = TxPoolRevalidationStatus::NotScheduled; - } - - /// Returns true if revalidation is required. - pub fn is_required( - &mut self, - block: N, - revalidate_time_period: Option, - revalidate_block_period: Option, - ) -> bool { - match *self { - TxPoolRevalidationStatus::NotScheduled => { - *self = TxPoolRevalidationStatus::Scheduled( - revalidate_time_period.map(|period| Instant::now() + period), - revalidate_block_period.map(|period| block + period), - ); - false - }, - TxPoolRevalidationStatus::Scheduled(revalidate_at_time, revalidate_at_block) => { - let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) - || revalidate_at_block.map(|at| block >= at).unwrap_or(false); - if is_required { - *self = TxPoolRevalidationStatus::InProgress; - } - is_required - }, - TxPoolRevalidationStatus::InProgress => false, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures::executor::block_on; - use codec::Encode; - use substrate_test_runtime_client::{ - prelude::*, Client, runtime::{Block, Transfer}, sp_consensus::{BlockOrigin, SelectChain}, - LongestChain, - }; - use sp_transaction_pool::PoolStatus; - use crate::api::{FullChainApi, LightChainApi}; - - struct TestSetup { - client: Arc>, - longest_chain: LongestChain, - pool: Arc>, - } - - impl TestSetup { - fn new() -> TestSetup, Block>> { - let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); - let client = Arc::new(client); - let pool = Arc::new( - sc_transaction_graph::Pool::new(Default::default(), FullChainApi::new(client.clone())), - ); - TestSetup { - client, - longest_chain, - pool, - } - } - - fn new_light(fetcher: Arc) -> TestSetup, F, Block>> - where F: Fetcher + 'static, - { - let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); - let client = Arc::new(client); - let pool = Arc::new( - sc_transaction_graph::Pool::new( - Default::default(), - LightChainApi::new(client.clone(), fetcher) - ), - ); - TestSetup { - client, - longest_chain, - pool, - } - } - } - - fn setup() -> TestSetup, Block>> { - TestSetup::, Block>>::new() - } - - fn setup_light(fetcher: Arc) -> TestSetup, F, Block>> - where F: Fetcher + 'static, - { - TestSetup::, F, Block>>::new_light(fetcher) - } - - #[test] - fn should_remove_transactions_from_the_full_pool() { - let mut setup = setup(); - - let transaction = Transfer { - amount: 5, - nonce: 0, - from: AccountKeyring::Alice.into(), - to: Default::default(), - }.into_signed_tx(); - let best = setup.longest_chain.best_chain().unwrap(); - - // store the transaction in the pool - block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); - - // import the block - let mut builder = setup.client.new_block(Default::default()).unwrap(); - builder.push(transaction.clone()).unwrap(); - let block = builder.build().unwrap().block; - let id = BlockId::hash(block.header().hash()); - setup.client.import(BlockOrigin::Own, block).unwrap(); - - // fire notification - this should clean up the queue - assert_eq!(setup.pool.status().ready, 1); - block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[])); - - // then - assert_eq!(setup.pool.status().ready, 0); - assert_eq!(setup.pool.status().future, 0); - } - - #[test] - fn should_remove_transactions_from_the_light_pool() { - let transaction = Transfer { - amount: 5, - nonce: 0, - from: AccountKeyring::Alice.into(), - to: Default::default(), - }.into_signed_tx(); - let fetcher_transaction = transaction.clone(); - let fetcher = Arc::new(substrate_test_runtime_client::new_light_fetcher() - .with_remote_body(Some(Box::new(move |_| Ok(vec![fetcher_transaction.clone()])))) - .with_remote_call(Some(Box::new(move |_| { - let validity: sp_runtime::transaction_validity::TransactionValidity = - Ok(sp_runtime::transaction_validity::ValidTransaction { - priority: 0, - requires: Vec::new(), - provides: vec![vec![42]], - longevity: 0, - propagate: true, - }); - Ok(validity.encode()) - })))); - - let setup = setup_light(fetcher.clone()); - let best = setup.longest_chain.best_chain().unwrap(); - - // store the transaction in the pool - block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); - - // fire notification - this should clean up the queue - assert_eq!(setup.pool.status().ready, 1); - block_on(LightBasicPoolMaintainer::with_defaults(setup.pool.clone(), setup.client.clone(), fetcher).maintain( - &BlockId::Number(0), - &[], - )); - - // then - assert_eq!(setup.pool.status().ready, 0); - assert_eq!(setup.pool.status().future, 0); - } - - #[test] - fn should_schedule_transactions_revalidation_at_light_pool() { - // when revalidation is not scheduled, it became scheduled - let mut status = TxPoolRevalidationStatus::NotScheduled; - assert!(!status.is_required(10u32, None, None)); - match status { - TxPoolRevalidationStatus::Scheduled(_, _) => (), - _ => panic!("Unexpected status: {:?}", status), - } - - // revalidation required at time - let mut status = TxPoolRevalidationStatus::Scheduled(Some(Instant::now()), None); - assert!(status.is_required(10u32, None, None)); - match status { - TxPoolRevalidationStatus::InProgress => (), - _ => panic!("Unexpected status: {:?}", status), - } - - // revalidation required at block - let mut status = TxPoolRevalidationStatus::Scheduled(None, Some(10)); - assert!(status.is_required(10u32, None, None)); - match status { - TxPoolRevalidationStatus::InProgress => (), - _ => panic!("Unexpected status: {:?}", status), - } - } - - #[test] - fn should_revalidate_transactions_at_light_pool() { - use std::sync::atomic; - use sp_runtime::transaction_validity::*; - - let build_fetcher = || { - let validated = Arc::new(atomic::AtomicBool::new(false)); - Arc::new(substrate_test_runtime_client::new_light_fetcher() - .with_remote_body(Some(Box::new(move |_| Ok(vec![])))) - .with_remote_call(Some(Box::new(move |_| { - let is_inserted = validated.swap(true, atomic::Ordering::SeqCst); - let validity: TransactionValidity = if is_inserted { - Err(TransactionValidityError::Invalid( - InvalidTransaction::Custom(0) - )) - } else { - Ok(ValidTransaction { - priority: 0, - requires: Vec::new(), - provides: vec![vec![42]], - longevity: 0, - propagate: true, - }) - }; - Ok(validity.encode()) - })))) - }; - - fn with_fetcher_maintain + 'static>( - fetcher: Arc, - revalidate_time_period: Option, - revalidate_block_period: Option, - prepare_maintainer: impl Fn(&Mutex>), - ) -> PoolStatus { - let setup = setup_light(fetcher.clone()); - let best = setup.longest_chain.best_chain().unwrap(); - - // let's prepare maintainer - let maintainer = LightBasicPoolMaintainer::new( - setup.pool.clone(), - setup.client.clone(), - fetcher, - revalidate_time_period, - revalidate_block_period, - ); - prepare_maintainer(&*maintainer.revalidation_status); - - // store the transaction in the pool - block_on(setup.pool.submit_one( - &BlockId::hash(best.hash()), - Transfer { - amount: 5, - nonce: 0, - from: AccountKeyring::Alice.into(), - to: Default::default(), - }.into_signed_tx(), - )).unwrap(); - - // and run maintain procedures - block_on(maintainer.maintain(&BlockId::Number(0), &[])); - - setup.pool.status() - } - - // when revalidation is never required - nothing happens - let fetcher = build_fetcher(); - //let maintainer = DefaultLightTransactionPoolMaintainer::new(client.clone(), fetcher.clone(), None, None); - let status = with_fetcher_maintain(fetcher, None, None, |_revalidation_status| {}); - assert_eq!(status.ready, 1); - - // when revalidation is scheduled by time - it is performed - let fetcher = build_fetcher(); - let status = with_fetcher_maintain(fetcher, None, None, |revalidation_status| - *revalidation_status.lock() = TxPoolRevalidationStatus::Scheduled(Some(Instant::now()), None) - ); - assert_eq!(status.ready, 0); - - // when revalidation is scheduled by block number - it is performed - let fetcher = build_fetcher(); - let status = with_fetcher_maintain(fetcher, None, None, |revalidation_status| - *revalidation_status.lock() = TxPoolRevalidationStatus::Scheduled(None, Some(0)) - ); - assert_eq!(status.ready, 0); - } - - #[test] - fn should_add_reverted_transactions_to_the_pool() { - let mut setup = setup(); - - let transaction = Transfer { - amount: 5, - nonce: 0, - from: AccountKeyring::Alice.into(), - to: Default::default(), - }.into_signed_tx(); - let best = setup.longest_chain.best_chain().unwrap(); - - // store the transaction in the pool - block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); - - // import the block - let mut builder = setup.client.new_block(Default::default()).unwrap(); - builder.push(transaction.clone()).unwrap(); - let block = builder.build().unwrap().block; - let block1_hash = block.header().hash(); - let id = BlockId::hash(block1_hash.clone()); - setup.client.import(BlockOrigin::Own, block).unwrap(); - - // fire notification - this should clean up the queue - assert_eq!(setup.pool.status().ready, 1); - block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[])); - - // then - assert_eq!(setup.pool.status().ready, 0); - assert_eq!(setup.pool.status().future, 0); - - // import second block - let builder = setup.client.new_block_at( - &BlockId::hash(best.hash()), - Default::default(), - false, - ).unwrap(); - let block = builder.build().unwrap().block; - let id = BlockId::hash(block.header().hash()); - setup.client.import(BlockOrigin::Own, block).unwrap(); - - // fire notification - this should add the transaction back to the pool. - block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[block1_hash])); - - // then - assert_eq!(setup.pool.status().ready, 1); - assert_eq!(setup.pool.status().future, 0); - } -} diff --git a/client/transaction-pool/src/tests.rs b/client/transaction-pool/src/tests.rs index 172ff06391051..fafc698901c93 100644 --- a/client/transaction-pool/src/tests.rs +++ b/client/transaction-pool/src/tests.rs @@ -14,221 +14,271 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . - use super::*; +use crate::{BasicPool, MaintainedTransactionPool}; use codec::Encode; use futures::executor::block_on; -use sc_transaction_graph::{self, Pool}; -use substrate_test_runtime_client::{runtime::{AccountId, Block, Hash, Header, Index, Extrinsic, Transfer}, AccountKeyring::{self, *}}; +use parking_lot::RwLock; +use sc_transaction_graph::{self, ExHash, Pool}; use sp_runtime::{ - generic::{self, BlockId, Block as BlockT}, - traits::{Hash as HashT, BlakeTwo256}, - transaction_validity::{TransactionValidity, ValidTransaction}, + generic::{self, BlockId}, + traits::{BlakeTwo256, Hash as HashT}, + transaction_validity::{TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction}, +}; +use std::collections::HashSet; +use substrate_test_runtime_client::{ + runtime::{AccountId, Block, BlockNumber, Extrinsic, Hash, Header, Index, Transfer}, + AccountKeyring::{self, *}, }; struct TestApi { - pub modifier: Box, + pub modifier: RwLock>, + pub chain_block_by_number: RwLock>>, + pub chain_headers_by_number: RwLock>, + pub invalid_hashes: RwLock>>, } impl TestApi { - fn default() -> Self { - TestApi { - modifier: Box::new(|_| {}), - } + fn default() -> Self { + TestApi { + modifier: RwLock::new(Box::new(|_| {})), + chain_block_by_number: RwLock::new(HashMap::new()), + invalid_hashes: RwLock::new(HashSet::new()), + chain_headers_by_number: RwLock::new(HashMap::new()), + } + } + + fn push_block(&self, block_number: BlockNumber, xts: Vec) { + self.chain_block_by_number.write().insert(block_number, xts); + self.chain_headers_by_number.write().insert(block_number, Header { + number: block_number, + digest: Default::default(), + extrinsics_root: Default::default(), + parent_hash: Default::default(), + state_root: Default::default(), + }); } } impl sc_transaction_graph::ChainApi for TestApi { - type Block = Block; - type Hash = Hash; - type Error = error::Error; - type ValidationFuture = futures::future::Ready>; - type BodyFuture = futures::future::Ready>>; - - fn validate_transaction( - &self, - at: &BlockId, - uxt: sc_transaction_graph::ExtrinsicFor, - ) -> Self::ValidationFuture { - let expected = index(at); - let requires = if expected == uxt.transfer().nonce { - vec![] - } else { - vec![vec![uxt.transfer().nonce as u8 - 1]] - }; + type Block = Block; + type Hash = Hash; + type Error = error::Error; + type ValidationFuture = futures::future::Ready>; + type BodyFuture = futures::future::Ready>>>; + + fn validate_transaction( + &self, + at: &BlockId, + uxt: sc_transaction_graph::ExtrinsicFor, + ) -> Self::ValidationFuture { + let expected = index(at); + let requires = if expected == uxt.transfer().nonce { + vec![] + } else { + vec![vec![uxt.transfer().nonce as u8 - 1]] + }; let provides = vec![vec![uxt.transfer().nonce as u8]]; - let mut validity = ValidTransaction { - priority: 1, - requires, - provides, - longevity: 64, - propagate: true, - }; - - (self.modifier)(&mut validity); - - futures::future::ready(Ok( - Ok(validity) - )) - } - - fn block_id_to_number(&self, at: &BlockId) -> error::Result>> { - Ok(Some(number_of(at))) - } - - fn block_id_to_hash(&self, at: &BlockId) -> error::Result>> { - Ok(match at { - generic::BlockId::Hash(x) => Some(x.clone()), - _ => Some(Default::default()), - }) - } - - fn hash_and_length(&self, ex: &sc_transaction_graph::ExtrinsicFor) -> (Self::Hash, usize) { - let encoded = ex.encode(); - (BlakeTwo256::hash(&encoded), encoded.len()) - } - - fn block_header(&self, id: &BlockId) -> error::Result> { - Ok(None) - } + if self.invalid_hashes.read().contains(&self.hash_and_length(&uxt).0) { + return futures::future::ready(Ok( + Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0))) + )) + } - fn block_body(&self, id: &BlockId) -> Self::BodyFuture { - futures::future::ready(Ok(vec![])) - } + let mut validity = ValidTransaction { + priority: 1, + requires, + provides, + longevity: 64, + propagate: true, + }; + + (self.modifier.read())(&mut validity); + + futures::future::ready(Ok(Ok(validity))) + } + + fn block_id_to_number( + &self, + at: &BlockId, + ) -> error::Result>> { + Ok(Some(number_of(at))) + } + + fn block_id_to_hash( + &self, + at: &BlockId, + ) -> error::Result>> { + Ok(match at { + generic::BlockId::Hash(x) => Some(x.clone()), + _ => Some(Default::default()), + }) + } + + fn hash_and_length( + &self, + ex: &sc_transaction_graph::ExtrinsicFor, + ) -> (Self::Hash, usize) { + let encoded = ex.encode(); + (BlakeTwo256::hash(&encoded), encoded.len()) + } + + fn block_body(&self, id: &BlockId) -> Self::BodyFuture { + futures::future::ready(Ok(if let BlockId::Number(num) = id { + self.chain_block_by_number.read().get(num).cloned() + } else { + None + })) + } } fn index(at: &BlockId) -> u64 { - 209 + number_of(at) + 209 + number_of(at) } fn number_of(at: &BlockId) -> u64 { - match at { - generic::BlockId::Number(n) => *n as u64, - _ => 0, - } + match at { + generic::BlockId::Number(n) => *n as u64, + _ => 0, + } } fn uxt(who: AccountKeyring, nonce: Index) -> Extrinsic { - let transfer = Transfer { - from: who.into(), - to: AccountId::default(), - nonce, - amount: 1, - }; - let signature = transfer.using_encoded(|e| who.sign(e)); - Extrinsic::Transfer(transfer, signature.into()) + let transfer = Transfer { + from: who.into(), + to: AccountId::default(), + nonce, + amount: 1, + }; + let signature = transfer.using_encoded(|e| who.sign(e)); + Extrinsic::Transfer(transfer, signature.into()) } fn pool() -> Pool { - Pool::new(Default::default(), TestApi::default().into()) + Pool::new(Default::default(), TestApi::default().into()) } -fn pool_api_with_modifier(modifier: Box) -> Pool { - Pool::new(Default::default(), TestApi { modifier }.into()) +fn maintained_pool() -> BasicPool { + BasicPool::new(Default::default(), TestApi::default()) } #[test] fn submission_should_work() { - let pool = pool(); - assert_eq!(209, index(&BlockId::number(0))); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + let pool = pool(); + assert_eq!(209, index(&BlockId::number(0))); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![209]); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![209]); } #[test] fn multiple_submission_should_work() { - let pool = pool(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); + let pool = pool(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![209, 210]); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![209, 210]); } #[test] fn early_nonce_should_be_culled() { - let pool = pool(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 208))).unwrap(); + let pool = pool(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 208))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, Vec::::new()); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, Vec::::new()); } #[test] fn late_nonce_should_be_queued() { - let pool = pool(); + let pool = pool(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, Vec::::new()); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, Vec::::new()); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![209, 210]); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![209, 210]); } #[test] fn prune_tags_should_work() { - let pool = pool(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); + let pool = pool(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![209, 210]); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![209, 210]); - block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).unwrap(); + block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![210]); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![210]); } #[test] fn should_ban_invalid_transactions() { - let pool = pool(); - let uxt = uxt(Alice, 209); - let hash = block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap(); - pool.remove_invalid(&[hash]); - block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); - - // when - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, Vec::::new()); - - // then - block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); + let pool = pool(); + let uxt = uxt(Alice, 209); + let hash = block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap(); + pool.remove_invalid(&[hash]); + block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); + + // when + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, Vec::::new()); + + // then + block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); } #[test] fn should_correctly_prune_transactions_providing_more_than_one_tag() { - let mut api = TestApi::default(); - api.modifier = Box::new(|v: &mut ValidTransaction| { - v.provides.push(vec![155]); - }); - let pool = Pool::new(Default::default(), Arc::new(api)); + let api = TestApi::default(); + *api.modifier.write() = Box::new(|v: &mut ValidTransaction| { + v.provides.push(vec![155]); + }); + let pool = Pool::new(Default::default(), Arc::new(api)); + let xt = uxt(Alice, 209); + block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); + assert_eq!(pool.status().ready, 1); + + // remove the transaction that just got imported. + block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).expect("1. Pruned"); + assert_eq!(pool.status().ready, 0); + // it's re-imported to future + assert_eq!(pool.status().future, 1); + + // so now let's insert another transaction that also provides the 155 + let xt = uxt(Alice, 211); + block_on(pool.submit_one(&BlockId::number(2), xt.clone())).expect("2. Imported"); + assert_eq!(pool.status().ready, 1); + assert_eq!(pool.status().future, 1); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![211]); + + // prune it and make sure the pool is empty + block_on(pool.prune_tags(&BlockId::number(3), vec![vec![155]], vec![])).expect("2. Pruned"); + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 2); +} + +#[test] +fn maintaince_prune() { let xt = uxt(Alice, 209); - block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); - assert_eq!(pool.status().ready, 1); - - // remove the transaction that just got imported. - block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).expect("1. Pruned"); - assert_eq!(pool.status().ready, 0); - // it's re-imported to future - assert_eq!(pool.status().future, 1); - - // so now let's insert another transaction that also provides the 155 - let xt = uxt(Alice, 211); - block_on(pool.submit_one(&BlockId::number(2), xt.clone())).expect("2. Imported"); - assert_eq!(pool.status().ready, 1); - assert_eq!(pool.status().future, 1); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![211]); - - // prune it and make sure the pool is empty - block_on(pool.prune_tags(&BlockId::number(3), vec![vec![155]], vec![])).expect("2. Pruned"); - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 2); + + let pool = maintained_pool(); + + block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); + assert_eq!(pool.status().ready, 1); + + pool.api.push_block(1, vec![xt.clone()]); + + block_on(pool.maintain(&BlockId::number(1), &[])); + assert_eq!(pool.status().ready, 0); } diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index 3fc19445c15d1..8da7d7ecbe4dd 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -226,6 +226,7 @@ pub trait TransactionPool: Send + Sync { fn hash_of(&self, xt: &TransactionFor) -> TxHash; } +/// Trait for transaction pool maintaince. pub trait MaintainedTransactionPool : TransactionPool { /// Perform maintaince fn maintain(&self, block: &BlockId, retracted: &[BlockHash]) From 5baf228dbed1477d2caf1d58ac1b2ff0f80be495 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 15 Jan 2020 16:18:52 +0300 Subject: [PATCH 06/12] fix benches --- client/transaction-pool/graph/benches/basics.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/client/transaction-pool/graph/benches/basics.rs b/client/transaction-pool/graph/benches/basics.rs index 557a2ca3d1f85..1dfe2aeea1797 100644 --- a/client/transaction-pool/graph/benches/basics.rs +++ b/client/transaction-pool/graph/benches/basics.rs @@ -16,7 +16,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use futures::executor::block_on; +use futures::{future::{ready, Ready}, executor::block_on}; use sc_transaction_graph::*; use sp_runtime::transaction_validity::{ValidTransaction, InvalidTransaction}; use codec::Encode; @@ -49,7 +49,8 @@ impl ChainApi for TestApi { type Block = Block; type Hash = H256; type Error = sp_transaction_pool::error::Error; - type ValidationFuture = futures::future::Ready>; + type ValidationFuture = Ready>; + type BodyFuture = Ready>>>; fn validate_transaction( &self, @@ -61,14 +62,14 @@ impl ChainApi for TestApi { match self.block_id_to_number(at) { Ok(Some(num)) if num > 5 => { - return futures::future::ready( + return ready( Ok(Err(InvalidTransaction::Stale.into())) ) }, _ => {}, } - futures::future::ready( + ready( Ok(Ok(ValidTransaction { priority: 4, requires: if nonce > 1 && self.nonce_dependant { @@ -105,6 +106,10 @@ impl ChainApi for TestApi { let encoded = uxt.encode(); (blake2_256(&encoded).into(), encoded.len()) } + + fn block_body(&self, _id: &BlockId) -> Self::BodyFuture { + ready(Ok(None)) + } } fn uxt(transfer: Transfer) -> Extrinsic { From 4cff5ebfc041c5af5703a2a84d98ddd9a39ab9e0 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 15 Jan 2020 16:19:57 +0300 Subject: [PATCH 07/12] fix new_light --- bin/node/cli/src/service.rs | 4 +++- client/transaction-pool/graph/benches/basics.rs | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index cadc6159de2f0..179cc92a399db 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -314,7 +314,9 @@ pub fn new_light(config: NodeConfiguration) let fetcher = fetcher .ok_or_else(|| "Trying to start light transaction pool without active fetcher")?; let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone()); - let pool = sc_transaction_pool::BasicPool::new(config, pool_api); + let pool = sc_transaction_pool::BasicPool::with_revalidation_type( + config, pool_api, sc_transaction_pool::RevalidationType::Light, + ); Ok(pool) })? .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { diff --git a/client/transaction-pool/graph/benches/basics.rs b/client/transaction-pool/graph/benches/basics.rs index 1dfe2aeea1797..75d15cc1f1916 100644 --- a/client/transaction-pool/graph/benches/basics.rs +++ b/client/transaction-pool/graph/benches/basics.rs @@ -50,7 +50,7 @@ impl ChainApi for TestApi { type Hash = H256; type Error = sp_transaction_pool::error::Error; type ValidationFuture = Ready>; - type BodyFuture = Ready>>>; + type BodyFuture = Ready>>>; fn validate_transaction( &self, @@ -155,13 +155,13 @@ fn benchmark_main(c: &mut Criterion) { c.bench_function("sequential 50 tx", |b| { b.iter(|| { - bench_configured(Pool::new(Default::default(), TestApi::new_dependant()), 50); + bench_configured(Pool::new(Default::default(), TestApi::new_dependant().into()), 50); }); }); c.bench_function("random 100 tx", |b| { b.iter(|| { - bench_configured(Pool::new(Default::default(), TestApi::default()), 100); + bench_configured(Pool::new(Default::default(), TestApi::default().into()), 100); }); }); } From 29d389b4cc48df92f22f95ea4cc9aee61e06ea31 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Sun, 19 Jan 2020 11:33:37 +0300 Subject: [PATCH 08/12] Add revalidation test case --- client/transaction-pool/src/tests.rs | 353 ++++++++++++++------------- 1 file changed, 189 insertions(+), 164 deletions(-) diff --git a/client/transaction-pool/src/tests.rs b/client/transaction-pool/src/tests.rs index fafc698901c93..e5c351061c167 100644 --- a/client/transaction-pool/src/tests.rs +++ b/client/transaction-pool/src/tests.rs @@ -22,31 +22,33 @@ use futures::executor::block_on; use parking_lot::RwLock; use sc_transaction_graph::{self, ExHash, Pool}; use sp_runtime::{ - generic::{self, BlockId}, - traits::{BlakeTwo256, Hash as HashT}, - transaction_validity::{TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction}, + generic::{self, BlockId}, + traits::{BlakeTwo256, Hash as HashT}, + transaction_validity::{TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction}, }; use std::collections::HashSet; use substrate_test_runtime_client::{ - runtime::{AccountId, Block, BlockNumber, Extrinsic, Hash, Header, Index, Transfer}, - AccountKeyring::{self, *}, + runtime::{AccountId, Block, BlockNumber, Extrinsic, Hash, Header, Index, Transfer}, + AccountKeyring::{self, *}, }; struct TestApi { - pub modifier: RwLock>, - pub chain_block_by_number: RwLock>>, - pub chain_headers_by_number: RwLock>, - pub invalid_hashes: RwLock>>, + pub modifier: RwLock>, + pub chain_block_by_number: RwLock>>, + pub chain_headers_by_number: RwLock>, + pub invalid_hashes: RwLock>>, + pub validation_requests: RwLock>, } impl TestApi { - fn default() -> Self { - TestApi { - modifier: RwLock::new(Box::new(|_| {})), - chain_block_by_number: RwLock::new(HashMap::new()), + fn default() -> Self { + TestApi { + modifier: RwLock::new(Box::new(|_| {})), + chain_block_by_number: RwLock::new(HashMap::new()), invalid_hashes: RwLock::new(HashSet::new()), chain_headers_by_number: RwLock::new(HashMap::new()), - } + validation_requests: RwLock::new(Default::default()), + } } fn push_block(&self, block_number: BlockNumber, xts: Vec) { @@ -62,23 +64,26 @@ impl TestApi { } impl sc_transaction_graph::ChainApi for TestApi { - type Block = Block; - type Hash = Hash; - type Error = error::Error; - type ValidationFuture = futures::future::Ready>; - type BodyFuture = futures::future::Ready>>>; - - fn validate_transaction( - &self, - at: &BlockId, - uxt: sc_transaction_graph::ExtrinsicFor, - ) -> Self::ValidationFuture { - let expected = index(at); - let requires = if expected == uxt.transfer().nonce { - vec![] - } else { - vec![vec![uxt.transfer().nonce as u8 - 1]] - }; + type Block = Block; + type Hash = Hash; + type Error = error::Error; + type ValidationFuture = futures::future::Ready>; + type BodyFuture = futures::future::Ready>>>; + + fn validate_transaction( + &self, + at: &BlockId, + uxt: sc_transaction_graph::ExtrinsicFor, + ) -> Self::ValidationFuture { + + self.validation_requests.write().push(uxt.clone()); + + let expected = index(at); + let requires = if expected == uxt.transfer().nonce { + vec![] + } else { + vec![vec![uxt.transfer().nonce as u8 - 1]] + }; let provides = vec![vec![uxt.transfer().nonce as u8]]; if self.invalid_hashes.read().contains(&self.hash_and_length(&uxt).0) { @@ -87,198 +92,218 @@ impl sc_transaction_graph::ChainApi for TestApi { )) } - let mut validity = ValidTransaction { - priority: 1, - requires, - provides, - longevity: 64, - propagate: true, - }; - - (self.modifier.read())(&mut validity); - - futures::future::ready(Ok(Ok(validity))) - } - - fn block_id_to_number( - &self, - at: &BlockId, - ) -> error::Result>> { - Ok(Some(number_of(at))) - } - - fn block_id_to_hash( - &self, - at: &BlockId, - ) -> error::Result>> { - Ok(match at { - generic::BlockId::Hash(x) => Some(x.clone()), - _ => Some(Default::default()), - }) - } - - fn hash_and_length( - &self, - ex: &sc_transaction_graph::ExtrinsicFor, - ) -> (Self::Hash, usize) { - let encoded = ex.encode(); - (BlakeTwo256::hash(&encoded), encoded.len()) - } - - fn block_body(&self, id: &BlockId) -> Self::BodyFuture { - futures::future::ready(Ok(if let BlockId::Number(num) = id { - self.chain_block_by_number.read().get(num).cloned() - } else { - None - })) - } + let mut validity = ValidTransaction { + priority: 1, + requires, + provides, + longevity: 64, + propagate: true, + }; + + (self.modifier.read())(&mut validity); + + futures::future::ready(Ok(Ok(validity))) + } + + fn block_id_to_number( + &self, + at: &BlockId, + ) -> error::Result>> { + Ok(Some(number_of(at))) + } + + fn block_id_to_hash( + &self, + at: &BlockId, + ) -> error::Result>> { + Ok(match at { + generic::BlockId::Hash(x) => Some(x.clone()), + _ => Some(Default::default()), + }) + } + + fn hash_and_length( + &self, + ex: &sc_transaction_graph::ExtrinsicFor, + ) -> (Self::Hash, usize) { + let encoded = ex.encode(); + (BlakeTwo256::hash(&encoded), encoded.len()) + } + + fn block_body(&self, id: &BlockId) -> Self::BodyFuture { + futures::future::ready(Ok(if let BlockId::Number(num) = id { + self.chain_block_by_number.read().get(num).cloned() + } else { + None + })) + } } fn index(at: &BlockId) -> u64 { - 209 + number_of(at) + 209 + number_of(at) } fn number_of(at: &BlockId) -> u64 { - match at { - generic::BlockId::Number(n) => *n as u64, - _ => 0, - } + match at { + generic::BlockId::Number(n) => *n as u64, + _ => 0, + } } fn uxt(who: AccountKeyring, nonce: Index) -> Extrinsic { - let transfer = Transfer { - from: who.into(), - to: AccountId::default(), - nonce, - amount: 1, - }; - let signature = transfer.using_encoded(|e| who.sign(e)); - Extrinsic::Transfer(transfer, signature.into()) + let transfer = Transfer { + from: who.into(), + to: AccountId::default(), + nonce, + amount: 1, + }; + let signature = transfer.using_encoded(|e| who.sign(e)); + Extrinsic::Transfer(transfer, signature.into()) } fn pool() -> Pool { - Pool::new(Default::default(), TestApi::default().into()) + Pool::new(Default::default(), TestApi::default().into()) } fn maintained_pool() -> BasicPool { - BasicPool::new(Default::default(), TestApi::default()) + BasicPool::new(Default::default(), TestApi::default()) } #[test] fn submission_should_work() { - let pool = pool(); - assert_eq!(209, index(&BlockId::number(0))); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + let pool = pool(); + assert_eq!(209, index(&BlockId::number(0))); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![209]); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![209]); } #[test] fn multiple_submission_should_work() { - let pool = pool(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); + let pool = pool(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![209, 210]); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![209, 210]); } #[test] fn early_nonce_should_be_culled() { - let pool = pool(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 208))).unwrap(); + let pool = pool(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 208))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, Vec::::new()); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, Vec::::new()); } #[test] fn late_nonce_should_be_queued() { - let pool = pool(); + let pool = pool(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, Vec::::new()); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, Vec::::new()); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![209, 210]); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![209, 210]); } #[test] fn prune_tags_should_work() { - let pool = pool(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); + let pool = pool(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![209, 210]); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![209, 210]); - block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).unwrap(); + block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![210]); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![210]); } #[test] fn should_ban_invalid_transactions() { - let pool = pool(); - let uxt = uxt(Alice, 209); - let hash = block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap(); - pool.remove_invalid(&[hash]); - block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); - - // when - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, Vec::::new()); - - // then - block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); + let pool = pool(); + let uxt = uxt(Alice, 209); + let hash = block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap(); + pool.remove_invalid(&[hash]); + block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); + + // when + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, Vec::::new()); + + // then + block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); } #[test] fn should_correctly_prune_transactions_providing_more_than_one_tag() { - let api = TestApi::default(); - *api.modifier.write() = Box::new(|v: &mut ValidTransaction| { - v.provides.push(vec![155]); - }); - let pool = Pool::new(Default::default(), Arc::new(api)); - let xt = uxt(Alice, 209); - block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); - assert_eq!(pool.status().ready, 1); - - // remove the transaction that just got imported. - block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).expect("1. Pruned"); - assert_eq!(pool.status().ready, 0); - // it's re-imported to future - assert_eq!(pool.status().future, 1); - - // so now let's insert another transaction that also provides the 155 - let xt = uxt(Alice, 211); - block_on(pool.submit_one(&BlockId::number(2), xt.clone())).expect("2. Imported"); - assert_eq!(pool.status().ready, 1); - assert_eq!(pool.status().future, 1); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); - assert_eq!(pending, vec![211]); - - // prune it and make sure the pool is empty - block_on(pool.prune_tags(&BlockId::number(3), vec![vec![155]], vec![])).expect("2. Pruned"); - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 2); + let api = TestApi::default(); + *api.modifier.write() = Box::new(|v: &mut ValidTransaction| { + v.provides.push(vec![155]); + }); + let pool = Pool::new(Default::default(), Arc::new(api)); + let xt = uxt(Alice, 209); + block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); + assert_eq!(pool.status().ready, 1); + + // remove the transaction that just got imported. + block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).expect("1. Pruned"); + assert_eq!(pool.status().ready, 0); + // it's re-imported to future + assert_eq!(pool.status().future, 1); + + // so now let's insert another transaction that also provides the 155 + let xt = uxt(Alice, 211); + block_on(pool.submit_one(&BlockId::number(2), xt.clone())).expect("2. Imported"); + assert_eq!(pool.status().ready, 1); + assert_eq!(pool.status().future, 1); + let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pending, vec![211]); + + // prune it and make sure the pool is empty + block_on(pool.prune_tags(&BlockId::number(3), vec![vec![155]], vec![])).expect("2. Pruned"); + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 2); } #[test] fn maintaince_prune() { let xt = uxt(Alice, 209); - let pool = maintained_pool(); + let pool = maintained_pool(); - block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); - assert_eq!(pool.status().ready, 1); + block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); + assert_eq!(pool.status().ready, 1); pool.api.push_block(1, vec![xt.clone()]); - block_on(pool.maintain(&BlockId::number(1), &[])); - assert_eq!(pool.status().ready, 0); + block_on(pool.maintain(&BlockId::number(1), &[])); + assert_eq!(pool.status().ready, 0); +} + + +#[test] +fn maintaince_revalidate() { + let xt1 = uxt(Alice, 209); + let xt2 = uxt(Alice, 210); + + let pool = maintained_pool(); + block_on(pool.submit_one(&BlockId::number(0), xt1.clone())).expect("1. Imported"); + block_on(pool.submit_one(&BlockId::number(0), xt2.clone())).expect("2. Imported"); + assert_eq!(pool.status().ready, 2); + assert_eq!(pool.api.validation_requests.read().len(), 2); + + pool.api.push_block(1, vec![xt1.clone()]); + + block_on(pool.maintain(&BlockId::number(1), &[])); + assert_eq!(pool.status().ready, 1); + // test that pool revalidated transaction that left ready and not included in the block + assert_eq!(pool.api.validation_requests.read().len(), 3); } From 16b91de594c19c30ab05e9da324bf545193e7b2a Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 20 Jan 2020 18:50:19 +0300 Subject: [PATCH 09/12] review fixes --- client/transaction-pool/graph/src/pool.rs | 2 +- client/transaction-pool/src/lib.rs | 100 ++++++++++++---------- client/transaction-pool/src/tests.rs | 4 +- 3 files changed, 56 insertions(+), 50 deletions(-) diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index 26ff57cdc5ba8..5be879f079a7b 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -87,7 +87,7 @@ pub trait ChainApi: Send + Sync { /// Returns hash and encoding length of the extrinsic. fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize); - /// Return body + /// Returns a block body given the block id. fn block_body(&self, at: &BlockId) -> Self::BodyFuture; } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 59d69e99a5894..925c71d52e3a4 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -58,13 +58,15 @@ pub enum RevalidationType { /// Light revalidation type. /// /// During maintaince, transaction pool makes periodic revalidation - /// depending on number of blocks or time passed. + /// of all transactions depending on number of blocks or time passed. + /// Also this kind of revalidation does not resubmit transactions from + /// retracted blocks, since it is too expensive. Light, /// Full revalidation type. /// - /// During maintaince, transaction pool revalidates some transactions - /// from the pool of valid transactions. + /// During maintaince, transaction pool revalidates some fixed amount of + /// transactions from the pool of valid transactions. Full, } @@ -191,30 +193,38 @@ enum RevalidationStrategy { Light(RevalidationStatus) } +struct RevalidationAction { + revalidate: bool, + resubmit: bool, + revalidate_amount: Option, +} + impl RevalidationStrategy { pub fn clear(&mut self) { if let Self::Light(status) = self { status.clear() } } - pub fn resubmit_required(&mut self) -> bool { - if let Self::Light(_) = self { return false } else { return true } - } - - pub fn is_required( + pub fn next( &mut self, block: N, revalidate_time_period: Option, revalidate_block_period: Option, - ) -> bool { - if let Self::Light(status) = self { - status.is_required(block, revalidate_time_period, revalidate_block_period) - } else { true } - } - - pub fn amount(&self) -> Option { + ) -> RevalidationAction { match self { - Self::Light(_) => None, - Self::Always => Some(16), + Self::Light(status) => RevalidationAction { + revalidate: status.next_required( + block, + revalidate_time_period, + revalidate_block_period + ), + resubmit: false, + revalidate_amount: None, + }, + Self::Always => RevalidationAction { + revalidate: true, + resubmit: true, + revalidate_amount: Some(16), + } } } } @@ -226,7 +236,7 @@ impl RevalidationStatus { } /// Returns true if revalidation is required. - pub fn is_required( + pub fn next_required( &mut self, block: N, revalidate_time_period: Option, @@ -258,7 +268,9 @@ where Block: BlockT, PoolApi: 'static + sc_transaction_graph::ChainApi, { - fn maintain(&self, id: &BlockId, retracted: &[BlockHash]) -> Pin + Send>> { + fn maintain(&self, id: &BlockId, retracted: &[BlockHash]) + -> Pin + Send>> + { let id = id.clone(); let pool = self.pool.clone(); let api = self.api.clone(); @@ -271,20 +283,12 @@ where } }; - let (is_revalidation_required, is_resubmit_required) = { - let mut lock = self.revalidation_strategy.lock(); - ( - lock.is_required( - block_number, - Some(std::time::Duration::from_secs(60)), - Some(20.into()), - ), - lock.resubmit_required() - ) - }; - - let revalidation_status = self.revalidation_strategy.clone(); - let revalidation_amount = revalidation_status.lock().amount(); + let next_action = self.revalidation_strategy.lock().next( + block_number, + Some(std::time::Duration::from_secs(60)), + Some(20.into()), + ); + let revalidation_strategy = self.revalidation_strategy.clone(); let retracted = retracted.to_vec(); async move { @@ -293,25 +297,27 @@ where log::warn!("Prune known transactions: error request {:?}!", e); None }) - .unwrap_or(Vec::new()) + .unwrap_or_default() .into_iter() .map(|tx| pool.hash_of(&tx)) .collect::>(); if let Err(e) = pool.prune_known(&id, &hashes) { - log::warn!("Cannot prune known in the pool {:?}!", e); + log::error!("Cannot prune known in the pool {:?}!", e); } - if is_resubmit_required { + if next_action.resubmit { let mut resubmit_transactions = Vec::new(); - for retracted_hash in retracted.into_iter() { - let txes = api.block_body(&BlockId::hash(retracted_hash.clone())).await - .unwrap_or(None) - .unwrap_or(Vec::new()); - for tx in txes { - resubmit_transactions.push(tx) - } + for retracted_hash in retracted { + let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await + .unwrap_or_else(|e| { + log::warn!("Failed to fetch block body {:?}!", e); + None + }) + .unwrap_or_default(); + + resubmit_transactions.extend(block_transactions); } if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await { log::debug!(target: "txpool", @@ -320,13 +326,13 @@ where } } - if is_revalidation_required { - if let Err(e) = pool.revalidate_ready(&id, revalidation_amount).await { - log::warn!("revalidate ready failed {:?}", e); + if next_action.revalidate { + if let Err(e) = pool.revalidate_ready(&id, next_action.revalidate_amount).await { + log::warn!("Revalidate ready failed {:?}", e); } } - revalidation_status.lock().clear(); + revalidation_strategy.lock().clear(); }.boxed() } } diff --git a/client/transaction-pool/src/tests.rs b/client/transaction-pool/src/tests.rs index e5c351061c167..778536b7b9ae3 100644 --- a/client/transaction-pool/src/tests.rs +++ b/client/transaction-pool/src/tests.rs @@ -274,7 +274,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { } #[test] -fn maintaince_prune() { +fn should_prune_old_during_maintenance() { let xt = uxt(Alice, 209); let pool = maintained_pool(); @@ -290,7 +290,7 @@ fn maintaince_prune() { #[test] -fn maintaince_revalidate() { +fn should_revalidate_during_maintenance() { let xt1 = uxt(Alice, 209); let xt2 = uxt(Alice, 210); From 722e4525e69dc87ed551658baa5c76194b16e390 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 21 Jan 2020 14:12:50 +0300 Subject: [PATCH 10/12] review fixes --- client/transaction-pool/src/api.rs | 19 ++++++++++++------- client/transaction-pool/src/lib.rs | 25 +++++++++++++++---------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 6f7e418ab11c1..1fc00ac8c515d 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -221,13 +221,18 @@ impl sc_transaction_graph::ChainApi for LightChainApi number, _ => { - log::trace!(target: "txqueue", "Skipping chain event - no numbrer for that block {:?}", id); + log::trace!(target: "txqueue", "Skipping chain event - no number for that block {:?}", id); return Box::pin(ready(())); } }; @@ -292,18 +292,21 @@ where let retracted = retracted.to_vec(); async move { - let hashes = api.block_body(&id).await - .unwrap_or_else(|e| { - log::warn!("Prune known transactions: error request {:?}!", e); - None - }) + // We don't query block if we won't prune anything + if !pool.status().is_empty() { + let hashes = api.block_body(&id).await + .unwrap_or_else(|e| { + log::warn!("Prune known transactions: error request {:?}!", e); + None + }) .unwrap_or_default() .into_iter() .map(|tx| pool.hash_of(&tx)) .collect::>(); - if let Err(e) = pool.prune_known(&id, &hashes) { - log::error!("Cannot prune known in the pool {:?}!", e); + if let Err(e) = pool.prune_known(&id, &hashes) { + log::error!("Cannot prune known in the pool {:?}!", e); + } } if next_action.resubmit { @@ -315,7 +318,9 @@ where log::warn!("Failed to fetch block body {:?}!", e); None }) - .unwrap_or_default(); + .unwrap_or_default() + .into_iter() + .filter(|tx| tx.is_signed().unwrap_or(true)); resubmit_transactions.extend(block_transactions); } From 61ff80f1c8eea6bd18026e4a84e93fb41ae71b28 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 21 Jan 2020 16:22:35 +0300 Subject: [PATCH 11/12] use just ready future --- client/transaction-pool/src/api.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 1fc00ac8c515d..1bf6348214842 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -19,7 +19,7 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc}; use codec::{Decode, Encode}; use futures::{ - channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready}, + channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready}, }; use sc_client_api::{ @@ -72,12 +72,10 @@ impl sc_transaction_graph::ChainApi for FullChainApi> + Send>>; - type BodyFuture = Pin::Extrinsic>>>> + Send>>; + type BodyFuture = Ready::Extrinsic>>>>; fn block_body(&self, id: &BlockId) -> Self::BodyFuture { - Box::pin(ready( - self.client.block_body(&id).map_err(|e| error::Error::from(e)) - )) + ready(self.client.block_body(&id).map_err(|e| error::Error::from(e))) } fn validate_transaction( From b207696c00367d427bd09436e07cc0088bf726b8 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 22 Jan 2020 17:51:47 +0300 Subject: [PATCH 12/12] address review --- client/transaction-pool/src/lib.rs | 8 +++++--- primitives/transaction-pool/src/pool.rs | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 91919ee59eb20..f6f7774935b39 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -57,7 +57,7 @@ pub struct BasicPool pub enum RevalidationType { /// Light revalidation type. /// - /// During maintaince, transaction pool makes periodic revalidation + /// During maintenance, transaction pool makes periodic revalidation /// of all transactions depending on number of blocks or time passed. /// Also this kind of revalidation does not resubmit transactions from /// retracted blocks, since it is too expensive. @@ -65,7 +65,7 @@ pub enum RevalidationType { /// Full revalidation type. /// - /// During maintaince, transaction pool revalidates some fixed amount of + /// During maintenance, transaction pool revalidates some fixed amount of /// transactions from the pool of valid transactions. Full, } @@ -201,7 +201,9 @@ struct RevalidationAction { impl RevalidationStrategy { pub fn clear(&mut self) { - if let Self::Light(status) = self { status.clear() } + if let Self::Light(status) = self { + status.clear() + } } pub fn next( diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index 8da7d7ecbe4dd..ed24ad0619a09 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -226,9 +226,9 @@ pub trait TransactionPool: Send + Sync { fn hash_of(&self, xt: &TransactionFor) -> TxHash; } -/// Trait for transaction pool maintaince. +/// Trait for transaction pool maintenance. pub trait MaintainedTransactionPool : TransactionPool { - /// Perform maintaince + /// Perform maintenance fn maintain(&self, block: &BlockId, retracted: &[BlockHash]) -> Pin + Send>>; }