Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions codechain/run_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ use crate::json::PasswordFile;
use crate::rpc::{rpc_http_start, rpc_ipc_start, rpc_ws_start};
use crate::rpc_apis::ApiDependencies;
use ccore::{
AccountProvider, AccountProviderError, BlockId, ChainNotify, Client, ClientConfig, ClientService, EngineInfo,
EngineType, Miner, MinerService, Scheme, Stratum, StratumConfig, StratumError, NUM_COLUMNS,
AccountProvider, AccountProviderError, BlockId, ChainNotify, Client, ClientConfig, ClientService, EngineClient,
EngineInfo, EngineType, Miner, MinerService, PeerDb, Scheme, Stratum, StratumConfig, StratumError, NUM_COLUMNS,
};
use cdiscovery::{Config, Discovery};
use ckey::{Address, NetworkId, PlatformAddress};
use ckeystore::accounts_dir::RootDiskDirectory;
use ckeystore::KeyStore;
use clap::ArgMatches;
use clogger::{self, EmailAlarm, LoggerConfig};
use cnetwork::{Filters, NetworkConfig, NetworkControl, NetworkService, RoutingTable, SocketAddr};
use cnetwork::{Filters, ManagingPeerdb, NetworkConfig, NetworkControl, NetworkService, RoutingTable, SocketAddr};
use csync::{BlockSyncExtension, BlockSyncSender, SnapshotService, TransactionSyncExtension};
use ctimer::TimerLoop;
use ctrlc::CtrlC;
Expand All @@ -48,6 +48,7 @@ fn network_start(
timer_loop: TimerLoop,
cfg: &NetworkConfig,
routing_table: Arc<RoutingTable>,
peer_db: Box<dyn ManagingPeerdb>,
) -> Result<Arc<NetworkService>, String> {
let addr = cfg.address.parse().map_err(|_| format!("Invalid NETWORK listen host given: {}", cfg.address))?;
let sockaddress = SocketAddr::new(addr, cfg.port);
Expand All @@ -61,6 +62,7 @@ fn network_start(
cfg.max_peers,
filters,
routing_table,
peer_db,
)
.map_err(|e| format!("Network service error: {:?}", e))?;

Expand Down Expand Up @@ -282,8 +284,9 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {
// XXX: What should we do if the network id has been changed.
let c = client.client();
let network_id = c.network_id();
let peer_db = PeerDb::new(c.get_kvdb());
let routing_table = RoutingTable::new();
let service = network_start(network_id, timer_loop, &network_config, Arc::clone(&routing_table))?;
let service = network_start(network_id, timer_loop, &network_config, Arc::clone(&routing_table), peer_db)?;

if config.network.discovery.unwrap() {
discovery_start(&service, &config.network, routing_table)?;
Expand Down
2 changes: 2 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub mod encoded;
mod error;
mod invoice;
mod miner;
mod peer_db;
mod scheme;
mod service;
mod transaction;
Expand All @@ -89,6 +90,7 @@ pub use crate::consensus::{EngineType, TimeGapParams};
pub use crate::db::{COL_STATE, NUM_COLUMNS};
pub use crate::error::{BlockImportError, Error, ImportError};
pub use crate::miner::{MemPoolFees, Miner, MinerOptions, MinerService, Stratum, StratumConfig, StratumError};
pub use crate::peer_db::PeerDb;
pub use crate::scheme::Scheme;
pub use crate::service::ClientService;
pub use crate::transaction::{
Expand Down
99 changes: 99 additions & 0 deletions core/src/peer_db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2020 Kodebox, Inc.
// This file is part of CodeChain.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::db::COL_EXTRA;
use cnetwork::{ManagingPeerdb, SocketAddr};
use kvdb::{DBTransaction, KeyValueDB};
use parking_lot::Mutex;
use rlp::RlpStream;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

pub struct PeerDb {
db: Arc<dyn KeyValueDB>,
peers_and_count: Mutex<(HashMap<SocketAddr, u64>, usize)>,
}

impl PeerDb {
pub fn new(database: Arc<dyn KeyValueDB>) -> Box<Self> {
Box::new(Self {
db: database,
peers_and_count: Default::default(),
})
}
}

impl ManagingPeerdb for PeerDb {
fn insert(&self, key: SocketAddr) {
let (peers, count) = &mut *self.peers_and_count.lock();
peers.entry(key).or_insert_with(|| {
*count += 1;
SystemTime::now().duration_since(UNIX_EPOCH).expect("There is no time machine.").as_secs()
});

if let Some(batch) = get_db_transaction_if_enough_hit(peers, count) {
self.db.write(batch).expect("The DB must alive");
}
}

fn delete(&self, key: &SocketAddr) {
let (peers, count) = &mut *self.peers_and_count.lock();
if peers.remove(key).is_some() {
*count += 1;
}

if let Some(batch) = get_db_transaction_if_enough_hit(peers, count) {
self.db.write(batch).expect("The DB must alive");
}
}
}

// XXX: It may not be needed. Generally, in the p2p networks, the old node lives longer.
// Exaggeratedly, the new node does not affect the stability of the network. In other words, it
// doesn't matter even we don't restore the fresh updated nodes.
impl Drop for PeerDb {
fn drop(&mut self) {
let (peers, _) = &*self.peers_and_count.lock();
let batch = get_db_transaction(peers);
self.db.write(batch).expect("The DB must alive");
}
}

fn get_db_transaction_if_enough_hit(peers: &HashMap<SocketAddr, u64>, count: &mut usize) -> Option<DBTransaction> {
const UPDATE_AT: usize = 10;
if *count < UPDATE_AT {
return None
}

*count = 0;

Some(get_db_transaction(peers))
}

fn get_db_transaction(peers: &HashMap<SocketAddr, u64>) -> DBTransaction {
let mut s = RlpStream::new_list(peers.len());
for (address, time) in peers {
s.begin_list(2).append(address).append(time);
}
let encoded = s.drain();

let mut batch = DBTransaction::new();

const COLUMN_TO_WRITE: Option<u32> = COL_EXTRA;
const PEER_DB_KEY: &[u8] = b"peer-list";
batch.put(COLUMN_TO_WRITE, PEER_DB_KEY, &encoded);
batch
}
1 change: 1 addition & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ crossbeam-channel = "0.3"
finally-block = "0.1"
primitives = { git = "https://github.com/CodeChain-io/rust-codechain-primitives.git", version = "0.4" }
log = "0.4.6"
kvdb = "0.1"
mio = "0.6.16"
never-type = "0.1.0"
parking_lot = "0.6.0"
Expand Down
2 changes: 2 additions & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern crate rand;
extern crate rlp;
#[macro_use]
extern crate rlp_derive;
extern crate kvdb;
extern crate never_type;
extern crate table as ctable;
extern crate time;
Expand Down Expand Up @@ -64,6 +65,7 @@ pub use crate::extension::{
pub use crate::node_id::{IntoSocketAddr, NodeId};
pub use crate::service::{Error as NetworkServiceError, Service as NetworkService};

pub use self::p2p::{Handler, ManagingPeerdb};
pub use crate::filters::{FilterEntry, Filters, FiltersControl};
pub use crate::routing_table::RoutingTable;

Expand Down
16 changes: 14 additions & 2 deletions network/src/p2p/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ const RETRY_SYNC_MAX: Duration = Duration::from_secs(10); // T1
const RTT: Duration = Duration::from_secs(10); // T2
const WAIT_SYNC: Duration = Duration::from_secs(30); // T3 >> T1 + RTT

pub trait ManagingPeerdb: Send + Sync {
fn insert(&self, key: SocketAddr);
fn delete(&self, key: &SocketAddr);
}

pub struct Handler {
connecting_lock: Mutex<()>,
channel: IoChannel<Message>,
Expand Down Expand Up @@ -113,7 +118,7 @@ pub struct Handler {

min_peers: usize,
max_peers: usize,

peer_db: Box<dyn (ManagingPeerdb)>,
rng: Mutex<OsRng>,
}

Expand All @@ -128,6 +133,7 @@ impl Handler {
bootstrap_addresses: Vec<SocketAddr>,
min_peers: usize,
max_peers: usize,
peer_db: Box<dyn ManagingPeerdb>,
) -> ::std::result::Result<Self, String> {
if MAX_INBOUND_CONNECTIONS + MAX_OUTBOUND_CONNECTIONS < max_peers {
return Err(format!("Max peers must be less than {}", MAX_INBOUND_CONNECTIONS + MAX_OUTBOUND_CONNECTIONS))
Expand Down Expand Up @@ -166,7 +172,7 @@ impl Handler {
bootstrap_addresses,
min_peers,
max_peers,

peer_db,
rng: Mutex::new(OsRng::new().unwrap()),
})
}
Expand Down Expand Up @@ -491,6 +497,8 @@ impl IoHandler<Message> for Handler {
is_inbound: true,
} => {
let mut inbound_connections = self.inbound_connections.write();
let target = connection.peer_addr();
self.peer_db.insert(*target);
if let Some(token) = self.inbound_tokens.lock().gen() {
let remote_node_id = connection.peer_addr().into();
assert_eq!(
Expand Down Expand Up @@ -1055,6 +1063,8 @@ impl IoHandler<Message> for Handler {
self.routing_table.remove(con.peer_addr());
self.inbound_tokens.lock().restore(stream);
ctrace!(NETWORK, "Inbound connect({}) removed", stream);
let remove_target = con.peer_addr();
self.peer_db.delete(&remove_target);
} else {
cdebug!(NETWORK, "Invalid inbound token({}) on deregister", stream);
}
Expand All @@ -1069,6 +1079,8 @@ impl IoHandler<Message> for Handler {
unreachable!("{} has no node id", stream);
}
con.deregister(event_loop)?;
let remove_target = con.peer_addr();
self.peer_db.delete(&remove_target);
self.routing_table.remove(con.peer_addr());
self.outbound_tokens.lock().restore(stream);
ctrace!(NETWORK, "Outbound connect({}) removed", stream);
Expand Down
2 changes: 1 addition & 1 deletion network/src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ mod listener;
mod message;
mod stream;

pub use self::handler::{Handler, Message};
pub use self::handler::{Handler, ManagingPeerdb, Message};
use self::message::{ExtensionMessage, Message as NetworkMessage, NegotiationMessage, SignedMessage};
4 changes: 3 additions & 1 deletion network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::client::Client;
use crate::control::{Control, Error as ControlError};
use crate::filters::{FilterEntry, FiltersControl};
use crate::routing_table::RoutingTable;
use crate::{p2p, Api, NetworkExtension, SocketAddr};
use crate::{p2p, Api, ManagingPeerdb, NetworkExtension, SocketAddr};
use cidr::IpCidr;
use cio::{IoError, IoService};
use ckey::{NetworkId, Public};
Expand Down Expand Up @@ -46,6 +46,7 @@ impl Service {
max_peers: usize,
filters_control: Arc<dyn FiltersControl>,
routing_table: Arc<RoutingTable>,
peer_db: Box<dyn ManagingPeerdb>,
) -> Result<Arc<Self>, Error> {
let p2p = IoService::start("P2P")?;

Expand All @@ -61,6 +62,7 @@ impl Service {
bootstrap_addresses,
min_peers,
max_peers,
peer_db,
)?);
p2p.register_handler(p2p_handler.clone())?;

Expand Down