From 6168868dad5d0a42abc19bce3c05fa483729774a Mon Sep 17 00:00:00 2001 From: timorl Date: Thu, 15 Dec 2022 13:54:36 +0100 Subject: [PATCH 1/2] Improve network manager interface --- finality-aleph/src/network/io.rs | 40 - finality-aleph/src/network/manager/data.rs | 34 + .../src/network/manager/discovery.rs | 6 +- finality-aleph/src/network/manager/manager.rs | 651 ++++++++++++ finality-aleph/src/network/manager/mod.rs | 103 +- finality-aleph/src/network/manager/service.rs | 925 ++++-------------- finality-aleph/src/network/manager/session.rs | 30 +- finality-aleph/src/network/mod.rs | 59 +- finality-aleph/src/network/session.rs | 143 --- finality-aleph/src/nodes/validator_node.rs | 16 +- finality-aleph/src/party/manager/mod.rs | 52 +- finality-aleph/src/party/mocks.rs | 12 +- finality-aleph/src/party/mod.rs | 11 +- finality-aleph/src/party/traits.rs | 5 +- finality-aleph/src/testing/network.rs | 62 +- 15 files changed, 1067 insertions(+), 1082 deletions(-) delete mode 100644 finality-aleph/src/network/io.rs create mode 100644 finality-aleph/src/network/manager/data.rs create mode 100644 finality-aleph/src/network/manager/manager.rs delete mode 100644 finality-aleph/src/network/session.rs diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs deleted file mode 100644 index d86b6b3bfe..0000000000 --- a/finality-aleph/src/network/io.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::fmt::Debug; - -use futures::channel::mpsc; - -use crate::network::{ - clique::{Network as CliqueNetwork, PublicKey}, - manager::{DataInSession, VersionedAuthentication}, - AddressingInformation, ConnectionManagerIO, Data, GossipNetwork, SessionManagerIO, -}; - -type FullIO = (ConnectionManagerIO, SessionManagerIO); - -pub fn setup< - D: Data, - M: Data + Debug, - A: AddressingInformation + TryFrom> + Into>, - CN: CliqueNetwork>, - GN: GossipNetwork>, ->( - validator_network: CN, - gossip_network: GN, -) -> FullIO -where - A::PeerId: PublicKey, -{ - // Prepare and start the network - let (commands_for_service, commands_from_user) = mpsc::unbounded(); - let (messages_for_service, commands_from_manager) = mpsc::unbounded(); - - let connection_io = ConnectionManagerIO::new( - commands_from_user, - commands_from_manager, - validator_network, - gossip_network, - ); - let channels_for_session_manager = - SessionManagerIO::new(commands_for_service, messages_for_service); - - (connection_io, channels_for_session_manager) -} diff --git a/finality-aleph/src/network/manager/data.rs b/finality-aleph/src/network/manager/data.rs new file mode 100644 index 0000000000..db7cf5f672 --- /dev/null +++ b/finality-aleph/src/network/manager/data.rs @@ -0,0 +1,34 @@ +use codec::{Decode, Encode, Error, Input, Output}; + +use crate::{network::Data, SessionId}; + +/// Data inside session, sent to validator network. +/// Wrapper for data send over network. We need it to ensure compatibility. +/// The order of the data and session_id is fixed in encode and the decode expects it to be data, session_id. +/// Since data is versioned, i.e. it's encoding starts with a version number in the standardized way, +/// this will allow us to retrofit versioning here if we ever need to change this structure. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DataInSession { + pub data: D, + pub session_id: SessionId, +} + +impl Decode for DataInSession { + fn decode(input: &mut I) -> Result { + let data = D::decode(input)?; + let session_id = SessionId::decode(input)?; + + Ok(Self { data, session_id }) + } +} + +impl Encode for DataInSession { + fn size_hint(&self) -> usize { + self.data.size_hint() + self.session_id.size_hint() + } + + fn encode_to(&self, dest: &mut T) { + self.data.encode_to(dest); + self.session_id.encode_to(dest); + } +} diff --git a/finality-aleph/src/network/manager/discovery.rs b/finality-aleph/src/network/manager/discovery.rs index 4e36513b69..dcad44b47d 100644 --- a/finality-aleph/src/network/manager/discovery.rs +++ b/finality-aleph/src/network/manager/discovery.rs @@ -123,9 +123,11 @@ mod tests { use crate::{ network::{ clique::mock::{random_address, MockAddressingInformation}, - manager::{compatibility::PeerAuthentications, SessionHandler}, + manager::{ + authentication, compatibility::PeerAuthentications, legacy_authentication, + SessionHandler, + }, mock::crypto_basics, - testing::{authentication, legacy_authentication}, }, SessionId, }; diff --git a/finality-aleph/src/network/manager/manager.rs b/finality-aleph/src/network/manager/manager.rs new file mode 100644 index 0000000000..56e515c2fe --- /dev/null +++ b/finality-aleph/src/network/manager/manager.rs @@ -0,0 +1,651 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + time::Duration, +}; + +use futures::channel::mpsc; +use log::{debug, info}; + +use crate::{ + abft::Recipient, + crypto::{AuthorityPen, AuthorityVerifier}, + network::{ + manager::{ + compatibility::PeerAuthentications, data::DataInSession, Connections, Discovery, + DiscoveryMessage, SessionHandler, SessionHandlerError, + }, + AddressingInformation, Data, NetworkIdentity, PeerId, + }, + NodeIndex, SessionId, +}; + +/// Commands for manipulating the reserved peers set. +#[derive(Debug, PartialEq, Eq)] +pub enum ConnectionCommand { + AddReserved(HashSet), + DelReserved(HashSet), +} + +// In practice D: Data and P: PeerId, but we cannot require that in type aliases. +pub type AddressedData = (D, P); + +struct Session> + Into>> { + handler: SessionHandler, + discovery: Discovery, + data_for_user: Option>, +} + +#[derive(Clone)] +/// Stores all data needed for starting validator session +pub struct PreValidatorSession { + pub session_id: SessionId, + pub verifier: AuthorityVerifier, + pub node_id: NodeIndex, + pub pen: AuthorityPen, +} + +#[derive(Clone)] +/// Stores all data needed for starting non-validator session +pub struct PreNonvalidatorSession { + pub session_id: SessionId, + pub verifier: AuthorityVerifier, +} + +/// Actions that the manager wants to take as the result of some information. Might contain a +/// command for connecting to or disconnecting from some peers or a message to broadcast for +/// discovery purposes. +pub struct ManagerActions> + Into>> { + pub maybe_command: Option>, + pub maybe_message: Option>, +} + +impl> + Into>> ManagerActions { + fn noop() -> Self { + ManagerActions { + maybe_command: None, + maybe_message: None, + } + } +} + +/// The connection manager. It handles the abstraction over the network we build to support +/// separate sessions. This includes: +/// 1. Starting and ending specific sessions on user demand. +/// 2. Forwarding in-session user messages to the network using session handlers for address +/// translation. +/// 3. Handling network messages: +/// 1. In-session messages are forwarded to the user. +/// 2. Authentication messages forwarded to session handlers. +/// 4. Running periodic maintenance, mostly related to node discovery. +pub struct Manager +where + NI::AddressingInformation: TryFrom> + Into>, +{ + network_identity: NI, + connections: Connections, + sessions: HashMap>, + discovery_cooldown: Duration, +} + +/// Error when trying to forward data from the network to the user, should never be fatal. +#[derive(Debug, PartialEq, Eq)] +pub enum SendError { + UserSend, + NoSession, +} + +impl Manager +where + NI::AddressingInformation: TryFrom> + Into>, +{ + /// Create a new connection manager. + pub fn new(network_identity: NI, discovery_cooldown: Duration) -> Self { + Manager { + network_identity, + connections: Connections::new(), + sessions: HashMap::new(), + discovery_cooldown, + } + } + + fn delete_reserved( + to_remove: HashSet, + ) -> Option> { + match to_remove.is_empty() { + true => None, + false => Some(ConnectionCommand::DelReserved(to_remove)), + } + } + + /// Ends a session. + pub fn finish_session( + &mut self, + session_id: SessionId, + ) -> ManagerActions { + self.sessions.remove(&session_id); + ManagerActions { + maybe_command: Self::delete_reserved(self.connections.remove_session(session_id)), + maybe_message: None, + } + } + + fn discover_authorities( + &mut self, + session_id: &SessionId, + ) -> Option> { + self.sessions.get_mut(session_id).and_then( + |Session { + handler, discovery, .. + }| { discovery.discover_authorities(handler) }, + ) + } + + /// Returns all the network messages that should be sent as part of discovery at this moment. + pub fn discovery(&mut self) -> Vec> { + let sessions: Vec<_> = self.sessions.keys().cloned().collect(); + sessions + .iter() + .flat_map(|session_id| self.discover_authorities(session_id)) + .collect() + } + + async fn start_validator_session( + &mut self, + pre_session: PreValidatorSession, + address: NI::AddressingInformation, + ) -> ( + Option>, + mpsc::UnboundedReceiver, + ) { + let PreValidatorSession { + session_id, + verifier, + node_id, + pen, + } = pre_session; + let handler = + SessionHandler::new(Some((node_id, pen)), verifier, session_id, address).await; + let discovery = Discovery::new(self.discovery_cooldown); + let (data_for_user, data_from_network) = mpsc::unbounded(); + let data_for_user = Some(data_for_user); + self.sessions.insert( + session_id, + Session { + handler, + discovery, + data_for_user, + }, + ); + (self.discover_authorities(&session_id), data_from_network) + } + + /// Starts or updates a validator session. + pub async fn update_validator_session( + &mut self, + pre_session: PreValidatorSession, + ) -> Result< + ( + ManagerActions, + mpsc::UnboundedReceiver, + ), + SessionHandlerError, + > { + let address = self.network_identity.identity(); + let session = match self.sessions.get_mut(&pre_session.session_id) { + Some(session) => session, + None => { + let (maybe_message, data_from_network) = + self.start_validator_session(pre_session, address).await; + return Ok(( + ManagerActions { + maybe_command: None, + maybe_message, + }, + data_from_network, + )); + } + }; + let PreValidatorSession { + session_id, + verifier, + node_id, + pen, + } = pre_session; + let peers_to_stay = session + .handler + .update(Some((node_id, pen)), verifier, address) + .await? + .iter() + .map(|address| address.peer_id()) + .collect(); + let maybe_command = Self::delete_reserved( + self.connections + .remove_session(session_id) + .difference(&peers_to_stay) + .cloned() + .collect(), + ); + let (data_for_user, data_from_network) = mpsc::unbounded(); + session.data_for_user = Some(data_for_user); + self.connections.add_peers(session_id, peers_to_stay); + Ok(( + ManagerActions { + maybe_command, + maybe_message: self.discover_authorities(&session_id), + }, + data_from_network, + )) + } + + async fn start_nonvalidator_session( + &mut self, + pre_session: PreNonvalidatorSession, + address: NI::AddressingInformation, + ) { + let PreNonvalidatorSession { + session_id, + verifier, + } = pre_session; + let handler = SessionHandler::new(None, verifier, session_id, address).await; + let discovery = Discovery::new(self.discovery_cooldown); + self.sessions.insert( + session_id, + Session { + handler, + discovery, + data_for_user: None, + }, + ); + } + + /// Starts or updates a nonvalidator session. + pub async fn update_nonvalidator_session( + &mut self, + pre_session: PreNonvalidatorSession, + ) -> Result, SessionHandlerError> { + let address = self.network_identity.identity(); + match self.sessions.get_mut(&pre_session.session_id) { + Some(session) => { + session + .handler + .update(None, pre_session.verifier, address) + .await?; + } + None => { + self.start_nonvalidator_session(pre_session, address).await; + } + }; + Ok(ManagerActions::noop()) + } + + /// Handle a user request for sending data. + /// Returns a list of data to be sent over the network. + pub fn on_user_message( + &self, + data: D, + session_id: SessionId, + recipient: Recipient, + ) -> Vec, NI::PeerId>> { + if let Some(handler) = self + .sessions + .get(&session_id) + .map(|session| &session.handler) + { + let to_send = DataInSession { data, session_id }; + match recipient { + Recipient::Everyone => (0..handler.node_count().0) + .map(NodeIndex) + .flat_map(|node_id| handler.peer_id(&node_id)) + .map(|peer_id| (to_send.clone(), peer_id)) + .collect(), + Recipient::Node(node_id) => handler + .peer_id(&node_id) + .into_iter() + .map(|peer_id| (to_send.clone(), peer_id)) + .collect(), + } + } else { + Vec::new() + } + } + + /// Handle a discovery message. + /// Returns actions the manager wants to take. + pub fn on_discovery_message( + &mut self, + message: DiscoveryMessage, + ) -> ManagerActions { + use DiscoveryMessage::*; + let session_id = message.session_id(); + match self.sessions.get_mut(&session_id) { + Some(Session { + handler, discovery, .. + }) => { + let (maybe_address, maybe_message) = match message { + Authentication(authentication) => { + discovery.handle_authentication(authentication, handler) + } + LegacyAuthentication(legacy_authentication) => { + discovery.handle_legacy_authentication(legacy_authentication, handler) + } + }; + let maybe_command = match (maybe_address, handler.is_validator()) { + (Some(address), true) => { + debug!(target: "aleph-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, address); + self.connections.add_peers(session_id, [address.peer_id()]); + Some(ConnectionCommand::AddReserved([address].into())) + } + _ => None, + }; + ManagerActions { + maybe_command, + maybe_message, + } + } + None => { + debug!(target: "aleph-network", "Received message from unknown session: {:?}", message); + ManagerActions::noop() + } + } + } + + /// Sends the data to the identified session. + pub fn send_session_data(&self, session_id: &SessionId, data: D) -> Result<(), SendError> { + match self + .sessions + .get(session_id) + .and_then(|session| session.data_for_user.as_ref()) + { + Some(data_for_user) => data_for_user + .unbounded_send(data) + .map_err(|_| SendError::UserSend), + None => Err(SendError::NoSession), + } + } + + pub fn status_report(&self) { + let mut status = String::from("Connection Manager status report: "); + + let mut authenticated: Vec<_> = self + .sessions + .iter() + .filter(|(_, session)| session.handler.authentication().is_some()) + .map(|(session_id, session)| { + let mut peers = session + .handler + .peers() + .into_iter() + .map(|(node_id, peer_id)| (node_id.0, peer_id)) + .collect::>(); + peers.sort_by(|x, y| x.0.cmp(&y.0)); + (session_id.0, session.handler.node_count().0, peers) + }) + .collect(); + authenticated.sort_by(|x, y| x.0.cmp(&y.0)); + if !authenticated.is_empty() { + let authenticated_status = authenticated + .iter() + .map(|(session_id, node_count, peers)| { + let peer_ids = peers + .iter() + .map(|(node_id, peer_id)| { + format!("{:?}: {}", node_id, peer_id.to_short_string()) + }) + .collect::>() + .join(", "); + + format!( + "{:?}: {}/{} {{{}}}", + session_id, + peers.len() + 1, + node_count, + peer_ids + ) + }) + .collect::>() + .join(", "); + status.push_str(&format!( + "authenticated authorities: {}; ", + authenticated_status + )); + } + + let mut missing: Vec<_> = self + .sessions + .iter() + .filter(|(_, session)| session.handler.authentication().is_some()) + .map(|(session_id, session)| { + ( + session_id.0, + session + .handler + .missing_nodes() + .iter() + .map(|id| id.0) + .collect::>(), + ) + }) + .filter(|(_, missing)| !missing.is_empty()) + .collect(); + missing.sort_by(|x, y| x.0.cmp(&y.0)); + if !missing.is_empty() { + let missing_status = missing + .iter() + .map(|(session_id, missing)| format!("{:?}: {:?}", session_id, missing)) + .collect::>() + .join(", "); + status.push_str(&format!("missing authorities: {}; ", missing_status)); + } + + if !authenticated.is_empty() || !missing.is_empty() { + info!(target: "aleph-network", "{}", status); + } + } +} + +#[cfg(test)] +mod tests { + use std::{iter, time::Duration}; + + use futures::StreamExt; + + use super::{ + ConnectionCommand, Manager, ManagerActions, PreNonvalidatorSession, PreValidatorSession, + SendError, + }; + use crate::{ + network::{ + clique::mock::{random_address, MockAddressingInformation}, + manager::{compatibility::PeerAuthentications, data::DataInSession, DiscoveryMessage}, + mock::crypto_basics, + }, + Recipient, SessionId, + }; + + const NUM_NODES: usize = 7; + const DISCOVERY_PERIOD: Duration = Duration::from_secs(60); + + fn build() -> Manager { + Manager::new(random_address(), DISCOVERY_PERIOD) + } + + #[tokio::test] + async fn starts_nonvalidator_session() { + let mut manager = build(); + let (_, verifier) = crypto_basics(NUM_NODES).await; + let session_id = SessionId(43); + let ManagerActions { + maybe_command, + maybe_message, + } = manager + .update_nonvalidator_session(PreNonvalidatorSession { + session_id, + verifier, + }) + .await + .unwrap(); + assert!(maybe_command.is_none()); + assert!(maybe_message.is_none()); + assert_eq!( + manager.send_session_data(&session_id, -43), + Err(SendError::NoSession) + ); + } + + #[tokio::test] + async fn starts_validator_session() { + let mut manager = build(); + let (validator_data, verifier) = crypto_basics(NUM_NODES).await; + let (node_id, pen) = validator_data[0].clone(); + let session_id = SessionId(43); + let ( + ManagerActions { + maybe_command, + maybe_message, + }, + _data_from_network, + ) = manager + .update_validator_session(PreValidatorSession { + session_id, + verifier, + node_id, + pen, + }) + .await + .unwrap(); + assert!(maybe_command.is_none()); + assert!(maybe_message.is_some()); + assert_eq!(manager.send_session_data(&session_id, -43), Ok(())); + } + + #[tokio::test] + async fn stops_session() { + let mut manager = build(); + let (validator_data, verifier) = crypto_basics(NUM_NODES).await; + let (node_id, pen) = validator_data[0].clone(); + let session_id = SessionId(43); + let ( + ManagerActions { + maybe_command, + maybe_message, + }, + mut data_from_network, + ) = manager + .update_validator_session(PreValidatorSession { + session_id, + verifier, + node_id, + pen, + }) + .await + .unwrap(); + assert!(maybe_command.is_none()); + assert!(maybe_message.is_some()); + assert_eq!(manager.send_session_data(&session_id, -43), Ok(())); + assert_eq!(data_from_network.next().await, Some(-43)); + let ManagerActions { + maybe_command, + maybe_message, + } = manager.finish_session(session_id); + assert!(maybe_command.is_none()); + assert!(maybe_message.is_none()); + assert_eq!( + manager.send_session_data(&session_id, -43), + Err(SendError::NoSession) + ); + assert!(data_from_network.next().await.is_none()); + } + + #[tokio::test] + async fn handles_broadcast() { + let mut manager = build(); + let (validator_data, verifier) = crypto_basics(NUM_NODES).await; + let (node_id, pen) = validator_data[0].clone(); + let session_id = SessionId(43); + manager + .update_validator_session(PreValidatorSession { + session_id, + verifier: verifier.clone(), + node_id, + pen, + }) + .await + .unwrap(); + let mut other_manager = build(); + let (node_id, pen) = validator_data[1].clone(); + let (ManagerActions { maybe_message, .. }, _) = other_manager + .update_validator_session(PreValidatorSession { + session_id, + verifier, + node_id, + pen, + }) + .await + .unwrap(); + let message = maybe_message.expect("there should be a discovery message"); + let (address, message) = match message { + PeerAuthentications::Both(authentication, _) => ( + authentication.0.address(), + DiscoveryMessage::Authentication(authentication), + ), + message => panic!("Expected both authentications, got {:?}", message), + }; + let ManagerActions { + maybe_command, + maybe_message, + } = manager.on_discovery_message(message); + assert_eq!( + maybe_command, + Some(ConnectionCommand::AddReserved( + iter::once(address).collect() + )) + ); + assert!(maybe_message.is_some()); + } + + #[tokio::test] + async fn sends_user_data() { + let mut manager = build(); + let (validator_data, verifier) = crypto_basics(NUM_NODES).await; + let (node_id, pen) = validator_data[0].clone(); + let session_id = SessionId(43); + manager + .update_validator_session(PreValidatorSession { + session_id, + verifier: verifier.clone(), + node_id, + pen, + }) + .await + .unwrap(); + let mut other_manager = build(); + let (node_id, pen) = validator_data[1].clone(); + let (ManagerActions { maybe_message, .. }, _) = other_manager + .update_validator_session(PreValidatorSession { + session_id, + verifier, + node_id, + pen, + }) + .await + .unwrap(); + let message = match maybe_message.expect("there should be a discovery message") { + PeerAuthentications::Both(authentication, _) => { + DiscoveryMessage::Authentication(authentication) + } + message => panic!("Expected both authentications, got {:?}", message), + }; + manager.on_discovery_message(message); + let messages = manager.on_user_message(2137, session_id, Recipient::Everyone); + assert_eq!(messages.len(), 1); + let (network_data, _) = &messages[0]; + assert_eq!( + network_data, + &DataInSession { + data: 2137, + session_id + } + ); + } +} diff --git a/finality-aleph/src/network/manager/mod.rs b/finality-aleph/src/network/manager/mod.rs index ad79ae7bc8..65a8407bbd 100644 --- a/finality-aleph/src/network/manager/mod.rs +++ b/finality-aleph/src/network/manager/mod.rs @@ -1,14 +1,26 @@ -use codec::{Decode, Encode, Error, Input, Output}; +//! Managing the validator connections using the gossip network. +use std::fmt::Display; + +use codec::{Decode, Encode}; +use futures::channel::mpsc; use crate::{ - crypto::Signature, - network::{AddressingInformation, Data}, - NodeIndex, SessionId, + crypto::{AuthorityPen, AuthorityVerifier, Signature}, + network::{ + data::{ + component::{Sender, SimpleNetwork}, + SendError, + }, + AddressingInformation, Data, + }, + NodeIndex, Recipient, SessionId, }; mod compatibility; mod connections; +mod data; mod discovery; +mod manager; mod service; mod session; @@ -16,11 +28,12 @@ pub use compatibility::{ DiscoveryMessage, LegacyDiscoveryMessage, PeerAuthentications, VersionedAuthentication, }; use connections::Connections; +#[cfg(test)] +pub use data::DataInSession; pub use discovery::Discovery; -pub use service::{ - Config as ConnectionManagerConfig, Service as ConnectionManager, SessionCommand, - IO as ConnectionIO, -}; +pub use service::{Config as ConnectionManagerConfig, ManagerError, Service as ConnectionManager}; +#[cfg(test)] +pub use session::tests::{authentication, legacy_authentication}; pub use session::{Handler as SessionHandler, HandlerError as SessionHandlerError}; /// Data validators used to use to authenticate themselves for a single session @@ -107,33 +120,59 @@ pub type LegacyAuthentication = (LegacyAuthData, Signature); /// A full authentication, consisting of a signed AuthData. pub type Authentication = (AuthData, Signature); -/// Data inside session, sent to validator network. -/// Wrapper for data send over network. We need it to ensure compatibility. -/// The order of the data and session_id is fixed in encode and the decode expects it to be data, session_id. -/// Since data is versioned, i.e. it's encoding starts with a version number in the standardized way, -/// this will allow us to retrofit versioning here if we ever need to change this structure. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct DataInSession { - pub data: D, - pub session_id: SessionId, +/// Sends data within a single session. +#[derive(Clone)] +pub struct SessionSender { + session_id: SessionId, + messages_for_network: mpsc::UnboundedSender<(D, SessionId, Recipient)>, } -impl Decode for DataInSession { - fn decode(input: &mut I) -> Result { - let data = D::decode(input)?; - let session_id = SessionId::decode(input)?; - - Ok(Self { data, session_id }) +impl Sender for SessionSender { + fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError> { + self.messages_for_network + .unbounded_send((data, self.session_id, recipient)) + .map_err(|_| SendError::SendFailed) } } -impl Encode for DataInSession { - fn size_hint(&self) -> usize { - self.data.size_hint() + self.session_id.size_hint() - } - - fn encode_to(&self, dest: &mut T) { - self.data.encode_to(dest); - self.session_id.encode_to(dest); - } +/// Sends and receives data within a single session. +type Network = SimpleNetwork, SessionSender>; + +/// An interface for managing session networks for validators and nonvalidators. +#[async_trait::async_trait] +pub trait SessionManager: Send + Sync + 'static { + type Error: Display; + + /// Start participating or update the verifier in the given session where you are not a + /// validator. + fn start_nonvalidator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + ) -> Result<(), Self::Error>; + + /// Start participating or update the information about the given session where you are a + /// validator. Returns a session network to be used for sending and receiving data within the + /// session. + async fn start_validator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + node_id: NodeIndex, + pen: AuthorityPen, + ) -> Result, Self::Error>; + + /// Start participating or update the information about the given session where you are a + /// validator. Used for early starts when you don't yet need the returned network, but would + /// like to start discovery. + fn early_start_validator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + node_id: NodeIndex, + pen: AuthorityPen, + ) -> Result<(), Self::Error>; + + /// Stop participating in the given session. + fn stop_session(&self, session_id: SessionId) -> Result<(), Self::Error>; } diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index d0d0978bc8..a64995d4f3 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -1,8 +1,6 @@ use std::{ cmp, - collections::{HashMap, HashSet}, fmt::{Debug, Display, Error as FmtError, Formatter}, - marker::PhantomData, time::Duration, }; @@ -10,7 +8,7 @@ use futures::{ channel::{mpsc, oneshot}, StreamExt, }; -use log::{debug, info, trace, warn}; +use log::{debug, trace, warn}; use tokio::time::{self, Instant}; use crate::{ @@ -19,18 +17,21 @@ use crate::{ network::{ clique::{Network as CliqueNetwork, PublicKey}, manager::{ - compatibility::PeerAuthentications, Connections, DataInSession, Discovery, - DiscoveryMessage, SessionHandler, SessionHandlerError, VersionedAuthentication, + data::DataInSession, + manager::{ + AddressedData, ConnectionCommand, Manager, ManagerActions, PreNonvalidatorSession, + PreValidatorSession, SendError, + }, + Network, SessionHandlerError, SessionManager, SessionSender, VersionedAuthentication, }, - AddressedData, AddressingInformation, ConnectionCommand, Data, GossipNetwork, - NetworkIdentity, PeerId, + AddressingInformation, Data, GossipNetwork, NetworkIdentity, }, MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, }; /// Commands for manipulating sessions, stopping them and starting both validator and non-validator /// sessions. -pub enum SessionCommand { +enum SessionCommand { StartValidator( SessionId, AuthorityVerifier, @@ -42,29 +43,97 @@ pub enum SessionCommand { Stop(SessionId), } -struct Session> + Into>> { - handler: SessionHandler, - discovery: Discovery, - data_for_user: Option>, +/// Manages sessions for which the network should be active. +struct ManagerInterface { + commands_for_service: mpsc::UnboundedSender>, + messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, } -#[derive(Clone)] -/// Stores all data needed for starting validator session -struct PreValidatorSession { - session_id: SessionId, - verifier: AuthorityVerifier, - node_id: NodeIndex, - pen: AuthorityPen, +/// What went wrong during a session management operation. +#[derive(Debug)] +pub enum ManagerError { + CommandSendFailed, + NetworkReceiveFailed, } -#[derive(Clone)] -/// Stores all data needed for starting non-validator session -struct PreNonvalidatorSession { - session_id: SessionId, - verifier: AuthorityVerifier, +impl Display for ManagerError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use ManagerError::*; + match self { + CommandSendFailed => write!(f, "failed to send a command to the service"), + NetworkReceiveFailed => write!(f, "the service did not return a network"), + } + } } -/// Configuration for the session manager service. Controls how often the maintenance and +#[async_trait::async_trait] +impl SessionManager for ManagerInterface { + type Error = ManagerError; + + fn start_nonvalidator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + ) -> Result<(), Self::Error> { + self.commands_for_service + .unbounded_send(SessionCommand::StartNonvalidator(session_id, verifier)) + .map_err(|_| ManagerError::CommandSendFailed) + } + + async fn start_validator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + node_id: NodeIndex, + pen: AuthorityPen, + ) -> Result, Self::Error> { + let (result_for_us, result_from_service) = oneshot::channel(); + self.commands_for_service + .unbounded_send(SessionCommand::StartValidator( + session_id, + verifier, + node_id, + pen, + Some(result_for_us), + )) + .map_err(|_| ManagerError::CommandSendFailed)?; + + let data_from_network = result_from_service + .await + .map_err(|_| ManagerError::NetworkReceiveFailed)?; + let messages_for_network = self.messages_for_service.clone(); + + Ok(Network::new( + data_from_network, + SessionSender { + session_id, + messages_for_network, + }, + )) + } + + fn early_start_validator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + node_id: NodeIndex, + pen: AuthorityPen, + ) -> Result<(), Self::Error> { + self.commands_for_service + .unbounded_send(SessionCommand::StartValidator( + session_id, verifier, node_id, pen, None, + )) + .map_err(|_| ManagerError::CommandSendFailed) + } + + fn stop_session(&self, session_id: SessionId) -> Result<(), Self::Error> { + self.commands_for_service + .unbounded_send(SessionCommand::Stop(session_id)) + .map_err(|_| ManagerError::CommandSendFailed) + } +} + +/// Configuration for the session manager. Controls how often the maintenance and /// rebroadcasts are triggerred. Also controls when maintenance starts. pub struct Config { discovery_cooldown: Duration, @@ -101,473 +170,24 @@ impl Config { } } -/// Actions that the service wants to take as the result of some information. Might contain a -/// command for connecting to or disconnecting from some peers or a message to broadcast for -/// discovery purposes. -pub struct ServiceActions> + Into>> { - maybe_command: Option>, - maybe_message: Option>, -} - -impl> + Into>> ServiceActions { - fn noop() -> Self { - ServiceActions { - maybe_command: None, - maybe_message: None, - } - } -} - -/// The connection manager service. It handles the abstraction over the network we build to support -/// separate sessions. This includes: -/// 1. Starting and ending specific sessions on user demand. -/// 2. Forwarding in-session user messages to the network using session handlers for address -/// translation. -/// 3. Handling network messages: -/// 1. In-session messages are forwarded to the user. -/// 2. Authentication messages forwarded to session handlers. -/// 4. Running periodic maintenance, mostly related to node discovery. -pub struct Service -where - NI::AddressingInformation: TryFrom> + Into>, -{ - network_identity: NI, - connections: Connections, - sessions: HashMap>, - discovery_cooldown: Duration, - maintenance_period: Duration, - initial_delay: Duration, -} - -/// Error when trying to forward data from the network to the user, should never be fatal. -#[derive(Debug, PartialEq, Eq)] -pub enum SendError { - UserSend, - NoSession, -} - -impl Service -where - NI::AddressingInformation: TryFrom> + Into>, -{ - /// Create a new connection manager service. - pub fn new(network_identity: NI, config: Config) -> Self { - let Config { - discovery_cooldown, - maintenance_period, - initial_delay, - } = config; - Service { - network_identity, - connections: Connections::new(), - sessions: HashMap::new(), - discovery_cooldown, - maintenance_period, - initial_delay, - } - } - - fn delete_reserved( - to_remove: HashSet, - ) -> Option> { - match to_remove.is_empty() { - true => None, - false => Some(ConnectionCommand::DelReserved(to_remove)), - } - } - - fn finish_session( - &mut self, - session_id: SessionId, - ) -> Option> { - self.sessions.remove(&session_id); - Self::delete_reserved(self.connections.remove_session(session_id)) - } - - fn discover_authorities( - &mut self, - session_id: &SessionId, - ) -> Option> { - self.sessions.get_mut(session_id).and_then( - |Session { - handler, discovery, .. - }| { discovery.discover_authorities(handler) }, - ) - } - - /// Returns all the network messages that should be sent as part of discovery at this moment. - pub fn discovery(&mut self) -> Vec> { - let sessions: Vec<_> = self.sessions.keys().cloned().collect(); - sessions - .iter() - .flat_map(|session_id| self.discover_authorities(session_id)) - .collect() - } - - async fn start_validator_session( - &mut self, - pre_session: PreValidatorSession, - address: NI::AddressingInformation, - ) -> ( - Option>, - mpsc::UnboundedReceiver, - ) { - let PreValidatorSession { - session_id, - verifier, - node_id, - pen, - } = pre_session; - let handler = - SessionHandler::new(Some((node_id, pen)), verifier, session_id, address).await; - let discovery = Discovery::new(self.discovery_cooldown); - let (data_for_user, data_from_network) = mpsc::unbounded(); - let data_for_user = Some(data_for_user); - self.sessions.insert( - session_id, - Session { - handler, - discovery, - data_for_user, - }, - ); - (self.discover_authorities(&session_id), data_from_network) - } - - async fn update_validator_session( - &mut self, - pre_session: PreValidatorSession, - ) -> Result< - ( - ServiceActions, - mpsc::UnboundedReceiver, - ), - SessionHandlerError, - > { - let address = self.network_identity.identity(); - let session = match self.sessions.get_mut(&pre_session.session_id) { - Some(session) => session, - None => { - let (maybe_message, data_from_network) = - self.start_validator_session(pre_session, address).await; - return Ok(( - ServiceActions { - maybe_command: None, - maybe_message, - }, - data_from_network, - )); - } - }; - let PreValidatorSession { - session_id, - verifier, - node_id, - pen, - } = pre_session; - let peers_to_stay = session - .handler - .update(Some((node_id, pen)), verifier, address) - .await? - .iter() - .map(|address| address.peer_id()) - .collect(); - let maybe_command = Self::delete_reserved( - self.connections - .remove_session(session_id) - .difference(&peers_to_stay) - .cloned() - .collect(), - ); - let (data_for_user, data_from_network) = mpsc::unbounded(); - session.data_for_user = Some(data_for_user); - self.connections.add_peers(session_id, peers_to_stay); - Ok(( - ServiceActions { - maybe_command, - maybe_message: self.discover_authorities(&session_id), - }, - data_from_network, - )) - } - - async fn handle_validator_presession( - &mut self, - pre_session: PreValidatorSession, - result_for_user: Option>>, - ) -> Result, SessionHandlerError> { - self.update_validator_session(pre_session) - .await - .map(|(actions, data_from_network)| { - if let Some(result_for_user) = result_for_user { - if result_for_user.send(data_from_network).is_err() { - warn!(target: "aleph-network", "Failed to send started session.") - } - } - actions - }) - } - - async fn start_nonvalidator_session( - &mut self, - pre_session: PreNonvalidatorSession, - address: NI::AddressingInformation, - ) { - let PreNonvalidatorSession { - session_id, - verifier, - } = pre_session; - let handler = SessionHandler::new(None, verifier, session_id, address).await; - let discovery = Discovery::new(self.discovery_cooldown); - self.sessions.insert( - session_id, - Session { - handler, - discovery, - data_for_user: None, - }, - ); - } - - async fn update_nonvalidator_session( - &mut self, - pre_session: PreNonvalidatorSession, - ) -> Result<(), SessionHandlerError> { - let address = self.network_identity.identity(); - let session = match self.sessions.get_mut(&pre_session.session_id) { - Some(session) => session, - None => { - self.start_nonvalidator_session(pre_session, address).await; - return Ok(()); - } - }; - session - .handler - .update(None, pre_session.verifier, address) - .await?; - Ok(()) - } - - async fn handle_nonvalidator_presession( - &mut self, - pre_session: PreNonvalidatorSession, - ) -> Result<(), SessionHandlerError> { - self.update_nonvalidator_session(pre_session).await - } - - /// Handle a session command. - /// Returns actions the service wants to take or an error if the session command is invalid. - pub async fn on_command( - &mut self, - command: SessionCommand, - ) -> Result, SessionHandlerError> { - use SessionCommand::*; - match command { - StartValidator(session_id, verifier, node_id, pen, result_for_user) => { - let pre_session = PreValidatorSession { - session_id, - verifier, - node_id, - pen, - }; - self.handle_validator_presession(pre_session, result_for_user) - .await - } - StartNonvalidator(session_id, verifier) => { - let pre_session = PreNonvalidatorSession { - session_id, - verifier, - }; - self.handle_nonvalidator_presession(pre_session).await?; - Ok(ServiceActions::noop()) - } - Stop(session_id) => Ok(ServiceActions { - maybe_command: self.finish_session(session_id), - maybe_message: None, - }), - } - } - - /// Handle a user request for sending data. - /// Returns a list of data to be sent over the network. - pub fn on_user_message( - &self, - data: D, - session_id: SessionId, - recipient: Recipient, - ) -> Vec, NI::PeerId>> { - if let Some(handler) = self - .sessions - .get(&session_id) - .map(|session| &session.handler) - { - let to_send = DataInSession { data, session_id }; - match recipient { - Recipient::Everyone => (0..handler.node_count().0) - .map(NodeIndex) - .flat_map(|node_id| handler.peer_id(&node_id)) - .map(|peer_id| (to_send.clone(), peer_id)) - .collect(), - Recipient::Node(node_id) => handler - .peer_id(&node_id) - .into_iter() - .map(|peer_id| (to_send.clone(), peer_id)) - .collect(), - } - } else { - Vec::new() - } - } - - /// Handle a discovery message. - /// Returns actions the service wants to take. - pub fn on_discovery_message( - &mut self, - message: DiscoveryMessage, - ) -> ServiceActions { - use DiscoveryMessage::*; - let session_id = message.session_id(); - match self.sessions.get_mut(&session_id) { - Some(Session { - handler, discovery, .. - }) => { - let (maybe_address, maybe_message) = match message { - Authentication(authentication) => { - discovery.handle_authentication(authentication, handler) - } - LegacyAuthentication(legacy_authentication) => { - discovery.handle_legacy_authentication(legacy_authentication, handler) - } - }; - let maybe_command = match (maybe_address, handler.is_validator()) { - (Some(address), true) => { - debug!(target: "aleph-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, address); - self.connections.add_peers(session_id, [address.peer_id()]); - Some(ConnectionCommand::AddReserved([address].into())) - } - _ => None, - }; - ServiceActions { - maybe_command, - maybe_message, - } - } - None => { - debug!(target: "aleph-network", "Received message from unknown session: {:?}", message); - ServiceActions::noop() - } - } - } - - /// Sends the data to the identified session. - pub fn send_session_data(&self, session_id: &SessionId, data: D) -> Result<(), SendError> { - match self - .sessions - .get(session_id) - .and_then(|session| session.data_for_user.as_ref()) - { - Some(data_for_user) => data_for_user - .unbounded_send(data) - .map_err(|_| SendError::UserSend), - None => Err(SendError::NoSession), - } - } - - pub fn status_report(&self) { - let mut status = String::from("Connection Manager status report: "); - - let mut authenticated: Vec<_> = self - .sessions - .iter() - .filter(|(_, session)| session.handler.authentication().is_some()) - .map(|(session_id, session)| { - let mut peers = session - .handler - .peers() - .into_iter() - .map(|(node_id, peer_id)| (node_id.0, peer_id)) - .collect::>(); - peers.sort_by(|x, y| x.0.cmp(&y.0)); - (session_id.0, session.handler.node_count().0, peers) - }) - .collect(); - authenticated.sort_by(|x, y| x.0.cmp(&y.0)); - if !authenticated.is_empty() { - let authenticated_status = authenticated - .iter() - .map(|(session_id, node_count, peers)| { - let peer_ids = peers - .iter() - .map(|(node_id, peer_id)| { - format!("{:?}: {}", node_id, peer_id.to_short_string()) - }) - .collect::>() - .join(", "); - - format!( - "{:?}: {}/{} {{{}}}", - session_id, - peers.len() + 1, - node_count, - peer_ids - ) - }) - .collect::>() - .join(", "); - status.push_str(&format!( - "authenticated authorities: {}; ", - authenticated_status - )); - } - - let mut missing: Vec<_> = self - .sessions - .iter() - .filter(|(_, session)| session.handler.authentication().is_some()) - .map(|(session_id, session)| { - ( - session_id.0, - session - .handler - .missing_nodes() - .iter() - .map(|id| id.0) - .collect::>(), - ) - }) - .filter(|(_, missing)| !missing.is_empty()) - .collect(); - missing.sort_by(|x, y| x.0.cmp(&y.0)); - if !missing.is_empty() { - let missing_status = missing - .iter() - .map(|(session_id, missing)| format!("{:?}: {:?}", session_id, missing)) - .collect::>() - .join(", "); - status.push_str(&format!("missing authorities: {}; ", missing_status)); - } - - if !authenticated.is_empty() || !missing.is_empty() { - info!(target: "aleph-network", "{}", status); - } - } -} - -/// Input/output interface for the connection manager service. -pub struct IO< +/// The connection manager service. +pub struct Service< D: Data, - M: Data, - A: AddressingInformation + TryFrom> + Into>, - CN: CliqueNetwork>, - GN: GossipNetwork>, + M: Data + Debug, + NI: NetworkIdentity, + CN: CliqueNetwork>, + GN: GossipNetwork>, > where - A::PeerId: PublicKey, + NI::PeerId: PublicKey, + NI::AddressingInformation: TryFrom> + Into>, { + manager: Manager, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, validator_network: CN, gossip_network: GN, - _phantom: PhantomData<(M, A)>, + maintenance_period: Duration, + initial_delay: Duration, } /// Errors that can happen during the network service operations. @@ -594,35 +214,55 @@ impl Display for Error { impl< D: Data, M: Data + Debug, - A: AddressingInformation + TryFrom> + Into>, - CN: CliqueNetwork>, - GN: GossipNetwork>, - > IO + NI: NetworkIdentity, + CN: CliqueNetwork>, + GN: GossipNetwork>, + > Service where - A::PeerId: PublicKey, + NI::PeerId: PublicKey, + NI::AddressingInformation: TryFrom> + Into>, { pub fn new( - commands_from_user: mpsc::UnboundedReceiver>, - messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, + network_identity: NI, validator_network: CN, gossip_network: GN, - ) -> IO { - IO { - commands_from_user, - messages_from_user, - validator_network, - gossip_network, - _phantom: PhantomData, - } + config: Config, + ) -> ( + Service, + impl SessionManager, + ) { + let Config { + discovery_cooldown, + maintenance_period, + initial_delay, + } = config; + let manager = Manager::new(network_identity, discovery_cooldown); + let (commands_for_service, commands_from_user) = mpsc::unbounded(); + let (messages_for_service, messages_from_user) = mpsc::unbounded(); + ( + Service { + manager, + commands_from_user, + messages_from_user, + validator_network, + gossip_network, + maintenance_period, + initial_delay, + }, + ManagerInterface { + commands_for_service, + messages_for_service, + }, + ) } - fn send_data(&self, to_send: AddressedData, A::PeerId>) { + fn send_data(&self, to_send: AddressedData, NI::PeerId>) { self.validator_network.send(to_send.0, to_send.1) } fn send_authentications( &mut self, - to_send: Vec>, + to_send: Vec>, ) -> Result<(), Error> { for auth in to_send { self.gossip_network @@ -632,7 +272,10 @@ where Ok(()) } - fn handle_connection_command(&mut self, connection_command: ConnectionCommand) { + fn handle_connection_command( + &mut self, + connection_command: ConnectionCommand, + ) { match connection_command { ConnectionCommand::AddReserved(addresses) => { for address in addresses { @@ -648,12 +291,12 @@ where }; } - fn handle_service_actions( + fn handle_manager_actions( &mut self, - ServiceActions { + ManagerActions { maybe_command, maybe_message, - }: ServiceActions, + }: ManagerActions, ) -> Result<(), Error> { if let Some(command) = maybe_command { self.handle_connection_command(command); @@ -664,17 +307,47 @@ where Ok(()) } - /// Run the connection manager service with this IO. - pub async fn run>( - mut self, - mut service: Service, - ) -> Result<(), Error> { + /// Handle a session command. + /// Returns actions the manager wants to take or an error if the session command is invalid. + async fn handle_command( + &mut self, + command: SessionCommand, + ) -> Result, SessionHandlerError> { + use SessionCommand::*; + match command { + StartValidator(session_id, verifier, node_id, pen, result_for_user) => { + let pre_session = PreValidatorSession { + session_id, + verifier, + node_id, + pen, + }; + let (actions, data_from_network) = + self.manager.update_validator_session(pre_session).await?; + if let Some(result_for_user) = result_for_user { + if result_for_user.send(data_from_network).is_err() { + warn!(target: "aleph-network", "Failed to send started session.") + } + } + Ok(actions) + } + StartNonvalidator(session_id, verifier) => { + let pre_session = PreNonvalidatorSession { + session_id, + verifier, + }; + self.manager.update_nonvalidator_session(pre_session).await + } + Stop(session_id) => Ok(self.manager.finish_session(session_id)), + } + } + + /// Run the connection manager service. + pub async fn run(mut self) -> Result<(), Error> { // Initial delay is needed so that Network is fully set up and we received some first discovery broadcasts from other nodes. // Otherwise this might cause first maintenance never working, as it happens before first broadcasts. - let mut maintenance = time::interval_at( - Instant::now() + service.initial_delay, - service.maintenance_period, - ); + let mut maintenance = + time::interval_at(Instant::now() + self.initial_delay, self.maintenance_period); let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); loop { @@ -683,8 +356,8 @@ where maybe_command = self.commands_from_user.next() => { trace!(target: "aleph-network", "Manager received a command from user"); match maybe_command { - Some(command) => match service.on_command(command).await { - Ok(to_send) => self.handle_service_actions(to_send)?, + Some(command) => match self.handle_command(command).await { + Ok(to_send) => self.handle_manager_actions(to_send)?, Err(e) => warn!(target: "aleph-network", "Failed to update handler: {:?}", e), }, None => return Err(Error::CommandsChannel), @@ -693,7 +366,7 @@ where maybe_message = self.messages_from_user.next() => { trace!(target: "aleph-network", "Manager received a message from user"); match maybe_message { - Some((message, session_id, recipient)) => for message in service.on_user_message(message, session_id, recipient) { + Some((message, session_id, recipient)) => for message in self.manager.on_user_message(message, session_id, recipient) { self.send_data(message); }, None => return Err(Error::MessageChannel), @@ -702,7 +375,7 @@ where maybe_data = self.validator_network.next() => { trace!(target: "aleph-network", "Manager received some data from network"); match maybe_data { - Some(DataInSession{data, session_id}) => if let Err(e) = service.send_session_data(&session_id, data) { + Some(DataInSession{data, session_id}) => if let Err(e) = self.manager.send_session_data(&session_id, data) { match e { SendError::UserSend => trace!(target: "aleph-network", "Failed to send to user in session."), SendError::NoSession => trace!(target: "aleph-network", "Received message for unknown session."), @@ -715,225 +388,23 @@ where let authentication = maybe_authentication.map_err(Error::GossipNetwork)?; trace!(target: "aleph-network", "Manager received an authentication from network"); match authentication.try_into() { - Ok(message) => self.handle_service_actions(service.on_discovery_message(message))?, + Ok(message) => { + let manager_actions = self.manager.on_discovery_message(message); + self.handle_manager_actions(manager_actions)? + }, Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e), } }, _ = maintenance.tick() => { debug!(target: "aleph-network", "Manager starts maintenence"); - for to_send in service.discovery() { + for to_send in self.manager.discovery() { self.send_authentications(to_send.into())?; } }, _ = status_ticker.tick() => { - service.status_report(); + self.manager.status_report(); } } } } } - -#[cfg(test)] -mod tests { - use std::{iter, time::Duration}; - - use futures::{channel::oneshot, StreamExt}; - - use super::{Config, SendError, Service, ServiceActions, SessionCommand}; - use crate::{ - network::{ - clique::mock::{random_address, MockAddressingInformation}, - manager::{compatibility::PeerAuthentications, DataInSession, DiscoveryMessage}, - mock::crypto_basics, - ConnectionCommand, - }, - Recipient, SessionId, - }; - - const NUM_NODES: usize = 7; - const MAINTENANCE_PERIOD: Duration = Duration::from_secs(120); - const DISCOVERY_PERIOD: Duration = Duration::from_secs(60); - const INITIAL_DELAY: Duration = Duration::from_secs(5); - - fn build() -> Service { - Service::new( - random_address(), - Config::new(MAINTENANCE_PERIOD, DISCOVERY_PERIOD, INITIAL_DELAY), - ) - } - - #[tokio::test] - async fn starts_nonvalidator_session() { - let mut service = build(); - let (_, verifier) = crypto_basics(NUM_NODES).await; - let session_id = SessionId(43); - let ServiceActions { - maybe_command, - maybe_message, - } = service - .on_command(SessionCommand::StartNonvalidator(session_id, verifier)) - .await - .unwrap(); - assert!(maybe_command.is_none()); - assert!(maybe_message.is_none()); - assert_eq!( - service.send_session_data(&session_id, -43), - Err(SendError::NoSession) - ); - } - - #[tokio::test] - async fn starts_validator_session() { - let mut service = build(); - let (validator_data, verifier) = crypto_basics(NUM_NODES).await; - let (node_id, pen) = validator_data[0].clone(); - let session_id = SessionId(43); - let (result_for_user, result_from_service) = oneshot::channel(); - let ServiceActions { - maybe_command, - maybe_message, - } = service - .on_command(SessionCommand::StartValidator( - session_id, - verifier, - node_id, - pen, - Some(result_for_user), - )) - .await - .unwrap(); - assert!(maybe_command.is_none()); - assert!(maybe_message.is_some()); - let _data_from_network = result_from_service.await.unwrap(); - assert_eq!(service.send_session_data(&session_id, -43), Ok(())); - } - - #[tokio::test] - async fn stops_session() { - let mut service = build(); - let (validator_data, verifier) = crypto_basics(NUM_NODES).await; - let (node_id, pen) = validator_data[0].clone(); - let session_id = SessionId(43); - let (result_for_user, result_from_service) = oneshot::channel(); - let ServiceActions { - maybe_command, - maybe_message, - } = service - .on_command(SessionCommand::StartValidator( - session_id, - verifier, - node_id, - pen, - Some(result_for_user), - )) - .await - .unwrap(); - assert!(maybe_command.is_none()); - assert!(maybe_message.is_some()); - assert_eq!(service.send_session_data(&session_id, -43), Ok(())); - let mut data_from_network = result_from_service.await.unwrap(); - assert_eq!(data_from_network.next().await, Some(-43)); - let ServiceActions { - maybe_command, - maybe_message, - } = service - .on_command(SessionCommand::Stop(session_id)) - .await - .unwrap(); - assert!(maybe_command.is_none()); - assert!(maybe_message.is_none()); - assert_eq!( - service.send_session_data(&session_id, -43), - Err(SendError::NoSession) - ); - assert!(data_from_network.next().await.is_none()); - } - - #[tokio::test] - async fn handles_broadcast() { - let mut service = build(); - let (validator_data, verifier) = crypto_basics(NUM_NODES).await; - let (node_id, pen) = validator_data[0].clone(); - let session_id = SessionId(43); - service - .on_command(SessionCommand::StartValidator( - session_id, - verifier.clone(), - node_id, - pen, - None, - )) - .await - .unwrap(); - let mut other_service = build(); - let (node_id, pen) = validator_data[1].clone(); - let ServiceActions { maybe_message, .. } = other_service - .on_command(SessionCommand::StartValidator( - session_id, verifier, node_id, pen, None, - )) - .await - .unwrap(); - let message = maybe_message.expect("there should be a discovery message"); - let (address, message) = match message { - PeerAuthentications::Both(authentication, _) => ( - authentication.0.address(), - DiscoveryMessage::Authentication(authentication), - ), - message => panic!("Expected both authentications, got {:?}", message), - }; - let ServiceActions { - maybe_command, - maybe_message, - } = service.on_discovery_message(message); - assert_eq!( - maybe_command, - Some(ConnectionCommand::AddReserved( - iter::once(address).collect() - )) - ); - assert!(maybe_message.is_some()); - } - - #[tokio::test] - async fn sends_user_data() { - let mut service = build(); - let (validator_data, verifier) = crypto_basics(NUM_NODES).await; - let (node_id, pen) = validator_data[0].clone(); - let session_id = SessionId(43); - service - .on_command(SessionCommand::StartValidator( - session_id, - verifier.clone(), - node_id, - pen, - None, - )) - .await - .unwrap(); - let mut other_service = build(); - let (node_id, pen) = validator_data[1].clone(); - let ServiceActions { maybe_message, .. } = other_service - .on_command(SessionCommand::StartValidator( - session_id, verifier, node_id, pen, None, - )) - .await - .unwrap(); - let message = match maybe_message.expect("there should be a discovery message") { - PeerAuthentications::Both(authentication, _) => { - DiscoveryMessage::Authentication(authentication) - } - message => panic!("Expected both authentications, got {:?}", message), - }; - service.on_discovery_message(message); - let messages = service.on_user_message(2137, session_id, Recipient::Everyone); - assert_eq!(messages.len(), 1); - let (network_data, _) = &messages[0]; - assert_eq!( - network_data, - &DataInSession { - data: 2137, - session_id - } - ); - } -} diff --git a/finality-aleph/src/network/manager/session.rs b/finality-aleph/src/network/manager/session.rs index 009786f946..4d338dcfa5 100644 --- a/finality-aleph/src/network/manager/session.rs +++ b/finality-aleph/src/network/manager/session.rs @@ -269,18 +269,42 @@ impl> + Into>> Handler } #[cfg(test)] -mod tests { +pub mod tests { use super::{Handler, HandlerError}; use crate::{ network::{ - clique::mock::{random_address, random_invalid_address}, + clique::mock::{random_address, random_invalid_address, MockAddressingInformation}, + manager::{compatibility::PeerAuthentications, Authentication, LegacyAuthentication}, mock::crypto_basics, - testing::{authentication, legacy_authentication}, AddressingInformation, }, NodeIndex, SessionId, }; + pub fn legacy_authentication( + handler: &Handler, + ) -> LegacyAuthentication { + match handler + .authentication() + .expect("this is a validator handler") + { + PeerAuthentications::Both(_, authentication) => authentication, + _ => panic!("handler doesn't have both authentications"), + } + } + + pub fn authentication( + handler: &Handler, + ) -> Authentication { + match handler + .authentication() + .expect("this is a validator handler") + { + PeerAuthentications::Both(authentication, _) => authentication, + _ => panic!("handler doesn't have both authentications"), + } + } + const NUM_NODES: usize = 7; #[tokio::test] diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 6111f2e4b7..6d71284000 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashSet, fmt::{Debug, Display}, hash::Hash, }; @@ -11,58 +10,16 @@ use sp_runtime::traits::Block; pub mod clique; pub mod data; mod gossip; -mod io; -mod manager; +pub mod manager; #[cfg(test)] pub mod mock; -mod session; mod substrate; pub mod tcp; +#[cfg(test)] +pub use gossip::mock::{MockEvent, MockRawNetwork}; pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService}; -pub use io::setup as setup_io; -use manager::SessionCommand; -pub use manager::{ - ConnectionIO as ConnectionManagerIO, ConnectionManager, ConnectionManagerConfig, -}; -pub use session::{Manager as SessionManager, ManagerError, SessionSender, IO as SessionManagerIO}; pub use substrate::protocol_name; -#[cfg(test)] -pub mod testing { - use super::manager::LegacyAuthentication; - pub use super::{ - clique::mock::MockAddressingInformation, - gossip::mock::{MockEvent, MockRawNetwork}, - manager::{ - Authentication, DataInSession, DiscoveryMessage, LegacyDiscoveryMessage, - PeerAuthentications, SessionHandler, VersionedAuthentication, - }, - }; - - pub fn legacy_authentication( - handler: &SessionHandler, - ) -> LegacyAuthentication { - match handler - .authentication() - .expect("this is a validator handler") - { - PeerAuthentications::Both(_, authentication) => authentication, - _ => panic!("handler doesn't have both authentications"), - } - } - - pub fn authentication( - handler: &SessionHandler, - ) -> Authentication { - match handler - .authentication() - .expect("this is a validator handler") - { - PeerAuthentications::Both(authentication, _) => authentication, - _ => panic!("handler doesn't have both authentications"), - } - } -} /// Represents the id of an arbitrary node. pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send { @@ -121,17 +78,7 @@ pub trait RequestBlocks: Clone + Send + Sync + 'static { fn is_major_syncing(&self) -> bool; } -/// Commands for manipulating the reserved peers set. -#[derive(Debug, PartialEq, Eq)] -pub enum ConnectionCommand { - AddReserved(HashSet), - DelReserved(HashSet), -} - /// A basic alias for properties we expect basic data to satisfy. pub trait Data: Clone + Codec + Send + Sync + 'static {} impl Data for D {} - -// In practice D: Data and P: PeerId, but we cannot require that in type aliases. -type AddressedData = (D, P); diff --git a/finality-aleph/src/network/session.rs b/finality-aleph/src/network/session.rs deleted file mode 100644 index 9b91ef77bd..0000000000 --- a/finality-aleph/src/network/session.rs +++ /dev/null @@ -1,143 +0,0 @@ -use futures::channel::{mpsc, oneshot}; - -use crate::{ - abft::Recipient, - crypto::{AuthorityPen, AuthorityVerifier}, - network::{ - data::{ - component::{Sender, SimpleNetwork}, - SendError, - }, - Data, SessionCommand, - }, - NodeIndex, SessionId, -}; - -/// Sends data within a single session. -#[derive(Clone)] -pub struct SessionSender { - session_id: SessionId, - messages_for_network: mpsc::UnboundedSender<(D, SessionId, Recipient)>, -} - -impl Sender for SessionSender { - fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError> { - self.messages_for_network - .unbounded_send((data, self.session_id, recipient)) - .map_err(|_| SendError::SendFailed) - } -} - -/// Sends and receives data within a single session. -type Network = SimpleNetwork, SessionSender>; - -/// Manages sessions for which the network should be active. -pub struct Manager { - commands_for_service: mpsc::UnboundedSender>, - messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, -} - -/// What went wrong during a session management operation. -#[derive(Debug)] -pub enum ManagerError { - CommandSendFailed, - NetworkReceiveFailed, -} - -pub struct IO { - pub commands_for_service: mpsc::UnboundedSender>, - pub messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, -} - -impl IO { - pub fn new( - commands_for_service: mpsc::UnboundedSender>, - messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, - ) -> Self { - IO { - commands_for_service, - messages_for_service, - } - } -} - -impl Manager { - /// Create a new manager with the given channels to the service. - pub fn new(io: IO) -> Self { - Manager { - commands_for_service: io.commands_for_service, - messages_for_service: io.messages_for_service, - } - } - - /// Start participating or update the verifier in the given session where you are not a - /// validator. - pub fn start_nonvalidator_session( - &self, - session_id: SessionId, - verifier: AuthorityVerifier, - ) -> Result<(), ManagerError> { - self.commands_for_service - .unbounded_send(SessionCommand::StartNonvalidator(session_id, verifier)) - .map_err(|_| ManagerError::CommandSendFailed) - } - - /// Start participating or update the information about the given session where you are a - /// validator. Returns a session network to be used for sending and receiving data within the - /// session. - pub async fn start_validator_session( - &self, - session_id: SessionId, - verifier: AuthorityVerifier, - node_id: NodeIndex, - pen: AuthorityPen, - ) -> Result, ManagerError> { - let (result_for_us, result_from_service) = oneshot::channel(); - self.commands_for_service - .unbounded_send(SessionCommand::StartValidator( - session_id, - verifier, - node_id, - pen, - Some(result_for_us), - )) - .map_err(|_| ManagerError::CommandSendFailed)?; - - let data_from_network = result_from_service - .await - .map_err(|_| ManagerError::NetworkReceiveFailed)?; - let messages_for_network = self.messages_for_service.clone(); - - Ok(Network::new( - data_from_network, - SessionSender { - session_id, - messages_for_network, - }, - )) - } - - /// Start participating or update the information about the given session where you are a - /// validator. Used for early starts when you don't yet need the returned network, but would - /// like to start discovery. - pub fn early_start_validator_session( - &self, - session_id: SessionId, - verifier: AuthorityVerifier, - node_id: NodeIndex, - pen: AuthorityPen, - ) -> Result<(), ManagerError> { - self.commands_for_service - .unbounded_send(SessionCommand::StartValidator( - session_id, verifier, node_id, pen, None, - )) - .map_err(|_| ManagerError::CommandSendFailed) - } - - /// Stop participating in the given session. - pub fn stop_session(&self, session_id: SessionId) -> Result<(), ManagerError> { - self.commands_for_service - .unbounded_send(SessionCommand::Stop(session_id)) - .map_err(|_| ManagerError::CommandSendFailed) - } -} diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index 50fb08f597..55cf1c52d4 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -13,9 +13,9 @@ use crate::{ crypto::AuthorityPen, network::{ clique::Service, - setup_io, + manager::{ConnectionManager, ConnectionManagerConfig}, tcp::{new_tcp_network, KEY_TYPE}, - ConnectionManager, ConnectionManagerConfig, GossipService, SessionManager, + GossipService, }, nodes::{setup_justification_handler, JustificationParams}, party::{ @@ -119,21 +119,19 @@ where session_map: session_authorities.clone(), }); - let (connection_io, session_io) = setup_io(validator_network, gossip_network); - - let connection_manager = ConnectionManager::new( + let (connection_manager_service, connection_manager) = ConnectionManager::new( network_identity, + validator_network, + gossip_network, ConnectionManagerConfig::with_session_period(&session_period, &millisecs_per_block), ); let connection_manager_task = async move { - if let Err(e) = connection_io.run(connection_manager).await { + if let Err(e) = connection_manager_service.run().await { panic!("Failed to run connection manager: {}", e); } }; - let session_manager = SessionManager::new(session_io); - spawn_handle.spawn("aleph/justification_handler", None, handler_task); debug!(target: "aleph-party", "JustificationHandler has started."); @@ -158,7 +156,7 @@ where block_requester, metrics, spawn_handle.into(), - session_manager, + connection_manager, keystore, ), _phantom: PhantomData, diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs index 199e5c9092..f50d0b9a1b 100644 --- a/finality-aleph/src/party/manager/mod.rs +++ b/finality-aleph/src/party/manager/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, fmt::Debug, marker::PhantomData, sync::Arc}; +use std::{collections::HashSet, marker::PhantomData, sync::Arc}; use aleph_primitives::{AlephSessionApi, KEY_TYPE}; use async_trait::async_trait; @@ -25,7 +25,8 @@ use crate::{ component::{Network, NetworkMap, SimpleNetwork}, split::split, }, - ManagerError, RequestBlocks, SessionManager, SessionSender, + manager::{SessionManager, SessionSender}, + RequestBlocks, }, party::{ backup::ABFTBackup, manager::aggregator::AggregatorVersion, traits::NodeSessionManager, @@ -87,13 +88,14 @@ where phantom: PhantomData, } -pub struct NodeSessionManagerImpl +pub struct NodeSessionManagerImpl where B: BlockT, C: crate::ClientForAleph + Send + Sync + 'static, BE: Backend + 'static, SC: SelectChain + 'static, RB: RequestBlocks, + SM: SessionManager> + 'static, { client: Arc, select_chain: SC, @@ -103,12 +105,12 @@ where block_requester: RB, metrics: Option::Hash>>, spawn_handle: SpawnHandle, - session_manager: SessionManager>, + session_manager: SM, keystore: Arc, _phantom: PhantomData, } -impl NodeSessionManagerImpl +impl NodeSessionManagerImpl where B: BlockT, C: crate::ClientForAleph + Send + Sync + 'static, @@ -116,6 +118,7 @@ where BE: Backend + 'static, SC: SelectChain + 'static, RB: RequestBlocks, + SM: SessionManager>, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -127,7 +130,7 @@ where block_requester: RB, metrics: Option::Hash>>, spawn_handle: SpawnHandle, - session_manager: SessionManager>, + session_manager: SM, keystore: Arc, ) -> Self { Self { @@ -305,11 +308,14 @@ where justifications_for_chain: self.authority_justification_tx.clone(), }; - let data_network = self + let data_network = match self .session_manager .start_validator_session(session_id, authority_verifier, node_id, authority_pen) .await - .expect("Failed to start validator session!"); + { + Ok(data_network) => data_network, + Err(e) => panic!("Failed to start validator session: {}", e), + }; let last_block_of_previous_session = session_boundaries .first_block() @@ -359,14 +365,8 @@ where } } -#[derive(Debug)] -pub enum SessionManagerError { - NotAuthority, - ManagerError(ManagerError), -} - #[async_trait] -impl NodeSessionManager for NodeSessionManagerImpl +impl NodeSessionManager for NodeSessionManagerImpl where B: BlockT, C: crate::ClientForAleph + Send + Sync + 'static, @@ -374,8 +374,9 @@ where BE: Backend + 'static, SC: SelectChain + 'static, RB: RequestBlocks, + SM: SessionManager>, { - type Error = SessionManagerError; + type Error = SM::Error; async fn spawn_authority_task_for_session( &self, @@ -404,20 +405,20 @@ where async fn early_start_validator_session( &self, session: SessionId, + node_id: NodeIndex, authorities: &[AuthorityId], ) -> Result<(), Self::Error> { - let node_id = match self.node_idx(authorities).await { - Some(id) => id, - None => return Err(SessionManagerError::NotAuthority), - }; let authority_verifier = AuthorityVerifier::new(authorities.to_vec()); let authority_pen = AuthorityPen::new(authorities[node_id.0].clone(), self.keystore.clone()) .await .expect("The keys should sign successfully"); - self.session_manager - .early_start_validator_session(session, authority_verifier, node_id, authority_pen) - .map_err(SessionManagerError::ManagerError) + self.session_manager.early_start_validator_session( + session, + authority_verifier, + node_id, + authority_pen, + ) } fn start_nonvalidator_session( @@ -429,13 +430,10 @@ where self.session_manager .start_nonvalidator_session(session, authority_verifier) - .map_err(SessionManagerError::ManagerError) } fn stop_session(&self, session: SessionId) -> Result<(), Self::Error> { - self.session_manager - .stop_session(session) - .map_err(SessionManagerError::ManagerError) + self.session_manager.stop_session(session) } async fn node_idx(&self, authorities: &[AuthorityId]) -> Option { diff --git a/finality-aleph/src/party/mocks.rs b/finality-aleph/src/party/mocks.rs index 2a0d78441b..21977bc100 100644 --- a/finality-aleph/src/party/mocks.rs +++ b/finality-aleph/src/party/mocks.rs @@ -1,5 +1,6 @@ use std::{ collections::HashSet, + fmt::{Debug, Display, Error as FmtError, Formatter}, hash::Hash, sync::{Arc, Mutex}, }; @@ -111,9 +112,17 @@ impl MockNodeSessionManager { } } +pub struct MockNodeSessionManagerError; + +impl Display for MockNodeSessionManagerError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + write!(f, "mock node session manager error") + } +} + #[async_trait] impl NodeSessionManager for Arc { - type Error = (); + type Error = MockNodeSessionManagerError; async fn spawn_authority_task_for_session( &self, @@ -133,6 +142,7 @@ impl NodeSessionManager for Arc { async fn early_start_validator_session( &self, session: SessionId, + _node_id: NodeIndex, _authorities: &[AuthorityId], ) -> Result<(), Self::Error> { self.insert(self.session_early_started.clone(), session); diff --git a/finality-aleph/src/party/mod.rs b/finality-aleph/src/party/mod.rs index fd46336da6..50c9c78e3f 100644 --- a/finality-aleph/src/party/mod.rs +++ b/finality-aleph/src/party/mod.rs @@ -154,7 +154,7 @@ where .session_manager .start_nonvalidator_session(session_id, authorities) { - warn!(target: "aleph-party", "Failed to start nonvalidator session{:?}:{:?}", session_id, e); + warn!(target: "aleph-party", "Failed to start nonvalidator session{:?}: {}", session_id, e); } None }; @@ -194,21 +194,22 @@ where } => { let next_session_authorities = next_session_authority_data.authorities(); match self.session_manager.node_idx(next_session_authorities).await { - Some(_) => if let Err(e) = self + Some(next_session_node_id) => if let Err(e) = self .session_manager .early_start_validator_session( next_session_id, + next_session_node_id, next_session_authorities, ).await { - warn!(target: "aleph-party", "Failed to early start validator session{:?}:{:?}", next_session_id, e); + warn!(target: "aleph-party", "Failed to early start validator session{:?}: {}", next_session_id, e); } None => { if let Err(e) = self .session_manager .start_nonvalidator_session(next_session_id, next_session_authorities) { - warn!(target: "aleph-party", "Failed to early start nonvalidator session{:?}:{:?}", next_session_id, e); + warn!(target: "aleph-party", "Failed to early start nonvalidator session{:?}: {}", next_session_id, e); } } } @@ -232,7 +233,7 @@ where } } if let Err(e) = self.session_manager.stop_session(session_id) { - warn!(target: "aleph-party", "Session Manager failed to stop in session {:?}: {:?}", session_id, e) + warn!(target: "aleph-party", "Session Manager failed to stop in session {:?}: {}", session_id, e) } } diff --git a/finality-aleph/src/party/traits.rs b/finality-aleph/src/party/traits.rs index 74c139a284..21198d737f 100644 --- a/finality-aleph/src/party/traits.rs +++ b/finality-aleph/src/party/traits.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use async_trait::async_trait; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -34,7 +34,7 @@ pub trait ChainState { #[async_trait] /// Abstraction over session related tasks. pub trait NodeSessionManager { - type Error: Debug; + type Error: Display; /// Spawns every task needed for an authority to run in a session. async fn spawn_authority_task_for_session( @@ -49,6 +49,7 @@ pub trait NodeSessionManager { async fn early_start_validator_session( &self, session: SessionId, + node_id: NodeIndex, authorities: &[AuthorityId], ) -> Result<(), Self::Error>; diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 07806ef495..064396197d 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -17,14 +17,13 @@ use crate::{ MockPublicKey, }, data::Network, - mock::{crypto_basics, MockData}, - setup_io, - testing::{ - authentication, legacy_authentication, DataInSession, LegacyDiscoveryMessage, - MockEvent, MockRawNetwork, SessionHandler, VersionedAuthentication, + manager::{ + authentication, legacy_authentication, ConnectionManager, ConnectionManagerConfig, + DataInSession, LegacyDiscoveryMessage, ManagerError, SessionHandler, SessionManager, + VersionedAuthentication, }, - AddressingInformation, ConnectionManager, ConnectionManagerConfig, GossipService, - NetworkIdentity, Protocol, SessionManager, + mock::{crypto_basics, MockData}, + AddressingInformation, GossipService, MockEvent, MockRawNetwork, Protocol, }, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, }; @@ -60,19 +59,10 @@ impl Authority { } } -impl NetworkIdentity for Authority { - type PeerId = MockPublicKey; - type AddressingInformation = MockAddressingInformation; - - fn identity(&self) -> Self::AddressingInformation { - self.address.clone() - } -} - struct TestData { pub authorities: Vec, pub authority_verifier: AuthorityVerifier, - pub session_manager: SessionManager, + pub session_manager: Box>, pub network: MockRawNetwork, pub validator_network: MockCliqueNetwork>, network_manager_exit_tx: oneshot::Sender<()>, @@ -108,19 +98,17 @@ async fn prepare_one_session_test_data() -> TestData { let (gossip_service, gossip_network) = GossipService::new(network.clone(), task_manager.spawn_handle()); - let (connection_io, session_io) = setup_io(validator_network.clone(), gossip_network); - - let connection_manager = ConnectionManager::new( - authorities[0].clone(), + let (connection_manager_service, session_manager) = ConnectionManager::new( + authorities[0].address(), + validator_network.clone(), + gossip_network, ConnectionManagerConfig::with_session_period(&SESSION_PERIOD, &MILLISECS_PER_BLOCK), ); - - let session_manager = SessionManager::new(session_io); + let session_manager = Box::new(session_manager); let network_manager_task = async move { tokio::select! { - _ = connection_io - .run(connection_manager) => { }, + _ =connection_manager_service.run() => { }, _ = network_manager_exit_rx => { }, }; }; @@ -161,7 +149,8 @@ impl TestData { node_id: usize, session_id: u32, ) -> impl Network { - self.session_manager + match self + .session_manager .start_validator_session( SessionId(session_id), self.authority_verifier.clone(), @@ -169,18 +158,21 @@ impl TestData { self.authorities[node_id].pen(), ) .await - .expect("Failed to start validator session!") + { + Ok(network) => network, + Err(e) => panic!("Failed to start validator session: {}", e), + } } fn early_start_validator_session(&self, node_id: usize, session_id: u32) { - self.session_manager - .early_start_validator_session( - SessionId(session_id), - self.authority_verifier.clone(), - NodeIndex(node_id), - self.authorities[node_id].pen(), - ) - .expect("Failed to start validator session!"); + if let Err(e) = self.session_manager.early_start_validator_session( + SessionId(session_id), + self.authority_verifier.clone(), + NodeIndex(node_id), + self.authorities[node_id].pen(), + ) { + panic!("Failed to start validator session: {}", e); + } } async fn get_session_handler( From 9bb88d909ea3df37cd2bfbf725eba7719eee1aef Mon Sep 17 00:00:00 2001 From: timorl Date: Fri, 16 Dec 2022 10:59:39 +0100 Subject: [PATCH 2/2] Two hard problems in CS --- finality-aleph/src/network/mod.rs | 2 +- .../src/network/{manager => session}/compatibility.rs | 4 ++-- .../src/network/{manager => session}/connections.rs | 0 .../src/network/{manager => session}/data.rs | 0 .../src/network/{manager => session}/discovery.rs | 6 +++--- .../network/{manager/session.rs => session/handler.rs} | 4 ++-- .../src/network/{manager => session}/manager.rs | 4 ++-- finality-aleph/src/network/{manager => session}/mod.rs | 10 +++++----- .../src/network/{manager => session}/service.rs | 2 +- .../src/network/{manager => session}/testing.rs | 0 finality-aleph/src/nodes/validator_node.rs | 2 +- finality-aleph/src/party/manager/mod.rs | 2 +- finality-aleph/src/testing/network.rs | 4 ++-- 13 files changed, 20 insertions(+), 20 deletions(-) rename finality-aleph/src/network/{manager => session}/compatibility.rs (99%) rename finality-aleph/src/network/{manager => session}/connections.rs (100%) rename finality-aleph/src/network/{manager => session}/data.rs (100%) rename finality-aleph/src/network/{manager => session}/discovery.rs (99%) rename finality-aleph/src/network/{manager/session.rs => session/handler.rs} (99%) rename finality-aleph/src/network/{manager => session}/manager.rs (99%) rename finality-aleph/src/network/{manager => session}/mod.rs (95%) rename finality-aleph/src/network/{manager => session}/service.rs (99%) rename finality-aleph/src/network/{manager => session}/testing.rs (100%) diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 6d71284000..35945465ed 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -10,9 +10,9 @@ use sp_runtime::traits::Block; pub mod clique; pub mod data; mod gossip; -pub mod manager; #[cfg(test)] pub mod mock; +pub mod session; mod substrate; pub mod tcp; diff --git a/finality-aleph/src/network/manager/compatibility.rs b/finality-aleph/src/network/session/compatibility.rs similarity index 99% rename from finality-aleph/src/network/manager/compatibility.rs rename to finality-aleph/src/network/session/compatibility.rs index bb368716c8..146341bb51 100644 --- a/finality-aleph/src/network/manager/compatibility.rs +++ b/finality-aleph/src/network/session/compatibility.rs @@ -8,7 +8,7 @@ use log::warn; use crate::{ network::{ - manager::{AuthData, Authentication, LegacyAuthentication}, + session::{AuthData, Authentication, LegacyAuthentication}, AddressingInformation, Data, }, SessionId, Version, @@ -253,7 +253,7 @@ mod test { crypto::AuthorityVerifier, network::{ clique::mock::MockAddressingInformation, - manager::{ + session::{ compatibility::{PeerAuthentications, MAX_AUTHENTICATION_SIZE}, LegacyDiscoveryMessage, SessionHandler, }, diff --git a/finality-aleph/src/network/manager/connections.rs b/finality-aleph/src/network/session/connections.rs similarity index 100% rename from finality-aleph/src/network/manager/connections.rs rename to finality-aleph/src/network/session/connections.rs diff --git a/finality-aleph/src/network/manager/data.rs b/finality-aleph/src/network/session/data.rs similarity index 100% rename from finality-aleph/src/network/manager/data.rs rename to finality-aleph/src/network/session/data.rs diff --git a/finality-aleph/src/network/manager/discovery.rs b/finality-aleph/src/network/session/discovery.rs similarity index 99% rename from finality-aleph/src/network/manager/discovery.rs rename to finality-aleph/src/network/session/discovery.rs index dcad44b47d..e64e1a030c 100644 --- a/finality-aleph/src/network/manager/discovery.rs +++ b/finality-aleph/src/network/session/discovery.rs @@ -9,7 +9,7 @@ use log::{debug, info, trace}; use crate::{ network::{ - manager::{ + session::{ compatibility::PeerAuthentications, Authentication, LegacyAuthentication, SessionHandler, }, @@ -123,11 +123,11 @@ mod tests { use crate::{ network::{ clique::mock::{random_address, MockAddressingInformation}, - manager::{ + mock::crypto_basics, + session::{ authentication, compatibility::PeerAuthentications, legacy_authentication, SessionHandler, }, - mock::crypto_basics, }, SessionId, }; diff --git a/finality-aleph/src/network/manager/session.rs b/finality-aleph/src/network/session/handler.rs similarity index 99% rename from finality-aleph/src/network/manager/session.rs rename to finality-aleph/src/network/session/handler.rs index 4d338dcfa5..797fe073d6 100644 --- a/finality-aleph/src/network/manager/session.rs +++ b/finality-aleph/src/network/session/handler.rs @@ -6,7 +6,7 @@ use crate::{ abft::NodeCount, crypto::{AuthorityPen, AuthorityVerifier}, network::{ - manager::{ + session::{ compatibility::PeerAuthentications, AuthData, Authentication, LegacyAuthData, LegacyAuthentication, }, @@ -274,8 +274,8 @@ pub mod tests { use crate::{ network::{ clique::mock::{random_address, random_invalid_address, MockAddressingInformation}, - manager::{compatibility::PeerAuthentications, Authentication, LegacyAuthentication}, mock::crypto_basics, + session::{compatibility::PeerAuthentications, Authentication, LegacyAuthentication}, AddressingInformation, }, NodeIndex, SessionId, diff --git a/finality-aleph/src/network/manager/manager.rs b/finality-aleph/src/network/session/manager.rs similarity index 99% rename from finality-aleph/src/network/manager/manager.rs rename to finality-aleph/src/network/session/manager.rs index 56e515c2fe..6feff15c67 100644 --- a/finality-aleph/src/network/manager/manager.rs +++ b/finality-aleph/src/network/session/manager.rs @@ -11,7 +11,7 @@ use crate::{ abft::Recipient, crypto::{AuthorityPen, AuthorityVerifier}, network::{ - manager::{ + session::{ compatibility::PeerAuthentications, data::DataInSession, Connections, Discovery, DiscoveryMessage, SessionHandler, SessionHandlerError, }, @@ -457,8 +457,8 @@ mod tests { use crate::{ network::{ clique::mock::{random_address, MockAddressingInformation}, - manager::{compatibility::PeerAuthentications, data::DataInSession, DiscoveryMessage}, mock::crypto_basics, + session::{compatibility::PeerAuthentications, data::DataInSession, DiscoveryMessage}, }, Recipient, SessionId, }; diff --git a/finality-aleph/src/network/manager/mod.rs b/finality-aleph/src/network/session/mod.rs similarity index 95% rename from finality-aleph/src/network/manager/mod.rs rename to finality-aleph/src/network/session/mod.rs index 65a8407bbd..a3b7ed75ad 100644 --- a/finality-aleph/src/network/manager/mod.rs +++ b/finality-aleph/src/network/session/mod.rs @@ -1,4 +1,4 @@ -//! Managing the validator connections using the gossip network. +//! Managing the validator connections in sessions using the gossip network. use std::fmt::Display; use codec::{Decode, Encode}; @@ -20,9 +20,9 @@ mod compatibility; mod connections; mod data; mod discovery; +mod handler; mod manager; mod service; -mod session; pub use compatibility::{ DiscoveryMessage, LegacyDiscoveryMessage, PeerAuthentications, VersionedAuthentication, @@ -31,10 +31,10 @@ use connections::Connections; #[cfg(test)] pub use data::DataInSession; pub use discovery::Discovery; -pub use service::{Config as ConnectionManagerConfig, ManagerError, Service as ConnectionManager}; #[cfg(test)] -pub use session::tests::{authentication, legacy_authentication}; -pub use session::{Handler as SessionHandler, HandlerError as SessionHandlerError}; +pub use handler::tests::{authentication, legacy_authentication}; +pub use handler::{Handler as SessionHandler, HandlerError as SessionHandlerError}; +pub use service::{Config as ConnectionManagerConfig, ManagerError, Service as ConnectionManager}; /// Data validators used to use to authenticate themselves for a single session /// and disseminate their addresses. diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/session/service.rs similarity index 99% rename from finality-aleph/src/network/manager/service.rs rename to finality-aleph/src/network/session/service.rs index a64995d4f3..080368397d 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/session/service.rs @@ -16,7 +16,7 @@ use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ clique::{Network as CliqueNetwork, PublicKey}, - manager::{ + session::{ data::DataInSession, manager::{ AddressedData, ConnectionCommand, Manager, ManagerActions, PreNonvalidatorSession, diff --git a/finality-aleph/src/network/manager/testing.rs b/finality-aleph/src/network/session/testing.rs similarity index 100% rename from finality-aleph/src/network/manager/testing.rs rename to finality-aleph/src/network/session/testing.rs diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index 55cf1c52d4..5301fa25a6 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -13,7 +13,7 @@ use crate::{ crypto::AuthorityPen, network::{ clique::Service, - manager::{ConnectionManager, ConnectionManagerConfig}, + session::{ConnectionManager, ConnectionManagerConfig}, tcp::{new_tcp_network, KEY_TYPE}, GossipService, }, diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs index f50d0b9a1b..ee4081303c 100644 --- a/finality-aleph/src/party/manager/mod.rs +++ b/finality-aleph/src/party/manager/mod.rs @@ -25,7 +25,7 @@ use crate::{ component::{Network, NetworkMap, SimpleNetwork}, split::split, }, - manager::{SessionManager, SessionSender}, + session::{SessionManager, SessionSender}, RequestBlocks, }, party::{ diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 064396197d..77445be02c 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -17,12 +17,12 @@ use crate::{ MockPublicKey, }, data::Network, - manager::{ + mock::{crypto_basics, MockData}, + session::{ authentication, legacy_authentication, ConnectionManager, ConnectionManagerConfig, DataInSession, LegacyDiscoveryMessage, ManagerError, SessionHandler, SessionManager, VersionedAuthentication, }, - mock::{crypto_basics, MockData}, AddressingInformation, GossipService, MockEvent, MockRawNetwork, Protocol, }, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod,