From 42c7853d98355d63502e72a882691c3be3f0aceb Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Fri, 15 May 2020 14:14:57 +0200 Subject: [PATCH 1/9] Add `effects` module --- Cargo.lock | 159 ++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + src/effect.rs | 147 +++++++++++++++++++++++++++++++++++ src/main.rs | 2 + src/util.rs | 6 ++ src/util/zero_one_many.rs | 71 +++++++++++++++++ 6 files changed, 387 insertions(+) create mode 100644 src/effect.rs create mode 100644 src/util.rs create mode 100644 src/util/zero_one_many.rs diff --git a/Cargo.lock b/Cargo.lock index 6046ca8ecf..dc98bdf18c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,6 +43,8 @@ name = "casper-node" version = "0.1.0" dependencies = [ "anyhow", + "either", + "futures", "serde", "structopt", "tokio", @@ -64,6 +66,107 @@ dependencies = [ "vec_map", ] +[[package]] +name = "either" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" + +[[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" + +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" + +[[package]] +name = "futures-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" + +[[package]] +name = "futures-task" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + [[package]] name = "heck" version = "0.3.1" @@ -94,6 +197,12 @@ version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" +[[package]] +name = "memchr" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" + [[package]] name = "num_cpus" version = "1.13.0" @@ -104,12 +213,44 @@ dependencies = [ "libc", ] +[[package]] +name = "once_cell" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" + +[[package]] +name = "pin-project" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81d480cb4e89522ccda96d0eed9af94180b7a5f93fb28f66e1fd7d68431663d1" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a82996f11efccb19b685b14b5df818de31c1edcee3daa256ab5775dd98e72feb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7505eeebd78492e0f6108f7171c4948dbb120ee8119d9d77d0afa5469bef67f" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro-error" version = "1.0.2" @@ -136,6 +277,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63" + +[[package]] +name = "proc-macro-nested" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" + [[package]] name = "proc-macro2" version = "1.0.12" @@ -174,6 +327,12 @@ dependencies = [ "syn", ] +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + [[package]] name = "strsim" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 99d0412558..d4ccd507e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ license-file = "LICENSE" [dependencies] anyhow = "1.0.28" +either = "1.5.3" +futures = "0.3.5" serde = { version = "1.0.110", features = ["derive"] } structopt = "0.3.14" tokio = { version = "0.2.20", features = ["macros", "rt-threaded"] } diff --git a/src/effect.rs b/src/effect.rs new file mode 100644 index 0000000000..f820adaac2 --- /dev/null +++ b/src/effect.rs @@ -0,0 +1,147 @@ +//! Effects subsystem. +//! +//! Effects describe things that the creator of the effect intends to happen +//! and produce a value upon completion. They are, in fact, futures. +//! +//! A boxed future returning an event is typed as an `Effect`, where `Ev` is the event's type. +//! +//! ## Using effects +//! +//! To create an effect, an events factory is used that implements one or more of the factory +//! traits of this module. For example, given an events factory `eff`, we can create a +//! `set_timeout` future and turn it into an effect: +//! +//! ``` +//! # use std::time; +//! use crate::effect::EffectExt; +//! +//! enum Event { +//! ThreeSecondsElapsed(time::Duration) +//! } +//! +//! eff.set_timeout(time::Duration::from_secs(3)) +//! .event(Event::ThreeSecondsElapsed) +//! ``` +//! +//! This example will produce an effect that, after three seconds, creates an +//! `Event::ThreeSecondsElapsed`. Note that effects do nothing on their own, they need to be passed +//! to the `Reactor` (see `reactor` module) to be executed. +//! +//! ## Chaining futures and effects +//! +//! Effects are built from futures, which can be combined before being finalized +//! into an effect. However, only one effect can be created as the end result +//! of such a chain. +//! +//! It is possible to create an effect from multiple effects being run in parallel using `.also`: +//! +//! ``` +//! # use std::time; +//! use crate::effect::{EffectExt, EffectAlso}; +//! +//! enum Event { +//! ThreeSecondsElapsed(time::Duration), +//! FiveSecondsElapsed(time::Duration), +//! } +//! +//! // This effect produces a single event after five seconds: +//! eff.set_timeout(time::Duration::from_secs(3)) +//! .then(|_| eff.set_timeout(time::Duration::from_secs(2)) +//! .event(Event::FiveSecondsElapsed); +//! +//! // Here, two effects are run in parallel, resulting in two events: +//! eff.set_timeout(time::Duration::from_secs(3)) +//! .event(Event::ThreeSecondsElapsed) +//! .also(eff.set_timeout(time::Duration::from_secs(5)) +//! .event(Event::FiveSecondsElapsed)); +//! ``` +//! +//! ## Arbitrary effects +//! +//! While it is technically possible to turn any future into an effect, it is advisable to only use +//! the effects explicitly listed in this module through traits to create them. Post-processing on +//! effects to turn them into events should also be kept brief. + +use crate::util::zero_one_many::ZeroOneMany; +use futures::future::BoxFuture; +use futures::FutureExt; +use std::future::Future; +use std::time; + +/// Effect type. +/// +/// Effects are just boxed futures that produce one or more events. +pub type Effect = BoxFuture<'static, ZeroOneMany>; + +/// Request responder. +/// +/// Responders are closures used to deliver a value to a pending request. +pub type Responder = Box Effect + Send>; + +/// Effect extension for futures. +/// +/// Used to convert futures into actual effects. +pub trait EffectExt: Future + Send { + /// Finalize a future into an effect that returns an event. + /// + /// The passed in function `f` is used to translate the resulting value from an effect into + fn event(self, f: F) -> Effect + where + F: FnOnce(Self::Output) -> U + 'static + Send, + U: 'static, + Self: Sized; + + /// Finalize a future into an effect that runs but drops the result. + fn ignore(self) -> Effect; +} + +/// Parallel execution combinator for finalized effects. +pub trait EffectAlso: Future + Send { + /// Create new effect that runs multiple effects in parallel. + fn also(self, other: Self) -> Self; +} + +impl EffectExt for T +where + T: Future + Send + 'static + Sized, +{ + fn event(self, f: F) -> Effect + where + F: FnOnce(Self::Output) -> U + 'static + Send, + U: 'static, + { + self.map(f).map(|value| ZeroOneMany::One(value)).boxed() + } + + fn ignore(self) -> Effect { + self.map(|_| ZeroOneMany::Zero).boxed() + } +} + +impl EffectAlso for Effect +where + T: Send + 'static, +{ + fn also(self, other: Self) -> Self { + async move { + let (a, b) = futures::join!(self, other); + a.combine_unordered(b) + } + .boxed() + } +} + +/// Core effects. +pub trait Core { + /// Do not do anything. + /// + /// Immediately completes, can be used to trigger an event. + fn do_nothing(self) -> BoxFuture<'static, ()>; + + /// Set a timeout. + /// + /// Once the timeout fires, it will return the actual elapsed time since the execution (not + /// creation!) of this effect. Event loops typically execute effects right after a called event + /// handling function completes. + fn set_timeout(self, timeout: time::Duration) -> BoxFuture<'static, time::Duration>; +} diff --git a/src/main.rs b/src/main.rs index 834ca6f4c4..006ed725c1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,8 @@ mod cli; mod config; +mod effect; +mod util; use structopt::StructOpt; diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000000..46934de674 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,6 @@ +//! Various utilities. +//! +//! The Generic functions that are not limited to a particular module, but are too small to warrant +//! being factored out into standalone crates. + +pub mod zero_one_many; diff --git a/src/util/zero_one_many.rs b/src/util/zero_one_many.rs new file mode 100644 index 0000000000..909635efff --- /dev/null +++ b/src/util/zero_one_many.rs @@ -0,0 +1,71 @@ +use std::{iter, vec}; + +/// A type that contains zero, one or multiple instances of a value. +/// +/// Optimization for when you expect to have a lot of `Vec`s that would have +/// zero or one element. Creating a `One` instance does not cause an allocation. +#[derive(Debug)] +pub enum ZeroOneMany { + /// Zero elements. + Zero, + + /// One element. + One(T), + + /// More than one element. + Many(Vec), +} + +impl ZeroOneMany { + /// Combine values. + /// + /// Does not preserve order. + pub fn combine_unordered(self, other: Self) -> Self { + match (self, other) { + (ZeroOneMany::Zero, r) => r, + (l, ZeroOneMany::Zero) => l, + (ZeroOneMany::One(val_l), ZeroOneMany::One(val_r)) => { + ZeroOneMany::Many(vec![val_l, val_r]) + } + (ZeroOneMany::One(val), ZeroOneMany::Many(mut vals)) + | (ZeroOneMany::Many(mut vals), ZeroOneMany::One(val)) => { + vals.push(val); + + ZeroOneMany::Many(vals) + } + (ZeroOneMany::Many(mut vals_l), ZeroOneMany::Many(vals_r)) => { + vals_l.extend(vals_r.into_iter()); + ZeroOneMany::Many(vals_l) + } + } + } + + /// Map all value(s). + #[inline] + pub fn map(self, f: F) -> ZeroOneMany + where + F: Fn(T) -> U, + { + match self { + ZeroOneMany::Zero => ZeroOneMany::Zero, + ZeroOneMany::One(value) => ZeroOneMany::One(f(value)), + ZeroOneMany::Many(values) => ZeroOneMany::Many(values.into_iter().map(f).collect()), + } + } +} + +impl IntoIterator for ZeroOneMany { + type Item = T; + type IntoIter = either::Either< + iter::Empty, + either::Either, vec::IntoIter>, + >; + + fn into_iter(self) -> Self::IntoIter { + match self { + ZeroOneMany::Zero => either::Left(iter::empty()), + ZeroOneMany::One(val) => either::Right(either::Left(iter::once(val))), + ZeroOneMany::Many(vals) => either::Right(either::Right(vals.into_iter())), + } + } +} From 4feb20b76189396e455fc5e6a87613795c0b7036 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Fri, 15 May 2020 14:28:27 +0200 Subject: [PATCH 2/9] Improve documentation with links and minor changes --- src/effect.rs | 7 ++++--- src/main.rs | 4 ++-- src/util/zero_one_many.rs | 2 ++ 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/effect.rs b/src/effect.rs index f820adaac2..bae8f90879 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -1,9 +1,10 @@ //! Effects subsystem. //! -//! Effects describe things that the creator of the effect intends to happen -//! and produce a value upon completion. They are, in fact, futures. +//! Effects describe things that the creator of the effect intends to happen, +//! producing a value upon completion. They are, in fact, futures. //! -//! A boxed future returning an event is typed as an `Effect`, where `Ev` is the event's type. +//! A boxed, pinned future returning an event is called an effect and typed as an `Effect`, +//! where `Ev` is the event's type. //! //! ## Using effects //! diff --git a/src/main.rs b/src/main.rs index 006ed725c1..2d3c87c560 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ //! //! ## Application structure //! -//! While the `main` function is the central entrypoint for the node application, its core event +//! While the [`main`](fn.main.html) function is the central entrypoint for the node application, its core event //! loop is found inside the reactor. To get a tour of the sourcecode, be sure to run //! `cargo doc --open`. @@ -16,7 +16,7 @@ mod util; use structopt::StructOpt; -/// Parse command-line arguments and run application. +/// Parse [command-line arguments](cli/index.html) and run application. #[tokio::main] pub async fn main() -> anyhow::Result<()> { // Parse CLI args and run selected subcommand. diff --git a/src/util/zero_one_many.rs b/src/util/zero_one_many.rs index 909635efff..c05b9ea818 100644 --- a/src/util/zero_one_many.rs +++ b/src/util/zero_one_many.rs @@ -1,3 +1,5 @@ +//! Zero, one or many item collection. + use std::{iter, vec}; /// A type that contains zero, one or multiple instances of a value. From 700787932f29e1a6e2d73e0abff81874347b108f Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Fri, 15 May 2020 15:15:33 +0200 Subject: [PATCH 3/9] Add `round-robin` utility module with scheduler --- Cargo.toml | 2 +- src/util.rs | 1 + src/util/round_robin.rs | 153 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 src/util/round_robin.rs diff --git a/Cargo.toml b/Cargo.toml index d4ccd507e8..f2e55b2672 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,5 +14,5 @@ either = "1.5.3" futures = "0.3.5" serde = { version = "1.0.110", features = ["derive"] } structopt = "0.3.14" -tokio = { version = "0.2.20", features = ["macros", "rt-threaded"] } +tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync"] } toml = "0.5.6" diff --git a/src/util.rs b/src/util.rs index 46934de674..b478bcf1e2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -3,4 +3,5 @@ //! The Generic functions that are not limited to a particular module, but are too small to warrant //! being factored out into standalone crates. +pub mod round_robin; pub mod zero_one_many; diff --git a/src/util/round_robin.rs b/src/util/round_robin.rs new file mode 100644 index 0000000000..c83bec328e --- /dev/null +++ b/src/util/round_robin.rs @@ -0,0 +1,153 @@ +//! Round-robin scheduling. +//! +//! This module implements a weighted round-robin scheduler that ensures no deadlocks occur, but +//! still allows prioriting events from one source over another. The module uses `tokio`s +//! synchronization primitives under the hood. + +use std::collections::{HashMap, VecDeque}; +use std::hash::Hash; +use std::num::NonZeroUsize; +use tokio::sync; + +/// Weighted round-robin scheduler. +/// +/// The weighted round-robin scheduler keeps queues internally and returns an item from a queue +/// when asked. Each queue is assigned a weight, which is simply the amount of items maximally +/// returned from it before moving on to the next queue. +/// +/// If a queue is empty, it is skipped until the next round. Queues are processed in the order they +/// are passed to the constructor function. +/// +/// The scheduler keeps track internally which queue needs to be popped next. +#[derive(Debug)] +pub struct WeightedRoundRobin { + /// Current iteration state. + state: sync::Mutex>, + + /// A list of slots that are round-robin'd. + slots: Vec>, + + /// Actual queues. + queues: HashMap>>, + + /// Number of items in all queues combined. + total: sync::Semaphore, +} + +/// The inner state of the queue iteration. +#[derive(Copy, Clone, Debug)] +struct IterationState { + /// The currently active slot. + /// + /// Once it has no tickets left, the next slot is loaded. + active_slot: Slot, + + /// The position of the active slot. Used to calculate the next slot. + active_slot_idx: usize, +} + +/// An internal slot in the round-robin scheduler. +/// +/// A slot marks the scheduling position, i.e. which queue we are currently +/// polling and how many tickets it has left before the next one is due. +#[derive(Copy, Clone, Debug)] +struct Slot { + /// The key, identifying a queue. + key: K, + + /// Number of items to return before moving on to the next queue. + tickets: usize, +} + +impl WeightedRoundRobin +where + K: Copy + Clone + Eq + Hash, +{ + /// Create new weighted round-robin scheduler. + /// + /// Creates a queue for each pair given in `weights`. The second component + /// of each `weight` is the number of times to return items from one + /// queue before moving on to the next one. + pub fn new(weights: Vec<(K, NonZeroUsize)>) -> Self { + assert!(!weights.is_empty(), "must provide at least one slot"); + + let queues = weights + .iter() + .map(|(idx, _)| (*idx, sync::Mutex::new(VecDeque::new()))) + .collect(); + let slots: Vec> = weights + .into_iter() + .map(|(key, tickets)| Slot { + key, + tickets: tickets.get(), + }) + .collect(); + let active_slot = slots[0]; + + WeightedRoundRobin { + state: sync::Mutex::new(IterationState { + active_slot, + active_slot_idx: 0, + }), + slots, + queues, + total: sync::Semaphore::new(0), + } + } + + /// Push an item to a queue identified by key. + /// + /// ## Panics + /// + /// Panics if the queue identified by key `queue` does not exist. + pub async fn push(&self, item: I, queue: K) { + self.queues + .get(&queue) + .expect("tried to push to non-existant queue") + .lock() + .await + .push_back(item); + + // We increase the item count after we've put the item into the queue. + self.total.add_permits(1); + } + + /// Return the next item from queue. + /// + /// Returns `None` if the queue is empty or an internal error occurred. The + /// latter should never happen. + pub async fn pop(&self) -> (I, K) { + self.total.acquire().await.forget(); + + let mut inner = self.state.lock().await; + + // We know we have at least one item in a queue. + loop { + let mut current_queue = self + .queues + // The queue disappearing should never happen. + .get(&inner.active_slot.key) + .expect("the queue disappeared. this should not happen") + .lock() + .await; + + if inner.active_slot.tickets == 0 || current_queue.is_empty() { + // Go to next queue slot if we've exhausted the current queue. + inner.active_slot_idx = (inner.active_slot_idx + 1) % self.slots.len(); + inner.active_slot = self.slots[inner.active_slot_idx]; + continue; + } + + // We have hit a queue that is not empty. Decrease tickets and pop. + inner.active_slot.tickets -= 1; + + break ( + current_queue + .pop_front() + // We hold the queue's lock and checked `is_empty` earlier. + .expect("item disappeared. this should not happen"), + inner.active_slot.key, + ); + } + } +} From 61459f6b31e3ee922024c3eafe5ffa6e482218f4 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 16 May 2020 11:48:22 +0200 Subject: [PATCH 4/9] Support configurable logging --- Cargo.toml | 2 ++ src/cli.rs | 2 +- src/config.rs | 75 ++++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 75 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2e55b2672..c9d65a92bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,5 @@ serde = { version = "1.0.110", features = ["derive"] } structopt = "0.3.14" tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync"] } toml = "0.5.6" +tracing = "0.1.14" +tracing-subscriber = "0.2.5" diff --git a/src/cli.rs b/src/cli.rs index ee60e0bc4a..77de05ac60 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -36,7 +36,7 @@ impl Cli { .transpose()? .unwrap_or_default(); - println!("{:?}", cfg); + cfg.log.setup_logging()?; } } diff --git a/src/config.rs b/src/config.rs index e42b450f19..45d452b71c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,14 +4,61 @@ //! sensible defaults. //! //! The `cli` offers an option to generate a configuration from defaults for editing. +//! +//! # Adding a configuration section +//! +//! When adding a section to the configuration, ensure that +//! +//! * it has an entry in the root configuration `Config`, +//! * `Default` is implemented (derived or manually) with sensible defaults, and +//! * it is completely documented. use anyhow::Context; use serde::{Deserialize, Serialize}; -use std::{fs, path}; +use std::{fs, io, path}; +use tracing::debug; -/// Root configuration +/// Root configuration. #[derive(Debug, Default, Deserialize, Serialize)] -pub struct Config {} +pub struct Config { + /// Log configuration. + pub log: Log, +} + +/// Log configuration. +#[derive(Debug, Deserialize, Serialize)] +pub struct Log { + /// Log level. + #[serde(with = "log_level")] + pub level: tracing::Level, +} + +impl Default for Log { + fn default() -> Self { + Log { + level: tracing::Level::INFO, + } + } +} + +impl Log { + /// Initialize logging system based on settings in configuration. + /// + /// Will setup logging as described in this configuration for the whole application. This + /// function should only be called once during the lifetime of the application. + pub fn setup_logging(&self) -> anyhow::Result<()> { + // Setup a new tracing-subscriber writing to `stderr` for logging. + tracing::subscriber::set_global_default( + tracing_subscriber::fmt() + .with_writer(io::stderr) + .with_max_level(self.level.clone()) + .finish(), + )?; + debug!("debug output enabled"); + + Ok(()) + } +} /// Loads a TOML-formatted configuration from a given file. pub fn load_from_file>(config_path: P) -> anyhow::Result { @@ -27,3 +74,25 @@ pub fn load_from_file>(config_path: P) -> anyhow::Result anyhow::Result { toml::to_string_pretty(cfg).with_context(|| "Failed to serialize default configuration") } + +/// Serialization/deserialization +mod log_level { + use serde::{self, Deserialize}; + use std::str::FromStr; + use tracing::Level; + + pub fn serialize(value: &Level, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(value.to_string().as_str()) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Level::from_str(s.as_str()).map_err(serde::de::Error::custom) + } +} From f7c83edd363c919b92fe3d96d4bd6dd411d89e6c Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 16 May 2020 14:45:35 +0200 Subject: [PATCH 5/9] Implement basic reactor --- Cargo.lock | 286 ++++++++++++++++++++++++++++++++++- Cargo.toml | 2 + src/cli.rs | 15 +- src/main.rs | 11 +- src/reactor.rs | 169 +++++++++++++++++++++ src/reactor/non_validator.rs | 1 + src/reactor/queue_kind.rs | 57 +++++++ src/reactor/validator.rs | 30 ++++ src/util.rs | 8 + 9 files changed, 563 insertions(+), 16 deletions(-) create mode 100644 src/reactor.rs create mode 100644 src/reactor/non_validator.rs create mode 100644 src/reactor/queue_kind.rs create mode 100644 src/reactor/validator.rs diff --git a/Cargo.lock b/Cargo.lock index dc98bdf18c..3895b10cf8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,14 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "aho-corasick" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" +dependencies = [ + "memchr", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -11,9 +20,20 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2494382e9ba43995f3c56359e518641f450f5c36feeb4632a75cde2ec297c867" +checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" + +[[package]] +name = "async-trait" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c4f3195085c36ea8d24d32b2f828d23296a9370a28aa39d111f6f16bef9f3b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "atty" @@ -26,12 +46,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "autocfg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" + [[package]] name = "bitflags" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" + [[package]] name = "bytes" version = "0.5.4" @@ -43,12 +75,33 @@ name = "casper-node" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "either", + "enum-iterator", "futures", "serde", "structopt", "tokio", "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "chrono" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" +dependencies = [ + "num-integer", + "num-traits", + "time", ] [[package]] @@ -72,6 +125,32 @@ version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" +[[package]] +name = "enum-iterator" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c79a6321a1197d7730510c7e3f6cb80432dfefecb32426de8cea0aa19b4bb8d7" +dependencies = [ + "enum-iterator-derive", +] + +[[package]] +name = "enum-iterator-derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e94aa31f7c0dc764f57896dc615ddd76fc13b0d5dca7eb6cc5e018a5a09ec06" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures" version = "0.3.5" @@ -185,6 +264,12 @@ dependencies = [ "libc", ] +[[package]] +name = "itoa" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" + [[package]] name = "lazy_static" version = "1.4.0" @@ -197,12 +282,49 @@ version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "matchers" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +[[package]] +name = "num-integer" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -291,9 +413,9 @@ checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" [[package]] name = "proc-macro2" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8872cf6f48eee44265156c111456a700ab3483686b3f96df4cf5481c89157319" +checksum = "53f5ffe53a6b28e37c9c1ce74893477864d64f74778a93a4beb43c8fa167f639" dependencies = [ "unicode-xid", ] @@ -307,6 +429,40 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex" +version = "1.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6020f034922e3194c711b82a627453881bc4682166cabb07134a10c26ba7692" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "regex-automata" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" +dependencies = [ + "byteorder", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" + +[[package]] +name = "ryu" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3d612bc64430efeb3f7ee6ef26d590dce0c43249217bddc62112540c7941e1" + [[package]] name = "serde" version = "1.0.110" @@ -327,12 +483,38 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sharded-slab" +version = "0.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06d5a3f5166fb5b42a5439f2eee8b9de149e235961e3eb21c5808fc3ea17ff3e" +dependencies = [ + "lazy_static", +] + [[package]] name = "slab" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +[[package]] +name = "smallvec" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" + [[package]] name = "strsim" version = "0.8.0" @@ -365,9 +547,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4696caa4048ac7ce2bcd2e484b3cef88c1004e41b8e945a277e2c25dc0b72060" +checksum = "1425de3c33b0941002740a420b1a906a350b88d08b82b2c8a01035a3f9447bac" dependencies = [ "proc-macro2", "quote", @@ -394,6 +576,25 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "tokio" version = "0.2.21" @@ -401,6 +602,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ "bytes", + "fnv", "num_cpus", "pin-project-lite", "tokio-macros", @@ -426,6 +628,78 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7c6b59d116d218cb2d990eb06b77b64043e0268ef7323aae63d8b30ae462923" +dependencies = [ + "cfg-if", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tracing-log" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d53c40489aa69c9aed21ff483f26886ca8403df33bdc2d2f87c60c1617826d2" +dependencies = [ + "ansi_term", + "chrono", + "lazy_static", + "matchers", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "tracing-core", + "tracing-log", + "tracing-serde", +] + [[package]] name = "unicode-segmentation" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index c9d65a92bb..b7fd30012a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,9 @@ license-file = "LICENSE" [dependencies] anyhow = "1.0.28" +async-trait = "0.1.31" either = "1.5.3" +enum-iterator = "0.6.0" futures = "0.3.5" serde = { version = "1.0.110", features = ["derive"] } structopt = "0.3.14" diff --git a/src/cli.rs b/src/cli.rs index 77de05ac60..aea99a76a7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -5,7 +5,7 @@ use std::{io, io::Write, path}; use structopt::StructOpt; -use crate::config; +use crate::{config, reactor}; // Note: The docstring on `Cli` is the help shown when calling the binary with `--help`. #[derive(Debug, StructOpt)] @@ -14,7 +14,10 @@ pub enum Cli { /// Generate a configuration file from defaults and dump it to stdout. GenerateConfig {}, /// Run the validator node. - RunValidator { + /// + /// Loads the configuration values from the given configuration file or uses defaults if not + /// given, then launches the reactor. + Validator { #[structopt(short, long, env)] /// Path to configuration file. config: Option, @@ -28,8 +31,10 @@ impl Cli { Cli::GenerateConfig {} => { let cfg_str = config::to_string(&Default::default())?; io::stdout().write_all(cfg_str.as_bytes())?; + + Ok(()) } - Cli::RunValidator { config } => { + Cli::Validator { config } => { // We load the specified config, if any, otherwise use defaults. let cfg = config .map(config::load_from_file) @@ -37,9 +42,9 @@ impl Cli { .unwrap_or_default(); cfg.log.setup_logging()?; + + reactor::launch::(cfg).await } } - - Ok(()) } } diff --git a/src/main.rs b/src/main.rs index 2d3c87c560..86864a5555 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,18 @@ //! # CasperLabs blockchain node //! -//! This crate contain the core application for the CasperLabs blockchain. Run with `--help` to -//! see available command-line arguments. +//! This crate contain the core application for the CasperLabs blockchain. Run with `--help` to see +//! available command-line arguments. //! //! ## Application structure //! -//! While the [`main`](fn.main.html) function is the central entrypoint for the node application, its core event -//! loop is found inside the reactor. To get a tour of the sourcecode, be sure to run -//! `cargo doc --open`. +//! While the [`main`](fn.main.html) function is the central entrypoint for the node application, +//! its core event loop is found inside the [reactor](reactor/index.html). To get a tour of the +//! sourcecode, be sure to run `cargo doc --open`. mod cli; mod config; mod effect; +mod reactor; mod util; use structopt::StructOpt; diff --git a/src/reactor.rs b/src/reactor.rs new file mode 100644 index 0000000000..9e43ce3418 --- /dev/null +++ b/src/reactor.rs @@ -0,0 +1,169 @@ +//! Reactor core. +//! +//! Any long running instance of the node application uses an event-dispatch pattern: Events are +//! generated and stored on an event queue, then processed one-by-one. This process happens inside +//! the *reactor*, which also exclusively holds the state of the application: +//! +//! 1. The reactor pops an event off of the queue. +//! 2. The event is dispatched by the reactor. Since the reactor holds mutable state, it can grant +//! any component that processes an event mutable, exclusive access to its state. +//! 3. Once the (synchronous) event processing has completed, the component returns an effect. +//! 4. The reactor spawns a task that executes these effects and eventually puts an event onto the +//! event queue. +//! 5. meanwhile go to 1. +//! +//! # Reactors +//! +//! There no single reactor, but a reactor for each application type, since it defines which +//! components are used and how they are wired up. The reactor defines the state by being a `struct` +//! of components, their initialization through the `Reactor::new` and a function to `dispatch` +//! events to components. +//! +//! With all these set up, a reactor can be `launch`ed, causing it to run indefinitely, processing +//! events. + +pub mod non_validator; +mod queue_kind; +pub mod validator; + +use crate::{config, effect, util}; +use async_trait::async_trait; +use std::{fmt, mem}; +use tracing::{debug, info, warn}; + +pub use queue_kind::Queue; + +/// Event queue handle +/// +/// The event queue handle is how almost all parts of the application interact with the reactor +/// outside of the normal event loop. It gives different parts a chance to schedule messages that +/// stem from things like external IO. +/// +/// It is also possible to schedule new events by directly processing effects. This allows re-use of +/// the existing code for handling particular effects, as adding events directly should be a matter +/// of last resort. +#[derive(Debug)] +pub struct EventQueueHandle(&'static util::round_robin::WeightedRoundRobin); + +// Copy and Clone need to be implemented manually, since `Ev` prevents derivation. +impl Copy for EventQueueHandle {} +impl Clone for EventQueueHandle { + fn clone(&self) -> Self { + EventQueueHandle(self.0) + } +} + +impl EventQueueHandle +where + Ev: Send + 'static, +{ + /// Create a new event queue handle. + fn new(round_robin: &'static util::round_robin::WeightedRoundRobin) -> Self { + EventQueueHandle(round_robin) + } + + /// Return the next event in the queue + /// + /// Awaits until there is an event, then returns it. + #[inline] + async fn next_event(&self) -> (Ev, Queue) { + self.0.pop().await + } + + /// Process an effect. + /// + /// This function simply spawns another task that will take of the effect + /// processing. + #[inline] + pub fn process_effect(self, effect: effect::Effect) { + let eq = self; + // TODO: Properly carry around priorities. + let queue = Default::default(); + tokio::spawn(async move { + for event in effect.await { + eq.schedule(event, queue).await; + } + }); + } + + /// Schedule an event in the given queue. + #[inline] + pub async fn schedule(&self, event: Ev, queue_kind: Queue) { + self.0.push(event, queue_kind).await + } +} + +/// Reactor core. +/// +/// Any reactor implements should implement this trait and be launched by the `launch` function. +#[async_trait] +pub trait Reactor: Sized { + // Note: We've gone for the `Sized` bound here, since we return an instance in `new`. As an + // alternative, `new` could return a boxed instance instead, removing this requirement. + + /// Event type associated with reactor. + /// + /// Defines what kind of event the reactor processes. + type Event: Send + fmt::Debug + 'static; + + /// Dispatch an event on the reactor. + /// + /// This function is typically only called by the reactor itself to dispatch an event. It is + /// safe to call regardless, but will cause the event to skip the queue and things like + /// accounting. + fn dispatch_event(&mut self, event: Self::Event) -> effect::Effect; + + /// Create a new instance of the reactor. + /// + /// This method creates the full state, which consists of all components, and returns a reactor + /// instances along with the effects the components generated upon instantiation. + /// + /// If any instantiation fails, an error is returned. + fn new( + cfg: &config::Config, + eq: EventQueueHandle, + ) -> anyhow::Result<(Self, Vec>)>; +} + +/// Run a reactor. +/// +/// Start the reactor and associated background tasks, then enter main the event processing loop. +/// +/// `launch` will leak memory on start for global structures each time it is called. +/// +/// Errors are returned only if component initialization fails. +#[inline] +pub async fn launch(cfg: config::Config) -> anyhow::Result<()> { + let event_size = mem::size_of::(); + // Check if the event is of a reasonable size. This only emits a runtime warning at startup + // right now, since storage size of events is not an issue per se, but copying might be + // expensive if events get too large. + if event_size <= 4 * mem::size_of::() { + warn!( + "event size is {} bytes, consider reducing it or boxing", + event_size + ); + } + + let scheduler = util::round_robin::WeightedRoundRobin::::new(Queue::weights()); + + // Create a new event queue for this reactor run. + let eq = EventQueueHandle::new(util::leak(scheduler)); + + let (mut reactor, initial_effects) = R::new(&cfg, eq)?; + + // Run all effects from component instantiation. + for effect in initial_effects { + eq.process_effect(effect); + } + + info!("entering reactor main loop"); + loop { + let (event, q) = eq.next_event().await; + debug!(?event, ?q, "event"); + + // Dispatch the event, then execute the resulting effect. + let effect = reactor.dispatch_event(event); + eq.process_effect(effect); + } +} diff --git a/src/reactor/non_validator.rs b/src/reactor/non_validator.rs new file mode 100644 index 0000000000..91267842f4 --- /dev/null +++ b/src/reactor/non_validator.rs @@ -0,0 +1 @@ +//! Reactor for non-validating nodes. diff --git a/src/reactor/queue_kind.rs b/src/reactor/queue_kind.rs new file mode 100644 index 0000000000..a7926bd3fa --- /dev/null +++ b/src/reactor/queue_kind.rs @@ -0,0 +1,57 @@ +//! Queue kinds +//! +//! The reactor's event queue uses different queues to group events by priority and polls them in a +//! round-robin manner. This way, events are only competing for time within one queue, non-congested +//! queues can always assume to be speedily processed. + +use enum_iterator::IntoEnumIterator; +use std::num; + +/// Scheduling priority. +/// +/// Priorities are ordered from lowest to highest. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, IntoEnumIterator)] +pub enum Queue { + /// Network events that were initiated outside of this node. + /// + /// Their load may vary and grouping them together in one queue aides DoS protection. + NetworkIncoming, + /// Network events that were initiated by the local node, such as outgoing messages. + Network, + /// Events of unspecified priority. + /// + /// This is the default queue. + Regular, + /// Reporting events on the local node. + /// + /// Metric events take precendence over most other events since missing a request for metrics + /// might cause the requester to assume that the node is down and forcefully restart it. + Metrics, +} + +impl Default for Queue { + fn default() -> Self { + Queue::Regular + } +} + +impl Queue { + /// Return the weight of a specific queue. + /// + /// The weight determines how many events are at most processed from a specific queue during + /// each event processing round. + fn weight(&self) -> num::NonZeroUsize { + num::NonZeroUsize::new(match self { + Queue::NetworkIncoming => 4, + Queue::Network => 4, + Queue::Regular => 8, + Queue::Metrics => 16, + }) + .expect("weight must be positive") + } + + /// Return weights of all possible `Queue`s. + pub(super) fn weights() -> Vec<(Self, num::NonZeroUsize)> { + Queue::into_enum_iter().map(|q| (q, q.weight())).collect() + } +} diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs new file mode 100644 index 0000000000..b708490b8b --- /dev/null +++ b/src/reactor/validator.rs @@ -0,0 +1,30 @@ +//! Reactor for validator nodes. +//! +//! Validator nodes join the validator only network upon startup. +use crate::{config, effect, reactor}; + +/// Top-level event for the reactor. +#[derive(Debug)] +#[must_use] +pub enum Event {} + +/// Validator node reactor. +pub struct Reactor; + +impl reactor::Reactor for Reactor { + type Event = Event; + + fn new( + _cfg: &config::Config, + _eq: reactor::EventQueueHandle, + ) -> anyhow::Result<(Self, Vec>)> { + // TODO: Instantiate components here. + let mut _effects = Vec::new(); + + Ok((Reactor, _effects)) + } + + fn dispatch_event(&mut self, _event: Event) -> effect::Effect { + todo!() + } +} diff --git a/src/util.rs b/src/util.rs index b478bcf1e2..3353cae72a 100644 --- a/src/util.rs +++ b/src/util.rs @@ -5,3 +5,11 @@ pub mod round_robin; pub mod zero_one_many; + +/// Leak a value. +/// +/// Moves a value to the heap and then forgets about, leaving only a static reference behind. +#[inline] +pub fn leak(value: T) -> &'static T { + Box::leak(Box::new(value)) +} From eb94af73c0c46127dbcea1a1f7f92585915dbd68 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 16 May 2020 14:48:48 +0200 Subject: [PATCH 6/9] Cleanup existing code --- src/effect.rs | 7 +------ src/reactor.rs | 4 ++-- src/reactor/queue_kind.rs | 2 +- src/util/zero_one_many.rs | 15 ++------------- 4 files changed, 6 insertions(+), 22 deletions(-) diff --git a/src/effect.rs b/src/effect.rs index bae8f90879..26ac55852e 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -74,11 +74,6 @@ use std::time; /// Effects are just boxed futures that produce one or more events. pub type Effect = BoxFuture<'static, ZeroOneMany>; -/// Request responder. -/// -/// Responders are closures used to deliver a value to a pending request. -pub type Responder = Box Effect + Send>; - /// Effect extension for futures. /// /// Used to convert futures into actual effects. @@ -111,7 +106,7 @@ where F: FnOnce(Self::Output) -> U + 'static + Send, U: 'static, { - self.map(f).map(|value| ZeroOneMany::One(value)).boxed() + self.map(f).map(ZeroOneMany::One).boxed() } fn ignore(self) -> Effect { diff --git a/src/reactor.rs b/src/reactor.rs index 9e43ce3418..3fcbfeb87a 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -66,7 +66,7 @@ where /// /// Awaits until there is an event, then returns it. #[inline] - async fn next_event(&self) -> (Ev, Queue) { + async fn next_event(self) -> (Ev, Queue) { self.0.pop().await } @@ -88,7 +88,7 @@ where /// Schedule an event in the given queue. #[inline] - pub async fn schedule(&self, event: Ev, queue_kind: Queue) { + pub async fn schedule(self, event: Ev, queue_kind: Queue) { self.0.push(event, queue_kind).await } } diff --git a/src/reactor/queue_kind.rs b/src/reactor/queue_kind.rs index a7926bd3fa..c05f5820b9 100644 --- a/src/reactor/queue_kind.rs +++ b/src/reactor/queue_kind.rs @@ -40,7 +40,7 @@ impl Queue { /// /// The weight determines how many events are at most processed from a specific queue during /// each event processing round. - fn weight(&self) -> num::NonZeroUsize { + fn weight(self) -> num::NonZeroUsize { num::NonZeroUsize::new(match self { Queue::NetworkIncoming => 4, Queue::Network => 4, diff --git a/src/util/zero_one_many.rs b/src/util/zero_one_many.rs index c05b9ea818..8fa6c38ad7 100644 --- a/src/util/zero_one_many.rs +++ b/src/util/zero_one_many.rs @@ -41,23 +41,12 @@ impl ZeroOneMany { } } } - - /// Map all value(s). - #[inline] - pub fn map(self, f: F) -> ZeroOneMany - where - F: Fn(T) -> U, - { - match self { - ZeroOneMany::Zero => ZeroOneMany::Zero, - ZeroOneMany::One(value) => ZeroOneMany::One(f(value)), - ZeroOneMany::Many(values) => ZeroOneMany::Many(values.into_iter().map(f).collect()), - } - } } impl IntoIterator for ZeroOneMany { type Item = T; + + #[allow(clippy::type_complexity)] type IntoIter = either::Either< iter::Empty, either::Either, vec::IntoIter>, From a0bf3ee3a9bac7f65235f4b25810fccabb9fdee9 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 16 May 2020 15:17:46 +0200 Subject: [PATCH 7/9] Replace `ZeroOneMany` with `smallvec`, removing broken `.also` in the process --- Cargo.lock | 1 + Cargo.toml | 1 + src/effect.rs | 38 +++++++----------------- src/reactor.rs | 31 ++++++++++---------- src/reactor/validator.rs | 7 +++-- src/util.rs | 9 +++++- src/util/zero_one_many.rs | 62 --------------------------------------- 7 files changed, 40 insertions(+), 109 deletions(-) delete mode 100644 src/util/zero_one_many.rs diff --git a/Cargo.lock b/Cargo.lock index 3895b10cf8..c6d8594146 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,7 @@ dependencies = [ "enum-iterator", "futures", "serde", + "smallvec", "structopt", "tokio", "toml", diff --git a/Cargo.toml b/Cargo.toml index b7fd30012a..6c0e7ca4da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ either = "1.5.3" enum-iterator = "0.6.0" futures = "0.3.5" serde = { version = "1.0.110", features = ["derive"] } +smallvec = "1.4.0" structopt = "0.3.14" tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync"] } toml = "0.5.6" diff --git a/src/effect.rs b/src/effect.rs index 26ac55852e..230ab9432d 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -63,16 +63,17 @@ //! the effects explicitly listed in this module through traits to create them. Post-processing on //! effects to turn them into events should also be kept brief. -use crate::util::zero_one_many::ZeroOneMany; +use crate::util::Multiple; use futures::future::BoxFuture; use futures::FutureExt; +use smallvec::smallvec; use std::future::Future; use std::time; /// Effect type. /// /// Effects are just boxed futures that produce one or more events. -pub type Effect = BoxFuture<'static, ZeroOneMany>; +pub type Effect = BoxFuture<'static, Multiple>; /// Effect extension for futures. /// @@ -81,49 +82,30 @@ pub trait EffectExt: Future + Send { /// Finalize a future into an effect that returns an event. /// /// The passed in function `f` is used to translate the resulting value from an effect into - fn event(self, f: F) -> Effect + fn event(self, f: F) -> Multiple> where F: FnOnce(Self::Output) -> U + 'static + Send, U: 'static, Self: Sized; /// Finalize a future into an effect that runs but drops the result. - fn ignore(self) -> Effect; -} - -/// Parallel execution combinator for finalized effects. -pub trait EffectAlso: Future + Send { - /// Create new effect that runs multiple effects in parallel. - fn also(self, other: Self) -> Self; + fn ignore(self) -> Multiple>; } impl EffectExt for T where T: Future + Send + 'static + Sized, { - fn event(self, f: F) -> Effect + fn event(self, f: F) -> Multiple> where F: FnOnce(Self::Output) -> U + 'static + Send, U: 'static, { - self.map(f).map(ZeroOneMany::One).boxed() + smallvec![self.map(f).map(|item| smallvec![item]).boxed()] } - fn ignore(self) -> Effect { - self.map(|_| ZeroOneMany::Zero).boxed() - } -} - -impl EffectAlso for Effect -where - T: Send + 'static, -{ - fn also(self, other: Self) -> Self { - async move { - let (a, b) = futures::join!(self, other); - a.combine_unordered(b) - } - .boxed() + fn ignore(self) -> Multiple> { + smallvec![self.map(|_| Multiple::new()).boxed()] } } @@ -132,7 +114,7 @@ pub trait Core { /// Do not do anything. /// /// Immediately completes, can be used to trigger an event. - fn do_nothing(self) -> BoxFuture<'static, ()>; + fn immediately(self) -> BoxFuture<'static, ()>; /// Set a timeout. /// diff --git a/src/reactor.rs b/src/reactor.rs index 3fcbfeb87a..8d079be480 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -26,6 +26,7 @@ pub mod non_validator; mod queue_kind; pub mod validator; +use crate::util::Multiple; use crate::{config, effect, util}; use async_trait::async_trait; use std::{fmt, mem}; @@ -72,18 +73,20 @@ where /// Process an effect. /// - /// This function simply spawns another task that will take of the effect - /// processing. + /// Spawns tasks that will process the given effects. #[inline] - pub fn process_effect(self, effect: effect::Effect) { + pub fn process_effects(self, effects: Multiple>) { let eq = self; // TODO: Properly carry around priorities. let queue = Default::default(); - tokio::spawn(async move { - for event in effect.await { - eq.schedule(event, queue).await; - } - }); + + for effect in effects { + tokio::spawn(async move { + for event in effect.await { + eq.schedule(event, queue).await; + } + }); + } } /// Schedule an event in the given queue. @@ -111,7 +114,7 @@ pub trait Reactor: Sized { /// This function is typically only called by the reactor itself to dispatch an event. It is /// safe to call regardless, but will cause the event to skip the queue and things like /// accounting. - fn dispatch_event(&mut self, event: Self::Event) -> effect::Effect; + fn dispatch_event(&mut self, event: Self::Event) -> Multiple>; /// Create a new instance of the reactor. /// @@ -122,7 +125,7 @@ pub trait Reactor: Sized { fn new( cfg: &config::Config, eq: EventQueueHandle, - ) -> anyhow::Result<(Self, Vec>)>; + ) -> anyhow::Result<(Self, Multiple>)>; } /// Run a reactor. @@ -153,9 +156,7 @@ pub async fn launch(cfg: config::Config) -> anyhow::Result<()> { let (mut reactor, initial_effects) = R::new(&cfg, eq)?; // Run all effects from component instantiation. - for effect in initial_effects { - eq.process_effect(effect); - } + eq.process_effects(initial_effects); info!("entering reactor main loop"); loop { @@ -163,7 +164,7 @@ pub async fn launch(cfg: config::Config) -> anyhow::Result<()> { debug!(?event, ?q, "event"); // Dispatch the event, then execute the resulting effect. - let effect = reactor.dispatch_event(event); - eq.process_effect(effect); + let effects = reactor.dispatch_event(event); + eq.process_effects(effects); } } diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index b708490b8b..05ecadbf2f 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -1,6 +1,7 @@ //! Reactor for validator nodes. //! //! Validator nodes join the validator only network upon startup. +use crate::util::Multiple; use crate::{config, effect, reactor}; /// Top-level event for the reactor. @@ -17,14 +18,14 @@ impl reactor::Reactor for Reactor { fn new( _cfg: &config::Config, _eq: reactor::EventQueueHandle, - ) -> anyhow::Result<(Self, Vec>)> { + ) -> anyhow::Result<(Self, Multiple>)> { // TODO: Instantiate components here. - let mut _effects = Vec::new(); + let mut _effects = Multiple::new(); Ok((Reactor, _effects)) } - fn dispatch_event(&mut self, _event: Event) -> effect::Effect { + fn dispatch_event(&mut self, _event: Event) -> Multiple> { todo!() } } diff --git a/src/util.rs b/src/util.rs index 3353cae72a..654df2ec4a 100644 --- a/src/util.rs +++ b/src/util.rs @@ -4,7 +4,6 @@ //! being factored out into standalone crates. pub mod round_robin; -pub mod zero_one_many; /// Leak a value. /// @@ -13,3 +12,11 @@ pub mod zero_one_many; pub fn leak(value: T) -> &'static T { Box::leak(Box::new(value)) } + +/// Small amount store. +/// +/// Stored in a smallvec to avoid allocations in case there are less than three items grouped. The +/// size of two items is chosen because one item is the most common use case, and large items are +/// typically boxed. In the latter case two pointers and one enum variant discriminator is almost +/// the same size as an empty vec, which is two pointers. +pub type Multiple = smallvec::SmallVec<[T; 2]>; diff --git a/src/util/zero_one_many.rs b/src/util/zero_one_many.rs deleted file mode 100644 index 8fa6c38ad7..0000000000 --- a/src/util/zero_one_many.rs +++ /dev/null @@ -1,62 +0,0 @@ -//! Zero, one or many item collection. - -use std::{iter, vec}; - -/// A type that contains zero, one or multiple instances of a value. -/// -/// Optimization for when you expect to have a lot of `Vec`s that would have -/// zero or one element. Creating a `One` instance does not cause an allocation. -#[derive(Debug)] -pub enum ZeroOneMany { - /// Zero elements. - Zero, - - /// One element. - One(T), - - /// More than one element. - Many(Vec), -} - -impl ZeroOneMany { - /// Combine values. - /// - /// Does not preserve order. - pub fn combine_unordered(self, other: Self) -> Self { - match (self, other) { - (ZeroOneMany::Zero, r) => r, - (l, ZeroOneMany::Zero) => l, - (ZeroOneMany::One(val_l), ZeroOneMany::One(val_r)) => { - ZeroOneMany::Many(vec![val_l, val_r]) - } - (ZeroOneMany::One(val), ZeroOneMany::Many(mut vals)) - | (ZeroOneMany::Many(mut vals), ZeroOneMany::One(val)) => { - vals.push(val); - - ZeroOneMany::Many(vals) - } - (ZeroOneMany::Many(mut vals_l), ZeroOneMany::Many(vals_r)) => { - vals_l.extend(vals_r.into_iter()); - ZeroOneMany::Many(vals_l) - } - } - } -} - -impl IntoIterator for ZeroOneMany { - type Item = T; - - #[allow(clippy::type_complexity)] - type IntoIter = either::Either< - iter::Empty, - either::Either, vec::IntoIter>, - >; - - fn into_iter(self) -> Self::IntoIter { - match self { - ZeroOneMany::Zero => either::Left(iter::empty()), - ZeroOneMany::One(val) => either::Right(either::Left(iter::once(val))), - ZeroOneMany::Many(vals) => either::Right(either::Right(vals.into_iter())), - } - } -} From b2bdfea390f4990102462ac4c3a8d871b7542fc8 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 16 May 2020 15:44:13 +0200 Subject: [PATCH 8/9] Fix bug that caused event size output warning logic to be inverted --- README.md | 16 ++++++++++++++++ src/reactor.rs | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1a59365f27..e22e1d28ad 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,22 @@ The is the core application for the CasperLabs blockchain. To compile this application, simply run `cargo build` on a recent stable Rust (`>= 1.43.1`) version. +## Running a validator node + +Launching a validator node with the default configuration is done by simply launching the application: + +``` +casper-node validator +``` + +It is very likely that the configuration requires editing though, so typically one will want to generate a configuration file first, edit it and then launch: + +``` +casper-node generate-config > mynode.toml +# ... edit mynode.toml +casper-node validator -c mynode.toml +``` + ## Development A good starting point is to build the documentation and read it in your browser: diff --git a/src/reactor.rs b/src/reactor.rs index 8d079be480..4628f2051d 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -141,7 +141,7 @@ pub async fn launch(cfg: config::Config) -> anyhow::Result<()> { // Check if the event is of a reasonable size. This only emits a runtime warning at startup // right now, since storage size of events is not an issue per se, but copying might be // expensive if events get too large. - if event_size <= 4 * mem::size_of::() { + if event_size > 4 * mem::size_of::() { warn!( "event size is {} bytes, consider reducing it or boxing", event_size From fe9813d6f74d12097337b67d3e2559d324a5a222 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 16 May 2020 15:44:57 +0200 Subject: [PATCH 9/9] Update `README.md` with instructions for running and more development hints --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e22e1d28ad..733de2ba2a 100644 --- a/README.md +++ b/README.md @@ -28,4 +28,6 @@ A good starting point is to build the documentation and read it in your browser: ``` cargo doc --no-deps --open -``` \ No newline at end of file +``` + +When generating a configuration file, it is usually helpful to set the log-level to `DEBUG` during development. \ No newline at end of file