diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index ff3748bd55cf2..d037057e50090 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -351,6 +351,22 @@ impl BlockAnnouncesHandshake { } } +/// Builds a SCALE-encoded "Status" message to send as handshake for the legacy protocol. +fn build_status_message(protocol_config: &ProtocolConfig, chain: &Arc>) -> Vec { + let info = chain.info(); + let status = message::generic::Status { + version: CURRENT_VERSION, + min_supported_version: MIN_VERSION, + genesis_hash: info.genesis_hash, + roles: protocol_config.roles.into(), + best_number: info.best_number, + best_hash: info.best_hash, + chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible + }; + + Message::::Status(status).encode() +} + /// Fallback mechanism to use to send a notification if no substream is open. #[derive(Debug, Clone, PartialEq, Eq)] enum Fallback { @@ -403,6 +419,7 @@ impl Protocol { local_peer_id, protocol_id.clone(), versions, + build_status_message(&config, &chain), peerset, queue_size_report ); @@ -547,6 +564,11 @@ impl Protocol { pub fn update_chain(&mut self) { let info = self.context_data.chain.info(); self.sync.update_chain_info(&info.best_hash, info.best_number); + self.behaviour.set_legacy_handshake_message(build_status_message(&self.config, &self.context_data.chain)); + self.behaviour.set_notif_protocol_handshake( + &self.block_announces_protocol, + BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode() + ); } /// Inform sync about an own imported block. @@ -683,7 +705,6 @@ impl Protocol { pub fn on_peer_connected(&mut self, who: PeerId) { trace!(target: "sync", "Connecting {}", who); self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() }); - self.send_status(who); } /// Called by peer when it is disconnecting @@ -1329,22 +1350,6 @@ impl Protocol { } } - /// Send Status message - fn send_status(&mut self, who: PeerId) { - let info = self.context_data.chain.info(); - let status = message::generic::Status { - version: CURRENT_VERSION, - min_supported_version: MIN_VERSION, - genesis_hash: info.genesis_hash, - roles: self.config.roles, - best_number: info.best_number, - best_hash: info.best_hash, - chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible - }; - - self.send_message(&who, None, GenericMessage::Status(status)) - } - fn on_block_announce( &mut self, who: PeerId, @@ -1498,6 +1503,7 @@ impl Protocol { }); if let Some((best_num, best_hash)) = new_best { self.sync.update_chain_info(&best_hash, best_num); + self.behaviour.set_legacy_handshake_message(build_status_message(&self.config, &self.context_data.chain)); self.behaviour.set_notif_protocol_handshake( &self.block_announces_protocol, BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode() diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index be2451c3f4a03..48b75b6321235 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -30,12 +30,13 @@ use libp2p::swarm::{ PollParameters }; use log::{debug, error, trace, warn}; +use parking_lot::RwLock; use prometheus_endpoint::HistogramVec; use rand::distributions::{Distribution as _, Uniform}; use smallvec::SmallVec; use std::task::{Context, Poll}; use std::{borrow::Cow, cmp, collections::{hash_map::Entry, VecDeque}}; -use std::{error, mem, pin::Pin, str, time::Duration}; +use std::{error, mem, pin::Pin, str, sync::Arc, time::Duration}; use wasm_timer::Instant; /// Network behaviour that handles opening substreams for custom protocols with other peers. @@ -118,7 +119,7 @@ pub struct GenericProto { /// Notification protocols. Entries are only ever added and not removed. /// Contains, for each protocol, the protocol name and the message to send as part of the /// initial handshake. - notif_protocols: Vec<(Cow<'static, [u8]>, Vec)>, + notif_protocols: Vec<(Cow<'static, [u8]>, Arc>>)>, /// Receiver for instructions about who to connect to or disconnect from. peerset: sc_peerset::Peerset, @@ -220,20 +221,6 @@ enum PeerState { } impl PeerState { - /// True if there exists any established connection to the peer. - fn is_connected(&self) -> bool { - match self { - PeerState::Disabled { .. } | - PeerState::DisabledPendingEnable { .. } | - PeerState::Enabled { .. } | - PeerState::PendingRequest { .. } | - PeerState::Requested | - PeerState::Incoming { .. } => true, - PeerState::Poisoned | - PeerState::Banned { .. } => false, - } - } - /// True if there exists an established connection to the peer /// that is open for custom protocol traffic. fn is_open(&self) -> bool { @@ -343,10 +330,12 @@ impl GenericProto { local_peer_id: PeerId, protocol: impl Into, versions: &[u8], + handshake_message: Vec, peerset: sc_peerset::Peerset, queue_size_report: Option, ) -> Self { - let legacy_protocol = RegisteredProtocol::new(protocol, versions); + let legacy_handshake_message = Arc::new(RwLock::new(handshake_message)); + let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message); GenericProto { local_peer_id, @@ -372,7 +361,7 @@ impl GenericProto { protocol_name: impl Into>, handshake_msg: impl Into> ) { - self.notif_protocols.push((protocol_name.into(), handshake_msg.into())); + self.notif_protocols.push((protocol_name.into(), Arc::new(RwLock::new(handshake_msg.into())))); } /// Modifies the handshake of the given notifications protocol. @@ -383,24 +372,17 @@ impl GenericProto { protocol_name: &[u8], handshake_message: impl Into> ) { - let handshake_message = handshake_message.into(); if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) { - protocol.1 = handshake_message.clone(); - } else { - return; + *protocol.1.write() = handshake_message.into(); } + } - // Send an event to all the peers we're connected to, updating the handshake message. - for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) { - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), - handler: NotifyHandler::All, - event: NotifsHandlerIn::UpdateHandshake { - protocol_name: Cow::Owned(protocol_name.to_owned()), - handshake_message: handshake_message.clone(), - }, - }); - } + /// Modifies the handshake of the legacy protocol. + pub fn set_legacy_handshake_message( + &mut self, + handshake_message: impl Into> + ) { + *self.legacy_protocol.handshake_message().write() = handshake_message.into(); } /// Returns the number of discovered nodes that we keep in memory. diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 625916a05e4a4..ed3e564223667 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -64,8 +64,9 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{debug, error}; +use parking_lot::RwLock; use prometheus_endpoint::HistogramVec; -use std::{borrow::Cow, error, io, str, task::{Context, Poll}}; +use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -77,10 +78,10 @@ use std::{borrow::Cow, error, io, str, task::{Context, Poll}}; pub struct NotifsHandlerProto { /// Prototypes for handlers for inbound substreams, and the message we respond with in the /// handshake. - in_handlers: Vec<(NotifsInHandlerProto, Vec)>, + in_handlers: Vec<(NotifsInHandlerProto, Arc>>)>, /// Prototypes for handlers for outbound substreams, and the initial handshake message we send. - out_handlers: Vec<(NotifsOutHandlerProto, Vec)>, + out_handlers: Vec<(NotifsOutHandlerProto, Arc>>)>, /// Prototype for handler for backwards-compatibility. legacy: LegacyProtoHandlerProto, @@ -91,10 +92,10 @@ pub struct NotifsHandlerProto { /// See the documentation at the module level for more information. pub struct NotifsHandler { /// Handlers for inbound substreams, and the message we respond with in the handshake. - in_handlers: Vec<(NotifsInHandler, Vec)>, + in_handlers: Vec<(NotifsInHandler, Arc>>)>, /// Handlers for outbound substreams, and the initial handshake message we send. - out_handlers: Vec<(NotifsOutHandler, Vec)>, + out_handlers: Vec<(NotifsOutHandler, Arc>>)>, /// Handler for backwards-compatibility. legacy: LegacyProtoHandler, @@ -161,18 +162,6 @@ pub enum NotifsHandlerIn { message: Vec, }, - /// Modifies the handshake message of a notifications protocol. - UpdateHandshake { - /// Name of the protocol for the message. - /// - /// Must match one of the registered protocols. - protocol_name: Cow<'static, [u8]>, - - /// The new handshake message to send if we open a substream or if the remote opens a - /// substream towards us. - handshake_message: Vec, - }, - /// Sends a notifications message. SendNotification { /// Name of the protocol for the message. @@ -253,7 +242,7 @@ impl NotifsHandlerProto { /// messages queue. If passed, it must have one label for the protocol name. pub fn new( legacy: RegisteredProtocol, - list: impl Into, Vec)>>, + list: impl Into, Arc>>)>>, queue_size_report: Option ) -> Self { let list = list.into(); @@ -346,12 +335,17 @@ impl ProtocolsHandler for NotifsHandler { self.enabled = EnabledState::Enabled; self.legacy.inject_event(LegacyProtoHandlerIn::Enable); for (handler, initial_message) in &mut self.out_handlers { + // We create `initial_message` on a separate line to be sure that the lock + // is released as soon as possible. + let initial_message = initial_message.read().clone(); handler.inject_event(NotifsOutHandlerIn::Enable { - initial_message: initial_message.clone(), + initial_message, }); } for num in self.pending_in.drain(..) { - let handshake_message = self.in_handlers[num].1.clone(); + // We create `handshake_message` on a separate line to be sure + // that the lock is released as soon as possible. + let handshake_message = self.in_handlers[num].1.read().clone(); self.in_handlers[num].0 .inject_event(NotifsInHandlerIn::Accept(handshake_message)); } @@ -375,18 +369,6 @@ impl ProtocolsHandler for NotifsHandler { }, NotifsHandlerIn::SendLegacy { message } => self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }), - NotifsHandlerIn::UpdateHandshake { protocol_name, handshake_message } => { - for (handler, current_handshake) in &mut self.in_handlers { - if handler.protocol_name() == &*protocol_name { - *current_handshake = handshake_message.clone(); - } - } - for (handler, current_handshake) in &mut self.out_handlers { - if handler.protocol_name() == &*protocol_name { - *current_handshake = handshake_message.clone(); - } - } - } NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => { for (handler, _) in &mut self.out_handlers { if handler.protocol_name() != &protocol_name[..] { @@ -524,8 +506,12 @@ impl ProtocolsHandler for NotifsHandler { ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => match self.enabled { EnabledState::Initial => self.pending_in.push(handler_num), - EnabledState::Enabled => - handler.inject_event(NotifsInHandlerIn::Accept(handshake_message.clone())), + EnabledState::Enabled => { + // We create `handshake_message` on a separate line to be sure + // that the lock is released as soon as possible. + let handshake_message = handshake_message.read().clone(); + handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) + }, EnabledState::Disabled => handler.inject_event(NotifsInHandlerIn::Refuse), }, diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index de02ac5f3468d..f932a3a08916f 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -83,7 +83,7 @@ fn build_nodes() -> (Swarm, Swarm) { }); let behaviour = CustomProtoWithAddr { - inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], peerset, None), + inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset, None), addrs: addrs .iter() .enumerate() @@ -241,6 +241,8 @@ fn two_nodes_transfer_lots_of_packets() { ); } }, + // An empty handshake is being sent after opening. + Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, _ => panic!(), } } @@ -251,6 +253,8 @@ fn two_nodes_transfer_lots_of_packets() { loop { match ready!(service2.poll_next_unpin(cx)) { Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, + // An empty handshake is being sent after opening. + Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, Some(GenericProtoOut::LegacyMessage { message, .. }) => { match Message::::decode(&mut &message[..]).unwrap() { Message::::BlockResponse(BlockResponse { id: _, blocks }) => { @@ -312,6 +316,8 @@ fn basic_two_nodes_requests_in_parallel() { service1.send_packet(&peer_id, msg.encode()); } }, + // An empty handshake is being sent after opening. + Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, _ => panic!(), } } @@ -321,6 +327,8 @@ fn basic_two_nodes_requests_in_parallel() { loop { match ready!(service2.poll_next_unpin(cx)) { Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, + // An empty handshake is being sent after opening. + Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, Some(GenericProtoOut::LegacyMessage { message, .. }) => { let pos = to_receive.iter().position(|m| m.encode() == message).unwrap(); to_receive.remove(pos); diff --git a/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/client/network/src/protocol/generic_proto/upgrade/legacy.rs index 13560113bb1a2..538532c1aff6b 100644 --- a/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ b/client/network/src/protocol/generic_proto/upgrade/legacy.rs @@ -21,7 +21,8 @@ use bytes::BytesMut; use futures::prelude::*; use futures_codec::Framed; use libp2p::core::{Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; -use std::{collections::VecDeque, io, pin::Pin, vec::IntoIter as VecIntoIter}; +use parking_lot::RwLock; +use std::{collections::VecDeque, io, pin::Pin, sync::Arc, vec::IntoIter as VecIntoIter}; use std::task::{Context, Poll}; use unsigned_varint::codec::UviBytes; @@ -38,12 +39,13 @@ pub struct RegisteredProtocol { /// List of protocol versions that we support. /// Ordered in descending order so that the best comes first. supported_versions: Vec, + /// Handshake to send after the substream is open. + handshake_message: Arc>>, } impl RegisteredProtocol { - /// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be - /// passed inside the `RegisteredProtocolOutput`. - pub fn new(protocol: impl Into, versions: &[u8]) + /// Creates a new `RegisteredProtocol`. + pub fn new(protocol: impl Into, versions: &[u8], handshake_message: Arc>>) -> Self { let protocol = protocol.into(); let mut base_name = b"/substrate/".to_vec(); @@ -58,8 +60,14 @@ impl RegisteredProtocol { tmp.sort_unstable_by(|a, b| b.cmp(&a)); tmp }, + handshake_message, } } + + /// Returns the `Arc` to the handshake message that was passed at initialization. + pub fn handshake_message(&self) -> &Arc>> { + &self.handshake_message + } } impl Clone for RegisteredProtocol { @@ -68,6 +76,7 @@ impl Clone for RegisteredProtocol { id: self.id.clone(), base_name: self.base_name.clone(), supported_versions: self.supported_versions.clone(), + handshake_message: self.handshake_message.clone(), } } } @@ -244,10 +253,10 @@ impl ProtocolName for RegisteredProtocolName { } impl InboundUpgrade for RegisteredProtocol -where TSubstream: AsyncRead + AsyncWrite + Unpin, +where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = RegisteredProtocolSubstream; - type Future = future::Ready>; + type Future = Pin> + Send>>; type Error = io::Error; fn upgrade_inbound( @@ -255,26 +264,31 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, socket: TSubstream, info: Self::Info, ) -> Self::Future { - let framed = { - let mut codec = UviBytes::default(); - codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets. - Framed::new(socket, codec) - }; - - future::ok(RegisteredProtocolSubstream { - is_closing: false, - endpoint: Endpoint::Listener, - send_queue: VecDeque::new(), - requires_poll_flush: false, - inner: framed.fuse(), - protocol_version: info.version, - clogged_fuse: false, + Box::pin(async move { + let mut framed = { + let mut codec = UviBytes::default(); + codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets. + Framed::new(socket, codec) + }; + + let handshake = BytesMut::from(&self.handshake_message.read()[..]); + framed.send(handshake).await?; + + Ok(RegisteredProtocolSubstream { + is_closing: false, + endpoint: Endpoint::Listener, + send_queue: VecDeque::new(), + requires_poll_flush: false, + inner: framed.fuse(), + protocol_version: info.version, + clogged_fuse: false, + }) }) } } impl OutboundUpgrade for RegisteredProtocol -where TSubstream: AsyncRead + AsyncWrite + Unpin, +where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = >::Output; type Future = >::Future; @@ -285,16 +299,25 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, socket: TSubstream, info: Self::Info, ) -> Self::Future { - let framed = Framed::new(socket, UviBytes::default()); - - future::ok(RegisteredProtocolSubstream { - is_closing: false, - endpoint: Endpoint::Dialer, - send_queue: VecDeque::new(), - requires_poll_flush: false, - inner: framed.fuse(), - protocol_version: info.version, - clogged_fuse: false, + Box::pin(async move { + let mut framed = { + let mut codec = UviBytes::default(); + codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets. + Framed::new(socket, codec) + }; + + let handshake = BytesMut::from(&self.handshake_message.read()[..]); + framed.send(handshake).await?; + + Ok(RegisteredProtocolSubstream { + is_closing: false, + endpoint: Endpoint::Dialer, + send_queue: VecDeque::new(), + requires_poll_flush: false, + inner: framed.fuse(), + protocol_version: info.version, + clogged_fuse: false, + }) }) } }