Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! Components are the building blocks of the whole application, wired together inside a reactor.
//! Each component has a unified interface, expressed by the `Component` trait.
pub(crate) mod consensus;
pub(crate) mod pinger;
pub(crate) mod small_network;
pub(crate) mod storage;
Expand Down
91 changes: 91 additions & 0 deletions src/components/consensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! The consensus component. Provides distributed consensus among the nodes in the network.

use std::fmt;

use serde::{Deserialize, Serialize};

use crate::{
components::{small_network::NodeId, Component},
effect::{requests::NetworkRequest, Effect, EffectBuilder, Multiple},
};

/// The consensus component.
#[derive(Debug)]
pub(crate) struct Consensus {
// TODO
}

/// Network message used by the consensus component.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) enum Message {
/// TODO: create actual message variants
Dummy,
}

/// Consensus component event.
#[derive(Debug)]
pub(crate) enum Event {
/// An incoming network message.
// TODO: remove lint relaxation
#[allow(dead_code)]
MessageReceived { sender: NodeId, msg: Message },
// TODO: remove lint relaxation
#[allow(dead_code)]
Timer,
}

impl fmt::Display for Message {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Message::Dummy => write!(f, "dummy"),
}
}
}

impl fmt::Display for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Event::MessageReceived { sender, msg } => write!(f, "msg from {}: {}", sender, msg),
Event::Timer => write!(f, "timer"),
}
}
}

impl<REv> Component<REv> for Consensus
where
REv: From<Event> + Send + From<NetworkRequest<NodeId, Message>>,
{
type Event = Event;

fn handle_event(
&mut self,
eb: EffectBuilder<REv>,
event: Self::Event,
) -> Multiple<Effect<Self::Event>> {
match event {
Event::Timer => todo!(),
Event::MessageReceived { sender, msg } => self.handle_message(eb, sender, msg),
}
}
}

impl Consensus {
/// Create and initialize a new consensus instance.
pub(crate) fn new<REv: From<Event> + Send + From<NetworkRequest<NodeId, Message>>>(
_eb: EffectBuilder<REv>,
) -> (Self, Multiple<Effect<Event>>) {
let consensus = Consensus {};

(consensus, Default::default())
}

/// Handles an incoming message
fn handle_message<REv: From<Event> + Send + From<NetworkRequest<NodeId, Message>>>(
&mut self,
_eb: EffectBuilder<REv>,
_sender: NodeId,
_msg: Message,
) -> Multiple<Effect<Event>> {
Default::default()
}
}
70 changes: 66 additions & 4 deletions src/reactor/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
use std::fmt::{self, Display, Formatter};

use derive_more::From;
use serde::{Deserialize, Serialize};

use crate::{
components::{
pinger::{self, Message, Pinger},
consensus::{self, Consensus},
pinger::{self, Pinger},
storage::{self, Storage},
Component,
},
Expand All @@ -21,6 +23,53 @@ use crate::{
Config, SmallNetwork,
};

#[derive(Debug, Clone, From, Serialize, Deserialize)]
enum Message {
#[from]
Pinger(pinger::Message),
#[from]
Consensus(consensus::Message),
}

impl Display for Message {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
Message::Pinger(pinger) => write!(f, "Pinger::{}", pinger),
Message::Consensus(consensus) => write!(f, "Consensus::{}", consensus),
}
}
}

// This is ugly, but it works around trait specialization not being stable yet:
trait IsNotMessage {}
impl IsNotMessage for pinger::Message {}
impl IsNotMessage for consensus::Message {}

impl<I, P> From<NetworkRequest<I, P>> for NetworkRequest<I, Message>
where
P: Into<Message> + IsNotMessage,
{
fn from(other: NetworkRequest<I, P>) -> NetworkRequest<I, Message> {
match other {
NetworkRequest::SendMessage {
dest,
payload,
responder,
} => NetworkRequest::SendMessage {
dest,
payload: payload.into(),
responder,
},
NetworkRequest::BroadcastMessage { payload, responder } => {
NetworkRequest::BroadcastMessage {
payload: payload.into(),
responder,
}
}
}
}
}

/// Top-level event for the reactor.
#[derive(Debug, From)]
#[must_use]
Expand All @@ -33,11 +82,15 @@ enum Event {
Storage(Box<StorageRequest<Storage>>),
#[from]
StorageConsumer(Box<storage::dummy::Event>),
#[from]
Consensus(consensus::Event),
}

impl From<NetworkRequest<NodeId, Message>> for Event {
fn from(req: NetworkRequest<NodeId, Message>) -> Self {
Event::Network(small_network::Event::from(req))
impl<P: Into<Message> + IsNotMessage> From<NetworkRequest<NodeId, P>> for Event {
fn from(req: NetworkRequest<NodeId, P>) -> Self {
Event::Network(small_network::Event::from(
NetworkRequest::<NodeId, Message>::from(req),
))
}
}

Expand All @@ -46,6 +99,7 @@ struct Reactor {
net: SmallNetwork<Event, Message>,
pinger: Pinger,
storage: Storage,
consensus: Consensus,
dummy_storage_consumer: storage::dummy::StorageConsumer,
}

Expand All @@ -65,18 +119,22 @@ impl reactor::Reactor for Reactor {
let (dummy_storage_consumer, storage_consumer_effects) =
storage::dummy::StorageConsumer::new(eb);

let (consensus, consensus_effects) = Consensus::new(eb);

let mut effects = reactor::wrap_effects(Event::Network, net_effects);
effects.extend(reactor::wrap_effects(Event::Pinger, pinger_effects));
effects.extend(reactor::wrap_effects(
|event| Event::StorageConsumer(Box::new(event)),
storage_consumer_effects,
));
effects.extend(reactor::wrap_effects(Event::Consensus, consensus_effects));

Ok((
Reactor {
net,
pinger,
storage,
consensus,
dummy_storage_consumer,
},
effects,
Expand All @@ -103,6 +161,9 @@ impl reactor::Reactor for Reactor {
|event| Event::StorageConsumer(Box::new(event)),
self.dummy_storage_consumer.handle_event(eb, *ev),
),
Event::Consensus(ev) => {
reactor::wrap_effects(Event::Consensus, self.consensus.handle_event(eb, ev))
}
}
}
}
Expand All @@ -114,6 +175,7 @@ impl Display for Event {
Event::Pinger(ev) => write!(f, "pinger: {}", ev),
Event::Storage(ev) => write!(f, "storage: {}", ev),
Event::StorageConsumer(ev) => write!(f, "storage_consumer: {}", ev),
Event::Consensus(ev) => write!(f, "consensus: {}", ev),
}
}
}
Expand Down