diff --git a/node/Cargo.toml b/node/Cargo.toml index 93d8e2dee8..64d11abc1e 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -63,7 +63,7 @@ smallvec = "1.4.0" structopt = "0.3.14" tempfile = "3.1.0" thiserror = "1.0.18" -tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp", "time", "blocking"] } +tokio = { version = "0.2.20", features = ["blocking", "macros", "rt-threaded", "sync", "tcp", "time"] } tokio-openssl = "0.4.0" tokio-serde = { version = "0.6.1", features = ["messagepack"] } tokio-util = { version = "0.3.1", features = ["codec"] } @@ -80,13 +80,14 @@ wasmi = "0.6.2" [dev-dependencies] assert_matches = "1.3.0" +criterion = "0.3.3" +fake_instant = "0.4.0" lazy_static = "1" pnet = "0.26.0" proptest = "0.9.4" -fake_instant = "0.4.0" rand_xorshift = { version = "~0.2.0" } rand_core = "0.5.1" -criterion = "0.3.3" +tokio = { version = "0.2.20", features = ["test-util"] } [features] vendored-openssl = ['openssl/vendored'] diff --git a/node/src/components.rs b/node/src/components.rs index 7fd71ecb28..f56b45feab 100644 --- a/node/src/components.rs +++ b/node/src/components.rs @@ -9,6 +9,7 @@ pub mod contract_runtime; pub(crate) mod deploy_buffer; pub(crate) mod deploy_gossiper; // The `in_memory_network` is public for use in doctests. +#[cfg(test)] pub mod in_memory_network; pub(crate) mod metrics; pub(crate) mod pinger; diff --git a/node/src/components/deploy_gossiper.rs b/node/src/components/deploy_gossiper.rs index 144d830536..38f8dfe377 100644 --- a/node/src/components/deploy_gossiper.rs +++ b/node/src/components/deploy_gossiper.rs @@ -1,14 +1,13 @@ -use std::{ - collections::HashSet, - fmt::{self, Display, Formatter}, - time::Duration, -}; +mod event; +mod message; +mod tests; + +use std::{collections::HashSet, time::Duration}; use futures::FutureExt; use rand::Rng; -use serde::{Deserialize, Serialize}; use smallvec::smallvec; -use tracing::{debug, error, warn}; +use tracing::{debug, error}; use crate::{ components::{ @@ -21,10 +20,13 @@ use crate::{ EffectBuilder, EffectExt, Effects, }, types::{Deploy, DeployHash}, - utils::{DisplayIter, GossipAction, GossipTable}, + utils::{GossipAction, GossipTable}, GossipTableConfig, }; +pub use event::Event; +pub use message::Message; + trait ReactorEvent: From + From> + From> + Send { @@ -35,133 +37,6 @@ impl ReactorEvent for T where { } -/// `DeployGossiper` events. -#[derive(Debug)] -pub enum Event { - /// A new deploy has been received to be gossiped. - DeployReceived { deploy: Box }, - /// The network component gossiped to the included peers. - GossipedTo { - deploy_hash: DeployHash, - peers: HashSet, - }, - /// The timeout for waiting for a gossip response has elapsed and we should check the response - /// arrived. - CheckGossipTimeout { - deploy_hash: DeployHash, - peer: NodeId, - }, - /// The timeout for waiting for the full deploy body has elapsed and we should check the - /// response arrived. - CheckGetFromPeerTimeout { - deploy_hash: DeployHash, - peer: NodeId, - }, - /// An incoming gossip network message. - MessageReceived { sender: NodeId, message: Message }, - /// The result of the `DeployGossiper` putting a deploy to the storage component. If the - /// result is `Ok`, the deploy hash should be gossiped onwards. - PutToStoreResult { - deploy_hash: DeployHash, - maybe_sender: Option, - result: storage::Result, - }, - /// The result of the `DeployGossiper` getting a deploy from the storage component. If the - /// result is `Ok`, the deploy should be sent to the requesting peer. - GetFromStoreResult { - deploy_hash: DeployHash, - requester: NodeId, - result: Box>, - }, -} - -impl Display for Event { - fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - match self { - Event::DeployReceived { deploy } => { - write!(formatter, "new deploy received: {}", deploy.id()) - } - Event::GossipedTo { deploy_hash, peers } => write!( - formatter, - "gossiped {} to {}", - deploy_hash, - DisplayIter::new(peers) - ), - Event::CheckGossipTimeout { deploy_hash, peer } => write!( - formatter, - "check gossip timeout for {} with {}", - deploy_hash, peer - ), - Event::CheckGetFromPeerTimeout { deploy_hash, peer } => write!( - formatter, - "check get from peer timeout for {} with {}", - deploy_hash, peer - ), - Event::MessageReceived { sender, message } => { - write!(formatter, "{} received from {}", message, sender) - } - Event::PutToStoreResult { - deploy_hash, - result, - .. - } => { - if result.is_ok() { - write!(formatter, "put {} to store", deploy_hash) - } else { - write!(formatter, "failed to put {} to store", deploy_hash) - } - } - Event::GetFromStoreResult { - deploy_hash, - result, - .. - } => { - if result.is_ok() { - write!(formatter, "got {} from store", deploy_hash) - } else { - write!(formatter, "failed to get {} from store", deploy_hash) - } - } - } - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub enum Message { - /// Gossiped out to random peers to notify them of a `Deploy` we hold. - Gossip(DeployHash), - /// Response to a `Gossip` message. If `is_already_held` is false, the recipient should treat - /// this as a `GetRequest` and send a `GetResponse` containing the `Deploy`. - GossipResponse { - deploy_hash: DeployHash, - is_already_held: bool, - }, - /// Sent if a `Deploy` fails to arrive, either after sending a `GossipResponse` with - /// `is_already_held` set to false, or after a previous `GetRequest`. - GetRequest(DeployHash), - /// Sent in response to a `GetRequest`, or to a peer which responded to gossip indicating it - /// didn't already hold the full `Deploy`. - GetResponse(Box), -} - -impl Display for Message { - fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - match self { - Message::Gossip(deploy_hash) => write!(formatter, "gossip({})", deploy_hash), - Message::GossipResponse { - deploy_hash, - is_already_held, - } => write!( - formatter, - "gossip-response({}, {})", - deploy_hash, is_already_held - ), - Message::GetRequest(deploy_hash) => write!(formatter, "get-request({})", deploy_hash), - Message::GetResponse(deploy) => write!(formatter, "get-response({})", deploy.id()), - } - } -} - /// The component which gossips `Deploy`s to peers and handles incoming `Deploy`s which have been /// gossiped to it. #[derive(Debug)] @@ -222,7 +97,7 @@ impl DeployGossiper { // in the entry being removed. if peers.is_empty() { self.table.pause(&deploy_hash); - warn!( + debug!( "paused gossiping {} since no more peers to gossip to", deploy_hash ); diff --git a/node/src/components/deploy_gossiper/event.rs b/node/src/components/deploy_gossiper/event.rs new file mode 100644 index 0000000000..d46e73ef11 --- /dev/null +++ b/node/src/components/deploy_gossiper/event.rs @@ -0,0 +1,102 @@ +use std::{ + collections::HashSet, + fmt::{self, Display, Formatter}, +}; + +use super::Message; +use crate::{ + components::{small_network::NodeId, storage}, + types::{Deploy, DeployHash}, + utils::DisplayIter, +}; + +/// `DeployGossiper` events. +#[derive(Debug)] +pub enum Event { + /// A new deploy has been received to be gossiped. + DeployReceived { deploy: Box }, + /// The network component gossiped to the included peers. + GossipedTo { + deploy_hash: DeployHash, + peers: HashSet, + }, + /// The timeout for waiting for a gossip response has elapsed and we should check the response + /// arrived. + CheckGossipTimeout { + deploy_hash: DeployHash, + peer: NodeId, + }, + /// The timeout for waiting for the full deploy body has elapsed and we should check the + /// response arrived. + CheckGetFromPeerTimeout { + deploy_hash: DeployHash, + peer: NodeId, + }, + /// An incoming gossip network message. + MessageReceived { sender: NodeId, message: Message }, + /// The result of the `DeployGossiper` putting a deploy to the storage component. If the + /// result is `Ok`, the deploy hash should be gossiped onwards. + PutToStoreResult { + deploy_hash: DeployHash, + maybe_sender: Option, + result: storage::Result, + }, + /// The result of the `DeployGossiper` getting a deploy from the storage component. If the + /// result is `Ok`, the deploy should be sent to the requesting peer. + GetFromStoreResult { + deploy_hash: DeployHash, + requester: NodeId, + result: Box>, + }, +} + +impl Display for Event { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + Event::DeployReceived { deploy } => { + write!(formatter, "new deploy received: {}", deploy.id()) + } + Event::GossipedTo { deploy_hash, peers } => write!( + formatter, + "gossiped {} to {}", + deploy_hash, + DisplayIter::new(peers) + ), + Event::CheckGossipTimeout { deploy_hash, peer } => write!( + formatter, + "check gossip timeout for {} with {}", + deploy_hash, peer + ), + Event::CheckGetFromPeerTimeout { deploy_hash, peer } => write!( + formatter, + "check get from peer timeout for {} with {}", + deploy_hash, peer + ), + Event::MessageReceived { sender, message } => { + write!(formatter, "{} received from {}", message, sender) + } + Event::PutToStoreResult { + deploy_hash, + result, + .. + } => { + if result.is_ok() { + write!(formatter, "put {} to store", deploy_hash) + } else { + write!(formatter, "failed to put {} to store", deploy_hash) + } + } + Event::GetFromStoreResult { + deploy_hash, + result, + .. + } => { + if result.is_ok() { + write!(formatter, "got {} from store", deploy_hash) + } else { + write!(formatter, "failed to get {} from store", deploy_hash) + } + } + } + } +} diff --git a/node/src/components/deploy_gossiper/message.rs b/node/src/components/deploy_gossiper/message.rs new file mode 100644 index 0000000000..b3b0a08681 --- /dev/null +++ b/node/src/components/deploy_gossiper/message.rs @@ -0,0 +1,41 @@ +use std::fmt::{self, Display, Formatter}; + +use serde::{Deserialize, Serialize}; + +use crate::types::{Deploy, DeployHash}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum Message { + /// Gossiped out to random peers to notify them of a `Deploy` we hold. + Gossip(DeployHash), + /// Response to a `Gossip` message. If `is_already_held` is false, the recipient should treat + /// this as a `GetRequest` and send a `GetResponse` containing the `Deploy`. + GossipResponse { + deploy_hash: DeployHash, + is_already_held: bool, + }, + /// Sent if a `Deploy` fails to arrive, either after sending a `GossipResponse` with + /// `is_already_held` set to false, or after a previous `GetRequest`. + GetRequest(DeployHash), + /// Sent in response to a `GetRequest`, or to a peer which responded to gossip indicating it + /// didn't already hold the full `Deploy`. + GetResponse(Box), +} + +impl Display for Message { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + Message::Gossip(deploy_hash) => write!(formatter, "gossip({})", deploy_hash), + Message::GossipResponse { + deploy_hash, + is_already_held, + } => write!( + formatter, + "gossip-response({}, {})", + deploy_hash, is_already_held + ), + Message::GetRequest(deploy_hash) => write!(formatter, "get-request({})", deploy_hash), + Message::GetResponse(deploy) => write!(formatter, "get-response({})", deploy.id()), + } + } +} diff --git a/node/src/components/deploy_gossiper/tests.rs b/node/src/components/deploy_gossiper/tests.rs new file mode 100644 index 0000000000..55478a358f --- /dev/null +++ b/node/src/components/deploy_gossiper/tests.rs @@ -0,0 +1,394 @@ +#![cfg(test)] +use std::{ + collections::{BTreeSet, HashMap}, + fmt::{self, Debug, Display, Formatter}, + iter, +}; + +use derive_more::From; +use prometheus::Registry; +use smallvec::smallvec; +use tempfile::TempDir; +use thiserror::Error; +use tokio::time; +use tracing::debug; + +use super::*; +use crate::{ + components::{ + in_memory_network::{InMemoryNetwork, NetworkController, NodeId}, + storage::{self, Storage, StorageType}, + }, + effect::announcements::{ApiServerAnnouncement, NetworkAnnouncement, StorageAnnouncement}, + reactor::{self, EventQueueHandle, Runner}, + testing::{ + network::{Network, NetworkedReactor}, + ConditionCheckReactor, + }, + types::Deploy, +}; + +/// Top-level event for the reactor. +#[derive(Debug, From)] +#[must_use] +enum Event { + #[from] + /// Storage event. + Storage(StorageRequest), + /// Deploy gossiper event. + #[from] + DeployGossiper(super::Event), + /// Network request. + #[from] + NetworkRequest(NetworkRequest), + /// Network announcement. + #[from] + NetworkAnnouncement(NetworkAnnouncement), + /// Storage announcement. + #[from] + StorageAnnouncement(StorageAnnouncement), + /// API server announcement. + #[from] + ApiServerAnnouncement(ApiServerAnnouncement), +} + +impl Display for Event { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + Event::Storage(event) => write!(formatter, "storage: {}", event), + Event::DeployGossiper(event) => write!(formatter, "deploy gossiper: {}", event), + Event::NetworkRequest(req) => write!(formatter, "network request: {}", req), + Event::NetworkAnnouncement(ann) => write!(formatter, "network announcement: {}", ann), + Event::StorageAnnouncement(ann) => write!(formatter, "storage announcement: {}", ann), + Event::ApiServerAnnouncement(ann) => { + write!(formatter, "api server announcement: {}", ann) + } + } + } +} + +/// Error type returned by the test reactor. +#[derive(Debug, Error)] +pub enum Error { + /// Metrics-related error + #[error("prometheus (metrics) error: {0}")] + Metrics(#[from] prometheus::Error), + /// `Storage` component error. + #[error("storage error: {0}")] + Storage(#[from] storage::Error), +} +struct Reactor { + network: InMemoryNetwork, + storage: Storage, + deploy_gossiper: DeployGossiper, + _storage_tempdir: TempDir, +} + +impl Drop for Reactor { + fn drop(&mut self) { + NetworkController::::remove_node(&self.network.node_id()) + } +} + +impl reactor::Reactor for Reactor { + type Event = Event; + type Config = GossipTableConfig; + type Error = Error; + + fn new( + config: Self::Config, + _registry: &Registry, + event_queue: EventQueueHandle, + rng: &mut R, + ) -> Result<(Self, Effects), Self::Error> { + let network = NetworkController::create_node(event_queue, rng); + + let (storage_config, _storage_tempdir) = storage::Config::default_for_tests(); + let storage = Storage::new(&storage_config)?; + + let deploy_gossiper = DeployGossiper::new(config); + + let reactor = Reactor { + network, + storage, + deploy_gossiper, + _storage_tempdir, + }; + + let effects = Effects::new(); + + Ok((reactor, effects)) + } + + fn dispatch_event( + &mut self, + effect_builder: EffectBuilder, + rng: &mut R, + event: Event, + ) -> Effects { + match event { + Event::Storage(event) => reactor::wrap_effects( + Event::Storage, + self.storage.handle_event(effect_builder, rng, event), + ), + Event::DeployGossiper(event) => reactor::wrap_effects( + Event::DeployGossiper, + self.deploy_gossiper + .handle_event(effect_builder, rng, event), + ), + Event::NetworkRequest(request) => reactor::wrap_effects( + Event::NetworkRequest, + self.network.handle_event(effect_builder, rng, request), + ), + Event::NetworkAnnouncement(NetworkAnnouncement::MessageReceived { + sender, + payload, + }) => { + let event = super::Event::MessageReceived { + sender, + message: payload, + }; + reactor::wrap_effects( + From::from, + self.deploy_gossiper + .handle_event(effect_builder, rng, event), + ) + } + Event::StorageAnnouncement(_) => Effects::new(), + Event::ApiServerAnnouncement(ApiServerAnnouncement::DeployReceived { deploy }) => { + let event = super::Event::DeployReceived { deploy }; + self.dispatch_event(effect_builder, rng, Event::DeployGossiper(event)) + } + } + } +} + +impl NetworkedReactor for Reactor { + type NodeId = NodeId; + + fn node_id(&self) -> NodeId { + self.network.node_id() + } +} + +fn create_deploy_received( + deploy: Box, +) -> impl FnOnce(EffectBuilder) -> Effects { + |effect_builder: EffectBuilder| effect_builder.announce_deploy_received(deploy).ignore() +} + +async fn run_gossip(network_size: usize, deploy_count: usize) { + const TIMEOUT: Duration = Duration::from_secs(20); + + NetworkController::::create_active(); + let mut network = Network::::new(); + let mut rng = rand::thread_rng(); + + // Add `network_size` nodes. + let node_ids = network.add_nodes(&mut rng, network_size).await; + + // Create `deploy_count` random deploys. + let (all_deploy_hashes, mut deploys): (BTreeSet<_>, Vec<_>) = iter::repeat_with(|| { + let deploy = Box::new(rng.gen::()); + (*deploy.id(), deploy) + }) + .take(deploy_count) + .unzip(); + + // Give each deploy to a randomly-chosen node to be gossiped. + for deploy in deploys.drain(..) { + let index: usize = rng.gen_range(0, network_size); + network + .process_injected_effect_on(&node_ids[index], create_deploy_received(deploy)) + .await; + } + + // Check every node has every deploy stored locally. + let all_deploys_held = |nodes: &HashMap>>| { + nodes.values().all(|runner| { + let hashes = runner + .reactor() + .inner() + .storage + .deploy_store() + .ids() + .unwrap() + .into_iter() + .collect(); + all_deploy_hashes == hashes + }) + }; + network.settle_on(&mut rng, all_deploys_held, TIMEOUT).await; + + NetworkController::::remove_active(); +} + +#[tokio::test] +async fn should_gossip() { + const NETWORK_SIZES: [usize; 3] = [2, 5, 20]; + const DEPLOY_COUNTS: [usize; 3] = [1, 10, 30]; + + for network_size in &NETWORK_SIZES { + for deploy_count in &DEPLOY_COUNTS { + run_gossip(*network_size, *deploy_count).await + } + } +} + +#[tokio::test] +async fn should_get_from_alternate_source() { + const NETWORK_SIZE: usize = 3; + const POLL_DURATION: Duration = Duration::from_millis(10); + const TIMEOUT: Duration = Duration::from_secs(2); + + NetworkController::::create_active(); + let mut network = Network::::new(); + let mut rng = rand::thread_rng(); + + // Add `NETWORK_SIZE` nodes. + let node_ids = network.add_nodes(&mut rng, NETWORK_SIZE).await; + + // Create random deploy. + let deploy = Box::new(rng.gen::()); + let deploy_id = *deploy.id(); + + // Give the deploy to nodes 0 and 1 to be gossiped. + for node_id in node_ids.iter().take(2) { + network + .process_injected_effect_on(&node_id, create_deploy_received(deploy.clone())) + .await; + } + + // Run node 0 until it has sent the gossip request then remove it from the network. + let made_gossip_request = |event: &Event| -> bool { + match event { + Event::NetworkRequest(NetworkRequest::Gossip { .. }) => true, + _ => false, + } + }; + network + .crank_until(&node_ids[0], &mut rng, made_gossip_request, TIMEOUT) + .await; + assert!(network.remove_node(&node_ids[0]).is_some()); + debug!("removed node {}", &node_ids[0]); + + // Run node 2 until it receives and responds to the gossip request from node 0. + let node_id_0 = node_ids[0]; + let sent_gossip_response = move |event: &Event| -> bool { + match event { + Event::NetworkRequest(NetworkRequest::SendMessage { + dest, + payload: Message::GossipResponse { .. }, + .. + }) => dest == &node_id_0, + _ => false, + } + }; + network + .crank_until(&node_ids[2], &mut rng, sent_gossip_response, TIMEOUT) + .await; + + // Run nodes 1 and 2 until settled. Node 2 will be waiting for the deploy from node 0. + network.settle(&mut rng, POLL_DURATION, TIMEOUT).await; + + // Advance time to trigger node 2's timeout causing it to request the deploy from node 1. + let secs_to_advance = GossipTableConfig::default().get_remainder_timeout_secs(); + time::pause(); + time::advance(Duration::from_secs(secs_to_advance)).await; + time::resume(); + debug!("advanced time by {} secs", secs_to_advance); + + // Check node 0 has the deploy stored locally. + let deploy_held = |nodes: &HashMap>>| { + let runner = nodes.get(&node_ids[2]).unwrap(); + runner + .reactor() + .inner() + .storage + .deploy_store() + .get(smallvec![deploy_id]) + .pop() + .expect("should only be a single result") + .map(|retrieved_deploy| retrieved_deploy == *deploy) + .unwrap_or_default() + }; + network.settle_on(&mut rng, deploy_held, TIMEOUT).await; + + NetworkController::::remove_active(); +} + +#[tokio::test] +async fn should_timeout_gossip_response() { + const PAUSE_DURATION: Duration = Duration::from_millis(50); + const TIMEOUT: Duration = Duration::from_secs(2); + + NetworkController::::create_active(); + let mut network = Network::::new(); + let mut rng = rand::thread_rng(); + + // The target number of peers to infect with a given piece of data. + let infection_target = GossipTableConfig::default().infection_target(); + + // Add `infection_target + 1` nodes. + let mut node_ids = network + .add_nodes(&mut rng, infection_target as usize + 1) + .await; + + // Create random deploy. + let deploy = Box::new(rng.gen::()); + let deploy_id = *deploy.id(); + + // Give the deploy to node 0 to be gossiped. + network + .process_injected_effect_on(&node_ids[0], create_deploy_received(deploy.clone())) + .await; + + // Run node 0 until it has sent the gossip requests. + let made_gossip_request = |event: &Event| -> bool { + match event { + Event::DeployGossiper(super::Event::GossipedTo { .. }) => true, + _ => false, + } + }; + network + .crank_until(&node_ids[0], &mut rng, made_gossip_request, TIMEOUT) + .await; + // Give node 0 time to set the timeouts before advancing the clock. + time::delay_for(PAUSE_DURATION).await; + + // Replace all nodes except node 0 with new nodes. + for node_id in node_ids.drain(1..) { + assert!(network.remove_node(&node_id).is_some()); + debug!("removed node {}", node_id); + } + for _ in 0..infection_target { + let (node_id, _runner) = network.add_node(&mut rng).await.unwrap(); + node_ids.push(node_id); + } + + // Advance time to trigger node 0's timeout causing it to gossip to the new nodes. + let secs_to_advance = GossipTableConfig::default().gossip_request_timeout_secs(); + time::pause(); + time::advance(Duration::from_secs(secs_to_advance)).await; + time::resume(); + debug!("advanced time by {} secs", secs_to_advance); + + // Check every node has every deploy stored locally. + let deploy_held = |nodes: &HashMap>>| { + nodes.values().all(|runner| { + runner + .reactor() + .inner() + .storage + .deploy_store() + .get(smallvec![deploy_id]) + .pop() + .expect("should only be a single result") + .map(|retrieved_deploy| retrieved_deploy == *deploy) + .unwrap_or_default() + }) + }; + network.settle_on(&mut rng, deploy_held, TIMEOUT).await; + + NetworkController::::remove_active(); +} diff --git a/node/src/components/in_memory_network.rs b/node/src/components/in_memory_network.rs index 13c80c2aea..6e71200876 100644 --- a/node/src/components/in_memory_network.rs +++ b/node/src/components/in_memory_network.rs @@ -15,20 +15,30 @@ //! //! ```rust //! # #![allow(dead_code)] // FIXME: Remove me -//! # use std::{collections::HashMap, fmt::{self, Formatter, Debug, Display}, ops::AddAssign, -//! # time::Duration}; +//! # use std::{ +//! # collections::HashMap, +//! # fmt::{self, Debug, Display, Formatter}, +//! # ops::AddAssign, +//! # time::Duration, +//! # }; //! # //! # use derive_more::From; //! # use maplit::hashmap; //! # use prometheus::Registry; -//! # use rand::{Rng, rngs::OsRng}; +//! # use rand::{rngs::OsRng, Rng}; //! # -//! # use casperlabs_node::{components::{Component, -//! # in_memory_network::{InMemoryNetwork, NetworkController, NodeId}}, -//! # effect::{EffectBuilder, EffectExt, Effects, -//! # announcements::NetworkAnnouncement, requests::NetworkRequest}, -//! # reactor::{self, EventQueueHandle, wrap_effects}, -//! # testing::network::{Network, NetworkedReactor}}; +//! # use casperlabs_node::{ +//! # components::{ +//! # in_memory_network::{InMemoryNetwork, NetworkController, NodeId}, +//! # Component, +//! # }, +//! # effect::{ +//! # announcements::NetworkAnnouncement, requests::NetworkRequest, EffectBuilder, EffectExt, +//! # Effects, +//! # }, +//! # reactor::{self, wrap_effects, EventQueueHandle}, +//! # testing::network::{Network, NetworkedReactor}, +//! # }; //! # //! # let mut runtime = tokio::runtime::Runtime::new().unwrap(); //! # @@ -202,11 +212,11 @@ //! } //! //! impl NetworkedReactor for Reactor { -//! type NodeId = NodeId; +//! type NodeId = NodeId; //! -//! fn node_id(&self) -> NodeId { -//! self.net.node_id() -//! } +//! fn node_id(&self) -> NodeId { +//! self.net.node_id() +//! } //! } //! //! // We can finally run the tests: @@ -271,12 +281,12 @@ use std::{ any::Any, + cell::RefCell, collections::{HashMap, HashSet}, fmt::Display, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, RwLock}, }; -use lazy_static::lazy_static; use rand::{seq::IteratorRandom, Rng}; use tokio::sync::mpsc::{self, error::SendError}; use tracing::{debug, error, info, warn}; @@ -287,19 +297,21 @@ use crate::{ announcements::NetworkAnnouncement, requests::NetworkRequest, EffectBuilder, EffectExt, Effects, }, + logging, reactor::{EventQueueHandle, QueueKind}, + tls::KeyFingerprint, }; -type Network

= Arc>>>; - /// The node ID type used by the in-memory network. -pub type NodeId = u64; +pub type NodeId = KeyFingerprint; + +type Network

= Arc>>>; -lazy_static! { +thread_local! { /// The currently active network as a thread local. /// /// The type is dynamic, every network can be of a distinct type when the payload `P` differs. - static ref ACTIVE_NETWORK: Mutex>> = Mutex::new(None); + static ACTIVE_NETWORK: RefCell>> = RefCell::new(None); } /// The network controller is used to control the network topology (e.g. adding and removing nodes). @@ -315,6 +327,7 @@ where { /// Create a new, empty network. fn new() -> Self { + let _ = logging::init(); NetworkController { nodes: Default::default(), } @@ -326,10 +339,9 @@ where /// /// Panics if the internal lock has been poisoned. pub fn create_active() { + let _ = logging::init(); ACTIVE_NETWORK - .lock() - .expect("active network lock has been poisoned") - .replace(Box::new(Self::new())); + .with(|active_network| active_network.borrow_mut().replace(Box::new(Self::new()))); } /// Removes the active network. @@ -340,12 +352,13 @@ where /// removed or if there was no network at at all. pub fn remove_active() { assert!( - ACTIVE_NETWORK - .lock() - .expect("active network lock has been poisoned") - .take() - .expect("tried to remove non-existant network") - .is::(), + ACTIVE_NETWORK.with(|active_network| { + active_network + .borrow_mut() + .take() + .expect("tried to remove non-existent network") + .is::() + }), "removed network was of wrong type" ); } @@ -364,14 +377,36 @@ where R: Rng + ?Sized, REv: From> + Send, { - ACTIVE_NETWORK - .lock() - .expect("active network lock has been poisoned") - .as_mut() - .expect("tried to create node without active network set") - .downcast_mut::() - .expect("active network has wrong message type") - .create_node_local(event_queue, rng) + ACTIVE_NETWORK.with(|active_network| { + active_network + .borrow_mut() + .as_mut() + .expect("tried to create node without active network set") + .downcast_mut::() + .expect("active network has wrong message type") + .create_node_local(event_queue, rng) + }) + } + + /// Removes an in-memory network component on the active network. + /// + /// # Panics + /// + /// Panics if the internal lock has been poisoned, the active network is not of the correct + /// message type, or the node to remove doesn't exist. + pub fn remove_node(node_id: &NodeId) { + ACTIVE_NETWORK.with(|active_network| { + if let Some(active_network) = active_network.borrow_mut().as_mut() { + active_network + .downcast_mut::() + .expect("active network has wrong message type") + .nodes + .write() + .expect("poisoned lock") + .remove(node_id) + .expect("node doesn't exist in network"); + } + }) } /// Creates a new networking node with a random node ID. @@ -440,6 +475,10 @@ where dest: NodeId, payload: P, ) { + if dest == self.node_id { + panic!("can't send message to self"); + } + match nodes.get(&dest) { Some(sender) => { if let Err(SendError((_, msg))) = sender.send((self.node_id, payload)) { @@ -448,7 +487,7 @@ where // We do nothing else, the message is just dropped. } } - None => info!(%dest, %payload, "dropping message to non-existant recipient"), + None => info!(%dest, %payload, "dropping message to non-existent recipient"), } } } @@ -471,6 +510,10 @@ where payload, responder, } => { + if dest == self.node_id { + panic!("can't send message to self"); + } + if let Ok(guard) = self.nodes.read() { self.send(&guard, dest, payload); } else { @@ -481,7 +524,7 @@ where } NetworkRequest::Broadcast { payload, responder } => { if let Ok(guard) = self.nodes.read() { - for dest in guard.keys() { + for dest in guard.keys().filter(|&node_id| node_id != &self.node_id) { self.send(&guard, *dest, payload.clone()); } } else { @@ -499,7 +542,7 @@ where if let Ok(guard) = self.nodes.read() { let chosen: HashSet<_> = guard .keys() - .filter(|k| !exclude.contains(k)) + .filter(|&node_id| !exclude.contains(node_id) && node_id != &self.node_id) .cloned() .choose_multiple(rng, count) .into_iter() diff --git a/node/src/components/small_network.rs b/node/src/components/small_network.rs index 6f7e68d9a7..b3114014f5 100644 --- a/node/src/components/small_network.rs +++ b/node/src/components/small_network.rs @@ -44,7 +44,7 @@ mod error; mod event; mod message; #[cfg(test)] -mod test; +mod tests; use std::{ collections::{HashMap, HashSet}, diff --git a/node/src/components/small_network/test.rs b/node/src/components/small_network/tests.rs similarity index 83% rename from node/src/components/small_network/test.rs rename to node/src/components/small_network/tests.rs index bb8304820a..a5639b0caa 100644 --- a/node/src/components/small_network/test.rs +++ b/node/src/components/small_network/tests.rs @@ -6,11 +6,10 @@ use std::{ collections::{HashMap, HashSet}, fmt::{self, Debug, Display, Formatter}, - time::Duration, + time::{Duration, Instant}, }; use derive_more::From; -use futures::Future; use serde::{Deserialize, Serialize}; use small_network::NodeId; @@ -20,13 +19,15 @@ use crate::{ logging, reactor::{self, EventQueueHandle, Reactor, Runner}, small_network::{self, SmallNetwork}, - testing::network::{Network, NetworkedReactor}, + testing::{ + network::{Network, NetworkedReactor}, + ConditionCheckReactor, + }, }; use pnet::datalink; use prometheus::Registry; use rand::{rngs::OsRng, Rng}; use reactor::{wrap_effects, Finalize}; -use tokio::time::{timeout, Timeout}; use tracing::{debug, info}; /// The networking port used by the tests for the root node. @@ -131,7 +132,9 @@ fn init_logging() { } /// Checks whether or not a given network is completely connected. -fn network_is_complete(nodes: &HashMap>) -> bool { +fn network_is_complete( + nodes: &HashMap>>, +) -> bool { // We need at least one node. if nodes.is_empty() { return false; @@ -139,13 +142,13 @@ fn network_is_complete(nodes: &HashMap>) -> bool { let expected: HashSet<_> = nodes .iter() - .map(|(_, runner)| runner.reactor().net.node_id()) + .map(|(_, runner)| runner.reactor().inner().net.node_id()) .collect(); nodes .iter() .map(|(_, runner)| { - let net = &runner.reactor().net; + let net = &runner.reactor().inner().net; let mut actual = net.connected_nodes(); // All nodes should be connected to every other node, except itself, so we add it to the @@ -157,23 +160,6 @@ fn network_is_complete(nodes: &HashMap>) -> bool { .all(|actual| actual == expected) } -/// Helper trait to annotate timeouts more naturally. -trait Within { - /// Sets a timeout on a future. - /// - /// If the timeout occurs, the annotated future will be **cancelled**. Use with caution. - fn within(self, duration: Duration) -> Timeout; -} - -impl Within for T -where - T: Future, -{ - fn within(self, duration: Duration) -> Timeout { - timeout(duration, self) - } -} - fn gen_config(bind_port: u16) -> small_network::Config { // Bind everything to localhost. let bind_interface = "127.0.0.1".parse().unwrap(); @@ -205,29 +191,26 @@ async fn run_two_node_network_five_times() { let mut net = Network::new(); - let start = std::time::Instant::now(); + let start = Instant::now(); net.add_node_with_config(gen_config(TEST_ROOT_NODE_PORT), &mut rng) .await .unwrap(); net.add_node_with_config(gen_config(TEST_ROOT_NODE_PORT + 1), &mut rng) .await .unwrap(); - let end = std::time::Instant::now(); + let end = Instant::now(); debug!( total_time_ms = (end - start).as_millis() as u64, "finished setting up networking nodes" ); - net.settle_on(&mut rng, network_is_complete) - .within(Duration::from_millis(1000)) - .await - .expect("network did not fully connect in time"); + let timeout = Duration::from_secs(1); + net.settle_on(&mut rng, network_is_complete, timeout).await; - net.settle(&mut rng, Duration::from_millis(25)) - .within(Duration::from_millis(2000)) - .await - .expect("network did not stay settled"); + let quiet_for = Duration::from_millis(25); + let timeout = Duration::from_secs(2); + net.settle(&mut rng, quiet_for, timeout).await; assert!( network_is_complete(net.nodes()), @@ -275,5 +258,7 @@ async fn bind_to_real_network_interface() { .await .unwrap(); - net.settle(&mut rng, Duration::from_millis(250)).await; + let quiet_for = Duration::from_millis(250); + let timeout = Duration::from_secs(2); + net.settle(&mut rng, quiet_for, timeout).await; } diff --git a/node/src/components/storage/config.rs b/node/src/components/storage/config.rs index 96863ddc84..7a30a56e5b 100644 --- a/node/src/components/storage/config.rs +++ b/node/src/components/storage/config.rs @@ -2,6 +2,7 @@ use std::{env, path::PathBuf}; use directories::ProjectDirs; use serde::{Deserialize, Serialize}; +#[cfg(test)] use tempfile::TempDir; use tracing::warn; @@ -15,6 +16,7 @@ const DEFAULT_MAX_BLOCK_STORE_SIZE: usize = 483_183_820_800; // 450 GiB const DEFAULT_MAX_DEPLOY_STORE_SIZE: usize = 322_122_547_200; // 300 GiB const DEFAULT_MAX_CHAINSPEC_STORE_SIZE: usize = 1_073_741_824; // 1 GiB +#[cfg(test)] const DEFAULT_TEST_MAX_DB_SIZE: usize = 52_428_800; // 50 MiB /// On-disk storage configuration. @@ -55,7 +57,7 @@ pub struct Config { impl Config { /// Returns a default `Config` suitable for tests, along with a `TempDir` which must be kept /// alive for the duration of the test since its destructor removes the dir from the filesystem. - #[allow(unused)] + #[cfg(test)] pub(crate) fn default_for_tests() -> (Self, TempDir) { let tempdir = tempfile::tempdir().expect("should get tempdir"); let path = Some(tempdir.path().to_path_buf()); diff --git a/node/src/lib.rs b/node/src/lib.rs index 060fe8b12f..e2738411ed 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -26,6 +26,7 @@ pub mod crypto; pub mod effect; pub mod logging; pub mod reactor; +#[cfg(test)] pub mod testing; pub mod tls; pub mod types; diff --git a/node/src/logging.rs b/node/src/logging.rs index 2c7ed2e00f..a911969b7b 100644 --- a/node/src/logging.rs +++ b/node/src/logging.rs @@ -123,7 +123,7 @@ pub fn init() -> anyhow::Result<()> { // Setup a new tracing-subscriber writing to `stderr` for logging. tracing::subscriber::set_global_default( tracing_subscriber::fmt() - .with_writer(io::stderr) + .with_writer(io::stdout) .with_env_filter(EnvFilter::from_default_env()) .fmt_fields(formatter) .event_format(FmtEvent {}) diff --git a/node/src/reactor.rs b/node/src/reactor.rs index 3fcd18c3eb..52781d0106 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -227,6 +227,24 @@ where }) } + /// Inject (schedule then process) effects created via a call to `create_effects` which is + /// itself passed an instance of an `EffectBuilder`. + #[cfg(test)] + pub(crate) async fn process_injected_effects(&mut self, create_effects: F) + where + F: FnOnce(EffectBuilder) -> Effects, + { + let event_queue = EventQueueHandle::new(self.scheduler); + let effect_builder = EffectBuilder::new(event_queue); + + let effects = create_effects(effect_builder); + + let effect_span = tracing::debug_span!("process injected effects", ev = self.event_count); + process_effects(self.scheduler, effects) + .instrument(effect_span) + .await; + } + /// Processes a single event on the event queue. #[inline] pub async fn crank(&mut self, rng: &mut Rd) { diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index a6880e8f75..83c50318a9 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -88,7 +88,7 @@ pub enum Error { ContractRuntime(#[from] contract_runtime::ConfigError), } -/// Validator node reactor. +/// Initializer node reactor. #[derive(Debug)] pub struct Reactor { pub(super) config: validator::Config, diff --git a/node/src/testing.rs b/node/src/testing.rs index 296854eeb4..6cd28eb8cb 100644 --- a/node/src/testing.rs +++ b/node/src/testing.rs @@ -3,4 +3,7 @@ //! Contains various parts and components to aid writing tests and simulations using the //! `casperlabs-node` library. +mod condition_check_reactor; pub mod network; + +pub(crate) use condition_check_reactor::ConditionCheckReactor; diff --git a/node/src/testing/condition_check_reactor.rs b/node/src/testing/condition_check_reactor.rs new file mode 100644 index 0000000000..badce89534 --- /dev/null +++ b/node/src/testing/condition_check_reactor.rs @@ -0,0 +1,114 @@ +use std::fmt::{self, Debug, Formatter}; + +use futures::future::BoxFuture; +use prometheus::Registry; +use rand::Rng; + +use super::network::NetworkedReactor; +use crate::{ + effect::{EffectBuilder, Effects}, + reactor::{EventQueueHandle, Finalize, Reactor}, +}; + +/// A reactor wrapping an inner reactor, and which has an optional hook into +/// `Reactor::dispatch_event()`. +/// +/// While the hook is not `None`, it's called on every call to `dispatch_event()`, taking a +/// reference to the current `Event`, and setting a boolean result to true when the condition has +/// been met. +/// +/// Once the condition is met, the hook is reset to `None`. +pub struct ConditionCheckReactor { + reactor: R, + condition_checker: Option bool + Send>>, + condition_result: bool, +} + +impl ConditionCheckReactor { + /// Sets the condition checker hook. + pub fn set_condition_checker( + &mut self, + condition_checker: Box bool + Send>, + ) { + self.condition_checker = Some(condition_checker); + } + + /// Returns the result of the last execution of the condition checker hook. + pub fn condition_result(&self) -> bool { + self.condition_result + } + + /// Returns a reference to the wrapped reactor. + pub fn inner(&self) -> &R { + &self.reactor + } + + /// Returns a mutable reference to the wrapped reactor. + pub fn inner_mut(&mut self) -> &mut R { + &mut self.reactor + } +} + +impl Reactor for ConditionCheckReactor { + type Event = R::Event; + type Config = R::Config; + type Error = R::Error; + + fn new( + config: Self::Config, + registry: &Registry, + event_queue: EventQueueHandle, + rng: &mut RNG, + ) -> Result<(Self, Effects), Self::Error> { + let (reactor, effects) = R::new(config, registry, event_queue, rng)?; + Ok(( + Self { + reactor, + condition_checker: None, + condition_result: false, + }, + effects, + )) + } + + fn dispatch_event( + &mut self, + effect_builder: EffectBuilder, + rng: &mut RNG, + event: Self::Event, + ) -> Effects { + self.condition_result = self + .condition_checker + .as_ref() + .map(|condition_checker| condition_checker(&event)) + .unwrap_or_default(); + if self.condition_result { + self.condition_checker = None; + } + self.reactor.dispatch_event(effect_builder, rng, event) + } +} + +impl Finalize for ConditionCheckReactor { + fn finalize(self) -> BoxFuture<'static, ()> { + self.reactor.finalize() + } +} + +impl NetworkedReactor for ConditionCheckReactor { + type NodeId = R::NodeId; + + fn node_id(&self) -> Self::NodeId { + self.reactor.node_id() + } +} + +impl Debug for ConditionCheckReactor { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + formatter + .debug_struct("ConditionCheckReactor") + .field("reactor", &self.reactor) + .field("condition_check_result", &self.condition_result) + .finish() + } +} diff --git a/node/src/testing/network.rs b/node/src/testing/network.rs index b9b7513174..2f50567627 100644 --- a/node/src/testing/network.rs +++ b/node/src/testing/network.rs @@ -9,10 +9,15 @@ use std::{ use futures::future::{BoxFuture, FutureExt}; use rand::Rng; -use tracing::{debug, field}; +use tokio::time; +use tracing::debug; use tracing_futures::Instrument; -use crate::reactor::{Finalize, Reactor, Runner}; +use super::ConditionCheckReactor; +use crate::{ + effect::{EffectBuilder, Effects}, + reactor::{Finalize, Reactor, Runner}, +}; /// A reactor with networking functionality. pub trait NetworkedReactor: Sized { @@ -34,13 +39,14 @@ const POLL_INTERVAL: Duration = Duration::from_millis(10); #[derive(Debug, Default)] pub struct Network { /// Current network. - nodes: HashMap<::NodeId, Runner>, + nodes: HashMap<::NodeId, Runner>>, } impl Network where R: Reactor + NetworkedReactor, R::Config: Default, + ::Error: Debug, R::Error: From, { /// Creates a new networking node on the network using the default root node port. @@ -49,12 +55,26 @@ where /// /// Panics if a duplicate node ID is being inserted. This should only happen in case a randomly /// generated ID collides. - pub async fn add_node( + pub async fn add_node( &mut self, - rng: &mut Rd, - ) -> Result<(R::NodeId, &mut Runner), R::Error> { + rng: &mut RNG, + ) -> Result<(R::NodeId, &mut Runner>), R::Error> { self.add_node_with_config(Default::default(), rng).await } + + /// Adds `count` new nodes to the network, and returns their IDs. + pub async fn add_nodes( + &mut self, + rng: &mut RNG, + count: usize, + ) -> Vec { + let mut node_ids = vec![]; + for _ in 0..count { + let (node_id, _runner) = self.add_node(rng).await.unwrap(); + node_ids.push(node_id); + } + node_ids + } } impl Network @@ -74,12 +94,12 @@ where /// # Panics /// /// Panics if a duplicate node ID is being inserted. - pub async fn add_node_with_config( + pub async fn add_node_with_config( &mut self, cfg: R::Config, - rng: &mut Rd, - ) -> Result<(R::NodeId, &mut Runner), R::Error> { - let runner: Runner = Runner::new(cfg, rng).await?; + rng: &mut RNG, + ) -> Result<(R::NodeId, &mut Runner>), R::Error> { + let runner: Runner> = Runner::new(cfg, rng).await?; let node_id = runner.reactor().node_id(); @@ -95,13 +115,78 @@ where Ok((node_id, node_ref)) } + /// Removes a node from the network. + pub fn remove_node(&mut self, node_id: &R::NodeId) -> Option>> { + self.nodes.remove(node_id) + } + + /// Crank the specified runner once, returning the number of events processed. + pub async fn crank(&mut self, node_id: &R::NodeId, rng: &mut RNG) -> usize { + let runner = self.nodes.get_mut(node_id).expect("should find node"); + + let node_id = runner.reactor().node_id(); + let span = tracing::error_span!("crank", node_id = %node_id); + if runner.try_crank(rng).instrument(span).await.is_some() { + 1 + } else { + 0 + } + } + + /// Crank only the specified runner until `condition` is true or until `within` has elapsed. + /// + /// Returns `true` if `condition` has been met within the specified timeout. + pub async fn crank_until( + &mut self, + node_id: &R::NodeId, + rng: &mut RNG, + condition: F, + within: Duration, + ) where + RNG: Rng + ?Sized, + F: Fn(&R::Event) -> bool + Send + 'static, + { + self.nodes + .get_mut(node_id) + .unwrap() + .reactor_mut() + .set_condition_checker(Box::new(condition)); + + time::timeout(within, self.crank_and_check_indefinitely(node_id, rng)) + .await + .unwrap() + } + + async fn crank_and_check_indefinitely( + &mut self, + node_id: &R::NodeId, + rng: &mut RNG, + ) { + loop { + if self.crank(node_id, rng).await == 0 { + time::delay_for(POLL_INTERVAL).await; + continue; + } + + if self + .nodes + .get(node_id) + .unwrap() + .reactor() + .condition_result() + { + debug!("{} met condition", node_id); + return; + } + } + } + /// Crank all runners once, returning the number of events processed. - pub async fn crank_all(&mut self, rng: &mut Rd) -> usize { + pub async fn crank_all(&mut self, rng: &mut RNG) -> usize { let mut event_count = 0; for node in self.nodes.values_mut() { let node_id = node.reactor().node_id(); - let span = tracing::error_span!("crank", node_id = field::Empty); - span.record("node_id", &field::display(node_id)); + let span = tracing::error_span!("crank", node_id = %node_id); event_count += if node.try_crank(rng).instrument(span).await.is_some() { 1 } else { @@ -114,18 +199,30 @@ where /// Process events on all nodes until all event queues are empty. /// - /// Exits if `at_least` time has passed twice between events that have been processed. - pub async fn settle(&mut self, rng: &mut Rd, at_least: Duration) { + /// Returns `true` if `quiet_for` time has passed with no new events processed within the + /// specified timeout. + pub async fn settle( + &mut self, + rng: &mut RNG, + quiet_for: Duration, + within: Duration, + ) { + time::timeout(within, self.settle_indefinitely(rng, quiet_for)) + .await + .unwrap() + } + + async fn settle_indefinitely(&mut self, rng: &mut RNG, quiet_for: Duration) { let mut no_events = false; loop { if self.crank_all(rng).await == 0 { - // Stop once we have no pending events and haven't had any for `at_least` duration. + // Stop once we have no pending events and haven't had any for `quiet_for` time. if no_events { - debug!(?at_least, "network has settled after"); + debug!("network has been quiet for {:?}", quiet_for); break; } else { no_events = true; - tokio::time::delay_for(at_least).await; + time::delay_for(quiet_for).await; } } else { no_events = false; @@ -133,30 +230,58 @@ where } } - /// Runs the main loop of every reactor until a condition is true. - pub async fn settle_on(&mut self, rng: &mut Rd, f: F) + /// Runs the main loop of every reactor until `condition` is true or until `within` has elapsed. + /// + /// Returns `true` if `condition` has been met within the specified timeout. + pub async fn settle_on(&mut self, rng: &mut RNG, condition: F, within: Duration) + where + RNG: Rng + ?Sized, + F: Fn(&HashMap>>) -> bool, + { + time::timeout(within, self.settle_on_indefinitely(rng, condition)) + .await + .unwrap() + } + + async fn settle_on_indefinitely(&mut self, rng: &mut RNG, condition: F) where - Rd: Rng + ?Sized, - F: Fn(&HashMap>) -> bool, + RNG: Rng + ?Sized, + F: Fn(&HashMap>>) -> bool, { loop { - // Check condition. - if f(&self.nodes) { - debug!("network settled"); + if condition(&self.nodes) { + debug!("network settled on meeting condition"); break; } if self.crank_all(rng).await == 0 { // No events processed, wait for a bit to avoid 100% cpu usage. - tokio::time::delay_for(POLL_INTERVAL).await; + time::delay_for(POLL_INTERVAL).await; } } } /// Returns the internal map of nodes. - pub fn nodes(&self) -> &HashMap> { + pub fn nodes(&self) -> &HashMap>> { &self.nodes } + + /// Create effects and dispatch them on the given node. + /// + /// The effects are created via a call to `create_effects` which is itself passed an instance of + /// an `EffectBuilder`. + pub async fn process_injected_effect_on(&mut self, node_id: &R::NodeId, create_effects: F) + where + F: FnOnce(EffectBuilder) -> Effects, + { + let runner = self.nodes.get_mut(node_id).unwrap(); + let node_id = runner.reactor().node_id(); + let span = tracing::error_span!("inject", node_id = %node_id); + runner + .process_injected_effects(create_effects) + .instrument(span) + .await + } } impl Finalize for Network diff --git a/node/src/types/deploy.rs b/node/src/types/deploy.rs index f8325e6962..8140fcdda0 100644 --- a/node/src/types/deploy.rs +++ b/node/src/types/deploy.rs @@ -5,9 +5,19 @@ use std::{ }; use hex::FromHexError; +#[cfg(test)] +use rand::{ + distributions::{Alphanumeric, Distribution, Standard}, + Rng, +}; use serde::{Deserialize, Serialize}; use thiserror::Error; +#[cfg(test)] +use crate::crypto::{ + asymmetric_key::{self, SecretKey}, + hash, +}; use crate::{ components::{ contract_runtime::core::engine_state::executable_deploy_item::ExecutableDeployItem, @@ -206,6 +216,73 @@ impl Display for Deploy { } } +#[cfg(test)] +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> Deploy { + let seed: u64 = rng.gen(); + let hash = DeployHash::new(hash::hash(seed.to_le_bytes())); + + let secret_key = SecretKey::generate_ed25519(); + let account = PublicKey::from(&secret_key); + + let timestamp = seed + 100; + let gas_price = seed + 101; + let body_hash = hash::hash(seed.overflowing_add(102).0.to_le_bytes()); + let ttl_millis = seed as u32 + 103; + + let dependencies = vec![ + DeployHash::new(hash::hash(seed.overflowing_add(104).0.to_le_bytes())), + DeployHash::new(hash::hash(seed.overflowing_add(105).0.to_le_bytes())), + DeployHash::new(hash::hash(seed.overflowing_add(106).0.to_le_bytes())), + ]; + + let chain_name = std::iter::repeat_with(|| rng.sample(Alphanumeric)) + .take(10) + .collect(); + + let header = DeployHeader { + account, + timestamp, + gas_price, + body_hash, + ttl_millis, + dependencies, + chain_name, + }; + + let payment = ExecutableDeployItem::ModuleBytes { + module_bytes: hash::hash(seed.overflowing_add(107).0.to_le_bytes()) + .as_ref() + .to_vec(), + args: hash::hash(seed.overflowing_add(108).0.to_le_bytes()) + .as_ref() + .to_vec(), + }; + let session = ExecutableDeployItem::ModuleBytes { + module_bytes: hash::hash(seed.overflowing_add(109).0.to_le_bytes()) + .as_ref() + .to_vec(), + args: hash::hash(seed.overflowing_add(1110).0.to_le_bytes()) + .as_ref() + .to_vec(), + }; + + let approvals = vec![ + asymmetric_key::sign(&[3], &secret_key, &account), + asymmetric_key::sign(&[4], &secret_key, &account), + asymmetric_key::sign(&[5], &secret_key, &account), + ]; + + Deploy { + hash, + header, + payment, + session, + approvals, + } + } +} + /// This module provides structs which map to the main deploy types, but which are suitable for /// encoding to and decoding from JSON. For all fields with binary data, this is converted to/from /// hex strings. diff --git a/node/src/utils/gossip_table.rs b/node/src/utils/gossip_table.rs index fefdbc5077..ad43c9998e 100644 --- a/node/src/utils/gossip_table.rs +++ b/node/src/utils/gossip_table.rs @@ -115,6 +115,11 @@ impl Config { }) } + #[cfg(test)] + pub(crate) fn infection_target(&self) -> u8 { + self.infection_target + } + pub(crate) fn gossip_request_timeout_secs(&self) -> u64 { self.gossip_request_timeout_secs }