-
Notifications
You must be signed in to change notification settings - Fork 47
Support submit checkpoint in parallel #840
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
c8bfcd8
Implement it
mb1896 9b4a643
Remove debug log
mb1896 2abd2c0
Lint fix
mb1896 84d83d7
Expose max parallelism as cmd parameter
mb1896 4cd54de
More lint
mb1896 8137c77
Make sure we always drop permit before finish async task
mb1896 6c9a93e
Lint fix
mb1896 a46a84b
Address Akosh's comments
mb1896 4752714
Lint
mb1896 143b539
rename cli flag to --max-parallelism.
raulk a406d91
Merge branch 'main' into ENG-767
raulk 207d6e0
use inspect_err.
raulk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,13 +5,16 @@ | |||||||||
| use crate::config::Subnet; | ||||||||||
| use crate::manager::{BottomUpCheckpointRelayer, EthSubnetManager}; | ||||||||||
| use anyhow::{anyhow, Result}; | ||||||||||
| use futures_util::future::try_join_all; | ||||||||||
| use fvm_shared::address::Address; | ||||||||||
| use fvm_shared::clock::ChainEpoch; | ||||||||||
| use ipc_api::checkpoint::{BottomUpCheckpointBundle, QuorumReachedEvent}; | ||||||||||
| use ipc_wallet::{EthKeyAddress, PersistentKeyStore}; | ||||||||||
| use std::cmp::max; | ||||||||||
| use std::fmt::{Display, Formatter}; | ||||||||||
| use std::sync::{Arc, RwLock}; | ||||||||||
| use std::time::Duration; | ||||||||||
| use tokio::sync::Semaphore; | ||||||||||
|
|
||||||||||
| /// Tracks the config required for bottom up checkpoint submissions | ||||||||||
| /// parent/child subnet and checkpoint period. | ||||||||||
|
|
@@ -26,10 +29,11 @@ pub struct CheckpointConfig { | |||||||||
| /// Then it will submit at the next submission height for the new checkpoint. | ||||||||||
| pub struct BottomUpCheckpointManager<T> { | ||||||||||
| metadata: CheckpointConfig, | ||||||||||
| parent_handler: T, | ||||||||||
| parent_handler: Arc<T>, | ||||||||||
| child_handler: T, | ||||||||||
| /// The number of blocks away from the chain head that is considered final | ||||||||||
| finalization_blocks: ChainEpoch, | ||||||||||
| submission_semaphore: Arc<Semaphore>, | ||||||||||
| } | ||||||||||
|
|
||||||||||
| impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> { | ||||||||||
|
|
@@ -38,6 +42,7 @@ impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> { | |||||||||
| child: Subnet, | ||||||||||
| parent_handler: T, | ||||||||||
| child_handler: T, | ||||||||||
| max_parallelism: usize, | ||||||||||
| ) -> Result<Self> { | ||||||||||
| let period = parent_handler | ||||||||||
| .checkpoint_period(&child.id) | ||||||||||
|
|
@@ -49,9 +54,10 @@ impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> { | |||||||||
| child, | ||||||||||
| period, | ||||||||||
| }, | ||||||||||
| parent_handler, | ||||||||||
| parent_handler: Arc::new(parent_handler), | ||||||||||
| child_handler, | ||||||||||
| finalization_blocks: 0, | ||||||||||
| submission_semaphore: Arc::new(Semaphore::new(max_parallelism)), | ||||||||||
| }) | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
@@ -66,12 +72,20 @@ impl BottomUpCheckpointManager<EthSubnetManager> { | |||||||||
| parent: Subnet, | ||||||||||
| child: Subnet, | ||||||||||
| keystore: Arc<RwLock<PersistentKeyStore<EthKeyAddress>>>, | ||||||||||
| max_parallelism: usize, | ||||||||||
| ) -> Result<Self> { | ||||||||||
| let parent_handler = | ||||||||||
| EthSubnetManager::from_subnet_with_wallet_store(&parent, Some(keystore.clone()))?; | ||||||||||
| let child_handler = | ||||||||||
| EthSubnetManager::from_subnet_with_wallet_store(&child, Some(keystore))?; | ||||||||||
| Self::new(parent, child, parent_handler, child_handler).await | ||||||||||
| Self::new( | ||||||||||
| parent, | ||||||||||
| child, | ||||||||||
| parent_handler, | ||||||||||
| child_handler, | ||||||||||
| max_parallelism, | ||||||||||
| ) | ||||||||||
| .await | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
@@ -106,16 +120,15 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan | |||||||||
| log::info!("launching {self} for {submitter}"); | ||||||||||
|
|
||||||||||
| loop { | ||||||||||
| if let Err(e) = self.submit_next_epoch(&submitter).await { | ||||||||||
| if let Err(e) = self.submit_next_epoch(submitter).await { | ||||||||||
| log::error!("cannot submit checkpoint for submitter: {submitter} due to {e}"); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| tokio::time::sleep(submission_interval).await; | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /// Checks if the relayer has already submitted at the next submission epoch, if not it submits it. | ||||||||||
| async fn submit_next_epoch(&self, submitter: &Address) -> Result<()> { | ||||||||||
| async fn submit_next_epoch(&self, submitter: Address) -> Result<()> { | ||||||||||
| let last_checkpoint_epoch = self | ||||||||||
| .parent_handler | ||||||||||
| .last_bottom_up_checkpoint_height(&self.metadata.child.id) | ||||||||||
|
|
@@ -137,6 +150,9 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan | |||||||||
| let start = last_checkpoint_epoch + 1; | ||||||||||
| log::debug!("start querying quorum reached events from : {start} to {finalized_height}"); | ||||||||||
|
|
||||||||||
| let mut count = 0; | ||||||||||
| let mut all_submit_tasks = vec![]; | ||||||||||
|
|
||||||||||
| for h in start..=finalized_height { | ||||||||||
| let events = self.child_handler.quorum_reached_events(h).await?; | ||||||||||
| if events.is_empty() { | ||||||||||
|
|
@@ -162,30 +178,67 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan | |||||||||
| .await?; | ||||||||||
| log::debug!("bottom up bundle: {bundle:?}"); | ||||||||||
|
|
||||||||||
| let epoch = self | ||||||||||
| .parent_handler | ||||||||||
| .submit_checkpoint( | ||||||||||
| submitter, | ||||||||||
| bundle.checkpoint, | ||||||||||
| bundle.signatures, | ||||||||||
| bundle.signatories, | ||||||||||
| ) | ||||||||||
| // We support parallel checkpoint submission using FIFO order with a limited parallelism (controlled by | ||||||||||
| // the size of submission_semaphore). | ||||||||||
| // We need to acquire a permit (from a limited permit pool) before submitting a checkpoint. | ||||||||||
| // We may wait here until a permit is available. | ||||||||||
| let parent_handler_clone = Arc::clone(&self.parent_handler); | ||||||||||
| let submission_permit = self | ||||||||||
| .submission_semaphore | ||||||||||
| .clone() | ||||||||||
| .acquire_owned() | ||||||||||
| .await | ||||||||||
| .map_err(|e| { | ||||||||||
| anyhow!( | ||||||||||
| "cannot submit bottom up checkpoint at height {} due to: {e:}", | ||||||||||
| event.height | ||||||||||
| ) | ||||||||||
| })?; | ||||||||||
|
|
||||||||||
| log::info!( | ||||||||||
| "submitted bottom up checkpoint({}) in parent at height {}", | ||||||||||
| event.height, | ||||||||||
| epoch | ||||||||||
| ); | ||||||||||
| .unwrap(); | ||||||||||
| all_submit_tasks.push(tokio::task::spawn(async move { | ||||||||||
| let height = event.height; | ||||||||||
| let result = | ||||||||||
| Self::submit_checkpoint(parent_handler_clone, submitter, bundle, event) | ||||||||||
| .await | ||||||||||
| .inspect_err(|err| { | ||||||||||
| log::error!("Fail to submit checkpoint at height {height}: {err}"); | ||||||||||
| }); | ||||||||||
| drop(submission_permit); | ||||||||||
| result | ||||||||||
| })); | ||||||||||
|
|
||||||||||
| count += 1; | ||||||||||
| log::debug!("This round has asynchronously submitted {count} checkpoints",); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| log::debug!("Waiting for all submissions to finish"); | ||||||||||
| // Return error if any of the submit task failed. | ||||||||||
| try_join_all(all_submit_tasks).await?; | ||||||||||
|
|
||||||||||
| Ok(()) | ||||||||||
|
raulk marked this conversation as resolved.
|
||||||||||
| } | ||||||||||
|
|
||||||||||
| async fn submit_checkpoint( | ||||||||||
| parent_handler: Arc<T>, | ||||||||||
| submitter: Address, | ||||||||||
| bundle: BottomUpCheckpointBundle, | ||||||||||
| event: QuorumReachedEvent, | ||||||||||
| ) -> Result<(), anyhow::Error> { | ||||||||||
| let epoch = parent_handler | ||||||||||
| .submit_checkpoint( | ||||||||||
| &submitter, | ||||||||||
| bundle.checkpoint, | ||||||||||
| bundle.signatures, | ||||||||||
| bundle.signatories, | ||||||||||
| ) | ||||||||||
| .await | ||||||||||
| .map_err(|e| { | ||||||||||
| anyhow!( | ||||||||||
| "cannot submit bottom up checkpoint at height {} due to: {e}", | ||||||||||
| event.height | ||||||||||
| ) | ||||||||||
| })?; | ||||||||||
|
|
||||||||||
| log::info!( | ||||||||||
| "submitted bottom up checkpoint({}) in parent at height {}", | ||||||||||
| event.height, | ||||||||||
| epoch | ||||||||||
|
Comment on lines
+238
to
+240
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is also from original code. |
||||||||||
| ); | ||||||||||
| Ok(()) | ||||||||||
| } | ||||||||||
| } | ||||||||||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious about the behaviour when the number of checkpoints to submit (say 10) is actually more than number of permits (say 2). So
acquire_ownedwill actually block if 2 checkpoints are being submitted until one of 2 actually finishes? Even ifsubmit_checkpointfails and the thread crashes,drop(submission_permit)will be executed and no dead lock here right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it will literally block (the
forloop just pauses).My last version had an issue where the async task can panic before dropping the permit (because the use of
unwrap()). It's been fixed here so the permit will be dropped no matter it succeeds or fails when submitting the checkpoint.