diff --git a/Cargo.lock b/Cargo.lock index 64867e532b4..d61044331eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -372,9 +372,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.63" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1" +checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" dependencies = [ "proc-macro2", "quote", @@ -1844,6 +1844,7 @@ dependencies = [ "polkadot-primitives", "sc-client-api", "sc-consensus", + "schnellru", "sp-blockchain", "sp-consensus", "sp-runtime", @@ -1912,6 +1913,7 @@ dependencies = [ name = "cumulus-client-pov-recovery" version = "0.1.0" dependencies = [ + "async-trait", "cumulus-primitives-core", "cumulus-relay-chain-interface", "cumulus-test-service", @@ -2402,6 +2404,7 @@ dependencies = [ "cumulus-client-consensus-common", "cumulus-client-consensus-relay-chain", "cumulus-client-network", + "cumulus-client-pov-recovery", "cumulus-client-service", "cumulus-primitives-core", "cumulus-primitives-parachain-inherent", @@ -2419,6 +2422,8 @@ dependencies = [ "parachains-common", "parity-scale-codec", "polkadot-cli", + "polkadot-node-subsystem", + "polkadot-overseer", "polkadot-primitives", "polkadot-service", "polkadot-test-service", diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index c992c263593..a91dcaf5063 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = cumulus-primitives-core = { path = "../../../primitives/core" } cumulus-relay-chain-interface = { path = "../../relay-chain-interface" } cumulus-client-pov-recovery = { path = "../../pov-recovery" } +schnellru = "0.2.1" [dev-dependencies] futures-timer = "3.0.2" diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 9bd2e144663..17e07fb3b91 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -18,11 +18,12 @@ use sc_client_api::{ Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, }; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; +use schnellru::{ByLength, LruMap}; use sp_blockchain::Error as ClientError; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; -use cumulus_client_pov_recovery::{RecoveryDelay, RecoveryKind, RecoveryRequest}; +use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest}; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; @@ -30,16 +31,60 @@ use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; use codec::Decode; use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt}; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; const LOG_TARGET: &str = "cumulus-consensus"; +const FINALIZATION_CACHE_SIZE: u32 = 40; -// Delay range to trigger explicit requests. -// The chosen value doesn't have any special meaning, a random delay within the order of -// seconds in practice should be a good enough to allow a quick recovery without DOSing -// the relay chain. -const RECOVERY_DELAY: RecoveryDelay = - RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) }; +fn handle_new_finalized_head( + parachain: &Arc

, + finalized_head: Vec, + last_seen_finalized_hashes: &mut LruMap, +) where + Block: BlockT, + B: Backend, + P: Finalizer + UsageProvider + BlockchainEvents, +{ + let header = match Block::Header::decode(&mut &finalized_head[..]) { + Ok(header) => header, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + error = ?err, + "Could not decode parachain header while following finalized heads.", + ); + return + }, + }; + + let hash = header.hash(); + + last_seen_finalized_hashes.insert(hash, ()); + + // Only finalize if we are below the incoming finalized parachain head + if parachain.usage_info().chain.finalized_number < *header.number() { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Attempting to finalize header.", + ); + if let Err(e) = parachain.finalize_block(hash, None, true) { + match e { + ClientError::UnknownBlock(_) => tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Could not finalize block because it is unknown.", + ), + _ => tracing::warn!( + target: LOG_TARGET, + error = ?e, + block_hash = ?hash, + "Failed to finalize block", + ), + } + } + } +} /// Follow the finalized head of the given parachain. /// @@ -48,57 +93,75 @@ const RECOVERY_DELAY: RecoveryDelay = async fn follow_finalized_head(para_id: ParaId, parachain: Arc

, relay_chain: R) where Block: BlockT, - P: Finalizer + UsageProvider, + P: Finalizer + UsageProvider + BlockchainEvents, R: RelayChainInterface + Clone, B: Backend, { let finalized_heads = match finalized_heads(relay_chain, para_id).await { - Ok(finalized_heads_stream) => finalized_heads_stream, + Ok(finalized_heads_stream) => finalized_heads_stream.fuse(), Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); return }, }; + let mut imported_blocks = parachain.import_notification_stream().fuse(); + pin_mut!(finalized_heads); - loop { - let finalized_head = if let Some(h) = finalized_heads.next().await { - h - } else { - tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); - return - }; + // We use this cache to finalize blocks that are imported late. + // For example, a block that has been recovered via PoV-Recovery + // on a full node can have several minutes delay. With this cache + // we have some "memory" of recently finalized blocks. + let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE)); - let header = match Block::Header::decode(&mut &finalized_head[..]) { - Ok(header) => header, - Err(err) => { - tracing::debug!( - target: LOG_TARGET, - error = ?err, - "Could not decode parachain header while following finalized heads.", - ); - continue + loop { + select! { + fin = finalized_heads.next() => { + match fin { + Some(finalized_head) => + handle_new_finalized_head(¶chain, finalized_head, &mut last_seen_finalized_hashes), + None => { + tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); + return + } + } }, - }; - - let hash = header.hash(); - - // don't finalize the same block multiple times. - if parachain.usage_info().chain.finalized_hash != hash { - if let Err(e) = parachain.finalize_block(hash, None, true) { - match e { - ClientError::UnknownBlock(_) => tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Could not finalize block because it is unknown.", - ), - _ => tracing::warn!( - target: LOG_TARGET, - error = ?e, - block_hash = ?hash, - "Failed to finalize block", - ), + imported = imported_blocks.next() => { + match imported { + Some(imported_block) => { + // When we see a block import that is already finalized, we immediately finalize it. + if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?imported_block.hash, + "Setting newly imported block as finalized.", + ); + + if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) { + match e { + ClientError::UnknownBlock(_) => tracing::debug!( + target: LOG_TARGET, + block_hash = ?imported_block.hash, + "Could not finalize block because it is unknown.", + ), + _ => tracing::warn!( + target: LOG_TARGET, + error = ?e, + block_hash = ?imported_block.hash, + "Failed to finalize block", + ), + } + } + } + }, + None => { + tracing::debug!( + target: LOG_TARGET, + "Stopping following imported blocks.", + ); + return + } } } } @@ -266,7 +329,11 @@ async fn handle_new_block_imported( let unset_best_header = unset_best_header_opt .take() .expect("We checked above that the value is set; qed"); - + tracing::debug!( + target: LOG_TARGET, + ?unset_hash, + "Importing block as new best for parachain.", + ); import_block_as_new_best(unset_hash, unset_best_header, parachain).await; }, state => tracing::debug!( @@ -315,7 +382,11 @@ async fn handle_new_best_parachain_head( match parachain.block_status(hash) { Ok(BlockStatus::InChainWithState) => { unset_best_header.take(); - + tracing::debug!( + target: LOG_TARGET, + ?hash, + "Importing block as new best for parachain.", + ); import_block_as_new_best(hash, parachain_head, parachain).await; }, Ok(BlockStatus::InChainPruned) => { @@ -338,8 +409,7 @@ async fn handle_new_best_parachain_head( // Best effort channel to actively encourage block recovery. // An error here is not fatal; the relay chain continuously re-announces // the best block, thus we will have other opportunities to retry. - let req = - RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full }; + let req = RecoveryRequest { hash, kind: RecoveryKind::Full }; if let Err(err) = recovery_chan_tx.try_send(req) { tracing::warn!( target: LOG_TARGET, diff --git a/client/pov-recovery/Cargo.toml b/client/pov-recovery/Cargo.toml index 1ec225c969d..2cce470f823 100644 --- a/client/pov-recovery/Cargo.toml +++ b/client/pov-recovery/Cargo.toml @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = # Cumulus cumulus-primitives-core = { path = "../../primitives/core" } cumulus-relay-chain-interface = {path = "../relay-chain-interface"} +async-trait = "0.1.64" [dev-dependencies] tokio = { version = "1.24.2", features = ["macros"] } diff --git a/client/pov-recovery/src/active_candidate_recovery.rs b/client/pov-recovery/src/active_candidate_recovery.rs index caae3615a85..feb09d005ce 100644 --- a/client/pov-recovery/src/active_candidate_recovery.rs +++ b/client/pov-recovery/src/active_candidate_recovery.rs @@ -18,12 +18,13 @@ use sp_runtime::traits::Block as BlockT; use polkadot_node_primitives::AvailableData; use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; -use polkadot_overseer::Handle as OverseerHandle; use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use std::{collections::HashSet, pin::Pin}; +use crate::RecoveryHandle; + /// The active candidate recovery. /// /// This handles the candidate recovery and tracks the activate recoveries. @@ -34,12 +35,12 @@ pub(crate) struct ActiveCandidateRecovery { >, /// The block hashes of the candidates currently being recovered. candidates: HashSet, - overseer_handle: OverseerHandle, + recovery_handle: Box, } impl ActiveCandidateRecovery { - pub fn new(overseer_handle: OverseerHandle) -> Self { - Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle } + pub fn new(recovery_handle: Box) -> Self { + Self { recoveries: Default::default(), candidates: Default::default(), recovery_handle } } /// Recover the given `candidate`. @@ -50,8 +51,8 @@ impl ActiveCandidateRecovery { ) { let (tx, rx) = oneshot::channel(); - self.overseer_handle - .send_msg( + self.recovery_handle + .send_recovery_msg( AvailabilityRecoveryMessage::RecoverAvailableData( candidate.receipt.clone(), candidate.session_index, @@ -90,11 +91,6 @@ impl ActiveCandidateRecovery { ); } - /// Returns if the given `candidate` is being recovered. - pub fn is_being_recovered(&self, candidate: &Block::Hash) -> bool { - self.candidates.contains(candidate) - } - /// Waits for the next recovery. /// /// If the returned [`AvailableData`] is `None`, it means that the recovery failed. diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 2eed968d5f9..60fbdab310c 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -29,14 +29,18 @@ //! //! 1. For every included relay chain block we note the backed candidate of our parachain. If the //! block belonging to the PoV is already known, we do nothing. Otherwise we start -//! a timer that waits a random time between 0..relay_chain_slot_length before starting to recover +//! a timer that waits for a randomized time inside a specified interval before starting to recover //! the PoV. //! //! 2. If between starting and firing the timer the block is imported, we skip the recovery of the //! PoV. //! -//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol. After it -//! is recovered, we restore the block and import it. +//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol. +//! +//! 4a. After it is recovered, we restore the block and import it. +//! +//! 4b. Since we are trying to recover pending candidates, availability is not guaranteed. If the block +//! PoV is not yet available, we retry. //! //! If we need to recover multiple PoV blocks (which should hopefully not happen in real life), we //! make sure that the blocks are imported in the correct order. @@ -47,6 +51,7 @@ use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT}; +use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{ CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, SessionIndex, @@ -60,10 +65,10 @@ use futures::{ channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt, }; use futures_timer::Delay; -use rand::{thread_rng, Rng}; +use rand::{distributions::Uniform, prelude::Distribution, thread_rng}; use std::{ - collections::{HashMap, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, pin::Pin, sync::Arc, time::Duration, @@ -74,6 +79,28 @@ use active_candidate_recovery::ActiveCandidateRecovery; const LOG_TARGET: &str = "cumulus-pov-recovery"; +/// Test-friendly wrapper trait for the overseer handle. +/// Can be used to simulate failing recovery requests. +#[async_trait::async_trait] +pub trait RecoveryHandle: Send { + async fn send_recovery_msg( + &mut self, + message: AvailabilityRecoveryMessage, + origin: &'static str, + ); +} + +#[async_trait::async_trait] +impl RecoveryHandle for OverseerHandle { + async fn send_recovery_msg( + &mut self, + message: AvailabilityRecoveryMessage, + origin: &'static str, + ) { + self.send_msg(message, origin).await; + } +} + /// Type of recovery to trigger. #[derive(Debug, PartialEq)] pub enum RecoveryKind { @@ -87,24 +114,30 @@ pub enum RecoveryKind { pub struct RecoveryRequest { /// Hash of the last block to recover. pub hash: Block::Hash, - /// Recovery delay range. Randomizing the start of the recovery within this interval - /// can be used to prevent self-DOSing if the recovery request is part of a - /// distributed protocol and there is the possibility that multiple actors are - /// requiring to perform the recovery action at approximately the same time. - pub delay: RecoveryDelay, /// Recovery type. pub kind: RecoveryKind, } /// The delay between observing an unknown block and triggering the recovery of a block. +/// Randomizing the start of the recovery within this interval +/// can be used to prevent self-DOSing if the recovery request is part of a +/// distributed protocol and there is the possibility that multiple actors are +/// requiring to perform the recovery action at approximately the same time. #[derive(Clone, Copy)] -pub struct RecoveryDelay { +pub struct RecoveryDelayRange { /// Start recovering after `min` delay. pub min: Duration, /// Start recovering before `max` delay. pub max: Duration, } +impl RecoveryDelayRange { + /// Produce a randomized duration between `min` and `max`. + fn duration(&self) -> Duration { + Uniform::from(self.min..=self.max).sample(&mut thread_rng()) + } +} + /// Represents an outstanding block candidate. struct Candidate { receipt: CandidateReceipt, @@ -112,9 +145,66 @@ struct Candidate { block_number: NumberFor, parent_hash: Block::Hash, // Lazy recovery has been submitted. + // Should be true iff a block is either queued to be recovered or + // recovery is currently in progress. waiting_recovery: bool, } +/// Queue that is used to decide when to start PoV-recovery operations. +struct RecoveryQueue { + recovery_delay_range: RecoveryDelayRange, + // Queue that keeps the hashes of blocks to be recovered. + recovery_queue: VecDeque, + // Futures that resolve when a new recovery should be started. + signaling_queue: FuturesUnordered + Send>>>, +} + +impl RecoveryQueue { + pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self { + Self { + recovery_delay_range, + recovery_queue: Default::default(), + signaling_queue: Default::default(), + } + } + + /// Add hash of a block that should go to the end of the recovery queue. + /// A new recovery will be signaled after `delay` has passed. + pub fn push_recovery(&mut self, hash: Block::Hash) { + let delay = self.recovery_delay_range.duration(); + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Adding block to queue and adding new recovery slot in {:?} sec", + delay.as_secs(), + ); + self.recovery_queue.push_back(hash); + self.signaling_queue.push( + async move { + Delay::new(delay).await; + } + .boxed(), + ); + } + + /// Get the next hash for block recovery. + pub async fn next_recovery(&mut self) -> Block::Hash { + loop { + if let Some(_) = self.signaling_queue.next().await { + if let Some(hash) = self.recovery_queue.pop_front() { + return hash + } else { + tracing::error!( + target: LOG_TARGET, + "Recovery was signaled, but no candidate hash available. This is a bug." + ); + }; + } + futures::pending!() + } + } +} + /// Encapsulates the logic of the pov recovery. pub struct PoVRecovery { /// All the pending candidates that we are waiting for to be imported or that need to be @@ -122,21 +212,22 @@ pub struct PoVRecovery { candidates: HashMap>, /// A stream of futures that resolve to hashes of candidates that need to be recovered. /// - /// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not + /// The candidates to the hashes are stored in `candidates`. If a candidate is not /// available anymore in this map, it means that it was already imported. - next_candidate_to_recover: FuturesUnordered + Send>>>, + candidate_recovery_queue: RecoveryQueue, active_candidate_recovery: ActiveCandidateRecovery, /// Blocks that wait that the parent is imported. /// /// Uses parent -> blocks mapping. waiting_for_parent: HashMap>, - recovery_delay: RecoveryDelay, parachain_client: Arc, parachain_import_queue: Box>, relay_chain_interface: RC, para_id: ParaId, /// Explicit block recovery requests channel. recovery_chan_rx: Receiver>, + /// Blocks that we are retrying currently + candidates_in_retry: HashSet, } impl PoVRecovery @@ -146,8 +237,8 @@ where { /// Create a new instance. pub fn new( - overseer_handle: OverseerHandle, - recovery_delay: RecoveryDelay, + recovery_handle: Box, + recovery_delay_range: RecoveryDelayRange, parachain_client: Arc, parachain_import_queue: Box>, relay_chain_interface: RCInterface, @@ -156,14 +247,14 @@ where ) -> Self { Self { candidates: HashMap::new(), - next_candidate_to_recover: Default::default(), - active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle), - recovery_delay, + candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range), + active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle), waiting_for_parent: HashMap::new(), parachain_client, parachain_import_queue, relay_chain_interface, para_id, + candidates_in_retry: HashSet::new(), recovery_chan_rx, } } @@ -210,15 +301,11 @@ where // If required, triggers a lazy recovery request that will eventually be blocked // if in the meantime the block is imported. - self.recover(RecoveryRequest { - hash, - delay: self.recovery_delay, - kind: RecoveryKind::Simple, - }); + self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple }); } - /// Handle an imported block. - fn handle_block_imported(&mut self, block_hash: &Block::Hash) { + /// Block is no longer waiting for recovery + fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) { self.candidates.get_mut(block_hash).map(|candidate| { // Prevents triggering an already enqueued recovery request candidate.waiting_recovery = false; @@ -241,9 +328,9 @@ where } } - /// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child - /// blocks. - fn clear_waiting_for_parent(&mut self, hash: Block::Hash) { + /// Clear `waiting_for_parent` and `waiting_recovery` for the candidate with `hash`. + /// Also clears children blocks waiting for this parent. + fn reset_candidate(&mut self, hash: Block::Hash) { let mut blocks_to_delete = vec![hash]; while let Some(delete) = blocks_to_delete.pop() { @@ -251,6 +338,7 @@ where blocks_to_delete.extend(childs.iter().map(BlockT::hash)); } } + self.clear_waiting_recovery(&hash); } /// Handle a recovered candidate. @@ -260,11 +348,25 @@ where available_data: Option, ) { let available_data = match available_data { - Some(data) => data, - None => { - self.clear_waiting_for_parent(block_hash); - return + Some(data) => { + self.candidates_in_retry.remove(&block_hash); + data }, + None => + if self.candidates_in_retry.insert(block_hash) { + tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying."); + self.candidate_recovery_queue.push_recovery(block_hash); + return + } else { + tracing::warn!( + target: LOG_TARGET, + ?block_hash, + "Unable to recover block after retry.", + ); + self.candidates_in_retry.remove(&block_hash); + self.reset_candidate(block_hash); + return + }, }; let raw_block_data = match sp_maybe_compressed_blob::decompress( @@ -275,8 +377,7 @@ where Err(error) => { tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV"); - self.clear_waiting_for_parent(block_hash); - + self.reset_candidate(block_hash); return }, }; @@ -290,8 +391,7 @@ where "Failed to decode parachain block data from recovered PoV", ); - self.clear_waiting_for_parent(block_hash); - + self.reset_candidate(block_hash); return }, }; @@ -302,12 +402,17 @@ where match self.parachain_client.block_status(parent) { Ok(BlockStatus::Unknown) => { - if self.active_candidate_recovery.is_being_recovered(&parent) { + // If the parent block is currently being recovered or is scheduled to be recovered, + // we want to wait for the parent. + let parent_scheduled_for_recovery = + self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery); + if parent_scheduled_for_recovery { tracing::debug!( target: LOG_TARGET, ?block_hash, parent_hash = ?parent, - "Parent is still being recovered, waiting.", + parent_scheduled_for_recovery, + "Waiting for recovery of parent.", ); self.waiting_for_parent.entry(parent).or_default().push(block); @@ -320,8 +425,7 @@ where "Parent not found while trying to import recovered block.", ); - self.clear_waiting_for_parent(block_hash); - + self.reset_candidate(block_hash); return } }, @@ -333,8 +437,7 @@ where "Error while checking block status", ); - self.clear_waiting_for_parent(block_hash); - + self.reset_candidate(block_hash); return }, // Any other status is fine to "ignore/accept" @@ -383,10 +486,10 @@ where /// Attempts an explicit recovery of one or more blocks. pub fn recover(&mut self, req: RecoveryRequest) { - let RecoveryRequest { mut hash, delay, kind } = req; + let RecoveryRequest { mut hash, kind } = req; let mut to_recover = Vec::new(); - let do_recover = loop { + loop { let candidate = match self.candidates.get_mut(&hash) { Some(candidate) => candidate, None => { @@ -395,7 +498,7 @@ where block_hash = ?hash, "Cound not recover. Block was never announced as candidate" ); - break false + return }, }; @@ -404,7 +507,7 @@ where candidate.waiting_recovery = true; to_recover.push(hash); }, - Ok(_) => break true, + Ok(_) => break, Err(e) => { tracing::error!( target: LOG_TARGET, @@ -412,36 +515,22 @@ where block_hash = ?hash, "Failed to get block status", ); - break false + for hash in to_recover { + self.clear_waiting_recovery(&hash); + } + return }, } if kind == RecoveryKind::Simple { - break true + break } hash = candidate.parent_hash; - }; + } - if do_recover { - for hash in to_recover.into_iter().rev() { - let delay = - delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen()); - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Starting {:?} block recovery in {:?} sec", - kind, - delay.as_secs(), - ); - self.next_candidate_to_recover.push( - async move { - Delay::new(delay).await; - hash - } - .boxed(), - ); - } + for hash in to_recover.into_iter().rev() { + self.candidate_recovery_queue.push_recovery(hash); } } @@ -480,7 +569,7 @@ where }, imported = imported_blocks.next() => { if let Some(imported) = imported { - self.handle_block_imported(&imported.hash); + self.clear_waiting_recovery(&imported.hash); } else { tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended"); return; @@ -494,10 +583,8 @@ where return; } }, - next_to_recover = self.next_candidate_to_recover.next() => { - if let Some(block_hash) = next_to_recover { - self.recover_candidate(block_hash).await; - } + next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => { + self.recover_candidate(next_to_recover).await; }, (block_hash, available_data) = self.active_candidate_recovery.wait_for_recovery().fuse() => diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 8906ee23181..ab95352f21d 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -20,7 +20,7 @@ use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_common::ParachainConsensus; -use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelay}; +use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle}; use cumulus_primitives_core::{CollectCollationInfo, ParaId}; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; @@ -59,6 +59,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn pub import_queue: Box>, pub collator_key: CollatorPair, pub relay_chain_slot_duration: Duration, + pub recovery_handle: Box, } /// Start a collator node for a parachain. @@ -79,6 +80,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner import_queue, collator_key, relay_chain_slot_duration, + recovery_handle, }: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>, ) -> sc_service::error::Result<()> where @@ -113,15 +115,12 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); - let overseer_handle = relay_chain_interface - .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - let pov_recovery = PoVRecovery::new( - overseer_handle.clone(), + recovery_handle, // We want that collators wait at maximum the relay chain slot duration before starting - // to recover blocks. - RecoveryDelay { min: core::time::Duration::ZERO, max: relay_chain_slot_duration }, + // to recover blocks. Additionally, we wait at least half the slot time to give the + // relay chain the chance to increase availability. + RecoveryDelayRange { min: relay_chain_slot_duration / 2, max: relay_chain_slot_duration }, client.clone(), import_queue, relay_chain_interface.clone(), @@ -132,6 +131,10 @@ where task_manager .spawn_essential_handle() .spawn("cumulus-pov-recovery", None, pov_recovery.run()); + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams { runtime_api: client, block_status, @@ -156,6 +159,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> { pub announce_block: Arc>) + Send + Sync>, pub relay_chain_slot_duration: Duration, pub import_queue: Box>, + pub recovery_handle: Box, } /// Start a full node for a parachain. @@ -171,6 +175,7 @@ pub fn start_full_node( para_id, relay_chain_slot_duration, import_queue, + recovery_handle, }: StartFullNodeParams, ) -> sc_service::error::Result<()> where @@ -200,18 +205,17 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); - let overseer_handle = relay_chain_interface - .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - let pov_recovery = PoVRecovery::new( - overseer_handle, + recovery_handle, // Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and // in maximum 5 minutes before starting to recover blocks. Collators should already start // the recovery way before full nodes try to recover a certain block and then share the // block with the network using "the normal way". Full nodes are just the "last resort" // for block recovery. - RecoveryDelay { min: relay_chain_slot_duration * 25, max: relay_chain_slot_duration * 50 }, + RecoveryDelayRange { + min: relay_chain_slot_duration * 25, + max: relay_chain_slot_duration * 50, + }, client, import_queue, relay_chain_interface, diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index e5258e675e2..4c6f61838c3 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -256,6 +256,10 @@ async fn start_node_impl( let relay_chain_slot_duration = Duration::from_secs(6); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + if validator { let parachain_consensus = build_consensus( client.clone(), @@ -284,6 +288,7 @@ async fn start_node_impl( import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), }; start_collator(params).await?; @@ -296,6 +301,7 @@ async fn start_node_impl( relay_chain_interface, relay_chain_slot_duration, import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), }; start_full_node(params)?; diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index c9b3d5adad3..13c904193bf 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -452,6 +452,10 @@ where let relay_chain_slot_duration = Duration::from_secs(6); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + if validator { let parachain_consensus = build_consensus( client.clone(), @@ -480,6 +484,7 @@ where import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), }; start_collator(params).await?; @@ -492,6 +497,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), }; start_full_node(params)?; @@ -652,6 +658,9 @@ where let relay_chain_slot_duration = Duration::from_secs(6); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; if validator { let parachain_consensus = build_consensus( client.clone(), @@ -680,6 +689,7 @@ where import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), }; start_collator(params).await?; @@ -692,6 +702,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), }; start_full_node(params)?; @@ -1425,6 +1436,9 @@ where let relay_chain_slot_duration = Duration::from_secs(6); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; if validator { let parachain_consensus = build_consensus( client.clone(), @@ -1453,6 +1467,7 @@ where import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), }; start_collator(params).await?; @@ -1465,6 +1480,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), }; start_full_node(params)?; diff --git a/test/service/Cargo.toml b/test/service/Cargo.toml index 30668f90ab3..f01b97f4522 100644 --- a/test/service/Cargo.toml +++ b/test/service/Cargo.toml @@ -54,6 +54,8 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-test-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-cli = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" } # Cumulus cumulus-client-cli = { path = "../../client/cli" } @@ -70,6 +72,7 @@ cumulus-relay-chain-rpc-interface = { path = "../../client/relay-chain-rpc-inter cumulus-test-relay-validation-worker-provider = { path = "../relay-validation-worker-provider" } cumulus-test-runtime = { path = "../runtime" } cumulus-relay-chain-minimal-node = { path = "../../client/relay-chain-minimal-node" } +cumulus-client-pov-recovery = { path = "../../client/pov-recovery" } [dev-dependencies] futures = "0.3.25" diff --git a/test/service/src/cli.rs b/test/service/src/cli.rs index 31829a66d26..3cf99496576 100644 --- a/test/service/src/cli.rs +++ b/test/service/src/cli.rs @@ -49,6 +49,9 @@ pub struct TestCollatorCli { #[arg(long)] pub disable_block_announcements: bool, + + #[arg(long)] + pub fail_pov_recovery: bool, } #[derive(Debug, clap::Subcommand)] diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index e14b7b7463e..3a7e4fba32e 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -34,6 +34,7 @@ use cumulus_client_consensus_common::{ ParachainBlockImport as TParachainBlockImport, ParachainCandidate, ParachainConsensus, }; use cumulus_client_network::BlockAnnounceValidator; +use cumulus_client_pov_recovery::RecoveryHandle; use cumulus_client_service::{ prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams, }; @@ -45,6 +46,8 @@ use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node; use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; use frame_system_rpc_runtime_api::AccountNonceApi; +use polkadot_node_subsystem::{errors::RecoveryError, messages::AvailabilityRecoveryMessage}; +use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{CollatorPair, Hash as PHash, PersistedValidationData}; use polkadot_service::ProvideRuntimeApi; use sc_client_api::execution_extensions::ExecutionStrategies; @@ -76,6 +79,8 @@ pub use cumulus_test_runtime as runtime; pub use genesis::*; pub use sp_keyring::Sr25519Keyring as Keyring; +const LOG_TARGET: &str = "cumulus-test-service"; + /// A consensus that will never produce any block. #[derive(Clone)] struct NullConsensus; @@ -126,6 +131,41 @@ pub type ParachainBlockImport = TParachainBlockImport, Backen /// Transaction pool type used by the test service pub type TransactionPool = Arc>; +/// Recovery handle that fails regularly to simulate unavailable povs. +pub struct FailingRecoveryHandle { + overseer_handle: OverseerHandle, + counter: u32, +} + +impl FailingRecoveryHandle { + /// Create a new FailingRecoveryHandle + pub fn new(overseer_handle: OverseerHandle) -> Self { + Self { overseer_handle, counter: 0 } + } +} + +#[async_trait::async_trait] +impl RecoveryHandle for FailingRecoveryHandle { + async fn send_recovery_msg( + &mut self, + message: AvailabilityRecoveryMessage, + origin: &'static str, + ) { + // For every 5th block we immediately signal unavailability to trigger + // a retry. + if self.counter % 5 == 0 { + let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, back_sender) = message; + tracing::info!(target: LOG_TARGET, "Failing pov recovery."); + back_sender + .send(Err(RecoveryError::Unavailable)) + .expect("Return channel should work here."); + } else { + self.overseer_handle.send_msg(message, origin).await; + } + self.counter += 1; + } +} + /// Starts a `ServiceBuilder` for a full service. /// /// Use this macro if you don't actually need the full service, but just the builder in order to @@ -236,6 +276,7 @@ pub async fn start_node_impl( relay_chain_config: Configuration, para_id: ParaId, wrap_announce_block: Option AnnounceBlockFn>>, + fail_pov_recovery: bool, rpc_ext_builder: RB, consensus: Consensus, collator_options: CollatorOptions, @@ -320,6 +361,17 @@ where .unwrap_or_else(|| announce_block); let relay_chain_interface_for_closure = relay_chain_interface.clone(); + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + + let recovery_handle: Box = if fail_pov_recovery { + Box::new(FailingRecoveryHandle::new(overseer_handle)) + } else { + Box::new(overseer_handle) + }; + if let Some(collator_key) = collator_key { let parachain_consensus: Box> = match consensus { Consensus::RelayChain => { @@ -374,6 +426,7 @@ where collator_key, import_queue: import_queue_service, relay_chain_slot_duration: Duration::from_secs(6), + recovery_handle, }; start_collator(params).await?; @@ -385,10 +438,8 @@ where para_id, relay_chain_interface, import_queue: import_queue_service, - // The slot duration is currently used internally only to configure - // the recovery delay of pov-recovery. We don't want to wait for too - // long on the full node to recover, so we reduce this time here. - relay_chain_slot_duration: Duration::from_millis(6), + relay_chain_slot_duration: Duration::from_secs(6), + recovery_handle, }; start_full_node(params)?; @@ -600,6 +651,7 @@ impl TestNodeBuilder { relay_chain_config, self.para_id, self.wrap_announce_block, + false, |_| Ok(jsonrpsee::RpcModule::new(())), self.consensus, collator_options, diff --git a/test/service/src/main.rs b/test/service/src/main.rs index 40deccc27de..760caece1ae 100644 --- a/test/service/src/main.rs +++ b/test/service/src/main.rs @@ -123,6 +123,9 @@ fn main() -> Result<(), sc_cli::Error> { "Is collating: {}", if config.role.is_authority() { "yes" } else { "no" } ); + if cli.fail_pov_recovery { + tracing::info!("PoV recovery failure enabled"); + } let collator_key = config.role.is_authority().then(|| CollatorPair::generate().0); @@ -141,6 +144,7 @@ fn main() -> Result<(), sc_cli::Error> { polkadot_config, parachain_id, cli.disable_block_announcements.then(wrap_announce_block), + cli.fail_pov_recovery, |_| Ok(jsonrpsee::RpcModule::new(())), consensus, collator_options, diff --git a/zombienet/tests/0002-pov_recovery.feature b/zombienet/tests/0002-pov_recovery.feature index 80524164821..108f437bd3c 100644 --- a/zombienet/tests/0002-pov_recovery.feature +++ b/zombienet/tests/0002-pov_recovery.feature @@ -12,9 +12,10 @@ bob: is up within 60 seconds charlie: is up within 60 seconds one: is up within 60 seconds two: is up within 60 seconds +eve: is up within 60 seconds -# wait 30 blocks and register parachain -validator-3: reports block height is at least 30 within 250 seconds +# wait 20 blocks and register parachain +validator-3: reports block height is at least 20 within 250 seconds validator-0: js-script ./register-para.js with "2000" within 240 seconds validator-0: parachain 2000 is registered within 300 seconds @@ -22,5 +23,6 @@ validator-0: parachain 2000 is registered within 300 seconds bob: reports block height is at least 20 within 600 seconds alice: reports block height is at least 20 within 600 seconds charlie: reports block height is at least 20 within 600 seconds -one: reports block height is at least 20 within 600 seconds -two: reports block height is at least 20 within 600 seconds +one: reports block height is at least 20 within 800 seconds +two: reports block height is at least 20 within 800 seconds +eve: reports block height is at least 20 within 800 seconds diff --git a/zombienet/tests/0002-pov_recovery.toml b/zombienet/tests/0002-pov_recovery.toml index 2e3f8629cab..0df0293e348 100644 --- a/zombienet/tests/0002-pov_recovery.toml +++ b/zombienet/tests/0002-pov_recovery.toml @@ -40,7 +40,7 @@ add_to_genesis = false validator = true # collator image = "{{COL_IMAGE}}" command = "test-parachain" - args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] # run eve as a parachain full node [[parachains.collators]] @@ -48,7 +48,15 @@ add_to_genesis = false validator = false # full node image = "{{COL_IMAGE}}" command = "test-parachain" - args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}","--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}","--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + + # we fail recovery for eve from time to time to test retries + [[parachains.collators]] + name = "eve" + validator = true # collator + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--fail-pov-recovery", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] # run one as a RPC collator who does not produce blocks [[parachains.collators]] @@ -56,7 +64,7 @@ add_to_genesis = false validator = true # collator image = "{{COL_IMAGE}}" command = "test-parachain" - args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--relay-chain-rpc-url {{'ferdie'|zombie('wsUri')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--relay-chain-rpc-url {{'ferdie'|zombie('wsUri')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] # run two as a RPC parachain full node [[parachains.collators]] @@ -64,4 +72,4 @@ add_to_genesis = false validator = false # full node image = "{{COL_IMAGE}}" command = "test-parachain" - args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--relay-chain-rpc-url {{'ferdie'|zombie('wsUri')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--relay-chain-rpc-url {{'ferdie'|zombie('wsUri')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"]