diff --git a/batcher/aligned-batcher/src/config/mod.rs b/batcher/aligned-batcher/src/config/mod.rs index 47adce463f..c39fd22cf8 100644 --- a/batcher/aligned-batcher/src/config/mod.rs +++ b/batcher/aligned-batcher/src/config/mod.rs @@ -42,6 +42,7 @@ pub struct BatcherConfigFromYaml { pub max_batch_size: usize, pub eth_ws_reconnects: usize, pub pre_verification_is_enabled: bool, + pub metrics_port: u16, pub non_paying: Option, } diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index 486bc62237..252c9f1779 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -75,6 +75,7 @@ pub struct Batcher { pre_verification_is_enabled: bool, non_paying_config: Option, posting_batch: Mutex, + pub metrics: metrics::BatcherMetrics, } impl Batcher { @@ -101,6 +102,13 @@ impl Batcher { .await .expect("Failed to get ethereum websocket provider"); + log::info!( + "Starting metrics server on port {}", + config.batcher.metrics_port + ); + let metrics = metrics::BatcherMetrics::start(config.batcher.metrics_port) + .expect("Failed to start metrics server"); + let eth_ws_provider_fallback = Provider::connect_with_reconnects( &config.eth_ws_url_fallback, config.batcher.eth_ws_reconnects, @@ -201,6 +209,7 @@ impl Batcher { non_paying_config, posting_batch: Mutex::new(false), batch_state: Mutex::new(batch_state), + metrics, } } @@ -213,7 +222,7 @@ impl Batcher { // Let's spawn the handling of each connection in a separate task. 
while let Ok((stream, addr)) = listener.accept().await { - metrics::OPEN_CONNECTIONS.inc(); + self.metrics.open_connections.inc(); let batcher = self.clone(); tokio::spawn(batcher.handle_connection(stream, addr)); } @@ -296,7 +305,7 @@ impl Batcher { Ok(_) => info!("{} disconnected", &addr), } - metrics::OPEN_CONNECTIONS.dec(); + self.metrics.open_connections.dec(); Ok(()) } @@ -316,7 +325,7 @@ impl Batcher { }; let msg_nonce = client_msg.verification_data.nonce; debug!("Received message with nonce: {msg_nonce:?}",); - metrics::RECEIVED_PROOFS.inc(); + self.metrics.received_proofs.inc(); // * ---------------------------------------------------* // * Perform validations over the message * @@ -1016,7 +1025,9 @@ impl Batcher { let proof_submitters = finalized_batch.iter().map(|entry| entry.sender).collect(); - metrics::GAS_PRICE_USED_ON_LATEST_BATCH.set(gas_price.as_u64() as i64); + self.metrics + .gas_price_used_on_latest_batch + .set(gas_price.as_u64() as i64); match self .create_new_task( @@ -1029,7 +1040,7 @@ impl Batcher { { Ok(_) => { info!("Batch verification task created on Aligned contract"); - metrics::SENT_BATCHES.inc(); + self.metrics.sent_batches.inc(); Ok(()) } Err(e) => { @@ -1038,7 +1049,7 @@ impl Batcher { e ); - metrics::REVERTED_BATCHES.inc(); + self.metrics.reverted_batches.inc(); Err(e) } } diff --git a/batcher/aligned-batcher/src/main.rs b/batcher/aligned-batcher/src/main.rs index 5715ec9fd8..bc13885052 100644 --- a/batcher/aligned-batcher/src/main.rs +++ b/batcher/aligned-batcher/src/main.rs @@ -5,9 +5,7 @@ use std::sync::Arc; use clap::Parser; use env_logger::Env; -use aligned_batcher::metrics; use aligned_batcher::{types::errors::BatcherError, Batcher}; -use warp::Filter; /// Batcher main flow: /// There are two main tasks spawned: `listen_connections` and `listen_new_blocks` @@ -39,15 +37,6 @@ async fn main() -> Result<(), BatcherError> { }; env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - - // Endpoint for 
Prometheus - metrics::init_variables(); - let metrics_route = warp::path!("metrics").and_then(metrics::metrics_handler); - println!("Starting Batcher metrics on port 9093"); - tokio::task::spawn(async move { - warp::serve(metrics_route).run(([0, 0, 0, 0], 9093)).await; - }); //TODO read from config - let batcher = Batcher::new(cli.config).await; let batcher = Arc::new(batcher); @@ -63,7 +52,7 @@ async fn main() -> Result<(), BatcherError> { } }); - metrics::batcher_started(); + batcher.metrics.inc_batcher_restart(); batcher.listen_connections(&addr).await?; diff --git a/batcher/aligned-batcher/src/metrics.rs b/batcher/aligned-batcher/src/metrics.rs index 34dfd68856..6a8ae5fb62 100644 --- a/batcher/aligned-batcher/src/metrics.rs +++ b/batcher/aligned-batcher/src/metrics.rs @@ -1,67 +1,79 @@ +use std::{thread, time::Duration}; + // Prometheus use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge}; -use warp::{Rejection, Reply}; - -use once_cell::sync::Lazy; -use std::{thread, time}; - -// Prometheus setup -pub static BATCHER_STARTED: Lazy<IntCounter> = - Lazy::new(|| register_int_counter!(opts!("batcher_started", "Batcher Started")).unwrap()); - -pub static OPEN_CONNECTIONS: Lazy<IntGauge> = - Lazy::new(|| register_int_gauge!(opts!("open_connections", "Open Connections")).unwrap()); - -pub static RECEIVED_PROOFS: Lazy<IntCounter> = - Lazy::new(|| register_int_counter!(opts!("received_proofs", "Received Proofs")).unwrap()); - -pub static SENT_BATCHES: Lazy<IntCounter> = - Lazy::new(|| register_int_counter!(opts!("sent_batches", "Sent Batches")).unwrap()); - -pub static REVERTED_BATCHES: Lazy<IntCounter> = - Lazy::new(|| register_int_counter!(opts!("reverted_batches", "Reverted Batches")).unwrap()); - -pub static GAS_PRICE_USED_ON_LATEST_BATCH: Lazy<IntGauge> = Lazy::new(|| { - register_int_gauge!(opts!("gas_price_used_on_latest_batch", "Gas Price")).unwrap() -}); - -// so Prometheus can collect our metrics. 
-pub async fn metrics_handler() -> Result<impl Reply, Rejection> { - use prometheus::Encoder; - let encoder = prometheus::TextEncoder::new(); - - let mut buffer = Vec::new(); - if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) { - eprintln!("could not encode prometheus metrics: {}", e); - }; - let res = match String::from_utf8(buffer.clone()) { - Ok(v) => v, - Err(e) => { - eprintln!("prometheus metrics could not be from_utf8'd: {}", e); - String::default() - } - }; - buffer.clear(); - - Ok(res) -} - -pub fn init_variables() { - BATCHER_STARTED.reset(); - - OPEN_CONNECTIONS.set(0); - - RECEIVED_PROOFS.reset(); - - SENT_BATCHES.reset(); - - REVERTED_BATCHES.reset(); +use warp::{Filter, Rejection, Reply}; - GAS_PRICE_USED_ON_LATEST_BATCH.set(0); +#[derive(Clone, Debug)] +pub struct BatcherMetrics { + pub open_connections: IntGauge, + pub received_proofs: IntCounter, + pub sent_batches: IntCounter, + pub reverted_batches: IntCounter, + pub batcher_started: IntCounter, + pub gas_price_used_on_latest_batch: IntGauge, } -pub fn batcher_started() { - thread::sleep(time::Duration::from_secs(10)); - BATCHER_STARTED.inc(); +impl BatcherMetrics { + pub fn start(metrics_port: u16) -> anyhow::Result<Self> { + let registry = prometheus::Registry::new(); + + let open_connections = register_int_gauge!(opts!("open_connections", "Open Connections"))?; + let received_proofs = register_int_counter!(opts!("received_proofs", "Received Proofs"))?; + let sent_batches = register_int_counter!(opts!("sent_batches", "Sent Batches"))?; + let reverted_batches = + register_int_counter!(opts!("reverted_batches", "Reverted Batches"))?; + let batcher_started = register_int_counter!(opts!("batcher_started", "Batcher Started"))?; + let gas_price_used_on_latest_batch = + register_int_gauge!(opts!("gas_price_used_on_latest_batch", "Gas Price"))?; + + registry.register(Box::new(open_connections.clone()))?; + registry.register(Box::new(received_proofs.clone()))?; + registry.register(Box::new(sent_batches.clone()))?; 
+ registry.register(Box::new(reverted_batches.clone()))?; + registry.register(Box::new(batcher_started.clone()))?; + + let metrics_route = warp::path!("metrics") + .and(warp::any().map(move || registry.clone())) + .and_then(BatcherMetrics::metrics_handler); + + tokio::task::spawn(async move { + warp::serve(metrics_route) + .run(([0, 0, 0, 0], metrics_port)) + .await; + }); + + Ok(Self { + open_connections, + received_proofs, + sent_batches, + reverted_batches, + batcher_started, + gas_price_used_on_latest_batch, + }) + } + + pub async fn metrics_handler(registry: prometheus::Registry) -> Result<impl Reply, Rejection> { + use prometheus::Encoder; + let encoder = prometheus::TextEncoder::new(); + + let mut buffer = Vec::new(); + if let Err(e) = encoder.encode(&registry.gather(), &mut buffer) { + eprintln!("could not encode prometheus metrics: {}", e); + }; + let res = String::from_utf8(buffer.clone()) + .inspect_err(|e| eprintln!("prometheus metrics could not be parsed correctly: {e}")) + .unwrap_or_default(); + buffer.clear(); + + Ok(res) + } + + pub fn inc_batcher_restart(&self) { + // Sleep for 2 seconds to allow prometheus to start and set the metrics with default initial values. + // If prometheus is not ready, the metrics will directly be set to 1 and prometheus will not be able to display the correct increment. 
+ thread::sleep(Duration::from_secs(2)); + self.batcher_started.inc(); + } } diff --git a/config-files/config-batcher.yaml b/config-files/config-batcher.yaml index e1da034d2c..6a6273e026 100644 --- a/config-files/config-batcher.yaml +++ b/config-files/config-batcher.yaml @@ -22,6 +22,7 @@ batcher: max_batch_size: 268435456 # 256 MiB eth_ws_reconnects: 99999999999999 pre_verification_is_enabled: true + metrics_port: 9093 non_paying: address: 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 # Anvil address 9 replacement_private_key: ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 # Anvil address 1 diff --git a/config-files/config.yaml b/config-files/config.yaml index f9f3be1e76..0af66f176a 100644 --- a/config-files/config.yaml +++ b/config-files/config.yaml @@ -27,6 +27,7 @@ batcher: max_batch_size: 268435456 # 256 MiB eth_ws_reconnects: 99999999999999 pre_verification_is_enabled: true + metrics_port: 9093 ## Aggregator Configurations aggregator: