diff --git a/Cargo.lock b/Cargo.lock index 19abb44e5ff9c..51d07a41767d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8018,6 +8018,7 @@ dependencies = [ "futures", "libp2p", "log", + "parking_lot 0.12.1", "rand 0.8.5", "sc-utils", "serde_json", diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 1f0ec9c0d120d..ec139751a569b 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -797,7 +797,7 @@ where } /// Returns the list of reserved peers. - pub fn reserved_peers(&self) -> impl Iterator { + pub fn reserved_peers(&self) -> Vec { self.behaviour.reserved_peers(HARDCODED_PEERSETS_SYNC) } diff --git a/client/network/src/protocol/notifications/behaviour.rs b/client/network/src/protocol/notifications/behaviour.rs index 04f6fe445ac63..804bd11493d49 100644 --- a/client/network/src/protocol/notifications/behaviour.rs +++ b/client/network/src/protocol/notifications/behaviour.rs @@ -550,7 +550,7 @@ impl Notifications { } /// Returns the list of reserved peers. - pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> impl Iterator { + pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> Vec { self.peerset.reserved_peers(set_id) } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 66de263a19a3e..037e6b744cea1 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -675,7 +675,7 @@ where } /// Returns the list of reserved peers. - pub fn reserved_peers(&self) -> impl Iterator { + pub fn reserved_peers(&self) -> Vec { self.network_service.behaviour().user_protocol().reserved_peers() } } diff --git a/client/peerset/Cargo.toml b/client/peerset/Cargo.toml index 8625fe40c20d4..2377734f4f5f1 100644 --- a/client/peerset/Cargo.toml +++ b/client/peerset/Cargo.toml @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"] futures = "0.3.21" libp2p = { version = "0.49.0", default-features = false } log = "0.4.17" +parking_lot = "0.12.1" serde_json = "1.0.85" wasm-timer = "0.2" sc-utils = { version = "4.0.0-dev", path = "../utils" } diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index 9b1dc6a2d0276..473a48cb48134 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -34,13 +34,14 @@ mod peersstate; -use futures::{channel::oneshot, prelude::*}; +use futures::prelude::*; use log::{debug, error, trace}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use parking_lot::Mutex; use serde_json::json; use std::{ collections::{HashMap, HashSet, VecDeque}, pin::Pin, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -56,18 +57,6 @@ const DISCONNECT_REPUTATION_CHANGE: i32 = -256; /// the list. const FORGET_AFTER: Duration = Duration::from_secs(3600); -#[derive(Debug)] -enum Action { - AddReservedPeer(SetId, PeerId), - RemoveReservedPeer(SetId, PeerId), - SetReservedPeers(SetId, HashSet), - SetReservedOnly(SetId, bool), - ReportPeer(PeerId, ReputationChange), - AddToPeersSet(SetId, PeerId), - RemoveFromPeersSet(SetId, PeerId), - PeerReputation(PeerId, oneshot::Sender), -} - /// Identifier of a set in the peerset. /// /// Can be constructed using the `From` trait implementation based on the index of the set @@ -119,7 +108,7 @@ impl ReputationChange { /// Shared handle to the peer set manager (PSM). Distributed around the code. #[derive(Debug, Clone)] pub struct PeersetHandle { - tx: TracingUnboundedSender, + inner: Arc>, } impl PeersetHandle { @@ -131,50 +120,91 @@ impl PeersetHandle { /// > **Note**: Keep in mind that the networking has to know an address for this node, /// > otherwise it will not be able to connect to it. pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id)); + self.inner.lock().add_reserved_peer(set_id, peer_id); } /// Remove a previously-added reserved peer. /// /// Has no effect if the node was not a reserved peer. pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id)); + self.inner.lock().remove_reserved_peer(set_id, peer_id); } /// Sets whether or not the peerset only has connections with nodes marked as reserved for /// the given set. pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) { - let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved)); + self.inner.lock().set_reserved_only(set_id, reserved); } /// Set reserved peers to the new set. pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet) { - let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids)); + self.inner.lock().set_reserved_peers(set_id, peer_ids); } /// Reports an adjustment to the reputation of the given peer. pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) { - let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); + self.inner.lock().report_peer(peer_id, score_diff); } /// Add a peer to a set. pub fn add_to_peers_set(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::AddToPeersSet(set_id, peer_id)); + self.inner.lock().add_to_peers_set(set_id, peer_id); } /// Remove a peer from a set. pub fn remove_from_peers_set(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::RemoveFromPeersSet(set_id, peer_id)); + self.inner.lock().remove_from_peers_set(set_id, peer_id); } /// Returns the reputation value of the peer. - pub async fn peer_reputation(self, peer_id: PeerId) -> Result { - let (tx, rx) = oneshot::channel(); + pub fn peer_reputation(&self, peer_id: PeerId) -> impl Future> + Send { + // TODO: refactor into sync function. + future::ready(Ok(self.inner.lock().peer_reputation(peer_id))) + } +} - let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx)); +/// Side of the peer set manager owned by the network. In other words, the "receiving" side. +/// +/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never +/// errors. +pub struct Peerset { + inner: Arc>, +} - // The channel can only be closed if the peerset no longer exists. - rx.await.map_err(|_| ()) +impl Peerset { + /// Builds a new peerset from the given configuration. + pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) { + let inner = Arc::new(Mutex::new(PeersetInner::from_config(config))); + (Self { inner: inner.clone() }, PeersetHandle { inner }) + } + + /// Notify PSM that we recived incoming connection. Will be answered either with + /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. + pub fn incoming(&self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) { + self.inner.lock().incoming(set_id, peer_id, index); + } + + /// Notify PSM that we dropped an active connection with a peer, or that we failed to connect. + /// + /// Must only be called after the PSM has either generated a `Connect` message with this + /// `PeerId`, or accepted an incoming connection with this `PeerId`. + pub fn dropped(&self, set_id: SetId, peer_id: PeerId, reason: DropReason) { + self.inner.lock().dropped(set_id, peer_id, reason); + } + + /// Returns the list of reserved peers. + pub fn reserved_peers(&self, set_id: SetId) -> Vec { + self.inner.lock().reserved_peers(set_id) + } + + /// Returns the number of peers that we have discovered. + pub fn num_discovered_peers(&self) -> usize { + self.inner.lock().num_discovered_peers() + } + + /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. + pub fn debug_info(&self) -> serde_json::Value { + self.inner.lock().debug_info() } } @@ -245,22 +275,15 @@ pub struct SetConfig { pub reserved_only: bool, } -/// Side of the peer set manager owned by the network. In other words, the "receiving" side. -/// -/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never -/// errors. +/// Internal PSM data hidden behind `Arc>` in [`PeersetHandle`] and [`Peerset`]. #[derive(Debug)] -pub struct Peerset { +struct PeersetInner { /// Underlying data structure for the nodes's states. data: peersstate::PeersState, /// For each set, lists of nodes that don't occupy slots and that we should try to always be /// connected to, and whether only reserved nodes are accepted. Is kept in sync with the list /// of non-slot-occupying nodes in [`Peerset::data`]. reserved_nodes: Vec<(HashSet, bool)>, - /// Receiver for messages from the `PeersetHandle` and from `tx`. - rx: TracingUnboundedReceiver, - /// Sending side of `rx`. - tx: TracingUnboundedSender, /// Queue of messages to be emitted when the `Peerset` is polled. message_queue: VecDeque, /// When the `Peerset` was created. @@ -272,13 +295,9 @@ pub struct Peerset { next_periodic_alloc_slots: Delay, } -impl Peerset { +impl PeersetInner { /// Builds a new peerset from the given configuration. - pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) { - let (tx, rx) = tracing_unbounded("mpsc_peerset_messages", 10_000); - - let handle = PeersetHandle { tx: tx.clone() }; - + fn from_config(config: PeersetConfig) -> PeersetInner { let mut peerset = { let now = Instant::now(); @@ -286,8 +305,6 @@ impl Peerset { data: peersstate::PeersState::new(config.sets.iter().map(|set| { peersstate::SetConfig { in_peers: set.in_peers, out_peers: set.out_peers } })), - tx, - rx, reserved_nodes: config .sets .iter() @@ -318,10 +335,10 @@ impl Peerset { peerset.alloc_slots(SetId(set_index)); } - (peerset, handle) + peerset } - fn on_add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) { + fn add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) { let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id); if !newly_inserted { return @@ -331,7 +348,7 @@ impl Peerset { self.alloc_slots(set_id); } - fn on_remove_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) { + fn remove_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) { if !self.reserved_nodes[set_id.0].0.remove(&peer_id) { return } @@ -351,7 +368,7 @@ impl Peerset { } } - fn on_set_reserved_peers(&mut self, set_id: SetId, peer_ids: HashSet) { + fn set_reserved_peers(&mut self, set_id: SetId, peer_ids: HashSet) { // Determine the difference between the current group and the new list. let (to_insert, to_remove) = { let to_insert = peer_ids @@ -367,15 +384,15 @@ impl Peerset { }; for node in to_insert { - self.on_add_reserved_peer(set_id, node); + self.add_reserved_peer(set_id, node); } for node in to_remove { - self.on_remove_reserved_peer(set_id, node); + self.remove_reserved_peer(set_id, node); } } - fn on_set_reserved_only(&mut self, set_id: SetId, reserved_only: bool) { + fn set_reserved_only(&mut self, set_id: SetId, reserved_only: bool) { self.reserved_nodes[set_id.0].1 = reserved_only; if reserved_only { @@ -399,22 +416,20 @@ impl Peerset { } /// Returns the list of reserved peers. - pub fn reserved_peers(&self, set_id: SetId) -> impl Iterator { - self.reserved_nodes[set_id.0].0.iter() + fn reserved_peers(&self, set_id: SetId) -> Vec { + self.reserved_nodes[set_id.0].0.iter().map(ToOwned::to_owned).collect() } /// Adds a node to the given set. The peerset will, if possible and not already the case, /// try to connect to it. - /// - /// > **Note**: This has the same effect as [`PeersetHandle::add_to_peers_set`]. - pub fn add_to_peers_set(&mut self, set_id: SetId, peer_id: PeerId) { + fn add_to_peers_set(&mut self, set_id: SetId, peer_id: PeerId) { if let peersstate::Peer::Unknown(entry) = self.data.peer(set_id.0, &peer_id) { entry.discover(); self.alloc_slots(set_id); } } - fn on_remove_from_peers_set(&mut self, set_id: SetId, peer_id: PeerId) { + fn remove_from_peers_set(&mut self, set_id: SetId, peer_id: PeerId) { // Don't do anything if node is reserved. if self.reserved_nodes[set_id.0].0.contains(&peer_id) { return @@ -432,7 +447,7 @@ impl Peerset { } } - fn on_report_peer(&mut self, peer_id: PeerId, change: ReputationChange) { + fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange) { // We want reputations to be up-to-date before adjusting them. self.update_time(); @@ -464,9 +479,8 @@ impl Peerset { } } - fn on_peer_reputation(&mut self, peer_id: PeerId, pending_response: oneshot::Sender) { - let reputation = self.data.peer_reputation(peer_id); - let _ = pending_response.send(reputation.reputation()); + fn peer_reputation(&mut self, peer_id: PeerId) -> i32 { + self.data.peer_reputation(peer_id).reputation() } /// Updates the value of `self.latest_time_update` and performs all the updates that happen @@ -614,7 +628,7 @@ impl Peerset { // Implementation note: because of concurrency issues, it is possible that we push a `Connect` // message to the output channel with a `PeerId`, and that `incoming` gets called with the same // `PeerId` before that message has been read by the user. In this situation we must not answer. - pub fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) { + fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) { trace!(target: "peerset", "Incoming {:?}", peer_id); self.update_time(); @@ -649,7 +663,7 @@ impl Peerset { /// /// Must only be called after the PSM has either generated a `Connect` message with this /// `PeerId`, or accepted an incoming connection with this `PeerId`. - pub fn dropped(&mut self, set_id: SetId, peer_id: PeerId, reason: DropReason) { + fn dropped(&mut self, set_id: SetId, peer_id: PeerId, reason: DropReason) { // We want reputations to be up-to-date before adjusting them. self.update_time(); @@ -667,22 +681,14 @@ impl Peerset { } if let DropReason::Refused = reason { - self.on_remove_from_peers_set(set_id, peer_id); + self.remove_from_peers_set(set_id, peer_id); } self.alloc_slots(set_id); } - /// Reports an adjustment to the reputation of the given peer. - pub fn report_peer(&mut self, peer_id: PeerId, score_diff: ReputationChange) { - // We don't immediately perform the adjustments in order to have state consistency. We - // don't want the reporting here to take priority over messages sent using the - // `PeersetHandle`. - let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); - } - /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. - pub fn debug_info(&mut self) -> serde_json::Value { + fn debug_info(&mut self) -> serde_json::Value { self.update_time(); json!({ @@ -714,7 +720,7 @@ impl Peerset { } /// Returns the number of peers that we have discovered. - pub fn num_discovered_peers(&self) -> usize { + fn num_discovered_peers(&self) -> usize { self.data.peers().len() } } @@ -722,44 +728,19 @@ impl Peerset { impl Stream for Peerset { type Item = Message; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - loop { - if let Some(message) = self.message_queue.pop_front() { - return Poll::Ready(Some(message)) - } - - if Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx).is_ready() { - self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0)); - - for set_index in 0..self.data.num_sets() { - self.alloc_slots(SetId(set_index)); - } - } - - let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Some(event)) => event, - Poll::Ready(None) => return Poll::Pending, - }; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut inner = self.inner.lock(); + if let Some(message) = inner.message_queue.pop_front() { + return Poll::Ready(Some(message)) + } + if Future::poll(Pin::new(&mut inner.next_periodic_alloc_slots), cx).is_ready() { + inner.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0)); - match action { - Action::AddReservedPeer(set_id, peer_id) => - self.on_add_reserved_peer(set_id, peer_id), - Action::RemoveReservedPeer(set_id, peer_id) => - self.on_remove_reserved_peer(set_id, peer_id), - Action::SetReservedPeers(set_id, peer_ids) => - self.on_set_reserved_peers(set_id, peer_ids), - Action::SetReservedOnly(set_id, reserved) => - self.on_set_reserved_only(set_id, reserved), - Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff), - Action::AddToPeersSet(sets_name, peer_id) => - self.add_to_peers_set(sets_name, peer_id), - Action::RemoveFromPeersSet(sets_name, peer_id) => - self.on_remove_from_peers_set(sets_name, peer_id), - Action::PeerReputation(peer_id, pending_response) => - self.on_peer_reputation(peer_id, pending_response), + for set_index in 0..inner.data.num_sets() { + inner.alloc_slots(SetId(set_index)); } } + Poll::Pending } } @@ -847,7 +828,7 @@ mod tests { }], }; - let (mut peerset, _handle) = Peerset::from_config(config); + let (peerset, _handle) = Peerset::from_config(config); peerset.incoming(SetId::from(0), incoming, ii); peerset.incoming(SetId::from(0), incoming, ii4); peerset.incoming(SetId::from(0), incoming2, ii2); @@ -878,7 +859,7 @@ mod tests { }], }; - let (mut peerset, _) = Peerset::from_config(config); + let (peerset, _) = Peerset::from_config(config); peerset.incoming(SetId::from(0), incoming, ii); assert_messages(peerset, vec![Message::Reject(ii)]); @@ -899,10 +880,10 @@ mod tests { }], }; - let (mut peerset, _handle) = Peerset::from_config(config); - peerset.add_to_peers_set(SetId::from(0), discovered); - peerset.add_to_peers_set(SetId::from(0), discovered); - peerset.add_to_peers_set(SetId::from(0), discovered2); + let (peerset, handle) = Peerset::from_config(config); + handle.add_to_peers_set(SetId::from(0), discovered); + handle.add_to_peers_set(SetId::from(0), discovered); + handle.add_to_peers_set(SetId::from(0), discovered2); assert_messages( peerset, diff --git a/client/peerset/tests/fuzz.rs b/client/peerset/tests/fuzz.rs index 48c5cb341c35a..3bf9c000b5f61 100644 --- a/client/peerset/tests/fuzz.rs +++ b/client/peerset/tests/fuzz.rs @@ -115,7 +115,7 @@ fn test_once() { 1 => { let new_id = PeerId::random(); known_nodes.insert(new_id); - peerset.add_to_peers_set(SetId::from(0), new_id); + peerset_handle.add_to_peers_set(SetId::from(0), new_id); }, // If we generate 2, adjust a random reputation. diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 8b3a29ba4032a..0889b52ea5386 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -257,6 +257,7 @@ async fn build_network_future< sc_rpc::system::Request::NetworkReservedPeers(sender) => { let reserved_peers = network.reserved_peers(); let reserved_peers = reserved_peers + .iter() .map(|peer_id| peer_id.to_base58()) .collect();