From f72e1cb0c26555da5d1f764c6f4a9abceb884fcc Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 21 Aug 2019 14:20:10 +0200 Subject: [PATCH 01/20] node/runtime: Add authority-discovery as session handler The srml/authority-discovery module implements the OneSessionHandler in order to keep its authority set in sync. This commit adds the module to the set of session handlers. --- node/runtime/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index b1e4065b5f7d3..f860e252ca72f 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -191,7 +191,7 @@ impl authorship::Trait for Runtime { type EventHandler = Staking; } -type SessionHandlers = (Grandpa, Babe, ImOnline); +type SessionHandlers = (Grandpa, Babe, ImOnline, AuthorityDiscovery); impl_opaque_keys! { pub struct SessionKeys { From d34eb4fff5ef7af60497701c472ad6c1b5060d54 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 21 Aug 2019 14:52:15 +0200 Subject: [PATCH 02/20] core/network: Make network worker return Dht events on poll Instead of network worker implement the Future trait, have it implement the Stream interface returning Dht events. For now these events are ignored in build_network_future but will be used by the core/authority-discovery module in subsequent commits. --- core/network/src/lib.rs | 1 + core/network/src/protocol/event.rs | 2 ++ core/network/src/service.rs | 13 +++++++------ core/service/src/lib.rs | 12 ++++++------ 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index e797ffb208ec1..7e9fd51a41533 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -192,6 +192,7 @@ pub use service::{ NetworkStateInfo, }; pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization}; +pub use protocol::event::{Event, DhtEvent}; pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, PeerId}; #[doc(inline)] diff --git a/core/network/src/protocol/event.rs b/core/network/src/protocol/event.rs index c0c26da515f0b..c8bee5588c704 100644 --- a/core/network/src/protocol/event.rs +++ b/core/network/src/protocol/event.rs @@ -20,6 +20,7 @@ use libp2p::kad::record::Key; /// Events generated by DHT as a response to get_value and put_value requests. +#[derive(Debug, Clone)] pub enum DhtEvent { /// The value was found. ValueFound(Vec<(Key, Vec)>), @@ -35,6 +36,7 @@ pub enum DhtEvent { } /// Type for events generated by networking layer. +#[derive(Debug, Clone)] pub enum Event { /// Event generated by a DHT. Dht(DhtEvent), diff --git a/core/network/src/service.rs b/core/network/src/service.rs index c3f773e232e7a..9af39e1e7d461 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -612,11 +612,11 @@ pub struct NetworkWorker, H: Ex light_client_rqs: Option>>, } -impl, H: ExHashT> Future for NetworkWorker { - type Item = (); +impl, H: ExHashT> Stream for NetworkWorker { + type Item = Event; type Error = io::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll, Self::Error> { // Poll the import queue for actions to perform. let _ = futures03::future::poll_fn(|cx| { self.import_queue.poll_actions(cx, &mut NetworkLink { @@ -636,7 +636,7 @@ impl, H: ExHashT> Future for Ne // Process the next message coming from the `NetworkService`. let msg = match self.from_worker.poll() { Ok(Async::Ready(Some(msg))) => msg, - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::NotReady), Ok(Async::NotReady) => break, }; @@ -677,8 +677,9 @@ impl, H: ExHashT> Future for Ne Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { self.network_service.user_protocol_mut() - .on_event(Event::Dht(ev)); - CustomMessageOutcome::None + .on_event(Event::Dht(ev.clone())); + + return Ok(Async::Ready(Some(Event::Dht(ev)))); }, Ok(Async::Ready(None)) => CustomMessageOutcome::None, Err(err) => { diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 33a42e87fe04a..0149937c9c1a5 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -38,7 +38,7 @@ use exit_future::Signal; use futures::prelude::*; use futures03::stream::{StreamExt as _, TryStreamExt as _}; use keystore::Store as Keystore; -use network::{NetworkState, NetworkStateInfo}; +use network::{NetworkState, NetworkStateInfo, Event, DhtEvent}; use log::{log, info, warn, debug, error, Level}; use codec::{Encode, Decode}; use sr_primitives::generic::BlockId; @@ -708,11 +708,11 @@ fn build_network_future< } // Main network polling. - match network.poll() { - Ok(Async::NotReady) => {} - Err(err) => warn!(target: "service", "Error in network: {:?}", err), - Ok(Async::Ready(())) => warn!(target: "service", "Network service finished"), - } + while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| { + warn!(target: "service", "Error in network: {:?}", err); + }) { + // Ignore for now. + }; // Now some diagnostic for performances. let polling_dur = before_polling.elapsed(); From 94a238a6f2fb9c0a9a3c09d56ecfbcb4fc5c828d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 21 Aug 2019 15:03:48 +0200 Subject: [PATCH 03/20] *: Add scaffolding and integration for core/authority-discovery module --- Cargo.lock | 97 ++++++++++ Cargo.toml | 1 + core/authority-discovery/Cargo.toml | 26 +++ core/authority-discovery/build.rs | 3 + core/authority-discovery/src/error.rs | 24 +++ core/authority-discovery/src/lib.rs | 170 ++++++++++++++++++ core/authority-discovery/src/schema/dht.proto | 14 ++ core/service/Cargo.toml | 2 + core/service/src/components.rs | 55 +++++- core/service/src/lib.rs | 32 +++- node/cli/src/service.rs | 2 +- 11 files changed, 419 insertions(+), 7 deletions(-) create mode 100644 core/authority-discovery/Cargo.toml create mode 100644 core/authority-discovery/build.rs create mode 100644 core/authority-discovery/src/error.rs create mode 100644 core/authority-discovery/src/lib.rs create mode 100644 core/authority-discovery/src/schema/dht.proto diff --git a/Cargo.lock b/Cargo.lock index 592b39e08ce8c..c8f193c2c4eb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -881,6 +881,11 @@ dependencies = [ "static_assertions 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fixedbitset" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "flate2" version = "1.0.9" @@ -2218,6 +2223,11 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "multimap" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "multistream-select" version = "0.5.0" @@ -2922,6 +2932,14 @@ name = "percent-encoding" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "petgraph" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "pin-utils" version = "0.1.0-alpha.4" @@ -2985,6 +3003,54 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "prost" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-build" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "protobuf" version = "2.8.0" @@ -4399,6 +4465,28 @@ dependencies = [ "substrate-test-runtime-client 2.0.0", ] +[[package]] +name = "substrate-authority-discovery" +version = "2.0.0" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 2.0.0", + "substrate-authority-discovery-primitives 2.0.0", + "substrate-client 2.0.0", + "substrate-keystore 2.0.0", + "substrate-network 2.0.0", + "substrate-primitives 2.0.0", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "substrate-authority-discovery-primitives" version = "2.0.0" @@ -5032,6 +5120,8 @@ dependencies = [ "sr-io 2.0.0", "sr-primitives 2.0.0", "substrate-application-crypto 2.0.0", + "substrate-authority-discovery 2.0.0", + "substrate-authority-discovery-primitives 2.0.0", "substrate-client 2.0.0", "substrate-client-db 2.0.0", "substrate-consensus-babe-primitives 2.0.0", @@ -6317,6 +6407,7 @@ dependencies = [ "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum finality-grandpa 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9681c1f75941ea47584573dd2bc10558b2067d460612945887e00744e43393be" "checksum fixed-hash 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "516877b7b9a1cc2d0293cbce23cd6203f0edbfd4090e6ca4489fecb5aa73050e" +"checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" "checksum flate2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "550934ad4808d5d39365e5d61727309bf18b3b02c6c56b729cb92e7dd84bc3d8" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" @@ -6443,6 +6534,7 @@ dependencies = [ "checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40" "checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" "checksum multistream-select 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51a032ec01abdbe99a1165cd3e518bdd4bd7ca509a59ae9adf186d240399b90c" "checksum names 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef320dab323286b50fb5cdda23f61c796a72a89998ab565ca32525c5c556f2da" "checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e" @@ -6489,6 +6581,7 @@ dependencies = [ "checksum pbkdf2 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "006c038a43a45995a9670da19e67600114740e8511d4333bf97a56e66a7542d9" "checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" +"checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c1d2cfa5a714db3b5f24f0915e74fcdf91d09d496ba61329705dda7774d2af" "checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b" @@ -6497,6 +6590,10 @@ dependencies = [ "checksum proc-macro-crate 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e10d4b51f154c8a7fb96fd6dad097cb74b863943ec010ac94b9fd1be8861fe1e" "checksum proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)" = "982a35d1194084ba319d65c4a68d24ca28f5fdb5b8bc20899e4eef8641ea5178" "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" +"checksum prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" +"checksum prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" +"checksum prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5e7dc378b94ac374644181a2247cebf59a6ec1c88b49ac77f3a94b86b79d0e11" +"checksum prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f" "checksum protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8aefcec9f142b524d98fc81d07827743be89dd6586a1ba6ab21fa66a500b3fa5" "checksum pwasm-utils 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "efb0dcbddbb600f47a7098d33762a00552c671992171637f5bb310b37fe1f0e4" "checksum quick-error 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb6ccf8db7bbcb9c2eae558db5ab4f3da1c2a87e4e597ed394726bc8ea6ca1d" diff --git a/Cargo.toml b/Cargo.toml index 048bfb7629060..3f9d73559cdde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ members = [ "core/utils/fork-tree", "core/utils/wasm-builder", "core/utils/wasm-builder-runner", + "core/authority-discovery", "srml/support", "srml/support/procedural", "srml/support/procedural/tools", diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml new file mode 100644 index 0000000000000..a450532a48370 --- /dev/null +++ b/core/authority-discovery/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "substrate-authority-discovery" +version = "2.0.0" +authors = ["Parity Technologies "] +edition = "2018" +build = "build.rs" + +[build-dependencies] +prost-build = "0.5" + +[dependencies] +network = { package = "substrate-network", path = "../../core/network" } +sr-primitives = { path = "../../core/sr-primitives" } +primitives = { package = "substrate-primitives", path = "../primitives" } +client = { package = "substrate-client", path = "../../core/client" } +authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "./primitives", default-features = false } +codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" } +futures = "0.1.17" +tokio-timer = "0.2" +keystore = { package = "substrate-keystore", path = "../../core/keystore" } +libp2p = { version = "0.12.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } +serde_json = "1.0" +log = "0.4" +derive_more = "0.14.0" +prost = "0.5" +bytes = "0.4" diff --git a/core/authority-discovery/build.rs b/core/authority-discovery/build.rs new file mode 100644 index 0000000000000..ed632575f3ba8 --- /dev/null +++ b/core/authority-discovery/build.rs @@ -0,0 +1,3 @@ +fn main() { + prost_build::compile_protos(&["src/schema/dht.proto"], &["src/schema"]).unwrap(); +} diff --git a/core/authority-discovery/src/error.rs b/core/authority-discovery/src/error.rs new file mode 100644 index 0000000000000..74c671b13c79a --- /dev/null +++ b/core/authority-discovery/src/error.rs @@ -0,0 +1,24 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Authority discovery errors. + +/// AuthorityDiscovery Result. +pub type Result = std::result::Result; + +#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum Error { +} diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs new file mode 100644 index 0000000000000..65a41b127892b --- /dev/null +++ b/core/authority-discovery/src/lib.rs @@ -0,0 +1,170 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +#![warn(missing_docs)] + +//! Substrate authority discovery. +//! +//! This crate enables Substrate authorities to directly connect to other +//! authorities. [`AuthorityDiscovery`] implements the Future trait. By polling +//! a [`AuthorityDiscovery`] an authority: +//! +//! +//! 1. **Makes itself discoverable** +//! +//! 1. Retrieves its external addresses. +//! +//! 2. Adds its network peer id to the addresses. +//! +//! 3. Signs the above. +//! +//! 4. Puts the signature and the addresses on the libp2p Kademlia DHT. +//! +//! +//! 2. **Discovers other authorities** +//! +//! 1. Retrieves the current set of authorities.. +//! +//! 2. Starts DHT queries for the ids of the authorities. +//! +//! 3. Validates the signatures of the retrieved key value pairs. +//! +//! 4. Adds the retrieved external addresses as priority nodes to the +//! peerset. + +use authority_discovery_primitives::AuthorityDiscoveryApi; +use client::blockchain::HeaderBackend; +use error::{Error, Result}; +use futures::{prelude::*, sync::mpsc::Receiver}; +use log::{debug, error, log_enabled, warn}; +use network::specialization::NetworkSpecialization; +use network::{DhtEvent, ExHashT, NetworkStateInfo}; +use prost::Message; +use sr_primitives::generic::BlockId; +use sr_primitives::traits::{Block, ProvideRuntimeApi}; +use std::collections::{HashMap, HashSet}; +use std::iter::FromIterator; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; + +mod error; +/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs. +mod schema { + include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); +} + +/// A AuthorityDiscovery makes a given authority discoverable as well as +/// discovers other authoritys. +pub struct AuthorityDiscovery +where + B: Block + 'static, + S: NetworkSpecialization, + H: ExHashT, + AuthorityId: std::string::ToString + + codec::Codec + + std::convert::AsRef<[u8]> + + std::clone::Clone + + std::fmt::Debug + + std::hash::Hash + + std::cmp::Eq, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + client: Arc, + + network: Arc>, + /// Channel we receive Dht events on. + dht_event_rx: Receiver, + + /// Interval to be proactive on, e.g. publishing own addresses or starting + /// to query for addresses. + interval: tokio_timer::Interval, + + /// The network peerset interface for priority groups lets us only set an + /// entire group, but we retrieve the addresses of other authorities one by + /// one from the network. To use the peerset interface we need to cache the + /// addresses and always overwrite the entire peerset priority group. To + /// ensure this map doesn't grow indefinitely + /// `purge_old_authorities_from_cache` function is called each time we add a + /// new entry. + address_cache: HashMap>, + + phantom_authority_id: PhantomData, +} + +impl AuthorityDiscovery +where + B: Block + 'static, + S: NetworkSpecialization, + H: ExHashT, + AuthorityId: std::string::ToString + + codec::Codec + + std::convert::AsRef<[u8]> + + std::clone::Clone + + std::fmt::Debug + + std::hash::Hash + + std::cmp::Eq, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + /// Return a new authority discovery. + pub fn new( + client: Arc, + network: Arc>, + dht_event_rx: futures::sync::mpsc::Receiver, + ) -> AuthorityDiscovery { + // TODO: 5 seconds is probably a bit spammy, figure out what Kademlias + // time to live for dht entries is and adjust accordingly. + let interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); + let address_cache = HashMap::new(); + + AuthorityDiscovery { + client, + network, + dht_event_rx, + interval, + address_cache, + phantom_authority_id: PhantomData, + } + } +} + +impl futures::Future + for AuthorityDiscovery +where + B: Block + 'static, + S: NetworkSpecialization, + H: ExHashT, + AuthorityId: std::string::ToString + + codec::Codec + + std::convert::AsRef<[u8]> + + std::clone::Clone + + std::fmt::Debug + + std::hash::Hash + + std::cmp::Eq, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll { + // Make sure to always return NotReady as this is a long running task + // with the same lifetime of the node itself. + Ok(futures::Async::NotReady) + } +} diff --git a/core/authority-discovery/src/schema/dht.proto b/core/authority-discovery/src/schema/dht.proto new file mode 100644 index 0000000000000..434c54ffa645f --- /dev/null +++ b/core/authority-discovery/src/schema/dht.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package authority_discovery; + +// First we need to serialize the addresses in order to be able to sign them. +message AuthorityAddresses { + repeated string addresses = 1; +} + +// Then we need to serialize addresses and signature to send them over the wire. +message SignedAuthorityAddresses { + bytes addresses = 1; + bytes signature = 2; +} \ No newline at end of file diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index 7afd59ebc0679..7f17d83931564 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -31,12 +31,14 @@ client = { package = "substrate-client", path = "../../core/client" } client_db = { package = "substrate-client-db", path = "../../core/client/db", features = ["kvdb-rocksdb"] } codec = { package = "parity-scale-codec", version = "1.0.0" } substrate-executor = { path = "../../core/executor" } +substrate-authority-discovery = { path = "../../core/authority-discovery"} transaction_pool = { package = "substrate-transaction-pool", path = "../../core/transaction-pool" } rpc-servers = { package = "substrate-rpc-servers", path = "../../core/rpc-servers" } rpc = { package = "substrate-rpc", path = "../../core/rpc" } tel = { package = "substrate-telemetry", path = "../../core/telemetry" } offchain = { package = "substrate-offchain", path = "../../core/offchain" } parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" } +authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "../authority-discovery/primitives", default-features = false } [dev-dependencies] substrate-test-runtime-client = { path = "../test-runtime/client" } diff --git a/core/service/src/components.rs b/core/service/src/components.rs index a9aa2129f2498..7d7d99aee48d2 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -25,13 +25,14 @@ use client::{self, Client, runtime_api}; use crate::{error, Service}; use consensus_common::{import_queue::ImportQueue, SelectChain}; use network::{ - self, OnDemand, FinalityProofProvider, NetworkStateInfo, config::BoxFinalityProofRequestBuilder + self, OnDemand, FinalityProofProvider, NetworkStateInfo, config::BoxFinalityProofRequestBuilder, DhtEvent, }; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; use sr_primitives::{ BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::BlockId }; +use authority_discovery_primitives::AuthorityDiscoveryApi; use crate::config::Configuration; use primitives::{Blake2Hasher, H256, traits::BareCryptoStorePtr}; use rpc::{self, system::SystemInfo}; @@ -302,6 +303,40 @@ impl OffchainWorker for C where } } +pub trait AuthorityDiscovery { + fn authority_discovery( + client: Arc>, + network: Arc, S, H>>, + dht_event_rx: futures::sync::mpsc::Receiver, + ) -> Box + Send> + where + H: network::ExHashT, + S: network::specialization::NetworkSpecialization>; +} + +impl AuthorityDiscovery for C +where + ComponentClient: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: + AuthorityDiscoveryApi, ::AuthorityId>, +{ + fn authority_discovery( + client: Arc>, + network: Arc, S, H>>, + dht_event_rx: futures::sync::mpsc::Receiver, + ) -> Box + Send> + where + H: network::ExHashT, + S: network::specialization::NetworkSpecialization>, + { + Box::new(substrate_authority_discovery::AuthorityDiscovery::new( + client, + network, + dht_event_rx, + )) + } +} + /// The super trait that combines all required traits a `Service` needs to implement. pub trait ServiceTrait: Deref> @@ -310,6 +345,7 @@ pub trait ServiceTrait: + StartRpc + MaintainTransactionPool + OffchainWorker + + AuthorityDiscovery + InitialSessionKeys {} impl ServiceTrait for T where @@ -319,6 +355,7 @@ impl ServiceTrait for T where + StartRpc + MaintainTransactionPool + OffchainWorker + + AuthorityDiscovery + InitialSessionKeys {} @@ -349,12 +386,22 @@ pub trait ServiceFactory: 'static + Sized { type FullService: ServiceTrait>; /// Extended light service type. type LightService: ServiceTrait>; - /// ImportQueue for full client + /// ImportQueue for full client. type FullImportQueue: ImportQueue + 'static; - /// ImportQueue for light clients + /// ImportQueue for light clients. type LightImportQueue: ImportQueue + 'static; - /// The Fork Choice Strategy for the chain + /// The Fork Choice Strategy for the chain. type SelectChain: SelectChain + 'static; + /// Authority identifier used for authority-discovery module. + // TODO: Are all of these trait bounds necessary? + type AuthorityId: codec::Codec + + std::clone::Clone + + std::cmp::Eq + + std::convert::AsRef<[u8]> + + std::fmt::Debug + + std::hash::Hash + + std::marker::Send + + std::string::ToString; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 0149937c9c1a5..ba4a3036a4599 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -46,6 +46,7 @@ use sr_primitives::traits::{Header, NumberFor, SaturatedConversion}; use substrate_executor::NativeExecutor; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; +use crate::components::AuthorityDiscovery; pub use self::error::Error; pub use config::{Configuration, Roles, PruningMode}; @@ -392,12 +393,26 @@ impl Service { let rpc_handlers = gen_handler(); let rpc = start_rpc_servers(&config, gen_handler)?; + // Use bounded channel to ensure back-pressure. Authority discovery is + // triggering one event per authority within the current authority set. This + // estimates the authority set size to be somewhere below 10 000 thereby + // setting the channel buffer size to 10 000. + let (dht_event_tx, dht_event_rx) = + mpsc::channel::(10000); + let authority_discovery = Components::RuntimeServices::authority_discovery( + client.clone(), + network.clone(), + dht_event_rx, + ); + let _ = to_spawn_tx.unbounded_send(authority_discovery); + let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future( network_mut, client.clone(), network_status_sinks.clone(), system_rpc_rx, - has_bootnodes + has_bootnodes, + dht_event_tx, ) .map_err(|_| ()) .select(exit.clone()) @@ -633,6 +648,7 @@ fn build_network_future< status_sinks: Arc, NetworkState)>>>>, rpc_rx: futures03::channel::mpsc::UnboundedReceiver>, should_have_peers: bool, + mut dht_events_tx: mpsc::Sender, ) -> impl Future { // Compatibility shim while we're transitionning to stable Futures. // See https://github.com/paritytech/substrate/issues/3099 @@ -711,7 +727,17 @@ fn build_network_future< while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| { warn!(target: "service", "Error in network: {:?}", err); }) { - // Ignore for now. + // Given that core/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht + // events are being passed on to the authority-discovery module. In the future there might be multiple + // consumers of these events. In that case this would need to be refactored to properly dispatch the events, + // e.g. via a subscriber model. + if let Err(e) = dht_events_tx.try_send(event) { + if e.is_full() { + warn!(target: "service", "Dht event channel to authority discovery is full, dropping event."); + } else if e.is_disconnected() { + warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event."); + } + } }; // Now some diagnostic for performances. @@ -1019,6 +1045,7 @@ macro_rules! construct_service_factory { FinalityProofProvider = { $( $finality_proof_provider_init:tt )* }, RpcExtensions = $rpc_extensions_ty:ty $( { $( $rpc_extensions:tt )* } )?, + AuthorityId = $authority_id:ty, } ) => { $( #[$attr] )* @@ -1040,6 +1067,7 @@ macro_rules! construct_service_factory { type LightImportQueue = $light_import_queue; type SelectChain = $select_chain; type RpcExtensions = $rpc_extensions_ty; + type AuthorityId = $authority_id; fn build_full_transaction_pool( config: $crate::TransactionPoolOptions, diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 7022d12d69a0f..15796a147adce 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -278,7 +278,6 @@ construct_service_factory! { FinalityProofProvider = { |client: Arc>| { Ok(Some(Arc::new(GrandpaFinalityProofProvider::new(client.clone(), client)) as _)) }}, - RpcExtensions = jsonrpc_core::IoHandler { |client, pool| { use node_rpc::accounts::{Accounts, AccountsApi}; @@ -289,6 +288,7 @@ construct_service_factory! { ); io }}, + AuthorityId = im_online::AuthorityId, } } From 7de7db7522a07a69cffbe79b69bb427c887a02b4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 21 Aug 2019 15:34:05 +0200 Subject: [PATCH 04/20] core/authority-discovery: Implement module logic itself --- core/authority-discovery/src/error.rs | 13 ++ core/authority-discovery/src/lib.rs | 196 ++++++++++++++++++++++++++ 2 files changed, 209 insertions(+) diff --git a/core/authority-discovery/src/error.rs b/core/authority-discovery/src/error.rs index 74c671b13c79a..a9034aea3056c 100644 --- a/core/authority-discovery/src/error.rs +++ b/core/authority-discovery/src/error.rs @@ -21,4 +21,17 @@ pub type Result = std::result::Result; #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { + RetrievingAuthorityId, + VerifyingDhtPayload, + HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError), + CallingRuntime(client::error::Error), + SigningDhtPayload, + /// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it + /// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This + /// error is the result of the above failing. + MatchingHashedAuthorityIdWithAuthorityId, + SettingPeersetPriorityGroup(String), + Encoding(prost::EncodeError), + Decoding(prost::DecodeError), + ParsingMultiaddress(libp2p::core::multiaddr::Error), } diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 65a41b127892b..d2e3b004d5d49 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -141,6 +141,178 @@ where phantom_authority_id: PhantomData, } } + + fn publish_own_ext_addresses(&mut self) -> Result<()> { + let id = BlockId::hash(self.client.info().best_hash); + + let authority_id = self + .client + .runtime_api() + .authority_id(&id) + .map_err(Error::CallingRuntime)? + .ok_or(Error::RetrievingAuthorityId)?; + + let addresses = self + .network + .external_addresses() + .into_iter() + .map(|mut a| { + a.push(libp2p::core::multiaddr::Protocol::P2p( + self.network.peer_id().into(), + )); + a + }) + .map(|a| a.to_string()) + .collect(); + + let mut serialized_addresses = vec![]; + { + let mut a = schema::AuthorityAddresses::default(); + a.addresses = addresses; + a.encode(&mut serialized_addresses) + .map_err(Error::Encoding)?; + }; + + let sig = self + .client + .runtime_api() + .sign(&id, serialized_addresses.clone(), authority_id.clone()) + .map_err(Error::CallingRuntime)? + .ok_or(Error::SigningDhtPayload)?; + + let mut signed_addresses = vec![]; + { + let mut a = schema::SignedAuthorityAddresses::default(); + a.addresses = serialized_addresses; + a.signature = sig; + a.encode(&mut signed_addresses).map_err(Error::Encoding)?; + }; + + self.network + .put_value(hash_authority_id(authority_id.as_ref())?, signed_addresses); + + Ok(()) + } + + fn request_addresses_of_others(&mut self) -> Result<()> { + let id = BlockId::hash(self.client.info().best_hash); + + let authorities = self + .client + .runtime_api() + .authorities(&id) + .map_err(Error::CallingRuntime)?; + + for authority_id in authorities.iter() { + self.network + .get_value(&hash_authority_id(authority_id.as_ref())?); + } + + Ok(()) + } + + fn handle_dht_events(&mut self) -> Result<()> { + while let Ok(Async::Ready(Some(event))) = self.dht_event_rx.poll() { + match event { + DhtEvent::ValueFound(v) => { + if log_enabled!(log::Level::Debug) { + let hashes = v.iter().map(|(hash, _value)| hash.clone()); + debug!(target: "sub-authority-discovery", "Value for hash '{:?}' found on Dht.", hashes); + } + + self.handle_dht_value_found_event(v)?; + } + DhtEvent::ValueNotFound(hash) => { + warn!(target: "sub-authority-discovery", "Value for hash '{:?}' not found on Dht.", hash) + } + DhtEvent::ValuePut(hash) => { + debug!(target: "sub-authority-discovery", "Successfully put hash '{:?}' on Dht.", hash) + } + DhtEvent::ValuePutFailed(hash) => { + warn!(target: "sub-authority-discovery", "Failed to put hash '{:?}' on Dht.", hash) + } + } + } + + Ok(()) + } + + fn handle_dht_value_found_event( + &mut self, + values: Vec<(libp2p::kad::record::Key, Vec)>, + ) -> Result<()> { + println!("==== dht found handling, cache: {:?}", self.address_cache); + let id = BlockId::hash(self.client.info().best_hash); + + // From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure + // it is actually an authority, we match the hash against the hash of the authority id of all other authorities. + let authorities = self.client.runtime_api().authorities(&id)?; + self.purge_old_authorities_from_cache(&authorities); + + let authorities = authorities + .into_iter() + .map(|a| hash_authority_id(a.as_ref()).map(|h| (h, a))) + .collect::>>()?; + + for (key, value) in values.iter() { + // Check if the event origins from an authority in the current + // authority set. + let authority_pub_key: &AuthorityId = authorities + .get(key) + .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; + + let signed_addresses = + schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?; + + let is_verified = self + .client + .runtime_api() + .verify( + &id, + signed_addresses.addresses.clone(), + signed_addresses.signature.clone(), + authority_pub_key.clone(), + ) + .map_err(Error::CallingRuntime)?; + + if !is_verified { + return Err(Error::VerifyingDhtPayload); + } + + let addresses: Vec = + schema::AuthorityAddresses::decode(signed_addresses.addresses) + .map(|a| a.addresses) + .map_err(Error::Decoding)? + .into_iter() + .map(|a| a.parse()) + .collect::>() + .map_err(Error::ParsingMultiaddress)?; + + self.address_cache + .insert(authority_pub_key.clone(), addresses); + } + + // Let's update the peerset priority group with the all the addresses we + // have in our cache. + + let addresses = HashSet::from_iter( + self.address_cache + .iter() + .map(|(_peer_id, addresses)| addresses.clone()) + .flatten(), + ); + + self.network + .set_priority_group("authorities".to_string(), addresses) + .map_err(Error::SettingPeersetPriorityGroup)?; + + Ok(()) + } + + fn purge_old_authorities_from_cache(&mut self, authorities: &Vec) { + self.address_cache + .retain(|peer_id, _addresses| authorities.contains(peer_id)) + } } impl futures::Future @@ -163,8 +335,32 @@ where type Error = (); fn poll(&mut self) -> futures::Poll { + let mut inner = || -> Result<()> { + // Process incoming events before triggering new ones. + self.handle_dht_events()?; + + while let Ok(Async::Ready(_)) = self.interval.poll() { + self.publish_own_ext_addresses()?; + + self.request_addresses_of_others()?; + } + + Ok(()) + }; + + match inner() { + Ok(()) => {} + Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), + } + // Make sure to always return NotReady as this is a long running task // with the same lifetime of the node itself. Ok(futures::Async::NotReady) } } + +fn hash_authority_id(id: &[u8]) -> Result<(libp2p::kad::record::Key)> { + libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, id) + .map(|k| libp2p::kad::record::Key::new(&k)) + .map_err(Error::HashingAuthorityId) +} From ed294de204aad175108d24ae5034f56dfb589114 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 22 Aug 2019 14:12:57 +0200 Subject: [PATCH 05/20] core/network: Finish NetworkWoker if NetworkService stream finished When the channel from network service to network worker is finished, also finish the network worker stream given that both network worker and network service have the same lifetime. --- core/network/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 9af39e1e7d461..ac6bd1ac05dd5 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -636,7 +636,7 @@ impl, H: ExHashT> Stream for Ne // Process the next message coming from the `NetworkService`. let msg = match self.from_worker.poll() { Ok(Async::Ready(Some(msg))) => msg, - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(None)), Ok(Async::NotReady) => break, }; From fbc3a5b187326aace5b9798738df1b4f409430d8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 22 Aug 2019 15:56:19 +0200 Subject: [PATCH 06/20] core/authority-discovery: Ensure being woken up on next interval tick Authority discovery implements the future trait. Polling this future always returns NotReady. The future uses Tokios timer crate to become proactive periodically, e.g. advertising its own external addresses. This commit ensures that the underlying Tokio task is always registered at the Tokio Reactor to be woken up on the next interval tick. This is achieved by making sure `interval.poll` returns `NotReady` at least once within each `AuthorityDiscovery.poll` execution. --- core/authority-discovery/src/lib.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index d2e3b004d5d49..1f7029c09d94d 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -339,9 +339,14 @@ where // Process incoming events before triggering new ones. self.handle_dht_events()?; - while let Ok(Async::Ready(_)) = self.interval.poll() { - self.publish_own_ext_addresses()?; + if let Ok(Async::Ready(_)) = self.interval.poll() { + // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the + // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the + // underlying Tokio task is never registered with Tokios Reactor to be woken up on the next interval + // tick. + while let Ok(Async::Ready(_)) = self.interval.poll() {} + self.publish_own_ext_addresses()?; self.request_addresses_of_others()?; } @@ -351,7 +356,7 @@ where match inner() { Ok(()) => {} Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), - } + }; // Make sure to always return NotReady as this is a long running task // with the same lifetime of the node itself. From 9de2655f3dc812a9b59d22b913513ca75cba9fb1 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 27 Aug 2019 15:26:54 +0200 Subject: [PATCH 07/20] core/authority-discovery: Adjust interval to be proactive on Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node could restart at any point in time, one can not depend on the republishing process, thus starting to publish own external addresses should happen on an interval < 36h. In addition have the first tick of the interval be at the beginning not after an interval duration. --- core/authority-discovery/src/lib.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 1f7029c09d94d..b3882297c6f75 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -20,7 +20,7 @@ //! //! This crate enables Substrate authorities to directly connect to other //! authorities. [`AuthorityDiscovery`] implements the Future trait. By polling -//! a [`AuthorityDiscovery`] an authority: +//! [`AuthorityDiscovery`] an authority: //! //! //! 1. **Makes itself discoverable** @@ -59,7 +59,7 @@ use std::collections::{HashMap, HashSet}; use std::iter::FromIterator; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; mod error; /// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs. @@ -127,9 +127,12 @@ where network: Arc>, dht_event_rx: futures::sync::mpsc::Receiver, ) -> AuthorityDiscovery { - // TODO: 5 seconds is probably a bit spammy, figure out what Kademlias - // time to live for dht entries is and adjust accordingly. - let interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); + // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node + // could restart at any point in time, one can not depend on the republishing process, thus starting to publish + // own external addresses should happen on an interval < 36h. + // TODO: It might make sense to split this up into a publication_interval and a retrieval_interval. + let interval = + tokio_timer::Interval::new(Instant::now(), Duration::from_secs(12 * 60 * 60)); let address_cache = HashMap::new(); AuthorityDiscovery { @@ -342,11 +345,12 @@ where if let Ok(Async::Ready(_)) = self.interval.poll() { // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the - // underlying Tokio task is never registered with Tokios Reactor to be woken up on the next interval + // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval // tick. while let Ok(Async::Ready(_)) = self.interval.poll() {} self.publish_own_ext_addresses()?; + // TODO: Should we request external addresses more often than we publish our owns? self.request_addresses_of_others()?; } @@ -359,7 +363,7 @@ where }; // Make sure to always return NotReady as this is a long running task - // with the same lifetime of the node itself. + // with the same lifetime as the node itself. Ok(futures::Async::NotReady) } } From 5d1042d73e8a568bd9c692763b8db9fe7f0e1a1f Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 27 Aug 2019 15:33:55 +0200 Subject: [PATCH 08/20] core/authority-discovery: Implement unit tests Abstract NetworkService via NetworkProvider trait to be able to mock it within the unit tests. In addition add basic unit test for `publish_own_ext_addresses`, `request_addresses_of_others` and `handle_dht_events`. --- Cargo.lock | 4 + core/authority-discovery/Cargo.toml | 6 + core/authority-discovery/src/lib.rs | 455 ++++++++++++++++++++++++---- 3 files changed, 402 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8f193c2c4eb3..24faabcf7852b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4475,6 +4475,7 @@ dependencies = [ "libp2p 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4483,7 +4484,10 @@ dependencies = [ "substrate-client 2.0.0", "substrate-keystore 2.0.0", "substrate-network 2.0.0", + "substrate-peerset 2.0.0", "substrate-primitives 2.0.0", + "substrate-test-runtime-client 2.0.0", + "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml index a450532a48370..0a94830ead4cb 100644 --- a/core/authority-discovery/Cargo.toml +++ b/core/authority-discovery/Cargo.toml @@ -24,3 +24,9 @@ log = "0.4" derive_more = "0.14.0" prost = "0.5" bytes = "0.4" + +[dev-dependencies] +test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } +peerset = { package = "substrate-peerset", path = "../../core/peerset" } +parking_lot = { version = "0.9.0" } +tokio = { version = "0.1.11"} diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index b3882297c6f75..c9a53aedbbd28 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -18,9 +18,8 @@ //! Substrate authority discovery. //! -//! This crate enables Substrate authorities to directly connect to other -//! authorities. [`AuthorityDiscovery`] implements the Future trait. By polling -//! [`AuthorityDiscovery`] an authority: +//! This crate enables Substrate authorities to directly connect to other authorities. [`AuthorityDiscovery`] implements +//! the Future trait. By polling [`AuthorityDiscovery`] an authority: //! //! //! 1. **Makes itself discoverable** @@ -42,8 +41,7 @@ //! //! 3. Validates the signatures of the retrieved key value pairs. //! -//! 4. Adds the retrieved external addresses as priority nodes to the -//! peerset. +//! 4. Adds the retrieved external addresses as priority nodes to the peerset. use authority_discovery_primitives::AuthorityDiscoveryApi; use client::blockchain::HeaderBackend; @@ -51,10 +49,10 @@ use error::{Error, Result}; use futures::{prelude::*, sync::mpsc::Receiver}; use log::{debug, error, log_enabled, warn}; use network::specialization::NetworkSpecialization; -use network::{DhtEvent, ExHashT, NetworkStateInfo}; +use network::{DhtEvent, ExHashT}; use prost::Message; use sr_primitives::generic::BlockId; -use sr_primitives::traits::{Block, ProvideRuntimeApi}; +use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi}; use std::collections::{HashMap, HashSet}; use std::iter::FromIterator; use std::marker::PhantomData; @@ -67,13 +65,11 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); } -/// A AuthorityDiscovery makes a given authority discoverable as well as -/// discovers other authoritys. -pub struct AuthorityDiscovery +/// A AuthorityDiscovery makes a given authority discoverable as well as discovers other authorities. +pub struct AuthorityDiscovery where - B: Block + 'static, - S: NetworkSpecialization, - H: ExHashT, + Block: BlockT + 'static, + Network: NetworkProvider, AuthorityId: std::string::ToString + codec::Codec + std::convert::AsRef<[u8]> @@ -81,36 +77,32 @@ where + std::fmt::Debug + std::hash::Hash + std::cmp::Eq, - Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, - ::Api: AuthorityDiscoveryApi, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, { client: Arc, - network: Arc>, + network: Arc, /// Channel we receive Dht events on. dht_event_rx: Receiver, - /// Interval to be proactive on, e.g. publishing own addresses or starting - /// to query for addresses. + /// Interval to be proactive on, e.g. publishing own addresses or starting to query for addresses. interval: tokio_timer::Interval, - /// The network peerset interface for priority groups lets us only set an - /// entire group, but we retrieve the addresses of other authorities one by - /// one from the network. To use the peerset interface we need to cache the - /// addresses and always overwrite the entire peerset priority group. To - /// ensure this map doesn't grow indefinitely - /// `purge_old_authorities_from_cache` function is called each time we add a - /// new entry. + /// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the + /// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the + /// addresses and always overwrite the entire peerset priority group. To ensure this map doesn't grow indefinitely + /// `purge_old_authorities_from_cache` function is called each time we add a new entry. address_cache: HashMap>, phantom_authority_id: PhantomData, + phantom_block: PhantomData, } -impl AuthorityDiscovery +impl AuthorityDiscovery where - B: Block + 'static, - S: NetworkSpecialization, - H: ExHashT, + Block: BlockT + 'static, + Network: NetworkProvider, AuthorityId: std::string::ToString + codec::Codec + std::convert::AsRef<[u8]> @@ -118,15 +110,15 @@ where + std::fmt::Debug + std::hash::Hash + std::cmp::Eq, - Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, - ::Api: AuthorityDiscoveryApi, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, { /// Return a new authority discovery. pub fn new( client: Arc, - network: Arc>, + network: Arc, dht_event_rx: futures::sync::mpsc::Receiver, - ) -> AuthorityDiscovery { + ) -> AuthorityDiscovery { // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node // could restart at any point in time, one can not depend on the republishing process, thus starting to publish // own external addresses should happen on an interval < 36h. @@ -142,6 +134,7 @@ where interval, address_cache, phantom_authority_id: PhantomData, + phantom_block: PhantomData, } } @@ -161,7 +154,7 @@ where .into_iter() .map(|mut a| { a.push(libp2p::core::multiaddr::Protocol::P2p( - self.network.peer_id().into(), + self.network.local_peer_id().into(), )); a }) @@ -169,12 +162,11 @@ where .collect(); let mut serialized_addresses = vec![]; - { - let mut a = schema::AuthorityAddresses::default(); - a.addresses = addresses; - a.encode(&mut serialized_addresses) - .map_err(Error::Encoding)?; - }; + schema::AuthorityAddresses { + addresses: addresses, + } + .encode(&mut serialized_addresses) + .map_err(Error::Encoding)?; let sig = self .client @@ -184,12 +176,12 @@ where .ok_or(Error::SigningDhtPayload)?; let mut signed_addresses = vec![]; - { - let mut a = schema::SignedAuthorityAddresses::default(); - a.addresses = serialized_addresses; - a.signature = sig; - a.encode(&mut signed_addresses).map_err(Error::Encoding)?; - }; + schema::SignedAuthorityAddresses { + addresses: serialized_addresses, + signature: sig, + } + .encode(&mut signed_addresses) + .map_err(Error::Encoding)?; self.network .put_value(hash_authority_id(authority_id.as_ref())?, signed_addresses); @@ -244,7 +236,7 @@ where &mut self, values: Vec<(libp2p::kad::record::Key, Vec)>, ) -> Result<()> { - println!("==== dht found handling, cache: {:?}", self.address_cache); + println!("==== Dht found handling, cache: {:?}", self.address_cache); let id = BlockId::hash(self.client.info().best_hash); // From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure @@ -258,8 +250,7 @@ where .collect::>>()?; for (key, value) in values.iter() { - // Check if the event origins from an authority in the current - // authority set. + // Check if the event origins from an authority in the current authority set. let authority_pub_key: &AuthorityId = authorities .get(key) .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; @@ -295,8 +286,7 @@ where .insert(authority_pub_key.clone(), addresses); } - // Let's update the peerset priority group with the all the addresses we - // have in our cache. + // Let's update the peerset priority group with the all the addresses we have in our cache. let addresses = HashSet::from_iter( self.address_cache @@ -312,18 +302,17 @@ where Ok(()) } - fn purge_old_authorities_from_cache(&mut self, authorities: &Vec) { + fn purge_old_authorities_from_cache(&mut self, current_authorities: &Vec) { self.address_cache - .retain(|peer_id, _addresses| authorities.contains(peer_id)) + .retain(|peer_id, _addresses| current_authorities.contains(peer_id)) } } -impl futures::Future - for AuthorityDiscovery +impl futures::Future + for AuthorityDiscovery where - B: Block + 'static, - S: NetworkSpecialization, - H: ExHashT, + Block: BlockT + 'static, + Network: NetworkProvider, AuthorityId: std::string::ToString + codec::Codec + std::convert::AsRef<[u8]> @@ -331,8 +320,8 @@ where + std::fmt::Debug + std::hash::Hash + std::cmp::Eq, - Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, - ::Api: AuthorityDiscoveryApi, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, { type Item = (); type Error = (); @@ -362,14 +351,354 @@ where Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), }; - // Make sure to always return NotReady as this is a long running task - // with the same lifetime as the node itself. + // Make sure to always return NotReady as this is a long running task with the same lifetime as the node itself. Ok(futures::Async::NotReady) } } -fn hash_authority_id(id: &[u8]) -> Result<(libp2p::kad::record::Key)> { +/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying Substrate networking. Using +/// this trait abstraction instead of NetworkService directly is necessary to unit test AuthorityDiscovery. +pub trait NetworkProvider { + /// Returns the local external addresses. + fn external_addresses(&self) -> Vec; + + /// Returns the network identity of the node. + fn local_peer_id(&self) -> libp2p::PeerId; + + /// Modify a peerset priority group. + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String>; + + /// Start putting a value in the Dht. + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec); + + /// Start getting a value from the Dht. + fn get_value(&self, key: &libp2p::kad::record::Key); +} + +impl NetworkProvider for network::NetworkService +where + B: BlockT + 'static, + S: NetworkSpecialization, + H: ExHashT, +{ + fn external_addresses(&self) -> Vec { + self.external_addresses() + } + fn local_peer_id(&self) -> libp2p::PeerId { + self.local_peer_id() + } + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String> { + self.set_priority_group(group_id, peers) + } + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) { + self.put_value(key, value) + } + fn get_value(&self, key: &libp2p::kad::record::Key) { + self.get_value(key) + } +} + +fn hash_authority_id(id: &[u8]) -> Result { libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, id) .map(|k| libp2p::kad::record::Key::new(&k)) .map_err(Error::HashingAuthorityId) } + +#[cfg(test)] +mod tests { + use super::*; + use client::runtime_api::{ApiExt, Core, RuntimeVersion}; + use futures::future::poll_fn; + use primitives::{ExecutionContext, NativeOrEncoded}; + use sr_primitives::traits::Zero; + use sr_primitives::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi}; + use std::sync::{Arc, Mutex}; + use test_client::runtime::Block; + use tokio::runtime::current_thread; + + #[derive(Clone)] + struct TestApi {} + + impl ProvideRuntimeApi for TestApi { + type Api = RuntimeApi; + + fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> { + RuntimeApi {}.into() + } + } + + /// Blockchain database header backend. Does not perform any validation. + impl HeaderBackend for TestApi { + fn header( + &self, + _id: BlockId, + ) -> std::result::Result, client::error::Error> { + Ok(None) + } + + fn info(&self) -> client::blockchain::Info { + client::blockchain::Info { + best_hash: Default::default(), + best_number: Zero::zero(), + finalized_hash: Default::default(), + finalized_number: Zero::zero(), + genesis_hash: Default::default(), + } + } + + fn status( + &self, + _id: BlockId, + ) -> std::result::Result { + Ok(client::blockchain::BlockStatus::Unknown) + } + + fn number( + &self, + _hash: Block::Hash, + ) -> std::result::Result>, client::error::Error> { + Ok(None) + } + + fn hash( + &self, + _number: NumberFor, + ) -> std::result::Result, client::error::Error> { + Ok(None) + } + } + + struct RuntimeApi {} + + impl Core for RuntimeApi { + fn Core_version_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + + fn Core_execute_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<(Block)>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + + fn Core_initialize_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<&::Header>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + } + + impl ApiExt for RuntimeApi { + fn map_api_result std::result::Result, R, E>( + &self, + _: F, + ) -> std::result::Result { + unimplemented!("Not required for testing!") + } + + fn runtime_version_at( + &self, + _: &BlockId, + ) -> std::result::Result { + unimplemented!("Not required for testing!") + } + + fn record_proof(&mut self) { + unimplemented!("Not required for testing!") + } + + fn extract_proof(&mut self) -> Option>> { + unimplemented!("Not required for testing!") + } + } + + impl AuthorityDiscoveryApi for RuntimeApi { + fn AuthorityDiscoveryApi_authority_id_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> std::result::Result>, client::error::Error> { + return Ok(NativeOrEncoded::Native(Some("test".to_string()))); + } + fn AuthorityDiscoveryApi_authorities_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> std::result::Result>, client::error::Error> { + return Ok(NativeOrEncoded::Native(vec![ + "test-authority-id-1".to_string(), + "test-authority-id-2".to_string(), + ])); + } + fn AuthorityDiscoveryApi_sign_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<(std::vec::Vec, String)>, + _: Vec, + ) -> std::result::Result>>, client::error::Error> { + return Ok(NativeOrEncoded::Native(Some( + "test-signature-1".as_bytes().to_vec(), + ))); + } + fn AuthorityDiscoveryApi_verify_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + args: Option<(Vec, Vec, String)>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + if args.unwrap().1 == "test-signature-1".as_bytes() { + return Ok(NativeOrEncoded::Native(true)); + } + return Ok(NativeOrEncoded::Native(false)); + } + } + + #[derive(Default)] + struct TestNetwork { + // Whenever functions on `TestNetwork` are called, the function arguments are added to the vectors below. + pub put_value_call: Arc)>>>, + pub get_value_call: Arc>>, + pub set_priority_group_call: Arc)>>>, + } + + impl NetworkProvider for TestNetwork { + fn external_addresses(&self) -> Vec { + vec![] + } + fn local_peer_id(&self) -> libp2p::PeerId { + libp2p::PeerId::random() + } + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String> { + self.set_priority_group_call + .lock() + .unwrap() + .push((group_id, peers)); + Ok(()) + } + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) { + self.put_value_call.lock().unwrap().push((key, value)); + } + fn get_value(&self, key: &libp2p::kad::record::Key) { + self.get_value_call.lock().unwrap().push(key.clone()); + } + } + + #[test] + fn publish_own_ext_addresses_puts_record_on_dht() { + let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + authority_discovery.publish_own_ext_addresses().unwrap(); + + // Expect authority discovery to put a new record onto the dht. + assert_eq!(network.put_value_call.lock().unwrap().len(), 1); + } + + #[test] + fn request_addresses_of_others_triggers_dht_get_query() { + let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + authority_discovery.request_addresses_of_others().unwrap(); + + // Expect authority discovery to request new records from the dht. + assert_eq!(network.get_value_call.lock().unwrap().len(), 2); + } + + #[test] + fn handle_dht_events_with_value_found_should_call_set_priority_group() { + // Create authority discovery. + + let (mut dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + // Create sample dht event. + + let authority_id_1 = hash_authority_id("test-authority-id-1".as_bytes()).unwrap(); + let address_1 = "/ip6/2001:db8::".to_string(); + + let mut serialized_addresses = vec![]; + schema::AuthorityAddresses { + addresses: vec![address_1.clone()], + } + .encode(&mut serialized_addresses) + .unwrap(); + + let mut signed_addresses = vec![]; + schema::SignedAuthorityAddresses { + addresses: serialized_addresses, + signature: "test-signature-1".as_bytes().to_vec(), + } + .encode(&mut signed_addresses) + .unwrap(); + + let dht_event = network::DhtEvent::ValueFound(vec![(authority_id_1, signed_addresses)]); + dht_event_tx.try_send(dht_event).unwrap(); + + // Make authority discovery handle the event. + + let f = || { + authority_discovery.handle_dht_events().unwrap(); + + // Expect authority discovery to set the priority set. + assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1); + + assert_eq!( + network.set_priority_group_call.lock().unwrap()[0], + ( + "authorities".to_string(), + HashSet::from_iter(vec![address_1.parse().unwrap()].into_iter()) + ) + ); + + Ok(Async::Ready(())) + }; + + let mut runtime = current_thread::Runtime::new().unwrap(); + runtime.block_on(poll_fn::<(), (), _>(f)).unwrap(); + } +} From 4421bade9a8f90cdd46a114ae586bc071b7d60f4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 29 Aug 2019 11:35:48 +0200 Subject: [PATCH 09/20] core/authority-discovery: Publish and query on different intervals Split the global `AuthorityDiscovery.interval` into two intervals: `publish_interval` and `query_interval`. Dht entries of other authorities can change at any point in time. Thereby one should query more often than publish. --- core/authority-discovery/src/lib.rs | 40 ++++++++++++++----- core/authority-discovery/src/schema/dht.proto | 2 +- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index c9a53aedbbd28..2c1687cda2029 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -86,8 +86,10 @@ where /// Channel we receive Dht events on. dht_event_rx: Receiver, - /// Interval to be proactive on, e.g. publishing own addresses or starting to query for addresses. - interval: tokio_timer::Interval, + /// Interval to be proactive, publishing own addresses. + publish_interval: tokio_timer::Interval, + /// Interval on which to query for addresses of other authorities. + query_interval: tokio_timer::Interval, /// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the /// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the @@ -120,18 +122,24 @@ where dht_event_rx: futures::sync::mpsc::Receiver, ) -> AuthorityDiscovery { // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node - // could restart at any point in time, one can not depend on the republishing process, thus starting to publish - // own external addresses should happen on an interval < 36h. - // TODO: It might make sense to split this up into a publication_interval and a retrieval_interval. - let interval = + // could restart at any point in time, one can not depend on the republishing process, thus publishing own + // external addresses should happen on an interval < 36h. + let publish_interval = tokio_timer::Interval::new(Instant::now(), Duration::from_secs(12 * 60 * 60)); + + // External addresses of other authorities can change at any given point in time. The interval on which to query + // for external addresses of other authorities is a trade off between efficiency and performance. + let query_interval = + tokio_timer::Interval::new(Instant::now(), Duration::from_secs(10 * 60)); + let address_cache = HashMap::new(); AuthorityDiscovery { client, network, dht_event_rx, - interval, + publish_interval, + query_interval, address_cache, phantom_authority_id: PhantomData, phantom_block: PhantomData, @@ -236,7 +244,8 @@ where &mut self, values: Vec<(libp2p::kad::record::Key, Vec)>, ) -> Result<()> { - println!("==== Dht found handling, cache: {:?}", self.address_cache); + debug!(target: "sub-authority-discovery", "Got Dht value from network."); + let id = BlockId::hash(self.client.info().best_hash); // From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure @@ -295,6 +304,7 @@ where .flatten(), ); + debug!(target: "sub-authority-discovery", "Applying priority group {:#?} to peerset.", addresses); self.network .set_priority_group("authorities".to_string(), addresses) .map_err(Error::SettingPeersetPriorityGroup)?; @@ -331,15 +341,23 @@ where // Process incoming events before triggering new ones. self.handle_dht_events()?; - if let Ok(Async::Ready(_)) = self.interval.poll() { + if let Ok(Async::Ready(_)) = self.publish_interval.poll() { // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval // tick. - while let Ok(Async::Ready(_)) = self.interval.poll() {} + while let Ok(Async::Ready(_)) = self.publish_interval.poll() {} self.publish_own_ext_addresses()?; - // TODO: Should we request external addresses more often than we publish our owns? + } + + if let Ok(Async::Ready(_)) = self.query_interval.poll() { + // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the + // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the + // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval + // tick. + while let Ok(Async::Ready(_)) = self.query_interval.poll() {} + self.request_addresses_of_others()?; } diff --git a/core/authority-discovery/src/schema/dht.proto b/core/authority-discovery/src/schema/dht.proto index 434c54ffa645f..8d6fa066c656b 100644 --- a/core/authority-discovery/src/schema/dht.proto +++ b/core/authority-discovery/src/schema/dht.proto @@ -11,4 +11,4 @@ message AuthorityAddresses { message SignedAuthorityAddresses { bytes addresses = 1; bytes signature = 2; -} \ No newline at end of file +} From 76db4124145a6a12bbdbd4a3296727e13771c7cb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 30 Aug 2019 14:55:24 +0200 Subject: [PATCH 10/20] *: Remove abstract type AuthorityId replaced by newtype Vec The authority discovery module treats an authority identifier as an opaque string. Thus the type abstraction `AuthorityId` is unnecessary, bloating the `core/service` construction code. --- .../authority-discovery/primitives/src/lib.rs | 20 +++--- core/authority-discovery/src/error.rs | 1 - core/authority-discovery/src/lib.rs | 71 ++++++------------- core/service/src/components.rs | 12 +--- core/service/src/lib.rs | 2 - node/cli/src/service.rs | 1 - node/runtime/src/lib.rs | 34 ++++++--- srml/authority-discovery/src/lib.rs | 13 ++-- 8 files changed, 61 insertions(+), 93 deletions(-) diff --git a/core/authority-discovery/primitives/src/lib.rs b/core/authority-discovery/primitives/src/lib.rs index 556b758aa61fc..33710df043a99 100644 --- a/core/authority-discovery/primitives/src/lib.rs +++ b/core/authority-discovery/primitives/src/lib.rs @@ -19,9 +19,15 @@ #![cfg_attr(not(feature = "std"), no_std)] use client::decl_runtime_apis; -use codec::Codec; use rstd::vec::Vec; +#[derive(codec::Encode, codec::Decode, Eq, PartialEq, Clone)] +#[cfg_attr(feature = "std", derive(Debug, Hash))] +pub struct Signature(pub Vec); +#[derive(codec::Encode, codec::Decode, Eq, PartialEq, Clone)] +#[cfg_attr(feature = "std", derive(Debug, Hash))] +pub struct AuthorityId(pub Vec); + decl_runtime_apis! { /// The authority discovery api. /// @@ -29,21 +35,15 @@ decl_runtime_apis! { /// own authority identifier, to retrieve identifiers of the current authority /// set, as well as sign and verify Kademlia Dht external address payloads /// from and to other authorities. - pub trait AuthorityDiscoveryApi { - /// Returns own authority identifier iff it is part of the current authority - /// set, otherwise this function returns None. The restriction might be - /// softened in the future in case a consumer needs to learn own authority - /// identifier. - fn authority_id() -> Option; - + pub trait AuthorityDiscoveryApi { /// Retrieve authority identifiers of the current authority set. fn authorities() -> Vec; /// Sign the given payload with the private key corresponding to the given authority id. - fn sign(payload: Vec, authority_id: AuthorityId) -> Option>; + fn sign(payload: Vec) -> Option<(Signature, AuthorityId)>; /// Verify the given signature for the given payload with the given /// authority identifier. - fn verify(payload: Vec, signature: Vec, authority_id: AuthorityId) -> bool; + fn verify(payload: Vec, signature: Signature, authority_id: AuthorityId) -> bool; } } diff --git a/core/authority-discovery/src/error.rs b/core/authority-discovery/src/error.rs index a9034aea3056c..34501b60c458a 100644 --- a/core/authority-discovery/src/error.rs +++ b/core/authority-discovery/src/error.rs @@ -21,7 +21,6 @@ pub type Result = std::result::Result; #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { - RetrievingAuthorityId, VerifyingDhtPayload, HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError), CallingRuntime(client::error::Error), diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 2c1687cda2029..71b5d05079d96 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -43,7 +43,7 @@ //! //! 4. Adds the retrieved external addresses as priority nodes to the peerset. -use authority_discovery_primitives::AuthorityDiscoveryApi; +use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature}; use client::blockchain::HeaderBackend; use error::{Error, Result}; use futures::{prelude::*, sync::mpsc::Receiver}; @@ -66,19 +66,12 @@ mod schema { } /// A AuthorityDiscovery makes a given authority discoverable as well as discovers other authorities. -pub struct AuthorityDiscovery +pub struct AuthorityDiscovery where Block: BlockT + 'static, Network: NetworkProvider, - AuthorityId: std::string::ToString - + codec::Codec - + std::convert::AsRef<[u8]> - + std::clone::Clone - + std::fmt::Debug - + std::hash::Hash - + std::cmp::Eq, Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, - ::Api: AuthorityDiscoveryApi, + ::Api: AuthorityDiscoveryApi, { client: Arc, @@ -97,30 +90,22 @@ where /// `purge_old_authorities_from_cache` function is called each time we add a new entry. address_cache: HashMap>, - phantom_authority_id: PhantomData, - phantom_block: PhantomData, + phantom: PhantomData, } -impl AuthorityDiscovery +impl AuthorityDiscovery where Block: BlockT + 'static, Network: NetworkProvider, - AuthorityId: std::string::ToString - + codec::Codec - + std::convert::AsRef<[u8]> - + std::clone::Clone - + std::fmt::Debug - + std::hash::Hash - + std::cmp::Eq, Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, - ::Api: AuthorityDiscoveryApi, + ::Api: AuthorityDiscoveryApi, { /// Return a new authority discovery. pub fn new( client: Arc, network: Arc, dht_event_rx: futures::sync::mpsc::Receiver, - ) -> AuthorityDiscovery { + ) -> AuthorityDiscovery { // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node // could restart at any point in time, one can not depend on the republishing process, thus publishing own // external addresses should happen on an interval < 36h. @@ -141,21 +126,13 @@ where publish_interval, query_interval, address_cache, - phantom_authority_id: PhantomData, - phantom_block: PhantomData, + phantom: PhantomData, } } fn publish_own_ext_addresses(&mut self) -> Result<()> { let id = BlockId::hash(self.client.info().best_hash); - let authority_id = self - .client - .runtime_api() - .authority_id(&id) - .map_err(Error::CallingRuntime)? - .ok_or(Error::RetrievingAuthorityId)?; - let addresses = self .network .external_addresses() @@ -176,23 +153,23 @@ where .encode(&mut serialized_addresses) .map_err(Error::Encoding)?; - let sig = self + let (signature, authority_id) = self .client .runtime_api() - .sign(&id, serialized_addresses.clone(), authority_id.clone()) + .sign(&id, serialized_addresses.clone()) .map_err(Error::CallingRuntime)? .ok_or(Error::SigningDhtPayload)?; let mut signed_addresses = vec![]; schema::SignedAuthorityAddresses { addresses: serialized_addresses, - signature: sig, + signature: signature.0, } .encode(&mut signed_addresses) .map_err(Error::Encoding)?; self.network - .put_value(hash_authority_id(authority_id.as_ref())?, signed_addresses); + .put_value(hash_authority_id(authority_id.0.as_ref())?, signed_addresses); Ok(()) } @@ -208,7 +185,7 @@ where for authority_id in authorities.iter() { self.network - .get_value(&hash_authority_id(authority_id.as_ref())?); + .get_value(&hash_authority_id(authority_id.0.as_ref())?); } Ok(()) @@ -255,12 +232,12 @@ where let authorities = authorities .into_iter() - .map(|a| hash_authority_id(a.as_ref()).map(|h| (h, a))) + .map(|a| hash_authority_id(a.0.as_ref()).map(|h| (h, a))) .collect::>>()?; for (key, value) in values.iter() { // Check if the event origins from an authority in the current authority set. - let authority_pub_key: &AuthorityId = authorities + let authority_id: &AuthorityId = authorities .get(key) .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; @@ -273,8 +250,8 @@ where .verify( &id, signed_addresses.addresses.clone(), - signed_addresses.signature.clone(), - authority_pub_key.clone(), + Signature(signed_addresses.signature.clone()), + authority_id.clone(), ) .map_err(Error::CallingRuntime)?; @@ -292,7 +269,7 @@ where .map_err(Error::ParsingMultiaddress)?; self.address_cache - .insert(authority_pub_key.clone(), addresses); + .insert(authority_id.clone(), addresses); } // Let's update the peerset priority group with the all the addresses we have in our cache. @@ -318,20 +295,12 @@ where } } -impl futures::Future - for AuthorityDiscovery +impl futures::Future for AuthorityDiscovery where Block: BlockT + 'static, Network: NetworkProvider, - AuthorityId: std::string::ToString - + codec::Codec - + std::convert::AsRef<[u8]> - + std::clone::Clone - + std::fmt::Debug - + std::hash::Hash - + std::cmp::Eq, Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, - ::Api: AuthorityDiscoveryApi, + ::Api: AuthorityDiscoveryApi, { type Item = (); type Error = (); diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 7d7d99aee48d2..15d1f7a786634 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -318,7 +318,7 @@ impl AuthorityDiscovery for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: - AuthorityDiscoveryApi, ::AuthorityId>, + AuthorityDiscoveryApi>, { fn authority_discovery( client: Arc>, @@ -392,16 +392,6 @@ pub trait ServiceFactory: 'static + Sized { type LightImportQueue: ImportQueue + 'static; /// The Fork Choice Strategy for the chain. type SelectChain: SelectChain + 'static; - /// Authority identifier used for authority-discovery module. - // TODO: Are all of these trait bounds necessary? - type AuthorityId: codec::Codec - + std::clone::Clone - + std::cmp::Eq - + std::convert::AsRef<[u8]> - + std::fmt::Debug - + std::hash::Hash - + std::marker::Send - + std::string::ToString; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index ba4a3036a4599..0776a1e250392 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -1045,7 +1045,6 @@ macro_rules! construct_service_factory { FinalityProofProvider = { $( $finality_proof_provider_init:tt )* }, RpcExtensions = $rpc_extensions_ty:ty $( { $( $rpc_extensions:tt )* } )?, - AuthorityId = $authority_id:ty, } ) => { $( #[$attr] )* @@ -1067,7 +1066,6 @@ macro_rules! construct_service_factory { type LightImportQueue = $light_import_queue; type SelectChain = $select_chain; type RpcExtensions = $rpc_extensions_ty; - type AuthorityId = $authority_id; fn build_full_transaction_pool( config: $crate::TransactionPoolOptions, diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 15796a147adce..8ba6d9a6872cf 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -288,7 +288,6 @@ construct_service_factory! { ); io }}, - AuthorityId = im_online::AuthorityId, } } diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index f860e252ca72f..377f00e5ef9f9 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -49,6 +49,8 @@ use primitives::OpaqueMetadata; use grandpa::{AuthorityId as GrandpaId, AuthorityWeight as GrandpaWeight}; use im_online::{AuthorityId as ImOnlineId}; use finality_tracker::{DEFAULT_REPORT_LATENCY, DEFAULT_WINDOW_SIZE}; +use authority_discovery_primitives::{AuthorityId as EncodedAuthorityId, Signature as EncodedSignature}; +use codec::{Encode, Decode}; #[cfg(any(feature = "std", test))] pub use sr_primitives::BuildStorage; @@ -579,20 +581,32 @@ impl_runtime_apis! { } } - impl authority_discovery_primitives::AuthorityDiscoveryApi for Runtime { - fn authority_id() -> Option { - AuthorityDiscovery::authority_id() - } - fn authorities() -> Vec { - AuthorityDiscovery::authorities() + impl authority_discovery_primitives::AuthorityDiscoveryApi for Runtime { + fn authorities() -> Vec { + AuthorityDiscovery::authorities().into_iter() + .map(|id| id.encode()) + .map(EncodedAuthorityId) + .collect() } - fn sign(payload: Vec, authority_id: im_online::AuthorityId) -> Option> { - AuthorityDiscovery::sign(payload, authority_id) + fn sign(payload: Vec) -> Option<(EncodedSignature, EncodedAuthorityId)> { + AuthorityDiscovery::sign(payload).map(|(sig, id)| { + (EncodedSignature(sig.encode()), EncodedAuthorityId(id.encode())) + }) } - fn verify(payload: Vec, signature: Vec, public_key: im_online::AuthorityId) -> bool { - AuthorityDiscovery::verify(payload, signature, public_key) + fn verify(payload: Vec, signature: EncodedSignature, authority_id: EncodedAuthorityId) -> bool { + let signature = match im_online::AuthoritySignature::decode(&mut &signature.0[..]) { + Ok(s) => s, + _ => return false, + }; + + let authority_id = match im_online::AuthorityId::decode(&mut &authority_id.0[..]) { + Ok(id) => id, + _ => return false, + }; + + AuthorityDiscovery::verify(payload, signature, authority_id) } } diff --git a/srml/authority-discovery/src/lib.rs b/srml/authority-discovery/src/lib.rs index 00912aeffed8b..c0e1bd7ac0117 100644 --- a/srml/authority-discovery/src/lib.rs +++ b/srml/authority-discovery/src/lib.rs @@ -64,7 +64,7 @@ impl Module { /// set, otherwise this function returns None. The restriction might be /// softened in the future in case a consumer needs to learn own authority /// identifier. - pub fn authority_id() -> Option { + fn authority_id() -> Option { let authorities = Keys::get(); let local_keys = im_online::AuthorityId::all(); @@ -84,20 +84,19 @@ impl Module { } /// Sign the given payload with the private key corresponding to the given authority id. - pub fn sign(payload: Vec, authority_id: im_online::AuthorityId) -> Option> { - authority_id.sign(&payload).map(|s| s.encode()) + pub fn sign(payload: Vec) -> Option<(im_online::AuthoritySignature, im_online::AuthorityId)> { + let authority_id = Module::::authority_id()?; + authority_id.sign(&payload).map(|s| (s, authority_id)) } /// Verify the given signature for the given payload with the given /// authority identifier. pub fn verify( payload: Vec, - signature: Vec, + signature: im_online::AuthoritySignature, authority_id: im_online::AuthorityId, ) -> bool { - im_online::AuthoritySignature::decode(&mut &signature[..]) - .map(|s| authority_id.verify(&payload, &s)) - .unwrap_or(false) + authority_id.verify(&payload, &signature) } fn initialize_keys(keys: &[im_online::AuthorityId]) { From 56da02cf97044dcc7ac4acf5b0941f1e992ae740 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 30 Aug 2019 17:04:23 +0200 Subject: [PATCH 11/20] {core,srml}/authority-discovery: Adjust tests to merge --- core/authority-discovery/src/lib.rs | 44 +++++++++++++---------------- srml/authority-discovery/src/lib.rs | 5 ++-- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 71b5d05079d96..ea937e112da93 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -168,8 +168,10 @@ where .encode(&mut signed_addresses) .map_err(Error::Encoding)?; - self.network - .put_value(hash_authority_id(authority_id.0.as_ref())?, signed_addresses); + self.network.put_value( + hash_authority_id(authority_id.0.as_ref())?, + signed_addresses, + ); Ok(()) } @@ -268,8 +270,7 @@ where .collect::>() .map_err(Error::ParsingMultiaddress)?; - self.address_cache - .insert(authority_id.clone(), addresses); + self.address_cache.insert(authority_id.clone(), addresses); } // Let's update the peerset priority group with the all the addresses we have in our cache. @@ -521,47 +522,42 @@ mod tests { } } - impl AuthorityDiscoveryApi for RuntimeApi { - fn AuthorityDiscoveryApi_authority_id_runtime_api_impl( - &self, - _: &BlockId, - _: ExecutionContext, - _: Option<()>, - _: Vec, - ) -> std::result::Result>, client::error::Error> { - return Ok(NativeOrEncoded::Native(Some("test".to_string()))); - } + impl AuthorityDiscoveryApi for RuntimeApi { fn AuthorityDiscoveryApi_authorities_runtime_api_impl( &self, _: &BlockId, _: ExecutionContext, _: Option<()>, _: Vec, - ) -> std::result::Result>, client::error::Error> { + ) -> std::result::Result>, client::error::Error> { return Ok(NativeOrEncoded::Native(vec![ - "test-authority-id-1".to_string(), - "test-authority-id-2".to_string(), + AuthorityId("test-authority-id-1".as_bytes().to_vec()), + AuthorityId("test-authority-id-2".as_bytes().to_vec()), ])); } fn AuthorityDiscoveryApi_sign_runtime_api_impl( &self, _: &BlockId, _: ExecutionContext, - _: Option<(std::vec::Vec, String)>, + _: Option>, _: Vec, - ) -> std::result::Result>>, client::error::Error> { - return Ok(NativeOrEncoded::Native(Some( - "test-signature-1".as_bytes().to_vec(), - ))); + ) -> std::result::Result< + NativeOrEncoded>, + client::error::Error, + > { + return Ok(NativeOrEncoded::Native(Some(( + Signature("test-signature-1".as_bytes().to_vec()), + AuthorityId("test-authority-id-1".as_bytes().to_vec()), + )))); } fn AuthorityDiscoveryApi_verify_runtime_api_impl( &self, _: &BlockId, _: ExecutionContext, - args: Option<(Vec, Vec, String)>, + args: Option<(Vec, Signature, AuthorityId)>, _: Vec, ) -> std::result::Result, client::error::Error> { - if args.unwrap().1 == "test-signature-1".as_bytes() { + if args.unwrap().1 == Signature("test-signature-1".as_bytes().to_vec()) { return Ok(NativeOrEncoded::Native(true)); } return Ok(NativeOrEncoded::Native(false)); diff --git a/srml/authority-discovery/src/lib.rs b/srml/authority-discovery/src/lib.rs index d7deb453b8f48..15599df59e2fc 100644 --- a/srml/authority-discovery/src/lib.rs +++ b/srml/authority-discovery/src/lib.rs @@ -347,10 +347,9 @@ mod tests { externalities.set_keystore(key_store); with_externalities(&mut externalities, || { - let authority_id = AuthorityDiscovery::authority_id().expect("authority id"); let payload = String::from("test payload").into_bytes(); - let sig = - AuthorityDiscovery::sign(payload.clone(), authority_id.clone()).expect("signature"); + let (sig, authority_id) = + AuthorityDiscovery::sign(payload.clone()).expect("signature"); assert!(AuthorityDiscovery::verify( payload, From 1a5f9742c6c8729ef128d896ef6dd2db0650a6b0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 30 Aug 2019 17:06:35 +0200 Subject: [PATCH 12/20] node/runtime: Bump runtime spec and impl version --- node/runtime/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index 800224447bae7..1041709fe12ae 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -81,8 +81,8 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to equal spec_version. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 154, - impl_version: 157, + spec_version: 155, + impl_version: 155, apis: RUNTIME_API_VERSIONS, }; From 32214d917314be60e8830b0f8fa2c161913176e8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 2 Sep 2019 11:37:35 +0200 Subject: [PATCH 13/20] *: Instantiate authority discovery within node/cli Instead of instantiating the authority discovery module within core/service, this commit instantiates authority discovery within node/cli. The authority discovery module depends on the srml authority discovery module, which depends on the im online module, as well as session, ... With the former approach all these dependencies were enforced on any substrate implementation. With the latter approach these dependencies are optional. --- Cargo.lock | 1 + core/service/src/builder.rs | 45 +++++++++++++++++++++++++++++++++---- core/service/src/lib.rs | 19 ++++------------ node/cli/Cargo.toml | 3 ++- node/cli/src/service.rs | 17 ++++++++++++++ 5 files changed, 65 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c2565a4e69a1..f4c05e3fc0130 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2334,6 +2334,7 @@ dependencies = [ "srml-system 2.0.0", "srml-timestamp 2.0.0", "structopt 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-authority-discovery 2.0.0", "substrate-basic-authorship 2.0.0", "substrate-cli 2.0.0", "substrate-client 2.0.0", diff --git a/core/service/src/builder.rs b/core/service/src/builder.rs index d8df1043b5636..33d2db221f91a 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -72,6 +72,7 @@ pub struct ServiceBuilder, rpc_extensions: TRpc, + dht_event_tx: Option>, marker: PhantomData<(TBl, TRtApi)>, } @@ -137,6 +138,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), + dht_event_tx: None, marker: PhantomData, }) } @@ -220,6 +222,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), + dht_event_tx: None, marker: PhantomData, }) } @@ -264,6 +267,7 @@ impl, + ) -> Result, Error> { + Ok(ServiceBuilder { + config: self.config, + client: self.client, + backend: self.backend, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: self.transaction_pool, + rpc_extensions: self.rpc_extensions, + dht_event_tx: Some(dht_event_tx), + marker: self.marker, + }) + } } /// Implemented on `ServiceBuilder`. Allows importing blocks once you have given all the required @@ -617,7 +652,6 @@ ServiceBuilder< as ProvideRuntimeApi>::Api: runtime_api::Metadata + offchain::OffchainWorkerApi + - authority_discovery_primitives::AuthorityDiscoveryApi + runtime_api::TaggedTransactionQueue + session::SessionKeys, TBl: BlockT::Out>, @@ -662,7 +696,8 @@ ServiceBuilder< finality_proof_provider, network_protocol, transaction_pool, - rpc_extensions + rpc_extensions, + dht_event_tx, ) = ( self.client, self.fetcher, @@ -674,7 +709,8 @@ ServiceBuilder< self.finality_proof_provider, self.network_protocol, self.transaction_pool, - self.rpc_extensions + self.rpc_extensions, + self.dht_event_tx, ); new_impl!( @@ -692,7 +728,8 @@ ServiceBuilder< finality_proof_provider, network_protocol, transaction_pool, - rpc_extensions + rpc_extensions, + dht_event_tx, )) }, |h, c, tx| maintain_transaction_pool(h, c, tx), diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 96fea9bb9842b..282ae6c22ec22 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -154,7 +154,8 @@ macro_rules! new_impl { finality_proof_provider, network_protocol, transaction_pool, - rpc_extensions + rpc_extensions, + dht_event_tx, ) = $build_components(&$config)?; let import_queue = Box::new(import_queue); let chain_info = client.info().chain; @@ -356,18 +357,6 @@ macro_rules! new_impl { let rpc_handlers = gen_handler(); let rpc = start_rpc_servers(&$config, gen_handler)?; - // Use bounded channel to ensure back-pressure. Authority discovery is - // triggering one event per authority within the current authority set. This - // estimates the authority set size to be somewhere below 10 000 thereby - // setting the channel buffer size to 10 000. - let (dht_event_tx, dht_event_rx) = - mpsc::channel::(10000); - let authority_discovery = substrate_authority_discovery::AuthorityDiscovery::new( - client.clone(), - network.clone(), - dht_event_rx, - ); - let _ = to_spawn_tx.unbounded_send(Box::new(authority_discovery)); let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future( network_mut, @@ -666,7 +655,7 @@ fn build_network_future< status_sinks: Arc, NetworkState)>>>>, rpc_rx: futures03::channel::mpsc::UnboundedReceiver>, should_have_peers: bool, - mut dht_events_tx: mpsc::Sender, + dht_event_tx: Option>, ) -> impl Future { // Compatibility shim while we're transitionning to stable Futures. // See https://github.com/paritytech/substrate/issues/3099 @@ -751,7 +740,7 @@ fn build_network_future< // events are being passed on to the authority-discovery module. In the future there might be multiple // consumers of these events. In that case this would need to be refactored to properly dispatch the events, // e.g. via a subscriber model. - if let Err(e) = dht_events_tx.try_send(event) { + if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) { if e.is_full() { warn!(target: "service", "Dht event channel to authority discovery is full, dropping event."); } else if e.is_disconnected() { diff --git a/node/cli/Cargo.toml b/node/cli/Cargo.toml index 1f35f7b86b41c..c28a517639654 100644 --- a/node/cli/Cargo.toml +++ b/node/cli/Cargo.toml @@ -46,7 +46,8 @@ system = { package = "srml-system", path = "../../srml/system" } balances = { package = "srml-balances", path = "../../srml/balances" } support = { package = "srml-support", path = "../../srml/support", default-features = false } im_online = { package = "srml-im-online", path = "../../srml/im-online", default-features = false } -authority-discovery = { package = "srml-authority-discovery", path = "../../srml/authority-discovery", default-features = false } +sr-authority-discovery = { package = "srml-authority-discovery", path = "../../srml/authority-discovery", default-features = false } +authority-discovery = { package = "substrate-authority-discovery", path = "../../core/authority-discovery"} [dev-dependencies] keystore = { package = "substrate-keystore", path = "../../core/keystore" } diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index da78d74a8730c..1f8fe961964f2 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -103,6 +103,8 @@ macro_rules! new_full_start { macro_rules! new_full { ($config:expr) => {{ use futures::Future; + use futures::sync::mpsc; + use network::DhtEvent; let ( is_authority, @@ -118,10 +120,18 @@ macro_rules! new_full { let (builder, mut import_setup, inherent_data_providers, mut tasks_to_spawn) = new_full_start!($config); + // Dht event channel from the network to the authority discovery module. Use bounded channel to ensure + // back-pressure. Authority discovery is triggering one event per authority within the current authority set. + // This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to + // 10 000. + let (dht_event_tx, dht_event_rx) = + mpsc::channel::(10000); + let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _) )? + .with_dht_event_tx(dht_event_tx)? .build()?; let (block_import, link_half, babe_link) = import_setup.take() @@ -162,6 +172,13 @@ macro_rules! new_full { let babe = babe::start_babe(babe_config)?; let select = babe.select(service.on_exit()).then(|_| Ok(())); service.spawn_task(Box::new(select)); + + let authority_discovery = authority_discovery::AuthorityDiscovery::new( + service.client(), + service.network(), + dht_event_rx, + ); + let _ = service.spawn_task(Box::new(authority_discovery)); } let config = grandpa::Config { From 30bd78ef85bbba20bcda37e9cdbfc48c48dd83ce Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 3 Sep 2019 16:20:57 +0200 Subject: [PATCH 14/20] core/authority-discovery: Remove patch version in Cargo.toml --- core/authority-discovery/Cargo.toml | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml index 0a94830ead4cb..ac7f8ac3685ee 100644 --- a/core/authority-discovery/Cargo.toml +++ b/core/authority-discovery/Cargo.toml @@ -9,24 +9,24 @@ build = "build.rs" prost-build = "0.5" [dependencies] -network = { package = "substrate-network", path = "../../core/network" } -sr-primitives = { path = "../../core/sr-primitives" } -primitives = { package = "substrate-primitives", path = "../primitives" } -client = { package = "substrate-client", path = "../../core/client" } authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "./primitives", default-features = false } +bytes = "0.4" +client = { package = "substrate-client", path = "../../core/client" } codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" } -futures = "0.1.17" -tokio-timer = "0.2" +derive_more = "0.14.0" +futures = "0.1" keystore = { package = "substrate-keystore", path = "../../core/keystore" } libp2p = { version = "0.12.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } -serde_json = "1.0" log = "0.4" -derive_more = "0.14.0" +network = { package = "substrate-network", path = "../../core/network" } +primitives = { package = "substrate-primitives", path = "../primitives" } prost = "0.5" -bytes = "0.4" +serde_json = "1.0" +sr-primitives = { path = "../../core/sr-primitives" } +tokio-timer = "0.2" [dev-dependencies] -test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } -peerset = { package = "substrate-peerset", path = "../../core/peerset" } parking_lot = { version = "0.9.0" } -tokio = { version = "0.1.11"} +peerset = { package = "substrate-peerset", path = "../../core/peerset" } +test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } +tokio = { version = "0.1"} From 1a376bf6058308b7857e2d7dbc829f8dae2a1137 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 3 Sep 2019 16:26:48 +0200 Subject: [PATCH 15/20] core/authority-discovery: Add doc comments to Error enum --- core/authority-discovery/src/error.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/authority-discovery/src/error.rs b/core/authority-discovery/src/error.rs index 34501b60c458a..dca50cc0beb9e 100644 --- a/core/authority-discovery/src/error.rs +++ b/core/authority-discovery/src/error.rs @@ -19,18 +19,27 @@ /// AuthorityDiscovery Result. pub type Result = std::result::Result; +/// Error type for the authority discovery module. #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { + /// Failed to verify a dht payload with the given signature. VerifyingDhtPayload, + /// Failed to hash the authority id to be used as a dht key. HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError), + /// Failed calling into the Substrate runtime. CallingRuntime(client::error::Error), + /// Failed signing the dht payload via the Substrate runtime. SigningDhtPayload, /// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it /// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This /// error is the result of the above failing. MatchingHashedAuthorityIdWithAuthorityId, + /// Failed to set the authority discovery peerset priority group in the peerset module. SettingPeersetPriorityGroup(String), + /// Failed to encode a dht payload. Encoding(prost::EncodeError), + /// Failed to decode a dht payload. Decoding(prost::DecodeError), + /// Failed to parse a libp2p multi address. ParsingMultiaddress(libp2p::core::multiaddr::Error), } From c5d5a90c847f42c8fc311a088c88cbe229e27967 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 3 Sep 2019 19:27:02 +0200 Subject: [PATCH 16/20] authority-discovery: Encode address as Protobuf bytes instead of string --- core/authority-discovery/src/lib.rs | 24 +++++++++---------- core/authority-discovery/src/schema/dht.proto | 2 +- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index ea937e112da93..7b8932d5431ea 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -35,7 +35,7 @@ //! //! 2. **Discovers other authorities** //! -//! 1. Retrieves the current set of authorities.. +//! 1. Retrieves the current set of authorities. //! //! 2. Starts DHT queries for the ids of the authorities. //! @@ -54,6 +54,7 @@ use prost::Message; use sr_primitives::generic::BlockId; use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi}; use std::collections::{HashMap, HashSet}; +use std::convert::TryInto; use std::iter::FromIterator; use std::marker::PhantomData; use std::sync::Arc; @@ -65,7 +66,7 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); } -/// A AuthorityDiscovery makes a given authority discoverable as well as discovers other authorities. +/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities. pub struct AuthorityDiscovery where Block: BlockT + 'static, @@ -137,21 +138,18 @@ where .network .external_addresses() .into_iter() - .map(|mut a| { - a.push(libp2p::core::multiaddr::Protocol::P2p( + .map(|a| { + a.with(libp2p::core::multiaddr::Protocol::P2p( self.network.local_peer_id().into(), - )); - a + )) }) - .map(|a| a.to_string()) + .map(|a| a.to_vec()) .collect(); let mut serialized_addresses = vec![]; - schema::AuthorityAddresses { - addresses: addresses, - } - .encode(&mut serialized_addresses) - .map_err(Error::Encoding)?; + schema::AuthorityAddresses { addresses } + .encode(&mut serialized_addresses) + .map_err(Error::Encoding)?; let (signature, authority_id) = self .client @@ -266,7 +264,7 @@ where .map(|a| a.addresses) .map_err(Error::Decoding)? .into_iter() - .map(|a| a.parse()) + .map(|a| a.try_into()) .collect::>() .map_err(Error::ParsingMultiaddress)?; diff --git a/core/authority-discovery/src/schema/dht.proto b/core/authority-discovery/src/schema/dht.proto index 8d6fa066c656b..9dbe9d559f4b1 100644 --- a/core/authority-discovery/src/schema/dht.proto +++ b/core/authority-discovery/src/schema/dht.proto @@ -4,7 +4,7 @@ package authority_discovery; // First we need to serialize the addresses in order to be able to sign them. message AuthorityAddresses { - repeated string addresses = 1; + repeated bytes addresses = 1; } // Then we need to serialize addresses and signature to send them over the wire. From d162540acde37242e94ea5e144b90994f51668ea Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 4 Sep 2019 11:01:35 +0200 Subject: [PATCH 17/20] *: Have AuthorityApi.{sign,verify} borrow its inputs --- .../authority-discovery/primitives/src/lib.rs | 4 +-- core/authority-discovery/src/lib.rs | 33 +++++++++---------- node/cli/src/service.rs | 2 +- node/runtime/src/lib.rs | 4 +-- srml/authority-discovery/src/lib.rs | 10 +++--- 5 files changed, 26 insertions(+), 27 deletions(-) diff --git a/core/authority-discovery/primitives/src/lib.rs b/core/authority-discovery/primitives/src/lib.rs index 33710df043a99..13da4de020466 100644 --- a/core/authority-discovery/primitives/src/lib.rs +++ b/core/authority-discovery/primitives/src/lib.rs @@ -40,10 +40,10 @@ decl_runtime_apis! { fn authorities() -> Vec; /// Sign the given payload with the private key corresponding to the given authority id. - fn sign(payload: Vec) -> Option<(Signature, AuthorityId)>; + fn sign(payload: &Vec) -> Option<(Signature, AuthorityId)>; /// Verify the given signature for the given payload with the given /// authority identifier. - fn verify(payload: Vec, signature: Signature, authority_id: AuthorityId) -> bool; + fn verify(payload: &Vec, signature: &Signature, authority_id: &AuthorityId) -> bool; } } diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 7b8932d5431ea..687e1db20f2c6 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -154,7 +154,7 @@ where let (signature, authority_id) = self .client .runtime_api() - .sign(&id, serialized_addresses.clone()) + .sign(&id, &serialized_addresses) .map_err(Error::CallingRuntime)? .ok_or(Error::SigningDhtPayload)?; @@ -241,32 +241,31 @@ where .get(key) .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; - let signed_addresses = - schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?; + let (signature, addresses): (Signature, Vec); + { + let mut signed_addresses = + schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?; + signature = Signature(std::mem::replace(&mut signed_addresses.signature, vec![])); + addresses = std::mem::replace(&mut signed_addresses.addresses, vec![]); + } let is_verified = self .client .runtime_api() - .verify( - &id, - signed_addresses.addresses.clone(), - Signature(signed_addresses.signature.clone()), - authority_id.clone(), - ) + .verify(&id, &addresses, &signature, &authority_id.clone()) .map_err(Error::CallingRuntime)?; if !is_verified { return Err(Error::VerifyingDhtPayload); } - let addresses: Vec = - schema::AuthorityAddresses::decode(signed_addresses.addresses) - .map(|a| a.addresses) - .map_err(Error::Decoding)? - .into_iter() - .map(|a| a.try_into()) - .collect::>() - .map_err(Error::ParsingMultiaddress)?; + let addresses: Vec = schema::AuthorityAddresses::decode(addresses) + .map(|a| a.addresses) + .map_err(Error::Decoding)? + .into_iter() + .map(|a| a.try_into()) + .collect::>() + .map_err(Error::ParsingMultiaddress)?; self.address_cache.insert(authority_id.clone(), addresses); } diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index e2f1e32a887d9..c982bd0ee6fde 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -178,7 +178,7 @@ macro_rules! new_full { service.network(), dht_event_rx, ); - let _ = service.spawn_task(Box::new(authority_discovery)); + service.spawn_task(Box::new(authority_discovery)); } let config = grandpa::Config { diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index 6296520d139b8..532c1ec4d1eeb 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -621,13 +621,13 @@ impl_runtime_apis! { .collect() } - fn sign(payload: Vec) -> Option<(EncodedSignature, EncodedAuthorityId)> { + fn sign(payload: &Vec) -> Option<(EncodedSignature, EncodedAuthorityId)> { AuthorityDiscovery::sign(payload).map(|(sig, id)| { (EncodedSignature(sig.encode()), EncodedAuthorityId(id.encode())) }) } - fn verify(payload: Vec, signature: EncodedSignature, authority_id: EncodedAuthorityId) -> bool { + fn verify(payload: &Vec, signature: &EncodedSignature, authority_id: &EncodedAuthorityId) -> bool { let signature = match ImOnlineSignature::decode(&mut &signature.0[..]) { Ok(s) => s, _ => return false, diff --git a/srml/authority-discovery/src/lib.rs b/srml/authority-discovery/src/lib.rs index 4b99e556c82d2..fa95b255fe5ba 100644 --- a/srml/authority-discovery/src/lib.rs +++ b/srml/authority-discovery/src/lib.rs @@ -29,8 +29,8 @@ #![cfg_attr(not(feature = "std"), no_std)] use app_crypto::RuntimeAppPublic; -use rstd::prelude::*; use codec::{Encode, Decode}; +use rstd::prelude::*; use srml_support::{decl_module, decl_storage, StorageValue}; pub trait Trait: system::Trait + session::Trait + im_online::Trait {} @@ -79,19 +79,19 @@ impl Module { } /// Sign the given payload with the private key corresponding to the given authority id. - pub fn sign(payload: Vec) -> Option<(AuthoritySignatureFor, AuthorityIdFor)> { + pub fn sign(payload: &Vec) -> Option<(AuthoritySignatureFor, AuthorityIdFor)> { let authority_id = Module::::authority_id()?; - authority_id.sign(&payload).map(|s| (s, authority_id)) + authority_id.sign(payload).map(|s| (s, authority_id)) } /// Verify the given signature for the given payload with the given /// authority identifier. pub fn verify( - payload: Vec, + payload: &Vec, signature: AuthoritySignatureFor, authority_id: AuthorityIdFor, ) -> bool { - authority_id.verify(&payload, &signature) + authority_id.verify(payload, &signature) } fn initialize_keys(keys: &[AuthorityIdFor]) { From 024687e9f9fb857ba9346da4535a5e6eb6d739de Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 4 Sep 2019 14:56:02 +0200 Subject: [PATCH 18/20] core/authority-discovery: Handle tokio timer errors --- core/authority-discovery/src/error.rs | 2 ++ core/authority-discovery/src/lib.rs | 24 ++++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/core/authority-discovery/src/error.rs b/core/authority-discovery/src/error.rs index dca50cc0beb9e..e8c1ad9705f0c 100644 --- a/core/authority-discovery/src/error.rs +++ b/core/authority-discovery/src/error.rs @@ -42,4 +42,6 @@ pub enum Error { Decoding(prost::DecodeError), /// Failed to parse a libp2p multi address. ParsingMultiaddress(libp2p::core::multiaddr::Error), + /// Tokio timer error. + PollingTokioTimer(tokio_timer::Error) } diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 687e1db20f2c6..82c644fe393ba 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -308,22 +308,38 @@ where // Process incoming events before triggering new ones. self.handle_dht_events()?; - if let Ok(Async::Ready(_)) = self.publish_interval.poll() { + if let Async::Ready(_) = self + .publish_interval + .poll() + .map_err(Error::PollingTokioTimer)? + { // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval // tick. - while let Ok(Async::Ready(_)) = self.publish_interval.poll() {} + while let Async::Ready(_) = self + .publish_interval + .poll() + .map_err(Error::PollingTokioTimer)? + {} self.publish_own_ext_addresses()?; } - if let Ok(Async::Ready(_)) = self.query_interval.poll() { + if let Async::Ready(_) = self + .query_interval + .poll() + .map_err(Error::PollingTokioTimer)? + { // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval // tick. - while let Ok(Async::Ready(_)) = self.query_interval.poll() {} + while let Async::Ready(_) = self + .query_interval + .poll() + .map_err(Error::PollingTokioTimer)? + {} self.request_addresses_of_others()?; } From 2e954eb13d6751eeba79e4a4dbf55a414a89b006 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 4 Sep 2019 16:51:03 +0200 Subject: [PATCH 19/20] srml/authority-discovery: Address unit test failures --- srml/authority-discovery/src/lib.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/srml/authority-discovery/src/lib.rs b/srml/authority-discovery/src/lib.rs index 3bcef1c4031f6..1c46822dfef6f 100644 --- a/srml/authority-discovery/src/lib.rs +++ b/srml/authority-discovery/src/lib.rs @@ -29,14 +29,14 @@ #![cfg_attr(not(feature = "std"), no_std)] use app_crypto::RuntimeAppPublic; -use codec::{Encode, Decode}; use rstd::prelude::*; use support::{decl_module, decl_storage, StorageValue}; pub trait Trait: system::Trait + session::Trait + im_online::Trait {} type AuthorityIdFor = ::AuthorityId; -type AuthoritySignatureFor = <::AuthorityId as RuntimeAppPublic>::Signature; +type AuthoritySignatureFor = + <::AuthorityId as RuntimeAppPublic>::Signature; decl_storage! { trait Store for Module as AuthorityDiscovery { @@ -158,10 +158,7 @@ mod tests { pub struct TestOnSessionEnding; impl session::OnSessionEnding for TestOnSessionEnding { - fn on_session_ending( - _: SessionIndex, - _: SessionIndex, - ) -> Option> { + fn on_session_ending(_: SessionIndex, _: SessionIndex) -> Option> { None } } @@ -352,17 +349,12 @@ mod tests { with_externalities(&mut externalities, || { let payload = String::from("test payload").into_bytes(); - let (sig, authority_id) = - AuthorityDiscovery::sign(payload.clone()).expect("signature"); + let (sig, authority_id) = AuthorityDiscovery::sign(&payload).expect("signature"); - assert!(AuthorityDiscovery::verify( - payload, - sig.clone(), - authority_id.clone() - )); + assert!(AuthorityDiscovery::verify(&payload, sig.clone(), authority_id.clone(),)); assert!(!AuthorityDiscovery::verify( - String::from("other payload").into_bytes(), + &String::from("other payload").into_bytes(), sig, authority_id )) From c39c50ec4f1bbff1acfb144512bba6b4e37cb7d4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 5 Sep 2019 15:07:17 +0200 Subject: [PATCH 20/20] core/authority-discovery: Address unit test failures --- core/authority-discovery/src/lib.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 82c644fe393ba..987169ead90b1 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -241,13 +241,11 @@ where .get(key) .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; - let (signature, addresses): (Signature, Vec); - { - let mut signed_addresses = - schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?; - signature = Signature(std::mem::replace(&mut signed_addresses.signature, vec![])); - addresses = std::mem::replace(&mut signed_addresses.addresses, vec![]); - } + let schema::SignedAuthorityAddresses { + signature, + addresses, + } = schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?; + let signature = Signature(signature); let is_verified = self .client @@ -552,7 +550,7 @@ mod tests { &self, _: &BlockId, _: ExecutionContext, - _: Option>, + _: Option<&std::vec::Vec>, _: Vec, ) -> std::result::Result< NativeOrEncoded>, @@ -567,10 +565,10 @@ mod tests { &self, _: &BlockId, _: ExecutionContext, - args: Option<(Vec, Signature, AuthorityId)>, + args: Option<(&Vec, &Signature, &AuthorityId)>, _: Vec, ) -> std::result::Result, client::error::Error> { - if args.unwrap().1 == Signature("test-signature-1".as_bytes().to_vec()) { + if *args.unwrap().1 == Signature("test-signature-1".as_bytes().to_vec()) { return Ok(NativeOrEncoded::Native(true)); } return Ok(NativeOrEncoded::Native(false)); @@ -655,11 +653,11 @@ mod tests { // Create sample dht event. let authority_id_1 = hash_authority_id("test-authority-id-1".as_bytes()).unwrap(); - let address_1 = "/ip6/2001:db8::".to_string(); + let address_1: libp2p::Multiaddr = "/ip6/2001:db8::".parse().unwrap(); let mut serialized_addresses = vec![]; schema::AuthorityAddresses { - addresses: vec![address_1.clone()], + addresses: vec![address_1.to_vec()], } .encode(&mut serialized_addresses) .unwrap(); @@ -687,7 +685,7 @@ mod tests { network.set_priority_group_call.lock().unwrap()[0], ( "authorities".to_string(), - HashSet::from_iter(vec![address_1.parse().unwrap()].into_iter()) + HashSet::from_iter(vec![address_1.clone()].into_iter()) ) );