From 676ddb174ab6fe02d720eea9e2007b58993fb456 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Fri, 31 Oct 2025 19:00:41 -0400 Subject: [PATCH 1/2] publish synced flashblocks to ws --- .../src/builders/flashblocks/ctx.rs | 4 +++ .../src/builders/flashblocks/payload.rs | 9 +++--- .../builders/flashblocks/payload_handler.rs | 28 +++++++++++++++++-- .../src/builders/flashblocks/service.rs | 11 ++++++-- 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/ctx.rs b/crates/op-rbuilder/src/builders/flashblocks/ctx.rs index c465445f5..b5a1bd477 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/ctx.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/ctx.rs @@ -56,6 +56,10 @@ impl OpPayloadSyncerCtx { self.max_gas_per_txn } + pub(super) fn metrics(&self) -> &Arc { + &self.metrics + } + pub(super) fn into_op_payload_builder_ctx( self, payload_config: PayloadConfig>, diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 89f192100..01b8bc152 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -151,6 +151,7 @@ pub(super) struct OpPayloadBuilder { impl OpPayloadBuilder { /// `OpPayloadBuilder` constructor. + #[allow(clippy::too_many_arguments)] pub(super) fn new( evm_config: OpEvmConfig, pool: Pool, @@ -158,11 +159,11 @@ impl OpPayloadBuilder { config: BuilderConfig, builder_tx: BuilderTx, payload_tx: mpsc::Sender, + ws_pub: Arc, metrics: Arc, - ) -> eyre::Result { - let ws_pub = WebSocketPublisher::new(config.specific.ws_addr, Arc::clone(&metrics))?.into(); + ) -> Self { let address_gas_limiter = AddressGasLimiter::new(config.gas_limiter_config.clone()); - Ok(Self { + Self { evm_config, pool, client, @@ -172,7 +173,7 @@ impl OpPayloadBuilder { metrics, builder_tx, address_gas_limiter, - }) + } } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 4927a0479..f80075472 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -1,6 +1,7 @@ use crate::{ builders::flashblocks::{ ctx::OpPayloadSyncerCtx, p2p::Message, payload::FlashblocksExecutionInfo, + wspub::WebSocketPublisher, }, primitives::reth::ExecutionInfo, traits::ClientBounds, @@ -37,6 +38,9 @@ pub(crate) struct PayloadHandler { p2p_tx: mpsc::Sender, // sends a `Events::BuiltPayload` to the reth payload builder when a new payload is received. payload_events_handle: tokio::sync::broadcast::Sender>, + /// WebSocket publisher for broadcasting flashblocks + /// to all connected subscribers. + ws_pub: Arc, // context required for execution of blocks during syncing ctx: OpPayloadSyncerCtx, // chain client @@ -55,6 +59,7 @@ where p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, ctx: OpPayloadSyncerCtx, + ws_pub: Arc, client: Client, cancel: tokio_util::sync::CancellationToken, ) -> Self { @@ -63,6 +68,7 @@ where p2p_rx, p2p_tx, payload_events_handle, + ws_pub, ctx, client, cancel, @@ -76,6 +82,7 @@ where p2p_tx, payload_events_handle, ctx, + ws_pub, client, cancel, } = self; @@ -98,11 +105,13 @@ where let ctx = ctx.clone(); let client = client.clone(); let payload_events_handle = payload_events_handle.clone(); + let ws_pub = ws_pub.clone(); let cancel = cancel.clone(); // execute the flashblock on a thread where blocking is acceptable, // as it's potentially a heavy operation tokio::task::spawn_blocking(move || { + let metrics = ctx.metrics().clone(); let res = execute_flashblock( payload, ctx, @@ -110,9 +119,24 @@ where cancel, ); match res { - Ok((payload, _)) => { + Ok((payload, fb_payload)) => { tracing::info!(hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed received flashblock"); - let _ = payload_events_handle.send(Events::BuiltPayload(payload)); + if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { + warn!(e = ?e, "failed to send BuiltPayload event on synced block"); + } + + match ws_pub + .publish(&fb_payload) { + Ok(flashblock_byte_size) => { + metrics + .flashblock_byte_size_histogram + .record(flashblock_byte_size as f64); + } + Err(e) => { + tracing::warn!(error = ?e, "failed to publish flashblock to websocket subscribers"); + } + } + } Err(e) => { tracing::error!(error = ?e, "failed to execute received flashblock"); diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index e11fa2f2f..a2d61e939 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -8,6 +8,7 @@ use crate::{ p2p::{AGENT_VERSION, FLASHBLOCKS_STREAM_PROTOCOL, Message}, payload::{FlashblocksExecutionInfo, FlashblocksExtraCtx}, payload_handler::PayloadHandler, + wspub::WebSocketPublisher, }, generator::BlockPayloadJobGenerator, }, @@ -106,6 +107,11 @@ impl FlashblocksServiceBuilder { let metrics = Arc::new(OpRBuilderMetrics::default()); let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16); + + let ws_pub: Arc = + WebSocketPublisher::new(self.0.specific.ws_addr, metrics.clone()) + .wrap_err("failed to create ws publisher")? + .into(); let payload_builder = OpPayloadBuilder::new( OpEvmConfig::optimism(ctx.chain_spec()), pool, @@ -113,9 +119,9 @@ impl FlashblocksServiceBuilder { self.0.clone(), builder_tx, built_payload_tx, + ws_pub.clone(), metrics.clone(), - ) - .wrap_err("failed to create flashblocks payload builder")?; + ); let payload_job_config = BasicPayloadJobGeneratorConfig::default(); let payload_generator = BlockPayloadJobGenerator::with_builder( @@ -144,6 +150,7 @@ impl FlashblocksServiceBuilder { outgoing_message_tx, payload_service.payload_events_handle(), syncer_ctx, + ws_pub, ctx.provider().clone(), cancel, ); From e1da742728492317853890b7bbab15d002e6c81a Mon Sep 17 00:00:00 2001 From: elizabeth Date: Fri, 31 Oct 2025 19:04:14 -0400 Subject: [PATCH 2/2] remove unneeded clone --- crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index f80075472..e39f74fb1 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -121,7 +121,7 @@ where match res { Ok((payload, fb_payload)) => { tracing::info!(hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed received flashblock"); - if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { + if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload)) { warn!(e = ?e, "failed to send BuiltPayload event on synced block"); }