diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index 96649305..72c23708 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -207,7 +207,7 @@ where /// Returns a new event listener for the rollup node manager. pub fn event_listener(&mut self) -> EventStream { if let Some(event_sender) = &self.event_sender { - return event_sender.new_listener() + return event_sender.new_listener(); }; let event_sender = EventSender::new(EVENT_CHANNEL_SIZE); diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index 8fe42b59..4bc79e4a 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -4,7 +4,7 @@ use super::{ BlockImportOutcome, BlockValidation, NetworkHandleMessage, NetworkManagerEvent, NewBlockWithPeer, ScrollNetworkHandle, }; -use alloy_primitives::{FixedBytes, Signature, U128}; +use alloy_primitives::{FixedBytes, Signature, B256, U128}; use futures::{FutureExt, Stream, StreamExt}; use reth_chainspec::EthChainSpec; use reth_eth_wire_types::NewBlock as EthWireNewBlock; @@ -54,7 +54,9 @@ pub struct ScrollNetworkManager { /// The receiver for new blocks received from the network (used to bridge from eth-wire). eth_wire_listener: Option>>, /// The scroll wire protocol manager. - scroll_wire: ScrollWireManager, + pub scroll_wire: ScrollWireManager, + /// The LRU cache used to track already seen (block,signature) pair. + pub blocks_seen: LruCache<(B256, Signature)>, /// The constant value that must be added to the block number to get the total difficulty. td_constant: U128, } @@ -90,6 +92,8 @@ impl // Create the scroll-wire protocol manager. let scroll_wire = ScrollWireManager::new(events); + let blocks_seen = LruCache::new(LRU_CACHE_SIZE); + // Spawn the inner network manager. tokio::spawn(inner_network_manager); @@ -98,6 +102,7 @@ impl handle, from_handle_rx: from_handle_rx.into(), scroll_wire, + blocks_seen, eth_wire_listener, td_constant, } @@ -128,11 +133,14 @@ impl< let handle = ScrollNetworkHandle::new(to_manager_tx, inner_network_handle); + let blocks_seen = LruCache::new(LRU_CACHE_SIZE); + Self { chain_spec, handle, from_handle_rx: from_handle_rx.into(), scroll_wire, + blocks_seen, eth_wire_listener, td_constant, } @@ -177,11 +185,29 @@ impl< } /// Handler for received events from the [`ScrollWireManager`]. - fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> NetworkManagerEvent { + fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> Option { match event { ScrollWireEvent::NewBlock { peer_id, block, signature } => { - trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block.hash_slow(), signature = ?signature, "Received new block"); - NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature }) + let block_hash = block.hash_slow(); + trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block"); + if self.blocks_seen.contains(&(block_hash, signature)) { + None + } else { + // Update the state of the peer cache i.e. peer has seen this block. + self.scroll_wire + .state_mut() + .entry(peer_id) + .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) + .insert(block_hash); + // Update the state of the block cache i.e. we have seen this block. + self.blocks_seen.insert((block.hash_slow(), signature)); + + Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer { + peer_id, + block, + signature, + })) + } } // Only `NewBlock` events are expected from the scroll-wire protocol. _ => { @@ -214,12 +240,6 @@ impl< Ok(BlockValidation::ValidBlock { new_block: msg }) | Ok(BlockValidation::ValidHeader { new_block: msg }) => { trace!(target: "scroll::network::manager", peer_id = ?peer, block = ?msg.block, "Block import successful - announcing block to network"); - let hash = msg.block.hash_slow(); - self.scroll_wire - .state_mut() - .entry(peer) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(hash); self.announce_block(msg); } Err(BlockImportError::Consensus(err)) => { @@ -241,14 +261,6 @@ impl< block: reth_network_api::block::NewBlockWithPeer, ) -> Option { let reth_network_api::block::NewBlockWithPeer { peer_id, mut block } = block; - let block_hash = block.hash_slow(); - self.scroll_wire - .state_mut() - .entry(peer_id) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(block_hash); - - trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, "Received new block from eth-wire protocol"); // We purge the extra data field post euclid v2 to align with protocol specification. let extra_data = if self.chain_spec.is_euclid_v2_active_at_timestamp(block.timestamp) { @@ -268,6 +280,21 @@ impl< .checked_sub(ECDSA_SIGNATURE_LEN) .and_then(|i| Signature::from_raw(&extra_data[i..]).ok()) { + let block_hash = block.hash_slow(); + if self.blocks_seen.contains(&(block_hash, signature)) { + return None; + } + trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, "Received new block from eth-wire protocol"); + + // Update the state of the peer cache i.e. peer has seen this block. + self.scroll_wire + .state_mut() + .entry(peer_id) + .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) + .insert(block_hash); + + // Update the state of the block cache i.e. we have seen this block. + self.blocks_seen.insert((block_hash, signature)); Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature })) } else { tracing::warn!(target: "scroll::bridge::import", peer_id = %peer_id, "Failed to extract signature from block extra data, penalizing peer"); @@ -306,7 +333,9 @@ impl< // Next we handle the scroll-wire events. if let Poll::Ready(event) = this.scroll_wire.poll_unpin(cx) { - return Poll::Ready(Some(this.on_scroll_wire_event(event))); + if let Some(event) = this.on_scroll_wire_event(event) { + return Poll::Ready(Some(event)); + } } // Handle blocks received from the eth-wire protocol.