diff --git a/Cargo-minimal.lock b/Cargo-minimal.lock index 65ac2f07b..e81bddd47 100644 --- a/Cargo-minimal.lock +++ b/Cargo-minimal.lock @@ -1992,6 +1992,7 @@ dependencies = [ "hyper-rustls 0.26.0", "hyper-util", "payjoin", + "prometheus", "redis", "rustls 0.22.4", "tempfile", @@ -2224,6 +2225,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.3", + "protobuf", + "thiserror 1.0.63", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "quote" version = "1.0.37" diff --git a/Cargo-recent.lock b/Cargo-recent.lock index 65ac2f07b..e81bddd47 100644 --- a/Cargo-recent.lock +++ b/Cargo-recent.lock @@ -1992,6 +1992,7 @@ dependencies = [ "hyper-rustls 0.26.0", "hyper-util", "payjoin", + "prometheus", "redis", "rustls 0.22.4", "tempfile", @@ -2224,6 +2225,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.3", + "protobuf", + "thiserror 1.0.63", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "quote" version = "1.0.37" diff --git a/payjoin-directory/Cargo.toml b/payjoin-directory/Cargo.toml index 71234a5e9..468a534f8 100644 --- a/payjoin-directory/Cargo.toml +++ b/payjoin-directory/Cargo.toml @@ -34,6 +34,7 @@ tokio = { version = "1.12.0", features = ["full"] } tokio-rustls = { version = "0.25", features = ["ring"], default-features = false, optional = true } tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } +prometheus = "0.13.4" [dev-dependencies] tempfile = "3.5.0" diff --git a/payjoin-directory/src/lib.rs b/payjoin-directory/src/lib.rs index 79080d01c..2470837a6 100644 --- a/payjoin-directory/src/lib.rs +++ b/payjoin-directory/src/lib.rs @@ -15,10 +15,11 @@ use tracing::{debug, error, trace, warn}; pub use crate::db::DbPool; pub mod key_config; pub use crate::key_config::*; - +use crate::metrics::Metrics; pub const DEFAULT_DIR_PORT: u16 = 8080; pub const DEFAULT_DB_HOST: &str = "localhost:6379"; pub const DEFAULT_TIMEOUT_SECS: u64 = 30; +pub const DEFAULT_METRIC_PORT: u16 = 9090; const CHACHA20_POLY1305_NONCE_LEN: usize = 32; // chacha20poly1305 n_k const POLY1305_TAG_SIZE: usize = 16; @@ -32,6 +33,8 @@ const V1_UNAVAILABLE_RES_JSON: &str = r#"{{"errorCode": "unavailable", "message" mod db; +pub mod metrics; + pub type BoxError = Box; #[cfg(feature = "_manual-tls")] @@ -56,6 +59,7 @@ fn init_tls_acceptor(cert_key: (Vec, Vec)) -> Result> for Service { @@ -67,13 +71,16 @@ impl hyper::service::Service> for Service { fn call(&self, req: Request) -> Self::Future { let pool = self.pool.clone(); let ohttp = self.ohttp.clone(); - let this = Service::new(pool, ohttp); + let metrics = self.metrics.clone(); + let this = Service::new(pool, ohttp, metrics); Box::pin(async move { this.serve_request(req).await }) } } impl Service { - pub fn new(pool: DbPool, ohttp: ohttp::Server) -> Self { Self { pool, ohttp } } + pub fn new(pool: DbPool, ohttp: ohttp::Server, metrics: Metrics) -> Self { + Self { pool, ohttp, metrics } + } #[cfg(feature = "_manual-tls")] pub async fn serve_tls( @@ -88,6 +95,7 @@ impl Service { let tls_acceptor = tls_acceptor.clone(); let service = self.clone(); tokio::spawn(async move { + service.metrics.record_connection(); let tls_stream = match tls_acceptor.accept(stream).await { Ok(tls_stream) => tls_stream, Err(e) => { @@ -112,6 +120,7 @@ impl Service { let io = TokioIo::new(stream); let service = self.clone(); tokio::spawn(async move { + service.metrics.record_connection(); if let Err(err) = http1::Builder::new().serve_connection(io, service).with_upgrades().await { @@ -143,6 +152,7 @@ impl Service { (Method::POST, ["", id]) => self.post_fallback_v1(id, query, body).await, (Method::GET, ["", "health"]) => health_check().await, (Method::GET, ["", ""]) => handle_directory_home_path().await, + (Method::GET, ["", "metrics"]) => Ok(self.handle_metrics().await), _ => Ok(not_found()), } .unwrap_or_else(|e| e.to_response()); @@ -349,6 +359,24 @@ impl Service { res } + async fn handle_metrics(&self) -> Response> { + match self.metrics.generate_metrics() { + Ok(metrics_data) => { + let mut response = Response::new(full(metrics_data)); + response.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_static("text/plain; version=0.0.4; charset=utf-8"), + ); + response + } + Err(e) => { + error!("failed to generate metrics: {}", e); + let mut response = Response::new(full("Error generating metrics")); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response + } + } + } } fn handle_peek( @@ -491,3 +519,22 @@ fn empty() -> BoxBody { fn full>(chunk: T) -> BoxBody { Full::new(chunk.into()).map_err(|never| match never {}).boxed() } + +pub async fn serve_metrics_tcp( + service: Service, + listener: tokio::net::TcpListener, +) -> Result<(), BoxError> { + while let Ok((stream, _)) = listener.accept().await { + let io = TokioIo::new(stream); + let service = service.clone(); + tokio::spawn(async move { + if let Err(err) = + http1::Builder::new().serve_connection(io, service).with_upgrades().await + { + error!("Error serving connection: {:?}", err); + } + }); + } + + Ok(()) +} diff --git a/payjoin-directory/src/main.rs b/payjoin-directory/src/main.rs index f4f8d5948..655675bb6 100644 --- a/payjoin-directory/src/main.rs +++ b/payjoin-directory/src/main.rs @@ -1,6 +1,7 @@ use std::env; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use payjoin_directory::metrics::Metrics; use payjoin_directory::*; use tokio::net::TcpListener; use tracing_subscriber::filter::LevelFilter; @@ -15,6 +16,9 @@ async fn main() -> Result<(), BoxError> { let dir_port = env::var("PJ_DIR_PORT").map_or(DEFAULT_DIR_PORT, |s| s.parse().expect("Invalid port")); + let metric_port = env::var("PJ_METRIC_PORT") + .map_or(DEFAULT_METRIC_PORT, |s| s.parse().expect("invalid metric port")); + let timeout_env = env::var("PJ_DIR_TIMEOUT_SECS") .map_or(DEFAULT_TIMEOUT_SECS, |s| s.parse().expect("Invalid timeout")); let timeout = std::time::Duration::from_secs(timeout_env); @@ -38,9 +42,20 @@ async fn main() -> Result<(), BoxError> { } }; + // Start metrics server in the background + let metrics = Metrics::new(); + let metrics_listener = bind_port(metric_port).await?; + let listener = bind_port(dir_port).await?; let db = DbPool::new(timeout, db_host).await?; - let service = Service::new(db, ohttp.into()); + let service = Service::new(db, ohttp.into(), metrics); + let service_clone = service.clone(); + tokio::spawn(async move { + if let Err(e) = payjoin_directory::serve_metrics_tcp(service_clone, metrics_listener).await + { + eprintln!("Metrics server error: {e}"); + } + }); service.serve_tcp(listener).await } diff --git a/payjoin-directory/src/metrics.rs b/payjoin-directory/src/metrics.rs new file mode 100644 index 000000000..af86e4a93 --- /dev/null +++ b/payjoin-directory/src/metrics.rs @@ -0,0 +1,60 @@ +use prometheus::{Encoder, IntCounter, Registry, TextEncoder}; + +#[derive(Debug, Clone)] +pub struct Metrics { + /// Total number of connections accepted by the directory + pub connections_total: IntCounter, + registry: Registry, +} + +impl Default for Metrics { + fn default() -> Self { Self::new() } +} + +impl Metrics { + pub fn new() -> Self { + let registry = Registry::new(); + let connections_total = + IntCounter::new("connections_total", "Total number of tcp connections") + .expect("Failed to create connections_total metrics "); + + registry + .register(Box::new(connections_total.clone())) + .expect("Failed to register connections_total"); + + Self { connections_total, registry } + } + + /// Records a new connection + pub fn record_connection(&self) { self.connections_total.inc(); } + + pub fn generate_metrics(&self) -> Result> { + let encoder = TextEncoder::new(); + let all_metrics = self.registry.gather(); + let mut buffer = Vec::new(); + encoder.encode(&all_metrics, &mut buffer)?; + Ok(String::from_utf8(buffer)?) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_recording_metrics() { + let metrics = Metrics::new(); + metrics.record_connection(); + + let metrics_recorded = metrics.generate_metrics().expect("Failed to generate metrics"); + assert!(metrics_recorded.contains("connections_total")); + } + + #[test] + fn does_not_error_on_empty_metrics() { + let metrics = Metrics::new(); + let metrics_recorded = metrics.generate_metrics().expect("Failed to generate metrics"); + assert!(metrics_recorded.contains("connections_total 0")); + } +} diff --git a/payjoin-test-utils/src/lib.rs b/payjoin-test-utils/src/lib.rs index fdb5f3c24..df6ef61dd 100644 --- a/payjoin-test-utils/src/lib.rs +++ b/payjoin-test-utils/src/lib.rs @@ -142,7 +142,8 @@ pub async fn init_directory( println!("Database running on {db_host}"); let db = payjoin_directory::DbPool::new(timeout, db_host).await?; - let service = payjoin_directory::Service::new(db, ohttp_server.into()); + let metrics = payjoin_directory::metrics::Metrics::new(); + let service = payjoin_directory::Service::new(db, ohttp_server.into(), metrics); let listener = bind_free_port().await?; let port = listener.local_addr()?.port();