From 64e58066f87ab95fe75617c8cb93639104434c21 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 9 Jun 2020 09:35:35 +0200 Subject: [PATCH 1/4] Initial commit Forked at: 85cd5569fd52b7de2c9628c470d947d60c843bc8 Parent branch: origin/master From ba8b10ef1acd00d38b2fa65ed260d3761307b30d Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 9 Jun 2020 10:19:05 +0200 Subject: [PATCH 2/4] WIP Forked at: 85cd5569fd52b7de2c9628c470d947d60c843bc8 Parent branch: origin/master --- client/cli/src/runner.rs | 5 +++-- client/service/src/lib.rs | 31 ++++++++++--------------------- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 6c220b5261aec..47f2c8d461da8 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -264,7 +264,7 @@ impl Runner { F: FnOnce(Configuration) -> std::result::Result, T: AbstractService + Unpin, { - let service = service_builder(self.config)?; + let mut service = service_builder(self.config)?; let informant_future = sc_informant::build(&service, sc_informant::OutputFormat::Coloured); let _informant_handle = self.tokio_runtime.spawn(informant_future); @@ -275,12 +275,13 @@ impl Runner { let _telemetry = service.telemetry(); { - let f = service.fuse(); + let f = service.future().fuse(); self.tokio_runtime .block_on(main(f)) .map_err(|e| e.to_string())?; } + drop(service); // The `service` **must** have been destroyed here for the shutdown signal to propagate // to all the tasks. Dropping `tokio_runtime` will block the thread until all tasks have // shut down. diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 4f2be23f877ba..64804c7f7b8b9 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -42,7 +42,7 @@ use std::net::SocketAddr; use std::collections::HashMap; use std::time::Duration; use wasm_timer::Instant; -use std::task::{Poll, Context}; +use std::task::Poll; use parking_lot::Mutex; use client::Client; @@ -132,7 +132,7 @@ pub struct Service { impl Unpin for Service {} /// Abstraction over a Substrate service. -pub trait AbstractService: Future> + Send + Unpin + Spawn + 'static { +pub trait AbstractService: Send + Unpin + Spawn + 'static { /// Type of block of this chain. type Block: BlockT; /// Backend storage for the client. @@ -210,6 +210,9 @@ pub trait AbstractService: Future> + Send + Unpin + S /// Get the prometheus metrics registry, if available. fn prometheus_registry(&self) -> Option; + + /// Return a future that will end if an essential task fails + fn future<'a>(&'a mut self) -> Pin> + 'a>>; } impl AbstractService for @@ -310,27 +313,13 @@ where fn prometheus_registry(&self) -> Option { self.prometheus_registry.clone() } -} - -impl Future for - Service -{ - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - match Pin::new(&mut this.essential_failed_rx).poll_next(cx) { - Poll::Pending => {}, - Poll::Ready(_) => { - // Ready(None) should not be possible since we hold a live - // sender. - return Poll::Ready(Err(Error::Other("Essential task failed.".into()))); - } - } + fn future<'a>(&'a mut self) -> Pin> + 'a>> { + Box::pin(async move { + self.essential_failed_rx.next().await; - // The service future never ends. - Poll::Pending + Err(Error::Other("Essential task failed.".into())) + }) } } From 3ee8867e9cc74be832c422dfa1d565a3517288fd Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 9 Jun 2020 10:52:54 +0200 Subject: [PATCH 3/4] WIP Forked at: 85cd5569fd52b7de2c9628c470d947d60c843bc8 Parent branch: origin/master --- client/cli/src/runner.rs | 21 ++++++--------------- client/service/src/lib.rs | 9 ++++++++- client/service/src/task_manager.rs | 11 ++++++++--- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 47f2c8d461da8..b8c4c9360c7ed 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -269,22 +269,13 @@ impl Runner { let informant_future = sc_informant::build(&service, sc_informant::OutputFormat::Coloured); let _informant_handle = self.tokio_runtime.spawn(informant_future); - // we eagerly drop the service so that the internal exit future is fired, - // but we need to keep holding a reference to the global telemetry guard - // and drop the runtime first. - let _telemetry = service.telemetry(); - - { - let f = service.future().fuse(); - self.tokio_runtime - .block_on(main(f)) - .map_err(|e| e.to_string())?; - } + let f = service.future().fuse(); + self.tokio_runtime + .block_on(main(f)) + .map_err(|e| e.to_string())?; - drop(service); - // The `service` **must** have been destroyed here for the shutdown signal to propagate - // to all the tasks. Dropping `tokio_runtime` will block the thread until all tasks have - // shut down. + service.terminate(); + // Dropping `tokio_runtime` will block the thread until all tasks have shut down. drop(self.tokio_runtime); Ok(()) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 64804c7f7b8b9..a6b262d6bc4e2 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -211,8 +211,11 @@ pub trait AbstractService: Send + Unpin + Spawn + 'static { /// Get the prometheus metrics registry, if available. fn prometheus_registry(&self) -> Option; - /// Return a future that will end if an essential task fails + /// Return a future that will end if an essential task fails. fn future<'a>(&'a mut self) -> Pin> + 'a>>; + + /// Signal to terminate all the running tasks. + fn terminate(&mut self); } impl AbstractService for @@ -321,6 +324,10 @@ where Err(Error::Other("Essential task failed.".into())) }) } + + fn terminate(&mut self) { + self.task_manager.terminate() + } } impl Spawn for diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index 553ca9c326d8b..f9db91e8ef881 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -196,14 +196,19 @@ impl TaskManager { pub(super) fn on_exit(&self) -> exit_future::Exit { self.on_exit.clone() } + + /// Signal to terminate all the running tasks. + pub(super) fn terminate(&mut self) { + if let Some(signal) = self.signal.take() { + let _ = signal.fire(); + } + } } impl Drop for TaskManager { fn drop(&mut self) { debug!(target: "service", "Tasks manager shutdown"); - if let Some(signal) = self.signal.take() { - let _ = signal.fire(); - } + self.terminate(); } } From 4cc5e4b167337c8e3f05c5e0fe8f9f093af4efa3 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 9 Jun 2020 11:30:58 +0200 Subject: [PATCH 4/4] fix tests --- client/service/src/lib.rs | 4 ++-- client/service/test/src/lib.rs | 3 --- utils/browser/src/lib.rs | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index a6b262d6bc4e2..30846c6101ceb 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -212,7 +212,7 @@ pub trait AbstractService: Send + Unpin + Spawn + 'static { fn prometheus_registry(&self) -> Option; /// Return a future that will end if an essential task fails. - fn future<'a>(&'a mut self) -> Pin> + 'a>>; + fn future<'a>(&'a mut self) -> Pin> + Send + 'a>>; /// Signal to terminate all the running tasks. fn terminate(&mut self); @@ -317,7 +317,7 @@ where self.prometheus_registry.clone() } - fn future<'a>(&'a mut self) -> Pin> + 'a>> { + fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { Box::pin(async move { self.essential_failed_rx.next().await; diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 206153082505c..dbf91e7b39a1d 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -273,7 +273,6 @@ impl TestNet where let (service, user_data) = authority(node_config).expect("Error creating test node service"); let service = SyncService::from(service); - executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.authority_nodes.push((self.nodes, service, user_data, addr)); self.nodes += 1; @@ -289,7 +288,6 @@ impl TestNet where let (service, user_data) = full(node_config).expect("Error creating test node service"); let service = SyncService::from(service); - executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.full_nodes.push((self.nodes, service, user_data, addr)); self.nodes += 1; @@ -304,7 +302,6 @@ impl TestNet where let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); let service = SyncService::from(light(node_config).expect("Error creating test node service")); - executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.light_nodes.push((self.nodes, service, addr)); self.nodes += 1; diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs index 408ba24cfed22..4eb8a7412b760 100644 --- a/utils/browser/src/lib.rs +++ b/utils/browser/src/lib.rs @@ -141,7 +141,7 @@ pub fn start_client(mut service: impl AbstractService) -> Client { } } - Pin::new(&mut service) + Pin::new(&mut service.future()) .poll(cx) .map(drop) }));