diff --git a/sync/src/block/downloader/body.rs b/sync/src/block/downloader/body.rs index cc46e4d418..ff1390da22 100644 --- a/sync/src/block/downloader/body.rs +++ b/sync/src/block/downloader/body.rs @@ -16,20 +16,31 @@ use super::super::message::RequestMessage; use ccore::UnverifiedTransaction; -use ctypes::{BlockHash, Header}; -use std::collections::{HashMap, HashSet}; +use ctypes::BlockHash; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::mem::replace; -#[derive(Clone)] -struct Target { - hash: BlockHash, - is_empty: bool, +#[derive(Debug, PartialEq)] +enum State { + Queued, + Downloading, + Downloaded { + transactions: Vec, + }, + Drained, +} + +impl Default for State { + fn default() -> Self { + State::Queued + } } #[derive(Default)] pub struct BodyDownloader { - targets: Vec, - downloading: HashSet, - downloaded: HashMap>, + targets: Vec, + states: HashMap, } impl BodyDownloader { @@ -37,9 +48,12 @@ impl BodyDownloader { const MAX_BODY_REQEUST_LENGTH: usize = 128; let mut hashes = Vec::new(); for t in &self.targets { - if !self.downloading.contains(&t.hash) && !self.downloaded.contains_key(&t.hash) { - hashes.push(t.hash); + let state = self.states.entry(*t).or_default(); + if *state != State::Queued { + continue } + *state = State::Downloading; + hashes.push(*t); if hashes.len() >= MAX_BODY_REQEUST_LENGTH { break } @@ -47,101 +61,99 @@ impl BodyDownloader { if hashes.is_empty() { None } else { - self.downloading.extend(&hashes); Some(RequestMessage::Bodies(hashes)) } } pub fn import_bodies(&mut self, hashes: Vec, bodies: Vec>) { - let empty_targets: HashSet<_> = self.targets.iter().filter(|t| t.is_empty).map(|t| t.hash).collect(); - for (hash, body) in hashes.into_iter().zip(bodies) { - if self.downloading.remove(&hash) { - if body.is_empty() { - if !empty_targets.contains(&hash) { - cwarn!(SYNC, "Invalid body of {}. It should be not empty.", hash); - continue - } - } else if empty_targets.contains(&hash) { - cwarn!(SYNC, "Invalid body of {}. It should be empty.", hash); + assert_eq!(hashes.len(), bodies.len()); + for (hash, transactions) in hashes.into_iter().zip(bodies) { + if let Some(state) = self.states.get_mut(&hash) { + if state != &State::Downloading { continue } - self.downloaded.insert(hash, body); + *state = State::Downloaded { + transactions, + } } } - self.downloading.shrink_to_fit(); } pub fn get_target_hashes(&self) -> Vec { - self.targets.iter().map(|t| t.hash).collect() + self.targets.iter().map(Clone::clone).collect() } - pub fn add_target(&mut self, header: &Header) { - cdebug!(SYNC, "Add download target: {}", header.hash()); - self.targets.push(Target { - hash: header.hash(), - is_empty: header.is_empty(), - }); + pub fn add_targets(&mut self, hashes: Vec) { + cdebug!(SYNC, "Add download targets: {:?}", hashes); + for hash in &hashes { + self.states.insert(*hash, State::Queued); + } + self.targets.extend(hashes); } - pub fn remove_target(&mut self, targets: &[BlockHash]) { + pub fn remove_targets(&mut self, targets: &[BlockHash]) { if targets.is_empty() { return } cdebug!(SYNC, "Remove download targets: {:?}", targets); - for hash in targets { - if let Some(index) = self.targets.iter().position(|t| t.hash == *hash) { - self.targets.remove(index); - } - self.downloading.remove(hash); - self.downloaded.remove(hash); - } + // XXX: It can be slow. + self.states.retain(|hash, _| !targets.contains(hash)); + self.targets.retain(|hash| !targets.contains(hash)); + self.states.shrink_to_fit(); self.targets.shrink_to_fit(); - self.downloading.shrink_to_fit(); - self.downloaded.shrink_to_fit(); } pub fn reset_downloading(&mut self, hashes: &[BlockHash]) { cdebug!(SYNC, "Remove downloading by timeout {:?}", hashes); for hash in hashes { - self.downloading.remove(&hash); + if let Some(state) = self.states.get_mut(hash) { + if *state == State::Downloading { + *state = State::Queued; + } + } } - self.downloading.shrink_to_fit(); } pub fn drain(&mut self) -> Vec<(BlockHash, Vec)> { let mut result = Vec::new(); - for t in &self.targets { - if let Some(body) = self.downloaded.remove(&t.hash) { - result.push((t.hash, body)); - } else { - break + for hash in &self.targets { + let entry = self.states.entry(*hash); + let state = match entry { + Entry::Vacant(_) => unreachable!(), + Entry::Occupied(mut entry) => match entry.get_mut() { + state @ State::Downloaded { + .. + } => replace(state, State::Drained), + _ => break, + }, + }; + match state { + State::Downloaded { + transactions, + } => { + result.push((*hash, transactions)); + } + _ => unreachable!(), } } - self.downloaded.shrink_to_fit(); - self.targets.drain(0..result.len()); - self.targets.shrink_to_fit(); result } - pub fn re_request( - &mut self, - hash: BlockHash, - is_empty: bool, - remains: Vec<(BlockHash, Vec)>, - ) { - let mut new_targets = vec![Target { - hash, - is_empty, - }]; - new_targets.extend(remains.into_iter().map(|(hash, transactions)| { - let is_empty = transactions.is_empty(); - self.downloaded.insert(hash, transactions); - Target { - hash, - is_empty, - } - })); - new_targets.append(&mut self.targets); - self.targets = new_targets; + pub fn re_request(&mut self, hash: BlockHash, remains: Vec<(BlockHash, Vec)>) { + #[inline] + fn insert(states: &mut HashMap, hash: BlockHash, state: State) { + let old = states.insert(hash, state); + debug_assert_ne!(None, old); + } + // The implementation of extend method allocates an additional memory for new items. + // However, our implementation guarantees that new items are already in the map and it just + // update the states. So iterating over new items and calling the insert method is faster + // than using the extend method and uses less memory. + for (hash, transactions) in remains { + insert(&mut self.states, hash, State::Downloaded { + transactions, + }); + } + insert(&mut self.states, hash, State::Queued); } } diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 4d670674a6..1ab792126f 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -223,11 +223,17 @@ impl Extension { } } let mut body_downloader = BodyDownloader::default(); - for neighbors in hollow_headers.windows(2).rev() { - let child = &neighbors[0]; - cdebug!(SYNC, "Adding block #{} (hash: {}) for initial body download target", child.number(), child.hash()); - body_downloader.add_target(child); - } + let new_targets = hollow_headers + .windows(2) + .rev() + .map(|neighbors| { + let child = &neighbors[0]; + let hash = child.hash(); + cdebug!(SYNC, "Adding block #{} (hash: {}) for initial body download target", child.number(), hash); + hash + }) + .collect(); + body_downloader.add_targets(new_targets); cinfo!(SYNC, "Sync extension initialized"); Extension { state, @@ -715,15 +721,13 @@ impl Extension { .into_iter() .filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none()) .collect(); // FIXME: No need to collect here if self is not borrowed. - for header in headers { - self.body_downloader.add_target(&header.decode()); - } + self.body_downloader.add_targets(headers.into_iter().map(|header| header.hash()).collect()); } } fn new_blocks(&mut self, imported: Vec, invalid: Vec) { - self.body_downloader.remove_target(&imported); - self.body_downloader.remove_target(&invalid); + self.body_downloader.remove_targets(&imported); + self.body_downloader.remove_targets(&invalid); self.send_status_broadcast(); } @@ -1038,6 +1042,7 @@ impl Extension { } fn import_blocks(&mut self, blocks: Vec<(BlockHash, Vec)>) { + let mut imported = Vec::new(); let mut remains = Vec::new(); let mut error_target = None; for (hash, transactions) in blocks { @@ -1051,7 +1056,7 @@ impl Extension { skewed_merkle_root(BLAKE_NULL_RLP, transactions.iter().map(Encodable::rlp_bytes)); if *header.transactions_root() != calculated_transactions_root { cwarn!(SYNC, "Received corrupted body for ${}({}", header.number(), hash); - error_target = Some((hash, transactions.is_empty())); + error_target = Some(hash); continue } @@ -1072,12 +1077,15 @@ impl Extension { cwarn!(SYNC, "Cannot import block({}): {:?}", hash, err); break } - _ => {} + Ok(_) => { + imported.push(hash); + } } } - if let Some((hash, is_empty)) = error_target { - self.body_downloader.re_request(hash, is_empty, remains); + if let Some(hash) = error_target { + self.body_downloader.re_request(hash, remains); } + self.body_downloader.remove_targets(&imported); } fn on_body_response(&mut self, hashes: Vec, bodies: Vec>) {