Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion crates/common/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::PathBuf;

use eyre::Result;
use eyre::{Result, bail};
use serde::{Deserialize, Serialize};

use crate::types::{Chain, ChainLoader, ForkVersion, load_chain_from_file};
Expand Down Expand Up @@ -45,6 +45,13 @@ impl CommitBoostConfig {
if let Some(signer) = &self.signer {
signer.validate().await?;
}

if self.relays.iter().any(|r| r.validator_registration_batch_size.is_some()) {
bail!(
"validator_registration_batch_size is now obsolete on a per-relay basis. Please use validator_registration_batch_size in the [pbs] section instead"
)
}

Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions crates/common/src/config/pbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
};

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct RelayConfig {
/// Relay ID, if missing will default to the URL hostname from the entry
pub id: Option<String>,
Expand Down Expand Up @@ -128,6 +129,10 @@ pub struct PbsConfig {
/// Maximum number of retries for validator registration request per relay
#[serde(default = "default_u32::<REGISTER_VALIDATOR_RETRY_LIMIT>")]
pub register_validator_retry_limit: u32,
/// Maximum number of validators to send to relays in a single registration
/// request
#[serde(deserialize_with = "empty_string_as_none", default)]
pub validator_registration_batch_size: Option<usize>,
}

impl PbsConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/pbs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy_static.workspace = true
parking_lot.workspace = true
prometheus.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tower-http.workspace = true
Expand Down
3 changes: 1 addition & 2 deletions crates/pbs/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use async_trait::async_trait;
use axum::{Router, http::HeaderMap};
use cb_common::pbs::{
Expand Down Expand Up @@ -45,7 +44,7 @@ pub trait BuilderApi<S: BuilderApiState>: 'static {

/// https://ethereum.github.io/builder-specs/#/Builder/registerValidator
async fn register_validator(
registrations: Vec<ValidatorRegistration>,
registrations: Vec<serde_json::Value>,
req_headers: HeaderMap,
state: PbsState<S>,
) -> eyre::Result<()> {
Expand Down
57 changes: 31 additions & 26 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::time::{Duration, Instant};

use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use alloy::primitives::Bytes;
use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
pbs::{HEADER_START_TIME_UNIX_MS, RelayClient, error::PbsError},
utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
};
use eyre::bail;
use futures::future::{join_all, select_ok};
use reqwest::header::USER_AGENT;
use reqwest::header::{CONTENT_TYPE, USER_AGENT};
use tracing::{Instrument, debug, error};
use url::Url;

Expand All @@ -21,7 +21,7 @@ use crate::{
/// Implements https://ethereum.github.io/builder-specs/#/Builder/registerValidator
/// Returns 200 if at least one relay returns 200, else 503
pub async fn register_validator<S: BuilderApiState>(
registrations: Vec<ValidatorRegistration>,
registrations: Vec<serde_json::Value>,
req_headers: HeaderMap,
state: PbsState<S>,
) -> eyre::Result<()> {
Expand All @@ -31,27 +31,29 @@ pub async fn register_validator<S: BuilderApiState>(
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

let relays = state.all_relays().to_vec();
let mut handles = Vec::with_capacity(relays.len());
for relay in relays.clone() {
if let Some(batch_size) = relay.config.validator_registration_batch_size {
for batch in registrations.chunks(batch_size) {
handles.push(tokio::spawn(
send_register_validator_with_timeout(
batch.to_vec(),
relay.clone(),
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
state.pbs_config().register_validator_retry_limit,
)
.in_current_span(),
));
}
// prepare the body in advance, ugly dyn
let bodies: Box<dyn Iterator<Item = (usize, Bytes)>> =
if let Some(batch_size) = state.config.pbs_config.validator_registration_batch_size {
Box::new(registrations.chunks(batch_size).map(|batch| {
// SAFETY: unwrap is ok because we're serializing a &[serde_json::Value]
let body = serde_json::to_vec(batch).unwrap();
(batch.len(), Bytes::from(body))
}))
} else {
let body = serde_json::to_vec(&registrations).unwrap();
Box::new(std::iter::once((registrations.len(), Bytes::from(body))))
};
send_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

let mut handles = Vec::with_capacity(state.all_relays().len());

for (n_regs, body) in bodies {
for relay in state.all_relays().iter().cloned() {
handles.push(tokio::spawn(
send_register_validator_with_timeout(
registrations.clone(),
relay.clone(),
n_regs,
body.clone(),
relay,
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
state.pbs_config().register_validator_retry_limit,
Expand Down Expand Up @@ -82,7 +84,8 @@ pub async fn register_validator<S: BuilderApiState>(
/// Register validator to relay, retry connection errors until the
/// given timeout has passed
async fn send_register_validator_with_timeout(
registrations: Vec<ValidatorRegistration>,
n_regs: usize,
body: Bytes,
relay: RelayClient,
headers: HeaderMap,
timeout_ms: u64,
Expand All @@ -97,7 +100,8 @@ async fn send_register_validator_with_timeout(
let start_request = Instant::now();
match send_register_validator(
url.clone(),
&registrations,
n_regs,
body.clone(),
&relay,
headers.clone(),
remaining_timeout_ms,
Expand Down Expand Up @@ -134,7 +138,8 @@ async fn send_register_validator_with_timeout(

async fn send_register_validator(
url: Url,
registrations: &[ValidatorRegistration],
n_regs: usize,
body: Bytes,
relay: &RelayClient,
headers: HeaderMap,
timeout_ms: u64,
Expand All @@ -146,7 +151,7 @@ async fn send_register_validator(
.post(url)
.timeout(Duration::from_millis(timeout_ms))
.headers(headers)
.json(&registrations)
.body(body.0)
.send()
.await
{
Expand Down Expand Up @@ -189,7 +194,7 @@ async fn send_register_validator(
retry,
?code,
latency = ?request_latency,
num_registrations = registrations.len(),
num_registrations = n_regs,
"registration successful"
);

Expand Down
3 changes: 1 addition & 2 deletions crates/pbs/src/routes/register_validator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse};
use cb_common::utils::get_user_agent;
use reqwest::StatusCode;
Expand All @@ -15,7 +14,7 @@ use crate::{
pub async fn handle_register_validator<S: BuilderApiState, A: BuilderApi<S>>(
State(state): State<PbsStateGuard<S>>,
req_headers: HeaderMap,
Json(registrations): Json<Vec<ValidatorRegistration>>,
Json(registrations): Json<Vec<serde_json::Value>>,
) -> Result<impl IntoResponse, PbsClientError> {
let state = state.read().clone();

Expand Down
1 change: 1 addition & 0 deletions tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig {
rpc_url: None,
http_timeout_seconds: 10,
register_validator_retry_limit: u32::MAX,
validator_registration_batch_size: None,
}
}

Expand Down
140 changes: 0 additions & 140 deletions tests/tests/pbs_post_validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,146 +61,6 @@ async fn test_register_validators() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_register_validators_returns_422_if_request_is_malformed() -> Result<()> {
setup_test_env();
let signer = random_secret();
let pubkey: BlsPublicKey = signer.public_key();

let chain = Chain::Holesky;
let pbs_port = 4100;

// Run a mock relay
let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?];
let mock_state = Arc::new(MockRelayState::new(chain, signer));
tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1));

// Run the PBS service
let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays);
let state = PbsState::new(config);
tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));

// leave some time to start servers
tokio::time::sleep(Duration::from_millis(100)).await;

let mock_validator = MockValidator::new(pbs_port)?;
let url = mock_validator.comm_boost.register_validator_url().unwrap();
info!("Sending register validator");

// Bad fee recipient
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaa",
"gas_limit": "100000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

// Bad pubkey
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"pubkey": "0xbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

// Bad signature
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

// gas limit too high
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "10000000000000000000000000000000000000000000000000000000",
"timestamp": "1000000",
"pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
},
"signature": "0xcccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

// timestamp too high
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "1000000",
"timestamp": "10000000000000000000000000000000000000000000000000000000",
"pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
},
"signature": "0xcccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

assert_eq!(mock_state.received_register_validator(), 0);
Ok(())
}

#[tokio::test]
async fn test_register_validators_does_not_retry_on_429() -> Result<()> {
setup_test_env();
Expand Down
Loading