From c6ebffb48862ead9904ca647204463560b2b6935 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 30 Jul 2018 11:05:22 +0300 Subject: [PATCH 1/2] storage proofs --- Cargo.lock | 1 + substrate/client/src/client.rs | 13 +++- substrate/client/src/light/backend.rs | 55 ++++++++++++-- substrate/client/src/light/fetcher.rs | 51 +++++++++++-- substrate/network/Cargo.toml | 1 + substrate/network/src/chain.rs | 7 ++ substrate/network/src/lib.rs | 3 +- substrate/network/src/message.rs | 81 ++++++++++++++++++++- substrate/network/src/on_demand.rs | 73 +++++++++++++++---- substrate/network/src/protocol.rs | 23 ++++++ substrate/service/src/components.rs | 5 +- substrate/state-machine/src/lib.rs | 37 ++++++++++ substrate/state-machine/src/trie_backend.rs | 3 + 13 files changed, 321 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17895bf28deb0..92be9f69c4d9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2787,6 +2787,7 @@ dependencies = [ "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-keyring 0.1.0", diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index c125a1aa4919b..781f3aaab0147 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -25,7 +25,10 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, One, use runtime_primitives::BuildStorage; use primitives::storage::{StorageKey, StorageData}; use codec::Decode; -use state_machine::{Ext, OverlayedChanges, Backend as StateBackend, CodeExecutor, ExecutionStrategy, ExecutionManager}; +use state_machine::{ + Ext, OverlayedChanges, Backend as StateBackend, CodeExecutor, + ExecutionStrategy, ExecutionManager, prove_read +}; use backend::{self, BlockImportOperation}; use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend}; @@ -233,6 +236,14 @@ impl Client where &self.executor } + /// Reads storage value at a given block + key, returning read proof. + pub fn read_proof(&self, id: &BlockId, key: &[u8]) -> error::Result>> { + self.state_at(id) + .and_then(|state| prove_read(state, key) + .map(|(_, proof)| proof) + .map_err(Into::into)) + } + /// Execute a call to a contract on top of state in a block of given hash /// AND returning execution proof. /// diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index d06aef80bf5ab..6851b54d1a62b 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -18,6 +18,7 @@ //! Everything else is requested from full nodes on demand. use std::sync::{Arc, Weak}; +use futures::{Future, IntoFuture}; use primitives::AuthorityId; use runtime_primitives::{bft::Justification, generic::BlockId}; @@ -29,7 +30,7 @@ use backend::{Backend as ClientBackend, BlockImportOperation, RemoteBackend}; use blockchain::HeaderBackend as BlockchainHeaderBackend; use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult}; use light::blockchain::{Blockchain, Storage as BlockchainStorage}; -use light::fetcher::Fetcher; +use light::fetcher::{Fetcher, RemoteReadRequest}; /// Light client backend. pub struct Backend { @@ -152,8 +153,13 @@ impl StateBackend for OnDemandState where Block: BlockT, F: type Error = ClientError; type Transaction = (); - fn storage(&self, _key: &[u8]) -> ClientResult>> { - Err(ClientErrorKind::NotAvailableOnLightClient.into()) // TODO: fetch from remote node + fn storage(&self, key: &[u8]) -> ClientResult>> { + self.fetcher.upgrade().ok_or(ClientErrorKind::NotAvailableOnLightClient)? + .remote_read(RemoteReadRequest { + block: self.block, + key: key.to_vec(), + }) + .into_future().wait() } fn for_keys_with_prefix(&self, _prefix: &[u8], _action: A) { @@ -179,20 +185,57 @@ impl TryIntoStateTrieBackend for OnDemandState where Block: #[cfg(test)] pub mod tests { - use futures::future::{ok, FutureResult}; + use futures::future::{ok, err, FutureResult}; use parking_lot::Mutex; use call_executor::CallResult; + use executor::{self, NativeExecutionDispatch}; use error::Error as ClientError; - use test_client::runtime::{Hash, Block}; - use light::fetcher::{Fetcher, RemoteCallRequest}; + use test_client::{self, runtime::{Hash, Block}}; + use in_mem::{Blockchain as InMemoryBlockchain}; + use light::{new_fetch_checker, new_light_blockchain}; + use light::fetcher::{Fetcher, FetchChecker, LightDataChecker, RemoteCallRequest}; + use super::*; pub type OkCallFetcher = Mutex; impl Fetcher for OkCallFetcher { + type RemoteReadResult = FutureResult>, ClientError>; type RemoteCallResult = FutureResult; + fn remote_read(&self, _request: RemoteReadRequest) -> Self::RemoteReadResult { + err("Not implemented on test node".into()) + } + fn remote_call(&self, _request: RemoteCallRequest) -> Self::RemoteCallResult { ok((*self.lock()).clone()) } } + + #[test] + fn storage_read_proof_is_generated_and_checked() { + // prepare remote client + let remote_client = test_client::new(); + let remote_block_id = BlockId::Number(0); + let remote_block_hash = remote_client.block_hash(0).unwrap().unwrap(); + let mut remote_block_header = remote_client.header(&remote_block_id).unwrap().unwrap(); + remote_block_header.state_root = remote_client.state_at(&remote_block_id).unwrap().storage_root(::std::iter::empty()).0.into(); + + // 'fetch' read proof from remote node + let authorities_len = remote_client.authorities_at(&remote_block_id).unwrap().len(); + let remote_read_proof = remote_client.read_proof(&remote_block_id, b":auth:len").unwrap(); + + // check remote read proof locally + let local_storage = InMemoryBlockchain::::new(); + local_storage.insert(remote_block_hash, remote_block_header, None, None, true); + let local_executor = test_client::LocalExecutor::with_heap_pages(8, 8); + let local_checker: LightDataChecker< + InMemoryBlockchain, + executor::NativeExecutor, + OkCallFetcher + > = new_fetch_checker(new_light_blockchain(local_storage), local_executor); + assert_eq!(local_checker.check_read_proof(&RemoteReadRequest { + block: remote_block_hash, + key: b":auth:len".to_vec(), + }, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8); + } } diff --git a/substrate/client/src/light/fetcher.rs b/substrate/client/src/light/fetcher.rs index 90622bf6fc008..5cda7d766e8f2 100644 --- a/substrate/client/src/light/fetcher.rs +++ b/substrate/client/src/light/fetcher.rs @@ -19,13 +19,15 @@ use std::sync::Arc; use futures::IntoFuture; -use runtime_primitives::traits::{Block as BlockT}; -use state_machine::CodeExecutor; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; +use state_machine::{CodeExecutor, read_proof_check}; use call_executor::CallResult; -use error::{Error as ClientError, Result as ClientResult}; +use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult}; use light::blockchain::{Blockchain, Storage as BlockchainStorage}; use light::call_executor::check_execution_proof; +use blockchain::HeaderBackend as BlockchainHeaderBackend; /// Remote call request. #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -38,20 +40,43 @@ pub struct RemoteCallRequest { pub call_data: Vec, } +/// Remote storage read request. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct RemoteReadRequest { + /// Read at state of given block. + pub block: Hash, + /// Storage key to read. + pub key: Vec, +} + /// Light client data fetcher. Implementations of this trait must check if remote data /// is correct (see FetchedDataChecker) and return already checked data. pub trait Fetcher: Send + Sync { + /// Remote storage read future. + type RemoteReadResult: IntoFuture>, Error=ClientError>; /// Remote call result future. type RemoteCallResult: IntoFuture; + /// Fetch remote storage value. + fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult; /// Fetch remote call result. fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult; } /// Light client remote data checker. pub trait FetchChecker: Send + Sync { + /// Check remote storage read proof. + fn check_read_proof( + &self, + request: &RemoteReadRequest, + remote_proof: Vec> + ) -> ClientResult>>; /// Check remote method execution proof. - fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: Vec>) -> ClientResult; + fn check_execution_proof( + &self, + request: &RemoteCallRequest, + remote_proof: Vec> + ) -> ClientResult; } /// Remote data checker. @@ -78,11 +103,27 @@ impl LightDataChecker { impl FetchChecker for LightDataChecker where Block: BlockT, + Block::Hash: Into<[u8; 32]>, S: BlockchainStorage, E: CodeExecutor, F: Fetcher, { - fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: Vec>) -> ClientResult { + fn check_read_proof( + &self, + request: &RemoteReadRequest, + remote_proof: Vec> + ) -> ClientResult>> { + let local_header = self.blockchain.header(BlockId::Hash(request.block))?; + let local_header = local_header.ok_or_else(|| ClientErrorKind::UnknownBlock(format!("{}", request.block)))?; + let local_state_root = *local_header.state_root(); + read_proof_check(local_state_root.into(), remote_proof, &request.key).map_err(Into::into) + } + + fn check_execution_proof( + &self, + request: &RemoteCallRequest, + remote_proof: Vec> + ) -> ClientResult { check_execution_proof(&*self.blockchain, &self.executor, request, remote_proof) } } diff --git a/substrate/network/Cargo.toml b/substrate/network/Cargo.toml index 0185cf49f2216..7362911ee532f 100644 --- a/substrate/network/Cargo.toml +++ b/substrate/network/Cargo.toml @@ -14,6 +14,7 @@ error-chain = "0.12" bitflags = "1.0" futures = "0.1.17" linked-hash-map = "0.5" +rustc-hex = "1.0" ethcore-io = { git = "https://github.com/paritytech/parity.git" } ed25519 = { path = "../../substrate/ed25519" } substrate-primitives = { path = "../../substrate/primitives" } diff --git a/substrate/network/src/chain.rs b/substrate/network/src/chain.rs index f20b7e2b5e688..d698ab0900aa7 100644 --- a/substrate/network/src/chain.rs +++ b/substrate/network/src/chain.rs @@ -45,6 +45,9 @@ pub trait Client: Send + Sync { /// Get block justification. fn justification(&self, id: &BlockId) -> Result>, Error>; + /// Get storage read execution proof. + fn read_proof(&self, block: &Block::Hash, key: &[u8]) -> Result>, Error>; + /// Get method execution proof. fn execution_proof(&self, block: &Block::Hash, method: &str, data: &[u8]) -> Result<(Vec, Vec>), Error>; } @@ -84,6 +87,10 @@ impl Client for SubstrateClient where (self as &SubstrateClient).justification(id) } + fn read_proof(&self, block: &Block::Hash, key: &[u8]) -> Result>, Error> { + (self as &SubstrateClient).read_proof(&BlockId::Hash(block.clone()), key) + } + fn execution_proof(&self, block: &Block::Hash, method: &str, data: &[u8]) -> Result<(Vec, Vec>), Error> { (self as &SubstrateClient).execution_proof(&BlockId::Hash(block.clone()), method, data) } diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 2c6b880013b31..8d5f2bc5ffd49 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -30,6 +30,7 @@ extern crate substrate_network_libp2p as network_libp2p; extern crate substrate_codec as codec; extern crate futures; extern crate ed25519; +extern crate rustc_hex; #[macro_use] extern crate log; #[macro_use] extern crate bitflags; #[macro_use] extern crate error_chain; @@ -63,4 +64,4 @@ pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeIndex, P pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal, Status as StatusMessage}; pub use error::Error; pub use config::{Roles, ProtocolConfig}; -pub use on_demand::{OnDemand, OnDemandService, RemoteCallResponse}; +pub use on_demand::{OnDemand, OnDemandService, RemoteResponse}; diff --git a/substrate/network/src/message.rs b/substrate/network/src/message.rs index e7e6addc39d9f..d2bbcdf3e7581 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/message.rs @@ -18,7 +18,10 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; use codec::{Encode, Decode, Input, Output}; -pub use self::generic::{BlockAnnounce, RemoteCallRequest, ConsensusVote, SignedConsensusVote, FromBlock}; +pub use self::generic::{ + BlockAnnounce, RemoteCallRequest, RemoteReadRequest, + ConsensusVote, SignedConsensusVote, FromBlock +}; /// A unique ID of a request. pub type RequestId = u64; @@ -135,7 +138,32 @@ impl Decode for RemoteCallResponse { }) } } - + +#[derive(Debug, PartialEq, Eq, Clone)] +/// Remote read response. +pub struct RemoteReadResponse { + /// Id of a request this response was made for. + pub id: RequestId, + /// Read proof. + pub proof: Vec>, +} + +impl Encode for RemoteReadResponse { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.id); + dest.push(&self.proof); + } +} + +impl Decode for RemoteReadResponse { + fn decode(input: &mut I) -> Option { + Some(RemoteReadResponse { + id: Decode::decode(input)?, + proof: Decode::decode(input)?, + }) + } +} + /// Generic types. pub mod generic { use primitives::AuthorityId; @@ -143,8 +171,10 @@ pub mod generic { use runtime_primitives::bft::Justification; use ed25519; use service::Roles; - use super::{BlockAttributes, RemoteCallResponse, RequestId, Transactions, Direction}; - + use super::{ + BlockAttributes, RemoteCallResponse, RemoteReadResponse, + RequestId, Transactions, Direction + }; /// Block data sent in the response. #[derive(Debug, PartialEq, Eq, Clone)] @@ -446,6 +476,10 @@ pub mod generic { RemoteCallRequest(RemoteCallRequest), /// Remote method call response. RemoteCallResponse(RemoteCallResponse), + /// Remote storage read request. + RemoteReadRequest(RemoteReadRequest), + /// Remote storage read response. + RemoteReadResponse(RemoteReadResponse), /// Chain-specific message ChainSpecific(Vec), } @@ -487,6 +521,14 @@ pub mod generic { dest.push_byte(7); dest.push(m); } + Message::RemoteReadRequest(ref m) => { + dest.push_byte(8); + dest.push(m); + } + Message::RemoteReadResponse(ref m) => { + dest.push_byte(9); + dest.push(m); + } Message::ChainSpecific(ref m) => { dest.push_byte(255); dest.push(m); @@ -508,6 +550,8 @@ pub mod generic { 5 => Some(Message::BftMessage(Decode::decode(input)?)), 6 => Some(Message::RemoteCallRequest(Decode::decode(input)?)), 7 => Some(Message::RemoteCallResponse(Decode::decode(input)?)), + 8 => Some(Message::RemoteReadRequest(Decode::decode(input)?)), + 9 => Some(Message::RemoteReadResponse(Decode::decode(input)?)), 255 => Some(Message::ChainSpecific(Decode::decode(input)?)), _ => None, } @@ -678,4 +722,33 @@ pub mod generic { }) } } + + #[derive(Debug, PartialEq, Eq, Clone)] + /// Remote storage read request. + pub struct RemoteReadRequest { + /// Unique request id. + pub id: RequestId, + /// Block at which to perform call. + pub block: H, + /// Storage key. + pub key: Vec, + } + + impl Encode for RemoteReadRequest { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.id); + dest.push(&self.block); + dest.push(&self.key); + } + } + + impl Decode for RemoteReadRequest { + fn decode(input: &mut I) -> Option { + Some(RemoteReadRequest { + id: Decode::decode(input)?, + block: Decode::decode(input)?, + key: Decode::decode(input)?, + }) + } + } } diff --git a/substrate/network/src/on_demand.rs b/substrate/network/src/on_demand.rs index 8a8e76a3d0bbf..069cd84c5c658 100644 --- a/substrate/network/src/on_demand.rs +++ b/substrate/network/src/on_demand.rs @@ -25,7 +25,7 @@ use linked_hash_map::LinkedHashMap; use linked_hash_map::Entry; use parking_lot::Mutex; use client; -use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest}; +use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest, RemoteReadRequest}; use io::SyncIo; use message; use network_libp2p::{Severity, NodeIndex}; @@ -46,6 +46,9 @@ pub trait OnDemandService: Send + Sync { /// Maintain peers requests. fn maintain_peers(&self, io: &mut SyncIo); + /// When read response is received from remote node. + fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse); + /// When call response is received from remote node. fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse); } @@ -57,8 +60,8 @@ pub struct OnDemand> { } /// On-demand remote call response. -pub struct RemoteCallResponse { - receiver: Receiver>, +pub struct RemoteResponse { + receiver: Receiver>, } #[derive(Default)] @@ -77,16 +80,18 @@ struct Request { } enum RequestData { + RemoteRead(RemoteReadRequest, Sender>, client::error::Error>>), RemoteCall(RemoteCallRequest, Sender>), } enum Accept { Ok, CheckFailed(client::error::Error, RequestData), + Unexpected(RequestData), } -impl Future for RemoteCallResponse { - type Item = client::CallResult; +impl Future for RemoteResponse { + type Item = T; type Error = client::error::Error; fn poll(&mut self) -> Poll { @@ -150,6 +155,10 @@ impl OnDemand where core.remove_peer(peer); Some(retry_request_data) }, + Accept::Unexpected(retry_request_data) => { + trace!(target: "sync", "Unexpected response to remote {} from peer {}", rtype, peer); + Some(retry_request_data) + }, }; if let Some(request_data) = retry_request_data { @@ -189,6 +198,20 @@ impl OnDemandService for OnDemand where core.dispatch(); } + fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse) { + self.accept_response("read", io, peer, response.id, |request| match request.data { + RequestData::RemoteRead(request, sender) => match self.checker.check_read_proof(&request, response.proof) { + Ok(response) => { + // we do not bother if receiver has been dropped already + let _ = sender.send(Ok(response)); + Accept::Ok + }, + Err(error) => Accept::CheckFailed(error, RequestData::RemoteRead(request, sender)), + }, + data @ _ => Accept::Unexpected(data), + }) + } + fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse) { self.accept_response("call", io, peer, response.id, |request| match request.data { RequestData::RemoteCall(request, sender) => match self.checker.check_execution_proof(&request, response.proof) { @@ -199,6 +222,7 @@ impl OnDemandService for OnDemand where }, Err(error) => Accept::CheckFailed(error, RequestData::RemoteCall(request, sender)), }, + data @ _ => Accept::Unexpected(data), }) } } @@ -208,12 +232,19 @@ impl Fetcher for OnDemand where E: service::ExecuteInContext, B::Header: HeaderT, { - type RemoteCallResult = RemoteCallResponse; + type RemoteReadResult = RemoteResponse>>; + type RemoteCallResult = RemoteResponse; + + fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { + let (sender, receiver) = channel(); + self.schedule_request(RequestData::RemoteRead(request, sender), + RemoteResponse { receiver }) + } fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { let (sender, receiver) = channel(); self.schedule_request(RequestData::RemoteCall(request, sender), - RemoteCallResponse { receiver }) + RemoteResponse { receiver }) } } @@ -301,12 +332,19 @@ impl OnDemandCore where impl Request { pub fn message(&self) -> message::Message { match self.data { - RequestData::RemoteCall(ref data, _) => message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { - id: self.id, - block: data.block, - method: data.method.clone(), - data: data.call_data.clone(), - }), + RequestData::RemoteRead(ref data, _) => message::generic::Message::RemoteReadRequest( + message::RemoteReadRequest { + id: self.id, + block: data.block, + key: data.key.clone(), + }), + RequestData::RemoteCall(ref data, _) => message::generic::Message::RemoteCallRequest( + message::RemoteCallRequest { + id: self.id, + block: data.block, + method: data.method.clone(), + data: data.call_data.clone(), + }), } } } @@ -319,7 +357,7 @@ pub mod tests { use futures::Future; use parking_lot::RwLock; use client; - use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest}; + use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest, RemoteReadRequest}; use message; use network_libp2p::NodeIndex; use service::{Roles, ExecuteInContext}; @@ -335,6 +373,13 @@ pub mod tests { } impl FetchChecker for DummyFetchChecker { + fn check_read_proof(&self, _request: &RemoteReadRequest, _remote_proof: Vec>) -> client::error::Result>> { + match self.ok { + true => Ok(Some(vec![42])), + false => Err(client::error::ErrorKind::Backend("Test error".into()).into()), + } + } + fn check_execution_proof(&self, _request: &RemoteCallRequest, _remote_proof: Vec>) -> client::error::Result { match self.ok { true => Ok(client::CallResult { diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index e8de0f218b580..545fcc06bc934 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -19,6 +19,7 @@ use std::{mem, cmp}; use std::sync::Arc; use std::time; use parking_lot::RwLock; +use rustc_hex::ToHex; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, As}; use runtime_primitives::generic::BlockId; use network_libp2p::{NodeIndex, Severity}; @@ -268,6 +269,8 @@ impl> Protocol { GenericMessage::Transactions(m) => self.on_extrinsics(io, who, m), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(io, who, request), GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, who, response), + GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(io, who, request), + GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(io, who, response), other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other), } } @@ -602,6 +605,26 @@ impl> Protocol { self.on_demand.as_ref().map(|s| s.on_remote_call_response(io, who, response)); } + fn on_remote_read_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteReadRequest) { + trace!(target: "sync", "Remote read request {} from {} ({} at {})", + request.id, who, request.key.to_hex(), request.block); + let proof = match self.context_data.chain.read_proof(&request.block, &request.key) { + Ok(proof) => proof, + Err(error) => { + trace!(target: "sync", "Remote read request {} from {} ({} at {}) failed with: {}", + request.id, who, request.key.to_hex(), request.block, error); + Default::default() + }, + }; + self.send_message(io, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { + id: request.id, proof, + })); + } + fn on_remote_read_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteReadResponse) { + trace!(target: "sync", "Remote read response {} from {}", response.id, who); + self.on_demand.as_ref().map(|s| s.on_remote_read_response(io, who, response)); + } + /// Execute a closure with access to a network context and specialization. pub fn with_spec(&self, io: &mut SyncIo, f: F) -> U where F: FnOnce(&mut S, &mut Context) -> U diff --git a/substrate/service/src/components.rs b/substrate/service/src/components.rs index fd96dc7c31688..058ea13dc7b84 100644 --- a/substrate/service/src/components.rs +++ b/substrate/service/src/components.rs @@ -213,7 +213,10 @@ pub struct LightComponents { _factory: PhantomData, } -impl Components for LightComponents { +impl Components for LightComponents + where + <::Block as BlockT>::Hash: Into<[u8; 32]>, +{ type Factory = Factory; type Executor = LightExecutor; type Backend = LightBackend; diff --git a/substrate/state-machine/src/lib.rs b/substrate/state-machine/src/lib.rs index 17fd85852b6ac..ab8d5b575d7ef 100644 --- a/substrate/state-machine/src/lib.rs +++ b/substrate/state-machine/src/lib.rs @@ -414,6 +414,29 @@ pub fn execution_proof_check( execute(&backend, overlay, exec, method, call_data, ExecutionStrategy::NativeWhenPossible) } +/// Generate storage read proof. +pub fn prove_read( + backend: B, + key: &[u8], +) -> Result<(Option>, Vec>), Box> +{ + let trie_backend = backend.try_into_trie_backend() + .ok_or_else(|| Box::new(ExecutionError::UnableToGenerateProof) as Box)?; + let proving_backend = proving_backend::ProvingBackend::new(trie_backend); + let result = proving_backend.storage(key).map_err(|e| Box::new(e) as Box)?; + Ok((result, proving_backend.extract_proof())) +} + /// Check storage read proof, generated by `prove` call. +pub fn read_proof_check( + root: [u8; 32], + proof: Vec>, + key: &[u8], +) -> Result>, Box> +{ + let backend = proving_backend::create_proof_check_backend(root.into(), proof)?; + backend.storage(key).map_err(|e| Box::new(e) as Box) +} + #[cfg(test)] mod tests { use super::*; @@ -607,4 +630,18 @@ mod tests { ], ); } + + #[test] + fn prove_read_and_proof_check_works() { + // fetch read proof from 'remote' full node + let remote_backend = trie_backend::tests::test_trie(); + let remote_root = remote_backend.storage_root(::std::iter::empty()).0; + let remote_proof = prove_read(remote_backend, b"value2").unwrap().1; + // check proof locally + let local_result1 = read_proof_check(remote_root, remote_proof.clone(), b"value2").unwrap(); + let local_result2 = read_proof_check(remote_root, remote_proof.clone(), &[0xff]).is_ok(); + // check that results are correct + assert_eq!(local_result1, Some(vec![24])); + assert_eq!(local_result2, false); + } } diff --git a/substrate/state-machine/src/trie_backend.rs b/substrate/state-machine/src/trie_backend.rs index 098d0f6df338c..629dd3ddb3a9c 100644 --- a/substrate/state-machine/src/trie_backend.rs +++ b/substrate/state-machine/src/trie_backend.rs @@ -282,6 +282,9 @@ pub mod tests { trie.insert(b"value1", &[42]).unwrap(); trie.insert(b"value2", &[24]).unwrap(); trie.insert(b":code", b"return 42").unwrap(); + for i in 128u8..255u8 { + trie.insert(&[i], &[i]).unwrap(); + } } (mdb, root) } From d0555c16acec4fe86a13b37c2b4623e2634f2d8e Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 7 Aug 2018 18:14:19 +0300 Subject: [PATCH 2/2] CHT --- substrate/client/db/src/lib.rs | 15 +- substrate/client/db/src/light.rs | 105 ++++++++- substrate/client/src/blockchain.rs | 2 + substrate/client/src/cht.rs | 261 +++++++++++++++++++++++ substrate/client/src/client.rs | 20 +- substrate/client/src/error.rs | 6 + substrate/client/src/in_mem.rs | 21 +- substrate/client/src/lib.rs | 1 + substrate/client/src/light/backend.rs | 57 ----- substrate/client/src/light/blockchain.rs | 39 +++- substrate/client/src/light/fetcher.rs | 203 +++++++++++++++++- substrate/network/src/chain.rs | 7 + substrate/network/src/message.rs | 71 +++++- substrate/network/src/on_demand.rs | 108 +++++++++- substrate/network/src/protocol.rs | 25 ++- substrate/service/src/components.rs | 2 +- 16 files changed, 863 insertions(+), 80 deletions(-) create mode 100644 substrate/client/src/cht.rs diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index b297341fa19f4..3ade4c2f1e04a 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -56,7 +56,8 @@ use runtime_primitives::BuildStorage; use state_machine::backend::Backend as StateBackend; use executor::RuntimeInfo; use state_machine::{CodeExecutor, TrieH256, DBValue, ExecutionStrategy}; -use utils::{Meta, db_err, meta_keys, number_to_db_key, open_database, read_db, read_id, read_meta}; +use utils::{Meta, db_err, meta_keys, number_to_db_key, db_key_to_number, open_database, + read_db, read_id, read_meta}; use state_db::StateDb; pub use state_db::PruningMode; @@ -178,6 +179,14 @@ impl client::blockchain::HeaderBackend for BlockchainDb Result::Number>, client::error::Error> { + read_id::(&*self.db, columns::BLOCK_INDEX, BlockId::Hash(hash)) + .and_then(|key| match key { + Some(key) => Ok(Some(db_key_to_number(&key)?)), + None => Ok(None), + }) + } + fn hash(&self, number: ::Number) -> Result, client::error::Error> { read_db::(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(number)).map(|x| x.map(|raw| HashFor::::hash(&raw[..])).map(Into::into) @@ -389,13 +398,13 @@ impl client::backend::Backend for Backend { } }; if let Some(finalizing_hash) = finalizing_hash { - trace!("Finalizing block #{} ({:?})", number_u64 - self.finalization_window, finalizing_hash); + trace!(target: "db", "Finalizing block #{} ({:?})", number_u64 - self.finalization_window, finalizing_hash); let commit = self.storage.state_db.finalize_block(&finalizing_hash); apply_state_commit(&mut transaction, commit); } } - debug!("DB Commit {:?} ({}), best = {}", hash, number, pending_block.is_best); + debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, pending_block.is_best); self.storage.db.write(transaction).map_err(db_err)?; self.blockchain.update_meta(hash, number, pending_block.is_best); } diff --git a/substrate/client/db/src/light.rs b/substrate/client/db/src/light.rs index a7e74a75711e3..425b7a2b717b0 100644 --- a/substrate/client/db/src/light.rs +++ b/substrate/client/db/src/light.rs @@ -23,14 +23,17 @@ use kvdb::{KeyValueDB, DBTransaction}; use client::blockchain::{BlockStatus, Cache as BlockchainCache, HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo}; +use client::cht; use client::error::{ErrorKind as ClientErrorKind, Result as ClientResult}; use client::light::blockchain::Storage as LightBlockchainStorage; use codec::{Decode, Encode}; use primitives::AuthorityId; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, Zero, As}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, + Zero, One, As, NumberFor}; use cache::DbCache; -use utils::{meta_keys, Meta, db_err, number_to_db_key, open_database, read_db, read_id, read_meta}; +use utils::{meta_keys, Meta, db_err, number_to_db_key, db_key_to_number, open_database, + read_db, read_id, read_meta}; use DatabaseSettings; pub(crate) mod columns { @@ -38,10 +41,11 @@ pub(crate) mod columns { pub const BLOCK_INDEX: Option = Some(1); pub const HEADER: Option = Some(2); pub const AUTHORITIES: Option = Some(3); + pub const CHT: Option = Some(4); } /// Keep authorities for last 'AUTHORITIES_ENTRIES_TO_KEEP' blocks. -pub(crate) const AUTHORITIES_ENTRIES_TO_KEEP: u64 = 2048; +pub(crate) const AUTHORITIES_ENTRIES_TO_KEEP: u64 = cht::SIZE; /// Light blockchain storage. Stores most recent headers + CHTs for older headers. pub struct LightStorage { @@ -146,6 +150,14 @@ impl BlockchainHeaderBackend for LightStorage } } + fn number(&self, hash: Block::Hash) -> ClientResult::Header as HeaderT>::Number>> { + read_id::(&*self.db, columns::BLOCK_INDEX, BlockId::Hash(hash)) + .and_then(|key| match key { + Some(key) => Ok(Some(db_key_to_number(&key)?)), + None => Ok(None), + }) + } + fn hash(&self, number: <::Header as HeaderT>::Number) -> ClientResult> { read_db::(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(number)).map(|x| x.map(|raw| HashFor::::hash(&raw[..])).map(Into::into) @@ -156,6 +168,7 @@ impl BlockchainHeaderBackend for LightStorage impl LightBlockchainStorage for LightStorage where Block: BlockT, + Block::Hash: From<[u8; 32]>, { fn import_header(&self, is_new_best: bool, header: Block::Header, authorities: Option>) -> ClientResult<()> { let mut transaction = DBTransaction::new(); @@ -187,6 +200,26 @@ impl LightBlockchainStorage for LightStorage None }; + // build new CHT if required + if let Some(new_cht_number) = cht::is_build_required(cht::SIZE, *header.number()) { + let new_cht_start: NumberFor = cht::start_number(cht::SIZE, new_cht_number); + let new_cht_root: Option = cht::compute_root::(cht::SIZE, new_cht_number, (new_cht_start.as_()..) + .map(|num| self.hash(As::sa(num)).unwrap_or_default())); + + if let Some(new_cht_root) = new_cht_root { + transaction.put(columns::CHT, &number_to_db_key(new_cht_start), new_cht_root.as_ref()); + + let mut prune_block = new_cht_start; + let new_cht_end = cht::end_number(cht::SIZE, new_cht_number); + trace!(target: "db", "Replacing blocks [{}..{}] with CHT#{}", new_cht_start, new_cht_end, new_cht_number); + + while prune_block <= new_cht_end { + transaction.delete(columns::HEADER, &number_to_db_key(prune_block)); + prune_block += <::Header as HeaderT>::Number::one(); + } + } + } + debug!("Light DB Commit {:?} ({})", hash, number); self.db.write(transaction).map_err(db_err)?; self.update_meta(hash, number, is_new_best); @@ -197,6 +230,16 @@ impl LightBlockchainStorage for LightStorage Ok(()) } + fn cht_root(&self, cht_size: u64, block: <::Header as HeaderT>::Number) -> ClientResult { + let no_cht_for_block = || ClientErrorKind::Backend(format!("CHT for block {} not exists", block)).into(); + + let cht_number = cht::block_to_cht_number(cht_size, block).ok_or_else(no_cht_for_block)?; + let cht_start = cht::start_number(cht_size, cht_number); + self.db.get(columns::CHT, &number_to_db_key(cht_start)).map_err(db_err)? + .ok_or_else(no_cht_for_block) + .and_then(|hash| Block::Hash::decode(&mut &*hash).ok_or_else(no_cht_for_block)) + } + fn cache(&self) -> Option<&BlockchainCache> { Some(&self.cache) } @@ -204,6 +247,7 @@ impl LightBlockchainStorage for LightStorage #[cfg(test)] pub(crate) mod tests { + use client::cht; use runtime_primitives::testing::{H256 as Hash, Header, Block as RawBlock}; use super::*; @@ -289,4 +333,59 @@ pub(crate) mod tests { assert_eq!(db.db.iter(columns::HEADER).count(), 2); assert_eq!(db.db.iter(columns::BLOCK_INDEX).count(), 2); } + + #[test] + fn ancient_headers_are_replaced_with_cht() { + let db = LightStorage::new_test(); + + // insert genesis block header (never pruned) + let mut prev_hash = insert_block(&db, &Default::default(), 0, None); + + // insert SIZE blocks && ensure that nothing is pruned + for number in 0..cht::SIZE { + prev_hash = insert_block(&db, &prev_hash, 1 + number, None); + } + assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht::SIZE) as usize); + assert_eq!(db.db.iter(columns::CHT).count(), 0); + + // insert next SIZE blocks && ensure that nothing is pruned + for number in 0..cht::SIZE { + prev_hash = insert_block(&db, &prev_hash, 1 + cht::SIZE + number, None); + } + assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht::SIZE + cht::SIZE) as usize); + assert_eq!(db.db.iter(columns::CHT).count(), 0); + + // insert block #{2 * cht::SIZE + 1} && check that new CHT is created + headers of this CHT are pruned + insert_block(&db, &prev_hash, 1 + cht::SIZE + cht::SIZE, None); + assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht::SIZE + 1) as usize); + assert_eq!(db.db.iter(columns::CHT).count(), 1); + assert!((0..cht::SIZE).all(|i| db.db.get(columns::HEADER, &number_to_db_key(1 + i)).unwrap().is_none())); + } + + #[test] + fn get_cht_fails_for_genesis_block() { + assert!(LightStorage::::new_test().cht_root(cht::SIZE, 0).is_err()); + } + + #[test] + fn get_cht_fails_for_non_existant_cht() { + assert!(LightStorage::::new_test().cht_root(cht::SIZE, (cht::SIZE / 2) as u64).is_err()); + } + + #[test] + fn get_cht_works() { + let db = LightStorage::new_test(); + + // insert 1 + SIZE + SIZE + 1 blocks so that CHT#0 is created + let mut prev_hash = Default::default(); + for i in 0..1 + cht::SIZE + cht::SIZE + 1 { + prev_hash = insert_block(&db, &prev_hash, i as u64, None); + } + + let cht_root_1 = db.cht_root(cht::SIZE, cht::start_number(cht::SIZE, 0)).unwrap(); + let cht_root_2 = db.cht_root(cht::SIZE, (cht::start_number(cht::SIZE, 0) + cht::SIZE / 2) as u64).unwrap(); + let cht_root_3 = db.cht_root(cht::SIZE, cht::end_number(cht::SIZE, 0)).unwrap(); + assert_eq!(cht_root_1, cht_root_2); + assert_eq!(cht_root_2, cht_root_3); + } } diff --git a/substrate/client/src/blockchain.rs b/substrate/client/src/blockchain.rs index 333bc8f72e156..cb657bb97bb4f 100644 --- a/substrate/client/src/blockchain.rs +++ b/substrate/client/src/blockchain.rs @@ -31,6 +31,8 @@ pub trait HeaderBackend: Send + Sync { fn info(&self) -> Result>; /// Get block status. fn status(&self, id: BlockId) -> Result; + /// Get block number by hash. Returns `None` if the header is not in the chain. + fn number(&self, hash: Block::Hash) -> Result::Header as HeaderT>::Number>>; /// Get block hash by number. Returns `None` if the header is not in the chain. fn hash(&self, number: <::Header as HeaderT>::Number) -> Result::Header as HeaderT>::Hash>>; } diff --git a/substrate/client/src/cht.rs b/substrate/client/src/cht.rs new file mode 100644 index 0000000000000..9d56366e95ac1 --- /dev/null +++ b/substrate/client/src/cht.rs @@ -0,0 +1,261 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Canonical hash trie definitions and helper functions. +//! +//! Each CHT is a trie mapping block numbers to canonical hash. +//! One is generated for every `SIZE` blocks, allowing us to discard those blocks in +//! favor of the trie root. When the "ancient" blocks need to be accessed, we simply +//! request an inclusion proof of a specific block number against the trie with the +//! root has. A correct proof implies that the claimed block is identical to the one +//! we discarded. + +use triehash; + +use runtime_primitives::traits::{As, Header as HeaderT, SimpleArithmetic, One}; +use state_machine::backend::InMemory as InMemoryState; +use state_machine::{prove_read, read_proof_check}; + +use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult}; + +/// The size of each CHT. This value is passed to every CHT-related function from +/// production code. Other values are passed from tests. +pub const SIZE: u64 = 2048; + +/// Returns Some(cht_number) if CHT is need to be built when the block with given number is canonized. +pub fn is_build_required(cht_size: u64, block_num: N) -> Option + where + N: Clone + SimpleArithmetic, +{ + let block_cht_num = block_to_cht_number(cht_size, block_num.clone())?; + let two = N::one() + N::one(); + if block_cht_num < two { + return None; + } + let cht_start = start_number(cht_size, block_cht_num.clone()); + if cht_start != block_num { + return None; + } + + Some(block_cht_num - two) +} + +/// Compute a CHT root from an iterator of block hashes. Fails if shorter than +/// SIZE items. The items are assumed to proceed sequentially from `start_number(cht_num)`. +/// Discards the trie's nodes. +pub fn compute_root( + cht_size: u64, + cht_num: Header::Number, + hashes: I, +) -> Option + where + Header: HeaderT, + Header::Hash: From<[u8; 32]>, + I: IntoIterator>, +{ + build_pairs::(cht_size, cht_num, hashes) + .map(|pairs| triehash::trie_root(pairs).0.into()) +} + +/// Build CHT-based header proof. +pub fn build_proof( + cht_size: u64, + cht_num: Header::Number, + block_num: Header::Number, + hashes: I +) -> Option>> + where + Header: HeaderT, + I: IntoIterator>, +{ + let transaction = build_pairs::(cht_size, cht_num, hashes)? + .into_iter() + .map(|(k, v)| (k, Some(v))) + .collect::>(); + let storage = InMemoryState::default().update(transaction); + let (value, proof) = prove_read(storage, &encode_cht_key(block_num)).ok()?; + if value.is_none() { + None + } else { + Some(proof) + } +} + +/// Check CHT-based header proof. +pub fn check_proof
( + local_root: Header::Hash, + local_number: Header::Number, + remote_hash: Header::Hash, + remote_proof: Vec> +) -> ClientResult<()> + where + Header: HeaderT, + Header::Hash: From<[u8; 32]> + Into<[u8; 32]>, +{ + let local_cht_key = encode_cht_key(local_number); + let local_cht_value = read_proof_check(local_root.into(), remote_proof, &local_cht_key).map_err(|e| ClientError::from(e))?; + let local_cht_value = local_cht_value.ok_or_else(|| ClientErrorKind::InvalidHeaderProof)?; + let local_hash: Header::Hash = decode_cht_value(&local_cht_value).ok_or_else(|| ClientErrorKind::InvalidHeaderProof)?; + match local_hash == remote_hash { + true => Ok(()), + false => Err(ClientErrorKind::InvalidHeaderProof.into()), + } +} + +/// Build pairs for computing CHT. +fn build_pairs( + cht_size: u64, + cht_num: Header::Number, + hashes: I +) -> Option, Vec)>> + where + Header: HeaderT, + I: IntoIterator>, +{ + let start_num = start_number(cht_size, cht_num); + let mut pairs = Vec::new(); + let mut hash_number = start_num; + for hash in hashes.into_iter().take(cht_size as usize) { + pairs.push(hash.map(|hash| ( + encode_cht_key(hash_number).to_vec(), + encode_cht_value(hash) + ))?); + hash_number += Header::Number::one(); + } + + if pairs.len() as u64 == cht_size { + Some(pairs) + } else { + None + } +} + +/// Get the starting block of a given CHT. +/// CHT 0 includes block 1...SIZE, +/// CHT 1 includes block SIZE + 1 ... 2*SIZE +/// More generally: CHT N includes block (1 + N*SIZE)...((N+1)*SIZE). +/// This is because the genesis hash is assumed to be known +/// and including it would be redundant. +pub fn start_number(cht_size: u64, cht_num: N) -> N { + (cht_num * As::sa(cht_size)) + N::one() +} + +/// Get the ending block of a given CHT. +pub fn end_number(cht_size: u64, cht_num: N) -> N { + (cht_num + N::one()) * As::sa(cht_size) +} + +/// Convert a block number to a CHT number. +/// Returns `None` for `block_num` == 0, `Some` otherwise. +pub fn block_to_cht_number(cht_size: u64, block_num: N) -> Option { + if block_num == N::zero() { + None + } else { + Some((block_num - N::one()) / As::sa(cht_size)) + } +} + +/// Convert header number into CHT key. +pub fn encode_cht_key>(number: N) -> Vec { + let number: u64 = number.as_(); + vec![ + (number >> 56) as u8, + ((number >> 48) & 0xff) as u8, + ((number >> 40) & 0xff) as u8, + ((number >> 32) & 0xff) as u8, + ((number >> 24) & 0xff) as u8, + ((number >> 16) & 0xff) as u8, + ((number >> 8) & 0xff) as u8, + (number & 0xff) as u8 + ] +} + +/// Convert header hash into CHT value. +fn encode_cht_value>(hash: Hash) -> Vec { + hash.as_ref().to_vec() +} + +/// Convert CHT value into block header hash. +pub fn decode_cht_value>(value: &[u8]) -> Option { + match value.len() { + 32 => { + let mut hash_array: [u8; 32] = Default::default(); + hash_array.clone_from_slice(&value[0..32]); + Some(hash_array.into()) + }, + _ => None, + } + +} + +#[cfg(test)] +mod tests { + use test_client::runtime::Header; + use super::*; + + #[test] + fn is_build_required_works() { + assert_eq!(is_build_required(SIZE, 0), None); + assert_eq!(is_build_required(SIZE, 1), None); + assert_eq!(is_build_required(SIZE, SIZE), None); + assert_eq!(is_build_required(SIZE, SIZE + 1), None); + assert_eq!(is_build_required(SIZE, 2 * SIZE), None); + assert_eq!(is_build_required(SIZE, 2 * SIZE + 1), Some(0)); + assert_eq!(is_build_required(SIZE, 3 * SIZE), None); + assert_eq!(is_build_required(SIZE, 3 * SIZE + 1), Some(1)); + } + + #[test] + fn start_number_works() { + assert_eq!(start_number(SIZE, 0), 1); + assert_eq!(start_number(SIZE, 1), SIZE + 1); + assert_eq!(start_number(SIZE, 2), SIZE + SIZE + 1); + } + + #[test] + fn end_number_works() { + assert_eq!(end_number(SIZE, 0), SIZE); + assert_eq!(end_number(SIZE, 1), SIZE + SIZE); + assert_eq!(end_number(SIZE, 2), SIZE + SIZE + SIZE); + } + + #[test] + fn build_pairs_fails_when_no_enough_blocks() { + assert!(build_pairs::(SIZE, 0, vec![Some(1.into()); SIZE as usize / 2]).is_none()); + } + + #[test] + fn build_pairs_fails_when_missing_block() { + assert!(build_pairs::(SIZE, 0, ::std::iter::repeat(Some(1.into())).take(SIZE as usize / 2) + .chain(::std::iter::once(None)) + .chain(::std::iter::repeat(Some(2.into())).take(SIZE as usize / 2 - 1))).is_none()); + } + + #[test] + fn compute_root_works() { + assert!(compute_root::(SIZE, 42, vec![Some(1.into()); SIZE as usize]).is_some()); + } + + #[test] + fn build_proof_fails_when_querying_wrong_block() { + assert!(build_proof::(SIZE, 0, (SIZE * 1000) as u64, vec![Some(1.into()); SIZE as usize]).is_none()); + } + + #[test] + fn build_proof_works() { + assert!(build_proof::(SIZE, 0, (SIZE / 2) as u64, vec![Some(1.into()); SIZE as usize]).is_some()); + } +} diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 781f3aaab0147..f599b1029d371 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -34,7 +34,7 @@ use backend::{self, BlockImportOperation}; use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend}; use call_executor::{CallExecutor, LocalCallExecutor}; use executor::{RuntimeVersion, RuntimeInfo}; -use {error, in_mem, block_builder, runtime_io, bft, genesis}; +use {cht, error, in_mem, block_builder, runtime_io, bft, genesis}; /// Type that implements `futures::Stream` of block import events. pub type BlockchainEventStream = mpsc::UnboundedReceiver>; @@ -252,6 +252,24 @@ impl Client where self.state_at(id).and_then(|state| self.executor.prove_at_state(state, &mut Default::default(), method, call_data)) } + /// Reads given header and generates CHT-based header proof. + pub fn header_proof(&self, id: &BlockId) -> error::Result<(Block::Header, Vec>)> { + self.header_proof_with_cht_size(id, cht::SIZE) + } + + /// Reads given header and generates CHT-based header proof for CHT of given size. + pub fn header_proof_with_cht_size(&self, id: &BlockId, cht_size: u64) -> error::Result<(Block::Header, Vec>)> { + let proof_error = || error::ErrorKind::Backend(format!("Failed to generate header proof for {:?}", id)); + let header = self.header(id)?.ok_or_else(|| error::ErrorKind::UnknownBlock(format!("{:?}", id)))?; + let block_num = *header.number(); + let cht_num = cht::block_to_cht_number(cht_size, block_num).ok_or_else(proof_error)?; + let cht_start = cht::start_number(cht_size, cht_num); + let headers = (cht_start.as_()..).map(|num| self.block_hash(As::sa(num)).unwrap_or_default()); + let proof = cht::build_proof::(cht_size, cht_num, block_num, headers) + .ok_or_else(proof_error)?; + Ok((header, proof)) + } + /// Set up the native execution environment to call into a native runtime code. pub fn using_environment T, T>( &self, f: F diff --git a/substrate/client/src/error.rs b/substrate/client/src/error.rs index 26964334f3d87..17a57e40b7327 100644 --- a/substrate/client/src/error.rs +++ b/substrate/client/src/error.rs @@ -101,6 +101,12 @@ error_chain! { display("This method is not currently available when running in light client mode"), } + /// Invalid remote header proof. + InvalidHeaderProof { + description("invalid header proof"), + display("Remote node has responded with invalid header proof"), + } + /// Invalid remote execution proof. InvalidExecutionProof { description("invalid execution proof"), diff --git a/substrate/client/src/in_mem.rs b/substrate/client/src/in_mem.rs index a12323610a723..a1c2b5f6a5222 100644 --- a/substrate/client/src/in_mem.rs +++ b/substrate/client/src/in_mem.rs @@ -86,6 +86,7 @@ struct BlockchainStorage { best_hash: Block::Hash, best_number: <::Header as HeaderT>::Number, genesis_hash: Block::Hash, + cht_roots: HashMap, Block::Hash>, } /// In-memory blockchain. Supports concurrent reads. @@ -130,6 +131,7 @@ impl Blockchain { best_hash: Default::default(), best_number: Zero::zero(), genesis_hash: Default::default(), + cht_roots: HashMap::new(), })); Blockchain { storage: storage.clone(), @@ -176,6 +178,11 @@ impl Blockchain { && this.best_number == other.best_number && this.genesis_hash == other.genesis_hash } + + /// Insert CHT root. + pub fn insert_cht_root(&self, block: NumberFor, cht_root: Block::Hash) { + self.storage.write().cht_roots.insert(block, cht_root); + } } impl blockchain::HeaderBackend for Blockchain { @@ -201,6 +208,10 @@ impl blockchain::HeaderBackend for Blockchain { } } + fn number(&self, hash: Block::Hash) -> error::Result>> { + Ok(self.storage.read().blocks.get(&hash).map(|b| *b.header().number())) + } + fn hash(&self, number: <::Header as HeaderT>::Number) -> error::Result> { Ok(self.id(BlockId::Number(number))) } @@ -226,7 +237,10 @@ impl blockchain::Backend for Blockchain { } } -impl light::blockchain::Storage for Blockchain { +impl light::blockchain::Storage for Blockchain + where + Block::Hash: From<[u8; 32]>, +{ fn import_header( &self, is_new_best: bool, @@ -242,6 +256,11 @@ impl light::blockchain::Storage for Blockchain { Ok(()) } + fn cht_root(&self, _cht_size: u64, block: NumberFor) -> error::Result { + self.storage.read().cht_roots.get(&block).cloned() + .ok_or_else(|| error::ErrorKind::Backend(format!("CHT for block {} not exists", block)).into()) + } + fn cache(&self) -> Option<&blockchain::Cache> { Some(&self.cache) } diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index 329d5a5ac02df..122cd18a4f42a 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -44,6 +44,7 @@ extern crate triehash; pub mod error; pub mod blockchain; pub mod backend; +pub mod cht; pub mod in_mem; pub mod genesis; pub mod block_builder; diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index 6851b54d1a62b..5d6d33f7a7a1c 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -182,60 +182,3 @@ impl TryIntoStateTrieBackend for OnDemandState where Block: None } } - -#[cfg(test)] -pub mod tests { - use futures::future::{ok, err, FutureResult}; - use parking_lot::Mutex; - use call_executor::CallResult; - use executor::{self, NativeExecutionDispatch}; - use error::Error as ClientError; - use test_client::{self, runtime::{Hash, Block}}; - use in_mem::{Blockchain as InMemoryBlockchain}; - use light::{new_fetch_checker, new_light_blockchain}; - use light::fetcher::{Fetcher, FetchChecker, LightDataChecker, RemoteCallRequest}; - use super::*; - - pub type OkCallFetcher = Mutex; - - impl Fetcher for OkCallFetcher { - type RemoteReadResult = FutureResult>, ClientError>; - type RemoteCallResult = FutureResult; - - fn remote_read(&self, _request: RemoteReadRequest) -> Self::RemoteReadResult { - err("Not implemented on test node".into()) - } - - fn remote_call(&self, _request: RemoteCallRequest) -> Self::RemoteCallResult { - ok((*self.lock()).clone()) - } - } - - #[test] - fn storage_read_proof_is_generated_and_checked() { - // prepare remote client - let remote_client = test_client::new(); - let remote_block_id = BlockId::Number(0); - let remote_block_hash = remote_client.block_hash(0).unwrap().unwrap(); - let mut remote_block_header = remote_client.header(&remote_block_id).unwrap().unwrap(); - remote_block_header.state_root = remote_client.state_at(&remote_block_id).unwrap().storage_root(::std::iter::empty()).0.into(); - - // 'fetch' read proof from remote node - let authorities_len = remote_client.authorities_at(&remote_block_id).unwrap().len(); - let remote_read_proof = remote_client.read_proof(&remote_block_id, b":auth:len").unwrap(); - - // check remote read proof locally - let local_storage = InMemoryBlockchain::::new(); - local_storage.insert(remote_block_hash, remote_block_header, None, None, true); - let local_executor = test_client::LocalExecutor::with_heap_pages(8, 8); - let local_checker: LightDataChecker< - InMemoryBlockchain, - executor::NativeExecutor, - OkCallFetcher - > = new_fetch_checker(new_light_blockchain(local_storage), local_executor); - assert_eq!(local_checker.check_read_proof(&RemoteReadRequest { - block: remote_block_hash, - key: b":auth:len".to_vec(), - }, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8); - } -} diff --git a/substrate/client/src/light/blockchain.rs b/substrate/client/src/light/blockchain.rs index 4d9e7ca38dca7..443f92f78c197 100644 --- a/substrate/client/src/light/blockchain.rs +++ b/substrate/client/src/light/blockchain.rs @@ -18,16 +18,17 @@ //! blocks. CHT roots are stored for headers of ancient blocks. use std::sync::Weak; +use futures::{Future, IntoFuture}; use parking_lot::Mutex; use primitives::AuthorityId; use runtime_primitives::{bft::Justification, generic::BlockId}; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}; use blockchain::{Backend as BlockchainBackend, BlockStatus, Cache as BlockchainCache, HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo}; -use error::Result as ClientResult; -use light::fetcher::Fetcher; +use error::{ErrorKind as ClientErrorKind, Result as ClientResult}; +use light::fetcher::{Fetcher, RemoteHeaderRequest}; /// Light client blockchain storage. pub trait Storage: BlockchainHeaderBackend { @@ -39,6 +40,9 @@ pub trait Storage: BlockchainHeaderBackend { authorities: Option> ) -> ClientResult<()>; + /// Get CHT root for given block. Fails if the block is not pruned (not a part of any CHT). + fn cht_root(&self, cht_size: u64, block: NumberFor) -> ClientResult; + /// Get storage cache. fn cache(&self) -> Option<&BlockchainCache>; } @@ -76,7 +80,30 @@ impl Blockchain { impl BlockchainHeaderBackend for Blockchain where Block: BlockT, S: Storage, F: Fetcher { fn header(&self, id: BlockId) -> ClientResult> { - self.storage.header(id) + match self.storage.header(id)? { + Some(header) => Ok(Some(header)), + None => { + let number = match id { + BlockId::Hash(hash) => match self.storage.number(hash)? { + Some(number) => number, + None => return Ok(None), + }, + BlockId::Number(number) => number, + }; + + // if the header is from future or genesis (we never prune genesis) => return + if number.is_zero() || self.storage.status(BlockId::Number(number))? != BlockStatus::InChain { + return Ok(None); + } + + self.fetcher().upgrade().ok_or(ClientErrorKind::NotAvailableOnLightClient)? + .remote_header(RemoteHeaderRequest { + block: number, + }) + .into_future().wait() + .map(Some) + } + } } fn info(&self) -> ClientResult> { @@ -87,6 +114,10 @@ impl BlockchainHeaderBackend for Blockchain where Bloc self.storage.status(id) } + fn number(&self, hash: Block::Hash) -> ClientResult>> { + self.storage.number(hash) + } + fn hash(&self, number: <::Header as HeaderT>::Number) -> ClientResult> { self.storage.hash(number) } diff --git a/substrate/client/src/light/fetcher.rs b/substrate/client/src/light/fetcher.rs index 5cda7d766e8f2..ee8874adad98c 100644 --- a/substrate/client/src/light/fetcher.rs +++ b/substrate/client/src/light/fetcher.rs @@ -20,10 +20,11 @@ use std::sync::Arc; use futures::IntoFuture; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use state_machine::{CodeExecutor, read_proof_check}; use call_executor::CallResult; +use cht; use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult}; use light::blockchain::{Blockchain, Storage as BlockchainStorage}; use light::call_executor::check_execution_proof; @@ -40,6 +41,13 @@ pub struct RemoteCallRequest { pub call_data: Vec, } +/// Remote canonical header request. +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] +pub struct RemoteHeaderRequest { + /// Number of the header to query. + pub block: Number, +} + /// Remote storage read request. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct RemoteReadRequest { @@ -52,11 +60,15 @@ pub struct RemoteReadRequest { /// Light client data fetcher. Implementations of this trait must check if remote data /// is correct (see FetchedDataChecker) and return already checked data. pub trait Fetcher: Send + Sync { + /// Remote header future. + type RemoteHeaderResult: IntoFuture; /// Remote storage read future. type RemoteReadResult: IntoFuture>, Error=ClientError>; /// Remote call result future. type RemoteCallResult: IntoFuture; + /// Fetch remote header. + fn remote_header(&self, request: RemoteHeaderRequest>) -> Self::RemoteHeaderResult; /// Fetch remote storage value. fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult; /// Fetch remote call result. @@ -65,6 +77,13 @@ pub trait Fetcher: Send + Sync { /// Light client remote data checker. pub trait FetchChecker: Send + Sync { + /// Check remote header proof. + fn check_header_proof( + &self, + request: &RemoteHeaderRequest>, + header: Option, + remote_proof: Vec> + ) -> ClientResult; /// Check remote storage read proof. fn check_read_proof( &self, @@ -81,6 +100,7 @@ pub trait FetchChecker: Send + Sync { /// Remote data checker. pub struct LightDataChecker { + cht_size: u64, blockchain: Arc>, executor: E, } @@ -89,6 +109,17 @@ impl LightDataChecker { /// Create new light data checker. pub fn new(blockchain: Arc>, executor: E) -> Self { Self { + cht_size: cht::SIZE, + blockchain, + executor, + } + } + + /// Create new light data checker with given cht size. + #[cfg(test)] + pub fn with_cht_size(cht_size: u64, blockchain: Arc>, executor: E) -> Self { + Self { + cht_size, blockchain, executor, } @@ -103,11 +134,24 @@ impl LightDataChecker { impl FetchChecker for LightDataChecker where Block: BlockT, - Block::Hash: Into<[u8; 32]>, + Block::Hash: From<[u8; 32]> + Into<[u8; 32]>, S: BlockchainStorage, E: CodeExecutor, F: Fetcher, { + fn check_header_proof( + &self, + request: &RemoteHeaderRequest>, + remote_header: Option, + remote_proof: Vec> + ) -> ClientResult { + let remote_header = remote_header.ok_or_else(|| ClientError::from(ClientErrorKind::InvalidHeaderProof))?; + let remote_header_hash = remote_header.hash(); + let local_cht_root = self.blockchain.storage().cht_root(self.cht_size, request.block)?; + cht::check_proof::(local_cht_root, request.block, remote_header_hash, remote_proof) + .map(|_| remote_header) + } + fn check_read_proof( &self, request: &RemoteReadRequest, @@ -127,3 +171,158 @@ impl FetchChecker for LightDataChecker check_execution_proof(&*self.blockchain, &self.executor, request, remote_proof) } } + +#[cfg(test)] +pub mod tests { + use futures::future::{ok, err, FutureResult}; + use parking_lot::Mutex; + use call_executor::CallResult; + use executor::{self, NativeExecutionDispatch}; + use error::Error as ClientError; + use test_client::{self, TestClient, runtime::{Hash, Block, Header}}; + use test_client::client::BlockOrigin; + use in_mem::{Blockchain as InMemoryBlockchain}; + use light::new_light_blockchain; + use light::fetcher::{Fetcher, FetchChecker, LightDataChecker, + RemoteCallRequest, RemoteHeaderRequest}; + use state_machine::Backend; + use super::*; + + pub type OkCallFetcher = Mutex; + + impl Fetcher for OkCallFetcher { + type RemoteHeaderResult = FutureResult; + type RemoteReadResult = FutureResult>, ClientError>; + type RemoteCallResult = FutureResult; + + fn remote_header(&self, _request: RemoteHeaderRequest>) -> Self::RemoteHeaderResult { + err("Not implemented on test node".into()) + } + + fn remote_read(&self, _request: RemoteReadRequest) -> Self::RemoteReadResult { + err("Not implemented on test node".into()) + } + + fn remote_call(&self, _request: RemoteCallRequest) -> Self::RemoteCallResult { + ok((*self.lock()).clone()) + } + } + + fn prepare_for_read_proof_check(insert_header: bool) -> ( + LightDataChecker< + InMemoryBlockchain, + executor::NativeExecutor, + OkCallFetcher>, + Hash, Vec>, usize) + { + // prepare remote client + let remote_client = test_client::new(); + let remote_block_id = BlockId::Number(0); + let remote_block_hash = remote_client.block_hash(0).unwrap().unwrap(); + let mut remote_block_header = remote_client.header(&remote_block_id).unwrap().unwrap(); + remote_block_header.state_root = remote_client.state_at(&remote_block_id).unwrap().storage_root(::std::iter::empty()).0.into(); + + // 'fetch' read proof from remote node + let authorities_len = remote_client.authorities_at(&remote_block_id).unwrap().len(); + let remote_read_proof = remote_client.read_proof(&remote_block_id, b":auth:len").unwrap(); + + // check remote read proof locally + let local_storage = InMemoryBlockchain::::new(); + if insert_header { + local_storage.insert(remote_block_hash, remote_block_header, None, None, true); + } + let local_executor = test_client::LocalExecutor::with_heap_pages(8, 8); + let local_checker: LightDataChecker< + InMemoryBlockchain, + executor::NativeExecutor, + OkCallFetcher + > = LightDataChecker::with_cht_size(4, new_light_blockchain(local_storage), local_executor); + (local_checker, remote_block_hash, remote_read_proof, authorities_len) + } + + fn prepare_for_header_proof_check(insert_cht: bool) -> ( + LightDataChecker< + InMemoryBlockchain, + executor::NativeExecutor, + OkCallFetcher>, + Header, Vec>) + { + // prepare remote client + let remote_client = test_client::new(); + let mut local_headers_hashes = Vec::new(); + for i in 0..4 { + let builder = remote_client.new_block().unwrap(); + remote_client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + local_headers_hashes.push(remote_client.block_hash(i + 1).unwrap()); + } + + // 'fetch' header proof from remote node + let remote_block_id = BlockId::Number(1); + let (remote_block_header, remote_header_proof) = remote_client.header_proof_with_cht_size(&remote_block_id, 4).unwrap(); + + // check remote read proof locally + let local_storage = InMemoryBlockchain::::new(); + if insert_cht { + let local_cht_root = cht::compute_root::(4, 0, local_headers_hashes.into_iter()).unwrap(); + local_storage.insert_cht_root(1, local_cht_root); + } + let local_executor = test_client::LocalExecutor::with_heap_pages(8, 8); + let local_checker: LightDataChecker< + InMemoryBlockchain, + executor::NativeExecutor, + OkCallFetcher + > = LightDataChecker::with_cht_size(4, new_light_blockchain(local_storage), local_executor); + (local_checker, remote_block_header, remote_header_proof) + } + + #[test] + fn storage_read_proof_is_generated_and_checked() { + let (local_checker, remote_block_hash, remote_read_proof, authorities_len) = prepare_for_read_proof_check(true); + assert_eq!(local_checker.check_read_proof(&RemoteReadRequest { + block: remote_block_hash, + key: b":auth:len".to_vec(), + }, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8); + } + + #[test] + fn check_read_proof_fails_if_header_is_unknown() { + let (local_checker, remote_block_hash, remote_read_proof, _) = prepare_for_read_proof_check(false); + assert!(local_checker.check_read_proof(&RemoteReadRequest { + block: remote_block_hash, + key: b":auth:len".to_vec(), + }, remote_read_proof).is_err()); + } + + #[test] + fn header_proof_is_generated_and_checked() { + let (local_checker, remote_block_header, remote_header_proof) = prepare_for_header_proof_check(true); + assert_eq!(local_checker.check_header_proof(&RemoteHeaderRequest { + block: 1, + }, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header); + } + + #[test] + fn check_header_proof_fails_if_header_is_none() { + let (local_checker, _, remote_header_proof) = prepare_for_header_proof_check(true); + assert!(local_checker.check_header_proof(&RemoteHeaderRequest { + block: 1, + }, None, remote_header_proof).is_err()); + } + + #[test] + fn check_header_proof_fails_if_local_cht_is_unknown() { + let (local_checker, remote_block_header, remote_header_proof) = prepare_for_header_proof_check(false); + assert!(local_checker.check_header_proof(&RemoteHeaderRequest { + block: 1, + }, Some(remote_block_header.clone()), remote_header_proof).is_err()); + } + + #[test] + fn check_header_proof_fails_if_invalid_header_provided() { + let (local_checker, mut remote_block_header, remote_header_proof) = prepare_for_header_proof_check(true); + remote_block_header.number = 100; + assert!(local_checker.check_header_proof(&RemoteHeaderRequest { + block: 1, + }, Some(remote_block_header.clone()), remote_header_proof).is_err()); + } +} diff --git a/substrate/network/src/chain.rs b/substrate/network/src/chain.rs index d698ab0900aa7..46e90100fe71d 100644 --- a/substrate/network/src/chain.rs +++ b/substrate/network/src/chain.rs @@ -45,6 +45,9 @@ pub trait Client: Send + Sync { /// Get block justification. fn justification(&self, id: &BlockId) -> Result>, Error>; + /// Get block header proof. + fn header_proof(&self, block_number: ::Number) -> Result<(Block::Header, Vec>), Error>; + /// Get storage read execution proof. fn read_proof(&self, block: &Block::Hash, key: &[u8]) -> Result>, Error>; @@ -87,6 +90,10 @@ impl Client for SubstrateClient where (self as &SubstrateClient).justification(id) } + fn header_proof(&self, block_number: ::Number) -> Result<(Block::Header, Vec>), Error> { + (self as &SubstrateClient).header_proof(&BlockId::Number(block_number)) + } + fn read_proof(&self, block: &Block::Hash, key: &[u8]) -> Result>, Error> { (self as &SubstrateClient).read_proof(&BlockId::Hash(block.clone()), key) } diff --git a/substrate/network/src/message.rs b/substrate/network/src/message.rs index d2bbcdf3e7581..a557dfcafaf93 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/message.rs @@ -20,7 +20,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; use codec::{Encode, Decode, Input, Output}; pub use self::generic::{ BlockAnnounce, RemoteCallRequest, RemoteReadRequest, - ConsensusVote, SignedConsensusVote, FromBlock + RemoteHeaderRequest, RemoteHeaderResponse, ConsensusVote, + SignedConsensusVote, FromBlock }; /// A unique ID of a request. @@ -480,6 +481,10 @@ pub mod generic { RemoteReadRequest(RemoteReadRequest), /// Remote storage read response. RemoteReadResponse(RemoteReadResponse), + /// Remote header request. + RemoteHeaderRequest(RemoteHeaderRequest), + /// Remote header response. + RemoteHeaderResponse(RemoteHeaderResponse
), /// Chain-specific message ChainSpecific(Vec), } @@ -529,6 +534,14 @@ pub mod generic { dest.push_byte(9); dest.push(m); } + Message::RemoteHeaderRequest(ref m) => { + dest.push_byte(10); + dest.push(m); + } + Message::RemoteHeaderResponse(ref m) => { + dest.push_byte(11); + dest.push(m); + } Message::ChainSpecific(ref m) => { dest.push_byte(255); dest.push(m); @@ -552,6 +565,8 @@ pub mod generic { 7 => Some(Message::RemoteCallResponse(Decode::decode(input)?)), 8 => Some(Message::RemoteReadRequest(Decode::decode(input)?)), 9 => Some(Message::RemoteReadResponse(Decode::decode(input)?)), + 10 => Some(Message::RemoteHeaderRequest(Decode::decode(input)?)), + 11 => Some(Message::RemoteHeaderResponse(Decode::decode(input)?)), 255 => Some(Message::ChainSpecific(Decode::decode(input)?)), _ => None, } @@ -751,4 +766,58 @@ pub mod generic { }) } } + + #[derive(Debug, PartialEq, Eq, Clone)] + /// Remote header request. + pub struct RemoteHeaderRequest { + /// Unique request id. + pub id: RequestId, + /// Block number to request header for. + pub block: N, + } + + impl Encode for RemoteHeaderRequest { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.id); + dest.push(&self.block); + } + } + + impl Decode for RemoteHeaderRequest { + fn decode(input: &mut I) -> Option { + Some(RemoteHeaderRequest { + id: Decode::decode(input)?, + block: Decode::decode(input)?, + }) + } + } + + #[derive(Debug, PartialEq, Eq, Clone)] + /// Remote header response. + pub struct RemoteHeaderResponse
{ + /// Id of a request this response was made for. + pub id: RequestId, + /// Header. None if proof generation has failed (e.g. header is unknown). + pub header: Option
, + /// Header proof. + pub proof: Vec>, + } + + impl Encode for RemoteHeaderResponse
{ + fn encode_to(&self, dest: &mut T) { + dest.push(&self.id); + dest.push(&self.header); + dest.push(&self.proof); + } + } + + impl Decode for RemoteHeaderResponse
{ + fn decode(input: &mut I) -> Option { + Some(RemoteHeaderResponse { + id: Decode::decode(input)?, + header: Decode::decode(input)?, + proof: Decode::decode(input)?, + }) + } + } } diff --git a/substrate/network/src/on_demand.rs b/substrate/network/src/on_demand.rs index 069cd84c5c658..9c032b3822a5e 100644 --- a/substrate/network/src/on_demand.rs +++ b/substrate/network/src/on_demand.rs @@ -25,12 +25,13 @@ use linked_hash_map::LinkedHashMap; use linked_hash_map::Entry; use parking_lot::Mutex; use client; -use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest, RemoteReadRequest}; +use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, + RemoteCallRequest, RemoteReadRequest}; use io::SyncIo; use message; use network_libp2p::{Severity, NodeIndex}; use service; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; /// Remote request timeout. const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); @@ -46,6 +47,14 @@ pub trait OnDemandService: Send + Sync { /// Maintain peers requests. fn maintain_peers(&self, io: &mut SyncIo); + /// When header response is received from remote node. + fn on_remote_header_response( + &self, + io: &mut SyncIo, + peer: NodeIndex, + response: message::RemoteHeaderResponse + ); + /// When read response is received from remote node. fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse); @@ -80,6 +89,7 @@ struct Request { } enum RequestData { + RemoteHeader(RemoteHeaderRequest>, Sender>), RemoteRead(RemoteReadRequest, Sender>, client::error::Error>>), RemoteCall(RemoteCallRequest, Sender>), } @@ -198,6 +208,20 @@ impl OnDemandService for OnDemand where core.dispatch(); } + fn on_remote_header_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteHeaderResponse) { + self.accept_response("header", io, peer, response.id, |request| match request.data { + RequestData::RemoteHeader(request, sender) => match self.checker.check_header_proof(&request, response.header, response.proof) { + Ok(response) => { + // we do not bother if receiver has been dropped already + let _ = sender.send(Ok(response)); + Accept::Ok + }, + Err(error) => Accept::CheckFailed(error, RequestData::RemoteHeader(request, sender)), + }, + data @ _ => Accept::Unexpected(data), + }) + } + fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse) { self.accept_response("read", io, peer, response.id, |request| match request.data { RequestData::RemoteRead(request, sender) => match self.checker.check_read_proof(&request, response.proof) { @@ -232,9 +256,16 @@ impl Fetcher for OnDemand where E: service::ExecuteInContext, B::Header: HeaderT, { + type RemoteHeaderResult = RemoteResponse; type RemoteReadResult = RemoteResponse>>; type RemoteCallResult = RemoteResponse; + fn remote_header(&self, request: RemoteHeaderRequest>) -> Self::RemoteHeaderResult { + let (sender, receiver) = channel(); + self.schedule_request(RequestData::RemoteHeader(request, sender), + RemoteResponse { receiver }) + } + fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { let (sender, receiver) = channel(); self.schedule_request(RequestData::RemoteRead(request, sender), @@ -332,6 +363,11 @@ impl OnDemandCore where impl Request { pub fn message(&self) -> message::Message { match self.data { + RequestData::RemoteHeader(ref data, _) => message::generic::Message::RemoteHeaderRequest( + message::RemoteHeaderRequest { + id: self.id, + block: data.block, + }), RequestData::RemoteRead(ref data, _) => message::generic::Message::RemoteReadRequest( message::RemoteReadRequest { id: self.id, @@ -357,13 +393,14 @@ pub mod tests { use futures::Future; use parking_lot::RwLock; use client; - use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest, RemoteReadRequest}; + use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, + RemoteCallRequest, RemoteReadRequest}; use message; use network_libp2p::NodeIndex; use service::{Roles, ExecuteInContext}; use test::TestIo; use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; - use test_client::runtime::{Block, Hash}; + use test_client::runtime::{Block, Hash, Header}; pub struct DummyExecutor; struct DummyFetchChecker { ok: bool } @@ -373,9 +410,21 @@ pub mod tests { } impl FetchChecker for DummyFetchChecker { + fn check_header_proof( + &self, + _request: &RemoteHeaderRequest, + header: Option
, + _remote_proof: Vec> + ) -> client::error::Result
{ + match self.ok { + true if header.is_some() => Ok(header.unwrap()), + _ => Err(client::error::ErrorKind::Backend("Test error".into()).into()), + } + } + fn check_read_proof(&self, _request: &RemoteReadRequest, _remote_proof: Vec>) -> client::error::Result>> { match self.ok { - true => Ok(Some(vec![42])), + true => Ok(Some(vec![41])), false => Err(client::error::ErrorKind::Backend("Test error".into()).into()), } } @@ -488,7 +537,7 @@ pub mod tests { } #[test] - fn receives_remote_response() { + fn receives_remote_call_response() { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); @@ -503,4 +552,51 @@ pub mod tests { receive_call_response(&*on_demand, &mut network, 0, 0); thread.join().unwrap(); } + + #[test] + fn receives_remote_read_response() { + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + on_demand.on_connect(0, Roles::FULL); + + let response = on_demand.remote_read(RemoteReadRequest { block: Default::default(), key: b":key".to_vec() }); + let thread = ::std::thread::spawn(move || { + let result = response.wait().unwrap(); + assert_eq!(result, Some(vec![41])); + }); + + on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse { + id: 0, + proof: vec![vec![2]], + }); + thread.join().unwrap(); + } + + #[test] + fn receives_remote_header_response() { + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + on_demand.on_connect(0, Roles::FULL); + + let response = on_demand.remote_header(RemoteHeaderRequest { block: 1 }); + let thread = ::std::thread::spawn(move || { + let result = response.wait().unwrap(); + assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into()); + }); + + on_demand.on_remote_header_response(&mut network, 0, message::RemoteHeaderResponse { + id: 0, + header: Some(Header { + parent_hash: Default::default(), + number: 1, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }), + proof: vec![vec![2]], + }); + thread.join().unwrap(); + } } diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 545fcc06bc934..b053eae0e4916 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::time; use parking_lot::RwLock; use rustc_hex::ToHex; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, As}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor, As}; use runtime_primitives::generic::BlockId; use network_libp2p::{NodeIndex, Severity}; use codec::{Encode, Decode}; @@ -271,6 +271,8 @@ impl> Protocol { GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, who, response), GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(io, who, request), GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(io, who, response), + GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(io, who, request), + GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response), other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other), } } @@ -625,6 +627,27 @@ impl> Protocol { self.on_demand.as_ref().map(|s| s.on_remote_read_response(io, who, response)); } + fn on_remote_header_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteHeaderRequest>) { + trace!(target: "sync", "Remote header proof request {} from {} ({})", + request.id, who, request.block); + let (header, proof) = match self.context_data.chain.header_proof(request.block) { + Ok((header, proof)) => (Some(header), proof), + Err(error) => { + trace!(target: "sync", "Remote header proof request {} from {} ({}) failed with: {}", + request.id, who, request.block, error); + (Default::default(), Default::default()) + }, + }; + self.send_message(io, who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse { + id: request.id, header, proof, + })); + } + + fn on_remote_header_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteHeaderResponse) { + trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); + self.on_demand.as_ref().map(|s| s.on_remote_header_response(io, who, response)); + } + /// Execute a closure with access to a network context and specialization. pub fn with_spec(&self, io: &mut SyncIo, f: F) -> U where F: FnOnce(&mut S, &mut Context) -> U diff --git a/substrate/service/src/components.rs b/substrate/service/src/components.rs index 058ea13dc7b84..43cc342a27425 100644 --- a/substrate/service/src/components.rs +++ b/substrate/service/src/components.rs @@ -215,7 +215,7 @@ pub struct LightComponents { impl Components for LightComponents where - <::Block as BlockT>::Hash: Into<[u8; 32]>, + <::Block as BlockT>::Hash: Into<[u8; 32]> + From<[u8; 32]>, { type Factory = Factory; type Executor = LightExecutor;