From d9ce1f9df9cbfa14f7b52a09a6fe11ca04264b60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 25 Jul 2018 15:42:04 +0200 Subject: [PATCH 1/9] Initial implementation of storage events. --- Cargo.lock | 1 + polkadot/service/src/lib.rs | 2 +- substrate/client/db/Cargo.toml | 1 + substrate/client/db/src/lib.rs | 19 ++++- substrate/client/db/src/notifications.rs | 95 ++++++++++++++++++++++++ substrate/client/src/backend.rs | 12 ++- substrate/client/src/client.rs | 28 +++++-- substrate/client/src/lib.rs | 1 + substrate/client/src/light/backend.rs | 18 ++++- substrate/rpc/src/chain/mod.rs | 1 + substrate/service/src/components.rs | 4 +- substrate/state-machine/src/lib.rs | 2 +- 12 files changed, 168 insertions(+), 16 deletions(-) create mode 100644 substrate/client/db/src/notifications.rs diff --git a/Cargo.lock b/Cargo.lock index 865627b6686b1..99e7cca639cf5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2666,6 +2666,7 @@ name = "substrate-client-db" version = "0.1.0" dependencies = [ "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "hashdb 0.1.1 (git+https://github.com/paritytech/parity.git)", "kvdb 0.1.0 (git+https://github.com/paritytech/parity.git)", "kvdb-memorydb 0.1.0 (git+https://github.com/paritytech/parity.git)", diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 6d2c4aee157b8..86af40425aa3e 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -68,7 +68,7 @@ pub trait Components: service::Components { /// Polkadot API. type Api: 'static + PolkadotApi + Send + Sync; /// Client backend. - type Backend: 'static + client::backend::Backend; + type Backend: 'static + client::backend::Backend + client::backend::BackendEvents; /// Client executor. type Executor: 'static + client::CallExecutor + Send + Sync; } diff --git a/substrate/client/db/Cargo.toml b/substrate/client/db/Cargo.toml index 5a26a7cdb8537..1e52610d7a324 100644 --- a/substrate/client/db/Cargo.toml +++ b/substrate/client/db/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] +futures = "0.1.17" parking_lot = "0.4" log = "0.3" kvdb = { git = "https://github.com/paritytech/parity.git" } diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index bdfd5ce522096..ff4671746dc6d 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -16,6 +16,7 @@ //! Client backend that uses RocksDB database as storage. +extern crate futures; extern crate substrate_client as client; extern crate kvdb_rocksdb; extern crate kvdb; @@ -38,6 +39,7 @@ extern crate kvdb_memorydb; pub mod light; +mod notifications; mod utils; use std::sync::Arc; @@ -46,8 +48,8 @@ use std::path::PathBuf; use codec::{Decode, Encode}; use kvdb::{KeyValueDB, DBTransaction}; use memorydb::MemoryDB; -use parking_lot::RwLock; -use primitives::H256; +use parking_lot::{RwLock, Mutex}; +use primitives::{H256, storage::StorageKey}; use runtime_primitives::generic::BlockId; use runtime_primitives::bft::Justification; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hash, HashFor, Zero}; @@ -270,6 +272,7 @@ impl state_db::HashDb for StorageDb { /// Otherwise, trie nodes are kept only from the most recent block. pub struct Backend { storage: Arc>, + storage_notifications: Mutex>, blockchain: BlockchainDb, finalization_window: u64, } @@ -302,6 +305,7 @@ impl Backend { Ok(Backend { storage: Arc::new(storage_db), + storage_notifications: Default::default(), blockchain, finalization_window, }) @@ -323,6 +327,14 @@ fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitS } } +impl client::backend::BackendEvents for Backend { + fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) + -> client::error::Result> + { + Ok(self.storage_notifications.lock().listen(filter_keys)) + } +} + impl client::backend::Backend for Backend { type BlockImportOperation = BlockImportOperation; type Blockchain = BlockchainDb; @@ -363,6 +375,7 @@ impl client::backend::Backend for Backend { changeset.deleted.push(key.0.into()); } } + self.storage_notifications.lock().trigger(&hash, &changeset); let number_u64 = number.as_().into(); let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset); apply_state_commit(&mut transaction, commit); @@ -409,7 +422,7 @@ impl client::backend::Backend for Backend { } } -impl client::backend::LocalBackend for Backend +impl client::backend::LocalBackend for Backend {} #[cfg(test)] diff --git a/substrate/client/db/src/notifications.rs b/substrate/client/db/src/notifications.rs new file mode 100644 index 0000000000000..efa2f5e59f936 --- /dev/null +++ b/substrate/client/db/src/notifications.rs @@ -0,0 +1,95 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Storage notifications + +use std::collections::HashMap; + +use futures::sync::mpsc; +use primitives::{ + H256, + storage::{StorageKey, StorageData}, +}; +use runtime_primitives::traits::Block as BlockT; + +type SubscriberId = u64; + +/// Manages storage listeners. +#[derive(Debug)] +pub struct StorageNotifications { + next_id: SubscriberId, + filters: HashMap< + Option, + Vec, + >, + sinks: HashMap< + SubscriberId, + mpsc::UnboundedSender<( + Block::Hash, + Vec<(StorageKey, Option)>, + )>, + >, +} + +impl Default for StorageNotifications { + fn default() -> Self { + StorageNotifications { + next_id: Default::default(), + filters: Default::default(), + sinks: Default::default(), + } + } +} + +impl StorageNotifications { + /// Trigger notification to all listeners. + /// + /// Note the changes are going to be filtered by listener's filter key. + /// In fact no event might be sent if clients are not interested in the changes. + pub fn trigger(&mut self, hash: &Block::Hash, changeset: &::state_db::ChangeSet) { + + } + + /// Start listening for particular storage keys. + pub fn listen(&mut self, filter_keys: Option<&[StorageKey]>) -> ::client::StorageEventStream { + self.next_id += 1; + + // add subscriber for every key + { + let mut add = |key| { + self.filters + .entry(key) + .or_insert_with(Default::default) + .push(self.next_id); + }; + + match filter_keys { + None => add(None), + Some(keys) => keys.iter().for_each(|key| add(Some(StorageKey(key.0.clone())))), + } + } + + // insert sink + let (tx, rx) = mpsc::unbounded(); + self.sinks.insert(self.next_id, tx); + rx + } +} + +#[cfg(test)] +mod tests { + +} diff --git a/substrate/client/src/backend.rs b/substrate/client/src/backend.rs index 64f4a1d57f59e..d64850a871187 100644 --- a/substrate/client/src/backend.rs +++ b/substrate/client/src/backend.rs @@ -16,11 +16,12 @@ //! Polkadot Client data backend -use state_machine::backend::Backend as StateBackend; use error; +use primitives::storage::StorageKey; use runtime_primitives::bft::Justification; -use runtime_primitives::traits::Block as BlockT; use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::Block as BlockT; +use state_machine::backend::Backend as StateBackend; /// Block insertion operation. Keeps hold if the inserted block state and data. pub trait BlockImportOperation { @@ -71,6 +72,13 @@ pub trait Backend: Send + Sync { fn state_at(&self, block: BlockId) -> error::Result; } +/// Client backend capable of emitting events. +pub trait BackendEvents { + /// Returns storage changes event stream. + fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) + -> error::Result<::client::StorageEventStream>; +} + /// Mark for all Backend implementations, that are making use of state data, stored locally. pub trait LocalBackend: Backend {} diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 8103128008c72..f4762231fda9d 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -36,6 +36,12 @@ use {error, in_mem, block_builder, runtime_io, bft, genesis}; /// Type that implements `futures::Stream` of block import events. pub type BlockchainEventStream = mpsc::UnboundedReceiver>; +/// Type that implements `futures::Stream` of storage change events. +pub type StorageEventStream = mpsc::UnboundedReceiver<( + H, + Vec<(StorageKey, Option)>, +)>; + /// Substrate Client pub struct Client where Block: BlockT { backend: Arc, @@ -49,7 +55,12 @@ pub struct Client where Block: BlockT { /// A source of blockchain evenets. pub trait BlockchainEvents { /// Get block import event stream. - fn import_notification_stream(&self) -> mpsc::UnboundedReceiver>; + fn import_notification_stream(&self) -> BlockchainEventStream; + + /// Get storage changes event stream. + /// + /// Passing `None` as keys subscribes to all possible keys + fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result>; } /// Chain head information. @@ -181,9 +192,9 @@ impl Client where Ok(Client { backend, executor, - import_notification_sinks: Mutex::new(Vec::new()), - import_lock: Mutex::new(()), - importing_block: RwLock::new(None), + import_notification_sinks: Default::default(), + import_lock: Default::default(), + importing_block: Default::default(), execution_strategy, }) } @@ -497,16 +508,21 @@ impl bft::Authorities for Client impl BlockchainEvents for Client where - B: backend::Backend, + B: backend::BackendEvents, E: CallExecutor, Block: BlockT, { /// Get block import event stream. - fn import_notification_stream(&self) -> mpsc::UnboundedReceiver> { + fn import_notification_stream(&self) -> BlockchainEventStream { let (sink, stream) = mpsc::unbounded(); self.import_notification_sinks.lock().push(sink); stream } + + /// Get storage changes event stream. + fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result> { + self.backend.storage_changes_notification_stream(filter_keys) + } } impl ChainHead for Client diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index 329d5a5ac02df..1a159655acb51 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -56,6 +56,7 @@ pub use client::{ BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents, Client, ClientInfo, ChainHead, ImportResult, JustifiedHeader, + StorageEventStream, }; pub use blockchain::Info as ChainInfo; pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor}; diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index 1311618e994ec..afa39d7faea69 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -19,12 +19,13 @@ use std::sync::{Arc, Weak}; +use primitives::storage::StorageKey; use runtime_primitives::{bft::Justification, generic::BlockId}; use runtime_primitives::traits::Block as BlockT; use state_machine::{Backend as StateBackend, TrieBackend as StateTrieBackend, TryIntoTrieBackend as TryIntoStateTrieBackend}; -use backend::{Backend as ClientBackend, BlockImportOperation, RemoteBackend}; +use backend::{BackendEvents as ClientBackendEvents, Backend as ClientBackend, BlockImportOperation, RemoteBackend}; use blockchain::HeaderBackend as BlockchainHeaderBackend; use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult}; use light::blockchain::{Blockchain, Storage as BlockchainStorage}; @@ -60,7 +61,20 @@ impl Backend { } } -impl ClientBackend for Backend where Block: BlockT, S: BlockchainStorage, F: Fetcher { +impl ClientBackendEvents for Backend + where Block: BlockT, +{ + fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) + -> ClientResult<::client::StorageEventStream> { + bail!(ClientErrorKind::NotAvailableOnLightClient) + } +} + +impl ClientBackend for Backend where + Block: BlockT, + S: BlockchainStorage, + F: Fetcher, +{ type BlockImportOperation = ImportOperation; type Blockchain = Blockchain; type State = OnDemandState; diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index d1d055b5c0ff8..d89872821e78f 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -82,6 +82,7 @@ impl Chain { impl ChainApi for Chain where Block: BlockT + 'static, B: client::backend::Backend + Send + Sync + 'static, + B: client::backend::BackendEvents, E: client::CallExecutor + Send + Sync + 'static, { type Metadata = ::metadata::Metadata; diff --git a/substrate/service/src/components.rs b/substrate/service/src/components.rs index 7c6086b3f9d11..705db1745dbdb 100644 --- a/substrate/service/src/components.rs +++ b/substrate/service/src/components.rs @@ -149,7 +149,9 @@ pub trait Components { /// Associated service factory. type Factory: ServiceFactory; /// Client backend. - type Backend: 'static + client::backend::Backend>; + type Backend: 'static + + client::backend::Backend> + + client::backend::BackendEvents>; /// Client executor. type Executor: 'static + client::CallExecutor> + Send + Sync; /// Extrinsic pool type. diff --git a/substrate/state-machine/src/lib.rs b/substrate/state-machine/src/lib.rs index 7a69dc2c21b6f..65ecf67429cd4 100644 --- a/substrate/state-machine/src/lib.rs +++ b/substrate/state-machine/src/lib.rs @@ -511,7 +511,7 @@ mod tests { }, "test", &[], - ExecutionManager::Both(|we, _ne| { + ExecutionManager::Both(|we, _ne| { consensus_failed = true; println!("HELLO!"); we From 59f3a3be6a010b19aac0819c158a0633f0dce573 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 31 Jul 2018 10:42:01 +0200 Subject: [PATCH 2/9] Attaching storage events. --- Cargo.lock | 1 - polkadot/service/src/lib.rs | 2 +- substrate/client/db/Cargo.toml | 1 - substrate/client/db/src/lib.rs | 17 +- substrate/client/db/src/notifications.rs | 95 ---------- substrate/client/src/backend.rs | 8 - substrate/client/src/client.rs | 19 +- substrate/client/src/lib.rs | 7 +- substrate/client/src/light/backend.rs | 12 +- substrate/client/src/notifications.rs | 213 +++++++++++++++++++++++ substrate/primitives/src/storage.rs | 4 +- substrate/rpc/src/chain/mod.rs | 1 - substrate/service/src/components.rs | 4 +- substrate/state-machine/src/lib.rs | 18 +- 14 files changed, 248 insertions(+), 154 deletions(-) delete mode 100644 substrate/client/db/src/notifications.rs create mode 100644 substrate/client/src/notifications.rs diff --git a/Cargo.lock b/Cargo.lock index 99e7cca639cf5..865627b6686b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2666,7 +2666,6 @@ name = "substrate-client-db" version = "0.1.0" dependencies = [ "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "hashdb 0.1.1 (git+https://github.com/paritytech/parity.git)", "kvdb 0.1.0 (git+https://github.com/paritytech/parity.git)", "kvdb-memorydb 0.1.0 (git+https://github.com/paritytech/parity.git)", diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 86af40425aa3e..6d2c4aee157b8 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -68,7 +68,7 @@ pub trait Components: service::Components { /// Polkadot API. type Api: 'static + PolkadotApi + Send + Sync; /// Client backend. - type Backend: 'static + client::backend::Backend + client::backend::BackendEvents; + type Backend: 'static + client::backend::Backend; /// Client executor. type Executor: 'static + client::CallExecutor + Send + Sync; } diff --git a/substrate/client/db/Cargo.toml b/substrate/client/db/Cargo.toml index 1e52610d7a324..5a26a7cdb8537 100644 --- a/substrate/client/db/Cargo.toml +++ b/substrate/client/db/Cargo.toml @@ -4,7 +4,6 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -futures = "0.1.17" parking_lot = "0.4" log = "0.3" kvdb = { git = "https://github.com/paritytech/parity.git" } diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index ff4671746dc6d..c22a41b836444 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -16,7 +16,6 @@ //! Client backend that uses RocksDB database as storage. -extern crate futures; extern crate substrate_client as client; extern crate kvdb_rocksdb; extern crate kvdb; @@ -39,7 +38,6 @@ extern crate kvdb_memorydb; pub mod light; -mod notifications; mod utils; use std::sync::Arc; @@ -48,8 +46,8 @@ use std::path::PathBuf; use codec::{Decode, Encode}; use kvdb::{KeyValueDB, DBTransaction}; use memorydb::MemoryDB; -use parking_lot::{RwLock, Mutex}; -use primitives::{H256, storage::StorageKey}; +use parking_lot::RwLock; +use primitives::H256; use runtime_primitives::generic::BlockId; use runtime_primitives::bft::Justification; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hash, HashFor, Zero}; @@ -272,7 +270,6 @@ impl state_db::HashDb for StorageDb { /// Otherwise, trie nodes are kept only from the most recent block. pub struct Backend { storage: Arc>, - storage_notifications: Mutex>, blockchain: BlockchainDb, finalization_window: u64, } @@ -305,7 +302,6 @@ impl Backend { Ok(Backend { storage: Arc::new(storage_db), - storage_notifications: Default::default(), blockchain, finalization_window, }) @@ -327,14 +323,6 @@ fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitS } } -impl client::backend::BackendEvents for Backend { - fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) - -> client::error::Result> - { - Ok(self.storage_notifications.lock().listen(filter_keys)) - } -} - impl client::backend::Backend for Backend { type BlockImportOperation = BlockImportOperation; type Blockchain = BlockchainDb; @@ -375,7 +363,6 @@ impl client::backend::Backend for Backend { changeset.deleted.push(key.0.into()); } } - self.storage_notifications.lock().trigger(&hash, &changeset); let number_u64 = number.as_().into(); let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset); apply_state_commit(&mut transaction, commit); diff --git a/substrate/client/db/src/notifications.rs b/substrate/client/db/src/notifications.rs deleted file mode 100644 index efa2f5e59f936..0000000000000 --- a/substrate/client/db/src/notifications.rs +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Storage notifications - -use std::collections::HashMap; - -use futures::sync::mpsc; -use primitives::{ - H256, - storage::{StorageKey, StorageData}, -}; -use runtime_primitives::traits::Block as BlockT; - -type SubscriberId = u64; - -/// Manages storage listeners. -#[derive(Debug)] -pub struct StorageNotifications { - next_id: SubscriberId, - filters: HashMap< - Option, - Vec, - >, - sinks: HashMap< - SubscriberId, - mpsc::UnboundedSender<( - Block::Hash, - Vec<(StorageKey, Option)>, - )>, - >, -} - -impl Default for StorageNotifications { - fn default() -> Self { - StorageNotifications { - next_id: Default::default(), - filters: Default::default(), - sinks: Default::default(), - } - } -} - -impl StorageNotifications { - /// Trigger notification to all listeners. - /// - /// Note the changes are going to be filtered by listener's filter key. - /// In fact no event might be sent if clients are not interested in the changes. - pub fn trigger(&mut self, hash: &Block::Hash, changeset: &::state_db::ChangeSet) { - - } - - /// Start listening for particular storage keys. - pub fn listen(&mut self, filter_keys: Option<&[StorageKey]>) -> ::client::StorageEventStream { - self.next_id += 1; - - // add subscriber for every key - { - let mut add = |key| { - self.filters - .entry(key) - .or_insert_with(Default::default) - .push(self.next_id); - }; - - match filter_keys { - None => add(None), - Some(keys) => keys.iter().for_each(|key| add(Some(StorageKey(key.0.clone())))), - } - } - - // insert sink - let (tx, rx) = mpsc::unbounded(); - self.sinks.insert(self.next_id, tx); - rx - } -} - -#[cfg(test)] -mod tests { - -} diff --git a/substrate/client/src/backend.rs b/substrate/client/src/backend.rs index d64850a871187..416b4058f0be4 100644 --- a/substrate/client/src/backend.rs +++ b/substrate/client/src/backend.rs @@ -17,7 +17,6 @@ //! Polkadot Client data backend use error; -use primitives::storage::StorageKey; use runtime_primitives::bft::Justification; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::Block as BlockT; @@ -72,13 +71,6 @@ pub trait Backend: Send + Sync { fn state_at(&self, block: BlockId) -> error::Result; } -/// Client backend capable of emitting events. -pub trait BackendEvents { - /// Returns storage changes event stream. - fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) - -> error::Result<::client::StorageEventStream>; -} - /// Mark for all Backend implementations, that are making use of state data, stored locally. pub trait LocalBackend: Backend {} diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index f4762231fda9d..b19832dc2376f 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -31,21 +31,17 @@ use backend::{self, BlockImportOperation}; use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend}; use call_executor::{CallExecutor, LocalCallExecutor}; use executor::{RuntimeVersion, RuntimeInfo}; +use notifications::{StorageNotifications, StorageEventStream}; use {error, in_mem, block_builder, runtime_io, bft, genesis}; /// Type that implements `futures::Stream` of block import events. pub type BlockchainEventStream = mpsc::UnboundedReceiver>; -/// Type that implements `futures::Stream` of storage change events. -pub type StorageEventStream = mpsc::UnboundedReceiver<( - H, - Vec<(StorageKey, Option)>, -)>; - /// Substrate Client pub struct Client where Block: BlockT { backend: Arc, executor: E, + storage_notifications: Mutex>, import_notification_sinks: Mutex>>>, import_lock: Mutex<()>, importing_block: RwLock>, // holds the block hash currently being imported. TODO: replace this with block queue @@ -192,6 +188,7 @@ impl Client where Ok(Client { backend, executor, + storage_notifications: Default::default(), import_notification_sinks: Default::default(), import_lock: Default::default(), importing_block: Default::default(), @@ -364,7 +361,7 @@ impl Client where }, ); let (_, storage_update) = r?; - Some(storage_update) + Some((storage_update, overlay.into_committed())) }, None => None, }; @@ -373,8 +370,11 @@ impl Client where trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number(), is_new_best, origin); let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into(); transaction.set_block_data(header.clone(), body, Some(unchecked.into()), is_new_best)?; - if let Some(storage_update) = storage_update { + if let Some((storage_update, changes)) = storage_update { transaction.update_storage(storage_update)?; + // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? + self.storage_notifications.lock() + .trigger(&hash, changes); } self.backend.commit_operation(transaction)?; if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast { @@ -508,7 +508,6 @@ impl bft::Authorities for Client impl BlockchainEvents for Client where - B: backend::BackendEvents, E: CallExecutor, Block: BlockT, { @@ -521,7 +520,7 @@ impl BlockchainEvents for Client /// Get storage changes event stream. fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result> { - self.backend.storage_changes_notification_stream(filter_keys) + Ok(self.storage_notifications.lock().listen(filter_keys)) } } diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index 1a159655acb51..ea0f5bb5d9427 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -50,14 +50,15 @@ pub mod block_builder; pub mod light; mod call_executor; mod client; +mod notifications; +pub use blockchain::Info as ChainInfo; +pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor}; pub use client::{ new_in_mem, BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents, Client, ClientInfo, ChainHead, ImportResult, JustifiedHeader, - StorageEventStream, }; -pub use blockchain::Info as ChainInfo; -pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor}; +pub use notifications::{StorageEventStream, StorageChangeSet}; pub use state_machine::ExecutionStrategy; diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index afa39d7faea69..2d3b56da940f6 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -19,13 +19,12 @@ use std::sync::{Arc, Weak}; -use primitives::storage::StorageKey; use runtime_primitives::{bft::Justification, generic::BlockId}; use runtime_primitives::traits::Block as BlockT; use state_machine::{Backend as StateBackend, TrieBackend as StateTrieBackend, TryIntoTrieBackend as TryIntoStateTrieBackend}; -use backend::{BackendEvents as ClientBackendEvents, Backend as ClientBackend, BlockImportOperation, RemoteBackend}; +use backend::{Backend as ClientBackend, BlockImportOperation, RemoteBackend}; use blockchain::HeaderBackend as BlockchainHeaderBackend; use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult}; use light::blockchain::{Blockchain, Storage as BlockchainStorage}; @@ -61,15 +60,6 @@ impl Backend { } } -impl ClientBackendEvents for Backend - where Block: BlockT, -{ - fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) - -> ClientResult<::client::StorageEventStream> { - bail!(ClientErrorKind::NotAvailableOnLightClient) - } -} - impl ClientBackend for Backend where Block: BlockT, S: BlockchainStorage, diff --git a/substrate/client/src/notifications.rs b/substrate/client/src/notifications.rs new file mode 100644 index 0000000000000..9bfbee00dd2f8 --- /dev/null +++ b/substrate/client/src/notifications.rs @@ -0,0 +1,213 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Storage notifications + +// TODO [ToDr] Use FNV HashSet +use std::{ + collections::{HashSet, HashMap}, + sync::Arc, +}; + +use futures::sync::mpsc; +use primitives::storage::{StorageKey, StorageData}; +use runtime_primitives::traits::Block as BlockT; + +/// Storage change set +#[derive(Debug)] +pub struct StorageChangeSet { + changes: Arc)>>, + filter: Option>, +} + +impl StorageChangeSet { + /// Convert the change set into iterator over storage items. + pub fn iter<'a>(&'a self) -> impl Iterator)> + 'a { + self.changes + .iter() + .filter(move |&(key, _)| match self.filter { + Some(ref filter) => filter.contains(key), + None => true, + }) + } +} + +/// Type that implements `futures::Stream` of storage change events. +pub type StorageEventStream = mpsc::UnboundedReceiver<(H, StorageChangeSet)>; + +type SubscriberId = u64; + +/// Manages storage listeners. +#[derive(Debug)] +pub struct StorageNotifications { + next_id: SubscriberId, + wildcard_listeners: HashSet, + listeners: HashMap>, + sinks: HashMap, + Option>, + )>, +} + +impl Default for StorageNotifications { + fn default() -> Self { + StorageNotifications { + next_id: Default::default(), + wildcard_listeners: Default::default(), + listeners: Default::default(), + sinks: Default::default(), + } + } +} + +impl StorageNotifications { + /// Trigger notification to all listeners. + /// + /// Note the changes are going to be filtered by listener's filter key. + /// In fact no event might be sent if clients are not interested in the changes. + pub fn trigger(&mut self, hash: &Block::Hash, changeset: impl Iterator, Option>)>) { + let has_wildcard = !self.wildcard_listeners.is_empty(); + + let mut subscribers = self.wildcard_listeners.clone(); + let mut changes = Vec::new(); + + // Collect subscribers and changes + for (k, v) in changeset { + let k = StorageKey(k); + let listeners = self.listeners.get(&k); + + if let Some(ref listeners) = listeners { + subscribers.extend(listeners.iter()); + } + + if has_wildcard || listeners.is_some() { + changes.push((k, v.map(StorageData))); + } + } + + let changes = Arc::new(changes); + // Trigger the events + for subscriber in subscribers { + let remove_subscriber = match self.sinks.get(&subscriber) { + Some(&(ref sink, ref filter)) => { + sink.unbounded_send((hash.clone(), StorageChangeSet { + changes: changes.clone(), + filter: filter.clone(), + })).is_err() + }, + None => false, + }; + if remove_subscriber { + self.sinks.remove(&subscriber); + } + } + } + + /// Start listening for particular storage keys. + pub fn listen(&mut self, filter_keys: Option<&[StorageKey]>) -> StorageEventStream { + self.next_id += 1; + + // add subscriber for every key + let keys = match filter_keys { + None => { + self.wildcard_listeners.insert(self.next_id); + None + }, + Some(keys) => Some(keys.iter().map(|key| { + self.listeners + .entry(key.clone()) + .or_insert_with(Default::default) + .insert(self.next_id); + key.clone() + }).collect()) + }; + + // insert sink + let (tx, rx) = mpsc::unbounded(); + self.sinks.insert(self.next_id, (tx, keys)); + rx + } +} + +#[cfg(test)] +mod tests { + use runtime_primitives::testing::{H256 as Hash, Block as RawBlock}; + use super::*; + use futures::Stream; + + + #[cfg(test)] + impl From)>> for StorageChangeSet { + fn from(changes: Vec<(StorageKey, Option)>) -> Self { + StorageChangeSet { + changes: Arc::new(changes), + filter: None, + } + } + } + + #[cfg(test)] + impl PartialEq for StorageChangeSet { + fn eq(&self, other: &Self) -> bool { + self.iter().eq(other.iter()) + } + } + + type Block = RawBlock; + + #[test] + fn triggering_change_should_notify_wildcard_listeners() { + // given + let mut notifications = StorageNotifications::::default(); + let mut recv = notifications.listen(None).wait(); + + // when + let changeset = vec![ + (vec![2], Some(vec![3])), + (vec![3], None), + ]; + notifications.trigger(&1.into(), changeset.into_iter()); + + // then + assert_eq!(recv.next().unwrap(), Ok((1.into(), vec![ + (StorageKey(vec![2]), Some(StorageData(vec![3]))), + (StorageKey(vec![3]), None), + ].into()))); + } + + #[test] + fn should_only_notify_interested_listeners() { + // given + let mut notifications = StorageNotifications::::default(); + let mut recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait(); + let mut recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait(); + + // when + let changeset = vec![ + (vec![2], Some(vec![3])), + (vec![1], None), + ]; + notifications.trigger(&1.into(), changeset.into_iter()); + + // then + assert_eq!(recv1.next().unwrap(), Ok((1.into(), vec![ + (StorageKey(vec![1]), None), + ].into()))); + assert_eq!(recv2.next().unwrap(), Ok((1.into(), vec![ + (StorageKey(vec![2]), Some(StorageData(vec![3]))), + ].into()))); + } +} diff --git a/substrate/primitives/src/storage.rs b/substrate/primitives/src/storage.rs index c8929c7646ff6..533762620b86a 100644 --- a/substrate/primitives/src/storage.rs +++ b/substrate/primitives/src/storage.rs @@ -22,10 +22,10 @@ use rstd::vec::Vec; /// Contract storage key. #[derive(PartialEq, Eq)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord, Clone))] pub struct StorageKey(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); /// Contract storage entry data. #[derive(PartialEq, Eq)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord, Clone))] pub struct StorageData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index d89872821e78f..d1d055b5c0ff8 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -82,7 +82,6 @@ impl Chain { impl ChainApi for Chain where Block: BlockT + 'static, B: client::backend::Backend + Send + Sync + 'static, - B: client::backend::BackendEvents, E: client::CallExecutor + Send + Sync + 'static, { type Metadata = ::metadata::Metadata; diff --git a/substrate/service/src/components.rs b/substrate/service/src/components.rs index 705db1745dbdb..7c6086b3f9d11 100644 --- a/substrate/service/src/components.rs +++ b/substrate/service/src/components.rs @@ -149,9 +149,7 @@ pub trait Components { /// Associated service factory. type Factory: ServiceFactory; /// Client backend. - type Backend: 'static - + client::backend::Backend> - + client::backend::BackendEvents>; + type Backend: 'static + client::backend::Backend>; /// Client executor. type Executor: 'static + client::CallExecutor> + Send + Sync; /// Extrinsic pool type. diff --git a/substrate/state-machine/src/lib.rs b/substrate/state-machine/src/lib.rs index 65ecf67429cd4..caf4ac23f78ad 100644 --- a/substrate/state-machine/src/lib.rs +++ b/substrate/state-machine/src/lib.rs @@ -34,7 +34,6 @@ extern crate byteorder; extern crate parking_lot; use std::collections::HashMap; -use std::collections::hash_map::Drain; use std::fmt; pub mod backend; @@ -86,10 +85,23 @@ impl OverlayedChanges { } } - /// Drain prospective changes to an iterator. - pub fn drain(&mut self) -> Drain, Option>> { + /// Drain committed changes to an iterator. + /// + /// Panics: + /// Will panic if there are any uncommitted prospective changes. + pub fn drain<'a>(&'a mut self) -> impl Iterator, Option>)> + 'a { + assert!(self.prospective.is_empty()); self.committed.drain() } + + /// Consume `OverlayedChanges` and take committed set. + /// + /// Panics: + /// Will panic if there are any uncommitted prospective changes. + pub fn into_committed(self) -> impl Iterator, Option>)> { + assert!(self.prospective.is_empty()); + self.committed.into_iter() + } } /// State Machine Error bound. From b157e1dd9bffa4d5402fb8c1aa3a845cb460494e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 31 Jul 2018 14:43:36 +0200 Subject: [PATCH 3/9] Expose storage modification stream over RPC. --- substrate/client/src/client.rs | 1 + substrate/client/src/notifications.rs | 56 +++++++++++++- substrate/primitives/src/storage.rs | 13 ++++ substrate/rpc-servers/src/lib.rs | 2 +- substrate/rpc/src/chain/mod.rs | 11 ++- substrate/rpc/src/state/mod.rs | 103 ++++++++++++++++++++++---- substrate/service/src/lib.rs | 3 +- substrate/state-machine/src/lib.rs | 9 +-- 8 files changed, 166 insertions(+), 32 deletions(-) diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index b5d5aa17cfb7d..10128d948993b 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -367,6 +367,7 @@ impl Client where }, ); let (_, storage_update) = r?; + overlay.commit_prospective(); Some((storage_update, overlay.into_committed())) }, None => None, diff --git a/substrate/client/src/notifications.rs b/substrate/client/src/notifications.rs index 9bfbee00dd2f8..7e800a11df5f3 100644 --- a/substrate/client/src/notifications.rs +++ b/substrate/client/src/notifications.rs @@ -81,6 +81,11 @@ impl StorageNotifications { pub fn trigger(&mut self, hash: &Block::Hash, changeset: impl Iterator, Option>)>) { let has_wildcard = !self.wildcard_listeners.is_empty(); + // early exit if no listeners + if !has_wildcard && self.listeners.is_empty() { + return; + } + let mut subscribers = self.wildcard_listeners.clone(); let mut changes = Vec::new(); @@ -111,7 +116,32 @@ impl StorageNotifications { None => false, }; if remove_subscriber { - self.sinks.remove(&subscriber); + self.remove_subscriber(subscriber); + } + } + } + + fn remove_subscriber(&mut self, subscriber: SubscriberId) { + if let Some((_, filters)) = self.sinks.remove(&subscriber) { + match filters { + None => { + self.wildcard_listeners.remove(&subscriber); + }, + Some(filters) => { + for key in filters { + let remove_key = match self.listeners.get_mut(&key) { + Some(ref mut set) => { + set.remove(&subscriber); + set.is_empty() + }, + None => false, + }; + + if remove_key { + self.listeners.remove(&key); + } + } + }, } } } @@ -210,4 +240,28 @@ mod tests { (StorageKey(vec![2]), Some(StorageData(vec![3]))), ].into()))); } + + #[test] + fn should_cleanup_subscribers_if_dropped() { + // given + let mut notifications = StorageNotifications::::default(); + { + let _recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait(); + let _recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait(); + let _recv3 = notifications.listen(None).wait(); + assert_eq!(notifications.listeners.len(), 2); + assert_eq!(notifications.wildcard_listeners.len(), 1); + } + + // when + let changeset = vec![ + (vec![2], Some(vec![3])), + (vec![1], None), + ]; + notifications.trigger(&1.into(), changeset.into_iter()); + + // then + assert_eq!(notifications.listeners.len(), 0); + assert_eq!(notifications.wildcard_listeners.len(), 0); + } } diff --git a/substrate/primitives/src/storage.rs b/substrate/primitives/src/storage.rs index 533762620b86a..25bb11fb6e549 100644 --- a/substrate/primitives/src/storage.rs +++ b/substrate/primitives/src/storage.rs @@ -29,3 +29,16 @@ pub struct StorageKey(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec< #[derive(PartialEq, Eq)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord, Clone))] pub struct StorageData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); + +/// Storage change set +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] +pub struct StorageChangeSet { + /// Block hash + pub block: Hash, + /// A list of changes + pub changes: Vec<( + StorageKey, + Option, + )>, +} + diff --git a/substrate/rpc-servers/src/lib.rs b/substrate/rpc-servers/src/lib.rs index 006345e68e335..11fa9aa9b60af 100644 --- a/substrate/rpc-servers/src/lib.rs +++ b/substrate/rpc-servers/src/lib.rs @@ -45,7 +45,7 @@ pub fn rpc_handler( system: Y, ) -> RpcHandler where Block: 'static, - S: apis::state::StateApi, + S: apis::state::StateApi, C: apis::chain::ChainApi, A: apis::author::AuthorApi, Y: apis::system::SystemApi, diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index d1d055b5c0ff8..c027a07aaef1a 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -18,14 +18,13 @@ use std::sync::Arc; -use runtime_primitives::traits::Block as BlockT; -use runtime_primitives::generic::BlockId; use client::{self, Client, BlockchainEvents}; - use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; use rpc::Result as RpcResult; use rpc::futures::{Future, Sink, Stream}; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::Block as BlockT; use tokio::runtime::TaskExecutor; use subscriptions::Subscriptions; @@ -51,11 +50,11 @@ build_rpc_trait! { #[pubsub(name = "chain_newHead")] { /// New head subscription - #[rpc(name = "subscribe_newHead")] + #[rpc(name = "subscribe_newHead", alias = ["chain_subscribeNewHead", ])] fn subscribe_new_head(&self, Self::Metadata, pubsub::Subscriber
); /// Unsubscribe from new head subscription. - #[rpc(name = "unsubscribe_newHead")] + #[rpc(name = "unsubscribe_newHead", alias = ["chain_unsubscribeNewHead", ])] fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult; } } @@ -72,7 +71,7 @@ pub struct Chain { impl Chain { /// Create new Chain API RPC handler. pub fn new(client: Arc>, executor: TaskExecutor) -> Self { - Chain { + Self { client, subscriptions: Subscriptions::new(executor), } diff --git a/substrate/rpc/src/state/mod.rs b/substrate/rpc/src/state/mod.rs index 40d6f70f6c318..6cf6907712f97 100644 --- a/substrate/rpc/src/state/mod.rs +++ b/substrate/rpc/src/state/mod.rs @@ -16,24 +16,33 @@ //! Polkadot state API. -mod error; - -#[cfg(test)] -mod tests; - use std::sync::Arc; -use client::{self, Client, CallExecutor}; +use client::{self, Client, CallExecutor, BlockchainEvents}; +use jsonrpc_macros::Trailing; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::SubscriptionId; +use primitives::hexdisplay::HexDisplay; +use primitives::storage::{StorageKey, StorageData, StorageChangeSet}; +use rpc::Result as RpcResult; +use rpc::futures::{Future, Sink, Stream}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::Block as BlockT; -use primitives::storage::{StorageKey, StorageData}; -use primitives::hexdisplay::HexDisplay; +use tokio::runtime::TaskExecutor; + +use subscriptions::Subscriptions; + +mod error; +#[cfg(test)] +mod tests; use self::error::Result; build_rpc_trait! { /// Polkadot state API pub trait StateApi { + type Metadata; + /// Returns a storage entry at a specific block's state. #[rpc(name = "state_getStorageAt")] fn storage_at(&self, StorageKey, Hash) -> Result; @@ -65,22 +74,52 @@ build_rpc_trait! { /// Call a contract at the best block. #[rpc(name = "state_call")] fn call(&self, String, Vec) -> Result>; + + #[pubsub(name = "state_storage")] { + /// New storage subscription + #[rpc(name = "state_subscribeStorage")] + fn subscribe_storage(&self, Self::Metadata, pubsub::Subscriber>, Trailing>); + + /// Unsubscribe from storage subscription + #[rpc(name = "state_unsubscribeStorage")] + fn unsubscribe_storage(&self, SubscriptionId) -> RpcResult; + } + } +} + +/// State API with subscriptions support. +pub struct State { + /// Substrate client. + client: Arc>, + /// Current subscriptions. + subscriptions: Subscriptions, +} + +impl State { + /// Create new State API RPC handler. + pub fn new(client: Arc>, executor: TaskExecutor) -> Self { + Self { + client, + subscriptions: Subscriptions::new(executor), + } } } -impl StateApi for Arc> where +impl StateApi for State where Block: BlockT + 'static, B: client::backend::Backend + Send + Sync + 'static, E: CallExecutor + Send + Sync + 'static, { + type Metadata = ::metadata::Metadata; + fn storage_at(&self, key: StorageKey, block: Block::Hash) -> Result { trace!(target: "rpc", "Querying storage at {:?} for key {}", block, HexDisplay::from(&key.0)); - Ok(self.as_ref().storage(&BlockId::Hash(block), &key)?) + Ok(self.client.storage(&BlockId::Hash(block), &key)?) } fn call_at(&self, method: String, data: Vec, block: Block::Hash) -> Result> { trace!(target: "rpc", "Calling runtime at {:?} for method {} ({})", block, method, HexDisplay::from(&data)); - Ok(self.as_ref().executor().call(&BlockId::Hash(block), &method, &data)?.return_data) + Ok(self.client.executor().call(&BlockId::Hash(block), &method, &data)?.return_data) } fn storage_hash_at(&self, key: StorageKey, block: Block::Hash) -> Result { @@ -93,18 +132,52 @@ impl StateApi for Arc> where } fn storage_hash(&self, key: StorageKey) -> Result { - self.storage_hash_at(key, self.as_ref().info()?.chain.best_hash) + self.storage_hash_at(key, self.client.info()?.chain.best_hash) } fn storage_size(&self, key: StorageKey) -> Result { - self.storage_size_at(key, self.as_ref().info()?.chain.best_hash) + self.storage_size_at(key, self.client.info()?.chain.best_hash) } fn storage(&self, key: StorageKey) -> Result { - self.storage_at(key, self.as_ref().info()?.chain.best_hash) + self.storage_at(key, self.client.info()?.chain.best_hash) } fn call(&self, method: String, data: Vec) -> Result> { - self.call_at(method, data, self.as_ref().info()?.chain.best_hash) + self.call_at(method, data, self.client.info()?.chain.best_hash) + } + + fn subscribe_storage( + &self, + _meta: Self::Metadata, + subscriber: pubsub::Subscriber>, + keys: Trailing> + ) { + let keys = Into::>>::into(keys); + let stream = match self.client.storage_changes_notification_stream(keys.as_ref().map(|x| &**x)) { + Ok(stream) => stream, + Err(err) => { + let _ = subscriber.reject(error::Error::from(err).into()); + return; + }, + }; + + self.subscriptions.add(subscriber, |sink| { + let stream = stream + .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) + .map(|(block, changes)| Ok(StorageChangeSet { + block, + changes: changes.iter().cloned().collect(), + })); + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(stream) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }) + } + + fn unsubscribe_storage(&self, id: SubscriptionId) -> RpcResult { + Ok(self.subscriptions.cancel(id)) } } diff --git a/substrate/service/src/lib.rs b/substrate/service/src/lib.rs index fca2c4d05834e..2b9e588d01156 100644 --- a/substrate/service/src/lib.rs +++ b/substrate/service/src/lib.rs @@ -202,9 +202,10 @@ impl Service let handler = || { let client = client.clone(); let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone()); + let state = rpc::apis::state::State::new(client.clone(), task_executor.clone()); let author = rpc::apis::author::Author::new(client.clone(), extrinsic_pool.api(), task_executor.clone()); rpc::rpc_handler::, _, _, _, _>( - client, + state, chain, author, rpc_config.clone(), diff --git a/substrate/state-machine/src/lib.rs b/substrate/state-machine/src/lib.rs index ef7b7f6962a56..20498b107bf0f 100644 --- a/substrate/state-machine/src/lib.rs +++ b/substrate/state-machine/src/lib.rs @@ -379,14 +379,7 @@ pub fn execute_using_consensus_failure_handler< result.map(move |out| (out, delta)) }; - match result { - Ok(x) => { - Ok(x) - } - Err(e) => { - Err(Box::new(e)) - } - } + result.map_err(|e| Box::new(e) as _) } /// Prove execution using the given state backend, overlayed changes, and call executor. From 44e07d69909d13d879185f31a8bee8266ff59df6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 31 Jul 2018 14:48:09 +0200 Subject: [PATCH 4/9] Use FNV for hashing small keys. --- Cargo.lock | 1 + substrate/client/Cargo.toml | 1 + substrate/client/src/lib.rs | 1 + substrate/client/src/notifications.rs | 8 ++++---- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 346c4f2c7e26e..fea5051c08438 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2647,6 +2647,7 @@ version = "0.1.0" dependencies = [ "ed25519 0.1.0", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/client/Cargo.toml b/substrate/client/Cargo.toml index 30ec72435d202..5b347395763b7 100644 --- a/substrate/client/Cargo.toml +++ b/substrate/client/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Parity Technologies "] [dependencies] error-chain = "0.12" +fnv = "1.0" log = "0.3" parking_lot = "0.4" triehash = "0.1" diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index ea0f5bb5d9427..8a4a143914a96 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -32,6 +32,7 @@ extern crate substrate_state_machine as state_machine; #[macro_use] extern crate slog; // needed until we can reexport `slog_info` from `substrate_telemetry` extern crate ed25519; +extern crate fnv; extern crate futures; extern crate parking_lot; extern crate triehash; diff --git a/substrate/client/src/notifications.rs b/substrate/client/src/notifications.rs index 7e800a11df5f3..2cc2b613b1536 100644 --- a/substrate/client/src/notifications.rs +++ b/substrate/client/src/notifications.rs @@ -16,12 +16,12 @@ //! Storage notifications -// TODO [ToDr] Use FNV HashSet use std::{ collections::{HashSet, HashMap}, sync::Arc, }; +use fnv::{FnvHashSet, FnvHashMap}; use futures::sync::mpsc; use primitives::storage::{StorageKey, StorageData}; use runtime_primitives::traits::Block as BlockT; @@ -54,9 +54,9 @@ type SubscriberId = u64; #[derive(Debug)] pub struct StorageNotifications { next_id: SubscriberId, - wildcard_listeners: HashSet, - listeners: HashMap>, - sinks: HashMap, + listeners: HashMap>, + sinks: FnvHashMap, Option>, )>, From b6f09673683e75b925f93240666ae95cbc8e2fc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 31 Jul 2018 14:54:06 +0200 Subject: [PATCH 5/9] Fix and add tests. --- Cargo.lock | 14 ++++++------ demo/cli/src/lib.rs | 3 ++- substrate/rpc/src/state/tests.rs | 38 ++++++++++++++++++++++++++++++-- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fea5051c08438..d4e42f6f76c72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,7 +975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "8.0.2" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -987,7 +987,7 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", "jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", @@ -1010,7 +1010,7 @@ dependencies = [ [[package]] name = "jsonrpc-pubsub" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1020,7 +1020,7 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "globset 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1033,7 +1033,7 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "8.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", @@ -2708,7 +2708,7 @@ dependencies = [ "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/demo/cli/src/lib.rs b/demo/cli/src/lib.rs index 08834541f52bf..0f6142b90f273 100644 --- a/demo/cli/src/lib.rs +++ b/demo/cli/src/lib.rs @@ -173,9 +173,10 @@ pub fn run(args: I) -> error::Result<()> where let mut runtime = Runtime::new()?; let _rpc_servers = { let handler = || { + let state = rpc::apis::chain::State::new(client.clone(), runtime.executor()); let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor()); let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor()); - rpc::rpc_handler::(client.clone(), chain, author, DummySystem) + rpc::rpc_handler::(state, chain, author, DummySystem) }; let http_address = "127.0.0.1:9933".parse().unwrap(); let ws_address = "127.0.0.1:9944".parse().unwrap(); diff --git a/substrate/rpc/src/state/tests.rs b/substrate/rpc/src/state/tests.rs index 1b98711ea5e99..37d86cb9266b9 100644 --- a/substrate/rpc/src/state/tests.rs +++ b/substrate/rpc/src/state/tests.rs @@ -16,26 +16,60 @@ use super::*; use self::error::{Error, ErrorKind}; +use jsonrpc_macros::pubsub; +use client::BlockOrigin; use test_client::{self, TestClient}; #[test] fn should_return_storage() { + let core = ::tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); + let client = State::new(client, core.executor()); assert_matches!( - StateApi::storage_at(&client, StorageKey(vec![10]), genesis_hash), + client.storage_at(StorageKey(vec![10]), genesis_hash), Err(Error(ErrorKind::Client(client::error::ErrorKind::NoValueForKey(ref k)), _)) if *k == vec![10] ) } #[test] fn should_call_contract() { + let core = ::tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); + let client = State::new(client, core.executor()); assert_matches!( - StateApi::call_at(&client, "balanceOf".into(), vec![1,2,3], genesis_hash), + client.call_at("balanceOf".into(), vec![1,2,3], genesis_hash), Err(Error(ErrorKind::Client(client::error::ErrorKind::Execution(_)), _)) ) } + +#[test] +fn should_notify_about_storage_changes() { + let mut core = ::tokio::runtime::Runtime::new().unwrap(); + let remote = core.executor(); + let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); + + { + let api = State { + client: Arc::new(test_client::new()), + subscriptions: Subscriptions::new(remote), + }; + + api.subscribe_storage(Default::default(), subscriber, None.into()); + + // assert id assigned + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); + + let builder = api.client.new_block().unwrap(); + api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + } + + // assert notification send to transport + let (notification, next) = core.block_on(transport.into_future()).unwrap(); + assert!(notification.is_some()); + // no more notifications on this channel + assert_eq!(core.block_on(next.into_future()).unwrap().0, None); +} From 7b3b494c8a989c018aa8635bb840ce042d388826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 31 Jul 2018 16:15:25 +0200 Subject: [PATCH 6/9] Swap alias and RPC name. --- substrate/rpc/src/chain/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index c027a07aaef1a..a7a8e8c0e046a 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -50,11 +50,11 @@ build_rpc_trait! { #[pubsub(name = "chain_newHead")] { /// New head subscription - #[rpc(name = "subscribe_newHead", alias = ["chain_subscribeNewHead", ])] + #[rpc(name = "chain_subscribeNewHead", alias = ["subscribe_newHead", ])] fn subscribe_new_head(&self, Self::Metadata, pubsub::Subscriber
); /// Unsubscribe from new head subscription. - #[rpc(name = "unsubscribe_newHead", alias = ["chain_unsubscribeNewHead", ])] + #[rpc(name = "chain_unsubscribeNewHead", alias = ["unsubscribe_newHead", ])] fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult; } } From 79d35074b21b26f697ff9b6805da1d3ee503bc8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 1 Aug 2018 11:29:15 +0200 Subject: [PATCH 7/9] Fix demo. --- demo/cli/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demo/cli/src/lib.rs b/demo/cli/src/lib.rs index 0f6142b90f273..6eb6bfeaccdc5 100644 --- a/demo/cli/src/lib.rs +++ b/demo/cli/src/lib.rs @@ -173,7 +173,7 @@ pub fn run(args: I) -> error::Result<()> where let mut runtime = Runtime::new()?; let _rpc_servers = { let handler = || { - let state = rpc::apis::chain::State::new(client.clone(), runtime.executor()); + let state = rpc::apis::state::State::new(client.clone(), runtime.executor()); let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor()); let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor()); rpc::rpc_handler::(state, chain, author, DummySystem) From 488325955889b932d3a73ec7f5d044851f050fef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 1 Aug 2018 11:50:52 +0200 Subject: [PATCH 8/9] Addressing review grumbles. --- substrate/client/src/client.rs | 21 +++++++++++++-------- substrate/client/src/notifications.rs | 18 +++++++++--------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 10128d948993b..bd7f3c5581a2e 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -55,7 +55,7 @@ pub trait BlockchainEvents { /// Get storage changes event stream. /// - /// Passing `None` as keys subscribes to all possible keys + /// Passing `None` as `filter_keys` subscribes to all storage changes. fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result>; } @@ -340,7 +340,7 @@ impl Client where } let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?; - let storage_update = match transaction.state()? { + let (storage_update, storage_changes) = match transaction.state()? { Some(transaction_state) => { let mut overlay = Default::default(); let mut r = self.executor.call_at_state( @@ -368,9 +368,9 @@ impl Client where ); let (_, storage_update) = r?; overlay.commit_prospective(); - Some((storage_update, overlay.into_committed())) + (Some(storage_update), Some(overlay.into_committed())) }, - None => None, + None => (None, None) }; let is_new_best = header.number() == &(self.backend.blockchain().info()?.best_number + One::one()); @@ -378,14 +378,19 @@ impl Client where let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into(); transaction.set_block_data(header.clone(), body, Some(unchecked.into()), is_new_best)?; transaction.update_authorities(authorities); - if let Some((storage_update, changes)) = storage_update { + if let Some(storage_update) = storage_update { transaction.update_storage(storage_update)?; - // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? - self.storage_notifications.lock() - .trigger(&hash, changes); } self.backend.commit_operation(transaction)?; + if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast { + + if let Some(storage_changes) = storage_changes { + // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? + self.storage_notifications.lock() + .trigger(&hash, storage_changes); + } + let notification = BlockImportNotification:: { hash: hash, origin: origin, diff --git a/substrate/client/src/notifications.rs b/substrate/client/src/notifications.rs index 2cc2b613b1536..357937f0be956 100644 --- a/substrate/client/src/notifications.rs +++ b/substrate/client/src/notifications.rs @@ -106,16 +106,16 @@ impl StorageNotifications { let changes = Arc::new(changes); // Trigger the events for subscriber in subscribers { - let remove_subscriber = match self.sinks.get(&subscriber) { - Some(&(ref sink, ref filter)) => { - sink.unbounded_send((hash.clone(), StorageChangeSet { - changes: changes.clone(), - filter: filter.clone(), - })).is_err() - }, - None => false, + let should_remove = { + let &(ref sink, ref filter) = self.sinks.get(&subscriber) + .expect("subscribers returned from self.listeners are always in self.sinks; qed"); + sink.unbounded_send((hash.clone(), StorageChangeSet { + changes: changes.clone(), + filter: filter.clone(), + })).is_err() }; - if remove_subscriber { + + if should_remove { self.remove_subscriber(subscriber); } } From e5e7c4cb6e1cd06685500b299c293b8a074fb4c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 1 Aug 2018 13:36:27 +0200 Subject: [PATCH 9/9] Fix comment. --- substrate/client/src/notifications.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/substrate/client/src/notifications.rs b/substrate/client/src/notifications.rs index 357937f0be956..a64ee0f4dd6ac 100644 --- a/substrate/client/src/notifications.rs +++ b/substrate/client/src/notifications.rs @@ -1,18 +1,18 @@ // Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. +// This file is part of Substrate. -// Polkadot is free software: you can redistribute it and/or modify +// Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. -// Polkadot is distributed in the hope that it will be useful, +// Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . +// along with Substrate. If not, see . //! Storage notifications