diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs deleted file mode 100644 index d86b6b3bfe..0000000000 --- a/finality-aleph/src/network/io.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::fmt::Debug; - -use futures::channel::mpsc; - -use crate::network::{ - clique::{Network as CliqueNetwork, PublicKey}, - manager::{DataInSession, VersionedAuthentication}, - AddressingInformation, ConnectionManagerIO, Data, GossipNetwork, SessionManagerIO, -}; - -type FullIO = (ConnectionManagerIO, SessionManagerIO); - -pub fn setup< - D: Data, - M: Data + Debug, - A: AddressingInformation + TryFrom> + Into>, - CN: CliqueNetwork>, - GN: GossipNetwork>, ->( - validator_network: CN, - gossip_network: GN, -) -> FullIO -where - A::PeerId: PublicKey, -{ - // Prepare and start the network - let (commands_for_service, commands_from_user) = mpsc::unbounded(); - let (messages_for_service, commands_from_manager) = mpsc::unbounded(); - - let connection_io = ConnectionManagerIO::new( - commands_from_user, - commands_from_manager, - validator_network, - gossip_network, - ); - let channels_for_session_manager = - SessionManagerIO::new(commands_for_service, messages_for_service); - - (connection_io, channels_for_session_manager) -} diff --git a/finality-aleph/src/network/manager/mod.rs b/finality-aleph/src/network/manager/mod.rs deleted file mode 100644 index ad79ae7bc8..0000000000 --- a/finality-aleph/src/network/manager/mod.rs +++ /dev/null @@ -1,139 +0,0 @@ -use codec::{Decode, Encode, Error, Input, Output}; - -use crate::{ - crypto::Signature, - network::{AddressingInformation, Data}, - NodeIndex, SessionId, -}; - -mod compatibility; -mod connections; -mod discovery; -mod service; -mod session; - -pub use compatibility::{ - DiscoveryMessage, LegacyDiscoveryMessage, PeerAuthentications, VersionedAuthentication, -}; -use connections::Connections; -pub use discovery::Discovery; -pub use service::{ - Config as ConnectionManagerConfig, Service as ConnectionManager, SessionCommand, - IO as ConnectionIO, -}; -pub use session::{Handler as SessionHandler, HandlerError as SessionHandlerError}; - -/// Data validators used to use to authenticate themselves for a single session -/// and disseminate their addresses. -#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] -pub struct LegacyAuthData { - addresses: Vec, - node_id: NodeIndex, - session_id: SessionId, -} - -impl LegacyAuthData { - pub fn session(&self) -> SessionId { - self.session_id - } - - pub fn creator(&self) -> NodeIndex { - self.node_id - } -} - -/// Data validators use to authenticate themselves for a single session -/// and disseminate their addresses. -#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] -pub struct AuthData { - address: A, - node_id: NodeIndex, - session_id: SessionId, -} - -impl AuthData { - pub fn session(&self) -> SessionId { - self.session_id - } - - pub fn creator(&self) -> NodeIndex { - self.node_id - } - - pub fn address(&self) -> A { - self.address.clone() - } -} - -impl>> From> for LegacyAuthData { - fn from(auth_data: AuthData) -> Self { - let AuthData { - address, - node_id, - session_id, - } = auth_data; - let addresses = address.into(); - LegacyAuthData { - addresses, - node_id, - session_id, - } - } -} - -impl>> TryFrom> - for AuthData -{ - type Error = (); - - fn try_from(legacy_auth_data: LegacyAuthData) -> Result { - let LegacyAuthData { - addresses, - node_id, - session_id, - } = legacy_auth_data; - let address = addresses.try_into().map_err(|_| ())?; - Ok(AuthData { - address, - node_id, - session_id, - }) - } -} - -/// A full legacy authentication, consisting of a signed LegacyAuthData. -pub type LegacyAuthentication = (LegacyAuthData, Signature); - -/// A full authentication, consisting of a signed AuthData. -pub type Authentication = (AuthData, Signature); - -/// Data inside session, sent to validator network. -/// Wrapper for data send over network. We need it to ensure compatibility. -/// The order of the data and session_id is fixed in encode and the decode expects it to be data, session_id. -/// Since data is versioned, i.e. it's encoding starts with a version number in the standardized way, -/// this will allow us to retrofit versioning here if we ever need to change this structure. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct DataInSession { - pub data: D, - pub session_id: SessionId, -} - -impl Decode for DataInSession { - fn decode(input: &mut I) -> Result { - let data = D::decode(input)?; - let session_id = SessionId::decode(input)?; - - Ok(Self { data, session_id }) - } -} - -impl Encode for DataInSession { - fn size_hint(&self) -> usize { - self.data.size_hint() + self.session_id.size_hint() - } - - fn encode_to(&self, dest: &mut T) { - self.data.encode_to(dest); - self.session_id.encode_to(dest); - } -} diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 6111f2e4b7..35945465ed 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashSet, fmt::{Debug, Display}, hash::Hash, }; @@ -11,58 +10,16 @@ use sp_runtime::traits::Block; pub mod clique; pub mod data; mod gossip; -mod io; -mod manager; #[cfg(test)] pub mod mock; -mod session; +pub mod session; mod substrate; pub mod tcp; +#[cfg(test)] +pub use gossip::mock::{MockEvent, MockRawNetwork}; pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService}; -pub use io::setup as setup_io; -use manager::SessionCommand; -pub use manager::{ - ConnectionIO as ConnectionManagerIO, ConnectionManager, ConnectionManagerConfig, -}; -pub use session::{Manager as SessionManager, ManagerError, SessionSender, IO as SessionManagerIO}; pub use substrate::protocol_name; -#[cfg(test)] -pub mod testing { - use super::manager::LegacyAuthentication; - pub use super::{ - clique::mock::MockAddressingInformation, - gossip::mock::{MockEvent, MockRawNetwork}, - manager::{ - Authentication, DataInSession, DiscoveryMessage, LegacyDiscoveryMessage, - PeerAuthentications, SessionHandler, VersionedAuthentication, - }, - }; - - pub fn legacy_authentication( - handler: &SessionHandler, - ) -> LegacyAuthentication { - match handler - .authentication() - .expect("this is a validator handler") - { - PeerAuthentications::Both(_, authentication) => authentication, - _ => panic!("handler doesn't have both authentications"), - } - } - - pub fn authentication( - handler: &SessionHandler, - ) -> Authentication { - match handler - .authentication() - .expect("this is a validator handler") - { - PeerAuthentications::Both(authentication, _) => authentication, - _ => panic!("handler doesn't have both authentications"), - } - } -} /// Represents the id of an arbitrary node. pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send { @@ -121,17 +78,7 @@ pub trait RequestBlocks: Clone + Send + Sync + 'static { fn is_major_syncing(&self) -> bool; } -/// Commands for manipulating the reserved peers set. -#[derive(Debug, PartialEq, Eq)] -pub enum ConnectionCommand { - AddReserved(HashSet), - DelReserved(HashSet), -} - /// A basic alias for properties we expect basic data to satisfy. pub trait Data: Clone + Codec + Send + Sync + 'static {} impl Data for D {} - -// In practice D: Data and P: PeerId, but we cannot require that in type aliases. -type AddressedData = (D, P); diff --git a/finality-aleph/src/network/session.rs b/finality-aleph/src/network/session.rs deleted file mode 100644 index 9b91ef77bd..0000000000 --- a/finality-aleph/src/network/session.rs +++ /dev/null @@ -1,143 +0,0 @@ -use futures::channel::{mpsc, oneshot}; - -use crate::{ - abft::Recipient, - crypto::{AuthorityPen, AuthorityVerifier}, - network::{ - data::{ - component::{Sender, SimpleNetwork}, - SendError, - }, - Data, SessionCommand, - }, - NodeIndex, SessionId, -}; - -/// Sends data within a single session. -#[derive(Clone)] -pub struct SessionSender { - session_id: SessionId, - messages_for_network: mpsc::UnboundedSender<(D, SessionId, Recipient)>, -} - -impl Sender for SessionSender { - fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError> { - self.messages_for_network - .unbounded_send((data, self.session_id, recipient)) - .map_err(|_| SendError::SendFailed) - } -} - -/// Sends and receives data within a single session. -type Network = SimpleNetwork, SessionSender>; - -/// Manages sessions for which the network should be active. -pub struct Manager { - commands_for_service: mpsc::UnboundedSender>, - messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, -} - -/// What went wrong during a session management operation. -#[derive(Debug)] -pub enum ManagerError { - CommandSendFailed, - NetworkReceiveFailed, -} - -pub struct IO { - pub commands_for_service: mpsc::UnboundedSender>, - pub messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, -} - -impl IO { - pub fn new( - commands_for_service: mpsc::UnboundedSender>, - messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, - ) -> Self { - IO { - commands_for_service, - messages_for_service, - } - } -} - -impl Manager { - /// Create a new manager with the given channels to the service. - pub fn new(io: IO) -> Self { - Manager { - commands_for_service: io.commands_for_service, - messages_for_service: io.messages_for_service, - } - } - - /// Start participating or update the verifier in the given session where you are not a - /// validator. - pub fn start_nonvalidator_session( - &self, - session_id: SessionId, - verifier: AuthorityVerifier, - ) -> Result<(), ManagerError> { - self.commands_for_service - .unbounded_send(SessionCommand::StartNonvalidator(session_id, verifier)) - .map_err(|_| ManagerError::CommandSendFailed) - } - - /// Start participating or update the information about the given session where you are a - /// validator. Returns a session network to be used for sending and receiving data within the - /// session. - pub async fn start_validator_session( - &self, - session_id: SessionId, - verifier: AuthorityVerifier, - node_id: NodeIndex, - pen: AuthorityPen, - ) -> Result, ManagerError> { - let (result_for_us, result_from_service) = oneshot::channel(); - self.commands_for_service - .unbounded_send(SessionCommand::StartValidator( - session_id, - verifier, - node_id, - pen, - Some(result_for_us), - )) - .map_err(|_| ManagerError::CommandSendFailed)?; - - let data_from_network = result_from_service - .await - .map_err(|_| ManagerError::NetworkReceiveFailed)?; - let messages_for_network = self.messages_for_service.clone(); - - Ok(Network::new( - data_from_network, - SessionSender { - session_id, - messages_for_network, - }, - )) - } - - /// Start participating or update the information about the given session where you are a - /// validator. Used for early starts when you don't yet need the returned network, but would - /// like to start discovery. - pub fn early_start_validator_session( - &self, - session_id: SessionId, - verifier: AuthorityVerifier, - node_id: NodeIndex, - pen: AuthorityPen, - ) -> Result<(), ManagerError> { - self.commands_for_service - .unbounded_send(SessionCommand::StartValidator( - session_id, verifier, node_id, pen, None, - )) - .map_err(|_| ManagerError::CommandSendFailed) - } - - /// Stop participating in the given session. - pub fn stop_session(&self, session_id: SessionId) -> Result<(), ManagerError> { - self.commands_for_service - .unbounded_send(SessionCommand::Stop(session_id)) - .map_err(|_| ManagerError::CommandSendFailed) - } -} diff --git a/finality-aleph/src/network/manager/compatibility.rs b/finality-aleph/src/network/session/compatibility.rs similarity index 99% rename from finality-aleph/src/network/manager/compatibility.rs rename to finality-aleph/src/network/session/compatibility.rs index bb368716c8..146341bb51 100644 --- a/finality-aleph/src/network/manager/compatibility.rs +++ b/finality-aleph/src/network/session/compatibility.rs @@ -8,7 +8,7 @@ use log::warn; use crate::{ network::{ - manager::{AuthData, Authentication, LegacyAuthentication}, + session::{AuthData, Authentication, LegacyAuthentication}, AddressingInformation, Data, }, SessionId, Version, @@ -253,7 +253,7 @@ mod test { crypto::AuthorityVerifier, network::{ clique::mock::MockAddressingInformation, - manager::{ + session::{ compatibility::{PeerAuthentications, MAX_AUTHENTICATION_SIZE}, LegacyDiscoveryMessage, SessionHandler, }, diff --git a/finality-aleph/src/network/manager/connections.rs b/finality-aleph/src/network/session/connections.rs similarity index 100% rename from finality-aleph/src/network/manager/connections.rs rename to finality-aleph/src/network/session/connections.rs diff --git a/finality-aleph/src/network/session/data.rs b/finality-aleph/src/network/session/data.rs new file mode 100644 index 0000000000..db7cf5f672 --- /dev/null +++ b/finality-aleph/src/network/session/data.rs @@ -0,0 +1,34 @@ +use codec::{Decode, Encode, Error, Input, Output}; + +use crate::{network::Data, SessionId}; + +/// Data inside session, sent to validator network. +/// Wrapper for data send over network. We need it to ensure compatibility. +/// The order of the data and session_id is fixed in encode and the decode expects it to be data, session_id. +/// Since data is versioned, i.e. it's encoding starts with a version number in the standardized way, +/// this will allow us to retrofit versioning here if we ever need to change this structure. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DataInSession { + pub data: D, + pub session_id: SessionId, +} + +impl Decode for DataInSession { + fn decode(input: &mut I) -> Result { + let data = D::decode(input)?; + let session_id = SessionId::decode(input)?; + + Ok(Self { data, session_id }) + } +} + +impl Encode for DataInSession { + fn size_hint(&self) -> usize { + self.data.size_hint() + self.session_id.size_hint() + } + + fn encode_to(&self, dest: &mut T) { + self.data.encode_to(dest); + self.session_id.encode_to(dest); + } +} diff --git a/finality-aleph/src/network/manager/discovery.rs b/finality-aleph/src/network/session/discovery.rs similarity index 98% rename from finality-aleph/src/network/manager/discovery.rs rename to finality-aleph/src/network/session/discovery.rs index 4e36513b69..e64e1a030c 100644 --- a/finality-aleph/src/network/manager/discovery.rs +++ b/finality-aleph/src/network/session/discovery.rs @@ -9,7 +9,7 @@ use log::{debug, info, trace}; use crate::{ network::{ - manager::{ + session::{ compatibility::PeerAuthentications, Authentication, LegacyAuthentication, SessionHandler, }, @@ -123,9 +123,11 @@ mod tests { use crate::{ network::{ clique::mock::{random_address, MockAddressingInformation}, - manager::{compatibility::PeerAuthentications, SessionHandler}, mock::crypto_basics, - testing::{authentication, legacy_authentication}, + session::{ + authentication, compatibility::PeerAuthentications, legacy_authentication, + SessionHandler, + }, }, SessionId, }; diff --git a/finality-aleph/src/network/manager/session.rs b/finality-aleph/src/network/session/handler.rs similarity index 95% rename from finality-aleph/src/network/manager/session.rs rename to finality-aleph/src/network/session/handler.rs index 009786f946..797fe073d6 100644 --- a/finality-aleph/src/network/manager/session.rs +++ b/finality-aleph/src/network/session/handler.rs @@ -6,7 +6,7 @@ use crate::{ abft::NodeCount, crypto::{AuthorityPen, AuthorityVerifier}, network::{ - manager::{ + session::{ compatibility::PeerAuthentications, AuthData, Authentication, LegacyAuthData, LegacyAuthentication, }, @@ -269,18 +269,42 @@ impl> + Into>> Handler } #[cfg(test)] -mod tests { +pub mod tests { use super::{Handler, HandlerError}; use crate::{ network::{ - clique::mock::{random_address, random_invalid_address}, + clique::mock::{random_address, random_invalid_address, MockAddressingInformation}, mock::crypto_basics, - testing::{authentication, legacy_authentication}, + session::{compatibility::PeerAuthentications, Authentication, LegacyAuthentication}, AddressingInformation, }, NodeIndex, SessionId, }; + pub fn legacy_authentication( + handler: &Handler, + ) -> LegacyAuthentication { + match handler + .authentication() + .expect("this is a validator handler") + { + PeerAuthentications::Both(_, authentication) => authentication, + _ => panic!("handler doesn't have both authentications"), + } + } + + pub fn authentication( + handler: &Handler, + ) -> Authentication { + match handler + .authentication() + .expect("this is a validator handler") + { + PeerAuthentications::Both(authentication, _) => authentication, + _ => panic!("handler doesn't have both authentications"), + } + } + const NUM_NODES: usize = 7; #[tokio::test] diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/session/manager.rs similarity index 50% rename from finality-aleph/src/network/manager/service.rs rename to finality-aleph/src/network/session/manager.rs index d0d0978bc8..6feff15c67 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/session/manager.rs @@ -1,47 +1,35 @@ use std::{ - cmp, collections::{HashMap, HashSet}, - fmt::{Debug, Display, Error as FmtError, Formatter}, - marker::PhantomData, + fmt::Debug, time::Duration, }; -use futures::{ - channel::{mpsc, oneshot}, - StreamExt, -}; -use log::{debug, info, trace, warn}; -use tokio::time::{self, Instant}; +use futures::channel::mpsc; +use log::{debug, info}; use crate::{ abft::Recipient, crypto::{AuthorityPen, AuthorityVerifier}, network::{ - clique::{Network as CliqueNetwork, PublicKey}, - manager::{ - compatibility::PeerAuthentications, Connections, DataInSession, Discovery, - DiscoveryMessage, SessionHandler, SessionHandlerError, VersionedAuthentication, + session::{ + compatibility::PeerAuthentications, data::DataInSession, Connections, Discovery, + DiscoveryMessage, SessionHandler, SessionHandlerError, }, - AddressedData, AddressingInformation, ConnectionCommand, Data, GossipNetwork, - NetworkIdentity, PeerId, + AddressingInformation, Data, NetworkIdentity, PeerId, }, - MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, + NodeIndex, SessionId, }; -/// Commands for manipulating sessions, stopping them and starting both validator and non-validator -/// sessions. -pub enum SessionCommand { - StartValidator( - SessionId, - AuthorityVerifier, - NodeIndex, - AuthorityPen, - Option>>, - ), - StartNonvalidator(SessionId, AuthorityVerifier), - Stop(SessionId), +/// Commands for manipulating the reserved peers set. +#[derive(Debug, PartialEq, Eq)] +pub enum ConnectionCommand { + AddReserved(HashSet), + DelReserved(HashSet), } +// In practice D: Data and P: PeerId, but we cannot require that in type aliases. +pub type AddressedData = (D, P); + struct Session> + Into>> { handler: SessionHandler, discovery: Discovery, @@ -50,75 +38,38 @@ struct Session> + In #[derive(Clone)] /// Stores all data needed for starting validator session -struct PreValidatorSession { - session_id: SessionId, - verifier: AuthorityVerifier, - node_id: NodeIndex, - pen: AuthorityPen, +pub struct PreValidatorSession { + pub session_id: SessionId, + pub verifier: AuthorityVerifier, + pub node_id: NodeIndex, + pub pen: AuthorityPen, } #[derive(Clone)] /// Stores all data needed for starting non-validator session -struct PreNonvalidatorSession { - session_id: SessionId, - verifier: AuthorityVerifier, -} - -/// Configuration for the session manager service. Controls how often the maintenance and -/// rebroadcasts are triggerred. Also controls when maintenance starts. -pub struct Config { - discovery_cooldown: Duration, - maintenance_period: Duration, - initial_delay: Duration, -} - -impl Config { - fn new( - discovery_cooldown: Duration, - maintenance_period: Duration, - initial_delay: Duration, - ) -> Self { - Config { - discovery_cooldown, - maintenance_period, - initial_delay, - } - } - - /// Returns a configuration that triggers maintenance about 5 times per session. - pub fn with_session_period( - session_period: &SessionPeriod, - millisecs_per_block: &MillisecsPerBlock, - ) -> Self { - let discovery_cooldown = - Duration::from_millis(millisecs_per_block.0 * session_period.0 as u64 / 5); - let maintenance_period = discovery_cooldown / 2; - let initial_delay = cmp::min( - Duration::from_millis(millisecs_per_block.0 * 10), - maintenance_period, - ); - Config::new(discovery_cooldown, maintenance_period, initial_delay) - } +pub struct PreNonvalidatorSession { + pub session_id: SessionId, + pub verifier: AuthorityVerifier, } -/// Actions that the service wants to take as the result of some information. Might contain a +/// Actions that the manager wants to take as the result of some information. Might contain a /// command for connecting to or disconnecting from some peers or a message to broadcast for /// discovery purposes. -pub struct ServiceActions> + Into>> { - maybe_command: Option>, - maybe_message: Option>, +pub struct ManagerActions> + Into>> { + pub maybe_command: Option>, + pub maybe_message: Option>, } -impl> + Into>> ServiceActions { +impl> + Into>> ManagerActions { fn noop() -> Self { - ServiceActions { + ManagerActions { maybe_command: None, maybe_message: None, } } } -/// The connection manager service. It handles the abstraction over the network we build to support +/// The connection manager. It handles the abstraction over the network we build to support /// separate sessions. This includes: /// 1. Starting and ending specific sessions on user demand. /// 2. Forwarding in-session user messages to the network using session handlers for address @@ -127,7 +78,7 @@ impl> + Into>> Service /// 1. In-session messages are forwarded to the user. /// 2. Authentication messages forwarded to session handlers. /// 4. Running periodic maintenance, mostly related to node discovery. -pub struct Service +pub struct Manager where NI::AddressingInformation: TryFrom> + Into>, { @@ -135,8 +86,6 @@ where connections: Connections, sessions: HashMap>, discovery_cooldown: Duration, - maintenance_period: Duration, - initial_delay: Duration, } /// Error when trying to forward data from the network to the user, should never be fatal. @@ -146,24 +95,17 @@ pub enum SendError { NoSession, } -impl Service +impl Manager where NI::AddressingInformation: TryFrom> + Into>, { - /// Create a new connection manager service. - pub fn new(network_identity: NI, config: Config) -> Self { - let Config { - discovery_cooldown, - maintenance_period, - initial_delay, - } = config; - Service { + /// Create a new connection manager. + pub fn new(network_identity: NI, discovery_cooldown: Duration) -> Self { + Manager { network_identity, connections: Connections::new(), sessions: HashMap::new(), discovery_cooldown, - maintenance_period, - initial_delay, } } @@ -176,12 +118,16 @@ where } } - fn finish_session( + /// Ends a session. + pub fn finish_session( &mut self, session_id: SessionId, - ) -> Option> { + ) -> ManagerActions { self.sessions.remove(&session_id); - Self::delete_reserved(self.connections.remove_session(session_id)) + ManagerActions { + maybe_command: Self::delete_reserved(self.connections.remove_session(session_id)), + maybe_message: None, + } } fn discover_authorities( @@ -234,12 +180,13 @@ where (self.discover_authorities(&session_id), data_from_network) } - async fn update_validator_session( + /// Starts or updates a validator session. + pub async fn update_validator_session( &mut self, pre_session: PreValidatorSession, ) -> Result< ( - ServiceActions, + ManagerActions, mpsc::UnboundedReceiver, ), SessionHandlerError, @@ -251,7 +198,7 @@ where let (maybe_message, data_from_network) = self.start_validator_session(pre_session, address).await; return Ok(( - ServiceActions { + ManagerActions { maybe_command: None, maybe_message, }, @@ -283,7 +230,7 @@ where session.data_for_user = Some(data_for_user); self.connections.add_peers(session_id, peers_to_stay); Ok(( - ServiceActions { + ManagerActions { maybe_command, maybe_message: self.discover_authorities(&session_id), }, @@ -291,23 +238,6 @@ where )) } - async fn handle_validator_presession( - &mut self, - pre_session: PreValidatorSession, - result_for_user: Option>>, - ) -> Result, SessionHandlerError> { - self.update_validator_session(pre_session) - .await - .map(|(actions, data_from_network)| { - if let Some(result_for_user) = result_for_user { - if result_for_user.send(data_from_network).is_err() { - warn!(target: "aleph-network", "Failed to send started session.") - } - } - actions - }) - } - async fn start_nonvalidator_session( &mut self, pre_session: PreNonvalidatorSession, @@ -329,63 +259,24 @@ where ); } - async fn update_nonvalidator_session( + /// Starts or updates a nonvalidator session. + pub async fn update_nonvalidator_session( &mut self, pre_session: PreNonvalidatorSession, - ) -> Result<(), SessionHandlerError> { + ) -> Result, SessionHandlerError> { let address = self.network_identity.identity(); - let session = match self.sessions.get_mut(&pre_session.session_id) { - Some(session) => session, + match self.sessions.get_mut(&pre_session.session_id) { + Some(session) => { + session + .handler + .update(None, pre_session.verifier, address) + .await?; + } None => { self.start_nonvalidator_session(pre_session, address).await; - return Ok(()); } }; - session - .handler - .update(None, pre_session.verifier, address) - .await?; - Ok(()) - } - - async fn handle_nonvalidator_presession( - &mut self, - pre_session: PreNonvalidatorSession, - ) -> Result<(), SessionHandlerError> { - self.update_nonvalidator_session(pre_session).await - } - - /// Handle a session command. - /// Returns actions the service wants to take or an error if the session command is invalid. - pub async fn on_command( - &mut self, - command: SessionCommand, - ) -> Result, SessionHandlerError> { - use SessionCommand::*; - match command { - StartValidator(session_id, verifier, node_id, pen, result_for_user) => { - let pre_session = PreValidatorSession { - session_id, - verifier, - node_id, - pen, - }; - self.handle_validator_presession(pre_session, result_for_user) - .await - } - StartNonvalidator(session_id, verifier) => { - let pre_session = PreNonvalidatorSession { - session_id, - verifier, - }; - self.handle_nonvalidator_presession(pre_session).await?; - Ok(ServiceActions::noop()) - } - Stop(session_id) => Ok(ServiceActions { - maybe_command: self.finish_session(session_id), - maybe_message: None, - }), - } + Ok(ManagerActions::noop()) } /// Handle a user request for sending data. @@ -420,11 +311,11 @@ where } /// Handle a discovery message. - /// Returns actions the service wants to take. + /// Returns actions the manager wants to take. pub fn on_discovery_message( &mut self, message: DiscoveryMessage, - ) -> ServiceActions { + ) -> ManagerActions { use DiscoveryMessage::*; let session_id = message.session_id(); match self.sessions.get_mut(&session_id) { @@ -447,14 +338,14 @@ where } _ => None, }; - ServiceActions { + ManagerActions { maybe_command, maybe_message, } } None => { debug!(target: "aleph-network", "Received message from unknown session: {:?}", message); - ServiceActions::noop() + ManagerActions::noop() } } } @@ -553,297 +444,114 @@ where } } -/// Input/output interface for the connection manager service. -pub struct IO< - D: Data, - M: Data, - A: AddressingInformation + TryFrom> + Into>, - CN: CliqueNetwork>, - GN: GossipNetwork>, -> where - A::PeerId: PublicKey, -{ - commands_from_user: mpsc::UnboundedReceiver>, - messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - validator_network: CN, - gossip_network: GN, - _phantom: PhantomData<(M, A)>, -} - -/// Errors that can happen during the network service operations. -#[derive(Debug, PartialEq, Eq)] -pub enum Error { - CommandsChannel, - MessageChannel, - ValidatorNetwork, - GossipNetwork(GE), -} - -impl Display for Error { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - use Error::*; - match self { - CommandsChannel => write!(f, "commands channel unexpectedly closed"), - MessageChannel => write!(f, "message channel unexpectedly closed"), - ValidatorNetwork => write!(f, "validator network unexpectedly done"), - GossipNetwork(e) => write!(f, "gossip network unexpectedly done: {}", e), - } - } -} - -impl< - D: Data, - M: Data + Debug, - A: AddressingInformation + TryFrom> + Into>, - CN: CliqueNetwork>, - GN: GossipNetwork>, - > IO -where - A::PeerId: PublicKey, -{ - pub fn new( - commands_from_user: mpsc::UnboundedReceiver>, - messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - validator_network: CN, - gossip_network: GN, - ) -> IO { - IO { - commands_from_user, - messages_from_user, - validator_network, - gossip_network, - _phantom: PhantomData, - } - } - - fn send_data(&self, to_send: AddressedData, A::PeerId>) { - self.validator_network.send(to_send.0, to_send.1) - } - - fn send_authentications( - &mut self, - to_send: Vec>, - ) -> Result<(), Error> { - for auth in to_send { - self.gossip_network - .broadcast(auth) - .map_err(Error::GossipNetwork)?; - } - Ok(()) - } - - fn handle_connection_command(&mut self, connection_command: ConnectionCommand) { - match connection_command { - ConnectionCommand::AddReserved(addresses) => { - for address in addresses { - self.validator_network - .add_connection(address.peer_id(), address); - } - } - ConnectionCommand::DelReserved(peers) => { - for peer in peers { - self.validator_network.remove_connection(peer); - } - } - }; - } - - fn handle_service_actions( - &mut self, - ServiceActions { - maybe_command, - maybe_message, - }: ServiceActions, - ) -> Result<(), Error> { - if let Some(command) = maybe_command { - self.handle_connection_command(command); - } - if let Some(message) = maybe_message { - self.send_authentications(message.into())?; - } - Ok(()) - } - - /// Run the connection manager service with this IO. - pub async fn run>( - mut self, - mut service: Service, - ) -> Result<(), Error> { - // Initial delay is needed so that Network is fully set up and we received some first discovery broadcasts from other nodes. - // Otherwise this might cause first maintenance never working, as it happens before first broadcasts. - let mut maintenance = time::interval_at( - Instant::now() + service.initial_delay, - service.maintenance_period, - ); - - let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); - loop { - trace!(target: "aleph-network", "Manager Loop started a next iteration"); - tokio::select! { - maybe_command = self.commands_from_user.next() => { - trace!(target: "aleph-network", "Manager received a command from user"); - match maybe_command { - Some(command) => match service.on_command(command).await { - Ok(to_send) => self.handle_service_actions(to_send)?, - Err(e) => warn!(target: "aleph-network", "Failed to update handler: {:?}", e), - }, - None => return Err(Error::CommandsChannel), - } - }, - maybe_message = self.messages_from_user.next() => { - trace!(target: "aleph-network", "Manager received a message from user"); - match maybe_message { - Some((message, session_id, recipient)) => for message in service.on_user_message(message, session_id, recipient) { - self.send_data(message); - }, - None => return Err(Error::MessageChannel), - } - }, - maybe_data = self.validator_network.next() => { - trace!(target: "aleph-network", "Manager received some data from network"); - match maybe_data { - Some(DataInSession{data, session_id}) => if let Err(e) = service.send_session_data(&session_id, data) { - match e { - SendError::UserSend => trace!(target: "aleph-network", "Failed to send to user in session."), - SendError::NoSession => trace!(target: "aleph-network", "Received message for unknown session."), - } - }, - None => return Err(Error::ValidatorNetwork), - } - }, - maybe_authentication = self.gossip_network.next() => { - let authentication = maybe_authentication.map_err(Error::GossipNetwork)?; - trace!(target: "aleph-network", "Manager received an authentication from network"); - match authentication.try_into() { - Ok(message) => self.handle_service_actions(service.on_discovery_message(message))?, - Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e), - } - }, - _ = maintenance.tick() => { - debug!(target: "aleph-network", "Manager starts maintenence"); - for to_send in service.discovery() { - self.send_authentications(to_send.into())?; - } - }, - _ = status_ticker.tick() => { - service.status_report(); - } - } - } - } -} - #[cfg(test)] mod tests { use std::{iter, time::Duration}; - use futures::{channel::oneshot, StreamExt}; + use futures::StreamExt; - use super::{Config, SendError, Service, ServiceActions, SessionCommand}; + use super::{ + ConnectionCommand, Manager, ManagerActions, PreNonvalidatorSession, PreValidatorSession, + SendError, + }; use crate::{ network::{ clique::mock::{random_address, MockAddressingInformation}, - manager::{compatibility::PeerAuthentications, DataInSession, DiscoveryMessage}, mock::crypto_basics, - ConnectionCommand, + session::{compatibility::PeerAuthentications, data::DataInSession, DiscoveryMessage}, }, Recipient, SessionId, }; const NUM_NODES: usize = 7; - const MAINTENANCE_PERIOD: Duration = Duration::from_secs(120); const DISCOVERY_PERIOD: Duration = Duration::from_secs(60); - const INITIAL_DELAY: Duration = Duration::from_secs(5); - fn build() -> Service { - Service::new( - random_address(), - Config::new(MAINTENANCE_PERIOD, DISCOVERY_PERIOD, INITIAL_DELAY), - ) + fn build() -> Manager { + Manager::new(random_address(), DISCOVERY_PERIOD) } #[tokio::test] async fn starts_nonvalidator_session() { - let mut service = build(); + let mut manager = build(); let (_, verifier) = crypto_basics(NUM_NODES).await; let session_id = SessionId(43); - let ServiceActions { + let ManagerActions { maybe_command, maybe_message, - } = service - .on_command(SessionCommand::StartNonvalidator(session_id, verifier)) + } = manager + .update_nonvalidator_session(PreNonvalidatorSession { + session_id, + verifier, + }) .await .unwrap(); assert!(maybe_command.is_none()); assert!(maybe_message.is_none()); assert_eq!( - service.send_session_data(&session_id, -43), + manager.send_session_data(&session_id, -43), Err(SendError::NoSession) ); } #[tokio::test] async fn starts_validator_session() { - let mut service = build(); + let mut manager = build(); let (validator_data, verifier) = crypto_basics(NUM_NODES).await; let (node_id, pen) = validator_data[0].clone(); let session_id = SessionId(43); - let (result_for_user, result_from_service) = oneshot::channel(); - let ServiceActions { - maybe_command, - maybe_message, - } = service - .on_command(SessionCommand::StartValidator( + let ( + ManagerActions { + maybe_command, + maybe_message, + }, + _data_from_network, + ) = manager + .update_validator_session(PreValidatorSession { session_id, verifier, node_id, pen, - Some(result_for_user), - )) + }) .await .unwrap(); assert!(maybe_command.is_none()); assert!(maybe_message.is_some()); - let _data_from_network = result_from_service.await.unwrap(); - assert_eq!(service.send_session_data(&session_id, -43), Ok(())); + assert_eq!(manager.send_session_data(&session_id, -43), Ok(())); } #[tokio::test] async fn stops_session() { - let mut service = build(); + let mut manager = build(); let (validator_data, verifier) = crypto_basics(NUM_NODES).await; let (node_id, pen) = validator_data[0].clone(); let session_id = SessionId(43); - let (result_for_user, result_from_service) = oneshot::channel(); - let ServiceActions { - maybe_command, - maybe_message, - } = service - .on_command(SessionCommand::StartValidator( + let ( + ManagerActions { + maybe_command, + maybe_message, + }, + mut data_from_network, + ) = manager + .update_validator_session(PreValidatorSession { session_id, verifier, node_id, pen, - Some(result_for_user), - )) + }) .await .unwrap(); assert!(maybe_command.is_none()); assert!(maybe_message.is_some()); - assert_eq!(service.send_session_data(&session_id, -43), Ok(())); - let mut data_from_network = result_from_service.await.unwrap(); + assert_eq!(manager.send_session_data(&session_id, -43), Ok(())); assert_eq!(data_from_network.next().await, Some(-43)); - let ServiceActions { + let ManagerActions { maybe_command, maybe_message, - } = service - .on_command(SessionCommand::Stop(session_id)) - .await - .unwrap(); + } = manager.finish_session(session_id); assert!(maybe_command.is_none()); assert!(maybe_message.is_none()); assert_eq!( - service.send_session_data(&session_id, -43), + manager.send_session_data(&session_id, -43), Err(SendError::NoSession) ); assert!(data_from_network.next().await.is_none()); @@ -851,26 +559,28 @@ mod tests { #[tokio::test] async fn handles_broadcast() { - let mut service = build(); + let mut manager = build(); let (validator_data, verifier) = crypto_basics(NUM_NODES).await; let (node_id, pen) = validator_data[0].clone(); let session_id = SessionId(43); - service - .on_command(SessionCommand::StartValidator( + manager + .update_validator_session(PreValidatorSession { session_id, - verifier.clone(), + verifier: verifier.clone(), node_id, pen, - None, - )) + }) .await .unwrap(); - let mut other_service = build(); + let mut other_manager = build(); let (node_id, pen) = validator_data[1].clone(); - let ServiceActions { maybe_message, .. } = other_service - .on_command(SessionCommand::StartValidator( - session_id, verifier, node_id, pen, None, - )) + let (ManagerActions { maybe_message, .. }, _) = other_manager + .update_validator_session(PreValidatorSession { + session_id, + verifier, + node_id, + pen, + }) .await .unwrap(); let message = maybe_message.expect("there should be a discovery message"); @@ -881,10 +591,10 @@ mod tests { ), message => panic!("Expected both authentications, got {:?}", message), }; - let ServiceActions { + let ManagerActions { maybe_command, maybe_message, - } = service.on_discovery_message(message); + } = manager.on_discovery_message(message); assert_eq!( maybe_command, Some(ConnectionCommand::AddReserved( @@ -896,26 +606,28 @@ mod tests { #[tokio::test] async fn sends_user_data() { - let mut service = build(); + let mut manager = build(); let (validator_data, verifier) = crypto_basics(NUM_NODES).await; let (node_id, pen) = validator_data[0].clone(); let session_id = SessionId(43); - service - .on_command(SessionCommand::StartValidator( + manager + .update_validator_session(PreValidatorSession { session_id, - verifier.clone(), + verifier: verifier.clone(), node_id, pen, - None, - )) + }) .await .unwrap(); - let mut other_service = build(); + let mut other_manager = build(); let (node_id, pen) = validator_data[1].clone(); - let ServiceActions { maybe_message, .. } = other_service - .on_command(SessionCommand::StartValidator( - session_id, verifier, node_id, pen, None, - )) + let (ManagerActions { maybe_message, .. }, _) = other_manager + .update_validator_session(PreValidatorSession { + session_id, + verifier, + node_id, + pen, + }) .await .unwrap(); let message = match maybe_message.expect("there should be a discovery message") { @@ -924,8 +636,8 @@ mod tests { } message => panic!("Expected both authentications, got {:?}", message), }; - service.on_discovery_message(message); - let messages = service.on_user_message(2137, session_id, Recipient::Everyone); + manager.on_discovery_message(message); + let messages = manager.on_user_message(2137, session_id, Recipient::Everyone); assert_eq!(messages.len(), 1); let (network_data, _) = &messages[0]; assert_eq!( diff --git a/finality-aleph/src/network/session/mod.rs b/finality-aleph/src/network/session/mod.rs new file mode 100644 index 0000000000..a3b7ed75ad --- /dev/null +++ b/finality-aleph/src/network/session/mod.rs @@ -0,0 +1,178 @@ +//! Managing the validator connections in sessions using the gossip network. +use std::fmt::Display; + +use codec::{Decode, Encode}; +use futures::channel::mpsc; + +use crate::{ + crypto::{AuthorityPen, AuthorityVerifier, Signature}, + network::{ + data::{ + component::{Sender, SimpleNetwork}, + SendError, + }, + AddressingInformation, Data, + }, + NodeIndex, Recipient, SessionId, +}; + +mod compatibility; +mod connections; +mod data; +mod discovery; +mod handler; +mod manager; +mod service; + +pub use compatibility::{ + DiscoveryMessage, LegacyDiscoveryMessage, PeerAuthentications, VersionedAuthentication, +}; +use connections::Connections; +#[cfg(test)] +pub use data::DataInSession; +pub use discovery::Discovery; +#[cfg(test)] +pub use handler::tests::{authentication, legacy_authentication}; +pub use handler::{Handler as SessionHandler, HandlerError as SessionHandlerError}; +pub use service::{Config as ConnectionManagerConfig, ManagerError, Service as ConnectionManager}; + +/// Data validators used to use to authenticate themselves for a single session +/// and disseminate their addresses. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] +pub struct LegacyAuthData { + addresses: Vec, + node_id: NodeIndex, + session_id: SessionId, +} + +impl LegacyAuthData { + pub fn session(&self) -> SessionId { + self.session_id + } + + pub fn creator(&self) -> NodeIndex { + self.node_id + } +} + +/// Data validators use to authenticate themselves for a single session +/// and disseminate their addresses. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] +pub struct AuthData { + address: A, + node_id: NodeIndex, + session_id: SessionId, +} + +impl AuthData { + pub fn session(&self) -> SessionId { + self.session_id + } + + pub fn creator(&self) -> NodeIndex { + self.node_id + } + + pub fn address(&self) -> A { + self.address.clone() + } +} + +impl>> From> for LegacyAuthData { + fn from(auth_data: AuthData) -> Self { + let AuthData { + address, + node_id, + session_id, + } = auth_data; + let addresses = address.into(); + LegacyAuthData { + addresses, + node_id, + session_id, + } + } +} + +impl>> TryFrom> + for AuthData +{ + type Error = (); + + fn try_from(legacy_auth_data: LegacyAuthData) -> Result { + let LegacyAuthData { + addresses, + node_id, + session_id, + } = legacy_auth_data; + let address = addresses.try_into().map_err(|_| ())?; + Ok(AuthData { + address, + node_id, + session_id, + }) + } +} + +/// A full legacy authentication, consisting of a signed LegacyAuthData. +pub type LegacyAuthentication = (LegacyAuthData, Signature); + +/// A full authentication, consisting of a signed AuthData. +pub type Authentication = (AuthData, Signature); + +/// Sends data within a single session. +#[derive(Clone)] +pub struct SessionSender { + session_id: SessionId, + messages_for_network: mpsc::UnboundedSender<(D, SessionId, Recipient)>, +} + +impl Sender for SessionSender { + fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError> { + self.messages_for_network + .unbounded_send((data, self.session_id, recipient)) + .map_err(|_| SendError::SendFailed) + } +} + +/// Sends and receives data within a single session. +type Network = SimpleNetwork, SessionSender>; + +/// An interface for managing session networks for validators and nonvalidators. +#[async_trait::async_trait] +pub trait SessionManager: Send + Sync + 'static { + type Error: Display; + + /// Start participating or update the verifier in the given session where you are not a + /// validator. + fn start_nonvalidator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + ) -> Result<(), Self::Error>; + + /// Start participating or update the information about the given session where you are a + /// validator. Returns a session network to be used for sending and receiving data within the + /// session. + async fn start_validator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + node_id: NodeIndex, + pen: AuthorityPen, + ) -> Result, Self::Error>; + + /// Start participating or update the information about the given session where you are a + /// validator. Used for early starts when you don't yet need the returned network, but would + /// like to start discovery. + fn early_start_validator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + node_id: NodeIndex, + pen: AuthorityPen, + ) -> Result<(), Self::Error>; + + /// Stop participating in the given session. + fn stop_session(&self, session_id: SessionId) -> Result<(), Self::Error>; +} diff --git a/finality-aleph/src/network/session/service.rs b/finality-aleph/src/network/session/service.rs new file mode 100644 index 0000000000..080368397d --- /dev/null +++ b/finality-aleph/src/network/session/service.rs @@ -0,0 +1,410 @@ +use std::{ + cmp, + fmt::{Debug, Display, Error as FmtError, Formatter}, + time::Duration, +}; + +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; +use log::{debug, trace, warn}; +use tokio::time::{self, Instant}; + +use crate::{ + abft::Recipient, + crypto::{AuthorityPen, AuthorityVerifier}, + network::{ + clique::{Network as CliqueNetwork, PublicKey}, + session::{ + data::DataInSession, + manager::{ + AddressedData, ConnectionCommand, Manager, ManagerActions, PreNonvalidatorSession, + PreValidatorSession, SendError, + }, + Network, SessionHandlerError, SessionManager, SessionSender, VersionedAuthentication, + }, + AddressingInformation, Data, GossipNetwork, NetworkIdentity, + }, + MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, +}; + +/// Commands for manipulating sessions, stopping them and starting both validator and non-validator +/// sessions. +enum SessionCommand { + StartValidator( + SessionId, + AuthorityVerifier, + NodeIndex, + AuthorityPen, + Option>>, + ), + StartNonvalidator(SessionId, AuthorityVerifier), + Stop(SessionId), +} + +/// Manages sessions for which the network should be active. +struct ManagerInterface { + commands_for_service: mpsc::UnboundedSender>, + messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, +} + +/// What went wrong during a session management operation. +#[derive(Debug)] +pub enum ManagerError { + CommandSendFailed, + NetworkReceiveFailed, +} + +impl Display for ManagerError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use ManagerError::*; + match self { + CommandSendFailed => write!(f, "failed to send a command to the service"), + NetworkReceiveFailed => write!(f, "the service did not return a network"), + } + } +} + +#[async_trait::async_trait] +impl SessionManager for ManagerInterface { + type Error = ManagerError; + + fn start_nonvalidator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + ) -> Result<(), Self::Error> { + self.commands_for_service + .unbounded_send(SessionCommand::StartNonvalidator(session_id, verifier)) + .map_err(|_| ManagerError::CommandSendFailed) + } + + async fn start_validator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + node_id: NodeIndex, + pen: AuthorityPen, + ) -> Result, Self::Error> { + let (result_for_us, result_from_service) = oneshot::channel(); + self.commands_for_service + .unbounded_send(SessionCommand::StartValidator( + session_id, + verifier, + node_id, + pen, + Some(result_for_us), + )) + .map_err(|_| ManagerError::CommandSendFailed)?; + + let data_from_network = result_from_service + .await + .map_err(|_| ManagerError::NetworkReceiveFailed)?; + let messages_for_network = self.messages_for_service.clone(); + + Ok(Network::new( + data_from_network, + SessionSender { + session_id, + messages_for_network, + }, + )) + } + + fn early_start_validator_session( + &self, + session_id: SessionId, + verifier: AuthorityVerifier, + node_id: NodeIndex, + pen: AuthorityPen, + ) -> Result<(), Self::Error> { + self.commands_for_service + .unbounded_send(SessionCommand::StartValidator( + session_id, verifier, node_id, pen, None, + )) + .map_err(|_| ManagerError::CommandSendFailed) + } + + fn stop_session(&self, session_id: SessionId) -> Result<(), Self::Error> { + self.commands_for_service + .unbounded_send(SessionCommand::Stop(session_id)) + .map_err(|_| ManagerError::CommandSendFailed) + } +} + +/// Configuration for the session manager. Controls how often the maintenance and +/// rebroadcasts are triggerred. Also controls when maintenance starts. +pub struct Config { + discovery_cooldown: Duration, + maintenance_period: Duration, + initial_delay: Duration, +} + +impl Config { + fn new( + discovery_cooldown: Duration, + maintenance_period: Duration, + initial_delay: Duration, + ) -> Self { + Config { + discovery_cooldown, + maintenance_period, + initial_delay, + } + } + + /// Returns a configuration that triggers maintenance about 5 times per session. + pub fn with_session_period( + session_period: &SessionPeriod, + millisecs_per_block: &MillisecsPerBlock, + ) -> Self { + let discovery_cooldown = + Duration::from_millis(millisecs_per_block.0 * session_period.0 as u64 / 5); + let maintenance_period = discovery_cooldown / 2; + let initial_delay = cmp::min( + Duration::from_millis(millisecs_per_block.0 * 10), + maintenance_period, + ); + Config::new(discovery_cooldown, maintenance_period, initial_delay) + } +} + +/// The connection manager service. +pub struct Service< + D: Data, + M: Data + Debug, + NI: NetworkIdentity, + CN: CliqueNetwork>, + GN: GossipNetwork>, +> where + NI::PeerId: PublicKey, + NI::AddressingInformation: TryFrom> + Into>, +{ + manager: Manager, + commands_from_user: mpsc::UnboundedReceiver>, + messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, + validator_network: CN, + gossip_network: GN, + maintenance_period: Duration, + initial_delay: Duration, +} + +/// Errors that can happen during the network service operations. +#[derive(Debug, PartialEq, Eq)] +pub enum Error { + CommandsChannel, + MessageChannel, + ValidatorNetwork, + GossipNetwork(GE), +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use Error::*; + match self { + CommandsChannel => write!(f, "commands channel unexpectedly closed"), + MessageChannel => write!(f, "message channel unexpectedly closed"), + ValidatorNetwork => write!(f, "validator network unexpectedly done"), + GossipNetwork(e) => write!(f, "gossip network unexpectedly done: {}", e), + } + } +} + +impl< + D: Data, + M: Data + Debug, + NI: NetworkIdentity, + CN: CliqueNetwork>, + GN: GossipNetwork>, + > Service +where + NI::PeerId: PublicKey, + NI::AddressingInformation: TryFrom> + Into>, +{ + pub fn new( + network_identity: NI, + validator_network: CN, + gossip_network: GN, + config: Config, + ) -> ( + Service, + impl SessionManager, + ) { + let Config { + discovery_cooldown, + maintenance_period, + initial_delay, + } = config; + let manager = Manager::new(network_identity, discovery_cooldown); + let (commands_for_service, commands_from_user) = mpsc::unbounded(); + let (messages_for_service, messages_from_user) = mpsc::unbounded(); + ( + Service { + manager, + commands_from_user, + messages_from_user, + validator_network, + gossip_network, + maintenance_period, + initial_delay, + }, + ManagerInterface { + commands_for_service, + messages_for_service, + }, + ) + } + + fn send_data(&self, to_send: AddressedData, NI::PeerId>) { + self.validator_network.send(to_send.0, to_send.1) + } + + fn send_authentications( + &mut self, + to_send: Vec>, + ) -> Result<(), Error> { + for auth in to_send { + self.gossip_network + .broadcast(auth) + .map_err(Error::GossipNetwork)?; + } + Ok(()) + } + + fn handle_connection_command( + &mut self, + connection_command: ConnectionCommand, + ) { + match connection_command { + ConnectionCommand::AddReserved(addresses) => { + for address in addresses { + self.validator_network + .add_connection(address.peer_id(), address); + } + } + ConnectionCommand::DelReserved(peers) => { + for peer in peers { + self.validator_network.remove_connection(peer); + } + } + }; + } + + fn handle_manager_actions( + &mut self, + ManagerActions { + maybe_command, + maybe_message, + }: ManagerActions, + ) -> Result<(), Error> { + if let Some(command) = maybe_command { + self.handle_connection_command(command); + } + if let Some(message) = maybe_message { + self.send_authentications(message.into())?; + } + Ok(()) + } + + /// Handle a session command. + /// Returns actions the manager wants to take or an error if the session command is invalid. + async fn handle_command( + &mut self, + command: SessionCommand, + ) -> Result, SessionHandlerError> { + use SessionCommand::*; + match command { + StartValidator(session_id, verifier, node_id, pen, result_for_user) => { + let pre_session = PreValidatorSession { + session_id, + verifier, + node_id, + pen, + }; + let (actions, data_from_network) = + self.manager.update_validator_session(pre_session).await?; + if let Some(result_for_user) = result_for_user { + if result_for_user.send(data_from_network).is_err() { + warn!(target: "aleph-network", "Failed to send started session.") + } + } + Ok(actions) + } + StartNonvalidator(session_id, verifier) => { + let pre_session = PreNonvalidatorSession { + session_id, + verifier, + }; + self.manager.update_nonvalidator_session(pre_session).await + } + Stop(session_id) => Ok(self.manager.finish_session(session_id)), + } + } + + /// Run the connection manager service. + pub async fn run(mut self) -> Result<(), Error> { + // Initial delay is needed so that Network is fully set up and we received some first discovery broadcasts from other nodes. + // Otherwise this might cause first maintenance never working, as it happens before first broadcasts. + let mut maintenance = + time::interval_at(Instant::now() + self.initial_delay, self.maintenance_period); + + let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); + loop { + trace!(target: "aleph-network", "Manager Loop started a next iteration"); + tokio::select! { + maybe_command = self.commands_from_user.next() => { + trace!(target: "aleph-network", "Manager received a command from user"); + match maybe_command { + Some(command) => match self.handle_command(command).await { + Ok(to_send) => self.handle_manager_actions(to_send)?, + Err(e) => warn!(target: "aleph-network", "Failed to update handler: {:?}", e), + }, + None => return Err(Error::CommandsChannel), + } + }, + maybe_message = self.messages_from_user.next() => { + trace!(target: "aleph-network", "Manager received a message from user"); + match maybe_message { + Some((message, session_id, recipient)) => for message in self.manager.on_user_message(message, session_id, recipient) { + self.send_data(message); + }, + None => return Err(Error::MessageChannel), + } + }, + maybe_data = self.validator_network.next() => { + trace!(target: "aleph-network", "Manager received some data from network"); + match maybe_data { + Some(DataInSession{data, session_id}) => if let Err(e) = self.manager.send_session_data(&session_id, data) { + match e { + SendError::UserSend => trace!(target: "aleph-network", "Failed to send to user in session."), + SendError::NoSession => trace!(target: "aleph-network", "Received message for unknown session."), + } + }, + None => return Err(Error::ValidatorNetwork), + } + }, + maybe_authentication = self.gossip_network.next() => { + let authentication = maybe_authentication.map_err(Error::GossipNetwork)?; + trace!(target: "aleph-network", "Manager received an authentication from network"); + match authentication.try_into() { + Ok(message) => { + let manager_actions = self.manager.on_discovery_message(message); + self.handle_manager_actions(manager_actions)? + }, + Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e), + } + }, + _ = maintenance.tick() => { + debug!(target: "aleph-network", "Manager starts maintenence"); + for to_send in self.manager.discovery() { + self.send_authentications(to_send.into())?; + } + }, + _ = status_ticker.tick() => { + self.manager.status_report(); + } + } + } + } +} diff --git a/finality-aleph/src/network/manager/testing.rs b/finality-aleph/src/network/session/testing.rs similarity index 100% rename from finality-aleph/src/network/manager/testing.rs rename to finality-aleph/src/network/session/testing.rs diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index 50fb08f597..5301fa25a6 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -13,9 +13,9 @@ use crate::{ crypto::AuthorityPen, network::{ clique::Service, - setup_io, + session::{ConnectionManager, ConnectionManagerConfig}, tcp::{new_tcp_network, KEY_TYPE}, - ConnectionManager, ConnectionManagerConfig, GossipService, SessionManager, + GossipService, }, nodes::{setup_justification_handler, JustificationParams}, party::{ @@ -119,21 +119,19 @@ where session_map: session_authorities.clone(), }); - let (connection_io, session_io) = setup_io(validator_network, gossip_network); - - let connection_manager = ConnectionManager::new( + let (connection_manager_service, connection_manager) = ConnectionManager::new( network_identity, + validator_network, + gossip_network, ConnectionManagerConfig::with_session_period(&session_period, &millisecs_per_block), ); let connection_manager_task = async move { - if let Err(e) = connection_io.run(connection_manager).await { + if let Err(e) = connection_manager_service.run().await { panic!("Failed to run connection manager: {}", e); } }; - let session_manager = SessionManager::new(session_io); - spawn_handle.spawn("aleph/justification_handler", None, handler_task); debug!(target: "aleph-party", "JustificationHandler has started."); @@ -158,7 +156,7 @@ where block_requester, metrics, spawn_handle.into(), - session_manager, + connection_manager, keystore, ), _phantom: PhantomData, diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs index 199e5c9092..ee4081303c 100644 --- a/finality-aleph/src/party/manager/mod.rs +++ b/finality-aleph/src/party/manager/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, fmt::Debug, marker::PhantomData, sync::Arc}; +use std::{collections::HashSet, marker::PhantomData, sync::Arc}; use aleph_primitives::{AlephSessionApi, KEY_TYPE}; use async_trait::async_trait; @@ -25,7 +25,8 @@ use crate::{ component::{Network, NetworkMap, SimpleNetwork}, split::split, }, - ManagerError, RequestBlocks, SessionManager, SessionSender, + session::{SessionManager, SessionSender}, + RequestBlocks, }, party::{ backup::ABFTBackup, manager::aggregator::AggregatorVersion, traits::NodeSessionManager, @@ -87,13 +88,14 @@ where phantom: PhantomData, } -pub struct NodeSessionManagerImpl +pub struct NodeSessionManagerImpl where B: BlockT, C: crate::ClientForAleph + Send + Sync + 'static, BE: Backend + 'static, SC: SelectChain + 'static, RB: RequestBlocks, + SM: SessionManager> + 'static, { client: Arc, select_chain: SC, @@ -103,12 +105,12 @@ where block_requester: RB, metrics: Option::Hash>>, spawn_handle: SpawnHandle, - session_manager: SessionManager>, + session_manager: SM, keystore: Arc, _phantom: PhantomData, } -impl NodeSessionManagerImpl +impl NodeSessionManagerImpl where B: BlockT, C: crate::ClientForAleph + Send + Sync + 'static, @@ -116,6 +118,7 @@ where BE: Backend + 'static, SC: SelectChain + 'static, RB: RequestBlocks, + SM: SessionManager>, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -127,7 +130,7 @@ where block_requester: RB, metrics: Option::Hash>>, spawn_handle: SpawnHandle, - session_manager: SessionManager>, + session_manager: SM, keystore: Arc, ) -> Self { Self { @@ -305,11 +308,14 @@ where justifications_for_chain: self.authority_justification_tx.clone(), }; - let data_network = self + let data_network = match self .session_manager .start_validator_session(session_id, authority_verifier, node_id, authority_pen) .await - .expect("Failed to start validator session!"); + { + Ok(data_network) => data_network, + Err(e) => panic!("Failed to start validator session: {}", e), + }; let last_block_of_previous_session = session_boundaries .first_block() @@ -359,14 +365,8 @@ where } } -#[derive(Debug)] -pub enum SessionManagerError { - NotAuthority, - ManagerError(ManagerError), -} - #[async_trait] -impl NodeSessionManager for NodeSessionManagerImpl +impl NodeSessionManager for NodeSessionManagerImpl where B: BlockT, C: crate::ClientForAleph + Send + Sync + 'static, @@ -374,8 +374,9 @@ where BE: Backend + 'static, SC: SelectChain + 'static, RB: RequestBlocks, + SM: SessionManager>, { - type Error = SessionManagerError; + type Error = SM::Error; async fn spawn_authority_task_for_session( &self, @@ -404,20 +405,20 @@ where async fn early_start_validator_session( &self, session: SessionId, + node_id: NodeIndex, authorities: &[AuthorityId], ) -> Result<(), Self::Error> { - let node_id = match self.node_idx(authorities).await { - Some(id) => id, - None => return Err(SessionManagerError::NotAuthority), - }; let authority_verifier = AuthorityVerifier::new(authorities.to_vec()); let authority_pen = AuthorityPen::new(authorities[node_id.0].clone(), self.keystore.clone()) .await .expect("The keys should sign successfully"); - self.session_manager - .early_start_validator_session(session, authority_verifier, node_id, authority_pen) - .map_err(SessionManagerError::ManagerError) + self.session_manager.early_start_validator_session( + session, + authority_verifier, + node_id, + authority_pen, + ) } fn start_nonvalidator_session( @@ -429,13 +430,10 @@ where self.session_manager .start_nonvalidator_session(session, authority_verifier) - .map_err(SessionManagerError::ManagerError) } fn stop_session(&self, session: SessionId) -> Result<(), Self::Error> { - self.session_manager - .stop_session(session) - .map_err(SessionManagerError::ManagerError) + self.session_manager.stop_session(session) } async fn node_idx(&self, authorities: &[AuthorityId]) -> Option { diff --git a/finality-aleph/src/party/mocks.rs b/finality-aleph/src/party/mocks.rs index 2a0d78441b..21977bc100 100644 --- a/finality-aleph/src/party/mocks.rs +++ b/finality-aleph/src/party/mocks.rs @@ -1,5 +1,6 @@ use std::{ collections::HashSet, + fmt::{Debug, Display, Error as FmtError, Formatter}, hash::Hash, sync::{Arc, Mutex}, }; @@ -111,9 +112,17 @@ impl MockNodeSessionManager { } } +pub struct MockNodeSessionManagerError; + +impl Display for MockNodeSessionManagerError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + write!(f, "mock node session manager error") + } +} + #[async_trait] impl NodeSessionManager for Arc { - type Error = (); + type Error = MockNodeSessionManagerError; async fn spawn_authority_task_for_session( &self, @@ -133,6 +142,7 @@ impl NodeSessionManager for Arc { async fn early_start_validator_session( &self, session: SessionId, + _node_id: NodeIndex, _authorities: &[AuthorityId], ) -> Result<(), Self::Error> { self.insert(self.session_early_started.clone(), session); diff --git a/finality-aleph/src/party/mod.rs b/finality-aleph/src/party/mod.rs index fd46336da6..50c9c78e3f 100644 --- a/finality-aleph/src/party/mod.rs +++ b/finality-aleph/src/party/mod.rs @@ -154,7 +154,7 @@ where .session_manager .start_nonvalidator_session(session_id, authorities) { - warn!(target: "aleph-party", "Failed to start nonvalidator session{:?}:{:?}", session_id, e); + warn!(target: "aleph-party", "Failed to start nonvalidator session{:?}: {}", session_id, e); } None }; @@ -194,21 +194,22 @@ where } => { let next_session_authorities = next_session_authority_data.authorities(); match self.session_manager.node_idx(next_session_authorities).await { - Some(_) => if let Err(e) = self + Some(next_session_node_id) => if let Err(e) = self .session_manager .early_start_validator_session( next_session_id, + next_session_node_id, next_session_authorities, ).await { - warn!(target: "aleph-party", "Failed to early start validator session{:?}:{:?}", next_session_id, e); + warn!(target: "aleph-party", "Failed to early start validator session{:?}: {}", next_session_id, e); } None => { if let Err(e) = self .session_manager .start_nonvalidator_session(next_session_id, next_session_authorities) { - warn!(target: "aleph-party", "Failed to early start nonvalidator session{:?}:{:?}", next_session_id, e); + warn!(target: "aleph-party", "Failed to early start nonvalidator session{:?}: {}", next_session_id, e); } } } @@ -232,7 +233,7 @@ where } } if let Err(e) = self.session_manager.stop_session(session_id) { - warn!(target: "aleph-party", "Session Manager failed to stop in session {:?}: {:?}", session_id, e) + warn!(target: "aleph-party", "Session Manager failed to stop in session {:?}: {}", session_id, e) } } diff --git a/finality-aleph/src/party/traits.rs b/finality-aleph/src/party/traits.rs index 74c139a284..21198d737f 100644 --- a/finality-aleph/src/party/traits.rs +++ b/finality-aleph/src/party/traits.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use async_trait::async_trait; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -34,7 +34,7 @@ pub trait ChainState { #[async_trait] /// Abstraction over session related tasks. pub trait NodeSessionManager { - type Error: Debug; + type Error: Display; /// Spawns every task needed for an authority to run in a session. async fn spawn_authority_task_for_session( @@ -49,6 +49,7 @@ pub trait NodeSessionManager { async fn early_start_validator_session( &self, session: SessionId, + node_id: NodeIndex, authorities: &[AuthorityId], ) -> Result<(), Self::Error>; diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 07806ef495..77445be02c 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -18,13 +18,12 @@ use crate::{ }, data::Network, mock::{crypto_basics, MockData}, - setup_io, - testing::{ - authentication, legacy_authentication, DataInSession, LegacyDiscoveryMessage, - MockEvent, MockRawNetwork, SessionHandler, VersionedAuthentication, + session::{ + authentication, legacy_authentication, ConnectionManager, ConnectionManagerConfig, + DataInSession, LegacyDiscoveryMessage, ManagerError, SessionHandler, SessionManager, + VersionedAuthentication, }, - AddressingInformation, ConnectionManager, ConnectionManagerConfig, GossipService, - NetworkIdentity, Protocol, SessionManager, + AddressingInformation, GossipService, MockEvent, MockRawNetwork, Protocol, }, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, }; @@ -60,19 +59,10 @@ impl Authority { } } -impl NetworkIdentity for Authority { - type PeerId = MockPublicKey; - type AddressingInformation = MockAddressingInformation; - - fn identity(&self) -> Self::AddressingInformation { - self.address.clone() - } -} - struct TestData { pub authorities: Vec, pub authority_verifier: AuthorityVerifier, - pub session_manager: SessionManager, + pub session_manager: Box>, pub network: MockRawNetwork, pub validator_network: MockCliqueNetwork>, network_manager_exit_tx: oneshot::Sender<()>, @@ -108,19 +98,17 @@ async fn prepare_one_session_test_data() -> TestData { let (gossip_service, gossip_network) = GossipService::new(network.clone(), task_manager.spawn_handle()); - let (connection_io, session_io) = setup_io(validator_network.clone(), gossip_network); - - let connection_manager = ConnectionManager::new( - authorities[0].clone(), + let (connection_manager_service, session_manager) = ConnectionManager::new( + authorities[0].address(), + validator_network.clone(), + gossip_network, ConnectionManagerConfig::with_session_period(&SESSION_PERIOD, &MILLISECS_PER_BLOCK), ); - - let session_manager = SessionManager::new(session_io); + let session_manager = Box::new(session_manager); let network_manager_task = async move { tokio::select! { - _ = connection_io - .run(connection_manager) => { }, + _ =connection_manager_service.run() => { }, _ = network_manager_exit_rx => { }, }; }; @@ -161,7 +149,8 @@ impl TestData { node_id: usize, session_id: u32, ) -> impl Network { - self.session_manager + match self + .session_manager .start_validator_session( SessionId(session_id), self.authority_verifier.clone(), @@ -169,18 +158,21 @@ impl TestData { self.authorities[node_id].pen(), ) .await - .expect("Failed to start validator session!") + { + Ok(network) => network, + Err(e) => panic!("Failed to start validator session: {}", e), + } } fn early_start_validator_session(&self, node_id: usize, session_id: u32) { - self.session_manager - .early_start_validator_session( - SessionId(session_id), - self.authority_verifier.clone(), - NodeIndex(node_id), - self.authorities[node_id].pen(), - ) - .expect("Failed to start validator session!"); + if let Err(e) = self.session_manager.early_start_validator_session( + SessionId(session_id), + self.authority_verifier.clone(), + NodeIndex(node_id), + self.authorities[node_id].pen(), + ) { + panic!("Failed to start validator session: {}", e); + } } async fn get_session_handler(