diff --git a/Cargo.lock b/Cargo.lock index 003a23cca0..b156e752fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -469,6 +469,7 @@ dependencies = [ "codechain-types", "crossbeam-channel", "finally-block", + "kvdb", "log 0.4.6", "mio", "never-type", diff --git a/codechain/run_node.rs b/codechain/run_node.rs index fa7f57cb08..c6f0e4e4b8 100644 --- a/codechain/run_node.rs +++ b/codechain/run_node.rs @@ -21,8 +21,8 @@ use crate::json::PasswordFile; use crate::rpc::{rpc_http_start, rpc_ipc_start, rpc_ws_start}; use crate::rpc_apis::ApiDependencies; use ccore::{ - AccountProvider, AccountProviderError, BlockId, ChainNotify, Client, ClientConfig, ClientService, EngineInfo, - EngineType, Miner, MinerService, Scheme, Stratum, StratumConfig, StratumError, NUM_COLUMNS, + AccountProvider, AccountProviderError, BlockId, ChainNotify, Client, ClientConfig, ClientService, EngineClient, + EngineInfo, EngineType, Miner, MinerService, PeerDb, Scheme, Stratum, StratumConfig, StratumError, NUM_COLUMNS, }; use cdiscovery::{Config, Discovery}; use ckey::{Address, NetworkId, PlatformAddress}; @@ -30,7 +30,7 @@ use ckeystore::accounts_dir::RootDiskDirectory; use ckeystore::KeyStore; use clap::ArgMatches; use clogger::{self, EmailAlarm, LoggerConfig}; -use cnetwork::{Filters, NetworkConfig, NetworkControl, NetworkService, RoutingTable, SocketAddr}; +use cnetwork::{Filters, ManagingPeerdb, NetworkConfig, NetworkControl, NetworkService, RoutingTable, SocketAddr}; use csync::{BlockSyncExtension, BlockSyncSender, SnapshotService, TransactionSyncExtension}; use ctimer::TimerLoop; use ctrlc::CtrlC; @@ -48,6 +48,7 @@ fn network_start( timer_loop: TimerLoop, cfg: &NetworkConfig, routing_table: Arc, + peer_db: Box, ) -> Result, String> { let addr = cfg.address.parse().map_err(|_| format!("Invalid NETWORK listen host given: {}", cfg.address))?; let sockaddress = SocketAddr::new(addr, cfg.port); @@ -61,6 +62,7 @@ fn network_start( cfg.max_peers, filters, routing_table, + peer_db, ) .map_err(|e| format!("Network service error: {:?}", e))?; @@ -282,8 +284,9 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> { // XXX: What should we do if the network id has been changed. let c = client.client(); let network_id = c.network_id(); + let peer_db = PeerDb::new(c.get_kvdb()); let routing_table = RoutingTable::new(); - let service = network_start(network_id, timer_loop, &network_config, Arc::clone(&routing_table))?; + let service = network_start(network_id, timer_loop, &network_config, Arc::clone(&routing_table), peer_db)?; if config.network.discovery.unwrap() { discovery_start(&service, &config.network, routing_table)?; diff --git a/core/src/lib.rs b/core/src/lib.rs index b648f0744c..9e378dca47 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -68,6 +68,7 @@ pub mod encoded; mod error; mod invoice; mod miner; +mod peer_db; mod scheme; mod service; mod transaction; @@ -89,6 +90,7 @@ pub use crate::consensus::{EngineType, TimeGapParams}; pub use crate::db::{COL_STATE, NUM_COLUMNS}; pub use crate::error::{BlockImportError, Error, ImportError}; pub use crate::miner::{MemPoolFees, Miner, MinerOptions, MinerService, Stratum, StratumConfig, StratumError}; +pub use crate::peer_db::PeerDb; pub use crate::scheme::Scheme; pub use crate::service::ClientService; pub use crate::transaction::{ diff --git a/core/src/peer_db.rs b/core/src/peer_db.rs new file mode 100644 index 0000000000..9a1c9206de --- /dev/null +++ b/core/src/peer_db.rs @@ -0,0 +1,99 @@ +// Copyright 2020 Kodebox, Inc. +// This file is part of CodeChain. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +use crate::db::COL_EXTRA; +use cnetwork::{ManagingPeerdb, SocketAddr}; +use kvdb::{DBTransaction, KeyValueDB}; +use parking_lot::Mutex; +use rlp::RlpStream; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +pub struct PeerDb { + db: Arc, + peers_and_count: Mutex<(HashMap, usize)>, +} + +impl PeerDb { + pub fn new(database: Arc) -> Box { + Box::new(Self { + db: database, + peers_and_count: Default::default(), + }) + } +} + +impl ManagingPeerdb for PeerDb { + fn insert(&self, key: SocketAddr) { + let (peers, count) = &mut *self.peers_and_count.lock(); + peers.entry(key).or_insert_with(|| { + *count += 1; + SystemTime::now().duration_since(UNIX_EPOCH).expect("There is no time machine.").as_secs() + }); + + if let Some(batch) = get_db_transaction_if_enough_hit(peers, count) { + self.db.write(batch).expect("The DB must alive"); + } + } + + fn delete(&self, key: &SocketAddr) { + let (peers, count) = &mut *self.peers_and_count.lock(); + if peers.remove(key).is_some() { + *count += 1; + } + + if let Some(batch) = get_db_transaction_if_enough_hit(peers, count) { + self.db.write(batch).expect("The DB must alive"); + } + } +} + +// XXX: It may not be needed. Generally, in the p2p networks, the old node lives longer. +// Exaggeratedly, the new node does not affect the stability of the network. In other words, it +// doesn't matter even we don't restore the fresh updated nodes. +impl Drop for PeerDb { + fn drop(&mut self) { + let (peers, _) = &*self.peers_and_count.lock(); + let batch = get_db_transaction(peers); + self.db.write(batch).expect("The DB must alive"); + } +} + +fn get_db_transaction_if_enough_hit(peers: &HashMap, count: &mut usize) -> Option { + const UPDATE_AT: usize = 10; + if *count < UPDATE_AT { + return None + } + + *count = 0; + + Some(get_db_transaction(peers)) +} + +fn get_db_transaction(peers: &HashMap) -> DBTransaction { + let mut s = RlpStream::new_list(peers.len()); + for (address, time) in peers { + s.begin_list(2).append(address).append(time); + } + let encoded = s.drain(); + + let mut batch = DBTransaction::new(); + + const COLUMN_TO_WRITE: Option = COL_EXTRA; + const PEER_DB_KEY: &[u8] = b"peer-list"; + batch.put(COLUMN_TO_WRITE, PEER_DB_KEY, &encoded); + batch +} diff --git a/network/Cargo.toml b/network/Cargo.toml index df726548b8..c3c89e7502 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -15,6 +15,7 @@ crossbeam-channel = "0.3" finally-block = "0.1" primitives = { git = "https://github.com/CodeChain-io/rust-codechain-primitives.git", version = "0.4" } log = "0.4.6" +kvdb = "0.1" mio = "0.6.16" never-type = "0.1.0" parking_lot = "0.6.0" diff --git a/network/src/lib.rs b/network/src/lib.rs index d30d04971c..489aaa5e5b 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -36,6 +36,7 @@ extern crate rand; extern crate rlp; #[macro_use] extern crate rlp_derive; +extern crate kvdb; extern crate never_type; extern crate table as ctable; extern crate time; @@ -64,6 +65,7 @@ pub use crate::extension::{ pub use crate::node_id::{IntoSocketAddr, NodeId}; pub use crate::service::{Error as NetworkServiceError, Service as NetworkService}; +pub use self::p2p::{Handler, ManagingPeerdb}; pub use crate::filters::{FilterEntry, Filters, FiltersControl}; pub use crate::routing_table::RoutingTable; diff --git a/network/src/p2p/handler.rs b/network/src/p2p/handler.rs index 883c5dcf14..4a1498ff16 100644 --- a/network/src/p2p/handler.rs +++ b/network/src/p2p/handler.rs @@ -78,6 +78,11 @@ const RETRY_SYNC_MAX: Duration = Duration::from_secs(10); // T1 const RTT: Duration = Duration::from_secs(10); // T2 const WAIT_SYNC: Duration = Duration::from_secs(30); // T3 >> T1 + RTT +pub trait ManagingPeerdb: Send + Sync { + fn insert(&self, key: SocketAddr); + fn delete(&self, key: &SocketAddr); +} + pub struct Handler { connecting_lock: Mutex<()>, channel: IoChannel, @@ -113,7 +118,7 @@ pub struct Handler { min_peers: usize, max_peers: usize, - + peer_db: Box, rng: Mutex, } @@ -128,6 +133,7 @@ impl Handler { bootstrap_addresses: Vec, min_peers: usize, max_peers: usize, + peer_db: Box, ) -> ::std::result::Result { if MAX_INBOUND_CONNECTIONS + MAX_OUTBOUND_CONNECTIONS < max_peers { return Err(format!("Max peers must be less than {}", MAX_INBOUND_CONNECTIONS + MAX_OUTBOUND_CONNECTIONS)) @@ -166,7 +172,7 @@ impl Handler { bootstrap_addresses, min_peers, max_peers, - + peer_db, rng: Mutex::new(OsRng::new().unwrap()), }) } @@ -491,6 +497,8 @@ impl IoHandler for Handler { is_inbound: true, } => { let mut inbound_connections = self.inbound_connections.write(); + let target = connection.peer_addr(); + self.peer_db.insert(*target); if let Some(token) = self.inbound_tokens.lock().gen() { let remote_node_id = connection.peer_addr().into(); assert_eq!( @@ -1055,6 +1063,8 @@ impl IoHandler for Handler { self.routing_table.remove(con.peer_addr()); self.inbound_tokens.lock().restore(stream); ctrace!(NETWORK, "Inbound connect({}) removed", stream); + let remove_target = con.peer_addr(); + self.peer_db.delete(&remove_target); } else { cdebug!(NETWORK, "Invalid inbound token({}) on deregister", stream); } @@ -1069,6 +1079,8 @@ impl IoHandler for Handler { unreachable!("{} has no node id", stream); } con.deregister(event_loop)?; + let remove_target = con.peer_addr(); + self.peer_db.delete(&remove_target); self.routing_table.remove(con.peer_addr()); self.outbound_tokens.lock().restore(stream); ctrace!(NETWORK, "Outbound connect({}) removed", stream); diff --git a/network/src/p2p/mod.rs b/network/src/p2p/mod.rs index d62ee99491..dd4a702f93 100644 --- a/network/src/p2p/mod.rs +++ b/network/src/p2p/mod.rs @@ -20,5 +20,5 @@ mod listener; mod message; mod stream; -pub use self::handler::{Handler, Message}; +pub use self::handler::{Handler, ManagingPeerdb, Message}; use self::message::{ExtensionMessage, Message as NetworkMessage, NegotiationMessage, SignedMessage}; diff --git a/network/src/service.rs b/network/src/service.rs index fa552eb3fb..5c41059d4e 100644 --- a/network/src/service.rs +++ b/network/src/service.rs @@ -18,7 +18,7 @@ use crate::client::Client; use crate::control::{Control, Error as ControlError}; use crate::filters::{FilterEntry, FiltersControl}; use crate::routing_table::RoutingTable; -use crate::{p2p, Api, NetworkExtension, SocketAddr}; +use crate::{p2p, Api, ManagingPeerdb, NetworkExtension, SocketAddr}; use cidr::IpCidr; use cio::{IoError, IoService}; use ckey::{NetworkId, Public}; @@ -46,6 +46,7 @@ impl Service { max_peers: usize, filters_control: Arc, routing_table: Arc, + peer_db: Box, ) -> Result, Error> { let p2p = IoService::start("P2P")?; @@ -61,6 +62,7 @@ impl Service { bootstrap_addresses, min_peers, max_peers, + peer_db, )?); p2p.register_handler(p2p_handler.clone())?;