Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions finality-aleph/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ impl<B: Block, H: Hash> SignedUnit<B, H> {
pub(crate) fn sign_unit<B: Block, H: Hash>(
auth_crypto_store: &AuthorityKeystore,
unit: Unit<B::Hash, H>,
) -> Option<SignedUnit<B, H>> {
) -> SignedUnit<B, H> {
let encoded = unit.encode();
let signature = auth_crypto_store.sign(&encoded[..]);

Some(SignedUnit {
SignedUnit {
unit,
signature,
id: auth_crypto_store.authority_id.clone(),
})
}
}

/// Actions for incoming messages.
Expand Down
73 changes: 60 additions & 13 deletions finality-aleph/src/communication/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use crate::{
AuthorityKeystore, UnitCoord,
};
use codec::{Decode, Encode};
use futures::{channel::mpsc, prelude::*, Future};
use futures::{
channel::{mpsc, mpsc::SendError},
prelude::*,
Future, FutureExt, StreamExt,
};
use log::debug;
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
Expand All @@ -19,11 +23,58 @@ use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use sp_runtime::traits::Block;
use sp_utils::mpsc::TracingUnboundedReceiver;
use std::{
error::Error,
fmt::{Display, Formatter, Result as FmtResult},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

#[derive(Debug)]
enum ErrorKind {
StartSendFail(SendError),
}

impl Display for ErrorKind {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
use ErrorKind::*;
match self {
StartSendFail(e) => write!(f, "failed to send on channel: {}", e),
}
}
}

impl Error for ErrorKind {}

#[derive(Debug)]
pub struct NetworkError(Box<ErrorKind>);

impl Display for NetworkError {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
Display::fmt(&self.0, f)
}
}

impl Error for NetworkError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.0)
}
}

impl From<ErrorKind> for NetworkError {
fn from(e: ErrorKind) -> Self {
NetworkError(Box::new(e))
}
}

impl From<SendError> for NetworkError {
fn from(e: SendError) -> Self {
NetworkError(Box::new(ErrorKind::StartSendFail(e)))
}
}

pub type NetworkResult<T> = Result<T, NetworkError>;

/// Name of the notifications protocol used by Aleph Zero. This is how messages
/// are subscribed to to ensure that we are gossiping and communicating with our
/// own network.
Expand All @@ -43,8 +94,7 @@ pub struct NotificationOutSender<B: Block, H: Hash> {
unsafe impl<B: Block, H: Hash> Send for NotificationOutSender<B, H> {}

impl<B: Block, H: Hash> Sink<NotificationOut<B::Hash, H>> for NotificationOutSender<B, H> {
// TODO! error
type Error = ();
type Error = NetworkError;

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
Expand All @@ -53,14 +103,11 @@ impl<B: Block, H: Hash> Sink<NotificationOut<B::Hash, H>> for NotificationOutSen
fn start_send(
mut self: Pin<&mut Self>,
item: NotificationOut<B::Hash, H>,
) -> Result<(), Self::Error> {
) -> NetworkResult<()> {
return match item {
NotificationOut::CreatedUnit(u) => {
let signed_unit = match super::gossip::sign_unit::<B, H>(&self.auth_cryptostore, u)
{
Some(s) => s,
None => return Err(()),
};
let signed_unit = super::gossip::sign_unit::<B, H>(&self.auth_cryptostore, u);

let message = GossipMessage::Multicast(Multicast {
signed_unit: signed_unit.clone(),
});
Expand All @@ -71,7 +118,7 @@ impl<B: Block, H: Hash> Sink<NotificationOut<B::Hash, H>> for NotificationOutSen
.gossip_message(topic, message.encode(), false);

let notification = NotificationIn::NewUnits(vec![signed_unit.unit]);
self.sender.start_send(notification).map_err(|_e| ())
self.sender.start_send(notification).map_err(|e| e.into())
}
NotificationOut::MissingUnits(coords, aux) => {
let n_coords = {
Expand All @@ -97,12 +144,12 @@ impl<B: Block, H: Hash> Sink<NotificationOut<B::Hash, H>> for NotificationOutSen
};
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<NetworkResult<()>> {
Poll::Ready(Ok(()))
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sink::poll_close(Pin::new(&mut self.sender), cx).map(|elem| elem.map_err(|_e| ()))
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<NetworkResult<()>> {
Sink::poll_close(Pin::new(&mut self.sender), cx).map(|elem| elem.map_err(|e| e.into()))
}
}

Expand Down
13 changes: 8 additions & 5 deletions finality-aleph/src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
communication::network::{Network, NetworkBridge, NotificationOutSender},
communication::network::{Network, NetworkBridge, NetworkError, NotificationOutSender},
AuthorityKeystore, NodeId,
};
use futures::Stream;
Expand Down Expand Up @@ -65,21 +65,24 @@ where
type Crypto = ();
type In = Box<dyn Stream<Item = NotificationIn<Self::BlockHash, Self::Hash>> + Send + Unpin>;
type Out = NotificationOutSender<B, Self::Hash>;
type Error = ();
type Error = NetworkError;

fn finalize_block(&self, h: Self::BlockHash) {
finalize_block(self.client.clone(), h);
}

fn check_extends_finalized(&self, h: Self::BlockHash) -> bool {
let head_finalized = self.client.info().finalized_hash;
let lca =
sp_blockchain::lowest_common_ancestor(self.client.as_ref(), h, head_finalized).unwrap();
let lca = sp_blockchain::lowest_common_ancestor(self.client.as_ref(), h, head_finalized)
.expect("No lowest common ancestor");
lca.hash == head_finalized
}

fn best_block(&self) -> Self::BlockHash {
self.select_chain.best_chain().unwrap().hash()
self.select_chain
.best_chain()
.expect("No best chain")
.hash()
}

fn consensus_data(&self) -> (Self::Out, Self::In) {
Expand Down
42 changes: 42 additions & 0 deletions finality-aleph/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::communication::network::NetworkError;
use std::{
error::Error as StdError,
fmt::{Display, Formatter, Result as FmtResult},
};

#[derive(Debug)]
enum ErrorKind {
Network(NetworkError),
}

impl Display for ErrorKind {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
use ErrorKind::*;
match self {
Network(e) => std::fmt::Display::fmt(&e, f),
}
}
}

impl StdError for ErrorKind {}

#[derive(Debug)]
pub struct Error(Box<ErrorKind>);

impl Display for Error {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
Display::fmt(&self.0, f)
}
}

impl StdError for Error {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
Some(&self.0)
}
}

impl From<NetworkError> for Error {
fn from(e: NetworkError) -> Error {
Error(Box::new(ErrorKind::Network(e)))
}
}
5 changes: 2 additions & 3 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
#![allow(clippy::type_complexity)]
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};

use futures::Future;

use codec::{Decode, Encode};
use futures::Future;
use rush::{nodes::NodeIndex, HashT, Unit};
pub use rush::{Config as ConsensusConfig, EpochId};

use sc_client_api::{
backend::{AuxStore, Backend},
BlockchainEvents, ExecutorProvider, Finalizer, LockImportRun, TransactionFor,
Expand All @@ -25,6 +23,7 @@ use std::{
pub(crate) mod communication;
pub mod config;
pub(crate) mod environment;
mod error;
pub mod hash;
mod party;

Expand Down