From 7f04622b5e9552f0dce7bf581c26c45be5a9ea5c Mon Sep 17 00:00:00 2001 From: ltitanb Date: Thu, 17 Oct 2024 19:33:47 +0100 Subject: [PATCH 1/2] add signal entrypoint --- bin/pbs.rs | 10 ++++++++-- bin/signer.rs | 6 ++++-- bin/src/lib.rs | 2 +- crates/common/src/utils.rs | 26 ++++++++++++++++++++++++++ crates/pbs/src/service.rs | 16 +++++++++++----- crates/signer/src/service.rs | 13 ++++++++----- 6 files changed, 58 insertions(+), 15 deletions(-) diff --git a/bin/pbs.rs b/bin/pbs.rs index caa72ab7..65798113 100644 --- a/bin/pbs.rs +++ b/bin/pbs.rs @@ -1,4 +1,7 @@ -use cb_common::{config::load_pbs_config, utils::initialize_pbs_tracing_log}; +use cb_common::{ + config::load_pbs_config, + utils::{initialize_pbs_tracing_log, wait_for_signal}, +}; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use eyre::Result; @@ -13,7 +16,10 @@ async fn main() -> Result<()> { let pbs_config = load_pbs_config()?; let _guard = initialize_pbs_tracing_log(); + let state = PbsState::new(pbs_config); PbsService::init_metrics()?; - PbsService::run::<_, DefaultBuilderApi>(state).await + PbsService::run::<_, DefaultBuilderApi>(state).await?; + + wait_for_signal().await } diff --git a/bin/signer.rs b/bin/signer.rs index ca838e59..b9d3477a 100644 --- a/bin/signer.rs +++ b/bin/signer.rs @@ -1,6 +1,6 @@ use cb_common::{ config::{StartSignerConfig, SIGNER_MODULE_NAME}, - utils::initialize_tracing_log, + utils::{initialize_tracing_log, wait_for_signal}, }; use cb_signer::service::SigningService; use eyre::Result; @@ -16,5 +16,7 @@ async fn main() -> Result<()> { let config = StartSignerConfig::load_from_env()?; let _guard = initialize_tracing_log(SIGNER_MODULE_NAME); - SigningService::run(config).await + SigningService::run(config).await?; + + wait_for_signal().await } diff --git a/bin/src/lib.rs b/bin/src/lib.rs index 44cf8552..e4f566b2 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -21,7 +21,7 @@ pub mod prelude { get_header, get_status, register_validator, submit_block, BuilderApi, BuilderApiState, DefaultBuilderApi, PbsService, PbsState, }; - // The TreeHash derive macro requires tree_hash:: as import + // The TreeHash derive macro requires tree_hash as import pub mod tree_hash { pub use tree_hash::*; } diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index d2c26ed6..c3d69105 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -246,3 +246,29 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result eyre::Result<()> { + use tokio::signal::unix::{signal, SignalKind}; + use tracing::info; + + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; + + tokio::select! { + _ = sigint.recv() => {} + _ = sigterm.recv() => {} + } + + info!("shutting down"); + + Ok(()) +} + +#[cfg(windows)] +pub async fn wait_for_signal() -> eyre::Result<()> { + tokio::signal::ctrl_c().await?; + info!("shutting down"); + + Ok(()) +} diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 7b47a179..d433fb39 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -2,10 +2,10 @@ use std::net::SocketAddr; use cb_common::constants::COMMIT_BOOST_VERSION; use cb_metrics::provider::MetricsProvider; -use eyre::{Context, Result}; +use eyre::Result; use prometheus::core::Collector; use tokio::net::TcpListener; -use tracing::info; +use tracing::{error, info}; use crate::{ api::BuilderApi, @@ -21,12 +21,18 @@ impl PbsService { let address = SocketAddr::from(([0, 0, 0, 0], state.config.pbs_config.port)); let events_subs = state.config.event_publisher.as_ref().map(|e| e.n_subscribers()).unwrap_or_default(); - info!(version = COMMIT_BOOST_VERSION, ?address, events_subs, chain =? state.config.chain, "Starting PBS service"); + info!(version = COMMIT_BOOST_VERSION, ?address, events_subs, chain =? state.config.chain, "starting PBS service"); let app = create_app_router::(state); - let listener = TcpListener::bind(address).await.expect("failed tcp binding"); + let listener = TcpListener::bind(address).await?; - axum::serve(listener, app).await.wrap_err("PBS server exited") + tokio::spawn(async move { + if let Err(err) = axum::serve(listener, app).await { + error!(%err, "PBS service unexpectedly stopped"); + }; + }); + + Ok(()) } pub fn register_metric(c: Box) { diff --git a/crates/signer/src/service.rs b/crates/signer/src/service.rs index 754dda92..647919a4 100644 --- a/crates/signer/src/service.rs +++ b/crates/signer/src/service.rs @@ -22,7 +22,7 @@ use cb_common::{ constants::COMMIT_BOOST_VERSION, types::{Jwt, ModuleId}, }; -use eyre::{Result, WrapErr}; +use eyre::Result; use headers::{authorization::Bearer, Authorization}; use tokio::{net::TcpListener, sync::RwLock}; use tracing::{debug, error, info, warn}; @@ -71,11 +71,14 @@ impl SigningService { .route_layer(middleware::from_fn_with_state(state.clone(), jwt_auth)); let address = SocketAddr::from(([0, 0, 0, 0], config.server_port)); - let listener = TcpListener::bind(address).await.wrap_err("failed tcp binding")?; + let listener = TcpListener::bind(address).await?; + + tokio::spawn(async move { + if let Err(err) = axum::serve(listener, app).await { + error!(%err, "signing server unexpectedly stopped"); + }; + }); - if let Err(err) = axum::serve(listener, app).await { - error!(%err, "Signing server exited") - } Ok(()) } } From eab0d7db26e37e60e5733813215ffab65efc5e10 Mon Sep 17 00:00:00 2001 From: ltitanb Date: Fri, 18 Oct 2024 11:08:26 +0100 Subject: [PATCH 2/2] fixes --- Cargo.lock | 1 + bin/Cargo.toml | 3 +++ bin/pbs.rs | 16 ++++++++++++++-- bin/signer.rs | 16 ++++++++++++++-- crates/common/src/utils.rs | 5 ----- crates/pbs/src/service.rs | 12 +++--------- crates/signer/src/service.rs | 10 ++-------- 7 files changed, 37 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 87accc2f..f3c3e9ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1349,6 +1349,7 @@ dependencies = [ "color-eyre", "eyre", "tokio", + "tracing", "tree_hash 0.8.0", "tree_hash_derive", ] diff --git a/bin/Cargo.toml b/bin/Cargo.toml index ed7493c5..79258d76 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -18,6 +18,9 @@ tokio.workspace = true tree_hash.workspace = true tree_hash_derive.workspace = true +# telemetry +tracing.workspace = true + # misc clap.workspace = true eyre.workspace = true diff --git a/bin/pbs.rs b/bin/pbs.rs index 65798113..b8039bec 100644 --- a/bin/pbs.rs +++ b/bin/pbs.rs @@ -4,6 +4,7 @@ use cb_common::{ }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use eyre::Result; +use tracing::{error, info}; #[tokio::main] async fn main() -> Result<()> { @@ -19,7 +20,18 @@ async fn main() -> Result<()> { let state = PbsState::new(pbs_config); PbsService::init_metrics()?; - PbsService::run::<_, DefaultBuilderApi>(state).await?; + let server = PbsService::run::<_, DefaultBuilderApi>(state); - wait_for_signal().await + tokio::select! { + maybe_err = server => { + if let Err(err) = maybe_err { + error!(%err, "PBS service unexpectedly stopped"); + } + }, + _ = wait_for_signal() => { + info!("shutting down"); + } + } + + Ok(()) } diff --git a/bin/signer.rs b/bin/signer.rs index b9d3477a..cd9480ad 100644 --- a/bin/signer.rs +++ b/bin/signer.rs @@ -4,6 +4,7 @@ use cb_common::{ }; use cb_signer::service::SigningService; use eyre::Result; +use tracing::{error, info}; #[tokio::main] async fn main() -> Result<()> { @@ -16,7 +17,18 @@ async fn main() -> Result<()> { let config = StartSignerConfig::load_from_env()?; let _guard = initialize_tracing_log(SIGNER_MODULE_NAME); - SigningService::run(config).await?; + let server = SigningService::run(config); - wait_for_signal().await + tokio::select! { + maybe_err = server => { + if let Err(err) = maybe_err { + error!(%err, "signing server unexpectedly stopped"); + } + }, + _ = wait_for_signal() => { + info!("shutting down"); + } + } + + Ok(()) } diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index c3d69105..6e1ae273 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -250,7 +250,6 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result eyre::Result<()> { use tokio::signal::unix::{signal, SignalKind}; - use tracing::info; let mut sigint = signal(SignalKind::interrupt())?; let mut sigterm = signal(SignalKind::terminate())?; @@ -260,15 +259,11 @@ pub async fn wait_for_signal() -> eyre::Result<()> { _ = sigterm.recv() => {} } - info!("shutting down"); - Ok(()) } #[cfg(windows)] pub async fn wait_for_signal() -> eyre::Result<()> { tokio::signal::ctrl_c().await?; - info!("shutting down"); - Ok(()) } diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index d433fb39..cd36b5cf 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -2,10 +2,10 @@ use std::net::SocketAddr; use cb_common::constants::COMMIT_BOOST_VERSION; use cb_metrics::provider::MetricsProvider; -use eyre::Result; +use eyre::{Context, Result}; use prometheus::core::Collector; use tokio::net::TcpListener; -use tracing::{error, info}; +use tracing::info; use crate::{ api::BuilderApi, @@ -26,13 +26,7 @@ impl PbsService { let app = create_app_router::(state); let listener = TcpListener::bind(address).await?; - tokio::spawn(async move { - if let Err(err) = axum::serve(listener, app).await { - error!(%err, "PBS service unexpectedly stopped"); - }; - }); - - Ok(()) + axum::serve(listener, app).await.wrap_err("PBS server exited") } pub fn register_metric(c: Box) { diff --git a/crates/signer/src/service.rs b/crates/signer/src/service.rs index 647919a4..be5ad6c4 100644 --- a/crates/signer/src/service.rs +++ b/crates/signer/src/service.rs @@ -22,7 +22,7 @@ use cb_common::{ constants::COMMIT_BOOST_VERSION, types::{Jwt, ModuleId}, }; -use eyre::Result; +use eyre::{Context, Result}; use headers::{authorization::Bearer, Authorization}; use tokio::{net::TcpListener, sync::RwLock}; use tracing::{debug, error, info, warn}; @@ -73,13 +73,7 @@ impl SigningService { let address = SocketAddr::from(([0, 0, 0, 0], config.server_port)); let listener = TcpListener::bind(address).await?; - tokio::spawn(async move { - if let Err(err) = axum::serve(listener, app).await { - error!(%err, "signing server unexpectedly stopped"); - }; - }); - - Ok(()) + axum::serve(listener, app).await.wrap_err("signer server exited") } }