From 748043e403246684275bbe7982cb1a888a5637fe Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 29 Jun 2020 11:55:25 +0200 Subject: [PATCH 1/3] Use async/await in build_network_future --- Cargo.lock | 8 +- client/informant/src/lib.rs | 5 +- client/service/src/builder.rs | 12 +- client/service/src/lib.rs | 244 +++++++++++++++------------ primitives/utils/src/status_sinks.rs | 152 ++++++++++++----- 5 files changed, 255 insertions(+), 166 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 89b24d0826f97..8a84eaf3bd03b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4976,18 +4976,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.9" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f6a7f5eee6292c559c793430c55c00aea9d3b3d1905e855806ca4d7253426a2" +checksum = "12e3a6cdbfe94a5e4572812a0201f8c0ed98c1c452c7b8563ce2276988ef9c17" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.9" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8988430ce790d8682672117bc06dda364c0be32d3abd738234f19f3240bad99a" +checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7" dependencies = [ "proc-macro2", "quote 1.0.6", diff --git a/client/informant/src/lib.rs b/client/informant/src/lib.rs index d56afcf335917..3daf29a9f7837 100644 --- a/client/informant/src/lib.rs +++ b/client/informant/src/lib.rs @@ -29,7 +29,6 @@ use sp_runtime::traits::{Block as BlockT, Header}; use sp_transaction_pool::TransactionPool; use sp_utils::{status_sinks, mpsc::tracing_unbounded}; use std::{fmt::Display, sync::Arc, time::Duration, collections::VecDeque}; -use parking_lot::Mutex; mod display; @@ -82,7 +81,7 @@ impl TransactionPoolAndMaybeMallogSizeOf for /// Builds the informant and returns a `Future` that drives the informant. pub fn build( client: Arc, - network_status_sinks: Arc, NetworkState)>>>, + network_status_sinks: Arc, NetworkState)>>, pool: Arc, format: OutputFormat, ) -> impl futures::Future @@ -94,7 +93,7 @@ where let client_1 = client.clone(); let (network_status_sink, network_status_stream) = tracing_unbounded("mpsc_network_status"); - network_status_sinks.lock().push(Duration::from_millis(5000), network_status_sink); + network_status_sinks.push(Duration::from_millis(5000), network_status_sink); let display_notifications = network_status_stream .for_each(move |(net_status, _)| { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 1fbf301f5b45b..b1303aa486bcb 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -1237,7 +1237,7 @@ async fn telemetry_periodic_send( client: Arc>, transaction_pool: Arc, mut metrics_service: MetricsService, - network_status_sinks: Arc, NetworkState)>>> + network_status_sinks: Arc, NetworkState)>> ) where TBl: BlockT, @@ -1247,7 +1247,7 @@ async fn telemetry_periodic_send( TBackend: sc_client_api::backend::Backend, { let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1"); - network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); + network_status_sinks.push(std::time::Duration::from_millis(5000), state_tx); state_rx.for_each(move |(net_status, _)| { let info = client.usage_info(); metrics_service.tick( @@ -1260,11 +1260,11 @@ async fn telemetry_periodic_send( } async fn telemetry_periodic_network_state( - network_status_sinks: Arc, NetworkState)>>> + network_status_sinks: Arc, NetworkState)>> ) { // Periodically send the network state to the telemetry. let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2"); - network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx); + network_status_sinks.push(std::time::Duration::from_secs(30), netstat_tx); netstat_rx.for_each(move |(_, network_state)| { telemetry!( SUBSTRATE_INFO; @@ -1429,7 +1429,7 @@ fn build_network( ) -> Result< ( Arc::Hash>>, - Arc, NetworkState)>>>, + Arc, NetworkState)>>, Pin + Send>> ), Error @@ -1490,7 +1490,7 @@ fn build_network( let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network_mut = sc_network::NetworkWorker::new(network_params)?; let network = network_mut.service().clone(); - let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new())); + let network_status_sinks = Arc::new(status_sinks::StatusSinks::new()); let future = build_network_future( config.role.clone(), diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 036c95777323e..084f7312e3851 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -20,7 +20,7 @@ //! Manages communication between them. #![warn(missing_docs)] -#![recursion_limit="128"] +#![recursion_limit = "1024"] pub mod config; #[macro_use] @@ -49,6 +49,7 @@ use futures::{ Future, FutureExt, Stream, StreamExt, compat::*, sink::SinkExt, + stream, task::{Spawn, FutureObj, SpawnError}, }; use sc_network::{NetworkService, NetworkStatus, network_state::NetworkState, PeerId}; @@ -113,7 +114,7 @@ pub struct Service { network: Arc, // Sinks to propagate network status updates. // For each element, every time the `Interval` fires we push an element on the sender. - network_status_sinks: Arc>>, + network_status_sinks: Arc>, transaction_pool: Arc, // Send a signal when a spawned essential task has concluded. The next time // the service future is polled it should complete with an error. @@ -315,7 +316,7 @@ where fn network_status(&self, interval: Duration) -> TracingUnboundedReceiver<(NetworkStatus, NetworkState)> { let (sink, stream) = tracing_unbounded("mpsc_network_status"); - self.network_status_sinks.lock().push(interval, sink); + self.network_status_sinks.push(interval, sink); stream } @@ -373,7 +374,7 @@ impl Spawn for /// Builds a never-ending future that continuously polls the network. /// /// The `status_sink` contain a list of senders to send a periodic network status to. -fn build_network_future< +async fn build_network_future< B: BlockT, C: BlockchainEvents, H: sc_network::ExHashT @@ -381,128 +382,151 @@ fn build_network_future< role: Role, mut network: sc_network::NetworkWorker, client: Arc, - status_sinks: Arc, NetworkState)>>>, - mut rpc_rx: TracingUnboundedReceiver>, + status_sinks: Arc, NetworkState)>>, + rpc_rx: TracingUnboundedReceiver>, should_have_peers: bool, announce_imported_blocks: bool, -) -> impl Future { +) { let mut imported_blocks_stream = client.import_notification_stream().fuse(); let mut finality_notification_stream = client.finality_notification_stream().fuse(); - futures::future::poll_fn(move |cx| { + // In the case where the channel containing RPC requests shuts down, we want the network + // future to continue running. To make that easier, we make sure that `rpc_rx` never finishes. + let mut rpc_rx = rpc_rx.chain(stream::pending()).fuse(); + + loop { let before_polling = Instant::now(); - // We poll `imported_blocks_stream`. - while let Poll::Ready(Some(notification)) = Pin::new(&mut imported_blocks_stream).poll_next(cx) { - if announce_imported_blocks { - network.service().announce_block(notification.hash, Vec::new()); + // This future does the same as `finality_notification_stream.next()`, except that if + // multiple events are ready on the stream, only the last one is returned. + let mut last_finality_notification = futures::future::poll_fn(|cx| { + let mut last = None; + while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) { + last = Some(item); + } + if let Some(last) = last { + Poll::Ready(last) + } else { + Poll::Pending } + }).fuse(); + + futures::select!{ + // List of blocks that the client has imported. + notification = imported_blocks_stream.next() => { + let notification = match notification { + Some(n) => n, + // If this stream is shut down, that means the client has shut down, and the + // most appropriate thing to do for the network future is to shut down too. + None => return, + }; + + if announce_imported_blocks { + network.service().announce_block(notification.hash, Vec::new()); + } - if let sp_consensus::BlockOrigin::Own = notification.origin { - network.service().own_block_imported( - notification.hash, - notification.header.number().clone(), - ); + if let sp_consensus::BlockOrigin::Own = notification.origin { + network.service().own_block_imported( + notification.hash, + notification.header.number().clone(), + ); + } } - } - // We poll `finality_notification_stream`, but we only take the last event. - let mut last = None; - while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) { - last = Some(item); - } - if let Some(notification) = last { - network.on_block_finalized(notification.hash, notification.header); - } + // List of blocks that the client has finalized. + notification = last_finality_notification => { + network.on_block_finalized(notification.hash, notification.header); + } - // Poll the RPC requests and answer them. - while let Poll::Ready(Some(request)) = Pin::new(&mut rpc_rx).poll_next(cx) { - match request { - sc_rpc::system::Request::Health(sender) => { - let _ = sender.send(sc_rpc::system::Health { - peers: network.peers_debug_info().len(), - is_syncing: network.service().is_major_syncing(), - should_have_peers, - }); - }, - sc_rpc::system::Request::LocalPeerId(sender) => { - let _ = sender.send(network.local_peer_id().to_base58()); - }, - sc_rpc::system::Request::LocalListenAddresses(sender) => { - let peer_id = network.local_peer_id().clone().into(); - let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); - let addresses = network.listen_addresses() - .map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string()) - .collect(); - let _ = sender.send(addresses); - }, - sc_rpc::system::Request::Peers(sender) => { - let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| - sc_rpc::system::PeerInfo { - peer_id: peer_id.to_base58(), - roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } - ).collect()); - } - sc_rpc::system::Request::NetworkState(sender) => { - if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() { - let _ = sender.send(network_state); + // Answer incoming RPC requests. + request = rpc_rx.next() => { + match request { + None => continue, + Some(sc_rpc::system::Request::Health(sender)) => { + let _ = sender.send(sc_rpc::system::Health { + peers: network.peers_debug_info().len(), + is_syncing: network.service().is_major_syncing(), + should_have_peers, + }); + }, + Some(sc_rpc::system::Request::LocalPeerId(sender)) => { + let _ = sender.send(network.local_peer_id().to_base58()); + }, + Some(sc_rpc::system::Request::LocalListenAddresses(sender)) => { + let peer_id = network.local_peer_id().clone().into(); + let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); + let addresses = network.listen_addresses() + .map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string()) + .collect(); + let _ = sender.send(addresses); + }, + Some(sc_rpc::system::Request::Peers(sender)) => { + let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| + sc_rpc::system::PeerInfo { + peer_id: peer_id.to_base58(), + roles: format!("{:?}", p.roles), + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + } + ).collect()); } - } - sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => { - let x = network.add_reserved_peer(peer_addr) - .map_err(sc_rpc::system::error::Error::MalformattedPeerArg); - let _ = sender.send(x); - } - sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => { - let _ = match peer_id.parse::() { - Ok(peer_id) => { - network.remove_reserved_peer(peer_id); - sender.send(Ok(())) + Some(sc_rpc::system::Request::NetworkState(sender)) => { + if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() { + let _ = sender.send(network_state); } - Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg( - e.to_string(), - ))), - }; - } - sc_rpc::system::Request::NodeRoles(sender) => { - use sc_rpc::system::NodeRole; + } + Some(sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender)) => { + let x = network.add_reserved_peer(peer_addr) + .map_err(sc_rpc::system::error::Error::MalformattedPeerArg); + let _ = sender.send(x); + } + Some(sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender)) => { + let _ = match peer_id.parse::() { + Ok(peer_id) => { + network.remove_reserved_peer(peer_id); + sender.send(Ok(())) + } + Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg( + e.to_string(), + ))), + }; + } + Some(sc_rpc::system::Request::NodeRoles(sender)) => { + use sc_rpc::system::NodeRole; - let node_role = match role { - Role::Authority { .. } => NodeRole::Authority, - Role::Light => NodeRole::LightClient, - Role::Full => NodeRole::Full, - Role::Sentry { .. } => NodeRole::Sentry, - }; + let node_role = match role { + Role::Authority { .. } => NodeRole::Authority, + Role::Light => NodeRole::LightClient, + Role::Full => NodeRole::Full, + Role::Sentry { .. } => NodeRole::Sentry, + }; - let _ = sender.send(vec![node_role]); + let _ = sender.send(vec![node_role]); + } } - }; - } + } - // Interval report for the external API. - status_sinks.lock().poll(cx, || { - let status = NetworkStatus { - sync_state: network.sync_state(), - best_seen_block: network.best_seen_block(), - num_sync_peers: network.num_sync_peers(), - num_connected_peers: network.num_connected_peers(), - num_active_peers: network.num_active_peers(), - average_download_per_sec: network.average_download_per_sec(), - average_upload_per_sec: network.average_upload_per_sec(), - }; - let state = network.network_state(); - (status, state) - }); - - // Main network polling. - if let Poll::Ready(Ok(())) = Pin::new(&mut network).poll(cx).map_err(|err| { - warn!(target: "service", "Error in network: {:?}", err); - }) { - return Poll::Ready(()); + // The network worker has done something. Nothing special to do, but could be + // used in the future to perform actions in response of things that happened on + // the network. + _ = (&mut network).fuse() => {} + + // At a regular interval, we send the state of the network on what is called + // the "status sinks". + ready_sink = status_sinks.next().fuse() => { + let status = NetworkStatus { + sync_state: network.sync_state(), + best_seen_block: network.best_seen_block(), + num_sync_peers: network.num_sync_peers(), + num_connected_peers: network.num_connected_peers(), + num_active_peers: network.num_active_peers(), + average_download_per_sec: network.average_download_per_sec(), + average_upload_per_sec: network.average_upload_per_sec(), + }; + let state = network.network_state(); + ready_sink.send((status, state)); + } } // Now some diagnostic for performances. @@ -513,9 +537,7 @@ fn build_network_future< "⚠️ Polling the network future took {:?}", polling_dur ); - - Poll::Pending - }) + } } #[cfg(not(target_os = "unknown"))] diff --git a/primitives/utils/src/status_sinks.rs b/primitives/utils/src/status_sinks.rs index 47bccebb960b4..de6a7b96e4cbd 100644 --- a/primitives/utils/src/status_sinks.rs +++ b/primitives/utils/src/status_sinks.rs @@ -14,19 +14,27 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use futures::{Stream, stream::futures_unordered::FuturesUnordered}; -use std::time::Duration; -use std::pin::Pin; -use std::task::{Poll, Context}; +use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use futures::{prelude::*, lock::Mutex}; use futures_timer::Delay; -use crate::mpsc::TracingUnboundedSender; +use std::{pin::Pin, task::{Poll, Context}, time::Duration}; /// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the /// period elapses, we push an element on the sender. /// /// Senders are removed only when they are closed. pub struct StatusSinks { - entries: FuturesUnordered>, + /// Should only be locked by `next`. + inner: Mutex>, + /// Sending side of `Inner::entries_rx`. + entries_tx: TracingUnboundedSender>, +} + +struct Inner { + /// The actual entries of the list. + entries: stream::FuturesUnordered>, + /// Receives new entries and puts them in `entries`. + entries_rx: TracingUnboundedReceiver>, } struct YieldAfter { @@ -38,56 +46,114 @@ struct YieldAfter { impl StatusSinks { /// Builds a new empty collection. pub fn new() -> StatusSinks { + let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries"); + StatusSinks { - entries: FuturesUnordered::new(), + inner: Mutex::new(Inner { + entries: stream::FuturesUnordered::new(), + entries_rx, + }), + entries_tx, } } /// Adds a sender to the collection. /// /// The `interval` is the time period between two pushes on the sender. - pub fn push(&mut self, interval: Duration, sender: TracingUnboundedSender) { - self.entries.push(YieldAfter { + pub fn push(&self, interval: Duration, sender: TracingUnboundedSender) { + let _ = self.entries_tx.unbounded_send(YieldAfter { delay: Delay::new(interval), interval, sender: Some(sender), - }) + }); } - /// Processes all the senders. If any sender is ready, calls the `status_grab` function and - /// pushes what it returns to the sender. + /// Waits until one of the sinks is ready, then returns an object that can be used to send + /// an element on said sink. /// - /// This function doesn't return anything, but it should be treated as if it implicitly - /// returns `Poll::Pending`. In particular, it should be called again when the task - /// is waken up. - /// - /// # Panic - /// - /// Panics if not called within the context of a task. - pub fn poll(&mut self, cx: &mut Context, mut status_grab: impl FnMut() -> T) { + /// If the object isn't used to send an element, the slot is skipped. + pub async fn next(&self) -> ReadySinkEvent<'_, T> { + // This is only ever locked by `next`, which means that one `next` at a time can run. + let mut inner = self.inner.lock().await; + let inner = &mut *inner; + loop { - match Pin::new(&mut self.entries).poll_next(cx) { - Poll::Ready(Some((sender, interval))) => { - let status = status_grab(); - if sender.unbounded_send(status).is_ok() { - self.entries.push(YieldAfter { - // Note that since there's a small delay between the moment a task is - // waken up and the moment it is polled, the period is actually not - // `interval` but `interval + `. We ignore this problem in - // practice. - delay: Delay::new(interval), - interval, - sender: Some(sender), - }); + // Future that produces the next ready entry in `entries`, or doesn't produce anything if + // the list is empty. + let next_ready_entry = { + let entries = &mut inner.entries; + async move { + if let Some(v) = entries.next().await { + v + } else { + loop { + futures::pending!() + } + } + } + }; + + futures::select!{ + new_entry = inner.entries_rx.next() => { + if let Some(new_entry) = new_entry { + inner.entries.push(new_entry); + } + }, + (sender, interval) = next_ready_entry.fuse() => { + return ReadySinkEvent { + sinks: self, + sender: Some(sender), + interval, } } - Poll::Ready(None) | - Poll::Pending => break, } } } } +/// One of the sinks is ready. +#[must_use] +pub struct ReadySinkEvent<'a, T> { + sinks: &'a StatusSinks, + sender: Option>, + interval: Duration, +} + +impl<'a, T> ReadySinkEvent<'a, T> { + /// Sends an element on the sender. + pub fn send(mut self, element: T) { + if let Some(sender) = self.sender.take() { + if sender.unbounded_send(element).is_ok() { + let _ = self.sinks.entries_tx.unbounded_send(YieldAfter { + // Note that since there's a small delay between the moment a task is + // woken up and the moment it is polled, the period is actually not + // `interval` but `interval + `. We ignore this problem in + // practice. + delay: Delay::new(self.interval), + interval: self.interval, + sender: Some(sender), + }); + } + } + } +} + +impl<'a, T> Drop for ReadySinkEvent<'a, T> { + fn drop(&mut self) { + if let Some(sender) = self.sender.take() { + if sender.is_closed() { + return; + } + + let _ = self.sinks.entries_tx.unbounded_send(YieldAfter { + delay: Delay::new(self.interval), + interval: self.interval, + sender: Some(sender), + }); + } + } +} + impl futures::Future for YieldAfter { type Output = (TracingUnboundedSender, Duration); @@ -109,26 +175,28 @@ impl futures::Future for YieldAfter { mod tests { use super::StatusSinks; use futures::prelude::*; - use crate::mpsc::tracing_unbounded; + use futures::channel::mpsc; use std::time::Duration; - use std::task::Poll; #[test] fn works() { // We're not testing that the `StatusSink` properly enforces an order in the intervals, as // this easily causes test failures on busy CPUs. - let mut status_sinks = StatusSinks::new(); + let status_sinks = StatusSinks::new(); - let (tx, rx) = tracing_unbounded("status_sink_test"); + let (tx, rx) = mpsc::unbounded(); status_sinks.push(Duration::from_millis(100), tx); let mut val_order = 5; futures::executor::block_on(futures::future::select( - futures::future::poll_fn(move |cx| { - status_sinks.poll(cx, || { val_order += 1; val_order }); - Poll::<()>::Pending + Box::pin(async move { + loop { + let ev = status_sinks.next().await; + val_order += 1; + ev.send(val_order); + } }), Box::pin(async { let items: Vec = rx.take(3).collect().await; From 7672c283c9afbf4f561c823933f250ea02248f78 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 30 Jun 2020 11:24:12 +0200 Subject: [PATCH 2/3] Address concerns --- Cargo.lock | 1 - client/informant/Cargo.toml | 3 +-- client/service/src/lib.rs | 47 ++++++++++++++++++------------------- 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a84eaf3bd03b..0b816c6a807ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6420,7 +6420,6 @@ dependencies = [ "futures 0.3.4", "log", "parity-util-mem", - "parking_lot 0.10.2", "sc-client-api", "sc-network", "sp-blockchain", diff --git a/client/informant/Cargo.toml b/client/informant/Cargo.toml index d2df78537d8d2..98c72f5deb5e0 100644 --- a/client/informant/Cargo.toml +++ b/client/informant/Cargo.toml @@ -16,11 +16,10 @@ ansi_term = "0.12.1" futures = "0.3.4" log = "0.4.8" parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] } -wasm-timer = "0.2" sc-client-api = { version = "2.0.0-rc4", path = "../api" } sc-network = { version = "0.8.0-rc4", path = "../network" } sp-blockchain = { version = "2.0.0-rc4", path = "../../primitives/blockchain" } sp-runtime = { version = "2.0.0-rc4", path = "../../primitives/runtime" } sp-utils = { version = "2.0.0-rc2", path = "../../primitives/utils" } sp-transaction-pool = { version = "2.0.0-rc2", path = "../../primitives/transaction-pool" } -parking_lot = "0.10.2" +wasm-timer = "0.2" diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 084f7312e3851..718e229f30aac 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -383,33 +383,33 @@ async fn build_network_future< mut network: sc_network::NetworkWorker, client: Arc, status_sinks: Arc, NetworkState)>>, - rpc_rx: TracingUnboundedReceiver>, + mut rpc_rx: TracingUnboundedReceiver>, should_have_peers: bool, announce_imported_blocks: bool, ) { let mut imported_blocks_stream = client.import_notification_stream().fuse(); - let mut finality_notification_stream = client.finality_notification_stream().fuse(); - // In the case where the channel containing RPC requests shuts down, we want the network - // future to continue running. To make that easier, we make sure that `rpc_rx` never finishes. - let mut rpc_rx = rpc_rx.chain(stream::pending()).fuse(); + // Stream of finalized blocks reported by the client. + let mut finality_notification_stream = { + let mut finality_notification_stream = client.finality_notification_stream().fuse(); - loop { - let before_polling = Instant::now(); - - // This future does the same as `finality_notification_stream.next()`, except that if - // multiple events are ready on the stream, only the last one is returned. - let mut last_finality_notification = futures::future::poll_fn(|cx| { + // We tweak the `Stream` in order to merge together multiple items if they happen to be + // ready. This way, we only get the latest finalized block. + stream::poll_fn(move |cx| { let mut last = None; while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) { last = Some(item); } if let Some(last) = last { - Poll::Ready(last) + Poll::Ready(Some(last)) } else { Poll::Pending } - }).fuse(); + }).fuse() + }; + + loop { + let before_polling = Instant::now(); futures::select!{ // List of blocks that the client has imported. @@ -434,25 +434,24 @@ async fn build_network_future< } // List of blocks that the client has finalized. - notification = last_finality_notification => { + notification = finality_notification_stream.select_next_some() => { network.on_block_finalized(notification.hash, notification.header); } // Answer incoming RPC requests. - request = rpc_rx.next() => { + request = rpc_rx.select_next_some() => { match request { - None => continue, - Some(sc_rpc::system::Request::Health(sender)) => { + sc_rpc::system::Request::Health(sender) => { let _ = sender.send(sc_rpc::system::Health { peers: network.peers_debug_info().len(), is_syncing: network.service().is_major_syncing(), should_have_peers, }); }, - Some(sc_rpc::system::Request::LocalPeerId(sender)) => { + sc_rpc::system::Request::LocalPeerId(sender) => { let _ = sender.send(network.local_peer_id().to_base58()); }, - Some(sc_rpc::system::Request::LocalListenAddresses(sender)) => { + sc_rpc::system::Request::LocalListenAddresses(sender) => { let peer_id = network.local_peer_id().clone().into(); let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); let addresses = network.listen_addresses() @@ -460,7 +459,7 @@ async fn build_network_future< .collect(); let _ = sender.send(addresses); }, - Some(sc_rpc::system::Request::Peers(sender)) => { + sc_rpc::system::Request::Peers(sender) => { let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| sc_rpc::system::PeerInfo { peer_id: peer_id.to_base58(), @@ -471,17 +470,17 @@ async fn build_network_future< } ).collect()); } - Some(sc_rpc::system::Request::NetworkState(sender)) => { + sc_rpc::system::Request::NetworkState(sender) => { if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() { let _ = sender.send(network_state); } } - Some(sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender)) => { + sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => { let x = network.add_reserved_peer(peer_addr) .map_err(sc_rpc::system::error::Error::MalformattedPeerArg); let _ = sender.send(x); } - Some(sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender)) => { + sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => { let _ = match peer_id.parse::() { Ok(peer_id) => { network.remove_reserved_peer(peer_id); @@ -492,7 +491,7 @@ async fn build_network_future< ))), }; } - Some(sc_rpc::system::Request::NodeRoles(sender)) => { + sc_rpc::system::Request::NodeRoles(sender) => { use sc_rpc::system::NodeRole; let node_role = match role { From 10058e55969c10dad6599b0c79ecc7b428e80c6f Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 30 Jun 2020 13:25:28 +0200 Subject: [PATCH 3/3] Fix test --- primitives/utils/src/status_sinks.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/primitives/utils/src/status_sinks.rs b/primitives/utils/src/status_sinks.rs index de6a7b96e4cbd..65a560af4eaa5 100644 --- a/primitives/utils/src/status_sinks.rs +++ b/primitives/utils/src/status_sinks.rs @@ -173,9 +173,9 @@ impl futures::Future for YieldAfter { #[cfg(test)] mod tests { + use crate::mpsc::tracing_unbounded; use super::StatusSinks; use futures::prelude::*; - use futures::channel::mpsc; use std::time::Duration; #[test] @@ -185,7 +185,7 @@ mod tests { let status_sinks = StatusSinks::new(); - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = tracing_unbounded("test"); status_sinks.push(Duration::from_millis(100), tx); let mut val_order = 5;