Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ openssl = "0.10.29"
parity-wasm = "0.41.0"
parking_lot = "0.10.0"
pem = "0.8.1"
prometheus = "0.9.0"
proptest = { version = "0.9.4", optional = true }
pwasm-utils = "0.12.0"
rand = "0.7.3"
Expand Down
2 changes: 1 addition & 1 deletion node/src/app/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl Cli {
bail!("failed to initialize successfully");
}

let mut runner = Runner::<validator::Reactor>::from(initializer).await?;
let mut runner = Runner::<validator::Reactor>::new(initializer, &mut rng).await?;
runner.run(&mut rng).await;
}
}
Expand Down
1 change: 1 addition & 0 deletions node/src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) mod deploy_buffer;
pub(crate) mod deploy_gossiper;
// The `in_memory_network` is public for use in doctests.
pub mod in_memory_network;
pub(crate) mod metrics;
pub(crate) mod pinger;
pub(crate) mod small_network;
pub(crate) mod storage;
Expand Down
42 changes: 37 additions & 5 deletions node/src/components/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
mod config;
mod event;

use std::{error::Error as StdError, net::SocketAddr, str};
use std::{borrow::Cow, error::Error as StdError, net::SocketAddr, str};

use bytes::Bytes;
use futures::FutureExt;
use http::Response;
use rand::Rng;
use smallvec::smallvec;
Expand All @@ -42,7 +43,7 @@ use crate::{
crypto::hash::Digest,
effect::{
announcements::ApiServerAnnouncement,
requests::{ApiRequest, ContractRuntimeRequest, StorageRequest},
requests::{ApiRequest, ContractRuntimeRequest, MetricsRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
reactor::QueueKind,
Expand All @@ -52,6 +53,7 @@ pub use config::Config;
pub(crate) use event::Event;

const DEPLOYS_API_PATH: &str = "deploys";
const METRICS_API_PATH: &str = "metrics";

#[derive(Debug)]
pub(crate) struct ApiServer {}
Expand All @@ -68,9 +70,10 @@ impl ApiServer {

impl<REv> Component<REv> for ApiServer
where
REv: From<StorageRequest<Storage>>
+ From<ApiServerAnnouncement>
REv: From<ApiServerAnnouncement>
+ From<ContractRuntimeRequest>
+ From<MetricsRequest>
+ From<StorageRequest<Storage>>
+ Send,
{
type Event = Event;
Expand Down Expand Up @@ -100,6 +103,12 @@ where
result: Box::new(result),
main_responder: responder,
}),
Event::ApiRequest(ApiRequest::GetMetrics { responder }) => effect_builder
.get_metrics()
.event(move |text| Event::GetMetricsResult {
text,
main_responder: responder,
}),
Event::GetDeployResult {
hash: _,
result,
Expand All @@ -109,6 +118,10 @@ where
result,
main_responder,
} => main_responder.respond(*result).ignore(),
Event::GetMetricsResult {
text,
main_responder,
} => main_responder.respond(text).ignore(),
}
}
}
Expand All @@ -128,10 +141,29 @@ where
.and(warp::path::tail())
.and_then(move |hex_digest| parse_get_request(effect_builder, hex_digest));

let get_metrics = warp::get()
.and(warp::path(METRICS_API_PATH))
.and_then(move || {
effect_builder
.make_request(
|responder| ApiRequest::GetMetrics { responder },
QueueKind::Api,
)
.map(|text_opt| match text_opt {
Some(text) => {
Ok::<_, Rejection>(reply::with_status(Cow::from(text), StatusCode::OK))
}
None => Ok(reply::with_status(
Cow::from("failed to collect metrics. sorry!"),
StatusCode::INTERNAL_SERVER_ERROR,
)),
})
});

let mut server_addr = SocketAddr::from((config.bind_interface, config.bind_port));

debug!(%server_addr, "starting HTTP server");
match warp::serve(post_deploy.or(get_deploy)).try_bind_ephemeral(server_addr) {
match warp::serve(post_deploy.or(get_deploy).or(get_metrics)).try_bind_ephemeral(server_addr) {
Ok((addr, server_fut)) => {
info!(%addr, "started HTTP server");
return server_fut.await;
Expand Down
8 changes: 8 additions & 0 deletions node/src/components/api_server/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub enum Event {
result: Box<storage::Result<Vec<DeployHash>>>,
main_responder: Responder<storage::Result<Vec<DeployHash>>>,
},
GetMetricsResult {
text: Option<String>,
main_responder: Responder<Option<String>>,
},
}

impl Display for Event {
Expand All @@ -33,6 +37,10 @@ impl Display for Event {
Event::ListDeploysResult { result, .. } => {
write!(formatter, "ListDeployResult: {:?}", result)
}
Event::GetMetricsResult { text, .. } => match text {
Some(tx) => write!(formatter, "GetMetricsResult ({} bytes)", tx.len()),
None => write!(formatter, "GetMetricsResult (failed)"),
},
}
}
}
9 changes: 6 additions & 3 deletions node/src/components/in_memory_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! #
//! # use derive_more::From;
//! # use maplit::hashmap;
//! # use prometheus::Registry;
//! # use rand::{Rng, rngs::OsRng};
//! #
//! # use casperlabs_node::{components::{Component,
Expand Down Expand Up @@ -164,12 +165,14 @@
//! impl reactor::Reactor for Reactor {
//! type Event = Event;
//! type Config = ();
//! type Error = ();
//! type Error = anyhow::Error;
//!
//! fn new<R: Rng + ?Sized>(_cfg: Self::Config,
//! fn new<R: Rng + ?Sized>(
//! _cfg: Self::Config,
//! _registry: &Registry,
//! event_queue: EventQueueHandle<Self::Event>,
//! rng: &mut R,
//! ) -> Result<(Self, Effects<Self::Event>), ()> {
//! ) -> Result<(Self, Effects<Self::Event>), anyhow::Error> {
//! let effect_builder = EffectBuilder::new(event_queue);
//! let (shouter, shouter_effect) = Shouter::new(effect_builder);
//!
Expand Down
77 changes: 77 additions & 0 deletions node/src/components/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! Metrics component.
//!
//! The metrics component renders metrics upon request.
//!
//! # Adding metrics to a component
//!
//! When adding metrics to an existing component, there are a few guidelines that should in general
//! be followed:
//!
//! 1. For a component `XYZ`, there should be a `XYZMetrics` struct that is one of its fields that
//! holds all of the `Collectors` (`Counter`s, etc) to make it easy to find all of the metrics
//! for a component in one place.
//!
//! Creation and instantiation of this component happens inside the `reactor::Reactor::new`
//! function, which is passed in a `prometheus::Registry` (see 2.).
//!
//! 2. Instantiation of an `XYZMetrics` struct should always be combined with registering all of
//! the metrics on a registry. For this reason it is advisable to have the `XYZMetrics::new`
//! method take a `prometheus::Registry` and register it directly.
//!
//! 3. Updating metrics is done inside the `handle_event` function by simply calling methods on the
//! fields of `self.metrics` (`: XYZMetrics`). **Important**: Metrics should never be read to
//! prevent any actual logic depending on them. If a counter is being increment as a metric and
//! also required for busines logic, a second counter should be kept in the component's state.

use prometheus::{Encoder, Registry, TextEncoder};
use rand::Rng;
use tracing::error;

use crate::{
components::Component,
effect::{requests::MetricsRequest, EffectBuilder, EffectExt, Effects},
};

/// The metrics component.
#[derive(Debug)]
pub(crate) struct Metrics {
/// Metrics registry used to answer metrics queries.
registry: Registry,
}

impl<REv> Component<REv> for Metrics {
type Event = MetricsRequest;

fn handle_event<R: Rng + ?Sized>(
&mut self,
_effect_builder: EffectBuilder<REv>,
_rng: &mut R,
req: Self::Event,
) -> Effects<Self::Event> {
match req {
MetricsRequest::RenderNodeMetricsText { responder } => {
let mut buf: Vec<u8> = Vec::<u8>::new();

if let Err(e) = TextEncoder::new().encode(&self.registry.gather(), &mut buf) {
error!(%e, "text encoding of metrics failed");
return responder.respond(None).ignore();
};

match String::from_utf8(buf) {
Ok(text) => responder.respond(Some(text)).ignore(),
Err(e) => {
error!(%e, "generated text metrics are not valid UTF-8");
responder.respond(None).ignore()
}
}
}
}
}
}

impl Metrics {
/// Create and initialize a new metrics component.
pub(crate) fn new(registry: Registry) -> Self {
Metrics { registry }
}
}
36 changes: 34 additions & 2 deletions node/src/components/pinger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
time::Duration,
};

use prometheus::{IntCounter, Registry};
use rand::Rng;
use serde::{Deserialize, Serialize};
use tracing::info;
Expand All @@ -28,6 +29,31 @@ pub(crate) struct Pinger {
responsive_nodes: HashSet<NodeId>,
/// Increasing ping counter.
ping_counter: u32,
/// Pinger metrics.
metrics: PingerMetrics,
}

/// Metrics for the pinger component.
#[derive(Debug)]
struct PingerMetrics {
/// Number of pings sent out.
pings_sent: IntCounter,
/// Number of pongs received.
pongs_received: IntCounter,
}

impl PingerMetrics {
fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
let pings_sent = IntCounter::new("pinger_pings_sent", "number of pings received")?;
let pongs_received = IntCounter::new("pinger_pongs_received", "number of pongs received")?;
registry.register(Box::new(pings_sent.clone()))?;
registry.register(Box::new(pongs_received.clone()))?;

Ok(PingerMetrics {
pings_sent,
pongs_received,
})
}
}

/// Interval in which to send pings.
Expand Down Expand Up @@ -96,6 +122,9 @@ where
sender,
msg: Message::Pong(counter),
} => {
// We count all pongs, even if they're stale.
self.metrics.pongs_received.inc();

// We've received a pong, if it is valid (same counter value), process it.
if counter == self.ping_counter {
self.responsive_nodes.insert(sender);
Expand All @@ -112,17 +141,19 @@ where
impl Pinger {
/// Create and initialize a new pinger.
pub(crate) fn new<REv: From<Event> + Send + From<NetworkRequest<NodeId, Message>>>(
registry: &Registry,
effect_builder: EffectBuilder<REv>,
) -> (Self, Effects<Event>) {
) -> Result<(Self, Effects<Event>), prometheus::Error> {
let mut pinger = Pinger {
responsive_nodes: HashSet::new(),
ping_counter: 0,
metrics: PingerMetrics::new(registry)?,
};

// We send out a round of pings immediately on construction.
let init = pinger.send_pings(effect_builder);

(pinger, init)
Ok((pinger, init))
}

/// Broadcast a ping and set a timer for the next broadcast.
Expand All @@ -138,6 +169,7 @@ impl Pinger {
// We increment the counter and clear pings beforehand, thus causing all pongs that are
// still in flight to be timeouts.
self.ping_counter += 1;
self.metrics.pings_sent.inc();
self.responsive_nodes.clear();

let mut effects: Effects<Event> = Effects::new();
Expand Down
2 changes: 2 additions & 0 deletions node/src/components/small_network/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
testing::network::{Network, NetworkedReactor},
};
use pnet::datalink;
use prometheus::Registry;
use rand::{rngs::OsRng, Rng};
use reactor::{wrap_effects, Finalize};
use tokio::time::{timeout, Timeout};
Expand Down Expand Up @@ -80,6 +81,7 @@ impl Reactor for TestReactor {

fn new<R: Rng + ?Sized>(
cfg: Self::Config,
_registry: &Registry,
event_queue: EventQueueHandle<Self::Event>,
_rng: &mut R,
) -> anyhow::Result<(Self, Effects<Self::Event>)> {
Expand Down
Loading