From 4b0e7c65294b0282f43f774b19e5a5839ed4fc0c Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 26 Nov 2025 15:24:45 -0300 Subject: [PATCH 01/11] add backrun rpc in tips --- .env.example | 2 + crates/ingress-rpc/src/lib.rs | 12 ++++ crates/ingress-rpc/src/service.rs | 107 +++++++++++++++++++++++++++--- justfile | 67 +++++++++++++++++++ 4 files changed, 179 insertions(+), 9 deletions(-) diff --git a/.env.example b/.env.example index ab604f98..34ef62a2 100644 --- a/.env.example +++ b/.env.example @@ -15,6 +15,8 @@ TIPS_INGRESS_BLOCK_TIME_MILLISECONDS=2000 TIPS_INGRESS_METER_BUNDLE_TIMEOUT_MS=2000 TIPS_INGRESS_MAX_BUFFERED_METER_BUNDLE_RESPONSES=100 TIPS_INGRESS_BUILDER_RPCS=http://localhost:2222,http://localhost:2222,http://localhost:2222 +TIPS_INGRESS_BACKRUN_ENABLED=true +TIPS_INGRESS_OP_RBUILDER_URL=http://localhost:2222 # Audit service configuration TIPS_AUDIT_KAFKA_PROPERTIES_FILE=/app/docker/audit-kafka-properties diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 3da7d9be..464631b5 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -133,6 +133,18 @@ pub struct Config { default_value = "100" )] pub max_buffered_meter_bundle_responses: usize, + + /// Enable backrun bundle submission to op-rbuilder + #[arg( + long, + env = "TIPS_INGRESS_BACKRUN_ENABLED", + default_value = "false" + )] + pub backrun_enabled: bool, + + /// Op-rbuilder RPC URL for backrun bundle submission + #[arg(long, env = "TIPS_INGRESS_OP_RBUILDER_URL")] + pub op_rbuilder_url: Option, } pub fn connect_ingress_to_builder( diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index c0862a50..d1162938 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -30,6 +30,9 @@ pub trait IngressApi { #[method(name = "sendBundle")] async fn send_bundle(&self, bundle: Bundle) -> RpcResult; + #[method(name = "sendBackrunBundle")] + async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult; + /// `eth_cancelBundle` is used to prevent a submitted bundle from being included on-chain. #[method(name = "cancelBundle")] async fn cancel_bundle(&self, request: CancelBundle) -> RpcResult<()>; @@ -50,6 +53,8 @@ pub struct IngressService { block_time_milliseconds: u64, meter_bundle_timeout_ms: u64, builder_tx: broadcast::Sender, + backrun_enabled: bool, + op_rbuilder_client: Option>, } impl IngressService { @@ -61,6 +66,13 @@ impl IngressService { builder_tx: broadcast::Sender, config: Config, ) -> Self { + let op_rbuilder_client = config.op_rbuilder_url.as_ref().map(|url| { + alloy_provider::ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(url.clone()) + }); + Self { provider, simulation_provider, @@ -73,25 +85,102 @@ impl IngressService { block_time_milliseconds: config.block_time_milliseconds, meter_bundle_timeout_ms: config.meter_bundle_timeout_ms, builder_tx, + backrun_enabled: config.backrun_enabled, + op_rbuilder_client, } } } -#[async_trait] -impl IngressApiServer for IngressService +impl IngressService where Queue: QueuePublisher + Sync + Send + 'static, { - async fn send_bundle(&self, bundle: Bundle) -> RpcResult { - // validate the bundle and consume the `bundle` to get an `AcceptedBundle` - self.validate_bundle(&bundle).await?; + /// Helper method to validate, parse, and meter a bundle + async fn validate_parse_and_meter_bundle( + &self, + bundle: &Bundle, + ) -> RpcResult<(AcceptedBundle, B256)> { + self.validate_bundle(bundle).await?; let parsed_bundle: ParsedBundle = bundle .clone() .try_into() .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; - let bundle_hash = &parsed_bundle.bundle_hash(); - let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await?; + let bundle_hash = parsed_bundle.bundle_hash(); + let meter_bundle_response = self.meter_bundle(bundle, &bundle_hash).await?; let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone()); + Ok((accepted_bundle, bundle_hash)) + } +} + +#[async_trait] +impl IngressApiServer for IngressService +where + Queue: QueuePublisher + Sync + Send + 'static, +{ + + async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult { + if !self.backrun_enabled { + info!(message = "Backrun bundle submission is disabled", backrun_enabled = self.backrun_enabled); + return Err(EthApiError::InvalidParams( + "Backrun bundle submission is disabled".into(), + ) + .into_rpc_err()); + } + + let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; + + info!(message = "Validated and parsed bundle", bundle_hash = %bundle_hash); + + if let Some(client) = &self.op_rbuilder_client { + match client + .client() + .request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle.clone(),)) + .await + { + Ok(_) => { + info!( + message = "Sent backrun bundle to op-rbuilder", + bundle_hash = %bundle_hash + ); + } + Err(e) => { + // Log and continue (not critical failure) + warn!( + message = "Failed to send backrun bundle to op-rbuilder", + bundle_hash = %bundle_hash, + error = %e + ); + // Don't return error, just log + } + } + } else { + // No op-rbuilder client configured + warn!( + message = "Op-rbuilder client not configured, backrun bundle not submitted", + bundle_hash = %bundle_hash + ); + // Continue execution, just log the warning + } + + let audit_event = BundleEvent::Received { + bundle_id: *accepted_bundle.uuid(), + bundle: Box::new(accepted_bundle.clone()), + }; + if let Err(e) = self.audit_channel.send(audit_event) { + warn!(message = "Failed to send audit event", error = %e); + // Don't fail the request + } + + info!(message = "Sent backrun bundle to audit channel", bundle_hash = %bundle_hash); + + Ok(BundleHash { bundle_hash }) + } + + async fn send_bundle(&self, bundle: Bundle) -> RpcResult { + let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; + + // Get meter_bundle_response for builder broadcast + let meter_bundle_response = accepted_bundle.meter_bundle_response.clone(); // asynchronously send the meter bundle response to the builder self.builder_tx @@ -101,7 +190,7 @@ where // publish the bundle to the queue if let Err(e) = self .bundle_queue - .publish(&accepted_bundle, bundle_hash) + .publish(&accepted_bundle, &bundle_hash) .await { warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); @@ -126,7 +215,7 @@ where } Ok(BundleHash { - bundle_hash: *bundle_hash, + bundle_hash, }) } diff --git a/justfile b/justfile index 2d4398ed..4c30a664 100644 --- a/justfile +++ b/justfile @@ -99,6 +99,9 @@ get-blocks: sender := "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" sender_key := "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" +backrunner := "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" +backrunner_key := "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a" + send-txn: #!/usr/bin/env bash set -euxo pipefail @@ -108,3 +111,67 @@ send-txn: hash=$(curl -s {{ ingress_url }} -X POST -H "Content-Type: application/json" --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" | jq -r ".result") cast receipt $hash -r {{ sequencer_url }} | grep status cast receipt $hash -r {{ builder_url }} | grep status + +send-txn-with-backrun: + #!/usr/bin/env bash + set -euxo pipefail + + # 1. Get nonce and send target transaction from sender account + nonce=$(cast nonce {{ sender }} -r {{ builder_url }}) + echo "Sending target transaction from sender (nonce=$nonce)..." + target_txn=$(cast mktx --private-key {{ sender_key }} \ + 0x0000000000000000000000000000000000000000 \ + --value 0.01ether \ + --nonce $nonce \ + --chain-id 13 \ + -r {{ builder_url }}) + + target_hash=$(curl -s {{ ingress_url }} -X POST \ + -H "Content-Type: application/json" \ + --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$target_txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" \ + | jq -r ".result") + echo "Target tx sent: $target_hash" + + # 2. Build backrun transaction from backrunner account (different account!) + backrun_nonce=$(cast nonce {{ backrunner }} -r {{ builder_url }}) + echo "Building backrun transaction from backrunner (nonce=$backrun_nonce)..." + backrun_txn=$(cast mktx --private-key {{ backrunner_key }} \ + 0x0000000000000000000000000000000000000001 \ + --value 0.001ether \ + --nonce $backrun_nonce \ + --chain-id 13 \ + -r {{ builder_url }}) + + # 3. Compute tx hashes for reverting_tx_hashes + backrun_hash_computed=$(cast keccak $backrun_txn) + echo "Target tx hash: $target_hash" + echo "Backrun tx hash: $backrun_hash_computed" + + # 4. Construct and send bundle with reverting_tx_hashes + echo "Sending backrun bundle..." + bundle_json=$(jq -n \ + --arg target "$target_txn" \ + --arg backrun "$backrun_txn" \ + --arg target_hash "$target_hash" \ + --arg backrun_hash "$backrun_hash_computed" \ + '{ + txs: [$target, $backrun], + blockNumber: 0, + revertingTxHashes: [$target_hash, $backrun_hash] + }') + + bundle_hash=$(curl -s {{ ingress_url }} -X POST \ + -H "Content-Type: application/json" \ + --data "{\"method\":\"eth_sendBackrunBundle\",\"params\":[$bundle_json],\"id\":1,\"jsonrpc\":\"2.0\"}" \ + | jq -r ".result") + echo "Bundle sent: $bundle_hash" + + # 5. Wait and verify both transactions + echo "Waiting for transactions to land..." + sleep 5 + + echo "=== Target transaction (from sender) ===" + cast receipt $target_hash -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" + + echo "=== Backrun transaction (from backrunner) ===" + cast receipt $backrun_hash_computed -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" || echo "Backrun tx not found yet" From 24a6a87a353907597d6da97a8e355896445827b5 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 26 Nov 2025 15:56:14 -0300 Subject: [PATCH 02/11] use multiple builder urls --- crates/ingress-rpc/src/lib.rs | 10 +-- crates/ingress-rpc/src/service.rs | 110 ++++++++++++++++++------------ 2 files changed, 67 insertions(+), 53 deletions(-) diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 464631b5..febd9fc7 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -135,16 +135,8 @@ pub struct Config { pub max_buffered_meter_bundle_responses: usize, /// Enable backrun bundle submission to op-rbuilder - #[arg( - long, - env = "TIPS_INGRESS_BACKRUN_ENABLED", - default_value = "false" - )] + #[arg(long, env = "TIPS_INGRESS_BACKRUN_ENABLED", default_value = "false")] pub backrun_enabled: bool, - - /// Op-rbuilder RPC URL for backrun bundle submission - #[arg(long, env = "TIPS_INGRESS_OP_RBUILDER_URL")] - pub op_rbuilder_url: Option, } pub fn connect_ingress_to_builder( diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 8a80771d..9c252490 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -54,7 +54,7 @@ pub struct IngressService { meter_bundle_timeout_ms: u64, builder_tx: broadcast::Sender, backrun_enabled: bool, - op_rbuilder_client: Option>, + builder_clients: Vec>, } impl IngressService { @@ -66,12 +66,16 @@ impl IngressService { builder_tx: broadcast::Sender, config: Config, ) -> Self { - let op_rbuilder_client = config.op_rbuilder_url.as_ref().map(|url| { - alloy_provider::ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(url.clone()) - }); + let builder_clients = config + .builder_rpcs + .iter() + .map(|url| { + alloy_provider::ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(url.clone()) + }) + .collect(); Self { provider, @@ -86,7 +90,7 @@ impl IngressService { meter_bundle_timeout_ms: config.meter_bundle_timeout_ms, builder_tx, backrun_enabled: config.backrun_enabled, - op_rbuilder_client, + builder_clients, } } } @@ -117,49 +121,69 @@ impl IngressApiServer for IngressService where Queue: QueuePublisher + Sync + Send + 'static, { - async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult { if !self.backrun_enabled { - info!(message = "Backrun bundle submission is disabled", backrun_enabled = self.backrun_enabled); - return Err(EthApiError::InvalidParams( - "Backrun bundle submission is disabled".into(), - ) - .into_rpc_err()); + info!( + message = "Backrun bundle submission is disabled", + backrun_enabled = self.backrun_enabled + ); + return Err( + EthApiError::InvalidParams("Backrun bundle submission is disabled".into()) + .into_rpc_err(), + ); } let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; - info!(message = "Validated and parsed bundle", bundle_hash = %bundle_hash); + info!(message = "Validated and parsed backrun bundle", bundle_hash = %bundle_hash); - if let Some(client) = &self.op_rbuilder_client { - match client - .client() - .request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle.clone(),)) - .await - { - Ok(_) => { - info!( - message = "Sent backrun bundle to op-rbuilder", - bundle_hash = %bundle_hash - ); - } - Err(e) => { - // Log and continue (not critical failure) - warn!( - message = "Failed to send backrun bundle to op-rbuilder", - bundle_hash = %bundle_hash, - error = %e - ); - // Don't return error, just log - } - } - } else { - // No op-rbuilder client configured + // Send to all configured builder RPCs concurrently + if self.builder_clients.is_empty() { warn!( - message = "Op-rbuilder client not configured, backrun bundle not submitted", + message = "No builder RPCs configured, backrun bundle not submitted", bundle_hash = %bundle_hash ); - // Continue execution, just log the warning + } else { + let mut tasks = Vec::new(); + + for (idx, client) in self.builder_clients.iter().enumerate() { + let client = client.clone(); + let bundle = bundle.clone(); + let bundle_hash = bundle_hash; + + let task = tokio::spawn(async move { + let result = client + .client() + .request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle,)) + .await; + (idx, bundle_hash, result) + }); + + tasks.push(task); + } + + // Wait for all tasks to complete + for task in tasks { + if let Ok((idx, bundle_hash, result)) = task.await { + match result { + Ok(_) => { + info!( + message = "Sent backrun bundle to op-rbuilder", + bundle_hash = %bundle_hash, + builder_idx = idx + ); + } + Err(e) => { + warn!( + message = "Failed to send backrun bundle to op-rbuilder", + bundle_hash = %bundle_hash, + builder_idx = idx, + error = %e + ); + } + } + } + } } let audit_event = BundleEvent::Received { @@ -214,9 +238,7 @@ where ); } - Ok(BundleHash { - bundle_hash, - }) + Ok(BundleHash { bundle_hash }) } async fn cancel_bundle(&self, _request: CancelBundle) -> RpcResult<()> { From 43b68cdd0219fdfc80756c3b4a8a167bd849ac5c Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 26 Nov 2025 16:09:42 -0300 Subject: [PATCH 03/11] fix clippy --- crates/ingress-rpc/src/service.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 9c252490..2c0a508d 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -149,7 +149,6 @@ where for (idx, client) in self.builder_clients.iter().enumerate() { let client = client.clone(); let bundle = bundle.clone(); - let bundle_hash = bundle_hash; let task = tokio::spawn(async move { let result = client From 14c7b9ac3f420cf63f188c2b142460cd74fdd7fc Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 26 Nov 2025 16:14:21 -0300 Subject: [PATCH 04/11] refactor --- crates/ingress-rpc/src/service.rs | 146 +++++++++++++++--------------- 1 file changed, 71 insertions(+), 75 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 2c0a508d..c52f80ca 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -114,6 +114,72 @@ where let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone()); Ok((accepted_bundle, bundle_hash)) } + + /// Helper method to send backrun bundle to all configured builders concurrently + async fn send_backrun_to_builders(&self, bundle: &Bundle, bundle_hash: B256) { + if self.builder_clients.is_empty() { + warn!( + message = "No builder RPCs configured, backrun bundle not submitted", + bundle_hash = %bundle_hash + ); + return; + } + + let mut tasks = Vec::new(); + + for (idx, client) in self.builder_clients.iter().enumerate() { + let client = client.clone(); + let bundle = bundle.clone(); + + let task = tokio::spawn(async move { + let result = client + .client() + .request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle,)) + .await; + (idx, bundle_hash, result) + }); + + tasks.push(task); + } + + // Wait for all tasks to complete + for task in tasks { + if let Ok((idx, bundle_hash, result)) = task.await { + match result { + Ok(_) => { + info!( + message = "Sent backrun bundle to op-rbuilder", + bundle_hash = %bundle_hash, + builder_idx = idx + ); + } + Err(e) => { + warn!( + message = "Failed to send backrun bundle to op-rbuilder", + bundle_hash = %bundle_hash, + builder_idx = idx, + error = %e + ); + } + } + } + } + } + + /// Helper method to send audit event for a bundle + fn send_audit_event(&self, accepted_bundle: &AcceptedBundle, bundle_hash: B256) { + let audit_event = BundleEvent::Received { + bundle_id: *accepted_bundle.uuid(), + bundle: Box::new(accepted_bundle.clone()), + }; + if let Err(e) = self.audit_channel.send(audit_event) { + warn!( + message = "Failed to send audit event", + bundle_hash = %bundle_hash, + error = %e + ); + } + } } #[async_trait] @@ -135,66 +201,11 @@ where let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; - info!(message = "Validated and parsed backrun bundle", bundle_hash = %bundle_hash); - // Send to all configured builder RPCs concurrently - if self.builder_clients.is_empty() { - warn!( - message = "No builder RPCs configured, backrun bundle not submitted", - bundle_hash = %bundle_hash - ); - } else { - let mut tasks = Vec::new(); - - for (idx, client) in self.builder_clients.iter().enumerate() { - let client = client.clone(); - let bundle = bundle.clone(); - - let task = tokio::spawn(async move { - let result = client - .client() - .request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle,)) - .await; - (idx, bundle_hash, result) - }); - - tasks.push(task); - } - - // Wait for all tasks to complete - for task in tasks { - if let Ok((idx, bundle_hash, result)) = task.await { - match result { - Ok(_) => { - info!( - message = "Sent backrun bundle to op-rbuilder", - bundle_hash = %bundle_hash, - builder_idx = idx - ); - } - Err(e) => { - warn!( - message = "Failed to send backrun bundle to op-rbuilder", - bundle_hash = %bundle_hash, - builder_idx = idx, - error = %e - ); - } - } - } - } - } - - let audit_event = BundleEvent::Received { - bundle_id: *accepted_bundle.uuid(), - bundle: Box::new(accepted_bundle.clone()), - }; - if let Err(e) = self.audit_channel.send(audit_event) { - warn!(message = "Failed to send audit event", error = %e); - // Don't fail the request - } + self.send_backrun_to_builders(&bundle, bundle_hash).await; - info!(message = "Sent backrun bundle to audit channel", bundle_hash = %bundle_hash); + // Send audit event + self.send_audit_event(&accepted_bundle, bundle_hash); Ok(BundleHash { bundle_hash }) } @@ -226,16 +237,7 @@ where ); // asynchronously send the audit event to the audit channel - let audit_event = BundleEvent::Received { - bundle_id: *accepted_bundle.uuid(), - bundle: Box::new(accepted_bundle.clone()), - }; - if let Err(e) = self.audit_channel.send(audit_event) { - warn!(message = "Failed to send audit event", error = %e); - return Err( - EthApiError::InvalidParams("Failed to send audit event".into()).into_rpc_err(), - ); - } + self.send_audit_event(&accepted_bundle, bundle_hash); Ok(BundleHash { bundle_hash }) } @@ -314,13 +316,7 @@ where } } - let audit_event = BundleEvent::Received { - bundle_id: *accepted_bundle.uuid(), - bundle: accepted_bundle.clone().into(), - }; - if let Err(e) = self.audit_channel.send(audit_event) { - warn!(message = "Failed to send audit event", error = %e); - } + self.send_audit_event(&accepted_bundle, transaction.tx_hash()); self.metrics .send_raw_transaction_duration From 3a380afff4f7a21b37aa2866b2a1a1f435d0e5c0 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 26 Nov 2025 16:14:42 -0300 Subject: [PATCH 05/11] remove env --- .env.example | 1 - 1 file changed, 1 deletion(-) diff --git a/.env.example b/.env.example index 34ef62a2..9eeb2fdc 100644 --- a/.env.example +++ b/.env.example @@ -16,7 +16,6 @@ TIPS_INGRESS_METER_BUNDLE_TIMEOUT_MS=2000 TIPS_INGRESS_MAX_BUFFERED_METER_BUNDLE_RESPONSES=100 TIPS_INGRESS_BUILDER_RPCS=http://localhost:2222,http://localhost:2222,http://localhost:2222 TIPS_INGRESS_BACKRUN_ENABLED=true -TIPS_INGRESS_OP_RBUILDER_URL=http://localhost:2222 # Audit service configuration TIPS_AUDIT_KAFKA_PROPERTIES_FILE=/app/docker/audit-kafka-properties From 7dda49a720fd175b8bffc9a166ff2dbb1e42fcbd Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 26 Nov 2025 16:43:26 -0300 Subject: [PATCH 06/11] use future instead of tokio spawn task --- Cargo.lock | 1 + Cargo.toml | 1 + crates/ingress-rpc/Cargo.toml | 1 + crates/ingress-rpc/src/service.rs | 62 ++++++++++++++----------------- 4 files changed, 30 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67b8d591..812802fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6909,6 +6909,7 @@ dependencies = [ "backon", "clap", "dotenvy", + "futures", "jsonrpsee", "metrics", "metrics-derive", diff --git a/Cargo.toml b/Cargo.toml index 3a4d2082..cefa94d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ testcontainers = { version = "0.23.1", features = ["blocking"] } testcontainers-modules = { version = "0.11.2", features = ["postgres", "kafka", "minio"] } jsonrpsee = { version = "0.26.0", features = ["server", "macros"] } chrono = { version = "0.4.42", features = ["serde"] } +futures = "0.3" # Kafka and S3 dependencies rdkafka = { version = "0.37.0", features = ["libz-static", "ssl-vendored"] } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 5099da88..b95e97a7 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -38,3 +38,4 @@ reth-optimism-evm.workspace = true metrics.workspace = true metrics-derive.workspace = true metrics-exporter-prometheus.workspace = true +futures.workspace = true diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index c52f80ca..b33ba21c 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -23,6 +23,7 @@ use crate::metrics::{Metrics, record_histogram}; use crate::queue::QueuePublisher; use crate::validation::validate_bundle; use crate::{Config, TxSubmissionMethod}; +use futures::future::join_all; #[rpc(server, namespace = "eth")] pub trait IngressApi { @@ -125,43 +126,34 @@ where return; } - let mut tasks = Vec::new(); - - for (idx, client) in self.builder_clients.iter().enumerate() { - let client = client.clone(); - let bundle = bundle.clone(); - - let task = tokio::spawn(async move { - let result = client - .client() - .request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle,)) - .await; - (idx, bundle_hash, result) + let futures = self + .builder_clients + .iter() + .enumerate() + .map(|(idx, client)| { + let bundle = bundle.clone(); + async move { + let result = client + .client() + .request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle,)) + .await; + (idx, result) + } }); - tasks.push(task); - } - - // Wait for all tasks to complete - for task in tasks { - if let Ok((idx, bundle_hash, result)) = task.await { - match result { - Ok(_) => { - info!( - message = "Sent backrun bundle to op-rbuilder", - bundle_hash = %bundle_hash, - builder_idx = idx - ); - } - Err(e) => { - warn!( - message = "Failed to send backrun bundle to op-rbuilder", - bundle_hash = %bundle_hash, - builder_idx = idx, - error = %e - ); - } - } + for (idx, result) in join_all(futures).await { + match result { + Ok(_) => info!( + message = "Sent backrun bundle to op-rbuilder", + bundle_hash = %bundle_hash, + builder_idx = idx, + ), + Err(e) => warn!( + message = "Failed to send backrun bundle to op-rbuilder", + bundle_hash = %bundle_hash, + builder_idx = idx, + error = %e, + ), } } } From f60759640438bab78db847972da10509f403822d Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 26 Nov 2025 16:51:51 -0300 Subject: [PATCH 07/11] add metrics --- crates/ingress-rpc/src/metrics.rs | 8 +++++++- crates/ingress-rpc/src/service.rs | 9 +++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/ingress-rpc/src/metrics.rs b/crates/ingress-rpc/src/metrics.rs index eb2e34cf..b41eb019 100644 --- a/crates/ingress-rpc/src/metrics.rs +++ b/crates/ingress-rpc/src/metrics.rs @@ -1,4 +1,4 @@ -use metrics::Histogram; +use metrics::{Counter, Histogram}; use metrics_derive::Metrics; use metrics_exporter_prometheus::PrometheusBuilder; use std::net::SocketAddr; @@ -29,6 +29,12 @@ pub struct Metrics { #[metric(describe = "Duration of send_raw_transaction")] pub send_raw_transaction_duration: Histogram, + + #[metric(describe = "Total backrun bundles received")] + pub backrun_bundles_received_total: Counter, + + #[metric(describe = "Duration to send backrun bundle to op-rbuilder")] + pub backrun_bundles_sent_duration: Histogram, } /// Initialize Prometheus metrics exporter diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index b33ba21c..3765223e 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -191,14 +191,23 @@ where ); } + let start = Instant::now(); let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; + // Record metric: backrun bundle received + self.metrics.backrun_bundles_received_total.increment(1); + // Send to all configured builder RPCs concurrently self.send_backrun_to_builders(&bundle, bundle_hash).await; // Send audit event self.send_audit_event(&accepted_bundle, bundle_hash); + // Record metric: duration to send backrun bundle + self.metrics + .backrun_bundles_sent_duration + .record(start.elapsed().as_secs_f64()); + Ok(BundleHash { bundle_hash }) } From b06d90e52cfb2c9c506eeb98e63ca59a739b8f8a Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 26 Nov 2025 17:00:25 -0300 Subject: [PATCH 08/11] remove bad comments --- crates/ingress-rpc/src/service.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 3765223e..8b6d3197 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -194,7 +194,6 @@ where let start = Instant::now(); let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; - // Record metric: backrun bundle received self.metrics.backrun_bundles_received_total.increment(1); // Send to all configured builder RPCs concurrently @@ -203,7 +202,6 @@ where // Send audit event self.send_audit_event(&accepted_bundle, bundle_hash); - // Record metric: duration to send backrun bundle self.metrics .backrun_bundles_sent_duration .record(start.elapsed().as_secs_f64()); From 3f4981715313c05a1d2cd7e24486a8187e0a2d5a Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 2 Dec 2025 13:48:48 -0300 Subject: [PATCH 09/11] address comments --- Cargo.lock | 179 ++++++++++++++++++++++++++--- Cargo.toml | 1 - crates/ingress-rpc/Cargo.toml | 1 - crates/ingress-rpc/src/bin/main.rs | 12 +- crates/ingress-rpc/src/lib.rs | 33 ++++-- crates/ingress-rpc/src/service.rs | 136 +++++++--------------- 6 files changed, 235 insertions(+), 127 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 812802fd..12ae8a6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,9 +51,9 @@ dependencies = [ [[package]] name = "alloy-consensus" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90d103d3e440ad6f703dd71a5b58a6abd24834563bde8a5fabe706e00242f810" +checksum = "8b6440213a22df93a87ed512d2f668e7dc1d62a05642d107f82d61edc9e12370" dependencies = [ "alloy-eips", "alloy-primitives", @@ -62,6 +62,7 @@ dependencies = [ "alloy-trie", "alloy-tx-macros", "auto_impl", + "borsh", "c-kzg", "derive_more", "either", @@ -77,9 +78,9 @@ dependencies = [ [[package]] name = "alloy-consensus-any" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48ead76c8c84ab3a50c31c56bc2c748c2d64357ad2131c32f9b10ab790a25e1a" +checksum = "15d0bea09287942405c4f9d2a4f22d1e07611c2dbd9d5bf94b75366340f9e6e0" dependencies = [ "alloy-consensus", "alloy-eips", @@ -130,9 +131,9 @@ dependencies = [ [[package]] name = "alloy-eips" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bdbec74583d0067798d77afa43d58f00d93035335d7ceaa5d3f93857d461bb9" +checksum = "4bd2c7ae05abcab4483ce821f12f285e01c0b33804e6883dd9ca1569a87ee2be" dependencies = [ "alloy-eip2124", "alloy-eip2930", @@ -141,6 +142,7 @@ dependencies = [ "alloy-rlp", "alloy-serde", "auto_impl", + "borsh", "c-kzg", "derive_more", "either", @@ -257,9 +259,9 @@ dependencies = [ [[package]] name = "alloy-network-primitives" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0e7918396eecd69d9c907046ec8a93fb09b89e2f325d5e7ea9c4e3929aa0dd2" +checksum = "7805124ad69e57bbae7731c9c344571700b2a18d351bda9e0eba521c991d1bcb" dependencies = [ "alloy-consensus", "alloy-eips", @@ -410,6 +412,19 @@ dependencies = [ "wasmtimer", ] +[[package]] +name = "alloy-rpc-types" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e279e6d40ee40fe8f76753b678d8d5d260cb276dc6c8a8026099b16d2b43f4" +dependencies = [ + "alloy-primitives", + "alloy-rpc-types-engine", + "alloy-rpc-types-eth", + "alloy-serde", + "serde", +] + [[package]] name = "alloy-rpc-types-admin" version = "1.1.0" @@ -435,9 +450,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-engine" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07da696cc7fbfead4b1dda8afe408685cae80975cbb024f843ba74d9639cd0d3" +checksum = "d9c4c53a8b0905d931e7921774a1830609713bd3e8222347963172b03a3ecc68" dependencies = [ "alloy-consensus", "alloy-eips", @@ -447,15 +462,17 @@ dependencies = [ "derive_more", "ethereum_ssz", "ethereum_ssz_derive", + "jsonwebtoken", + "rand 0.8.5", "serde", "strum", ] [[package]] name = "alloy-rpc-types-eth" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a15e4831b71eea9d20126a411c1c09facf1d01d5cac84fd51d532d3c429cfc26" +checksum = "ed5fafb741c19b3cca4cdd04fa215c89413491f9695a3e928dee2ae5657f607e" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -488,9 +505,9 @@ dependencies = [ [[package]] name = "alloy-serde" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "751d1887f7d202514a82c5b3caf28ee8bd4a2ad9549e4f498b6f0bff99b52add" +checksum = "a6f180c399ca7c1e2fe17ea58343910cad0090878a696ff5a50241aee12fc529" dependencies = [ "alloy-primitives", "serde", @@ -654,9 +671,9 @@ dependencies = [ [[package]] name = "alloy-tx-macros" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd7ce8ed34106acd6e21942022b6a15be6454c2c3ead4d76811d3bdcd63cf771" +checksum = "ae109e33814b49fc0a62f2528993aa8a2dd346c26959b151f05441dc0b9da292" dependencies = [ "darling 0.21.3", "proc-macro2", @@ -1545,6 +1562,58 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59446ce19cd142f8833f856eb31f3eb097812d1479ab224f54d72428ca21ea22" +dependencies = [ + "bytes", + "futures-core", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "az" version = "1.2.1" @@ -3555,6 +3624,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "k256" version = "0.13.4" @@ -3710,6 +3794,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "md-5" version = "0.10.6" @@ -3785,6 +3875,12 @@ dependencies = [ "sketches-ddsketch", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4264,6 +4360,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -6328,6 +6434,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -6478,6 +6595,18 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simple_asn1" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror", + "time", +] + [[package]] name = "siphasher" version = "1.0.1" @@ -6884,6 +7013,7 @@ dependencies = [ "alloy-consensus", "alloy-primitives", "alloy-provider", + "alloy-rpc-types", "alloy-serde", "alloy-signer-local", "op-alloy-consensus", @@ -6906,10 +7036,10 @@ dependencies = [ "alloy-signer-local", "anyhow", "async-trait", + "axum", "backon", "clap", "dotenvy", - "futures", "jsonrpsee", "metrics", "metrics-derive", @@ -7071,6 +7201,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -7109,6 +7240,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -7156,6 +7288,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.2.25" @@ -7175,12 +7317,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 92a23537..fd18eaaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,6 @@ testcontainers = { version = "0.23.1", features = ["blocking"] } testcontainers-modules = { version = "0.11.2", features = ["postgres", "kafka", "minio"] } jsonrpsee = { version = "0.26.0", features = ["server", "macros"] } chrono = { version = "0.4.42", features = ["serde"] } -futures = "0.3" axum = "0.8.3" # Kafka and S3 dependencies diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index a2a4787b..6e7f4275 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -38,5 +38,4 @@ reth-optimism-evm.workspace = true metrics.workspace = true metrics-derive.workspace = true metrics-exporter-prometheus.workspace = true -futures.workspace = true axum.workspace = true diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 4d0f2b2a..b0204cae 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -5,11 +5,11 @@ use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use tips_audit::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher}; -use tips_core::MeterBundleResponse; use tips_core::kafka::load_kafka_config_from_file; use tips_core::logger::init_logger_with_format; +use tips_core::{Bundle, MeterBundleResponse}; use tips_ingress_rpc::Config; -use tips_ingress_rpc::connect_ingress_to_builder; +use tips_ingress_rpc::connect_to_builder; use tips_ingress_rpc::health::bind_health_server; use tips_ingress_rpc::metrics::init_prometheus_exporter; use tips_ingress_rpc::queue::KafkaQueuePublisher; @@ -68,9 +68,12 @@ async fn main() -> anyhow::Result<()> { let (builder_tx, _) = broadcast::channel::(config.max_buffered_meter_bundle_responses); + let (builder_backrun_tx, _) = + broadcast::channel::(config.max_buffered_meter_bundle_responses); config.builder_rpcs.iter().for_each(|builder_rpc| { - let builder_rx = builder_tx.subscribe(); - connect_ingress_to_builder(builder_rx, builder_rpc.clone()); + let metering_rx = builder_tx.subscribe(); + let backrun_rx = builder_backrun_tx.subscribe(); + connect_to_builder(metering_rx, backrun_rx, builder_rpc.clone()); }); let health_check_addr = config.health_check_addr; @@ -86,6 +89,7 @@ async fn main() -> anyhow::Result<()> { queue, audit_tx, builder_tx, + builder_backrun_tx, cfg, ); let bind_addr = format!("{}:{}", config.address, config.port); diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index b01f8837..1d882ca8 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -151,21 +151,23 @@ pub struct Config { pub backrun_enabled: bool, } -pub fn connect_ingress_to_builder( - event_rx: broadcast::Receiver, +pub fn connect_to_builder( + metering_rx: broadcast::Receiver, + backrun_rx: broadcast::Receiver, builder_rpc: Url, ) { - tokio::spawn(async move { - let builder: RootProvider = ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(builder_rpc); + let builder: RootProvider = ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(builder_rpc); - let mut event_rx = event_rx; + let metering_builder = builder.clone(); + tokio::spawn(async move { + let mut event_rx = metering_rx; while let Ok(event) = event_rx.recv().await { // we only support one transaction per bundle for now let tx_hash = event.results[0].tx_hash; - if let Err(e) = builder + if let Err(e) = metering_builder .client() .request::<(TxHash, MeterBundleResponse), ()>( "base_setMeteringInformation", @@ -177,4 +179,17 @@ pub fn connect_ingress_to_builder( } } }); + + tokio::spawn(async move { + let mut event_rx = backrun_rx; + while let Ok(bundle) = event_rx.recv().await { + if let Err(e) = builder + .client() + .request::<(tips_core::Bundle,), ()>("base_sendBackrunBundle", (bundle,)) + .await + { + error!(error = %e, "Failed to send backrun bundle to builder"); + } + } + }); } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 351b51ba..c26d971b 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -24,7 +24,6 @@ use crate::metrics::{Metrics, record_histogram}; use crate::queue::QueuePublisher; use crate::validation::validate_bundle; use crate::{Config, TxSubmissionMethod}; -use futures::future::join_all; #[rpc(server, namespace = "eth")] pub trait IngressApi { @@ -63,7 +62,7 @@ pub struct IngressService { meter_bundle_timeout_ms: u64, builder_tx: broadcast::Sender, backrun_enabled: bool, - builder_clients: Vec>, + builder_backrun_tx: broadcast::Sender, } impl IngressService { @@ -73,19 +72,9 @@ impl IngressService { queue: Queue, audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, + builder_backrun_tx: broadcast::Sender, config: Config, ) -> Self { - let builder_clients = config - .builder_rpcs - .iter() - .map(|url| { - alloy_provider::ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(url.clone()) - }) - .collect(); - Self { provider, simulation_provider, @@ -99,85 +88,7 @@ impl IngressService { meter_bundle_timeout_ms: config.meter_bundle_timeout_ms, builder_tx, backrun_enabled: config.backrun_enabled, - builder_clients, - } - } -} - -impl IngressService -where - Queue: QueuePublisher + Sync + Send + 'static, -{ - /// Helper method to validate, parse, and meter a bundle - async fn validate_parse_and_meter_bundle( - &self, - bundle: &Bundle, - ) -> RpcResult<(AcceptedBundle, B256)> { - self.validate_bundle(bundle).await?; - let parsed_bundle: ParsedBundle = bundle - .clone() - .try_into() - .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; - let bundle_hash = parsed_bundle.bundle_hash(); - let meter_bundle_response = self.meter_bundle(bundle, &bundle_hash).await?; - let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone()); - Ok((accepted_bundle, bundle_hash)) - } - - /// Helper method to send backrun bundle to all configured builders concurrently - async fn send_backrun_to_builders(&self, bundle: &Bundle, bundle_hash: B256) { - if self.builder_clients.is_empty() { - warn!( - message = "No builder RPCs configured, backrun bundle not submitted", - bundle_hash = %bundle_hash - ); - return; - } - - let futures = self - .builder_clients - .iter() - .enumerate() - .map(|(idx, client)| { - let bundle = bundle.clone(); - async move { - let result = client - .client() - .request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle,)) - .await; - (idx, result) - } - }); - - for (idx, result) in join_all(futures).await { - match result { - Ok(_) => info!( - message = "Sent backrun bundle to op-rbuilder", - bundle_hash = %bundle_hash, - builder_idx = idx, - ), - Err(e) => warn!( - message = "Failed to send backrun bundle to op-rbuilder", - bundle_hash = %bundle_hash, - builder_idx = idx, - error = %e, - ), - } - } - } - - /// Helper method to send audit event for a bundle - fn send_audit_event(&self, accepted_bundle: &AcceptedBundle, bundle_hash: B256) { - let audit_event = BundleEvent::Received { - bundle_id: *accepted_bundle.uuid(), - bundle: Box::new(accepted_bundle.clone()), - }; - if let Err(e) = self.audit_channel.send(audit_event) { - warn!( - message = "Failed to send audit event", - bundle_hash = %bundle_hash, - error = %e - ); + builder_backrun_tx, } } } @@ -204,10 +115,14 @@ where self.metrics.backrun_bundles_received_total.increment(1); - // Send to all configured builder RPCs concurrently - self.send_backrun_to_builders(&bundle, bundle_hash).await; + if let Err(e) = self.builder_backrun_tx.send(bundle) { + warn!( + message = "Failed to send backrun bundle to builders", + bundle_hash = %bundle_hash, + error = %e + ); + } - // Send audit event self.send_audit_event(&accepted_bundle, bundle_hash); self.metrics @@ -432,6 +347,37 @@ where } Ok(res) } + + /// Helper method to validate, parse, and meter a bundle + async fn validate_parse_and_meter_bundle( + &self, + bundle: &Bundle, + ) -> RpcResult<(AcceptedBundle, B256)> { + self.validate_bundle(bundle).await?; + let parsed_bundle: ParsedBundle = bundle + .clone() + .try_into() + .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; + let bundle_hash = parsed_bundle.bundle_hash(); + let meter_bundle_response = self.meter_bundle(bundle, &bundle_hash).await?; + let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone()); + Ok((accepted_bundle, bundle_hash)) + } + + /// Helper method to send audit event for a bundle + fn send_audit_event(&self, accepted_bundle: &AcceptedBundle, bundle_hash: B256) { + let audit_event = BundleEvent::Received { + bundle_id: *accepted_bundle.uuid(), + bundle: Box::new(accepted_bundle.clone()), + }; + if let Err(e) = self.audit_channel.send(audit_event) { + warn!( + message = "Failed to send audit event", + bundle_hash = %bundle_hash, + error = %e + ); + } + } } #[cfg(test)] From 73fc770fa9648dac8900fc26c3e9ad7d231885b2 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 2 Dec 2025 13:50:44 -0300 Subject: [PATCH 10/11] rename back --- crates/ingress-rpc/src/bin/main.rs | 4 ++-- crates/ingress-rpc/src/lib.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index b0204cae..156dc257 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -9,7 +9,7 @@ use tips_core::kafka::load_kafka_config_from_file; use tips_core::logger::init_logger_with_format; use tips_core::{Bundle, MeterBundleResponse}; use tips_ingress_rpc::Config; -use tips_ingress_rpc::connect_to_builder; +use tips_ingress_rpc::connect_ingress_to_builder; use tips_ingress_rpc::health::bind_health_server; use tips_ingress_rpc::metrics::init_prometheus_exporter; use tips_ingress_rpc::queue::KafkaQueuePublisher; @@ -73,7 +73,7 @@ async fn main() -> anyhow::Result<()> { config.builder_rpcs.iter().for_each(|builder_rpc| { let metering_rx = builder_tx.subscribe(); let backrun_rx = builder_backrun_tx.subscribe(); - connect_to_builder(metering_rx, backrun_rx, builder_rpc.clone()); + connect_ingress_to_builder(metering_rx, backrun_rx, builder_rpc.clone()); }); let health_check_addr = config.health_check_addr; diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 1d882ca8..9c7fd4f8 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -151,7 +151,7 @@ pub struct Config { pub backrun_enabled: bool, } -pub fn connect_to_builder( +pub fn connect_ingress_to_builder( metering_rx: broadcast::Receiver, backrun_rx: broadcast::Receiver, builder_rpc: Url, From 0c1fd9afdde0491924d2a970a3da77f3245b122d Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 2 Dec 2025 13:53:30 -0300 Subject: [PATCH 11/11] add buffer config for backrun --- crates/ingress-rpc/src/bin/main.rs | 3 +-- crates/ingress-rpc/src/lib.rs | 8 ++++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 156dc257..d5fd5430 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -68,8 +68,7 @@ async fn main() -> anyhow::Result<()> { let (builder_tx, _) = broadcast::channel::(config.max_buffered_meter_bundle_responses); - let (builder_backrun_tx, _) = - broadcast::channel::(config.max_buffered_meter_bundle_responses); + let (builder_backrun_tx, _) = broadcast::channel::(config.max_buffered_backrun_bundles); config.builder_rpcs.iter().for_each(|builder_rpc| { let metering_rx = builder_tx.subscribe(); let backrun_rx = builder_backrun_tx.subscribe(); diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 9c7fd4f8..8115f28b 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -138,6 +138,14 @@ pub struct Config { )] pub max_buffered_meter_bundle_responses: usize, + /// Maximum number of backrun bundles to buffer in memory + #[arg( + long, + env = "TIPS_INGRESS_MAX_BUFFERED_BACKRUN_BUNDLES", + default_value = "100" + )] + pub max_buffered_backrun_bundles: usize, + /// Address to bind the health check server to #[arg( long,