From 16e4791e8eb1a60bf7d376379bfbc7c6e895a00e Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 19:15:55 +0200 Subject: [PATCH 1/9] Add `prometheus::Registry` to runner --- Cargo.lock | 21 +++++++++++++++++++ node/Cargo.toml | 25 ++++++++++++----------- node/src/components/in_memory_network.rs | 2 ++ node/src/components/small_network/test.rs | 2 ++ node/src/reactor.rs | 7 ++++++- node/src/reactor/initializer.rs | 2 ++ node/src/reactor/validator.rs | 2 ++ 7 files changed, 48 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c64ccf0b11..06ffebb5ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,6 +433,7 @@ dependencies = [ "parity-wasm", "parking_lot 0.10.2", "pnet", + "prometheus", "proptest", "pwasm-utils", "rand 0.7.3", @@ -2789,6 +2790,20 @@ dependencies = [ "unicode-xid 0.2.1", ] +[[package]] +name = "prometheus" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "protobuf", + "spin", + "thiserror", +] + [[package]] name = "proptest" version = "0.9.6" @@ -3574,6 +3589,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "standard-payment" version = "0.1.0" diff --git a/node/Cargo.toml b/node/Cargo.toml index 8fe33ae2e5..02da4ead7c 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -20,38 +20,46 @@ base64 = "0.12.3" bincode = "1.2.1" blake2 = { version = "0.8.1", default-features = false } bytes = "0.5.4" +chrono = "0.4.10" csv = "1.1.3" derive_more = "0.99.7" directories = "2.0.2" ed25519-dalek = { version = "1.0.0-pre.3", default-features = false, features = ["rand", "serde", "u64_backend"] } either = "1.5.3" enum-iterator = "0.6.0" -thiserror = "1.0.18" futures = "0.3.5" getrandom = "0.1.14" hex = "0.4.2" hex_fmt = "0.3.0" +hostname = "0.3.0" http = "0.2.1" itertools = "0.8.2" lazy_static = "1.4.0" +libc = "0.2.66" linked-hash-map = "0.5.2" lmdb = "0.8.0" +log = { version = "0.4.8", features = ["std", "serde", "kv_unstable"] } maplit = "1.0.2" +num = { version = "0.2.0", default-features = false } num-derive = "0.3.0" num-traits = "0.2.10" openssl = "0.10.29" parity-wasm = "0.41.0" +parking_lot = "0.10.0" +prometheus = "0.9.0" +proptest = { version = "0.9.4", optional = true } pwasm-utils = "0.12.0" rand = "0.7.3" rand_chacha = "0.2.2" rmp-serde = "0.14.3" semver = { version = "0.10.0", features = ["serde"] } serde = { version = "1.0.110", features = ["derive"] } -serde_json = "1.0.55" serde-big-array = "0.3.0" +serde_json = "1.0.55" smallvec = "1.4.0" structopt = "0.3.14" tempfile = "3.1.0" +thiserror = "1.0.18" tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp", "time", "blocking"] } tokio-openssl = "0.4.0" tokio-serde = { version = "0.6.1", features = ["messagepack"] } @@ -59,19 +67,12 @@ tokio-util = { version = "0.3.1", features = ["codec"] } toml = "0.5.6" tracing = "0.1.14" tracing-subscriber = "0.2.5" -uint = "0.8.3" -warp = "0.2.3" -wasmi = "0.6.2" types = { path = "../types", package = "casperlabs-types", features = ["std", "gens"] } -chrono = "0.4.10" -hostname = "0.3.0" -libc = "0.2.66" -log = { version = "0.4.8", features = ["std", "serde", "kv_unstable"] } -num = { version = "0.2.0", default-features = false } -proptest = { version = "0.9.4", optional = true } +uint = "0.8.3" uuid = { version = "0.8.1", features = ["serde", "v4"] } wabt = "0.9.2" -parking_lot = "0.10.0" +warp = "0.2.3" +wasmi = "0.6.2" [dev-dependencies] assert_matches = "1.3.0" diff --git a/node/src/components/in_memory_network.rs b/node/src/components/in_memory_network.rs index df02770034..177ef404a6 100644 --- a/node/src/components/in_memory_network.rs +++ b/node/src/components/in_memory_network.rs @@ -20,6 +20,7 @@ //! # //! # use derive_more::From; //! # use maplit::hashmap; +//! # use prometheus::Registry; //! # use rand::Rng; //! # use tracing::Span; //! # @@ -168,6 +169,7 @@ //! type Error = (); //! //! fn new(_cfg: Self::Config, +//! _registry: &Registry, //! event_queue: EventQueueHandle, //! _span: &Span //! ) -> Result<(Self, Effects), ()> { diff --git a/node/src/components/small_network/test.rs b/node/src/components/small_network/test.rs index 51ed0b5843..71528ecc39 100644 --- a/node/src/components/small_network/test.rs +++ b/node/src/components/small_network/test.rs @@ -23,6 +23,7 @@ use crate::{ testing::network::{Network, NetworkedReactor}, }; use pnet::datalink; +use prometheus::Registry; use reactor::{wrap_effects, Finalize}; use tokio::time::{timeout, Timeout}; use tracing::{debug, field, info, Span}; @@ -80,6 +81,7 @@ impl Reactor for TestReactor { fn new( cfg: Self::Config, + registry: &Registry, event_queue: EventQueueHandle, span: &Span, ) -> anyhow::Result<(Self, Effects)> { diff --git a/node/src/reactor.rs b/node/src/reactor.rs index 884c3d95e3..996598b2c2 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -34,6 +34,7 @@ use std::{ }; use futures::{future::BoxFuture, FutureExt}; +use prometheus::Registry; use tracing::{debug, info, trace, warn, Span}; use crate::{ @@ -122,6 +123,7 @@ pub trait Reactor: Sized { // TODO: Remove `span` parameter and rely on trait to retrieve from reactor where needed. fn new( cfg: Self::Config, + registry: &Registry, event_queue: EventQueueHandle, span: &Span, ) -> Result<(Self, Effects), Self::Error>; @@ -192,8 +194,11 @@ where let (span, scheduler) = Self::init(); let entered = span.enter(); + // Instantiate a new registry for metrics for this reactor. + let registry = Registry::new(); + let event_queue = EventQueueHandle::new(scheduler); - let (reactor, initial_effects) = R::new(cfg, event_queue, &span)?; + let (reactor, initial_effects) = R::new(cfg, ®istry, event_queue, &span)?; // Run all effects from component instantiation. process_effects(scheduler, initial_effects).await; diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index 10e01bf871..6c5411696f 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -6,6 +6,7 @@ use std::{ }; use derive_more::From; +use prometheus::Registry; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; use thiserror::Error; @@ -102,6 +103,7 @@ impl reactor::Reactor for Reactor { fn new( (chainspec_config_path, config): Self::Config, + registry: &Registry, event_queue: EventQueueHandle, _span: &Span, ) -> Result<(Self, Effects), Error> { diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 667deba558..23da11380c 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -8,6 +8,7 @@ mod error; use std::fmt::{self, Display, Formatter}; use derive_more::From; +use prometheus::Registry; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; use serde::{Deserialize, Serialize}; @@ -204,6 +205,7 @@ impl reactor::Reactor for Reactor { fn new( config: Self::Config, + registry: &Registry, event_queue: EventQueueHandle, span: &Span, ) -> Result<(Self, Effects), Error> { From c05499934cc913dc9ed9ad821e2019ac2d473b25 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 20:22:14 +0200 Subject: [PATCH 2/9] Include first actual metrics component --- node/src/components/in_memory_network.rs | 4 +-- node/src/reactor.rs | 34 +++++++++++++++++++++--- node/src/reactor/initializer.rs | 4 +++ node/src/reactor/validator.rs | 1 + node/src/reactor/validator/error.rs | 4 +++ node/src/testing/network.rs | 3 +++ 6 files changed, 44 insertions(+), 6 deletions(-) diff --git a/node/src/components/in_memory_network.rs b/node/src/components/in_memory_network.rs index 177ef404a6..26ec0ade87 100644 --- a/node/src/components/in_memory_network.rs +++ b/node/src/components/in_memory_network.rs @@ -166,13 +166,13 @@ //! impl reactor::Reactor for Reactor { //! type Event = Event; //! type Config = (); -//! type Error = (); +//! type Error = anyhow::Error; //! //! fn new(_cfg: Self::Config, //! _registry: &Registry, //! event_queue: EventQueueHandle, //! _span: &Span -//! ) -> Result<(Self, Effects), ()> { +//! ) -> Result<(Self, Effects), Self::Error> { //! let effect_builder = EffectBuilder::new(event_queue); //! let (shouter, shouter_effect) = Shouter::new(effect_builder); //! diff --git a/node/src/reactor.rs b/node/src/reactor.rs index 996598b2c2..1959a1526f 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -34,7 +34,7 @@ use std::{ }; use futures::{future::BoxFuture, FutureExt}; -use prometheus::Registry; +use prometheus::{self, core::Collector, IntCounter, Registry}; use tracing::{debug, info, trace, warn, Span}; use crate::{ @@ -153,6 +153,7 @@ pub trait ReactorExt: Reactor { /// Creates a new instance of the reactor by taking the components from a previous reactor (not /// necessarily of the same concrete type). fn new_from( + registry: &Registry, event_queue: EventQueueHandle, span: &Span, reactor: R, @@ -179,11 +180,32 @@ where /// Counter for events, to aid tracing. event_count: usize, + + /// Metrics for the runner. + metrics: RunnerMetrics, +} + +/// Metric data for the Runner +#[derive(Debug)] +struct RunnerMetrics { + /// Total number of events processed. + events: IntCounter, +} + +impl RunnerMetrics { + /// Create and register new runner metrics. + fn new(registry: &Registry) -> Result { + let events = IntCounter::new("runner_events", "total event count")?; + registry.register(Box::new(events.clone()))?; + + Ok(RunnerMetrics { events }) + } } impl Runner where R: Reactor, + R::Error: From, { /// Creates a new runner from a given configuration. /// @@ -211,13 +233,13 @@ where reactor, span, event_count: 0, + metrics: RunnerMetrics::new(®istry)?, }) } /// Creates a new runner from a given reactor. /// - /// The `id` is used to identify the runner during logging when debugging and can be chosen - /// arbitrarily. + /// Note that this will reset all metrics. #[inline] pub async fn from(old_reactor: R1) -> Result where @@ -227,8 +249,10 @@ where let (span, scheduler) = Self::init(); let entered = span.enter(); + let registry = Registry::new(); + let event_queue = EventQueueHandle::new(scheduler); - let (reactor, initial_effects) = R::new_from(event_queue, &span, old_reactor)?; + let (reactor, initial_effects) = R::new_from(®istry, event_queue, &span, old_reactor)?; // Run all effects from component instantiation. process_effects(scheduler, initial_effects).await; @@ -241,6 +265,7 @@ where reactor, span, event_count: 0, + metrics: RunnerMetrics::new(®istry)?, }) } @@ -275,6 +300,7 @@ where let _inner_enter = crank_span.enter(); self.event_count += 1; + self.metrics.events.inc(); let event_queue = EventQueueHandle::new(self.scheduler); let effect_builder = EffectBuilder::new(event_queue); diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index 6c5411696f..406f99e44b 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -62,6 +62,10 @@ impl Display for Event { /// Error type returned by the initializer reactor. #[derive(Debug, Error)] pub enum Error { + /// Metrics-related error + #[error("prometheus (metrics) error: {0}")] + Metrics(#[from] prometheus::Error), + /// `ChainspecHandler` component error. #[error("chainspec error: {0}")] Chainspec(#[from] chainspec_handler::Error), diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 23da11380c..8dce4edb42 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -287,6 +287,7 @@ impl reactor::Reactor for Reactor { impl reactor::ReactorExt for Reactor { fn new_from( + _registry: &Registry, event_queue: EventQueueHandle, span: &Span, initializer_reactor: initializer::Reactor, diff --git a/node/src/reactor/validator/error.rs b/node/src/reactor/validator/error.rs index d423f4064f..198aae7ca4 100644 --- a/node/src/reactor/validator/error.rs +++ b/node/src/reactor/validator/error.rs @@ -5,6 +5,10 @@ use crate::components::{contract_runtime, small_network, storage}; /// Error type returned by the validator reactor. #[derive(Debug, Error)] pub enum Error { + /// Metrics-related error + #[error("prometheus (metrics) error: {0}")] + Metrics(#[from] prometheus::Error), + /// `SmallNetwork` component error. #[error("small network error: {0}")] SmallNetwork(#[from] small_network::Error), diff --git a/node/src/testing/network.rs b/node/src/testing/network.rs index 1b5a60366d..45e06f6abf 100644 --- a/node/src/testing/network.rs +++ b/node/src/testing/network.rs @@ -39,6 +39,7 @@ impl Network where R: Reactor + NetworkedReactor, R::Config: Default, + R::Error: From, { /// Creates a new networking node on the network using the default root node port. /// @@ -54,6 +55,7 @@ where impl Network where R: Reactor + NetworkedReactor, + R::Error: From + From, { /// Creates a new network. pub fn new() -> Self { @@ -146,6 +148,7 @@ impl Finalize for Network where R: Finalize + NetworkedReactor + Reactor + Send + 'static, R::NodeId: Send, + R::Error: From, { fn finalize(self) -> BoxFuture<'static, ()> { // We support finalizing networks where the reactor itself can be finalized. From ad424758bb539fe69b17374929730d9817a1ad10 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 20:24:38 +0200 Subject: [PATCH 3/9] Mark `registry` as unused where no metrics are added --- node/src/components/small_network/test.rs | 2 +- node/src/reactor.rs | 2 +- node/src/reactor/initializer.rs | 2 +- node/src/reactor/validator.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node/src/components/small_network/test.rs b/node/src/components/small_network/test.rs index 71528ecc39..fc067b219a 100644 --- a/node/src/components/small_network/test.rs +++ b/node/src/components/small_network/test.rs @@ -81,7 +81,7 @@ impl Reactor for TestReactor { fn new( cfg: Self::Config, - registry: &Registry, + _registry: &Registry, event_queue: EventQueueHandle, span: &Span, ) -> anyhow::Result<(Self, Effects)> { diff --git a/node/src/reactor.rs b/node/src/reactor.rs index 1959a1526f..0a1af6b82a 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -34,7 +34,7 @@ use std::{ }; use futures::{future::BoxFuture, FutureExt}; -use prometheus::{self, core::Collector, IntCounter, Registry}; +use prometheus::{self, IntCounter, Registry}; use tracing::{debug, info, trace, warn, Span}; use crate::{ diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index 406f99e44b..b7373a6b26 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -107,7 +107,7 @@ impl reactor::Reactor for Reactor { fn new( (chainspec_config_path, config): Self::Config, - registry: &Registry, + _registry: &Registry, event_queue: EventQueueHandle, _span: &Span, ) -> Result<(Self, Effects), Error> { diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 8dce4edb42..e2d00c0377 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -205,7 +205,7 @@ impl reactor::Reactor for Reactor { fn new( config: Self::Config, - registry: &Registry, + _registry: &Registry, event_queue: EventQueueHandle, span: &Span, ) -> Result<(Self, Effects), Error> { From 88787c482d27e4e523bfaa0a1d9f0771a6216d22 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 21:33:07 +0200 Subject: [PATCH 4/9] Refactor initializer to avoid code duplication --- node/src/app/cli.rs | 2 +- node/src/reactor.rs | 68 +++++---------------------------- node/src/reactor/initializer.rs | 10 ++--- node/src/reactor/validator.rs | 56 ++++++++++----------------- 4 files changed, 35 insertions(+), 101 deletions(-) diff --git a/node/src/app/cli.rs b/node/src/app/cli.rs index fae4298ac4..598b791874 100644 --- a/node/src/app/cli.rs +++ b/node/src/app/cli.rs @@ -97,7 +97,7 @@ impl Cli { bail!("failed to initialize successfully"); } - let mut runner = Runner::::from(initializer).await?; + let mut runner = Runner::::new(initializer, &mut rng).await?; runner.run(&mut rng).await; } } diff --git a/node/src/reactor.rs b/node/src/reactor.rs index 2440a1080e..3fcd18c3eb 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -147,17 +147,6 @@ pub trait Finalize: Sized { } } -/// Reactor extension trait. -pub trait ReactorExt: Reactor { - /// Creates a new instance of the reactor by taking the components from a previous reactor (not - /// necessarily of the same concrete type). - fn new_from( - registry: &Registry, - event_queue: EventQueueHandle, - reactor: R, - ) -> Result<(Self, Effects), Self::Error>; -} - /// A runner for a reactor. /// /// The runner manages a reactors event queue and reactor itself and can run it either continuously @@ -205,7 +194,16 @@ where /// Creates a new runner from a given configuration. #[inline] pub async fn new(cfg: R::Config, rng: &mut Rd) -> Result { - let scheduler = Self::init(); + let event_size = mem::size_of::(); + + // Check if the event is of a reasonable size. This only emits a runtime warning at startup + // right now, since storage size of events is not an issue per se, but copying might be + // expensive if events get too large. + if event_size > 16 * mem::size_of::() { + warn!(%event_size, "large event size, consider reducing it or boxing"); + } + + let scheduler = utils::leak(Scheduler::new(QueueKind::weights())); // Instantiate a new registry for metrics for this reactor. let registry = Registry::new(); @@ -229,52 +227,6 @@ where }) } - /// Creates a new runner from a given reactor. - /// - /// Note that this will reset all metrics. - #[inline] - pub async fn from(old_reactor: R1) -> Result - where - R1: Reactor, - R: ReactorExt, - { - let scheduler = Self::init(); - - let registry = Registry::new(); - - let event_queue = EventQueueHandle::new(scheduler); - let (reactor, initial_effects) = R::new_from(®istry, event_queue, old_reactor)?; - - // Run all effects from component instantiation. - let span = debug_span!("process initial effects (from)"); - process_effects(scheduler, initial_effects) - .instrument(span) - .await; - - info!("reactor main loop is ready"); - - Ok(Runner { - scheduler, - reactor, - event_count: 0, - metrics: RunnerMetrics::new(®istry)?, - }) - } - - fn init() -> &'static Scheduler { - let event_size = mem::size_of::(); - - // Check if the event is of a reasonable size. This only emits a runtime warning at startup - // right now, since storage size of events is not an issue per se, but copying might be - // expensive if events get too large. - if event_size > 16 * mem::size_of::() { - warn!(%event_size, "large event size, consider reducing it or boxing"); - } - - // Create a new event queue for this reactor run. - utils::leak(Scheduler::new(QueueKind::weights())) - } - /// Processes a single event on the event queue. #[inline] pub async fn crank(&mut self, rng: &mut Rd) { diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index 7703d90fee..d234dda9e4 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -80,10 +80,10 @@ pub enum Error { /// Validator node reactor. #[derive(Debug)] pub struct Reactor { - config: validator::Config, + pub(super) config: validator::Config, chainspec_handler: ChainspecHandler, - storage: Storage, - contract_runtime: ContractRuntime, + pub(super) storage: Storage, + pub(super) contract_runtime: ContractRuntime, } impl Reactor { @@ -91,10 +91,6 @@ impl Reactor { pub fn stopped_successfully(&self) -> bool { self.chainspec_handler.stopped_successfully() } - - pub(super) fn destructure(self) -> (validator::Config, Storage, ContractRuntime) { - (self.config, self.storage, self.contract_runtime) - } } impl reactor::Reactor for Reactor { diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 1e4870dc22..14c80b55dc 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -19,7 +19,7 @@ use crate::{ contract_runtime::{self, ContractRuntime}, deploy_gossiper::{self, DeployGossiper}, pinger::{self, Pinger}, - storage::{Storage, StorageType}, + storage::Storage, Component, }, effect::{ @@ -156,13 +156,27 @@ pub struct Reactor { deploy_gossiper: DeployGossiper, } -impl Reactor { - fn init( - config: Config, - event_queue: EventQueueHandle, - storage: Storage, - contract_runtime: ContractRuntime, +impl reactor::Reactor for Reactor { + type Event = Event; + + // The "configuration" is in fact the whole state of the initializer reactor, which we + // deconstruct and reuse. + type Config = initializer::Reactor; + type Error = Error; + + fn new( + initializer: Self::Config, + _registry: &Registry, + event_queue: EventQueueHandle, + _rng: &mut R, ) -> Result<(Self, Effects), Error> { + let initializer::Reactor { + config, + storage, + contract_runtime, + .. + } = initializer; + let effect_builder = EffectBuilder::new(event_queue); let (net, net_effects) = SmallNetwork::new(event_queue, config.validator_net)?; @@ -189,23 +203,6 @@ impl Reactor { effects, )) } -} - -impl reactor::Reactor for Reactor { - type Event = Event; - type Config = Config; - type Error = Error; - - fn new( - config: Self::Config, - _registry: &Registry, - event_queue: EventQueueHandle, - _rng: &mut R, - ) -> Result<(Self, Effects), Error> { - let storage = Storage::new(&config.storage)?; - let contract_runtime = ContractRuntime::new(&config.storage, config.contract_runtime)?; - Self::init(config, event_queue, storage, contract_runtime) - } fn dispatch_event( &mut self, @@ -274,14 +271,3 @@ impl reactor::Reactor for Reactor { } } } - -impl reactor::ReactorExt for Reactor { - fn new_from( - _registry: &Registry, - event_queue: EventQueueHandle, - initializer_reactor: initializer::Reactor, - ) -> Result<(Self, Effects), Error> { - let (config, storage, contract_runtime) = initializer_reactor.destructure(); - Self::init(config, event_queue, storage, contract_runtime) - } -} From c0eb1872c67acdb403a01d1730c2937cd2570172 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 23:08:13 +0200 Subject: [PATCH 5/9] Add the `Metrics` component --- node/src/components.rs | 1 + node/src/components/metrics.rs | 56 ++++++++++++++++++++++++++++++++++ node/src/effect.rs | 16 +++++++++- node/src/effect/requests.rs | 18 +++++++++++ 4 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 node/src/components/metrics.rs diff --git a/node/src/components.rs b/node/src/components.rs index a9c2f77c3d..9aba7e31fb 100644 --- a/node/src/components.rs +++ b/node/src/components.rs @@ -12,6 +12,7 @@ pub(crate) mod deploy_gossiper; pub(crate) mod deploy_buffer; // The `in_memory_network` is public for use in doctests. pub mod in_memory_network; +pub(crate) mod metrics; pub(crate) mod pinger; pub(crate) mod small_network; pub(crate) mod storage; diff --git a/node/src/components/metrics.rs b/node/src/components/metrics.rs new file mode 100644 index 0000000000..187a837d18 --- /dev/null +++ b/node/src/components/metrics.rs @@ -0,0 +1,56 @@ +//! Metrics component. +//! +//! The metrics component renders metrics upon request. + +use prometheus::{Encoder, Registry, TextEncoder}; +use rand::Rng; +use tracing::error; + +use crate::{ + components::Component, + effect::{requests::MetricsRequest, EffectBuilder, EffectExt, Effects}, +}; + +/// The metrics component. +#[derive(Debug)] +pub(crate) struct Metrics { + /// Metrics registry used to answer metrics queries. + registry: Registry, +} + +impl Component for Metrics { + type Event = MetricsRequest; + + fn handle_event( + &mut self, + _effect_builder: EffectBuilder, + _rng: &mut R, + req: Self::Event, + ) -> Effects { + match req { + MetricsRequest::RenderNodeMetricsText { responder } => { + let mut buf: Vec = Vec::::new(); + + if let Err(e) = TextEncoder::new().encode(&self.registry.gather(), &mut buf) { + error!(%e, "text encoding of metrics failed"); + return responder.respond(None).ignore(); + }; + + match String::from_utf8(buf) { + Ok(text) => responder.respond(Some(text)).ignore(), + Err(e) => { + error!(%e, "generated text metrics are not valid UTF-8"); + responder.respond(None).ignore() + } + } + } + } + } +} + +impl Metrics { + /// Create and initialize a new metrics component. + pub(crate) fn new(registry: Registry) -> Self { + Metrics { registry } + } +} diff --git a/node/src/effect.rs b/node/src/effect.rs index 4d335d079f..76cf1bb09b 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -86,7 +86,7 @@ use crate::{ Chainspec, }; use announcements::NetworkAnnouncement; -use requests::{ContractRuntimeRequest, NetworkRequest, StorageRequest}; +use requests::{ContractRuntimeRequest, MetricsRequest, NetworkRequest, StorageRequest}; /// A pinned, boxed future that produces one or more events. pub type Effect = BoxFuture<'static, Multiple>; @@ -310,6 +310,20 @@ impl EffectBuilder { Instant::now() - then } + /// Retrieve a snapshot of the nodes current metrics formatted as string. + /// + /// If an error occurred producing the metrics, `None` is returned. + pub(crate) async fn get_metrics(self) -> Option + where + REv: From, + { + self.make_request( + |responder| MetricsRequest::RenderNodeMetricsText { responder }, + QueueKind::Api, + ) + .await + } + /// Sends a network message. /// /// The message is queued in "fire-and-forget" fashion, there is no guarantee that the peer diff --git a/node/src/effect/requests.rs b/node/src/effect/requests.rs index 7dee33dfcc..5598d7a9a2 100644 --- a/node/src/effect/requests.rs +++ b/node/src/effect/requests.rs @@ -20,6 +20,24 @@ use crate::{ Chainspec, }; +/// A metrics request. +#[derive(Debug)] +pub enum MetricsRequest { + /// Render current node metrics as prometheus-formatted string. + RenderNodeMetricsText { + /// Resopnder returning the rendered metrics or `None`, if an internal error occurred. + responder: Responder>, + }, +} + +impl Display for MetricsRequest { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + MetricsRequest::RenderNodeMetricsText { .. } => write!(formatter, "get metrics text"), + } + } +} + /// A networking request. #[derive(Debug)] pub enum NetworkRequest { From eb39d6711992030702b61a3fa74c4b6dc271dea3 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 23:08:55 +0200 Subject: [PATCH 6/9] Add metrics handling to ApiServer component --- node/src/components/api_server.rs | 32 ++++++++++++++++++++++--- node/src/components/api_server/event.rs | 8 +++++++ node/src/effect/requests.rs | 6 +++++ node/src/reactor/validator.rs | 18 ++++++++++++-- 4 files changed, 59 insertions(+), 5 deletions(-) diff --git a/node/src/components/api_server.rs b/node/src/components/api_server.rs index afd3367d4f..a439f66745 100644 --- a/node/src/components/api_server.rs +++ b/node/src/components/api_server.rs @@ -23,6 +23,7 @@ mod event; use std::{error::Error as StdError, net::SocketAddr, str}; use bytes::Bytes; +use futures::FutureExt; use http::Response; use rand::Rng; use tracing::{debug, info, warn}; @@ -40,7 +41,7 @@ use crate::{ components::storage::{self, Storage}, crypto::hash::Digest, effect::{ - requests::{ApiRequest, DeployGossiperRequest, StorageRequest}, + requests::{ApiRequest, DeployGossiperRequest, MetricsRequest, StorageRequest}, EffectBuilder, EffectExt, Effects, }, reactor::QueueKind, @@ -50,6 +51,7 @@ pub use config::Config; pub(crate) use event::Event; const DEPLOYS_API_PATH: &str = "deploys"; +const METRICS_API_PATH: &str = "metrics"; #[derive(Debug)] pub(crate) struct ApiServer {} @@ -66,7 +68,7 @@ impl ApiServer { impl Component for ApiServer where - REv: From> + From + Send, + REv: From> + From + From + Send, { type Event = Event; @@ -97,6 +99,12 @@ where result: Box::new(result), main_responder: responder, }), + Event::ApiRequest(ApiRequest::GetMetrics { responder }) => effect_builder + .get_metrics() + .event(move |text| Event::GetMetricsResult { + text, + main_responder: responder, + }), Event::PutDeployResult { deploy, result, @@ -118,6 +126,10 @@ where result, main_responder, } => main_responder.respond(*result).ignore(), + Event::GetMetricsResult { + text, + main_responder, + } => main_responder.respond(text).ignore(), } } } @@ -137,10 +149,24 @@ where .and(warp::path::tail()) .and_then(move |hex_digest| parse_get_request(effect_builder, hex_digest)); + let get_metrics = warp::get() + .and(warp::path(METRICS_API_PATH)) + .and_then(move || { + effect_builder + .make_request( + |responder| ApiRequest::GetMetrics { responder }, + QueueKind::Api, + ) + .map(|text_opt| match text_opt { + Some(text) => Ok::<_, Rejection>(text), + None => todo!(), + }) + }); + let mut server_addr = SocketAddr::from((config.bind_interface, config.bind_port)); debug!(%server_addr, "starting HTTP server"); - match warp::serve(post_deploy.or(get_deploy)).try_bind_ephemeral(server_addr) { + match warp::serve(post_deploy.or(get_deploy).or(get_metrics)).try_bind_ephemeral(server_addr) { Ok((addr, server_fut)) => { info!(%addr, "started HTTP server"); return server_fut.await; diff --git a/node/src/components/api_server/event.rs b/node/src/components/api_server/event.rs index e05106842c..e66bfb767d 100644 --- a/node/src/components/api_server/event.rs +++ b/node/src/components/api_server/event.rs @@ -26,6 +26,10 @@ pub enum Event { result: Box>>, main_responder: Responder>>, }, + GetMetricsResult { + text: Option, + main_responder: Responder>, + }, } impl Display for Event { @@ -41,6 +45,10 @@ impl Display for Event { Event::ListDeploysResult { result, .. } => { write!(formatter, "ListDeployResult: {:?}", result) } + Event::GetMetricsResult { text, .. } => match text { + Some(tx) => write!(formatter, "GetMetricsResult ({} bytes)", tx.len()), + None => write!(formatter, "GetMetricsResult (failed)"), + }, } } } diff --git a/node/src/effect/requests.rs b/node/src/effect/requests.rs index 5598d7a9a2..3f05c385b9 100644 --- a/node/src/effect/requests.rs +++ b/node/src/effect/requests.rs @@ -310,6 +310,11 @@ pub enum ApiRequest { /// Responder to call with the result. responder: Responder, storage::Error>>, }, + /// Return string formatted, prometheus compatible metrics or `None` if an error occured. + GetMetrics { + /// Responder to call with the result. + responder: Responder>, + }, } impl Display for ApiRequest { @@ -318,6 +323,7 @@ impl Display for ApiRequest { ApiRequest::SubmitDeploy { deploy, .. } => write!(formatter, "submit {}", *deploy), ApiRequest::GetDeploy { hash, .. } => write!(formatter, "get {}", hash), ApiRequest::ListDeploys { .. } => write!(formatter, "list deploys"), + ApiRequest::GetMetrics { .. } => write!(formatter, "get metrics"), } } } diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 14c80b55dc..4d5d3661d1 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -18,13 +18,16 @@ use crate::{ consensus::{self, EraSupervisor}, contract_runtime::{self, ContractRuntime}, deploy_gossiper::{self, DeployGossiper}, + metrics::Metrics, pinger::{self, Pinger}, storage::Storage, Component, }, effect::{ announcements::NetworkAnnouncement, - requests::{ApiRequest, DeployGossiperRequest, NetworkRequest, StorageRequest}, + requests::{ + ApiRequest, DeployGossiperRequest, MetricsRequest, NetworkRequest, StorageRequest, + }, EffectBuilder, Effects, }, reactor::{self, initializer, EventQueueHandle}, @@ -86,6 +89,9 @@ pub enum Event { /// Network request. #[from] NetworkRequest(NetworkRequest), + /// Metrics request. + #[from] + MetricsRequest(MetricsRequest), // Announcements /// Network announcement. @@ -138,6 +144,7 @@ impl Display for Event { Event::Consensus(event) => write!(f, "consensus: {}", event), Event::DeployGossiper(event) => write!(f, "deploy gossiper: {}", event), Event::NetworkRequest(req) => write!(f, "network request: {}", req), + Event::MetricsRequest(req) => write!(f, "metrics request: {}", req), Event::NetworkAnnouncement(ann) => write!(f, "network announcement: {}", ann), Event::ContractRuntime(event) => write!(f, "contract runtime: {}", event), } @@ -147,6 +154,7 @@ impl Display for Event { /// Validator node reactor. #[derive(Debug)] pub struct Reactor { + metrics: Metrics, net: SmallNetwork, pinger: Pinger, storage: Storage, @@ -166,7 +174,7 @@ impl reactor::Reactor for Reactor { fn new( initializer: Self::Config, - _registry: &Registry, + registry: &Registry, event_queue: EventQueueHandle, _rng: &mut R, ) -> Result<(Self, Effects), Error> { @@ -177,6 +185,8 @@ impl reactor::Reactor for Reactor { .. } = initializer; + let metrics = Metrics::new(registry.clone()); + let effect_builder = EffectBuilder::new(event_queue); let (net, net_effects) = SmallNetwork::new(event_queue, config.validator_net)?; @@ -192,6 +202,7 @@ impl reactor::Reactor for Reactor { Ok(( Reactor { + metrics, net, pinger, storage, @@ -243,6 +254,9 @@ impl reactor::Reactor for Reactor { rng, Event::Network(small_network::Event::from(req)), ), + Event::MetricsRequest(req) => { + self.dispatch_event(effect_builder, rng, Event::MetricsRequest(req)) + } // Announcements: Event::NetworkAnnouncement(NetworkAnnouncement::MessageReceived { From f9b89b4ae666bcfc5ddc25a5090036aa21ebc163 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 23:20:48 +0200 Subject: [PATCH 7/9] Return an HTTP 500 error if metrics collection failed --- node/src/components/api_server.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/node/src/components/api_server.rs b/node/src/components/api_server.rs index a439f66745..684c032719 100644 --- a/node/src/components/api_server.rs +++ b/node/src/components/api_server.rs @@ -20,7 +20,7 @@ mod config; mod event; -use std::{error::Error as StdError, net::SocketAddr, str}; +use std::{borrow::Cow, error::Error as StdError, net::SocketAddr, str}; use bytes::Bytes; use futures::FutureExt; @@ -158,8 +158,13 @@ where QueueKind::Api, ) .map(|text_opt| match text_opt { - Some(text) => Ok::<_, Rejection>(text), - None => todo!(), + Some(text) => { + Ok::<_, Rejection>(reply::with_status(Cow::from(text), StatusCode::OK)) + } + None => Ok(reply::with_status( + Cow::from("failed to collect metrics. sorry!"), + StatusCode::INTERNAL_SERVER_ERROR, + )), }) }); From 06854842f3d837464ae059345039de9104fcfe16 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 23:32:10 +0200 Subject: [PATCH 8/9] Add documentation on how to add metrics --- node/src/components/metrics.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/node/src/components/metrics.rs b/node/src/components/metrics.rs index 187a837d18..8a1fb34c60 100644 --- a/node/src/components/metrics.rs +++ b/node/src/components/metrics.rs @@ -1,6 +1,27 @@ //! Metrics component. //! //! The metrics component renders metrics upon request. +//! +//! # Adding metrics to a component +//! +//! When adding metrics to an existing component, there are a few guidelines that should in general +//! be followed: +//! +//! 1. For a component `XYZ`, there should be a `XYZMetrics` struct that is one of its fields that +//! holds all of the `Collectors` (`Counter`s, etc) to make it easy to find all of the metrics +//! for a component in one place. +//! +//! Creation and instantiation of this component happens inside the `reactor::Reactor::new` +//! function, which is passed in a `prometheus::Registry` (see 2.). +//! +//! 2. Instantiation of an `XYZMetrics` struct should always be combined with registering all of +//! the metrics on a registry. For this reason it is advisable to have the `XYZMetrics::new` +//! method take a `prometheus::Registry` and register it directly. +//! +//! 3. Updating metrics is done inside the `handle_event` function by simply calling methods on the +//! fields of `self.metrics` (`: XYZMetrics`). **Important**: Metrics should never be read to +//! prevent any actual logic depending on them. If a counter is being increment as a metric and +//! also required for busines logic, a second counter should be kept in the component's state. use prometheus::{Encoder, Registry, TextEncoder}; use rand::Rng; From 0aa01436123a3c4930c11e0672d62449d16be7c6 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Sat, 11 Jul 2020 23:45:07 +0200 Subject: [PATCH 9/9] Add metrics in pinger component --- node/src/components/pinger.rs | 36 +++++++++++++++++++++++++++++++++-- node/src/reactor/validator.rs | 2 +- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/node/src/components/pinger.rs b/node/src/components/pinger.rs index 86681ca6e1..fafb3d30b5 100644 --- a/node/src/components/pinger.rs +++ b/node/src/components/pinger.rs @@ -9,6 +9,7 @@ use std::{ time::Duration, }; +use prometheus::{IntCounter, Registry}; use rand::Rng; use serde::{Deserialize, Serialize}; use tracing::info; @@ -28,6 +29,31 @@ pub(crate) struct Pinger { responsive_nodes: HashSet, /// Increasing ping counter. ping_counter: u32, + /// Pinger metrics. + metrics: PingerMetrics, +} + +/// Metrics for the pinger component. +#[derive(Debug)] +struct PingerMetrics { + /// Number of pings sent out. + pings_sent: IntCounter, + /// Number of pongs received. + pongs_received: IntCounter, +} + +impl PingerMetrics { + fn new(registry: &Registry) -> Result { + let pings_sent = IntCounter::new("pinger_pings_sent", "number of pings received")?; + let pongs_received = IntCounter::new("pinger_pongs_received", "number of pongs received")?; + registry.register(Box::new(pings_sent.clone()))?; + registry.register(Box::new(pongs_received.clone()))?; + + Ok(PingerMetrics { + pings_sent, + pongs_received, + }) + } } /// Interval in which to send pings. @@ -96,6 +122,9 @@ where sender, msg: Message::Pong(counter), } => { + // We count all pongs, even if they're stale. + self.metrics.pongs_received.inc(); + // We've received a pong, if it is valid (same counter value), process it. if counter == self.ping_counter { self.responsive_nodes.insert(sender); @@ -112,17 +141,19 @@ where impl Pinger { /// Create and initialize a new pinger. pub(crate) fn new + Send + From>>( + registry: &Registry, effect_builder: EffectBuilder, - ) -> (Self, Effects) { + ) -> Result<(Self, Effects), prometheus::Error> { let mut pinger = Pinger { responsive_nodes: HashSet::new(), ping_counter: 0, + metrics: PingerMetrics::new(registry)?, }; // We send out a round of pings immediately on construction. let init = pinger.send_pings(effect_builder); - (pinger, init) + Ok((pinger, init)) } /// Broadcast a ping and set a timer for the next broadcast. @@ -138,6 +169,7 @@ impl Pinger { // We increment the counter and clear pings beforehand, thus causing all pongs that are // still in flight to be timeouts. self.ping_counter += 1; + self.metrics.pings_sent.inc(); self.responsive_nodes.clear(); let mut effects: Effects = Effects::new(); diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 4d5d3661d1..617f293df2 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -190,7 +190,7 @@ impl reactor::Reactor for Reactor { let effect_builder = EffectBuilder::new(event_queue); let (net, net_effects) = SmallNetwork::new(event_queue, config.validator_net)?; - let (pinger, pinger_effects) = Pinger::new(effect_builder); + let (pinger, pinger_effects) = Pinger::new(registry, effect_builder)?; let api_server = ApiServer::new(config.http_server, effect_builder); let timestamp = Timestamp::now(); let (consensus, consensus_effects) = EraSupervisor::new(timestamp, effect_builder);