From e033a9104385bac6141535d838da5f1a45c93339 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 28 Feb 2023 12:47:03 +0200 Subject: [PATCH 1/6] Replace `futures-channel` with `async-channel` in `out_events` --- Cargo.lock | 12 ++++++++ client/network/Cargo.toml | 1 + client/network/src/service/out_events.rs | 35 ++++++++---------------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91ae9ab422c3e..095fa3303ca13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -326,6 +326,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-io" version = "1.12.0" @@ -8622,6 +8633,7 @@ version = "0.10.0-dev" dependencies = [ "array-bytes", "assert_matches", + "async-channel", "async-trait", "asynchronous-codec", "backtrace", diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index b6b21247128d7..004b4dd3daf75 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -15,6 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] array-bytes = "4.1" +async-channel = "1.8.0" async-trait = "0.1" asynchronous-codec = "0.6" backtrace = "0.3.67" diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 57a5092ae62ea..be8738ee1cd83 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -32,7 +32,7 @@ //! collection. use backtrace::Backtrace; -use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream}; +use futures::{prelude::*, ready, stream::FusedStream}; use log::error; use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; @@ -41,10 +41,7 @@ use std::{ cell::RefCell, fmt, pin::Pin, - sync::{ - atomic::{AtomicI64, Ordering}, - Arc, - }, + sync::Arc, task::{Context, Poll}, }; @@ -52,20 +49,18 @@ use std::{ /// /// The name is used in Prometheus reports, the queue size threshold is used /// to warn if there are too many unprocessed events in the channel. -pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) { - let (tx, rx) = mpsc::unbounded(); +pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) { + let (tx, rx) = async_channel::unbounded(); let metrics = Arc::new(Mutex::new(None)); - let queue_size = Arc::new(AtomicI64::new(0)); let tx = Sender { inner: tx, name, - queue_size: queue_size.clone(), queue_size_warning, warning_fired: false, creation_backtrace: Backtrace::new_unresolved(), metrics: metrics.clone(), }; - let rx = Receiver { inner: rx, name, queue_size, metrics }; + let rx = Receiver { inner: rx, name, metrics }; (tx, rx) } @@ -77,16 +72,11 @@ pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver /// implement the `Clone` trait e.g. in Order to not complicate the logic keeping the metrics in /// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**. pub struct Sender { - inner: mpsc::UnboundedSender, + inner: async_channel::Sender, /// Name to identify the channel (e.g., in Prometheus and logs). name: &'static str, - /// Number of events in the queue. Clone of [`Receiver::in_transit`]. - // To not bother with ordering and possible underflow errors of the unsigned counter - // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate. - // It can turn < 0 though. - queue_size: Arc, /// Threshold queue size to generate an error message in the logs. - queue_size_warning: i64, + queue_size_warning: usize, /// We generate the error message only once to not spam the logs. warning_fired: bool, /// Backtrace of a place where the channel was created. @@ -113,9 +103,8 @@ impl Drop for Sender { /// Receiving side of a channel. pub struct Receiver { - inner: mpsc::UnboundedReceiver, + inner: async_channel::Receiver, name: &'static str, - queue_size: Arc, /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// is assigned to an instance of [`OutChannels`]. metrics: Arc>>>>, @@ -126,7 +115,6 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { - let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed); let metrics = self.metrics.lock().clone(); match metrics.as_ref().map(|m| m.as_ref()) { Some(Some(metrics)) => metrics.event_out(&ev, self.name), @@ -191,17 +179,16 @@ impl OutChannels { /// Sends an event. pub fn send(&mut self, event: Event) { self.event_streams.retain_mut(|sender| { - let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed); - if queue_size == sender.queue_size_warning && !sender.warning_fired { + if sender.inner.len() == sender.queue_size_warning && !sender.warning_fired { sender.warning_fired = true; sender.creation_backtrace.resolve(); error!( - "The number of unprocessed events in channel `{}` reached {}.\n\ + "The number of unprocessed events in channel `{}` exceeded {}.\n\ The channel was created at:\n{:?}", sender.name, sender.queue_size_warning, sender.creation_backtrace, ); } - sender.inner.unbounded_send(event.clone()).is_ok() + sender.inner.try_send(event.clone()).is_ok() }); if let Some(metrics) = &*self.metrics { From 2373f33c3e2847fa9d745206bd2a01e800cb748e Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 28 Feb 2023 16:02:39 +0300 Subject: [PATCH 2/6] Apply suggestions from code review Co-authored-by: Koute --- client/network/src/service/out_events.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index be8738ee1cd83..eebbf9389aef4 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -179,7 +179,7 @@ impl OutChannels { /// Sends an event. pub fn send(&mut self, event: Event) { self.event_streams.retain_mut(|sender| { - if sender.inner.len() == sender.queue_size_warning && !sender.warning_fired { + if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired { sender.warning_fired = true; sender.creation_backtrace.resolve(); error!( From 78251ff97d4bca6fc253960198acee8640055c2e Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 28 Feb 2023 15:16:43 +0200 Subject: [PATCH 3/6] Also print the backtrace of `send()` call --- client/network/src/service/out_events.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index eebbf9389aef4..9f74b2adf9285 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -184,8 +184,12 @@ impl OutChannels { sender.creation_backtrace.resolve(); error!( "The number of unprocessed events in channel `{}` exceeded {}.\n\ - The channel was created at:\n{:?}", - sender.name, sender.queue_size_warning, sender.creation_backtrace, + The channel was created at:\n{:?}\n + The last event was sent from:\n{:?}", + sender.name, + sender.queue_size_warning, + sender.creation_backtrace, + Backtrace::new(), ); } sender.inner.try_send(event.clone()).is_ok() From a8ce5cd9411aed99cf2b60b27354782c02838ccf Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 28 Feb 2023 15:28:07 +0200 Subject: [PATCH 4/6] Switch from `backtrace` crate to `std::backtrace` --- client/network/src/service/out_events.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 9f74b2adf9285..99ac022c2d8b6 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -31,13 +31,13 @@ //! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the //! collection. -use backtrace::Backtrace; use futures::{prelude::*, ready, stream::FusedStream}; use log::error; use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_network_common::protocol::event::Event; use std::{ + backtrace::Backtrace, cell::RefCell, fmt, pin::Pin, @@ -57,7 +57,7 @@ pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiv name, queue_size_warning, warning_fired: false, - creation_backtrace: Backtrace::new_unresolved(), + creation_backtrace: Backtrace::force_capture(), metrics: metrics.clone(), }; let rx = Receiver { inner: rx, name, metrics }; @@ -181,15 +181,14 @@ impl OutChannels { self.event_streams.retain_mut(|sender| { if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired { sender.warning_fired = true; - sender.creation_backtrace.resolve(); error!( "The number of unprocessed events in channel `{}` exceeded {}.\n\ - The channel was created at:\n{:?}\n - The last event was sent from:\n{:?}", + The channel was created at:\n{:}\n + The last event was sent from:\n{:}", sender.name, sender.queue_size_warning, sender.creation_backtrace, - Backtrace::new(), + Backtrace::force_capture(), ); } sender.inner.try_send(event.clone()).is_ok() From 7909e9e280c7de162adcd9d5fa327c938073e75f Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 28 Feb 2023 16:06:56 +0200 Subject: [PATCH 5/6] Remove outdated `backtrace` dependency --- client/network/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 004b4dd3daf75..5a918bebd626e 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -18,7 +18,6 @@ array-bytes = "4.1" async-channel = "1.8.0" async-trait = "0.1" asynchronous-codec = "0.6" -backtrace = "0.3.67" bytes = "1" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } either = "1.5.3" From 4514647cca08b31c4966c4e3df18edf32a8b0cf6 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 28 Feb 2023 16:14:57 +0200 Subject: [PATCH 6/6] Remove `backtrace` from `Cargo.lock` --- Cargo.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 095fa3303ca13..4ca12aca97567 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8636,7 +8636,6 @@ dependencies = [ "async-channel", "async-trait", "asynchronous-codec", - "backtrace", "bytes", "either", "fnv",