Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/op-rbuilder/src/builders/flashblocks/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ impl OpPayloadSyncerCtx {
self.max_gas_per_txn
}

pub(super) fn metrics(&self) -> &Arc<OpRBuilderMetrics> {
&self.metrics
}

pub(super) fn into_op_payload_builder_ctx(
self,
payload_config: PayloadConfig<OpPayloadBuilderAttributes<OpTransactionSigned>>,
Expand Down
9 changes: 5 additions & 4 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,19 @@ pub(super) struct OpPayloadBuilder<Pool, Client, BuilderTx> {

impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
/// `OpPayloadBuilder` constructor.
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
evm_config: OpEvmConfig,
pool: Pool,
client: Client,
config: BuilderConfig<FlashblocksConfig>,
builder_tx: BuilderTx,
payload_tx: mpsc::Sender<OpBuiltPayload>,
ws_pub: Arc<WebSocketPublisher>,
metrics: Arc<OpRBuilderMetrics>,
) -> eyre::Result<Self> {
let ws_pub = WebSocketPublisher::new(config.specific.ws_addr, Arc::clone(&metrics))?.into();
) -> Self {
let address_gas_limiter = AddressGasLimiter::new(config.gas_limiter_config.clone());
Ok(Self {
Self {
evm_config,
pool,
client,
Expand All @@ -172,7 +173,7 @@ impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
metrics,
builder_tx,
address_gas_limiter,
})
}
}
}

Expand Down
28 changes: 26 additions & 2 deletions crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
builders::flashblocks::{
ctx::OpPayloadSyncerCtx, p2p::Message, payload::FlashblocksExecutionInfo,
wspub::WebSocketPublisher,
},
primitives::reth::ExecutionInfo,
traits::ClientBounds,
Expand Down Expand Up @@ -37,6 +38,9 @@ pub(crate) struct PayloadHandler<Client> {
p2p_tx: mpsc::Sender<Message>,
// sends a `Events::BuiltPayload` to the reth payload builder when a new payload is received.
payload_events_handle: tokio::sync::broadcast::Sender<Events<OpEngineTypes>>,
/// WebSocket publisher for broadcasting flashblocks
/// to all connected subscribers.
ws_pub: Arc<WebSocketPublisher>,
// context required for execution of blocks during syncing
ctx: OpPayloadSyncerCtx,
// chain client
Expand All @@ -55,6 +59,7 @@ where
p2p_tx: mpsc::Sender<Message>,
payload_events_handle: tokio::sync::broadcast::Sender<Events<OpEngineTypes>>,
ctx: OpPayloadSyncerCtx,
ws_pub: Arc<WebSocketPublisher>,
client: Client,
cancel: tokio_util::sync::CancellationToken,
) -> Self {
Expand All @@ -63,6 +68,7 @@ where
p2p_rx,
p2p_tx,
payload_events_handle,
ws_pub,
ctx,
client,
cancel,
Expand All @@ -76,6 +82,7 @@ where
p2p_tx,
payload_events_handle,
ctx,
ws_pub,
client,
cancel,
} = self;
Expand All @@ -98,21 +105,38 @@ where
let ctx = ctx.clone();
let client = client.clone();
let payload_events_handle = payload_events_handle.clone();
let ws_pub = ws_pub.clone();
let cancel = cancel.clone();

// execute the flashblock on a thread where blocking is acceptable,
// as it's potentially a heavy operation
tokio::task::spawn_blocking(move || {
let metrics = ctx.metrics().clone();
let res = execute_flashblock(
payload,
ctx,
client,
cancel,
);
match res {
Ok((payload, _)) => {
Ok((payload, fb_payload)) => {
tracing::info!(hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed received flashblock");
let _ = payload_events_handle.send(Events::BuiltPayload(payload));
if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload)) {
warn!(e = ?e, "failed to send BuiltPayload event on synced block");
}

match ws_pub
.publish(&fb_payload) {
Ok(flashblock_byte_size) => {
metrics
.flashblock_byte_size_histogram
.record(flashblock_byte_size as f64);
}
Err(e) => {
tracing::warn!(error = ?e, "failed to publish flashblock to websocket subscribers");
}
}

}
Err(e) => {
tracing::error!(error = ?e, "failed to execute received flashblock");
Expand Down
11 changes: 9 additions & 2 deletions crates/op-rbuilder/src/builders/flashblocks/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
p2p::{AGENT_VERSION, FLASHBLOCKS_STREAM_PROTOCOL, Message},
payload::{FlashblocksExecutionInfo, FlashblocksExtraCtx},
payload_handler::PayloadHandler,
wspub::WebSocketPublisher,
},
generator::BlockPayloadJobGenerator,
},
Expand Down Expand Up @@ -106,16 +107,21 @@ impl FlashblocksServiceBuilder {

let metrics = Arc::new(OpRBuilderMetrics::default());
let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16);

let ws_pub: Arc<WebSocketPublisher> =
WebSocketPublisher::new(self.0.specific.ws_addr, metrics.clone())
.wrap_err("failed to create ws publisher")?
.into();
let payload_builder = OpPayloadBuilder::new(
OpEvmConfig::optimism(ctx.chain_spec()),
pool,
ctx.provider().clone(),
self.0.clone(),
builder_tx,
built_payload_tx,
ws_pub.clone(),
metrics.clone(),
)
.wrap_err("failed to create flashblocks payload builder")?;
);
let payload_job_config = BasicPayloadJobGeneratorConfig::default();

let payload_generator = BlockPayloadJobGenerator::with_builder(
Expand Down Expand Up @@ -144,6 +150,7 @@ impl FlashblocksServiceBuilder {
outgoing_message_tx,
payload_service.payload_events_handle(),
syncer_ctx,
ws_pub,
ctx.provider().clone(),
cancel,
);
Expand Down
Loading