From 56d92e2d30675d73fe85e5c4f6862562cb49a367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 26 Mar 2021 13:48:34 +0100 Subject: [PATCH 1/2] Use `async_trait` in sc-consensus-slots This makes the code a little bit easier to read and also expresses that there can always only be one call at a time to `on_slot`. --- Cargo.lock | 1 + client/consensus/babe/src/lib.rs | 2 +- client/consensus/slots/Cargo.toml | 1 + client/consensus/slots/src/lib.rs | 276 ++++++++++++++++------------ client/consensus/slots/src/slots.rs | 33 ++-- 5 files changed, 171 insertions(+), 142 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c869f0c8dfcf3..d41c255e3a8d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7313,6 +7313,7 @@ dependencies = [ name = "sc-consensus-slots" version = "0.9.0" dependencies = [ + "async-trait", "futures 0.3.13", "futures-timer 3.0.2", "log", diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index db13d0f3e420a..14ac2ebbcf2a4 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -438,7 +438,7 @@ pub fn start_babe(BabeParams { + Sync + 'static, Error: std::error::Error + Send + From + From + 'static, SO: SyncOracle + Send + Sync + Clone + 'static, - CAW: CanAuthorWith + Send + 'static, + CAW: CanAuthorWith + Send + Sync + 'static, BS: BackoffAuthoringBlocksStrategy> + Send + 'static, { const HANDLE_BUFFER_SIZE: usize = 1024; diff --git a/client/consensus/slots/Cargo.toml b/client/consensus/slots/Cargo.toml index 34162cfae71e2..623c4c4abd84c 100644 --- a/client/consensus/slots/Cargo.toml +++ b/client/consensus/slots/Cargo.toml @@ -34,6 +34,7 @@ futures-timer = "3.0.1" parking_lot = "0.11.1" log = "0.4.11" thiserror = "1.0.21" +async-trait = "0.1.42" [dev-dependencies] substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils/runtime/client" } diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs index 83dd88a8d49ff..dbf62ccad23c8 100644 --- a/client/consensus/slots/src/lib.rs +++ b/client/consensus/slots/src/lib.rs @@ -32,9 +32,9 @@ pub use slots::SlotInfo; use slots::Slots; pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; -use std::{fmt::Debug, ops::Deref, pin::Pin, sync::Arc, time::Duration}; +use std::{fmt::Debug, ops::Deref, sync::Arc, time::Duration}; use codec::{Decode, Encode}; -use futures::{prelude::*, future::{self, Either}}; +use futures::{future::Either, Future, TryFutureExt}; use futures_timer::Delay; use log::{debug, error, info, warn}; use parking_lot::Mutex; @@ -68,21 +68,23 @@ pub struct SlotResult { /// /// The implementation should not make any assumptions of the slot being bound to the time or /// similar. The only valid assumption is that the slot number is always increasing. +#[async_trait::async_trait] pub trait SlotWorker { /// Called when a new slot is triggered. /// /// Returns a future that resolves to a [`SlotResult`] iff a block was successfully built in /// the slot. Otherwise `None` is returned. - fn on_slot( + async fn on_slot( &mut self, chain_head: B::Header, slot_info: SlotInfo, - ) -> Pin>> + Send>>; + ) -> Option>; } /// A skeleton implementation for `SlotWorker` which tries to claim a slot at /// its beginning and tries to produce a block if successfully claimed, timing /// out if block production takes too long. +#[async_trait::async_trait] pub trait SimpleSlotWorker { /// A handle to a `BlockImport`. type BlockImport: BlockImport>::Transaction> @@ -96,7 +98,7 @@ pub trait SimpleSlotWorker { + Send + Unpin + 'static; /// The type of proposer to use to build blocks. - type Proposer: Proposer; + type Proposer: Proposer + Send; /// Data associated with a slot claim. type Claim: Send + 'static; @@ -191,36 +193,38 @@ pub trait SimpleSlotWorker { ) -> Duration; /// Implements [`SlotWorker::on_slot`]. - fn on_slot( + async fn on_slot( &mut self, chain_head: B::Header, slot_info: SlotInfo, - ) -> Pin>::Proof>>> + Send>> - where - >::Proposal: Unpin + Send + 'static, - { + ) -> Option>::Proof>> { let (timestamp, slot) = (slot_info.timestamp, slot_info.slot); let telemetry = self.telemetry(); + let logging_target = self.logging_target(); let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info); let proposing_remaining = if proposing_remaining_duration == Duration::default() { debug!( - target: self.logging_target(), + target: logging_target, "Skipping proposal slot {} since there's no time left to propose", slot, ); - return Box::pin(future::ready(None)); + return None } else { - Box::new(Delay::new(proposing_remaining_duration)) - as Box + Unpin + Send> + Delay::new(proposing_remaining_duration) }; let epoch_data = match self.epoch_data(&chain_head, slot) { Ok(epoch_data) => epoch_data, Err(err) => { - warn!("Unable to fetch epoch data at block {:?}: {:?}", chain_head.hash(), err); + warn!( + target: logging_target, + "Unable to fetch epoch data at block {:?}: {:?}", + chain_head.hash(), + err, + ); telemetry!( telemetry; @@ -230,7 +234,7 @@ pub trait SimpleSlotWorker { "err" => ?err, ); - return Box::pin(future::ready(None)); + return None; } }; @@ -242,7 +246,7 @@ pub trait SimpleSlotWorker { self.sync_oracle().is_offline() && authorities_len.map(|a| a > 1).unwrap_or(false) { - debug!(target: self.logging_target(), "Skipping proposal slot. Waiting for the network."); + debug!(target: logging_target, "Skipping proposal slot. Waiting for the network."); telemetry!( telemetry; CONSENSUS_DEBUG; @@ -250,16 +254,16 @@ pub trait SimpleSlotWorker { "authorities_len" => authorities_len, ); - return Box::pin(future::ready(None)); + return None; } let claim = match self.claim_slot(&chain_head, slot, &epoch_data) { - None => return Box::pin(future::ready(None)), + None => return None, Some(claim) => claim, }; if self.should_backoff(slot, &chain_head) { - return Box::pin(future::ready(None)); + return None; } debug!( @@ -277,10 +281,15 @@ pub trait SimpleSlotWorker { "timestamp" => *timestamp, ); - let awaiting_proposer = { - let telemetry = telemetry.clone(); - self.proposer(&chain_head).map_err(move |err| { - warn!("Unable to author block in slot {:?}: {:?}", slot, err); + let proposer = match self.proposer(&chain_head).await { + Ok(p) => p, + Err(err) => { + warn!( + target: logging_target, + "Unable to author block in slot {:?}: {:?}", + slot, + err, + ); telemetry!( telemetry; @@ -290,8 +299,8 @@ pub trait SimpleSlotWorker { "err" => ?err ); - err - }) + return None + } }; let logs = self.pre_digest_data(slot, &claim); @@ -299,106 +308,127 @@ pub trait SimpleSlotWorker { // deadline our production to 98% of the total time left for proposing. As we deadline // the proposing below to the same total time left, the 2% margin should be enough for // the result to be returned. - let proposing = awaiting_proposer.and_then(move |proposer| proposer.propose( + let proposing = proposer.propose( slot_info.inherent_data, sp_runtime::generic::Digest { logs, }, proposing_remaining_duration.mul_f32(0.98), - ).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)))); - - let proposal_work = { - let telemetry = telemetry.clone(); - futures::future::select(proposing, proposing_remaining).map(move |v| match v { - Either::Left((b, _)) => b.map(|b| (b, claim)), - Either::Right(_) => { - info!( - "⌛️ Discarding proposal for slot {}; block production took too long", - slot, - ); - // If the node was compiled with debug, tell the user to use release optimizations. - #[cfg(build_type="debug")] - info!("👉 Recompile your node in `--release` mode to mitigate this problem."); - telemetry!( - telemetry; - CONSENSUS_INFO; - "slots.discarding_proposal_took_too_long"; - "slot" => *slot, - ); + ).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))); - Err(sp_consensus::Error::ClientImport("Timeout in the Slots proposer".into())) - }, - }) + let proposal = match futures::future::select(proposing, proposing_remaining).await { + Either::Left((Ok(p), _)) => p, + Either::Left((Err(err), _)) => { + warn!( + target: logging_target, + "Proposing failed: {:?}", + err, + ); + + return None + }, + Either::Right(_) => { + info!( + target: logging_target, + "⌛️ Discarding proposal for slot {}; block production took too long", + slot, + ); + // If the node was compiled with debug, tell the user to use release optimizations. + #[cfg(build_type="debug")] + info!( + target: logging_target, + "👉 Recompile your node in `--release` mode to mitigate this problem.", + ); + telemetry!( + telemetry; + CONSENSUS_INFO; + "slots.discarding_proposal_took_too_long"; + "slot" => *slot, + ); + + return None + }, }; let block_import_params_maker = self.block_import_params(); let block_import = self.block_import(); - let logging_target = self.logging_target(); - - proposal_work.and_then(move |(proposal, claim)| async move { - let (block, storage_proof) = (proposal.block, proposal.proof); - let (header, body) = block.deconstruct(); - let header_num = *header.number(); - let header_hash = header.hash(); - let parent_hash = *header.parent_hash(); - - let block_import_params = block_import_params_maker( - header, - &header_hash, - body.clone(), - proposal.storage_changes, - claim, - epoch_data, - )?; - - info!( - "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", - header_num, - block_import_params.post_hash(), - header_hash, - ); - telemetry!( - telemetry; - CONSENSUS_INFO; - "slots.pre_sealed_block"; - "header_num" => ?header_num, - "hash_now" => ?block_import_params.post_hash(), - "hash_previously" => ?header_hash, - ); - - let header = block_import_params.post_header(); - if let Err(err) = block_import.lock().import_block(block_import_params, Default::default()) { + let (block, storage_proof) = (proposal.block, proposal.proof); + let (header, body) = block.deconstruct(); + let header_num = *header.number(); + let header_hash = header.hash(); + let parent_hash = *header.parent_hash(); + + let block_import_params = match block_import_params_maker( + header, + &header_hash, + body.clone(), + proposal.storage_changes, + claim, + epoch_data, + ) { + Ok(bi) => bi, + Err(err) => { warn!( target: logging_target, - "Error with block built on {:?}: {:?}", - parent_hash, + "Failed to create block import params: {:?}", err, ); - telemetry!( - telemetry; - CONSENSUS_WARN; - "slots.err_with_block_built_on"; - "hash" => ?parent_hash, - "err" => ?err, - ); + return None } + }; + + info!( + target: logging_target, + "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", + header_num, + block_import_params.post_hash(), + header_hash, + ); - Ok(SlotResult { block: B::new(header, body), storage_proof }) - }).then(|r| async move { - r.map_err(|e| warn!(target: "slots", "Encountered consensus error: {:?}", e)).ok() - }).boxed() + telemetry!( + telemetry; + CONSENSUS_INFO; + "slots.pre_sealed_block"; + "header_num" => ?header_num, + "hash_now" => ?block_import_params.post_hash(), + "hash_previously" => ?header_hash, + ); + + let header = block_import_params.post_header(); + if let Err(err) = block_import + .lock() + .import_block(block_import_params, Default::default()) + { + warn!( + target: logging_target, + "Error with block built on {:?}: {:?}", + parent_hash, + err, + ); + + telemetry!( + telemetry; + CONSENSUS_WARN; + "slots.err_with_block_built_on"; + "hash" => ?parent_hash, + "err" => ?err, + ); + } + + Some(SlotResult { block: B::new(header, body), storage_proof }) } } -impl> SlotWorker>::Proof> for T { - fn on_slot( +#[async_trait::async_trait] +impl + Send> SlotWorker>::Proof> for T { + async fn on_slot( &mut self, chain_head: B::Header, slot_info: SlotInfo, - ) -> Pin>::Proof>>> + Send>> { - SimpleSlotWorker::on_slot(self, chain_head, slot_info) + ) -> Option>::Proof>> { + SimpleSlotWorker::on_slot(self, chain_head, slot_info).await } } @@ -436,25 +466,39 @@ where let SlotDuration(slot_duration) = slot_duration; // rather than use a timer interval, we schedule our waits ourselves - Slots::::new( + let mut slots = Slots::::new( slot_duration.slot_duration(), inherent_data_providers, timestamp_extractor, - ).inspect_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e)) - .try_for_each(move |slot_info| { + ); + + async move { + loop { + let slot_info = match slots.next_slot().await { + Ok(slot) => slot, + Err(err) => { + debug!(target: "slots", "Faulty timer: {:?}", err); + return + }, + }; + // only propose when we are not syncing. if sync_oracle.is_major_syncing() { debug!(target: "slots", "Skipping proposal slot due to sync."); - return Either::Right(future::ready(Ok(()))); + continue; } let slot = slot_info.slot; let chain_head = match client.best_chain() { Ok(x) => x, Err(e) => { - warn!(target: "slots", "Unable to author block in slot {}. \ - no best block header: {:?}", slot, e); - return Either::Right(future::ready(Ok(()))); + warn!( + target: "slots", + "Unable to author block in slot {}. No best block header: {:?}", + slot, + e, + ); + continue; } }; @@ -466,19 +510,11 @@ where slot, err, ); - Either::Right(future::ready(Ok(()))) } else { - Either::Left( - worker.on_slot(chain_head, slot_info) - .then(|_| future::ready(Ok(()))) - ) - } - }).then(|res| { - if let Err(err) = res { - warn!(target: "slots", "Slots stream terminated with an error: {:?}", err); + worker.on_slot(chain_head, slot_info).await; } - future::ready(()) - }) + } + } } /// A header which has been checked diff --git a/client/consensus/slots/src/slots.rs b/client/consensus/slots/src/slots.rs index 1cf7c30b9ed9e..d7ed1eda64c09 100644 --- a/client/consensus/slots/src/slots.rs +++ b/client/consensus/slots/src/slots.rs @@ -22,10 +22,9 @@ use super::{SlotCompatible, Slot}; use sp_consensus::Error; -use futures::{prelude::*, task::Context, task::Poll}; use sp_inherents::{InherentData, InherentDataProviders}; -use std::{pin::Pin, time::{Duration, Instant}}; +use std::time::{Duration, Instant}; use futures_timer::Delay; /// Returns current duration since unix epoch. @@ -107,57 +106,49 @@ impl Slots { } } -impl Stream for Slots { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { +impl Slots { + /// Returns a future that fires when the next slot starts. + pub async fn next_slot(&mut self) -> Result { loop { - let slot_duration = self.slot_duration; self.inner_delay = match self.inner_delay.take() { None => { // schedule wait. - let wait_dur = time_until_next(duration_now(), slot_duration); + let wait_dur = time_until_next(duration_now(), self.slot_duration); Some(Delay::new(wait_dur)) } Some(d) => Some(d), }; - if let Some(ref mut inner_delay) = self.inner_delay { - match Future::poll(Pin::new(inner_delay), cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(()) => {} - } + if let Some(inner_delay) = self.inner_delay.take() { + inner_delay.await; } - // timeout has fired. let inherent_data = match self.inherent_data_providers.create_inherent_data() { Ok(id) => id, - Err(err) => return Poll::Ready(Some(Err(sp_consensus::Error::InherentData(err)))), + Err(err) => return Err(sp_consensus::Error::InherentData(err)), }; let result = self.timestamp_extractor.extract_timestamp_and_slot(&inherent_data); let (timestamp, slot, offset) = match result { Ok(v) => v, - Err(err) => return Poll::Ready(Some(Err(err))), + Err(err) => return Err(err), }; // reschedule delay for next slot. let ends_in = offset + - time_until_next(timestamp.as_duration(), slot_duration); + time_until_next(timestamp.as_duration(), self.slot_duration); self.inner_delay = Some(Delay::new(ends_in)); // never yield the same slot twice. if slot > self.last_slot { self.last_slot = slot; - break Poll::Ready(Some(Ok(SlotInfo::new( + break Ok(SlotInfo::new( slot, timestamp, inherent_data, self.slot_duration, - )))) + )) } } } } - -impl Unpin for Slots {} From a35415d1471e4a3be4c05d7a951eff5d1147309d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Sat, 27 Mar 2021 13:03:12 +0000 Subject: [PATCH 2/2] slots: remove mutex around BlockImport in SlotWorker --- Cargo.lock | 1 - client/consensus/aura/src/lib.rs | 13 ++++++------- client/consensus/babe/src/lib.rs | 8 ++++---- client/consensus/slots/Cargo.toml | 1 - client/consensus/slots/src/lib.rs | 6 ++---- 5 files changed, 12 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d41c255e3a8d7..3602f0cac5ce9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7318,7 +7318,6 @@ dependencies = [ "futures-timer 3.0.2", "log", "parity-scale-codec 2.0.1", - "parking_lot 0.11.1", "sc-client-api", "sc-telemetry", "sp-api", diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 81c6015ac7efa..cc60406d1664b 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -35,7 +35,6 @@ use std::{ }; use futures::prelude::*; -use parking_lot::Mutex; use log::{debug, trace}; use codec::{Encode, Decode, Codec}; @@ -272,7 +271,7 @@ pub fn build_aura_worker( { AuraWorker { client, - block_import: Arc::new(Mutex::new(block_import)), + block_import, env: proposer_factory, keystore, sync_oracle, @@ -286,7 +285,7 @@ pub fn build_aura_worker( struct AuraWorker { client: Arc, - block_import: Arc>, + block_import: I, env: E, keystore: SyncCryptoStorePtr, sync_oracle: SO, @@ -326,8 +325,8 @@ where "aura" } - fn block_import(&self) -> Arc> { - self.block_import.clone() + fn block_import(&mut self) -> &mut Self::BlockImport { + &mut self.block_import } fn epoch_data( @@ -805,7 +804,7 @@ mod tests { let worker = AuraWorker { client: client.clone(), - block_import: Arc::new(Mutex::new(client)), + block_import: client, env: environ, keystore: keystore.into(), sync_oracle: DummyOracle.clone(), @@ -854,7 +853,7 @@ mod tests { let mut worker = AuraWorker { client: client.clone(), - block_import: Arc::new(Mutex::new(client.clone())), + block_import: client.clone(), env: environ, keystore: keystore.into(), sync_oracle: DummyOracle.clone(), diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 14ac2ebbcf2a4..28cfec1238e42 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -448,7 +448,7 @@ pub fn start_babe(BabeParams { let worker = BabeSlotWorker { client: client.clone(), - block_import: Arc::new(Mutex::new(block_import)), + block_import, env, sync_oracle: sync_oracle.clone(), force_authoring, @@ -605,7 +605,7 @@ type SlotNotificationSinks = Arc< struct BabeSlotWorker { client: Arc, - block_import: Arc>, + block_import: I, env: E, sync_oracle: SO, force_authoring: bool, @@ -647,8 +647,8 @@ where "babe" } - fn block_import(&self) -> Arc> { - self.block_import.clone() + fn block_import(&mut self) -> &mut Self::BlockImport { + &mut self.block_import } fn epoch_data( diff --git a/client/consensus/slots/Cargo.toml b/client/consensus/slots/Cargo.toml index 623c4c4abd84c..64beea50fcf63 100644 --- a/client/consensus/slots/Cargo.toml +++ b/client/consensus/slots/Cargo.toml @@ -31,7 +31,6 @@ sp-inherents = { version = "3.0.0", path = "../../../primitives/inherents" } sp-timestamp = { version = "3.0.0", path = "../../../primitives/timestamp" } futures = "0.3.9" futures-timer = "3.0.1" -parking_lot = "0.11.1" log = "0.4.11" thiserror = "1.0.21" async-trait = "0.1.42" diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs index dbf62ccad23c8..351ef932ada19 100644 --- a/client/consensus/slots/src/lib.rs +++ b/client/consensus/slots/src/lib.rs @@ -32,12 +32,11 @@ pub use slots::SlotInfo; use slots::Slots; pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; -use std::{fmt::Debug, ops::Deref, sync::Arc, time::Duration}; +use std::{fmt::Debug, ops::Deref, time::Duration}; use codec::{Decode, Encode}; use futures::{future::Either, Future, TryFutureExt}; use futures_timer::Delay; use log::{debug, error, info, warn}; -use parking_lot::Mutex; use sp_api::{ProvideRuntimeApi, ApiRef}; use sp_arithmetic::traits::BaseArithmetic; use sp_consensus::{BlockImport, Proposer, SyncOracle, SelectChain, CanAuthorWith, SlotData}; @@ -110,7 +109,7 @@ pub trait SimpleSlotWorker { fn logging_target(&self) -> &'static str; /// A handle to a `BlockImport`. - fn block_import(&self) -> Arc>; + fn block_import(&mut self) -> &mut Self::BlockImport; /// Returns the epoch data necessary for authoring. For time-dependent epochs, /// use the provided slot number as a canonical source of time. @@ -398,7 +397,6 @@ pub trait SimpleSlotWorker { let header = block_import_params.post_header(); if let Err(err) = block_import - .lock() .import_block(block_import_params, Default::default()) { warn!(