From 2fc9ee6b9717a4f2145c8cde53ef3c135cc847fc Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 16 Apr 2018 16:26:43 +0200 Subject: [PATCH 1/5] barrier on starting network --- polkadot/service/src/lib.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 9c0adc23d9fe6..3e0570582fe43 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -242,6 +242,8 @@ fn local_testnet_config() -> ChainConfig { impl Service { /// Creates and register protocol with the network service pub fn new(mut config: Configuration) -> Result { + use std::sync::Barrier; + // Create client let executor = polkadot_executor::Executor::new(); let mut storage = Default::default(); @@ -299,8 +301,13 @@ impl Service { let thread_client = client.clone(); let thread_network = network.clone(); let thread_txpool = transaction_pool.clone(); + + let barrier = ::std::sync::Arc::new(Barrier::new(2)); + let thread_barrier = barrier.clone(); let thread = thread::spawn(move || { thread_network.start_network(); + + thread_barrier.wait(); let mut core = Core::new().expect("tokio::Core could not be created"); let events = thread_client.import_notification_stream().for_each(|notification| { thread_network.on_block_imported(notification.hash, ¬ification.header); @@ -313,6 +320,11 @@ impl Service { } debug!("Polkadot service shutdown"); }); + + // before returning, make sure the network is started. avoids a race + // between the drop killing notification listeners and the new notification + // stream being stsarted. + barrier.wait(); Ok(Service { thread: Some(thread), client: client, @@ -359,6 +371,7 @@ impl Drop for Service { fn drop(&mut self) { self.client.stop_notifications(); self.network.stop_network(); + if let Some(thread) = self.thread.take() { thread.join().expect("The service thread has panicked"); } From 6ff852f44ed48219a1bfe56adb618b0d631738fe Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 16 Apr 2018 18:24:22 +0200 Subject: [PATCH 2/5] handle exit better --- Cargo.lock | 12 +++++ polkadot/cli/src/lib.rs | 14 +++--- polkadot/consensus/Cargo.toml | 1 + polkadot/consensus/src/lib.rs | 1 + polkadot/consensus/src/service.rs | 73 +++++++++++++++++++----------- polkadot/service/Cargo.toml | 1 + polkadot/service/src/lib.rs | 75 +++++++++++++++++++------------ 7 files changed, 116 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aca5f344ef6bc..db584a798d01d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -576,6 +576,15 @@ dependencies = [ "tiny-keccak 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "exit-future" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "fixed-hash" version = "0.1.3" @@ -1229,6 +1238,7 @@ version = "0.1.0" dependencies = [ "ed25519 0.1.0", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "exit-future 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1331,6 +1341,7 @@ version = "0.1.0" dependencies = [ "ed25519 0.1.0", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "exit-future 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2537,6 +2548,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum ethereum-types 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "53eabbad504e438e20b6559fd070d79b92cb31c02f994c7ecb05e9b2df716013" "checksum ethereum-types-serialize 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4ac59a21a9ce98e188f3dace9eb67a6c4a3c67ec7fbc7218cb827852679dc002" "checksum ethkey 0.3.0 (git+https://github.com/paritytech/parity.git)" = "" +"checksum exit-future 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6c30ec160f83d938faf4dd6fdaba4f42316f1bd3df76b902df2d824f47fa9db7" "checksum fixed-hash 0.1.3 (git+https://github.com/rphmeier/primitives.git?branch=compile-for-wasm)" = "" "checksum fixed-hash 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "21c520ebc46522d519aec9cba2b7115d49cea707d771b772c46bec61aa0daeb8" "checksum fixed-hash 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "362f32e2fbc5ed45f01a23ca074f936bb3aee4122a66e7118e8c3e965d96104c" diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 95a432b25261f..2df824eb941ae 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -146,11 +146,15 @@ pub fn run(args: I) -> error::Result<()> where informant::start(&service, core.handle()); - let (exit_send, exit) = mpsc::channel(1); - ctrlc::CtrlC::set_handler(move || { - exit_send.clone().send(()).wait().expect("Error sending exit notification"); - }); - core.run(exit.into_future()).expect("Error running informant event loop"); + { + // can't use signal directly here because CtrlC takes only `Fn`. + let (exit_send, exit) = mpsc::channel(1); + ctrlc::CtrlC::set_handler(move || { + exit_send.clone().send(()).wait().expect("Error sending exit notification"); + }); + core.run(exit.into_future()).expect("Error running informant event loop"); + } + Ok(()) } diff --git a/polkadot/consensus/Cargo.toml b/polkadot/consensus/Cargo.toml index 9d883b5003a9f..29d5588865159 100644 --- a/polkadot/consensus/Cargo.toml +++ b/polkadot/consensus/Cargo.toml @@ -10,6 +10,7 @@ tokio-core = "0.1.12" ed25519 = { path = "../../substrate/ed25519" } error-chain = "0.11" log = "0.3" +exit-future = "0.1" polkadot-api = { path = "../api" } polkadot-collator = { path = "../collator" } polkadot-primitives = { path = "../primitives" } diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index cefb625afcea1..2fef41c2d468e 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -44,6 +44,7 @@ extern crate substrate_primitives as primitives; extern crate substrate_runtime_support as runtime_support; extern crate substrate_network; +extern crate exit_future; extern crate tokio_core; extern crate substrate_keyring; extern crate substrate_client as client; diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index b0be89140bfa0..12f7d77df4c1a 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -265,9 +265,11 @@ impl Service { client: Arc, network: Arc, transaction_pool: Arc>, - key: ed25519::Pair + key: ed25519::Pair, + exit: ::exit_future::Exit, ) -> Service - where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static + where + C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static, { let thread = thread::spawn(move || { let mut core = reactor::Core::new().expect("tokio::Core could not be created"); @@ -281,15 +283,26 @@ impl Service { let messages = SharedMessageCollection::new(); let bft_service = Arc::new(BftService::new(client.clone(), key, factory)); - let handle = core.handle(); - let notifications = client.import_notification_stream().for_each(|notification| { - if notification.is_new_best { - start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone()); - } - Ok(()) - }); + let notifications = { + let handle = core.handle(); + let network = network.clone(); + let client = client.clone(); + let bft_service = bft_service.clone(); + let messages = messages.clone(); - let interval = reactor::Interval::new_at(Instant::now() + Duration::from_millis(TIMER_DELAY_MS), Duration::from_millis(TIMER_INTERVAL_MS), &handle).unwrap(); + client.import_notification_stream().for_each(move |notification| { + if notification.is_new_best { + start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone()); + } + Ok(()) + }) + }; + + let interval = reactor::Interval::new_at( + Instant::now() + Duration::from_millis(TIMER_DELAY_MS), + Duration::from_millis(TIMER_INTERVAL_MS), + &core.handle(), + ).expect("it is always possible to create an interval with valid params"); let mut prev_best = match client.best_block_header() { Ok(header) => header.blake2_256(), Err(e) => { @@ -297,25 +310,31 @@ impl Service { return; } }; - let c = client.clone(); - let s = bft_service.clone(); - let n = network.clone(); - let m = messages.clone(); - let handle = core.handle(); - let timed = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { - if let Ok(best_block) = c.best_block_header() { - let hash = best_block.blake2_256(); - m.collect_garbage(); - if hash == prev_best { - debug!("Starting consensus round after a timeout"); - start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone()); + + let timed = { + let c = client.clone(); + let s = bft_service.clone(); + let n = network.clone(); + let m = messages.clone(); + let handle = core.handle(); + + interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { + if let Ok(best_block) = c.best_block_header() { + let hash = best_block.blake2_256(); + m.collect_garbage(); + if hash == prev_best { + debug!("Starting consensus round after a timeout"); + start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone()); + } + prev_best = hash; } - prev_best = hash; - } - Ok(()) - }); + Ok(()) + }) + }; + + core.handle().spawn(notifications); core.handle().spawn(timed); - if let Err(e) = core.run(notifications) { + if let Err(e) = core.run(exit) { debug!("BFT event loop error {:?}", e); } }); diff --git a/polkadot/service/Cargo.toml b/polkadot/service/Cargo.toml index 82c672f10e0aa..d88c069470af2 100644 --- a/polkadot/service/Cargo.toml +++ b/polkadot/service/Cargo.toml @@ -11,6 +11,7 @@ tokio-timer = "0.1.2" error-chain = "0.11" log = "0.3" tokio-core = "0.1.12" +exit-future = "0.1" ed25519 = { path = "../../substrate/ed25519" } polkadot-primitives = { path = "../primitives" } polkadot-runtime = { path = "../runtime" } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 3e0570582fe43..ba5883f84fbd7 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -28,14 +28,15 @@ extern crate polkadot_api; extern crate polkadot_consensus as consensus; extern crate polkadot_transaction_pool as transaction_pool; extern crate polkadot_keystore as keystore; +extern crate substrate_client as client; extern crate substrate_runtime_io as runtime_io; extern crate substrate_primitives as primitives; extern crate substrate_network as network; extern crate substrate_codec as codec; extern crate substrate_executor; +extern crate exit_future; extern crate tokio_core; -extern crate substrate_client as client; #[macro_use] extern crate error_chain; @@ -65,6 +66,7 @@ use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyC use client::{genesis, BlockchainEvents}; use client::in_mem::Backend as InMemory; use network::ManageNetwork; +use exit_future::Signal; pub use self::error::{ErrorKind, Error}; pub use config::{Configuration, Role, ChainSpec}; @@ -77,6 +79,7 @@ pub struct Service { client: Arc, network: Arc, transaction_pool: Arc>, + signal: Option, _consensus: Option, } @@ -244,6 +247,8 @@ impl Service { pub fn new(mut config: Configuration) -> Result { use std::sync::Barrier; + let (signal, exit) = ::exit_future::signal(); + // Create client let executor = polkadot_executor::Executor::new(); let mut storage = Default::default(); @@ -286,50 +291,58 @@ impl Service { chain: client.clone(), transaction_pool: transaction_pool_adapter, }; + let network = network::Service::new(network_params)?; + let barrier = ::std::sync::Arc::new(Barrier::new(2)); + + let thread = { + let client = client.clone(); + let network = network.clone(); + let txpool = transaction_pool.clone(); + let exit = exit.clone(); + + let thread_barrier = barrier.clone(); + thread::spawn(move || { + network.start_network(); + + thread_barrier.wait(); + let mut core = Core::new().expect("tokio::Core could not be created"); + let events = client.import_notification_stream().for_each(move |notification| { + network.on_block_imported(notification.hash, ¬ification.header); + prune_imported(&*client, &*txpool, notification.hash); + + Ok(()) + }); + + core.handle().spawn(events); + if let Err(e) = core.run(exit) { + debug!("Polkadot service event loop shutdown with {:?}", e); + } + debug!("Polkadot service shutdown"); + }) + }; + + // before returning, make sure the network is started. avoids a race + // between the drop killing notification listeners and the new notification + // stream being started. + barrier.wait(); // Spin consensus service if configured let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR { // Load the first available key. Code above makes sure it exisis. let key = keystore.load(&keystore.contents()?[0], "")?; info!("Using authority key {:?}", key.public()); - Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key)) + Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key, exit)) } else { None }; - let thread_client = client.clone(); - let thread_network = network.clone(); - let thread_txpool = transaction_pool.clone(); - - let barrier = ::std::sync::Arc::new(Barrier::new(2)); - let thread_barrier = barrier.clone(); - let thread = thread::spawn(move || { - thread_network.start_network(); - - thread_barrier.wait(); - let mut core = Core::new().expect("tokio::Core could not be created"); - let events = thread_client.import_notification_stream().for_each(|notification| { - thread_network.on_block_imported(notification.hash, ¬ification.header); - prune_imported(&*thread_client, &*thread_txpool, notification.hash); - - Ok(()) - }); - if let Err(e) = core.run(events) { - debug!("Polkadot service event loop shutdown with {:?}", e); - } - debug!("Polkadot service shutdown"); - }); - - // before returning, make sure the network is started. avoids a race - // between the drop killing notification listeners and the new notification - // stream being stsarted. - barrier.wait(); Ok(Service { thread: Some(thread), client: client, network: network, transaction_pool: transaction_pool, + signal: Some(signal), _consensus: consensus_service, }) } @@ -372,6 +385,10 @@ impl Drop for Service { self.client.stop_notifications(); self.network.stop_network(); + if let Some(signal) = self.signal.take() { + signal.fire(); + } + if let Some(thread) = self.thread.take() { thread.join().expect("The service thread has panicked"); } From f20987bd1e204b0950e1994e257a4cd107b8cc9f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 16 Apr 2018 19:26:54 +0200 Subject: [PATCH 3/5] give consensus service its own internal exit signal --- polkadot/consensus/src/service.rs | 20 +++++++++++++------- polkadot/service/src/lib.rs | 3 +-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index 12f7d77df4c1a..bc40b16308109 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -218,11 +218,6 @@ impl Sink for BftSink { } } -/// Consensus service. Starts working when created. -pub struct Service { - thread: Option>, -} - struct Network(Arc); fn start_bft( @@ -259,6 +254,12 @@ fn start_bft( } } +/// Consensus service. Starts working when created. +pub struct Service { + thread: Option>, + exit_signal: Option<::exit_future::Signal>, +} + impl Service { /// Create and start a new instance. pub fn new( @@ -266,11 +267,11 @@ impl Service { network: Arc, transaction_pool: Arc>, key: ed25519::Pair, - exit: ::exit_future::Exit, ) -> Service where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static, { + let (signal, exit) = ::exit_future::signal(); let thread = thread::spawn(move || { let mut core = reactor::Core::new().expect("tokio::Core could not be created"); let key = Arc::new(key); @@ -339,13 +340,18 @@ impl Service { } }); Service { - thread: Some(thread) + thread: Some(thread), + exit_signal: Some(signal), } } } impl Drop for Service { fn drop(&mut self) { + if let Some(signal) = self.exit_signal.take() { + signal.fire(); + } + if let Some(thread) = self.thread.take() { thread.join().expect("The service thread has panicked"); } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index ba5883f84fbd7..d237aa9f102de 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -299,7 +299,6 @@ impl Service { let client = client.clone(); let network = network.clone(); let txpool = transaction_pool.clone(); - let exit = exit.clone(); let thread_barrier = barrier.clone(); thread::spawn(move || { @@ -332,7 +331,7 @@ impl Service { // Load the first available key. Code above makes sure it exisis. let key = keystore.load(&keystore.contents()?[0], "")?; info!("Using authority key {:?}", key.public()); - Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key, exit)) + Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key)) } else { None }; From 1e6af1cd2e5e4a04aacb35a5d10fe99615b5844b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 16 Apr 2018 19:27:43 +0200 Subject: [PATCH 4/5] update comment --- polkadot/service/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index d237aa9f102de..40ca49a62ef33 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -321,9 +321,8 @@ impl Service { }) }; - // before returning, make sure the network is started. avoids a race - // between the drop killing notification listeners and the new notification - // stream being started. + // wait for the network to start up before starting the consensus + // service. barrier.wait(); // Spin consensus service if configured From 30e9d3fe8fba8bbc59577f831ac7733f4c784a52 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 18 Apr 2018 13:33:22 +0200 Subject: [PATCH 5/5] remove stop_notifications and fix build --- polkadot/cli/src/lib.rs | 4 +++- polkadot/service/src/lib.rs | 1 - substrate/client/src/client.rs | 5 ----- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 35a5d3f25c356..e504028c768d0 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -74,6 +74,8 @@ pub fn run(args: I) -> error::Result<()> where ctrlc::CtrlC::set_handler(move || { exit_send.clone().send(()).wait().expect("Error sending exit notification"); }); + + exit }; let yaml = load_yaml!("./cli.yml"); @@ -161,7 +163,7 @@ pub fn run(args: I) -> error::Result<()> where ) }; - core.run(exit.into_future()).expect("Error running informant event loop"); + core.run(exit.into_future()).expect("Error running informant event loop"); Ok(()) } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 8d1b8f125154d..d21415f7d0a7b 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -380,7 +380,6 @@ pub fn prune_imported(client: &Client, pool: &Mutex, hash: Head impl Drop for Service { fn drop(&mut self) { - self.client.stop_notifications(); self.network.stop_network(); if let Some(signal) = self.signal.take() { diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 0eba8d9fa56bd..ac016166caf29 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -212,11 +212,6 @@ impl Client where self.executor.clone() } - /// Close notification streams. - pub fn stop_notifications(&self) { - self.import_notification_sinks.lock().clear(); - } - /// Get the current set of authorities from storage. pub fn authorities_at(&self, id: &BlockId) -> error::Result> { let state = self.state_at(id)?;