From 879d1a2e27abde81bc7b13420c5c3bd6525a70dc Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Sun, 12 Jul 2020 11:21:42 +0100 Subject: [PATCH 1/7] NDRS-119: add deploy gossiper tests --- Cargo.lock | 128 +++---- node/Cargo.toml | 7 +- node/src/components.rs | 1 + node/src/components/deploy_gossiper.rs | 147 +------- node/src/components/deploy_gossiper/event.rs | 102 +++++ .../src/components/deploy_gossiper/message.rs | 41 +++ node/src/components/deploy_gossiper/tests.rs | 347 ++++++++++++++++++ node/src/components/in_memory_network.rs | 123 +++++-- node/src/components/small_network.rs | 2 +- .../small_network/{test.rs => tests.rs} | 48 +-- node/src/components/storage/config.rs | 4 +- node/src/lib.rs | 1 + node/src/logging.rs | 2 +- node/src/reactor.rs | 14 + node/src/testing/network.rs | 81 +++- node/src/types/deploy.rs | 77 ++++ node/src/utils/gossip_table.rs | 5 + 17 files changed, 827 insertions(+), 303 deletions(-) create mode 100644 node/src/components/deploy_gossiper/event.rs create mode 100644 node/src/components/deploy_gossiper/message.rs create mode 100644 node/src/components/deploy_gossiper/tests.rs rename node/src/components/small_network/{test.rs => tests.rs} (86%) diff --git a/Cargo.lock b/Cargo.lock index 36007e6873..68632351b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -299,12 +299,9 @@ dependencies = [ [[package]] name = "bytes" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "118cf036fbb97d0816e3c34b2d7a1e8cfc60f68fcf63d550ddbe9bd5f59c213b" -dependencies = [ - "loom", -] +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" [[package]] name = "casperlabs-client" @@ -402,7 +399,7 @@ dependencies = [ "base64 0.12.3", "bincode", "blake2", - "bytes 0.5.5", + "bytes 0.5.6", "casperlabs-types", "chrono", "criterion", @@ -790,9 +787,9 @@ dependencies = [ [[package]] name = "ctrlc" -version = "3.1.4" +version = "3.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a4ba686dff9fa4c1c9636ce1010b0cf98ceb421361b0bb3d6faeec43bd217a7" +checksum = "54dedab740bc412d514cfbc4a1d9d5d16fed02c4b14a7be129003c07fdc33b9b" dependencies = [ "nix", "winapi 0.3.9", @@ -819,7 +816,7 @@ checksum = "cb582b60359da160a9477ee80f15c8d784c477e69c217ef2cdd4169c24ea380f" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -830,7 +827,7 @@ checksum = "298998b1cf6b5b2c8a7b023dfd45821825ce3ba8a8af55c921a0e734e4653f76" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -1130,7 +1127,7 @@ checksum = "1e94aa31f7c0dc764f57896dc615ddd76fc13b0d5dca7eb6cc5e018a5a09ec06" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -1171,7 +1168,7 @@ checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", "synstructure", ] @@ -1320,7 +1317,7 @@ dependencies = [ "proc-macro-hack", "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -1358,19 +1355,6 @@ dependencies = [ "slab 0.4.2", ] -[[package]] -name = "generator" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add72f17bb81521258fcc8a7a3245b1e184e916bfbe34f0ea89558f440df5c68" -dependencies = [ - "cc", - "libc", - "log 0.4.8", - "rustc_version", - "winapi 0.3.9", -] - [[package]] name = "generic-array" version = "0.12.3" @@ -1501,21 +1485,21 @@ dependencies = [ [[package]] name = "h2" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b7246d7e4b979c03fa093da39cfb3617a96bbeee6310af63991668d7e843ff" +checksum = "993f9e0baeed60001cf565546b0d3dbe6a6ad23f2bd31644a133c641eccf6d53" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "fnv", "futures-core", "futures-sink", "futures-util", "http", "indexmap", - "log 0.4.8", "slab 0.4.2", "tokio 0.2.21", "tokio-util", + "tracing", ] [[package]] @@ -1532,7 +1516,7 @@ checksum = "ed18eb2459bf1a09ad2d6b1547840c3e5e62882fa09b9a6a20b1de8e3228848f" dependencies = [ "base64 0.12.3", "bitflags 1.2.1", - "bytes 0.5.5", + "bytes 0.5.6", "headers-core", "http", "mime 0.3.16", @@ -1605,7 +1589,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "fnv", "itoa", ] @@ -1616,7 +1600,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "http", ] @@ -1659,11 +1643,11 @@ dependencies = [ [[package]] name = "hyper" -version = "0.13.6" +version = "0.13.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6e7655b9594024ad0ee439f3b5a7299369dc2a3f459b47c696f9ff676f9aa1f" +checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "futures-channel", "futures-core", "futures-util", @@ -1672,12 +1656,12 @@ dependencies = [ "http-body", "httparse", "itoa", - "log 0.4.8", "pin-project", "socket2", "time", "tokio 0.2.21", "tower-service", + "tracing", "want", ] @@ -1687,7 +1671,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "hyper", "native-tls", "tokio 0.2.21", @@ -1731,7 +1715,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", ] [[package]] @@ -1945,17 +1929,6 @@ dependencies = [ "serde", ] -[[package]] -name = "loom" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ecc775857611e1df29abba5c41355cdf540e7e9d4acfdf0f355eefee82330b7" -dependencies = [ - "cfg-if", - "generator", - "scoped-tls 0.1.2", -] - [[package]] name = "main-purse" version = "0.1.0" @@ -2299,7 +2272,7 @@ checksum = "0c8b15b261814f992e33760b1fca9fe8b693d8a65299f20c9901688636cfb746" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -2555,7 +2528,7 @@ checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -2572,9 +2545,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" +checksum = "d36492546b6af1463394d46f0c834346f31548646f6ba10849802c9c9a27ac33" [[package]] name = "plotters" @@ -2743,7 +2716,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", "version_check 0.9.2", ] @@ -2755,7 +2728,7 @@ checksum = "3cc9795ca17eb581285ec44936da7fc2335a3f34f2ddd13118b6f4d515435c50" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", "syn-mid", "version_check 0.9.2", ] @@ -3147,9 +3120,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.1.56" +version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] name = "redox_users" @@ -3214,7 +3187,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b82c9238b305f26f53443e3a4bc8528d64b8d0bee408ec949eb7bf5635ec680" dependencies = [ "base64 0.12.3", - "bytes 0.5.5", + "bytes 0.5.6", "encoding_rs", "futures-core", "futures-util", @@ -3455,7 +3428,7 @@ checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -3633,7 +3606,7 @@ dependencies = [ "proc-macro-error", "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -3677,9 +3650,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.33" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8d5d96e8cbb005d6959f119f773bfaebb5684296108fb32600c00cde305b2cd" +checksum = "936cae2873c940d92e697597c5eee105fb570cd5689c695806f672883653349b" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", @@ -3694,7 +3667,7 @@ checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -3705,7 +3678,7 @@ checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", "unicode-xid 0.2.1", ] @@ -3844,7 +3817,7 @@ checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -3931,7 +3904,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "fnv", "futures-core", "iovec", @@ -4024,7 +3997,7 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -4062,7 +4035,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebdd897b01021779294eb09bb3b52b6e11b0747f9f7e333a84bef532b656de99" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "derivative", "futures 0.3.5", "pin-project", @@ -4223,7 +4196,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "futures-core", "futures-sink", "log 0.4.8", @@ -4253,6 +4226,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2e2a2de6b0d5cbb13fc21193a2296888eaab62b6044479aafb3c54c01c29fcd" dependencies = [ "cfg-if", + "log 0.4.8", "tracing-attributes", "tracing-core", ] @@ -4265,7 +4239,7 @@ checksum = "f0693bf8d6f2bf22c690fc61a9d21ac69efdbb894a17ed596b9af0f01e64b84b" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", ] [[package]] @@ -4421,9 +4395,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" [[package]] name = "tungstenite" @@ -4433,7 +4407,7 @@ checksum = "cfea31758bf674f990918962e8e5f07071a3161bd7c4138ed23e416e1ac4264e" dependencies = [ "base64 0.11.0", "byteorder", - "bytes 0.5.5", + "bytes 0.5.6", "http", "httparse", "input_buffer", @@ -4712,7 +4686,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e95175b7a927258ecbb816bdada3cc469cb68593e7940b96a60f4af366a9970" dependencies = [ - "bytes 0.5.5", + "bytes 0.5.6", "futures 0.3.5", "headers", "http", @@ -4761,7 +4735,7 @@ dependencies = [ "log 0.4.8", "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", "wasm-bindgen-shared", ] @@ -4795,7 +4769,7 @@ checksum = "9adff9ee0e94b926ca81b57f57f86d5545cdcb1d259e21ec9bdd95b901754c75" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "syn 1.0.33", + "syn 1.0.34", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/node/Cargo.toml b/node/Cargo.toml index 200f2fef44..2142f82673 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -59,7 +59,7 @@ smallvec = "1.4.0" structopt = "0.3.14" tempfile = "3.1.0" thiserror = "1.0.18" -tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp", "time", "blocking"] } +tokio = { version = "0.2.20", features = ["blocking", "macros", "rt-threaded", "sync", "tcp", "time"] } tokio-openssl = "0.4.0" tokio-serde = { version = "0.6.1", features = ["messagepack"] } tokio-util = { version = "0.3.1", features = ["codec"] } @@ -76,13 +76,14 @@ wasmi = "0.6.2" [dev-dependencies] assert_matches = "1.3.0" +criterion = "0.3.3" +fake_instant = "0.4.0" lazy_static = "1" pnet = "0.26.0" proptest = "0.9.4" -fake_instant = "0.4.0" rand_xorshift = { version = "~0.2.0" } rand_core = "0.5.1" -criterion = "0.3.3" +tokio = { version = "0.2.20", features = ["test-util"] } [features] vendored-openssl = ['openssl/vendored'] diff --git a/node/src/components.rs b/node/src/components.rs index a9c2f77c3d..cf32dda685 100644 --- a/node/src/components.rs +++ b/node/src/components.rs @@ -11,6 +11,7 @@ pub(crate) mod deploy_gossiper; #[allow(unused)] pub(crate) mod deploy_buffer; // The `in_memory_network` is public for use in doctests. +#[cfg(test)] pub mod in_memory_network; pub(crate) mod pinger; pub(crate) mod small_network; diff --git a/node/src/components/deploy_gossiper.rs b/node/src/components/deploy_gossiper.rs index 2bc4e7783c..a30317c57e 100644 --- a/node/src/components/deploy_gossiper.rs +++ b/node/src/components/deploy_gossiper.rs @@ -1,14 +1,13 @@ -use std::{ - collections::HashSet, - fmt::{self, Display, Formatter}, - time::Duration, -}; +mod event; +mod message; +mod tests; + +use std::{collections::HashSet, time::Duration}; use futures::FutureExt; use rand::Rng; -use serde::{Deserialize, Serialize}; use smallvec::smallvec; -use tracing::{debug, error, warn}; +use tracing::{debug, error}; use crate::{ components::{ @@ -21,10 +20,13 @@ use crate::{ EffectBuilder, EffectExt, Effects, }, types::{Deploy, DeployHash}, - utils::{DisplayIter, GossipAction, GossipTable}, + utils::{GossipAction, GossipTable}, GossipTableConfig, }; +pub use event::Event; +pub use message::Message; + trait ReactorEvent: From + From> + From> + Send { @@ -35,133 +37,6 @@ impl ReactorEvent for T where { } -/// `DeployGossiper` events. -#[derive(Debug)] -pub enum Event { - /// A new deploy has been received to be gossiped. - DeployReceived { deploy: Box }, - /// The network component gossiped to the included peers. - GossipedTo { - deploy_hash: DeployHash, - peers: HashSet, - }, - /// The timeout for waiting for a gossip response has elapsed and we should check the response - /// arrived. - CheckGossipTimeout { - deploy_hash: DeployHash, - peer: NodeId, - }, - /// The timeout for waiting for the full deploy body has elapsed and we should check the - /// response arrived. - CheckGetFromPeerTimeout { - deploy_hash: DeployHash, - peer: NodeId, - }, - /// An incoming gossip network message. - MessageReceived { sender: NodeId, message: Message }, - /// The result of the `DeployGossiper` putting a deploy to the storage component. If the - /// result is `Ok`, the deploy hash should be gossiped onwards. - PutToStoreResult { - deploy_hash: DeployHash, - maybe_sender: Option, - result: storage::Result<()>, - }, - /// The result of the `DeployGossiper` getting a deploy from the storage component. If the - /// result is `Ok`, the deploy should be sent to the requesting peer. - GetFromStoreResult { - deploy_hash: DeployHash, - requester: NodeId, - result: Box>, - }, -} - -impl Display for Event { - fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - match self { - Event::DeployReceived { deploy } => { - write!(formatter, "new deploy received: {}", deploy.id()) - } - Event::GossipedTo { deploy_hash, peers } => write!( - formatter, - "gossiped {} to {}", - deploy_hash, - DisplayIter::new(peers) - ), - Event::CheckGossipTimeout { deploy_hash, peer } => write!( - formatter, - "check gossip timeout for {} with {}", - deploy_hash, peer - ), - Event::CheckGetFromPeerTimeout { deploy_hash, peer } => write!( - formatter, - "check get from peer timeout for {} with {}", - deploy_hash, peer - ), - Event::MessageReceived { sender, message } => { - write!(formatter, "{} received from {}", message, sender) - } - Event::PutToStoreResult { - deploy_hash, - result, - .. - } => { - if result.is_ok() { - write!(formatter, "put {} to store", deploy_hash) - } else { - write!(formatter, "failed to put {} to store", deploy_hash) - } - } - Event::GetFromStoreResult { - deploy_hash, - result, - .. - } => { - if result.is_ok() { - write!(formatter, "got {} from store", deploy_hash) - } else { - write!(formatter, "failed to get {} from store", deploy_hash) - } - } - } - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub enum Message { - /// Gossiped out to random peers to notify them of a `Deploy` we hold. - Gossip(DeployHash), - /// Response to a `Gossip` message. If `is_already_held` is false, the recipient should treat - /// this as a `GetRequest` and send a `GetResponse` containing the `Deploy`. - GossipResponse { - deploy_hash: DeployHash, - is_already_held: bool, - }, - /// Sent if a `Deploy` fails to arrive, either after sending a `GossipResponse` with - /// `is_already_held` set to false, or after a previous `GetRequest`. - GetRequest(DeployHash), - /// Sent in response to a `GetRequest`, or to a peer which responded to gossip indicating it - /// didn't already hold the full `Deploy`. - GetResponse(Box), -} - -impl Display for Message { - fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - match self { - Message::Gossip(deploy_hash) => write!(formatter, "gossip({})", deploy_hash), - Message::GossipResponse { - deploy_hash, - is_already_held, - } => write!( - formatter, - "gossip-response({}, {})", - deploy_hash, is_already_held - ), - Message::GetRequest(deploy_hash) => write!(formatter, "get-request({})", deploy_hash), - Message::GetResponse(deploy) => write!(formatter, "get-response({})", deploy.id()), - } - } -} - /// The component which gossips `Deploy`s to peers and handles incoming `Deploy`s which have been /// gossiped to it. #[derive(Debug)] @@ -222,7 +97,7 @@ impl DeployGossiper { // in the entry being removed. if peers.is_empty() { self.table.pause(&deploy_hash); - warn!( + debug!( "paused gossiping {} since no more peers to gossip to", deploy_hash ); diff --git a/node/src/components/deploy_gossiper/event.rs b/node/src/components/deploy_gossiper/event.rs new file mode 100644 index 0000000000..456f204501 --- /dev/null +++ b/node/src/components/deploy_gossiper/event.rs @@ -0,0 +1,102 @@ +use std::{ + collections::HashSet, + fmt::{self, Display, Formatter}, +}; + +use super::Message; +use crate::{ + components::{small_network::NodeId, storage}, + types::{Deploy, DeployHash}, + utils::DisplayIter, +}; + +/// `DeployGossiper` events. +#[derive(Debug)] +pub enum Event { + /// A new deploy has been received to be gossiped. + DeployReceived { deploy: Box }, + /// The network component gossiped to the included peers. + GossipedTo { + deploy_hash: DeployHash, + peers: HashSet, + }, + /// The timeout for waiting for a gossip response has elapsed and we should check the response + /// arrived. + CheckGossipTimeout { + deploy_hash: DeployHash, + peer: NodeId, + }, + /// The timeout for waiting for the full deploy body has elapsed and we should check the + /// response arrived. + CheckGetFromPeerTimeout { + deploy_hash: DeployHash, + peer: NodeId, + }, + /// An incoming gossip network message. + MessageReceived { sender: NodeId, message: Message }, + /// The result of the `DeployGossiper` putting a deploy to the storage component. If the + /// result is `Ok`, the deploy hash should be gossiped onwards. + PutToStoreResult { + deploy_hash: DeployHash, + maybe_sender: Option, + result: storage::Result<()>, + }, + /// The result of the `DeployGossiper` getting a deploy from the storage component. If the + /// result is `Ok`, the deploy should be sent to the requesting peer. + GetFromStoreResult { + deploy_hash: DeployHash, + requester: NodeId, + result: Box>, + }, +} + +impl Display for Event { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + Event::DeployReceived { deploy } => { + write!(formatter, "new deploy received: {}", deploy.id()) + } + Event::GossipedTo { deploy_hash, peers } => write!( + formatter, + "gossiped {} to {}", + deploy_hash, + DisplayIter::new(peers) + ), + Event::CheckGossipTimeout { deploy_hash, peer } => write!( + formatter, + "check gossip timeout for {} with {}", + deploy_hash, peer + ), + Event::CheckGetFromPeerTimeout { deploy_hash, peer } => write!( + formatter, + "check get from peer timeout for {} with {}", + deploy_hash, peer + ), + Event::MessageReceived { sender, message } => { + write!(formatter, "{} received from {}", message, sender) + } + Event::PutToStoreResult { + deploy_hash, + result, + .. + } => { + if result.is_ok() { + write!(formatter, "put {} to store", deploy_hash) + } else { + write!(formatter, "failed to put {} to store", deploy_hash) + } + } + Event::GetFromStoreResult { + deploy_hash, + result, + .. + } => { + if result.is_ok() { + write!(formatter, "got {} from store", deploy_hash) + } else { + write!(formatter, "failed to get {} from store", deploy_hash) + } + } + } + } +} diff --git a/node/src/components/deploy_gossiper/message.rs b/node/src/components/deploy_gossiper/message.rs new file mode 100644 index 0000000000..b3b0a08681 --- /dev/null +++ b/node/src/components/deploy_gossiper/message.rs @@ -0,0 +1,41 @@ +use std::fmt::{self, Display, Formatter}; + +use serde::{Deserialize, Serialize}; + +use crate::types::{Deploy, DeployHash}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum Message { + /// Gossiped out to random peers to notify them of a `Deploy` we hold. + Gossip(DeployHash), + /// Response to a `Gossip` message. If `is_already_held` is false, the recipient should treat + /// this as a `GetRequest` and send a `GetResponse` containing the `Deploy`. + GossipResponse { + deploy_hash: DeployHash, + is_already_held: bool, + }, + /// Sent if a `Deploy` fails to arrive, either after sending a `GossipResponse` with + /// `is_already_held` set to false, or after a previous `GetRequest`. + GetRequest(DeployHash), + /// Sent in response to a `GetRequest`, or to a peer which responded to gossip indicating it + /// didn't already hold the full `Deploy`. + GetResponse(Box), +} + +impl Display for Message { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + Message::Gossip(deploy_hash) => write!(formatter, "gossip({})", deploy_hash), + Message::GossipResponse { + deploy_hash, + is_already_held, + } => write!( + formatter, + "gossip-response({}, {})", + deploy_hash, is_already_held + ), + Message::GetRequest(deploy_hash) => write!(formatter, "get-request({})", deploy_hash), + Message::GetResponse(deploy) => write!(formatter, "get-response({})", deploy.id()), + } + } +} diff --git a/node/src/components/deploy_gossiper/tests.rs b/node/src/components/deploy_gossiper/tests.rs new file mode 100644 index 0000000000..a0900c374b --- /dev/null +++ b/node/src/components/deploy_gossiper/tests.rs @@ -0,0 +1,347 @@ +#![cfg(test)] +use std::{ + collections::{BTreeSet, HashMap}, + fmt::{self, Debug, Display, Formatter}, + iter, +}; + +use derive_more::From; +use tempfile::TempDir; +use tokio::time; +use tracing::debug; + +use super::*; +use crate::{ + components::{ + in_memory_network::{InMemoryNetwork, NetworkController, NodeId}, + storage::{self, Storage, StorageType}, + }, + effect::announcements::NetworkAnnouncement, + reactor::{self, EventQueueHandle, Reactor as ReactorTrait, Runner}, + testing::network::{Network, NetworkedReactor}, + types::Deploy, +}; + +/// Top-level event for the reactor. +#[derive(Debug, From)] +#[must_use] +enum Event { + #[from] + /// Storage event. + Storage(StorageRequest), + /// Deploy gossiper event. + #[from] + DeployGossiper(super::Event), + /// Network request. + #[from] + NetworkRequest(NetworkRequest), + /// Network announcement. + #[from] + NetworkAnnouncement(NetworkAnnouncement), +} + +// From> + +impl Display for Event { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + Debug::fmt(self, formatter) + } +} + +#[derive(Debug)] +struct Reactor { + network: InMemoryNetwork, + storage: Storage, + deploy_gossiper: DeployGossiper, + _storage_tempdir: TempDir, +} + +impl Drop for Reactor { + fn drop(&mut self) { + NetworkController::::remove_node(&self.network.node_id()) + } +} + +impl ReactorTrait for Reactor { + type Event = Event; + type Config = GossipTableConfig; + type Error = storage::Error; + + fn new( + config: Self::Config, + event_queue: EventQueueHandle, + rng: &mut R, + ) -> Result<(Self, Effects), Self::Error> { + let network = NetworkController::create_node(event_queue, rng); + + let (storage_config, _storage_tempdir) = storage::Config::default_for_tests(); + let storage = Storage::new(&storage_config)?; + + let deploy_gossiper = DeployGossiper::new(config); + + let reactor = Reactor { + network, + storage, + deploy_gossiper, + _storage_tempdir, + }; + + let effects = Effects::new(); + + Ok((reactor, effects)) + } + + fn dispatch_event( + &mut self, + effect_builder: EffectBuilder, + rng: &mut R, + event: Event, + ) -> Effects { + match event { + Event::Storage(event) => reactor::wrap_effects( + Event::Storage, + self.storage.handle_event(effect_builder, rng, event), + ), + Event::DeployGossiper(event) => reactor::wrap_effects( + Event::DeployGossiper, + self.deploy_gossiper + .handle_event(effect_builder, rng, event), + ), + Event::NetworkRequest(request) => reactor::wrap_effects( + Event::NetworkRequest, + self.network.handle_event(effect_builder, rng, request), + ), + Event::NetworkAnnouncement(NetworkAnnouncement::MessageReceived { + sender, + payload, + }) => { + let event = super::Event::MessageReceived { + sender, + message: payload, + }; + reactor::wrap_effects( + From::from, + self.deploy_gossiper + .handle_event(effect_builder, rng, event), + ) + } + } + } +} + +impl NetworkedReactor for Reactor { + type NodeId = NodeId; + + fn node_id(&self) -> NodeId { + self.network.node_id() + } +} + +#[tokio::test] +async fn should_gossip() { + const NETWORK_SIZE_MIN: usize = 2; + const NETWORK_SIZE_MAX: usize = 20; + const DEPLOY_COUNT_MIN: usize = 1; + const DEPLOY_COUNT_MAX: usize = 30; + const TIMEOUT: Duration = Duration::from_secs(20); + + NetworkController::::create_active(); + let mut network = Network::::new(); + let mut rng = rand::thread_rng(); + + // Add `network_size` nodes. + let network_size: usize = rng.gen_range(NETWORK_SIZE_MIN, NETWORK_SIZE_MAX + 1); + let mut node_ids = vec![]; + for _ in 0..network_size { + let (node_id, _runner) = network.add_node(&mut rng).await.unwrap(); + node_ids.push(node_id); + } + + // Create `deploy_count` random deploys. + let deploy_count: usize = rng.gen_range(DEPLOY_COUNT_MIN, DEPLOY_COUNT_MAX + 1); + let (all_deploy_hashes, mut deploys): (BTreeSet<_>, Vec<_>) = iter::repeat_with(|| { + let deploy = Box::new(rng.gen::()); + (*deploy.id(), deploy) + }) + .take(deploy_count) + .unzip(); + + // Give each deploy to a randomly-chosen node to be gossiped. + for deploy in deploys.drain(..) { + let event = Event::DeployGossiper(super::Event::DeployReceived { deploy }); + let index: usize = rng.gen_range(0, network_size); + network + .inject_event_on(&node_ids[index], &mut rng, event) + .await; + } + + // Check every node has every deploy stored locally. + let all_deploys_held = |nodes: &HashMap>| { + nodes.values().all(|runner| { + let hashes = runner + .reactor() + .storage + .deploy_store() + .ids() + .unwrap() + .into_iter() + .collect(); + all_deploy_hashes == hashes + }) + }; + assert!(network.settle_on(&mut rng, all_deploys_held, TIMEOUT).await); + + NetworkController::::remove_active(); +} + +#[tokio::test] +async fn should_get_from_alternate_source() { + const NETWORK_SIZE: usize = 3; + const POLL_DURATION: Duration = Duration::from_millis(10); + const TIMEOUT: Duration = Duration::from_secs(2); + + NetworkController::::create_active(); + let mut network = Network::::new(); + let mut rng = rand::thread_rng(); + + // Add `NETWORK_SIZE` nodes. + let mut node_ids = vec![]; + for _ in 0..NETWORK_SIZE { + let (node_id, _runner) = network.add_node(&mut rng).await.unwrap(); + node_ids.push(node_id); + } + + // Create random deploy. + let deploy = Box::new(rng.gen::()); + let deploy_id = *deploy.id(); + + // Give the deploy to nodes 0 and 1 to be gossiped. + for node_id in node_ids.iter().take(2) { + let event = Event::DeployGossiper(super::Event::DeployReceived { + deploy: deploy.clone(), + }); + network.inject_event_on(&node_id, &mut rng, event).await; + } + + // Run node 0 until it has sent the gossip request then remove it from the network. This + // equates to three events: + // 1. Storage PutDeploy + // 2. DeployGossiper PutToStoreResult + // 3. NetworkRequest Gossip + let mut event_count = 0; + while event_count < 3 { + event_count += network.crank(&node_ids[0], &mut rng).await; + time::delay_for(POLL_DURATION).await; + } + assert!(network.remove_node(&node_ids[0]).is_some()); + debug!("removed node {}", &node_ids[0]); + + // Run node 2 until it receives the gossip request from node 0. This equates to two events: + // 1. NetworkAnnouncement MessageReceived of node 0's gossip message + // 2. NetworkRequest SendMessage gossip response. + event_count = 0; + while event_count < 2 { + event_count += network.crank(&node_ids[2], &mut rng).await; + time::delay_for(POLL_DURATION).await; + } + + // Run nodes 1 and 2 until settled. Node 2 will be waiting for the deploy from node 0. + assert!(network.settle(&mut rng, POLL_DURATION, TIMEOUT).await); + + // Advance time to trigger node 2's timeout causing it to request the deploy from node 1. + let secs_to_advance = GossipTableConfig::default().get_remainder_timeout_secs(); + time::pause(); + time::advance(Duration::from_secs(secs_to_advance)).await; + time::resume(); + debug!("advanced time by {} secs", secs_to_advance); + + // Check node 0 has the deploy stored locally. + let deploy_held = |nodes: &HashMap>| { + let runner = nodes.get(&node_ids[2]).unwrap(); + runner + .reactor() + .storage + .deploy_store() + .get(&deploy_id) + .map(|retrieved_deploy| retrieved_deploy == *deploy) + .unwrap_or_default() + }; + assert!(network.settle_on(&mut rng, deploy_held, TIMEOUT).await); + + NetworkController::::remove_active(); +} + +#[tokio::test] +async fn should_timeout_gossip_response() { + const POLL_DURATION: Duration = Duration::from_millis(10); + const TIMEOUT: Duration = Duration::from_secs(2); + + NetworkController::::create_active(); + let mut network = Network::::new(); + let mut rng = rand::thread_rng(); + + // The target number of peers to infect with a given piece of data. + let infection_target = GossipTableConfig::default().infection_target(); + + // Add `infection_target + 1` nodes. + let mut node_ids = vec![]; + for _ in 0..(infection_target + 1) { + let (node_id, _runner) = network.add_node(&mut rng).await.unwrap(); + node_ids.push(node_id); + } + + // Create random deploy. + let deploy = Box::new(rng.gen::()); + let deploy_id = *deploy.id(); + + // Give the deploy to node 0 to be gossiped. + let event = Event::DeployGossiper(super::Event::DeployReceived { + deploy: deploy.clone(), + }); + network.inject_event_on(&node_ids[0], &mut rng, event).await; + + // Run node 0 until it has sent the gossip requests then remove it from the network. This + // equates to four events: + // 1. Storage PutDeploy + // 2. DeployGossiper PutToStoreResult + // 3. NetworkRequest Gossip + // 4. DeployGossiper GossipedTo + let mut event_count = 0; + while event_count < 4 { + event_count += network.crank(&node_ids[0], &mut rng).await; + time::delay_for(POLL_DURATION).await; + } + + // Replace all nodes except node 0 with new nodes. + for node_id in node_ids.drain(1..) { + assert!(network.remove_node(&node_id).is_some()); + debug!("removed node {}", node_id); + } + for _ in 0..infection_target { + let (node_id, _runner) = network.add_node(&mut rng).await.unwrap(); + node_ids.push(node_id); + } + + // Advance time to trigger node 0's timeout causing it to gossip to the new nodes. + let secs_to_advance = GossipTableConfig::default().gossip_request_timeout_secs(); + time::pause(); + time::advance(Duration::from_secs(secs_to_advance)).await; + time::resume(); + debug!("advanced time by {} secs", secs_to_advance); + + // Check every node has every deploy stored locally. + let deploy_held = |nodes: &HashMap>| { + nodes.values().all(|runner| { + runner + .reactor() + .storage + .deploy_store() + .get(&deploy_id) + .map(|retrieved_deploy| retrieved_deploy == *deploy) + .unwrap_or_default() + }) + }; + assert!(network.settle_on(&mut rng, deploy_held, TIMEOUT).await); + + NetworkController::::remove_active(); +} diff --git a/node/src/components/in_memory_network.rs b/node/src/components/in_memory_network.rs index e1811374dd..f73e1175b4 100644 --- a/node/src/components/in_memory_network.rs +++ b/node/src/components/in_memory_network.rs @@ -15,19 +15,29 @@ //! //! ```rust //! # #![allow(dead_code)] // FIXME: Remove me -//! # use std::{collections::HashMap, fmt::{self, Formatter, Debug, Display}, ops::AddAssign, -//! # time::Duration}; +//! # use std::{ +//! # collections::HashMap, +//! # fmt::{self, Debug, Display, Formatter}, +//! # ops::AddAssign, +//! # time::Duration, +//! # }; //! # //! # use derive_more::From; //! # use maplit::hashmap; -//! # use rand::{Rng, rngs::OsRng}; +//! # use rand::{rngs::OsRng, Rng}; //! # -//! # use casperlabs_node::{components::{Component, -//! # in_memory_network::{InMemoryNetwork, NetworkController, NodeId}}, -//! # effect::{EffectBuilder, EffectExt, Effects, -//! # announcements::NetworkAnnouncement, requests::NetworkRequest}, -//! # reactor::{self, EventQueueHandle, wrap_effects}, -//! # testing::network::{Network, NetworkedReactor}}; +//! # use casperlabs_node::{ +//! # components::{ +//! # in_memory_network::{InMemoryNetwork, NetworkController, NodeId}, +//! # Component, +//! # }, +//! # effect::{ +//! # announcements::NetworkAnnouncement, requests::NetworkRequest, EffectBuilder, EffectExt, +//! # Effects, +//! # }, +//! # reactor::{self, wrap_effects, EventQueueHandle}, +//! # testing::network::{Network, NetworkedReactor}, +//! # }; //! # //! # let mut runtime = tokio::runtime::Runtime::new().unwrap(); //! # @@ -199,11 +209,11 @@ //! } //! //! impl NetworkedReactor for Reactor { -//! type NodeId = NodeId; +//! type NodeId = NodeId; //! -//! fn node_id(&self) -> NodeId { -//! self.net.node_id() -//! } +//! fn node_id(&self) -> NodeId { +//! self.net.node_id() +//! } //! } //! //! // We can finally run the tests: @@ -268,12 +278,12 @@ use std::{ any::Any, + cell::RefCell, collections::{HashMap, HashSet}, fmt::Display, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, RwLock}, }; -use lazy_static::lazy_static; use rand::{seq::IteratorRandom, Rng}; use tokio::sync::mpsc::{self, error::SendError}; use tracing::{debug, error, info, warn}; @@ -284,19 +294,21 @@ use crate::{ announcements::NetworkAnnouncement, requests::NetworkRequest, EffectBuilder, EffectExt, Effects, }, + logging, reactor::{EventQueueHandle, QueueKind}, + tls::KeyFingerprint, }; -type Network

= Arc>>>; - /// The node ID type used by the in-memory network. -pub type NodeId = u64; +pub type NodeId = KeyFingerprint; + +type Network

= Arc>>>; -lazy_static! { +thread_local! { /// The currently active network as a thread local. /// /// The type is dynamic, every network can be of a distinct type when the payload `P` differs. - static ref ACTIVE_NETWORK: Mutex>> = Mutex::new(None); + static ACTIVE_NETWORK: RefCell>> = RefCell::new(None); } /// The network controller is used to control the network topology (e.g. adding and removing nodes). @@ -312,6 +324,7 @@ where { /// Create a new, empty network. fn new() -> Self { + let _ = logging::init(); NetworkController { nodes: Default::default(), } @@ -323,10 +336,9 @@ where /// /// Panics if the internal lock has been poisoned. pub fn create_active() { + let _ = logging::init(); ACTIVE_NETWORK - .lock() - .expect("active network lock has been poisoned") - .replace(Box::new(Self::new())); + .with(|active_network| active_network.borrow_mut().replace(Box::new(Self::new()))); } /// Removes the active network. @@ -337,12 +349,13 @@ where /// removed or if there was no network at at all. pub fn remove_active() { assert!( - ACTIVE_NETWORK - .lock() - .expect("active network lock has been poisoned") - .take() - .expect("tried to remove non-existant network") - .is::(), + ACTIVE_NETWORK.with(|active_network| { + active_network + .borrow_mut() + .take() + .expect("tried to remove non-existent network") + .is::() + }), "removed network was of wrong type" ); } @@ -361,14 +374,36 @@ where R: Rng + ?Sized, REv: From> + Send, { - ACTIVE_NETWORK - .lock() - .expect("active network lock has been poisoned") - .as_mut() - .expect("tried to create node without active network set") - .downcast_mut::() - .expect("active network has wrong message type") - .create_node_local(event_queue, rng) + ACTIVE_NETWORK.with(|active_network| { + active_network + .borrow_mut() + .as_mut() + .expect("tried to create node without active network set") + .downcast_mut::() + .expect("active network has wrong message type") + .create_node_local(event_queue, rng) + }) + } + + /// Removes an in-memory network component on the active network. + /// + /// # Panics + /// + /// Panics if the internal lock has been poisoned, the active network is not of the correct + /// message type, or the node to remove doesn't exist. + pub fn remove_node(node_id: &NodeId) { + ACTIVE_NETWORK.with(|active_network| { + if let Some(active_network) = active_network.borrow_mut().as_mut() { + active_network + .downcast_mut::() + .expect("active network has wrong message type") + .nodes + .write() + .expect("poisoned lock") + .remove(node_id) + .expect("node doesn't exist in network"); + } + }) } /// Creates a new networking node with a random node ID. @@ -437,6 +472,10 @@ where dest: NodeId, payload: P, ) { + if dest == self.node_id { + panic!("can't send message to self"); + } + match nodes.get(&dest) { Some(sender) => { if let Err(SendError((_, msg))) = sender.send((self.node_id, payload)) { @@ -445,7 +484,7 @@ where // We do nothing else, the message is just dropped. } } - None => info!(%dest, %payload, "dropping message to non-existant recipient"), + None => info!(%dest, %payload, "dropping message to non-existent recipient"), } } } @@ -468,6 +507,10 @@ where payload, responder, } => { + if dest == self.node_id { + panic!("can't send message to self"); + } + if let Ok(guard) = self.nodes.read() { self.send(&guard, dest, payload); } else { @@ -478,7 +521,7 @@ where } NetworkRequest::Broadcast { payload, responder } => { if let Ok(guard) = self.nodes.read() { - for dest in guard.keys() { + for dest in guard.keys().filter(|&node_id| node_id != &self.node_id) { self.send(&guard, *dest, payload.clone()); } } else { @@ -496,7 +539,7 @@ where if let Ok(guard) = self.nodes.read() { let chosen: HashSet<_> = guard .keys() - .filter(|k| !exclude.contains(k)) + .filter(|&node_id| !exclude.contains(node_id) && node_id != &self.node_id) .cloned() .choose_multiple(rng, count) .into_iter() diff --git a/node/src/components/small_network.rs b/node/src/components/small_network.rs index 6f7e68d9a7..b3114014f5 100644 --- a/node/src/components/small_network.rs +++ b/node/src/components/small_network.rs @@ -44,7 +44,7 @@ mod error; mod event; mod message; #[cfg(test)] -mod test; +mod tests; use std::{ collections::{HashMap, HashSet}, diff --git a/node/src/components/small_network/test.rs b/node/src/components/small_network/tests.rs similarity index 86% rename from node/src/components/small_network/test.rs rename to node/src/components/small_network/tests.rs index b0cfeab588..5f83a3d43e 100644 --- a/node/src/components/small_network/test.rs +++ b/node/src/components/small_network/tests.rs @@ -6,11 +6,10 @@ use std::{ collections::{HashMap, HashSet}, fmt::{self, Debug, Display, Formatter}, - time::Duration, + time::{Duration, Instant}, }; use derive_more::From; -use futures::Future; use serde::{Deserialize, Serialize}; use small_network::NodeId; @@ -25,7 +24,6 @@ use crate::{ use pnet::datalink; use rand::{rngs::OsRng, Rng}; use reactor::{wrap_effects, Finalize}; -use tokio::time::{timeout, Timeout}; use tracing::{debug, info}; /// The networking port used by the tests for the root node. @@ -155,23 +153,6 @@ fn network_is_complete(nodes: &HashMap>) -> bool { .all(|actual| actual == expected) } -/// Helper trait to annotate timeouts more naturally. -trait Within { - /// Sets a timeout on a future. - /// - /// If the timeout occurs, the annotated future will be **cancelled**. Use with caution. - fn within(self, duration: Duration) -> Timeout; -} - -impl Within for T -where - T: Future, -{ - fn within(self, duration: Duration) -> Timeout { - timeout(duration, self) - } -} - fn gen_config(bind_port: u16) -> small_network::Config { // Bind everything to localhost. let bind_interface = "127.0.0.1".parse().unwrap(); @@ -203,29 +184,32 @@ async fn run_two_node_network_five_times() { let mut net = Network::new(); - let start = std::time::Instant::now(); + let start = Instant::now(); net.add_node_with_config(gen_config(TEST_ROOT_NODE_PORT), &mut rng) .await .unwrap(); net.add_node_with_config(gen_config(TEST_ROOT_NODE_PORT + 1), &mut rng) .await .unwrap(); - let end = std::time::Instant::now(); + let end = Instant::now(); debug!( total_time_ms = (end - start).as_millis() as u64, "finished setting up networking nodes" ); - net.settle_on(&mut rng, network_is_complete) - .within(Duration::from_millis(1000)) - .await - .expect("network did not fully connect in time"); + let timeout = Duration::from_secs(1); + assert!( + net.settle_on(&mut rng, network_is_complete, timeout).await, + "network did not fully connect in time" + ); - net.settle(&mut rng, Duration::from_millis(25)) - .within(Duration::from_millis(2000)) - .await - .expect("network did not stay settled"); + let quiet_for = Duration::from_millis(25); + let timeout = Duration::from_secs(2); + assert!( + net.settle(&mut rng, quiet_for, timeout).await, + "network did not stay settled" + ); assert!( network_is_complete(net.nodes()), @@ -273,5 +257,7 @@ async fn bind_to_real_network_interface() { .await .unwrap(); - net.settle(&mut rng, Duration::from_millis(250)).await; + let quiet_for = Duration::from_millis(250); + let timeout = Duration::from_secs(2); + assert!(net.settle(&mut rng, quiet_for, timeout).await); } diff --git a/node/src/components/storage/config.rs b/node/src/components/storage/config.rs index 96863ddc84..7a30a56e5b 100644 --- a/node/src/components/storage/config.rs +++ b/node/src/components/storage/config.rs @@ -2,6 +2,7 @@ use std::{env, path::PathBuf}; use directories::ProjectDirs; use serde::{Deserialize, Serialize}; +#[cfg(test)] use tempfile::TempDir; use tracing::warn; @@ -15,6 +16,7 @@ const DEFAULT_MAX_BLOCK_STORE_SIZE: usize = 483_183_820_800; // 450 GiB const DEFAULT_MAX_DEPLOY_STORE_SIZE: usize = 322_122_547_200; // 300 GiB const DEFAULT_MAX_CHAINSPEC_STORE_SIZE: usize = 1_073_741_824; // 1 GiB +#[cfg(test)] const DEFAULT_TEST_MAX_DB_SIZE: usize = 52_428_800; // 50 MiB /// On-disk storage configuration. @@ -55,7 +57,7 @@ pub struct Config { impl Config { /// Returns a default `Config` suitable for tests, along with a `TempDir` which must be kept /// alive for the duration of the test since its destructor removes the dir from the filesystem. - #[allow(unused)] + #[cfg(test)] pub(crate) fn default_for_tests() -> (Self, TempDir) { let tempdir = tempfile::tempdir().expect("should get tempdir"); let path = Some(tempdir.path().to_path_buf()); diff --git a/node/src/lib.rs b/node/src/lib.rs index 4e4494c7b1..ae234da20b 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -26,6 +26,7 @@ pub mod crypto; pub mod effect; pub mod logging; pub mod reactor; +#[cfg(test)] pub mod testing; pub mod tls; pub mod types; diff --git a/node/src/logging.rs b/node/src/logging.rs index 2c7ed2e00f..a911969b7b 100644 --- a/node/src/logging.rs +++ b/node/src/logging.rs @@ -123,7 +123,7 @@ pub fn init() -> anyhow::Result<()> { // Setup a new tracing-subscriber writing to `stderr` for logging. tracing::subscriber::set_global_default( tracing_subscriber::fmt() - .with_writer(io::stderr) + .with_writer(io::stdout) .with_env_filter(EnvFilter::from_default_env()) .fmt_fields(formatter) .event_format(FmtEvent {}) diff --git a/node/src/reactor.rs b/node/src/reactor.rs index 5b5eb391ec..83f2c46a24 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -245,6 +245,20 @@ where utils::leak(Scheduler::new(QueueKind::weights())) } + #[cfg(test)] + pub(crate) async fn inject_event(&mut self, rng: &mut Rd, event: R::Event) { + let event_queue = EventQueueHandle::new(self.scheduler); + let effect_builder = EffectBuilder::new(event_queue); + + let effects = self.reactor.dispatch_event(effect_builder, rng, event); + + let effect_span = tracing::debug_span!("process injected effects", ev = self.event_count); + + process_effects(self.scheduler, effects) + .instrument(effect_span) + .await; + } + /// Processes a single event on the event queue. #[inline] pub async fn crank(&mut self, rng: &mut Rd) { diff --git a/node/src/testing/network.rs b/node/src/testing/network.rs index b9fd8f5458..46a902cf60 100644 --- a/node/src/testing/network.rs +++ b/node/src/testing/network.rs @@ -9,6 +9,7 @@ use std::{ use futures::future::{BoxFuture, FutureExt}; use rand::Rng; +use tokio::time; use tracing::{debug, field}; use tracing_futures::Instrument; @@ -93,13 +94,30 @@ where Ok((node_id, node_ref)) } + /// Removes a node from the network. + pub fn remove_node(&mut self, node_id: &R::NodeId) -> Option> { + self.nodes.remove(node_id) + } + + /// Crank the specified runner once, returning the number of events processed. + pub async fn crank(&mut self, node_id: &R::NodeId, rng: &mut Rd) -> usize { + let runner = self.nodes.get_mut(node_id).unwrap(); + + let node_id = runner.reactor().node_id(); + let span = tracing::error_span!("crank", node_id = %node_id); + if runner.try_crank(rng).instrument(span).await.is_some() { + 1 + } else { + 0 + } + } + /// Crank all runners once, returning the number of events processed. pub async fn crank_all(&mut self, rng: &mut Rd) -> usize { let mut event_count = 0; for node in self.nodes.values_mut() { let node_id = node.reactor().node_id(); - let span = tracing::error_span!("crank", node_id = field::Empty); - span.record("node_id", &field::display(node_id)); + let span = tracing::error_span!("crank", node_id = %node_id); event_count += if node.try_crank(rng).instrument(span).await.is_some() { 1 } else { @@ -112,18 +130,30 @@ where /// Process events on all nodes until all event queues are empty. /// - /// Exits if `at_least` time has passed twice between events that have been processed. - pub async fn settle(&mut self, rng: &mut Rd, at_least: Duration) { + /// Returns `true` if `quiet_for` time has passed with no new events processed within the + /// specified timeout. + pub async fn settle( + &mut self, + rng: &mut Rd, + quiet_for: Duration, + within: Duration, + ) -> bool { + time::timeout(within, self.settle_indefinitely(rng, quiet_for)) + .await + .is_ok() + } + + async fn settle_indefinitely(&mut self, rng: &mut Rd, quiet_for: Duration) { let mut no_events = false; loop { if self.crank_all(rng).await == 0 { - // Stop once we have no pending events and haven't had any for `at_least` duration. + // Stop once we have no pending events and haven't had any for `quiet_for` time. if no_events { - debug!(?at_least, "network has settled after"); + debug!("network has been quiet for {:?}", quiet_for); break; } else { no_events = true; - tokio::time::delay_for(at_least).await; + time::delay_for(quiet_for).await; } } else { no_events = false; @@ -131,22 +161,33 @@ where } } - /// Runs the main loop of every reactor until a condition is true. - pub async fn settle_on(&mut self, rng: &mut Rd, f: F) + /// Runs the main loop of every reactor until `condition` is true or until `within` has elapsed. + /// + /// Returns `true` if `condition` has been met within the specified timeout. + pub async fn settle_on(&mut self, rng: &mut Rd, condition: F, within: Duration) -> bool + where + Rd: Rng + ?Sized, + F: Fn(&HashMap>) -> bool, + { + time::timeout(within, self.settle_on_indefinitely(rng, condition)) + .await + .is_ok() + } + + async fn settle_on_indefinitely(&mut self, rng: &mut Rd, condition: F) where Rd: Rng + ?Sized, F: Fn(&HashMap>) -> bool, { loop { - // Check condition. - if f(&self.nodes) { - debug!("network settled"); + if condition(&self.nodes) { + debug!("network settled on meeting condition"); break; } if self.crank_all(rng).await == 0 { // No events processed, wait for a bit to avoid 100% cpu usage. - tokio::time::delay_for(POLL_INTERVAL).await; + time::delay_for(POLL_INTERVAL).await; } } } @@ -155,6 +196,20 @@ where pub fn nodes(&self) -> &HashMap> { &self.nodes } + + /// Dispatch the given event on the given node. + pub async fn inject_event_on( + &mut self, + node_id: &R::NodeId, + rng: &mut Rd, + event: R::Event, + ) { + let runner = self.nodes.get_mut(node_id).unwrap(); + let node_id = runner.reactor().node_id(); + let span = tracing::error_span!("inject", node_id = field::Empty); + span.record("node_id", &field::display(node_id)); + runner.inject_event(rng, event).instrument(span).await + } } impl Finalize for Network diff --git a/node/src/types/deploy.rs b/node/src/types/deploy.rs index f8325e6962..8140fcdda0 100644 --- a/node/src/types/deploy.rs +++ b/node/src/types/deploy.rs @@ -5,9 +5,19 @@ use std::{ }; use hex::FromHexError; +#[cfg(test)] +use rand::{ + distributions::{Alphanumeric, Distribution, Standard}, + Rng, +}; use serde::{Deserialize, Serialize}; use thiserror::Error; +#[cfg(test)] +use crate::crypto::{ + asymmetric_key::{self, SecretKey}, + hash, +}; use crate::{ components::{ contract_runtime::core::engine_state::executable_deploy_item::ExecutableDeployItem, @@ -206,6 +216,73 @@ impl Display for Deploy { } } +#[cfg(test)] +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> Deploy { + let seed: u64 = rng.gen(); + let hash = DeployHash::new(hash::hash(seed.to_le_bytes())); + + let secret_key = SecretKey::generate_ed25519(); + let account = PublicKey::from(&secret_key); + + let timestamp = seed + 100; + let gas_price = seed + 101; + let body_hash = hash::hash(seed.overflowing_add(102).0.to_le_bytes()); + let ttl_millis = seed as u32 + 103; + + let dependencies = vec![ + DeployHash::new(hash::hash(seed.overflowing_add(104).0.to_le_bytes())), + DeployHash::new(hash::hash(seed.overflowing_add(105).0.to_le_bytes())), + DeployHash::new(hash::hash(seed.overflowing_add(106).0.to_le_bytes())), + ]; + + let chain_name = std::iter::repeat_with(|| rng.sample(Alphanumeric)) + .take(10) + .collect(); + + let header = DeployHeader { + account, + timestamp, + gas_price, + body_hash, + ttl_millis, + dependencies, + chain_name, + }; + + let payment = ExecutableDeployItem::ModuleBytes { + module_bytes: hash::hash(seed.overflowing_add(107).0.to_le_bytes()) + .as_ref() + .to_vec(), + args: hash::hash(seed.overflowing_add(108).0.to_le_bytes()) + .as_ref() + .to_vec(), + }; + let session = ExecutableDeployItem::ModuleBytes { + module_bytes: hash::hash(seed.overflowing_add(109).0.to_le_bytes()) + .as_ref() + .to_vec(), + args: hash::hash(seed.overflowing_add(1110).0.to_le_bytes()) + .as_ref() + .to_vec(), + }; + + let approvals = vec![ + asymmetric_key::sign(&[3], &secret_key, &account), + asymmetric_key::sign(&[4], &secret_key, &account), + asymmetric_key::sign(&[5], &secret_key, &account), + ]; + + Deploy { + hash, + header, + payment, + session, + approvals, + } + } +} + /// This module provides structs which map to the main deploy types, but which are suitable for /// encoding to and decoding from JSON. For all fields with binary data, this is converted to/from /// hex strings. diff --git a/node/src/utils/gossip_table.rs b/node/src/utils/gossip_table.rs index fefdbc5077..ad43c9998e 100644 --- a/node/src/utils/gossip_table.rs +++ b/node/src/utils/gossip_table.rs @@ -115,6 +115,11 @@ impl Config { }) } + #[cfg(test)] + pub(crate) fn infection_target(&self) -> u8 { + self.infection_target + } + pub(crate) fn gossip_request_timeout_secs(&self) -> u64 { self.gossip_request_timeout_secs } From ddb316b084ff46da829f095c4a90a0c758e410a7 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Wed, 15 Jul 2020 11:49:24 +0100 Subject: [PATCH 2/7] NDRS-119: remove stray commented line --- node/src/components/deploy_gossiper/tests.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/node/src/components/deploy_gossiper/tests.rs b/node/src/components/deploy_gossiper/tests.rs index a0900c374b..a2c606b432 100644 --- a/node/src/components/deploy_gossiper/tests.rs +++ b/node/src/components/deploy_gossiper/tests.rs @@ -40,8 +40,6 @@ enum Event { NetworkAnnouncement(NetworkAnnouncement), } -// From> - impl Display for Event { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { Debug::fmt(self, formatter) From 259c921d3d73279800d00af1fb42094793ab80e2 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Thu, 16 Jul 2020 00:01:56 +0100 Subject: [PATCH 3/7] NDRS-119: refactor Runner::inject_event_on to make it less prone to accidental abuse --- node/src/components/deploy_gossiper/tests.rs | 88 +++++++++++--------- node/src/reactor.rs | 10 ++- node/src/reactor/initializer.rs | 2 +- node/src/testing/network.rs | 45 +++++++--- 4 files changed, 90 insertions(+), 55 deletions(-) diff --git a/node/src/components/deploy_gossiper/tests.rs b/node/src/components/deploy_gossiper/tests.rs index a2c606b432..b182b784f2 100644 --- a/node/src/components/deploy_gossiper/tests.rs +++ b/node/src/components/deploy_gossiper/tests.rs @@ -16,7 +16,7 @@ use crate::{ in_memory_network::{InMemoryNetwork, NetworkController, NodeId}, storage::{self, Storage, StorageType}, }, - effect::announcements::NetworkAnnouncement, + effect::announcements::{ApiServerAnnouncement, NetworkAnnouncement}, reactor::{self, EventQueueHandle, Reactor as ReactorTrait, Runner}, testing::network::{Network, NetworkedReactor}, types::Deploy, @@ -38,11 +38,22 @@ enum Event { /// Network announcement. #[from] NetworkAnnouncement(NetworkAnnouncement), + /// API server announcement. + #[from] + ApiServerAnnouncement(ApiServerAnnouncement), } impl Display for Event { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - Debug::fmt(self, formatter) + match self { + Event::Storage(event) => write!(formatter, "storage: {}", event), + Event::DeployGossiper(event) => write!(formatter, "deploy gossiper: {}", event), + Event::NetworkRequest(req) => write!(formatter, "network request: {}", req), + Event::NetworkAnnouncement(ann) => write!(formatter, "network announcement: {}", ann), + Event::ApiServerAnnouncement(ann) => { + write!(formatter, "api server announcement: {}", ann) + } + } } } @@ -123,6 +134,10 @@ impl ReactorTrait for Reactor { .handle_event(effect_builder, rng, event), ) } + Event::ApiServerAnnouncement(ApiServerAnnouncement::DeployReceived { deploy }) => { + let event = super::Event::DeployReceived { deploy }; + self.dispatch_event(effect_builder, rng, Event::DeployGossiper(event)) + } } } } @@ -135,6 +150,12 @@ impl NetworkedReactor for Reactor { } } +fn create_deploy_received( + deploy: Box, +) -> impl FnOnce(EffectBuilder) -> Effects { + |effect_builder: EffectBuilder| effect_builder.announce_deploy_received(deploy).ignore() +} + #[tokio::test] async fn should_gossip() { const NETWORK_SIZE_MIN: usize = 2; @@ -148,12 +169,8 @@ async fn should_gossip() { let mut rng = rand::thread_rng(); // Add `network_size` nodes. - let network_size: usize = rng.gen_range(NETWORK_SIZE_MIN, NETWORK_SIZE_MAX + 1); - let mut node_ids = vec![]; - for _ in 0..network_size { - let (node_id, _runner) = network.add_node(&mut rng).await.unwrap(); - node_ids.push(node_id); - } + let network_size = rng.gen_range(NETWORK_SIZE_MIN, NETWORK_SIZE_MAX + 1); + let node_ids = network.add_nodes(&mut rng, network_size).await; // Create `deploy_count` random deploys. let deploy_count: usize = rng.gen_range(DEPLOY_COUNT_MIN, DEPLOY_COUNT_MAX + 1); @@ -166,10 +183,9 @@ async fn should_gossip() { // Give each deploy to a randomly-chosen node to be gossiped. for deploy in deploys.drain(..) { - let event = Event::DeployGossiper(super::Event::DeployReceived { deploy }); let index: usize = rng.gen_range(0, network_size); network - .inject_event_on(&node_ids[index], &mut rng, event) + .process_injected_effect_on(&node_ids[index], create_deploy_received(deploy)) .await; } @@ -203,11 +219,7 @@ async fn should_get_from_alternate_source() { let mut rng = rand::thread_rng(); // Add `NETWORK_SIZE` nodes. - let mut node_ids = vec![]; - for _ in 0..NETWORK_SIZE { - let (node_id, _runner) = network.add_node(&mut rng).await.unwrap(); - node_ids.push(node_id); - } + let node_ids = network.add_nodes(&mut rng, NETWORK_SIZE).await; // Create random deploy. let deploy = Box::new(rng.gen::()); @@ -215,19 +227,19 @@ async fn should_get_from_alternate_source() { // Give the deploy to nodes 0 and 1 to be gossiped. for node_id in node_ids.iter().take(2) { - let event = Event::DeployGossiper(super::Event::DeployReceived { - deploy: deploy.clone(), - }); - network.inject_event_on(&node_id, &mut rng, event).await; + network + .process_injected_effect_on(&node_id, create_deploy_received(deploy.clone())) + .await; } // Run node 0 until it has sent the gossip request then remove it from the network. This - // equates to three events: - // 1. Storage PutDeploy - // 2. DeployGossiper PutToStoreResult - // 3. NetworkRequest Gossip + // equates to four events: + // 1. ApiServe Announcement of new deploy + // 2. Storage PutDeploy + // 3. DeployGossiper PutToStoreResult + // 4. NetworkRequest Gossip let mut event_count = 0; - while event_count < 3 { + while event_count < 4 { event_count += network.crank(&node_ids[0], &mut rng).await; time::delay_for(POLL_DURATION).await; } @@ -282,30 +294,28 @@ async fn should_timeout_gossip_response() { let infection_target = GossipTableConfig::default().infection_target(); // Add `infection_target + 1` nodes. - let mut node_ids = vec![]; - for _ in 0..(infection_target + 1) { - let (node_id, _runner) = network.add_node(&mut rng).await.unwrap(); - node_ids.push(node_id); - } + let mut node_ids = network + .add_nodes(&mut rng, infection_target as usize + 1) + .await; // Create random deploy. let deploy = Box::new(rng.gen::()); let deploy_id = *deploy.id(); // Give the deploy to node 0 to be gossiped. - let event = Event::DeployGossiper(super::Event::DeployReceived { - deploy: deploy.clone(), - }); - network.inject_event_on(&node_ids[0], &mut rng, event).await; + network + .process_injected_effect_on(&node_ids[0], create_deploy_received(deploy.clone())) + .await; // Run node 0 until it has sent the gossip requests then remove it from the network. This - // equates to four events: - // 1. Storage PutDeploy - // 2. DeployGossiper PutToStoreResult - // 3. NetworkRequest Gossip - // 4. DeployGossiper GossipedTo + // equates to five events: + // 1. ApiServe Announcement of new deploy + // 2. Storage PutDeploy + // 3. DeployGossiper PutToStoreResult + // 4. NetworkRequest Gossip + // 5. DeployGossiper GossipedTo let mut event_count = 0; - while event_count < 4 { + while event_count < 5 { event_count += network.crank(&node_ids[0], &mut rng).await; time::delay_for(POLL_DURATION).await; } diff --git a/node/src/reactor.rs b/node/src/reactor.rs index 83f2c46a24..4d58a21462 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -245,15 +245,19 @@ where utils::leak(Scheduler::new(QueueKind::weights())) } + /// Inject (schedule then process) effects created via a call to `create_effects` which is + /// itself passed an instance of an `EffectBuilder`. #[cfg(test)] - pub(crate) async fn inject_event(&mut self, rng: &mut Rd, event: R::Event) { + pub(crate) async fn process_injected_effects(&mut self, create_effects: F) + where + F: FnOnce(EffectBuilder) -> Effects, + { let event_queue = EventQueueHandle::new(self.scheduler); let effect_builder = EffectBuilder::new(event_queue); - let effects = self.reactor.dispatch_event(effect_builder, rng, event); + let effects = create_effects(effect_builder); let effect_span = tracing::debug_span!("process injected effects", ev = self.event_count); - process_effects(self.scheduler, effects) .instrument(effect_span) .await; diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index b9dd9473cf..bf66d5f89e 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -72,7 +72,7 @@ pub enum Error { ContractRuntime(#[from] contract_runtime::ConfigError), } -/// Validator node reactor. +/// Initializer node reactor. #[derive(Debug)] pub struct Reactor { config: validator::Config, diff --git a/node/src/testing/network.rs b/node/src/testing/network.rs index 46a902cf60..4dbab5b6c6 100644 --- a/node/src/testing/network.rs +++ b/node/src/testing/network.rs @@ -10,10 +10,13 @@ use std::{ use futures::future::{BoxFuture, FutureExt}; use rand::Rng; use tokio::time; -use tracing::{debug, field}; +use tracing::debug; use tracing_futures::Instrument; -use crate::reactor::{Finalize, Reactor, Runner}; +use crate::{ + effect::{EffectBuilder, Effects}, + reactor::{Finalize, Reactor, Runner}, +}; /// A reactor with networking functionality. pub trait NetworkedReactor: Sized { @@ -42,6 +45,7 @@ impl Network where R: Reactor + NetworkedReactor, R::Config: Default, + ::Error: Debug, { /// Creates a new networking node on the network using the default root node port. /// @@ -55,6 +59,20 @@ where ) -> Result<(R::NodeId, &mut Runner), R::Error> { self.add_node_with_config(Default::default(), rng).await } + + /// Adds `count` new nodes to the network, and returns their IDs. + pub async fn add_nodes( + &mut self, + rng: &mut Rd, + count: usize, + ) -> Vec { + let mut node_ids = vec![]; + for _ in 0..count { + let (node_id, _runner) = self.add_node(rng).await.unwrap(); + node_ids.push(node_id); + } + node_ids + } } impl Network @@ -197,18 +215,21 @@ where &self.nodes } - /// Dispatch the given event on the given node. - pub async fn inject_event_on( - &mut self, - node_id: &R::NodeId, - rng: &mut Rd, - event: R::Event, - ) { + /// Create effects and dispatch them on the given node. + /// + /// The effects are created via a call to `create_effects` which is itself passed an instance of + /// an `EffectBuilder`. + pub async fn process_injected_effect_on(&mut self, node_id: &R::NodeId, create_effects: F) + where + F: FnOnce(EffectBuilder) -> Effects, + { let runner = self.nodes.get_mut(node_id).unwrap(); let node_id = runner.reactor().node_id(); - let span = tracing::error_span!("inject", node_id = field::Empty); - span.record("node_id", &field::display(node_id)); - runner.inject_event(rng, event).instrument(span).await + let span = tracing::error_span!("inject", node_id = %node_id); + runner + .process_injected_effects(create_effects) + .instrument(span) + .await } } From 12412964d28fff89e63e43b360fa8ef89f84f99d Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Thu, 16 Jul 2020 04:53:45 +0100 Subject: [PATCH 4/7] NDRS-119: added 'crank_until' to test network using newtype reactor with hook to dispatched events --- node/src/components/deploy_gossiper/tests.rs | 95 +++++++++------- node/src/components/small_network/tests.rs | 13 ++- node/src/testing.rs | 3 + node/src/testing/condition_check_reactor.rs | 112 +++++++++++++++++++ node/src/testing/network.rs | 96 ++++++++++++---- 5 files changed, 253 insertions(+), 66 deletions(-) create mode 100644 node/src/testing/condition_check_reactor.rs diff --git a/node/src/components/deploy_gossiper/tests.rs b/node/src/components/deploy_gossiper/tests.rs index b182b784f2..8b4ac517d0 100644 --- a/node/src/components/deploy_gossiper/tests.rs +++ b/node/src/components/deploy_gossiper/tests.rs @@ -17,8 +17,11 @@ use crate::{ storage::{self, Storage, StorageType}, }, effect::announcements::{ApiServerAnnouncement, NetworkAnnouncement}, - reactor::{self, EventQueueHandle, Reactor as ReactorTrait, Runner}, - testing::network::{Network, NetworkedReactor}, + reactor::{self, EventQueueHandle, Runner}, + testing::{ + network::{Network, NetworkedReactor}, + ConditionCheckReactor, + }, types::Deploy, }; @@ -57,7 +60,6 @@ impl Display for Event { } } -#[derive(Debug)] struct Reactor { network: InMemoryNetwork, storage: Storage, @@ -71,7 +73,7 @@ impl Drop for Reactor { } } -impl ReactorTrait for Reactor { +impl reactor::Reactor for Reactor { type Event = Event; type Config = GossipTableConfig; type Error = storage::Error; @@ -190,10 +192,11 @@ async fn should_gossip() { } // Check every node has every deploy stored locally. - let all_deploys_held = |nodes: &HashMap>| { + let all_deploys_held = |nodes: &HashMap>>| { nodes.values().all(|runner| { let hashes = runner .reactor() + .inner() .storage .deploy_store() .ids() @@ -232,28 +235,38 @@ async fn should_get_from_alternate_source() { .await; } - // Run node 0 until it has sent the gossip request then remove it from the network. This - // equates to four events: - // 1. ApiServe Announcement of new deploy - // 2. Storage PutDeploy - // 3. DeployGossiper PutToStoreResult - // 4. NetworkRequest Gossip - let mut event_count = 0; - while event_count < 4 { - event_count += network.crank(&node_ids[0], &mut rng).await; - time::delay_for(POLL_DURATION).await; - } + // Run node 0 until it has sent the gossip request then remove it from the network. + let made_gossip_request = |event: &Event| -> bool { + match event { + Event::NetworkRequest(NetworkRequest::Gossip { .. }) => true, + _ => false, + } + }; + assert!( + network + .crank_until(&node_ids[0], &mut rng, made_gossip_request, TIMEOUT) + .await + ); assert!(network.remove_node(&node_ids[0]).is_some()); debug!("removed node {}", &node_ids[0]); - // Run node 2 until it receives the gossip request from node 0. This equates to two events: - // 1. NetworkAnnouncement MessageReceived of node 0's gossip message - // 2. NetworkRequest SendMessage gossip response. - event_count = 0; - while event_count < 2 { - event_count += network.crank(&node_ids[2], &mut rng).await; - time::delay_for(POLL_DURATION).await; - } + // Run node 2 until it receives and responds to the gossip request from node 0. + let node_id_0 = node_ids[0]; + let sent_gossip_response = move |event: &Event| -> bool { + match event { + Event::NetworkRequest(NetworkRequest::SendMessage { + dest, + payload: Message::GossipResponse { .. }, + .. + }) => dest == &node_id_0, + _ => false, + } + }; + assert!( + network + .crank_until(&node_ids[2], &mut rng, sent_gossip_response, TIMEOUT) + .await + ); // Run nodes 1 and 2 until settled. Node 2 will be waiting for the deploy from node 0. assert!(network.settle(&mut rng, POLL_DURATION, TIMEOUT).await); @@ -266,10 +279,11 @@ async fn should_get_from_alternate_source() { debug!("advanced time by {} secs", secs_to_advance); // Check node 0 has the deploy stored locally. - let deploy_held = |nodes: &HashMap>| { + let deploy_held = |nodes: &HashMap>>| { let runner = nodes.get(&node_ids[2]).unwrap(); runner .reactor() + .inner() .storage .deploy_store() .get(&deploy_id) @@ -283,7 +297,7 @@ async fn should_get_from_alternate_source() { #[tokio::test] async fn should_timeout_gossip_response() { - const POLL_DURATION: Duration = Duration::from_millis(10); + const PAUSE_DURATION: Duration = Duration::from_millis(50); const TIMEOUT: Duration = Duration::from_secs(2); NetworkController::::create_active(); @@ -307,18 +321,20 @@ async fn should_timeout_gossip_response() { .process_injected_effect_on(&node_ids[0], create_deploy_received(deploy.clone())) .await; - // Run node 0 until it has sent the gossip requests then remove it from the network. This - // equates to five events: - // 1. ApiServe Announcement of new deploy - // 2. Storage PutDeploy - // 3. DeployGossiper PutToStoreResult - // 4. NetworkRequest Gossip - // 5. DeployGossiper GossipedTo - let mut event_count = 0; - while event_count < 5 { - event_count += network.crank(&node_ids[0], &mut rng).await; - time::delay_for(POLL_DURATION).await; - } + // Run node 0 until it has sent the gossip requests. + let made_gossip_request = |event: &Event| -> bool { + match event { + Event::DeployGossiper(super::Event::GossipedTo { .. }) => true, + _ => false, + } + }; + assert!( + network + .crank_until(&node_ids[0], &mut rng, made_gossip_request, TIMEOUT) + .await + ); + // Give node 0 time to set the timeouts before advancing the clock. + time::delay_for(PAUSE_DURATION).await; // Replace all nodes except node 0 with new nodes. for node_id in node_ids.drain(1..) { @@ -338,10 +354,11 @@ async fn should_timeout_gossip_response() { debug!("advanced time by {} secs", secs_to_advance); // Check every node has every deploy stored locally. - let deploy_held = |nodes: &HashMap>| { + let deploy_held = |nodes: &HashMap>>| { nodes.values().all(|runner| { runner .reactor() + .inner() .storage .deploy_store() .get(&deploy_id) diff --git a/node/src/components/small_network/tests.rs b/node/src/components/small_network/tests.rs index 5f83a3d43e..f993c90c96 100644 --- a/node/src/components/small_network/tests.rs +++ b/node/src/components/small_network/tests.rs @@ -19,7 +19,10 @@ use crate::{ logging, reactor::{self, EventQueueHandle, Reactor, Runner}, small_network::{self, SmallNetwork}, - testing::network::{Network, NetworkedReactor}, + testing::{ + network::{Network, NetworkedReactor}, + ConditionCheckReactor, + }, }; use pnet::datalink; use rand::{rngs::OsRng, Rng}; @@ -127,7 +130,9 @@ fn init_logging() { } /// Checks whether or not a given network is completely connected. -fn network_is_complete(nodes: &HashMap>) -> bool { +fn network_is_complete( + nodes: &HashMap>>, +) -> bool { // We need at least one node. if nodes.is_empty() { return false; @@ -135,13 +140,13 @@ fn network_is_complete(nodes: &HashMap>) -> bool { let expected: HashSet<_> = nodes .iter() - .map(|(_, runner)| runner.reactor().net.node_id()) + .map(|(_, runner)| runner.reactor().inner().net.node_id()) .collect(); nodes .iter() .map(|(_, runner)| { - let net = &runner.reactor().net; + let net = &runner.reactor().inner().net; let mut actual = net.connected_nodes(); // All nodes should be connected to every other node, except itself, so we add it to the diff --git a/node/src/testing.rs b/node/src/testing.rs index 296854eeb4..6cd28eb8cb 100644 --- a/node/src/testing.rs +++ b/node/src/testing.rs @@ -3,4 +3,7 @@ //! Contains various parts and components to aid writing tests and simulations using the //! `casperlabs-node` library. +mod condition_check_reactor; pub mod network; + +pub(crate) use condition_check_reactor::ConditionCheckReactor; diff --git a/node/src/testing/condition_check_reactor.rs b/node/src/testing/condition_check_reactor.rs new file mode 100644 index 0000000000..9237268e07 --- /dev/null +++ b/node/src/testing/condition_check_reactor.rs @@ -0,0 +1,112 @@ +use std::fmt::{self, Debug, Formatter}; + +use futures::future::BoxFuture; +use rand::Rng; + +use super::network::NetworkedReactor; +use crate::{ + effect::{EffectBuilder, Effects}, + reactor::{EventQueueHandle, Finalize, Reactor}, +}; + +/// A reactor wrapping an inner reactor, and which has an optional hook into +/// `Reactor::dispatch_event()`. +/// +/// While the hook is not `None`, it's called on every call to `dispatch_event()`, taking a +/// reference to the current `Event`, and setting a boolean result to true when the condition has +/// been met. +/// +/// Once the condition is met, the hook is reset to `None`. +pub struct ConditionCheckReactor { + reactor: R, + condition_checker: Option bool + Send>>, + condition_result: bool, +} + +impl ConditionCheckReactor { + /// Sets the condition checker hook. + pub fn set_condition_checker( + &mut self, + condition_checker: Box bool + Send>, + ) { + self.condition_checker = Some(condition_checker); + } + + /// Returns the result of the last execution of the condition checker hook. + pub fn condition_result(&self) -> bool { + self.condition_result + } + + /// Returns a reference to the wrapped reactor. + pub fn inner(&self) -> &R { + &self.reactor + } + + /// Returns a mutable reference to the wrapped reactor. + pub fn inner_mut(&mut self) -> &mut R { + &mut self.reactor + } +} + +impl Reactor for ConditionCheckReactor { + type Event = R::Event; + type Config = R::Config; + type Error = R::Error; + + fn new( + config: Self::Config, + event_queue: EventQueueHandle, + rng: &mut RNG, + ) -> Result<(Self, Effects), Self::Error> { + let (reactor, effects) = R::new(config, event_queue, rng)?; + Ok(( + Self { + reactor, + condition_checker: None, + condition_result: false, + }, + effects, + )) + } + + fn dispatch_event( + &mut self, + effect_builder: EffectBuilder, + rng: &mut RNG, + event: Self::Event, + ) -> Effects { + self.condition_result = self + .condition_checker + .as_ref() + .map(|condition_checker| condition_checker(&event)) + .unwrap_or_default(); + if self.condition_result { + self.condition_checker = None; + } + self.reactor.dispatch_event(effect_builder, rng, event) + } +} + +impl Finalize for ConditionCheckReactor { + fn finalize(self) -> BoxFuture<'static, ()> { + self.reactor.finalize() + } +} + +impl NetworkedReactor for ConditionCheckReactor { + type NodeId = R::NodeId; + + fn node_id(&self) -> Self::NodeId { + self.reactor.node_id() + } +} + +impl Debug for ConditionCheckReactor { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + formatter + .debug_struct("ConditionCheckReactor") + .field("reactor", &self.reactor) + .field("condition_check_result", &self.condition_result) + .finish() + } +} diff --git a/node/src/testing/network.rs b/node/src/testing/network.rs index 4dbab5b6c6..a13206a0c3 100644 --- a/node/src/testing/network.rs +++ b/node/src/testing/network.rs @@ -13,6 +13,7 @@ use tokio::time; use tracing::debug; use tracing_futures::Instrument; +use super::ConditionCheckReactor; use crate::{ effect::{EffectBuilder, Effects}, reactor::{Finalize, Reactor, Runner}, @@ -38,7 +39,7 @@ const POLL_INTERVAL: Duration = Duration::from_millis(10); #[derive(Debug, Default)] pub struct Network { /// Current network. - nodes: HashMap<::NodeId, Runner>, + nodes: HashMap<::NodeId, Runner>>, } impl Network @@ -53,17 +54,17 @@ where /// /// Panics if a duplicate node ID is being inserted. This should only happen in case a randomly /// generated ID collides. - pub async fn add_node( + pub async fn add_node( &mut self, - rng: &mut Rd, - ) -> Result<(R::NodeId, &mut Runner), R::Error> { + rng: &mut RNG, + ) -> Result<(R::NodeId, &mut Runner>), R::Error> { self.add_node_with_config(Default::default(), rng).await } /// Adds `count` new nodes to the network, and returns their IDs. - pub async fn add_nodes( + pub async fn add_nodes( &mut self, - rng: &mut Rd, + rng: &mut RNG, count: usize, ) -> Vec { let mut node_ids = vec![]; @@ -91,12 +92,12 @@ where /// # Panics /// /// Panics if a duplicate node ID is being inserted. - pub async fn add_node_with_config( + pub async fn add_node_with_config( &mut self, cfg: R::Config, - rng: &mut Rd, - ) -> Result<(R::NodeId, &mut Runner), R::Error> { - let runner: Runner = Runner::new(cfg, rng).await?; + rng: &mut RNG, + ) -> Result<(R::NodeId, &mut Runner>), R::Error> { + let runner: Runner> = Runner::new(cfg, rng).await?; let node_id = runner.reactor().node_id(); @@ -113,12 +114,12 @@ where } /// Removes a node from the network. - pub fn remove_node(&mut self, node_id: &R::NodeId) -> Option> { + pub fn remove_node(&mut self, node_id: &R::NodeId) -> Option>> { self.nodes.remove(node_id) } /// Crank the specified runner once, returning the number of events processed. - pub async fn crank(&mut self, node_id: &R::NodeId, rng: &mut Rd) -> usize { + pub async fn crank(&mut self, node_id: &R::NodeId, rng: &mut RNG) -> usize { let runner = self.nodes.get_mut(node_id).unwrap(); let node_id = runner.reactor().node_id(); @@ -130,8 +131,57 @@ where } } + /// Crank only the specified runner until `condition` is true or until `within` has elapsed. + /// + /// Returns `true` if `condition` has been met within the specified timeout. + pub async fn crank_until( + &mut self, + node_id: &R::NodeId, + rng: &mut RNG, + condition: F, + within: Duration, + ) -> bool + where + RNG: Rng + ?Sized, + F: Fn(&R::Event) -> bool + Send + 'static, + { + self.nodes + .get_mut(node_id) + .unwrap() + .reactor_mut() + .set_condition_checker(Box::new(condition)); + + time::timeout(within, self.crank_and_check_indefinitely(node_id, rng)) + .await + .is_ok() + } + + async fn crank_and_check_indefinitely( + &mut self, + node_id: &R::NodeId, + rng: &mut RNG, + ) { + loop { + if self.crank(node_id, rng).await == 0 { + time::delay_for(POLL_INTERVAL).await; + continue; + } + + if self + .nodes + .get(node_id) + .unwrap() + .reactor() + .condition_result() + { + debug!("{} met condition", node_id); + return; + } + } + } + /// Crank all runners once, returning the number of events processed. - pub async fn crank_all(&mut self, rng: &mut Rd) -> usize { + pub async fn crank_all(&mut self, rng: &mut RNG) -> usize { let mut event_count = 0; for node in self.nodes.values_mut() { let node_id = node.reactor().node_id(); @@ -150,9 +200,9 @@ where /// /// Returns `true` if `quiet_for` time has passed with no new events processed within the /// specified timeout. - pub async fn settle( + pub async fn settle( &mut self, - rng: &mut Rd, + rng: &mut RNG, quiet_for: Duration, within: Duration, ) -> bool { @@ -161,7 +211,7 @@ where .is_ok() } - async fn settle_indefinitely(&mut self, rng: &mut Rd, quiet_for: Duration) { + async fn settle_indefinitely(&mut self, rng: &mut RNG, quiet_for: Duration) { let mut no_events = false; loop { if self.crank_all(rng).await == 0 { @@ -182,20 +232,20 @@ where /// Runs the main loop of every reactor until `condition` is true or until `within` has elapsed. /// /// Returns `true` if `condition` has been met within the specified timeout. - pub async fn settle_on(&mut self, rng: &mut Rd, condition: F, within: Duration) -> bool + pub async fn settle_on(&mut self, rng: &mut RNG, condition: F, within: Duration) -> bool where - Rd: Rng + ?Sized, - F: Fn(&HashMap>) -> bool, + RNG: Rng + ?Sized, + F: Fn(&HashMap>>) -> bool, { time::timeout(within, self.settle_on_indefinitely(rng, condition)) .await .is_ok() } - async fn settle_on_indefinitely(&mut self, rng: &mut Rd, condition: F) + async fn settle_on_indefinitely(&mut self, rng: &mut RNG, condition: F) where - Rd: Rng + ?Sized, - F: Fn(&HashMap>) -> bool, + RNG: Rng + ?Sized, + F: Fn(&HashMap>>) -> bool, { loop { if condition(&self.nodes) { @@ -211,7 +261,7 @@ where } /// Returns the internal map of nodes. - pub fn nodes(&self) -> &HashMap> { + pub fn nodes(&self) -> &HashMap>> { &self.nodes } From 0d3ad3a02852d7c27f18b84cee94cab2b641842f Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Thu, 16 Jul 2020 12:23:57 +0100 Subject: [PATCH 5/7] NDRS-119: change unwrap for expect --- node/src/testing/network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/testing/network.rs b/node/src/testing/network.rs index a13206a0c3..a759053c92 100644 --- a/node/src/testing/network.rs +++ b/node/src/testing/network.rs @@ -120,7 +120,7 @@ where /// Crank the specified runner once, returning the number of events processed. pub async fn crank(&mut self, node_id: &R::NodeId, rng: &mut RNG) -> usize { - let runner = self.nodes.get_mut(node_id).unwrap(); + let runner = self.nodes.get_mut(node_id).expect("should find node"); let node_id = runner.reactor().node_id(); let span = tracing::error_span!("crank", node_id = %node_id); From 2e796dda166b42b634ea4f855cfd4a7e9c985428 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Fri, 17 Jul 2020 18:42:07 +0100 Subject: [PATCH 6/7] NDRS-119: ensure test timeouts panic to avoid potentially invalidating state --- node/src/components/deploy_gossiper/tests.rs | 32 ++++++++------------ node/src/components/small_network/tests.rs | 12 ++------ node/src/testing/network.rs | 13 ++++---- 3 files changed, 22 insertions(+), 35 deletions(-) diff --git a/node/src/components/deploy_gossiper/tests.rs b/node/src/components/deploy_gossiper/tests.rs index c494451cbd..f677456a94 100644 --- a/node/src/components/deploy_gossiper/tests.rs +++ b/node/src/components/deploy_gossiper/tests.rs @@ -225,7 +225,7 @@ async fn should_gossip() { all_deploy_hashes == hashes }) }; - assert!(network.settle_on(&mut rng, all_deploys_held, TIMEOUT).await); + network.settle_on(&mut rng, all_deploys_held, TIMEOUT).await; NetworkController::::remove_active(); } @@ -261,11 +261,9 @@ async fn should_get_from_alternate_source() { _ => false, } }; - assert!( - network - .crank_until(&node_ids[0], &mut rng, made_gossip_request, TIMEOUT) - .await - ); + network + .crank_until(&node_ids[0], &mut rng, made_gossip_request, TIMEOUT) + .await; assert!(network.remove_node(&node_ids[0]).is_some()); debug!("removed node {}", &node_ids[0]); @@ -281,14 +279,12 @@ async fn should_get_from_alternate_source() { _ => false, } }; - assert!( - network - .crank_until(&node_ids[2], &mut rng, sent_gossip_response, TIMEOUT) - .await - ); + network + .crank_until(&node_ids[2], &mut rng, sent_gossip_response, TIMEOUT) + .await; // Run nodes 1 and 2 until settled. Node 2 will be waiting for the deploy from node 0. - assert!(network.settle(&mut rng, POLL_DURATION, TIMEOUT).await); + network.settle(&mut rng, POLL_DURATION, TIMEOUT).await; // Advance time to trigger node 2's timeout causing it to request the deploy from node 1. let secs_to_advance = GossipTableConfig::default().get_remainder_timeout_secs(); @@ -311,7 +307,7 @@ async fn should_get_from_alternate_source() { .map(|retrieved_deploy| retrieved_deploy == *deploy) .unwrap_or_default() }; - assert!(network.settle_on(&mut rng, deploy_held, TIMEOUT).await); + network.settle_on(&mut rng, deploy_held, TIMEOUT).await; NetworkController::::remove_active(); } @@ -349,11 +345,9 @@ async fn should_timeout_gossip_response() { _ => false, } }; - assert!( - network - .crank_until(&node_ids[0], &mut rng, made_gossip_request, TIMEOUT) - .await - ); + network + .crank_until(&node_ids[0], &mut rng, made_gossip_request, TIMEOUT) + .await; // Give node 0 time to set the timeouts before advancing the clock. time::delay_for(PAUSE_DURATION).await; @@ -389,7 +383,7 @@ async fn should_timeout_gossip_response() { .unwrap_or_default() }) }; - assert!(network.settle_on(&mut rng, deploy_held, TIMEOUT).await); + network.settle_on(&mut rng, deploy_held, TIMEOUT).await; NetworkController::::remove_active(); } diff --git a/node/src/components/small_network/tests.rs b/node/src/components/small_network/tests.rs index eb45eae21f..a5639b0caa 100644 --- a/node/src/components/small_network/tests.rs +++ b/node/src/components/small_network/tests.rs @@ -206,17 +206,11 @@ async fn run_two_node_network_five_times() { ); let timeout = Duration::from_secs(1); - assert!( - net.settle_on(&mut rng, network_is_complete, timeout).await, - "network did not fully connect in time" - ); + net.settle_on(&mut rng, network_is_complete, timeout).await; let quiet_for = Duration::from_millis(25); let timeout = Duration::from_secs(2); - assert!( - net.settle(&mut rng, quiet_for, timeout).await, - "network did not stay settled" - ); + net.settle(&mut rng, quiet_for, timeout).await; assert!( network_is_complete(net.nodes()), @@ -266,5 +260,5 @@ async fn bind_to_real_network_interface() { let quiet_for = Duration::from_millis(250); let timeout = Duration::from_secs(2); - assert!(net.settle(&mut rng, quiet_for, timeout).await); + net.settle(&mut rng, quiet_for, timeout).await; } diff --git a/node/src/testing/network.rs b/node/src/testing/network.rs index a7db017f5b..2f50567627 100644 --- a/node/src/testing/network.rs +++ b/node/src/testing/network.rs @@ -142,8 +142,7 @@ where rng: &mut RNG, condition: F, within: Duration, - ) -> bool - where + ) where RNG: Rng + ?Sized, F: Fn(&R::Event) -> bool + Send + 'static, { @@ -155,7 +154,7 @@ where time::timeout(within, self.crank_and_check_indefinitely(node_id, rng)) .await - .is_ok() + .unwrap() } async fn crank_and_check_indefinitely( @@ -207,10 +206,10 @@ where rng: &mut RNG, quiet_for: Duration, within: Duration, - ) -> bool { + ) { time::timeout(within, self.settle_indefinitely(rng, quiet_for)) .await - .is_ok() + .unwrap() } async fn settle_indefinitely(&mut self, rng: &mut RNG, quiet_for: Duration) { @@ -234,14 +233,14 @@ where /// Runs the main loop of every reactor until `condition` is true or until `within` has elapsed. /// /// Returns `true` if `condition` has been met within the specified timeout. - pub async fn settle_on(&mut self, rng: &mut RNG, condition: F, within: Duration) -> bool + pub async fn settle_on(&mut self, rng: &mut RNG, condition: F, within: Duration) where RNG: Rng + ?Sized, F: Fn(&HashMap>>) -> bool, { time::timeout(within, self.settle_on_indefinitely(rng, condition)) .await - .is_ok() + .unwrap() } async fn settle_on_indefinitely(&mut self, rng: &mut RNG, condition: F) From 1ba0dc77ec9aaf58ec4f31ff6b45093f5129f5f2 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Fri, 17 Jul 2020 19:07:23 +0100 Subject: [PATCH 7/7] NDRS-119: refactor random test variables to chosen values --- node/src/components/deploy_gossiper/tests.rs | 21 ++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/node/src/components/deploy_gossiper/tests.rs b/node/src/components/deploy_gossiper/tests.rs index f677456a94..55478a358f 100644 --- a/node/src/components/deploy_gossiper/tests.rs +++ b/node/src/components/deploy_gossiper/tests.rs @@ -177,12 +177,7 @@ fn create_deploy_received( |effect_builder: EffectBuilder| effect_builder.announce_deploy_received(deploy).ignore() } -#[tokio::test] -async fn should_gossip() { - const NETWORK_SIZE_MIN: usize = 2; - const NETWORK_SIZE_MAX: usize = 20; - const DEPLOY_COUNT_MIN: usize = 1; - const DEPLOY_COUNT_MAX: usize = 30; +async fn run_gossip(network_size: usize, deploy_count: usize) { const TIMEOUT: Duration = Duration::from_secs(20); NetworkController::::create_active(); @@ -190,11 +185,9 @@ async fn should_gossip() { let mut rng = rand::thread_rng(); // Add `network_size` nodes. - let network_size = rng.gen_range(NETWORK_SIZE_MIN, NETWORK_SIZE_MAX + 1); let node_ids = network.add_nodes(&mut rng, network_size).await; // Create `deploy_count` random deploys. - let deploy_count: usize = rng.gen_range(DEPLOY_COUNT_MIN, DEPLOY_COUNT_MAX + 1); let (all_deploy_hashes, mut deploys): (BTreeSet<_>, Vec<_>) = iter::repeat_with(|| { let deploy = Box::new(rng.gen::()); (*deploy.id(), deploy) @@ -230,6 +223,18 @@ async fn should_gossip() { NetworkController::::remove_active(); } +#[tokio::test] +async fn should_gossip() { + const NETWORK_SIZES: [usize; 3] = [2, 5, 20]; + const DEPLOY_COUNTS: [usize; 3] = [1, 10, 30]; + + for network_size in &NETWORK_SIZES { + for deploy_count in &DEPLOY_COUNTS { + run_gossip(*network_size, *deploy_count).await + } + } +} + #[tokio::test] async fn should_get_from_alternate_source() { const NETWORK_SIZE: usize = 3;