diff --git a/src/apps/node/cli.rs b/src/apps/node/cli.rs index 1cc822fa37..e49fc577f8 100644 --- a/src/apps/node/cli.rs +++ b/src/apps/node/cli.rs @@ -7,8 +7,8 @@ use std::{io, io::Write, path::PathBuf}; use anyhow::bail; use structopt::StructOpt; -use crate::{config, logging}; -use casperlabs_node::{reactor, tls}; +use crate::config; +use casperlabs_node::{logging, reactor, tls}; // Note: The docstring on `Cli` is the help shown when calling the binary with `--help`. #[derive(Debug, StructOpt)] diff --git a/src/apps/node/main.rs b/src/apps/node/main.rs index 44c54a8895..955348b7b0 100644 --- a/src/apps/node/main.rs +++ b/src/apps/node/main.rs @@ -5,7 +5,6 @@ mod cli; pub mod config; -pub mod logging; use structopt::StructOpt; diff --git a/src/components/small_network.rs b/src/components/small_network.rs index c218f94d2e..ee8a9cdbd1 100644 --- a/src/components/small_network.rs +++ b/src/components/small_network.rs @@ -80,7 +80,7 @@ use tokio::{ use tokio_openssl::SslStream; use tokio_serde::{formats::SymmetricalMessagePack, SymmetricallyFramed}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, warn, Span}; pub(crate) use self::{endpoint::Endpoint, event::Event, message::Message}; use self::{endpoint::EndpointUpdate, error::Result}; @@ -139,6 +139,11 @@ where event_queue: EventQueueHandle, cfg: Config, ) -> Result<(SmallNetwork, Multiple>>)> { + let span = tracing::debug_span!("net"); + let _enter = span.enter(); + + let server_span = tracing::info_span!("server"); + // First, we load or generate the TLS keys. let (cert, private_key) = match (&cfg.cert, &cfg.private_key) { // We're given a cert_file and a private_key file. 
Just load them, additional checking @@ -177,6 +182,7 @@ where event_queue, tokio::net::TcpListener::from_std(listener).map_err(Error::ListenerConversion)?, server_shutdown_receiver, + server_span, )); let model = SmallNetwork { @@ -208,7 +214,7 @@ where /// waits for it to complete the shutdown. This explicitly allows the background task to finish /// and drop everything it owns, ensuring that resources such as allocated ports are free to be /// reused once this completes. - #[allow(dead_code)] + #[cfg(test)] async fn shutdown_server(&mut self) { // Close the shutdown socket, causing the server to exit. drop(self.shutdown.take()); @@ -471,7 +477,6 @@ where } /// Returns the node id of this network node. - #[cfg(test)] pub(crate) fn node_id(&self) -> NodeId { self.cert.public_key_fingerprint() } @@ -671,9 +676,12 @@ async fn server_task( event_queue: EventQueueHandle, mut listener: tokio::net::TcpListener, shutdown: oneshot::Receiver<()>, + span: Span, ) where REv: From>, { + let _enter = span.enter(); + // The server task is a bit tricky, since it has to wait on incoming connections while at the // same time shut down if the networking component is dropped, otherwise the TCP socket will // stay open, preventing reuse. diff --git a/src/components/small_network/test.rs b/src/components/small_network/test.rs index b8bd7f0825..92e4bc8736 100644 --- a/src/components/small_network/test.rs +++ b/src/components/small_network/test.rs @@ -4,9 +4,8 @@ //! instances of `small_net` arranged in a network. 
use std::{ - collections::HashSet, + collections::{hash_map::Entry, HashMap, HashSet}, fmt::{self, Debug, Display, Formatter}, - io, time::Duration, }; @@ -14,17 +13,17 @@ use derive_more::From; use futures::{future::join_all, Future}; use serde::{Deserialize, Serialize}; use small_network::NodeId; -use tracing_subscriber::{self, EnvFilter}; use crate::{ components::Component, effect::{announcements::NetworkAnnouncement, Effect, EffectBuilder, Multiple}, + logging, reactor::{self, EventQueueHandle, Reactor}, small_network::{self, SmallNetwork}, }; use pnet::datalink; use tokio::time::{timeout, Timeout}; -use tracing::{debug, dispatcher::DefaultGuard, info}; +use tracing::{debug, field, info, Span}; /// Time interval for which to poll an observed testing network when no events have occurred. const POLL_INTERVAL: Duration = Duration::from_millis(10); @@ -82,9 +81,13 @@ impl Reactor for TestReactor { fn new( cfg: Self::Config, event_queue: EventQueueHandle, + span: &Span, ) -> reactor::Result<(Self, Multiple>)> { let (net, effects) = SmallNetwork::new(event_queue, cfg)?; + let node_id = net.node_id(); + span.record("id", &field::display(node_id)); + Ok(( TestReactor { net }, reactor::wrap_effects(Event::SmallNet, effects), @@ -113,17 +116,21 @@ impl Display for Message { /// cranking until a condition has been reached. #[derive(Debug)] struct Network { - nodes: Vec>, + node_count: usize, + nodes: HashMap>, } impl Network { /// Creates a new network. fn new() -> Self { - Network { nodes: Vec::new() } + Network { + node_count: 0, + nodes: HashMap::new(), + } } /// Creates a new networking node on the network using the default root node port. 
- async fn add_node(&mut self) -> anyhow::Result<&mut reactor::Runner> { + async fn add_node(&mut self) -> anyhow::Result<(NodeId, &mut reactor::Runner)> { self.add_node_with_config(small_network::Config::default_on_port(TEST_ROOT_NODE_PORT)) .await } @@ -132,19 +139,27 @@ impl Network { async fn add_node_with_config( &mut self, cfg: small_network::Config, - ) -> anyhow::Result<&mut reactor::Runner> { - let runner = reactor::Runner::new(cfg).await?; - self.nodes.push(runner); - - Ok(self - .nodes - .last_mut() - .expect("vector cannot be empty after insertion")) + ) -> anyhow::Result<(NodeId, &mut reactor::Runner)> { + let runner: reactor::Runner = reactor::Runner::new(cfg).await?; + self.node_count += 1; + + let node_id = runner.reactor().net.node_id(); + + let node_ref = match self.nodes.entry(node_id) { + Entry::Occupied(_) => { + // This happens in the event of the extremely unlikely hash collision, or if the + // node ID was set manually. + anyhow::bail!("trying to insert a duplicate node {}", node_id) + } + Entry::Vacant(entry) => entry.insert(runner), + }; + + Ok((node_id, node_ref)) } /// Crank all runners once, returning the number of events processed. async fn crank_all(&mut self) -> usize { - join_all(self.nodes.iter_mut().map(|runner| runner.try_crank())) + join_all(self.nodes.values_mut().map(reactor::Runner::try_crank)) .await .into_iter() .filter(|opt| opt.is_some()) @@ -175,7 +190,7 @@ impl Network { /// Runs the main loop of every reactor until a condition is true. async fn settle_on(&mut self, f: F) where - F: Fn(&[reactor::Runner]) -> bool, + F: Fn(&HashMap>) -> bool, { loop { // Check condition. @@ -191,8 +206,8 @@ impl Network { } } - // Returns the internal list of nodes. - fn nodes(&self) -> &[reactor::Runner] { + // Returns the internal map of nodes. + fn nodes(&self) -> &HashMap> { &self.nodes } @@ -205,7 +220,7 @@ impl Network { /// gets the job done. 
async fn shutdown(self) { // Shutdown the sender of every reactor node to ensure the port is open again. - for node in self.nodes.into_iter() { + for (_, node) in self.nodes.into_iter() { node.into_inner().net.shutdown_server().await; } @@ -216,18 +231,16 @@ impl Network { /// Sets up logging for testing. /// /// Returns a guard that when dropped out of scope, clears the logger again. -fn init_logging() -> DefaultGuard { +fn init_logging() { // TODO: Write logs to file by default for each test. - tracing::subscriber::set_default( - tracing_subscriber::fmt() - .with_writer(io::stderr) - .with_env_filter(EnvFilter::from_default_env()) - .finish(), - ) + logging::init() + // Ignore the return value, setting the global subscriber will fail if `init_logging` has + // been called before, which we don't care about. + .ok(); } /// Checks whether or not a given network is completely connected. -fn network_is_complete(nodes: &[reactor::Runner]) -> bool { +fn network_is_complete(nodes: &HashMap>) -> bool { // We need at least one node. if nodes.is_empty() { return false; @@ -235,12 +248,12 @@ fn network_is_complete(nodes: &[reactor::Runner]) -> bool { let expected: HashSet<_> = nodes .iter() - .map(|runner| runner.reactor().net.node_id()) + .map(|(_, runner)| runner.reactor().net.node_id()) .collect(); nodes .iter() - .map(|runner| { + .map(|(_, runner)| { let net = &runner.reactor().net; let mut actual = net.connected_nodes(); @@ -316,6 +329,8 @@ async fn run_two_node_network_five_times() { /// Very unlikely to ever fail on a real machine. 
#[tokio::test] async fn bind_to_real_network_interface() { + init_logging(); + let iface = datalink::interfaces() .into_iter() .find(|net| !net.ips.is_empty() && !net.ips.iter().any(|ip| ip.ip().is_loopback())) diff --git a/src/lib.rs b/src/lib.rs index 53fbfcc3bb..337a084ed2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ mod components; pub mod crypto; pub mod effect; +pub mod logging; pub mod reactor; pub mod tls; pub mod types; diff --git a/src/apps/node/logging.rs b/src/logging.rs similarity index 92% rename from src/apps/node/logging.rs rename to src/logging.rs index 26a35777c1..2c7ed2e00f 100644 --- a/src/apps/node/logging.rs +++ b/src/logging.rs @@ -104,10 +104,12 @@ where } } -/// Initializes logging system based on settings in configuration. +/// Initializes the logging system. /// -/// Will setup logging as described in this configuration for the whole application. This -/// function should only be called once during the lifetime of the application. +/// This function should only be called once during the lifetime of the application. Do not call +/// this outside of the application or testing code, the installed logger is global. +/// +/// See the `README.md` for hints on how to configure logging at runtime. pub fn init() -> anyhow::Result<()> { let formatter = format::debug_fn(|writer, field, value| { if field.name() == "message" { diff --git a/src/reactor.rs b/src/reactor.rs index 289411e8e4..776f03cdbf 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -35,7 +35,7 @@ use std::{ }; use futures::FutureExt; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, trace, warn, Span}; use crate::{ effect::{Effect, EffectBuilder, Multiple}, @@ -115,10 +115,14 @@ pub trait Reactor: Sized { /// This method creates the full state, which consists of all components, and returns a reactor /// instances along with the effects the components generated upon instantiation. 
/// + /// The function is also given a reference to the tracing span in use, which enables it to + /// record tracing fields like `id` to set an ID for the reactor if desired. + /// /// If any instantiation fails, an error is returned. fn new( cfg: Self::Config, event_queue: EventQueueHandle, + span: &Span, ) -> Result<(Self, Multiple>)>; } @@ -147,6 +151,12 @@ where /// The reactor instance itself. reactor: R, + + /// The logging span indicating which reactor we are in. + span: Span, + + /// Counter for events, to aid tracing. + event_count: usize, } impl Runner @@ -154,37 +164,58 @@ where R: Reactor, { /// Creates a new runner from a given configuration. + /// + /// A tracing span with an initially empty `id` field is created for the runner and handed to + /// the reactor's constructor, which may record an identifier into it. #[inline] pub async fn new(cfg: R::Config) -> Result { + // We create a new logging span, ensuring that we can always associate log messages to this + // specific reactor. This is usually only relevant when running multiple reactors, e.g. + // during testing, so we set the log level to `debug` here. + let span = tracing::debug_span!("node", id = tracing::field::Empty); + let entered = span.enter(); + let event_size = mem::size_of::(); // Check if the event is of a reasonable size. This only emits a runtime warning at startup // right now, since storage size of events is not an issue per se, but copying might be // expensive if events get too large. if event_size > 16 * mem::size_of::() { - warn!( - "event size is {} bytes, consider reducing it or boxing", - event_size - ); + warn!(%event_size, "large event size, consider reducing it or boxing"); } // Create a new event queue for this reactor run. let scheduler = utils::leak(Scheduler::new(QueueKind::weights())); let event_queue = EventQueueHandle::new(scheduler); - let (reactor, initial_effects) = R::new(cfg, event_queue)?; + + let (reactor, initial_effects) = R::new(cfg, event_queue, &span)?; // Run all effects from component instantiation. 
process_effects(scheduler, initial_effects).await; info!("reactor main loop is ready"); - Ok(Runner { scheduler, reactor }) + drop(entered); + Ok(Runner { + scheduler, + reactor, + span, + event_count: 0, + }) } /// Processes a single event on the event queue. #[inline] pub async fn crank(&mut self) { + let _enter = self.span.enter(); + + // Create another span for tracing the processing of one event. + let crank_span = tracing::debug_span!("crank", ev = self.event_count); + let _inner_enter = crank_span.enter(); + + self.event_count += 1; + let event_queue = EventQueueHandle::new(self.scheduler); let effect_builder = EffectBuilder::new(event_queue); diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index e0b7aebabe..7e10389757 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -10,6 +10,7 @@ use derive_more::From; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; use serde::{Deserialize, Serialize}; +use tracing::Span; use crate::{ components::{ @@ -138,9 +139,12 @@ impl reactor::Reactor for Reactor { fn new( cfg: Self::Config, event_queue: EventQueueHandle, + span: &Span, ) -> Result<(Self, Multiple>)> { let effect_builder = EffectBuilder::new(event_queue); let (net, net_effects) = SmallNetwork::new(event_queue, cfg.validator_net)?; + span.record("id", &tracing::field::display(net.node_id())); + let (pinger, pinger_effects) = Pinger::new(effect_builder); let storage = Storage::new(cfg.storage)?; let (api_server, api_server_effects) = ApiServer::new(cfg.http_server, effect_builder);