diff --git a/src/components.rs b/src/components.rs index 90c5a1f952..d5e853f1f1 100644 --- a/src/components.rs +++ b/src/components.rs @@ -2,6 +2,7 @@ //! //! Components are the building blocks of the whole application, wired together inside a reactor. //! Each component has a unified interface, expressed by the `Component` trait. +pub(crate) mod consensus; pub(crate) mod pinger; pub(crate) mod small_network; pub(crate) mod storage; diff --git a/src/components/consensus.rs b/src/components/consensus.rs new file mode 100644 index 0000000000..f7025e241d --- /dev/null +++ b/src/components/consensus.rs @@ -0,0 +1,91 @@ +//! The consensus component. Provides distributed consensus among the nodes in the network. + +use std::fmt; + +use serde::{Deserialize, Serialize}; + +use crate::{ + components::{small_network::NodeId, Component}, + effect::{requests::NetworkRequest, Effect, EffectBuilder, Multiple}, +}; + +/// The consensus component. +#[derive(Debug)] +pub(crate) struct Consensus { + // TODO +} + +/// Network message used by the consensus component. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) enum Message { + /// TODO: create actual message variants + Dummy, +} + +/// Consensus component event. +#[derive(Debug)] +pub(crate) enum Event { + /// An incoming network message. + // TODO: remove lint relaxation + #[allow(dead_code)] + MessageReceived { sender: NodeId, msg: Message }, + // TODO: remove lint relaxation + #[allow(dead_code)] + Timer, +} + +impl fmt::Display for Message { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Message::Dummy => write!(f, "dummy"), + } + } +} + +impl fmt::Display for Event { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Event::MessageReceived { sender, msg } => write!(f, "msg from {}: {}", sender, msg), + Event::Timer => write!(f, "timer"), + } + } +} + +impl Component for Consensus +where + REv: From + Send + From>, +{ + type Event = Event; + + fn handle_event( + &mut self, + eb: EffectBuilder, + event: Self::Event, + ) -> Multiple> { + match event { + Event::Timer => todo!(), + Event::MessageReceived { sender, msg } => self.handle_message(eb, sender, msg), + } + } +} + +impl Consensus { + /// Create and initialize a new consensus instance. + pub(crate) fn new + Send + From>>( + _eb: EffectBuilder, + ) -> (Self, Multiple>) { + let consensus = Consensus {}; + + (consensus, Default::default()) + } + + /// Handles an incoming message + fn handle_message + Send + From>>( + &mut self, + _eb: EffectBuilder, + _sender: NodeId, + _msg: Message, + ) -> Multiple> { + Default::default() + } +} diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index c874fb8d64..e3c061a286 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -5,10 +5,12 @@ use std::fmt::{self, Display, Formatter}; use derive_more::From; +use serde::{Deserialize, Serialize}; use crate::{ components::{ - pinger::{self, Message, Pinger}, + consensus::{self, Consensus}, + pinger::{self, Pinger}, storage::{self, Storage}, Component, }, @@ -21,6 +23,53 @@ use crate::{ Config, SmallNetwork, }; +#[derive(Debug, Clone, From, Serialize, Deserialize)] +enum Message { + #[from] + Pinger(pinger::Message), + #[from] + Consensus(consensus::Message), +} + +impl Display for Message { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match self { + Message::Pinger(pinger) => write!(f, "Pinger::{}", pinger), + Message::Consensus(consensus) => write!(f, "Consensus::{}", consensus), + } + } +} + +// This is ugly, but it works around trait specialization not being stable yet: +trait IsNotMessage {} +impl IsNotMessage for pinger::Message {} +impl IsNotMessage for consensus::Message {} + +impl From> for NetworkRequest +where + P: Into + IsNotMessage, +{ + fn from(other: NetworkRequest) -> NetworkRequest { + match other { + NetworkRequest::SendMessage { + dest, + payload, + responder, + } => NetworkRequest::SendMessage { + dest, + payload: payload.into(), + responder, + }, + NetworkRequest::BroadcastMessage { payload, responder } => { + NetworkRequest::BroadcastMessage { + payload: payload.into(), + responder, + } + } + } + } +} + /// Top-level event for the reactor. #[derive(Debug, From)] #[must_use] @@ -33,11 +82,15 @@ enum Event { Storage(Box>), #[from] StorageConsumer(Box), + #[from] + Consensus(consensus::Event), } -impl From> for Event { - fn from(req: NetworkRequest) -> Self { - Event::Network(small_network::Event::from(req)) +impl + IsNotMessage> From> for Event { + fn from(req: NetworkRequest) -> Self { + Event::Network(small_network::Event::from( + NetworkRequest::::from(req), + )) } } @@ -46,6 +99,7 @@ struct Reactor { net: SmallNetwork, pinger: Pinger, storage: Storage, + consensus: Consensus, dummy_storage_consumer: storage::dummy::StorageConsumer, } @@ -65,18 +119,22 @@ impl reactor::Reactor for Reactor { let (dummy_storage_consumer, storage_consumer_effects) = storage::dummy::StorageConsumer::new(eb); + let (consensus, consensus_effects) = Consensus::new(eb); + let mut effects = reactor::wrap_effects(Event::Network, net_effects); effects.extend(reactor::wrap_effects(Event::Pinger, pinger_effects)); effects.extend(reactor::wrap_effects( |event| Event::StorageConsumer(Box::new(event)), storage_consumer_effects, )); + effects.extend(reactor::wrap_effects(Event::Consensus, consensus_effects)); Ok(( Reactor { net, pinger, storage, + consensus, dummy_storage_consumer, }, effects, @@ -103,6 +161,9 @@ impl reactor::Reactor for Reactor { |event| Event::StorageConsumer(Box::new(event)), self.dummy_storage_consumer.handle_event(eb, *ev), ), + Event::Consensus(ev) => { + reactor::wrap_effects(Event::Consensus, self.consensus.handle_event(eb, ev)) + } } } } @@ -114,6 +175,7 @@ impl Display for Event { Event::Pinger(ev) => write!(f, "pinger: {}", ev), Event::Storage(ev) => write!(f, "storage: {}", ev), Event::StorageConsumer(ev) => write!(f, "storage_consumer: {}", ev), + Event::Consensus(ev) => write!(f, "consensus: {}", ev), } } }