diff --git a/Makefile b/Makefile index 83f43ae5..c4d334ac 100644 --- a/Makefile +++ b/Makefile @@ -78,7 +78,6 @@ test-l1-sync: --locked \ --all-features \ --no-fail-fast \ - --test-threads=1 \ --failure-output immediate \ -E 'binary(l1_sync)' diff --git a/crates/chain-orchestrator/src/error.rs b/crates/chain-orchestrator/src/error.rs index 043a9f33..504daaba 100644 --- a/crates/chain-orchestrator/src/error.rs +++ b/crates/chain-orchestrator/src/error.rs @@ -92,10 +92,6 @@ pub enum ChainOrchestratorError { /// An error occurred while handling rollup node primitives. #[error("An error occurred while handling rollup node primitives: {0}")] RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError), - /// Shutdown requested - this is not a real error but used to signal graceful shutdown. - #[cfg(feature = "test-utils")] - #[error("Shutdown requested")] - Shutdown, } impl CanRetry for ChainOrchestratorError { diff --git a/crates/chain-orchestrator/src/event.rs b/crates/chain-orchestrator/src/event.rs index e6b4bc27..0ad8d845 100644 --- a/crates/chain-orchestrator/src/event.rs +++ b/crates/chain-orchestrator/src/event.rs @@ -119,4 +119,6 @@ pub enum ChainOrchestratorEvent { }, /// The head of the fork choice state has been updated in the engine driver. FcsHeadUpdated(BlockInfo), + /// The chain orchestrator is shutting down. + Shutdown, } diff --git a/crates/chain-orchestrator/src/handle/command.rs b/crates/chain-orchestrator/src/handle/command.rs index fbf12990..ce2d8cb9 100644 --- a/crates/chain-orchestrator/src/handle/command.rs +++ b/crates/chain-orchestrator/src/handle/command.rs @@ -35,9 +35,6 @@ pub enum ChainOrchestratorCommand>), - /// Request the `ChainOrchestrator` to shutdown immediately. - #[cfg(feature = "test-utils")] - Shutdown(oneshot::Sender<()>), } /// The database queries that can be sent to the rollup manager. diff --git a/crates/chain-orchestrator/src/handle/mod.rs b/crates/chain-orchestrator/src/handle/mod.rs index 8d9f005a..76737777 100644 --- a/crates/chain-orchestrator/src/handle/mod.rs +++ b/crates/chain-orchestrator/src/handle/mod.rs @@ -149,13 +149,4 @@ impl> ChainOrchestratorHand self.send_command(ChainOrchestratorCommand::DatabaseHandle(tx)); rx.await } - - /// Sends a command to shutdown the `ChainOrchestrator` immediately. - /// This will cause the `ChainOrchestrator`'s event loop to exit gracefully. - #[cfg(feature = "test-utils")] - pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> { - let (tx, rx) = oneshot::channel(); - self.send_command(ChainOrchestratorCommand::Shutdown(tx)); - rx.await - } } diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 43701375..703d1374 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -189,19 +189,12 @@ impl< biased; _guard = &mut shutdown => { + self.notify(ChainOrchestratorEvent::Shutdown); break; } Some(command) = self.handle_rx.recv() => { - match self.handle_command(command).await { - #[cfg(feature = "test-utils")] - Err(ChainOrchestratorError::Shutdown) => { - tracing::info!(target: "scroll::chain_orchestrator", "Shutdown requested, exiting gracefully"); - break; - } - Err(err) => { - tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command"); - } - Ok(_) => {} + if let Err(err) = self.handle_command(command).await { + tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command"); } } Some(event) = async { @@ -442,13 +435,6 @@ impl< ChainOrchestratorCommand::DatabaseHandle(tx) => { let _ = tx.send(self.database.clone()); } - #[cfg(feature = "test-utils")] - ChainOrchestratorCommand::Shutdown(tx) => { - tracing::info!(target: "scroll::chain_orchestrator", "Received shutdown command, exiting event loop"); - let _ = tx.send(()); - // Return an error to signal shutdown - return Err(ChainOrchestratorError::Shutdown); - } } Ok(()) diff --git a/crates/node/src/test_utils/fixture.rs b/crates/node/src/test_utils/fixture.rs index d3ee3772..2b6605fd 100644 --- a/crates/node/src/test_utils/fixture.rs +++ b/crates/node/src/test_utils/fixture.rs @@ -18,12 +18,12 @@ use alloy_rpc_types_anvil::ReorgOptions; use alloy_rpc_types_eth::Block; use alloy_signer_local::PrivateKeySigner; use alloy_transport::layers::RetryBackoffLayer; -use reth_chainspec::EthChainSpec; use reth_e2e_test_utils::{wallet::Wallet, NodeHelperType, TmpDB}; use reth_eth_wire_types::BasicNetworkPrimitives; use reth_fs_util::remove_dir_all; use reth_network::NetworkHandle; use reth_node_builder::NodeTypes; +use reth_node_core::exit::NodeExitFuture; use reth_node_types::NodeTypesWithDBAdapter; use reth_provider::providers::BlockchainProvider; use reth_scroll_chainspec::SCROLL_DEV; @@ -31,14 +31,12 @@ use reth_scroll_primitives::ScrollPrimitives; use reth_tasks::TaskManager; use reth_tokio_util::EventStream; use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, ChainOrchestratorHandle}; -use rollup_node_primitives::BlockInfo; use rollup_node_sequencer::L1MessageInclusionMode; use scroll_alloy_consensus::ScrollPooledTransaction; -use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi}; use scroll_alloy_rpc_types::Transaction; -use scroll_engine::{Engine, ForkchoiceState}; use std::{ fmt::{Debug, Formatter}, + ops::{Deref, DerefMut}, path::PathBuf, sync::Arc, }; @@ -58,8 +56,6 @@ pub struct TestFixture { pub chain_spec: Arc<::ChainSpec>, /// Optional Anvil instance for L1 simulation. pub anvil: Option, - /// The task manager. Held in order to avoid dropping the node. - pub tasks: TaskManager, /// The configuration for the nodes. pub config: ScrollRollupNodeConfig, } @@ -99,6 +95,9 @@ pub type ScrollNetworkHandle = pub type TestBlockChainProvider = BlockchainProvider>; +/// The test node type for Scroll nodes. +pub type ScrollTestNode = NodeHelperType; + /// The node type (sequencer or follower). #[derive(Debug)] pub enum NodeType { @@ -108,12 +107,51 @@ pub enum NodeType { Follower, } +/// Components of a test node. +pub struct ScrollNodeTestComponents { + /// The node helper type for the test node. + pub node: ScrollTestNode, + /// The task manager for the test node. + pub task_manager: TaskManager, + /// The exit future for the test node. + pub exit_future: NodeExitFuture, +} + +impl ScrollNodeTestComponents { + /// Create new test node components. + pub async fn new( + node: ScrollTestNode, + task_manager: TaskManager, + exit_future: NodeExitFuture, + ) -> Self { + Self { node, task_manager, exit_future } + } +} + +impl std::fmt::Debug for ScrollNodeTestComponents { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ScrollNodeTestComponents").finish() + } +} + +impl DerefMut for ScrollNodeTestComponents { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.node + } +} + +impl Deref for ScrollNodeTestComponents { + type Target = ScrollTestNode; + + fn deref(&self) -> &Self::Target { + &self.node + } +} + /// Handle to a single test node with its components. pub struct NodeHandle { /// The underlying node context. - pub node: NodeHelperType, - /// Engine instance for this node. - pub engine: Engine>, + pub node: ScrollNodeTestComponents, /// Chain orchestrator listener. pub chain_orchestrator_rx: EventStream, /// Chain orchestrator handle. @@ -123,6 +161,15 @@ pub struct NodeHandle { } impl NodeHandle { + /// Create a new node handle. + pub async fn new(node: ScrollNodeTestComponents, typ: NodeType) -> eyre::Result { + let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone(); + let chain_orchestrator_rx = + node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; + + Ok(Self { node, chain_orchestrator_rx, rollup_manager_handle, typ }) + } + /// Returns true if this is a handle to the sequencer. pub const fn is_sequencer(&self) -> bool { matches!(self.typ, NodeType::Sequencer) @@ -598,7 +645,6 @@ impl TestFixtureBuilder { // Start Anvil if requested let anvil = if self.anvil_config.enabled { let handle = Self::spawn_anvil( - self.anvil_config.port, self.anvil_config.state_path.as_deref(), self.anvil_config.chain_id, self.anvil_config.block_time, @@ -623,9 +669,7 @@ impl TestFixtureBuilder { None }; - let tasks = TaskManager::current(); - let (nodes, dbs, wallet) = setup_engine( - &tasks, + let (node_components, dbs, wallet) = setup_engine( self.config.clone(), self.num_nodes, chain_spec.clone(), @@ -635,45 +679,26 @@ impl TestFixtureBuilder { ) .await?; - let mut node_handles = Vec::with_capacity(nodes.len()); - for (index, node) in nodes.into_iter().enumerate() { - let genesis_hash = node.inner.chain_spec().genesis_hash(); - - // Create engine for the node - let auth_client = node.inner.engine_http_client(); - let engine_client = Arc::new(ScrollAuthApiEngineClient::new(auth_client)) - as Arc; - let fcs = ForkchoiceState::new( - BlockInfo { hash: genesis_hash, number: 0 }, - Default::default(), - Default::default(), - ); - let engine = Engine::new(Arc::new(engine_client), fcs); - - // Get handles if available - let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone(); - let chain_orchestrator_rx = - node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; - - node_handles.push(Some(NodeHandle { + let mut nodes = Vec::with_capacity(node_components.len()); + for (index, node) in node_components.into_iter().enumerate() { + let handle = NodeHandle::new( node, - engine, - chain_orchestrator_rx, - rollup_manager_handle, - typ: if self.config.sequencer_args.sequencer_enabled && index == 0 { + if self.config.sequencer_args.sequencer_enabled && index == 0 { NodeType::Sequencer } else { NodeType::Follower }, - })); + ) + .await?; + + nodes.push(Some(handle)); } Ok(TestFixture { - nodes: node_handles, + nodes, dbs, wallet: Arc::new(Mutex::new(wallet)), chain_spec, - tasks, anvil, config: self.config, }) @@ -681,13 +706,12 @@ impl TestFixtureBuilder { /// Spawn an Anvil instance with the given configuration. async fn spawn_anvil( - port: u16, state_path: Option<&std::path::Path>, chain_id: Option, block_time: Option, slots_in_an_epoch: u64, ) -> eyre::Result { - let mut config = anvil::NodeConfig { port, ..Default::default() }; + let mut config = anvil::NodeConfig { port: 0, ..Default::default() }; if let Some(id) = chain_id { config.chain_id = Some(id); diff --git a/crates/node/src/test_utils/mod.rs b/crates/node/src/test_utils/mod.rs index 855f78a4..839f3f4b 100644 --- a/crates/node/src/test_utils/mod.rs +++ b/crates/node/src/test_utils/mod.rs @@ -79,22 +79,21 @@ pub use network_helpers::{ // Legacy utilities - keep existing functions for backward compatibility use crate::{ - BlobProviderArgs, ChainOrchestratorArgs, ConsensusArgs, EngineDriverArgs, L1ProviderArgs, - RollupNodeDatabaseArgs, RollupNodeNetworkArgs, RpcArgs, ScrollRollupNode, - ScrollRollupNodeConfig, SequencerArgs, TestArgs, + test_utils::fixture::ScrollNodeTestComponents, BlobProviderArgs, ChainOrchestratorArgs, + ConsensusArgs, EngineDriverArgs, L1ProviderArgs, RollupNodeDatabaseArgs, RollupNodeNetworkArgs, + RpcArgs, ScrollRollupNode, ScrollRollupNodeConfig, SequencerArgs, TestArgs, }; use alloy_primitives::Bytes; use reth_chainspec::EthChainSpec; use reth_db::test_utils::create_test_rw_db_with_path; use reth_e2e_test_utils::{ node::NodeTestContext, transaction::TransactionTestContext, wallet::Wallet, Adapter, - NodeHelperType, TmpDB, TmpNodeAddOnsHandle, TmpNodeEthApi, + TmpNodeAddOnsHandle, TmpNodeEthApi, }; use reth_engine_local::LocalPayloadAttributesBuilder; use reth_node_builder::{ rpc::RpcHandleProvider, EngineNodeLauncher, Node, NodeBuilder, NodeConfig, - NodeHandle as RethNodeHandle, NodeTypes, NodeTypesWithDBAdapter, PayloadAttributesBuilder, - PayloadTypes, TreeConfig, + NodeHandle as RethNodeHandle, NodeTypes, PayloadAttributesBuilder, PayloadTypes, TreeConfig, }; use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs, TxPoolArgs}; use reth_provider::providers::BlockchainProvider; @@ -110,7 +109,6 @@ use tracing::{span, Level}; /// This is the legacy setup function that's used by existing tests. /// For new tests, consider using the `TestFixture` API instead. pub async fn setup_engine( - tasks: &TaskManager, mut scroll_node_config: ScrollRollupNodeConfig, num_nodes: usize, chain_spec: Arc<::ChainSpec>, @@ -118,12 +116,7 @@ pub async fn setup_engine( no_local_transactions_propagation: bool, reboot_info: Option<(usize, Arc>)>, ) -> eyre::Result<( - Vec< - NodeHelperType< - ScrollRollupNode, - BlockchainProvider>, - >, - >, + Vec, Vec>>, Wallet, )> @@ -135,19 +128,15 @@ where TmpNodeAddOnsHandle: RpcHandleProvider, TmpNodeEthApi>, { - let exec = tasks.executor(); - let network_config = NetworkArgs { discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() }, ..NetworkArgs::default() }; // Create nodes and peer them - let mut nodes: Vec> = Vec::with_capacity(num_nodes); + let mut nodes: Vec = Vec::with_capacity(num_nodes); let mut dbs: Vec>> = Vec::new(); - // let (node_index, db_provided) = reboot_info.unwrap_or((0, None)); - for idx in 0..num_nodes { // Determine the actual node index (for reboot use provided index, otherwise use idx) let node_index = reboot_info.as_ref().map(|(node_idx, _)| *node_idx).unwrap_or(idx); @@ -160,13 +149,8 @@ where // Configure node with the test data directory let mut node_config = NodeConfig::new(chain_spec.clone()) .with_network(network_config.clone()) + .with_rpc(RpcServerArgs::default().with_http().with_http_api(RpcModuleSelection::All)) .with_unused_ports() - .with_rpc( - RpcServerArgs::default() - .with_unused_ports() - .with_http() - .with_http_api(RpcModuleSelection::All), - ) .set_dev(is_dev) .with_txpool(TxPoolArgs { no_local_transactions_propagation, ..Default::default() }); @@ -214,12 +198,13 @@ where let span = span!(Level::INFO, "node", node_index); let _enter = span.enter(); + let task_manager = TaskManager::current(); let testing_node = NodeBuilder::new(node_config.clone()) .with_database(db.clone()) - .with_launch_context(exec.clone()); + .with_launch_context(task_manager.executor()); let testing_config = testing_node.config().clone(); let node = ScrollRollupNode::new(scroll_node_config.clone(), testing_config).await; - let RethNodeHandle { node, node_exit_future: _ } = testing_node + let RethNodeHandle { node, node_exit_future } = testing_node .with_types_and_provider::>() .with_components(node.components_builder()) .with_add_ons(node.add_ons()) @@ -241,12 +226,6 @@ where NodeTestContext::new(node, |_| panic!("should not build payloads using this method")) .await?; - // skip the forkchoice update when a database is provided (reboot scenario) - if reboot_info.is_none() { - let genesis = node.block_hash(0); - node.update_forkchoice(genesis, genesis).await?; - } - // Connect each node in a chain. if let Some(previous_node) = nodes.last_mut() { previous_node.connect(&mut node).await; @@ -259,6 +238,8 @@ where } } + let node = ScrollNodeTestComponents::new(node, task_manager, node_exit_future).await; + nodes.push(node); // Note: db is already added to dbs in the creation logic above } diff --git a/crates/node/src/test_utils/reboot.rs b/crates/node/src/test_utils/reboot.rs index fb9df827..e85aa51b 100644 --- a/crates/node/src/test_utils/reboot.rs +++ b/crates/node/src/test_utils/reboot.rs @@ -50,10 +50,11 @@ //! fixture.expect_event().l1_synced().await?; //! ``` -use crate::test_utils::{setup_engine, NodeHandle, TestFixture}; -use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi}; -use scroll_engine::Engine; -use std::sync::Arc; +use std::time::Duration; + +use futures::StreamExt; + +use crate::test_utils::{fixture::ScrollNodeTestComponents, setup_engine, NodeHandle, TestFixture}; impl TestFixture { /// Shutdown a node by index, performing a graceful shutdown of all components. @@ -117,23 +118,37 @@ impl TestFixture { } tracing::info!("Shutting down node at index {}", node_index); - - // Step 1: Explicitly shutdown the ChainOrchestrator - // This sends a Shutdown command that will make the ChainOrchestrator exit its event loop - // immediately - if let Some(node) = &self.nodes[node_index] { - tracing::info!("Sending shutdown command to ChainOrchestrator..."); - if let Err(e) = node.rollup_manager_handle.shutdown().await { - tracing::warn!("Failed to send shutdown command to ChainOrchestrator: {}", e); - } else { - tracing::info!("ChainOrchestrator shutdown command acknowledged"); + let NodeHandle { node, mut chain_orchestrator_rx, rollup_manager_handle: _r_h, typ: _ } = + self.nodes + .get_mut(node_index) + .and_then(|opt| opt.take()) + .expect("Node existence checked above"); + let ScrollNodeTestComponents { node, task_manager, exit_future } = node; + + tokio::task::spawn_blocking(|| { + if !task_manager.graceful_shutdown_with_timeout(Duration::from_secs(10)) { + return Err(eyre::eyre!("Failed to shutdown tasks within timeout")); } - } + eyre::Ok(()) + }) + .await??; + + // wait for the exit future to resolve - i.e. the consensus engine has shut down + let _ = exit_future.await; + + // Wait for shutdown event from ChainOrchestrator + tokio::time::timeout(Duration::from_secs(10), async { + while let Some(event) = chain_orchestrator_rx.next().await { + if matches!(event, rollup_node_chain_orchestrator::ChainOrchestratorEvent::Shutdown) + { + return Ok::<(), eyre::ErrReport>(()); + } + } + Err(eyre::eyre!("Shutdown event not received")) + }) + .await??; - // Step 2: Take ownership and drop the node handle - // This closes all channels (RPC, network, DB) and releases resources - let node_handle = self.nodes[node_index].take(); - drop(node_handle); + drop(node); // Step 3: Wait for async cleanup to complete tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; @@ -197,7 +212,6 @@ impl TestFixture { // Step 2: Create a new node instance with the existing database let (mut new_nodes, _, _) = setup_engine( - &self.tasks, self.config.clone(), 1, self.chain_spec.clone(), @@ -212,51 +226,13 @@ impl TestFixture { } let new_node = new_nodes.remove(0); - - // Step 3: Setup Engine API client - let auth_client = new_node.inner.engine_http_client(); - let engine_client = Arc::new(ScrollAuthApiEngineClient::new(auth_client)) - as Arc; - - // Step 4: Get necessary handles from the new node - let rollup_manager_handle = new_node.inner.add_ons_handle.rollup_manager_handle.clone(); - - // Step 5: Restore ForkchoiceState from persisted ChainOrchestrator status - // CRITICAL: This must NOT be reset to genesis. The execution client's state - // is at a later block, and resetting FCS to genesis would cause a mismatch, - // resulting in "Syncing" errors when building payloads after reboot. - let status = rollup_manager_handle.status().await?; - let fcs: scroll_engine::ForkchoiceState = status.l2.fcs; - - tracing::info!( - "Restored FCS from database - head: {:?}, safe: {:?}, finalized: {:?}", - fcs.head_block_info(), - fcs.safe_block_info(), - fcs.finalized_block_info() - ); - - // Step 6: Initialize Engine with the restored ForkchoiceState - let engine = Engine::new(Arc::new(engine_client), fcs); - let chain_orchestrator_rx = - new_node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; - - // Step 7: Determine node type (sequencer vs follower) - let was_sequencer = self.config.sequencer_args.sequencer_enabled && node_index == 0; - - // Step 8: Create the new NodeHandle with all components - let new_node_handle = NodeHandle { - node: new_node, - engine, - chain_orchestrator_rx, - rollup_manager_handle, - typ: if was_sequencer { - crate::test_utils::fixture::NodeType::Sequencer - } else { - crate::test_utils::fixture::NodeType::Follower - }, + let typ = if self.config.sequencer_args.sequencer_enabled && node_index == 0 { + crate::test_utils::fixture::NodeType::Sequencer + } else { + crate::test_utils::fixture::NodeType::Follower }; - // Step 9: Replace the old (None) node slot with the new node + let new_node_handle = NodeHandle::new(new_node, typ).await?; self.nodes[node_index] = Some(new_node_handle); tracing::info!("Node started successfully at index {}", node_index); diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 0e8d812a..87211017 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -13,7 +13,7 @@ use reth_scroll_chainspec::{ScrollChainSpec, SCROLL_DEV, SCROLL_MAINNET, SCROLL_ use reth_scroll_node::ScrollNetworkPrimitives; use reth_scroll_primitives::ScrollBlock; use reth_storage_api::BlockReader; -use reth_tasks::{shutdown::signal as shutdown_signal, TaskManager}; +use reth_tasks::shutdown::signal as shutdown_signal; use reth_tokio_util::EventStream; use rollup_node::{ constants::SCROLL_GAS_LIMIT, @@ -295,16 +295,15 @@ async fn can_forward_tx_to_sequencer() -> eyre::Result<()> { // Create the chain spec for scroll mainnet with Euclid v2 activated and a test genesis. let chain_spec = (*SCROLL_DEV).clone(); - let tasks = TaskManager::current(); let (mut sequencer_node, _, _) = - setup_engine(&tasks, sequencer_node_config, 1, chain_spec.clone(), false, true, None) + setup_engine(sequencer_node_config, 1, chain_spec.clone(), false, true, None) .await .unwrap(); let sequencer_url = format!("http://localhost:{}", sequencer_node[0].rpc_url().port().unwrap()); follower_node_config.network_args.sequencer_url = Some(sequencer_url); let (mut follower_node, _, wallet) = - setup_engine(&tasks, follower_node_config, 1, chain_spec, false, true, None).await.unwrap(); + setup_engine(follower_node_config, 1, chain_spec, false, true, None).await.unwrap(); let wallet = Arc::new(Mutex::new(wallet)); @@ -466,9 +465,7 @@ async fn can_bridge_blocks() -> eyre::Result<()> { let chain_spec = (*SCROLL_DEV).clone(); // Setup the bridge node and a standard node. - let tasks = TaskManager::current(); let (mut nodes, _, _) = setup_engine( - &tasks, default_test_scroll_rollup_node_config(), 1, chain_spec.clone(), @@ -523,7 +520,7 @@ async fn can_bridge_blocks() -> eyre::Result<()> { let mut network_events = network_handle.event_listener(); // Spawn the standard NetworkManager. - tasks.executor().spawn(network); + bridge_node.task_manager.executor().spawn(network); // Connect the standard NetworkManager to the bridge node. bridge_node.network.add_peer(network_handle.local_node_record()).await; @@ -575,9 +572,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() let chain_spec = (*SCROLL_MAINNET).clone(); // Launch a node - let tasks = TaskManager::current(); let (mut nodes, _, _) = setup_engine( - &tasks, default_test_scroll_rollup_node_config(), 1, chain_spec.clone(), @@ -863,9 +858,8 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - config.signer_args.private_key = Some(PrivateKeySigner::random()); // Launch a node - let tasks = TaskManager::current(); let (mut nodes, _, _) = - setup_engine(&tasks, config.clone(), 1, chain_spec.clone(), false, false, None).await?; + setup_engine(config.clone(), 1, chain_spec.clone(), false, false, None).await?; let node = nodes.pop().unwrap(); // Instantiate the rollup node manager. @@ -920,7 +914,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - } // Wait for the EN to be synced to block 10. - let execution_node_provider = node.inner.provider; + let execution_node_provider = &node.inner.provider; loop { handle.build_block(); let block_number = loop { diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index 7d96e0f0..9c9d3085 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -6,7 +6,6 @@ use reqwest::Url; use reth_provider::{BlockIdReader, BlockReader}; use reth_rpc_eth_api::helpers::EthTransactions; use reth_scroll_chainspec::{SCROLL_DEV, SCROLL_SEPOLIA}; -use reth_tasks::TaskManager; use reth_tokio_util::EventStream; use rollup_node::{ test_utils::{ @@ -80,9 +79,8 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> { }; let chain_spec = (*SCROLL_SEPOLIA).clone(); - let tasks = TaskManager::current(); let (mut nodes, _, _) = - setup_engine(&tasks, node_config, 1, chain_spec.clone(), false, false, None).await?; + setup_engine(node_config, 1, chain_spec.clone(), false, false, None).await?; let node = nodes.pop().unwrap(); // We perform consolidation up to block 15k. This allows us to capture a batch revert event at @@ -552,18 +550,10 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { let chain_spec = (*SCROLL_DEV).clone(); // Create a sequencer node and an unsynced node. - let tasks = TaskManager::current(); - let (mut nodes, _, _) = setup_engine( - &tasks, - sequencer_node_config.clone(), - 1, - chain_spec.clone(), - false, - false, - None, - ) - .await - .unwrap(); + let (mut nodes, _, _) = + setup_engine(sequencer_node_config.clone(), 1, chain_spec.clone(), false, false, None) + .await + .unwrap(); let mut sequencer = nodes.pop().unwrap(); let sequencer_handle = sequencer.inner.rollup_manager_handle.clone(); let mut sequencer_events = sequencer_handle.get_event_listener().await?; @@ -571,9 +561,7 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { sequencer.inner.add_ons_handle.rollup_manager_handle.l1_watcher_mock.clone().unwrap(); let (mut nodes, _, _) = - setup_engine(&tasks, node_config.clone(), 1, chain_spec.clone(), false, false, None) - .await - .unwrap(); + setup_engine(node_config.clone(), 1, chain_spec.clone(), false, false, None).await.unwrap(); let mut follower = nodes.pop().unwrap(); let mut follower_events = follower.inner.rollup_manager_handle.get_event_listener().await?; let follower_l1_watcher_tx = diff --git a/crates/sequencer/tests/e2e.rs b/crates/sequencer/tests/e2e.rs index 38bd4b80..079e3fd1 100644 --- a/crates/sequencer/tests/e2e.rs +++ b/crates/sequencer/tests/e2e.rs @@ -6,7 +6,6 @@ use futures::stream::StreamExt; use reth_e2e_test_utils::transaction::TransactionTestContext; use reth_scroll_chainspec::SCROLL_DEV; use reth_scroll_node::test_utils::setup; -use reth_tasks::TaskManager; use rollup_node::{ constants::SCROLL_GAS_LIMIT, test_utils::{default_test_scroll_rollup_node_config, setup_engine}, @@ -212,18 +211,10 @@ async fn can_build_blocks_with_delayed_l1_messages() { const L1_MESSAGE_DELAY: u64 = 2; // setup a test node - let tasks = TaskManager::current(); - let (mut nodes, _, wallet) = setup_engine( - &tasks, - default_test_scroll_rollup_node_config(), - 1, - chain_spec, - false, - false, - None, - ) - .await - .unwrap(); + let (mut nodes, _, wallet) = + setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false, None) + .await + .unwrap(); let node = nodes.pop().unwrap(); let wallet = Arc::new(Mutex::new(wallet)); @@ -345,18 +336,10 @@ async fn can_build_blocks_with_finalized_l1_messages() { let chain_spec = SCROLL_DEV.clone(); // setup a test node - let tasks = TaskManager::current(); - let (mut nodes, _, wallet) = setup_engine( - &tasks, - default_test_scroll_rollup_node_config(), - 1, - chain_spec, - false, - false, - None, - ) - .await - .unwrap(); + let (mut nodes, _, wallet) = + setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false, None) + .await + .unwrap(); let node = nodes.pop().unwrap(); let wallet = Arc::new(Mutex::new(wallet)); @@ -530,9 +513,8 @@ async fn can_sequence_blocks_with_private_key_file() -> eyre::Result<()> { rpc_args: RpcArgs::default(), }; - let tasks = TaskManager::current(); let (nodes, _, wallet) = - setup_engine(&tasks, rollup_manager_args, 1, chain_spec, false, false, None).await?; + setup_engine(rollup_manager_args, 1, chain_spec, false, false, None).await?; let wallet = Arc::new(Mutex::new(wallet)); let sequencer_rnm_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); @@ -635,9 +617,8 @@ async fn can_sequence_blocks_with_hex_key_file_without_prefix() -> eyre::Result< rpc_args: RpcArgs::default(), }; - let tasks = TaskManager::current(); let (nodes, _, wallet) = - setup_engine(&tasks, rollup_manager_args, 1, chain_spec, false, false, None).await?; + setup_engine(rollup_manager_args, 1, chain_spec, false, false, None).await?; let wallet = Arc::new(Mutex::new(wallet)); let sequencer_rnm_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); @@ -698,9 +679,7 @@ async fn can_build_blocks_and_exit_at_gas_limit() { // setup a test node. use a high value for the payload building duration to be sure we don't // exit early. - let tasks = TaskManager::current(); let (mut nodes, _, wallet) = setup_engine( - &tasks, ScrollRollupNodeConfig { sequencer_args: SequencerArgs { payload_building_duration: 1000, ..Default::default() }, ..default_test_scroll_rollup_node_config() @@ -787,9 +766,7 @@ async fn can_build_blocks_and_exit_at_time_limit() { // setup a test node. use a low payload building duration in order to exit before we reach the // gas limit. - let tasks = TaskManager::current(); let (mut nodes, _, wallet) = setup_engine( - &tasks, ScrollRollupNodeConfig { sequencer_args: SequencerArgs { payload_building_duration: 10, ..Default::default() }, ..default_test_scroll_rollup_node_config() @@ -878,18 +855,10 @@ async fn should_limit_l1_message_cumulative_gas() { // setup a test node let chain_spec = SCROLL_DEV.clone(); - let tasks = TaskManager::current(); - let (mut nodes, _, wallet) = setup_engine( - &tasks, - default_test_scroll_rollup_node_config(), - 1, - chain_spec, - false, - false, - None, - ) - .await - .unwrap(); + let (mut nodes, _, wallet) = + setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false, None) + .await + .unwrap(); let node = nodes.pop().unwrap(); let wallet = Arc::new(Mutex::new(wallet)); @@ -1003,18 +972,10 @@ async fn should_not_add_skipped_messages() { // setup a test node let chain_spec = SCROLL_DEV.clone(); - let tasks = TaskManager::current(); - let (mut nodes, _, wallet) = setup_engine( - &tasks, - default_test_scroll_rollup_node_config(), - 1, - chain_spec, - false, - false, - None, - ) - .await - .unwrap(); + let (mut nodes, _, wallet) = + setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false, None) + .await + .unwrap(); let node = nodes.pop().unwrap(); let wallet = Arc::new(Mutex::new(wallet));