diff --git a/Cargo.lock b/Cargo.lock index 6046ca8ecf..c6d8594146 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,14 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "aho-corasick" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" +dependencies = [ + "memchr", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -11,9 +20,20 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2494382e9ba43995f3c56359e518641f450f5c36feeb4632a75cde2ec297c867" +checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" + +[[package]] +name = "async-trait" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c4f3195085c36ea8d24d32b2f828d23296a9370a28aa39d111f6f16bef9f3b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "atty" @@ -26,12 +46,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "autocfg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" + [[package]] name = "bitflags" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" + [[package]] name = "bytes" version = "0.5.4" @@ -43,10 +75,34 @@ name = "casper-node" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", + "either", + "enum-iterator", + "futures", "serde", + "smallvec", "structopt", "tokio", "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "chrono" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" +dependencies = [ + "num-integer", + "num-traits", + "time", ] [[package]] @@ -64,6 +120,133 @@ dependencies = [ "vec_map", ] +[[package]] +name = "either" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" + +[[package]] +name = "enum-iterator" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c79a6321a1197d7730510c7e3f6cb80432dfefecb32426de8cea0aa19b4bb8d7" +dependencies = [ + "enum-iterator-derive", +] + +[[package]] +name = "enum-iterator-derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e94aa31f7c0dc764f57896dc615ddd76fc13b0d5dca7eb6cc5e018a5a09ec06" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" + +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" + +[[package]] +name = "futures-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" + +[[package]] +name = "futures-task" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + [[package]] name = "heck" version = "0.3.1" @@ -82,6 +265,12 @@ dependencies = [ "libc", ] +[[package]] +name = "itoa" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" + [[package]] name = "lazy_static" version = "1.4.0" @@ -94,6 +283,49 @@ version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "matchers" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "memchr" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" + +[[package]] +name = "num-integer" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -104,12 +336,44 @@ dependencies = [ "libc", ] +[[package]] +name = "once_cell" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" + +[[package]] +name = "pin-project" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81d480cb4e89522ccda96d0eed9af94180b7a5f93fb28f66e1fd7d68431663d1" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a82996f11efccb19b685b14b5df818de31c1edcee3daa256ab5775dd98e72feb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7505eeebd78492e0f6108f7171c4948dbb120ee8119d9d77d0afa5469bef67f" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro-error" version = "1.0.2" @@ -136,11 +400,23 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63" + +[[package]] +name = "proc-macro-nested" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" + [[package]] name = "proc-macro2" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8872cf6f48eee44265156c111456a700ab3483686b3f96df4cf5481c89157319" +checksum = "53f5ffe53a6b28e37c9c1ce74893477864d64f74778a93a4beb43c8fa167f639" dependencies = [ "unicode-xid", ] @@ -154,6 +430,40 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex" +version = "1.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6020f034922e3194c711b82a627453881bc4682166cabb07134a10c26ba7692" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "regex-automata" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" +dependencies = [ + "byteorder", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" + +[[package]] +name = "ryu" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3d612bc64430efeb3f7ee6ef26d590dce0c43249217bddc62112540c7941e1" + [[package]] name = "serde" version = "1.0.110" @@ -174,6 +484,38 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sharded-slab" +version = "0.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06d5a3f5166fb5b42a5439f2eee8b9de149e235961e3eb21c5808fc3ea17ff3e" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + +[[package]] +name = "smallvec" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" + [[package]] name = "strsim" version = "0.8.0" @@ -206,9 +548,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4696caa4048ac7ce2bcd2e484b3cef88c1004e41b8e945a277e2c25dc0b72060" +checksum = "1425de3c33b0941002740a420b1a906a350b88d08b82b2c8a01035a3f9447bac" dependencies = [ "proc-macro2", "quote", @@ -235,6 +577,25 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "tokio" version = "0.2.21" @@ -242,6 +603,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ "bytes", + "fnv", "num_cpus", "pin-project-lite", "tokio-macros", @@ -267,6 +629,78 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7c6b59d116d218cb2d990eb06b77b64043e0268ef7323aae63d8b30ae462923" +dependencies = [ + "cfg-if", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tracing-log" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d53c40489aa69c9aed21ff483f26886ca8403df33bdc2d2f87c60c1617826d2" +dependencies = [ + "ansi_term", + "chrono", + "lazy_static", + "matchers", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "tracing-core", + "tracing-log", + "tracing-serde", +] + [[package]] name = "unicode-segmentation" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 99d0412558..6c0e7ca4da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,14 @@ license-file = "LICENSE" [dependencies] anyhow = "1.0.28" +async-trait = "0.1.31" +either = "1.5.3" +enum-iterator = "0.6.0" +futures = "0.3.5" serde = { version = "1.0.110", features = ["derive"] } +smallvec = "1.4.0" structopt = "0.3.14" -tokio = { version = "0.2.20", features = ["macros", "rt-threaded"] } +tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync"] } toml = "0.5.6" +tracing = "0.1.14" +tracing-subscriber = "0.2.5" diff --git a/README.md b/README.md index 1a59365f27..733de2ba2a 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,28 @@ The is the core application for the CasperLabs blockchain. To compile this application, simply run `cargo build` on a recent stable Rust (`>= 1.43.1`) version. +## Running a validator node + +Launching a validator node with the default configuration is done by simply launching the application: + +``` +casper-node validator +``` + +It is very likely that the configuration requires editing though, so typically one will want to generate a configuration file first, edit it and then launch: + +``` +casper-node generate-config > mynode.toml +# ... edit mynode.toml +casper-node validator -c mynode.toml +``` + ## Development A good starting point is to build the documentation and read it in your browser: ``` cargo doc --no-deps --open -``` \ No newline at end of file +``` + +When generating a configuration file, it is usually helpful to set the log-level to `DEBUG` during development. \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index ee60e0bc4a..aea99a76a7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -5,7 +5,7 @@ use std::{io, io::Write, path}; use structopt::StructOpt; -use crate::config; +use crate::{config, reactor}; // Note: The docstring on `Cli` is the help shown when calling the binary with `--help`. #[derive(Debug, StructOpt)] @@ -14,7 +14,10 @@ pub enum Cli { /// Generate a configuration file from defaults and dump it to stdout. GenerateConfig {}, /// Run the validator node. - RunValidator { + /// + /// Loads the configuration values from the given configuration file or uses defaults if not + /// given, then launches the reactor. + Validator { #[structopt(short, long, env)] /// Path to configuration file. config: Option, @@ -28,18 +31,20 @@ impl Cli { Cli::GenerateConfig {} => { let cfg_str = config::to_string(&Default::default())?; io::stdout().write_all(cfg_str.as_bytes())?; + + Ok(()) } - Cli::RunValidator { config } => { + Cli::Validator { config } => { // We load the specified config, if any, otherwise use defaults. let cfg = config .map(config::load_from_file) .transpose()? .unwrap_or_default(); - println!("{:?}", cfg); + cfg.log.setup_logging()?; + + reactor::launch::(cfg).await } } - - Ok(()) } } diff --git a/src/config.rs b/src/config.rs index e42b450f19..45d452b71c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,14 +4,61 @@ //! sensible defaults. //! //! The `cli` offers an option to generate a configuration from defaults for editing. +//! +//! # Adding a configuration section +//! +//! When adding a section to the configuration, ensure that +//! +//! * it has an entry in the root configuration `Config`, +//! * `Default` is implemented (derived or manually) with sensible defaults, and +//! * it is completely documented. use anyhow::Context; use serde::{Deserialize, Serialize}; -use std::{fs, path}; +use std::{fs, io, path}; +use tracing::debug; -/// Root configuration +/// Root configuration. #[derive(Debug, Default, Deserialize, Serialize)] -pub struct Config {} +pub struct Config { + /// Log configuration. + pub log: Log, +} + +/// Log configuration. +#[derive(Debug, Deserialize, Serialize)] +pub struct Log { + /// Log level. + #[serde(with = "log_level")] + pub level: tracing::Level, +} + +impl Default for Log { + fn default() -> Self { + Log { + level: tracing::Level::INFO, + } + } +} + +impl Log { + /// Initialize logging system based on settings in configuration. + /// + /// Will setup logging as described in this configuration for the whole application. This + /// function should only be called once during the lifetime of the application. + pub fn setup_logging(&self) -> anyhow::Result<()> { + // Setup a new tracing-subscriber writing to `stderr` for logging. + tracing::subscriber::set_global_default( + tracing_subscriber::fmt() + .with_writer(io::stderr) + .with_max_level(self.level.clone()) + .finish(), + )?; + debug!("debug output enabled"); + + Ok(()) + } +} /// Loads a TOML-formatted configuration from a given file. pub fn load_from_file>(config_path: P) -> anyhow::Result { @@ -27,3 +74,25 @@ pub fn load_from_file>(config_path: P) -> anyhow::Result anyhow::Result { toml::to_string_pretty(cfg).with_context(|| "Failed to serialize default configuration") } + +/// Serialization/deserialization +mod log_level { + use serde::{self, Deserialize}; + use std::str::FromStr; + use tracing::Level; + + pub fn serialize(value: &Level, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(value.to_string().as_str()) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Level::from_str(s.as_str()).map_err(serde::de::Error::custom) + } +} diff --git a/src/effect.rs b/src/effect.rs new file mode 100644 index 0000000000..230ab9432d --- /dev/null +++ b/src/effect.rs @@ -0,0 +1,125 @@ +//! Effects subsystem. +//! +//! Effects describe things that the creator of the effect intends to happen, +//! producing a value upon completion. They are, in fact, futures. +//! +//! A boxed, pinned future returning an event is called an effect and typed as an `Effect`, +//! where `Ev` is the event's type. +//! +//! ## Using effects +//! +//! To create an effect, an events factory is used that implements one or more of the factory +//! traits of this module. For example, given an events factory `eff`, we can create a +//! `set_timeout` future and turn it into an effect: +//! +//! ``` +//! # use std::time; +//! use crate::effect::EffectExt; +//! +//! enum Event { +//! ThreeSecondsElapsed(time::Duration) +//! } +//! +//! eff.set_timeout(time::Duration::from_secs(3)) +//! .event(Event::ThreeSecondsElapsed) +//! ``` +//! +//! This example will produce an effect that, after three seconds, creates an +//! `Event::ThreeSecondsElapsed`. Note that effects do nothing on their own, they need to be passed +//! to the `Reactor` (see `reactor` module) to be executed. +//! +//! ## Chaining futures and effects +//! +//! Effects are built from futures, which can be combined before being finalized +//! into an effect. However, only one effect can be created as the end result +//! of such a chain. +//! +//! It is possible to create an effect from multiple effects being run in parallel using `.also`: +//! +//! ``` +//! # use std::time; +//! use crate::effect::{EffectExt, EffectAlso}; +//! +//! enum Event { +//! ThreeSecondsElapsed(time::Duration), +//! FiveSecondsElapsed(time::Duration), +//! } +//! +//! // This effect produces a single event after five seconds: +//! eff.set_timeout(time::Duration::from_secs(3)) +//! .then(|_| eff.set_timeout(time::Duration::from_secs(2)) +//! .event(Event::FiveSecondsElapsed); +//! +//! // Here, two effects are run in parallel, resulting in two events: +//! eff.set_timeout(time::Duration::from_secs(3)) +//! .event(Event::ThreeSecondsElapsed) +//! .also(eff.set_timeout(time::Duration::from_secs(5)) +//! .event(Event::FiveSecondsElapsed)); +//! ``` +//! +//! ## Arbitrary effects +//! +//! While it is technically possible to turn any future into an effect, it is advisable to only use +//! the effects explicitly listed in this module through traits to create them. Post-processing on +//! effects to turn them into events should also be kept brief. + +use crate::util::Multiple; +use futures::future::BoxFuture; +use futures::FutureExt; +use smallvec::smallvec; +use std::future::Future; +use std::time; + +/// Effect type. +/// +/// Effects are just boxed futures that produce one or more events. +pub type Effect = BoxFuture<'static, Multiple>; + +/// Effect extension for futures. +/// +/// Used to convert futures into actual effects. +pub trait EffectExt: Future + Send { + /// Finalize a future into an effect that returns an event. + /// + /// The passed in function `f` is used to translate the resulting value from an effect into + fn event(self, f: F) -> Multiple> + where + F: FnOnce(Self::Output) -> U + 'static + Send, + U: 'static, + Self: Sized; + + /// Finalize a future into an effect that runs but drops the result. + fn ignore(self) -> Multiple>; +} + +impl EffectExt for T +where + T: Future + Send + 'static + Sized, +{ + fn event(self, f: F) -> Multiple> + where + F: FnOnce(Self::Output) -> U + 'static + Send, + U: 'static, + { + smallvec![self.map(f).map(|item| smallvec![item]).boxed()] + } + + fn ignore(self) -> Multiple> { + smallvec![self.map(|_| Multiple::new()).boxed()] + } +} + +/// Core effects. +pub trait Core { + /// Do not do anything. + /// + /// Immediately completes, can be used to trigger an event. + fn immediately(self) -> BoxFuture<'static, ()>; + + /// Set a timeout. + /// + /// Once the timeout fires, it will return the actual elapsed time since the execution (not + /// creation!) of this effect. Event loops typically execute effects right after a called event + /// handling function completes. + fn set_timeout(self, timeout: time::Duration) -> BoxFuture<'static, time::Duration>; +} diff --git a/src/main.rs b/src/main.rs index 834ca6f4c4..86864a5555 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,23 @@ //! # CasperLabs blockchain node //! -//! This crate contain the core application for the CasperLabs blockchain. Run with `--help` to -//! see available command-line arguments. +//! This crate contain the core application for the CasperLabs blockchain. Run with `--help` to see +//! available command-line arguments. //! //! ## Application structure //! -//! While the `main` function is the central entrypoint for the node application, its core event -//! loop is found inside the reactor. To get a tour of the sourcecode, be sure to run -//! `cargo doc --open`. +//! While the [`main`](fn.main.html) function is the central entrypoint for the node application, +//! its core event loop is found inside the [reactor](reactor/index.html). To get a tour of the +//! sourcecode, be sure to run `cargo doc --open`. mod cli; mod config; +mod effect; +mod reactor; +mod util; use structopt::StructOpt; -/// Parse command-line arguments and run application. +/// Parse [command-line arguments](cli/index.html) and run application. #[tokio::main] pub async fn main() -> anyhow::Result<()> { // Parse CLI args and run selected subcommand. diff --git a/src/reactor.rs b/src/reactor.rs new file mode 100644 index 0000000000..4628f2051d --- /dev/null +++ b/src/reactor.rs @@ -0,0 +1,170 @@ +//! Reactor core. +//! +//! Any long running instance of the node application uses an event-dispatch pattern: Events are +//! generated and stored on an event queue, then processed one-by-one. This process happens inside +//! the *reactor*, which also exclusively holds the state of the application: +//! +//! 1. The reactor pops an event off of the queue. +//! 2. The event is dispatched by the reactor. Since the reactor holds mutable state, it can grant +//! any component that processes an event mutable, exclusive access to its state. +//! 3. Once the (synchronous) event processing has completed, the component returns an effect. +//! 4. The reactor spawns a task that executes these effects and eventually puts an event onto the +//! event queue. +//! 5. meanwhile go to 1. +//! +//! # Reactors +//! +//! There no single reactor, but a reactor for each application type, since it defines which +//! components are used and how they are wired up. The reactor defines the state by being a `struct` +//! of components, their initialization through the `Reactor::new` and a function to `dispatch` +//! events to components. +//! +//! With all these set up, a reactor can be `launch`ed, causing it to run indefinitely, processing +//! events. + +pub mod non_validator; +mod queue_kind; +pub mod validator; + +use crate::util::Multiple; +use crate::{config, effect, util}; +use async_trait::async_trait; +use std::{fmt, mem}; +use tracing::{debug, info, warn}; + +pub use queue_kind::Queue; + +/// Event queue handle +/// +/// The event queue handle is how almost all parts of the application interact with the reactor +/// outside of the normal event loop. It gives different parts a chance to schedule messages that +/// stem from things like external IO. +/// +/// It is also possible to schedule new events by directly processing effects. This allows re-use of +/// the existing code for handling particular effects, as adding events directly should be a matter +/// of last resort. +#[derive(Debug)] +pub struct EventQueueHandle(&'static util::round_robin::WeightedRoundRobin); + +// Copy and Clone need to be implemented manually, since `Ev` prevents derivation. +impl Copy for EventQueueHandle {} +impl Clone for EventQueueHandle { + fn clone(&self) -> Self { + EventQueueHandle(self.0) + } +} + +impl EventQueueHandle +where + Ev: Send + 'static, +{ + /// Create a new event queue handle. + fn new(round_robin: &'static util::round_robin::WeightedRoundRobin) -> Self { + EventQueueHandle(round_robin) + } + + /// Return the next event in the queue + /// + /// Awaits until there is an event, then returns it. + #[inline] + async fn next_event(self) -> (Ev, Queue) { + self.0.pop().await + } + + /// Process an effect. + /// + /// Spawns tasks that will process the given effects. + #[inline] + pub fn process_effects(self, effects: Multiple>) { + let eq = self; + // TODO: Properly carry around priorities. + let queue = Default::default(); + + for effect in effects { + tokio::spawn(async move { + for event in effect.await { + eq.schedule(event, queue).await; + } + }); + } + } + + /// Schedule an event in the given queue. + #[inline] + pub async fn schedule(self, event: Ev, queue_kind: Queue) { + self.0.push(event, queue_kind).await + } +} + +/// Reactor core. +/// +/// Any reactor implements should implement this trait and be launched by the `launch` function. +#[async_trait] +pub trait Reactor: Sized { + // Note: We've gone for the `Sized` bound here, since we return an instance in `new`. As an + // alternative, `new` could return a boxed instance instead, removing this requirement. + + /// Event type associated with reactor. + /// + /// Defines what kind of event the reactor processes. + type Event: Send + fmt::Debug + 'static; + + /// Dispatch an event on the reactor. + /// + /// This function is typically only called by the reactor itself to dispatch an event. It is + /// safe to call regardless, but will cause the event to skip the queue and things like + /// accounting. + fn dispatch_event(&mut self, event: Self::Event) -> Multiple>; + + /// Create a new instance of the reactor. + /// + /// This method creates the full state, which consists of all components, and returns a reactor + /// instances along with the effects the components generated upon instantiation. + /// + /// If any instantiation fails, an error is returned. + fn new( + cfg: &config::Config, + eq: EventQueueHandle, + ) -> anyhow::Result<(Self, Multiple>)>; +} + +/// Run a reactor. +/// +/// Start the reactor and associated background tasks, then enter main the event processing loop. +/// +/// `launch` will leak memory on start for global structures each time it is called. +/// +/// Errors are returned only if component initialization fails. +#[inline] +pub async fn launch(cfg: config::Config) -> anyhow::Result<()> { + let event_size = mem::size_of::(); + // Check if the event is of a reasonable size. This only emits a runtime warning at startup + // right now, since storage size of events is not an issue per se, but copying might be + // expensive if events get too large. + if event_size > 4 * mem::size_of::() { + warn!( + "event size is {} bytes, consider reducing it or boxing", + event_size + ); + } + + let scheduler = util::round_robin::WeightedRoundRobin::::new(Queue::weights()); + + // Create a new event queue for this reactor run. + let eq = EventQueueHandle::new(util::leak(scheduler)); + + let (mut reactor, initial_effects) = R::new(&cfg, eq)?; + + // Run all effects from component instantiation. + eq.process_effects(initial_effects); + + info!("entering reactor main loop"); + loop { + let (event, q) = eq.next_event().await; + debug!(?event, ?q, "event"); + + // Dispatch the event, then execute the resulting effect. + let effects = reactor.dispatch_event(event); + eq.process_effects(effects); + } +} diff --git a/src/reactor/non_validator.rs b/src/reactor/non_validator.rs new file mode 100644 index 0000000000..91267842f4 --- /dev/null +++ b/src/reactor/non_validator.rs @@ -0,0 +1 @@ +//! Reactor for non-validating nodes. diff --git a/src/reactor/queue_kind.rs b/src/reactor/queue_kind.rs new file mode 100644 index 0000000000..c05f5820b9 --- /dev/null +++ b/src/reactor/queue_kind.rs @@ -0,0 +1,57 @@ +//! Queue kinds +//! +//! The reactor's event queue uses different queues to group events by priority and polls them in a +//! round-robin manner. This way, events are only competing for time within one queue, non-congested +//! queues can always assume to be speedily processed. + +use enum_iterator::IntoEnumIterator; +use std::num; + +/// Scheduling priority. +/// +/// Priorities are ordered from lowest to highest. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, IntoEnumIterator)] +pub enum Queue { + /// Network events that were initiated outside of this node. + /// + /// Their load may vary and grouping them together in one queue aides DoS protection. + NetworkIncoming, + /// Network events that were initiated by the local node, such as outgoing messages. + Network, + /// Events of unspecified priority. + /// + /// This is the default queue. + Regular, + /// Reporting events on the local node. + /// + /// Metric events take precendence over most other events since missing a request for metrics + /// might cause the requester to assume that the node is down and forcefully restart it. + Metrics, +} + +impl Default for Queue { + fn default() -> Self { + Queue::Regular + } +} + +impl Queue { + /// Return the weight of a specific queue. + /// + /// The weight determines how many events are at most processed from a specific queue during + /// each event processing round. + fn weight(self) -> num::NonZeroUsize { + num::NonZeroUsize::new(match self { + Queue::NetworkIncoming => 4, + Queue::Network => 4, + Queue::Regular => 8, + Queue::Metrics => 16, + }) + .expect("weight must be positive") + } + + /// Return weights of all possible `Queue`s. + pub(super) fn weights() -> Vec<(Self, num::NonZeroUsize)> { + Queue::into_enum_iter().map(|q| (q, q.weight())).collect() + } +} diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs new file mode 100644 index 0000000000..05ecadbf2f --- /dev/null +++ b/src/reactor/validator.rs @@ -0,0 +1,31 @@ +//! Reactor for validator nodes. +//! +//! Validator nodes join the validator only network upon startup. +use crate::util::Multiple; +use crate::{config, effect, reactor}; + +/// Top-level event for the reactor. +#[derive(Debug)] +#[must_use] +pub enum Event {} + +/// Validator node reactor. +pub struct Reactor; + +impl reactor::Reactor for Reactor { + type Event = Event; + + fn new( + _cfg: &config::Config, + _eq: reactor::EventQueueHandle, + ) -> anyhow::Result<(Self, Multiple>)> { + // TODO: Instantiate components here. + let mut _effects = Multiple::new(); + + Ok((Reactor, _effects)) + } + + fn dispatch_event(&mut self, _event: Event) -> Multiple> { + todo!() + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000000..654df2ec4a --- /dev/null +++ b/src/util.rs @@ -0,0 +1,22 @@ +//! Various utilities. +//! +//! The Generic functions that are not limited to a particular module, but are too small to warrant +//! being factored out into standalone crates. + +pub mod round_robin; + +/// Leak a value. +/// +/// Moves a value to the heap and then forgets about, leaving only a static reference behind. +#[inline] +pub fn leak(value: T) -> &'static T { + Box::leak(Box::new(value)) +} + +/// Small amount store. +/// +/// Stored in a smallvec to avoid allocations in case there are less than three items grouped. The +/// size of two items is chosen because one item is the most common use case, and large items are +/// typically boxed. In the latter case two pointers and one enum variant discriminator is almost +/// the same size as an empty vec, which is two pointers. +pub type Multiple = smallvec::SmallVec<[T; 2]>; diff --git a/src/util/round_robin.rs b/src/util/round_robin.rs new file mode 100644 index 0000000000..c83bec328e --- /dev/null +++ b/src/util/round_robin.rs @@ -0,0 +1,153 @@ +//! Round-robin scheduling. +//! +//! This module implements a weighted round-robin scheduler that ensures no deadlocks occur, but +//! still allows prioriting events from one source over another. The module uses `tokio`s +//! synchronization primitives under the hood. + +use std::collections::{HashMap, VecDeque}; +use std::hash::Hash; +use std::num::NonZeroUsize; +use tokio::sync; + +/// Weighted round-robin scheduler. +/// +/// The weighted round-robin scheduler keeps queues internally and returns an item from a queue +/// when asked. Each queue is assigned a weight, which is simply the amount of items maximally +/// returned from it before moving on to the next queue. +/// +/// If a queue is empty, it is skipped until the next round. Queues are processed in the order they +/// are passed to the constructor function. +/// +/// The scheduler keeps track internally which queue needs to be popped next. +#[derive(Debug)] +pub struct WeightedRoundRobin { + /// Current iteration state. + state: sync::Mutex>, + + /// A list of slots that are round-robin'd. + slots: Vec>, + + /// Actual queues. + queues: HashMap>>, + + /// Number of items in all queues combined. + total: sync::Semaphore, +} + +/// The inner state of the queue iteration. +#[derive(Copy, Clone, Debug)] +struct IterationState { + /// The currently active slot. + /// + /// Once it has no tickets left, the next slot is loaded. + active_slot: Slot, + + /// The position of the active slot. Used to calculate the next slot. + active_slot_idx: usize, +} + +/// An internal slot in the round-robin scheduler. +/// +/// A slot marks the scheduling position, i.e. which queue we are currently +/// polling and how many tickets it has left before the next one is due. +#[derive(Copy, Clone, Debug)] +struct Slot { + /// The key, identifying a queue. + key: K, + + /// Number of items to return before moving on to the next queue. + tickets: usize, +} + +impl WeightedRoundRobin +where + K: Copy + Clone + Eq + Hash, +{ + /// Create new weighted round-robin scheduler. + /// + /// Creates a queue for each pair given in `weights`. The second component + /// of each `weight` is the number of times to return items from one + /// queue before moving on to the next one. + pub fn new(weights: Vec<(K, NonZeroUsize)>) -> Self { + assert!(!weights.is_empty(), "must provide at least one slot"); + + let queues = weights + .iter() + .map(|(idx, _)| (*idx, sync::Mutex::new(VecDeque::new()))) + .collect(); + let slots: Vec> = weights + .into_iter() + .map(|(key, tickets)| Slot { + key, + tickets: tickets.get(), + }) + .collect(); + let active_slot = slots[0]; + + WeightedRoundRobin { + state: sync::Mutex::new(IterationState { + active_slot, + active_slot_idx: 0, + }), + slots, + queues, + total: sync::Semaphore::new(0), + } + } + + /// Push an item to a queue identified by key. + /// + /// ## Panics + /// + /// Panics if the queue identified by key `queue` does not exist. + pub async fn push(&self, item: I, queue: K) { + self.queues + .get(&queue) + .expect("tried to push to non-existant queue") + .lock() + .await + .push_back(item); + + // We increase the item count after we've put the item into the queue. + self.total.add_permits(1); + } + + /// Return the next item from queue. + /// + /// Returns `None` if the queue is empty or an internal error occurred. The + /// latter should never happen. + pub async fn pop(&self) -> (I, K) { + self.total.acquire().await.forget(); + + let mut inner = self.state.lock().await; + + // We know we have at least one item in a queue. + loop { + let mut current_queue = self + .queues + // The queue disappearing should never happen. + .get(&inner.active_slot.key) + .expect("the queue disappeared. this should not happen") + .lock() + .await; + + if inner.active_slot.tickets == 0 || current_queue.is_empty() { + // Go to next queue slot if we've exhausted the current queue. + inner.active_slot_idx = (inner.active_slot_idx + 1) % self.slots.len(); + inner.active_slot = self.slots[inner.active_slot_idx]; + continue; + } + + // We have hit a queue that is not empty. Decrease tickets and pop. + inner.active_slot.tickets -= 1; + + break ( + current_queue + .pop_front() + // We hold the queue's lock and checked `is_empty` earlier. + .expect("item disappeared. this should not happen"), + inner.active_slot.key, + ); + } + } +}