From f30feeccdf8d2d92998c9656d655d08a8d47f9ba Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 5 Aug 2020 11:05:57 +0200 Subject: [PATCH 1/6] Wait for all notifications protocols to be open before reporting opening --- .../protocol/generic_proto/handler/group.rs | 131 +++++++++++------- .../protocol/generic_proto/handler/legacy.rs | 21 --- .../generic_proto/handler/notif_out.rs | 16 +++ client/network/src/service/tests.rs | 2 - 4 files changed, 97 insertions(+), 73 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 2826f7a19c8c2..b0d0b41be2995 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -107,9 +107,17 @@ pub struct NotifsHandler { /// Handlers for outbound substreams, and the initial handshake message we send. out_handlers: Vec<(NotifsOutHandler, Arc>>)>, + /// Whether we are the connection dialer or listener. + endpoint: ConnectedPoint, + /// Handler for backwards-compatibility. legacy: LegacyProtoHandler, + /// In the situation where `legacy.is_open()` is true, but we haven't sent out any + /// [`NotifsHandlerOut::Open`] event yet, this contains the handshake received on the legacy + /// substream. + pending_legacy_handshake: Option>, + /// State of this handler. enabled: EnabledState, @@ -123,6 +131,9 @@ pub struct NotifsHandler { /// We use two different channels in order to have two different channel sizes, but from the /// receiving point of view, the two channels are the same. /// The receivers are fused in case the user drops the [`NotificationsSink`] entirely. + /// + /// Contains `Some` if and only if it has been reported to the user that the substreams are + /// open. notifications_sink_rx: Option< stream::Select< stream::Fuse>, @@ -159,7 +170,9 @@ impl IntoProtocolsHandler for NotifsHandlerProto { .into_iter() .map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg)) .collect(), + endpoint: connected_point.clone(), legacy: self.legacy.into_handler(remote_peer_id, connected_point), + pending_legacy_handshake: None, enabled: EnabledState::Initial, pending_in: Vec::new(), notifications_sink_rx: None, @@ -622,54 +635,48 @@ impl ProtocolsHandler for NotifsHandler { } } - if let Poll::Ready(ev) = self.legacy.poll(cx) { - return match ev { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => - Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol.map_upgrade(EitherUpgrade::B), - info: None, - }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { - endpoint, - received_handshake, - .. - }) => { - let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); - let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); - let notifications_sink = NotificationsSink { - inner: Arc::new(NotificationsSinkInner { - async_channel: FuturesMutex::new(async_tx), - sync_channel: Mutex::new(sync_tx), + // If `self.pending_legacy_handshake` is `Some`, we are in a state where the legacy + // substream is open but the user isn't aware yet of the substreams being open. + // When that is the case, the legacy substream shouldn't be polled, otherwise there is + // a risk of receiving messages from it. + if self.pending_legacy_handshake.is_none() { + while let Poll::Ready(ev) = self.legacy.poll(cx) { + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: protocol.map_upgrade(EitherUpgrade::B), + info: None, }), - }; - - debug_assert!(self.notifications_sink_rx.is_none()); - self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse())); - - Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink } - )) - }, - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => { - // We consciously drop the receivers despite notifications being potentially - // still buffered up. - debug_assert!(self.notifications_sink_rx.is_some()); - self.notifications_sink_rx = None; - - Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Closed { endpoint, reason } - )) - }, - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => - Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::CustomMessage { message } - )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) => - Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::ProtocolError { is_severe, error } - )), - ProtocolsHandlerEvent::Close(err) => - Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { + received_handshake, + .. + }) => { + self.pending_legacy_handshake = Some(received_handshake); + break; + }, + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => { + // We consciously drop the receivers despite notifications being potentially + // still buffered up. + debug_assert!(self.notifications_sink_rx.is_some()); + self.notifications_sink_rx = None; + + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Closed { endpoint: self.endpoint.clone(), reason } + )) + }, + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => { + debug_assert!(self.notifications_sink_rx.is_some()); + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::CustomMessage { message } + )) + }, + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::ProtocolError { is_severe, error } + )), + ProtocolsHandlerEvent::Close(err) => + return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))), + } } } @@ -693,9 +700,9 @@ impl ProtocolsHandler for NotifsHandler { }, ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { - // Note that right now the legacy substream has precedence over - // everything. If it is not open, then we consider that nothing is open. - if self.legacy.is_open() { + // `notifications_sink_rx.is_some()` is how we know whether the user has + // been informed of the substreams being open. + if self.notifications_sink_rx.is_some() { let msg = NotifsHandlerOut::Notification { message, protocol_name: handler.protocol_name().to_owned().into(), @@ -730,6 +737,30 @@ impl ProtocolsHandler for NotifsHandler { } } + if self.out_handlers.iter().all(|(h, _)| h.is_open() || h.is_refused()) { + if let Some(handshake) = self.pending_legacy_handshake.take() { + let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); + let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); + let notifications_sink = NotificationsSink { + inner: Arc::new(NotificationsSinkInner { + async_channel: FuturesMutex::new(async_tx), + sync_channel: Mutex::new(sync_tx), + }), + }; + + debug_assert!(self.notifications_sink_rx.is_none()); + self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse())); + + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Open { + endpoint: self.endpoint.clone(), + received_handshake: handshake, + notifications_sink + } + )) + } + } + Poll::Pending } } diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index 71d6175f06674..7d31ed323a43b 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -222,16 +222,12 @@ pub enum LegacyProtoHandlerOut { /// Handshake message that has been sent to us. /// This is normally a "Status" message, but this out of the concern of this code. received_handshake: Vec, - /// The connected endpoint. - endpoint: ConnectedPoint, }, /// Closed a custom protocol with the remote. CustomProtocolClosed { /// Reason why the substream closed, for diagnostic purposes. reason: Cow<'static, str>, - /// The connected endpoint. - endpoint: ConnectedPoint, }, /// Receives a message on a custom protocol substream. @@ -250,18 +246,6 @@ pub enum LegacyProtoHandlerOut { } impl LegacyProtoHandler { - /// Returns true if the legacy substream is currently open. - pub fn is_open(&self) -> bool { - match &self.state { - ProtocolState::Init { substreams, .. } => !substreams.is_empty(), - ProtocolState::Opening { .. } => false, - ProtocolState::Normal { substreams, .. } => !substreams.is_empty(), - ProtocolState::Disabled { .. } => false, - ProtocolState::KillAsap => false, - ProtocolState::Poisoned => false, - } - } - /// Enables the handler. fn enable(&mut self) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { @@ -285,7 +269,6 @@ impl LegacyProtoHandler { } else { let event = LegacyProtoHandlerOut::CustomProtocolOpen { version: incoming[0].0.protocol_version(), - endpoint: self.endpoint.clone(), received_handshake: mem::replace(&mut incoming[0].1, Vec::new()), }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event)); @@ -399,7 +382,6 @@ impl LegacyProtoHandler { if substreams.is_empty() { let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: "Legacy substream clogged".into(), - endpoint: self.endpoint.clone() }; self.state = ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), @@ -413,7 +395,6 @@ impl LegacyProtoHandler { if substreams.is_empty() { let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: "All substreams have been closed by the remote".into(), - endpoint: self.endpoint.clone() }; self.state = ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), @@ -426,7 +407,6 @@ impl LegacyProtoHandler { if substreams.is_empty() { let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: format!("Error on the last substream: {:?}", err).into(), - endpoint: self.endpoint.clone() }; self.state = ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), @@ -492,7 +472,6 @@ impl LegacyProtoHandler { ProtocolState::Opening { .. } => { let event = LegacyProtoHandlerOut::CustomProtocolOpen { version: substream.protocol_version(), - endpoint: self.endpoint.clone(), received_handshake, }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event)); diff --git a/client/network/src/protocol/generic_proto/handler/notif_out.rs b/client/network/src/protocol/generic_proto/handler/notif_out.rs index 14de382c1bbca..851671aef9823 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -203,6 +203,22 @@ impl NotifsOutHandler { } } + /// Returns `true` there has been an attempt to open the substream, but the remote refused + /// the substream. + /// + /// Always returns `false` if the handler is in a disabled state. + pub fn is_refused(&self) -> bool { + match &self.state { + State::Disabled => false, + State::DisabledOpening => false, + State::DisabledOpen(_) => false, + State::Opening { .. } => false, + State::Refused => true, + State::Open { .. } => false, + State::Poisoned => false, + } + } + /// Returns the name of the protocol that we negotiate. pub fn protocol_name(&self) -> &[u8] { &self.protocol_name diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 0bfe507599cd0..f0982e30d9921 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -345,9 +345,7 @@ fn lots_of_incoming_peers_works() { }); } -// TODO: this test is at the moment ignored because of https://github.com/paritytech/substrate/issues/6766 #[test] -#[ignore] fn notifications_back_pressure() { // Node 1 floods node 2 with notifications. Random sleeps are done on node 2 to simulate the // node being busy. We make sure that all notifications are received. From 0ddbcb855a187fcb91b57759a9fbb6da52f66834 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 6 Aug 2020 11:13:47 +0200 Subject: [PATCH 2/6] Update client/network/src/protocol/generic_proto/handler/notif_out.rs Co-authored-by: Max Inden --- client/network/src/protocol/generic_proto/handler/notif_out.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/notif_out.rs b/client/network/src/protocol/generic_proto/handler/notif_out.rs index 851671aef9823..4ba9d9a0b74aa 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -203,7 +203,7 @@ impl NotifsOutHandler { } } - /// Returns `true` there has been an attempt to open the substream, but the remote refused + /// Returns `true` if there has been an attempt to open the substream, but the remote refused /// the substream. /// /// Always returns `false` if the handler is in a disabled state. From 2ccf5c13f751cfdfaa335fdfa9015df700f51766 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 6 Aug 2020 11:15:54 +0200 Subject: [PATCH 3/6] Concern --- client/network/src/protocol/generic_proto/handler/group.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 04d1660678cb5..481f65121a5f7 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -697,6 +697,10 @@ impl ProtocolsHandler for NotifsHandler { ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { // `notifications_sink_rx.is_some()` is how we know whether the user has // been informed of the substreams being open. + // `notifications_sink_rx` containing `None` should only ever happen for + // the last few messages being received during a shutdown, or if the + // remote is impolite. In order to enforce API invariants, we have no other + // choice but to discard the message. if self.notifications_sink_rx.is_some() { let msg = NotifsHandlerOut::Notification { message, From ca2a00d2d793fd8635c950e8b7fca397f82b7fb0 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 20 Aug 2020 15:25:29 +0200 Subject: [PATCH 4/6] Fix attempt --- .../protocol/generic_proto/handler/group.rs | 64 +++++++++---------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 481f65121a5f7..1c0b6c1be2928 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -632,8 +632,8 @@ impl ProtocolsHandler for NotifsHandler { // If `self.pending_legacy_handshake` is `Some`, we are in a state where the legacy // substream is open but the user isn't aware yet of the substreams being open. - // When that is the case, the legacy substream shouldn't be polled, otherwise there is - // a risk of receiving messages from it. + // When that is the case, neither the legacy substream nor the incoming notifications + // substreams should be polled, otherwise there is a risk of receiving messages from them. if self.pending_legacy_handshake.is_none() { while let Poll::Ready(ev) = self.legacy.poll(cx) { match ev { @@ -673,42 +673,36 @@ impl ProtocolsHandler for NotifsHandler { return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))), } } - } - for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { - while let Poll::Ready(ev) = handler.poll(cx) { - match ev { - ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => - error!("Incoming substream handler tried to open a substream"), - ProtocolsHandlerEvent::Close(err) => void::unreachable(err), - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => - match self.enabled { - EnabledState::Initial => self.pending_in.push(handler_num), - EnabledState::Enabled => { - // We create `handshake_message` on a separate line to be sure - // that the lock is released as soon as possible. - let handshake_message = handshake_message.read().clone(); - handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) + for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { + while let Poll::Ready(ev) = handler.poll(cx) { + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => + error!("Incoming substream handler tried to open a substream"), + ProtocolsHandlerEvent::Close(err) => void::unreachable(err), + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => + match self.enabled { + EnabledState::Initial => self.pending_in.push(handler_num), + EnabledState::Enabled => { + // We create `handshake_message` on a separate line to be sure + // that the lock is released as soon as possible. + let handshake_message = handshake_message.read().clone(); + handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) + }, + EnabledState::Disabled => + handler.inject_event(NotifsInHandlerIn::Refuse), }, - EnabledState::Disabled => - handler.inject_event(NotifsInHandlerIn::Refuse), + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { + if self.notifications_sink_rx.is_some() { + let msg = NotifsHandlerOut::Notification { + message, + protocol_name: handler.protocol_name().to_owned().into(), + }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); + } }, - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { - // `notifications_sink_rx.is_some()` is how we know whether the user has - // been informed of the substreams being open. - // `notifications_sink_rx` containing `None` should only ever happen for - // the last few messages being received during a shutdown, or if the - // remote is impolite. In order to enforce API invariants, we have no other - // choice but to discard the message. - if self.notifications_sink_rx.is_some() { - let msg = NotifsHandlerOut::Notification { - message, - protocol_name: handler.protocol_name().to_owned().into(), - }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); - } - }, + } } } } From 279020c2ea20f1fe8cc642dd364ed89361772c9b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 20 Aug 2020 17:19:09 +0200 Subject: [PATCH 5/6] Another fix attempt --- client/network/src/protocol/generic_proto/handler/group.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 1c0b6c1be2928..f8f7fdb3f5152 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -647,7 +647,8 @@ impl ProtocolsHandler for NotifsHandler { .. }) => { self.pending_legacy_handshake = Some(received_handshake); - break; + cx.waker().wake(); + return Poll::Pending; }, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => { // We consciously drop the receivers despite notifications being potentially From 5104ed6175401f4681ae682c97f0ccc418332d3e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 20 Aug 2020 17:33:15 +0200 Subject: [PATCH 6/6] Update client/network/src/protocol/generic_proto/handler/group.rs Co-authored-by: Max Inden --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index f8f7fdb3f5152..bcdbaf848511f 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -647,7 +647,7 @@ impl ProtocolsHandler for NotifsHandler { .. }) => { self.pending_legacy_handshake = Some(received_handshake); - cx.waker().wake(); + cx.waker().wake_by_ref(); return Poll::Pending; }, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => {