diff --git a/ipc/cli/src/commands/checkpoint/relayer.rs b/ipc/cli/src/commands/checkpoint/relayer.rs index 816c60ff0d..43d927b409 100644 --- a/ipc/cli/src/commands/checkpoint/relayer.rs +++ b/ipc/cli/src/commands/checkpoint/relayer.rs @@ -55,6 +55,7 @@ impl CommandLineHandler for BottomUpRelayer { parent.clone(), child.clone(), Arc::new(RwLock::new(keystore)), + arguments.max_parallelism, ) .await?; @@ -88,4 +89,10 @@ pub(crate) struct BottomUpRelayerArgs { pub finalization_blocks: Option, #[arg(long, help = "The hex encoded address of the submitter")] pub submitter: Option, + #[arg( + long, + default_value = "4", + help = "The max parallelism for submitting checkpoints" + )] + pub max_parallelism: usize, } diff --git a/ipc/provider/src/checkpoint.rs b/ipc/provider/src/checkpoint.rs index 2c6d9daae2..7066844c98 100644 --- a/ipc/provider/src/checkpoint.rs +++ b/ipc/provider/src/checkpoint.rs @@ -5,13 +5,16 @@ use crate::config::Subnet; use crate::manager::{BottomUpCheckpointRelayer, EthSubnetManager}; use anyhow::{anyhow, Result}; +use futures_util::future::try_join_all; use fvm_shared::address::Address; use fvm_shared::clock::ChainEpoch; +use ipc_api::checkpoint::{BottomUpCheckpointBundle, QuorumReachedEvent}; use ipc_wallet::{EthKeyAddress, PersistentKeyStore}; use std::cmp::max; use std::fmt::{Display, Formatter}; use std::sync::{Arc, RwLock}; use std::time::Duration; +use tokio::sync::Semaphore; /// Tracks the config required for bottom up checkpoint submissions /// parent/child subnet and checkpoint period. @@ -26,10 +29,11 @@ pub struct CheckpointConfig { /// Then it will submit at the next submission height for the new checkpoint. pub struct BottomUpCheckpointManager { metadata: CheckpointConfig, - parent_handler: T, + parent_handler: Arc, child_handler: T, /// The number of blocks away from the chain head that is considered final finalization_blocks: ChainEpoch, + submission_semaphore: Arc, } impl BottomUpCheckpointManager { @@ -38,6 +42,7 @@ impl BottomUpCheckpointManager { child: Subnet, parent_handler: T, child_handler: T, + max_parallelism: usize, ) -> Result { let period = parent_handler .checkpoint_period(&child.id) @@ -49,9 +54,10 @@ impl BottomUpCheckpointManager { child, period, }, - parent_handler, + parent_handler: Arc::new(parent_handler), child_handler, finalization_blocks: 0, + submission_semaphore: Arc::new(Semaphore::new(max_parallelism)), }) } @@ -66,12 +72,20 @@ impl BottomUpCheckpointManager { parent: Subnet, child: Subnet, keystore: Arc>>, + max_parallelism: usize, ) -> Result { let parent_handler = EthSubnetManager::from_subnet_with_wallet_store(&parent, Some(keystore.clone()))?; let child_handler = EthSubnetManager::from_subnet_with_wallet_store(&child, Some(keystore))?; - Self::new(parent, child, parent_handler, child_handler).await + Self::new( + parent, + child, + parent_handler, + child_handler, + max_parallelism, + ) + .await } } @@ -106,16 +120,15 @@ impl BottomUpCheckpointMan log::info!("launching {self} for {submitter}"); loop { - if let Err(e) = self.submit_next_epoch(&submitter).await { + if let Err(e) = self.submit_next_epoch(submitter).await { log::error!("cannot submit checkpoint for submitter: {submitter} due to {e}"); } - tokio::time::sleep(submission_interval).await; } } /// Checks if the relayer has already submitted at the next submission epoch, if not it submits it. - async fn submit_next_epoch(&self, submitter: &Address) -> Result<()> { + async fn submit_next_epoch(&self, submitter: Address) -> Result<()> { let last_checkpoint_epoch = self .parent_handler .last_bottom_up_checkpoint_height(&self.metadata.child.id) @@ -137,6 +150,9 @@ impl BottomUpCheckpointMan let start = last_checkpoint_epoch + 1; log::debug!("start querying quorum reached events from : {start} to {finalized_height}"); + let mut count = 0; + let mut all_submit_tasks = vec![]; + for h in start..=finalized_height { let events = self.child_handler.quorum_reached_events(h).await?; if events.is_empty() { @@ -162,30 +178,67 @@ impl BottomUpCheckpointMan .await?; log::debug!("bottom up bundle: {bundle:?}"); - let epoch = self - .parent_handler - .submit_checkpoint( - submitter, - bundle.checkpoint, - bundle.signatures, - bundle.signatories, - ) + // We support parallel checkpoint submission using FIFO order with a limited parallelism (controlled by + // the size of submission_semaphore). + // We need to acquire a permit (from a limited permit pool) before submitting a checkpoint. + // We may wait here until a permit is available. + let parent_handler_clone = Arc::clone(&self.parent_handler); + let submission_permit = self + .submission_semaphore + .clone() + .acquire_owned() .await - .map_err(|e| { - anyhow!( - "cannot submit bottom up checkpoint at height {} due to: {e:}", - event.height - ) - })?; - - log::info!( - "submitted bottom up checkpoint({}) in parent at height {}", - event.height, - epoch - ); + .unwrap(); + all_submit_tasks.push(tokio::task::spawn(async move { + let height = event.height; + let result = + Self::submit_checkpoint(parent_handler_clone, submitter, bundle, event) + .await + .inspect_err(|err| { + log::error!("Fail to submit checkpoint at height {height}: {err}"); + }); + drop(submission_permit); + result + })); + + count += 1; + log::debug!("This round has asynchronously submitted {count} checkpoints",); } } + log::debug!("Waiting for all submissions to finish"); + // Return error if any of the submit task failed. + try_join_all(all_submit_tasks).await?; + + Ok(()) + } + + async fn submit_checkpoint( + parent_handler: Arc, + submitter: Address, + bundle: BottomUpCheckpointBundle, + event: QuorumReachedEvent, + ) -> Result<(), anyhow::Error> { + let epoch = parent_handler + .submit_checkpoint( + &submitter, + bundle.checkpoint, + bundle.signatures, + bundle.signatories, + ) + .await + .map_err(|e| { + anyhow!( + "cannot submit bottom up checkpoint at height {} due to: {e}", + event.height + ) + })?; + + log::info!( + "submitted bottom up checkpoint({}) in parent at height {}", + event.height, + epoch + ); Ok(()) } }