From e055af136912a7b21ecd619d782c81e3281b8e58 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 26 Dec 2022 20:48:21 +0300 Subject: [PATCH 1/5] Convert `Peerset` into a sync struct --- Cargo.lock | 1 + client/network/src/behaviour.rs | 4 +- client/network/src/protocol.rs | 50 ++--- .../src/protocol/notifications/behaviour.rs | 2 +- .../src/protocol/notifications/tests.rs | 2 +- client/network/src/request_responses.rs | 10 +- client/network/src/service.rs | 6 +- client/peerset/Cargo.toml | 1 + client/peerset/src/lib.rs | 211 ++++++++---------- client/peerset/tests/fuzz.rs | 12 +- client/service/src/lib.rs | 1 + 11 files changed, 139 insertions(+), 161 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8412781c682e3..19773cf83e53a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7998,6 +7998,7 @@ dependencies = [ "futures", "libp2p", "log", + "parking_lot 0.12.1", "rand 0.8.5", "sc-utils", "serde_json", diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 3a977edbca574..0e85070983e9b 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -40,7 +40,7 @@ use sc_network_common::{ }, request_responses::{IfDisconnected, ProtocolConfig, RequestFailure}, }; -use sc_peerset::{PeersetHandle, ReputationChange}; +use sc_peerset::{Peerset, ReputationChange}; use sp_blockchain::HeaderBackend; use sp_runtime::traits::Block as BlockT; use std::{collections::HashSet, time::Duration}; @@ -189,7 +189,7 @@ where local_public_key: PublicKey, disco_config: DiscoveryConfig, request_response_protocols: Vec, - peerset: PeersetHandle, + peerset: Peerset, ) -> Result { Ok(Self { substrate, diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 1f0ec9c0d120d..15323ea944745 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -162,7 +162,7 @@ pub struct Protocol { /// Number of slots to allocate to light nodes. default_peers_set_num_light: usize, /// Used to report reputation changes. - peerset_handle: sc_peerset::PeersetHandle, + peerset: sc_peerset::Peerset, /// Handles opening the unique substream and sending and receiving raw messages. behaviour: Notifications, /// List of notifications protocols that have been registered. @@ -214,7 +214,7 @@ where metrics_registry: Option<&Registry>, chain_sync: Box>, block_announces_protocol: sc_network_common::config::NonDefaultSetConfig, - ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { + ) -> error::Result<(Self, sc_peerset::Peerset, Vec<(PeerId, Multiaddr)>)> { let info = chain.info(); let boot_node_ids = { @@ -255,7 +255,7 @@ where let mut known_addresses = Vec::new(); - let (peerset, peerset_handle) = { + let peerset = { let mut sets = Vec::with_capacity(NUM_HARDCODED_PEERSETS + network_config.extra_sets.len()); @@ -307,7 +307,7 @@ where let behaviour = { Notifications::new( - peerset, + peerset.clone(), // NOTE: Block announcement protocol is still very much hardcoded into `Protocol`. // This protocol must be the first notification protocol given to // `Notifications` @@ -351,7 +351,7 @@ where network_config.default_peers_set.in_peers; total.saturating_sub(network_config.default_peers_set_num_full) as usize }, - peerset_handle: peerset_handle.clone(), + peerset: peerset.clone(), behaviour, notification_protocols: iter::once(block_announces_protocol.notifications_protocol) .chain(network_config.extra_sets.iter().map(|s| s.notifications_protocol.clone())) @@ -366,7 +366,7 @@ where block_announce_data_cache, }; - Ok((protocol, peerset_handle, known_addresses)) + Ok((protocol, peerset, known_addresses)) } /// Returns the list of all the peers we have an open channel to. @@ -482,7 +482,7 @@ where /// Adjusts the reputation of a node. pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) { - self.peerset_handle.report_peer(who, reputation) + self.peerset.report_peer(who, reputation) } /// Perform time based maintenance. @@ -517,7 +517,7 @@ where "Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash ); - self.peerset_handle.report_peer(who, rep::GENESIS_MISMATCH); + self.peerset.report_peer(who, rep::GENESIS_MISMATCH); self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); if self.boot_node_ids.contains(&who) { @@ -537,7 +537,7 @@ where // we're not interested in light peers if status.roles.is_light() { debug!(target: "sync", "Peer {} is unable to serve light requests", who); - self.peerset_handle.report_peer(who, rep::BAD_ROLE); + self.peerset.report_peer(who, rep::BAD_ROLE); self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); return Err(()) } @@ -550,7 +550,7 @@ where .saturated_into::(); if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); - self.peerset_handle.report_peer(who, rep::PEER_BEHIND_US_LIGHT); + self.peerset.report_peer(who, rep::PEER_BEHIND_US_LIGHT); self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); return Err(()) } @@ -595,7 +595,7 @@ where Ok(req) => req, Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); - self.peerset_handle.report_peer(id, repu); + self.peerset.report_peer(id, repu); return Err(()) }, } @@ -788,33 +788,33 @@ where /// Set whether the syncing peers set is in reserved-only mode. pub fn set_reserved_only(&self, reserved_only: bool) { - self.peerset_handle.set_reserved_only(HARDCODED_PEERSETS_SYNC, reserved_only); + self.peerset.set_reserved_only(HARDCODED_PEERSETS_SYNC, reserved_only); } /// Removes a `PeerId` from the list of reserved peers for syncing purposes. pub fn remove_reserved_peer(&self, peer: PeerId) { - self.peerset_handle.remove_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); + self.peerset.remove_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); } /// Returns the list of reserved peers. - pub fn reserved_peers(&self) -> impl Iterator { + pub fn reserved_peers(&self) -> Vec { self.behaviour.reserved_peers(HARDCODED_PEERSETS_SYNC) } /// Adds a `PeerId` to the list of reserved peers for syncing purposes. pub fn add_reserved_peer(&self, peer: PeerId) { - self.peerset_handle.add_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); + self.peerset.add_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); } /// Sets the list of reserved peers for syncing purposes. pub fn set_reserved_peers(&self, peers: HashSet) { - self.peerset_handle.set_reserved_peers(HARDCODED_PEERSETS_SYNC, peers); + self.peerset.set_reserved_peers(HARDCODED_PEERSETS_SYNC, peers); } /// Sets the list of reserved peers for the given protocol/peerset. pub fn set_reserved_peerset_peers(&self, protocol: ProtocolName, peers: HashSet) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle.set_reserved_peers(sc_peerset::SetId::from(index), peers); + self.peerset.set_reserved_peers(sc_peerset::SetId::from(index), peers); } else { error!( target: "sub-libp2p", @@ -827,7 +827,7 @@ where /// Removes a `PeerId` from the list of reserved peers. pub fn remove_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle.remove_reserved_peer(sc_peerset::SetId::from(index), peer); + self.peerset.remove_reserved_peer(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -840,7 +840,7 @@ where /// Adds a `PeerId` to the list of reserved peers. pub fn add_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle.add_reserved_peer(sc_peerset::SetId::from(index), peer); + self.peerset.add_reserved_peer(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -855,14 +855,14 @@ where /// Can be called multiple times with the same `PeerId`s. pub fn add_default_set_discovered_nodes(&mut self, peer_ids: impl Iterator) { for peer_id in peer_ids { - self.peerset_handle.add_to_peers_set(HARDCODED_PEERSETS_SYNC, peer_id); + self.peerset.add_to_peers_set(HARDCODED_PEERSETS_SYNC, peer_id); } } /// Add a peer to a peers set. pub fn add_to_peers_set(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle.add_to_peers_set(sc_peerset::SetId::from(index), peer); + self.peerset.add_to_peers_set(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -875,7 +875,7 @@ where /// Remove a peer from a peers set. pub fn remove_from_peers_set(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle.remove_from_peers_set(sc_peerset::SetId::from(index), peer); + self.peerset.remove_from_peers_set(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -1092,7 +1092,7 @@ where peer_id, msg, ); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + self.peerset.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, Err(err) => { @@ -1115,7 +1115,7 @@ where err, err2, ); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + self.peerset.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, } @@ -1150,7 +1150,7 @@ where debug!(target: "sync", "Failed to parse remote handshake: {}", err); self.bad_handshake_substreams.insert((peer_id, set_id)); self.behaviour.disconnect_peer(&peer_id, set_id); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + self.peerset.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, } diff --git a/client/network/src/protocol/notifications/behaviour.rs b/client/network/src/protocol/notifications/behaviour.rs index 04f6fe445ac63..804bd11493d49 100644 --- a/client/network/src/protocol/notifications/behaviour.rs +++ b/client/network/src/protocol/notifications/behaviour.rs @@ -550,7 +550,7 @@ impl Notifications { } /// Returns the list of reserved peers. - pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> impl Iterator { + pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> Vec { self.peerset.reserved_peers(set_id) } diff --git a/client/network/src/protocol/notifications/tests.rs b/client/network/src/protocol/notifications/tests.rs index fa79366d20283..9436f4e3190cd 100644 --- a/client/network/src/protocol/notifications/tests.rs +++ b/client/network/src/protocol/notifications/tests.rs @@ -63,7 +63,7 @@ fn build_nodes() -> (Swarm, Swarm) { .timeout(Duration::from_secs(20)) .boxed(); - let (peerset, _) = sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { + let peerset = sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets: vec![sc_peerset::SetConfig { in_peers: 25, out_peers: 25, diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index d49cbd8051341..148199e46f8e2 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -65,7 +65,7 @@ use std::{ }; pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; -use sc_peerset::{PeersetHandle, BANNED_THRESHOLD}; +use sc_peerset::{Peerset, BANNED_THRESHOLD}; /// Event generated by the [`RequestResponsesBehaviour`]. #[derive(Debug)] @@ -150,7 +150,7 @@ pub struct RequestResponsesBehaviour { send_feedback: HashMap>, /// Primarily used to get a reputation of a node. - peerset: PeersetHandle, + peerset: Peerset, /// Pending message request, holds `MessageRequest` as a Future state to poll it /// until we get a response from `Peerset` @@ -185,7 +185,7 @@ impl RequestResponsesBehaviour { /// the same protocol is passed twice. pub fn new( list: impl Iterator, - peerset: PeersetHandle, + peerset: Peerset, ) -> Result { let mut protocols = HashMap::new(); for protocol in list { @@ -949,9 +949,9 @@ mod tests { }], }; - let (peerset, handle) = Peerset::from_config(config); + let peerset = Peerset::from_config(config); - let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap(); + let behaviour = RequestResponsesBehaviour::new(list, peerset.clone()).unwrap(); let mut swarm = Swarm::new(transport, behaviour, keypair.public().to_peer_id()); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 08e498299a1d3..8c75c25a988cf 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -71,7 +71,7 @@ use sc_network_common::{ sync::SyncStatus, ExHashT, }; -use sc_peerset::PeersetHandle; +use sc_peerset::Peerset; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::HeaderBackend; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; @@ -116,7 +116,7 @@ pub struct NetworkService { bandwidth: Arc, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. - peerset: PeersetHandle, + peerset: Peerset, /// Channel that sends messages to the actual worker. to_worker: TracingUnboundedSender>, /// Interface that can be used to delegate calls to `ChainSync` @@ -675,7 +675,7 @@ where } /// Returns the list of reserved peers. - pub fn reserved_peers(&self) -> impl Iterator { + pub fn reserved_peers(&self) -> Vec { self.network_service.behaviour().user_protocol().reserved_peers() } } diff --git a/client/peerset/Cargo.toml b/client/peerset/Cargo.toml index 8625fe40c20d4..2377734f4f5f1 100644 --- a/client/peerset/Cargo.toml +++ b/client/peerset/Cargo.toml @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"] futures = "0.3.21" libp2p = { version = "0.49.0", default-features = false } log = "0.4.17" +parking_lot = "0.12.1" serde_json = "1.0.85" wasm-timer = "0.2" sc-utils = { version = "4.0.0-dev", path = "../utils" } diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index ec09835c4898e..4481c118adcab 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -34,13 +34,14 @@ mod peersstate; -use futures::{channel::oneshot, prelude::*}; +use futures::prelude::*; use log::{debug, error, trace}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use parking_lot::Mutex; use serde_json::json; use std::{ collections::{HashMap, HashSet, VecDeque}, pin::Pin, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -56,18 +57,6 @@ const DISCONNECT_REPUTATION_CHANGE: i32 = -256; /// the list. const FORGET_AFTER: Duration = Duration::from_secs(3600); -#[derive(Debug)] -enum Action { - AddReservedPeer(SetId, PeerId), - RemoveReservedPeer(SetId, PeerId), - SetReservedPeers(SetId, HashSet), - SetReservedOnly(SetId, bool), - ReportPeer(PeerId, ReputationChange), - AddToPeersSet(SetId, PeerId), - RemoveFromPeersSet(SetId, PeerId), - PeerReputation(PeerId, oneshot::Sender), -} - /// Identifier of a set in the peerset. /// /// Can be constructed using the `From` trait implementation based on the index of the set @@ -118,11 +107,18 @@ impl ReputationChange { /// Shared handle to the peer set manager (PSM). Distributed around the code. #[derive(Debug, Clone)] -pub struct PeersetHandle { - tx: TracingUnboundedSender, +pub struct Peerset { + inner: Arc>, } -impl PeersetHandle { +impl Peerset { + /// Create PSM. + pub fn from_config(config: PeersetConfig) -> Self { + Self { + inner: Arc::new(Mutex::new(PeersetInner::from_config(config))) + } + } + /// Adds a new reserved peer. The peerset will make an effort to always remain connected to /// this peer. /// @@ -131,50 +127,75 @@ impl PeersetHandle { /// > **Note**: Keep in mind that the networking has to know an address for this node, /// > otherwise it will not be able to connect to it. pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id)); + self.inner.lock().add_reserved_peer(set_id, peer_id); } /// Remove a previously-added reserved peer. /// /// Has no effect if the node was not a reserved peer. pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id)); + self.inner.lock().remove_reserved_peer(set_id, peer_id); } /// Sets whether or not the peerset only has connections with nodes marked as reserved for /// the given set. pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) { - let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved)); + self.inner.lock().set_reserved_only(set_id, reserved); } /// Set reserved peers to the new set. pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet) { - let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids)); + self.inner.lock().set_reserved_peers(set_id, peer_ids); } /// Reports an adjustment to the reputation of the given peer. pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) { - let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); + self.inner.lock().report_peer(peer_id, score_diff); } /// Add a peer to a set. pub fn add_to_peers_set(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::AddToPeersSet(set_id, peer_id)); + self.inner.lock().add_to_peers_set(set_id, peer_id); } /// Remove a peer from a set. pub fn remove_from_peers_set(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::RemoveFromPeersSet(set_id, peer_id)); + self.inner.lock().remove_from_peers_set(set_id, peer_id); } /// Returns the reputation value of the peer. - pub async fn peer_reputation(self, peer_id: PeerId) -> Result { - let (tx, rx) = oneshot::channel(); + pub fn peer_reputation(&self, peer_id: PeerId) -> impl Future> + Send { + // TODO: refactor into sync function. + future::ready(Ok(self.inner.lock().peer_reputation(peer_id))) + } + + /// Notify PSM that we recived incoming connection. Will be answered either with + /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. + pub fn incoming(&self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) { + self.inner.lock().incoming(set_id, peer_id, index); + } - let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx)); + /// Notify PSM that we dropped an active connection with a peer, or that we failed to connect. + /// + /// Must only be called after the PSM has either generated a `Connect` message with this + /// `PeerId`, or accepted an incoming connection with this `PeerId`. + pub fn dropped(&self, set_id: SetId, peer_id: PeerId, reason: DropReason) { + self.inner.lock().dropped(set_id, peer_id, reason); + } - // The channel can only be closed if the peerset no longer exists. - rx.await.map_err(|_| ()) + /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. + pub fn debug_info(&self) -> serde_json::Value { + self.inner.lock().debug_info() + } + + /// Returns the list of reserved peers. + pub fn reserved_peers(&self, set_id: SetId) -> Vec { + self.inner.lock().reserved_peers(set_id) + } + + /// Returns the number of peers that we have discovered. + pub fn num_discovered_peers(&self) -> usize { + self.inner.lock().num_discovered_peers() } } @@ -250,17 +271,13 @@ pub struct SetConfig { /// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never /// errors. #[derive(Debug)] -pub struct Peerset { +pub struct PeersetInner { /// Underlying data structure for the nodes's states. data: peersstate::PeersState, /// For each set, lists of nodes that don't occupy slots and that we should try to always be /// connected to, and whether only reserved nodes are accepted. Is kept in sync with the list /// of non-slot-occupying nodes in [`Peerset::data`]. reserved_nodes: Vec<(HashSet, bool)>, - /// Receiver for messages from the `PeersetHandle` and from `tx`. - rx: TracingUnboundedReceiver, - /// Sending side of `rx`. - tx: TracingUnboundedSender, /// Queue of messages to be emitted when the `Peerset` is polled. message_queue: VecDeque, /// When the `Peerset` was created. @@ -272,13 +289,9 @@ pub struct Peerset { next_periodic_alloc_slots: Delay, } -impl Peerset { +impl PeersetInner { /// Builds a new peerset from the given configuration. - pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) { - let (tx, rx) = tracing_unbounded("mpsc_peerset_messages"); - - let handle = PeersetHandle { tx: tx.clone() }; - + fn from_config(config: PeersetConfig) -> PeersetInner { let mut peerset = { let now = Instant::now(); @@ -286,8 +299,6 @@ impl Peerset { data: peersstate::PeersState::new(config.sets.iter().map(|set| { peersstate::SetConfig { in_peers: set.in_peers, out_peers: set.out_peers } })), - tx, - rx, reserved_nodes: config .sets .iter() @@ -318,10 +329,10 @@ impl Peerset { peerset.alloc_slots(SetId(set_index)); } - (peerset, handle) + peerset } - fn on_add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) { + fn add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) { let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id); if !newly_inserted { return @@ -331,7 +342,7 @@ impl Peerset { self.alloc_slots(set_id); } - fn on_remove_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) { + fn remove_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) { if !self.reserved_nodes[set_id.0].0.remove(&peer_id) { return } @@ -351,7 +362,7 @@ impl Peerset { } } - fn on_set_reserved_peers(&mut self, set_id: SetId, peer_ids: HashSet) { + fn set_reserved_peers(&mut self, set_id: SetId, peer_ids: HashSet) { // Determine the difference between the current group and the new list. let (to_insert, to_remove) = { let to_insert = peer_ids @@ -367,15 +378,15 @@ impl Peerset { }; for node in to_insert { - self.on_add_reserved_peer(set_id, node); + self.add_reserved_peer(set_id, node); } for node in to_remove { - self.on_remove_reserved_peer(set_id, node); + self.remove_reserved_peer(set_id, node); } } - fn on_set_reserved_only(&mut self, set_id: SetId, reserved_only: bool) { + fn set_reserved_only(&mut self, set_id: SetId, reserved_only: bool) { self.reserved_nodes[set_id.0].1 = reserved_only; if reserved_only { @@ -399,22 +410,20 @@ impl Peerset { } /// Returns the list of reserved peers. - pub fn reserved_peers(&self, set_id: SetId) -> impl Iterator { - self.reserved_nodes[set_id.0].0.iter() + fn reserved_peers(&self, set_id: SetId) -> Vec { + self.reserved_nodes[set_id.0].0.iter().map(ToOwned::to_owned).collect() } /// Adds a node to the given set. The peerset will, if possible and not already the case, /// try to connect to it. - /// - /// > **Note**: This has the same effect as [`PeersetHandle::add_to_peers_set`]. - pub fn add_to_peers_set(&mut self, set_id: SetId, peer_id: PeerId) { + fn add_to_peers_set(&mut self, set_id: SetId, peer_id: PeerId) { if let peersstate::Peer::Unknown(entry) = self.data.peer(set_id.0, &peer_id) { entry.discover(); self.alloc_slots(set_id); } } - fn on_remove_from_peers_set(&mut self, set_id: SetId, peer_id: PeerId) { + fn remove_from_peers_set(&mut self, set_id: SetId, peer_id: PeerId) { // Don't do anything if node is reserved. if self.reserved_nodes[set_id.0].0.contains(&peer_id) { return @@ -432,7 +441,7 @@ impl Peerset { } } - fn on_report_peer(&mut self, peer_id: PeerId, change: ReputationChange) { + fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange) { // We want reputations to be up-to-date before adjusting them. self.update_time(); @@ -464,9 +473,8 @@ impl Peerset { } } - fn on_peer_reputation(&mut self, peer_id: PeerId, pending_response: oneshot::Sender) { - let reputation = self.data.peer_reputation(peer_id); - let _ = pending_response.send(reputation.reputation()); + fn peer_reputation(&mut self, peer_id: PeerId) -> i32{ + self.data.peer_reputation(peer_id).reputation() } /// Updates the value of `self.latest_time_update` and performs all the updates that happen @@ -614,7 +622,7 @@ impl Peerset { // Implementation note: because of concurrency issues, it is possible that we push a `Connect` // message to the output channel with a `PeerId`, and that `incoming` gets called with the same // `PeerId` before that message has been read by the user. In this situation we must not answer. - pub fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) { + fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) { trace!(target: "peerset", "Incoming {:?}", peer_id); self.update_time(); @@ -649,7 +657,7 @@ impl Peerset { /// /// Must only be called after the PSM has either generated a `Connect` message with this /// `PeerId`, or accepted an incoming connection with this `PeerId`. - pub fn dropped(&mut self, set_id: SetId, peer_id: PeerId, reason: DropReason) { + fn dropped(&mut self, set_id: SetId, peer_id: PeerId, reason: DropReason) { // We want reputations to be up-to-date before adjusting them. self.update_time(); @@ -667,22 +675,14 @@ impl Peerset { } if let DropReason::Refused = reason { - self.on_remove_from_peers_set(set_id, peer_id); + self.remove_from_peers_set(set_id, peer_id); } self.alloc_slots(set_id); } - /// Reports an adjustment to the reputation of the given peer. - pub fn report_peer(&mut self, peer_id: PeerId, score_diff: ReputationChange) { - // We don't immediately perform the adjustments in order to have state consistency. We - // don't want the reporting here to take priority over messages sent using the - // `PeersetHandle`. - let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); - } - /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. - pub fn debug_info(&mut self) -> serde_json::Value { + fn debug_info(&mut self) -> serde_json::Value { self.update_time(); json!({ @@ -714,7 +714,7 @@ impl Peerset { } /// Returns the number of peers that we have discovered. - pub fn num_discovered_peers(&self) -> usize { + fn num_discovered_peers(&self) -> usize { self.data.peers().len() } } @@ -722,44 +722,19 @@ impl Peerset { impl Stream for Peerset { type Item = Message; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - loop { - if let Some(message) = self.message_queue.pop_front() { - return Poll::Ready(Some(message)) - } - - if Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx).is_ready() { - self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0)); - - for set_index in 0..self.data.num_sets() { - self.alloc_slots(SetId(set_index)); - } - } - - let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Some(event)) => event, - Poll::Ready(None) => return Poll::Pending, - }; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut inner = self.inner.lock(); + if let Some(message) = inner.message_queue.pop_front() { + return Poll::Ready(Some(message)) + } + if Future::poll(Pin::new(&mut inner.next_periodic_alloc_slots), cx).is_ready() { + inner.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0)); - match action { - Action::AddReservedPeer(set_id, peer_id) => - self.on_add_reserved_peer(set_id, peer_id), - Action::RemoveReservedPeer(set_id, peer_id) => - self.on_remove_reserved_peer(set_id, peer_id), - Action::SetReservedPeers(set_id, peer_ids) => - self.on_set_reserved_peers(set_id, peer_ids), - Action::SetReservedOnly(set_id, reserved) => - self.on_set_reserved_only(set_id, reserved), - Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff), - Action::AddToPeersSet(sets_name, peer_id) => - self.add_to_peers_set(sets_name, peer_id), - Action::RemoveFromPeersSet(sets_name, peer_id) => - self.on_remove_from_peers_set(sets_name, peer_id), - Action::PeerReputation(peer_id, pending_response) => - self.on_peer_reputation(peer_id, pending_response), + for set_index in 0..inner.data.num_sets() { + inner.alloc_slots(SetId(set_index)); } } + Poll::Pending } } @@ -770,7 +745,7 @@ pub enum DropReason { /// Substream or connection has been explicitly refused by the target. In other words, the /// peer doesn't actually belong to this set. /// - /// This has the side effect of calling [`PeersetHandle::remove_from_peers_set`]. + /// This has the side effect of calling [`Peerset::remove_from_peers_set`]. Refused, } @@ -814,9 +789,9 @@ mod tests { }], }; - let (peerset, handle) = Peerset::from_config(config); - handle.add_reserved_peer(SetId::from(0), reserved_peer); - handle.add_reserved_peer(SetId::from(0), reserved_peer2); + let peerset = Peerset::from_config(config); + peerset.add_reserved_peer(SetId::from(0), reserved_peer); + peerset.add_reserved_peer(SetId::from(0), reserved_peer2); assert_messages( peerset, @@ -847,7 +822,7 @@ mod tests { }], }; - let (mut peerset, _handle) = Peerset::from_config(config); + let peerset = Peerset::from_config(config); peerset.incoming(SetId::from(0), incoming, ii); peerset.incoming(SetId::from(0), incoming, ii4); peerset.incoming(SetId::from(0), incoming2, ii2); @@ -878,7 +853,7 @@ mod tests { }], }; - let (mut peerset, _) = Peerset::from_config(config); + let peerset = Peerset::from_config(config); peerset.incoming(SetId::from(0), incoming, ii); assert_messages(peerset, vec![Message::Reject(ii)]); @@ -899,7 +874,7 @@ mod tests { }], }; - let (mut peerset, _handle) = Peerset::from_config(config); + let peerset = Peerset::from_config(config); peerset.add_to_peers_set(SetId::from(0), discovered); peerset.add_to_peers_set(SetId::from(0), discovered); peerset.add_to_peers_set(SetId::from(0), discovered2); @@ -915,7 +890,7 @@ mod tests { #[test] fn test_peerset_banned() { - let (mut peerset, handle) = Peerset::from_config(PeersetConfig { + let mut peerset = Peerset::from_config(PeersetConfig { sets: vec![SetConfig { in_peers: 25, out_peers: 25, @@ -927,7 +902,7 @@ mod tests { // We ban a node by setting its reputation under the threshold. let peer_id = PeerId::random(); - handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, "")); + peerset.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, "")); let fut = futures::future::poll_fn(move |cx| { // We need one polling for the message to be processed. @@ -958,7 +933,7 @@ mod tests { #[test] fn test_relloc_after_banned() { - let (mut peerset, handle) = Peerset::from_config(PeersetConfig { + let mut peerset = Peerset::from_config(PeersetConfig { sets: vec![SetConfig { in_peers: 25, out_peers: 25, @@ -970,7 +945,7 @@ mod tests { // We ban a node by setting its reputation under the threshold. let peer_id = PeerId::random(); - handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, "")); + peerset.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, "")); let fut = futures::future::poll_fn(move |cx| { // We need one polling for the message to be processed. diff --git a/client/peerset/tests/fuzz.rs b/client/peerset/tests/fuzz.rs index 48c5cb341c35a..a100892eb6be6 100644 --- a/client/peerset/tests/fuzz.rs +++ b/client/peerset/tests/fuzz.rs @@ -47,7 +47,7 @@ fn test_once() { // Nodes that we have reserved. Always a subset of `known_nodes`. let mut reserved_nodes = HashSet::::new(); - let (mut peerset, peerset_handle) = Peerset::from_config(PeersetConfig { + let mut peerset = Peerset::from_config(PeersetConfig { sets: vec![SetConfig { bootnodes: (0..Uniform::new_inclusive(0, 4).sample(&mut rng)) .map(|_| { @@ -122,7 +122,7 @@ fn test_once() { 2 => if let Some(id) = known_nodes.iter().choose(&mut rng) { let val = Uniform::new_inclusive(i32::MIN, i32::MAX).sample(&mut rng); - peerset_handle.report_peer(*id, ReputationChange::new(val, "")); + peerset.report_peer(*id, ReputationChange::new(val, "")); }, // If we generate 3, disconnect from a random node. @@ -149,22 +149,22 @@ fn test_once() { }, // 5 and 6 are the reserved-only mode. - 5 => peerset_handle.set_reserved_only(SetId::from(0), true), - 6 => peerset_handle.set_reserved_only(SetId::from(0), false), + 5 => peerset.set_reserved_only(SetId::from(0), true), + 6 => peerset.set_reserved_only(SetId::from(0), false), // 7 and 8 are about switching a random node in or out of reserved mode. 7 => { if let Some(id) = known_nodes.iter().filter(|n| !reserved_nodes.contains(*n)).choose(&mut rng) { - peerset_handle.add_reserved_peer(SetId::from(0), *id); + peerset.add_reserved_peer(SetId::from(0), *id); reserved_nodes.insert(*id); } }, 8 => if let Some(id) = reserved_nodes.iter().choose(&mut rng).cloned() { reserved_nodes.remove(&id); - peerset_handle.remove_reserved_peer(SetId::from(0), id); + peerset.remove_reserved_peer(SetId::from(0), id); }, _ => unreachable!(), diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 8b3a29ba4032a..0889b52ea5386 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -257,6 +257,7 @@ async fn build_network_future< sc_rpc::system::Request::NetworkReservedPeers(sender) => { let reserved_peers = network.reserved_peers(); let reserved_peers = reserved_peers + .iter() .map(|peer_id| peer_id.to_base58()) .collect(); From c26087c7b1df003a96fba45b9e2fb14113b3453a Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 26 Dec 2022 21:40:37 +0300 Subject: [PATCH 2/5] rustfmt --- client/peerset/src/lib.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index 4481c118adcab..c25367a22cd6b 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -114,9 +114,7 @@ pub struct Peerset { impl Peerset { /// Create PSM. pub fn from_config(config: PeersetConfig) -> Self { - Self { - inner: Arc::new(Mutex::new(PeersetInner::from_config(config))) - } + Self { inner: Arc::new(Mutex::new(PeersetInner::from_config(config))) } } /// Adds a new reserved peer. The peerset will make an effort to always remain connected to @@ -473,7 +471,7 @@ impl PeersetInner { } } - fn peer_reputation(&mut self, peer_id: PeerId) -> i32{ + fn peer_reputation(&mut self, peer_id: PeerId) -> i32 { self.data.peer_reputation(peer_id).reputation() } From a91c181185fa550780671512cdea3c2a18a5e510 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 27 Dec 2022 17:48:27 +0300 Subject: [PATCH 3/5] Revert merging `Peerset` & `PeersetHandle` in client code --- client/network/src/behaviour.rs | 4 +- client/network/src/protocol.rs | 50 +++++++++---------- .../src/protocol/notifications/behaviour.rs | 2 +- .../src/protocol/notifications/tests.rs | 2 +- client/network/src/request_responses.rs | 10 ++-- client/network/src/service.rs | 6 +-- client/peerset/src/lib.rs | 22 ++++---- client/peerset/tests/fuzz.rs | 12 ++--- client/service/src/lib.rs | 1 - 9 files changed, 54 insertions(+), 55 deletions(-) diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 0e85070983e9b..3a977edbca574 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -40,7 +40,7 @@ use sc_network_common::{ }, request_responses::{IfDisconnected, ProtocolConfig, RequestFailure}, }; -use sc_peerset::{Peerset, ReputationChange}; +use sc_peerset::{PeersetHandle, ReputationChange}; use sp_blockchain::HeaderBackend; use sp_runtime::traits::Block as BlockT; use std::{collections::HashSet, time::Duration}; @@ -189,7 +189,7 @@ where local_public_key: PublicKey, disco_config: DiscoveryConfig, request_response_protocols: Vec, - peerset: Peerset, + peerset: PeersetHandle, ) -> Result { Ok(Self { substrate, diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 15323ea944745..1f0ec9c0d120d 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -162,7 +162,7 @@ pub struct Protocol { /// Number of slots to allocate to light nodes. default_peers_set_num_light: usize, /// Used to report reputation changes. - peerset: sc_peerset::Peerset, + peerset_handle: sc_peerset::PeersetHandle, /// Handles opening the unique substream and sending and receiving raw messages. behaviour: Notifications, /// List of notifications protocols that have been registered. @@ -214,7 +214,7 @@ where metrics_registry: Option<&Registry>, chain_sync: Box>, block_announces_protocol: sc_network_common::config::NonDefaultSetConfig, - ) -> error::Result<(Self, sc_peerset::Peerset, Vec<(PeerId, Multiaddr)>)> { + ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let info = chain.info(); let boot_node_ids = { @@ -255,7 +255,7 @@ where let mut known_addresses = Vec::new(); - let peerset = { + let (peerset, peerset_handle) = { let mut sets = Vec::with_capacity(NUM_HARDCODED_PEERSETS + network_config.extra_sets.len()); @@ -307,7 +307,7 @@ where let behaviour = { Notifications::new( - peerset.clone(), + peerset, // NOTE: Block announcement protocol is still very much hardcoded into `Protocol`. // This protocol must be the first notification protocol given to // `Notifications` @@ -351,7 +351,7 @@ where network_config.default_peers_set.in_peers; total.saturating_sub(network_config.default_peers_set_num_full) as usize }, - peerset: peerset.clone(), + peerset_handle: peerset_handle.clone(), behaviour, notification_protocols: iter::once(block_announces_protocol.notifications_protocol) .chain(network_config.extra_sets.iter().map(|s| s.notifications_protocol.clone())) @@ -366,7 +366,7 @@ where block_announce_data_cache, }; - Ok((protocol, peerset, known_addresses)) + Ok((protocol, peerset_handle, known_addresses)) } /// Returns the list of all the peers we have an open channel to. @@ -482,7 +482,7 @@ where /// Adjusts the reputation of a node. pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) { - self.peerset.report_peer(who, reputation) + self.peerset_handle.report_peer(who, reputation) } /// Perform time based maintenance. @@ -517,7 +517,7 @@ where "Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash ); - self.peerset.report_peer(who, rep::GENESIS_MISMATCH); + self.peerset_handle.report_peer(who, rep::GENESIS_MISMATCH); self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); if self.boot_node_ids.contains(&who) { @@ -537,7 +537,7 @@ where // we're not interested in light peers if status.roles.is_light() { debug!(target: "sync", "Peer {} is unable to serve light requests", who); - self.peerset.report_peer(who, rep::BAD_ROLE); + self.peerset_handle.report_peer(who, rep::BAD_ROLE); self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); return Err(()) } @@ -550,7 +550,7 @@ where .saturated_into::(); if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); - self.peerset.report_peer(who, rep::PEER_BEHIND_US_LIGHT); + self.peerset_handle.report_peer(who, rep::PEER_BEHIND_US_LIGHT); self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); return Err(()) } @@ -595,7 +595,7 @@ where Ok(req) => req, Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); - self.peerset.report_peer(id, repu); + self.peerset_handle.report_peer(id, repu); return Err(()) }, } @@ -788,33 +788,33 @@ where /// Set whether the syncing peers set is in reserved-only mode. pub fn set_reserved_only(&self, reserved_only: bool) { - self.peerset.set_reserved_only(HARDCODED_PEERSETS_SYNC, reserved_only); + self.peerset_handle.set_reserved_only(HARDCODED_PEERSETS_SYNC, reserved_only); } /// Removes a `PeerId` from the list of reserved peers for syncing purposes. pub fn remove_reserved_peer(&self, peer: PeerId) { - self.peerset.remove_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); + self.peerset_handle.remove_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); } /// Returns the list of reserved peers. - pub fn reserved_peers(&self) -> Vec { + pub fn reserved_peers(&self) -> impl Iterator { self.behaviour.reserved_peers(HARDCODED_PEERSETS_SYNC) } /// Adds a `PeerId` to the list of reserved peers for syncing purposes. pub fn add_reserved_peer(&self, peer: PeerId) { - self.peerset.add_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); + self.peerset_handle.add_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); } /// Sets the list of reserved peers for syncing purposes. pub fn set_reserved_peers(&self, peers: HashSet) { - self.peerset.set_reserved_peers(HARDCODED_PEERSETS_SYNC, peers); + self.peerset_handle.set_reserved_peers(HARDCODED_PEERSETS_SYNC, peers); } /// Sets the list of reserved peers for the given protocol/peerset. pub fn set_reserved_peerset_peers(&self, protocol: ProtocolName, peers: HashSet) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset.set_reserved_peers(sc_peerset::SetId::from(index), peers); + self.peerset_handle.set_reserved_peers(sc_peerset::SetId::from(index), peers); } else { error!( target: "sub-libp2p", @@ -827,7 +827,7 @@ where /// Removes a `PeerId` from the list of reserved peers. pub fn remove_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset.remove_reserved_peer(sc_peerset::SetId::from(index), peer); + self.peerset_handle.remove_reserved_peer(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -840,7 +840,7 @@ where /// Adds a `PeerId` to the list of reserved peers. pub fn add_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset.add_reserved_peer(sc_peerset::SetId::from(index), peer); + self.peerset_handle.add_reserved_peer(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -855,14 +855,14 @@ where /// Can be called multiple times with the same `PeerId`s. pub fn add_default_set_discovered_nodes(&mut self, peer_ids: impl Iterator) { for peer_id in peer_ids { - self.peerset.add_to_peers_set(HARDCODED_PEERSETS_SYNC, peer_id); + self.peerset_handle.add_to_peers_set(HARDCODED_PEERSETS_SYNC, peer_id); } } /// Add a peer to a peers set. pub fn add_to_peers_set(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset.add_to_peers_set(sc_peerset::SetId::from(index), peer); + self.peerset_handle.add_to_peers_set(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -875,7 +875,7 @@ where /// Remove a peer from a peers set. pub fn remove_from_peers_set(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset.remove_from_peers_set(sc_peerset::SetId::from(index), peer); + self.peerset_handle.remove_from_peers_set(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -1092,7 +1092,7 @@ where peer_id, msg, ); - self.peerset.report_peer(peer_id, rep::BAD_MESSAGE); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, Err(err) => { @@ -1115,7 +1115,7 @@ where err, err2, ); - self.peerset.report_peer(peer_id, rep::BAD_MESSAGE); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, } @@ -1150,7 +1150,7 @@ where debug!(target: "sync", "Failed to parse remote handshake: {}", err); self.bad_handshake_substreams.insert((peer_id, set_id)); self.behaviour.disconnect_peer(&peer_id, set_id); - self.peerset.report_peer(peer_id, rep::BAD_MESSAGE); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, } diff --git a/client/network/src/protocol/notifications/behaviour.rs b/client/network/src/protocol/notifications/behaviour.rs index 804bd11493d49..04f6fe445ac63 100644 --- a/client/network/src/protocol/notifications/behaviour.rs +++ b/client/network/src/protocol/notifications/behaviour.rs @@ -550,7 +550,7 @@ impl Notifications { } /// Returns the list of reserved peers. - pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> Vec { + pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> impl Iterator { self.peerset.reserved_peers(set_id) } diff --git a/client/network/src/protocol/notifications/tests.rs b/client/network/src/protocol/notifications/tests.rs index 9436f4e3190cd..fa79366d20283 100644 --- a/client/network/src/protocol/notifications/tests.rs +++ b/client/network/src/protocol/notifications/tests.rs @@ -63,7 +63,7 @@ fn build_nodes() -> (Swarm, Swarm) { .timeout(Duration::from_secs(20)) .boxed(); - let peerset = sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { + let (peerset, _) = sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets: vec![sc_peerset::SetConfig { in_peers: 25, out_peers: 25, diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 148199e46f8e2..d49cbd8051341 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -65,7 +65,7 @@ use std::{ }; pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; -use sc_peerset::{Peerset, BANNED_THRESHOLD}; +use sc_peerset::{PeersetHandle, BANNED_THRESHOLD}; /// Event generated by the [`RequestResponsesBehaviour`]. #[derive(Debug)] @@ -150,7 +150,7 @@ pub struct RequestResponsesBehaviour { send_feedback: HashMap>, /// Primarily used to get a reputation of a node. - peerset: Peerset, + peerset: PeersetHandle, /// Pending message request, holds `MessageRequest` as a Future state to poll it /// until we get a response from `Peerset` @@ -185,7 +185,7 @@ impl RequestResponsesBehaviour { /// the same protocol is passed twice. pub fn new( list: impl Iterator, - peerset: Peerset, + peerset: PeersetHandle, ) -> Result { let mut protocols = HashMap::new(); for protocol in list { @@ -949,9 +949,9 @@ mod tests { }], }; - let peerset = Peerset::from_config(config); + let (peerset, handle) = Peerset::from_config(config); - let behaviour = RequestResponsesBehaviour::new(list, peerset.clone()).unwrap(); + let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap(); let mut swarm = Swarm::new(transport, behaviour, keypair.public().to_peer_id()); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); diff --git a/client/network/src/service.rs b/client/network/src/service.rs index fb0a4f622e0fd..66de263a19a3e 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -71,7 +71,7 @@ use sc_network_common::{ sync::SyncStatus, ExHashT, }; -use sc_peerset::Peerset; +use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::HeaderBackend; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; @@ -116,7 +116,7 @@ pub struct NetworkService { bandwidth: Arc, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. - peerset: Peerset, + peerset: PeersetHandle, /// Channel that sends messages to the actual worker. to_worker: TracingUnboundedSender>, /// Interface that can be used to delegate calls to `ChainSync` @@ -675,7 +675,7 @@ where } /// Returns the list of reserved peers. - pub fn reserved_peers(&self) -> Vec { + pub fn reserved_peers(&self) -> impl Iterator { self.network_service.behaviour().user_protocol().reserved_peers() } } diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index c25367a22cd6b..4691be554a7a4 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -743,7 +743,7 @@ pub enum DropReason { /// Substream or connection has been explicitly refused by the target. In other words, the /// peer doesn't actually belong to this set. /// - /// This has the side effect of calling [`Peerset::remove_from_peers_set`]. + /// This has the side effect of calling [`PeersetHandle::remove_from_peers_set`]. Refused, } @@ -787,9 +787,9 @@ mod tests { }], }; - let peerset = Peerset::from_config(config); - peerset.add_reserved_peer(SetId::from(0), reserved_peer); - peerset.add_reserved_peer(SetId::from(0), reserved_peer2); + let (peerset, handle) = Peerset::from_config(config); + handle.add_reserved_peer(SetId::from(0), reserved_peer); + handle.add_reserved_peer(SetId::from(0), reserved_peer2); assert_messages( peerset, @@ -820,7 +820,7 @@ mod tests { }], }; - let peerset = Peerset::from_config(config); + let (mut peerset, _handle) = Peerset::from_config(config); peerset.incoming(SetId::from(0), incoming, ii); peerset.incoming(SetId::from(0), incoming, ii4); peerset.incoming(SetId::from(0), incoming2, ii2); @@ -851,7 +851,7 @@ mod tests { }], }; - let peerset = Peerset::from_config(config); + let (mut peerset, _) = Peerset::from_config(config); peerset.incoming(SetId::from(0), incoming, ii); assert_messages(peerset, vec![Message::Reject(ii)]); @@ -872,7 +872,7 @@ mod tests { }], }; - let peerset = Peerset::from_config(config); + let (mut peerset, _handle) = Peerset::from_config(config); peerset.add_to_peers_set(SetId::from(0), discovered); peerset.add_to_peers_set(SetId::from(0), discovered); peerset.add_to_peers_set(SetId::from(0), discovered2); @@ -888,7 +888,7 @@ mod tests { #[test] fn test_peerset_banned() { - let mut peerset = Peerset::from_config(PeersetConfig { + let (mut peerset, handle) = Peerset::from_config(PeersetConfig { sets: vec![SetConfig { in_peers: 25, out_peers: 25, @@ -900,7 +900,7 @@ mod tests { // We ban a node by setting its reputation under the threshold. let peer_id = PeerId::random(); - peerset.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, "")); + handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, "")); let fut = futures::future::poll_fn(move |cx| { // We need one polling for the message to be processed. @@ -931,7 +931,7 @@ mod tests { #[test] fn test_relloc_after_banned() { - let mut peerset = Peerset::from_config(PeersetConfig { + let (mut peerset, handle) = Peerset::from_config(PeersetConfig { sets: vec![SetConfig { in_peers: 25, out_peers: 25, @@ -943,7 +943,7 @@ mod tests { // We ban a node by setting its reputation under the threshold. let peer_id = PeerId::random(); - peerset.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, "")); + handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, "")); let fut = futures::future::poll_fn(move |cx| { // We need one polling for the message to be processed. diff --git a/client/peerset/tests/fuzz.rs b/client/peerset/tests/fuzz.rs index a100892eb6be6..48c5cb341c35a 100644 --- a/client/peerset/tests/fuzz.rs +++ b/client/peerset/tests/fuzz.rs @@ -47,7 +47,7 @@ fn test_once() { // Nodes that we have reserved. Always a subset of `known_nodes`. let mut reserved_nodes = HashSet::::new(); - let mut peerset = Peerset::from_config(PeersetConfig { + let (mut peerset, peerset_handle) = Peerset::from_config(PeersetConfig { sets: vec![SetConfig { bootnodes: (0..Uniform::new_inclusive(0, 4).sample(&mut rng)) .map(|_| { @@ -122,7 +122,7 @@ fn test_once() { 2 => if let Some(id) = known_nodes.iter().choose(&mut rng) { let val = Uniform::new_inclusive(i32::MIN, i32::MAX).sample(&mut rng); - peerset.report_peer(*id, ReputationChange::new(val, "")); + peerset_handle.report_peer(*id, ReputationChange::new(val, "")); }, // If we generate 3, disconnect from a random node. @@ -149,22 +149,22 @@ fn test_once() { }, // 5 and 6 are the reserved-only mode. - 5 => peerset.set_reserved_only(SetId::from(0), true), - 6 => peerset.set_reserved_only(SetId::from(0), false), + 5 => peerset_handle.set_reserved_only(SetId::from(0), true), + 6 => peerset_handle.set_reserved_only(SetId::from(0), false), // 7 and 8 are about switching a random node in or out of reserved mode. 7 => { if let Some(id) = known_nodes.iter().filter(|n| !reserved_nodes.contains(*n)).choose(&mut rng) { - peerset.add_reserved_peer(SetId::from(0), *id); + peerset_handle.add_reserved_peer(SetId::from(0), *id); reserved_nodes.insert(*id); } }, 8 => if let Some(id) = reserved_nodes.iter().choose(&mut rng).cloned() { reserved_nodes.remove(&id); - peerset.remove_reserved_peer(SetId::from(0), id); + peerset_handle.remove_reserved_peer(SetId::from(0), id); }, _ => unreachable!(), diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 0889b52ea5386..8b3a29ba4032a 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -257,7 +257,6 @@ async fn build_network_future< sc_rpc::system::Request::NetworkReservedPeers(sender) => { let reserved_peers = network.reserved_peers(); let reserved_peers = reserved_peers - .iter() .map(|peer_id| peer_id.to_base58()) .collect(); From f4600509de8948770b282d0ba071e372511cd43b Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 27 Dec 2022 18:05:13 +0300 Subject: [PATCH 4/5] Split `Peerset` & `PeersetHandle` in a new way --- client/peerset/src/lib.rs | 50 +++++++++++++++++++++--------------- client/peerset/tests/fuzz.rs | 2 +- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index 4691be554a7a4..4dd031cd794f2 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -107,16 +107,11 @@ impl ReputationChange { /// Shared handle to the peer set manager (PSM). Distributed around the code. #[derive(Debug, Clone)] -pub struct Peerset { +pub struct PeersetHandle { inner: Arc>, } -impl Peerset { - /// Create PSM. - pub fn from_config(config: PeersetConfig) -> Self { - Self { inner: Arc::new(Mutex::new(PeersetInner::from_config(config))) } - } - +impl PeersetHandle { /// Adds a new reserved peer. The peerset will make an effort to always remain connected to /// this peer. /// @@ -166,6 +161,22 @@ impl Peerset { // TODO: refactor into sync function. future::ready(Ok(self.inner.lock().peer_reputation(peer_id))) } +} + +/// Side of the peer set manager owned by the network. In other words, the "receiving" side. +/// +/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never +/// errors. +pub struct Peerset { + inner: Arc>, +} + +impl Peerset { + /// Builds a new peerset from the given configuration. + pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) { + let inner = Arc::new(Mutex::new(PeersetInner::from_config(config))); + (Self { inner: inner.clone() }, PeersetHandle { inner }) + } /// Notify PSM that we recived incoming connection. Will be answered either with /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. @@ -181,11 +192,6 @@ impl Peerset { self.inner.lock().dropped(set_id, peer_id, reason); } - /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. - pub fn debug_info(&self) -> serde_json::Value { - self.inner.lock().debug_info() - } - /// Returns the list of reserved peers. pub fn reserved_peers(&self, set_id: SetId) -> Vec { self.inner.lock().reserved_peers(set_id) @@ -195,6 +201,11 @@ impl Peerset { pub fn num_discovered_peers(&self) -> usize { self.inner.lock().num_discovered_peers() } + + /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. + pub fn debug_info(&self) -> serde_json::Value { + self.inner.lock().debug_info() + } } /// Message that can be sent by the peer set manager (PSM). @@ -264,12 +275,9 @@ pub struct SetConfig { pub reserved_only: bool, } -/// Side of the peer set manager owned by the network. In other words, the "receiving" side. -/// -/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never -/// errors. +/// Internal PSM data hidden behind `Arc>` in [`PeersetHandle`] and [`Peerset`]. #[derive(Debug)] -pub struct PeersetInner { +struct PeersetInner { /// Underlying data structure for the nodes's states. data: peersstate::PeersState, /// For each set, lists of nodes that don't occupy slots and that we should try to always be @@ -872,10 +880,10 @@ mod tests { }], }; - let (mut peerset, _handle) = Peerset::from_config(config); - peerset.add_to_peers_set(SetId::from(0), discovered); - peerset.add_to_peers_set(SetId::from(0), discovered); - peerset.add_to_peers_set(SetId::from(0), discovered2); + let (mut peerset, handle) = Peerset::from_config(config); + handle.add_to_peers_set(SetId::from(0), discovered); + handle.add_to_peers_set(SetId::from(0), discovered); + handle.add_to_peers_set(SetId::from(0), discovered2); assert_messages( peerset, diff --git a/client/peerset/tests/fuzz.rs b/client/peerset/tests/fuzz.rs index 48c5cb341c35a..3bf9c000b5f61 100644 --- a/client/peerset/tests/fuzz.rs +++ b/client/peerset/tests/fuzz.rs @@ -115,7 +115,7 @@ fn test_once() { 1 => { let new_id = PeerId::random(); known_nodes.insert(new_id); - peerset.add_to_peers_set(SetId::from(0), new_id); + peerset_handle.add_to_peers_set(SetId::from(0), new_id); }, // If we generate 2, adjust a random reputation. From 0fd5e4a1ae776f9df8ba1b4bb4b8ccd603fb01cf Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 27 Dec 2022 18:44:34 +0300 Subject: [PATCH 5/5] Update client code to match new `Peerset` interface --- client/network/src/protocol.rs | 2 +- client/network/src/protocol/notifications/behaviour.rs | 2 +- client/network/src/service.rs | 2 +- client/peerset/src/lib.rs | 6 +++--- client/service/src/lib.rs | 1 + 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 1f0ec9c0d120d..ec139751a569b 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -797,7 +797,7 @@ where } /// Returns the list of reserved peers. - pub fn reserved_peers(&self) -> impl Iterator { + pub fn reserved_peers(&self) -> Vec { self.behaviour.reserved_peers(HARDCODED_PEERSETS_SYNC) } diff --git a/client/network/src/protocol/notifications/behaviour.rs b/client/network/src/protocol/notifications/behaviour.rs index 04f6fe445ac63..804bd11493d49 100644 --- a/client/network/src/protocol/notifications/behaviour.rs +++ b/client/network/src/protocol/notifications/behaviour.rs @@ -550,7 +550,7 @@ impl Notifications { } /// Returns the list of reserved peers. - pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> impl Iterator { + pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> Vec { self.peerset.reserved_peers(set_id) } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 66de263a19a3e..037e6b744cea1 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -675,7 +675,7 @@ where } /// Returns the list of reserved peers. - pub fn reserved_peers(&self) -> impl Iterator { + pub fn reserved_peers(&self) -> Vec { self.network_service.behaviour().user_protocol().reserved_peers() } } diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index 4dd031cd794f2..473a48cb48134 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -828,7 +828,7 @@ mod tests { }], }; - let (mut peerset, _handle) = Peerset::from_config(config); + let (peerset, _handle) = Peerset::from_config(config); peerset.incoming(SetId::from(0), incoming, ii); peerset.incoming(SetId::from(0), incoming, ii4); peerset.incoming(SetId::from(0), incoming2, ii2); @@ -859,7 +859,7 @@ mod tests { }], }; - let (mut peerset, _) = Peerset::from_config(config); + let (peerset, _) = Peerset::from_config(config); peerset.incoming(SetId::from(0), incoming, ii); assert_messages(peerset, vec![Message::Reject(ii)]); @@ -880,7 +880,7 @@ mod tests { }], }; - let (mut peerset, handle) = Peerset::from_config(config); + let (peerset, handle) = Peerset::from_config(config); handle.add_to_peers_set(SetId::from(0), discovered); handle.add_to_peers_set(SetId::from(0), discovered); handle.add_to_peers_set(SetId::from(0), discovered2); diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 8b3a29ba4032a..0889b52ea5386 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -257,6 +257,7 @@ async fn build_network_future< sc_rpc::system::Request::NetworkReservedPeers(sender) => { let reserved_peers = network.reserved_peers(); let reserved_peers = reserved_peers + .iter() .map(|peer_id| peer_id.to_base58()) .collect();