From 3e63c72bb769af138d73bbc0ea7df9a5e3fe324e Mon Sep 17 00:00:00 2001 From: Michal Swietek Date: Wed, 24 Mar 2021 10:14:13 +0100 Subject: [PATCH] implements Future for NetworkBridge that routes messages --- bin/node/src/service.rs | 9 +++-- finality-aleph/src/communication/gossip.rs | 8 ++--- finality-aleph/src/communication/mod.rs | 2 -- finality-aleph/src/communication/network.rs | 36 ++++++++++++++++---- finality-aleph/src/communication/peer/mod.rs | 2 +- finality-aleph/src/config.rs | 2 +- finality-aleph/src/lib.rs | 33 +++++++++++++++--- finality-aleph/src/party.rs | 11 +++++- 8 files changed, 82 insertions(+), 21 deletions(-) diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs index de1c371e20..384af33243 100644 --- a/bin/node/src/service.rs +++ b/bin/node/src/service.rs @@ -86,7 +86,7 @@ pub fn new_partial( } fn get_authority(keystore: SyncCryptoStorePtr) -> AuthorityId { - let key_type_id = sp_application_crypto::key_types::AURA; + let key_type_id = finality_aleph::KEY_TYPE; let keys = SyncCryptoStore::sr25519_public_keys(&*keystore, key_type_id); if keys.is_empty() { SyncCryptoStore::sr25519_generate_new(&*keystore, key_type_id, None) @@ -138,7 +138,7 @@ fn consensus_config( } /// Builds a new service for a full client. -pub fn new_full(config: Configuration) -> Result { +pub fn new_full(mut config: Configuration) -> Result { let sc_service::PartialComponents { client, task_manager, @@ -151,6 +151,11 @@ pub fn new_full(config: Configuration) -> Result { .. } = new_partial(&config)?; + config + .network + .extra_sets + .push(finality_aleph::peers_set_config()); + let (network, _, _, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, diff --git a/finality-aleph/src/communication/gossip.rs b/finality-aleph/src/communication/gossip.rs index 250a8f86bf..5836e5e61f 100644 --- a/finality-aleph/src/communication/gossip.rs +++ b/finality-aleph/src/communication/gossip.rs @@ -122,8 +122,8 @@ pub(crate) enum GossipMessage { /// Reports a peer with a reputation change. pub(crate) struct PeerReport { - who: PeerId, - change: ReputationChange, + pub(crate) who: PeerId, + pub(crate) change: ReputationChange, } /// A prometheus result. @@ -210,7 +210,7 @@ impl GossipValidator { /// Sets the current authorities which are used to ensure that the incoming /// messages are indeed signed by these authorities. - pub(crate) fn set_authorities(&self, authorities: I) + pub(crate) fn _set_authorities(&self, authorities: I) where I: IntoIterator, { @@ -220,7 +220,7 @@ impl GossipValidator { } /// Removes a single authority in case they had been forked out. - pub(crate) fn remove_authority(&self, authority: &AuthorityId) { + pub(crate) fn _remove_authority(&self, authority: &AuthorityId) { let mut authorities = self.authority_set.write(); authorities.remove(authority); } diff --git a/finality-aleph/src/communication/mod.rs b/finality-aleph/src/communication/mod.rs index b6506c39f5..2efb80c6f3 100644 --- a/finality-aleph/src/communication/mod.rs +++ b/finality-aleph/src/communication/mod.rs @@ -5,8 +5,6 @@ pub(crate) mod peer; use rush::EpochId; use sp_runtime::traits::{Block, Hash, Header}; -pub const ALEPH_AUTHORITIES_KEY: &[u8] = b":aleph_authorities"; - pub(crate) fn epoch_topic(epoch: EpochId) -> B::Hash { <::Hashing as Hash>::hash(format!("epoch-{}", epoch.0).as_bytes()) } diff --git a/finality-aleph/src/communication/network.rs b/finality-aleph/src/communication/network.rs index 3925f8be1c..5cd0199944 100644 --- a/finality-aleph/src/communication/network.rs +++ b/finality-aleph/src/communication/network.rs @@ -9,8 +9,8 @@ use crate::{ AuthorityKeystore, UnitCoord, }; use codec::{Decode, Encode}; -use futures::{channel::mpsc, prelude::*}; -use log::{debug, trace}; +use futures::{channel::mpsc, prelude::*, Future}; +use log::debug; use parking_lot::Mutex; use prometheus_endpoint::Registry; use rush::{EpochId, NotificationIn, NotificationOut}; @@ -106,8 +106,9 @@ impl Sink> for NotificationOutSen } } +#[derive(Clone)] pub(crate) struct NetworkBridge> { - network_service: N, + _network_service: N, gossip_engine: Arc>>, gossip_validator: Arc>, peer_report_handle: Arc>>, @@ -133,7 +134,7 @@ impl> NetworkBridge { ))); NetworkBridge { - network_service, + _network_service: network_service, gossip_engine, gossip_validator, peer_report_handle, @@ -180,7 +181,7 @@ impl> NetworkBridge { futures::future::ready(notification) } else { // NOTE: This should be unreachable due to the validator. - trace!(target: "afa", "Skipping malformed incoming message: {:?}", notification); + debug!(target: "afa", "Skipping malformed incoming message: {:?}", notification); futures::future::ready(None) } }); @@ -201,7 +202,7 @@ impl> NetworkBridge { futures::future::ready(notification) } else { // NOTE: This should be unreachable due to the validator. - trace!(target: "afa", "Skipping malformed incoming message: {:?}", notification); + debug!(target: "afa", "Skipping malformed incoming message: {:?}", notification); futures::future::ready(None) } }); @@ -223,3 +224,26 @@ impl> NetworkBridge { (outgoing, Box::new(incoming)) } } + +impl> Future for NetworkBridge { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match self.peer_report_handle.lock().poll_next_unpin(cx) { + Poll::Ready(Some(PeerReport { who, change })) => { + self.gossip_engine.lock().report(who, change); + } + Poll::Ready(None) => { + debug!(target: "afa", "Gossip validator report stream closed."); + return Poll::Ready(()); + } + Poll::Pending => break, + } + } + + self.gossip_engine.lock().poll_unpin(cx).map(|_| { + debug!(target: "afa", "Gossip engine future finished"); + () + }) + } +} diff --git a/finality-aleph/src/communication/peer/mod.rs b/finality-aleph/src/communication/peer/mod.rs index 99237c5ee0..da50089e30 100644 --- a/finality-aleph/src/communication/peer/mod.rs +++ b/finality-aleph/src/communication/peer/mod.rs @@ -38,7 +38,7 @@ impl Peers { self.others.remove(peer); } - pub(crate) fn contains(&self, peer: &PeerId) -> bool { + pub(crate) fn _contains(&self, peer: &PeerId) -> bool { self.authorities.contains_key(peer) || self.others.contains_key(peer) } diff --git a/finality-aleph/src/config.rs b/finality-aleph/src/config.rs index 0729b18703..bb8b8f9e77 100644 --- a/finality-aleph/src/config.rs +++ b/finality-aleph/src/config.rs @@ -13,7 +13,7 @@ pub struct Config { } impl Config { - pub(crate) fn name(&self) -> &str { + pub(crate) fn _name(&self) -> &str { self.name.as_deref().unwrap_or("") } } diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index e2702efb08..11ec0811a4 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -1,6 +1,5 @@ #![allow(clippy::type_complexity)] use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; -use sp_runtime::KeyTypeId; use futures::Future; @@ -17,7 +16,11 @@ use sp_api::ProvideRuntimeApi; use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_consensus::{BlockImport, SelectChain}; use sp_runtime::traits::Block; -use std::{convert::TryInto, fmt::Debug, sync::Arc}; +use std::{ + convert::TryInto, + fmt::{Debug, Display}, + sync::Arc, +}; pub(crate) mod communication; pub mod config; @@ -42,7 +45,22 @@ mod party; // pub type AuthoritySignature = app::Signature; // pub type AuthorityPair = app::Pair; -use sp_application_crypto::key_types::AURA; +pub fn peers_set_config() -> sc_network::config::NonDefaultSetConfig { + sc_network::config::NonDefaultSetConfig { + notifications_protocol: communication::network::ALEPH_PROTOCOL_NAME.into(), + max_notification_size: 1024 * 1024, + set_config: sc_network::config::SetConfig { + in_peers: 0, + out_peers: 0, + reserved_nodes: vec![], + non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, + }, + } +} + +use sp_core::crypto::KeyTypeId; +pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"alp0"); +// use sp_application_crypto::key_types::AURA; pub use sp_consensus_aura::sr25519::{AuthorityId, AuthorityPair, AuthoritySignature}; #[derive(Clone, Debug, Default, Eq, Hash, Encode, Decode, PartialEq)] @@ -51,6 +69,12 @@ pub struct NodeId { pub index: NodeIndex, } +impl Display for NodeId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + Display::fmt(&self.auth, f) + } +} + impl rush::MyIndex for NodeId { fn my_index(&self) -> Option { Some(self.index) @@ -70,7 +94,7 @@ impl AuthorityKeystore { /// Constructs a new authority cryptography keystore. pub fn new(authority_id: AuthorityId, keystore: SyncCryptoStorePtr) -> Self { AuthorityKeystore { - key_type_id: AURA, + key_type_id: KEY_TYPE, authority_id, keystore, } @@ -166,6 +190,7 @@ where { } +#[derive(Clone)] struct SpawnHandle(SpawnTaskHandle); impl From for SpawnHandle { diff --git a/finality-aleph/src/party.rs b/finality-aleph/src/party.rs index 40f7fb70db..886e23752d 100644 --- a/finality-aleph/src/party.rs +++ b/finality-aleph/src/party.rs @@ -17,6 +17,7 @@ where BE: Backend + 'static, SC: SelectChain + 'static, { + env: Arc>, consensus: Consensus>, } @@ -46,12 +47,20 @@ where )); let consensus = Consensus::new(conf, env.clone()); - ConsensusParty { consensus } + ConsensusParty { env, consensus } } pub(crate) async fn run(self, spawn_handle: SpawnHandle) { // TODO now it runs just a single instance of consensus but later it will // orchestrate managing multiple instances for differents epochs + + rush::SpawnHandle::spawn( + &spawn_handle.clone(), + "aleph/network", + self.env.network.clone(), + ); + log::debug!(target: "afa", "Aleph network has started"); + let (_exit, exit) = tokio::sync::oneshot::channel(); log::debug!(target: "afa", "Consensus party has started"); self.consensus.run(spawn_handle, exit).await;