From 983b60bb0be2a87b09c2f475c4e4205fbfe38b0a Mon Sep 17 00:00:00 2001 From: Seulgi Kim Date: Sun, 26 Jan 2020 19:21:33 +0900 Subject: [PATCH 1/2] Simplify BodyDownloader by removing is_empty field Once we introduce the CloseBlock transaction, there will be no empty block. --- sync/src/block/downloader/body.rs | 53 +++++++------------------------ sync/src/block/extension.rs | 6 ++-- 2 files changed, 14 insertions(+), 45 deletions(-) diff --git a/sync/src/block/downloader/body.rs b/sync/src/block/downloader/body.rs index cc46e4d418..8752ceb41b 100644 --- a/sync/src/block/downloader/body.rs +++ b/sync/src/block/downloader/body.rs @@ -19,15 +19,9 @@ use ccore::UnverifiedTransaction; use ctypes::{BlockHash, Header}; use std::collections::{HashMap, HashSet}; -#[derive(Clone)] -struct Target { - hash: BlockHash, - is_empty: bool, -} - #[derive(Default)] pub struct BodyDownloader { - targets: Vec, + targets: Vec, downloading: HashSet, downloaded: HashMap>, } @@ -37,8 +31,8 @@ impl BodyDownloader { const MAX_BODY_REQEUST_LENGTH: usize = 128; let mut hashes = Vec::new(); for t in &self.targets { - if !self.downloading.contains(&t.hash) && !self.downloaded.contains_key(&t.hash) { - hashes.push(t.hash); + if !self.downloading.contains(t) && !self.downloaded.contains_key(t) { + hashes.push(*t); } if hashes.len() >= MAX_BODY_REQEUST_LENGTH { break @@ -53,18 +47,8 @@ impl BodyDownloader { } pub fn import_bodies(&mut self, hashes: Vec, bodies: Vec>) { - let empty_targets: HashSet<_> = self.targets.iter().filter(|t| t.is_empty).map(|t| t.hash).collect(); for (hash, body) in hashes.into_iter().zip(bodies) { if self.downloading.remove(&hash) { - if body.is_empty() { - if !empty_targets.contains(&hash) { - cwarn!(SYNC, "Invalid body of {}. It should be not empty.", hash); - continue - } - } else if empty_targets.contains(&hash) { - cwarn!(SYNC, "Invalid body of {}. It should be empty.", hash); - continue - } self.downloaded.insert(hash, body); } } @@ -72,15 +56,12 @@ impl BodyDownloader { } pub fn get_target_hashes(&self) -> Vec { - self.targets.iter().map(|t| t.hash).collect() + self.targets.iter().map(Clone::clone).collect() } pub fn add_target(&mut self, header: &Header) { cdebug!(SYNC, "Add download target: {}", header.hash()); - self.targets.push(Target { - hash: header.hash(), - is_empty: header.is_empty(), - }); + self.targets.push(header.hash()); } pub fn remove_target(&mut self, targets: &[BlockHash]) { @@ -89,7 +70,7 @@ impl BodyDownloader { } cdebug!(SYNC, "Remove download targets: {:?}", targets); for hash in targets { - if let Some(index) = self.targets.iter().position(|t| t.hash == *hash) { + if let Some(index) = self.targets.iter().position(|t| t == hash) { self.targets.remove(index); } self.downloading.remove(hash); @@ -111,8 +92,8 @@ impl BodyDownloader { pub fn drain(&mut self) -> Vec<(BlockHash, Vec)> { let mut result = Vec::new(); for t in &self.targets { - if let Some(body) = self.downloaded.remove(&t.hash) { - result.push((t.hash, body)); + if let Some(body) = self.downloaded.remove(t) { + result.push((*t, body)); } else { break } @@ -123,23 +104,11 @@ impl BodyDownloader { result } - pub fn re_request( - &mut self, - hash: BlockHash, - is_empty: bool, - remains: Vec<(BlockHash, Vec)>, - ) { - let mut new_targets = vec![Target { - hash, - is_empty, - }]; + pub fn re_request(&mut self, hash: BlockHash, remains: Vec<(BlockHash, Vec)>) { + let mut new_targets = vec![hash]; new_targets.extend(remains.into_iter().map(|(hash, transactions)| { - let is_empty = transactions.is_empty(); self.downloaded.insert(hash, transactions); - Target { - hash, - is_empty, - } + hash })); new_targets.append(&mut self.targets); self.targets = new_targets; diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 4d670674a6..0cc75d0c6b 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -1051,7 +1051,7 @@ impl Extension { skewed_merkle_root(BLAKE_NULL_RLP, transactions.iter().map(Encodable::rlp_bytes)); if *header.transactions_root() != calculated_transactions_root { cwarn!(SYNC, "Received corrupted body for ${}({}", header.number(), hash); - error_target = Some((hash, transactions.is_empty())); + error_target = Some(hash); continue } @@ -1075,8 +1075,8 @@ impl Extension { _ => {} } } - if let Some((hash, is_empty)) = error_target { - self.body_downloader.re_request(hash, is_empty, remains); + if let Some(hash) = error_target { + self.body_downloader.re_request(hash, remains); } } From d280a45ff7c4923de4c9c31cdbc602b79cd719e7 Mon Sep 17 00:00:00 2001 From: Seulgi Kim Date: Sun, 26 Jan 2020 22:40:55 +0900 Subject: [PATCH 2/2] Refactor BodyDownloader There are 5 states for block hashes. 1. listed, but no request sent 2. sent the request 3. received the response 4. drained from the list 5. imported on the chain Currently, these states are implicit with the combination of targets, downloading, and downloaded fields. This patch merges downloading and downloaded fields, and make the states explicit. --- sync/src/block/downloader/body.rs | 125 ++++++++++++++++++++---------- sync/src/block/extension.rs | 30 ++++--- 2 files changed, 103 insertions(+), 52 deletions(-) diff --git a/sync/src/block/downloader/body.rs b/sync/src/block/downloader/body.rs index 8752ceb41b..ff1390da22 100644 --- a/sync/src/block/downloader/body.rs +++ b/sync/src/block/downloader/body.rs @@ -16,14 +16,31 @@ use super::super::message::RequestMessage; use ccore::UnverifiedTransaction; -use ctypes::{BlockHash, Header}; -use std::collections::{HashMap, HashSet}; +use ctypes::BlockHash; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::mem::replace; + +#[derive(Debug, PartialEq)] +enum State { + Queued, + Downloading, + Downloaded { + transactions: Vec, + }, + Drained, +} + +impl Default for State { + fn default() -> Self { + State::Queued + } +} #[derive(Default)] pub struct BodyDownloader { targets: Vec, - downloading: HashSet, - downloaded: HashMap>, + states: HashMap, } impl BodyDownloader { @@ -31,9 +48,12 @@ impl BodyDownloader { const MAX_BODY_REQEUST_LENGTH: usize = 128; let mut hashes = Vec::new(); for t in &self.targets { - if !self.downloading.contains(t) && !self.downloaded.contains_key(t) { - hashes.push(*t); + let state = self.states.entry(*t).or_default(); + if *state != State::Queued { + continue } + *state = State::Downloading; + hashes.push(*t); if hashes.len() >= MAX_BODY_REQEUST_LENGTH { break } @@ -41,76 +61,99 @@ impl BodyDownloader { if hashes.is_empty() { None } else { - self.downloading.extend(&hashes); Some(RequestMessage::Bodies(hashes)) } } pub fn import_bodies(&mut self, hashes: Vec, bodies: Vec>) { - for (hash, body) in hashes.into_iter().zip(bodies) { - if self.downloading.remove(&hash) { - self.downloaded.insert(hash, body); + assert_eq!(hashes.len(), bodies.len()); + for (hash, transactions) in hashes.into_iter().zip(bodies) { + if let Some(state) = self.states.get_mut(&hash) { + if state != &State::Downloading { + continue + } + *state = State::Downloaded { + transactions, + } } } - self.downloading.shrink_to_fit(); } pub fn get_target_hashes(&self) -> Vec { self.targets.iter().map(Clone::clone).collect() } - pub fn add_target(&mut self, header: &Header) { - cdebug!(SYNC, "Add download target: {}", header.hash()); - self.targets.push(header.hash()); + pub fn add_targets(&mut self, hashes: Vec) { + cdebug!(SYNC, "Add download targets: {:?}", hashes); + for hash in &hashes { + self.states.insert(*hash, State::Queued); + } + self.targets.extend(hashes); } - pub fn remove_target(&mut self, targets: &[BlockHash]) { + pub fn remove_targets(&mut self, targets: &[BlockHash]) { if targets.is_empty() { return } cdebug!(SYNC, "Remove download targets: {:?}", targets); - for hash in targets { - if let Some(index) = self.targets.iter().position(|t| t == hash) { - self.targets.remove(index); - } - self.downloading.remove(hash); - self.downloaded.remove(hash); - } + // XXX: It can be slow. + self.states.retain(|hash, _| !targets.contains(hash)); + self.targets.retain(|hash| !targets.contains(hash)); + self.states.shrink_to_fit(); self.targets.shrink_to_fit(); - self.downloading.shrink_to_fit(); - self.downloaded.shrink_to_fit(); } pub fn reset_downloading(&mut self, hashes: &[BlockHash]) { cdebug!(SYNC, "Remove downloading by timeout {:?}", hashes); for hash in hashes { - self.downloading.remove(&hash); + if let Some(state) = self.states.get_mut(hash) { + if *state == State::Downloading { + *state = State::Queued; + } + } } - self.downloading.shrink_to_fit(); } pub fn drain(&mut self) -> Vec<(BlockHash, Vec)> { let mut result = Vec::new(); - for t in &self.targets { - if let Some(body) = self.downloaded.remove(t) { - result.push((*t, body)); - } else { - break + for hash in &self.targets { + let entry = self.states.entry(*hash); + let state = match entry { + Entry::Vacant(_) => unreachable!(), + Entry::Occupied(mut entry) => match entry.get_mut() { + state @ State::Downloaded { + .. + } => replace(state, State::Drained), + _ => break, + }, + }; + match state { + State::Downloaded { + transactions, + } => { + result.push((*hash, transactions)); + } + _ => unreachable!(), } } - self.downloaded.shrink_to_fit(); - self.targets.drain(0..result.len()); - self.targets.shrink_to_fit(); result } pub fn re_request(&mut self, hash: BlockHash, remains: Vec<(BlockHash, Vec)>) { - let mut new_targets = vec![hash]; - new_targets.extend(remains.into_iter().map(|(hash, transactions)| { - self.downloaded.insert(hash, transactions); - hash - })); - new_targets.append(&mut self.targets); - self.targets = new_targets; + #[inline] + fn insert(states: &mut HashMap, hash: BlockHash, state: State) { + let old = states.insert(hash, state); + debug_assert_ne!(None, old); + } + // The implementation of extend method allocates an additional memory for new items. + // However, our implementation guarantees that new items are already in the map and it just + // update the states. So iterating over new items and calling the insert method is faster + // than using the extend method and uses less memory. + for (hash, transactions) in remains { + insert(&mut self.states, hash, State::Downloaded { + transactions, + }); + } + insert(&mut self.states, hash, State::Queued); } } diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 0cc75d0c6b..1ab792126f 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -223,11 +223,17 @@ impl Extension { } } let mut body_downloader = BodyDownloader::default(); - for neighbors in hollow_headers.windows(2).rev() { - let child = &neighbors[0]; - cdebug!(SYNC, "Adding block #{} (hash: {}) for initial body download target", child.number(), child.hash()); - body_downloader.add_target(child); - } + let new_targets = hollow_headers + .windows(2) + .rev() + .map(|neighbors| { + let child = &neighbors[0]; + let hash = child.hash(); + cdebug!(SYNC, "Adding block #{} (hash: {}) for initial body download target", child.number(), hash); + hash + }) + .collect(); + body_downloader.add_targets(new_targets); cinfo!(SYNC, "Sync extension initialized"); Extension { state, @@ -715,15 +721,13 @@ impl Extension { .into_iter() .filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none()) .collect(); // FIXME: No need to collect here if self is not borrowed. - for header in headers { - self.body_downloader.add_target(&header.decode()); - } + self.body_downloader.add_targets(headers.into_iter().map(|header| header.hash()).collect()); } } fn new_blocks(&mut self, imported: Vec, invalid: Vec) { - self.body_downloader.remove_target(&imported); - self.body_downloader.remove_target(&invalid); + self.body_downloader.remove_targets(&imported); + self.body_downloader.remove_targets(&invalid); self.send_status_broadcast(); } @@ -1038,6 +1042,7 @@ impl Extension { } fn import_blocks(&mut self, blocks: Vec<(BlockHash, Vec)>) { + let mut imported = Vec::new(); let mut remains = Vec::new(); let mut error_target = None; for (hash, transactions) in blocks { @@ -1072,12 +1077,15 @@ impl Extension { cwarn!(SYNC, "Cannot import block({}): {:?}", hash, err); break } - _ => {} + Ok(_) => { + imported.push(hash); + } } } if let Some(hash) = error_target { self.body_downloader.re_request(hash, remains); } + self.body_downloader.remove_targets(&imported); } fn on_body_response(&mut self, hashes: Vec, bodies: Vec>) {