From 1770dbf44900f18b5e2f35d4374fd0dd38cf99be Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 2 Jul 2018 17:00:04 +0200 Subject: [PATCH 1/9] arbitrary application logic in CLI --- Cargo.toml | 4 ++ polkadot/cli/Cargo.toml | 1 - polkadot/cli/src/lib.rs | 41 ++++++++++++------- polkadot/collator/Cargo.toml | 5 ++- polkadot/collator/src/lib.rs | 63 ++++++++++++++++++++++++++--- polkadot/consensus/src/collation.rs | 10 +---- polkadot/consensus/src/lib.rs | 4 +- polkadot/src/main.rs | 21 ++++++++++ 8 files changed, 116 insertions(+), 33 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4d730a0236e73..a858bb7474388 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,10 @@ authors = ["Parity Technologies "] [dependencies] error-chain = "0.12" polkadot-cli = { path = "polkadot/cli" } +polkadot-service = { path = "polkadot/service" } +futures = "0.1" +ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" } + [workspace] members = [ diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index cd0040c43926f..fa187a3d2aeb0 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -21,7 +21,6 @@ ed25519 = { path = "../../substrate/ed25519" } app_dirs = "1.2" tokio-core = "0.1.12" futures = "0.1.17" -ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" } fdlimit = "0.1" parking_lot = "0.4" serde_json = "1.0" diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 411d713ca7529..4ca1a6db44f3a 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -112,6 +112,19 @@ fn read_storage_json(filename: &str) -> Option { Some(h.into_iter().map(|(k, v)| (k.0, v.0)).collect()) } +/// Additional application logic making use of the ndoe, to run asynchronously before shutdown. +/// +/// This will be invoked with the service and spawn a future that resolves +/// when complete. +pub trait Application { + /// A future that resolves when the application work is done. + /// This should be `Clone`able and shared and will be run on a tokio runtime. + type Done: IntoFuture; + + /// Do application work. + fn work(self, service: &Service) -> Self::Done; +} + /// Parse command line arguments and start the node. /// /// IANA unassigned port ranges that we could use: @@ -120,9 +133,10 @@ fn read_storage_json(filename: &str) -> Option { /// 9556-9591 Unassigned /// 9803-9874 Unassigned /// 9926-9949 Unassigned -pub fn run(args: I) -> error::Result<()> where +pub fn run(args: I, app: A) -> error::Result<()> where I: IntoIterator, T: Into + Clone, + A: Application, { let core = reactor::Core::new().expect("tokio::Core could not be created"); @@ -264,26 +278,23 @@ pub fn run(args: I) -> error::Result<()> where }; match role == service::Role::LIGHT { - true => run_until_exit(core, service::new_light(config)?, &matches, sys_conf), - false => run_until_exit(core, service::new_full(config)?, &matches, sys_conf), + true => run_until_exit(core, service::new_light(config)?, &matches, sys_conf, app), + false => run_until_exit(core, service::new_full(config)?, &matches, sys_conf, app), } } -fn run_until_exit(mut core: reactor::Core, service: service::Service, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()> +fn run_until_exit( + mut core: reactor::Core, + service: service::Service, + matches: &clap::ArgMatches, + sys_conf: SystemConfiguration, + application: A, +) -> error::Result<()> where C: service::Components, + A: Application, client::error::Error: From<<<::Backend as client::backend::Backend>::State as state_machine::Backend>::Error>, { - let exit = { - // can't use signal directly here because CtrlC takes only `Fn`. - let (exit_send, exit) = mpsc::channel(1); - ctrlc::CtrlC::set_handler(move || { - exit_send.clone().send(()).wait().expect("Error sending exit notification"); - }); - - exit - }; - informant::start(&service, core.handle()); let _rpc_servers = { @@ -306,7 +317,7 @@ fn run_until_exit(mut core: reactor::Core, service: service::Service, matc ) }; - core.run(exit.into_future()).expect("Error running informant event loop"); + core.run(application.work(&service)).expect("Error running informant event loop"); Ok(()) } diff --git a/polkadot/collator/Cargo.toml b/polkadot/collator/Cargo.toml index 161121eefad0b..0c460992674f9 100644 --- a/polkadot/collator/Cargo.toml +++ b/polkadot/collator/Cargo.toml @@ -2,12 +2,15 @@ name = "polkadot-collator" version = "0.1.0" authors = ["Parity Technologies "] -description = "Abstract collation logic" +description = "Collator node implementation" [dependencies] futures = "0.1.17" substrate-codec = { path = "../../substrate/codec", version = "0.1" } substrate-primitives = { path = "../../substrate/primitives", version = "0.1" } +polkadot-api = { path = "../api" } polkadot-runtime = { path = "../runtime", version = "0.1" } polkadot-parachain = { path = "../parachain", version = "0.1" } polkadot-primitives = { path = "../primitives", version = "0.1" } +polkadot-cli = { path = "../cli" } +polkadot-service = { path = "../servife" } diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 55eca734d269b..7e7e37a673cab 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Collation Logic. +//! Collation node logic. //! //! A collator node lives on a distinct parachain and submits a proposal for //! a state transition, along with a proof for its validity @@ -28,7 +28,7 @@ //! destination B as egress(X)[A -> B] //! //! On every block, each parachain will be intended to route messages from some -//! subset of all the other parachains. +//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3) //! //! Since the egress information is unique to every block, when routing from a //! parachain a collator must gather all egress posts from that parachain @@ -47,13 +47,20 @@ extern crate futures; extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; + +extern crate polkadot_api; +extern crate polkadot_cli; extern crate polkadot_runtime; extern crate polkadot_primitives; +extern crate polkadot_service; use std::collections::{BTreeSet, BTreeMap}; use futures::{stream, Stream, Future, IntoFuture}; -use polkadot_primitives::parachain::{self, CandidateSignature, ConsolidatedIngress, Message, Id as ParaId}; +use polkadot_api::PolkadotApi; +use polkadot_primitives::BlockId; +use polkadot_primitives::parachain::{self, CandidateReceipt, CandidateSignature, ConsolidatedIngress, Collation, Message, Id as ParaId}; +use polkadot_service::{Components, Application}; /// Parachain context needed for collation. /// @@ -63,7 +70,7 @@ pub trait ParachainContext { fn produce_candidate>( &self, ingress: I, - ) -> (parachain::BlockData, polkadot_primitives::AccountId, CandidateSignature); + ) -> Collation; } /// Relay chain context needed to collate. @@ -122,7 +129,7 @@ pub fn collate_ingress<'a, R>(relay_context: R) /// Produce a candidate for the parachain. pub fn collate<'a, R, P>(local_id: ParaId, relay_context: R, para_context: P) - -> Box + 'a> + -> Box + 'a> where R: RelayChainContext, R::Error: 'a, @@ -143,6 +150,52 @@ pub fn collate<'a, R, P>(local_id: ParaId, relay_context: R, para_context: P) })) } +/// Polkadot-api context. +struct ApiContext { + api: A, + block_id: BlockId, +} + +impl RelayChainContext for ApiContext { + type Error = (); + type FutureEgress = Result>, Self::Error>; + + fn routing_parachains(&self) -> BTreeMap { + BTreeMap::new() + } + + fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress { + Ok(Vec::new()) + } +} + +struct CollationNode { + parachain_context: P, + exit: E, +} + +impl> cli::Application for CollationNode { + type Work = Box>; + + fn work(self, service: &service::Service) -> Self::Work { + let client = service.client(); + unimplemented!() + } +} + +/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and +/// arguments to the underlying polkadot node. +/// +/// Provide a future which resolves when the node should exit. +/// This function blocks until done. +pub fn run_collator(parachain_context: P, exit: E, args: Vec<::std::ffi::OsStr>) -> cli::error::Result<()> where + P: ParachainContext, + E: IntoFuture +{ + let node_logic = CollationNode { parachain_context, exit: exit.into_future() }; + cli::run(args, node_logic) +} + #[cfg(test)] mod tests { use super::*; diff --git a/polkadot/consensus/src/collation.rs b/polkadot/consensus/src/collation.rs index 3738ae900413a..6de39df4a67c8 100644 --- a/polkadot/consensus/src/collation.rs +++ b/polkadot/consensus/src/collation.rs @@ -23,18 +23,10 @@ use std::sync::Arc; use polkadot_api::PolkadotApi; use polkadot_primitives::{Hash, AccountId}; -use polkadot_primitives::parachain::{Id as ParaId, Chain, BlockData, Extrinsic, CandidateReceipt}; +use polkadot_primitives::parachain::{Id as ParaId, Chain, Collation, BlockData, Extrinsic, CandidateReceipt}; use futures::prelude::*; -/// A full collation. -pub struct Collation { - /// Block data. - pub block_data: BlockData, - /// The candidate receipt itself. - pub receipt: CandidateReceipt, -} - /// Encapsulates connections to collators and allows collation on any parachain. /// /// This is expected to be a lightweight, shared type like an `Arc`. diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 5dd4c1d484e46..8d552af133cf2 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -71,7 +71,7 @@ use table::generic::Statement as GenericStatement; use runtime_support::Hashable; use polkadot_api::PolkadotApi; use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, Timestamp}; -use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt}; +use polkadot_primitives::parachain::{Id as ParaId, Chain, Collation, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt}; use polkadot_runtime::BareExtrinsic; use primitives::AuthorityId; use transaction_pool::{TransactionPool}; @@ -82,7 +82,7 @@ use futures::future::{self, Shared}; use collation::CollationFetch; use dynamic_inclusion::DynamicInclusion; -pub use self::collation::{Collators, Collation}; +pub use self::collation::Collators; pub use self::error::{ErrorKind, Error}; pub use self::shared_table::{SharedTable, StatementSource, StatementProducer, ProducedStatements}; pub use service::Service; diff --git a/polkadot/src/main.rs b/polkadot/src/main.rs index 50ff18462e29a..be9d3a7417090 100644 --- a/polkadot/src/main.rs +++ b/polkadot/src/main.rs @@ -19,6 +19,27 @@ #![warn(missing_docs)] extern crate polkadot_cli as cli; +extern crate polkadot_service as service; +extern crate ctrlc; +extern crate futures; + +use futures::mpsc; + +// the regular polkadot application simply runs until ctrl-c +struct Application; +impl cli::Application for Application { + type Work = mpsc::Receiver<()>; + + fn work(self, _service: Service) -> Self::Work { + // can't use signal directly here because CtrlC takes only `Fn`. + let (exit_send, exit) = mpsc::channel(1); + ctrlc::CtrlC::set_handler(move || { + exit_send.clone().send(()).wait().expect("Error sending exit notification"); + }); + + exit + } +} #[macro_use] extern crate error_chain; From d8d0c03c318bea74825e9597c5b0bb3bcced6da0 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 3 Jul 2018 11:56:21 +0200 Subject: [PATCH 2/9] collation work --- Cargo.lock | 11 ++- polkadot/cli/src/lib.rs | 9 +-- polkadot/collator/Cargo.toml | 5 +- polkadot/collator/src/lib.rs | 89 ++++++++++++++++------ polkadot/consensus/Cargo.toml | 1 - polkadot/consensus/src/collation.rs | 2 +- polkadot/consensus/src/lib.rs | 3 +- polkadot/consensus/src/service.rs | 2 +- polkadot/consensus/src/shared_table/mod.rs | 3 +- polkadot/primitives/src/parachain.rs | 8 ++ polkadot/service/src/lib.rs | 7 ++ polkadot/src/main.rs | 2 +- 12 files changed, 102 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 244ed07aeeb86..22190df8b4a78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1370,8 +1370,11 @@ dependencies = [ name = "polkadot" version = "0.2.0" dependencies = [ + "ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-cli 0.2.0", + "polkadot-service 0.2.0", ] [[package]] @@ -1401,7 +1404,6 @@ dependencies = [ "app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.31.2 (registry+https://github.com/rust-lang/crates.io-index)", - "ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)", "ed25519 0.1.0", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1437,11 +1439,17 @@ name = "polkadot-collator" version = "0.1.0" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "polkadot-api 0.1.0", + "polkadot-cli 0.2.0", "polkadot-parachain 0.1.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", + "polkadot-service 0.2.0", + "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-primitives 0.1.0", + "substrate-state-machine 0.1.0", ] [[package]] @@ -1455,7 +1463,6 @@ dependencies = [ "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-api 0.1.0", - "polkadot-collator 0.1.0", "polkadot-parachain 0.1.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 4ca1a6db44f3a..129127892333a 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -26,7 +26,6 @@ extern crate regex; extern crate time; extern crate futures; extern crate tokio_core; -extern crate ctrlc; extern crate fdlimit; extern crate ed25519; extern crate triehash; @@ -80,8 +79,7 @@ use substrate_telemetry::{init_telemetry, TelemetryConfig}; use runtime_primitives::StorageMap; use polkadot_primitives::Block; -use futures::sync::mpsc; -use futures::{Sink, Future, Stream}; +use futures::prelude::*; use tokio_core::reactor; use service::PruningMode; @@ -122,7 +120,8 @@ pub trait Application { type Done: IntoFuture; /// Do application work. - fn work(self, service: &Service) -> Self::Done; + fn work(self, service: &service::Service) -> Self::Done + where client::error::Error: From<<<::Backend as client::backend::Backend>::State as state_machine::Backend>::Error>; } /// Parse command line arguments and start the node. @@ -317,7 +316,7 @@ fn run_until_exit( ) }; - core.run(application.work(&service)).expect("Error running informant event loop"); + core.run(application.work(&service).into_future()).expect("Error running informant event loop"); Ok(()) } diff --git a/polkadot/collator/Cargo.toml b/polkadot/collator/Cargo.toml index 0c460992674f9..52bf493b0dfd6 100644 --- a/polkadot/collator/Cargo.toml +++ b/polkadot/collator/Cargo.toml @@ -6,11 +6,14 @@ description = "Collator node implementation" [dependencies] futures = "0.1.17" +substrate-client = { path = "../../substrate/client" } substrate-codec = { path = "../../substrate/codec", version = "0.1" } +substrate-state-machine = { path = "../../substrate/state-machine" } substrate-primitives = { path = "../../substrate/primitives", version = "0.1" } polkadot-api = { path = "../api" } polkadot-runtime = { path = "../runtime", version = "0.1" } polkadot-parachain = { path = "../parachain", version = "0.1" } polkadot-primitives = { path = "../primitives", version = "0.1" } polkadot-cli = { path = "../cli" } -polkadot-service = { path = "../servife" } +polkadot-service = { path = "../service" } +log = "0.4" diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 7e7e37a673cab..9d5b7e76704d5 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -45,6 +45,8 @@ //! to be performed, as the collation logic itself. extern crate futures; +extern crate substrate_client as client; +extern crate substrate_state_machine as state_machine; extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; @@ -54,21 +56,27 @@ extern crate polkadot_runtime; extern crate polkadot_primitives; extern crate polkadot_service; +#[macro_use] +extern crate log; + use std::collections::{BTreeSet, BTreeMap}; +use std::sync::Arc; -use futures::{stream, Stream, Future, IntoFuture}; +use futures::{future, stream, Stream, Future, IntoFuture}; use polkadot_api::PolkadotApi; -use polkadot_primitives::BlockId; -use polkadot_primitives::parachain::{self, CandidateReceipt, CandidateSignature, ConsolidatedIngress, Collation, Message, Id as ParaId}; -use polkadot_service::{Components, Application}; +use polkadot_primitives::{Block, BlockId}; +use polkadot_primitives::parachain::{self, HeadData, CandidateReceipt, CandidateSignature, ConsolidatedIngress, Collation, Message, Id as ParaId}; +use polkadot_service::{Service, Components}; +use polkadot_cli::Application; /// Parachain context needed for collation. /// /// This can be implemented through an externally attached service or a stub. -pub trait ParachainContext { +pub trait ParachainContext: Clone { /// Produce a candidate, given the latest ingress queue information. fn produce_candidate>( &self, + last_head: HeadData, ingress: I, ) -> Collation; } @@ -92,7 +100,7 @@ pub trait RelayChainContext { /// Collate the necessary ingress queue using the given context. pub fn collate_ingress<'a, R>(relay_context: R) - -> Box + 'a> + -> impl Future + 'a where R: RelayChainContext, R::Error: 'a, @@ -113,7 +121,7 @@ pub fn collate_ingress<'a, R>(relay_context: R) // and then by the parachain ID. // // then transform that into the consolidated egress queue. - Box::new(stream::futures_unordered(egress_fetch) + stream::futures_unordered(egress_fetch) .fold(BTreeMap::new(), |mut map, (routing_id, egresses)| { for (depth, egress) in egresses.into_iter().rev().enumerate() { let depth = -(depth as i64); @@ -124,20 +132,21 @@ pub fn collate_ingress<'a, R>(relay_context: R) }) .map(|ordered| ordered.into_iter().map(|((_, id), egress)| (id, egress))) .map(|i| i.collect::>()) - .map(ConsolidatedIngress)) + .map(ConsolidatedIngress) } /// Produce a candidate for the parachain. -pub fn collate<'a, R, P>(local_id: ParaId, relay_context: R, para_context: P) - -> Box + 'a> +pub fn collate<'a, R, P>(local_id: ParaId, last_head: HeadData, relay_context: R, para_context: P) + -> impl Future + 'a where R: RelayChainContext, R::Error: 'a, R::FutureEgress: 'a, P: ParachainContext + 'a, { - Box::new(collate_ingress(relay_context).map(move |ingress| { + collate_ingress(relay_context).map(move |ingress| { let (block_data, _, signature) = para_context.produce_candidate( + last_head, ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) ); @@ -147,21 +156,21 @@ pub fn collate<'a, R, P>(local_id: ParaId, relay_context: R, para_context: P) block: block_data, unprocessed_ingress: ingress, } - })) + }) } /// Polkadot-api context. -struct ApiContext { - api: A, - block_id: BlockId, +struct ApiContext { + api: Arc, + id: A::CheckedBlockId, } impl RelayChainContext for ApiContext { type Error = (); type FutureEgress = Result>, Self::Error>; - fn routing_parachains(&self) -> BTreeMap { - BTreeMap::new() + fn routing_parachains(&self) -> BTreeSet { + BTreeSet::new() } fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress { @@ -172,14 +181,46 @@ impl RelayChainContext for ApiContext { struct CollationNode { parachain_context: P, exit: E, + para_id: ParaId, } -impl> cli::Application for CollationNode { - type Work = Box>; +impl> Application for CollationNode { + type Done = Box>; - fn work(self, service: &service::Service) -> Self::Work { + fn work(self, service: &Service) -> Self::Done + where client::error::Error: From<<<::Backend as client::backend::Backend>::State as state_machine::Backend>::Error>, + { + let CollationNode { parachain_context, exit, para_id } = self; let client = service.client(); - unimplemented!() + let api = service.api(); + + let work = client.import_notification_stream() + .map(move |notification| { + let id = BlockId::hash(notification.hash); + + let checked_id = api.check_id(id).expect("block from import notification always known; qed"); + match api.parachain_head(&checked_id, para_id) { + Ok(Some(last_head)) => { + let context = ApiContext { api: api.clone(), id: checked_id }; + let collation_work = collate(para_id, last_head, context, parachain_context.clone()).map(Some); + future::Either::A(collation_work) + } + Ok(None) => { + info!("Parachain {:?} appears to be inactive. Cannot collate.", id); + future::Either::B(None) + } + Err(e) => { + info!("Could not collate for parachain {:?}: {:?}", id, e); + future::Either::B(None) // returning error would shut down the collation node + } + } + }) + .for_each(|collation| { + // TODO: import into network. + Ok(()) + }); + + Box::new(work.select(exit).then(|_| Ok(()))) } } @@ -188,12 +229,12 @@ impl> cli::Application for CollationNode { /// /// Provide a future which resolves when the node should exit. /// This function blocks until done. -pub fn run_collator(parachain_context: P, exit: E, args: Vec<::std::ffi::OsStr>) -> cli::error::Result<()> where +pub fn run_collator(parachain_context: P, para_id: ParaId, exit: E, args: Vec<::std::ffi::OsString>) -> polkadot_cli::error::Result<()> where P: ParachainContext, E: IntoFuture { - let node_logic = CollationNode { parachain_context, exit: exit.into_future() }; - cli::run(args, node_logic) + let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id }; + polkadot_cli::run(args, node_logic) } #[cfg(test)] diff --git a/polkadot/consensus/Cargo.toml b/polkadot/consensus/Cargo.toml index fefa67de45d22..d0284b4240504 100644 --- a/polkadot/consensus/Cargo.toml +++ b/polkadot/consensus/Cargo.toml @@ -12,7 +12,6 @@ error-chain = "0.12" log = "0.3" exit-future = "0.1" polkadot-api = { path = "../api" } -polkadot-collator = { path = "../collator" } polkadot-parachain = { path = "../parachain" } polkadot-primitives = { path = "../primitives" } polkadot-runtime = { path = "../runtime" } diff --git a/polkadot/consensus/src/collation.rs b/polkadot/consensus/src/collation.rs index 6de39df4a67c8..b00b9d8d7dca7 100644 --- a/polkadot/consensus/src/collation.rs +++ b/polkadot/consensus/src/collation.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use polkadot_api::PolkadotApi; use polkadot_primitives::{Hash, AccountId}; -use polkadot_primitives::parachain::{Id as ParaId, Chain, Collation, BlockData, Extrinsic, CandidateReceipt}; +use polkadot_primitives::parachain::{Id as ParaId, Chain, Collation, Extrinsic}; use futures::prelude::*; diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 8d552af133cf2..d313233a113bd 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -32,7 +32,6 @@ extern crate ed25519; extern crate parking_lot; extern crate polkadot_api; -extern crate polkadot_collator as collator; extern crate polkadot_statement_table as table; extern crate polkadot_parachain as parachain; extern crate polkadot_transaction_pool as transaction_pool; @@ -71,7 +70,7 @@ use table::generic::Statement as GenericStatement; use runtime_support::Hashable; use polkadot_api::PolkadotApi; use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, Timestamp}; -use polkadot_primitives::parachain::{Id as ParaId, Chain, Collation, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt}; +use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt}; use polkadot_runtime::BareExtrinsic; use primitives::AuthorityId; use transaction_pool::{TransactionPool}; diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index 7e7f5e914665a..6ee0570230f9f 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -337,7 +337,7 @@ struct NoCollators; impl ::collation::Collators for NoCollators { type Error = (); - type Collation = future::Empty<::collation::Collation, ()>; + type Collation = future::Empty<::polkadot_primitives::parachain::Collation, ()>; fn collate(&self, _parachain: ParaId, _relay_parent: Hash) -> Self::Collation { future::empty() diff --git a/polkadot/consensus/src/shared_table/mod.rs b/polkadot/consensus/src/shared_table/mod.rs index 120fbb7fec365..dcae8349be66f 100644 --- a/polkadot/consensus/src/shared_table/mod.rs +++ b/polkadot/consensus/src/shared_table/mod.rs @@ -22,9 +22,8 @@ use std::sync::Arc; use table::{self, Table, Context as TableContextTrait}; use table::generic::Statement as GenericStatement; -use collation::Collation; use polkadot_primitives::Hash; -use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; +use polkadot_primitives::parachain::{Id as ParaId, BlockData, Collation, Extrinsic, CandidateReceipt}; use primitives::AuthorityId; use parking_lot::Mutex; diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index 9f803810e4538..e3d04e7b484e3 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -174,6 +174,14 @@ pub struct CandidateReceipt { pub fees: u64, } +/// A full collation. +pub struct Collation { + /// Block data. + pub block_data: BlockData, + /// The candidate receipt itself. + pub receipt: CandidateReceipt, +} + impl Slicable for CandidateReceipt { fn encode(&self) -> Vec { let mut v = Vec::new(); diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 0bc343e0c936f..31d7092fb47f4 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -74,6 +74,7 @@ pub use config::{Configuration, Role, PruningMode}; pub struct Service { thread: Option>, client: Arc>, + api: Arc, network: Arc>, transaction_pool: Arc>, signal: Option, @@ -197,6 +198,7 @@ impl Service network: network, transaction_pool: transaction_pool, signal: Some(signal), + api: api, _consensus: consensus_service, }) } @@ -206,6 +208,11 @@ impl Service self.client.clone() } + /// Get shared polkadot-api instance. usually the same as the client. + pub fn clent(&self) -> Arc { + self.api.clone() + } + /// Get shared network instance. pub fn network(&self) -> Arc> { self.network.clone() diff --git a/polkadot/src/main.rs b/polkadot/src/main.rs index be9d3a7417090..6c70c9803717f 100644 --- a/polkadot/src/main.rs +++ b/polkadot/src/main.rs @@ -23,7 +23,7 @@ extern crate polkadot_service as service; extern crate ctrlc; extern crate futures; -use futures::mpsc; +use futures::sync::mpsc; // the regular polkadot application simply runs until ctrl-c struct Application; From 48ce61a403853fe50ec1d1d475dcfa47a6ef7304 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 4 Jul 2018 16:12:02 +0200 Subject: [PATCH 3/9] split up exit and work futures in application --- Cargo.lock | 1 - Cargo.toml | 2 -- polkadot/cli/src/lib.rs | 59 +++++++++++++++++++++++++++-------------- polkadot/src/main.rs | 36 +++++++++++++++++-------- 4 files changed, 64 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e741ba33f055e..e2632ce8d6da1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1375,7 +1375,6 @@ dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-cli 0.2.0", - "polkadot-service 0.2.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f03d939910d06..9c86e92d7d317 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,11 +10,9 @@ authors = ["Parity Technologies "] [dependencies] error-chain = "0.12" polkadot-cli = { path = "polkadot/cli" } -polkadot-service = { path = "polkadot/service" } futures = "0.1" ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" } - [workspace] members = [ "polkadot/api", diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 0a1f1f5121415..43577f72e75dc 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -64,6 +64,11 @@ mod informant; mod chain_spec; pub use chain_spec::ChainSpec; +pub use client::error::Error as ClientError; +pub use client::backend::Backend as ClientBackend; +pub use state_machine::Backend as StateMachineBackend; +pub use polkadot_primitives::Block as PolkadotBlock; +pub use service::{Components as ServiceComponents, Service}; use std::io::{self, Write, Read, stdin, stdout}; use std::fs::File; @@ -121,12 +126,18 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf { /// when complete. pub trait Application { /// A future that resolves when the application work is done. - /// This should be `Clone`able and shared and will be run on a tokio runtime. - type Done: IntoFuture; + /// This will be run on a tokio runtime. + type Work: Future; + + /// A future that resolves when the node should exit. + type Exit: Future + Send + 'static; + + /// Create exit future. + fn exit(&mut self) -> Self::Exit; /// Do application work. - fn work(self, service: &service::Service) -> Self::Done - where client::error::Error: From<<<::Backend as client::backend::Backend>::State as state_machine::Backend>::Error>; + fn work(self, service: &Service) -> Self::Work + where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>; } /// Parse command line arguments and start the node. @@ -137,7 +148,7 @@ pub trait Application { /// 9556-9591 Unassigned /// 9803-9874 Unassigned /// 9926-9949 Unassigned -pub fn run(args: I, app: A) -> error::Result<()> where +pub fn run(args: I, mut app: A) -> error::Result<()> where I: IntoIterator, T: Into + Clone, A: Application, @@ -167,11 +178,11 @@ pub fn run(args: I, app: A) -> error::Result<()> where } if let Some(matches) = matches.subcommand_matches("export-blocks") { - return export_blocks(matches); + return export_blocks(matches, app.exit()); } if let Some(matches) = matches.subcommand_matches("import-blocks") { - return import_blocks(matches); + return import_blocks(matches, app.exit()); } let spec = load_spec(&matches)?; @@ -278,16 +289,19 @@ fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> { Ok(()) } -fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> { +fn export_blocks(matches: &clap::ArgMatches, exit: E) -> error::Result<()> + where E: Future + Send + 'static +{ let base_path = base_path(matches); let spec = load_spec(&matches)?; let mut config = service::Configuration::default_with_spec(spec); config.database_path = db_path(&base_path).to_string_lossy().into(); info!("DB path: {}", config.database_path); let client = service::new_client(config)?; - let (exit_send, exit) = std::sync::mpsc::channel(); - ctrlc::CtrlC::set_handler(move || { - exit_send.clone().send(()).expect("Error sending exit notification"); + let (exit_send, exit_recv) = std::sync::mpsc::channel(); + ::std::thread::spawn(move || { + let _ = exit.wait(); + let _ = exit_send.send(()); }); info!("Exporting blocks"); let mut block: u32 = match matches.value_of("from") { @@ -316,7 +330,7 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> { } loop { - if exit.try_recv().is_ok() { + if exit_recv.try_recv().is_ok() { break; } match client.block(&BlockId::number(block as u64))? { @@ -340,15 +354,19 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> { Ok(()) } -fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> { +fn import_blocks(matches: &clap::ArgMatches, exit: E) -> error::Result<()> + where E: Future + Send + 'static +{ let spec = load_spec(&matches)?; let base_path = base_path(matches); let mut config = service::Configuration::default_with_spec(spec); config.database_path = db_path(&base_path).to_string_lossy().into(); let client = service::new_client(config)?; - let (exit_send, exit) = std::sync::mpsc::channel(); - ctrlc::CtrlC::set_handler(move || { - exit_send.clone().send(()).expect("Error sending exit notification"); + let (exit_send, exit_recv) = std::sync::mpsc::channel(); + + ::std::thread::spawn(move || { + let _ = exit.wait(); + let _ = exit_send.send(()); }); let mut file: Box = match matches.value_of("INPUT") { @@ -360,7 +378,7 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> { let count: u32 = Slicable::decode(&mut file).ok_or("Error reading file")?; let mut block = 0; for _ in 0 .. count { - if exit.try_recv().is_ok() { + if exit_recv.try_recv().is_ok() { break; } match SignedBlock::decode(&mut file) { @@ -388,7 +406,7 @@ fn run_until_exit( service: service::Service, matches: &clap::ArgMatches, sys_conf: SystemConfiguration, - application: A, + mut application: A, ) -> error::Result<()> where C: service::Components, @@ -417,8 +435,9 @@ fn run_until_exit( ) }; - core.run(application.work(&service).into_future()).expect("Error running informant event loop"); - Ok(()) + let exit = application.exit(); + let until_exit = application.work(&service).select(exit).then(|_| Ok(())); + core.run(until_exit) } fn start_server(mut address: SocketAddr, start: F) -> Result where diff --git a/polkadot/src/main.rs b/polkadot/src/main.rs index 6c70c9803717f..db0cbb50c8a2f 100644 --- a/polkadot/src/main.rs +++ b/polkadot/src/main.rs @@ -19,33 +19,47 @@ #![warn(missing_docs)] extern crate polkadot_cli as cli; -extern crate polkadot_service as service; extern crate ctrlc; extern crate futures; -use futures::sync::mpsc; +#[macro_use] +extern crate error_chain; + +use cli::{ClientError, ServiceComponents, ClientBackend, PolkadotBlock, StateMachineBackend, Service}; +use futures::sync::oneshot; +use futures::{future, Future}; + +use std::cell::RefCell; // the regular polkadot application simply runs until ctrl-c struct Application; impl cli::Application for Application { - type Work = mpsc::Receiver<()>; + type Work = future::Empty<(), ()>; + type Exit = future::MapErr, fn(oneshot::Canceled) -> ()>; - fn work(self, _service: Service) -> Self::Work { + fn exit(&mut self) -> Self::Exit { // can't use signal directly here because CtrlC takes only `Fn`. - let (exit_send, exit) = mpsc::channel(1); + let (exit_send, exit) = oneshot::channel(); + + let exit_send_cell = RefCell::new(Some(exit_send)); ctrlc::CtrlC::set_handler(move || { - exit_send.clone().send(()).wait().expect("Error sending exit notification"); + if let Some(exit_send) = exit_send_cell.try_borrow_mut().expect("signal handler not reentrant; qed").take() { + exit_send.send(()).expect("Error sending exit notification"); + } }); - exit + exit.map_err(drop) } -} -#[macro_use] -extern crate error_chain; + fn work(self, _service: &Service) -> Self::Work + where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>, + { + future::empty() + } +} quick_main!(run); fn run() -> cli::error::Result<()> { - cli::run(::std::env::args()) + cli::run(::std::env::args(), Application) } From 12ca69423fd8de5d785a99f4ce58122c2cc84278 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 4 Jul 2018 17:24:17 +0200 Subject: [PATCH 4/9] collation node workflow --- Cargo.lock | 3 +- polkadot/collator/Cargo.toml | 3 +- polkadot/collator/src/lib.rs | 117 ++++++++++++++------- polkadot/consensus/src/shared_table/mod.rs | 2 + polkadot/primitives/src/parachain.rs | 44 +++----- polkadot/service/src/lib.rs | 2 +- 6 files changed, 99 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2632ce8d6da1..edc4bfce7ec3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1438,6 +1438,7 @@ dependencies = [ name = "polkadot-collator" version = "0.1.0" dependencies = [ + "ed25519 0.1.0", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-api 0.1.0", @@ -1445,11 +1446,9 @@ dependencies = [ "polkadot-parachain 0.1.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", - "polkadot-service 0.2.0", "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-primitives 0.1.0", - "substrate-state-machine 0.1.0", ] [[package]] diff --git a/polkadot/collator/Cargo.toml b/polkadot/collator/Cargo.toml index 52bf493b0dfd6..2b9a95a81f05f 100644 --- a/polkadot/collator/Cargo.toml +++ b/polkadot/collator/Cargo.toml @@ -8,12 +8,11 @@ description = "Collator node implementation" futures = "0.1.17" substrate-client = { path = "../../substrate/client" } substrate-codec = { path = "../../substrate/codec", version = "0.1" } -substrate-state-machine = { path = "../../substrate/state-machine" } substrate-primitives = { path = "../../substrate/primitives", version = "0.1" } polkadot-api = { path = "../api" } polkadot-runtime = { path = "../runtime", version = "0.1" } polkadot-parachain = { path = "../parachain", version = "0.1" } polkadot-primitives = { path = "../primitives", version = "0.1" } polkadot-cli = { path = "../cli" } -polkadot-service = { path = "../service" } log = "0.4" +ed25519 = { path = "../../substrate/ed25519" } diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 9d5b7e76704d5..0104d58df7395 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -46,15 +46,14 @@ extern crate futures; extern crate substrate_client as client; -extern crate substrate_state_machine as state_machine; extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; +extern crate ed25519; extern crate polkadot_api; extern crate polkadot_cli; extern crate polkadot_runtime; extern crate polkadot_primitives; -extern crate polkadot_service; #[macro_use] extern crate log; @@ -63,10 +62,11 @@ use std::collections::{BTreeSet, BTreeMap}; use std::sync::Arc; use futures::{future, stream, Stream, Future, IntoFuture}; +use client::BlockchainEvents; use polkadot_api::PolkadotApi; -use polkadot_primitives::{Block, BlockId}; -use polkadot_primitives::parachain::{self, HeadData, CandidateReceipt, CandidateSignature, ConsolidatedIngress, Collation, Message, Id as ParaId}; -use polkadot_service::{Service, Components}; +use polkadot_primitives::BlockId; +use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId}; +use polkadot_cli::{ClientError, ServiceComponents, ClientBackend, PolkadotBlock, StateMachineBackend, Service}; use polkadot_cli::Application; /// Parachain context needed for collation. @@ -78,7 +78,7 @@ pub trait ParachainContext: Clone { &self, last_head: HeadData, ingress: I, - ) -> Collation; + ) -> (BlockData, HeadData); } /// Relay chain context needed to collate. @@ -135,37 +135,51 @@ pub fn collate_ingress<'a, R>(relay_context: R) .map(ConsolidatedIngress) } -/// Produce a candidate for the parachain. -pub fn collate<'a, R, P>(local_id: ParaId, last_head: HeadData, relay_context: R, para_context: P) +/// Produce a candidate for the parachain, with given signing key. +pub fn collate<'a, R, P>( + local_id: ParaId, + last_head: HeadData, + relay_context: R, + para_context: P, + key: Arc, +) -> impl Future + 'a where - R: RelayChainContext, + R: RelayChainContext + 'a, R::Error: 'a, R::FutureEgress: 'a, P: ParachainContext + 'a, { collate_ingress(relay_context).map(move |ingress| { - let (block_data, _, signature) = para_context.produce_candidate( + let (block_data, head_data) = para_context.produce_candidate( last_head, ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) ); - parachain::Candidate { + let signature = key.sign(&block_data.0[..]).into(); + let pubkey_bytes: [u8; 32] = key.public().into(); + + let receipt = parachain::CandidateReceipt { parachain_index: local_id, - collator_signature: signature, - block: block_data, - unprocessed_ingress: ingress, + collator: pubkey_bytes.into(), + signature, + head_data, + balance_uploads: Vec::new(), + egress_queue_roots: Vec::new(), + fees: 0, + }; + + parachain::Collation { + receipt, + block_data, } }) } /// Polkadot-api context. -struct ApiContext { - api: Arc, - id: A::CheckedBlockId, -} +struct ApiContext; -impl RelayChainContext for ApiContext { +impl RelayChainContext for ApiContext { type Error = (); type FutureEgress = Result>, Self::Error>; @@ -173,54 +187,72 @@ impl RelayChainContext for ApiContext { BTreeSet::new() } - fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress { + fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress { Ok(Vec::new()) } } struct CollationNode { parachain_context: P, - exit: E, + exit: Option, para_id: ParaId, + key: Arc, } -impl> Application for CollationNode { - type Done = Box>; +impl Application for CollationNode where + P: ParachainContext + 'static, + E: Future + Send + 'static +{ + type Work = Box>; + type Exit = future::Either>; + + fn exit(&mut self) -> Self::Exit { + match self.exit.take() { + Some(exit) => future::Either::A(exit), + None => future::Either::B(future::empty()), + } + } - fn work(self, service: &Service) -> Self::Done - where client::error::Error: From<<<::Backend as client::backend::Backend>::State as state_machine::Backend>::Error>, + fn work(self, service: &Service) -> Self::Work + where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>, { - let CollationNode { parachain_context, exit, para_id } = self; + let CollationNode { parachain_context, exit, para_id, key } = self; + let _ = exit; let client = service.client(); let api = service.api(); let work = client.import_notification_stream() - .map(move |notification| { + .and_then(move |notification| { let id = BlockId::hash(notification.hash); - let checked_id = api.check_id(id).expect("block from import notification always known; qed"); - match api.parachain_head(&checked_id, para_id) { + match api.parachain_head(&id, para_id) { Ok(Some(last_head)) => { - let context = ApiContext { api: api.clone(), id: checked_id }; - let collation_work = collate(para_id, last_head, context, parachain_context.clone()).map(Some); + let collation_work = collate( + para_id, + HeadData(last_head), + ApiContext, + parachain_context.clone(), + key.clone(), + ).map(Some); + future::Either::A(collation_work) } Ok(None) => { info!("Parachain {:?} appears to be inactive. Cannot collate.", id); - future::Either::B(None) + future::Either::B(future::ok(None)) } Err(e) => { info!("Could not collate for parachain {:?}: {:?}", id, e); - future::Either::B(None) // returning error would shut down the collation node + future::Either::B(future::ok(None)) // returning error would shut down the collation node } } }) - .for_each(|collation| { + .for_each(|_collation: Option| { // TODO: import into network. Ok(()) }); - Box::new(work.select(exit).then(|_| Ok(()))) + Box::new(work) as Box<_> } } @@ -229,11 +261,18 @@ impl> Application for Collation /// /// Provide a future which resolves when the node should exit. /// This function blocks until done. -pub fn run_collator(parachain_context: P, para_id: ParaId, exit: E, args: Vec<::std::ffi::OsString>) -> polkadot_cli::error::Result<()> where - P: ParachainContext, - E: IntoFuture +pub fn run_collator( + parachain_context: P, + para_id: ParaId, + exit: E, + key: Arc, + args: Vec<::std::ffi::OsString> +) -> polkadot_cli::error::Result<()> where + P: ParachainContext + 'static, + E: IntoFuture, + E::Future: Send + 'static, { - let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id }; + let node_logic = CollationNode { parachain_context, exit: Some(exit.into_future()), para_id, key }; polkadot_cli::run(args, node_logic) } diff --git a/polkadot/consensus/src/shared_table/mod.rs b/polkadot/consensus/src/shared_table/mod.rs index 48230ac10889b..e258765980567 100644 --- a/polkadot/consensus/src/shared_table/mod.rs +++ b/polkadot/consensus/src/shared_table/mod.rs @@ -485,6 +485,7 @@ mod tests { let candidate = CandidateReceipt { parachain_index: para_id, collator: [1; 32].into(), + signature: Default::default(), head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]), balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), @@ -535,6 +536,7 @@ mod tests { let candidate = CandidateReceipt { parachain_index: para_id, collator: [1; 32].into(), + signature: Default::default(), head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]), balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index e3d04e7b484e3..dcde0ce86394f 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -134,26 +134,6 @@ impl Slicable for DutyRoster { #[cfg_attr(feature = "std", serde(deny_unknown_fields))] pub struct Extrinsic; -/// Candidate parachain block. -/// -/// https://github.com/w3f/polkadot-spec/blob/master/spec.md#candidate-para-chain-block -#[derive(PartialEq, Eq, Clone)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] -#[cfg_attr(feature = "std", serde(rename_all = "camelCase"))] -#[cfg_attr(feature = "std", serde(deny_unknown_fields))] -pub struct Candidate { - /// The ID of the parachain this is a proposal for. - pub parachain_index: Id, - /// Collator's signature - pub collator_signature: CandidateSignature, - /// Unprocessed ingress queue. - /// - /// Ordered by parachain ID and block number. - pub unprocessed_ingress: ConsolidatedIngress, - /// Block data - pub block: BlockData, -} - /// Candidate receipt type. #[derive(PartialEq, Eq, Clone)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] @@ -164,6 +144,8 @@ pub struct CandidateReceipt { pub parachain_index: Id, /// The collator's relay-chain account ID pub collator: super::AccountId, + /// Signature on block data by collator. + pub signature: CandidateSignature, /// The head-data pub head_data: HeadData, /// Balance uploads to the relay chain. @@ -174,20 +156,13 @@ pub struct CandidateReceipt { pub fees: u64, } -/// A full collation. -pub struct Collation { - /// Block data. - pub block_data: BlockData, - /// The candidate receipt itself. - pub receipt: CandidateReceipt, -} - impl Slicable for CandidateReceipt { fn encode(&self) -> Vec { let mut v = Vec::new(); self.parachain_index.using_encoded(|s| v.extend(s)); self.collator.using_encoded(|s| v.extend(s)); + self.signature.using_encoded(|s| v.extend(s)); self.head_data.0.using_encoded(|s| v.extend(s)); self.balance_uploads.using_encoded(|s| v.extend(s)); self.egress_queue_roots.using_encoded(|s| v.extend(s)); @@ -200,6 +175,7 @@ impl Slicable for CandidateReceipt { Some(CandidateReceipt { parachain_index: Slicable::decode(input)?, collator: Slicable::decode(input)?, + signature: Slicable::decode(input)?, head_data: Slicable::decode(input).map(HeadData)?, balance_uploads: Slicable::decode(input)?, egress_queue_roots: Slicable::decode(input)?, @@ -231,6 +207,18 @@ impl Ord for CandidateReceipt { } } +/// A full collation. +#[derive(PartialEq, Eq, Clone)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] +#[cfg_attr(feature = "std", serde(rename_all = "camelCase"))] +#[cfg_attr(feature = "std", serde(deny_unknown_fields))] +pub struct Collation { + /// Block data. + pub block_data: BlockData, + /// Candidate receipt itself. + pub receipt: CandidateReceipt, +} + /// Parachain ingress queue message. #[derive(PartialEq, Eq, Clone)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 010cc9c8f67d9..48eb3bf8dd841 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -236,7 +236,7 @@ impl Service } /// Get shared polkadot-api instance. usually the same as the client. - pub fn clent(&self) -> Arc { + pub fn api(&self) -> Arc { self.api.clone() } From ada584fc882788743bd493c5f06686b85bd52182 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 5 Jul 2018 15:48:16 +0100 Subject: [PATCH 5/9] typo --- polkadot/cli/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 43577f72e75dc..84292d8f7c213 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -120,7 +120,7 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf { .unwrap_or_else(default_base_path) } -/// Additional application logic making use of the ndoe, to run asynchronously before shutdown. +/// Additional application logic making use of the node, to run asynchronously before shutdown. /// /// This will be invoked with the service and spawn a future that resolves /// when complete. From 0521d2406faf217ecd0b451871419b38ebfb8934 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 5 Jul 2018 16:21:55 +0100 Subject: [PATCH 6/9] indentation fix --- polkadot/collator/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 0104d58df7395..96946fae56370 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -146,7 +146,7 @@ pub fn collate<'a, R, P>( -> impl Future + 'a where R: RelayChainContext + 'a, - R::Error: 'a, + R::Error: 'a, R::FutureEgress: 'a, P: ParachainContext + 'a, { @@ -303,7 +303,7 @@ mod tests { } } - #[test] + #[test] fn collates_ingress() { let route_from = |x: &[ParaId]| { let mut set = BTreeSet::new(); From b9699dd5bb66ede890e545a36699c7d393bce2dc Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 5 Jul 2018 16:43:03 +0100 Subject: [PATCH 7/9] doc grumbles --- polkadot/collator/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 96946fae56370..cdcc1e2e66f7f 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -72,8 +72,9 @@ use polkadot_cli::Application; /// Parachain context needed for collation. /// /// This can be implemented through an externally attached service or a stub. +/// This is expected to be a lightweight, shared type like an Arc. pub trait ParachainContext: Clone { - /// Produce a candidate, given the latest ingress queue information. + /// Produce a candidate, given the latest ingress queue information and the last parachain head. fn produce_candidate>( &self, last_head: HeadData, @@ -135,7 +136,7 @@ pub fn collate_ingress<'a, R>(relay_context: R) .map(ConsolidatedIngress) } -/// Produce a candidate for the parachain, with given signing key. +/// Produce a candidate for the parachain, with given contexts, parent head, and signing key. pub fn collate<'a, R, P>( local_id: ParaId, last_head: HeadData, From 274fcf370e899f533d621c919cf04d8d9411f652 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 6 Jul 2018 11:19:01 +0100 Subject: [PATCH 8/9] rename Application to Worker --- polkadot/cli/src/lib.rs | 30 +++++++++++++++--------------- polkadot/collator/src/lib.rs | 4 ++-- polkadot/src/main.rs | 9 +++++---- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 84292d8f7c213..2df55729d649c 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -120,12 +120,12 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf { .unwrap_or_else(default_base_path) } -/// Additional application logic making use of the node, to run asynchronously before shutdown. +/// Additional worker making use of the node, to run asynchronously before shutdown. /// /// This will be invoked with the service and spawn a future that resolves /// when complete. -pub trait Application { - /// A future that resolves when the application work is done. +pub trait Worker { + /// A future that resolves when the work is done. /// This will be run on a tokio runtime. type Work: Future; @@ -135,7 +135,7 @@ pub trait Application { /// Create exit future. fn exit(&mut self) -> Self::Exit; - /// Do application work. + /// Do work. fn work(self, service: &Service) -> Self::Work where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>; } @@ -148,10 +148,10 @@ pub trait Application { /// 9556-9591 Unassigned /// 9803-9874 Unassigned /// 9926-9949 Unassigned -pub fn run(args: I, mut app: A) -> error::Result<()> where +pub fn run(args: I, mut worker: W) -> error::Result<()> where I: IntoIterator, T: Into + Clone, - A: Application, + W: Worker, { let yaml = load_yaml!("./cli.yml"); let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) { @@ -178,11 +178,11 @@ pub fn run(args: I, mut app: A) -> error::Result<()> where } if let Some(matches) = matches.subcommand_matches("export-blocks") { - return export_blocks(matches, app.exit()); + return export_blocks(matches, worker.exit()); } if let Some(matches) = matches.subcommand_matches("import-blocks") { - return import_blocks(matches, app.exit()); + return import_blocks(matches, worker.exit()); } let spec = load_spec(&matches)?; @@ -276,8 +276,8 @@ pub fn run(args: I, mut app: A) -> error::Result<()> where let core = reactor::Core::new().expect("tokio::Core could not be created"); match role == service::Role::LIGHT { - true => run_until_exit(core, service::new_light(config)?, &matches, sys_conf, app), - false => run_until_exit(core, service::new_full(config)?, &matches, sys_conf, app), + true => run_until_exit(core, service::new_light(config)?, &matches, sys_conf, worker), + false => run_until_exit(core, service::new_full(config)?, &matches, sys_conf, worker), } } @@ -401,16 +401,16 @@ fn import_blocks(matches: &clap::ArgMatches, exit: E) -> error::Result<()> Ok(()) } -fn run_until_exit( +fn run_until_exit( mut core: reactor::Core, service: service::Service, matches: &clap::ArgMatches, sys_conf: SystemConfiguration, - mut application: A, + mut worker: W, ) -> error::Result<()> where C: service::Components, - A: Application, + W: Worker, client::error::Error: From<<<::Backend as client::backend::Backend>::State as state_machine::Backend>::Error>, { informant::start(&service, core.handle()); @@ -435,8 +435,8 @@ fn run_until_exit( ) }; - let exit = application.exit(); - let until_exit = application.work(&service).select(exit).then(|_| Ok(())); + let exit = worker.exit(); + let until_exit = worker.work(&service).select(exit).then(|_| Ok(())); core.run(until_exit) } diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index cdcc1e2e66f7f..113f4a7a05d09 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -67,7 +67,7 @@ use polkadot_api::PolkadotApi; use polkadot_primitives::BlockId; use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId}; use polkadot_cli::{ClientError, ServiceComponents, ClientBackend, PolkadotBlock, StateMachineBackend, Service}; -use polkadot_cli::Application; +use polkadot_cli::Worker; /// Parachain context needed for collation. /// @@ -200,7 +200,7 @@ struct CollationNode { key: Arc, } -impl Application for CollationNode where +impl Worker for CollationNode where P: ParachainContext + 'static, E: Future + Send + 'static { diff --git a/polkadot/src/main.rs b/polkadot/src/main.rs index db0cbb50c8a2f..3c5fe25286a89 100644 --- a/polkadot/src/main.rs +++ b/polkadot/src/main.rs @@ -31,9 +31,9 @@ use futures::{future, Future}; use std::cell::RefCell; -// the regular polkadot application simply runs until ctrl-c -struct Application; -impl cli::Application for Application { +// the regular polkadot worker simply does nothing until ctrl-c +struct Worker; +impl cli::Worker for Worker { type Work = future::Empty<(), ()>; type Exit = future::MapErr, fn(oneshot::Canceled) -> ()>; @@ -54,6 +54,7 @@ impl cli::Application for Application { fn work(self, _service: &Service) -> Self::Work where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>, { + // NOTE: future::empty is more like future::never: it never resolves future::empty() } } @@ -61,5 +62,5 @@ impl cli::Application for Application { quick_main!(run); fn run() -> cli::error::Result<()> { - cli::run(::std::env::args(), Application) + cli::run(::std::env::args(), Worker) } From 5cddcb04ef782ce3dc3eb088da602f3287f3c75a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 6 Jul 2018 13:01:08 +0100 Subject: [PATCH 9/9] refactor Worker::exit to exit_only --- polkadot/cli/src/lib.rs | 22 ++++++++++------------ polkadot/collator/src/lib.rs | 19 ++++++++----------- polkadot/src/main.rs | 9 ++++----- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 2df55729d649c..c9983c47264be 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -125,17 +125,17 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf { /// This will be invoked with the service and spawn a future that resolves /// when complete. pub trait Worker { - /// A future that resolves when the work is done. + /// A future that resolves when the work is done or the node should exit. /// This will be run on a tokio runtime. type Work: Future; - /// A future that resolves when the node should exit. + /// An exit scheduled for the future. type Exit: Future + Send + 'static; - /// Create exit future. - fn exit(&mut self) -> Self::Exit; + /// Don't work, but schedule an exit. + fn exit_only(self) -> Self::Exit; - /// Do work. + /// Do work and schedule exit. fn work(self, service: &Service) -> Self::Work where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>; } @@ -148,7 +148,7 @@ pub trait Worker { /// 9556-9591 Unassigned /// 9803-9874 Unassigned /// 9926-9949 Unassigned -pub fn run(args: I, mut worker: W) -> error::Result<()> where +pub fn run(args: I, worker: W) -> error::Result<()> where I: IntoIterator, T: Into + Clone, W: Worker, @@ -178,11 +178,11 @@ pub fn run(args: I, mut worker: W) -> error::Result<()> where } if let Some(matches) = matches.subcommand_matches("export-blocks") { - return export_blocks(matches, worker.exit()); + return export_blocks(matches, worker.exit_only()); } if let Some(matches) = matches.subcommand_matches("import-blocks") { - return import_blocks(matches, worker.exit()); + return import_blocks(matches, worker.exit_only()); } let spec = load_spec(&matches)?; @@ -406,7 +406,7 @@ fn run_until_exit( service: service::Service, matches: &clap::ArgMatches, sys_conf: SystemConfiguration, - mut worker: W, + worker: W, ) -> error::Result<()> where C: service::Components, @@ -435,9 +435,7 @@ fn run_until_exit( ) }; - let exit = worker.exit(); - let until_exit = worker.work(&service).select(exit).then(|_| Ok(())); - core.run(until_exit) + core.run(worker.work(&service).then(|_| Ok(()))) } fn start_server(mut address: SocketAddr, start: F) -> Result where diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 113f4a7a05d09..e39440e2a122c 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -195,7 +195,7 @@ impl RelayChainContext for ApiContext { struct CollationNode { parachain_context: P, - exit: Option, + exit: E, para_id: ParaId, key: Arc, } @@ -205,20 +205,16 @@ impl Worker for CollationNode where E: Future + Send + 'static { type Work = Box>; - type Exit = future::Either>; + type Exit = E; - fn exit(&mut self) -> Self::Exit { - match self.exit.take() { - Some(exit) => future::Either::A(exit), - None => future::Either::B(future::empty()), - } + fn exit_only(self) -> Self::Exit { + self.exit } fn work(self, service: &Service) -> Self::Work where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>, { let CollationNode { parachain_context, exit, para_id, key } = self; - let _ = exit; let client = service.client(); let api = service.api(); @@ -243,7 +239,7 @@ impl Worker for CollationNode where future::Either::B(future::ok(None)) } Err(e) => { - info!("Could not collate for parachain {:?}: {:?}", id, e); + warn!("Could not collate for parachain {:?}: {:?}", id, e); future::Either::B(future::ok(None)) // returning error would shut down the collation node } } @@ -253,7 +249,8 @@ impl Worker for CollationNode where Ok(()) }); - Box::new(work) as Box<_> + let work_and_exit = work.select(exit).then(|_| Ok(())); + Box::new(work_and_exit) as Box<_> } } @@ -273,7 +270,7 @@ pub fn run_collator( E: IntoFuture, E::Future: Send + 'static, { - let node_logic = CollationNode { parachain_context, exit: Some(exit.into_future()), para_id, key }; + let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key }; polkadot_cli::run(args, node_logic) } diff --git a/polkadot/src/main.rs b/polkadot/src/main.rs index 3c5fe25286a89..65c87d85f0a2e 100644 --- a/polkadot/src/main.rs +++ b/polkadot/src/main.rs @@ -34,10 +34,10 @@ use std::cell::RefCell; // the regular polkadot worker simply does nothing until ctrl-c struct Worker; impl cli::Worker for Worker { - type Work = future::Empty<(), ()>; + type Work = Self::Exit; type Exit = future::MapErr, fn(oneshot::Canceled) -> ()>; - fn exit(&mut self) -> Self::Exit { + fn exit_only(self) -> Self::Exit { // can't use signal directly here because CtrlC takes only `Fn`. let (exit_send, exit) = oneshot::channel(); @@ -51,11 +51,10 @@ impl cli::Worker for Worker { exit.map_err(drop) } - fn work(self, _service: &Service) -> Self::Work + fn work(self, _service: &Service) -> Self::Exit where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>, { - // NOTE: future::empty is more like future::never: it never resolves - future::empty() + self.exit_only() } }