From 098aaacdea061b09eed9b74a590178e4d854fa73 Mon Sep 17 00:00:00 2001 From: maciejnems Date: Fri, 30 Dec 2022 15:45:29 +0100 Subject: [PATCH 1/4] add substrate chain state notifier --- finality-aleph/src/sync/mod.rs | 6 +- .../sync/{substrate.rs => substrate/mod.rs} | 2 + .../src/sync/substrate/state_notifier.rs | 81 +++++++++++++++++++ 3 files changed, 88 insertions(+), 1 deletion(-) rename finality-aleph/src/sync/{substrate.rs => substrate/mod.rs} (98%) create mode 100644 finality-aleph/src/sync/substrate/state_notifier.rs diff --git a/finality-aleph/src/sync/mod.rs b/finality-aleph/src/sync/mod.rs index 9907ec1ca9..2526963bcf 100644 --- a/finality-aleph/src/sync/mod.rs +++ b/finality-aleph/src/sync/mod.rs @@ -6,6 +6,8 @@ use std::{ mod substrate; mod ticker; +const LOG_TARGET: &str = "aleph-block-sync"; + /// The identifier of a block, the least amount of knowledge we can have about a block. pub trait BlockIdentifier: Clone + Hash + Debug + Eq { /// The block number, useful when reasoning about hopeless forks. @@ -70,8 +72,10 @@ pub enum ChainStateNotification { /// A stream of notifications about the chain state in the database changing. #[async_trait::async_trait] pub trait ChainStateNotifier { + type Error: Display; + /// Returns a chain state notification when it is available. - async fn next(&self) -> ChainStateNotification; + async fn next(&mut self) -> Result, Self::Error>; } /// The state of a block in the database. diff --git a/finality-aleph/src/sync/substrate.rs b/finality-aleph/src/sync/substrate/mod.rs similarity index 98% rename from finality-aleph/src/sync/substrate.rs rename to finality-aleph/src/sync/substrate/mod.rs index 5516f80ba2..1088277891 100644 --- a/finality-aleph/src/sync/substrate.rs +++ b/finality-aleph/src/sync/substrate/mod.rs @@ -5,6 +5,8 @@ use sp_runtime::traits::{CheckedSub, Header as SubstrateHeader, One}; use crate::sync::{BlockIdentifier, Header}; +mod state_notifier; + #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockId> { hash: H::Hash, diff --git a/finality-aleph/src/sync/substrate/state_notifier.rs b/finality-aleph/src/sync/substrate/state_notifier.rs new file mode 100644 index 0000000000..d791162a49 --- /dev/null +++ b/finality-aleph/src/sync/substrate/state_notifier.rs @@ -0,0 +1,81 @@ +use std::fmt::{Display, Error as FmtError, Formatter}; + +use aleph_primitives::BlockNumber; +use futures::StreamExt; +use sc_client_api::client::{FinalityNotifications, ImportNotifications}; +use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader}; +use tokio::select; + +use crate::sync::{substrate::BlockId, ChainStateNotification, ChainStateNotifier, Header}; + +/// What can go wrong when waiting for next chain state notification. +#[derive(Debug)] +pub enum Error { + JustificationStream, + ImportStream, +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use Error::*; + match self { + JustificationStream => { + write!(f, "finalization notification stream has ended") + } + ImportStream => { + write!(f, "import notification stream has ended") + } + } + } +} + +/// Substrate specific implementation of `ChainStateNotifier`. +pub struct SubstrateChainStateNotifier +where + H: SubstrateHeader, + B: BlockT
, +{ + finality_notifications: FinalityNotifications, + import_notifications: ImportNotifications, +} + +impl SubstrateChainStateNotifier +where + H: SubstrateHeader, + B: BlockT
, +{ + fn new( + finality_notifications: FinalityNotifications, + import_notifications: ImportNotifications, + ) -> Self { + Self { + finality_notifications, + import_notifications, + } + } +} + +#[async_trait::async_trait] +impl ChainStateNotifier> for SubstrateChainStateNotifier +where + H: SubstrateHeader, + B: BlockT
, +{ + type Error = Error; + + /// Returns next chain state notification. + async fn next(&mut self) -> Result>, Self::Error> { + select! { + maybe_block = self.finality_notifications.next() => { + maybe_block + .map(|block| ChainStateNotification::BlockFinalized(block.header.id())) + .ok_or(Error::JustificationStream) + }, + maybe_block = self.import_notifications.next() => { + maybe_block + .map(|block| ChainStateNotification::BlockImported(block.header.id())) + .ok_or(Error::ImportStream) + } + } + } +} From ccfd30abf1646c1f5fe86c385f824a8c123f9e6a Mon Sep 17 00:00:00 2001 From: maciejnems Date: Fri, 30 Dec 2022 16:46:58 +0100 Subject: [PATCH 2/4] rename from state to status --- finality-aleph/src/sync/mod.rs | 24 +++++++++---------- finality-aleph/src/sync/substrate/mod.rs | 2 +- .../{state_notifier.rs => status_notifier.rs} | 20 ++++++++-------- 3 files changed, 23 insertions(+), 23 deletions(-) rename finality-aleph/src/sync/substrate/{state_notifier.rs => status_notifier.rs} (70%) diff --git a/finality-aleph/src/sync/mod.rs b/finality-aleph/src/sync/mod.rs index 2526963bcf..59600f7b95 100644 --- a/finality-aleph/src/sync/mod.rs +++ b/finality-aleph/src/sync/mod.rs @@ -61,25 +61,25 @@ pub trait Finalizer { fn finalize(&self, justification: J) -> Result<(), Self::Error>; } -/// A notification about the chain state changing. -pub enum ChainStateNotification { +/// A notification about the chain status changing. +pub enum ChainStatusNotification { /// A block has been imported. BlockImported(BI), /// A block has been finalized. BlockFinalized(BI), } -/// A stream of notifications about the chain state in the database changing. +/// A stream of notifications about the chain status in the database changing. #[async_trait::async_trait] -pub trait ChainStateNotifier { +pub trait ChainStatusNotifier { type Error: Display; - /// Returns a chain state notification when it is available. - async fn next(&mut self) -> Result, Self::Error>; + /// Returns a chain status notification when it is available. + async fn next(&mut self) -> Result, Self::Error>; } -/// The state of a block in the database. -pub enum BlockState { +/// The status of a block in the database. +pub enum BlockStatus { /// The block is justified and thus finalized. Justified(J), /// The block is present, might be finalized if a descendant is justified. @@ -88,10 +88,10 @@ pub enum BlockState { Unknown, } -/// The knowledge about the chain state. -pub trait ChainState { - /// The state of the block. - fn state_of(&self, id: ::Identifier) -> BlockState; +/// The knowledge about the chain status. +pub trait ChainStatus { + /// The status of the block. + fn status_of(&self, id: ::Identifier) -> BlockStatus; /// The header of the best block. fn best_block(&self) -> J::Header; diff --git a/finality-aleph/src/sync/substrate/mod.rs b/finality-aleph/src/sync/substrate/mod.rs index 1088277891..e7e46924a4 100644 --- a/finality-aleph/src/sync/substrate/mod.rs +++ b/finality-aleph/src/sync/substrate/mod.rs @@ -5,7 +5,7 @@ use sp_runtime::traits::{CheckedSub, Header as SubstrateHeader, One}; use crate::sync::{BlockIdentifier, Header}; -mod state_notifier; +mod status_notifier; #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockId> { diff --git a/finality-aleph/src/sync/substrate/state_notifier.rs b/finality-aleph/src/sync/substrate/status_notifier.rs similarity index 70% rename from finality-aleph/src/sync/substrate/state_notifier.rs rename to finality-aleph/src/sync/substrate/status_notifier.rs index d791162a49..200e699727 100644 --- a/finality-aleph/src/sync/substrate/state_notifier.rs +++ b/finality-aleph/src/sync/substrate/status_notifier.rs @@ -6,9 +6,9 @@ use sc_client_api::client::{FinalityNotifications, ImportNotifications}; use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader}; use tokio::select; -use crate::sync::{substrate::BlockId, ChainStateNotification, ChainStateNotifier, Header}; +use crate::sync::{substrate::BlockId, ChainStatusNotification, ChainStatusNotifier, Header}; -/// What can go wrong when waiting for next chain state notification. +/// What can go wrong when waiting for next chain status notification. #[derive(Debug)] pub enum Error { JustificationStream, @@ -29,8 +29,8 @@ impl Display for Error { } } -/// Substrate specific implementation of `ChainStateNotifier`. -pub struct SubstrateChainStateNotifier +/// Substrate specific implementation of `ChainStatusNotifier`. +pub struct SubstrateChainStatusNotifier where H: SubstrateHeader, B: BlockT
, @@ -39,7 +39,7 @@ where import_notifications: ImportNotifications, } -impl SubstrateChainStateNotifier +impl SubstrateChainStatusNotifier where H: SubstrateHeader, B: BlockT
, @@ -56,24 +56,24 @@ where } #[async_trait::async_trait] -impl ChainStateNotifier> for SubstrateChainStateNotifier +impl ChainStatusNotifier> for SubstrateChainStatusNotifier where H: SubstrateHeader, B: BlockT
, { type Error = Error; - /// Returns next chain state notification. - async fn next(&mut self) -> Result>, Self::Error> { + /// Returns next chain status notification. + async fn next(&mut self) -> Result>, Self::Error> { select! { maybe_block = self.finality_notifications.next() => { maybe_block - .map(|block| ChainStateNotification::BlockFinalized(block.header.id())) + .map(|block| ChainStatusNotification::BlockFinalized(block.header.id())) .ok_or(Error::JustificationStream) }, maybe_block = self.import_notifications.next() => { maybe_block - .map(|block| ChainStateNotification::BlockImported(block.header.id())) + .map(|block| ChainStatusNotification::BlockImported(block.header.id())) .ok_or(Error::ImportStream) } } From 3efb30d1bcf375a13eb59c8bd2951de9eafa41b5 Mon Sep 17 00:00:00 2001 From: maciejnems Date: Fri, 30 Dec 2022 16:57:04 +0100 Subject: [PATCH 3/4] remove unnecessary H --- .../src/sync/substrate/status_notifier.rs | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/finality-aleph/src/sync/substrate/status_notifier.rs b/finality-aleph/src/sync/substrate/status_notifier.rs index 200e699727..2952ed8645 100644 --- a/finality-aleph/src/sync/substrate/status_notifier.rs +++ b/finality-aleph/src/sync/substrate/status_notifier.rs @@ -30,19 +30,17 @@ impl Display for Error { } /// Substrate specific implementation of `ChainStatusNotifier`. -pub struct SubstrateChainStatusNotifier +pub struct SubstrateChainStatusNotifier where - H: SubstrateHeader, - B: BlockT
, + B: BlockT, { finality_notifications: FinalityNotifications, import_notifications: ImportNotifications, } -impl SubstrateChainStatusNotifier +impl SubstrateChainStatusNotifier where - H: SubstrateHeader, - B: BlockT
, + B: BlockT, { fn new( finality_notifications: FinalityNotifications, @@ -56,15 +54,15 @@ where } #[async_trait::async_trait] -impl ChainStatusNotifier> for SubstrateChainStatusNotifier +impl ChainStatusNotifier> for SubstrateChainStatusNotifier where - H: SubstrateHeader, - B: BlockT
, + B: BlockT, + B::Header: SubstrateHeader, { type Error = Error; - /// Returns next chain status notification. - async fn next(&mut self) -> Result>, Self::Error> { + /// Returns next chain tatus notification. + async fn next(&mut self) -> Result>, Self::Error> { select! { maybe_block = self.finality_notifications.next() => { maybe_block From 9f321d31c270f1598a449cfe8938cbe9e53473fc Mon Sep 17 00:00:00 2001 From: maciejnems Date: Fri, 30 Dec 2022 17:05:20 +0100 Subject: [PATCH 4/4] apply suggested changes from mr kostek --- finality-aleph/src/sync/mod.rs | 2 -- .../src/sync/substrate/status_notifier.rs | 13 ++++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/finality-aleph/src/sync/mod.rs b/finality-aleph/src/sync/mod.rs index 59600f7b95..880c2e84ef 100644 --- a/finality-aleph/src/sync/mod.rs +++ b/finality-aleph/src/sync/mod.rs @@ -6,8 +6,6 @@ use std::{ mod substrate; mod ticker; -const LOG_TARGET: &str = "aleph-block-sync"; - /// The identifier of a block, the least amount of knowledge we can have about a block. pub trait BlockIdentifier: Clone + Hash + Debug + Eq { /// The block number, useful when reasoning about hopeless forks. diff --git a/finality-aleph/src/sync/substrate/status_notifier.rs b/finality-aleph/src/sync/substrate/status_notifier.rs index 2952ed8645..042e05104a 100644 --- a/finality-aleph/src/sync/substrate/status_notifier.rs +++ b/finality-aleph/src/sync/substrate/status_notifier.rs @@ -11,18 +11,18 @@ use crate::sync::{substrate::BlockId, ChainStatusNotification, ChainStatusNotifi /// What can go wrong when waiting for next chain status notification. #[derive(Debug)] pub enum Error { - JustificationStream, - ImportStream, + JustificationStreamClosed, + ImportStreamClosed, } impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use Error::*; match self { - JustificationStream => { + JustificationStreamClosed => { write!(f, "finalization notification stream has ended") } - ImportStream => { + ImportStreamClosed => { write!(f, "import notification stream has ended") } } @@ -61,18 +61,17 @@ where { type Error = Error; - /// Returns next chain tatus notification. async fn next(&mut self) -> Result>, Self::Error> { select! { maybe_block = self.finality_notifications.next() => { maybe_block .map(|block| ChainStatusNotification::BlockFinalized(block.header.id())) - .ok_or(Error::JustificationStream) + .ok_or(Error::JustificationStreamClosed) }, maybe_block = self.import_notifications.next() => { maybe_block .map(|block| ChainStatusNotification::BlockImported(block.header.id())) - .ok_or(Error::ImportStream) + .ok_or(Error::ImportStreamClosed) } } }