Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
820c41b
Instead of allowing dead code, make `shutdown_server` only available …
marc-casperlabs Jun 20, 2020
3944f79
Store nodes of the network in a hashmap instead of a vec to allow ret…
marc-casperlabs Jun 20, 2020
e0617b4
Make node creation functions return node id as well as a reference fo…
marc-casperlabs Jun 20, 2020
6193f58
Make `init_logging` actually initialize logging again
marc-casperlabs Jun 20, 2020
02fb456
Initialize logging `bind_to_real_network_interface` test
marc-casperlabs Jun 20, 2020
022009d
Make logging a library module instead of being app only to facilitate…
marc-casperlabs Jun 20, 2020
a21ef4a
Track a runner/reactor id in the reactor runner
marc-casperlabs Jun 20, 2020
ae3498c
Instead of storing an ID, store node identification on logging span o…
marc-casperlabs Jun 20, 2020
9f66654
Use actual node app logging in tests
marc-casperlabs Jun 20, 2020
d98772e
Remove unnecessary `mut` qualifier
marc-casperlabs Jun 20, 2020
ae0e32b
Store a `server_span` for the server background task
marc-casperlabs Jun 20, 2020
f40fbef
Remove unused code and imports in `small_net` tests
marc-casperlabs Jun 30, 2020
81e47e2
Cleanup imports as suggested
marc-casperlabs Jun 30, 2020
32f1540
Fix formatting in docstring
marc-casperlabs Jun 30, 2020
525e563
Be more elaborate in documentation on logging init
marc-casperlabs Jun 30, 2020
dfed5d5
Simplify iterator and subsequent map in `small_network::test`
marc-casperlabs Jun 30, 2020
081c8a8
Fix typo in `try_crank` call
marc-casperlabs Jun 30, 2020
546651f
Cleanup node insertion logic in `small_net` test network
marc-casperlabs Jun 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/apps/node/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{io, io::Write, path::PathBuf};
use anyhow::bail;
use structopt::StructOpt;

use crate::{config, logging};
use casperlabs_node::{reactor, tls};
use crate::config;
use casperlabs_node::{logging, reactor, tls};

// Note: The docstring on `Cli` is the help shown when calling the binary with `--help`.
#[derive(Debug, StructOpt)]
Expand Down
1 change: 0 additions & 1 deletion src/apps/node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

mod cli;
pub mod config;
pub mod logging;

use structopt::StructOpt;

Expand Down
14 changes: 11 additions & 3 deletions src/components/small_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ use tokio::{
use tokio_openssl::SslStream;
use tokio_serde::{formats::SymmetricalMessagePack, SymmetricallyFramed};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, warn, Span};

pub(crate) use self::{endpoint::Endpoint, event::Event, message::Message};
use self::{endpoint::EndpointUpdate, error::Result};
Expand Down Expand Up @@ -139,6 +139,11 @@ where
event_queue: EventQueueHandle<REv>,
cfg: Config,
) -> Result<(SmallNetwork<REv, P>, Multiple<Effect<Event<P>>>)> {
let span = tracing::debug_span!("net");
let _enter = span.enter();

let server_span = tracing::info_span!("server");

// First, we load or generate the TLS keys.
let (cert, private_key) = match (&cfg.cert, &cfg.private_key) {
// We're given a cert_file and a private_key file. Just load them, additional checking
Expand Down Expand Up @@ -177,6 +182,7 @@ where
event_queue,
tokio::net::TcpListener::from_std(listener).map_err(Error::ListenerConversion)?,
server_shutdown_receiver,
server_span,
));

let model = SmallNetwork {
Expand Down Expand Up @@ -208,7 +214,7 @@ where
/// waits for it to complete the shutdown. This explicitly allows the background task to finish
/// and drop everything it owns, ensuring that resources such as allocated ports are free to be
/// reused once this completes.
#[allow(dead_code)]
#[cfg(test)]
async fn shutdown_server(&mut self) {
// Close the shutdown socket, causing the server to exit.
drop(self.shutdown.take());
Expand Down Expand Up @@ -471,7 +477,6 @@ where
}

/// Returns the node id of this network node.
#[cfg(test)]
pub(crate) fn node_id(&self) -> NodeId {
self.cert.public_key_fingerprint()
}
Expand Down Expand Up @@ -671,9 +676,12 @@ async fn server_task<P, REv>(
event_queue: EventQueueHandle<REv>,
mut listener: tokio::net::TcpListener,
shutdown: oneshot::Receiver<()>,
span: Span,
) where
REv: From<Event<P>>,
{
let _enter = span.enter();

// The server task is a bit tricky, since it has to wait on incoming connections while at the
// same time shut down if the networking component is dropped, otherwise the TCP socket will
// stay open, preventing reuse.
Expand Down
75 changes: 45 additions & 30 deletions src/components/small_network/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,26 @@
//! instances of `small_net` arranged in a network.

use std::{
collections::HashSet,
collections::{hash_map::Entry, HashMap, HashSet},
fmt::{self, Debug, Display, Formatter},
io,
time::Duration,
};

use derive_more::From;
use futures::{future::join_all, Future};
use serde::{Deserialize, Serialize};
use small_network::NodeId;
use tracing_subscriber::{self, EnvFilter};

use crate::{
components::Component,
effect::{announcements::NetworkAnnouncement, Effect, EffectBuilder, Multiple},
logging,
reactor::{self, EventQueueHandle, Reactor},
small_network::{self, SmallNetwork},
};
use pnet::datalink;
use tokio::time::{timeout, Timeout};
use tracing::{debug, dispatcher::DefaultGuard, info};
use tracing::{debug, field, info, Span};

/// Time interval for which to poll an observed testing network when no events have occurred.
const POLL_INTERVAL: Duration = Duration::from_millis(10);
Expand Down Expand Up @@ -82,9 +81,13 @@ impl Reactor for TestReactor {
fn new(
cfg: Self::Config,
event_queue: EventQueueHandle<Self::Event>,
span: &Span,
) -> reactor::Result<(Self, Multiple<Effect<Self::Event>>)> {
let (net, effects) = SmallNetwork::new(event_queue, cfg)?;

let node_id = net.node_id();
span.record("id", &field::display(node_id));

Ok((
TestReactor { net },
reactor::wrap_effects(Event::SmallNet, effects),
Expand Down Expand Up @@ -113,17 +116,21 @@ impl Display for Message {
/// cranking until a condition has been reached.
#[derive(Debug)]
struct Network {
nodes: Vec<reactor::Runner<TestReactor>>,
node_count: usize,
nodes: HashMap<NodeId, reactor::Runner<TestReactor>>,
}

impl Network {
/// Creates a new network.
fn new() -> Self {
Network { nodes: Vec::new() }
Network {
node_count: 0,
nodes: HashMap::new(),
}
}

/// Creates a new networking node on the network using the default root node port.
async fn add_node(&mut self) -> anyhow::Result<&mut reactor::Runner<TestReactor>> {
async fn add_node(&mut self) -> anyhow::Result<(NodeId, &mut reactor::Runner<TestReactor>)> {
self.add_node_with_config(small_network::Config::default_on_port(TEST_ROOT_NODE_PORT))
.await
}
Expand All @@ -132,19 +139,27 @@ impl Network {
async fn add_node_with_config(
&mut self,
cfg: small_network::Config,
) -> anyhow::Result<&mut reactor::Runner<TestReactor>> {
let runner = reactor::Runner::new(cfg).await?;
self.nodes.push(runner);

Ok(self
.nodes
.last_mut()
.expect("vector cannot be empty after insertion"))
) -> anyhow::Result<(NodeId, &mut reactor::Runner<TestReactor>)> {
let runner: reactor::Runner<TestReactor> = reactor::Runner::new(cfg).await?;
self.node_count += 1;

let node_id = runner.reactor().net.node_id();

let node_ref = match self.nodes.entry(node_id) {
Entry::Occupied(_) => {
// This happens in the event of the extremely unlikely hash collision, or if the
// node ID was set manually.
anyhow::bail!("trying to insert a duplicate node {}", node_id)
}
Entry::Vacant(entry) => entry.insert(runner),
};

Ok((node_id, node_ref))
}

/// Crank all runners once, returning the number of events processed.
async fn crank_all(&mut self) -> usize {
join_all(self.nodes.iter_mut().map(|runner| runner.try_crank()))
join_all(self.nodes.values_mut().map(reactor::Runner::try_crank))
.await
.into_iter()
.filter(|opt| opt.is_some())
Expand Down Expand Up @@ -175,7 +190,7 @@ impl Network {
/// Runs the main loop of every reactor until a condition is true.
async fn settle_on<F>(&mut self, f: F)
where
F: Fn(&[reactor::Runner<TestReactor>]) -> bool,
F: Fn(&HashMap<NodeId, reactor::Runner<TestReactor>>) -> bool,
{
loop {
// Check condition.
Expand All @@ -191,8 +206,8 @@ impl Network {
}
}

// Returns the internal list of nodes.
fn nodes(&self) -> &[reactor::Runner<TestReactor>] {
// Returns the internal map of nodes.
fn nodes(&self) -> &HashMap<NodeId, reactor::Runner<TestReactor>> {
&self.nodes
}

Expand All @@ -205,7 +220,7 @@ impl Network {
/// gets the job done.
async fn shutdown(self) {
// Shutdown the sender of every reactor node to ensure the port is open again.
for node in self.nodes.into_iter() {
for (_, node) in self.nodes.into_iter() {
node.into_inner().net.shutdown_server().await;
}

Expand All @@ -216,31 +231,29 @@ impl Network {
/// Sets up logging for testing.
///
/// Returns a guard that when dropped out of scope, clears the logger again.
fn init_logging() -> DefaultGuard {
fn init_logging() {
// TODO: Write logs to file by default for each test.
tracing::subscriber::set_default(
tracing_subscriber::fmt()
.with_writer(io::stderr)
.with_env_filter(EnvFilter::from_default_env())
.finish(),
)
logging::init()
// Ignore the return value, setting the global subscriber will fail if `init_logging` has
// been called before, which we don't care about.
.ok();
}

/// Checks whether or not a given network is completely connected.
fn network_is_complete(nodes: &[reactor::Runner<TestReactor>]) -> bool {
fn network_is_complete(nodes: &HashMap<NodeId, reactor::Runner<TestReactor>>) -> bool {
// We need at least one node.
if nodes.is_empty() {
return false;
}

let expected: HashSet<_> = nodes
.iter()
.map(|runner| runner.reactor().net.node_id())
.map(|(_, runner)| runner.reactor().net.node_id())
.collect();

nodes
.iter()
.map(|runner| {
.map(|(_, runner)| {
let net = &runner.reactor().net;
let mut actual = net.connected_nodes();

Expand Down Expand Up @@ -316,6 +329,8 @@ async fn run_two_node_network_five_times() {
/// Very unlikely to ever fail on a real machine.
#[tokio::test]
async fn bind_to_real_network_interface() {
init_logging();

let iface = datalink::interfaces()
.into_iter()
.find(|net| !net.ips.is_empty() && !net.ips.iter().any(|ip| ip.ip().is_loopback()))
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
mod components;
pub mod crypto;
pub mod effect;
pub mod logging;
pub mod reactor;
pub mod tls;
pub mod types;
Expand Down
8 changes: 5 additions & 3 deletions src/apps/node/logging.rs → src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ where
}
}

/// Initializes logging system based on settings in configuration.
/// Initializes the logging system.
///
/// Will setup logging as described in this configuration for the whole application. This
/// function should only be called once during the lifetime of the application.
/// This function should only be called once during the lifetime of the application. Do not call
/// this outside of the application or testing code, the installed logger is global.
///
/// See the `README.md` for hints on how to configure logging at runtime.
pub fn init() -> anyhow::Result<()> {
let formatter = format::debug_fn(|writer, field, value| {
if field.name() == "message" {
Expand Down
45 changes: 38 additions & 7 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::{
};

use futures::FutureExt;
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, trace, warn, Span};

use crate::{
effect::{Effect, EffectBuilder, Multiple},
Expand Down Expand Up @@ -115,10 +115,14 @@ pub trait Reactor: Sized {
/// This method creates the full state, which consists of all components, and returns a reactor
/// instances along with the effects the components generated upon instantiation.
///
/// The function is also given an instance to the tracing span used, this enables it to set up
/// tracing fields like `id` to set an ID for the reactor if desired.
///
/// If any instantiation fails, an error is returned.
fn new(
cfg: Self::Config,
event_queue: EventQueueHandle<Self::Event>,
span: &Span,
) -> Result<(Self, Multiple<Effect<Self::Event>>)>;
}

Expand Down Expand Up @@ -147,44 +151,71 @@ where

/// The reactor instance itself.
reactor: R,

/// The logging span indicating which reactor we are in.
span: Span,

/// Counter for events, to aid tracing.
event_count: usize,
}

impl<R> Runner<R>
where
R: Reactor,
{
/// Creates a new runner from a given configuration.
///
/// The `id` is used to identify the runner during logging when debugging and can be chosen
/// arbitrarily.
#[inline]
pub async fn new(cfg: R::Config) -> Result<Self> {
// We create a new logging span, ensuring that we can always associate log messages to this
// specific reactor. This is usually only relevant when running multiple reactors, e.g.
// during testing, so we set the log level to `debug` here.
let span = tracing::debug_span!("node", id = tracing::field::Empty);
let entered = span.enter();

let event_size = mem::size_of::<R::Event>();

// Check if the event is of a reasonable size. This only emits a runtime warning at startup
// right now, since storage size of events is not an issue per se, but copying might be
// expensive if events get too large.
if event_size > 16 * mem::size_of::<usize>() {
warn!(
"event size is {} bytes, consider reducing it or boxing",
event_size
);
warn!(%event_size, "large event size, consider reducing it or boxing");
}

// Create a new event queue for this reactor run.
let scheduler = utils::leak(Scheduler::new(QueueKind::weights()));

let event_queue = EventQueueHandle::new(scheduler);
let (reactor, initial_effects) = R::new(cfg, event_queue)?;

let (reactor, initial_effects) = R::new(cfg, event_queue, &span)?;

// Run all effects from component instantiation.
process_effects(scheduler, initial_effects).await;

info!("reactor main loop is ready");

Ok(Runner { scheduler, reactor })
drop(entered);
Ok(Runner {
scheduler,
reactor,
span,
event_count: 0,
})
}

/// Processes a single event on the event queue.
#[inline]
pub async fn crank(&mut self) {
let _enter = self.span.enter();

// Create another span for tracing the processing of one event.
let crank_span = tracing::debug_span!("crank", ev = self.event_count);
let _inner_enter = crank_span.enter();

self.event_count += 1;

let event_queue = EventQueueHandle::new(self.scheduler);
let effect_builder = EffectBuilder::new(event_queue);

Expand Down
Loading