Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub fn new_partial(
}

fn get_authority(keystore: SyncCryptoStorePtr) -> AuthorityId {
let key_type_id = sp_application_crypto::key_types::AURA;
let key_type_id = finality_aleph::KEY_TYPE;
let keys = SyncCryptoStore::sr25519_public_keys(&*keystore, key_type_id);
if keys.is_empty() {
SyncCryptoStore::sr25519_generate_new(&*keystore, key_type_id, None)
Expand Down Expand Up @@ -138,7 +138,7 @@ fn consensus_config(
}

/// Builds a new service for a full client.
pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError> {
let sc_service::PartialComponents {
client,
task_manager,
Expand All @@ -151,6 +151,11 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
..
} = new_partial(&config)?;

config
.network
.extra_sets
.push(finality_aleph::peers_set_config());

let (network, _, _, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand Down
8 changes: 4 additions & 4 deletions finality-aleph/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ pub(crate) enum GossipMessage<B: Block, H: Hash> {

/// Reports a peer with a reputation change.
pub(crate) struct PeerReport {
who: PeerId,
change: ReputationChange,
pub(crate) who: PeerId,
pub(crate) change: ReputationChange,
}

/// A prometheus result.
Expand Down Expand Up @@ -210,7 +210,7 @@ impl<B: Block, H: Hash> GossipValidator<B, H> {

/// Sets the current authorities which are used to ensure that the incoming
/// messages are indeed signed by these authorities.
pub(crate) fn set_authorities<I>(&self, authorities: I)
pub(crate) fn _set_authorities<I>(&self, authorities: I)
where
I: IntoIterator<Item = AuthorityId>,
{
Expand All @@ -220,7 +220,7 @@ impl<B: Block, H: Hash> GossipValidator<B, H> {
}

/// Removes a single authority in case they had been forked out.
pub(crate) fn remove_authority(&self, authority: &AuthorityId) {
pub(crate) fn _remove_authority(&self, authority: &AuthorityId) {
let mut authorities = self.authority_set.write();
authorities.remove(authority);
}
Expand Down
2 changes: 0 additions & 2 deletions finality-aleph/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ pub(crate) mod peer;
use rush::EpochId;
use sp_runtime::traits::{Block, Hash, Header};

pub const ALEPH_AUTHORITIES_KEY: &[u8] = b":aleph_authorities";

pub(crate) fn epoch_topic<B: Block>(epoch: EpochId) -> B::Hash {
<<B::Header as Header>::Hashing as Hash>::hash(format!("epoch-{}", epoch.0).as_bytes())
}
Expand Down
36 changes: 30 additions & 6 deletions finality-aleph/src/communication/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
AuthorityKeystore, UnitCoord,
};
use codec::{Decode, Encode};
use futures::{channel::mpsc, prelude::*};
use log::{debug, trace};
use futures::{channel::mpsc, prelude::*, Future};
use log::debug;
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use rush::{EpochId, NotificationIn, NotificationOut};
Expand Down Expand Up @@ -106,8 +106,9 @@ impl<B: Block, H: Hash> Sink<NotificationOut<B::Hash, H>> for NotificationOutSen
}
}

#[derive(Clone)]
pub(crate) struct NetworkBridge<B: Block, H, N: Network<B>> {
network_service: N,
_network_service: N,
gossip_engine: Arc<Mutex<GossipEngine<B>>>,
gossip_validator: Arc<GossipValidator<B, H>>,
peer_report_handle: Arc<Mutex<TracingUnboundedReceiver<PeerReport>>>,
Expand All @@ -133,7 +134,7 @@ impl<B: Block, H: Hash, N: Network<B>> NetworkBridge<B, H, N> {
)));

NetworkBridge {
network_service,
_network_service: network_service,
gossip_engine,
gossip_validator,
peer_report_handle,
Expand Down Expand Up @@ -180,7 +181,7 @@ impl<B: Block, H: Hash, N: Network<B>> NetworkBridge<B, H, N> {
futures::future::ready(notification)
} else {
// NOTE: This should be unreachable due to the validator.
trace!(target: "afa", "Skipping malformed incoming message: {:?}", notification);
debug!(target: "afa", "Skipping malformed incoming message: {:?}", notification);
futures::future::ready(None)
}
});
Expand All @@ -201,7 +202,7 @@ impl<B: Block, H: Hash, N: Network<B>> NetworkBridge<B, H, N> {
futures::future::ready(notification)
} else {
// NOTE: This should be unreachable due to the validator.
trace!(target: "afa", "Skipping malformed incoming message: {:?}", notification);
debug!(target: "afa", "Skipping malformed incoming message: {:?}", notification);
futures::future::ready(None)
}
});
Expand All @@ -223,3 +224,26 @@ impl<B: Block, H: Hash, N: Network<B>> NetworkBridge<B, H, N> {
(outgoing, Box::new(incoming))
}
}

impl<B: Block, H: Hash, N: Network<B>> Future for NetworkBridge<B, H, N> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.peer_report_handle.lock().poll_next_unpin(cx) {
Poll::Ready(Some(PeerReport { who, change })) => {
self.gossip_engine.lock().report(who, change);
}
Poll::Ready(None) => {
debug!(target: "afa", "Gossip validator report stream closed.");
return Poll::Ready(());
}
Poll::Pending => break,
}
}

self.gossip_engine.lock().poll_unpin(cx).map(|_| {
debug!(target: "afa", "Gossip engine future finished");
()
})
}
}
2 changes: 1 addition & 1 deletion finality-aleph/src/communication/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Peers {
self.others.remove(peer);
}

pub(crate) fn contains(&self, peer: &PeerId) -> bool {
pub(crate) fn _contains(&self, peer: &PeerId) -> bool {
self.authorities.contains_key(peer) || self.others.contains_key(peer)
}

Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct Config {
}

impl Config {
pub(crate) fn name(&self) -> &str {
pub(crate) fn _name(&self) -> &str {
self.name.as_deref().unwrap_or("<unknown>")
}
}
33 changes: 29 additions & 4 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(clippy::type_complexity)]
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::KeyTypeId;

use futures::Future;

Expand All @@ -17,7 +16,11 @@ use sp_api::ProvideRuntimeApi;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::{BlockImport, SelectChain};
use sp_runtime::traits::Block;
use std::{convert::TryInto, fmt::Debug, sync::Arc};
use std::{
convert::TryInto,
fmt::{Debug, Display},
sync::Arc,
};

pub(crate) mod communication;
pub mod config;
Expand All @@ -42,7 +45,22 @@ mod party;
// pub type AuthoritySignature = app::Signature;
// pub type AuthorityPair = app::Pair;

use sp_application_crypto::key_types::AURA;
pub fn peers_set_config() -> sc_network::config::NonDefaultSetConfig {
sc_network::config::NonDefaultSetConfig {
notifications_protocol: communication::network::ALEPH_PROTOCOL_NAME.into(),
max_notification_size: 1024 * 1024,
set_config: sc_network::config::SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: vec![],
non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
},
}
}

use sp_core::crypto::KeyTypeId;
pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"alp0");
// use sp_application_crypto::key_types::AURA;
pub use sp_consensus_aura::sr25519::{AuthorityId, AuthorityPair, AuthoritySignature};

#[derive(Clone, Debug, Default, Eq, Hash, Encode, Decode, PartialEq)]
Expand All @@ -51,6 +69,12 @@ pub struct NodeId {
pub index: NodeIndex,
}

impl Display for NodeId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
Display::fmt(&self.auth, f)
}
}

impl rush::MyIndex for NodeId {
fn my_index(&self) -> Option<NodeIndex> {
Some(self.index)
Expand All @@ -70,7 +94,7 @@ impl AuthorityKeystore {
/// Constructs a new authority cryptography keystore.
pub fn new(authority_id: AuthorityId, keystore: SyncCryptoStorePtr) -> Self {
AuthorityKeystore {
key_type_id: AURA,
key_type_id: KEY_TYPE,
authority_id,
keystore,
}
Expand Down Expand Up @@ -166,6 +190,7 @@ where
{
}

#[derive(Clone)]
struct SpawnHandle(SpawnTaskHandle);

impl From<SpawnTaskHandle> for SpawnHandle {
Expand Down
11 changes: 10 additions & 1 deletion finality-aleph/src/party.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ where
BE: Backend<B> + 'static,
SC: SelectChain<B> + 'static,
{
env: Arc<Environment<B, N, C, BE, SC>>,
consensus: Consensus<Environment<B, N, C, BE, SC>>,
}

Expand Down Expand Up @@ -46,12 +47,20 @@ where
));
let consensus = Consensus::new(conf, env.clone());

ConsensusParty { consensus }
ConsensusParty { env, consensus }
}

pub(crate) async fn run(self, spawn_handle: SpawnHandle) {
// TODO now it runs just a single instance of consensus but later it will
// orchestrate managing multiple instances for differents epochs

rush::SpawnHandle::spawn(
&spawn_handle.clone(),
"aleph/network",
self.env.network.clone(),
);
log::debug!(target: "afa", "Aleph network has started");

let (_exit, exit) = tokio::sync::oneshot::channel();
log::debug!(target: "afa", "Consensus party has started");
self.consensus.run(spawn_handle, exit).await;
Expand Down