diff --git a/Cargo.lock b/Cargo.lock index 556938a825..83f03f195e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -463,6 +463,7 @@ dependencies = [ "parking_lot 0.10.2", "pem", "pnet", + "prometheus", "proptest", "pwasm-utils", "rand 0.7.3", @@ -2895,6 +2896,20 @@ dependencies = [ "unicode-xid 0.2.1", ] +[[package]] +name = "prometheus" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "protobuf", + "spin", + "thiserror", +] + [[package]] name = "proptest" version = "0.9.6" @@ -3707,6 +3722,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "standard-payment" version = "0.1.0" diff --git a/node/Cargo.toml b/node/Cargo.toml index 302e6caffa..93d8e2dee8 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -47,6 +47,7 @@ openssl = "0.10.29" parity-wasm = "0.41.0" parking_lot = "0.10.0" pem = "0.8.1" +prometheus = "0.9.0" proptest = { version = "0.9.4", optional = true } pwasm-utils = "0.12.0" rand = "0.7.3" diff --git a/node/src/app/cli.rs b/node/src/app/cli.rs index 7dec1aa640..4ad92096eb 100644 --- a/node/src/app/cli.rs +++ b/node/src/app/cli.rs @@ -219,7 +219,7 @@ impl Cli { bail!("failed to initialize successfully"); } - let mut runner = Runner::::from(initializer).await?; + let mut runner = Runner::::new(initializer, &mut rng).await?; runner.run(&mut rng).await; } } diff --git a/node/src/components.rs b/node/src/components.rs index 9e58cc6465..7fd71ecb28 100644 --- a/node/src/components.rs +++ b/node/src/components.rs @@ -10,6 +10,7 @@ pub(crate) mod deploy_buffer; pub(crate) mod deploy_gossiper; // The `in_memory_network` is public for use in doctests. pub mod in_memory_network; +pub(crate) mod metrics; pub(crate) mod pinger; pub(crate) mod small_network; pub(crate) mod storage; diff --git a/node/src/components/api_server.rs b/node/src/components/api_server.rs index 7d02e9d3d2..831374f083 100644 --- a/node/src/components/api_server.rs +++ b/node/src/components/api_server.rs @@ -20,9 +20,10 @@ mod config; mod event; -use std::{error::Error as StdError, net::SocketAddr, str}; +use std::{borrow::Cow, error::Error as StdError, net::SocketAddr, str}; use bytes::Bytes; +use futures::FutureExt; use http::Response; use rand::Rng; use smallvec::smallvec; @@ -42,7 +43,7 @@ use crate::{ crypto::hash::Digest, effect::{ announcements::ApiServerAnnouncement, - requests::{ApiRequest, ContractRuntimeRequest, StorageRequest}, + requests::{ApiRequest, ContractRuntimeRequest, MetricsRequest, StorageRequest}, EffectBuilder, EffectExt, Effects, }, reactor::QueueKind, @@ -52,6 +53,7 @@ pub use config::Config; pub(crate) use event::Event; const DEPLOYS_API_PATH: &str = "deploys"; +const METRICS_API_PATH: &str = "metrics"; #[derive(Debug)] pub(crate) struct ApiServer {} @@ -68,9 +70,10 @@ impl ApiServer { impl Component for ApiServer where - REv: From> - + From + REv: From + From + + From + + From> + Send, { type Event = Event; @@ -100,6 +103,12 @@ where result: Box::new(result), main_responder: responder, }), + Event::ApiRequest(ApiRequest::GetMetrics { responder }) => effect_builder + .get_metrics() + .event(move |text| Event::GetMetricsResult { + text, + main_responder: responder, + }), Event::GetDeployResult { hash: _, result, @@ -109,6 +118,10 @@ where result, main_responder, } => main_responder.respond(*result).ignore(), + Event::GetMetricsResult { + text, + main_responder, + } => main_responder.respond(text).ignore(), } } } @@ -128,10 +141,29 @@ where .and(warp::path::tail()) .and_then(move |hex_digest| parse_get_request(effect_builder, hex_digest)); + let get_metrics = warp::get() + .and(warp::path(METRICS_API_PATH)) + .and_then(move || { + effect_builder + .make_request( + |responder| ApiRequest::GetMetrics { responder }, + QueueKind::Api, + ) + .map(|text_opt| match text_opt { + Some(text) => { + Ok::<_, Rejection>(reply::with_status(Cow::from(text), StatusCode::OK)) + } + None => Ok(reply::with_status( + Cow::from("failed to collect metrics. sorry!"), + StatusCode::INTERNAL_SERVER_ERROR, + )), + }) + }); + let mut server_addr = SocketAddr::from((config.bind_interface, config.bind_port)); debug!(%server_addr, "starting HTTP server"); - match warp::serve(post_deploy.or(get_deploy)).try_bind_ephemeral(server_addr) { + match warp::serve(post_deploy.or(get_deploy).or(get_metrics)).try_bind_ephemeral(server_addr) { Ok((addr, server_fut)) => { info!(%addr, "started HTTP server"); return server_fut.await; diff --git a/node/src/components/api_server/event.rs b/node/src/components/api_server/event.rs index a5f695e44f..756a1c7512 100644 --- a/node/src/components/api_server/event.rs +++ b/node/src/components/api_server/event.rs @@ -21,6 +21,10 @@ pub enum Event { result: Box>>, main_responder: Responder>>, }, + GetMetricsResult { + text: Option, + main_responder: Responder>, + }, } impl Display for Event { @@ -33,6 +37,10 @@ impl Display for Event { Event::ListDeploysResult { result, .. } => { write!(formatter, "ListDeployResult: {:?}", result) } + Event::GetMetricsResult { text, .. } => match text { + Some(tx) => write!(formatter, "GetMetricsResult ({} bytes)", tx.len()), + None => write!(formatter, "GetMetricsResult (failed)"), + }, } } } diff --git a/node/src/components/in_memory_network.rs b/node/src/components/in_memory_network.rs index e1811374dd..13c80c2aea 100644 --- a/node/src/components/in_memory_network.rs +++ b/node/src/components/in_memory_network.rs @@ -20,6 +20,7 @@ //! # //! # use derive_more::From; //! # use maplit::hashmap; +//! # use prometheus::Registry; //! # use rand::{Rng, rngs::OsRng}; //! # //! # use casperlabs_node::{components::{Component, @@ -164,12 +165,14 @@ //! impl reactor::Reactor for Reactor { //! type Event = Event; //! type Config = (); -//! type Error = (); +//! type Error = anyhow::Error; //! -//! fn new(_cfg: Self::Config, +//! fn new( +//! _cfg: Self::Config, +//! _registry: &Registry, //! event_queue: EventQueueHandle, //! rng: &mut R, -//! ) -> Result<(Self, Effects), ()> { +//! ) -> Result<(Self, Effects), anyhow::Error> { //! let effect_builder = EffectBuilder::new(event_queue); //! let (shouter, shouter_effect) = Shouter::new(effect_builder); //! diff --git a/node/src/components/metrics.rs b/node/src/components/metrics.rs new file mode 100644 index 0000000000..8a1fb34c60 --- /dev/null +++ b/node/src/components/metrics.rs @@ -0,0 +1,77 @@ +//! Metrics component. +//! +//! The metrics component renders metrics upon request. +//! +//! # Adding metrics to a component +//! +//! When adding metrics to an existing component, there are a few guidelines that should in general +//! be followed: +//! +//! 1. For a component `XYZ`, there should be a `XYZMetrics` struct that is one of its fields that +//! holds all of the `Collectors` (`Counter`s, etc) to make it easy to find all of the metrics +//! for a component in one place. +//! +//! Creation and instantiation of this component happens inside the `reactor::Reactor::new` +//! function, which is passed in a `prometheus::Registry` (see 2.). +//! +//! 2. Instantiation of an `XYZMetrics` struct should always be combined with registering all of +//! the metrics on a registry. For this reason it is advisable to have the `XYZMetrics::new` +//! method take a `prometheus::Registry` and register it directly. +//! +//! 3. Updating metrics is done inside the `handle_event` function by simply calling methods on the +//! fields of `self.metrics` (`: XYZMetrics`). **Important**: Metrics should never be read to +//! prevent any actual logic depending on them. If a counter is being increment as a metric and +//! also required for busines logic, a second counter should be kept in the component's state. + +use prometheus::{Encoder, Registry, TextEncoder}; +use rand::Rng; +use tracing::error; + +use crate::{ + components::Component, + effect::{requests::MetricsRequest, EffectBuilder, EffectExt, Effects}, +}; + +/// The metrics component. +#[derive(Debug)] +pub(crate) struct Metrics { + /// Metrics registry used to answer metrics queries. + registry: Registry, +} + +impl Component for Metrics { + type Event = MetricsRequest; + + fn handle_event( + &mut self, + _effect_builder: EffectBuilder, + _rng: &mut R, + req: Self::Event, + ) -> Effects { + match req { + MetricsRequest::RenderNodeMetricsText { responder } => { + let mut buf: Vec = Vec::::new(); + + if let Err(e) = TextEncoder::new().encode(&self.registry.gather(), &mut buf) { + error!(%e, "text encoding of metrics failed"); + return responder.respond(None).ignore(); + }; + + match String::from_utf8(buf) { + Ok(text) => responder.respond(Some(text)).ignore(), + Err(e) => { + error!(%e, "generated text metrics are not valid UTF-8"); + responder.respond(None).ignore() + } + } + } + } + } +} + +impl Metrics { + /// Create and initialize a new metrics component. + pub(crate) fn new(registry: Registry) -> Self { + Metrics { registry } + } +} diff --git a/node/src/components/pinger.rs b/node/src/components/pinger.rs index 86681ca6e1..fafb3d30b5 100644 --- a/node/src/components/pinger.rs +++ b/node/src/components/pinger.rs @@ -9,6 +9,7 @@ use std::{ time::Duration, }; +use prometheus::{IntCounter, Registry}; use rand::Rng; use serde::{Deserialize, Serialize}; use tracing::info; @@ -28,6 +29,31 @@ pub(crate) struct Pinger { responsive_nodes: HashSet, /// Increasing ping counter. ping_counter: u32, + /// Pinger metrics. + metrics: PingerMetrics, +} + +/// Metrics for the pinger component. +#[derive(Debug)] +struct PingerMetrics { + /// Number of pings sent out. + pings_sent: IntCounter, + /// Number of pongs received. + pongs_received: IntCounter, +} + +impl PingerMetrics { + fn new(registry: &Registry) -> Result { + let pings_sent = IntCounter::new("pinger_pings_sent", "number of pings received")?; + let pongs_received = IntCounter::new("pinger_pongs_received", "number of pongs received")?; + registry.register(Box::new(pings_sent.clone()))?; + registry.register(Box::new(pongs_received.clone()))?; + + Ok(PingerMetrics { + pings_sent, + pongs_received, + }) + } } /// Interval in which to send pings. @@ -96,6 +122,9 @@ where sender, msg: Message::Pong(counter), } => { + // We count all pongs, even if they're stale. + self.metrics.pongs_received.inc(); + // We've received a pong, if it is valid (same counter value), process it. if counter == self.ping_counter { self.responsive_nodes.insert(sender); @@ -112,17 +141,19 @@ where impl Pinger { /// Create and initialize a new pinger. pub(crate) fn new + Send + From>>( + registry: &Registry, effect_builder: EffectBuilder, - ) -> (Self, Effects) { + ) -> Result<(Self, Effects), prometheus::Error> { let mut pinger = Pinger { responsive_nodes: HashSet::new(), ping_counter: 0, + metrics: PingerMetrics::new(registry)?, }; // We send out a round of pings immediately on construction. let init = pinger.send_pings(effect_builder); - (pinger, init) + Ok((pinger, init)) } /// Broadcast a ping and set a timer for the next broadcast. @@ -138,6 +169,7 @@ impl Pinger { // We increment the counter and clear pings beforehand, thus causing all pongs that are // still in flight to be timeouts. self.ping_counter += 1; + self.metrics.pings_sent.inc(); self.responsive_nodes.clear(); let mut effects: Effects = Effects::new(); diff --git a/node/src/components/small_network/test.rs b/node/src/components/small_network/test.rs index b0cfeab588..bb8304820a 100644 --- a/node/src/components/small_network/test.rs +++ b/node/src/components/small_network/test.rs @@ -23,6 +23,7 @@ use crate::{ testing::network::{Network, NetworkedReactor}, }; use pnet::datalink; +use prometheus::Registry; use rand::{rngs::OsRng, Rng}; use reactor::{wrap_effects, Finalize}; use tokio::time::{timeout, Timeout}; @@ -80,6 +81,7 @@ impl Reactor for TestReactor { fn new( cfg: Self::Config, + _registry: &Registry, event_queue: EventQueueHandle, _rng: &mut R, ) -> anyhow::Result<(Self, Effects)> { diff --git a/node/src/effect.rs b/node/src/effect.rs index 958b30a6ce..92254a8105 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -88,7 +88,9 @@ use crate::{ use announcements::{ ApiServerAnnouncement, ConsensusAnnouncement, NetworkAnnouncement, StorageAnnouncement, }; -use requests::{ContractRuntimeRequest, DeployQueueRequest, NetworkRequest, StorageRequest}; +use requests::{ + ContractRuntimeRequest, DeployQueueRequest, MetricsRequest, NetworkRequest, StorageRequest, +}; /// A pinned, boxed future that produces one or more events. pub type Effect = BoxFuture<'static, Multiple>; @@ -312,6 +314,20 @@ impl EffectBuilder { Instant::now() - then } + /// Retrieve a snapshot of the nodes current metrics formatted as string. + /// + /// If an error occurred producing the metrics, `None` is returned. + pub(crate) async fn get_metrics(self) -> Option + where + REv: From, + { + self.make_request( + |responder| MetricsRequest::RenderNodeMetricsText { responder }, + QueueKind::Api, + ) + .await + } + /// Sends a network message. /// /// The message is queued in "fire-and-forget" fashion, there is no guarantee that the peer diff --git a/node/src/effect/requests.rs b/node/src/effect/requests.rs index 7235cbb2f6..5672bc31c2 100644 --- a/node/src/effect/requests.rs +++ b/node/src/effect/requests.rs @@ -36,6 +36,24 @@ use crate::{ Chainspec, }; +/// A metrics request. +#[derive(Debug)] +pub enum MetricsRequest { + /// Render current node metrics as prometheus-formatted string. + RenderNodeMetricsText { + /// Resopnder returning the rendered metrics or `None`, if an internal error occurred. + responder: Responder>, + }, +} + +impl Display for MetricsRequest { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + MetricsRequest::RenderNodeMetricsText { .. } => write!(formatter, "get metrics text"), + } + } +} + /// A networking request. #[derive(Debug)] #[must_use] @@ -314,6 +332,11 @@ pub enum ApiRequest { /// Responder to call with the result. responder: Responder, storage::Error>>, }, + /// Return string formatted, prometheus compatible metrics or `None` if an error occured. + GetMetrics { + /// Responder to call with the result. + responder: Responder>, + }, } impl Display for ApiRequest { @@ -322,6 +345,7 @@ impl Display for ApiRequest { ApiRequest::SubmitDeploy { deploy, .. } => write!(formatter, "submit {}", *deploy), ApiRequest::GetDeploy { hash, .. } => write!(formatter, "get {}", hash), ApiRequest::ListDeploys { .. } => write!(formatter, "list deploys"), + ApiRequest::GetMetrics { .. } => write!(formatter, "get metrics"), } } } diff --git a/node/src/reactor.rs b/node/src/reactor.rs index 5b5eb391ec..3fcd18c3eb 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -34,6 +34,7 @@ use std::{ }; use futures::{future::BoxFuture, FutureExt}; +use prometheus::{self, IntCounter, Registry}; use rand::Rng; use tracing::{debug, debug_span, info, trace, warn}; use tracing_futures::Instrument; @@ -121,6 +122,7 @@ pub trait Reactor: Sized { /// If any instantiation fails, an error is returned. fn new( cfg: Self::Config, + registry: &Registry, event_queue: EventQueueHandle, rng: &mut R, ) -> Result<(Self, Effects), Self::Error>; @@ -145,16 +147,6 @@ pub trait Finalize: Sized { } } -/// Reactor extension trait. -pub trait ReactorExt: Reactor { - /// Creates a new instance of the reactor by taking the components from a previous reactor (not - /// necessarily of the same concrete type). - fn new_from( - event_queue: EventQueueHandle, - reactor: R, - ) -> Result<(Self, Effects), Self::Error>; -} - /// A runner for a reactor. /// /// The runner manages a reactors event queue and reactor itself and can run it either continuously @@ -172,52 +164,55 @@ where /// Counter for events, to aid tracing. event_count: usize, + + /// Metrics for the runner. + metrics: RunnerMetrics, +} + +/// Metric data for the Runner +#[derive(Debug)] +struct RunnerMetrics { + /// Total number of events processed. + events: IntCounter, +} + +impl RunnerMetrics { + /// Create and register new runner metrics. + fn new(registry: &Registry) -> Result { + let events = IntCounter::new("runner_events", "total event count")?; + registry.register(Box::new(events.clone()))?; + + Ok(RunnerMetrics { events }) + } } impl Runner where R: Reactor, + R::Error: From, { /// Creates a new runner from a given configuration. #[inline] pub async fn new(cfg: R::Config, rng: &mut Rd) -> Result { - let scheduler = Self::init(); - - let event_queue = EventQueueHandle::new(scheduler); - let (reactor, initial_effects) = R::new(cfg, event_queue, rng)?; - - // Run all effects from component instantiation. - let span = debug_span!("process initial effects"); - process_effects(scheduler, initial_effects) - .instrument(span) - .await; + let event_size = mem::size_of::(); - info!("reactor main loop is ready"); + // Check if the event is of a reasonable size. This only emits a runtime warning at startup + // right now, since storage size of events is not an issue per se, but copying might be + // expensive if events get too large. + if event_size > 16 * mem::size_of::() { + warn!(%event_size, "large event size, consider reducing it or boxing"); + } - Ok(Runner { - scheduler, - reactor, - event_count: 0, - }) - } + let scheduler = utils::leak(Scheduler::new(QueueKind::weights())); - /// Creates a new runner from a given reactor. - /// - /// The `id` is used to identify the runner during logging when debugging and can be chosen - /// arbitrarily. - #[inline] - pub async fn from(old_reactor: R1) -> Result - where - R1: Reactor, - R: ReactorExt, - { - let scheduler = Self::init(); + // Instantiate a new registry for metrics for this reactor. + let registry = Registry::new(); let event_queue = EventQueueHandle::new(scheduler); - let (reactor, initial_effects) = R::new_from(event_queue, old_reactor)?; + let (reactor, initial_effects) = R::new(cfg, ®istry, event_queue, rng)?; // Run all effects from component instantiation. - let span = debug_span!("process initial effects (from)"); + let span = debug_span!("process initial effects"); process_effects(scheduler, initial_effects) .instrument(span) .await; @@ -228,26 +223,20 @@ where scheduler, reactor, event_count: 0, + metrics: RunnerMetrics::new(®istry)?, }) } - fn init() -> &'static Scheduler { - let event_size = mem::size_of::(); - - // Check if the event is of a reasonable size. This only emits a runtime warning at startup - // right now, since storage size of events is not an issue per se, but copying might be - // expensive if events get too large. - if event_size > 16 * mem::size_of::() { - warn!(%event_size, "large event size, consider reducing it or boxing"); - } - - // Create a new event queue for this reactor run. - utils::leak(Scheduler::new(QueueKind::weights())) - } - /// Processes a single event on the event queue. #[inline] pub async fn crank(&mut self, rng: &mut Rd) { + // Create another span for tracing the processing of one event. + let crank_span = tracing::debug_span!("crank", ev = self.event_count); + let _inner_enter = crank_span.enter(); + + self.event_count += 1; + self.metrics.events.inc(); + let event_queue = EventQueueHandle::new(self.scheduler); let effect_builder = EffectBuilder::new(event_queue); diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index 11c41986e1..a6880e8f75 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -6,6 +6,7 @@ use std::{ }; use derive_more::From; +use prometheus::Registry; use rand::Rng; use thiserror::Error; use tracing::debug; @@ -70,6 +71,10 @@ pub enum Error { #[error("config error: {0}")] ConfigError(String), + /// Metrics-related error + #[error("prometheus (metrics) error: {0}")] + Metrics(#[from] prometheus::Error), + /// `ChainspecHandler` component error. #[error("chainspec error: {0}")] Chainspec(#[from] chainspec_handler::Error), @@ -86,10 +91,10 @@ pub enum Error { /// Validator node reactor. #[derive(Debug)] pub struct Reactor { - config: validator::Config, + pub(super) config: validator::Config, chainspec_handler: ChainspecHandler, - storage: Storage, - contract_runtime: ContractRuntime, + pub(super) storage: Storage, + pub(super) contract_runtime: ContractRuntime, } impl Reactor { @@ -97,10 +102,6 @@ impl Reactor { pub fn stopped_successfully(&self) -> bool { self.chainspec_handler.stopped_successfully() } - - pub(super) fn destructure(self) -> (validator::Config, Storage, ContractRuntime) { - (self.config, self.storage, self.contract_runtime) - } } impl reactor::Reactor for Reactor { @@ -110,6 +111,7 @@ impl reactor::Reactor for Reactor { fn new( config: Self::Config, + _registry: &Registry, event_queue: EventQueueHandle, _rng: &mut Rd, ) -> Result<(Self, Effects), Error> { diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index ee8a4339ad..b38e1acfd9 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -8,6 +8,7 @@ mod error; use std::fmt::{self, Display, Formatter}; use derive_more::From; +use prometheus::Registry; use rand::Rng; use serde::{Deserialize, Serialize}; use tracing::info; @@ -19,8 +20,9 @@ use crate::{ contract_runtime::{self, ContractRuntime}, deploy_buffer::{self, DeployBuffer}, deploy_gossiper::{self, DeployGossiper}, + metrics::Metrics, pinger::{self, Pinger}, - storage::{Storage, StorageType}, + storage::Storage, Component, }, effect::{ @@ -28,7 +30,8 @@ use crate::{ ApiServerAnnouncement, ConsensusAnnouncement, NetworkAnnouncement, StorageAnnouncement, }, requests::{ - ApiRequest, ContractRuntimeRequest, DeployQueueRequest, NetworkRequest, StorageRequest, + ApiRequest, ContractRuntimeRequest, DeployQueueRequest, MetricsRequest, NetworkRequest, + StorageRequest, }, EffectBuilder, Effects, }, @@ -100,6 +103,9 @@ pub enum Event { /// Deploy queue request. #[from] DeployQueueRequest(DeployQueueRequest), + /// Metrics request. + #[from] + MetricsRequest(MetricsRequest), // Announcements /// Network announcement. @@ -159,6 +165,7 @@ impl Display for Event { Event::ContractRuntime(event) => write!(f, "contract runtime: {}", event), Event::NetworkRequest(req) => write!(f, "network request: {}", req), Event::DeployQueueRequest(req) => write!(f, "deploy queue request: {}", req), + Event::MetricsRequest(req) => write!(f, "metrics request: {}", req), Event::NetworkAnnouncement(ann) => write!(f, "network announcement: {}", ann), Event::ApiServerAnnouncement(ann) => write!(f, "api server announcement: {}", ann), Event::ConsensusAnnouncement(ann) => write!(f, "consensus announcement: {}", ann), @@ -170,6 +177,7 @@ impl Display for Event { /// Validator node reactor. #[derive(Debug)] pub struct Reactor { + metrics: Metrics, net: SmallNetwork, pinger: Pinger, storage: Storage, @@ -180,17 +188,33 @@ pub struct Reactor { deploy_buffer: DeployBuffer, } -impl Reactor { - fn init( - config: Config, - event_queue: EventQueueHandle, - storage: Storage, - contract_runtime: ContractRuntime, +impl reactor::Reactor for Reactor { + type Event = Event; + + // The "configuration" is in fact the whole state of the initializer reactor, which we + // deconstruct and reuse. + type Config = initializer::Reactor; + type Error = Error; + + fn new( + initializer: Self::Config, + registry: &Registry, + event_queue: EventQueueHandle, + _rng: &mut R, ) -> Result<(Self, Effects), Error> { + let initializer::Reactor { + config, + storage, + contract_runtime, + .. + } = initializer; + + let metrics = Metrics::new(registry.clone()); + let effect_builder = EffectBuilder::new(event_queue); let (net, net_effects) = SmallNetwork::new(event_queue, config.validator_net)?; - let (pinger, pinger_effects) = Pinger::new(effect_builder); + let (pinger, pinger_effects) = Pinger::new(registry, effect_builder)?; let api_server = ApiServer::new(config.http_server, effect_builder); let timestamp = Timestamp::now(); let (consensus, consensus_effects) = @@ -204,6 +228,7 @@ impl Reactor { Ok(( Reactor { + metrics, net, pinger, storage, @@ -216,22 +241,6 @@ impl Reactor { effects, )) } -} - -impl reactor::Reactor for Reactor { - type Event = Event; - type Config = Config; - type Error = Error; - - fn new( - config: Self::Config, - event_queue: EventQueueHandle, - _rng: &mut R, - ) -> Result<(Self, Effects), Error> { - let storage = Storage::new(&config.storage)?; - let contract_runtime = ContractRuntime::new(&config.storage, config.contract_runtime)?; - Self::init(config, event_queue, storage, contract_runtime) - } fn dispatch_event( &mut self, @@ -283,6 +292,9 @@ impl reactor::Reactor for Reactor { Event::DeployQueueRequest(req) => { self.dispatch_event(effect_builder, rng, Event::DeployQueue(req.into())) } + Event::MetricsRequest(req) => { + self.dispatch_event(effect_builder, rng, Event::MetricsRequest(req)) + } // Announcements: Event::NetworkAnnouncement(NetworkAnnouncement::MessageReceived { @@ -337,13 +349,3 @@ impl reactor::Reactor for Reactor { } } } - -impl reactor::ReactorExt for Reactor { - fn new_from( - event_queue: EventQueueHandle, - initializer_reactor: initializer::Reactor, - ) -> Result<(Self, Effects), Error> { - let (config, storage, contract_runtime) = initializer_reactor.destructure(); - Self::init(config, event_queue, storage, contract_runtime) - } -} diff --git a/node/src/reactor/validator/error.rs b/node/src/reactor/validator/error.rs index 37141bbd2d..3cde9a8624 100644 --- a/node/src/reactor/validator/error.rs +++ b/node/src/reactor/validator/error.rs @@ -5,6 +5,10 @@ use crate::components::{contract_runtime, small_network, storage}; /// Error type returned by the validator reactor. #[derive(Debug, Error)] pub enum Error { + /// Metrics-related error + #[error("prometheus (metrics) error: {0}")] + Metrics(#[from] prometheus::Error), + /// `SmallNetwork` component error. #[error("small network error: {0}")] SmallNetwork(#[from] small_network::Error), diff --git a/node/src/testing/network.rs b/node/src/testing/network.rs index b9fd8f5458..b9b7513174 100644 --- a/node/src/testing/network.rs +++ b/node/src/testing/network.rs @@ -41,6 +41,7 @@ impl Network where R: Reactor + NetworkedReactor, R::Config: Default, + R::Error: From, { /// Creates a new networking node on the network using the default root node port. /// @@ -59,6 +60,7 @@ where impl Network where R: Reactor + NetworkedReactor, + R::Error: From + From, { /// Creates a new network. pub fn new() -> Self { @@ -161,6 +163,7 @@ impl Finalize for Network where R: Finalize + NetworkedReactor + Reactor + Send + 'static, R::NodeId: Send, + R::Error: From, { fn finalize(self) -> BoxFuture<'static, ()> { // We support finalizing networks where the reactor itself can be finalized.