Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ test-l1-sync:
--locked \
--all-features \
--no-fail-fast \
--test-threads=1 \
--failure-output immediate \
-E 'binary(l1_sync)'

Expand Down
4 changes: 0 additions & 4 deletions crates/chain-orchestrator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ pub enum ChainOrchestratorError {
/// An error occurred while handling rollup node primitives.
#[error("An error occurred while handling rollup node primitives: {0}")]
RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError),
/// Shutdown requested - this is not a real error but used to signal graceful shutdown.
#[cfg(feature = "test-utils")]
#[error("Shutdown requested")]
Shutdown,
}

impl CanRetry for ChainOrchestratorError {
Expand Down
2 changes: 2 additions & 0 deletions crates/chain-orchestrator/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,6 @@ pub enum ChainOrchestratorEvent {
},
/// The head of the fork choice state has been updated in the engine driver.
FcsHeadUpdated(BlockInfo),
/// The chain orchestrator is shutting down.
Shutdown,
}
3 changes: 0 additions & 3 deletions crates/chain-orchestrator/src/handle/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
/// Returns a database handle for direct database access.
#[cfg(feature = "test-utils")]
DatabaseHandle(oneshot::Sender<std::sync::Arc<scroll_db::Database>>),
/// Request the `ChainOrchestrator` to shutdown immediately.
#[cfg(feature = "test-utils")]
Shutdown(oneshot::Sender<()>),
}

/// The database queries that can be sent to the rollup manager.
Expand Down
9 changes: 0 additions & 9 deletions crates/chain-orchestrator/src/handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,4 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
self.send_command(ChainOrchestratorCommand::DatabaseHandle(tx));
rx.await
}

/// Sends a command to shutdown the `ChainOrchestrator` immediately.
/// This will cause the `ChainOrchestrator`'s event loop to exit gracefully.
#[cfg(feature = "test-utils")]
pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
self.send_command(ChainOrchestratorCommand::Shutdown(tx));
rx.await
}
}
20 changes: 3 additions & 17 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,12 @@ impl<
biased;

_guard = &mut shutdown => {
self.notify(ChainOrchestratorEvent::Shutdown);
break;
}
Some(command) = self.handle_rx.recv() => {
match self.handle_command(command).await {
#[cfg(feature = "test-utils")]
Err(ChainOrchestratorError::Shutdown) => {
tracing::info!(target: "scroll::chain_orchestrator", "Shutdown requested, exiting gracefully");
break;
}
Err(err) => {
tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command");
}
Ok(_) => {}
if let Err(err) = self.handle_command(command).await {
tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command");
}
}
Some(event) = async {
Expand Down Expand Up @@ -442,13 +435,6 @@ impl<
ChainOrchestratorCommand::DatabaseHandle(tx) => {
let _ = tx.send(self.database.clone());
}
#[cfg(feature = "test-utils")]
ChainOrchestratorCommand::Shutdown(tx) => {
tracing::info!(target: "scroll::chain_orchestrator", "Received shutdown command, exiting event loop");
let _ = tx.send(());
// Return an error to signal shutdown
return Err(ChainOrchestratorError::Shutdown);
}
}

Ok(())
Expand Down
110 changes: 67 additions & 43 deletions crates/node/src/test_utils/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,25 @@ use alloy_rpc_types_anvil::ReorgOptions;
use alloy_rpc_types_eth::Block;
use alloy_signer_local::PrivateKeySigner;
use alloy_transport::layers::RetryBackoffLayer;
use reth_chainspec::EthChainSpec;
use reth_e2e_test_utils::{wallet::Wallet, NodeHelperType, TmpDB};
use reth_eth_wire_types::BasicNetworkPrimitives;
use reth_fs_util::remove_dir_all;
use reth_network::NetworkHandle;
use reth_node_builder::NodeTypes;
use reth_node_core::exit::NodeExitFuture;
use reth_node_types::NodeTypesWithDBAdapter;
use reth_provider::providers::BlockchainProvider;
use reth_scroll_chainspec::SCROLL_DEV;
use reth_scroll_primitives::ScrollPrimitives;
use reth_tasks::TaskManager;
use reth_tokio_util::EventStream;
use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, ChainOrchestratorHandle};
use rollup_node_primitives::BlockInfo;
use rollup_node_sequencer::L1MessageInclusionMode;
use scroll_alloy_consensus::ScrollPooledTransaction;
use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
use scroll_alloy_rpc_types::Transaction;
use scroll_engine::{Engine, ForkchoiceState};
use std::{
fmt::{Debug, Formatter},
ops::{Deref, DerefMut},
path::PathBuf,
sync::Arc,
};
Expand All @@ -58,8 +56,6 @@ pub struct TestFixture {
pub chain_spec: Arc<<ScrollRollupNode as NodeTypes>::ChainSpec>,
/// Optional Anvil instance for L1 simulation.
pub anvil: Option<anvil::NodeHandle>,
/// The task manager. Held in order to avoid dropping the node.
pub tasks: TaskManager,
/// The configuration for the nodes.
pub config: ScrollRollupNodeConfig,
}
Expand Down Expand Up @@ -99,6 +95,9 @@ pub type ScrollNetworkHandle =
pub type TestBlockChainProvider =
BlockchainProvider<NodeTypesWithDBAdapter<ScrollRollupNode, TmpDB>>;

/// The test node type for Scroll nodes.
pub type ScrollTestNode = NodeHelperType<ScrollRollupNode, TestBlockChainProvider>;

/// The node type (sequencer or follower).
#[derive(Debug)]
pub enum NodeType {
Expand All @@ -108,12 +107,51 @@ pub enum NodeType {
Follower,
}

/// Components of a test node.
pub struct ScrollNodeTestComponents {
/// The node helper type for the test node.
pub node: ScrollTestNode,
/// The task manager for the test node.
pub task_manager: TaskManager,
/// The exit future for the test node.
pub exit_future: NodeExitFuture,
}

impl ScrollNodeTestComponents {
/// Create new test node components.
pub async fn new(
node: ScrollTestNode,
task_manager: TaskManager,
exit_future: NodeExitFuture,
) -> Self {
Self { node, task_manager, exit_future }
}
}

impl std::fmt::Debug for ScrollNodeTestComponents {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ScrollNodeTestComponents").finish()
}
}

impl DerefMut for ScrollNodeTestComponents {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.node
}
}

impl Deref for ScrollNodeTestComponents {
type Target = ScrollTestNode;

fn deref(&self) -> &Self::Target {
&self.node
}
}

/// Handle to a single test node with its components.
pub struct NodeHandle {
/// The underlying node context.
pub node: NodeHelperType<ScrollRollupNode, TestBlockChainProvider>,
/// Engine instance for this node.
pub engine: Engine<Arc<dyn ScrollEngineApi + Send + Sync + 'static>>,
pub node: ScrollNodeTestComponents,
/// Chain orchestrator listener.
pub chain_orchestrator_rx: EventStream<ChainOrchestratorEvent>,
/// Chain orchestrator handle.
Expand All @@ -123,6 +161,15 @@ pub struct NodeHandle {
}

impl NodeHandle {
/// Create a new node handle.
pub async fn new(node: ScrollNodeTestComponents, typ: NodeType) -> eyre::Result<Self> {
let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
let chain_orchestrator_rx =
node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;

Ok(Self { node, chain_orchestrator_rx, rollup_manager_handle, typ })
}

/// Returns true if this is a handle to the sequencer.
pub const fn is_sequencer(&self) -> bool {
matches!(self.typ, NodeType::Sequencer)
Expand Down Expand Up @@ -598,7 +645,6 @@ impl TestFixtureBuilder {
// Start Anvil if requested
let anvil = if self.anvil_config.enabled {
let handle = Self::spawn_anvil(
self.anvil_config.port,
self.anvil_config.state_path.as_deref(),
self.anvil_config.chain_id,
self.anvil_config.block_time,
Expand All @@ -623,9 +669,7 @@ impl TestFixtureBuilder {
None
};

let tasks = TaskManager::current();
let (nodes, dbs, wallet) = setup_engine(
&tasks,
let (node_components, dbs, wallet) = setup_engine(
self.config.clone(),
self.num_nodes,
chain_spec.clone(),
Expand All @@ -635,59 +679,39 @@ impl TestFixtureBuilder {
)
.await?;

let mut node_handles = Vec::with_capacity(nodes.len());
for (index, node) in nodes.into_iter().enumerate() {
let genesis_hash = node.inner.chain_spec().genesis_hash();

// Create engine for the node
let auth_client = node.inner.engine_http_client();
let engine_client = Arc::new(ScrollAuthApiEngineClient::new(auth_client))
as Arc<dyn ScrollEngineApi + Send + Sync + 'static>;
let fcs = ForkchoiceState::new(
BlockInfo { hash: genesis_hash, number: 0 },
Default::default(),
Default::default(),
);
let engine = Engine::new(Arc::new(engine_client), fcs);

// Get handles if available
let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
let chain_orchestrator_rx =
node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;

node_handles.push(Some(NodeHandle {
let mut nodes = Vec::with_capacity(node_components.len());
for (index, node) in node_components.into_iter().enumerate() {
let handle = NodeHandle::new(
node,
engine,
chain_orchestrator_rx,
rollup_manager_handle,
typ: if self.config.sequencer_args.sequencer_enabled && index == 0 {
if self.config.sequencer_args.sequencer_enabled && index == 0 {
NodeType::Sequencer
} else {
NodeType::Follower
},
}));
)
.await?;

nodes.push(Some(handle));
}

Ok(TestFixture {
nodes: node_handles,
nodes,
dbs,
wallet: Arc::new(Mutex::new(wallet)),
chain_spec,
tasks,
anvil,
config: self.config,
})
}

/// Spawn an Anvil instance with the given configuration.
async fn spawn_anvil(
port: u16,
state_path: Option<&std::path::Path>,
chain_id: Option<u64>,
block_time: Option<u64>,
slots_in_an_epoch: u64,
) -> eyre::Result<anvil::NodeHandle> {
let mut config = anvil::NodeConfig { port, ..Default::default() };
let mut config = anvil::NodeConfig { port: 0, ..Default::default() };

if let Some(id) = chain_id {
config.chain_id = Some(id);
Expand Down
Loading
Loading