From a28ff26018dcb567f1bf66cfd7a6463633db5cee Mon Sep 17 00:00:00 2001 From: timorl Date: Fri, 4 Nov 2022 10:18:55 +0100 Subject: [PATCH 1/4] One-sided connections, with broken backwards compatibility --- .../src/validator_network/incoming.rs | 9 +- .../validator_network/manager/direction.rs | 211 +++++++++ .../{manager.rs => manager/legacy.rs} | 31 +- .../src/validator_network/manager/mod.rs | 326 +++++++++++++ finality-aleph/src/validator_network/mod.rs | 3 - .../src/validator_network/outgoing.rs | 8 +- .../{ => protocols}/handshake.rs | 0 .../src/validator_network/protocols/mod.rs | 138 ++++++ .../negotiation.rs} | 25 +- .../{ => protocols/v0}/heartbeat.rs | 0 .../{protocols.rs => protocols/v0/mod.rs} | 125 +---- .../src/validator_network/protocols/v1/mod.rs | 445 ++++++++++++++++++ .../src/validator_network/service.rs | 51 +- 13 files changed, 1178 insertions(+), 194 deletions(-) create mode 100644 finality-aleph/src/validator_network/manager/direction.rs rename finality-aleph/src/validator_network/{manager.rs => manager/legacy.rs} (92%) create mode 100644 finality-aleph/src/validator_network/manager/mod.rs rename finality-aleph/src/validator_network/{ => protocols}/handshake.rs (100%) create mode 100644 finality-aleph/src/validator_network/protocols/mod.rs rename finality-aleph/src/validator_network/{protocol_negotiation.rs => protocols/negotiation.rs} (92%) rename finality-aleph/src/validator_network/{ => protocols/v0}/heartbeat.rs (100%) rename finality-aleph/src/validator_network/{protocols.rs => protocols/v0/mod.rs} (81%) create mode 100644 finality-aleph/src/validator_network/protocols/v1/mod.rs diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/validator_network/incoming.rs index 2200c41b8b..4dae3f3052 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/validator_network/incoming.rs @@ -1,14 +1,13 @@ use std::fmt::{Display, Error as FmtError, Formatter}; use aleph_primitives::AuthorityId; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use log::{debug, info}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocol_negotiation::{protocol, ProtocolNegotiationError}, - protocols::ProtocolError, + protocols::{protocol, ProtocolError, ProtocolNegotiationError}, Data, Splittable, }, }; @@ -43,7 +42,7 @@ impl From for IncomingError { async fn manage_incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), IncomingError> { debug!(target: "validator-network", "Performing incoming protocol negotiation."); @@ -62,7 +61,7 @@ async fn manage_incoming( pub async fn incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await { diff --git a/finality-aleph/src/validator_network/manager/direction.rs b/finality-aleph/src/validator_network/manager/direction.rs new file mode 100644 index 0000000000..beb8df9564 --- /dev/null +++ b/finality-aleph/src/validator_network/manager/direction.rs @@ -0,0 +1,211 @@ +use std::collections::{HashMap, HashSet}; + +use aleph_primitives::AuthorityId; + +use crate::validator_network::Data; + +pub struct DirectedPeers { + own_id: AuthorityId, + outgoing: HashMap>, + incoming: HashSet, +} + +fn bit_xor_sum_parity((a, b): (u8, u8)) -> u8 { + let mut result = 0; + for i in 0..8 { + result += ((a >>i)^(b>>i)) % 2; + } + result%2 +} + +// Whether we shold call the remote or the other way around. We xor the peer ids and based on the +// parity of the sum of bits of the result decide whether the caller should be the smaller or +// greated lexicographically. They are never equal, because cryptography. +fn should_call(own_id: &[u8], remote_id: &[u8]) -> bool { + let xor_sum_parity: u8 = own_id.iter().cloned().zip(remote_id.iter().cloned()).map(bit_xor_sum_parity).fold(0u8, |a, b| (a + b) % 2); + match xor_sum_parity == 0 { + true => own_id < remote_id, + false => own_id > remote_id, + } +} + +impl DirectedPeers { + /// Create a new set of peers directed using our own peer id. + pub fn new(own_id: AuthorityId) -> Self { + DirectedPeers { + own_id, + outgoing: HashMap::new(), + incoming: HashSet::new(), + } + } + + /// Add a peer to the list of peers we want to stay connected to, or + /// update the list of addresses if the peer was already added. + /// Returns whether we should start attampts at connecting with the peer, which is the case + /// exactly when the peer is one with which we should attempt connections AND it was added for + /// the first time. + pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + match should_call(self.own_id.as_ref(), peer_id.as_ref()) { + true => self.outgoing.insert(peer_id, addresses).is_none(), + false => { + // We discard the addresses here, as we will never want to call this peer anyway, + // so we don't need them. + self.incoming.insert(peer_id); + false + }, + } + } + + /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. + pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + self.outgoing.get(peer_id).cloned() + } + + /// Whether we should be maintaining a connection with this peer. + pub fn interested(&self, peer_id: &AuthorityId) -> bool { + self.incoming.contains(peer_id) || self.outgoing.contains_key(peer_id) + } + + /// Iterator over the peers we want connections from. + pub fn incoming_peers(&self) -> impl Iterator { + self.incoming.iter() + } + + /// Iterator over the peers we want to connect to. + pub fn outgoing_peers(&self) -> impl Iterator { + self.outgoing.keys() + } + + /// Remove a peer from the list of peers that we want to stay connected with, whether the + /// connection was supposed to be incoming or outgoing. + pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + self.incoming.remove(peer_id); + self.outgoing.remove(peer_id); + } +} + +#[cfg(test)] +mod tests { + use aleph_primitives::AuthorityId; + + use super::DirectedPeers; + use crate::validator_network::mock::keys; + + type Address = String; + + async fn container_with_id() -> (DirectedPeers
, AuthorityId) { + let (own_id, _) = keys().await; + let own_container = DirectedPeers::new(own_id.clone()); + (own_container, own_id) + } + + #[tokio::test] + async fn exactly_one_direction_attempts_connections() { + let (mut own_container, own_id) = container_with_id().await; + let (mut remote_container, remote_id) = container_with_id().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + assert!(own_container.add_peer(remote_id, addresses.clone()) != remote_container.add_peer(own_id, addresses.clone())); + } + + async fn container_with_added_connecting_peer() -> (DirectedPeers
, AuthorityId) { + let (mut own_container, own_id) = container_with_id().await; + let (mut remote_container, remote_id) = container_with_id().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + match own_container.add_peer(remote_id.clone(), addresses.clone()) { + true => (own_container, remote_id), + false => { + remote_container.add_peer(own_id.clone(), addresses); + (remote_container, own_id) + }, + } + } + + async fn container_with_added_nonconnecting_peer() -> (DirectedPeers
, AuthorityId) { + let (mut own_container, own_id) = container_with_id().await; + let (mut remote_container, remote_id) = container_with_id().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + match own_container.add_peer(remote_id.clone(), addresses.clone()) { + false => (own_container, remote_id), + true => { + remote_container.add_peer(own_id.clone(), addresses); + (remote_container, own_id) + }, + } + } + + #[tokio::test] + async fn no_connecting_on_readd() { + let (mut own_container, remote_id) = container_with_added_connecting_peer().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + assert!(!own_container.add_peer(remote_id, addresses)); + } + + #[tokio::test] + async fn peer_addresses_when_connecting() { + let (own_container, remote_id) = container_with_added_connecting_peer().await; + assert!(own_container.peer_addresses(&remote_id).is_some()); + } + + #[tokio::test] + async fn no_peer_addresses_when_nonconnecting() { + let (own_container, remote_id) = container_with_added_nonconnecting_peer().await; + assert!(own_container.peer_addresses(&remote_id).is_none()); + } + + #[tokio::test] + async fn interested_in_connecting() { + let (own_container, remote_id) = container_with_added_connecting_peer().await; + assert!(own_container.interested(&remote_id)); + } + + #[tokio::test] + async fn interested_in_nonconnecting() { + let (own_container, remote_id) = container_with_added_nonconnecting_peer().await; + assert!(own_container.interested(&remote_id)); + } + + #[tokio::test] + async fn uninterested_in_unknown() { + let (own_container, _) = container_with_id().await; + let (_, remote_id) = container_with_id().await; + assert!(!own_container.interested(&remote_id)); + } + + #[tokio::test] + async fn connecting_are_outgoing() { + let (own_container, remote_id) = container_with_added_connecting_peer().await; + assert_eq!(own_container.outgoing_peers().collect::>(), vec![&remote_id]); + assert_eq!(own_container.incoming_peers().next(), None); + } + + #[tokio::test] + async fn nonconnecting_are_incoming() { + let (own_container, remote_id) = container_with_added_nonconnecting_peer().await; + assert_eq!(own_container.incoming_peers().collect::>(), vec![&remote_id]); + assert_eq!(own_container.outgoing_peers().next(), None); + } + + #[tokio::test] + async fn uninterested_in_removed() { + let (mut own_container, remote_id) = container_with_added_connecting_peer().await; + assert!(own_container.interested(&remote_id)); + own_container.remove_peer(&remote_id); + assert!(!own_container.interested(&remote_id)); + } +} diff --git a/finality-aleph/src/validator_network/manager.rs b/finality-aleph/src/validator_network/manager/legacy.rs similarity index 92% rename from finality-aleph/src/validator_network/manager.rs rename to finality-aleph/src/validator_network/manager/legacy.rs index c765c33a78..d889f57d15 100644 --- a/finality-aleph/src/validator_network/manager.rs +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -6,7 +6,7 @@ use std::{ use aleph_primitives::AuthorityId; use futures::channel::{mpsc, oneshot}; -use crate::{network::PeerId, validator_network::Data}; +use crate::{network::PeerId, validator_network::{Data, manager::{AddResult, SendError}}}; /// Network component responsible for holding the list of peers that we /// want to connect to, and managing the established connections. @@ -16,24 +16,6 @@ pub struct Manager { incoming: HashMap>, } -/// Error during sending data through the Manager -#[derive(Debug, PartialEq, Eq)] -pub enum SendError { - /// Outgoing network connection closed - ConnectionClosed, - /// Peer not added to the manager - PeerNotFound, -} - -impl Display for SendError { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - use SendError::*; - match self { - ConnectionClosed => write!(f, "worker dead"), - PeerNotFound => write!(f, "peer not found"), - } - } -} struct ManagerStatus { wanted_peers: usize, @@ -149,17 +131,6 @@ impl Display for ManagerStatus { } } -/// Possible results of adding connections. -#[derive(Debug, PartialEq, Eq)] -pub enum AddResult { - /// We do not want to maintain a connection with this peer. - Uninterested, - /// Connection added. - Added, - /// Old connection replaced with new one. - Replaced, -} - impl Manager { /// Create a new Manager with empty list of peers. pub fn new() -> Self { diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs new file mode 100644 index 0000000000..0a331fbe6e --- /dev/null +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -0,0 +1,326 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::{Display, Error as FmtError, Formatter}, +}; + +use aleph_primitives::AuthorityId; +use futures::channel::mpsc; + +use crate::{network::PeerId, validator_network::Data}; + +mod direction; +mod legacy; + +pub use legacy::Manager as LegacyManager; +use direction::DirectedPeers; + +/// Network component responsible for holding the list of peers that we +/// want to connect to or let them connect to us, and managing the established +/// connections. +pub struct Manager { + wanted: DirectedPeers, + have: HashMap>, +} + +/// Error during sending data through the Manager +#[derive(Debug, PartialEq, Eq)] +pub enum SendError { + /// Outgoing network connection closed + ConnectionClosed, + /// Peer not added to the manager + PeerNotFound, +} + +impl Display for SendError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use SendError::*; + match self { + ConnectionClosed => write!(f, "worker dead"), + PeerNotFound => write!(f, "peer not found"), + } + } +} + +struct ManagerStatus { + outgoing_peers: HashSet, + missing_outgoing: HashSet, + incoming_peers: HashSet, + missing_incoming: HashSet, +} + +impl ManagerStatus { + fn new(manager: &Manager) -> Self { + let mut incoming_peers = HashSet::new(); + let mut missing_incoming = HashSet::new(); + let mut outgoing_peers = HashSet::new(); + let mut missing_outgoing = HashSet::new(); + + for peer in manager.wanted.incoming_peers() { + match manager.active_connection(peer) { + true => incoming_peers.insert(peer.clone()), + false => missing_incoming.insert(peer.clone()), + }; + } + for peer in manager.wanted.outgoing_peers() { + match manager.active_connection(peer) { + true => outgoing_peers.insert(peer.clone()), + false => missing_outgoing.insert(peer.clone()), + }; + } + ManagerStatus { + incoming_peers, + missing_incoming, + outgoing_peers, + missing_outgoing, + } + } + + fn wanted_incoming(&self) -> usize { + self.incoming_peers.len() + self.missing_incoming.len() + } + + fn wanted_outgoing(&self) -> usize { + self.outgoing_peers.len() + self.missing_outgoing.len() + } +} + +fn pretty_authority_id_set(set: &HashSet) -> String { + set + .iter() + .map(|authority_id| authority_id.to_short_string()) + .collect::>() + .join(", ") +} + +impl Display for ManagerStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + let wanted_incoming = self.wanted_incoming(); + let wanted_outgoing = self.wanted_outgoing(); + if wanted_incoming + wanted_outgoing == 0 { + return write!(f, "not maintaining any connections; "); + } + + match wanted_incoming { + 0 => write!(f, "not expecting any incoming connections; ")?, + _ => { + write!(f, "expecting {:?} incoming connections; ", wanted_incoming)?; + match self.incoming_peers.is_empty() { + true => write!(f, "WARNING! No incoming peers even though we expected tham, maybe connecting to us is impossible; ")?, + false => write!( + f, + "have - {:?} [{}]; ", + self.incoming_peers.len(), + pretty_authority_id_set(&self.incoming_peers), + )?, + } + if !self.missing_incoming.is_empty() { + write!( + f, + "missing - {:?} [{}]; ", + self.missing_incoming.len(), + pretty_authority_id_set(&self.missing_incoming), + )?; + } + }, + } + + match wanted_outgoing { + 0 => write!(f, "not attempting any outgoing connections; ")?, + _ => { + write!(f, "attempting {:?} outgoing connections; ", wanted_outgoing)?; + if !self.outgoing_peers.is_empty() { + write!( + f, + "have - {:?} [{}]; ", + self.incoming_peers.len(), + pretty_authority_id_set(&self.outgoing_peers), + )?; + } + if !self.missing_outgoing.is_empty() { + write!( + f, + "missing - {:?} [{}]; ", + self.missing_incoming.len(), + pretty_authority_id_set(&self.missing_outgoing), + )?; + } + }, + } + + Ok(()) + } +} + +/// Possible results of adding connections. +#[derive(Debug, PartialEq, Eq)] +pub enum AddResult { + /// We do not want to maintain a connection with this peer. + Uninterested, + /// Connection added. + Added, + /// Old connection replaced with new one. + Replaced, +} + +impl Manager { + /// Create a new Manager with empty list of peers. + pub fn new(own_id: AuthorityId) -> Self { + Manager { + wanted: DirectedPeers::new(own_id), + have: HashMap::new(), + } + } + + fn active_connection(&self, peer_id: &AuthorityId) -> bool { + self.have.get(peer_id).map(|sender| !sender.is_closed()).unwrap_or(false) + } + + /// Add a peer to the list of peers we want to stay connected to, or + /// update the list of addresses if the peer was already added. + /// Returns whether we should start attempts at connecting with the peer. + pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + self.wanted.add_peer(peer_id, addresses) + } + + /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. + pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + self.wanted.peer_addresses(peer_id) + } + + /// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to. + pub fn add_connection( + &mut self, + peer_id: AuthorityId, + data_for_network: mpsc::UnboundedSender, + ) -> AddResult { + use AddResult::*; + if !self.wanted.interested(&peer_id) { + return Uninterested; + } + match self.have.insert(peer_id, data_for_network) { + Some(_) => Replaced, + None => Added, + } + } + + /// Remove a peer from the list of peers that we want to stay connected with. + /// Close any incoming and outgoing connections that were established. + pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + self.wanted.remove_peer(peer_id); + self.have.remove(peer_id); + } + + /// Send data to a peer. + /// Returns error if there is no outgoing connection to the peer, + /// or if the connection is dead. + pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> { + self.have + .get(peer_id) + .ok_or(SendError::PeerNotFound)? + .unbounded_send(data) + .map_err(|_| SendError::ConnectionClosed) + } + + /// A status of the manager, to be displayed somewhere. + pub fn status_report(&self) -> impl Display { + ManagerStatus::new(self) + } +} + +#[cfg(test)] +mod tests { + use futures::{ + channel::mpsc, + StreamExt, + }; + + use super::{AddResult::*, Manager, SendError}; + use crate::validator_network::mock::keys; + + type Data = String; + type Address = String; + + #[tokio::test] + async fn add_remove() { + let (own_id, _) = keys().await; + let mut manager = Manager::::new(own_id); + let (peer_id, _) = keys().await; + let (peer_id_b, _) = keys().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + // add new peer - might return either true or false, depending on the ids + let attempting_connections = manager.add_peer(peer_id.clone(), addresses.clone()); + // add known peer - always returns false + assert!(!manager.add_peer(peer_id.clone(), addresses.clone())); + // get address + match attempting_connections { + true => assert_eq!(manager.peer_addresses(&peer_id), Some(addresses)), + false => assert_eq!(manager.peer_addresses(&peer_id), None), + } + // try to get address of an unknown peer + assert_eq!(manager.peer_addresses(&peer_id_b), None); + // remove peer + manager.remove_peer(&peer_id); + // try to get address of removed peer + assert_eq!(manager.peer_addresses(&peer_id), None); + // remove again + manager.remove_peer(&peer_id); + // remove unknown peer + manager.remove_peer(&peer_id_b); + } + + #[tokio::test] + async fn send_receive() { + let (mut connecting_id, _) = keys().await; + let mut connecting_manager = Manager::::new(connecting_id.clone()); + let (mut listening_id, _) = keys().await; + let mut listening_manager = Manager::::new(listening_id.clone()); + let data = String::from("DATA"); + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + let (tx, _rx) = mpsc::unbounded(); + // try add unknown peer + assert_eq!(connecting_manager.add_connection(listening_id.clone(), tx), Uninterested); + // sending should fail + assert_eq!( + connecting_manager.send_to(&listening_id, data.clone()), + Err(SendError::PeerNotFound) + ); + // add peer, this time for real + if connecting_manager.add_peer(listening_id.clone(), addresses.clone()) { + assert!(!listening_manager.add_peer(connecting_id.clone(), addresses.clone())) + } else { + // We need to switch the names around, because the connection was randomly the + // other way around. + let temp_id = connecting_id; + connecting_id = listening_id; + listening_id = temp_id; + let temp_manager = connecting_manager; + connecting_manager = listening_manager; + listening_manager = temp_manager; + assert!(connecting_manager.add_peer(listening_id.clone(), addresses.clone())); + } + // add outgoing to connecting + let (tx, mut rx) = mpsc::unbounded(); + assert_eq!(connecting_manager.add_connection(listening_id.clone(), tx), Added); + // send and receive connecting + assert!(connecting_manager.send_to(&listening_id, data.clone()).is_ok()); + assert_eq!(data, rx.next().await.expect("should receive")); + // add incoming to listening + let (tx, mut rx) = mpsc::unbounded(); + assert_eq!(listening_manager.add_connection(connecting_id.clone(), tx), Added); + // send and receive listening + assert!(listening_manager.send_to(&connecting_id, data.clone()).is_ok()); + assert_eq!(data, rx.next().await.expect("should receive")); + // remove peer + listening_manager.remove_peer(&connecting_id); + // receiving should fail + assert!(rx.next().await.is_none()); + } +} diff --git a/finality-aleph/src/validator_network/mod.rs b/finality-aleph/src/validator_network/mod.rs index a797b7e44a..d4f14aac4c 100644 --- a/finality-aleph/src/validator_network/mod.rs +++ b/finality-aleph/src/validator_network/mod.rs @@ -5,15 +5,12 @@ use codec::Codec; use sp_core::crypto::KeyTypeId; use tokio::io::{AsyncRead, AsyncWrite}; -mod handshake; -mod heartbeat; mod incoming; mod io; mod manager; #[cfg(test)] pub mod mock; mod outgoing; -mod protocol_negotiation; mod protocols; mod service; diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index 36115181ab..fd43c0f219 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -8,8 +8,7 @@ use tokio::time::{sleep, Duration}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocol_negotiation::{protocol, ProtocolNegotiationError}, - protocols::ProtocolError, + protocols::{protocol, ProtocolError, ProtocolNegotiationError}, Data, Dialer, }, }; @@ -49,6 +48,7 @@ async fn manage_outgoing>( mut dialer: ND, addresses: Vec, result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + data_for_user: mpsc::UnboundedSender, ) -> Result<(), OutgoingError> { debug!(target: "validator-network", "Trying to connect to {}.", peer_id); let stream = dialer @@ -59,7 +59,7 @@ async fn manage_outgoing>( let (stream, protocol) = protocol(stream).await?; debug!(target: "validator-network", "Negotiated protocol, running."); Ok(protocol - .manage_outgoing(stream, authority_pen, peer_id, result_for_parent) + .manage_outgoing(stream, authority_pen, peer_id, result_for_parent, data_for_user) .await?) } @@ -74,6 +74,7 @@ pub async fn outgoing>( dialer: ND, addresses: Vec, result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_outgoing( authority_pen, @@ -81,6 +82,7 @@ pub async fn outgoing>( dialer, addresses, result_for_parent.clone(), + data_for_user, ) .await { diff --git a/finality-aleph/src/validator_network/handshake.rs b/finality-aleph/src/validator_network/protocols/handshake.rs similarity index 100% rename from finality-aleph/src/validator_network/handshake.rs rename to finality-aleph/src/validator_network/protocols/handshake.rs diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs new file mode 100644 index 0000000000..00ed8112a3 --- /dev/null +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -0,0 +1,138 @@ +use std::fmt::{Display, Error as FmtError, Formatter}; + +use aleph_primitives::AuthorityId; +use futures::channel::mpsc; + +use crate::{ + crypto::AuthorityPen, + validator_network::{ + io::{ReceiveError, SendError}, + Data, Splittable, + }, +}; + + +mod handshake; +mod negotiation; +mod v0; +mod v1; + +use handshake::HandshakeError; + +pub use negotiation::{protocol, ProtocolNegotiationError}; + +pub type Version = u32; + +/// Defines the protocol for communication. +#[derive(Debug, PartialEq, Eq)] +pub enum Protocol { + /// The first version of the protocol, with unidirectional connections. + V0, + /// The current version of the protocol, with pseudorandom connection direction and + /// multiplexing. + V1, +} + +/// Protocol error. +#[derive(Debug)] +pub enum ProtocolError { + /// Error during performing a handshake. + HandshakeError(HandshakeError), + /// Sending failed. + SendError(SendError), + /// Receiving failed. + ReceiveError(ReceiveError), + /// Heartbeat stopped. + CardiacArrest, + /// Channel to the parent service closed. + NoParentConnection, + /// Data channel closed. + NoUserConnection, +} + +impl Display for ProtocolError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use ProtocolError::*; + match self { + HandshakeError(e) => write!(f, "handshake error: {}", e), + SendError(e) => write!(f, "send error: {}", e), + ReceiveError(e) => write!(f, "receive error: {}", e), + CardiacArrest => write!(f, "heartbeat stopped"), + NoParentConnection => write!(f, "cannot send result to service"), + NoUserConnection => write!(f, "cannot send data to user"), + } + } +} + +impl From for ProtocolError { + fn from(e: HandshakeError) -> Self { + ProtocolError::HandshakeError(e) + } +} + +impl From for ProtocolError { + fn from(e: SendError) -> Self { + ProtocolError::SendError(e) + } +} + +impl From for ProtocolError { + fn from(e: ReceiveError) -> Self { + ProtocolError::ReceiveError(e) + } +} + +impl Protocol { + /// Launches the proper variant of the protocol (receiver half). + pub async fn manage_incoming( + &self, + stream: S, + authority_pen: AuthorityPen, + result_for_service: mpsc::UnboundedSender<(AuthorityId, Option>)>, + data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> { + use Protocol::*; + match self { + V0 => v0::incoming(stream, authority_pen, result_for_service, data_for_user).await, + V1 => v1::incoming(stream, authority_pen, result_for_service, data_for_user).await, + } + } + + /// Launches the proper variant of the protocol (sender half). + pub async fn manage_outgoing( + &self, + stream: S, + authority_pen: AuthorityPen, + peer_id: AuthorityId, + result_for_service: mpsc::UnboundedSender<(AuthorityId, Option>)>, + data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> { + use Protocol::*; + match self { + V0 => v0::outgoing(stream, authority_pen, peer_id, result_for_service).await, + V1 => v1::outgoing(stream, authority_pen, peer_id, result_for_service, data_for_user).await, + } + } + + /// Minimal supported protocol version. + pub fn min_version() -> Version { + 0 + } + + /// Maximal supported protocol version. + pub fn max_version() -> Version { + 1 + } +} + +impl TryFrom for Protocol { + type Error = Version; + + fn try_from(version: Version) -> Result { + match version { + 0 => Ok(Protocol::V0), + 1 => Ok(Protocol::V1), + unknown_version => Err(unknown_version), + } + } +} diff --git a/finality-aleph/src/validator_network/protocol_negotiation.rs b/finality-aleph/src/validator_network/protocols/negotiation.rs similarity index 92% rename from finality-aleph/src/validator_network/protocol_negotiation.rs rename to finality-aleph/src/validator_network/protocols/negotiation.rs index 6413dfc63b..13e5b7d9c9 100644 --- a/finality-aleph/src/validator_network/protocol_negotiation.rs +++ b/finality-aleph/src/validator_network/protocols/negotiation.rs @@ -8,17 +8,13 @@ use tokio::{ time::{timeout, Duration}, }; -use crate::validator_network::protocols::Protocol; +use crate::validator_network::protocols::{Protocol, Version}; -pub type ProtocolVersion = u32; - -const MIN_SUPPORTED_PROTOCOL: ProtocolVersion = 0; -const MAX_SUPPORTED_PROTOCOL: ProtocolVersion = 0; const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(5); /// A range of supported protocols, will fail to decode if the range is empty. #[derive(Clone, Debug, PartialEq, Eq)] -pub struct ProtocolsRange(ProtocolVersion, ProtocolVersion); +pub struct ProtocolsRange(Version, Version); impl Display for ProtocolsRange { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { @@ -26,8 +22,8 @@ impl Display for ProtocolsRange { } } -const fn supported_protocol_range() -> ProtocolsRange { - ProtocolsRange(MIN_SUPPORTED_PROTOCOL, MAX_SUPPORTED_PROTOCOL) +fn supported_protocol_range() -> ProtocolsRange { + ProtocolsRange(Protocol::min_version(), Protocol::max_version()) } /// What went wrong when negotiating a protocol. @@ -36,7 +32,7 @@ pub enum ProtocolNegotiationError { ConnectionClosed, InvalidRange(ProtocolsRange), ProtocolMismatch(ProtocolsRange, ProtocolsRange), - BadChoice(ProtocolVersion), + BadChoice(Version), TimedOut, } @@ -74,10 +70,10 @@ impl ProtocolsRange { fn decode(encoded: &[u8; 8]) -> Result { let result = ProtocolsRange( - ProtocolVersion::from_le_bytes( + Version::from_le_bytes( encoded[0..4].try_into().expect("this is literally 4 bytes"), ), - ProtocolVersion::from_le_bytes( + Version::from_le_bytes( encoded[4..8].try_into().expect("this is literally 4 bytes"), ), ); @@ -103,10 +99,7 @@ fn maximum_of_intersection( range1: ProtocolsRange, range2: ProtocolsRange, ) -> Result { - intersection(range1, range2).map(|intersection| match intersection.1 { - 0 => Ok(Protocol::V0), - unknown_version => Err(ProtocolNegotiationError::BadChoice(unknown_version)), - })? + intersection(range1, range2).map(|intersection| intersection.1.try_into().map_err(ProtocolNegotiationError::BadChoice))? } async fn negotiate_protocol_version( @@ -151,7 +144,7 @@ mod tests { fn correct_negotiation(result: Result<(S, Protocol), ProtocolNegotiationError>) { match result { - Ok((_stream, protocol)) => assert_eq!(Protocol::V0, protocol), + Ok((_stream, protocol)) => assert_eq!(Protocol::V1, protocol), Err(e) => panic!("Unexpected error: {:?}", e), } } diff --git a/finality-aleph/src/validator_network/heartbeat.rs b/finality-aleph/src/validator_network/protocols/v0/heartbeat.rs similarity index 100% rename from finality-aleph/src/validator_network/heartbeat.rs rename to finality-aleph/src/validator_network/protocols/v0/heartbeat.rs diff --git a/finality-aleph/src/validator_network/protocols.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs similarity index 81% rename from finality-aleph/src/validator_network/protocols.rs rename to finality-aleph/src/validator_network/protocols/v0/mod.rs index 54581c20bb..510d9bd230 100644 --- a/finality-aleph/src/validator_network/protocols.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -1,8 +1,6 @@ -use std::fmt::{Display, Error as FmtError, Formatter}; - use aleph_primitives::AuthorityId; use futures::{ - channel::{mpsc, oneshot}, + channel::mpsc, StreamExt, }; use log::{debug, info, trace}; @@ -11,68 +9,18 @@ use tokio::io::{AsyncRead, AsyncWrite}; use crate::{ crypto::AuthorityPen, validator_network::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing, HandshakeError}, - heartbeat::{heartbeat_receiver, heartbeat_sender}, - io::{receive_data, send_data, ReceiveError, SendError}, + protocols::{ + ProtocolError, + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + }, + io::{receive_data, send_data}, Data, Splittable, }, }; -/// Defines the protocol for communication. -#[derive(Debug, PartialEq, Eq)] -pub enum Protocol { - /// The current version of the protocol. - V0, -} - -/// Protocol error. -#[derive(Debug)] -pub enum ProtocolError { - /// Error during performing a handshake. - HandshakeError(HandshakeError), - /// Sending failed. - SendError(SendError), - /// Receiving failed. - ReceiveError(ReceiveError), - /// Heartbeat stopped. - CardiacArrest, - /// Channel to the parent service closed. - NoParentConnection, - /// Data channel closed. - NoUserConnection, -} - -impl Display for ProtocolError { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - use ProtocolError::*; - match self { - HandshakeError(e) => write!(f, "handshake error: {}", e), - SendError(e) => write!(f, "send error: {}", e), - ReceiveError(e) => write!(f, "receive error: {}", e), - CardiacArrest => write!(f, "heartbeat stopped"), - NoParentConnection => write!(f, "cannot send result to service"), - NoUserConnection => write!(f, "cannot send data to user"), - } - } -} - -impl From for ProtocolError { - fn from(e: HandshakeError) -> Self { - ProtocolError::HandshakeError(e) - } -} - -impl From for ProtocolError { - fn from(e: SendError) -> Self { - ProtocolError::SendError(e) - } -} +mod heartbeat; -impl From for ProtocolError { - fn from(e: ReceiveError) -> Self { - ProtocolError::ReceiveError(e) - } -} +use heartbeat::{heartbeat_receiver, heartbeat_sender}; /// Receives data from the parent service and sends it over the network. /// Exits when the parent channel is closed, or if the network connection is broken. @@ -91,7 +39,7 @@ async fn sending( /// Performs the handshake, and then keeps sending data received from the parent service. /// Exits on parent request, or in case of broken or dead network connection. -async fn v0_outgoing( +pub async fn outgoing( stream: S, authority_pen: AuthorityPen, peer_id: AuthorityId, @@ -100,7 +48,7 @@ async fn v0_outgoing( trace!(target: "validator-network", "Extending hand to {}.", peer_id); let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); - let (data_for_network, data_from_user) = mpsc::unbounded::(); + let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent .unbounded_send((peer_id.clone(), Some(data_for_network))) .map_err(|_| ProtocolError::NoParentConnection)?; @@ -134,19 +82,19 @@ async fn receiving( /// Performs the handshake, and then keeps sending data received from the network to the parent service. /// Exits on parent request, or in case of broken or dead network connection. -async fn v0_incoming( +pub async fn incoming( stream: S, authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); - let (tx_exit, exit) = oneshot::channel(); + let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), tx_exit)) + .unbounded_send((peer_id.clone(), Some(tx_exit))) .map_err(|_| ProtocolError::NoParentConnection)?; let receiving = receiving(receiver, data_for_user); @@ -157,37 +105,7 @@ async fn v0_incoming( tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), result = receiving => return result, - _ = exit => return Ok(()), - } - } -} - -impl Protocol { - /// Launches the proper variant of the protocol (receiver half). - pub async fn manage_incoming( - &self, - stream: S, - authority_pen: AuthorityPen, - result_for_service: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, - data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { - use Protocol::*; - match self { - V0 => v0_incoming(stream, authority_pen, result_for_service, data_for_user).await, - } - } - - /// Launches the proper variant of the protocol (sender half). - pub async fn manage_outgoing( - &self, - stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_service: mpsc::UnboundedSender<(AuthorityId, Option>)>, - ) -> Result<(), ProtocolError> { - use Protocol::*; - match self { - V0 => v0_outgoing(stream, authority_pen, peer_id, result_for_service).await, + _ = exit.next() => return Ok(()), } } } @@ -196,11 +114,11 @@ impl Protocol { mod tests { use aleph_primitives::AuthorityId; use futures::{ - channel::{mpsc, mpsc::UnboundedReceiver, oneshot}, + channel::{mpsc, mpsc::UnboundedReceiver}, pin_mut, FutureExt, StreamExt, }; - use super::{Protocol, ProtocolError}; + use super::{ProtocolError, incoming, outgoing}; use crate::{ crypto::AuthorityPen, validator_network::{ @@ -217,24 +135,23 @@ mod tests { impl futures::Future>, impl futures::Future>, UnboundedReceiver, - UnboundedReceiver<(AuthorityId, oneshot::Sender<()>)>, + UnboundedReceiver<(AuthorityId, Option>)>, UnboundedReceiver<(AuthorityId, Option>)>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; let (id_outgoing, pen_outgoing) = key().await; assert_ne!(id_incoming, id_outgoing); - let (incoming_result_for_service, result_from_incoming) = - mpsc::unbounded::<(AuthorityId, oneshot::Sender<()>)>(); + let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (data_for_user, data_from_incoming) = mpsc::unbounded::(); - let incoming_handle = Protocol::V0.manage_incoming( + let incoming_handle = incoming( stream_incoming, pen_incoming.clone(), incoming_result_for_service, data_for_user, ); - let outgoing_handle = Protocol::V0.manage_outgoing( + let outgoing_handle = outgoing( stream_outgoing, pen_outgoing.clone(), id_incoming.clone(), diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/validator_network/protocols/v1/mod.rs new file mode 100644 index 0000000000..82da65ea60 --- /dev/null +++ b/finality-aleph/src/validator_network/protocols/v1/mod.rs @@ -0,0 +1,445 @@ +use aleph_primitives::AuthorityId; +use codec::{Decode, Encode}; +use futures::{ + channel::mpsc, + StreamExt, +}; +use log::{debug, info, trace}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + time::{timeout, Duration}, +}; + +use crate::{ + crypto::AuthorityPen, + validator_network::{ + protocols::{ + ProtocolError, + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + }, + io::{receive_data, send_data}, + Data, Splittable, + }, +}; + +const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); +const MAX_MISSED_HEARTBEATS: u32 = 4; + +#[derive(Debug, Clone, Encode, Decode)] +enum Message { + Data(D), + Heartbeat, +} + +async fn sending( + mut sender: S, + mut data_from_user: mpsc::UnboundedReceiver, +) -> Result<(), ProtocolError> { + use Message::*; + loop { + let to_send = match timeout( + HEARTBEAT_TIMEOUT, + data_from_user.next(), + ).await { + Ok(maybe_data) => match maybe_data { + Some(data) => Data(data), + // We have been closed by the parent service, all good. + None => return Ok(()), + }, + _ => Heartbeat, + }; + sender = send_data(sender, to_send).await?; + } +} + +async fn receiving( + mut stream: S, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + use Message::*; + loop { + let (old_stream, message) = timeout( + MAX_MISSED_HEARTBEATS * HEARTBEAT_TIMEOUT, + receive_data(stream), + ).await.map_err(|_| ProtocolError::CardiacArrest)??; + stream = old_stream; + match message { + Data(data) => data_for_user + .unbounded_send(data) + .map_err(|_| ProtocolError::NoUserConnection)?, + Heartbeat => (), + } + } +} + +async fn manage_connection( + sender: S, + receiver: R, + data_from_user: mpsc::UnboundedReceiver, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + let sending = sending(sender, data_from_user); + let receiving = receiving(receiver, data_for_user); + tokio::select! { + result = receiving => result, + result = sending => result, + } +} + +//TODO: maybe extract a tad more into manage_connection +/// Performs the outgoing handshake, and then manages a connection sending and receiving data. +/// Exits on parent request, or in case of broken or dead network connection. +pub async fn outgoing( + stream: S, + authority_pen: AuthorityPen, + peer_id: AuthorityId, + result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + trace!(target: "validator-network", "Extending hand to {}.", peer_id); + let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; + info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); + let (data_for_network, data_from_user) = mpsc::unbounded(); + result_for_parent + .unbounded_send((peer_id.clone(), Some(data_for_network))) + .map_err(|_| ProtocolError::NoParentConnection)?; + debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); + manage_connection(sender, receiver, data_from_user, data_for_user).await +} + +/// Performs the incoming handshake, and then manages a connection sending and receiving data. +/// Exits on parent request, or in case of broken or dead network connection. +pub async fn incoming( + stream: S, + authority_pen: AuthorityPen, + result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + trace!(target: "validator-network", "Waiting for extended hand..."); + let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; + info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); + let (data_for_network, data_from_user) = mpsc::unbounded(); + result_for_parent + .unbounded_send((peer_id.clone(), Some(data_for_network))) + .map_err(|_| ProtocolError::NoParentConnection)?; + debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); + manage_connection(sender, receiver, data_from_user, data_for_user).await +} + +#[cfg(test)] +mod tests { + use aleph_primitives::AuthorityId; + use futures::{ + channel::{mpsc, mpsc::UnboundedReceiver}, + pin_mut, FutureExt, StreamExt, + }; + + use super::{ProtocolError, incoming, outgoing}; + use crate::{ + crypto::AuthorityPen, + validator_network::{ + mock::{keys, MockSplittable}, + Data, + }, + }; + + async fn prepare() -> ( + AuthorityId, + AuthorityPen, + AuthorityId, + AuthorityPen, + impl futures::Future>, + impl futures::Future>, + UnboundedReceiver, + UnboundedReceiver, + UnboundedReceiver<(AuthorityId, Option>)>, + UnboundedReceiver<(AuthorityId, Option>)>, + ) { + let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); + let (id_incoming, pen_incoming) = keys().await; + let (id_outgoing, pen_outgoing) = keys().await; + assert_ne!(id_incoming, id_outgoing); + let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); + let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); + let (incoming_data_for_user, data_from_incoming) = mpsc::unbounded::(); + let (outgoing_data_for_user, data_from_outgoing) = mpsc::unbounded::(); + let incoming_handle = incoming( + stream_incoming, + pen_incoming.clone(), + incoming_result_for_service, + incoming_data_for_user, + ); + let outgoing_handle = outgoing( + stream_outgoing, + pen_outgoing.clone(), + id_incoming.clone(), + outgoing_result_for_service, + outgoing_data_for_user, + ); + ( + id_incoming, + pen_incoming, + id_outgoing, + pen_outgoing, + incoming_handle, + outgoing_handle, + data_from_incoming, + data_from_outgoing, + result_from_incoming, + result_from_outgoing, + ) + } + + #[tokio::test] + async fn send_data() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + mut data_from_incoming, + mut data_from_outgoing, + mut result_from_incoming, + mut result_from_outgoing, + ) = prepare::>().await; + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + pin_mut!(incoming_handle); + pin_mut!(outgoing_handle); + let _data_for_outgoing = tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + result = result_from_outgoing.next() => { + let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); + data_for_outgoing + .unbounded_send(vec![4, 3, 43]) + .expect("should send"); + data_for_outgoing + .unbounded_send(vec![2, 1, 3, 7]) + .expect("should send"); + data_for_outgoing + }, + }; + let _data_for_incoming = tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + result = result_from_incoming.next() => { + let (_, maybe_data_for_incoming) = result.expect("outgoing should have resturned Some"); + let data_for_incoming = maybe_data_for_incoming.expect("successfully connected"); + data_for_incoming + .unbounded_send(vec![5, 4, 44]) + .expect("should send"); + data_for_incoming + .unbounded_send(vec![3, 2, 4, 8]) + .expect("should send"); + data_for_incoming + }, + }; + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + v = data_from_incoming.next() => { + assert_eq!(v, Some(vec![4, 3, 43])); + }, + }; + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + v = data_from_incoming.next() => { + assert_eq!(v, Some(vec![2, 1, 3, 7])); + }, + }; + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + v = data_from_outgoing.next() => { + assert_eq!(v, Some(vec![5, 4, 44])); + }, + }; + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + v = data_from_outgoing.next() => { + assert_eq!(v, Some(vec![3, 2, 4, 8])); + }, + }; + } + + #[tokio::test] + async fn closed_by_parent_service() { + let ( + _id_incoming, + _pen_incoming, + id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + mut result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + pin_mut!(incoming_handle); + pin_mut!(outgoing_handle); + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + received = result_from_incoming.next() => { + // we drop the data sending channel, thus finishing incoming_handle + let (received_id, _) = received.expect("should receive"); + assert_eq!(received_id, id_outgoing); + }, + }; + incoming_handle + .await + .expect("closed manually, should finish with no error"); + } + + #[tokio::test] + async fn parent_service_dead() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + std::mem::drop(result_from_incoming); + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + pin_mut!(incoming_handle); + pin_mut!(outgoing_handle); + tokio::select! { + e = &mut incoming_handle => match e { + Err(ProtocolError::NoParentConnection) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when parent dead"), + }, + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + }; + } + + #[tokio::test] + async fn parent_user_dead() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + data_from_incoming, + _data_from_outgoing, + _result_from_incoming, + mut result_from_outgoing, + ) = prepare::>().await; + std::mem::drop(data_from_incoming); + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + pin_mut!(incoming_handle); + pin_mut!(outgoing_handle); + let _data_for_outgoing = tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + result = result_from_outgoing.next() => { + let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); + data_for_outgoing + .unbounded_send(vec![2, 1, 3, 7]) + .expect("should send"); + data_for_outgoing + }, + }; + tokio::select! { + e = &mut incoming_handle => match e { + Err(ProtocolError::NoUserConnection) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when user dead"), + }, + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + }; + } + + #[tokio::test] + async fn sender_dead_before_handshake() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + _result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + std::mem::drop(outgoing_handle); + match incoming_handle.await { + Err(ProtocolError::HandshakeError(_)) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when connection dead"), + }; + } + + #[tokio::test] + async fn sender_dead_after_handshake() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + mut result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + let incoming_handle = incoming_handle.fuse(); + pin_mut!(incoming_handle); + let (_, _exit) = tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = outgoing_handle => panic!("outgoing process unexpectedly finished"), + out = result_from_incoming.next() => out.expect("should receive"), + }; + // outgoing_handle got consumed by tokio::select!, the sender is dead + match incoming_handle.await { + Err(ProtocolError::ReceiveError(_)) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when connection dead"), + }; + } + + #[tokio::test] + async fn receiver_dead_before_handshake() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + _result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + std::mem::drop(incoming_handle); + match outgoing_handle.await { + Err(ProtocolError::HandshakeError(_)) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when connection dead"), + }; + } +} diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index ead09e5256..3602bbbe07 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -97,7 +97,7 @@ impl, NL: Listener> Service { Self { commands_from_interface, next_to_interface, - manager: Manager::new(), + manager: Manager::new(authority_pen.authority_id()), dialer, listener, spawn_handle, @@ -118,16 +118,17 @@ impl, NL: Listener> Service { ) { let authority_pen = self.authority_pen.clone(); let dialer = self.dialer.clone(); + let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_outgoing", None, async move { - outgoing(authority_pen, peer_id, dialer, addresses, result_for_parent).await; + outgoing(authority_pen, peer_id, dialer, addresses, result_for_parent, next_to_interface).await; }); } fn spawn_new_incoming( &self, stream: NL::Connection, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, ) { let authority_pen = self.authority_pen.clone(); let next_to_interface = self.next_to_interface.clone(); @@ -140,20 +141,16 @@ impl, NL: Listener> Service { /// Run the service until a signal from exit. pub async fn run(mut self, mut exit: oneshot::Receiver<()>) { let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); - // channel used to receive tuple (peer_id, exit_handle) from a spawned worker - // that has just established an incoming connection - // exit_handle may be used to kill the worker later - let (incoming_result_for_parent, mut incoming_workers) = mpsc::unbounded(); // channel used to receive information about failure from a spawned worker // that managed an outgoing connection // the received peer_id can be used to spawn another worker - let (outgoing_result_for_parent, mut outgoing_workers) = mpsc::unbounded(); + let (result_for_parent, mut worker_results) = mpsc::unbounded(); use ServiceCommand::*; loop { tokio::select! { // got new incoming connection from the listener - spawn an incoming worker maybe_stream = self.listener.accept() => match maybe_stream { - Ok(stream) => self.spawn_new_incoming(stream, incoming_result_for_parent.clone()), + Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone()), Err(e) => warn!(target: "validator-network", "Listener failed to accept connection: {}", e), }, // got a new command from the interface @@ -162,7 +159,7 @@ impl, NL: Listener> Service { // spawn a worker managing outgoing connection if the peer was not known AddConnection(peer_id, addresses) => { if self.manager.add_peer(peer_id.clone(), addresses.clone()) { - self.spawn_new_outgoing(peer_id, addresses, outgoing_result_for_parent.clone()); + self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); }; }, // remove the peer from the manager all workers will be killed automatically, due to closed channels @@ -177,32 +174,20 @@ impl, NL: Listener> Service { } }, }, - // received tuple (peer_id, exit_handle) from a spawned worker - // that has just established an incoming connection - // pass the tuple to the manager to register the connection - // the manager will be responsible for killing the worker if necessary - Some((peer_id, exit)) = incoming_workers.next() => { - use AddResult::*; - match self.manager.add_incoming(peer_id.clone(), exit) { - Uninterested => info!(target: "validator-network", "Peer {} connected to us despite out lack of interest.", peer_id), - Added => info!(target: "validator-network", "New incoming connection for peer {}.", peer_id), - Replaced => info!(target: "validator-network", "Replaced incoming connection for peer {}.", peer_id), - } - }, - // received information from a spawned worker managing an outgoing connection + // received information from a spawned worker managing a connection // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection - Some((peer_id, maybe_data_for_network)) = outgoing_workers.next() => { + Some((peer_id, maybe_data_for_network)) = worker_results.next() => { use AddResult::*; - if let Some(addresses) = self.manager.peer_addresses(&peer_id) { - match maybe_data_for_network { - Some(data_for_network) => match self.manager.add_outgoing(peer_id.clone(), data_for_network) { - Uninterested => warn!(target: "validator-network", "We connected to peer {} for unknown reasons.", peer_id), - Added => info!(target: "validator-network", "New outgoing connection to peer {}.", peer_id), - Replaced => info!(target: "validator-network", "Replaced outgoing connection to peer {}.", peer_id), - }, - None => self.spawn_new_outgoing(peer_id, addresses, outgoing_result_for_parent.clone()), + match maybe_data_for_network { + Some(data_for_network) => match self.manager.add_connection(peer_id.clone(), data_for_network) { + Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", peer_id), + Added => info!(target: "validator-network", "New connection with peer {}.", peer_id), + Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", peer_id), + }, + None => if let Some(addresses) = self.manager.peer_addresses(&peer_id) { + self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); } - }; + } }, // periodically reporting what we are trying to do _ = status_ticker.tick() => { From c7675f5d0ad7f45739557d4c6098577f1734927e Mon Sep 17 00:00:00 2001 From: timorl Date: Fri, 4 Nov 2022 15:43:05 +0100 Subject: [PATCH 2/4] Add handling of legacy double connections --- .../src/validator_network/incoming.rs | 7 +- .../validator_network/manager/direction.rs | 4 +- .../src/validator_network/manager/legacy.rs | 27 +++--- .../src/validator_network/manager/mod.rs | 12 +-- .../src/validator_network/outgoing.rs | 10 +- .../src/validator_network/protocols/mod.rs | 35 ++++--- .../protocols/negotiation.rs | 4 +- .../src/validator_network/protocols/v0/mod.rs | 33 ++++--- .../src/validator_network/protocols/v1/mod.rs | 37 ++++---- .../src/validator_network/service.rs | 93 ++++++++++++++++--- 10 files changed, 175 insertions(+), 87 deletions(-) diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/validator_network/incoming.rs index 4dae3f3052..ad705d526a 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/validator_network/incoming.rs @@ -1,13 +1,12 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; use log::{debug, info}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocols::{protocol, ProtocolError, ProtocolNegotiationError}, + protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, Data, Splittable, }, }; @@ -42,7 +41,7 @@ impl From for IncomingError { async fn manage_incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, data_for_user: mpsc::UnboundedSender, ) -> Result<(), IncomingError> { debug!(target: "validator-network", "Performing incoming protocol negotiation."); @@ -61,7 +60,7 @@ async fn manage_incoming( pub async fn incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await { diff --git a/finality-aleph/src/validator_network/manager/direction.rs b/finality-aleph/src/validator_network/manager/direction.rs index beb8df9564..afa5742fbe 100644 --- a/finality-aleph/src/validator_network/manager/direction.rs +++ b/finality-aleph/src/validator_network/manager/direction.rs @@ -89,12 +89,12 @@ mod tests { use aleph_primitives::AuthorityId; use super::DirectedPeers; - use crate::validator_network::mock::keys; + use crate::validator_network::mock::key; type Address = String; async fn container_with_id() -> (DirectedPeers
, AuthorityId) { - let (own_id, _) = keys().await; + let (own_id, _) = key().await; let own_container = DirectedPeers::new(own_id.clone()); (own_container, own_id) } diff --git a/finality-aleph/src/validator_network/manager/legacy.rs b/finality-aleph/src/validator_network/manager/legacy.rs index d889f57d15..6e05888562 100644 --- a/finality-aleph/src/validator_network/manager/legacy.rs +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -4,7 +4,7 @@ use std::{ }; use aleph_primitives::AuthorityId; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use crate::{network::PeerId, validator_network::{Data, manager::{AddResult, SendError}}}; @@ -13,10 +13,9 @@ use crate::{network::PeerId, validator_network::{Data, manager::{AddResult, Send pub struct Manager { addresses: HashMap>, outgoing: HashMap>, - incoming: HashMap>, + incoming: HashMap>, } - struct ManagerStatus { wanted_peers: usize, both_ways_peers: HashSet, @@ -30,7 +29,7 @@ impl ManagerStatus { let incoming: HashSet<_> = manager .incoming .iter() - .filter(|(_, exit)| !exit.is_canceled()) + .filter(|(_, exit)| !exit.is_closed()) .map(|(k, _)| k.clone()) .collect(); let outgoing: HashSet<_> = manager @@ -173,7 +172,7 @@ impl Manager { /// Add an established incoming connection with a known peer, /// but only if the peer is on the list of peers that we want to stay connected with. - pub fn add_incoming(&mut self, peer_id: AuthorityId, exit: oneshot::Sender<()>) -> AddResult { + pub fn add_incoming(&mut self, peer_id: AuthorityId, exit: mpsc::UnboundedSender) -> AddResult { use AddResult::*; if !self.addresses.contains_key(&peer_id) { return Uninterested; @@ -212,7 +211,7 @@ impl Manager { #[cfg(test)] mod tests { use futures::{ - channel::{mpsc, oneshot}, + channel::mpsc, StreamExt, }; @@ -290,27 +289,27 @@ mod tests { String::from("a/b/c"), String::from("43.43.43.43:43000"), ]; - let (tx, rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::unbounded(); // try add unknown peer assert_eq!(manager.add_incoming(peer_id.clone(), tx), Uninterested); // rx should fail - assert!(rx.await.is_err()); + assert!(rx.try_next().expect("channel should be closed").is_none()); // add peer, this time for real assert!(manager.add_peer(peer_id.clone(), addresses.clone())); - let (tx, mut rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::unbounded(); // should just add assert_eq!(manager.add_incoming(peer_id.clone(), tx), Added); // the exit channel should be open - assert!(rx.try_recv().is_ok()); - let (tx, mut rx2) = oneshot::channel(); + assert!(rx.try_next().is_err()); + let (tx, mut rx2) = mpsc::unbounded(); // should replace now assert_eq!(manager.add_incoming(peer_id.clone(), tx), Replaced); // receiving should fail on old, but work on new channel - assert!(rx.try_recv().is_err()); - assert!(rx2.try_recv().is_ok()); + assert!(rx.try_next().expect("channel should be closed").is_none()); + assert!(rx2.try_next().is_err()); // remove peer manager.remove_peer(&peer_id); // receiving should fail - assert!(rx2.try_recv().is_err()); + assert!(rx2.try_next().expect("channel should be closed").is_none()); } } diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs index 0a331fbe6e..560fecea6e 100644 --- a/finality-aleph/src/validator_network/manager/mod.rs +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -235,17 +235,17 @@ mod tests { }; use super::{AddResult::*, Manager, SendError}; - use crate::validator_network::mock::keys; + use crate::validator_network::mock::key; type Data = String; type Address = String; #[tokio::test] async fn add_remove() { - let (own_id, _) = keys().await; + let (own_id, _) = key().await; let mut manager = Manager::::new(own_id); - let (peer_id, _) = keys().await; - let (peer_id_b, _) = keys().await; + let (peer_id, _) = key().await; + let (peer_id_b, _) = key().await; let addresses = vec![ String::from(""), String::from("a/b/c"), @@ -274,9 +274,9 @@ mod tests { #[tokio::test] async fn send_receive() { - let (mut connecting_id, _) = keys().await; + let (mut connecting_id, _) = key().await; let mut connecting_manager = Manager::::new(connecting_id.clone()); - let (mut listening_id, _) = keys().await; + let (mut listening_id, _) = key().await; let mut listening_manager = Manager::::new(listening_id.clone()); let data = String::from("DATA"); let addresses = vec![ diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index fd43c0f219..1c479d6acb 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -8,7 +8,7 @@ use tokio::time::{sleep, Duration}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocols::{protocol, ProtocolError, ProtocolNegotiationError}, + protocols::{protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService}, Data, Dialer, }, }; @@ -47,7 +47,7 @@ async fn manage_outgoing>( peer_id: AuthorityId, mut dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, data_for_user: mpsc::UnboundedSender, ) -> Result<(), OutgoingError> { debug!(target: "validator-network", "Trying to connect to {}.", peer_id); @@ -73,7 +73,7 @@ pub async fn outgoing>( peer_id: AuthorityId, dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_outgoing( @@ -88,7 +88,9 @@ pub async fn outgoing>( { info!(target: "validator-network", "Outgoing connection to {} failed: {}, will retry after {}s.", peer_id, e, RETRY_DELAY.as_secs()); sleep(RETRY_DELAY).await; - if result_for_parent.unbounded_send((peer_id, None)).is_err() { + // we send the "new" connection type, because we always assume it's new until proven + // otherwise, and here we did not even get the chance to attempt negotiating a protocol + if result_for_parent.unbounded_send((peer_id, None, ConnectionType::New)).is_err() { debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service."); } } diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs index 00ed8112a3..323f690c10 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -23,6 +23,21 @@ pub use negotiation::{protocol, ProtocolNegotiationError}; pub type Version = u32; +/// The types of connections needed for backwards compatibility with the legacy two connections +/// protocol. Remove after it's no longer needed. +#[derive(PartialEq, Debug, Eq, Clone, Copy)] +pub enum ConnectionType { + New, + LegacyIncoming, + LegacyOutgoing, +} + +/// What connections send back to the service after they become established. Starts with a peer id +/// of the remote node, followed by a channel for sending data to that node, with None if the +/// connection was unsuccessful and should be reestablished. Finally a marker for legacy +/// compatibility. +pub type ResultForService = mpsc::UnboundedSender<(AuthorityId, Option>, ConnectionType)>; + /// Defines the protocol for communication. #[derive(Debug, PartialEq, Eq)] pub enum Protocol { @@ -83,12 +98,18 @@ impl From for ProtocolError { } impl Protocol { + /// Minimal supported protocol version. + const MIN_VERSION: Version = 0; + + /// Maximal supported protocol version. + const MAX_VERSION: Version = 1; + /// Launches the proper variant of the protocol (receiver half). pub async fn manage_incoming( &self, stream: S, authority_pen: AuthorityPen, - result_for_service: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_service: ResultForService, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { use Protocol::*; @@ -104,7 +125,7 @@ impl Protocol { stream: S, authority_pen: AuthorityPen, peer_id: AuthorityId, - result_for_service: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_service: ResultForService, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { use Protocol::*; @@ -113,16 +134,6 @@ impl Protocol { V1 => v1::outgoing(stream, authority_pen, peer_id, result_for_service, data_for_user).await, } } - - /// Minimal supported protocol version. - pub fn min_version() -> Version { - 0 - } - - /// Maximal supported protocol version. - pub fn max_version() -> Version { - 1 - } } impl TryFrom for Protocol { diff --git a/finality-aleph/src/validator_network/protocols/negotiation.rs b/finality-aleph/src/validator_network/protocols/negotiation.rs index 13e5b7d9c9..4fd2ff67f0 100644 --- a/finality-aleph/src/validator_network/protocols/negotiation.rs +++ b/finality-aleph/src/validator_network/protocols/negotiation.rs @@ -22,8 +22,8 @@ impl Display for ProtocolsRange { } } -fn supported_protocol_range() -> ProtocolsRange { - ProtocolsRange(Protocol::min_version(), Protocol::max_version()) +const fn supported_protocol_range() -> ProtocolsRange { + ProtocolsRange(Protocol::MIN_VERSION, Protocol::MAX_VERSION) } /// What went wrong when negotiating a protocol. diff --git a/finality-aleph/src/validator_network/protocols/v0/mod.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs index 510d9bd230..1fb832fcc6 100644 --- a/finality-aleph/src/validator_network/protocols/v0/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -10,7 +10,7 @@ use crate::{ crypto::AuthorityPen, validator_network::{ protocols::{ - ProtocolError, + ProtocolError, ResultForService, ConnectionType, handshake::{v0_handshake_incoming, v0_handshake_outgoing}, }, io::{receive_data, send_data}, @@ -43,14 +43,14 @@ pub async fn outgoing( stream: S, authority_pen: AuthorityPen, peer_id: AuthorityId, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network))) + .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::LegacyOutgoing)) .map_err(|_| ProtocolError::NoParentConnection)?; let sending = sending(sender, data_from_user); @@ -85,7 +85,7 @@ async fn receiving( pub async fn incoming( stream: S, authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); @@ -94,7 +94,7 @@ pub async fn incoming( let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(tx_exit))) + .unbounded_send((peer_id.clone(), Some(tx_exit), ConnectionType::LegacyIncoming)) .map_err(|_| ProtocolError::NoParentConnection)?; let receiving = receiving(receiver, data_for_user); @@ -122,6 +122,7 @@ mod tests { use crate::{ crypto::AuthorityPen, validator_network::{ + protocols::ConnectionType, mock::{key, MockSplittable}, Data, }, @@ -135,8 +136,8 @@ mod tests { impl futures::Future>, impl futures::Future>, UnboundedReceiver, - UnboundedReceiver<(AuthorityId, Option>)>, - UnboundedReceiver<(AuthorityId, Option>)>, + UnboundedReceiver<(AuthorityId, Option>, ConnectionType)>, + UnboundedReceiver<(AuthorityId, Option>, ConnectionType)>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; @@ -191,7 +192,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_outgoing.next() => { - let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyOutgoing); let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); data_for_outgoing .unbounded_send(vec![4, 3, 43]) @@ -240,7 +242,8 @@ mod tests { _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), received = result_from_incoming.next() => { // we drop the exit oneshot channel, thus finishing incoming_handle - let (received_id, _) = received.expect("should receive"); + let (received_id, _, connection_type) = received.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyIncoming); assert_eq!(received_id, id_outgoing); }, }; @@ -299,7 +302,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_outgoing.next() => { - let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyOutgoing); let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); data_for_outgoing .unbounded_send(vec![2, 1, 3, 7]) @@ -353,11 +357,12 @@ mod tests { ) = prepare::>().await; let incoming_handle = incoming_handle.fuse(); pin_mut!(incoming_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); // outgoing_handle got consumed by tokio::select!, the sender is dead match incoming_handle.await { Err(ProtocolError::ReceiveError(_)) => (), @@ -402,11 +407,12 @@ mod tests { ) = prepare::>().await; let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); // incoming_handle got consumed by tokio::select!, the receiver is dead match outgoing_handle.await { // We never get the SendError variant here, because we did not send anything @@ -432,11 +438,12 @@ mod tests { ) = prepare::>().await; let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); match outgoing_handle.await { Err(ProtocolError::CardiacArrest) => (), Err(e) => panic!("unexpected error: {}", e), diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/validator_network/protocols/v1/mod.rs index 82da65ea60..1870249848 100644 --- a/finality-aleph/src/validator_network/protocols/v1/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v1/mod.rs @@ -14,7 +14,7 @@ use crate::{ crypto::AuthorityPen, validator_network::{ protocols::{ - ProtocolError, + ProtocolError, ConnectionType, ResultForService, handshake::{v0_handshake_incoming, v0_handshake_outgoing}, }, io::{receive_data, send_data}, @@ -86,14 +86,13 @@ async fn manage_connection( stream: S, authority_pen: AuthorityPen, peer_id: AuthorityId, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); @@ -101,7 +100,7 @@ pub async fn outgoing( info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network))) + .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::New)) .map_err(|_| ProtocolError::NoParentConnection)?; debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); manage_connection(sender, receiver, data_from_user, data_for_user).await @@ -112,7 +111,7 @@ pub async fn outgoing( pub async fn incoming( stream: S, authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); @@ -120,7 +119,7 @@ pub async fn incoming( info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network))) + .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::New)) .map_err(|_| ProtocolError::NoParentConnection)?; debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); manage_connection(sender, receiver, data_from_user, data_for_user).await @@ -138,7 +137,8 @@ mod tests { use crate::{ crypto::AuthorityPen, validator_network::{ - mock::{keys, MockSplittable}, + protocols::ConnectionType, + mock::{key, MockSplittable}, Data, }, }; @@ -152,12 +152,12 @@ mod tests { impl futures::Future>, UnboundedReceiver, UnboundedReceiver, - UnboundedReceiver<(AuthorityId, Option>)>, - UnboundedReceiver<(AuthorityId, Option>)>, + UnboundedReceiver<(AuthorityId, Option>, ConnectionType)>, + UnboundedReceiver<(AuthorityId, Option>, ConnectionType)>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); - let (id_incoming, pen_incoming) = keys().await; - let (id_outgoing, pen_outgoing) = keys().await; + let (id_incoming, pen_incoming) = key().await; + let (id_outgoing, pen_outgoing) = key().await; assert_ne!(id_incoming, id_outgoing); let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); @@ -212,7 +212,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_outgoing.next() => { - let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::New); let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); data_for_outgoing .unbounded_send(vec![4, 3, 43]) @@ -227,7 +228,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_incoming.next() => { - let (_, maybe_data_for_incoming) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_incoming, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::New); let data_for_incoming = maybe_data_for_incoming.expect("successfully connected"); data_for_incoming .unbounded_send(vec![5, 4, 44]) @@ -291,7 +293,8 @@ mod tests { _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), received = result_from_incoming.next() => { // we drop the data sending channel, thus finishing incoming_handle - let (received_id, _) = received.expect("should receive"); + let (received_id, _, connection_type) = received.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::New); assert_eq!(received_id, id_outgoing); }, }; @@ -352,7 +355,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_outgoing.next() => { - let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::New); let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); data_for_outgoing .unbounded_send(vec![2, 1, 3, 7]) @@ -408,11 +412,12 @@ mod tests { ) = prepare::>().await; let incoming_handle = incoming_handle.fuse(); pin_mut!(incoming_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::New); // outgoing_handle got consumed by tokio::select!, the sender is dead match incoming_handle.await { Err(ProtocolError::ReceiveError(_)) => (), diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index 3602bbbe07..e83ff0b3cb 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -1,16 +1,19 @@ +use std::collections::HashSet; + use aleph_primitives::AuthorityId; use futures::{ channel::{mpsc, oneshot}, StreamExt, }; -use log::{info, trace, warn}; +use log::{debug, info, trace, warn}; use tokio::time; use crate::{ crypto::AuthorityPen, validator_network::{ + protocols::{ResultForService, ConnectionType}, incoming::incoming, - manager::{AddResult, Manager}, + manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, Data, Dialer, Listener, Network, }, @@ -79,6 +82,9 @@ pub struct Service, NL: Listener> { listener: NL, spawn_handle: SpawnTaskHandle, authority_pen: AuthorityPen, + // Backwards compatibility with the one-sided connections, remove when no longer needed. + legacy_connected: HashSet, + legacy_manager: LegacyManager, } impl, NL: Listener> Service { @@ -102,6 +108,8 @@ impl, NL: Listener> Service { listener, spawn_handle, authority_pen, + legacy_connected: HashSet::new(), + legacy_manager: LegacyManager::new(), }, ServiceInterface { commands_for_service, @@ -114,7 +122,7 @@ impl, NL: Listener> Service { &self, peer_id: AuthorityId, addresses: Vec, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, ) { let authority_pen = self.authority_pen.clone(); let dialer = self.dialer.clone(); @@ -128,7 +136,7 @@ impl, NL: Listener> Service { fn spawn_new_incoming( &self, stream: NL::Connection, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: ResultForService, ) { let authority_pen = self.authority_pen.clone(); let next_to_interface = self.next_to_interface.clone(); @@ -138,12 +146,25 @@ impl, NL: Listener> Service { }); } + fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + match self.legacy_connected.contains(peer_id) { + true => self.legacy_manager.peer_addresses(peer_id), + false => self.manager.peer_addresses(peer_id), + } + } + + fn add_connection(&mut self, peer_id: AuthorityId, data_for_network: mpsc::UnboundedSender, connection_type: ConnectionType) -> AddResult { + use ConnectionType::*; + match connection_type { + New => self.manager.add_connection(peer_id, data_for_network), + LegacyIncoming => self.legacy_manager.add_incoming(peer_id, data_for_network), + LegacyOutgoing => self.legacy_manager.add_outgoing(peer_id, data_for_network), + } + } + /// Run the service until a signal from exit. pub async fn run(mut self, mut exit: oneshot::Receiver<()>) { let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); - // channel used to receive information about failure from a spawned worker - // that managed an outgoing connection - // the received peer_id can be used to spawn another worker let (result_for_parent, mut worker_results) = mpsc::unbounded(); use ServiceCommand::*; loop { @@ -158,6 +179,12 @@ impl, NL: Listener> Service { // register new peer in manager or update its list of addresses if already there // spawn a worker managing outgoing connection if the peer was not known AddConnection(peer_id, addresses) => { + // we add all the peers to the legacy manager so we don't lose the + // addresses, but only care about its opinion when it turns out we have to + // in particular the first time we add a peer we never know whether it + // requires legacy connecting, so we only attempt to connect to it if the + // new criterion is satisfied, otherwise we wait for it to connect to us + self.legacy_manager.add_peer(peer_id.clone(), addresses.clone()); if self.manager.add_peer(peer_id.clone(), addresses.clone()) { self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); }; @@ -165,33 +192,71 @@ impl, NL: Listener> Service { // remove the peer from the manager all workers will be killed automatically, due to closed channels DelConnection(peer_id) => { self.manager.remove_peer(&peer_id); + self.legacy_manager.remove_peer(&peer_id); + self.legacy_connected.remove(&peer_id); }, // pass the data to the manager SendData(data, peer_id) => { - match self.manager.send_to(&peer_id, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {}.", peer_id), - Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", peer_id, e), + match self.legacy_connected.contains(&peer_id) { + true => match self.legacy_manager.send_to(&peer_id, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {} through legacy.", peer_id), + Err(e) => trace!(target: "validator-network", "Failed sending to {} through legacy: {}", peer_id, e), + }, + false => match self.manager.send_to(&peer_id, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {}.", peer_id), + Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", peer_id, e), + }, } }, }, // received information from a spawned worker managing a connection // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection - Some((peer_id, maybe_data_for_network)) = worker_results.next() => { + Some((peer_id, maybe_data_for_network, connection_type)) = worker_results.next() => { + use ConnectionType::*; + let spawn_new_legacy_connection = match connection_type { + LegacyIncoming => { + self.manager.remove_peer(&peer_id); + self.legacy_connected.insert(peer_id.clone()) + }, + LegacyOutgoing => { + self.manager.remove_peer(&peer_id); + self.legacy_connected.insert(peer_id.clone()); + false + } + New => { + // We always return New if connecting fails, so only New+Some means we + // actually negotiated the new protocol, otherwise we should stay + // with our previous guess. + if maybe_data_for_network.is_some() { + self.legacy_connected.remove(&peer_id); + } + false + }, + }; + if spawn_new_legacy_connection { + match self.legacy_manager.peer_addresses(&peer_id) { + Some(addresses) => self.spawn_new_outgoing(peer_id.clone(), addresses, result_for_parent.clone()), + None => { + self.legacy_connected.remove(&peer_id); + }, + } + } use AddResult::*; match maybe_data_for_network { - Some(data_for_network) => match self.manager.add_connection(peer_id.clone(), data_for_network) { + Some(data_for_network) => match self.add_connection(peer_id.clone(), data_for_network, connection_type) { Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", peer_id), Added => info!(target: "validator-network", "New connection with peer {}.", peer_id), Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", peer_id), }, - None => if let Some(addresses) = self.manager.peer_addresses(&peer_id) { + None => if let Some(addresses) = self.peer_addresses(&peer_id) { self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); } } }, // periodically reporting what we are trying to do _ = status_ticker.tick() => { - info!(target: "validator-network", "Validator Network status: {}", self.manager.status_report()) + info!(target: "validator-network", "Validator Network status: {}", self.manager.status_report()); + debug!(target: "validator-network", "Validator Network legacy status: {}", self.legacy_manager.status_report()); } // received exit signal, stop the network // all workers will be killed automatically after the manager gets dropped From e28497d4ae3809bf66296e12445742d037134a14 Mon Sep 17 00:00:00 2001 From: timorl Date: Fri, 4 Nov 2022 16:29:53 +0100 Subject: [PATCH 3/4] Formatting, heh --- .../validator_network/manager/direction.rs | 32 ++++++++++---- .../src/validator_network/manager/legacy.rs | 19 ++++++--- .../src/validator_network/manager/mod.rs | 42 ++++++++++++------- .../src/validator_network/outgoing.rs | 17 ++++++-- .../src/validator_network/protocols/mod.rs | 19 +++++++-- .../protocols/negotiation.rs | 15 +++---- .../src/validator_network/protocols/v0/mod.rs | 37 ++++++++++------ .../src/validator_network/protocols/v1/mod.rs | 34 ++++++++------- .../src/validator_network/service.rs | 25 +++++++---- 9 files changed, 161 insertions(+), 79 deletions(-) diff --git a/finality-aleph/src/validator_network/manager/direction.rs b/finality-aleph/src/validator_network/manager/direction.rs index afa5742fbe..6b6db0715d 100644 --- a/finality-aleph/src/validator_network/manager/direction.rs +++ b/finality-aleph/src/validator_network/manager/direction.rs @@ -13,16 +13,21 @@ pub struct DirectedPeers { fn bit_xor_sum_parity((a, b): (u8, u8)) -> u8 { let mut result = 0; for i in 0..8 { - result += ((a >>i)^(b>>i)) % 2; + result += ((a >> i) ^ (b >> i)) % 2; } - result%2 + result % 2 } // Whether we shold call the remote or the other way around. We xor the peer ids and based on the // parity of the sum of bits of the result decide whether the caller should be the smaller or // greated lexicographically. They are never equal, because cryptography. fn should_call(own_id: &[u8], remote_id: &[u8]) -> bool { - let xor_sum_parity: u8 = own_id.iter().cloned().zip(remote_id.iter().cloned()).map(bit_xor_sum_parity).fold(0u8, |a, b| (a + b) % 2); + let xor_sum_parity: u8 = own_id + .iter() + .cloned() + .zip(remote_id.iter().cloned()) + .map(bit_xor_sum_parity) + .fold(0u8, |a, b| (a + b) % 2); match xor_sum_parity == 0 { true => own_id < remote_id, false => own_id > remote_id, @@ -52,7 +57,7 @@ impl DirectedPeers { // so we don't need them. self.incoming.insert(peer_id); false - }, + } } } @@ -108,7 +113,10 @@ mod tests { String::from("a/b/c"), String::from("43.43.43.43:43000"), ]; - assert!(own_container.add_peer(remote_id, addresses.clone()) != remote_container.add_peer(own_id, addresses.clone())); + assert!( + own_container.add_peer(remote_id, addresses.clone()) + != remote_container.add_peer(own_id, addresses.clone()) + ); } async fn container_with_added_connecting_peer() -> (DirectedPeers
, AuthorityId) { @@ -124,7 +132,7 @@ mod tests { false => { remote_container.add_peer(own_id.clone(), addresses); (remote_container, own_id) - }, + } } } @@ -141,7 +149,7 @@ mod tests { true => { remote_container.add_peer(own_id.clone(), addresses); (remote_container, own_id) - }, + } } } @@ -190,14 +198,20 @@ mod tests { #[tokio::test] async fn connecting_are_outgoing() { let (own_container, remote_id) = container_with_added_connecting_peer().await; - assert_eq!(own_container.outgoing_peers().collect::>(), vec![&remote_id]); + assert_eq!( + own_container.outgoing_peers().collect::>(), + vec![&remote_id] + ); assert_eq!(own_container.incoming_peers().next(), None); } #[tokio::test] async fn nonconnecting_are_incoming() { let (own_container, remote_id) = container_with_added_nonconnecting_peer().await; - assert_eq!(own_container.incoming_peers().collect::>(), vec![&remote_id]); + assert_eq!( + own_container.incoming_peers().collect::>(), + vec![&remote_id] + ); assert_eq!(own_container.outgoing_peers().next(), None); } diff --git a/finality-aleph/src/validator_network/manager/legacy.rs b/finality-aleph/src/validator_network/manager/legacy.rs index 6e05888562..76e3220e68 100644 --- a/finality-aleph/src/validator_network/manager/legacy.rs +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -6,7 +6,13 @@ use std::{ use aleph_primitives::AuthorityId; use futures::channel::mpsc; -use crate::{network::PeerId, validator_network::{Data, manager::{AddResult, SendError}}}; +use crate::{ + network::PeerId, + validator_network::{ + manager::{AddResult, SendError}, + Data, + }, +}; /// Network component responsible for holding the list of peers that we /// want to connect to, and managing the established connections. @@ -172,7 +178,11 @@ impl Manager { /// Add an established incoming connection with a known peer, /// but only if the peer is on the list of peers that we want to stay connected with. - pub fn add_incoming(&mut self, peer_id: AuthorityId, exit: mpsc::UnboundedSender) -> AddResult { + pub fn add_incoming( + &mut self, + peer_id: AuthorityId, + exit: mpsc::UnboundedSender, + ) -> AddResult { use AddResult::*; if !self.addresses.contains_key(&peer_id) { return Uninterested; @@ -210,10 +220,7 @@ impl Manager { #[cfg(test)] mod tests { - use futures::{ - channel::mpsc, - StreamExt, - }; + use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; use crate::validator_network::mock::key; diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs index 560fecea6e..5e4ff1dc21 100644 --- a/finality-aleph/src/validator_network/manager/mod.rs +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -11,8 +11,8 @@ use crate::{network::PeerId, validator_network::Data}; mod direction; mod legacy; -pub use legacy::Manager as LegacyManager; use direction::DirectedPeers; +pub use legacy::Manager as LegacyManager; /// Network component responsible for holding the list of peers that we /// want to connect to or let them connect to us, and managing the established @@ -85,8 +85,7 @@ impl ManagerStatus { } fn pretty_authority_id_set(set: &HashSet) -> String { - set - .iter() + set.iter() .map(|authority_id| authority_id.to_short_string()) .collect::>() .join(", ") @@ -121,7 +120,7 @@ impl Display for ManagerStatus { pretty_authority_id_set(&self.missing_incoming), )?; } - }, + } } match wanted_outgoing { @@ -144,7 +143,7 @@ impl Display for ManagerStatus { pretty_authority_id_set(&self.missing_outgoing), )?; } - }, + } } Ok(()) @@ -172,7 +171,10 @@ impl Manager { } fn active_connection(&self, peer_id: &AuthorityId) -> bool { - self.have.get(peer_id).map(|sender| !sender.is_closed()).unwrap_or(false) + self.have + .get(peer_id) + .map(|sender| !sender.is_closed()) + .unwrap_or(false) } /// Add a peer to the list of peers we want to stay connected to, or @@ -229,10 +231,7 @@ impl Manager { #[cfg(test)] mod tests { - use futures::{ - channel::mpsc, - StreamExt, - }; + use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; use crate::validator_network::mock::key; @@ -286,7 +285,10 @@ mod tests { ]; let (tx, _rx) = mpsc::unbounded(); // try add unknown peer - assert_eq!(connecting_manager.add_connection(listening_id.clone(), tx), Uninterested); + assert_eq!( + connecting_manager.add_connection(listening_id.clone(), tx), + Uninterested + ); // sending should fail assert_eq!( connecting_manager.send_to(&listening_id, data.clone()), @@ -308,15 +310,25 @@ mod tests { } // add outgoing to connecting let (tx, mut rx) = mpsc::unbounded(); - assert_eq!(connecting_manager.add_connection(listening_id.clone(), tx), Added); + assert_eq!( + connecting_manager.add_connection(listening_id.clone(), tx), + Added + ); // send and receive connecting - assert!(connecting_manager.send_to(&listening_id, data.clone()).is_ok()); + assert!(connecting_manager + .send_to(&listening_id, data.clone()) + .is_ok()); assert_eq!(data, rx.next().await.expect("should receive")); // add incoming to listening let (tx, mut rx) = mpsc::unbounded(); - assert_eq!(listening_manager.add_connection(connecting_id.clone(), tx), Added); + assert_eq!( + listening_manager.add_connection(connecting_id.clone(), tx), + Added + ); // send and receive listening - assert!(listening_manager.send_to(&connecting_id, data.clone()).is_ok()); + assert!(listening_manager + .send_to(&connecting_id, data.clone()) + .is_ok()); assert_eq!(data, rx.next().await.expect("should receive")); // remove peer listening_manager.remove_peer(&connecting_id); diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index 1c479d6acb..2720df9aee 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -8,7 +8,9 @@ use tokio::time::{sleep, Duration}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocols::{protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService}, + protocols::{ + protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, + }, Data, Dialer, }, }; @@ -59,7 +61,13 @@ async fn manage_outgoing>( let (stream, protocol) = protocol(stream).await?; debug!(target: "validator-network", "Negotiated protocol, running."); Ok(protocol - .manage_outgoing(stream, authority_pen, peer_id, result_for_parent, data_for_user) + .manage_outgoing( + stream, + authority_pen, + peer_id, + result_for_parent, + data_for_user, + ) .await?) } @@ -90,7 +98,10 @@ pub async fn outgoing>( sleep(RETRY_DELAY).await; // we send the "new" connection type, because we always assume it's new until proven // otherwise, and here we did not even get the chance to attempt negotiating a protocol - if result_for_parent.unbounded_send((peer_id, None, ConnectionType::New)).is_err() { + if result_for_parent + .unbounded_send((peer_id, None, ConnectionType::New)) + .is_err() + { debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service."); } } diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs index 323f690c10..8c26661a96 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -11,14 +11,12 @@ use crate::{ }, }; - mod handshake; mod negotiation; mod v0; mod v1; use handshake::HandshakeError; - pub use negotiation::{protocol, ProtocolNegotiationError}; pub type Version = u32; @@ -36,7 +34,11 @@ pub enum ConnectionType { /// of the remote node, followed by a channel for sending data to that node, with None if the /// connection was unsuccessful and should be reestablished. Finally a marker for legacy /// compatibility. -pub type ResultForService = mpsc::UnboundedSender<(AuthorityId, Option>, ConnectionType)>; +pub type ResultForService = mpsc::UnboundedSender<( + AuthorityId, + Option>, + ConnectionType, +)>; /// Defines the protocol for communication. #[derive(Debug, PartialEq, Eq)] @@ -131,7 +133,16 @@ impl Protocol { use Protocol::*; match self { V0 => v0::outgoing(stream, authority_pen, peer_id, result_for_service).await, - V1 => v1::outgoing(stream, authority_pen, peer_id, result_for_service, data_for_user).await, + V1 => { + v1::outgoing( + stream, + authority_pen, + peer_id, + result_for_service, + data_for_user, + ) + .await + } } } } diff --git a/finality-aleph/src/validator_network/protocols/negotiation.rs b/finality-aleph/src/validator_network/protocols/negotiation.rs index 4fd2ff67f0..52f0f1203a 100644 --- a/finality-aleph/src/validator_network/protocols/negotiation.rs +++ b/finality-aleph/src/validator_network/protocols/negotiation.rs @@ -70,12 +70,8 @@ impl ProtocolsRange { fn decode(encoded: &[u8; 8]) -> Result { let result = ProtocolsRange( - Version::from_le_bytes( - encoded[0..4].try_into().expect("this is literally 4 bytes"), - ), - Version::from_le_bytes( - encoded[4..8].try_into().expect("this is literally 4 bytes"), - ), + Version::from_le_bytes(encoded[0..4].try_into().expect("this is literally 4 bytes")), + Version::from_le_bytes(encoded[4..8].try_into().expect("this is literally 4 bytes")), ); match result.valid() { true => Ok(result), @@ -99,7 +95,12 @@ fn maximum_of_intersection( range1: ProtocolsRange, range2: ProtocolsRange, ) -> Result { - intersection(range1, range2).map(|intersection| intersection.1.try_into().map_err(ProtocolNegotiationError::BadChoice))? + intersection(range1, range2).map(|intersection| { + intersection + .1 + .try_into() + .map_err(ProtocolNegotiationError::BadChoice) + })? } async fn negotiate_protocol_version( diff --git a/finality-aleph/src/validator_network/protocols/v0/mod.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs index 1fb832fcc6..dfa53b58e0 100644 --- a/finality-aleph/src/validator_network/protocols/v0/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -1,19 +1,16 @@ use aleph_primitives::AuthorityId; -use futures::{ - channel::mpsc, - StreamExt, -}; +use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{ crypto::AuthorityPen, validator_network::{ + io::{receive_data, send_data}, protocols::{ - ProtocolError, ResultForService, ConnectionType, handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, }, - io::{receive_data, send_data}, Data, Splittable, }, }; @@ -50,7 +47,11 @@ pub async fn outgoing( info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::LegacyOutgoing)) + .unbounded_send(( + peer_id.clone(), + Some(data_for_network), + ConnectionType::LegacyOutgoing, + )) .map_err(|_| ProtocolError::NoParentConnection)?; let sending = sending(sender, data_from_user); @@ -94,7 +95,11 @@ pub async fn incoming( let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(tx_exit), ConnectionType::LegacyIncoming)) + .unbounded_send(( + peer_id.clone(), + Some(tx_exit), + ConnectionType::LegacyIncoming, + )) .map_err(|_| ProtocolError::NoParentConnection)?; let receiving = receiving(receiver, data_for_user); @@ -118,12 +123,12 @@ mod tests { pin_mut, FutureExt, StreamExt, }; - use super::{ProtocolError, incoming, outgoing}; + use super::{incoming, outgoing, ProtocolError}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocols::ConnectionType, mock::{key, MockSplittable}, + protocols::ConnectionType, Data, }, }; @@ -136,8 +141,16 @@ mod tests { impl futures::Future>, impl futures::Future>, UnboundedReceiver, - UnboundedReceiver<(AuthorityId, Option>, ConnectionType)>, - UnboundedReceiver<(AuthorityId, Option>, ConnectionType)>, + UnboundedReceiver<( + AuthorityId, + Option>, + ConnectionType, + )>, + UnboundedReceiver<( + AuthorityId, + Option>, + ConnectionType, + )>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/validator_network/protocols/v1/mod.rs index 1870249848..b0287bca4b 100644 --- a/finality-aleph/src/validator_network/protocols/v1/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v1/mod.rs @@ -1,9 +1,6 @@ use aleph_primitives::AuthorityId; use codec::{Decode, Encode}; -use futures::{ - channel::mpsc, - StreamExt, -}; +use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; use tokio::{ io::{AsyncRead, AsyncWrite}, @@ -13,11 +10,11 @@ use tokio::{ use crate::{ crypto::AuthorityPen, validator_network::{ + io::{receive_data, send_data}, protocols::{ - ProtocolError, ConnectionType, ResultForService, handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, }, - io::{receive_data, send_data}, Data, Splittable, }, }; @@ -37,10 +34,7 @@ async fn sending( ) -> Result<(), ProtocolError> { use Message::*; loop { - let to_send = match timeout( - HEARTBEAT_TIMEOUT, - data_from_user.next(), - ).await { + let to_send = match timeout(HEARTBEAT_TIMEOUT, data_from_user.next()).await { Ok(maybe_data) => match maybe_data { Some(data) => Data(data), // We have been closed by the parent service, all good. @@ -61,7 +55,9 @@ async fn receiving( let (old_stream, message) = timeout( MAX_MISSED_HEARTBEATS * HEARTBEAT_TIMEOUT, receive_data(stream), - ).await.map_err(|_| ProtocolError::CardiacArrest)??; + ) + .await + .map_err(|_| ProtocolError::CardiacArrest)??; stream = old_stream; match message { Data(data) => data_for_user @@ -133,12 +129,12 @@ mod tests { pin_mut, FutureExt, StreamExt, }; - use super::{ProtocolError, incoming, outgoing}; + use super::{incoming, outgoing, ProtocolError}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocols::ConnectionType, mock::{key, MockSplittable}, + protocols::ConnectionType, Data, }, }; @@ -152,8 +148,16 @@ mod tests { impl futures::Future>, UnboundedReceiver, UnboundedReceiver, - UnboundedReceiver<(AuthorityId, Option>, ConnectionType)>, - UnboundedReceiver<(AuthorityId, Option>, ConnectionType)>, + UnboundedReceiver<( + AuthorityId, + Option>, + ConnectionType, + )>, + UnboundedReceiver<( + AuthorityId, + Option>, + ConnectionType, + )>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index e83ff0b3cb..d81496b653 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -11,10 +11,10 @@ use tokio::time; use crate::{ crypto::AuthorityPen, validator_network::{ - protocols::{ResultForService, ConnectionType}, incoming::incoming, manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, + protocols::{ConnectionType, ResultForService}, Data, Dialer, Listener, Network, }, SpawnTaskHandle, STATUS_REPORT_INTERVAL, @@ -129,15 +129,19 @@ impl, NL: Listener> Service { let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_outgoing", None, async move { - outgoing(authority_pen, peer_id, dialer, addresses, result_for_parent, next_to_interface).await; + outgoing( + authority_pen, + peer_id, + dialer, + addresses, + result_for_parent, + next_to_interface, + ) + .await; }); } - fn spawn_new_incoming( - &self, - stream: NL::Connection, - result_for_parent: ResultForService, - ) { + fn spawn_new_incoming(&self, stream: NL::Connection, result_for_parent: ResultForService) { let authority_pen = self.authority_pen.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle @@ -153,7 +157,12 @@ impl, NL: Listener> Service { } } - fn add_connection(&mut self, peer_id: AuthorityId, data_for_network: mpsc::UnboundedSender, connection_type: ConnectionType) -> AddResult { + fn add_connection( + &mut self, + peer_id: AuthorityId, + data_for_network: mpsc::UnboundedSender, + connection_type: ConnectionType, + ) -> AddResult { use ConnectionType::*; match connection_type { New => self.manager.add_connection(peer_id, data_for_network), From 15e6e9f4ebddb19fe723bf5b8f8f2216f9d803e2 Mon Sep 17 00:00:00 2001 From: timorl Date: Tue, 8 Nov 2022 09:44:40 +0100 Subject: [PATCH 4/4] The type should be what is sent, not the sending mechanism --- .../src/validator_network/incoming.rs | 4 ++-- .../src/validator_network/outgoing.rs | 4 ++-- .../src/validator_network/protocols/mod.rs | 8 ++++---- .../src/validator_network/protocols/v0/mod.rs | 18 +++++------------- .../src/validator_network/service.rs | 8 ++++++-- 5 files changed, 19 insertions(+), 23 deletions(-) diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/validator_network/incoming.rs index ad705d526a..56692564c8 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/validator_network/incoming.rs @@ -41,7 +41,7 @@ impl From for IncomingError { async fn manage_incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: ResultForService, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), IncomingError> { debug!(target: "validator-network", "Performing incoming protocol negotiation."); @@ -60,7 +60,7 @@ async fn manage_incoming( pub async fn incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: ResultForService, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await { diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index 2720df9aee..a72a605480 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -49,7 +49,7 @@ async fn manage_outgoing>( peer_id: AuthorityId, mut dialer: ND, addresses: Vec, - result_for_parent: ResultForService, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), OutgoingError> { debug!(target: "validator-network", "Trying to connect to {}.", peer_id); @@ -81,7 +81,7 @@ pub async fn outgoing>( peer_id: AuthorityId, dialer: ND, addresses: Vec, - result_for_parent: ResultForService, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_outgoing( diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs index 8c26661a96..d16430d205 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -34,11 +34,11 @@ pub enum ConnectionType { /// of the remote node, followed by a channel for sending data to that node, with None if the /// connection was unsuccessful and should be reestablished. Finally a marker for legacy /// compatibility. -pub type ResultForService = mpsc::UnboundedSender<( +pub type ResultForService = ( AuthorityId, Option>, ConnectionType, -)>; +); /// Defines the protocol for communication. #[derive(Debug, PartialEq, Eq)] @@ -111,7 +111,7 @@ impl Protocol { &self, stream: S, authority_pen: AuthorityPen, - result_for_service: ResultForService, + result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { use Protocol::*; @@ -127,7 +127,7 @@ impl Protocol { stream: S, authority_pen: AuthorityPen, peer_id: AuthorityId, - result_for_service: ResultForService, + result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { use Protocol::*; diff --git a/finality-aleph/src/validator_network/protocols/v0/mod.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs index dfa53b58e0..1f80de2baf 100644 --- a/finality-aleph/src/validator_network/protocols/v0/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -40,7 +40,7 @@ pub async fn outgoing( stream: S, authority_pen: AuthorityPen, peer_id: AuthorityId, - result_for_parent: ResultForService, + result_for_parent: mpsc::UnboundedSender>, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; @@ -86,7 +86,7 @@ async fn receiving( pub async fn incoming( stream: S, authority_pen: AuthorityPen, - result_for_parent: ResultForService, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); @@ -128,7 +128,7 @@ mod tests { crypto::AuthorityPen, validator_network::{ mock::{key, MockSplittable}, - protocols::ConnectionType, + protocols::{ConnectionType, ResultForService}, Data, }, }; @@ -141,16 +141,8 @@ mod tests { impl futures::Future>, impl futures::Future>, UnboundedReceiver, - UnboundedReceiver<( - AuthorityId, - Option>, - ConnectionType, - )>, - UnboundedReceiver<( - AuthorityId, - Option>, - ConnectionType, - )>, + UnboundedReceiver>, + UnboundedReceiver>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index d81496b653..c1c94c5ebe 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -122,7 +122,7 @@ impl, NL: Listener> Service { &self, peer_id: AuthorityId, addresses: Vec, - result_for_parent: ResultForService, + result_for_parent: mpsc::UnboundedSender>, ) { let authority_pen = self.authority_pen.clone(); let dialer = self.dialer.clone(); @@ -141,7 +141,11 @@ impl, NL: Listener> Service { }); } - fn spawn_new_incoming(&self, stream: NL::Connection, result_for_parent: ResultForService) { + fn spawn_new_incoming( + &self, + stream: NL::Connection, + result_for_parent: mpsc::UnboundedSender>, + ) { let authority_pen = self.authority_pen.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle