From 12461b31d28de983e5f06ebe3c0802eb02c9d97e Mon Sep 17 00:00:00 2001 From: eltitanb Date: Fri, 12 Sep 2025 14:59:36 +0100 Subject: [PATCH 1/6] tmp --- Cargo.lock | 1 + crates/common/src/config/mod.rs | 9 ++- crates/common/src/config/pbs.rs | 5 ++ crates/pbs/Cargo.toml | 1 + crates/pbs/src/api.rs | 3 +- .../pbs/src/mev_boost/register_validator.rs | 55 +++++++++++-------- crates/pbs/src/routes/register_validator.rs | 3 +- tests/src/utils.rs | 1 + 8 files changed, 49 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c9a99db..0fa792c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1583,6 +1583,7 @@ dependencies = [ "parking_lot", "prometheus", "reqwest", + "serde", "serde_json", "tokio", "tower-http", diff --git a/crates/common/src/config/mod.rs b/crates/common/src/config/mod.rs index 0880d5c8..55d6390d 100644 --- a/crates/common/src/config/mod.rs +++ b/crates/common/src/config/mod.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; -use eyre::Result; +use eyre::{Result, bail}; use serde::{Deserialize, Serialize}; use crate::types::{Chain, ChainLoader, ForkVersion, load_chain_from_file}; @@ -45,6 +45,13 @@ impl CommitBoostConfig { if let Some(signer) = &self.signer { signer.validate().await?; } + + if self.relays.iter().any(|r| r.validator_registration_batch_size.is_some()) { + bail!( + "validator_registration_batch_size is now deprecated on a per-relay basis. Please use validator_registration_batch_size in the [pbs] section instead" + ) + } + Ok(()) } diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index f606a286..dc11a513 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -36,6 +36,7 @@ use crate::{ }; #[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] pub struct RelayConfig { /// Relay ID, if missing will default to the URL hostname from the entry pub id: Option, @@ -128,6 +129,10 @@ pub struct PbsConfig { /// Maximum number of retries for validator registration request per relay #[serde(default = "default_u32::")] pub register_validator_retry_limit: u32, + /// Maximum number of validators to send to relays in a single registration + /// request + #[serde(deserialize_with = "empty_string_as_none", default)] + pub validator_registration_batch_size: Option, } impl PbsConfig { diff --git a/crates/pbs/Cargo.toml b/crates/pbs/Cargo.toml index d3aaace8..e8cb0b31 100644 --- a/crates/pbs/Cargo.toml +++ b/crates/pbs/Cargo.toml @@ -18,6 +18,7 @@ lazy_static.workspace = true parking_lot.workspace = true prometheus.workspace = true reqwest.workspace = true +serde.workspace = true serde_json.workspace = true tokio.workspace = true tower-http.workspace = true diff --git a/crates/pbs/src/api.rs b/crates/pbs/src/api.rs index c388ddf1..a3786f0b 100644 --- a/crates/pbs/src/api.rs +++ b/crates/pbs/src/api.rs @@ -1,4 +1,3 @@ -use alloy::rpc::types::beacon::relay::ValidatorRegistration; use async_trait::async_trait; use axum::{Router, http::HeaderMap}; use cb_common::pbs::{ @@ -45,7 +44,7 @@ pub trait BuilderApi: 'static { /// https://ethereum.github.io/builder-specs/#/Builder/registerValidator async fn register_validator( - registrations: Vec, + registrations: Vec, req_headers: HeaderMap, state: PbsState, ) -> eyre::Result<()> { diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index eef2f398..1b095f5b 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -1,6 +1,6 @@ use std::time::{Duration, Instant}; -use alloy::rpc::types::beacon::relay::ValidatorRegistration; +use alloy::primitives::Bytes; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ pbs::{HEADER_START_TIME_UNIX_MS, RelayClient, error::PbsError}, @@ -8,7 +8,7 @@ use cb_common::{ }; use eyre::bail; use futures::future::{join_all, select_ok}; -use reqwest::header::USER_AGENT; +use reqwest::header::{CONTENT_TYPE, USER_AGENT}; use tracing::{Instrument, debug, error}; use url::Url; @@ -21,7 +21,7 @@ use crate::{ /// Implements https://ethereum.github.io/builder-specs/#/Builder/registerValidator /// Returns 200 if at least one relay returns 200, else 503 pub async fn register_validator( - registrations: Vec, + registrations: Vec, req_headers: HeaderMap, state: PbsState, ) -> eyre::Result<()> { @@ -32,25 +32,29 @@ pub async fn register_validator( send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); let relays = state.all_relays().to_vec(); + + let batch_size: Option = None; + + // prepare the body in advance, ugly dyn + let bodies: Box> = if let Some(batch_size) = batch_size { + Box::new(registrations.chunks(batch_size).map(|batch| { + let body = serde_json::to_vec(batch).unwrap(); + (batch.len(), Bytes::from(body)) + })) + } else { + let body = serde_json::to_vec(®istrations).unwrap(); + Box::new(std::iter::once((registrations.len(), Bytes::from(body)))) + }; + send_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + let mut handles = Vec::with_capacity(relays.len()); - for relay in relays.clone() { - if let Some(batch_size) = relay.config.validator_registration_batch_size { - for batch in registrations.chunks(batch_size) { - handles.push(tokio::spawn( - send_register_validator_with_timeout( - batch.to_vec(), - relay.clone(), - send_headers.clone(), - state.pbs_config().timeout_register_validator_ms, - state.pbs_config().register_validator_retry_limit, - ) - .in_current_span(), - )); - } - } else { + + for (n_regs, body) in bodies { + for relay in relays.clone() { handles.push(tokio::spawn( send_register_validator_with_timeout( - registrations.clone(), + n_regs, + body.clone(), relay.clone(), send_headers.clone(), state.pbs_config().timeout_register_validator_ms, @@ -82,7 +86,8 @@ pub async fn register_validator( /// Register validator to relay, retry connection errors until the /// given timeout has passed async fn send_register_validator_with_timeout( - registrations: Vec, + n_regs: usize, + body: Bytes, relay: RelayClient, headers: HeaderMap, timeout_ms: u64, @@ -97,7 +102,8 @@ async fn send_register_validator_with_timeout( let start_request = Instant::now(); match send_register_validator( url.clone(), - ®istrations, + n_regs, + body.clone(), &relay, headers.clone(), remaining_timeout_ms, @@ -134,7 +140,8 @@ async fn send_register_validator_with_timeout( async fn send_register_validator( url: Url, - registrations: &[ValidatorRegistration], + n_regs: usize, + body: Bytes, relay: &RelayClient, headers: HeaderMap, timeout_ms: u64, @@ -146,7 +153,7 @@ async fn send_register_validator( .post(url) .timeout(Duration::from_millis(timeout_ms)) .headers(headers) - .json(®istrations) + .body(body.0) .send() .await { @@ -189,7 +196,7 @@ async fn send_register_validator( retry, ?code, latency = ?request_latency, - num_registrations = registrations.len(), + num_registrations = n_regs, "registration successful" ); diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index 8ccbfec8..dad85d27 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -1,4 +1,3 @@ -use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse}; use cb_common::utils::get_user_agent; use reqwest::StatusCode; @@ -15,7 +14,7 @@ use crate::{ pub async fn handle_register_validator>( State(state): State>, req_headers: HeaderMap, - Json(registrations): Json>, + Json(registrations): Json>, ) -> Result { let state = state.read().clone(); diff --git a/tests/src/utils.rs b/tests/src/utils.rs index c7db1b71..1c83e0eb 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -81,6 +81,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig { rpc_url: None, http_timeout_seconds: 10, register_validator_retry_limit: u32::MAX, + validator_registration_batch_size: None, } } From 7d18f424d8d64d99d3b2408387280d1ad08de0c3 Mon Sep 17 00:00:00 2001 From: eltitanb Date: Fri, 12 Sep 2025 19:30:11 +0100 Subject: [PATCH 2/6] rm test --- tests/tests/pbs_post_validators.rs | 140 ----------------------------- 1 file changed, 140 deletions(-) diff --git a/tests/tests/pbs_post_validators.rs b/tests/tests/pbs_post_validators.rs index f520cbae..35e6c5be 100644 --- a/tests/tests/pbs_post_validators.rs +++ b/tests/tests/pbs_post_validators.rs @@ -61,146 +61,6 @@ async fn test_register_validators() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_register_validators_returns_422_if_request_is_malformed() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = signer.public_key(); - - let chain = Chain::Holesky; - let pbs_port = 4100; - - // Run a mock relay - let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1)); - - // Run the PBS service - let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(pbs_port)?; - let url = mock_validator.comm_boost.register_validator_url().unwrap(); - info!("Sending register validator"); - - // Bad fee recipient - let bad_json = r#"[{ - "message": { - "fee_recipient": "0xaa", - "gas_limit": "100000", - "timestamp": "1000000", - "pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" - }, - "signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" - }]"#; - - let res = mock_validator - .comm_boost - .client - .post(url.clone()) - .header("Content-Type", "application/json") - .body(bad_json) - .send() - .await?; - - assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY); - - // Bad pubkey - let bad_json = r#"[{ - "message": { - "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - "gas_limit": "100000", - "timestamp": "1000000", - "pubkey": "0xbbb" - }, - "signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" - }]"#; - - let res = mock_validator - .comm_boost - .client - .post(url.clone()) - .header("Content-Type", "application/json") - .body(bad_json) - .send() - .await?; - - assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY); - - // Bad signature - let bad_json = r#"[{ - "message": { - "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - "gas_limit": "100000", - "timestamp": "1000000", - "pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" - }, - "signature": "0xcccc" - }]"#; - - let res = mock_validator - .comm_boost - .client - .post(url.clone()) - .header("Content-Type", "application/json") - .body(bad_json) - .send() - .await?; - - assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY); - - // gas limit too high - let bad_json = r#"[{ - "message": { - "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - "gas_limit": "10000000000000000000000000000000000000000000000000000000", - "timestamp": "1000000", - "pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" - }, - "signature": "0xcccc" - }]"#; - - let res = mock_validator - .comm_boost - .client - .post(url.clone()) - .header("Content-Type", "application/json") - .body(bad_json) - .send() - .await?; - - assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY); - - // timestamp too high - let bad_json = r#"[{ - "message": { - "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - "gas_limit": "1000000", - "timestamp": "10000000000000000000000000000000000000000000000000000000", - "pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" - }, - "signature": "0xcccc" - }]"#; - - let res = mock_validator - .comm_boost - .client - .post(url.clone()) - .header("Content-Type", "application/json") - .body(bad_json) - .send() - .await?; - - assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY); - - assert_eq!(mock_state.received_register_validator(), 0); - Ok(()) -} - #[tokio::test] async fn test_register_validators_does_not_retry_on_429() -> Result<()> { setup_test_env(); From fc03e375f32bffe408c5d63fecfd379a23283829 Mon Sep 17 00:00:00 2001 From: eltitanb Date: Fri, 12 Sep 2025 19:33:08 +0100 Subject: [PATCH 3/6] use batch param --- .../pbs/src/mev_boost/register_validator.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 1b095f5b..8d06ff8f 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -33,18 +33,17 @@ pub async fn register_validator( let relays = state.all_relays().to_vec(); - let batch_size: Option = None; - // prepare the body in advance, ugly dyn - let bodies: Box> = if let Some(batch_size) = batch_size { - Box::new(registrations.chunks(batch_size).map(|batch| { - let body = serde_json::to_vec(batch).unwrap(); - (batch.len(), Bytes::from(body)) - })) - } else { - let body = serde_json::to_vec(®istrations).unwrap(); - Box::new(std::iter::once((registrations.len(), Bytes::from(body)))) - }; + let bodies: Box> = + if let Some(batch_size) = state.config.pbs_config.validator_registration_batch_size { + Box::new(registrations.chunks(batch_size).map(|batch| { + let body = serde_json::to_vec(batch).unwrap(); + (batch.len(), Bytes::from(body)) + })) + } else { + let body = serde_json::to_vec(®istrations).unwrap(); + Box::new(std::iter::once((registrations.len(), Bytes::from(body)))) + }; send_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); let mut handles = Vec::with_capacity(relays.len()); From f63b6b98c29a5c35c3460d8b385c4a3561ecf0ff Mon Sep 17 00:00:00 2001 From: eltitanb Date: Fri, 12 Sep 2025 19:37:34 +0100 Subject: [PATCH 4/6] add comment --- crates/pbs/src/mev_boost/register_validator.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 8d06ff8f..315cf5d4 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -37,6 +37,7 @@ pub async fn register_validator( let bodies: Box> = if let Some(batch_size) = state.config.pbs_config.validator_registration_batch_size { Box::new(registrations.chunks(batch_size).map(|batch| { + // SAFETY: unwrap is ok because we're serializing a &[serde_json::Value] let body = serde_json::to_vec(batch).unwrap(); (batch.len(), Bytes::from(body)) })) From cf2a18e5f578a9935c898529f4b64a2e8ddab161 Mon Sep 17 00:00:00 2001 From: eltitanb Date: Mon, 15 Sep 2025 09:59:25 +0100 Subject: [PATCH 5/6] avoid one vec allocation --- crates/pbs/src/mev_boost/register_validator.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 315cf5d4..91df8996 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -31,8 +31,6 @@ pub async fn register_validator( .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let relays = state.all_relays().to_vec(); - // prepare the body in advance, ugly dyn let bodies: Box> = if let Some(batch_size) = state.config.pbs_config.validator_registration_batch_size { @@ -47,15 +45,15 @@ pub async fn register_validator( }; send_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); - let mut handles = Vec::with_capacity(relays.len()); + let mut handles = Vec::with_capacity(state.all_relays().len()); for (n_regs, body) in bodies { - for relay in relays.clone() { + for relay in state.all_relays().iter().cloned() { handles.push(tokio::spawn( send_register_validator_with_timeout( n_regs, body.clone(), - relay.clone(), + relay, send_headers.clone(), state.pbs_config().timeout_register_validator_ms, state.pbs_config().register_validator_retry_limit, From 4c4819604b90309f6c8d443d47303b071f02bbb2 Mon Sep 17 00:00:00 2001 From: eltitanb Date: Mon, 15 Sep 2025 15:02:22 +0100 Subject: [PATCH 6/6] comment --- crates/common/src/config/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/common/src/config/mod.rs b/crates/common/src/config/mod.rs index 55d6390d..f2fd2ff7 100644 --- a/crates/common/src/config/mod.rs +++ b/crates/common/src/config/mod.rs @@ -48,7 +48,7 @@ impl CommitBoostConfig { if self.relays.iter().any(|r| r.validator_registration_batch_size.is_some()) { bail!( - "validator_registration_batch_size is now deprecated on a per-relay basis. Please use validator_registration_batch_size in the [pbs] section instead" + "validator_registration_batch_size is now obsolete on a per-relay basis. Please use validator_registration_batch_size in the [pbs] section instead" ) }