diff --git a/Cargo.lock b/Cargo.lock
index c869f0c8dfcf3..3602f0cac5ce9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7313,11 +7313,11 @@ dependencies = [
name = "sc-consensus-slots"
version = "0.9.0"
dependencies = [
+ "async-trait",
"futures 0.3.13",
"futures-timer 3.0.2",
"log",
"parity-scale-codec 2.0.1",
- "parking_lot 0.11.1",
"sc-client-api",
"sc-telemetry",
"sp-api",
diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs
index 81c6015ac7efa..cc60406d1664b 100644
--- a/client/consensus/aura/src/lib.rs
+++ b/client/consensus/aura/src/lib.rs
@@ -35,7 +35,6 @@ use std::{
};
use futures::prelude::*;
-use parking_lot::Mutex;
use log::{debug, trace};
use codec::{Encode, Decode, Codec};
@@ -272,7 +271,7 @@ pub fn build_aura_worker
(
{
AuraWorker {
client,
- block_import: Arc::new(Mutex::new(block_import)),
+ block_import,
env: proposer_factory,
keystore,
sync_oracle,
@@ -286,7 +285,7 @@ pub fn build_aura_worker
(
struct AuraWorker {
client: Arc,
- block_import: Arc>,
+ block_import: I,
env: E,
keystore: SyncCryptoStorePtr,
sync_oracle: SO,
@@ -326,8 +325,8 @@ where
"aura"
}
- fn block_import(&self) -> Arc> {
- self.block_import.clone()
+ fn block_import(&mut self) -> &mut Self::BlockImport {
+ &mut self.block_import
}
fn epoch_data(
@@ -805,7 +804,7 @@ mod tests {
let worker = AuraWorker {
client: client.clone(),
- block_import: Arc::new(Mutex::new(client)),
+ block_import: client,
env: environ,
keystore: keystore.into(),
sync_oracle: DummyOracle.clone(),
@@ -854,7 +853,7 @@ mod tests {
let mut worker = AuraWorker {
client: client.clone(),
- block_import: Arc::new(Mutex::new(client.clone())),
+ block_import: client.clone(),
env: environ,
keystore: keystore.into(),
sync_oracle: DummyOracle.clone(),
diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs
index db13d0f3e420a..28cfec1238e42 100644
--- a/client/consensus/babe/src/lib.rs
+++ b/client/consensus/babe/src/lib.rs
@@ -438,7 +438,7 @@ pub fn start_babe(BabeParams {
+ Sync + 'static,
Error: std::error::Error + Send + From + From + 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
- CAW: CanAuthorWith + Send + 'static,
+ CAW: CanAuthorWith + Send + Sync + 'static,
BS: BackoffAuthoringBlocksStrategy> + Send + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;
@@ -448,7 +448,7 @@ pub fn start_babe(BabeParams {
let worker = BabeSlotWorker {
client: client.clone(),
- block_import: Arc::new(Mutex::new(block_import)),
+ block_import,
env,
sync_oracle: sync_oracle.clone(),
force_authoring,
@@ -605,7 +605,7 @@ type SlotNotificationSinks = Arc<
struct BabeSlotWorker {
client: Arc,
- block_import: Arc>,
+ block_import: I,
env: E,
sync_oracle: SO,
force_authoring: bool,
@@ -647,8 +647,8 @@ where
"babe"
}
- fn block_import(&self) -> Arc> {
- self.block_import.clone()
+ fn block_import(&mut self) -> &mut Self::BlockImport {
+ &mut self.block_import
}
fn epoch_data(
diff --git a/client/consensus/slots/Cargo.toml b/client/consensus/slots/Cargo.toml
index 34162cfae71e2..64beea50fcf63 100644
--- a/client/consensus/slots/Cargo.toml
+++ b/client/consensus/slots/Cargo.toml
@@ -31,9 +31,9 @@ sp-inherents = { version = "3.0.0", path = "../../../primitives/inherents" }
sp-timestamp = { version = "3.0.0", path = "../../../primitives/timestamp" }
futures = "0.3.9"
futures-timer = "3.0.1"
-parking_lot = "0.11.1"
log = "0.4.11"
thiserror = "1.0.21"
+async-trait = "0.1.42"
[dev-dependencies]
substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils/runtime/client" }
diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs
index 83dd88a8d49ff..351ef932ada19 100644
--- a/client/consensus/slots/src/lib.rs
+++ b/client/consensus/slots/src/lib.rs
@@ -32,12 +32,11 @@ pub use slots::SlotInfo;
use slots::Slots;
pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND};
-use std::{fmt::Debug, ops::Deref, pin::Pin, sync::Arc, time::Duration};
+use std::{fmt::Debug, ops::Deref, time::Duration};
use codec::{Decode, Encode};
-use futures::{prelude::*, future::{self, Either}};
+use futures::{future::Either, Future, TryFutureExt};
use futures_timer::Delay;
use log::{debug, error, info, warn};
-use parking_lot::Mutex;
use sp_api::{ProvideRuntimeApi, ApiRef};
use sp_arithmetic::traits::BaseArithmetic;
use sp_consensus::{BlockImport, Proposer, SyncOracle, SelectChain, CanAuthorWith, SlotData};
@@ -68,21 +67,23 @@ pub struct SlotResult {
///
/// The implementation should not make any assumptions of the slot being bound to the time or
/// similar. The only valid assumption is that the slot number is always increasing.
+#[async_trait::async_trait]
pub trait SlotWorker {
/// Called when a new slot is triggered.
///
/// Returns a future that resolves to a [`SlotResult`] iff a block was successfully built in
/// the slot. Otherwise `None` is returned.
- fn on_slot(
+ async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
- ) -> Pin>> + Send>>;
+ ) -> Option>;
}
/// A skeleton implementation for `SlotWorker` which tries to claim a slot at
/// its beginning and tries to produce a block if successfully claimed, timing
/// out if block production takes too long.
+#[async_trait::async_trait]
pub trait SimpleSlotWorker {
/// A handle to a `BlockImport`.
type BlockImport: BlockImport>::Transaction>
@@ -96,7 +97,7 @@ pub trait SimpleSlotWorker {
+ Send + Unpin + 'static;
/// The type of proposer to use to build blocks.
- type Proposer: Proposer;
+ type Proposer: Proposer + Send;
/// Data associated with a slot claim.
type Claim: Send + 'static;
@@ -108,7 +109,7 @@ pub trait SimpleSlotWorker {
fn logging_target(&self) -> &'static str;
/// A handle to a `BlockImport`.
- fn block_import(&self) -> Arc>;
+ fn block_import(&mut self) -> &mut Self::BlockImport;
/// Returns the epoch data necessary for authoring. For time-dependent epochs,
/// use the provided slot number as a canonical source of time.
@@ -191,36 +192,38 @@ pub trait SimpleSlotWorker {
) -> Duration;
/// Implements [`SlotWorker::on_slot`].
- fn on_slot(
+ async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
- ) -> Pin>::Proof>>> + Send>>
- where
- >::Proposal: Unpin + Send + 'static,
- {
+ ) -> Option>::Proof>> {
let (timestamp, slot) = (slot_info.timestamp, slot_info.slot);
let telemetry = self.telemetry();
+ let logging_target = self.logging_target();
let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
let proposing_remaining = if proposing_remaining_duration == Duration::default() {
debug!(
- target: self.logging_target(),
+ target: logging_target,
"Skipping proposal slot {} since there's no time left to propose",
slot,
);
- return Box::pin(future::ready(None));
+ return None
} else {
- Box::new(Delay::new(proposing_remaining_duration))
- as Box + Unpin + Send>
+ Delay::new(proposing_remaining_duration)
};
let epoch_data = match self.epoch_data(&chain_head, slot) {
Ok(epoch_data) => epoch_data,
Err(err) => {
- warn!("Unable to fetch epoch data at block {:?}: {:?}", chain_head.hash(), err);
+ warn!(
+ target: logging_target,
+ "Unable to fetch epoch data at block {:?}: {:?}",
+ chain_head.hash(),
+ err,
+ );
telemetry!(
telemetry;
@@ -230,7 +233,7 @@ pub trait SimpleSlotWorker {
"err" => ?err,
);
- return Box::pin(future::ready(None));
+ return None;
}
};
@@ -242,7 +245,7 @@ pub trait SimpleSlotWorker {
self.sync_oracle().is_offline() &&
authorities_len.map(|a| a > 1).unwrap_or(false)
{
- debug!(target: self.logging_target(), "Skipping proposal slot. Waiting for the network.");
+ debug!(target: logging_target, "Skipping proposal slot. Waiting for the network.");
telemetry!(
telemetry;
CONSENSUS_DEBUG;
@@ -250,16 +253,16 @@ pub trait SimpleSlotWorker {
"authorities_len" => authorities_len,
);
- return Box::pin(future::ready(None));
+ return None;
}
let claim = match self.claim_slot(&chain_head, slot, &epoch_data) {
- None => return Box::pin(future::ready(None)),
+ None => return None,
Some(claim) => claim,
};
if self.should_backoff(slot, &chain_head) {
- return Box::pin(future::ready(None));
+ return None;
}
debug!(
@@ -277,10 +280,15 @@ pub trait SimpleSlotWorker {
"timestamp" => *timestamp,
);
- let awaiting_proposer = {
- let telemetry = telemetry.clone();
- self.proposer(&chain_head).map_err(move |err| {
- warn!("Unable to author block in slot {:?}: {:?}", slot, err);
+ let proposer = match self.proposer(&chain_head).await {
+ Ok(p) => p,
+ Err(err) => {
+ warn!(
+ target: logging_target,
+ "Unable to author block in slot {:?}: {:?}",
+ slot,
+ err,
+ );
telemetry!(
telemetry;
@@ -290,8 +298,8 @@ pub trait SimpleSlotWorker {
"err" => ?err
);
- err
- })
+ return None
+ }
};
let logs = self.pre_digest_data(slot, &claim);
@@ -299,106 +307,126 @@ pub trait SimpleSlotWorker {
// deadline our production to 98% of the total time left for proposing. As we deadline
// the proposing below to the same total time left, the 2% margin should be enough for
// the result to be returned.
- let proposing = awaiting_proposer.and_then(move |proposer| proposer.propose(
+ let proposing = proposer.propose(
slot_info.inherent_data,
sp_runtime::generic::Digest {
logs,
},
proposing_remaining_duration.mul_f32(0.98),
- ).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))));
-
- let proposal_work = {
- let telemetry = telemetry.clone();
- futures::future::select(proposing, proposing_remaining).map(move |v| match v {
- Either::Left((b, _)) => b.map(|b| (b, claim)),
- Either::Right(_) => {
- info!(
- "⌛️ Discarding proposal for slot {}; block production took too long",
- slot,
- );
- // If the node was compiled with debug, tell the user to use release optimizations.
- #[cfg(build_type="debug")]
- info!("👉 Recompile your node in `--release` mode to mitigate this problem.");
- telemetry!(
- telemetry;
- CONSENSUS_INFO;
- "slots.discarding_proposal_took_too_long";
- "slot" => *slot,
- );
+ ).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)));
- Err(sp_consensus::Error::ClientImport("Timeout in the Slots proposer".into()))
- },
- })
+ let proposal = match futures::future::select(proposing, proposing_remaining).await {
+ Either::Left((Ok(p), _)) => p,
+ Either::Left((Err(err), _)) => {
+ warn!(
+ target: logging_target,
+ "Proposing failed: {:?}",
+ err,
+ );
+
+ return None
+ },
+ Either::Right(_) => {
+ info!(
+ target: logging_target,
+ "⌛️ Discarding proposal for slot {}; block production took too long",
+ slot,
+ );
+ // If the node was compiled with debug, tell the user to use release optimizations.
+ #[cfg(build_type="debug")]
+ info!(
+ target: logging_target,
+ "👉 Recompile your node in `--release` mode to mitigate this problem.",
+ );
+ telemetry!(
+ telemetry;
+ CONSENSUS_INFO;
+ "slots.discarding_proposal_took_too_long";
+ "slot" => *slot,
+ );
+
+ return None
+ },
};
let block_import_params_maker = self.block_import_params();
let block_import = self.block_import();
- let logging_target = self.logging_target();
-
- proposal_work.and_then(move |(proposal, claim)| async move {
- let (block, storage_proof) = (proposal.block, proposal.proof);
- let (header, body) = block.deconstruct();
- let header_num = *header.number();
- let header_hash = header.hash();
- let parent_hash = *header.parent_hash();
-
- let block_import_params = block_import_params_maker(
- header,
- &header_hash,
- body.clone(),
- proposal.storage_changes,
- claim,
- epoch_data,
- )?;
-
- info!(
- "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
- header_num,
- block_import_params.post_hash(),
- header_hash,
- );
- telemetry!(
- telemetry;
- CONSENSUS_INFO;
- "slots.pre_sealed_block";
- "header_num" => ?header_num,
- "hash_now" => ?block_import_params.post_hash(),
- "hash_previously" => ?header_hash,
- );
-
- let header = block_import_params.post_header();
- if let Err(err) = block_import.lock().import_block(block_import_params, Default::default()) {
+ let (block, storage_proof) = (proposal.block, proposal.proof);
+ let (header, body) = block.deconstruct();
+ let header_num = *header.number();
+ let header_hash = header.hash();
+ let parent_hash = *header.parent_hash();
+
+ let block_import_params = match block_import_params_maker(
+ header,
+ &header_hash,
+ body.clone(),
+ proposal.storage_changes,
+ claim,
+ epoch_data,
+ ) {
+ Ok(bi) => bi,
+ Err(err) => {
warn!(
target: logging_target,
- "Error with block built on {:?}: {:?}",
- parent_hash,
+ "Failed to create block import params: {:?}",
err,
);
- telemetry!(
- telemetry;
- CONSENSUS_WARN;
- "slots.err_with_block_built_on";
- "hash" => ?parent_hash,
- "err" => ?err,
- );
+ return None
}
+ };
+
+ info!(
+ target: logging_target,
+ "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
+ header_num,
+ block_import_params.post_hash(),
+ header_hash,
+ );
- Ok(SlotResult { block: B::new(header, body), storage_proof })
- }).then(|r| async move {
- r.map_err(|e| warn!(target: "slots", "Encountered consensus error: {:?}", e)).ok()
- }).boxed()
+ telemetry!(
+ telemetry;
+ CONSENSUS_INFO;
+ "slots.pre_sealed_block";
+ "header_num" => ?header_num,
+ "hash_now" => ?block_import_params.post_hash(),
+ "hash_previously" => ?header_hash,
+ );
+
+ let header = block_import_params.post_header();
+ if let Err(err) = block_import
+ .import_block(block_import_params, Default::default())
+ {
+ warn!(
+ target: logging_target,
+ "Error with block built on {:?}: {:?}",
+ parent_hash,
+ err,
+ );
+
+ telemetry!(
+ telemetry;
+ CONSENSUS_WARN;
+ "slots.err_with_block_built_on";
+ "hash" => ?parent_hash,
+ "err" => ?err,
+ );
+ }
+
+ Some(SlotResult { block: B::new(header, body), storage_proof })
}
}
-impl> SlotWorker>::Proof> for T {
- fn on_slot(
+#[async_trait::async_trait]
+impl + Send> SlotWorker>::Proof> for T {
+ async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
- ) -> Pin>::Proof>>> + Send>> {
- SimpleSlotWorker::on_slot(self, chain_head, slot_info)
+ ) -> Option>::Proof>> {
+ SimpleSlotWorker::on_slot(self, chain_head, slot_info).await
}
}
@@ -436,25 +464,39 @@ where
let SlotDuration(slot_duration) = slot_duration;
// rather than use a timer interval, we schedule our waits ourselves
- Slots::::new(
+ let mut slots = Slots::::new(
slot_duration.slot_duration(),
inherent_data_providers,
timestamp_extractor,
- ).inspect_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e))
- .try_for_each(move |slot_info| {
+ );
+
+ async move {
+ loop {
+ let slot_info = match slots.next_slot().await {
+ Ok(slot) => slot,
+ Err(err) => {
+ debug!(target: "slots", "Faulty timer: {:?}", err);
+ return
+ },
+ };
+
// only propose when we are not syncing.
if sync_oracle.is_major_syncing() {
debug!(target: "slots", "Skipping proposal slot due to sync.");
- return Either::Right(future::ready(Ok(())));
+ continue;
}
let slot = slot_info.slot;
let chain_head = match client.best_chain() {
Ok(x) => x,
Err(e) => {
- warn!(target: "slots", "Unable to author block in slot {}. \
- no best block header: {:?}", slot, e);
- return Either::Right(future::ready(Ok(())));
+ warn!(
+ target: "slots",
+ "Unable to author block in slot {}. No best block header: {:?}",
+ slot,
+ e,
+ );
+ continue;
}
};
@@ -466,19 +508,11 @@ where
slot,
err,
);
- Either::Right(future::ready(Ok(())))
} else {
- Either::Left(
- worker.on_slot(chain_head, slot_info)
- .then(|_| future::ready(Ok(())))
- )
- }
- }).then(|res| {
- if let Err(err) = res {
- warn!(target: "slots", "Slots stream terminated with an error: {:?}", err);
+ worker.on_slot(chain_head, slot_info).await;
}
- future::ready(())
- })
+ }
+ }
}
/// A header which has been checked
diff --git a/client/consensus/slots/src/slots.rs b/client/consensus/slots/src/slots.rs
index 1cf7c30b9ed9e..d7ed1eda64c09 100644
--- a/client/consensus/slots/src/slots.rs
+++ b/client/consensus/slots/src/slots.rs
@@ -22,10 +22,9 @@
use super::{SlotCompatible, Slot};
use sp_consensus::Error;
-use futures::{prelude::*, task::Context, task::Poll};
use sp_inherents::{InherentData, InherentDataProviders};
-use std::{pin::Pin, time::{Duration, Instant}};
+use std::time::{Duration, Instant};
use futures_timer::Delay;
/// Returns current duration since unix epoch.
@@ -107,57 +106,49 @@ impl Slots {
}
}
-impl Stream for Slots {
- type Item = Result;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll