From a611a4f64eedb067adac94aaf9cc8a7780f3662b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 30 Apr 2019 14:35:38 +0200 Subject: [PATCH 01/11] Rewrite the PSM --- core/network-libp2p/src/behaviour.rs | 2 +- .../src/custom_proto/behaviour.rs | 2 +- core/peerset/src/lib.rs | 498 ++++++------------ core/peerset/src/peersstate.rs | 472 +++++++++++++++++ core/peerset/src/slots.rs | 222 -------- 5 files changed, 624 insertions(+), 572 deletions(-) create mode 100644 core/peerset/src/peersstate.rs delete mode 100644 core/peerset/src/slots.rs diff --git a/core/network-libp2p/src/behaviour.rs b/core/network-libp2p/src/behaviour.rs index 30dc0f52e42d1..5b4943c1bb99f 100644 --- a/core/network-libp2p/src/behaviour.rs +++ b/core/network-libp2p/src/behaviour.rs @@ -159,7 +159,7 @@ impl Behaviour { } /// Returns the state of the peerset manager, for debugging purposes. - pub fn peerset_debug_info(&self) -> serde_json::Value { + pub fn peerset_debug_info(&mut self) -> serde_json::Value { self.custom_protocols.peerset_debug_info() } } diff --git a/core/network-libp2p/src/custom_proto/behaviour.rs b/core/network-libp2p/src/custom_proto/behaviour.rs index eb50325dc01fd..de82a6c7d5941 100644 --- a/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/core/network-libp2p/src/custom_proto/behaviour.rs @@ -357,7 +357,7 @@ impl CustomProto { } /// Returns the state of the peerset manager, for debugging purposes. - pub fn peerset_debug_info(&self) -> serde_json::Value { + pub fn peerset_debug_info(&mut self) -> serde_json::Value { self.peerset.debug_info() } diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 570dcf5f866e3..5550d0d256248 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -17,34 +17,14 @@ //! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be //! connected to. -mod slots; +mod peersstate; -use std::collections::VecDeque; +use std::{collections::HashMap, collections::VecDeque, convert::TryFrom, time::Instant}; use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; -use log::trace; -use lru_cache::LruCache; -use slots::{SlotType, SlotState, Slots}; +use log::{debug, error, trace}; use serde_json::json; -const PEERSET_SCORES_CACHE_SIZE: usize = 1000; -const DISCOVERED_NODES_LIMIT: u32 = 1000; - -#[derive(Debug)] -struct PeersetData { - /// List of nodes that we know exist, but we are not connected to. - /// Elements in this list must never be in `out_slots` or `in_slots`. - discovered: Slots, - /// If true, we only accept reserved nodes. - reserved_only: bool, - /// Node slots for outgoing connections. - out_slots: Slots, - /// Node slots for incoming connections. - in_slots: Slots, - /// List of node scores. - scores: LruCache, -} - #[derive(Debug)] enum Action { AddReservedPeer(PeerId), @@ -147,9 +127,15 @@ pub struct PeersetConfig { /// errors. #[derive(Debug)] pub struct Peerset { - data: PeersetData, + data: peersstate::PeersState, + /// If true, we only accept reserved nodes. + reserved_only: bool, rx: mpsc::UnboundedReceiver, message_queue: VecDeque, + /// When the `Peerset` was created. + created: Instant, + /// Last time when we updated the reputations of connected nodes. + latest_time_update: Instant, } impl Peerset { @@ -157,30 +143,33 @@ impl Peerset { pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) { let (tx, rx) = mpsc::unbounded(); - let data = PeersetData { - discovered: Slots::new(DISCOVERED_NODES_LIMIT), - reserved_only: config.reserved_only, - out_slots: Slots::new(config.out_peers), - in_slots: Slots::new(config.in_peers), - scores: LruCache::new(PEERSET_SCORES_CACHE_SIZE), - }; - let handle = PeersetHandle { tx, }; let mut peerset = Peerset { - data, + data: peersstate::PeersState::new(config.in_peers, config.out_peers), rx, + reserved_only: config.reserved_only, message_queue: VecDeque::new(), + created: Instant::now(), + latest_time_update: Instant::now(), }; for peer_id in config.reserved_nodes { - peerset.data.discovered.add_peer(peer_id, SlotType::Reserved); + if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) { + entry.discover().set_reserved(true); + } else { + debug!(target: "peerset", "Duplicate reserved node in config: {:?}", peer_id); + } } for peer_id in config.bootnodes { - peerset.data.discovered.add_peer(peer_id, SlotType::Common); + if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) { + entry.discover(); + } else { + debug!(target: "peerset", "Duplicate bootnode in config: {:?}", peer_id); + } } peerset.alloc_slots(); @@ -188,115 +177,103 @@ impl Peerset { } fn on_add_reserved_peer(&mut self, peer_id: PeerId) { - // Nothing more to do if we're already connected. - if self.data.in_slots.contains(&peer_id) { - self.data.in_slots.mark_reserved(&peer_id); - return; - } - - match self.data.out_slots.add_peer(peer_id, SlotType::Reserved) { - SlotState::Added(peer_id) => { - // reserved node may have been previously stored as normal node in discovered list - self.data.discovered.remove_peer(&peer_id); - - // notify that connection has been made - trace!(target: "peerset", "Connecting to new reserved peer {}", peer_id); - self.message_queue.push_back(Message::Connect(peer_id)); - return; - }, - SlotState::Swaped { removed, added } => { - // reserved node may have been previously stored as normal node in discovered list - self.data.discovered.remove_peer(&added); - // let's add the peer we disconnected from to the discovered list again - self.data.discovered.add_peer(removed.clone(), SlotType::Common); - // swap connections - trace!(target: "peerset", "Connecting to new reserved peer {}, dropping {}", added, removed); - self.message_queue.push_back(Message::Drop(removed)); - self.message_queue.push_back(Message::Connect(added)); + let mut entry = match self.data.peer(&peer_id) { + peersstate::Peer::Connected(mut connected) => { + connected.set_reserved(true); + return } - SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => { - return; - } - SlotState::MaxCapacity(peer_id) => { - self.data.discovered.add_peer(peer_id, SlotType::Reserved); - return; - } - } + peersstate::Peer::NotConnected(entry) => entry, + peersstate::Peer::Unknown(entry) => entry.discover(), + }; + + // We reach this point if and only if we were not connected to the node. + entry.set_reserved(true); + entry.force_outgoing(); + self.message_queue.push_back(Message::Connect(peer_id)); } fn on_remove_reserved_peer(&mut self, peer_id: PeerId) { - self.data.in_slots.mark_not_reserved(&peer_id); - self.data.out_slots.mark_not_reserved(&peer_id); - self.data.discovered.mark_not_reserved(&peer_id); - if self.data.reserved_only { - if self.data.in_slots.remove_peer(&peer_id) || self.data.out_slots.remove_peer(&peer_id) { - // insert peer back into discovered list - self.data.discovered.add_peer(peer_id.clone(), SlotType::Common); + match self.data.peer(&peer_id) { + peersstate::Peer::Connected(mut peer) => { + peer.set_reserved(false); + peer.disconnect(); self.message_queue.push_back(Message::Drop(peer_id)); - // call alloc_slots again, cause we may have some reserved peers in discovered list - // waiting for the slot that was just cleared - self.alloc_slots(); } + peersstate::Peer::NotConnected(mut peer) => peer.set_reserved(false), + peersstate::Peer::Unknown(_) => {} } } fn on_set_reserved_only(&mut self, reserved_only: bool) { // Disconnect non-reserved nodes. - self.data.reserved_only = reserved_only; - if self.data.reserved_only { - for peer_id in self.data.in_slots.clear_common_slots().into_iter().chain(self.data.out_slots.clear_common_slots().into_iter()) { - // insert peer back into discovered list - self.data.discovered.add_peer(peer_id.clone(), SlotType::Common); + self.reserved_only = reserved_only; + if self.reserved_only { + for peer_id in self.data.connected_peers().cloned().collect::>().into_iter() { + self.data.peer(&peer_id).into_connected() + .expect("We are enumerating connected peers, therefore the peer is connected; qed") + .disconnect(); self.message_queue.push_back(Message::Drop(peer_id)); } + } else { self.alloc_slots(); } } fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) { - let score = match self.data.scores.get_mut(&peer_id) { - Some(score) => { - *score = score.saturating_add(score_diff); - *score - }, - None => { - self.data.scores.insert(peer_id.clone(), score_diff); - score_diff - } - }; + // We want reputations to be up-to-date before adjusting them. + self.update_time(); - if score < 0 { - // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method - if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { - self.data.in_slots.remove_peer(&peer_id); - self.data.out_slots.remove_peer(&peer_id); - self.message_queue.push_back(Message::Drop(peer_id)); - } + match self.data.peer(&peer_id) { + peersstate::Peer::Connected(mut peer) => peer.add_reputation(score_diff), + peersstate::Peer::NotConnected(mut peer) => peer.add_reputation(score_diff), + peersstate::Peer::Unknown(peer) => peer.discover().add_reputation(score_diff), } } + /// Updates the value of `self.latest_time_update` and performs all the updates that happen + /// over time, such as reputation increases for staying connected. + fn update_time(&mut self) { + // We basically do `now - self.latest_update`, except that by the way we do it we know that + // we're not going to miss entire seconds because of rounding to integers. + let secs_diff = { + let now = Instant::now(); + let elapsed_latest = self.latest_time_update - self.created; + let elapsed_now = now - self.created; + self.latest_time_update = now; + elapsed_now.as_secs() - elapsed_latest.as_secs() + }; + + let rep_increase = i32::try_from(secs_diff).unwrap_or(i32::max_value()); + self.data.connected_reputation_increase(rep_increase); + } + + /// Try to fill available out slots with nodes. fn alloc_slots(&mut self) { - while let Some((peer_id, slot_type)) = self.data.discovered.pop_most_important_peer(self.data.reserved_only) { - match self.data.out_slots.add_peer(peer_id, slot_type) { - SlotState::Added(peer_id) => { - trace!(target: "peerset", "Connecting to new peer {}", peer_id); - self.message_queue.push_back(Message::Connect(peer_id)); - }, - SlotState::Swaped { removed, added } => { - // insert peer back into discovered list - trace!(target: "peerset", "Connecting to new peer {}, dropping {}", added, removed); - self.data.discovered.add_peer(removed.clone(), SlotType::Common); - self.message_queue.push_back(Message::Drop(removed)); - self.message_queue.push_back(Message::Connect(added)); + self.update_time(); + + loop { + // Try to grab the next node to attempt to connect to. + let next = match self.data.reserved_not_connected_peer() { + Some(p) => p, + None => if self.reserved_only { + break // No known node to add. + } else { + match self.data.highest_not_connected_peer() { + Some(p) => p, + None => break, // No known node to add. + } } - SlotState::Upgraded(_) | SlotState::AlreadyExists(_) => { - // TODO: we should never reach this point - }, - SlotState::MaxCapacity(peer_id) => { - self.data.discovered.add_peer(peer_id, slot_type); - break; - }, + }; + + // Don't connect to nodes with an abysmal reputation. + if next.reputation() == i32::min_value() { + break; + } + + match next.try_outgoing() { + Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())), + Err(_) => break, // No more slots available. } } } @@ -311,54 +288,18 @@ impl Peerset { /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the /// peerset is already connected to, in which case it must not answer. pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { - trace!( - target: "peerset", - "Incoming {:?}\nin_slots={:?}\nout_slots={:?}", - peer_id, self.data.in_slots, self.data.out_slots - ); - // if `reserved_only` is set, but this peer is not a part of our discovered list, - // a) it is not reserved, so we reject the connection - // b) we are already connected to it, so we reject the connection - if self.data.reserved_only && !self.data.discovered.is_reserved(&peer_id) { - self.message_queue.push_back(Message::Reject(index)); - return; - } - - // check if we are already connected to this peer - if self.data.out_slots.contains(&peer_id) { - // we are already connected. in this case we do not answer - return; - } + trace!(target: "peerset", "Incoming {:?}", peer_id); - let slot_type = if self.data.reserved_only { - SlotType::Reserved - } else { - SlotType::Common + let not_connected = match self.data.peer(&peer_id) { + // If we're already connected, don't answer, as the docs mention. + peersstate::Peer::Connected(_) => return, + peersstate::Peer::NotConnected(entry) => entry, + peersstate::Peer::Unknown(entry) => entry.discover(), }; - match self.data.in_slots.add_peer(peer_id, slot_type) { - SlotState::Added(peer_id) => { - // reserved node may have been previously stored as normal node in discovered list - self.data.discovered.remove_peer(&peer_id); - self.message_queue.push_back(Message::Accept(index)); - return; - }, - SlotState::Swaped { removed, added } => { - // reserved node may have been previously stored as normal node in discovered list - self.data.discovered.remove_peer(&added); - // swap connections. - self.message_queue.push_back(Message::Drop(removed)); - self.message_queue.push_back(Message::Accept(index)); - }, - SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => { - // we are already connected. in this case we do not answer - return; - }, - SlotState::MaxCapacity(peer_id) => { - self.data.discovered.add_peer(peer_id, slot_type); - self.message_queue.push_back(Message::Reject(index)); - return; - }, + match not_connected.try_accept_incoming() { + Ok(_) => self.message_queue.push_back(Message::Accept(index)), + Err(_) => self.message_queue.push_back(Message::Reject(index)), } } @@ -367,24 +308,24 @@ impl Peerset { /// Must only be called after the PSM has either generated a `Connect` message with this /// `PeerId`, or accepted an incoming connection with this `PeerId`. pub fn dropped(&mut self, peer_id: PeerId) { - trace!( - target: "peerset", - "Dropping {:?}\nin_slots={:?}\nout_slots={:?}", - peer_id, self.data.in_slots, self.data.out_slots - ); - // Automatically connect back if reserved. - if self.data.in_slots.is_reserved(&peer_id) || self.data.out_slots.is_reserved(&peer_id) { - self.message_queue.push_back(Message::Connect(peer_id)); - return; + trace!(target: "peerset", "Dropping {:?}", peer_id); + + // We want reputations to be up-to-date before adjusting them. + self.update_time(); + + match self.data.peer(&peer_id) { + peersstate::Peer::Connected(mut entry) => { + // Decrease the node's reputation so that we don't try it again and again and again. + // We decrease by 20% so that it doesn't take forever to remove a node we were + // connected to for a very long time. + entry.set_reputation(entry.reputation() - entry.reputation() / 20); + entry.add_reputation(-10); + entry.disconnect(); + }, + peersstate::Peer::NotConnected(_) | peersstate::Peer::Unknown(_) => + error!(target: "peerset", "Received dropped() for non-connected node"), } - // Otherwise, free the slot. - self.data.in_slots.remove_peer(&peer_id); - self.data.out_slots.remove_peer(&peer_id); - - // Note: in this dummy implementation we consider that peers never expire. As soon as we - // are disconnected from a peer, we try again. - self.data.discovered.add_peer(peer_id, SlotType::Common); self.alloc_slots(); } @@ -393,28 +334,42 @@ impl Peerset { /// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility /// > of the PSM to remove `PeerId`s that fail to dial too often. pub fn discovered>(&mut self, peer_ids: I) { + let mut discovered_any = false; + for peer_id in peer_ids { - if !self.data.in_slots.contains(&peer_id) && !self.data.out_slots.contains(&peer_id) && !self.data.discovered.contains(&peer_id) { - trace!(target: "peerset", "Discovered new peer: {:?}", peer_id); - self.data.discovered.add_peer(peer_id, SlotType::Common); - } else { - trace!(target: "peerset", "Discovered known peer: {:?}", peer_id); + if let peersstate::Peer::Unknown(entry) = self.data.peer(&peer_id) { + entry.discover(); + discovered_any = true; } } - self.alloc_slots(); + if discovered_any { + self.alloc_slots(); + } } /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. - pub fn debug_info(&self) -> serde_json::Value { + pub fn debug_info(&mut self) -> serde_json::Value { + self.update_time(); + json!({ - "data": { - // add scores - "discovered": self.data.discovered.debug_info(), - "reserved_only": self.data.reserved_only, - "out_slots": self.data.out_slots.debug_info(), - "in_slots": self.data.in_slots.debug_info() - }, + "nodes": self.data.peers().cloned().collect::>().into_iter().map(|peer_id| { + let state = match self.data.peer(&peer_id) { + peersstate::Peer::Connected(entry) => json!({ + "connected": true, + "reputation": entry.reputation() + }), + peersstate::Peer::NotConnected(entry) => json!({ + "connected": false, + "reputation": entry.reputation() + }), + peersstate::Peer::Unknown(_) => + unreachable!("We iterate over the known peers; QED") + }; + + (peer_id.to_base58(), state) + }).collect::>(), + "reserved_only": self.reserved_only, "message_queue": self.message_queue.len(), }) } @@ -454,7 +409,7 @@ mod tests { assert_eq!(message, expected_message); peerset = p; } - assert!(peerset.message_queue.is_empty()); + assert!(peerset.message_queue.is_empty(), peerset.message_queue); peerset } @@ -466,49 +421,6 @@ mod tests { Ok((message, peerset)) } - #[test] - fn test_peerset_from_config_with_bootnodes() { - let bootnode = PeerId::random(); - let bootnode2 = PeerId::random(); - let config = PeersetConfig { - in_peers: 0, - out_peers: 2, - bootnodes: vec![bootnode.clone(), bootnode2.clone()], - reserved_only: false, - reserved_nodes: Vec::new(), - }; - - let (peerset, _handle) = Peerset::from_config(config); - - assert_messages(peerset, vec![ - Message::Connect(bootnode), - Message::Connect(bootnode2), - ]); - } - - #[test] - fn test_peerset_from_config_with_reserved_nodes() { - let bootnode = PeerId::random(); - let bootnode2 = PeerId::random(); - let reserved_peer = PeerId::random(); - let reserved_peer2 = PeerId::random(); - let config = PeersetConfig { - in_peers: 0, - out_peers: 3, - bootnodes: vec![bootnode.clone(), bootnode2.clone()], - reserved_only: false, - reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], - }; - - let (peerset, _handle) = Peerset::from_config(config); - - assert_messages(peerset, vec![ - Message::Connect(reserved_peer), - Message::Connect(reserved_peer2), - Message::Connect(bootnode) - ]); - } - #[test] fn test_peerset_add_reserved_peer() { let bootnode = PeerId::random(); @@ -532,87 +444,6 @@ mod tests { ]); } - #[test] - fn test_peerset_remove_reserved_peer() { - let reserved_peer = PeerId::random(); - let reserved_peer2 = PeerId::random(); - let config = PeersetConfig { - in_peers: 0, - out_peers: 2, - bootnodes: vec![], - reserved_only: false, - reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], - }; - - let (peerset, handle) = Peerset::from_config(config); - handle.remove_reserved_peer(reserved_peer.clone()); - - let peerset = assert_messages(peerset, vec![ - Message::Connect(reserved_peer.clone()), - Message::Connect(reserved_peer2.clone()), - ]); - - handle.set_reserved_only(true); - handle.remove_reserved_peer(reserved_peer2.clone()); - - assert_messages(peerset, vec![ - Message::Drop(reserved_peer), - Message::Drop(reserved_peer2), - ]); - } - - #[test] - fn test_peerset_set_reserved_only() { - let bootnode = PeerId::random(); - let bootnode2 = PeerId::random(); - let reserved_peer = PeerId::random(); - let reserved_peer2 = PeerId::random(); - let config = PeersetConfig { - in_peers: 0, - out_peers: 4, - bootnodes: vec![bootnode.clone(), bootnode2.clone()], - reserved_only: false, - reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], - }; - - let (peerset, handle) = Peerset::from_config(config); - handle.set_reserved_only(true); - handle.set_reserved_only(false); - - assert_messages(peerset, vec![ - Message::Connect(reserved_peer), - Message::Connect(reserved_peer2), - Message::Connect(bootnode.clone()), - Message::Connect(bootnode2.clone()), - Message::Drop(bootnode.clone()), - Message::Drop(bootnode2.clone()), - Message::Connect(bootnode), - Message::Connect(bootnode2), - ]); - } - - #[test] - fn test_peerset_report_peer() { - let bootnode = PeerId::random(); - let bootnode2 = PeerId::random(); - let config = PeersetConfig { - in_peers: 0, - out_peers: 1, - bootnodes: vec![bootnode.clone(), bootnode2.clone()], - reserved_only: false, - reserved_nodes: Vec::new(), - }; - - let (peerset, handle) = Peerset::from_config(config); - handle.report_peer(bootnode2, -1); - handle.report_peer(bootnode.clone(), -1); - - assert_messages(peerset, vec![ - Message::Connect(bootnode.clone()), - Message::Drop(bootnode) - ]); - } - #[test] fn test_peerset_incoming() { let bootnode = PeerId::random(); @@ -645,35 +476,6 @@ mod tests { ]); } - #[test] - fn test_peerset_dropped() { - let bootnode = PeerId::random(); - let bootnode2 = PeerId::random(); - let reserved_peer = PeerId::random(); - let config = PeersetConfig { - in_peers: 0, - out_peers: 2, - bootnodes: vec![bootnode.clone(), bootnode2.clone()], - reserved_only: false, - reserved_nodes: vec![reserved_peer.clone()], - }; - - let (peerset, _handle) = Peerset::from_config(config); - - let mut peerset = assert_messages(peerset, vec![ - Message::Connect(reserved_peer.clone()), - Message::Connect(bootnode.clone()), - ]); - - peerset.dropped(reserved_peer.clone()); - peerset.dropped(bootnode); - - let _peerset = assert_messages(peerset, vec![ - Message::Connect(reserved_peer), - Message::Connect(bootnode2), - ]); - } - #[test] fn test_peerset_discovered() { let bootnode = PeerId::random(); diff --git a/core/peerset/src/peersstate.rs b/core/peerset/src/peersstate.rs new file mode 100644 index 0000000000000..457d53998f9bf --- /dev/null +++ b/core/peerset/src/peersstate.rs @@ -0,0 +1,472 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Contains the state storage behind the peerset. + +use libp2p::PeerId; +use std::{borrow::Cow, collections::HashMap, convert::TryFrom}; + +/// State storage behind the peerset. +/// +/// # Usage +/// +/// This struct is nothing more but a data structure containing a list of nodes, where each node +/// has a reputation and is either connected to us or not. +/// +#[derive(Debug, Clone)] +pub struct PeersState { + /// List of nodes that we know about. + /// + /// > **Note**: This list should really be ordered by decreasing reputation, so that we can + /// easily select the best node to connect to. As a first draft, however, we don't + /// sort, to make the logic easier. + nodes: HashMap, + + /// Maximum allowed number of non-reserved nodes for which the `ConnectionState` is `In`. + max_in: u32, + + /// Maximum allowed number of non-reserved nodes for which the `ConnectionState` is `Out`. + max_out: u32, +} + +/// State of a single node that we know about. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +struct Node { + /// Whether we are connected to this node. + connection_state: ConnectionState, + + /// If true, this node is reserved and should always be connected to. + reserved: bool, + + /// Reputation value of the node, between `i32::min_value` (we hate that node) and + /// `i32::max_value` (we love that node). + reputation: i32, +} + +/// Whether we are connected to a node. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum ConnectionState { + /// We are connected through an ingoing connection. + In, + /// We are connected through an outgoing connection. + Out, + /// We are not connected to this node. + NotConnected, +} + +impl ConnectionState { + /// Returns `true` for `In` and `Out`. + fn is_connected(self) -> bool { + match self { + ConnectionState::In => true, + ConnectionState::Out => true, + ConnectionState::NotConnected => false, + } + } +} + +impl PeersState { + /// Builds a new empty `PeersState`. + pub fn new(in_peers: u32, out_peers: u32) -> Self { + PeersState { + nodes: HashMap::new(), + max_in: in_peers, + max_out: out_peers, + } + } + + /// Adds `value` to the reputation of all the nodes we are connected to. + /// + /// In case of overflow, the value of capped. + pub fn connected_reputation_increase(&mut self, value: i32) { + for (_, peer_state) in self.nodes.iter_mut() { + if peer_state.connection_state.is_connected() { + peer_state.reputation = peer_state.reputation.saturating_add(value); + } + } + } + + /// Returns an object that grants access to the state of a peer. + pub fn peer<'a>(&'a mut self, peer_id: &'a PeerId) -> Peer<'a> { + if let Some(node) = self.nodes.get(peer_id) { + if node.connection_state.is_connected() { + Peer::Connected(ConnectedPeer { + parent: self, + peer_id: Cow::Borrowed(peer_id), + }) + } else { + Peer::NotConnected(NotConnectedPeer { + parent: self, + peer_id: Cow::Borrowed(peer_id), + }) + } + } else { + Peer::Unknown(UnknownPeer { + parent: self, + peer_id: Cow::Borrowed(peer_id), + }) + } + } + + /// Returns the list of all the peers we know of. + // Note: this method could theoretically return a `Peer`, but implementing that + // isn't simple. + pub fn peers(&self) -> impl Iterator { + self.nodes.keys() + } + + /// Returns the list of peers we are connected to. + // Note: this method could theoretically return a `ConnectedPeer`, but implementing that + // isn't simple. + pub fn connected_peers(&self) -> impl Iterator { + self.nodes.iter() + .filter(|(_, p)| p.connection_state.is_connected()) + .map(|(p, _)| p) + } + + /// Returns the first reserved peer that we are not connected to. + /// + /// If multiple nodes are reserved, which one is returned is unspecified. + pub fn reserved_not_connected_peer(&mut self) -> Option { + let peer_id = self.nodes.iter_mut() + .find(|(_, &mut Node { connection_state, reserved, .. })| { + reserved && !connection_state.is_connected() + }) + .map(|(peer_id, _)| peer_id.clone()); + + if let Some(peer_id) = peer_id { + Some(NotConnectedPeer { + parent: self, + peer_id: Cow::Owned(peer_id), + }) + } else { + None + } + } + + /// Returns the peer with the highest reputation and that we are not connected to. + /// + /// If multiple nodes have the same reputation, which one is returned is unspecified. + pub fn highest_not_connected_peer(&mut self) -> Option { + let peer_id = self.nodes + .iter() + .filter(|(_, Node { connection_state, .. })| !connection_state.is_connected()) + .fold(None::<(&PeerId, &Node)>, |mut cur_node, to_try| { + if let Some(cur_node) = cur_node.take() { + if cur_node.1.reputation >= to_try.1.reputation { + return Some(cur_node); + } + } + Some(to_try) + }) + .map(|(peer_id, _)| peer_id.clone()); + + if let Some(peer_id) = peer_id { + Some(NotConnectedPeer { + parent: self, + peer_id: Cow::Owned(peer_id), + }) + } else { + None + } + } +} + +/// Grants access to the state of a peer in the `PeersState`. +pub enum Peer<'a> { + /// We are connected to this node. + Connected(ConnectedPeer<'a>), + /// We are not connected to this node. + NotConnected(NotConnectedPeer<'a>), + /// We have never heard of this node. + Unknown(UnknownPeer<'a>), +} + +impl<'a> Peer<'a> { + /// If we are the `Connected` variant, returns the inner `ConnectedPeer`. Returns `None` + /// otherwise. + pub fn into_connected(self) -> Option> { + match self { + Peer::Connected(peer) => Some(peer), + Peer::NotConnected(_) => None, + Peer::Unknown(_) => None, + } + } +} + +/// A peer that is connected to us. +pub struct ConnectedPeer<'a> { + parent: &'a mut PeersState, + peer_id: Cow<'a, PeerId>, +} + +impl<'a> ConnectedPeer<'a> { + /// Destroys this `ConnectedPeer` and returns the `PeerId` inside of it. + pub fn into_peer_id(self) -> PeerId { + self.peer_id.into_owned() + } + + /// Switches the peer to "not connected". + pub fn disconnect(self) -> NotConnectedPeer<'a> { + let connec_state = &mut self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .connection_state; + debug_assert!(connec_state.is_connected()); + *connec_state = ConnectionState::NotConnected; + + NotConnectedPeer { + parent: self.parent, + peer_id: self.peer_id, + } + } + + /// Sets whether or not the node is reserved. + pub fn set_reserved(&mut self, reserved: bool) { + self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reserved = reserved; + } + + /// Returns the reputation value of the node. + pub fn reputation(&self) -> i32 { + self.parent.nodes.get(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reputation + } + + /// Sets the reputation of the peer. + pub fn set_reputation(&mut self, value: i32) { + self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reputation = value; + } + + /// Performs an arithmetic addition on the reputation score of that peer. + /// + /// In case of overflow, the value will be capped. + pub fn add_reputation(&mut self, modifier: i32) { + let reputation = &mut self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reputation; + *reputation = reputation.saturating_add(modifier); + } +} + +/// A peer that is not connected to us. +pub struct NotConnectedPeer<'a> { + parent: &'a mut PeersState, + peer_id: Cow<'a, PeerId>, +} + +impl<'a> NotConnectedPeer<'a> { + /// Tries to set the peer as connected as an outgoing connection. + /// + /// If there are enough slots available, switches the node to "connected" and returns `Ok`. If + /// the slots are full, the node stays "not connected" and we return `Err`. + /// If the node is reserved, this method always succeeds. + /// + /// Note that reserved nodes don't count towards the number of slots. + pub fn try_outgoing(self) -> Result, NotConnectedPeer<'a>> { + if self.is_reserved() { + return Ok(self.force_outgoing()) + } + + // Count number of nodes in our "out" slots and that are not reserved. + let num_out_peers = u32::try_from(self.parent.nodes.values() + .filter(|p| !p.reserved && p.connection_state == ConnectionState::Out) + .count()) + .unwrap_or(u32::max_value()); + + // Note that it is possible for num_out_peers to be strictly superior to the max, in case + // we were connected to reserved node then marked them as not reserved, or if the user + // used `force_outgoing`. + if num_out_peers >= self.parent.max_out { + return Err(self); + } + + Ok(self.force_outgoing()) + } + + /// Sets the peer as connected as an outgoing connection. + pub fn force_outgoing(self) -> ConnectedPeer<'a> { + let connec_state = &mut self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .connection_state; + debug_assert!(!connec_state.is_connected()); + *connec_state = ConnectionState::Out; + + ConnectedPeer { + parent: self.parent, + peer_id: self.peer_id, + } + } + + /// Tries to accept the peer as an incoming connection. + /// + /// If there are enough slots available, switches the node to "connected" and returns `Ok`. If + /// the slots are full, the node stays "not connected" and we return `Err`. + /// + /// Note that reserved nodes don't count towards the number of slots. + pub fn try_accept_incoming(self) -> Result, NotConnectedPeer<'a>> { + if self.is_reserved() { + return Ok(self.force_ingoing()) + } + + // Count number of nodes in our "in" slots and that are not reserved. + let num_in_peers = u32::try_from(self.parent.nodes.values() + .filter(|p| !p.reserved && p.connection_state == ConnectionState::In) + .count()) + .unwrap_or(u32::max_value()); + + // Note that it is possible for num_in_peers to be strictly superior to the max, in case + // we were connected to reserved node then marked them as not reserved. + if num_in_peers >= self.parent.max_in { + return Err(self); + } + + Ok(self.force_ingoing()) + } + + /// Sets the peer as connected as an ingoing connection. + pub fn force_ingoing(self) -> ConnectedPeer<'a> { + let connec_state = &mut self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .connection_state; + debug_assert!(!connec_state.is_connected()); + *connec_state = ConnectionState::In; + + ConnectedPeer { + parent: self.parent, + peer_id: self.peer_id, + } + } + + /// Returns true if the the node is reserved. + pub fn is_reserved(&self) -> bool { + self.parent.nodes.get(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reserved + } + + /// Sets whether or not the node is reserved. + pub fn set_reserved(&mut self, reserved: bool) { + self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reserved = reserved; + } + + /// Returns the reputation value of the node. + pub fn reputation(&self) -> i32 { + self.parent.nodes.get(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reputation + } + + /// Performs an arithmetic addition on the reputation score of that peer. + /// + /// In case of overflow, the value will be capped. + /// If the peer is unknown to us, we insert it and consider that it has a reputation of 0. + pub fn add_reputation(&mut self, modifier: i32) { + let reputation = &mut self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reputation; + *reputation = reputation.saturating_add(modifier); + } +} + +/// A peer that we have never heard of. +pub struct UnknownPeer<'a> { + parent: &'a mut PeersState, + peer_id: Cow<'a, PeerId>, +} + +impl<'a> UnknownPeer<'a> { + /// Inserts the peer identity in our list. + /// + /// The node is not reserved and starts with a reputation of 0. You can adjust these default + /// values using the `NotConnectedPeer` that this method returns. + pub fn discover(self) -> NotConnectedPeer<'a> { + self.parent.nodes.insert(self.peer_id.clone().into_owned(), Node { + connection_state: ConnectionState::NotConnected, + reputation: 0, + reserved: false, + }); + + NotConnectedPeer { + parent: self.parent, + peer_id: self.peer_id, + } + } +} + +#[cfg(test)] +mod tests { + use super::{PeersState, Peer}; + use libp2p::PeerId; + + #[test] + fn full_slots_in() { + let mut peers_state = PeersState::new(1, 1); + let id1 = PeerId::random(); + let id2 = PeerId::random(); + + if let Peer::Unknown(e) = peers_state.peer(&id1) { + assert!(e.discover().try_accept_incoming().is_ok()); + } + + if let Peer::Unknown(e) = peers_state.peer(&id2) { + assert!(e.discover().try_accept_incoming().is_err()); + } + } + + #[test] + fn reserved_node_doesnt_use_slot() { + let mut peers_state = PeersState::new(1, 1); + let id1 = PeerId::random(); + let id2 = PeerId::random(); + + if let Peer::Unknown(e) = peers_state.peer(&id1) { + let mut p = e.discover(); + p.set_reserved(true); + assert!(p.try_accept_incoming().is_ok()); + } else { panic!() } + + if let Peer::Unknown(e) = peers_state.peer(&id2) { + assert!(e.discover().try_accept_incoming().is_ok()); + } else { panic!() } + } + + #[test] + fn disconnecting_frees_slot() { + let mut peers_state = PeersState::new(1, 1); + let id1 = PeerId::random(); + let id2 = PeerId::random(); + + if let Peer::Unknown(e) = peers_state.peer(&id1) { + assert!(e.discover().try_accept_incoming().is_ok()); + } else { panic!() } + + if let Peer::Unknown(e) = peers_state.peer(&id2) { + assert!(e.discover().try_accept_incoming().is_err()); + } else { panic!() } + + peers_state.peer(&id1).into_connected().unwrap().disconnect(); + + if let Peer::NotConnected(e) = peers_state.peer(&id2) { + assert!(e.try_accept_incoming().is_ok()); + } + } +} diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs deleted file mode 100644 index 094e01ac60da2..0000000000000 --- a/core/peerset/src/slots.rs +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use std::{fmt, mem}; -use libp2p::PeerId; -use linked_hash_map::LinkedHashMap; -use serde_json::json; - -/// Describes the nature of connection with a given peer. -#[derive(Debug, PartialEq, Clone, Copy)] -pub enum SlotType { - /// Reserved peer is a peer we should always stay connected to. - Reserved, - /// Common peer is a type of peer that we stay connected to only if it's - /// useful for us. - Common, -} - -/// Descibes the result of `add_peer` action. -pub enum SlotState { - /// Returned when `add_peer` successfully adds a peer to the slot. - Added(PeerId), - /// Returned when we already have given peer in our list, but it is upgraded from - /// `Common` to `Reserved`. - Upgraded(PeerId), - /// Returned when we should removed a common peer to make space for a reserved peer. - Swaped { - /// Peer was removed from the list. - removed: PeerId, - /// Peer was added to the list. - added: PeerId, - }, - /// Error returned when we are already know about given peer. - AlreadyExists(PeerId), - /// Error returned when list is full and no more peers can be added. - MaxCapacity(PeerId), -} - -/// Contains all information about group of slots. -pub struct Slots { - /// Maximum number of slots. Total number of reserved and common slots must be always - /// smaller or equal to `max_slots`. - max_slots: usize, - /// Reserved slots. - reserved: LinkedHashMap, - /// Common slots. - common: LinkedHashMap, -} - -impl fmt::Debug for Slots { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - struct ListFormatter<'a>(&'a LinkedHashMap); - - impl<'a> fmt::Debug for ListFormatter<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_list().entries(self.0.keys()).finish() - } - } - - f.debug_struct("Slots") - .field("max_slots", &self.max_slots) - .field("reserved", &ListFormatter(&self.reserved)) - .field("common", &ListFormatter(&self.common)) - .finish() - } -} - -impl Slots { - /// Creates a group of slots with a limited size. - pub fn new(max_slots: u32) -> Self { - let max_slots = max_slots as usize; - Slots { - max_slots, - reserved: LinkedHashMap::new(), - common: LinkedHashMap::new(), - } - } - - /// Returns true if one of the slots contains given peer. - pub fn contains(&self, peer_id: &PeerId) -> bool { - self.common.contains_key(peer_id) || self.reserved.contains_key(peer_id) - } - - /// Tries to find a slot for a given peer and returns `SlotState`. - /// - /// - If a peer is already inserted into reserved list or inserted or - /// inserted into common list and readded with the same `SlotType`, - /// the function returns `SlotState::AlreadyExists` - /// - If a peer is already inserted common list returns `SlotState::Upgraded` - /// - If there is no slot for a reserved peer, we try to drop one common peer - /// and it a new reserved one in it's place, function returns `SlotState::Swaped` - /// - If there is no place for a peer, function returns `SlotState::MaxCapacity` - /// - If the peer was simply added, `SlotState::Added` is returned - pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> SlotState { - if self.reserved.contains_key(&peer_id) { - return SlotState::AlreadyExists(peer_id); - } - - if self.common.contains_key(&peer_id) { - if slot_type == SlotType::Reserved { - self.common.remove(&peer_id); - self.reserved.insert(peer_id.clone(), ()); - return SlotState::Upgraded(peer_id); - } else { - return SlotState::AlreadyExists(peer_id); - } - } - - if self.max_slots == (self.common.len() + self.reserved.len()) { - if let SlotType::Reserved = slot_type { - if let Some((to_remove, _)) = self.common.pop_front() { - self.reserved.insert(peer_id.clone(), ()); - return SlotState::Swaped { - removed: to_remove, - added: peer_id, - }; - } - } - return SlotState::MaxCapacity(peer_id); - } - - match slot_type { - SlotType::Common => self.common.insert(peer_id.clone(), ()), - SlotType::Reserved => self.reserved.insert(peer_id.clone(), ()), - }; - - SlotState::Added(peer_id) - } - - /// Pops the oldest reserved peer. If none exists and `reserved_only = false` pops a common peer. - pub fn pop_most_important_peer(&mut self, reserved_only: bool) -> Option<(PeerId, SlotType)> { - if let Some((peer_id, _)) = self.reserved.pop_front() { - return Some((peer_id, SlotType::Reserved)); - } - - if reserved_only { - return None; - } - - self.common.pop_front() - .map(|(peer_id, _)| (peer_id, SlotType::Common)) - } - - /// Removes all common peers from the list and returns an iterator over them. - pub fn clear_common_slots(&mut self) -> impl Iterator { - let slots = mem::replace(&mut self.common, LinkedHashMap::new()); - slots.into_iter().map(|(peer_id, _)| peer_id) - } - - /// Marks given peer as a reserved one. - pub fn mark_reserved(&mut self, peer_id: &PeerId) { - if self.common.remove(peer_id).is_some() { - self.reserved.insert(peer_id.clone(), ()); - } - } - - /// Marks given peer as not reserved one. - pub fn mark_not_reserved(&mut self, peer_id: &PeerId) { - if self.reserved.remove(peer_id).is_some() { - self.common.insert(peer_id.clone(), ()); - } - } - - /// Removes a peer from a list and returns true if it existed. - pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool { - self.common.remove(peer_id).is_some() || self.reserved.remove(peer_id).is_some() - } - - /// Returns true if given peer is reserved. - pub fn is_reserved(&self, peer_id: &PeerId) -> bool { - self.reserved.contains_key(peer_id) - } - - /// Produces a JSON object containing the state of slots, for debugging purposes. - pub fn debug_info(&self) -> serde_json::Value { - json!({ - "max_slots": self.max_slots, - "reserved": self.reserved.keys().map(|peer_id| peer_id.to_base58()).collect::>(), - "common": self.common.keys().map(|peer_id| peer_id.to_base58()).collect::>() - }) - } -} - -#[cfg(test)] -mod tests { - use libp2p::PeerId; - use serde_json::json; - use super::{Slots, SlotType}; - - #[test] - fn test_slots_debug_info() { - let reserved_peer = PeerId::random(); - let reserved_peer2 = PeerId::random(); - let common_peer = PeerId::random(); - let mut slots = Slots::new(10); - - slots.add_peer(reserved_peer.clone(), SlotType::Reserved); - slots.add_peer(reserved_peer2.clone(), SlotType::Reserved); - slots.add_peer(common_peer.clone(), SlotType::Common); - - let expected = json!({ - "max_slots": 10, - "reserved": vec![reserved_peer.to_base58(), reserved_peer2.to_base58()], - "common": vec![common_peer.to_base58()], - }); - - assert_eq!(expected, slots.debug_info()); - } -} From e66343a726e9798a25d7116acbead7fb5408c3c0 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 30 Apr 2019 20:37:59 +0200 Subject: [PATCH 02/11] Fix disconnecting from reserved peers --- core/peerset/src/lib.rs | 16 ++++++++++------ core/peerset/src/peersstate.rs | 7 +++++++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 5550d0d256248..977820a5f4d51 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -196,8 +196,10 @@ impl Peerset { match self.data.peer(&peer_id) { peersstate::Peer::Connected(mut peer) => { peer.set_reserved(false); - peer.disconnect(); - self.message_queue.push_back(Message::Drop(peer_id)); + if self.reserved_only { + peer.disconnect(); + self.message_queue.push_back(Message::Drop(peer_id)); + } } peersstate::Peer::NotConnected(mut peer) => peer.set_reserved(false), peersstate::Peer::Unknown(_) => {} @@ -209,10 +211,12 @@ impl Peerset { self.reserved_only = reserved_only; if self.reserved_only { for peer_id in self.data.connected_peers().cloned().collect::>().into_iter() { - self.data.peer(&peer_id).into_connected() - .expect("We are enumerating connected peers, therefore the peer is connected; qed") - .disconnect(); - self.message_queue.push_back(Message::Drop(peer_id)); + let peer = self.data.peer(&peer_id).into_connected() + .expect("We are enumerating connected peers, therefore the peer is connected; qed"); + if !peer.is_reserved() { + peer.disconnect(); + self.message_queue.push_back(Message::Drop(peer_id)); + } } } else { diff --git a/core/peerset/src/peersstate.rs b/core/peerset/src/peersstate.rs index 457d53998f9bf..718bee13cd254 100644 --- a/core/peerset/src/peersstate.rs +++ b/core/peerset/src/peersstate.rs @@ -240,6 +240,13 @@ impl<'a> ConnectedPeer<'a> { .reserved = reserved; } + /// Returns whether or not the node is reserved. + pub fn is_reserved(&self) -> bool { + self.parent.nodes.get(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .reserved + } + /// Returns the reputation value of the node. pub fn reputation(&self) -> i32 { self.parent.nodes.get(&self.peer_id) From f4bf0c7307e5d309ae15496309ee551b119e9700 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 1 May 2019 00:51:21 +0200 Subject: [PATCH 03/11] Minor adjustements --- core/peerset/src/lib.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 977820a5f4d51..a7bdd04ad6586 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -238,8 +238,8 @@ impl Peerset { /// Updates the value of `self.latest_time_update` and performs all the updates that happen /// over time, such as reputation increases for staying connected. fn update_time(&mut self) { - // We basically do `now - self.latest_update`, except that by the way we do it we know that - // we're not going to miss entire seconds because of rounding to integers. + // We basically do `(now - self.latest_update).as_secs()`, except that by the way we do it + // we know that we're not going to miss entire seconds because of rounding to integers. let secs_diff = { let now = Instant::now(); let elapsed_latest = self.latest_time_update - self.created; @@ -286,7 +286,7 @@ impl Peerset { /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. /// /// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming - /// connection implicitely means `Accept`, but incoming connections aren't cancelled by + /// connection implicitely means `Connect`, but incoming connections aren't cancelled by /// `dropped`. /// /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the @@ -320,12 +320,14 @@ impl Peerset { match self.data.peer(&peer_id) { peersstate::Peer::Connected(mut entry) => { // Decrease the node's reputation so that we don't try it again and again and again. - // We decrease by 20% so that it doesn't take forever to remove a node we were - // connected to for a very long time. - entry.set_reputation(entry.reputation() - entry.reputation() / 20); + // We decrease by 20% if it's positive so that it doesn't take forever to remove a + // node we were connected to for a very long time. + if entry.reputation() > 0 { + entry.set_reputation(entry.reputation() - entry.reputation() / 20); + } entry.add_reputation(-10); entry.disconnect(); - }, + } peersstate::Peer::NotConnected(_) | peersstate::Peer::Unknown(_) => error!(target: "peerset", "Received dropped() for non-connected node"), } From 8ec63fdb6e3b912cf42db399bd9480b325756e28 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 2 May 2019 10:59:12 +0200 Subject: [PATCH 04/11] Address review --- core/peerset/src/lib.rs | 2 +- core/peerset/src/peersstate.rs | 74 ++++++++++++++++------------------ 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index a7bdd04ad6586..a1563e1f08858 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -323,7 +323,7 @@ impl Peerset { // We decrease by 20% if it's positive so that it doesn't take forever to remove a // node we were connected to for a very long time. if entry.reputation() > 0 { - entry.set_reputation(entry.reputation() - entry.reputation() / 20); + entry.set_reputation(entry.reputation() - entry.reputation() / 5); } entry.add_reputation(-10); entry.disconnect(); diff --git a/core/peerset/src/peersstate.rs b/core/peerset/src/peersstate.rs index 718bee13cd254..c2ef9ecfbaa04 100644 --- a/core/peerset/src/peersstate.rs +++ b/core/peerset/src/peersstate.rs @@ -219,11 +219,19 @@ impl<'a> ConnectedPeer<'a> { self.peer_id.into_owned() } - /// Switches the peer to "not connected". - pub fn disconnect(self) -> NotConnectedPeer<'a> { - let connec_state = &mut self.parent.nodes.get_mut(&self.peer_id) + fn state(&self) -> &Node { + self.parent.nodes.get(&self.peer_id) .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .connection_state; + } + + fn state_mut(&mut self) -> &mut Node { + self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + } + + /// Switches the peer to "not connected". + pub fn disconnect(mut self) -> NotConnectedPeer<'a> { + let connec_state = &mut self.state_mut().connection_state; debug_assert!(connec_state.is_connected()); *connec_state = ConnectionState::NotConnected; @@ -235,39 +243,29 @@ impl<'a> ConnectedPeer<'a> { /// Sets whether or not the node is reserved. pub fn set_reserved(&mut self, reserved: bool) { - self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reserved = reserved; + self.state_mut().reserved = reserved; } /// Returns whether or not the node is reserved. pub fn is_reserved(&self) -> bool { - self.parent.nodes.get(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reserved + self.state().reserved } /// Returns the reputation value of the node. pub fn reputation(&self) -> i32 { - self.parent.nodes.get(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reputation + self.state().reputation } /// Sets the reputation of the peer. pub fn set_reputation(&mut self, value: i32) { - self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reputation = value; + self.state_mut().reputation = value; } /// Performs an arithmetic addition on the reputation score of that peer. /// /// In case of overflow, the value will be capped. pub fn add_reputation(&mut self, modifier: i32) { - let reputation = &mut self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reputation; + let reputation = &mut self.state_mut().reputation; *reputation = reputation.saturating_add(modifier); } } @@ -279,6 +277,16 @@ pub struct NotConnectedPeer<'a> { } impl<'a> NotConnectedPeer<'a> { + fn state(&self) -> &Node { + self.parent.nodes.get(&self.peer_id) + .expect("We only ever build a NotConnectedPeer if the node's in the list; QED") + } + + fn state_mut(&mut self) -> &mut Node { + self.parent.nodes.get_mut(&self.peer_id) + .expect("We only ever build a NotConnectedPeer if the node's in the list; QED") + } + /// Tries to set the peer as connected as an outgoing connection. /// /// If there are enough slots available, switches the node to "connected" and returns `Ok`. If @@ -308,10 +316,8 @@ impl<'a> NotConnectedPeer<'a> { } /// Sets the peer as connected as an outgoing connection. - pub fn force_outgoing(self) -> ConnectedPeer<'a> { - let connec_state = &mut self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .connection_state; + pub fn force_outgoing(mut self) -> ConnectedPeer<'a> { + let connec_state = &mut self.state_mut().connection_state; debug_assert!(!connec_state.is_connected()); *connec_state = ConnectionState::Out; @@ -348,10 +354,8 @@ impl<'a> NotConnectedPeer<'a> { } /// Sets the peer as connected as an ingoing connection. - pub fn force_ingoing(self) -> ConnectedPeer<'a> { - let connec_state = &mut self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .connection_state; + pub fn force_ingoing(mut self) -> ConnectedPeer<'a> { + let connec_state = &mut self.state_mut().connection_state; debug_assert!(!connec_state.is_connected()); *connec_state = ConnectionState::In; @@ -363,23 +367,17 @@ impl<'a> NotConnectedPeer<'a> { /// Returns true if the the node is reserved. pub fn is_reserved(&self) -> bool { - self.parent.nodes.get(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reserved + self.state().reserved } /// Sets whether or not the node is reserved. pub fn set_reserved(&mut self, reserved: bool) { - self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reserved = reserved; + self.state_mut().reserved = reserved; } /// Returns the reputation value of the node. pub fn reputation(&self) -> i32 { - self.parent.nodes.get(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reputation + self.state().reputation } /// Performs an arithmetic addition on the reputation score of that peer. @@ -387,9 +385,7 @@ impl<'a> NotConnectedPeer<'a> { /// In case of overflow, the value will be capped. /// If the peer is unknown to us, we insert it and consider that it has a reputation of 0. pub fn add_reputation(&mut self, modifier: i32) { - let reputation = &mut self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") - .reputation; + let reputation = &mut self.state_mut().reputation; *reputation = reputation.saturating_add(modifier); } } From 66aae3aa25f96560c8290a679c0d89471c9db0c2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 3 May 2019 11:45:46 +0200 Subject: [PATCH 05/11] Reputation changes adjustements --- core/peerset/src/lib.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index a1563e1f08858..e4b08ceab0134 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -25,6 +25,9 @@ use libp2p::PeerId; use log::{debug, error, trace}; use serde_json::json; +/// Reputation change for a node when we get disconnected from it. +const DISCONNECT_REPUTATION_CHANGE: i32 = -10; + #[derive(Debug)] enum Action { AddReservedPeer(PeerId), @@ -239,7 +242,7 @@ impl Peerset { /// over time, such as reputation increases for staying connected. fn update_time(&mut self) { // We basically do `(now - self.latest_update).as_secs()`, except that by the way we do it - // we know that we're not going to miss entire seconds because of rounding to integers. + // we know that we're not going to miss seconds because of rounding to integers. let secs_diff = { let now = Instant::now(); let elapsed_latest = self.latest_time_update - self.created; @@ -248,6 +251,16 @@ impl Peerset { elapsed_now.as_secs() - elapsed_latest.as_secs() }; + // For each elapsed second, move the node reputation towards zero. + for _ in 0..secs_diff { + for peer in self.data.connected_peers().cloned().collect::>() { + let mut peer = self.data.peer(&peer).into_connected() + .expect("we iterate over connected peers; qed"); + let cur_reput = peer.reputation(); + peer.set_reputation(cur_reput.saturating_sub(cur_reput / 20)); + } + } + let rep_increase = i32::try_from(secs_diff).unwrap_or(i32::max_value()); self.data.connected_reputation_increase(rep_increase); } @@ -320,12 +333,7 @@ impl Peerset { match self.data.peer(&peer_id) { peersstate::Peer::Connected(mut entry) => { // Decrease the node's reputation so that we don't try it again and again and again. - // We decrease by 20% if it's positive so that it doesn't take forever to remove a - // node we were connected to for a very long time. - if entry.reputation() > 0 { - entry.set_reputation(entry.reputation() - entry.reputation() / 5); - } - entry.add_reputation(-10); + entry.add_reputation(DISCONNECT_REPUTATION_CHANGE); entry.disconnect(); } peersstate::Peer::NotConnected(_) | peersstate::Peer::Unknown(_) => From e5957ebf88cb229c471feb06233ec331dd57db12 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 3 May 2019 11:52:40 +0200 Subject: [PATCH 06/11] More adjustements --- core/peerset/src/lib.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index e4b08ceab0134..67e53f2846c0c 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -252,12 +252,17 @@ impl Peerset { }; // For each elapsed second, move the node reputation towards zero. + // If we multiply each second the reputation by `k` (where `k` is between 0 and 1), it + // takes `ln(0.5) / ln(k)` seconds to reduce the reputation by half. Use this formula to + // empirically determine a value of `k` that looks correct. for _ in 0..secs_diff { for peer in self.data.connected_peers().cloned().collect::>() { let mut peer = self.data.peer(&peer).into_connected() .expect("we iterate over connected peers; qed"); let cur_reput = peer.reputation(); - peer.set_reputation(cur_reput.saturating_sub(cur_reput / 20)); + // We use `k = 0.98`, so we divide by `50`. With that value, it takes 34.3 seconds + // to reduce the reputation by half. + peer.set_reputation(cur_reput.saturating_sub(cur_reput / 50)); } } From b748c1ca0b840361a65f78a50e445aac3873167c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 3 May 2019 11:56:09 +0200 Subject: [PATCH 07/11] Adjust all reputations --- core/peerset/src/lib.rs | 17 ++++++++++++----- core/peerset/src/peersstate.rs | 5 +++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 67e53f2846c0c..e833b20f6d4ef 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -256,13 +256,20 @@ impl Peerset { // takes `ln(0.5) / ln(k)` seconds to reduce the reputation by half. Use this formula to // empirically determine a value of `k` that looks correct. for _ in 0..secs_diff { - for peer in self.data.connected_peers().cloned().collect::>() { - let mut peer = self.data.peer(&peer).into_connected() - .expect("we iterate over connected peers; qed"); - let cur_reput = peer.reputation(); + for peer in self.data.peers().cloned().collect::>() { // We use `k = 0.98`, so we divide by `50`. With that value, it takes 34.3 seconds // to reduce the reputation by half. - peer.set_reputation(cur_reput.saturating_sub(cur_reput / 50)); + match self.data.peer(&peer) { + peersstate::Peer::Connected(mut peer) => { + let cur_reput = peer.reputation(); + peer.set_reputation(cur_reput.saturating_sub(cur_reput / 50)); + } + peersstate::Peer::NotConnected(mut peer) => { + let cur_reput = peer.reputation(); + peer.set_reputation(cur_reput.saturating_sub(cur_reput / 50)); + } + peersstate::Peer::Unknown(_) => unreachable!("We iterate over known peers; qed") + } } } diff --git a/core/peerset/src/peersstate.rs b/core/peerset/src/peersstate.rs index c2ef9ecfbaa04..585254c163a9a 100644 --- a/core/peerset/src/peersstate.rs +++ b/core/peerset/src/peersstate.rs @@ -380,6 +380,11 @@ impl<'a> NotConnectedPeer<'a> { self.state().reputation } + /// Sets the reputation of the peer. + pub fn set_reputation(&mut self, value: i32) { + self.state_mut().reputation = value; + } + /// Performs an arithmetic addition on the reputation score of that peer. /// /// In case of overflow, the value will be capped. From 7fb2ed22a923a15961802434f5242cd313a4c30d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 3 May 2019 12:14:00 +0200 Subject: [PATCH 08/11] More fixes and adjustments --- core/peerset/src/lib.rs | 5 +- core/peerset/src/peersstate.rs | 91 ++++++++++++++++++++++++++-------- 2 files changed, 71 insertions(+), 25 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index e833b20f6d4ef..d319d3dc57f7d 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -19,7 +19,7 @@ mod peersstate; -use std::{collections::HashMap, collections::VecDeque, convert::TryFrom, time::Instant}; +use std::{collections::HashMap, collections::VecDeque, time::Instant}; use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; use log::{debug, error, trace}; @@ -272,9 +272,6 @@ impl Peerset { } } } - - let rep_increase = i32::try_from(secs_diff).unwrap_or(i32::max_value()); - self.data.connected_reputation_increase(rep_increase); } /// Try to fill available out slots with nodes. diff --git a/core/peerset/src/peersstate.rs b/core/peerset/src/peersstate.rs index 585254c163a9a..4fbfb84ee4ea6 100644 --- a/core/peerset/src/peersstate.rs +++ b/core/peerset/src/peersstate.rs @@ -88,17 +88,6 @@ impl PeersState { } } - /// Adds `value` to the reputation of all the nodes we are connected to. - /// - /// In case of overflow, the value of capped. - pub fn connected_reputation_increase(&mut self, value: i32) { - for (_, peer_state) in self.nodes.iter_mut() { - if peer_state.connection_state.is_connected() { - peer_state.reputation = peer_state.reputation.saturating_add(value); - } - } - } - /// Returns an object that grants access to the state of a peer. pub fn peer<'a>(&'a mut self, peer_id: &'a PeerId) -> Peer<'a> { if let Some(node) = self.nodes.get(peer_id) { @@ -205,6 +194,28 @@ impl<'a> Peer<'a> { Peer::Unknown(_) => None, } } + + /// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None` + /// otherwise. + #[cfg(test)] // Feel free to remove this if this function is needed outside of tests + pub fn into_not_connected(self) -> Option> { + match self { + Peer::Connected(_) => None, + Peer::NotConnected(peer) => Some(peer), + Peer::Unknown(_) => None, + } + } + + /// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None` + /// otherwise. + #[cfg(test)] // Feel free to remove this if this function is needed outside of tests + pub fn into_unknown(self) -> Option> { + match self { + Peer::Connected(_) => None, + Peer::NotConnected(_) => None, + Peer::Unknown(peer) => Some(peer), + } + } } /// A peer that is connected to us. @@ -277,6 +288,12 @@ pub struct NotConnectedPeer<'a> { } impl<'a> NotConnectedPeer<'a> { + /// Destroys this `NotConnectedPeer` and returns the `PeerId` inside of it. + #[cfg(test)] // Feel free to remove this if this function is needed outside of tests + pub fn into_peer_id(self) -> PeerId { + self.peer_id.into_owned() + } + fn state(&self) -> &Node { self.parent.nodes.get(&self.peer_id) .expect("We only ever build a NotConnectedPeer if the node's in the list; QED") @@ -463,18 +480,50 @@ mod tests { let id1 = PeerId::random(); let id2 = PeerId::random(); - if let Peer::Unknown(e) = peers_state.peer(&id1) { - assert!(e.discover().try_accept_incoming().is_ok()); - } else { panic!() } + assert!(peers_state.peer(&id1).into_unknown().unwrap().discover().try_accept_incoming().is_ok()); + assert!(peers_state.peer(&id2).into_unknown().unwrap().discover().try_accept_incoming().is_err()); + peers_state.peer(&id1).into_connected().unwrap().disconnect(); + assert!(peers_state.peer(&id2).into_not_connected().unwrap().try_accept_incoming().is_ok()); + } - if let Peer::Unknown(e) = peers_state.peer(&id2) { - assert!(e.discover().try_accept_incoming().is_err()); - } else { panic!() } + #[test] + fn reserved_not_connected_peer() { + let mut peers_state = PeersState::new(25, 25); + let id1 = PeerId::random(); + let id2 = PeerId::random(); - peers_state.peer(&id1).into_connected().unwrap().disconnect(); + assert!(peers_state.reserved_not_connected_peer().is_none()); + peers_state.peer(&id1).into_unknown().unwrap().discover(); + peers_state.peer(&id2).into_unknown().unwrap().discover(); - if let Peer::NotConnected(e) = peers_state.peer(&id2) { - assert!(e.try_accept_incoming().is_ok()); - } + assert!(peers_state.reserved_not_connected_peer().is_none()); + peers_state.peer(&id1).into_not_connected().unwrap().set_reserved(true); + assert!(peers_state.reserved_not_connected_peer().is_some()); + peers_state.peer(&id2).into_not_connected().unwrap().set_reserved(true); + peers_state.peer(&id1).into_not_connected().unwrap().set_reserved(false); + assert!(peers_state.reserved_not_connected_peer().is_some()); + peers_state.peer(&id2).into_not_connected().unwrap().set_reserved(false); + assert!(peers_state.reserved_not_connected_peer().is_none()); + } + + #[test] + fn highest_not_connected_peer() { + let mut peers_state = PeersState::new(25, 25); + let id1 = PeerId::random(); + let id2 = PeerId::random(); + + assert!(peers_state.highest_not_connected_peer().is_none()); + peers_state.peer(&id1).into_unknown().unwrap().discover().set_reputation(50); + peers_state.peer(&id2).into_unknown().unwrap().discover().set_reputation(25); + assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone())); + peers_state.peer(&id2).into_not_connected().unwrap().set_reputation(75); + assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2.clone())); + peers_state.peer(&id2).into_not_connected().unwrap().force_ingoing(); + assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone())); + peers_state.peer(&id1).into_not_connected().unwrap().set_reputation(100); + peers_state.peer(&id2).into_connected().unwrap().disconnect(); + assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone())); + peers_state.peer(&id1).into_not_connected().unwrap().set_reputation(-100); + assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2.clone())); } } From 4c125a7d5ea70802e131caf98612dde9b8f271c1 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 May 2019 14:13:45 +0200 Subject: [PATCH 09/11] Improve proof --- core/peerset/src/peersstate.rs | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/core/peerset/src/peersstate.rs b/core/peerset/src/peersstate.rs index 4fbfb84ee4ea6..232e203497335 100644 --- a/core/peerset/src/peersstate.rs +++ b/core/peerset/src/peersstate.rs @@ -219,6 +219,12 @@ impl<'a> Peer<'a> { } /// A peer that is connected to us. +// +// Implementation note: the fact this object is alive is a guarantee that the peer is in the list +// and in a connected state. It holds a mutable borrow to `PeersState`, guaranteeing that nothing +// else can modify that list except for the local object. Any method on `ConnectedPeer` that +// transitions the state away from "connected and in the list" must destroy the `ConnectedPeer` +// in the process. pub struct ConnectedPeer<'a> { parent: &'a mut PeersState, peer_id: Cow<'a, PeerId>, @@ -232,12 +238,18 @@ impl<'a> ConnectedPeer<'a> { fn state(&self) -> &Node { self.parent.nodes.get(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .expect("we only initially create a ConnectedPeer object when have verified that a \ + node is in the list; additionally, keeping the ConnectedPeer alive mutably \ + borrows the list of nodes, guaranteeing that nothing can remove the current \ + node from said list; QED") } fn state_mut(&mut self) -> &mut Node { self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a ConnectedPeer if the node's in the list; QED") + .expect("we only initially create a ConnectedPeer object when have verified that a \ + node is in the list; additionally, keeping the ConnectedPeer alive mutably \ + borrows the list of nodes, guaranteeing that nothing can remove the current \ + node from said list; QED") } /// Switches the peer to "not connected". @@ -282,6 +294,12 @@ impl<'a> ConnectedPeer<'a> { } /// A peer that is not connected to us. +// +// Implementation note: the fact this object is alive is a guarantee that the peer is in the list +// and in a non-connected state. It holds a mutable borrow to `PeersState`, guaranteeing that +// nothing else can modify that list except for the local object. Any method on `NotConnectedPeer` +// that transitions the state away from "not connected and in the list" must destroy the +// `NotConnectedPeer` in the process. pub struct NotConnectedPeer<'a> { parent: &'a mut PeersState, peer_id: Cow<'a, PeerId>, @@ -296,12 +314,18 @@ impl<'a> NotConnectedPeer<'a> { fn state(&self) -> &Node { self.parent.nodes.get(&self.peer_id) - .expect("We only ever build a NotConnectedPeer if the node's in the list; QED") + .expect("we only initially create a NotConnectedPeer object when have verified that a \ + node is in the list; additionally, keeping the NotConnectedPeer alive mutably \ + borrows the list of nodes, guaranteeing that nothing can remove the current \ + node from said list; QED") } fn state_mut(&mut self) -> &mut Node { self.parent.nodes.get_mut(&self.peer_id) - .expect("We only ever build a NotConnectedPeer if the node's in the list; QED") + .expect("we only initially create a NotConnectedPeer object when have verified that a \ + node is in the list; additionally, keeping the NotConnectedPeer alive mutably \ + borrows the list of nodes, guaranteeing that nothing can remove the current \ + node from said list; QED") } /// Tries to set the peer as connected as an outgoing connection. From d13e2799005673b420877f215f176f634d9269f3 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 7 May 2019 16:49:51 +0200 Subject: [PATCH 10/11] Remove the possible panic --- core/peerset/src/peersstate.rs | 263 +++++++++++++++++++-------------- 1 file changed, 148 insertions(+), 115 deletions(-) diff --git a/core/peerset/src/peersstate.rs b/core/peerset/src/peersstate.rs index 232e203497335..4907ccf6e523a 100644 --- a/core/peerset/src/peersstate.rs +++ b/core/peerset/src/peersstate.rs @@ -17,7 +17,7 @@ //! Contains the state storage behind the peerset. use libp2p::PeerId; -use std::{borrow::Cow, collections::HashMap, convert::TryFrom}; +use std::{borrow::Cow, collections::HashMap}; /// State storage behind the peerset. /// @@ -35,6 +35,12 @@ pub struct PeersState { /// sort, to make the logic easier. nodes: HashMap, + /// Number of non-reserved nodes for which the `ConnectionState` is `In`. + num_in: u32, + + /// Number of non-reserved nodes for which the `ConnectionState` is `In`. + num_out: u32, + /// Maximum allowed number of non-reserved nodes for which the `ConnectionState` is `In`. max_in: u32, @@ -83,6 +89,8 @@ impl PeersState { pub fn new(in_peers: u32, out_peers: u32) -> Self { PeersState { nodes: HashMap::new(), + num_in: 0, + num_out: 0, max_in: in_peers, max_out: out_peers, } @@ -90,22 +98,36 @@ impl PeersState { /// Returns an object that grants access to the state of a peer. pub fn peer<'a>(&'a mut self, peer_id: &'a PeerId) -> Peer<'a> { - if let Some(node) = self.nodes.get(peer_id) { - if node.connection_state.is_connected() { - Peer::Connected(ConnectedPeer { - parent: self, - peer_id: Cow::Borrowed(peer_id), - }) - } else { - Peer::NotConnected(NotConnectedPeer { - parent: self, - peer_id: Cow::Borrowed(peer_id), - }) - } - } else { - Peer::Unknown(UnknownPeer { + // Note: the Rust borrow checker still has some issues. In particular, we can't put this + // block as an `else` below (as the obvious solution would be here), or it will complain + // that we borrow `self` while it is already borrowed. + if !self.nodes.contains_key(peer_id) { + return Peer::Unknown(UnknownPeer { parent: self, peer_id: Cow::Borrowed(peer_id), + }); + } + + let state = self.nodes.get_mut(peer_id) + .expect("We check that the value is present right above; QED"); + + if state.connection_state.is_connected() { + Peer::Connected(ConnectedPeer { + state, + peer_id: Cow::Borrowed(peer_id), + num_in: &mut self.num_in, + num_out: &mut self.num_out, + max_in: self.max_in, + max_out: self.max_out, + }) + } else { + Peer::NotConnected(NotConnectedPeer { + state, + peer_id: Cow::Borrowed(peer_id), + num_in: &mut self.num_in, + num_out: &mut self.num_out, + max_in: self.max_in, + max_out: self.max_out, }) } } @@ -130,16 +152,20 @@ impl PeersState { /// /// If multiple nodes are reserved, which one is returned is unspecified. pub fn reserved_not_connected_peer(&mut self) -> Option { - let peer_id = self.nodes.iter_mut() + let outcome = self.nodes.iter_mut() .find(|(_, &mut Node { connection_state, reserved, .. })| { reserved && !connection_state.is_connected() }) - .map(|(peer_id, _)| peer_id.clone()); + .map(|(peer_id, node)| (peer_id.clone(), node)); - if let Some(peer_id) = peer_id { + if let Some((peer_id, state)) = outcome { Some(NotConnectedPeer { - parent: self, + state, peer_id: Cow::Owned(peer_id), + num_in: &mut self.num_in, + num_out: &mut self.num_out, + max_in: self.max_in, + max_out: self.max_out, }) } else { None @@ -150,10 +176,10 @@ impl PeersState { /// /// If multiple nodes have the same reputation, which one is returned is unspecified. pub fn highest_not_connected_peer(&mut self) -> Option { - let peer_id = self.nodes - .iter() + let outcome = self.nodes + .iter_mut() .filter(|(_, Node { connection_state, .. })| !connection_state.is_connected()) - .fold(None::<(&PeerId, &Node)>, |mut cur_node, to_try| { + .fold(None::<(&PeerId, &mut Node)>, |mut cur_node, to_try| { if let Some(cur_node) = cur_node.take() { if cur_node.1.reputation >= to_try.1.reputation { return Some(cur_node); @@ -161,12 +187,16 @@ impl PeersState { } Some(to_try) }) - .map(|(peer_id, _)| peer_id.clone()); + .map(|(peer_id, state)| (peer_id.clone(), state)); - if let Some(peer_id) = peer_id { + if let Some((peer_id, state)) = outcome { Some(NotConnectedPeer { - parent: self, + state, peer_id: Cow::Owned(peer_id), + num_in: &mut self.num_in, + num_out: &mut self.num_out, + max_in: self.max_in, + max_out: self.max_out, }) } else { None @@ -219,15 +249,13 @@ impl<'a> Peer<'a> { } /// A peer that is connected to us. -// -// Implementation note: the fact this object is alive is a guarantee that the peer is in the list -// and in a connected state. It holds a mutable borrow to `PeersState`, guaranteeing that nothing -// else can modify that list except for the local object. Any method on `ConnectedPeer` that -// transitions the state away from "connected and in the list" must destroy the `ConnectedPeer` -// in the process. pub struct ConnectedPeer<'a> { - parent: &'a mut PeersState, + state: &'a mut Node, peer_id: Cow<'a, PeerId>, + num_in: &'a mut u32, + num_out: &'a mut u32, + max_in: u32, + max_out: u32, } impl<'a> ConnectedPeer<'a> { @@ -236,73 +264,83 @@ impl<'a> ConnectedPeer<'a> { self.peer_id.into_owned() } - fn state(&self) -> &Node { - self.parent.nodes.get(&self.peer_id) - .expect("we only initially create a ConnectedPeer object when have verified that a \ - node is in the list; additionally, keeping the ConnectedPeer alive mutably \ - borrows the list of nodes, guaranteeing that nothing can remove the current \ - node from said list; QED") - } - - fn state_mut(&mut self) -> &mut Node { - self.parent.nodes.get_mut(&self.peer_id) - .expect("we only initially create a ConnectedPeer object when have verified that a \ - node is in the list; additionally, keeping the ConnectedPeer alive mutably \ - borrows the list of nodes, guaranteeing that nothing can remove the current \ - node from said list; QED") - } - /// Switches the peer to "not connected". - pub fn disconnect(mut self) -> NotConnectedPeer<'a> { - let connec_state = &mut self.state_mut().connection_state; - debug_assert!(connec_state.is_connected()); + pub fn disconnect(self) -> NotConnectedPeer<'a> { + let connec_state = &mut self.state.connection_state; + + match *connec_state { + ConnectionState::In => *self.num_in -= 1, + ConnectionState::Out => *self.num_out -= 1, + ConnectionState::NotConnected => + debug_assert!(false, "State inconsistency: disconnecting a disconnected node") + } + *connec_state = ConnectionState::NotConnected; NotConnectedPeer { - parent: self.parent, + state: self.state, peer_id: self.peer_id, + num_in: self.num_in, + num_out: self.num_out, + max_in: self.max_in, + max_out: self.max_out, } } /// Sets whether or not the node is reserved. pub fn set_reserved(&mut self, reserved: bool) { - self.state_mut().reserved = reserved; + if reserved { + self.state.reserved = true; + match self.state.connection_state { + ConnectionState::In => *self.num_in -= 1, + ConnectionState::Out => *self.num_out -= 1, + ConnectionState::NotConnected => debug_assert!(false, "State inconsistency: \ + connected node is in fact not connected"), + } + + } else { + self.state.reserved = false; + match self.state.connection_state { + ConnectionState::In => *self.num_in += 1, + ConnectionState::Out => *self.num_out += 1, + ConnectionState::NotConnected => debug_assert!(false, "State inconsistency: \ + connected node is in fact not connected"), + } + } } /// Returns whether or not the node is reserved. pub fn is_reserved(&self) -> bool { - self.state().reserved + self.state.reserved } /// Returns the reputation value of the node. pub fn reputation(&self) -> i32 { - self.state().reputation + self.state.reputation } /// Sets the reputation of the peer. pub fn set_reputation(&mut self, value: i32) { - self.state_mut().reputation = value; + self.state.reputation = value; } /// Performs an arithmetic addition on the reputation score of that peer. /// /// In case of overflow, the value will be capped. pub fn add_reputation(&mut self, modifier: i32) { - let reputation = &mut self.state_mut().reputation; + let reputation = &mut self.state.reputation; *reputation = reputation.saturating_add(modifier); } } /// A peer that is not connected to us. -// -// Implementation note: the fact this object is alive is a guarantee that the peer is in the list -// and in a non-connected state. It holds a mutable borrow to `PeersState`, guaranteeing that -// nothing else can modify that list except for the local object. Any method on `NotConnectedPeer` -// that transitions the state away from "not connected and in the list" must destroy the -// `NotConnectedPeer` in the process. pub struct NotConnectedPeer<'a> { - parent: &'a mut PeersState, + state: &'a mut Node, peer_id: Cow<'a, PeerId>, + num_in: &'a mut u32, + num_out: &'a mut u32, + max_in: u32, + max_out: u32, } impl<'a> NotConnectedPeer<'a> { @@ -312,22 +350,6 @@ impl<'a> NotConnectedPeer<'a> { self.peer_id.into_owned() } - fn state(&self) -> &Node { - self.parent.nodes.get(&self.peer_id) - .expect("we only initially create a NotConnectedPeer object when have verified that a \ - node is in the list; additionally, keeping the NotConnectedPeer alive mutably \ - borrows the list of nodes, guaranteeing that nothing can remove the current \ - node from said list; QED") - } - - fn state_mut(&mut self) -> &mut Node { - self.parent.nodes.get_mut(&self.peer_id) - .expect("we only initially create a NotConnectedPeer object when have verified that a \ - node is in the list; additionally, keeping the NotConnectedPeer alive mutably \ - borrows the list of nodes, guaranteeing that nothing can remove the current \ - node from said list; QED") - } - /// Tries to set the peer as connected as an outgoing connection. /// /// If there are enough slots available, switches the node to "connected" and returns `Ok`. If @@ -340,16 +362,10 @@ impl<'a> NotConnectedPeer<'a> { return Ok(self.force_outgoing()) } - // Count number of nodes in our "out" slots and that are not reserved. - let num_out_peers = u32::try_from(self.parent.nodes.values() - .filter(|p| !p.reserved && p.connection_state == ConnectionState::Out) - .count()) - .unwrap_or(u32::max_value()); - - // Note that it is possible for num_out_peers to be strictly superior to the max, in case - // we were connected to reserved node then marked them as not reserved, or if the user - // used `force_outgoing`. - if num_out_peers >= self.parent.max_out { + // Note that it is possible for num_out to be strictly superior to the max, in case we were + // connected to reserved node then marked them as not reserved, or if the user used + // `force_outgoing`. + if *self.num_out >= self.max_out { return Err(self); } @@ -357,14 +373,22 @@ impl<'a> NotConnectedPeer<'a> { } /// Sets the peer as connected as an outgoing connection. - pub fn force_outgoing(mut self) -> ConnectedPeer<'a> { - let connec_state = &mut self.state_mut().connection_state; + pub fn force_outgoing(self) -> ConnectedPeer<'a> { + let connec_state = &mut self.state.connection_state; debug_assert!(!connec_state.is_connected()); *connec_state = ConnectionState::Out; + if !self.state.reserved { + *self.num_out += 1; + } + ConnectedPeer { - parent: self.parent, + state: self.state, peer_id: self.peer_id, + num_in: self.num_in, + num_out: self.num_out, + max_in: self.max_in, + max_out: self.max_out, } } @@ -379,15 +403,9 @@ impl<'a> NotConnectedPeer<'a> { return Ok(self.force_ingoing()) } - // Count number of nodes in our "in" slots and that are not reserved. - let num_in_peers = u32::try_from(self.parent.nodes.values() - .filter(|p| !p.reserved && p.connection_state == ConnectionState::In) - .count()) - .unwrap_or(u32::max_value()); - - // Note that it is possible for num_in_peers to be strictly superior to the max, in case - // we were connected to reserved node then marked them as not reserved. - if num_in_peers >= self.parent.max_in { + // Note that it is possible for num_in to be strictly superior to the max, in case we were + // connected to reserved node then marked them as not reserved. + if *self.num_in >= self.max_in { return Err(self); } @@ -395,35 +413,43 @@ impl<'a> NotConnectedPeer<'a> { } /// Sets the peer as connected as an ingoing connection. - pub fn force_ingoing(mut self) -> ConnectedPeer<'a> { - let connec_state = &mut self.state_mut().connection_state; + pub fn force_ingoing(self) -> ConnectedPeer<'a> { + let connec_state = &mut self.state.connection_state; debug_assert!(!connec_state.is_connected()); *connec_state = ConnectionState::In; + if !self.state.reserved { + *self.num_in += 1; + } + ConnectedPeer { - parent: self.parent, + state: self.state, peer_id: self.peer_id, + num_in: self.num_in, + num_out: self.num_out, + max_in: self.max_in, + max_out: self.max_out, } } - /// Returns true if the the node is reserved. - pub fn is_reserved(&self) -> bool { - self.state().reserved - } - /// Sets whether or not the node is reserved. pub fn set_reserved(&mut self, reserved: bool) { - self.state_mut().reserved = reserved; + self.state.reserved = reserved; + } + + /// Returns true if the the node is reserved. + pub fn is_reserved(&self) -> bool { + self.state.reserved } /// Returns the reputation value of the node. pub fn reputation(&self) -> i32 { - self.state().reputation + self.state.reputation } /// Sets the reputation of the peer. pub fn set_reputation(&mut self, value: i32) { - self.state_mut().reputation = value; + self.state.reputation = value; } /// Performs an arithmetic addition on the reputation score of that peer. @@ -431,7 +457,7 @@ impl<'a> NotConnectedPeer<'a> { /// In case of overflow, the value will be capped. /// If the peer is unknown to us, we insert it and consider that it has a reputation of 0. pub fn add_reputation(&mut self, modifier: i32) { - let reputation = &mut self.state_mut().reputation; + let reputation = &mut self.state.reputation; *reputation = reputation.saturating_add(modifier); } } @@ -454,9 +480,16 @@ impl<'a> UnknownPeer<'a> { reserved: false, }); + let state = self.parent.nodes.get_mut(&self.peer_id) + .expect("We insert that key into the HashMap right above; QED"); + NotConnectedPeer { - parent: self.parent, + state, peer_id: self.peer_id, + num_in: &mut self.parent.num_in, + num_out: &mut self.parent.num_out, + max_in: self.parent.max_in, + max_out: self.parent.max_out, } } } From 8eba5a955c4b4ca0ee5cadc8cb17734b6183431e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 8 May 2019 15:25:54 +0200 Subject: [PATCH 11/11] Make sure reputation reaches 0 --- core/peerset/src/lib.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index d319d3dc57f7d..a419b743d34a3 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -259,15 +259,20 @@ impl Peerset { for peer in self.data.peers().cloned().collect::>() { // We use `k = 0.98`, so we divide by `50`. With that value, it takes 34.3 seconds // to reduce the reputation by half. - match self.data.peer(&peer) { - peersstate::Peer::Connected(mut peer) => { - let cur_reput = peer.reputation(); - peer.set_reputation(cur_reput.saturating_sub(cur_reput / 50)); - } - peersstate::Peer::NotConnected(mut peer) => { - let cur_reput = peer.reputation(); - peer.set_reputation(cur_reput.saturating_sub(cur_reput / 50)); + fn reput_tick(reput: i32) -> i32 { + let mut diff = -reput / 50; + if diff == 0 && reput < 0 { + diff = 1; + } else if diff == 0 && reput > 0 { + diff = -1; } + reput.saturating_add(diff) + } + match self.data.peer(&peer) { + peersstate::Peer::Connected(mut peer) => + peer.set_reputation(reput_tick(peer.reputation())), + peersstate::Peer::NotConnected(mut peer) => + peer.set_reputation(reput_tick(peer.reputation())), peersstate::Peer::Unknown(_) => unreachable!("We iterate over known peers; qed") } }