From fc95a00392e20e195c2421a1989a4ed2605865d9 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Wed, 20 May 2020 16:22:02 +0100 Subject: [PATCH 1/6] EE-1020: restructure project in preparation for new crates being added --- Cargo.lock | 21 +- Cargo.toml | 16 +- src/{ => app}/cli.rs | 9 +- src/{ => app}/config.rs | 58 +---- src/app/main.rs | 18 ++ src/components.rs | 2 +- src/components/small_network.rs | 192 +++------------- src/components/small_network/config.rs | 43 ++++ src/components/small_network/endpoint.rs | 73 ++++++ src/components/small_network/event.rs | 75 +++++++ src/components/small_network/message.rs | 31 +++ src/effect.rs | 86 +++---- src/{main.rs => lib.rs} | 21 +- src/reactor.rs | 19 +- src/reactor/validator.rs | 31 ++- src/tls.rs | 271 +++++++++++------------ src/utils.rs | 18 +- src/utils/round_robin.rs | 8 +- 18 files changed, 520 insertions(+), 472 deletions(-) rename src/{ => app}/cli.rs (95%) rename src/{ => app}/config.rs (65%) create mode 100644 src/app/main.rs create mode 100644 src/components/small_network/config.rs create mode 100644 src/components/small_network/endpoint.rs create mode 100644 src/components/small_network/event.rs create mode 100644 src/components/small_network/message.rs rename src/{main.rs => lib.rs} (71%) diff --git a/Cargo.lock b/Cargo.lock index 3eac623a5b..3d64f4a8d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,9 +88,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.53" +version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "404b1fe4f65288577753b17e3b36a04596ee784493ec249bf81c7f2d2acd751c" +checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" [[package]] name = "cfg-if" @@ -315,9 +315,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61565ff7aaace3525556587bd2dc31d4a07071957be715e63ce7b1eccf51a8f4" +checksum = "91780f809e750b0a89f5544be56617ff6b1227ee485bcb06ebe10cdf89bd3b71" dependencies = [ "libc", ] @@ -501,18 +501,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81d480cb4e89522ccda96d0eed9af94180b7a5f93fb28f66e1fd7d68431663d1" +checksum = "edc93aeee735e60ecb40cf740eb319ff23eab1c5748abfdb5c180e4ce49f7791" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82996f11efccb19b685b14b5df818de31c1edcee3daa256ab5775dd98e72feb" +checksum = "e58db2081ba5b4c93bd6be09c40fd36cb9193a8336c384f3b40012e531aa7e40" dependencies = [ "proc-macro2", "quote", @@ -586,9 +586,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42934bc9c8ab0d3b273a16d8551c8f0fcff46be73276ca083ec2414c15c4ba5e" +checksum = "54a21852a652ad6f610c9510194f398ff6f8692e334fd1145fed931f7fbe44ea" dependencies = [ "proc-macro2", ] @@ -824,6 +824,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "slab", "tokio-macros", ] diff --git a/Cargo.toml b/Cargo.toml index 18c428eb72..902f604f29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,13 @@ version = "0.1.0" authors = ["Marc Brinkmann ", "Fraser Hutchison "] edition = "2018" description = "The CasperLabs blockchain node" -publish = false # Prevent accidental `cargo publish` for now. +documentation = "https://docs.rs/casper-node" +readme = "README.md" +homepage = "https://casperlabs.io" +repository = "https://github.com/CasperLabs/casper-node" license-file = "LICENSE" +publish = false # Prevent accidental `cargo publish` for now. +default-run = "casper-node" [dependencies] anyhow = "1.0.28" @@ -22,10 +27,17 @@ serde-big-array = "0.3.0" smallvec = "1.4.0" structopt = "0.3.14" thiserror = "1.0.18" -tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp"] } +tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp", "time"] } tokio-openssl = "0.4.0" tokio-serde = { version = "0.6.1", features = ["messagepack"] } tokio-util = { version = "0.3.1", features = ["codec"] } toml = "0.5.6" tracing = "0.1.14" tracing-subscriber = "0.2.5" + +[[bin]] +name = "casper-node" +path = "src/app/main.rs" +bench = false +doctest = false +test = false diff --git a/src/cli.rs b/src/app/cli.rs similarity index 95% rename from src/cli.rs rename to src/app/cli.rs index 855fcdd9c4..6d677b8588 100644 --- a/src/cli.rs +++ b/src/app/cli.rs @@ -8,11 +8,8 @@ use anyhow::bail; use structopt::StructOpt; use tracing::Level; -use crate::{ - config, - reactor::{self, validator::Reactor}, - tls, -}; +use crate::config; +use casper_node::{reactor, tls}; // Note: The docstring on `Cli` is the help shown when calling the binary with `--help`. #[derive(Debug, StructOpt)] @@ -81,7 +78,7 @@ impl Cli { } cfg.log.setup_logging()?; - reactor::launch::(cfg).await + reactor::validator::launch(cfg.validator_net).await } } } diff --git a/src/config.rs b/src/app/config.rs similarity index 65% rename from src/config.rs rename to src/app/config.rs index aabb81c848..a5b75141c7 100644 --- a/src/config.rs +++ b/src/app/config.rs @@ -3,9 +3,8 @@ //! Configuration for the node is loaded from TOML files, but all configuration values have sensible //! defaults. //! -//! The [`Cli`](../cli/enum.Cli.html#variant.GenerateConfig) offers an option to generate a -//! configuration from defaults for editing. I.e. running the following will dump a default -//! configuration file to stdout: +//! The binary offers an option to generate a configuration from defaults for editing. I.e. running +//! the following will dump a default configuration file to stdout: //! ``` //! cargo run --release -- generate-config //! ``` @@ -18,25 +17,23 @@ //! * `Default` is implemented (derived or manually) with sensible defaults, and //! * it is completely documented. -use std::{ - fs, io, - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::{Path, PathBuf}, -}; +use std::{fs, io, path::Path}; use anyhow::Context; use serde::{Deserialize, Serialize}; use tracing::{debug, Level}; +use casper_node::SmallNetworkConfig; + /// Root configuration. #[derive(Debug, Deserialize, Serialize)] pub struct Config { /// Log configuration. pub log: Log, /// Network configuration for the validator-only network. - pub validator_net: SmallNetwork, + pub validator_net: SmallNetworkConfig, /// Network configuration for the public network. - pub public_net: SmallNetwork, + pub public_net: SmallNetworkConfig, } /// Log configuration. @@ -47,49 +44,12 @@ pub struct Log { pub level: Level, } -#[derive(Debug, Deserialize, Serialize)] -/// Small network configuration -pub struct SmallNetwork { - /// Interface to bind to. If it is the same as the in `root_addr`, attempt - /// become the root node for this particular small network. - pub bind_interface: IpAddr, - - /// Port to bind to when not the root node. Use 0 for a random port. - pub bind_port: u16, - - /// Address to connect to join the network. - pub root_addr: SocketAddr, - - /// Path to certificate file. - pub cert: Option, - - /// Path to private key for certificate. - pub private_key: Option, - - /// Maximum number of retries when trying to connect to an outgoing node. Unlimited if `None`. - pub max_outgoing_retries: Option, -} - -impl SmallNetwork { - /// Creates a default instance for `SmallNetwork` with a constant port. - fn default_on_port(port: u16) -> Self { - SmallNetwork { - bind_interface: Ipv4Addr::new(127, 0, 0, 1).into(), - bind_port: 0, - root_addr: (Ipv4Addr::new(127, 0, 0, 1), port).into(), - cert: None, - private_key: None, - max_outgoing_retries: None, - } - } -} - impl Default for Config { fn default() -> Self { Config { log: Default::default(), - validator_net: SmallNetwork::default_on_port(34553), - public_net: SmallNetwork::default_on_port(1485), + validator_net: SmallNetworkConfig::default_on_port(34553), + public_net: SmallNetworkConfig::default_on_port(1485), } } } diff --git a/src/app/main.rs b/src/app/main.rs new file mode 100644 index 0000000000..955348b7b0 --- /dev/null +++ b/src/app/main.rs @@ -0,0 +1,18 @@ +//! # CasperLabs blockchain node +//! +//! This is the core application for the CasperLabs blockchain. Run with `--help` to see available +//! command-line arguments. + +mod cli; +pub mod config; + +use structopt::StructOpt; + +use cli::Cli; + +#[tokio::main] +pub async fn main() -> anyhow::Result<()> { + // Parse CLI args and run selected subcommand. + let opts = Cli::from_args(); + opts.run().await +} diff --git a/src/components.rs b/src/components.rs index 37eb4946fa..b699a34e3c 100644 --- a/src/components.rs +++ b/src/components.rs @@ -2,4 +2,4 @@ //! //! Docs to be written, sorry. -pub mod small_network; +pub(crate) mod small_network; diff --git a/src/components/small_network.rs b/src/components/small_network.rs index 0fc15729b1..33fdd12d64 100644 --- a/src/components/small_network.rs +++ b/src/components/small_network.rs @@ -38,11 +38,14 @@ //! all nodes in the list and simultaneously tell all of its connected nodes about the new node, //! repeating the process. +mod config; +mod endpoint; +mod event; +mod message; + use std::{ - cmp::Ordering, - collections::{HashMap, HashSet}, - fmt::{self, Debug, Display, Formatter}, - hash::Hash, + collections::HashMap, + fmt::{self, Debug, Formatter}, io, net::{SocketAddr, TcpListener}, sync::Arc, @@ -55,9 +58,9 @@ use futures::{ FutureExt, SinkExt, StreamExt, }; use maplit::hashmap; -use openssl::{pkey, x509}; +use openssl::{pkey, x509::X509}; use pkey::{PKey, Private}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; use tokio::{ net::TcpStream, sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -66,83 +69,26 @@ use tokio_openssl::SslStream; use tokio_serde::{formats::SymmetricalMessagePack, SymmetricallyFramed}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::{debug, error, info, warn}; -use x509::X509; +pub(crate) use self::{endpoint::Endpoint, event::Event, message::Message}; use crate::{ - config, - effect::{Effect, EffectExt, EffectResultExt}, + effect::{Effect, EffectExt, EffectResultExt, Multiple}, reactor::{EventQueueHandle, QueueKind, Reactor}, tls::{self, KeyFingerprint, Signed, TlsCert}, - utils::{DisplayIter, Multiple}, }; +pub use config::Config; /// A node ID. /// /// The key fingerprint found on TLS certificates. -pub type NodeId = KeyFingerprint; - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub enum Message

{ - /// A pruned set of all endpoint announcements the server has received. - Snapshot(HashSet>), - /// Broadcast a new endpoint known to the sender. - BroadcastEndpoint(Signed), - /// A payload message. - Payload(P), -} - -#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] -pub struct Endpoint { - /// UNIX timestamp in nanoseconds resolution. - /// - /// Will overflow earliest November 2262. - timestamp_ns: u64, - /// Socket address the node is listening on. - addr: SocketAddr, - /// Certificate. - cert: TlsCert, -} +type NodeId = KeyFingerprint; -#[derive(Debug)] -pub enum Event

{ - /// Connection to the root node succeeded. - RootConnected { cert: TlsCert, transport: Transport }, - /// Connection to the root node failed. - RootFailed { error: anyhow::Error }, - /// A new TCP connection has been established from an incoming connection. - IncomingNew { stream: TcpStream, addr: SocketAddr }, - /// The TLS handshake completed on the incoming connection. - IncomingHandshakeCompleted { - result: anyhow::Result<(NodeId, Transport)>, - addr: SocketAddr, - }, - /// Received network message. - IncomingMessage { node_id: NodeId, msg: Message

}, - /// Incoming connection closed. - IncomingClosed { - result: io::Result<()>, - addr: SocketAddr, - }, - - /// A new outgoing connection was successfully established. - OutgoingEstablished { - node_id: NodeId, - transport: Transport, - }, - /// An outgoing connection failed to connect or was terminated. - OutgoingFailed { - node_id: NodeId, - attempt_count: u32, - error: Option, - }, -} - -pub struct SmallNetwork +pub(crate) struct SmallNetwork where R: Reactor, { /// Configuration. - cfg: config::SmallNetwork, + cfg: Config, /// Server certificate. cert: Arc, /// Server private key. @@ -163,9 +109,9 @@ where P: Serialize + DeserializeOwned + Clone + Debug + Send + 'static, { #[allow(clippy::type_complexity)] - pub fn new( + pub(crate) fn new( eq: EventQueueHandle>, - cfg: config::SmallNetwork, + cfg: Config, ) -> anyhow::Result<(SmallNetwork, Multiple>>)> where R: Reactor + 'static, @@ -192,12 +138,12 @@ where let addr = listener.local_addr()?; // Create the model. Initially we know our own endpoint address. - let our_endpoint = Endpoint { - timestamp_ns: SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64, + let our_endpoint = Endpoint::new( + SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64, addr, - cert: tls::validate_cert(cert.clone())?, - }; - let our_fp = our_endpoint.cert.public_key_fingerprint(); + tls::validate_cert(cert.clone())?, + ); + let our_fp = our_endpoint.cert().public_key_fingerprint(); // Run the server task. info!(%our_endpoint, "starting server background task"); @@ -234,16 +180,12 @@ where } #[allow(clippy::cognitive_complexity)] - pub fn handle_event(&mut self, ev: Event

) -> Multiple>> { + pub(crate) fn handle_event(&mut self, ev: Event

) -> Multiple>> { match ev { Event::RootConnected { cert, transport } => { // Create a pseudo-endpoint for the root node with the lowest priority (time 0) let root_node_id = cert.public_key_fingerprint(); - let ep = Endpoint { - timestamp_ns: 0, - addr: self.cfg.root_addr, - cert, - }; + let ep = Endpoint::new(0, self.cfg.root_addr, cert); if self.endpoints.insert(root_node_id, ep).is_some() { // This connection is the very first we will ever make, there should never be // a root node registered, as we will never re-attempt this connection if it @@ -369,7 +311,7 @@ where /// Returns the node ID of the endpoint if it was new. #[inline] fn update_endpoint(&mut self, endpoint: &Endpoint) -> Option { - let fp = endpoint.cert.public_key_fingerprint(); + let fp = endpoint.cert().public_key_fingerprint(); if let Some(prev) = self.endpoints.get(&fp) { if prev >= endpoint { @@ -388,7 +330,7 @@ where &mut self, signed: Signed, ) -> Multiple>> { - match signed.validate_self_signed(|endpoint| Ok(endpoint.cert.public_key())) { + match signed.validate_self_signed(|endpoint| Ok(endpoint.cert().public_key())) { Ok(endpoint) => { // Endpoint is valid, check if it was new. if let Some(node_id) = self.update_endpoint(&endpoint) { @@ -488,7 +430,7 @@ where /// /// Will attempt to bind on the root address first if the `bind_interface` is the same as the /// interface of `root_addr`. Otherwise uses an unused port on `bind_interface`. -fn create_listener(cfg: &config::SmallNetwork) -> io::Result { +fn create_listener(cfg: &Config) -> io::Result { if cfg.root_addr.ip() == cfg.bind_interface { // Try to become the root node, if the root nodes interface is available. match TcpListener::bind(cfg.root_addr) { @@ -630,11 +572,11 @@ async fn connect_outgoing( cert: Arc, private_key: Arc>, ) -> anyhow::Result { - let (server_cert, transport) = connect_trusted(endpoint.addr, cert, private_key).await?; + let (server_cert, transport) = connect_trusted(endpoint.addr(), cert, private_key).await?; let remote_id = server_cert.public_key_fingerprint(); - if remote_id != endpoint.cert.public_key_fingerprint() { + if remote_id != endpoint.cert().public_key_fingerprint() { bail!("remote node has wrong ID"); } @@ -667,82 +609,6 @@ async fn connect_trusted( Ok((tls::validate_cert(server_cert)?, tls_stream)) } -// Impose a total ordering on endpoints. Compare timestamps first, if the same, order by actual -// address. If both of these are the same, use the TLS certificate's fingerprint as a tie-breaker. -impl Ord for Endpoint { - fn cmp(&self, other: &Self) -> Ordering { - Ord::cmp(&self.timestamp_ns, &other.timestamp_ns) - .then_with(|| { - Ord::cmp( - &(self.addr.ip(), self.addr.port()), - &(other.addr.ip(), other.addr.port()), - ) - }) - .then_with(|| Ord::cmp(&self.cert.fingerprint(), &other.cert.fingerprint())) - } -} -impl PartialOrd for Endpoint { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Display for Message

{ - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Message::Snapshot(snapshot) => { - write!(f, "snapshot: {:10}", DisplayIter::new(snapshot.iter())) - } - Message::BroadcastEndpoint(endpoint) => write!(f, "broadcast endpoint: {}", endpoint), - Message::Payload(payload) => write!(f, "payload: {}", payload), - } - } -} - -impl Display for Event

{ - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Event::RootConnected { cert, .. } => { - write!(f, "root connected @ {}", cert.public_key_fingerprint()) - } - Event::RootFailed { error } => write!(f, "root failed: {}", error), - Event::IncomingNew { addr, .. } => write!(f, "incoming connection from {}", addr), - Event::IncomingHandshakeCompleted { result, addr } => { - write!(f, "handshake from {}, is_err {}", addr, result.is_err()) - } - Event::IncomingMessage { node_id, msg } => write!(f, "msg from {}: {}", node_id, msg), - Event::IncomingClosed { addr, .. } => write!(f, "closed connection from {}", addr), - Event::OutgoingEstablished { node_id, .. } => { - write!(f, "established outgoing to {}", node_id) - } - Event::OutgoingFailed { - node_id, - attempt_count, - error, - } => write!( - f, - "failed outgoing {} [{}]: (is_err {})", - node_id, - attempt_count, - error.is_some() - ), - } - } -} - -impl Display for Endpoint { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!( - f, - "{}@{} [{}]", - self.cert.public_key_fingerprint(), - self.addr, - self.timestamp_ns - ) - } -} - impl Debug for SmallNetwork where R: Reactor, diff --git a/src/components/small_network/config.rs b/src/components/small_network/config.rs new file mode 100644 index 0000000000..5941fb5fc7 --- /dev/null +++ b/src/components/small_network/config.rs @@ -0,0 +1,43 @@ +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, +}; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +/// Small network configuration +pub struct Config { + /// Interface to bind to. If it is the same as the in `root_addr`, attempt + /// become the root node for this particular small network. + pub bind_interface: IpAddr, + + /// Port to bind to when not the root node. Use 0 for a random port. + pub bind_port: u16, + + /// Address to connect to join the network. + pub root_addr: SocketAddr, + + /// Path to certificate file. + pub cert: Option, + + /// Path to private key for certificate. + pub private_key: Option, + + /// Maximum number of retries when trying to connect to an outgoing node. Unlimited if `None`. + pub max_outgoing_retries: Option, +} + +impl Config { + /// Creates a default instance for `SmallNetwork` with a constant port. + pub fn default_on_port(port: u16) -> Self { + Config { + bind_interface: Ipv4Addr::new(127, 0, 0, 1).into(), + bind_port: 0, + root_addr: (Ipv4Addr::new(127, 0, 0, 1), port).into(), + cert: None, + private_key: None, + max_outgoing_retries: None, + } + } +} diff --git a/src/components/small_network/endpoint.rs b/src/components/small_network/endpoint.rs new file mode 100644 index 0000000000..2e8fcdefef --- /dev/null +++ b/src/components/small_network/endpoint.rs @@ -0,0 +1,73 @@ +use std::{ + cmp::Ordering, + fmt::{self, Display, Formatter}, + net::SocketAddr, +}; + +use serde::{Deserialize, Serialize}; + +use crate::tls::TlsCert; + +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub(crate) struct Endpoint { + /// UNIX timestamp in nanoseconds resolution. + /// + /// Will overflow earliest November 2262. + timestamp_ns: u64, + /// Socket address the node is listening on. + addr: SocketAddr, + /// Certificate. + cert: TlsCert, +} + +impl Endpoint { + pub(super) fn new(timestamp_ns: u64, addr: SocketAddr, cert: TlsCert) -> Self { + Endpoint { + timestamp_ns, + addr, + cert, + } + } + + pub(super) fn addr(&self) -> SocketAddr { + self.addr + } + + pub(super) fn cert(&self) -> &TlsCert { + &self.cert + } +} + +// Impose a total ordering on endpoints. Compare timestamps first, if the same, order by actual +// address. If both of these are the same, use the TLS certificate's fingerprint as a tie-breaker. +impl Ord for Endpoint { + fn cmp(&self, other: &Self) -> Ordering { + Ord::cmp(&self.timestamp_ns, &other.timestamp_ns) + .then_with(|| { + Ord::cmp( + &(self.addr.ip(), self.addr.port()), + &(other.addr.ip(), other.addr.port()), + ) + }) + .then_with(|| Ord::cmp(&self.cert.fingerprint(), &other.cert.fingerprint())) + } +} + +impl PartialOrd for Endpoint { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Display for Endpoint { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "{}@{} [{}]", + self.cert.public_key_fingerprint(), + self.addr, + self.timestamp_ns + ) + } +} diff --git a/src/components/small_network/event.rs b/src/components/small_network/event.rs new file mode 100644 index 0000000000..03e5de24e9 --- /dev/null +++ b/src/components/small_network/event.rs @@ -0,0 +1,75 @@ +use std::{ + fmt::{self, Debug, Display, Formatter}, + io, + net::SocketAddr, +}; + +use tokio::net::TcpStream; + +use super::{Message, NodeId, Transport}; +use crate::tls::TlsCert; + +#[derive(Debug)] +pub(crate) enum Event

{ + /// Connection to the root node succeeded. + RootConnected { cert: TlsCert, transport: Transport }, + /// Connection to the root node failed. + RootFailed { error: anyhow::Error }, + /// A new TCP connection has been established from an incoming connection. + IncomingNew { stream: TcpStream, addr: SocketAddr }, + /// The TLS handshake completed on the incoming connection. + IncomingHandshakeCompleted { + result: anyhow::Result<(NodeId, Transport)>, + addr: SocketAddr, + }, + /// Received network message. + IncomingMessage { node_id: NodeId, msg: Message

}, + /// Incoming connection closed. + IncomingClosed { + result: io::Result<()>, + addr: SocketAddr, + }, + + /// A new outgoing connection was successfully established. + OutgoingEstablished { + node_id: NodeId, + transport: Transport, + }, + /// An outgoing connection failed to connect or was terminated. + OutgoingFailed { + node_id: NodeId, + attempt_count: u32, + error: Option, + }, +} + +impl Display for Event

{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + Event::RootConnected { cert, .. } => { + write!(f, "root connected @ {}", cert.public_key_fingerprint()) + } + Event::RootFailed { error } => write!(f, "root failed: {}", error), + Event::IncomingNew { addr, .. } => write!(f, "incoming connection from {}", addr), + Event::IncomingHandshakeCompleted { result, addr } => { + write!(f, "handshake from {}, is_err {}", addr, result.is_err()) + } + Event::IncomingMessage { node_id, msg } => write!(f, "msg from {}: {}", node_id, msg), + Event::IncomingClosed { addr, .. } => write!(f, "closed connection from {}", addr), + Event::OutgoingEstablished { node_id, .. } => { + write!(f, "established outgoing to {}", node_id) + } + Event::OutgoingFailed { + node_id, + attempt_count, + error, + } => write!( + f, + "failed outgoing {} [{}]: (is_err {})", + node_id, + attempt_count, + error.is_some() + ), + } + } +} diff --git a/src/components/small_network/message.rs b/src/components/small_network/message.rs new file mode 100644 index 0000000000..32c4ab1f52 --- /dev/null +++ b/src/components/small_network/message.rs @@ -0,0 +1,31 @@ +use std::{ + collections::HashSet, + fmt::{self, Debug, Display, Formatter}, +}; + +use serde::{Deserialize, Serialize}; + +use super::Endpoint; +use crate::{tls::Signed, utils::DisplayIter}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) enum Message

{ + /// A pruned set of all endpoint announcements the server has received. + Snapshot(HashSet>), + /// Broadcast a new endpoint known to the sender. + BroadcastEndpoint(Signed), + /// A payload message. + Payload(P), +} + +impl Display for Message

{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + Message::Snapshot(snapshot) => { + write!(f, "snapshot: {:10}", DisplayIter::new(snapshot.iter())) + } + Message::BroadcastEndpoint(endpoint) => write!(f, "broadcast endpoint: {}", endpoint), + Message::Payload(payload) => write!(f, "payload: {}", payload), + } + } +} diff --git a/src/effect.rs b/src/effect.rs index 5098e78b40..ed5d780189 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -14,8 +14,25 @@ //! //! ``` //! use std::time::Duration; -//! use crate::effect::EffectExt; -//! +//! use casper_node::effect::{Core, EffectExt}; +//! +//! # struct EffectBuilder {} +//! # +//! # impl Core for EffectBuilder { +//! # fn immediately(self) -> futures::future::BoxFuture<'static, ()> { +//! # Box::pin(async {}) +//! # } +//! # +//! # fn set_timeout(self, timeout: Duration) -> futures::future::BoxFuture<'static, Duration> { +//! # Box::pin(async move { +//! # let then = std::time::Instant::now(); +//! # tokio::time::delay_for(timeout).await; +//! # std::time::Instant::now() - then +//! # }) +//! # } +//! # } +//! # let events_factory = EffectBuilder {}; +//! # //! enum Event { //! ThreeSecondsElapsed(Duration) //! } @@ -29,43 +46,6 @@ //! `Event::ThreeSecondsElapsed`. Note that effects do nothing on their own, they need to be passed //! to a [`reactor`](../reactor/index.html) to be executed. //! -//! ## Chaining futures and effects -//! -//! Effects are built from futures, which can be combined before being finalized -//! into an effect. However, only one effect can be created as the end result -//! of such a chain. -//! -//! It is possible to create an effect from multiple effects being run in parallel using `.also`: -//! -//! ``` -//! use std::time::Duration; -//! use crate::effect::{EffectExt, EffectAlso}; -//! -//! enum Event { -//! ThreeSecondsElapsed(Duration), -//! FiveSecondsElapsed(Duration), -//! } -//! -//! // This effect produces a single event after five seconds: -//! events_factory -//! .set_timeout(Duration::from_secs(3)) -//! .then(|_| { -//! events_factory -//! .set_timeout(Duration::from_secs(2)) -//! .event(Event::FiveSecondsElapsed) -//! }); -//! -//! // Here, two effects are run in parallel, resulting in two events: -//! events_factory -//! .set_timeout(Duration::from_secs(3)) -//! .event(Event::ThreeSecondsElapsed) -//! .also( -//! events_factory -//! .set_timeout(Duration::from_secs(5)) -//! .event(Event::FiveSecondsElapsed), -//! ); -//! ``` -//! //! ## Arbitrary effects //! //! While it is technically possible to turn any future into an effect, it is advisable to only use @@ -75,18 +55,20 @@ use std::{future::Future, time::Duration}; use futures::{future::BoxFuture, FutureExt}; -use smallvec::smallvec; - -use crate::utils::Multiple; +use smallvec::{smallvec, SmallVec}; -/// Effect type. -/// -/// Effects are just boxed futures that produce one or more events. +/// Boxed futures that produce one or more events. pub type Effect = BoxFuture<'static, Multiple>; -/// Effect extension for futures. +/// Intended to hold a small collection of effects. /// -/// Used to convert futures into actual effects. +/// Stored in a `SmallVec` to avoid allocations in case there are less than three items grouped. The +/// size of two items is chosen because one item is the most common use case, and large items are +/// typically boxed. In the latter case two pointers and one enum variant discriminator is almost +/// the same size as an empty vec, which is two pointers. +pub type Multiple = SmallVec<[T; 2]>; + +/// Effect extension for futures, used to convert futures into actual effects. pub trait EffectExt: Future + Send { /// Finalizes a future into an effect that returns an event. /// @@ -97,18 +79,22 @@ pub trait EffectExt: Future + Send { U: 'static, Self: Sized; - /// Finalize a future into an effect that runs but drops the result. + /// Finalizes a future into an effect that runs but drops the result. fn ignore(self) -> Multiple>; } +/// Effect extension for futures, used to convert futures returning a `Result` into two different +/// effects. pub trait EffectResultExt { + /// The type the future will return if `Ok`. type Value; + /// The type the future will return if `Err`. type Error; /// Finalizes a future returning a `Result` into two different effects. /// - /// The function `f` is used to translate the returned value from an effect into an event, while - /// the function `g` does the same for a potential error. + /// The function `f_ok` is used to translate the returned value from an effect into an event, + /// while the function `f_err` does the same for a potential error. fn result(self, f_ok: F, f_err: G) -> Multiple> where F: FnOnce(Self::Value) -> U + 'static + Send, diff --git a/src/main.rs b/src/lib.rs similarity index 71% rename from src/main.rs rename to src/lib.rs index 344231339e..e323d7ae02 100644 --- a/src/main.rs +++ b/src/lib.rs @@ -18,24 +18,15 @@ missing_docs, trivial_casts, trivial_numeric_casts, - // unreachable_pub, + unreachable_pub, unused_qualifications )] -mod cli; mod components; -mod config; -mod effect; -mod reactor; -mod tls; +pub mod effect; +pub mod reactor; +pub mod tls; mod utils; -use structopt::StructOpt; - -/// Parses [command-line arguments](cli/index.html) and run application. -#[tokio::main] -pub async fn main() -> anyhow::Result<()> { - // Parse CLI args and run selected subcommand. - let opts = cli::Cli::from_args(); - opts.run().await -} +pub use components::small_network::Config as SmallNetworkConfig; +pub(crate) use components::small_network::{self, SmallNetwork}; diff --git a/src/reactor.rs b/src/reactor.rs index b0b18f0b20..b1744c25a7 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -33,9 +33,9 @@ use futures::FutureExt; use tracing::{debug, info, trace, warn}; use crate::{ - config, - effect::Effect, - utils::{self, Multiple, WeightedRoundRobin}, + effect::{Effect, Multiple}, + utils::{self, WeightedRoundRobin}, + SmallNetworkConfig, }; pub use queue_kind::QueueKind; @@ -45,7 +45,7 @@ pub use queue_kind::QueueKind; /// is the central hook for any part of the program that schedules events directly. /// /// Components rarely use this, but use a bound `EventQueueHandle` instead. -pub type Scheduler = WeightedRoundRobin; +type Scheduler = WeightedRoundRobin; /// Bound event queue handle /// @@ -56,7 +56,7 @@ pub type Scheduler = WeightedRoundRobin; /// Every event queue handle allows scheduling events of type `Ev` onto a reactor `R`. For this it /// carries with it a reference to a wrapper function that maps an `Ev` to a `Reactor::Event`. #[derive(Debug)] -pub struct EventQueueHandle +pub(crate) struct EventQueueHandle where R: Reactor, { @@ -78,6 +78,7 @@ where } } } + impl Copy for EventQueueHandle where R: Reactor {} impl EventQueueHandle @@ -91,7 +92,7 @@ where /// Schedule an event on a specific queue. #[inline] - pub async fn schedule(self, event: Ev, queue_kind: QueueKind) { + pub(crate) async fn schedule(self, event: Ev, queue_kind: QueueKind) { self.scheduler.push((self.wrapper)(event), queue_kind).await } } @@ -100,7 +101,7 @@ where /// /// Any reactor should implement this trait and be launched by the [`launch`](fn.launch.html) /// function. -pub trait Reactor: Sized { +pub(crate) trait Reactor: Sized { // Note: We've gone for the `Sized` bound here, since we return an instance in `new`. As an // alternative, `new` could return a boxed instance instead, removing this requirement. @@ -123,7 +124,7 @@ pub trait Reactor: Sized { /// /// If any instantiation fails, an error is returned. fn new( - cfg: config::Config, + cfg: SmallNetworkConfig, scheduler: &'static Scheduler, ) -> anyhow::Result<(Self, Multiple>)>; } @@ -136,7 +137,7 @@ pub trait Reactor: Sized { /// /// Errors are returned only if component initialization fails. #[inline] -pub async fn launch(cfg: config::Config) -> anyhow::Result<()> { +async fn launch(cfg: SmallNetworkConfig) -> anyhow::Result<()> { let event_size = mem::size_of::(); // Check if the event is of a reasonable size. This only emits a runtime warning at startup // right now, since storage size of events is not an issue per se, but copying might be diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index 9a99f09cd5..0a099e823b 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -7,25 +7,23 @@ use std::fmt::{self, Display, Formatter}; use serde::{Deserialize, Serialize}; use crate::{ - components::small_network::{self, SmallNetwork}, - config::Config, - effect::Effect, + effect::{Effect, Multiple}, reactor::{self, EventQueueHandle, Scheduler}, - utils::Multiple, + small_network, SmallNetwork, SmallNetworkConfig, }; /// Top-level event for the reactor. #[derive(Debug)] #[must_use] -pub enum Event { +enum Event { Network(small_network::Event), } #[derive(Clone, Debug, Deserialize, Serialize)] -pub enum Message {} +enum Message {} /// Validator node reactor. -pub struct Reactor { +struct Reactor { net: SmallNetwork, } @@ -33,13 +31,11 @@ impl reactor::Reactor for Reactor { type Event = Event; fn new( - cfg: Config, + cfg: SmallNetworkConfig, scheduler: &'static Scheduler, ) -> anyhow::Result<(Self, Multiple>)> { - let (net, net_effects) = SmallNetwork::new( - EventQueueHandle::bind(scheduler, Event::Network), - cfg.validator_net, - )?; + let (net, net_effects) = + SmallNetwork::new(EventQueueHandle::bind(scheduler, Event::Network), cfg)?; Ok(( Reactor { net }, @@ -67,3 +63,14 @@ impl Display for Message { write!(f, "TODO: MessagePayload") } } + +/// Runs a validator reactor. +/// +/// Starts the reactor and associated background tasks, then enters main the event processing loop. +/// +/// `launch` will leak memory on start for global structures each time it is called. +/// +/// Errors are returned only if component initialization fails. +pub async fn launch(cfg: SmallNetworkConfig) -> anyhow::Result<()> { + super::launch::(cfg).await +} diff --git a/src/tls.rs b/src/tls.rs index cde7c6cb1b..e0af772e4f 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -50,10 +50,13 @@ use openssl::{ x509::{X509Builder, X509Name, X509NameBuilder, X509NameRef, X509Ref, X509}, }; use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize, Serializer}; -use serde_big_array::big_array; use thiserror::Error; -big_array! { BigArray; } +mod big_array { + use serde_big_array::big_array; + + big_array! { BigArray; } +} /// The chosen signature algorithm (**ECDSA with SHA512**). const SIGNATURE_ALGORITHM: Nid = Nid::ECDSA_WITH_SHA512; @@ -67,139 +70,28 @@ const SIGNATURE_DIGEST: Nid = Nid::SHA512; /// OpenSSL result type alias. /// /// Many functions rely solely on `openssl` functions and return this kind of result. -pub type SslResult = Result; +type SslResult = Result; /// SHA512 hash. #[derive(Copy, Clone, Deserialize, Serialize)] -pub struct Sha512(#[serde(with = "BigArray")] [u8; Sha512::SIZE]); - -/// Certificate fingerprint. -#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] -pub struct CertFingerprint(Sha512); - -/// Public key fingerprint. -#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] -pub struct KeyFingerprint(Sha512); - -/// Cryptographic signature. -#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] -pub struct Signature(Vec); - -/// TLS certificate. -/// -/// Thin wrapper around `X509` enabling things like Serde serialization and fingerprint caching. -#[derive(Clone)] -pub struct TlsCert { - /// The wrapped x509 certificate. - x509: X509, - - /// Cached certificate fingerprint. - cert_fingerprint: CertFingerprint, - - /// Cached public key fingerprint. - key_fingerprint: KeyFingerprint, -} - -// Serialization and deserialization happens only via x509, which is checked upon deserialization. -impl<'de> Deserialize<'de> for TlsCert { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - validate_cert(x509_serde::deserialize(deserializer)?).map_err(serde::de::Error::custom) - } -} - -impl Serialize for TlsCert { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - x509_serde::serialize(&self.x509, serializer) - } -} - -/// A signed value. -/// -/// Combines a value `V` with a `Signature` and a signature scheme. The signature scheme involves -/// serializing the value to bytes and signing the result. -#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] -pub struct Signed { - data: Vec, - signature: Signature, - _phantom: PhantomData, -} - -impl Signed -where - V: Serialize, -{ - /// Creates a new signed value. - /// - /// Serializes the value to a buffer and signs the buffer. - pub fn new(value: &V, signing_key: &PKeyRef) -> anyhow::Result { - let data = rmp_serde::to_vec(value)?; - let signature = Signature::create(signing_key, &data)?; - - Ok(Signed { - data, - signature, - _phantom: PhantomData, - }) - } -} - -impl Signed -where - V: DeserializeOwned, -{ - /// Validates signature and restore value. - #[allow(dead_code)] - pub fn validate(&self, public_key: &PKeyRef) -> anyhow::Result { - if self.signature.verify(public_key, &self.data)? { - Ok(rmp_serde::from_read(self.data.as_slice())?) - } else { - Err(anyhow!("invalid signature")) - } - } - - /// Validates a self-signed value. - /// - /// Allows for extraction of a public key prior to validating a value. - #[inline] - pub fn validate_self_signed(&self, extract: F) -> anyhow::Result - where - F: FnOnce(&V) -> anyhow::Result>, - { - let unverified = rmp_serde::from_read(self.data.as_slice())?; - { - let public_key = - extract(&unverified).context("could not extract public key from self-signed")?; - if self.signature.verify(&public_key, &self.data)? { - Ok(unverified) - } else { - Err(anyhow!("invalid signature")) - } - } - } -} +struct Sha512(#[serde(with = "big_array::BigArray")] [u8; Sha512::SIZE]); impl Sha512 { /// Size of digest in bytes. - pub const SIZE: usize = 64; + const SIZE: usize = 64; /// OpenSSL NID. const NID: Nid = Nid::SHA512; /// Create a new Sha512 by hashing a slice. - pub fn new>(data: B) -> Self { + fn new>(data: B) -> Self { let mut openssl_sha = sha::Sha512::new(); openssl_sha.update(data.as_ref()); Sha512(openssl_sha.finish()) } /// Returns bytestring of the hash, with length `Self::SIZE`. - pub fn bytes(&self) -> &[u8] { + fn bytes(&self) -> &[u8] { let bs = &self.0[..]; debug_assert_eq!(bs.len(), Self::SIZE); @@ -230,9 +122,21 @@ impl Sha512 { } } +/// Certificate fingerprint. +#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] +pub(crate) struct CertFingerprint(Sha512); + +/// Public key fingerprint. +#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] +pub(crate) struct KeyFingerprint(Sha512); + +/// Cryptographic signature. +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] +struct Signature(Vec); + impl Signature { /// Signs a binary blob with the blessed ciphers and TLS parameters. - pub fn create(private_key: &PKeyRef, data: &[u8]) -> SslResult { + fn create(private_key: &PKeyRef, data: &[u8]) -> SslResult { // TODO: This needs verification to ensure we're not doing stupid/textbook RSA-ish. // Sha512 is hardcoded, so check we're creating the correct signature. @@ -251,7 +155,7 @@ impl Signature { } /// Verifies that signature matches on a binary blob. - pub fn verify(self: &Signature, public_key: &PKeyRef, data: &[u8]) -> SslResult { + fn verify(self: &Signature, public_key: &PKeyRef, data: &[u8]) -> SslResult { assert_eq!(Sha512::NID, SIGNATURE_DIGEST); let mut verifier = Verifier::new(Sha512::create_message_digest(), public_key)?; @@ -260,17 +164,32 @@ impl Signature { } } +/// TLS certificate. +/// +/// Thin wrapper around `X509` enabling things like Serde serialization and fingerprint caching. +#[derive(Clone)] +pub(crate) struct TlsCert { + /// The wrapped x509 certificate. + x509: X509, + + /// Cached certificate fingerprint. + cert_fingerprint: CertFingerprint, + + /// Cached public key fingerprint. + key_fingerprint: KeyFingerprint, +} + impl TlsCert { /// Returns the certificate's fingerprint. /// /// In contrast to the `public_key_fingerprint`, this fingerprint also contains the certificate /// information. - pub fn fingerprint(&self) -> CertFingerprint { + pub(crate) fn fingerprint(&self) -> CertFingerprint { self.cert_fingerprint } /// Extracts the public key from the certificate. - pub fn public_key(&self) -> PKey { + pub(crate) fn public_key(&self) -> PKey { // This can never fail, we validate the certificate on construction and deserialization. self.x509 .public_key() @@ -278,15 +197,9 @@ impl TlsCert { } /// Returns the public key fingerprint. - pub fn public_key_fingerprint(&self) -> KeyFingerprint { + pub(crate) fn public_key_fingerprint(&self) -> KeyFingerprint { self.key_fingerprint } - - #[allow(dead_code)] - /// Returns OpenSSL X509 certificate. - fn x509(&self) -> &X509 { - &self.x509 - } } impl Debug for TlsCert { @@ -309,6 +222,90 @@ impl PartialEq for TlsCert { impl Eq for TlsCert {} +// Serialization and deserialization happens only via x509, which is checked upon deserialization. +impl<'de> Deserialize<'de> for TlsCert { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + validate_cert(x509_serde::deserialize(deserializer)?).map_err(serde::de::Error::custom) + } +} + +impl Serialize for TlsCert { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + x509_serde::serialize(&self.x509, serializer) + } +} + +/// A signed value. +/// +/// Combines a value `V` with a `Signature` and a signature scheme. The signature scheme involves +/// serializing the value to bytes and signing the result. +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub(crate) struct Signed { + data: Vec, + signature: Signature, + _phantom: PhantomData, +} + +impl Signed +where + V: Serialize, +{ + /// Creates a new signed value. + /// + /// Serializes the value to a buffer and signs the buffer. + pub(crate) fn new(value: &V, signing_key: &PKeyRef) -> anyhow::Result { + let data = rmp_serde::to_vec(value)?; + let signature = Signature::create(signing_key, &data)?; + + Ok(Signed { + data, + signature, + _phantom: PhantomData, + }) + } +} + +impl Signed +where + V: DeserializeOwned, +{ + /// Validates signature and restore value. + #[allow(dead_code)] + pub(crate) fn validate(&self, public_key: &PKeyRef) -> anyhow::Result { + if self.signature.verify(public_key, &self.data)? { + Ok(rmp_serde::from_read(self.data.as_slice())?) + } else { + Err(anyhow!("invalid signature")) + } + } + + /// Validates a self-signed value. + /// + /// Allows for extraction of a public key prior to validating a value. + #[inline] + pub(crate) fn validate_self_signed(&self, extract: F) -> anyhow::Result + where + F: FnOnce(&V) -> anyhow::Result>, + { + let unverified = rmp_serde::from_read(self.data.as_slice())?; + { + let public_key = + extract(&unverified).context("could not extract public key from self-signed")?; + if self.signature.verify(&public_key, &self.data)? { + Ok(unverified) + } else { + Err(anyhow!("invalid signature")) + } + } + } +} + /// Generates a self-signed (key, certificate) pair suitable for TLS and signing. /// /// The common name of the certificate will be "casper-node". @@ -325,7 +322,7 @@ pub fn generate_node_cert() -> SslResult<(X509, PKey)> { /// compatible with connectors built with `create_tls_connector`. /// /// Incoming certificates must still be validated using `validate_cert`. -pub fn create_tls_acceptor( +pub(crate) fn create_tls_acceptor( cert: &X509Ref, private_key: &PKeyRef, ) -> SslResult { @@ -339,7 +336,7 @@ pub fn create_tls_acceptor( /// /// A connector compatible with the acceptor created using `create_tls_acceptor`. Server /// certificates must always be validated using `validate_cert` after connecting. -pub fn create_tls_connector( +pub(crate) fn create_tls_connector( cert: &X509Ref, private_key: &PKeyRef, ) -> SslResult { @@ -374,7 +371,7 @@ fn set_context_options( /// Error during certificate validation. #[derive(Debug, Display, Error)] -pub enum ValidationError { +pub(crate) enum ValidationError { /// error reading public key from certificate: {0:?} CannotReadPublicKey(#[source] ErrorStack), /// error reading subject or issuer name: {0:?} @@ -415,7 +412,7 @@ pub enum ValidationError { /// fingerprint of the public key. /// /// At the very least this ensures that no weaker ciphers have been used to forge a certificate. -pub fn validate_cert(cert: X509) -> Result { +pub(crate) fn validate_cert(cert: X509) -> Result { if cert.signature_algorithm().object().nid() != SIGNATURE_ALGORITHM { // The signature algorithm is not of the exact kind we are using to generate our // certificates, an attacker could have used a weaker one to generate colliding keys. @@ -512,7 +509,7 @@ pub fn validate_cert(cert: X509) -> Result { } /// Loads a certificate from a file. -pub fn load_cert>(src: P) -> anyhow::Result { +pub(crate) fn load_cert>(src: P) -> anyhow::Result { let pem = fs::read(src.as_ref()) .with_context(|| format!("failed to load certificate {:?}", src.as_ref()))?; @@ -520,7 +517,7 @@ pub fn load_cert>(src: P) -> anyhow::Result { } /// Loads a private key from a file. -pub fn load_private_key>(src: P) -> anyhow::Result> { +pub(crate) fn load_private_key>(src: P) -> anyhow::Result> { let pem = fs::read(src.as_ref()) .with_context(|| format!("failed to load private key {:?}", src.as_ref()))?; @@ -688,7 +685,7 @@ mod x509_serde { use super::validate_cert; /// Serde-compatible serialization for X509 certificates. - pub fn serialize(value: &X509, serializer: S) -> Result + pub(super) fn serialize(value: &X509, serializer: S) -> Result where S: Serializer, { @@ -699,7 +696,7 @@ mod x509_serde { } /// Serde-compatible deserialization for X509 certificates. - pub fn deserialize<'de, D>(deserializer: D) -> Result + pub(super) fn deserialize<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { diff --git a/src/utils.rs b/src/utils.rs index 7da11e649e..53a62db9a3 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -8,30 +8,20 @@ use std::{ fmt::{self, Display, Formatter}, }; -use smallvec::SmallVec; - -pub use round_robin::WeightedRoundRobin; +pub(crate) use round_robin::WeightedRoundRobin; /// Moves a value to the heap and then forgets about, leaving only a static reference behind. #[inline] -pub fn leak(value: T) -> &'static T { +pub(crate) fn leak(value: T) -> &'static T { Box::leak(Box::new(value)) } -/// Small amount store. -/// -/// Stored in a `SmallVec` to avoid allocations in case there are less than three items grouped. The -/// size of two items is chosen because one item is the most common use case, and large items are -/// typically boxed. In the latter case two pointers and one enum variant discriminator is almost -/// the same size as an empty vec, which is two pointers. -pub type Multiple = SmallVec<[T; 2]>; - /// A display-helper that shows iterators display joined by ",". #[derive(Debug)] -pub struct DisplayIter(RefCell>); +pub(crate) struct DisplayIter(RefCell>); impl DisplayIter { - pub fn new(item: T) -> Self { + pub(crate) fn new(item: T) -> Self { DisplayIter(RefCell::new(Some(item))) } } diff --git a/src/utils/round_robin.rs b/src/utils/round_robin.rs index b15677b6c5..2717c13e05 100644 --- a/src/utils/round_robin.rs +++ b/src/utils/round_robin.rs @@ -23,7 +23,7 @@ use tokio::sync::{Mutex, Semaphore}; /// /// The scheduler keeps track internally which queue needs to be popped next. #[derive(Debug)] -pub struct WeightedRoundRobin { +pub(crate) struct WeightedRoundRobin { /// Current iteration state. state: Mutex>, @@ -70,7 +70,7 @@ where /// /// Creates a queue for each pair given in `weights`. The second component of each `weight` is /// the number of times to return items from one queue before moving on to the next one. - pub fn new(weights: Vec<(K, NonZeroUsize)>) -> Self { + pub(crate) fn new(weights: Vec<(K, NonZeroUsize)>) -> Self { assert!(!weights.is_empty(), "must provide at least one slot"); let queues = weights @@ -102,7 +102,7 @@ where /// ## Panics /// /// Panics if the queue identified by key `queue` does not exist. - pub async fn push(&self, item: I, queue: K) { + pub(crate) async fn push(&self, item: I, queue: K) { self.queues .get(&queue) .expect("tried to push to non-existent queue") @@ -117,7 +117,7 @@ where /// Returns the next item from queue. /// /// Asynchronously waits until a queue is non-empty or panics if an internal error occurred. - pub async fn pop(&self) -> (I, K) { + pub(crate) async fn pop(&self) -> (I, K) { self.total.acquire().await.forget(); let mut inner = self.state.lock().await; From fbc847e31cc969615bc0992959864988679c30df Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Sat, 23 May 2020 16:56:52 +0100 Subject: [PATCH 2/6] NDRS-38: added initial cut of storage component --- Cargo.lock | 65 ++++++ Cargo.toml | 1 + src/components.rs | 1 + src/components/small_network/config.rs | 2 +- src/components/storage.rs | 207 +++++++++++++++++++ src/components/storage/block.rs | 29 +++ src/components/storage/linear_block_store.rs | 45 ++++ src/effect.rs | 117 +++++++---- src/reactor.rs | 22 +- src/reactor/validator.rs | 56 ++++- src/utils/round_robin.rs | 2 +- 11 files changed, 496 insertions(+), 51 deletions(-) create mode 100644 src/components/storage.rs create mode 100644 src/components/storage/block.rs create mode 100644 src/components/storage/linear_block_store.rs diff --git a/Cargo.lock b/Cargo.lock index 3d64f4a8d2..2dbbc35bba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,6 +71,7 @@ dependencies = [ "hex_fmt", "maplit", "openssl", + "rand", "rmp-serde", "serde", "serde-big-array", @@ -304,6 +305,17 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "heck" version = "0.3.1" @@ -537,6 +549,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" +[[package]] +name = "ppv-lite86" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" + [[package]] name = "proc-macro-error" version = "1.0.2" @@ -593,6 +611,47 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom", + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core", +] + [[package]] name = "regex" version = "1.3.7" @@ -994,6 +1053,12 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce" +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "winapi" version = "0.2.8" diff --git a/Cargo.toml b/Cargo.toml index 902f604f29..a6fa74660e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ futures = "0.3.5" hex_fmt = "0.3.0" maplit = "1.0.2" openssl = "0.10.29" +rand = "0.7.3" rmp-serde = "0.14.3" serde = { version = "1.0.110", features = ["derive"] } serde-big-array = "0.3.0" diff --git a/src/components.rs b/src/components.rs index b699a34e3c..a0af5b4297 100644 --- a/src/components.rs +++ b/src/components.rs @@ -3,3 +3,4 @@ //! Docs to be written, sorry. pub(crate) mod small_network; +pub(crate) mod storage; diff --git a/src/components/small_network/config.rs b/src/components/small_network/config.rs index 5941fb5fc7..f9949a0e33 100644 --- a/src/components/small_network/config.rs +++ b/src/components/small_network/config.rs @@ -5,8 +5,8 @@ use std::{ use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize)] /// Small network configuration +#[derive(Debug, Deserialize, Serialize)] pub struct Config { /// Interface to bind to. If it is the same as the in `root_addr`, attempt /// become the root node for this particular small network. diff --git a/src/components/storage.rs b/src/components/storage.rs new file mode 100644 index 0000000000..483f8c6c6a --- /dev/null +++ b/src/components/storage.rs @@ -0,0 +1,207 @@ +mod block; +mod linear_block_store; + +use std::{ + collections::HashSet, + fmt::{self, Debug, Formatter}, +}; + +use smallvec::smallvec; +use tracing::info; + +use crate::effect::{Effect, Multiple, Responder}; +pub(crate) use block::{BlockType, CLBlock}; +pub(crate) use linear_block_store::BlockStoreType; +use linear_block_store::InMemBlockStore; + +pub(crate) type Storage = InMemStorage; + +pub(crate) enum Event +where + ::Block: Debug, +{ + PutBlock { + block: ::Block, + responder: Responder>, + }, + GetBlock { + name: <::Block as BlockType>::Name, + responder: Responder::Block>, Event>, + }, +} + +impl Debug for Event { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + match self { + Event::PutBlock { block, .. } => { + write!(formatter, "Event::PutBlock {{ block: {:?} }}", block) + } + Event::GetBlock { name, .. } => { + write!(formatter, "Event::GetBlock {{ name: {:?} }}", name) + } + } + } +} + +// Trait which will handle management of the various storage sub-components. +// +// If this trait is ultimately only used for testing scenarios, we shouldn't need to expose it to +// the reactor - it can simply use a concrete type which implements this trait. +pub(crate) trait StorageType { + type BlockStore: BlockStoreType; + + fn handle_event(&mut self, event: Event) -> Multiple>> + where + Self: Sized, + { + match event { + Event::PutBlock { block, responder } => { + let result = self.block_store_mut().put(block); + smallvec![responder(result)] + } + Event::GetBlock { name, responder } => { + let result = self.block_store().get(&name); + smallvec![responder(result)] + } + } + } + + fn block_store(&mut self) -> &Self::BlockStore; + fn block_store_mut(&mut self) -> &mut Self::BlockStore; +} + +// Concrete type of `Storage` - backed by in-memory block store only for now, but will eventually +// also hold in-mem versions of wasm-store, deploy-store, etc. +#[derive(Debug)] +pub(crate) struct InMemStorage { + block_store: InMemBlockStore, +} + +impl InMemStorage { + pub(crate) fn new() -> Self { + InMemStorage { + block_store: InMemBlockStore::new(), + } + } +} + +impl StorageType for InMemStorage { + type BlockStore = InMemBlockStore; + + fn block_store(&mut self) -> &Self::BlockStore { + &self.block_store + } + + fn block_store_mut(&mut self) -> &mut Self::BlockStore { + &mut self.block_store + } +} + +pub(crate) mod dummy { + use std::time::Duration; + + use rand::{self, Rng}; + + use super::*; + use crate::{ + effect::{EffectBuilder, EffectExt}, + reactor::Reactor, + }; + + #[derive(Debug)] + pub(crate) enum Event { + Trigger, + PutBlockSucceeded(u8), + PutBlockFailed(u8), + GetBlock(u8, Option), + } + + #[derive(Debug)] + pub(crate) struct StorageConsumer { + stored_blocks_names: HashSet, + } + + impl StorageConsumer { + pub(crate) fn new( + storage_effect_builder: EffectBuilder>, + ) -> (Self, Multiple>) { + ( + Self { + stored_blocks_names: HashSet::new(), + }, + Self::set_timeout(storage_effect_builder), + ) + } + + pub(crate) fn handle_event( + &mut self, + storage_effect_builder: EffectBuilder>, + event: Event, + ) -> Multiple> { + match event { + Event::Trigger => { + let mut rng = rand::thread_rng(); + let create_block: bool = rng.gen(); + if create_block { + let block = CLBlock::new(rng.gen(), rng.gen()); + let name = *block.name(); + self.stored_blocks_names.insert(name); + Self::request_put_block(storage_effect_builder, block) + } else { + let name = rng.gen(); + Self::request_get_block(storage_effect_builder, name) + } + } + Event::PutBlockSucceeded(name) => { + info!("Consumer knows {} has been stored.", name); + Self::set_timeout(storage_effect_builder) + } + Event::PutBlockFailed(name) => { + info!("Consumer knows {} has failed to be stored.", name); + Self::set_timeout(storage_effect_builder) + } + Event::GetBlock(name, maybe_block) => { + info!("Consumer received {:?}.", maybe_block); + assert_eq!( + maybe_block.is_some(), + self.stored_blocks_names.contains(&name) + ); + Self::set_timeout(storage_effect_builder) + } + } + } + + fn set_timeout( + storage_effect_builder: EffectBuilder>, + ) -> Multiple> { + storage_effect_builder + .set_timeout(Duration::from_millis(100)) + .event(|_| Event::Trigger) + } + + fn request_put_block( + storage_effect_builder: EffectBuilder>, + block: CLBlock, + ) -> Multiple> { + let name = *block.name(); + storage_effect_builder + .make_request(|responder| super::Event::PutBlock { block, responder }) + .event(move |is_success| { + if is_success { + Event::PutBlockSucceeded(name) + } else { + Event::PutBlockFailed(name) + } + }) + } + + fn request_get_block( + storage_effect_builder: EffectBuilder>, + name: u8, + ) -> Multiple> { + storage_effect_builder + .make_request(move |responder| super::Event::GetBlock { name, responder }) + .event(move |maybe_block| Event::GetBlock(name, maybe_block)) + } + } +} diff --git a/src/components/storage/block.rs b/src/components/storage/block.rs new file mode 100644 index 0000000000..7c5cea8298 --- /dev/null +++ b/src/components/storage/block.rs @@ -0,0 +1,29 @@ +use std::{fmt::Debug, hash::Hash}; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +pub(crate) trait BlockType: Clone + Serialize + DeserializeOwned + Send + Debug { + type Name: Copy + Hash + PartialOrd + Ord + PartialEq + Eq + Debug + Send; + + fn name(&self) -> &Self::Name; +} + +#[derive(Clone, Serialize, Deserialize, Default, Debug)] +pub(crate) struct CLBlock { + name: u8, + etc: u64, +} + +impl CLBlock { + pub(crate) fn new(name: u8, etc: u64) -> Self { + CLBlock { name, etc } + } +} + +impl BlockType for CLBlock { + type Name = u8; + + fn name(&self) -> &Self::Name { + &self.name + } +} diff --git a/src/components/storage/linear_block_store.rs b/src/components/storage/linear_block_store.rs new file mode 100644 index 0000000000..fb21ace3e9 --- /dev/null +++ b/src/components/storage/linear_block_store.rs @@ -0,0 +1,45 @@ +use std::collections::{hash_map::Entry, HashMap}; + +use super::BlockType; + +// Trait defining the API for the linear block store. Doesn't need to fit the "Event/Effect" model +// as this is a sub-component of the storage component. +pub(crate) trait BlockStoreType { + type Block: BlockType; + + fn put(&mut self, block: Self::Block) -> bool; + fn get( + &self, + name: &<::Block as BlockType>::Name, + ) -> Option; +} + +// In-memory version of a block store. +#[derive(Debug)] +pub(crate) struct InMemBlockStore { + inner: HashMap, +} + +impl InMemBlockStore { + pub(crate) fn new() -> Self { + InMemBlockStore { + inner: HashMap::new(), + } + } +} + +impl BlockStoreType for InMemBlockStore { + type Block = B; + + fn put(&mut self, block: B) -> bool { + if let Entry::Vacant(entry) = self.inner.entry(*block.name()) { + entry.insert(block); + return true; + } + false + } + + fn get(&self, name: &B::Name) -> Option { + self.inner.get(name).cloned() + } +} diff --git a/src/effect.rs b/src/effect.rs index ed5d780189..641329ea72 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -8,36 +8,19 @@ //! //! ## Using effects //! -//! To create an effect, an events factory is used that implements one or more of the factory traits -//! of this module. For example, given an events factory `events_factory`, we can create a -//! `set_timeout` future and turn it into an effect: +//! To create an effect, an `EffectBuilder` will be passed in from the relevant reactor. For +//! example, given an effect builder `effect_builder`, we can create a `set_timeout` future and turn +//! it into an effect: //! -//! ``` +//! ```ignore //! use std::time::Duration; -//! use casper_node::effect::{Core, EffectExt}; +//! use casper_node::effect::EffectExt; //! -//! # struct EffectBuilder {} -//! # -//! # impl Core for EffectBuilder { -//! # fn immediately(self) -> futures::future::BoxFuture<'static, ()> { -//! # Box::pin(async {}) -//! # } -//! # -//! # fn set_timeout(self, timeout: Duration) -> futures::future::BoxFuture<'static, Duration> { -//! # Box::pin(async move { -//! # let then = std::time::Instant::now(); -//! # tokio::time::delay_for(timeout).await; -//! # std::time::Instant::now() - then -//! # }) -//! # } -//! # } -//! # let events_factory = EffectBuilder {}; -//! # //! enum Event { //! ThreeSecondsElapsed(Duration) //! } //! -//! events_factory +//! effect_builder //! .set_timeout(Duration::from_secs(3)) //! .event(Event::ThreeSecondsElapsed); //! ``` @@ -52,15 +35,22 @@ //! the effects explicitly listed in this module through traits to create them. Post-processing on //! effects to turn them into events should also be kept brief. -use std::{future::Future, time::Duration}; +use std::{ + future::Future, + time::{Duration, Instant}, +}; -use futures::{future::BoxFuture, FutureExt}; +use futures::{channel::oneshot, future::BoxFuture, FutureExt}; use smallvec::{smallvec, SmallVec}; +use tracing::error; -/// Boxed futures that produce one or more events. +/// A boxed future that produces one or more events. pub type Effect = BoxFuture<'static, Multiple>; -/// Intended to hold a small collection of effects. +/// A boxed closure which returns an [`Effect`](type.Effect.html). +pub type Responder = Box Effect + Send>; + +/// Intended to hold a small collection of [`Effect`](type.Effect.html)s. /// /// Stored in a `SmallVec` to avoid allocations in case there are less than three items grouped. The /// size of two items is chosen because one item is the most common use case, and large items are @@ -139,17 +129,70 @@ where } } -/// Core effects. -pub trait Core { - /// Immediately completes without doing anything. - /// - /// Can be used to trigger an event. - fn immediately(self) -> BoxFuture<'static, ()>; +use crate::reactor::{EventQueueHandle, QueueKind, Reactor}; + +/// A builder for [`Effect`](type.Effect.html)s. +/// +/// Provides methods allowing the creation of effects which need scheduled on the reactor's event +/// queue, without giving direct access to this queue. +#[derive(Copy, Clone, Debug)] +pub struct EffectBuilder { + event_queue_handle: EventQueueHandle, + queue_kind: QueueKind, +} + +impl EffectBuilder { + pub(crate) fn new(event_queue_handle: EventQueueHandle, queue_kind: QueueKind) -> Self { + EffectBuilder { + event_queue_handle, + queue_kind, + } + } /// Sets a timeout. + pub async fn set_timeout(self, timeout: Duration) -> Duration { + let then = Instant::now(); + tokio::time::delay_for(timeout).await; + Instant::now() - then + } + + /// Creates a request and response pair. /// - /// Once the timeout fires, it will return the actual elapsed time since the execution (not - /// creation!) of this effect. Event loops typically execute effects right after a called event - /// handling function completes. - fn set_timeout(self, timeout: Duration) -> BoxFuture<'static, Duration>; + /// This creates and enqueues a request Event by invoking the provided `create_request_event` + /// closure, having first created the responder required in the form of a oneshot channel. + pub async fn make_request(self, create_request_event: F) -> T + where + T: 'static + Send, + F: FnOnce(Responder) -> Ev, + { + // Prepare a channel. + let (sender, receiver) = oneshot::channel(); + + // Create response function. + let responder = create_responder(sender); + + // Now inject the request event into the event loop. + let request_event = create_request_event(responder); + self.event_queue_handle + .schedule(request_event, self.queue_kind) + .await; + + receiver.await.unwrap_or_else(|err| { + // The channel should never be closed, ever. + error!(%err, "request oneshot closed, this should not happen"); + unreachable!() + }) + } +} + +fn create_responder(sender: oneshot::Sender) -> Responder { + Box::new(move |value| { + async move { + if sender.send(value).is_err() { + error!("could not send response to request down oneshot channel") + } + smallvec![] + } + .boxed() + }) } diff --git a/src/reactor.rs b/src/reactor.rs index b1744c25a7..36d8338bea 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -27,7 +27,10 @@ pub mod non_validator; mod queue_kind; pub mod validator; -use std::{fmt, mem}; +use std::{ + fmt::{Debug, Display}, + mem, +}; use futures::FutureExt; use tracing::{debug, info, trace, warn}; @@ -45,7 +48,7 @@ pub use queue_kind::QueueKind; /// is the central hook for any part of the program that schedules events directly. /// /// Components rarely use this, but use a bound `EventQueueHandle` instead. -type Scheduler = WeightedRoundRobin; +pub type Scheduler = WeightedRoundRobin; /// Bound event queue handle /// @@ -62,7 +65,8 @@ where { /// The scheduler events will be scheduled on. scheduler: &'static Scheduler<::Event>, - /// A wrapper function translating from component event (input of `W`) to reactor event `Ev`. + /// A wrapper function translating from component event (input of `Ev`) to reactor event + /// `R::Event`. wrapper: fn(Ev) -> R::Event, } @@ -101,21 +105,25 @@ where /// /// Any reactor should implement this trait and be launched by the [`launch`](fn.launch.html) /// function. -pub(crate) trait Reactor: Sized { +pub trait Reactor: Sized { // Note: We've gone for the `Sized` bound here, since we return an instance in `new`. As an // alternative, `new` could return a boxed instance instead, removing this requirement. /// Event type associated with reactor. /// /// Defines what kind of event the reactor processes. - type Event: Send + fmt::Debug + fmt::Display + 'static; + type Event: Send + Debug + Display + 'static; /// Dispatches an event on the reactor. /// /// This function is typically only called by the reactor itself to dispatch an event. It is /// safe to call regardless, but will cause the event to skip the queue and things like /// accounting. - fn dispatch_event(&mut self, event: Self::Event) -> Multiple>; + fn dispatch_event( + &mut self, + scheduler: &'static Scheduler, + event: Self::Event, + ) -> Multiple>; /// Creates a new instance of the reactor. /// @@ -168,7 +176,7 @@ async fn launch(cfg: SmallNetworkConfig) -> anyhow::Result<()> { trace!(?event, ?q, "event"); // Dispatch the event, then execute the resulting effect. - let effects = reactor.dispatch_event(event); + let effects = reactor.dispatch_event(scheduler, event); process_effects(scheduler, effects).await; } } diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index 0a099e823b..0ab9c93ba7 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -7,8 +7,9 @@ use std::fmt::{self, Display, Formatter}; use serde::{Deserialize, Serialize}; use crate::{ - effect::{Effect, Multiple}, - reactor::{self, EventQueueHandle, Scheduler}, + components::storage::{self, Storage, StorageType}, + effect::{Effect, EffectBuilder, Multiple}, + reactor::{self, EventQueueHandle, QueueKind, Scheduler}, small_network, SmallNetwork, SmallNetworkConfig, }; @@ -17,6 +18,8 @@ use crate::{ #[must_use] enum Event { Network(small_network::Event), + Storage(storage::Event), + StorageConsumer(storage::dummy::Event), } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -25,6 +28,8 @@ enum Message {} /// Validator node reactor. struct Reactor { net: SmallNetwork, + storage: Storage, + dummy_storage_consumer: storage::dummy::StorageConsumer, } impl reactor::Reactor for Reactor { @@ -37,15 +42,51 @@ impl reactor::Reactor for Reactor { let (net, net_effects) = SmallNetwork::new(EventQueueHandle::bind(scheduler, Event::Network), cfg)?; + let storage = Storage::new(); + let storage_effect_builder = EffectBuilder::new( + EventQueueHandle::bind(scheduler, Event::Storage), + QueueKind::Regular, + ); + let (dummy_storage_consumer, storage_consumer_effects) = + storage::dummy::StorageConsumer::new::(storage_effect_builder); + + let mut effects = reactor::wrap_effects(Event::Network, net_effects); + effects.extend(reactor::wrap_effects( + Event::StorageConsumer, + storage_consumer_effects, + )); + Ok(( - Reactor { net }, - reactor::wrap_effects(Event::Network, net_effects), + Reactor { + net, + storage, + dummy_storage_consumer, + }, + effects, )) } - fn dispatch_event(&mut self, event: Event) -> Multiple> { + fn dispatch_event( + &mut self, + scheduler: &'static Scheduler, + event: Event, + ) -> Multiple> { match event { Event::Network(ev) => reactor::wrap_effects(Event::Network, self.net.handle_event(ev)), + Event::Storage(ev) => { + reactor::wrap_effects(Event::Storage, self.storage.handle_event(ev)) + } + Event::StorageConsumer(ev) => { + let storage_effect_builder = EffectBuilder::::new( + EventQueueHandle::bind(scheduler, Event::Storage), + QueueKind::Regular, + ); + reactor::wrap_effects( + Event::StorageConsumer, + self.dummy_storage_consumer + .handle_event(storage_effect_builder, ev), + ) + } } } } @@ -54,6 +95,11 @@ impl Display for Event { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Event::Network(ev) => write!(f, "network: {}", ev), + // TODO - impl Display for storage::Event + // Event::Storage(ev) => write!(f, "storage: {}", ev), + // Event::StorageConsumer(ev) => write!(f, "storage-consumer: {}", ev), + Event::Storage(_) => write!(f, "storage event"), + Event::StorageConsumer(_) => write!(f, "storage-consumer event"), } } } diff --git a/src/utils/round_robin.rs b/src/utils/round_robin.rs index 2717c13e05..ce1ddd4637 100644 --- a/src/utils/round_robin.rs +++ b/src/utils/round_robin.rs @@ -23,7 +23,7 @@ use tokio::sync::{Mutex, Semaphore}; /// /// The scheduler keeps track internally which queue needs to be popped next. #[derive(Debug)] -pub(crate) struct WeightedRoundRobin { +pub struct WeightedRoundRobin { /// Current iteration state. state: Mutex>, From 70a01bad7b78b756a0a83f6b662d5fd769c7d0be Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Mon, 25 May 2020 11:25:59 +0100 Subject: [PATCH 3/6] NDRS-38: put storage calls behind async interface --- Cargo.toml | 2 +- src/components/storage.rs | 50 +++++++++++++------- src/components/storage/block.rs | 6 ++- src/components/storage/linear_block_store.rs | 1 + src/effect.rs | 14 +++++- 5 files changed, 52 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a6fa74660e..85076b515d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ serde-big-array = "0.3.0" smallvec = "1.4.0" structopt = "0.3.14" thiserror = "1.0.18" -tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp", "time"] } +tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp", "time", "blocking"] } tokio-openssl = "0.4.0" tokio-serde = { version = "0.6.1", features = ["messagepack"] } tokio-util = { version = "0.3.1", features = ["codec"] } diff --git a/src/components/storage.rs b/src/components/storage.rs index 483f8c6c6a..20b0303716 100644 --- a/src/components/storage.rs +++ b/src/components/storage.rs @@ -4,9 +4,12 @@ mod linear_block_store; use std::{ collections::HashSet, fmt::{self, Debug, Formatter}, + sync::Arc, }; +use futures::FutureExt; use smallvec::smallvec; +use tokio::{sync::RwLock, task}; use tracing::info; use crate::effect::{Effect, Multiple, Responder}; @@ -48,39 +51,56 @@ impl Debug for Event { // If this trait is ultimately only used for testing scenarios, we shouldn't need to expose it to // the reactor - it can simply use a concrete type which implements this trait. pub(crate) trait StorageType { - type BlockStore: BlockStoreType; + type BlockStore: BlockStoreType + Send + Sync; fn handle_event(&mut self, event: Event) -> Multiple>> where - Self: Sized, + Self: Sized + 'static, { match event { Event::PutBlock { block, responder } => { - let result = self.block_store_mut().put(block); - smallvec![responder(result)] + let block_store = self.block_store(); + let future = async move { + task::spawn_blocking(move || async move { + let mut block_store = block_store.write().await; + block_store.put(block) + }) + .await + .expect("should run") + .await + }; + smallvec![future.then(|is_success| responder(is_success)).boxed()] } Event::GetBlock { name, responder } => { - let result = self.block_store().get(&name); - smallvec![responder(result)] + let block_store = self.block_store(); + let future = async move { + task::spawn_blocking(move || async move { + let block_store = block_store.read().await; + block_store.get(&name) + }) + .await + .expect("should run") + .await + }; + smallvec![future.then(|block| responder(block)).boxed()] } } } - fn block_store(&mut self) -> &Self::BlockStore; - fn block_store_mut(&mut self) -> &mut Self::BlockStore; + fn block_store(&self) -> Arc>; } // Concrete type of `Storage` - backed by in-memory block store only for now, but will eventually // also hold in-mem versions of wasm-store, deploy-store, etc. #[derive(Debug)] pub(crate) struct InMemStorage { - block_store: InMemBlockStore, + block_store: Arc>>, } impl InMemStorage { pub(crate) fn new() -> Self { InMemStorage { - block_store: InMemBlockStore::new(), + block_store: Arc::new(RwLock::new(InMemBlockStore::new())), } } } @@ -88,12 +108,8 @@ impl InMemStorage { impl StorageType for InMemStorage { type BlockStore = InMemBlockStore; - fn block_store(&mut self) -> &Self::BlockStore { - &self.block_store - } - - fn block_store_mut(&mut self) -> &mut Self::BlockStore { - &mut self.block_store + fn block_store(&self) -> Arc> { + Arc::clone(&self.block_store) } } @@ -175,7 +191,7 @@ pub(crate) mod dummy { storage_effect_builder: EffectBuilder>, ) -> Multiple> { storage_effect_builder - .set_timeout(Duration::from_millis(100)) + .set_timeout(Duration::from_millis(10)) .event(|_| Event::Trigger) } diff --git a/src/components/storage/block.rs b/src/components/storage/block.rs index 7c5cea8298..1b558afdcb 100644 --- a/src/components/storage/block.rs +++ b/src/components/storage/block.rs @@ -2,8 +2,10 @@ use std::{fmt::Debug, hash::Hash}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -pub(crate) trait BlockType: Clone + Serialize + DeserializeOwned + Send + Debug { - type Name: Copy + Hash + PartialOrd + Ord + PartialEq + Eq + Debug + Send; +pub(crate) trait BlockType: + Clone + Serialize + DeserializeOwned + Send + Sync + Debug +{ + type Name: Copy + Hash + PartialOrd + Ord + PartialEq + Eq + Debug + Send + Sync; fn name(&self) -> &Self::Name; } diff --git a/src/components/storage/linear_block_store.rs b/src/components/storage/linear_block_store.rs index fb21ace3e9..2465d07ede 100644 --- a/src/components/storage/linear_block_store.rs +++ b/src/components/storage/linear_block_store.rs @@ -32,6 +32,7 @@ impl BlockStoreType for InMemBlockStore { type Block = B; fn put(&mut self, block: B) -> bool { + std::thread::sleep(std::time::Duration::from_millis(100)); if let Entry::Vacant(entry) = self.inner.entry(*block.name()) { entry.insert(block); return true; diff --git a/src/effect.rs b/src/effect.rs index 641329ea72..5f8ad4f65a 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -135,12 +135,24 @@ use crate::reactor::{EventQueueHandle, QueueKind, Reactor}; /// /// Provides methods allowing the creation of effects which need scheduled on the reactor's event /// queue, without giving direct access to this queue. -#[derive(Copy, Clone, Debug)] +#[derive(Debug)] pub struct EffectBuilder { event_queue_handle: EventQueueHandle, queue_kind: QueueKind, } +// Implement `Clone` and `Copy` manually, as `derive` will make it depend on `R` and `Ev` otherwise. +impl Clone for EffectBuilder { + fn clone(&self) -> Self { + EffectBuilder { + event_queue_handle: self.event_queue_handle, + queue_kind: self.queue_kind, + } + } +} + +impl Copy for EffectBuilder {} + impl EffectBuilder { pub(crate) fn new(event_queue_handle: EventQueueHandle, queue_kind: QueueKind) -> Self { EffectBuilder { From d3fce2ac9b5d373473cdb828a8205be539502f09 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Mon, 25 May 2020 12:40:30 +0100 Subject: [PATCH 4/6] NDRS-38: move RwLock into InMemBlockStore --- src/components/storage.rs | 30 +++++++------------- src/components/storage/linear_block_store.rs | 18 +++++++----- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/components/storage.rs b/src/components/storage.rs index 20b0303716..2ba1f8abea 100644 --- a/src/components/storage.rs +++ b/src/components/storage.rs @@ -9,7 +9,7 @@ use std::{ use futures::FutureExt; use smallvec::smallvec; -use tokio::{sync::RwLock, task}; +use tokio::task; use tracing::info; use crate::effect::{Effect, Multiple, Responder}; @@ -61,46 +61,38 @@ pub(crate) trait StorageType { Event::PutBlock { block, responder } => { let block_store = self.block_store(); let future = async move { - task::spawn_blocking(move || async move { - let mut block_store = block_store.write().await; - block_store.put(block) - }) - .await - .expect("should run") - .await + task::spawn_blocking(move || block_store.put(block)) + .await + .expect("should run") }; smallvec![future.then(|is_success| responder(is_success)).boxed()] } Event::GetBlock { name, responder } => { let block_store = self.block_store(); let future = async move { - task::spawn_blocking(move || async move { - let block_store = block_store.read().await; - block_store.get(&name) - }) - .await - .expect("should run") - .await + task::spawn_blocking(move || block_store.get(&name)) + .await + .expect("should run") }; smallvec![future.then(|block| responder(block)).boxed()] } } } - fn block_store(&self) -> Arc>; + fn block_store(&self) -> Arc; } // Concrete type of `Storage` - backed by in-memory block store only for now, but will eventually // also hold in-mem versions of wasm-store, deploy-store, etc. #[derive(Debug)] pub(crate) struct InMemStorage { - block_store: Arc>>, + block_store: Arc>, } impl InMemStorage { pub(crate) fn new() -> Self { InMemStorage { - block_store: Arc::new(RwLock::new(InMemBlockStore::new())), + block_store: Arc::new(InMemBlockStore::new()), } } } @@ -108,7 +100,7 @@ impl InMemStorage { impl StorageType for InMemStorage { type BlockStore = InMemBlockStore; - fn block_store(&self) -> Arc> { + fn block_store(&self) -> Arc { Arc::clone(&self.block_store) } } diff --git a/src/components/storage/linear_block_store.rs b/src/components/storage/linear_block_store.rs index 2465d07ede..9dfe6eff4a 100644 --- a/src/components/storage/linear_block_store.rs +++ b/src/components/storage/linear_block_store.rs @@ -1,4 +1,7 @@ -use std::collections::{hash_map::Entry, HashMap}; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::RwLock, +}; use super::BlockType; @@ -7,7 +10,7 @@ use super::BlockType; pub(crate) trait BlockStoreType { type Block: BlockType; - fn put(&mut self, block: Self::Block) -> bool; + fn put(&self, block: Self::Block) -> bool; fn get( &self, name: &<::Block as BlockType>::Name, @@ -17,13 +20,13 @@ pub(crate) trait BlockStoreType { // In-memory version of a block store. #[derive(Debug)] pub(crate) struct InMemBlockStore { - inner: HashMap, + inner: RwLock>, } impl InMemBlockStore { pub(crate) fn new() -> Self { InMemBlockStore { - inner: HashMap::new(), + inner: RwLock::new(HashMap::new()), } } } @@ -31,9 +34,10 @@ impl InMemBlockStore { impl BlockStoreType for InMemBlockStore { type Block = B; - fn put(&mut self, block: B) -> bool { + fn put(&self, block: B) -> bool { + let mut inner = self.inner.write().unwrap(); std::thread::sleep(std::time::Duration::from_millis(100)); - if let Entry::Vacant(entry) = self.inner.entry(*block.name()) { + if let Entry::Vacant(entry) = inner.entry(*block.name()) { entry.insert(block); return true; } @@ -41,6 +45,6 @@ impl BlockStoreType for InMemBlockStore { } fn get(&self, name: &B::Name) -> Option { - self.inner.get(name).cloned() + self.inner.read().unwrap().get(name).cloned() } } From 78806d6cd08f748dd118742359aafc6134c4613a Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Tue, 26 May 2020 18:12:01 +0100 Subject: [PATCH 5/6] NDRS-43: improve logging --- Cargo.lock | 14 ++++- Cargo.toml | 1 + src/app/config.rs | 105 +++++++++++++++++++++++++++++++- src/components/storage.rs | 27 +++++++- src/components/storage/block.rs | 15 ++++- src/reactor.rs | 4 +- src/reactor/validator.rs | 9 +-- 7 files changed, 159 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2dbbc35bba..4461b6139d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -18,6 +18,15 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi 0.3.8", +] + [[package]] name = "anyhow" version = "1.0.31" @@ -63,6 +72,7 @@ checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" name = "casper-node" version = "0.1.0" dependencies = [ + "ansi_term 0.12.1", "anyhow", "displaydoc", "either", @@ -116,7 +126,7 @@ version = "2.33.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" dependencies = [ - "ansi_term", + "ansi_term 0.11.0", "atty", "bitflags", "strsim", @@ -1003,7 +1013,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d53c40489aa69c9aed21ff483f26886ca8403df33bdc2d2f87c60c1617826d2" dependencies = [ - "ansi_term", + "ansi_term 0.11.0", "chrono", "lazy_static", "matchers", diff --git a/Cargo.toml b/Cargo.toml index 85076b515d..8e862cf824 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ publish = false # Prevent accidental `cargo publish` for now. default-run = "casper-node" [dependencies] +ansi_term = "0.12.1" anyhow = "1.0.28" displaydoc = "0.1.6" either = "1.5.3" diff --git a/src/app/config.rs b/src/app/config.rs index a5b75141c7..ad11913e57 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -17,11 +17,21 @@ //! * `Default` is implemented (derived or manually) with sensible defaults, and //! * it is completely documented. -use std::{fs, io, path::Path}; +use std::{fmt, fs, io, path::Path}; +use ansi_term::Style; use anyhow::Context; use serde::{Deserialize, Serialize}; -use tracing::{debug, Level}; +use tracing::{debug, Event, Level, Subscriber}; +use tracing_subscriber::{ + fmt::{ + format, + time::{FormatTime, SystemTime}, + FmtContext, FormatEvent, FormatFields, + }, + prelude::*, + registry::LookupSpan, +}; use casper_node::SmallNetworkConfig; @@ -60,17 +70,97 @@ impl Default for Log { } } +/// This is used to implement tracing's `FormatEvent` so that we can customize the way tracing +/// events are formatted. +struct FmtEvent {} + +impl FormatEvent for FmtEvent +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + writer: &mut dyn fmt::Write, + event: &Event<'_>, + ) -> fmt::Result { + let meta = event.metadata(); + + let style = Style::new().dimmed(); + write!(writer, "{}", style.prefix())?; + SystemTime.format_time(writer)?; + write!(writer, "{}", style.suffix())?; + + let colour = log_level::colour(meta.level()); + write!( + writer, + " {}{:<6}{}", + colour.prefix(), + meta.level().to_string(), + colour.suffix() + )?; + + // TODO - enable outputting spans. See + // https://github.com/tokio-rs/tracing/blob/21f28f74/tracing-subscriber/src/fmt/format/mod.rs#L667-L695 + // for details. + // + // let full_ctx = FullCtx::new(&ctx); + // write!(writer, "{}", full_ctx)?; + + let module = meta + .module_path() + // .unwrap_or_default() + // .splitn(2, "::") + // .skip(1) + // .next() + .unwrap_or_default(); + + let file = meta + .file() + .unwrap_or_default() + .rsplitn(2, '/') + .next() + .unwrap_or_default(); + + let line = meta.line().unwrap_or_default(); + + write!( + writer, + "{}[{} {}:{}]{} ", + style.prefix(), + module, + file, + line, + style.suffix() + )?; + + ctx.format_fields(writer, event)?; + writeln!(writer) + } +} + impl Log { /// Initializes logging system based on settings in configuration. /// /// Will setup logging as described in this configuration for the whole application. This /// function should only be called once during the lifetime of the application. pub fn setup_logging(&self) -> anyhow::Result<()> { + let formatter = format::debug_fn(|writer, field, value| { + if field.name() == "message" { + write!(writer, "{:?}", value) + } else { + write!(writer, "{}={:?}", field, value) + } + }) + .delimited("; "); // Setup a new tracing-subscriber writing to `stderr` for logging. tracing::subscriber::set_global_default( tracing_subscriber::fmt() .with_writer(io::stderr) .with_max_level(self.level.clone()) + .fmt_fields(formatter) + .event_format(FmtEvent {}) .finish(), )?; debug!("debug output enabled"); @@ -98,6 +188,7 @@ pub fn to_string(cfg: &Config) -> anyhow::Result { mod log_level { use std::str::FromStr; + use ansi_term::Colour; use serde::{self, de::Error, Deserialize, Deserializer, Serializer}; use tracing::Level; @@ -116,4 +207,14 @@ mod log_level { Level::from_str(s.as_str()).map_err(Error::custom) } + + pub(super) fn colour(value: &Level) -> Colour { + match *value { + Level::TRACE => Colour::Purple, + Level::DEBUG => Colour::Blue, + Level::INFO => Colour::Green, + Level::WARN => Colour::Yellow, + Level::ERROR => Colour::Red, + } + } } diff --git a/src/components/storage.rs b/src/components/storage.rs index 2ba1f8abea..1e73ab001a 100644 --- a/src/components/storage.rs +++ b/src/components/storage.rs @@ -3,7 +3,7 @@ mod linear_block_store; use std::{ collections::HashSet, - fmt::{self, Debug, Formatter}, + fmt::{self, Debug, Display, Formatter}, sync::Arc, }; @@ -46,6 +46,15 @@ impl Debug for Event { } } +impl Display for Event { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + match self { + Event::PutBlock { block, .. } => write!(formatter, "put {}", block), + Event::GetBlock { name, .. } => write!(formatter, "get {}", name), + } + } +} + // Trait which will handle management of the various storage sub-components. // // If this trait is ultimately only used for testing scenarios, we shouldn't need to expose it to @@ -124,6 +133,22 @@ pub(crate) mod dummy { GetBlock(u8, Option), } + impl Display for Event { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + match self { + Event::Trigger => write!(formatter, "Trigger"), + Event::PutBlockSucceeded(name) => write!(formatter, "PutBlockSucceeded({})", name), + Event::PutBlockFailed(name) => write!(formatter, "PutBlockFailed({})", name), + Event::GetBlock(name, maybe_block) => write!( + formatter, + "GetBlock {{ {}, {} }}", + name, + maybe_block.is_some() + ), + } + } + } + #[derive(Debug)] pub(crate) struct StorageConsumer { stored_blocks_names: HashSet, diff --git a/src/components/storage/block.rs b/src/components/storage/block.rs index 1b558afdcb..52207b3a36 100644 --- a/src/components/storage/block.rs +++ b/src/components/storage/block.rs @@ -1,11 +1,14 @@ -use std::{fmt::Debug, hash::Hash}; +use std::{ + fmt::{self, Debug, Display, Formatter}, + hash::Hash, +}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; pub(crate) trait BlockType: - Clone + Serialize + DeserializeOwned + Send + Sync + Debug + Clone + Serialize + DeserializeOwned + Send + Sync + Debug + Display { - type Name: Copy + Hash + PartialOrd + Ord + PartialEq + Eq + Debug + Send + Sync; + type Name: Copy + Hash + PartialOrd + Ord + PartialEq + Eq + Debug + Display + Send + Sync; fn name(&self) -> &Self::Name; } @@ -29,3 +32,9 @@ impl BlockType for CLBlock { &self.name } } + +impl Display for CLBlock { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + write!(formatter, "Block({})", self.name) + } +} diff --git a/src/reactor.rs b/src/reactor.rs index 36d8338bea..5c919a9421 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -172,8 +172,8 @@ async fn launch(cfg: SmallNetworkConfig) -> anyhow::Result<()> { let (event, q) = scheduler.pop().await; // We log events twice, once in display and once in debug mode. - debug!(%event, ?q, "event"); - trace!(?event, ?q, "event"); + debug!(%event, ?q); + trace!(?event, ?q); // Dispatch the event, then execute the resulting effect. let effects = reactor.dispatch_event(scheduler, event); diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index 0ab9c93ba7..9cb6ac1379 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -94,12 +94,9 @@ impl reactor::Reactor for Reactor { impl Display for Event { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - Event::Network(ev) => write!(f, "network: {}", ev), - // TODO - impl Display for storage::Event - // Event::Storage(ev) => write!(f, "storage: {}", ev), - // Event::StorageConsumer(ev) => write!(f, "storage-consumer: {}", ev), - Event::Storage(_) => write!(f, "storage event"), - Event::StorageConsumer(_) => write!(f, "storage-consumer event"), + Event::Network(ev) => write!(f, "Network({})", ev), + Event::Storage(ev) => write!(f, "Storage({})", ev), + Event::StorageConsumer(ev) => write!(f, "StorageConsumer({})", ev), } } } From f2fc5d787d26b2269ebc1d3efcee2a38d80a3f77 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Thu, 28 May 2020 12:35:56 +0100 Subject: [PATCH 6/6] NDRS-42: address review comments --- src/app/config.rs | 31 +++++++++------------- src/components/storage.rs | 36 ++++++++++++++----------- src/effect.rs | 56 +++++++++++++++++++++++++++------------ src/lib.rs | 2 +- src/reactor.rs | 6 ++--- src/reactor/validator.rs | 6 ++--- src/tls.rs | 2 ++ 7 files changed, 81 insertions(+), 58 deletions(-) diff --git a/src/app/config.rs b/src/app/config.rs index ad11913e57..f7a4b35318 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -33,7 +33,7 @@ use tracing_subscriber::{ registry::LookupSpan, }; -use casper_node::SmallNetworkConfig; +use casper_node::Config as SmallNetworkConfig; /// Root configuration. #[derive(Debug, Deserialize, Serialize)] @@ -92,13 +92,13 @@ where SystemTime.format_time(writer)?; write!(writer, "{}", style.suffix())?; - let colour = log_level::colour(meta.level()); + let color = log_level::color(meta.level()); write!( writer, " {}{:<6}{}", - colour.prefix(), + color.prefix(), meta.level().to_string(), - colour.suffix() + color.suffix() )?; // TODO - enable outputting spans. See @@ -108,13 +108,7 @@ where // let full_ctx = FullCtx::new(&ctx); // write!(writer, "{}", full_ctx)?; - let module = meta - .module_path() - // .unwrap_or_default() - // .splitn(2, "::") - // .skip(1) - // .next() - .unwrap_or_default(); + let module = meta.module_path().unwrap_or_default(); let file = meta .file() @@ -154,6 +148,7 @@ impl Log { } }) .delimited("; "); + // Setup a new tracing-subscriber writing to `stderr` for logging. tracing::subscriber::set_global_default( tracing_subscriber::fmt() @@ -188,7 +183,7 @@ pub fn to_string(cfg: &Config) -> anyhow::Result { mod log_level { use std::str::FromStr; - use ansi_term::Colour; + use ansi_term::Color; use serde::{self, de::Error, Deserialize, Deserializer, Serializer}; use tracing::Level; @@ -208,13 +203,13 @@ mod log_level { Level::from_str(s.as_str()).map_err(Error::custom) } - pub(super) fn colour(value: &Level) -> Colour { + pub(super) fn color(value: &Level) -> Color { match *value { - Level::TRACE => Colour::Purple, - Level::DEBUG => Colour::Blue, - Level::INFO => Colour::Green, - Level::WARN => Colour::Yellow, - Level::ERROR => Colour::Red, + Level::TRACE => Color::Purple, + Level::DEBUG => Color::Blue, + Level::INFO => Color::Green, + Level::WARN => Color::Yellow, + Level::ERROR => Color::Red, } } } diff --git a/src/components/storage.rs b/src/components/storage.rs index 1e73ab001a..a80f12000b 100644 --- a/src/components/storage.rs +++ b/src/components/storage.rs @@ -74,7 +74,7 @@ pub(crate) trait StorageType { .await .expect("should run") }; - smallvec![future.then(|is_success| responder(is_success)).boxed()] + smallvec![future.then(|is_success| responder.call(is_success)).boxed()] } Event::GetBlock { name, responder } => { let block_store = self.block_store(); @@ -83,7 +83,7 @@ pub(crate) trait StorageType { .await .expect("should run") }; - smallvec![future.then(|block| responder(block)).boxed()] + smallvec![future.then(|block| responder.call(block)).boxed()] } } } @@ -130,21 +130,22 @@ pub(crate) mod dummy { Trigger, PutBlockSucceeded(u8), PutBlockFailed(u8), - GetBlock(u8, Option), + GotBlock(u8, Option), } impl Display for Event { fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { match self { Event::Trigger => write!(formatter, "Trigger"), - Event::PutBlockSucceeded(name) => write!(formatter, "PutBlockSucceeded({})", name), - Event::PutBlockFailed(name) => write!(formatter, "PutBlockFailed({})", name), - Event::GetBlock(name, maybe_block) => write!( - formatter, - "GetBlock {{ {}, {} }}", - name, - maybe_block.is_some() - ), + Event::PutBlockSucceeded(name) => write!(formatter, "put {} succeeded", name), + Event::PutBlockFailed(name) => write!(formatter, "put {} failed", name), + Event::GotBlock(name, maybe_block) => { + if maybe_block.is_some() { + write!(formatter, "got block {}", name) + } else { + write!(formatter, "failed to get block {}", name) + } + } } } } @@ -186,15 +187,18 @@ pub(crate) mod dummy { } } Event::PutBlockSucceeded(name) => { - info!("Consumer knows {} has been stored.", name); + info!("consumer knows {} has been stored.", name); Self::set_timeout(storage_effect_builder) } Event::PutBlockFailed(name) => { - info!("Consumer knows {} has failed to be stored.", name); + info!("consumer knows {} has failed to be stored.", name); Self::set_timeout(storage_effect_builder) } - Event::GetBlock(name, maybe_block) => { - info!("Consumer received {:?}.", maybe_block); + Event::GotBlock(name, maybe_block) => { + match &maybe_block { + Some(block) => info!("consumer got {:?}", block), + None => info!("consumer failed to get {}.", name), + } assert_eq!( maybe_block.is_some(), self.stored_blocks_names.contains(&name) @@ -234,7 +238,7 @@ pub(crate) mod dummy { ) -> Multiple> { storage_effect_builder .make_request(move |responder| super::Event::GetBlock { name, responder }) - .event(move |maybe_block| Event::GetBlock(name, maybe_block)) + .event(move |maybe_block| Event::GotBlock(name, maybe_block)) } } } diff --git a/src/effect.rs b/src/effect.rs index 5f8ad4f65a..ffd5d5f895 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -36,6 +36,7 @@ //! effects to turn them into events should also be kept brief. use std::{ + fmt::{self, Debug, Display, Formatter}, future::Future, time::{Duration, Instant}, }; @@ -47,9 +48,6 @@ use tracing::error; /// A boxed future that produces one or more events. pub type Effect = BoxFuture<'static, Multiple>; -/// A boxed closure which returns an [`Effect`](type.Effect.html). -pub type Responder = Box Effect + Send>; - /// Intended to hold a small collection of [`Effect`](type.Effect.html)s. /// /// Stored in a `SmallVec` to avoid allocations in case there are less than three items grouped. The @@ -58,6 +56,42 @@ pub type Responder = Box Effect + Send>; /// the same size as an empty vec, which is two pointers. pub type Multiple = SmallVec<[T; 2]>; +/// A boxed closure which returns an [`Effect`](type.Effect.html). +pub struct Responder(Box Effect + Send>); + +impl Responder { + fn new(sender: oneshot::Sender) -> Self { + Responder(Box::new(move |value| { + async move { + if sender.send(value).is_err() { + error!("could not send response to request down oneshot channel") + } + smallvec![] + } + .boxed() + })) + } +} + +impl Responder { + /// Invoke the wrapped closure, passing in `data`. + pub fn call(self, data: T) -> Effect { + self.0(data) + } +} + +impl Debug for Responder { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + write!(formatter, "responder") + } +} + +impl Display for Responder { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + Debug::fmt(self, formatter) + } +} + /// Effect extension for futures, used to convert futures into actual effects. pub trait EffectExt: Future + Send { /// Finalizes a future into an effect that returns an event. @@ -170,7 +204,7 @@ impl EffectBuilder { /// Creates a request and response pair. /// - /// This creates and enqueues a request Event by invoking the provided `create_request_event` + /// This creates and enqueues a request event by invoking the provided `create_request_event` /// closure, having first created the responder required in the form of a oneshot channel. pub async fn make_request(self, create_request_event: F) -> T where @@ -181,7 +215,7 @@ impl EffectBuilder { let (sender, receiver) = oneshot::channel(); // Create response function. - let responder = create_responder(sender); + let responder = Responder::new(sender); // Now inject the request event into the event loop. let request_event = create_request_event(responder); @@ -196,15 +230,3 @@ impl EffectBuilder { }) } } - -fn create_responder(sender: oneshot::Sender) -> Responder { - Box::new(move |value| { - async move { - if sender.send(value).is_err() { - error!("could not send response to request down oneshot channel") - } - smallvec![] - } - .boxed() - }) -} diff --git a/src/lib.rs b/src/lib.rs index e323d7ae02..62ece4c687 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,5 +28,5 @@ pub mod reactor; pub mod tls; mod utils; -pub use components::small_network::Config as SmallNetworkConfig; +pub use components::small_network::Config; pub(crate) use components::small_network::{self, SmallNetwork}; diff --git a/src/reactor.rs b/src/reactor.rs index 5c919a9421..f6727f3618 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -38,7 +38,7 @@ use tracing::{debug, info, trace, warn}; use crate::{ effect::{Effect, Multiple}, utils::{self, WeightedRoundRobin}, - SmallNetworkConfig, + Config, }; pub use queue_kind::QueueKind; @@ -132,7 +132,7 @@ pub trait Reactor: Sized { /// /// If any instantiation fails, an error is returned. fn new( - cfg: SmallNetworkConfig, + cfg: Config, scheduler: &'static Scheduler, ) -> anyhow::Result<(Self, Multiple>)>; } @@ -145,7 +145,7 @@ pub trait Reactor: Sized { /// /// Errors are returned only if component initialization fails. #[inline] -async fn launch(cfg: SmallNetworkConfig) -> anyhow::Result<()> { +async fn launch(cfg: Config) -> anyhow::Result<()> { let event_size = mem::size_of::(); // Check if the event is of a reasonable size. This only emits a runtime warning at startup // right now, since storage size of events is not an issue per se, but copying might be diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index 9cb6ac1379..836a355c01 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -10,7 +10,7 @@ use crate::{ components::storage::{self, Storage, StorageType}, effect::{Effect, EffectBuilder, Multiple}, reactor::{self, EventQueueHandle, QueueKind, Scheduler}, - small_network, SmallNetwork, SmallNetworkConfig, + small_network, Config, SmallNetwork, }; /// Top-level event for the reactor. @@ -36,7 +36,7 @@ impl reactor::Reactor for Reactor { type Event = Event; fn new( - cfg: SmallNetworkConfig, + cfg: Config, scheduler: &'static Scheduler, ) -> anyhow::Result<(Self, Multiple>)> { let (net, net_effects) = @@ -114,6 +114,6 @@ impl Display for Message { /// `launch` will leak memory on start for global structures each time it is called. /// /// Errors are returned only if component initialization fails. -pub async fn launch(cfg: SmallNetworkConfig) -> anyhow::Result<()> { +pub async fn launch(cfg: Config) -> anyhow::Result<()> { super::launch::(cfg).await } diff --git a/src/tls.rs b/src/tls.rs index e0af772e4f..08d13ad8f4 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -52,6 +52,8 @@ use openssl::{ use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize, Serializer}; use thiserror::Error; +// This is inside a private module so that the generated `BigArray` does not form part of this +// crate's public API, and hence also doesn't appear in the rustdocs. mod big_array { use serde_big_array::big_array;