From 1539ad2479bffbc0e8b60126f07c9095cf012433 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Mon, 18 Nov 2019 16:53:58 +0900 Subject: [PATCH 01/12] Send and receive snapshot chunk requests --- sync/src/block/extension.rs | 184 ++++++++++++++++++++++++--- util/merkle/src/snapshot/compress.rs | 2 +- util/merkle/src/snapshot/mod.rs | 31 +++-- 3 files changed, 185 insertions(+), 32 deletions(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 1fa19a0749..2f00c47799 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -22,6 +22,8 @@ use ccore::{ ImportError, StateInfo, UnverifiedTransaction, }; use cdb::AsHashDB; +use cmerkle::snapshot::ChunkDecompressor; +use cmerkle::snapshot::Restore as SnapshotRestore; use cmerkle::{Trie, TrieFactory}; use cnetwork::{Api, EventSender, NetworkExtension, NodeId}; use cstate::{FindActionHandler, TopStateView}; @@ -29,6 +31,7 @@ use ctimer::TimerToken; use ctypes::header::{Header, Seal}; use ctypes::transaction::Action; use ctypes::{BlockHash, BlockNumber, ShardId}; +use kvdb::DBTransaction; use primitives::{H256, U256}; use rand::prelude::SliceRandom; use rand::thread_rng; @@ -57,7 +60,10 @@ pub struct TokenInfo { enum State { SnapshotHeader(BlockHash, u64), SnapshotBody(BlockHash), - SnapshotTopChunk(H256), + SnapshotTopChunk { + block: BlockHash, + restore: SnapshotRestore, + }, SnapshotShardChunk(ShardId, H256), Full, } @@ -80,7 +86,10 @@ impl State { let state_root = header.state_root(); let top_trie = TrieFactory::readonly(state_db.as_hashdb(), &state_root); if !top_trie.map(|t| t.is_complete()).unwrap_or(false) { - return State::SnapshotTopChunk(state_root) + return State::SnapshotTopChunk { + block: hash, + restore: SnapshotRestore::new(state_root), + } } let top_state = client.state_at(hash.into()).expect("Top level state at the snapshot header exists"); @@ -251,6 +260,37 @@ impl Extension { self.check_sync_variable(); } + fn send_chunk_request(&mut self, block: &BlockHash, 
root: &H256) { + let have_chunk_request = self.requests.values().flatten().any(|r| match r { + (_, RequestMessage::StateChunk(..)) => true, + _ => false, + }); + + if !have_chunk_request { + let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); + peer_ids.shuffle(&mut thread_rng()); + if let Some(id) = peer_ids.first() { + if let Some(requests) = self.requests.get_mut(&id) { + let req = RequestMessage::StateChunk(*block, vec![*root]); + cdebug!(SYNC, "Request chunk to {} {:?}", id, req); + let request_id = self.last_request; + self.last_request += 1; + requests.push((request_id, req.clone())); + self.api.send(id, Arc::new(Message::Request(request_id, req).rlp_bytes())); + + let token = &self.tokens[id]; + let token_info = self.tokens_info.get_mut(token).unwrap(); + + let _ = self.api.clear_timer(*token); + self.api + .set_timer_once(*token, Duration::from_millis(SYNC_EXPIRE_REQUEST_INTERVAL)) + .expect("Timer set succeeds"); + token_info.request_id = Some(request_id); + } + } + } + } + fn check_sync_variable(&self) { let mut has_error = false; for id in self.header_downloaders.keys() { @@ -267,6 +307,14 @@ impl Extension { }) .collect(); + let chunk_requests: Vec = requests + .iter() + .filter_map(|r| match r { + (_, RequestMessage::StateChunk(..)) => Some(r.1.clone()), + _ => None, + }) + .collect(); + if body_requests.len() > 1 { cerror!(SYNC, "Body request length {} > 1, body_requests: {:?}", body_requests.len(), body_requests); has_error = true; @@ -275,16 +323,18 @@ impl Extension { let token = &self.tokens[id]; let token_info = &self.tokens_info[token]; - match (token_info.request_id, body_requests.len()) { + match (token_info.request_id, body_requests.len() + chunk_requests.len()) { (Some(_), 1) => {} (None, 0) => {} _ => { cerror!( SYNC, - "request_id: {:?}, body_requests.len(): {}, body_requests: {:?}", + "request_id: {:?}, body_requests.len(): {}, body_requests: {:?}, chunk_requests.len(): {}, chunk_requests: {:?}", 
token_info.request_id, body_requests.len(), - body_requests + body_requests, + chunk_requests.len(), + chunk_requests ); has_error = true; } @@ -388,7 +438,16 @@ impl NetworkExtension for Extension { } } State::SnapshotBody(..) => unimplemented!(), - State::SnapshotTopChunk(..) => unimplemented!(), + State::SnapshotTopChunk { + block, + ref mut restore, + } => { + if let Some(root) = restore.next_to_feed() { + self.send_chunk_request(&block, &root); + } else { + self.transition_to_full(); + } + } State::SnapshotShardChunk(..) => unimplemented!(), State::Full => { for id in &peer_ids { @@ -485,13 +544,18 @@ impl Extension { State::SnapshotHeader(hash, ..) => { if imported.contains(&hash) { let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist"); - Some(State::SnapshotTopChunk(header.state_root())) + Some(State::SnapshotTopChunk { + block: hash, + restore: SnapshotRestore::new(header.state_root()), + }) } else { None } } State::SnapshotBody(..) => unimplemented!(), - State::SnapshotTopChunk(..) => unimplemented!(), + State::SnapshotTopChunk { + .. + } => None, State::SnapshotShardChunk(..) => unimplemented!(), State::Full => { for peer in self.header_downloaders.values_mut() { @@ -598,7 +662,7 @@ impl Extension { RequestMessage::Bodies(hashes) => !hashes.is_empty(), RequestMessage::StateChunk { .. - } => unimplemented!(), + } => true, } } @@ -677,7 +741,24 @@ impl Extension { self.on_body_response(hashes, bodies); self.check_sync_variable(); } - ResponseMessage::StateChunk(..) 
=> unimplemented!(), + ResponseMessage::StateChunk(chunks) => { + let roots = match request { + RequestMessage::StateChunk(_, roots) => roots, + _ => unreachable!(), + }; + if let Some(token) = self.tokens.get(from) { + if let Some(token_info) = self.tokens_info.get_mut(token) { + if token_info.request_id.is_none() { + ctrace!(SYNC, "Expired before handling response"); + return + } + self.api.clear_timer(*token).expect("Timer clear succeed"); + token_info.request_id = None; + } + } + self.dismiss_request(from, id); + self.on_chunk_response(from, &roots, &chunks); + } } } } @@ -731,12 +812,10 @@ impl Extension { } true } - ( - RequestMessage::StateChunk { - .. - }, - ResponseMessage::StateChunk(..), - ) => unimplemented!(), + (RequestMessage::StateChunk(_, roots), ResponseMessage::StateChunk(chunks)) => { + // Check length + roots.len() == chunks.len() + } _ => { cwarn!(SYNC, "Invalid response type"); false @@ -750,7 +829,12 @@ impl Extension { State::SnapshotHeader(hash, _) => match headers { [header] if header.hash() == hash => { match self.client.import_bootstrap_header(&header) { - Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {} + Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { + self.state = State::SnapshotTopChunk { + block: hash, + restore: SnapshotRestore::new(*header.state_root()), + }; + } Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} // FIXME: handle import errors Err(err) => { @@ -767,7 +851,9 @@ impl Extension { ), }, State::SnapshotBody(..) => {} - State::SnapshotTopChunk(..) => {} + State::SnapshotTopChunk { + .. + } => {} State::SnapshotShardChunk(..) 
=> {} State::Full => { let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) { @@ -850,6 +936,68 @@ impl Extension { self.send_body_request(&id); } } + + fn on_chunk_response(&mut self, from: &NodeId, roots: &[H256], chunks: &[Vec]) { + assert_eq!(roots.len(), chunks.len()); + if let State::SnapshotTopChunk { + block, + ref mut restore, + } = self.state + { + for (r, c) in roots.iter().zip(chunks) { + if c.is_empty() { + cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r); + continue + } + let decompressor = ChunkDecompressor::from_slice(c); + let raw_chunk = match decompressor.decompress() { + Ok(chunk) => chunk, + Err(e) => { + cwarn!(SYNC, "Decode failed for chunk response from peer {}: {}", from, e); + continue + } + }; + let recovered = match raw_chunk.recover(*r) { + Ok(chunk) => chunk, + Err(e) => { + cwarn!(SYNC, "Invalid chunk response from peer {}: {}", from, e); + continue + } + }; + + let batch = { + let mut state_db = self.client.state_db().write(); + let hash_db = state_db.as_hashdb_mut(); + restore.feed(hash_db, recovered); + + let mut batch = DBTransaction::new(); + match state_db.journal_under(&mut batch, 0, H256::zero()) { + Ok(_) => batch, + Err(e) => { + cwarn!(SYNC, "Failed to write state chunk to database: {}", e); + continue + } + } + }; + self.client.db().write_buffered(batch); + match self.client.db().flush() { + Ok(_) => cdebug!(SYNC, "Wrote state chunk to database: {}", r), + Err(e) => cwarn!(SYNC, "Failed to flush database: {}", e), + } + } + + if let Some(root) = restore.next_to_feed() { + self.send_chunk_request(&block, &root); + } else { + self.transition_to_full(); + } + } + } + + fn transition_to_full(&mut self) { + cdebug!(SYNC, "Transitioning state to {:?}", State::Full); + self.state = State::Full; + } } pub struct BlockSyncSender(EventSender); diff --git a/util/merkle/src/snapshot/compress.rs b/util/merkle/src/snapshot/compress.rs index f122cc0cca..419a6c9e1d 
100644 --- a/util/merkle/src/snapshot/compress.rs +++ b/util/merkle/src/snapshot/compress.rs @@ -33,7 +33,7 @@ impl ChunkDecompressor { } impl<'a> ChunkDecompressor> { - fn from_slice(slice: &'a [u8]) -> Self { + pub fn from_slice(slice: &'a [u8]) -> Self { ChunkDecompressor::new(Cursor::new(slice)) } } diff --git a/util/merkle/src/snapshot/mod.rs b/util/merkle/src/snapshot/mod.rs index 9072d28d2d..ca4ba5e280 100644 --- a/util/merkle/src/snapshot/mod.rs +++ b/util/merkle/src/snapshot/mod.rs @@ -34,22 +34,20 @@ const CHUNK_MAX_NODES: usize = 256; // 16 ^ (CHUNK_HEIGHT-1) /// Example: /// use codechain_merkle::snapshot::Restore; -/// let mut rm = Restore::new(db, root); +/// let mut rm = Restore::new(root); /// while let Some(root) = rm.next_to_feed() { /// let raw_chunk = request(block_hash, root)?; /// let chunk = raw_chunk.recover(root)?; -/// rm.feed(chunk); +/// rm.feed(db, chunk); /// } -pub struct Restore<'a> { - db: &'a mut dyn HashDB, +pub struct Restore { pending: Option, unresolved: OrderedHeap>, } -impl<'a> Restore<'a> { - pub fn new(db: &'a mut dyn HashDB, merkle_root: H256) -> Self { +impl Restore { + pub fn new(merkle_root: H256) -> Self { let mut result = Restore { - db, pending: None, unresolved: OrderedHeap::new(), }; @@ -59,13 +57,13 @@ impl<'a> Restore<'a> { result } - pub fn feed(&mut self, chunk: RecoveredChunk) { + pub fn feed(&mut self, db: &mut dyn HashDB, chunk: RecoveredChunk) { let pending_path = self.pending.take().expect("feed() should be called after next()"); assert_eq!(pending_path.chunk_root, chunk.root, "Unexpected chunk"); // Pour nodes into the DB for (_, value) in chunk.nodes { - self.db.insert(&value); + db.insert(&value); } // Extend search paths @@ -77,8 +75,9 @@ impl<'a> Restore<'a> { } pub fn next_to_feed(&mut self) -> Option { - if let Some(path) = self.unresolved.pop() { - assert!(self.pending.is_none(), "Previous feed() was failed"); + if let Some(pending) = &self.pending { + Some(pending.chunk_root) + } else if let 
Some(path) = self.unresolved.pop() { let chunk_root = path.chunk_root; self.pending = Some(path.0); @@ -89,6 +88,12 @@ impl<'a> Restore<'a> { } } +impl std::fmt::Debug for Restore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + f.debug_struct("Restore").field("pending", &self.pending).field("unresolved", &"<...>".to_string()).finish() + } +} + /// Example: /// use std::fs::File; /// use codechain_merkle::snapshot::Snapshot; @@ -290,10 +295,10 @@ mod tests { dbg!(chunks.len()); let mut db = MemoryDB::new(); - let mut recover = Restore::new(&mut db, root); + let mut recover = Restore::new(root); while let Some(chunk_root) = recover.next_to_feed() { let recovered = chunks[&chunk_root].recover(chunk_root).unwrap(); - recover.feed(recovered); + recover.feed(&mut db, recovered); } let trie = TrieDB::try_new(&db, &root).unwrap(); From eaf653bbf31139cd62fa9160c0a1973fa14ff2c7 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Mon, 18 Nov 2019 19:02:52 +0900 Subject: [PATCH 02/12] Serve snapshot responses to peers --- foundry/run_node.rs | 5 ++++- sync/src/block/extension.rs | 25 ++++++++++++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/foundry/run_node.rs b/foundry/run_node.rs index 2fc23dfb78..c0bf085712 100644 --- a/foundry/run_node.rs +++ b/foundry/run_node.rs @@ -279,7 +279,10 @@ pub fn run_node(matches: &ArgMatches<'_>) -> Result<(), String> { (Some(hash), Some(num)) => Some((hash, num)), _ => None, }; - service.register_extension(move |api| BlockSyncExtension::new(client, api, snapshot_target)) + let snapshot_dir = config.snapshot.path.clone(); + service.register_extension(move |api| { + BlockSyncExtension::new(client, api, snapshot_target, snapshot_dir) + }) }; let sync = Arc::new(BlockSyncSender::from(sync_sender.clone())); client.client().add_notify(Arc::downgrade(&sync) as Weak); diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 2f00c47799..5d57f3ab9d 100644 --- 
a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -16,6 +16,7 @@ use super::downloader::{BodyDownloader, HeaderDownloader}; use super::message::{Message, RequestMessage, ResponseMessage}; +use crate::snapshot::snapshot_path; use ccore::encoded::Header as EncodedHeader; use ccore::{ Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock, @@ -38,6 +39,7 @@ use rand::thread_rng; use rlp::{Encodable, Rlp}; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; +use std::fs; use std::sync::Arc; use std::time::Duration; use token_generator::TokenGenerator; @@ -125,10 +127,16 @@ pub struct Extension { api: Box, last_request: u64, seq: u64, + snapshot_dir: Option, } impl Extension { - pub fn new(client: Arc, api: Box, snapshot_target: Option<(H256, u64)>) -> Extension { + pub fn new( + client: Arc, + api: Box, + snapshot_target: Option<(H256, u64)>, + snapshot_dir: Option, + ) -> Extension { api.set_timer(SYNC_TIMER_TOKEN, Duration::from_millis(SYNC_TIMER_INTERVAL)).expect("Timer set succeeds"); let state = State::initial(&client, snapshot_target); @@ -165,6 +173,7 @@ impl Extension { api, last_request: Default::default(), seq: Default::default(), + snapshot_dir, } } @@ -700,8 +709,18 @@ impl Extension { ResponseMessage::Bodies(bodies) } - fn create_state_chunk_response(&self, _hash: BlockHash, _tree_root: Vec) -> ResponseMessage { - unimplemented!() + fn create_state_chunk_response(&self, hash: BlockHash, chunk_roots: Vec) -> ResponseMessage { + let mut result = Vec::new(); + for root in chunk_roots { + if let Some(dir) = &self.snapshot_dir { + let chunk_path = snapshot_path(&dir, &hash, &root); + match fs::read(chunk_path) { + Ok(chunk) => result.push(chunk), + _ => result.push(Vec::new()), + } + } + } + ResponseMessage::StateChunk(result) } fn on_peer_response(&mut self, from: &NodeId, id: u64, mut response: ResponseMessage) { From 
df46c2de385932eac8774e1d6f3ef6063b52128d Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Thu, 21 Nov 2019 18:30:55 +0900 Subject: [PATCH 03/12] Move to SnapshotBody state after downloading the snapthot header --- sync/src/block/extension.rs | 103 ++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 56 deletions(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 5d57f3ab9d..12fc479a7d 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -61,7 +61,10 @@ pub struct TokenInfo { #[derive(Debug)] enum State { SnapshotHeader(BlockHash, u64), - SnapshotBody(BlockHash), + SnapshotBody { + block: BlockHash, + prev_root: H256, + }, SnapshotTopChunk { block: BlockHash, restore: SnapshotRestore, @@ -81,7 +84,13 @@ impl State { _ => return State::SnapshotHeader(hash, num), }; if client.block_body(&hash.into()).is_none() { - return State::SnapshotBody(hash) + let parent_hash = header.parent_hash(); + let parent = + client.block_header(&parent_hash.into()).expect("Parent header of the snapshot header must exist"); + return State::SnapshotBody { + block: hash, + prev_root: parent.transactions_root(), + } } let state_db = client.state_db().read(); @@ -441,12 +450,14 @@ impl NetworkExtension for Extension { State::SnapshotHeader(_, num) => { for id in &peer_ids { self.send_header_request(id, RequestMessage::Headers { - start_number: num, - max_count: 1, + start_number: num - 1, + max_count: 2, }); } } - State::SnapshotBody(..) => unimplemented!(), + State::SnapshotBody { + .. + } => unimplemented!(), State::SnapshotTopChunk { block, ref mut restore, @@ -549,55 +560,32 @@ pub enum Event { impl Extension { fn new_headers(&mut self, imported: Vec, enacted: Vec, retracted: Vec) { - if let Some(next_state) = match self.state { - State::SnapshotHeader(hash, ..) 
=> { - if imported.contains(&hash) { - let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist"); - Some(State::SnapshotTopChunk { - block: hash, - restore: SnapshotRestore::new(header.state_root()), - }) - } else { - None - } + if let State::Full = self.state { + for peer in self.header_downloaders.values_mut() { + peer.mark_as_imported(imported.clone()); } - State::SnapshotBody(..) => unimplemented!(), - State::SnapshotTopChunk { - .. - } => None, - State::SnapshotShardChunk(..) => unimplemented!(), - State::Full => { - for peer in self.header_downloaders.values_mut() { - peer.mark_as_imported(imported.clone()); - } - - let mut headers_to_download: Vec<_> = enacted - .into_iter() - .map(|hash| self.client.block_header(&BlockId::Hash(hash)).expect("Enacted header must exist")) - .collect(); - headers_to_download.sort_unstable_by_key(EncodedHeader::number); - #[allow(clippy::redundant_closure)] - // False alarm. https://github.com/rust-lang/rust-clippy/issues/1439 - headers_to_download.dedup_by_key(|h| h.hash()); - - let headers: Vec<_> = headers_to_download - .into_iter() - .filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none()) - .collect(); // FIXME: No need to collect here if self is not borrowed. - for header in headers { - let parent = self - .client - .block_header(&BlockId::Hash(header.parent_hash())) - .expect("Enacted header must have parent"); - let is_empty = header.transactions_root() == parent.transactions_root(); - self.body_downloader.add_target(&header.decode(), is_empty); - } - self.body_downloader.remove_target(&retracted); - None + let mut headers_to_download: Vec<_> = enacted + .into_iter() + .map(|hash| self.client.block_header(&BlockId::Hash(hash)).expect("Enacted header must exist")) + .collect(); + headers_to_download.sort_unstable_by_key(EncodedHeader::number); + #[allow(clippy::redundant_closure)] + // False alarm. 
https://github.com/rust-lang/rust-clippy/issues/1439 + headers_to_download.dedup_by_key(|h| h.hash()); + + let headers: Vec<_> = headers_to_download + .into_iter() + .filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none()) + .collect(); // FIXME: No need to collect here if self is not borrowed. + for header in headers { + let parent = self + .client + .block_header(&BlockId::Hash(header.parent_hash())) + .expect("Enacted header must have parent"); + let is_empty = header.transactions_root() == parent.transactions_root(); + self.body_downloader.add_target(&header.decode(), is_empty); } - } { - cdebug!(SYNC, "Transitioning state to {:?}", next_state); - self.state = next_state; + self.body_downloader.remove_target(&retracted); } } @@ -846,13 +834,14 @@ impl Extension { ctrace!(SYNC, "Received header response from({}) with length({})", from, headers.len()); match self.state { State::SnapshotHeader(hash, _) => match headers { - [header] if header.hash() == hash => { + [parent, header] if header.hash() == hash => { match self.client.import_bootstrap_header(&header) { Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { - self.state = State::SnapshotTopChunk { + self.state = State::SnapshotBody { block: hash, - restore: SnapshotRestore::new(*header.state_root()), + prev_root: *parent.transactions_root(), }; + cdebug!(SYNC, "Transitioning state to {:?}", self.state); } Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} // FIXME: handle import errors @@ -869,7 +858,9 @@ impl Extension { headers.len() ), }, - State::SnapshotBody(..) => {} + State::SnapshotBody { + .. + } => {} State::SnapshotTopChunk { .. 
} => {} From 1cb2b05837e746ba6f39923f04fce484909b8e5b Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Thu, 21 Nov 2019 20:10:48 +0900 Subject: [PATCH 04/12] Import snapshot block with body --- core/src/blockchain/blockchain.rs | 40 ++++++--- core/src/client/client.rs | 12 +-- core/src/client/importer.rs | 14 +-- core/src/client/mod.rs | 10 +-- core/src/client/test_client.rs | 4 +- sync/src/block/extension.rs | 141 ++++++++++++++++++++---------- 6 files changed, 142 insertions(+), 79 deletions(-) diff --git a/core/src/blockchain/blockchain.rs b/core/src/blockchain/blockchain.rs index d4767b55e2..90a2e06d6b 100644 --- a/core/src/blockchain/blockchain.rs +++ b/core/src/blockchain/blockchain.rs @@ -96,18 +96,6 @@ impl BlockChain { } } - pub fn insert_bootstrap_header(&self, batch: &mut DBTransaction, header: &HeaderView) { - self.headerchain.insert_bootstrap_header(batch, header); - - let hash = header.hash(); - - *self.pending_best_block_hash.write() = Some(hash); - batch.put(db::COL_EXTRA, BEST_BLOCK_KEY, &hash); - - *self.pending_best_proposal_block_hash.write() = Some(hash); - batch.put(db::COL_EXTRA, BEST_PROPOSAL_BLOCK_KEY, &hash); - } - pub fn insert_header( &self, batch: &mut DBTransaction, @@ -120,6 +108,34 @@ impl BlockChain { } } + pub fn insert_bootstrap_block(&self, batch: &mut DBTransaction, bytes: &[u8]) { + let block = BlockView::new(bytes); + let header = block.header_view(); + let hash = header.hash(); + + ctrace!(BLOCKCHAIN, "Inserting bootstrap block #{}({}) to the blockchain.", header.number(), hash); + + if self.is_known(&hash) { + cdebug!(BLOCKCHAIN, "Block #{}({}) is already known.", header.number(), hash); + return + } + + assert!(self.pending_best_block_hash.read().is_none()); + assert!(self.pending_best_proposal_block_hash.read().is_none()); + + self.headerchain.insert_bootstrap_header(batch, &header); + self.body_db.insert_body(batch, &block); + self.body_db.update_best_block(batch, &BestBlockChanged::CanonChainAppended { + best_block: 
bytes.to_vec(), + }); + + *self.pending_best_block_hash.write() = Some(hash); + batch.put(db::COL_EXTRA, BEST_BLOCK_KEY, &hash); + + *self.pending_best_proposal_block_hash.write() = Some(hash); + batch.put(db::COL_EXTRA, BEST_PROPOSAL_BLOCK_KEY, &hash); + } + /// Inserts the block into backing cache database. /// Expects the block to be valid and already verified. /// If the block is already known, does nothing. diff --git a/core/src/client/client.rs b/core/src/client/client.rs index 43f879c790..4676b3bba5 100644 --- a/core/src/client/client.rs +++ b/core/src/client/client.rs @@ -20,7 +20,7 @@ use super::{ ClientConfig, DatabaseClient, EngineClient, EngineInfo, ExecuteClient, ImportBlock, ImportResult, MiningBlockChainClient, Shard, StateInfo, StateOrBlock, TextClient, }; -use crate::block::{ClosedBlock, IsBlock, OpenBlock, SealedBlock}; +use crate::block::{Block, ClosedBlock, IsBlock, OpenBlock, SealedBlock}; use crate::blockchain::{BlockChain, BlockProvider, BodyProvider, HeaderProvider, InvoiceProvider, TransactionAddress}; use crate::client::{ConsensusClient, SnapshotClient, TermInfo}; use crate::consensus::{CodeChainEngine, EngineError}; @@ -41,7 +41,7 @@ use cstate::{ }; use ctimer::{TimeoutHandler, TimerApi, TimerScheduleError, TimerToken}; use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction}; -use ctypes::{BlockHash, BlockNumber, CommonParams, Header, ShardId, Tracker, TxHash}; +use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash}; use cvm::{decode, execute, ChainTimeInfo, ScriptResult, VMConfig}; use kvdb::{DBTransaction, KeyValueDB}; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; @@ -635,13 +635,13 @@ impl ImportBlock for Client { Ok(self.importer.header_queue.import(unverified)?) 
} - fn import_bootstrap_header(&self, header: &Header) -> Result { - if self.block_chain().is_known_header(&header.hash()) { + fn import_bootstrap_block(&self, block: &Block) -> Result { + if self.block_chain().is_known(&block.header.hash()) { return Err(BlockImportError::Import(ImportError::AlreadyInChain)) } let import_lock = self.importer.import_lock.lock(); - self.importer.import_bootstrap_header(header, self, &import_lock); - Ok(header.hash()) + self.importer.import_bootstrap_block(block, self, &import_lock); + Ok(block.header.hash()) } fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult { diff --git a/core/src/client/importer.rs b/core/src/client/importer.rs index 7309322103..b922869e1d 100644 --- a/core/src/client/importer.rs +++ b/core/src/client/importer.rs @@ -15,7 +15,7 @@ // along with this program. If not, see . use super::{BlockChainTrait, Client, ClientConfig}; -use crate::block::{enact, IsBlock, LockedBlock}; +use crate::block::{enact, Block, IsBlock, LockedBlock}; use crate::blockchain::{BodyProvider, HeaderProvider, ImportRoute}; use crate::client::EngineInfo; use crate::consensus::CodeChainEngine; @@ -27,7 +27,7 @@ use crate::verification::queue::{BlockQueue, HeaderQueue}; use crate::verification::{self, PreverifiedBlock, Verifier}; use crate::views::{BlockView, HeaderView}; use cio::IoChannel; -use ctypes::header::Header; +use ctypes::header::{Header, Seal}; use ctypes::BlockHash; use kvdb::DBTransaction; use parking_lot::{Mutex, MutexGuard}; @@ -359,19 +359,21 @@ impl Importer { imported.len() } - pub fn import_bootstrap_header<'a>(&'a self, header: &'a Header, client: &Client, _importer_lock: &MutexGuard<()>) { + pub fn import_bootstrap_block<'a>(&'a self, block: &'a Block, client: &Client, _importer_lock: &MutexGuard<()>) { + let header = &block.header; let hash = header.hash(); - ctrace!(CLIENT, "Importing bootstrap header {}-{:?}", header.number(), hash); + ctrace!(CLIENT, "Importing bootstrap block #{}-{:?}", 
header.number(), hash); { let chain = client.block_chain(); let mut batch = DBTransaction::new(); - chain.insert_bootstrap_header(&mut batch, &HeaderView::new(&header.rlp_bytes())); + chain.insert_bootstrap_block(&mut batch, &block.rlp_bytes(&Seal::With)); client.db().write_buffered(batch); chain.commit(); } - client.new_headers(&[hash], &[], &[hash], &[], &[], Some(hash)); + self.miner.chain_new_blocks(client, &[hash], &[], &[hash], &[]); + client.new_blocks(&[hash], &[], &[hash], &[], &[]); client.db().flush().expect("DB flush failed."); } diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 427c08d341..30a09b5226 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -28,7 +28,7 @@ pub use self::client::Client; pub use self::config::ClientConfig; pub use self::test_client::TestBlockChainClient; -use crate::block::{ClosedBlock, OpenBlock, SealedBlock}; +use crate::block::{Block, ClosedBlock, OpenBlock, SealedBlock}; use crate::blockchain_info::BlockChainInfo; use crate::consensus::EngineError; use crate::encoded; @@ -41,7 +41,7 @@ use cmerkle::Result as TrieResult; use cnetwork::NodeId; use cstate::{AssetScheme, FindActionHandler, OwnedAsset, StateResult, Text, TopLevelState, TopStateView}; use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction}; -use ctypes::{BlockHash, BlockNumber, CommonParams, Header, ShardId, Tracker, TxHash}; +use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash}; use cvm::ChainTimeInfo; use kvdb::KeyValueDB; use primitives::{Bytes, H160, H256, U256}; @@ -193,9 +193,9 @@ pub trait ImportBlock { /// Import a header into the blockchain fn import_header(&self, bytes: Bytes) -> Result; - /// Import a trusted bootstrap header into the blockchain - /// Bootstrap headers don't execute any verifications - fn import_bootstrap_header(&self, bytes: &Header) -> Result; + /// Import a trusted bootstrap block into the blockchain + /// Bootstrap blocks don't execute any 
verifications + fn import_bootstrap_block(&self, bytes: &Block) -> Result; /// Import sealed block. Skips all verifications. fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult; diff --git a/core/src/client/test_client.rs b/core/src/client/test_client.rs index 18e5e48d5c..a2f6484872 100644 --- a/core/src/client/test_client.rs +++ b/core/src/client/test_client.rs @@ -30,7 +30,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use crate::block::{ClosedBlock, OpenBlock, SealedBlock}; +use crate::block::{Block, ClosedBlock, OpenBlock, SealedBlock}; use crate::blockchain_info::BlockChainInfo; use crate::client::{ AccountData, BlockChainClient, BlockChainTrait, BlockProducer, BlockStatus, ConsensusClient, EngineInfo, @@ -503,7 +503,7 @@ impl ImportBlock for TestBlockChainClient { unimplemented!() } - fn import_bootstrap_header(&self, _header: &BlockHeader) -> Result { + fn import_bootstrap_block(&self, _header: &Block) -> Result { unimplemented!() } diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 12fc479a7d..0cbe2bd30f 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -25,7 +25,7 @@ use ccore::{ use cdb::AsHashDB; use cmerkle::snapshot::ChunkDecompressor; use cmerkle::snapshot::Restore as SnapshotRestore; -use cmerkle::{Trie, TrieFactory}; +use cmerkle::{skewed_merkle_root, Trie, TrieFactory}; use cnetwork::{Api, EventSender, NetworkExtension, NodeId}; use cstate::{FindActionHandler, TopStateView}; use ctimer::TimerToken; @@ -62,7 +62,7 @@ pub struct TokenInfo { enum State { SnapshotHeader(BlockHash, u64), SnapshotBody { - block: BlockHash, + header: EncodedHeader, prev_root: H256, }, SnapshotTopChunk { @@ -88,7 +88,7 @@ impl State { let parent = client.block_header(&parent_hash.into()).expect("Parent header of the snapshot header must exist"); return State::SnapshotBody { - block: hash, + header, prev_root: 
parent.transactions_root(), } } @@ -456,8 +456,29 @@ impl NetworkExtension for Extension { } } State::SnapshotBody { + ref header, .. - } => unimplemented!(), + } => { + for id in &peer_ids { + if let Some(requests) = self.requests.get_mut(id) { + ctrace!(SYNC, "Send snapshot body request to {}", id); + let request = RequestMessage::Bodies(vec![header.hash()]); + let request_id = self.last_request; + self.last_request += 1; + requests.push((request_id, request.clone())); + self.api.send(id, Arc::new(Message::Request(request_id, request).rlp_bytes())); + + let token = &self.tokens[id]; + let token_info = self.tokens_info.get_mut(token).unwrap(); + + let _ = self.api.clear_timer(*token); + self.api + .set_timer_once(*token, Duration::from_millis(SYNC_EXPIRE_REQUEST_INTERVAL)) + .expect("Timer set succeeds"); + token_info.request_id = Some(request_id); + } + } + } State::SnapshotTopChunk { block, ref mut restore, @@ -835,20 +856,11 @@ impl Extension { match self.state { State::SnapshotHeader(hash, _) => match headers { [parent, header] if header.hash() == hash => { - match self.client.import_bootstrap_header(&header) { - Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { - self.state = State::SnapshotBody { - block: hash, - prev_root: *parent.transactions_root(), - }; - cdebug!(SYNC, "Transitioning state to {:?}", self.state); - } - Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} - // FIXME: handle import errors - Err(err) => { - cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); - } - } + self.state = State::SnapshotBody { + header: EncodedHeader::new(header.rlp_bytes().to_vec()), + prev_root: *parent.transactions_root(), + }; + cdebug!(SYNC, "Transitioning state to {:?}", self.state); } _ => cdebug!( SYNC, @@ -908,42 +920,75 @@ impl Extension { fn on_body_response(&mut self, hashes: Vec, bodies: Vec>) { ctrace!(SYNC, "Received body response with lenth({}) {:?}", hashes.len(), hashes); - { - 
self.body_downloader.import_bodies(hashes, bodies); - let completed = self.body_downloader.drain(); - for (hash, transactions) in completed { - let header = self - .client - .block_header(&BlockId::Hash(hash)) - .expect("Downloaded body's header must exist") - .decode(); - let block = Block { - header, - transactions, - }; - cdebug!(SYNC, "Body download completed for #{}({})", block.header.number(), hash); - match self.client.import_block(block.rlp_bytes(&Seal::With)) { - Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { - cwarn!(SYNC, "Downloaded already existing block({})", hash) - } - Err(BlockImportError::Import(ImportError::AlreadyQueued)) => { - cwarn!(SYNC, "Downloaded already queued in the verification queue({})", hash) - } - Err(err) => { + + match self.state { + State::SnapshotBody { + ref header, + prev_root, + } => { + let body = bodies.first().expect("Body response in SnapshotBody state has only one body"); + let new_root = skewed_merkle_root(prev_root, body.iter().map(Encodable::rlp_bytes)); + if header.transactions_root() == new_root { + let block = Block { + header: header.decode(), + transactions: body.clone(), + }; + match self.client.import_bootstrap_block(&block) { + Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { + self.state = State::SnapshotTopChunk { + block: header.hash(), + restore: SnapshotRestore::new(header.state_root()), + }; + cdebug!(SYNC, "Transitioning state to {:?}", self.state); + } + Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} // FIXME: handle import errors - cwarn!(SYNC, "Cannot import block({}): {:?}", hash, err); - break + Err(err) => { + cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); + } } - _ => {} } } - } + State::Full => { + { + self.body_downloader.import_bodies(hashes, bodies); + let completed = self.body_downloader.drain(); + for (hash, transactions) in completed { + let header = self + .client + .block_header(&BlockId::Hash(hash)) + 
.expect("Downloaded body's header must exist") + .decode(); + let block = Block { + header, + transactions, + }; + cdebug!(SYNC, "Body download completed for #{}({})", block.header.number(), hash); + match self.client.import_block(block.rlp_bytes(&Seal::With)) { + Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { + cwarn!(SYNC, "Downloaded already existing block({})", hash) + } + Err(BlockImportError::Import(ImportError::AlreadyQueued)) => { + cwarn!(SYNC, "Downloaded already queued in the verification queue({})", hash) + } + Err(err) => { + // FIXME: handle import errors + cwarn!(SYNC, "Cannot import block({}): {:?}", hash, err); + break + } + _ => {} + } + } + } - let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); - peer_ids.shuffle(&mut thread_rng()); + let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); + peer_ids.shuffle(&mut thread_rng()); - for id in peer_ids { - self.send_body_request(&id); + for id in peer_ids { + self.send_body_request(&id); + } + } + _ => {} } } From f8038bf29fccadada5fba476491fd6b696833dd3 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Mon, 25 Nov 2019 12:12:22 +0900 Subject: [PATCH 05/12] Update the best block after importing snapshot chunks --- core/src/blockchain/blockchain.rs | 29 ++++++++++++++++------- core/src/blockchain/headerchain.rs | 30 ++++++++++++----------- core/src/client/client.rs | 18 ++++++++++++-- core/src/client/importer.rs | 38 +++++++++++++++++++++++++----- core/src/client/mod.rs | 14 ++++++++--- core/src/client/test_client.rs | 11 ++++++++- sync/src/block/extension.rs | 24 +++++++++++++++++-- 7 files changed, 127 insertions(+), 37 deletions(-) diff --git a/core/src/blockchain/blockchain.rs b/core/src/blockchain/blockchain.rs index 90a2e06d6b..a206e45f7c 100644 --- a/core/src/blockchain/blockchain.rs +++ b/core/src/blockchain/blockchain.rs @@ -108,7 +108,11 @@ impl BlockChain { } } - pub fn insert_bootstrap_block(&self, batch: &mut DBTransaction, 
bytes: &[u8]) { + pub fn insert_floating_header(&self, batch: &mut DBTransaction, header: &HeaderView) { + self.headerchain.insert_floating_header(batch, header); + } + + pub fn insert_floating_block(&self, batch: &mut DBTransaction, bytes: &[u8]) { let block = BlockView::new(bytes); let header = block.header_view(); let hash = header.hash(); @@ -120,20 +124,27 @@ impl BlockChain { return } + self.insert_floating_header(batch, &header); + self.body_db.insert_body(batch, &block); + } + + pub fn force_update_best_block(&self, batch: &mut DBTransaction, hash: &BlockHash) { + ctrace!(BLOCKCHAIN, "Forcefully updating the best block to {}", hash); + + assert!(self.is_known(hash)); assert!(self.pending_best_block_hash.read().is_none()); assert!(self.pending_best_proposal_block_hash.read().is_none()); - self.headerchain.insert_bootstrap_header(batch, &header); - self.body_db.insert_body(batch, &block); + let block = self.block(hash).expect("Target block is known"); + self.headerchain.force_update_best_header(batch, hash); self.body_db.update_best_block(batch, &BestBlockChanged::CanonChainAppended { - best_block: bytes.to_vec(), + best_block: block.into_inner(), }); - *self.pending_best_block_hash.write() = Some(hash); - batch.put(db::COL_EXTRA, BEST_BLOCK_KEY, &hash); - - *self.pending_best_proposal_block_hash.write() = Some(hash); - batch.put(db::COL_EXTRA, BEST_PROPOSAL_BLOCK_KEY, &hash); + batch.put(db::COL_EXTRA, BEST_BLOCK_KEY, hash); + *self.pending_best_block_hash.write() = Some(*hash); + batch.put(db::COL_EXTRA, BEST_PROPOSAL_BLOCK_KEY, hash); + *self.pending_best_proposal_block_hash.write() = Some(*hash); } /// Inserts the block into backing cache database. diff --git a/core/src/blockchain/headerchain.rs b/core/src/blockchain/headerchain.rs index 8d4eb8a822..f0a69ab826 100644 --- a/core/src/blockchain/headerchain.rs +++ b/core/src/blockchain/headerchain.rs @@ -113,24 +113,19 @@ impl HeaderChain { } } - /// Inserts a bootstrap header into backing cache database. 
- /// Makes the imported header the best header. - /// Expects the header to be valid and already verified. + /// Inserts a floating header into backing cache database. + /// Expects the header to be valid. /// If the header is already known, does nothing. - // FIXME: Find better return type. Returning `None` at duplication is not natural - pub fn insert_bootstrap_header(&self, batch: &mut DBTransaction, header: &HeaderView) { + pub fn insert_floating_header(&self, batch: &mut DBTransaction, header: &HeaderView) { let hash = header.hash(); - ctrace!(HEADERCHAIN, "Inserting bootstrap block header #{}({}) to the headerchain.", header.number(), hash); + ctrace!(HEADERCHAIN, "Inserting a floating block header #{}({}) to the headerchain.", header.number(), hash); if self.is_known_header(&hash) { ctrace!(HEADERCHAIN, "Block header #{}({}) is already known.", header.number(), hash); return } - assert!(self.pending_best_header_hash.read().is_none()); - assert!(self.pending_best_proposal_block_hash.read().is_none()); - let compressed_header = compress(header.rlp().as_raw(), blocks_swapper()); batch.put(db::COL_HEADERS, &hash, &compressed_header); @@ -143,11 +138,6 @@ impl HeaderChain { parent: header.parent_hash(), }); - batch.put(db::COL_EXTRA, BEST_HEADER_KEY, &hash); - *self.pending_best_header_hash.write() = Some(hash); - batch.put(db::COL_EXTRA, BEST_PROPOSAL_HEADER_KEY, &hash); - *self.pending_best_proposal_block_hash.write() = Some(hash); - let mut pending_hashes = self.pending_hashes.write(); let mut pending_details = self.pending_details.write(); @@ -155,6 +145,18 @@ impl HeaderChain { batch.extend_with_cache(db::COL_EXTRA, &mut *pending_hashes, new_hashes, CacheUpdatePolicy::Overwrite); } + pub fn force_update_best_header(&self, batch: &mut DBTransaction, hash: &BlockHash) { + ctrace!(HEADERCHAIN, "Forcefully updating the best header to {}", hash); + assert!(self.is_known_header(hash)); + assert!(self.pending_best_header_hash.read().is_none()); + 
assert!(self.pending_best_proposal_block_hash.read().is_none()); + + batch.put(db::COL_EXTRA, BEST_HEADER_KEY, hash); + *self.pending_best_header_hash.write() = Some(*hash); + batch.put(db::COL_EXTRA, BEST_PROPOSAL_HEADER_KEY, hash); + *self.pending_best_proposal_block_hash.write() = Some(*hash); + } + /// Inserts the header into backing cache database. /// Expects the header to be valid and already verified. /// If the header is already known, does nothing. diff --git a/core/src/client/client.rs b/core/src/client/client.rs index 4676b3bba5..8a649def19 100644 --- a/core/src/client/client.rs +++ b/core/src/client/client.rs @@ -40,6 +40,7 @@ use cstate::{ ActionHandler, AssetScheme, FindActionHandler, OwnedAsset, StateDB, StateResult, Text, TopLevelState, TopStateView, }; use ctimer::{TimeoutHandler, TimerApi, TimerScheduleError, TimerToken}; +use ctypes::header::Header; use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction}; use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash}; use cvm::{decode, execute, ChainTimeInfo, ScriptResult, VMConfig}; @@ -635,15 +636,28 @@ impl ImportBlock for Client { Ok(self.importer.header_queue.import(unverified)?) 
} - fn import_bootstrap_block(&self, block: &Block) -> Result { + fn import_trusted_header(&self, header: &Header) -> Result { + if self.block_chain().is_known_header(&header.hash()) { + return Err(BlockImportError::Import(ImportError::AlreadyInChain)) + } + let import_lock = self.importer.import_lock.lock(); + self.importer.import_trusted_header(header, self, &import_lock); + Ok(header.hash()) + } + + fn import_trusted_block(&self, block: &Block) -> Result { if self.block_chain().is_known(&block.header.hash()) { return Err(BlockImportError::Import(ImportError::AlreadyInChain)) } let import_lock = self.importer.import_lock.lock(); - self.importer.import_bootstrap_block(block, self, &import_lock); + self.importer.import_trusted_block(block, self, &import_lock); Ok(block.header.hash()) } + fn force_update_best_block(&self, hash: &BlockHash) { + self.importer.force_update_best_block(hash, self) + } + fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult { let h = block.header().hash(); let route = { diff --git a/core/src/client/importer.rs b/core/src/client/importer.rs index b922869e1d..5c70c80518 100644 --- a/core/src/client/importer.rs +++ b/core/src/client/importer.rs @@ -359,21 +359,47 @@ impl Importer { imported.len() } - pub fn import_bootstrap_block<'a>(&'a self, block: &'a Block, client: &Client, _importer_lock: &MutexGuard<()>) { + pub fn import_trusted_header<'a>(&'a self, header: &'a Header, client: &Client, _importer_lock: &MutexGuard<()>) { + let hash = header.hash(); + ctrace!(CLIENT, "Importing trusted header #{}-{:?}", header.number(), hash); + + { + let chain = client.block_chain(); + let mut batch = DBTransaction::new(); + chain.insert_floating_header(&mut batch, &HeaderView::new(&header.rlp_bytes())); + client.db().write_buffered(batch); + chain.commit(); + } + client.new_headers(&[hash], &[], &[], &[], &[], None); + + client.db().flush().expect("DB flush failed."); + } + + pub fn import_trusted_block<'a>(&'a self, block: &'a Block, 
client: &Client, importer_lock: &MutexGuard<()>) { let header = &block.header; let hash = header.hash(); - ctrace!(CLIENT, "Importing bootstrap block #{}-{:?}", header.number(), hash); + ctrace!(CLIENT, "Importing trusted block #{}-{:?}", header.number(), hash); + self.import_trusted_header(header, client, importer_lock); { let chain = client.block_chain(); let mut batch = DBTransaction::new(); - chain.insert_bootstrap_block(&mut batch, &block.rlp_bytes(&Seal::With)); + chain.insert_floating_block(&mut batch, &block.rlp_bytes(&Seal::With)); client.db().write_buffered(batch); chain.commit(); } - client.new_headers(&[hash], &[], &[hash], &[], &[], Some(hash)); - self.miner.chain_new_blocks(client, &[hash], &[], &[hash], &[]); - client.new_blocks(&[hash], &[], &[hash], &[], &[]); + self.miner.chain_new_blocks(client, &[hash], &[], &[], &[]); + client.new_blocks(&[hash], &[], &[], &[], &[]); + + client.db().flush().expect("DB flush failed."); + } + + pub fn force_update_best_block(&self, hash: &BlockHash, client: &Client) { + let chain = client.block_chain(); + let mut batch = DBTransaction::new(); + chain.force_update_best_block(&mut batch, hash); + client.db().write_buffered(batch); + chain.commit(); client.db().flush().expect("DB flush failed."); } diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 30a09b5226..c14960bd8e 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -40,6 +40,7 @@ use ckey::{Address, NetworkId, PlatformAddress, Public}; use cmerkle::Result as TrieResult; use cnetwork::NodeId; use cstate::{AssetScheme, FindActionHandler, OwnedAsset, StateResult, Text, TopLevelState, TopStateView}; +use ctypes::header::Header; use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction}; use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash}; use cvm::ChainTimeInfo; @@ -193,9 +194,16 @@ pub trait ImportBlock { /// Import a header into the blockchain fn import_header(&self, bytes: Bytes) -> 
Result; - /// Import a trusted bootstrap block into the blockchain - /// Bootstrap blocks don't execute any verifications - fn import_bootstrap_block(&self, bytes: &Block) -> Result; + /// Import a trusted header into the blockchain + /// Trusted header doesn't go through any verifications and doesn't update the best header + fn import_trusted_header(&self, header: &Header) -> Result; + + /// Import a trusted block into the blockchain + /// Trusted block doesn't go through any verifications and doesn't update the best block + fn import_trusted_block(&self, block: &Block) -> Result; + + /// Forcefully update the best block + fn force_update_best_block(&self, hash: &BlockHash); /// Import sealed block. Skips all verifications. fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult; diff --git a/core/src/client/test_client.rs b/core/src/client/test_client.rs index a2f6484872..90131bf8fb 100644 --- a/core/src/client/test_client.rs +++ b/core/src/client/test_client.rs @@ -52,6 +52,7 @@ use cnetwork::NodeId; use cstate::tests::helpers::empty_top_state; use cstate::{FindActionHandler, StateDB, TopLevelState}; use ctimer::{TimeoutHandler, TimerToken}; +use ctypes::header::Header; use ctypes::transaction::{Action, Transaction}; use ctypes::{BlockHash, BlockNumber, CommonParams, Header as BlockHeader, Tracker, TxHash}; use cvm::ChainTimeInfo; @@ -503,7 +504,15 @@ impl ImportBlock for TestBlockChainClient { unimplemented!() } - fn import_bootstrap_block(&self, _header: &Block) -> Result { + fn import_trusted_header(&self, _header: &Header) -> Result { + unimplemented!() + } + + fn import_trusted_block(&self, _block: &Block) -> Result { + unimplemented!() + } + + fn force_update_best_block(&self, _hash: &BlockHash) { unimplemented!() } diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 0cbe2bd30f..4d54f622b6 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -486,6 +486,7 @@ impl NetworkExtension for Extension { 
if let Some(root) = restore.next_to_feed() { self.send_chunk_request(&block, &root); } else { + self.client.force_update_best_block(&block); self.transition_to_full(); } } @@ -856,6 +857,24 @@ impl Extension { match self.state { State::SnapshotHeader(hash, _) => match headers { [parent, header] if header.hash() == hash => { + match self.client.import_trusted_header(parent) { + Ok(_) + | Err(BlockImportError::Import(ImportError::AlreadyInChain)) + | Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} + Err(err) => { + cwarn!(SYNC, "Cannot import header({}): {:?}", parent.hash(), err); + return + } + } + match self.client.import_trusted_header(header) { + Ok(_) + | Err(BlockImportError::Import(ImportError::AlreadyInChain)) + | Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} + Err(err) => { + cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); + return + } + } self.state = State::SnapshotBody { header: EncodedHeader::new(header.rlp_bytes().to_vec()), prev_root: *parent.transactions_root(), @@ -933,7 +952,7 @@ impl Extension { header: header.decode(), transactions: body.clone(), }; - match self.client.import_bootstrap_block(&block) { + match self.client.import_trusted_block(&block) { Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { self.state = State::SnapshotTopChunk { block: header.hash(), @@ -944,7 +963,7 @@ impl Extension { Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} // FIXME: handle import errors Err(err) => { - cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); + cwarn!(SYNC, "Cannot import block({}): {:?}", header.hash(), err); } } } @@ -1044,6 +1063,7 @@ impl Extension { if let Some(root) = restore.next_to_feed() { self.send_chunk_request(&block, &root); } else { + self.client.force_update_best_block(&block); self.transition_to_full(); } } From 9455402efe3c6d9eef5c1018062f71fbb2832442 Mon Sep 17 00:00:00 2001 From: SeongChan Lee Date: Wed, 4 Dec 2019 19:19:38 
+0900 Subject: [PATCH 06/12] Change sync extension to update pivot after transitioning to Full --- sync/src/block/downloader/header.rs | 4 ++++ sync/src/block/extension.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/sync/src/block/downloader/header.rs b/sync/src/block/downloader/header.rs index 105233109f..81399bfbb4 100644 --- a/sync/src/block/downloader/header.rs +++ b/sync/src/block/downloader/header.rs @@ -60,6 +60,10 @@ impl HeaderDownloader { } } + pub fn update_pivot(&mut self, hash: BlockHash) { + self.pivot = hash; + } + pub fn best_hash(&self) -> BlockHash { self.best_hash } diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 4d54f622b6..4764ffcc40 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -1071,6 +1071,10 @@ impl Extension { fn transition_to_full(&mut self) { cdebug!(SYNC, "Transitioning state to {:?}", State::Full); + let best_hash = self.client.best_block_header().hash(); + for downloader in self.header_downloaders.values_mut() { + downloader.update_pivot(best_hash); + } self.state = State::Full; } } From e69f2e3ce2dd58180ebb55d6d0281dea774b9de3 Mon Sep 17 00:00:00 2001 From: SeongChan Lee Date: Tue, 3 Dec 2019 16:38:46 +0900 Subject: [PATCH 07/12] Fix to receive peer requests from just connected nodes All `self.header_downloaders.keys()` are in `connected_nodes`, and `self.header_downloaders` keeps track of the nodes which have sent at least one peer status out of the connected nodes. The following commit will prohibit the node from sending peer requests before it finishes the snapshot sync, so a node may send peer requests before sharing its peer status. Therefore, this change makes the node accept requests from nodes that haven't sent any peer status. 
--- sync/src/block/extension.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 4764ffcc40..a457df7c2f 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -643,7 +643,7 @@ impl Extension { } fn on_peer_request(&self, from: &NodeId, id: u64, request: RequestMessage) { - if !self.header_downloaders.contains_key(from) { + if !self.connected_nodes.contains(from) { cinfo!(SYNC, "Request from invalid peer #{} received", from); return } From a9e63ed62b0532b853447dfd2b6ff8df24f36851 Mon Sep 17 00:00:00 2001 From: SeongChan Lee Date: Tue, 3 Dec 2019 16:41:54 +0900 Subject: [PATCH 08/12] Defer sending peer status after transitioning to full sync mode --- sync/src/block/extension.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index a457df7c2f..8ce76a1271 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -40,6 +40,7 @@ use rlp::{Encodable, Rlp}; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fs; +use std::mem::discriminant; use std::sync::Arc; use std::time::Duration; use token_generator::TokenGenerator; @@ -193,6 +194,10 @@ impl Extension { } fn send_status(&mut self, id: &NodeId) { + if discriminant(&self.state) != discriminant(&State::Full) { + return + } + let chain_info = self.client.chain_info(); self.api.send( id, @@ -209,6 +214,10 @@ impl Extension { } fn send_status_broadcast(&mut self) { + if discriminant(&self.state) != discriminant(&State::Full) { + return + } + let chain_info = self.client.chain_info(); for id in self.connected_nodes.iter() { self.api.send( @@ -1076,6 +1085,7 @@ impl Extension { downloader.update_pivot(best_hash); } self.state = State::Full; + self.send_status_broadcast(); } } From 9978f6138c8fd2062aab292e066fcc5fc9b171b1 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Fri, 13 Dec 2019 13:59:24 
+0900 Subject: [PATCH 09/12] Refactor on_chunk_response --- sync/src/block/extension.rs | 91 +++++++++++++++++++------------------ 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 8ce76a1271..72e8076ed1 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -1022,59 +1022,60 @@ impl Extension { fn on_chunk_response(&mut self, from: &NodeId, roots: &[H256], chunks: &[Vec]) { assert_eq!(roots.len(), chunks.len()); - if let State::SnapshotTopChunk { - block, - ref mut restore, - } = self.state - { - for (r, c) in roots.iter().zip(chunks) { - if c.is_empty() { - cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r); + let (block, restore) = match self.state { + State::SnapshotTopChunk { + block, + ref mut restore, + } => (block, restore), + _ => return, + }; + for (r, c) in roots.iter().zip(chunks) { + if c.is_empty() { + cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r); + continue + } + let decompressor = ChunkDecompressor::from_slice(c); + let raw_chunk = match decompressor.decompress() { + Ok(chunk) => chunk, + Err(e) => { + cwarn!(SYNC, "Decode failed for chunk response from peer {}: {}", from, e); continue } - let decompressor = ChunkDecompressor::from_slice(c); - let raw_chunk = match decompressor.decompress() { - Ok(chunk) => chunk, - Err(e) => { - cwarn!(SYNC, "Decode failed for chunk response from peer {}: {}", from, e); - continue - } - }; - let recovered = match raw_chunk.recover(*r) { - Ok(chunk) => chunk, + }; + let recovered = match raw_chunk.recover(*r) { + Ok(chunk) => chunk, + Err(e) => { + cwarn!(SYNC, "Invalid chunk response from peer {}: {}", from, e); + continue + } + }; + + let batch = { + let mut state_db = self.client.state_db().write(); + let hash_db = state_db.as_hashdb_mut(); + restore.feed(hash_db, recovered); + + let mut batch = DBTransaction::new(); + match state_db.journal_under(&mut batch, 0, 
H256::zero()) { + Ok(_) => batch, Err(e) => { - cwarn!(SYNC, "Invalid chunk response from peer {}: {}", from, e); + cwarn!(SYNC, "Failed to write state chunk to database: {}", e); continue } - }; - - let batch = { - let mut state_db = self.client.state_db().write(); - let hash_db = state_db.as_hashdb_mut(); - restore.feed(hash_db, recovered); - - let mut batch = DBTransaction::new(); - match state_db.journal_under(&mut batch, 0, H256::zero()) { - Ok(_) => batch, - Err(e) => { - cwarn!(SYNC, "Failed to write state chunk to database: {}", e); - continue - } - } - }; - self.client.db().write_buffered(batch); - match self.client.db().flush() { - Ok(_) => cdebug!(SYNC, "Wrote state chunk to database: {}", r), - Err(e) => cwarn!(SYNC, "Failed to flush database: {}", e), } + }; + self.client.db().write_buffered(batch); + match self.client.db().flush() { + Ok(_) => cdebug!(SYNC, "Wrote state chunk to database: {}", r), + Err(e) => cwarn!(SYNC, "Failed to flush database: {}", e), } + } - if let Some(root) = restore.next_to_feed() { - self.send_chunk_request(&block, &root); - } else { - self.client.force_update_best_block(&block); - self.transition_to_full(); - } + if let Some(root) = restore.next_to_feed() { + self.send_chunk_request(&block, &root); + } else { + self.client.force_update_best_block(&block); + self.transition_to_full(); } } From 7c7171211a79af35ebb2478b385667c347baa91b Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Fri, 13 Dec 2019 15:37:17 +0900 Subject: [PATCH 10/12] Refactor sync state transition --- sync/src/block/extension.rs | 81 ++++++++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 72e8076ed1..86fa80078c 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -122,6 +122,33 @@ impl State { State::Full } + + fn next(&self, client: &Client) -> Self { + match self { + State::SnapshotHeader(hash, _) => { + let header = 
client.block_header(&(*hash).into()).expect("Snapshot header is imported"); + let parent = client + .block_header(&header.parent_hash().into()) + .expect("Parent of the snapshot header must be imported"); + State::SnapshotBody { + header, + prev_root: parent.transactions_root(), + } + } + State::SnapshotBody { + header, + .. + } => State::SnapshotTopChunk { + block: header.hash(), + restore: SnapshotRestore::new(header.state_root()), + }, + State::SnapshotTopChunk { + .. + } => unimplemented!(), + State::SnapshotShardChunk(..) => State::Full, + State::Full => State::Full, + } + } } pub struct Extension { @@ -187,6 +214,32 @@ impl Extension { } } + fn move_state(&mut self) { + let next_state = self.state.next(&self.client); + cdebug!(SYNC, "Transitioning the state to {:?}", next_state); + if let State::Full = next_state { + let best_hash = match &self.state { + State::SnapshotHeader(hash, _) => *hash, + State::SnapshotBody { + header, + .. + } => header.hash(), + State::SnapshotTopChunk { + block, + .. + } => *block, + State::SnapshotShardChunk(..) => unimplemented!(), + State::Full => unreachable!("Trying to transition state from State::Full"), + }; + self.client.force_update_best_block(&best_hash); + for downloader in self.header_downloaders.values_mut() { + downloader.update_pivot(best_hash); + } + self.send_status_broadcast(); + } + self.state = next_state; + } + fn dismiss_request(&mut self, id: &NodeId, request_id: u64) { if let Some(requests) = self.requests.get_mut(id) { requests.retain(|(i, _)| *i != request_id); @@ -495,8 +548,7 @@ impl NetworkExtension for Extension { if let Some(root) = restore.next_to_feed() { self.send_chunk_request(&block, &root); } else { - self.client.force_update_best_block(&block); - self.transition_to_full(); + self.move_state(); } } State::SnapshotShardChunk(..) 
=> unimplemented!(), @@ -884,11 +936,7 @@ impl Extension { return } } - self.state = State::SnapshotBody { - header: EncodedHeader::new(header.rlp_bytes().to_vec()), - prev_root: *parent.transactions_root(), - }; - cdebug!(SYNC, "Transitioning state to {:?}", self.state); + self.move_state(); } _ => cdebug!( SYNC, @@ -963,11 +1011,7 @@ impl Extension { }; match self.client.import_trusted_block(&block) { Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { - self.state = State::SnapshotTopChunk { - block: header.hash(), - restore: SnapshotRestore::new(header.state_root()), - }; - cdebug!(SYNC, "Transitioning state to {:?}", self.state); + self.move_state(); } Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} // FIXME: handle import errors @@ -1074,20 +1118,9 @@ impl Extension { if let Some(root) = restore.next_to_feed() { self.send_chunk_request(&block, &root); } else { - self.client.force_update_best_block(&block); - self.transition_to_full(); + self.move_state(); } } - - fn transition_to_full(&mut self) { - cdebug!(SYNC, "Transitioning state to {:?}", State::Full); - let best_hash = self.client.best_block_header().hash(); - for downloader in self.header_downloaders.values_mut() { - downloader.update_pivot(best_hash); - } - self.state = State::Full; - self.send_status_broadcast(); - } } pub struct BlockSyncSender(EventSender); From 6759b5d8f19d76a592200622205cdb7414881970 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Fri, 13 Dec 2019 16:31:31 +0900 Subject: [PATCH 11/12] Import shard trie chunks in sync extension --- sync/src/block/extension.rs | 85 ++++++++++++++++++++++++++++++------- 1 file changed, 70 insertions(+), 15 deletions(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 86fa80078c..ae1bb7d708 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -27,7 +27,7 @@ use cmerkle::snapshot::ChunkDecompressor; use cmerkle::snapshot::Restore as SnapshotRestore; use 
cmerkle::{skewed_merkle_root, Trie, TrieFactory}; use cnetwork::{Api, EventSender, NetworkExtension, NodeId}; -use cstate::{FindActionHandler, TopStateView}; +use cstate::{FindActionHandler, TopLevelState, TopStateView}; use ctimer::TimerToken; use ctypes::header::{Header, Seal}; use ctypes::transaction::Action; @@ -70,7 +70,11 @@ enum State { block: BlockHash, restore: SnapshotRestore, }, - SnapshotShardChunk(ShardId, H256), + SnapshotShardChunk { + block: BlockHash, + shard_id: ShardId, + restore: SnapshotRestore, + }, Full, } @@ -117,7 +121,11 @@ impl State { } }); if let Some((shard_id, shard_root)) = empty_shard { - return State::SnapshotShardChunk(shard_id, shard_root) + return State::SnapshotShardChunk { + block: hash, + shard_id, + restore: SnapshotRestore::new(shard_root), + } } State::Full @@ -143,9 +151,43 @@ impl State { restore: SnapshotRestore::new(header.state_root()), }, State::SnapshotTopChunk { + block, + .. + } => { + let header = client.block_header(&(*block).into()).expect("Snapshot header must exist"); + let state_root = header.state_root(); + let state_db = client.state_db().read(); + let top_state = TopLevelState::from_existing(state_db.clone(&state_root), state_root).unwrap(); + let shard_root = top_state.shard_root(0).unwrap().expect("Shard 0 always exists"); + State::SnapshotShardChunk { + block: *block, + shard_id: 0, + restore: SnapshotRestore::new(shard_root), + } + } + State::SnapshotShardChunk { + block, + shard_id, .. - } => unimplemented!(), - State::SnapshotShardChunk(..) 
=> State::Full, + } => { + let top_state = client.state_at((*block).into()).expect("State at the snapshot header must exist"); + let metadata = top_state.metadata().unwrap().expect("Metadata must exist for snapshot block"); + let shard_num = *metadata.number_of_shards(); + if shard_id + 1 == shard_num { + State::Full + } else { + let next_shard = shard_id + 1; + let shard_root = top_state + .shard_root(next_shard) + .expect("Top level state must be valid") + .expect("Shard root must exist"); + State::SnapshotShardChunk { + block: *block, + shard_id: next_shard, + restore: SnapshotRestore::new(shard_root), + } + } + } State::Full => State::Full, } } @@ -228,8 +270,11 @@ impl Extension { block, .. } => *block, - State::SnapshotShardChunk(..) => unimplemented!(), - State::Full => unreachable!("Trying to transition state from State::Full"), + State::SnapshotShardChunk { + block, + .. + } => *block, + State::Full => panic!("Trying to transit the state from State::Full"), }; self.client.force_update_best_block(&best_hash); for downloader in self.header_downloaders.values_mut() { @@ -551,7 +596,17 @@ impl NetworkExtension for Extension { self.move_state(); } } - State::SnapshotShardChunk(..) => unimplemented!(), + State::SnapshotShardChunk { + block, + ref mut restore, + .. + } => { + if let Some(root) = restore.next_to_feed() { + self.send_chunk_request(&block, &root); + } else { + self.move_state(); + } + } State::Full => { for id in &peer_ids { let request = @@ -946,13 +1001,6 @@ impl Extension { headers.len() ), }, - State::SnapshotBody { - .. - } => {} - State::SnapshotTopChunk { - .. - } => {} - State::SnapshotShardChunk(..) 
=> {} State::Full => { let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) { let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect(); @@ -991,6 +1039,7 @@ impl Extension { } } } + _ => {} } } @@ -1071,8 +1120,14 @@ impl Extension { block, ref mut restore, } => (block, restore), + State::SnapshotShardChunk { + block, + ref mut restore, + .. + } => (block, restore), _ => return, }; + assert_eq!(roots.len(), chunks.len()); for (r, c) in roots.iter().zip(chunks) { if c.is_empty() { cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r); From 4c05615881efe5065e0d364c045cad29647fa3d0 Mon Sep 17 00:00:00 2001 From: SeongChan Lee Date: Fri, 6 Dec 2019 14:53:13 +0900 Subject: [PATCH 12/12] Add snapshot sync test with Tendermint dynamic validator --- test/src/e2e.dynval/2/snapshot.test.ts | 208 +++++++++++++++++++++ test/src/e2e.dynval/setup.ts | 37 +++- test/tendermint.dynval/snapshot-config.yml | 16 ++ 3 files changed, 251 insertions(+), 10 deletions(-) create mode 100644 test/src/e2e.dynval/2/snapshot.test.ts create mode 100644 test/tendermint.dynval/snapshot-config.yml diff --git a/test/src/e2e.dynval/2/snapshot.test.ts b/test/src/e2e.dynval/2/snapshot.test.ts new file mode 100644 index 0000000000..95285eb081 --- /dev/null +++ b/test/src/e2e.dynval/2/snapshot.test.ts @@ -0,0 +1,208 @@ +// Copyright 2019 Kodebox, Inc. +// This file is part of CodeChain. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +import * as chai from "chai"; +import { expect } from "chai"; +import * as chaiAsPromised from "chai-as-promised"; +import { SDK } from "codechain-sdk"; +import * as stake from "codechain-stakeholder-sdk"; +import * as fs from "fs"; +import "mocha"; +import * as path from "path"; + +import mkdirp = require("mkdirp"); +import { validators } from "../../../tendermint.dynval/constants"; +import { faucetAddress, faucetSecret } from "../../helper/constants"; +import { PromiseExpect } from "../../helper/promise"; +import CodeChain, { Signer } from "../../helper/spawn"; +import { setTermTestTimeout, withNodes } from "../setup"; + +chai.use(chaiAsPromised); + +const SNAPSHOT_CONFIG = `${__dirname}/../../../tendermint.dynval/snapshot-config.yml`; +const SNAPSHOT_PATH = `${__dirname}/../../../../snapshot/`; + +describe("Snapshot for Tendermint with Dynamic Validator", function() { + const promiseExpect = new PromiseExpect(); + const snapshotValidators = validators.slice(0, 3); + const freshNodeValidator = validators[3]; + const { nodes } = withNodes(this, { + promiseExpect, + overrideParams: { + maxNumOfValidators: 3 + }, + validators: snapshotValidators.map((signer, index) => ({ + signer, + delegation: 5000, + deposit: 10_000_000 - index // tie-breaker + })), + modify: () => { + mkdirp.sync(SNAPSHOT_PATH); + const snapshotPath = fs.mkdtempSync(SNAPSHOT_PATH); + return { + additionalArgv: [ + "--snapshot-path", + snapshotPath, + "--config", + SNAPSHOT_CONFIG + ], + nodeAdditionalProperties: { + snapshotPath + } + }; + } + }); + + it("should be exist after some time", async function() { + const termWaiter = setTermTestTimeout(this, { + terms: 2 + }); + const termMetadata = await termWaiter.waitNodeUntilTerm(nodes[0], { + target: 2, + termPeriods: 1 + }); + const snapshotBlock = await 
getSnapshotBlock(nodes[0], termMetadata); + expect( + path.join( + nodes[0].snapshotPath, + snapshotBlock.hash.toString(), + snapshotBlock.stateRoot.toString() + ) + ).to.satisfy(fs.existsSync); + }); + + it("should be able to boot with the snapshot", async function() { + const termWaiter = setTermTestTimeout(this, { + terms: 3 + }); + const termMetadata1 = await termWaiter.waitNodeUntilTerm(nodes[0], { + target: 2, + termPeriods: 1 + }); + const snapshotBlock = await getSnapshotBlock(nodes[0], termMetadata1); + await makeItValidator(nodes[0], freshNodeValidator); + const snapshotPath = fs.mkdtempSync(SNAPSHOT_PATH); + const node = new CodeChain({ + chain: `${__dirname}/../../scheme/tendermint-dynval.json`, + argv: [ + "--engine-signer", + freshNodeValidator.platformAddress.toString(), + "--password-path", + `test/tendermint.dynval/${freshNodeValidator.platformAddress.value}/password.json`, + "--force-sealing", + "--snapshot-path", + snapshotPath, + "--config", + SNAPSHOT_CONFIG, + "--snapshot-hash", + snapshotBlock.hash.toString(), + "--snapshot-number", + snapshotBlock.number.toString() + ], + additionalKeysPath: `tendermint.dynval/${freshNodeValidator.platformAddress.value}/keys` + }); + try { + await node.start(); + await node.connect(nodes[0]); + await termWaiter.waitNodeUntilTerm(node, { + target: 4, + termPeriods: 2 + }); + + await freshValidatorCheck(nodes[0].sdk); + + expect(await node.sdk.rpc.chain.getBlock(snapshotBlock.number - 1)) + .to.be.null; + expect(await node.sdk.rpc.chain.getBlock(snapshotBlock.number)).not + .to.be.null; + // Check that the freshNodeValidator is still a validator & make sure it doesn't have a block/header before termMetadata1. 
+ } catch (e) { + node.keepLogs(); + throw e; + } finally { + await node.clean(); + } + }); + + afterEach(async function() { + promiseExpect.checkFulfilled(); + }); + + async function freshValidatorCheck(sdk: SDK) { + const blockNumber = await sdk.rpc.chain.getBestBlockNumber(); + const termMedata = await stake.getTermMetadata(sdk, blockNumber); + const currentTermInitialBlockNumber = + termMedata!.lastTermFinishedBlockNumber + 1; + const validatorsAfter = (await stake.getPossibleAuthors( + sdk, + currentTermInitialBlockNumber + ))!.map(platformAddr => platformAddr.toString()); + + expect(validatorsAfter).and.contains( + freshNodeValidator.platformAddress.toString() + ); + } +}); + +async function getSnapshotBlock( + node: CodeChain, + termMetadata: stake.TermMetadata +) { + const blockNumber = termMetadata.lastTermFinishedBlockNumber + 1; + await node.waitBlockNumber(blockNumber); + return (await node.sdk.rpc.chain.getBlock(blockNumber))!; +} + +async function makeItValidator(node: CodeChain, freshNodeValidator: Signer) { + const faucetSeq = await node.sdk.rpc.chain.getSeq(faucetAddress); + const payTx = node.sdk.core + .createPayTransaction({ + recipient: freshNodeValidator.platformAddress, + quantity: 200000000 + }) + .sign({ + secret: faucetSecret, + seq: faucetSeq, + fee: 10 + }); + await node.waitForTx(await node.sdk.rpc.chain.sendSignedTransaction(payTx)); + const selfNominateTx = stake + .createSelfNominateTransaction(node.sdk, 10000000, "") + .sign({ + secret: freshNodeValidator.privateKey, + seq: await node.sdk.rpc.chain.getSeq( + freshNodeValidator.platformAddress + ), + fee: 10 + }); + await node.waitForTx( + await node.sdk.rpc.chain.sendSignedTransaction(selfNominateTx) + ); + const delegateTx = stake + .createDelegateCCSTransaction( + node.sdk, + freshNodeValidator.platformAddress, + 10000 + ) + .sign({ + secret: faucetSecret, + seq: faucetSeq + 1, + fee: 10 + }); + await node.waitForTx( + await node.sdk.rpc.chain.sendSignedTransaction(delegateTx) + 
); +} diff --git a/test/src/e2e.dynval/setup.ts b/test/src/e2e.dynval/setup.ts index 2c4aefa645..6cc695e2fb 100644 --- a/test/src/e2e.dynval/setup.ts +++ b/test/src/e2e.dynval/setup.ts @@ -39,17 +39,29 @@ interface ValidatorConfig { delegation?: U64Value; } -export function withNodes( +interface NodePropertyModifier { + additionalArgv: string[]; + nodeAdditionalProperties: T; +} + +export function withNodes( suite: Suite, options: { promiseExpect: PromiseExpect; validators: ValidatorConfig[]; overrideParams?: Partial; onBeforeEnable?: (nodes: CodeChain[]) => Promise; + modify?: (signer: Signer, index: number) => NodePropertyModifier; } ) { - const nodes: CodeChain[] = []; - const { overrideParams = {} } = options; + const nodes: (CodeChain & T)[] = []; + const { + overrideParams = {}, + modify = () => ({ + additionalArgv: [], + nodeAdditionalProperties: {} as T + }) + } = options; const initialParams = { ...defaultParams, ...overrideParams @@ -62,7 +74,8 @@ export function withNodes( nodes.length = 0; const newNodes = await createNodes({ ...options, - initialParams + initialParams, + modify }); nodes.push(...newNodes); }); @@ -95,14 +108,15 @@ export function findNode(nodes: CodeChain[], signer: Signer) { ); } -async function createNodes(options: { +async function createNodes(options: { promiseExpect: PromiseExpect; validators: ValidatorConfig[]; initialParams: CommonParams; onBeforeEnable?: (nodes: CodeChain[]) => Promise; -}): Promise { + modify: (signer: Signer, index: number) => NodePropertyModifier; +}): Promise<(CodeChain & T)[]> { const chain = `${__dirname}/../scheme/tendermint-dynval.json`; - const { promiseExpect, validators, initialParams } = options; + const { promiseExpect, validators, initialParams, modify } = options; const initialNodes: CodeChain[] = []; const initialValidators = [ @@ -124,20 +138,23 @@ async function createNodes(options: { }); } - const nodes: CodeChain[] = []; + const nodes: (CodeChain & T)[] = []; for (let i = 0; i < 
validators.length; i++) { const { signer: validator } = validators[i]; - nodes[i] = new CodeChain({ + const modifier = modify(validator, i); + const node = new CodeChain({ chain, argv: [ "--engine-signer", validator.platformAddress.value, "--password-path", `test/tendermint.dynval/${validator.platformAddress.value}/password.json`, - "--force-sealing" + "--force-sealing", + ...modifier.additionalArgv ], additionalKeysPath: `tendermint.dynval/${validator.platformAddress.value}/keys` }); + nodes[i] = Object.assign(node, modifier.nodeAdditionalProperties); nodes[i].signer = validator; } let bootstrapFailed = false; diff --git a/test/tendermint.dynval/snapshot-config.yml b/test/tendermint.dynval/snapshot-config.yml new file mode 100644 index 0000000000..ed49cc5414 --- /dev/null +++ b/test/tendermint.dynval/snapshot-config.yml @@ -0,0 +1,16 @@ +[codechain] + +[mining] + +[network] + +[rpc] + +[ipc] + +[ws] + +[snapshot] +disable = false + +[email_alarm]