Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 83 additions & 71 deletions sync/src/block/downloader/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,132 +16,144 @@

use super::super::message::RequestMessage;
use ccore::UnverifiedTransaction;
use ctypes::{BlockHash, Header};
use std::collections::{HashMap, HashSet};
use ctypes::BlockHash;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::replace;

#[derive(Clone)]
struct Target {
hash: BlockHash,
is_empty: bool,
#[derive(Debug, PartialEq)]
enum State {
Queued,
Downloading,
Downloaded {
transactions: Vec<UnverifiedTransaction>,
},
Drained,
}

impl Default for State {
fn default() -> Self {
State::Queued
}
}

#[derive(Default)]
pub struct BodyDownloader {
targets: Vec<Target>,
downloading: HashSet<BlockHash>,
downloaded: HashMap<BlockHash, Vec<UnverifiedTransaction>>,
targets: Vec<BlockHash>,
states: HashMap<BlockHash, State>,
}

impl BodyDownloader {
pub fn create_request(&mut self) -> Option<RequestMessage> {
const MAX_BODY_REQEUST_LENGTH: usize = 128;
let mut hashes = Vec::new();
for t in &self.targets {
if !self.downloading.contains(&t.hash) && !self.downloaded.contains_key(&t.hash) {
hashes.push(t.hash);
let state = self.states.entry(*t).or_default();
if *state != State::Queued {
continue
}
*state = State::Downloading;
hashes.push(*t);
if hashes.len() >= MAX_BODY_REQEUST_LENGTH {
break
}
}
if hashes.is_empty() {
None
} else {
self.downloading.extend(&hashes);
Some(RequestMessage::Bodies(hashes))
}
}

pub fn import_bodies(&mut self, hashes: Vec<BlockHash>, bodies: Vec<Vec<UnverifiedTransaction>>) {
let empty_targets: HashSet<_> = self.targets.iter().filter(|t| t.is_empty).map(|t| t.hash).collect();
for (hash, body) in hashes.into_iter().zip(bodies) {
if self.downloading.remove(&hash) {
if body.is_empty() {
if !empty_targets.contains(&hash) {
cwarn!(SYNC, "Invalid body of {}. It should be not empty.", hash);
continue
}
} else if empty_targets.contains(&hash) {
cwarn!(SYNC, "Invalid body of {}. It should be empty.", hash);
assert_eq!(hashes.len(), bodies.len());
for (hash, transactions) in hashes.into_iter().zip(bodies) {
if let Some(state) = self.states.get_mut(&hash) {
if state != &State::Downloading {
continue
}
self.downloaded.insert(hash, body);
*state = State::Downloaded {
transactions,
}
}
}
self.downloading.shrink_to_fit();
}

pub fn get_target_hashes(&self) -> Vec<BlockHash> {
self.targets.iter().map(|t| t.hash).collect()
self.targets.iter().map(Clone::clone).collect()
}

pub fn add_target(&mut self, header: &Header) {
cdebug!(SYNC, "Add download target: {}", header.hash());
self.targets.push(Target {
hash: header.hash(),
is_empty: header.is_empty(),
});
pub fn add_targets(&mut self, hashes: Vec<BlockHash>) {
cdebug!(SYNC, "Add download targets: {:?}", hashes);
for hash in &hashes {
self.states.insert(*hash, State::Queued);
}
self.targets.extend(hashes);
}

pub fn remove_target(&mut self, targets: &[BlockHash]) {
pub fn remove_targets(&mut self, targets: &[BlockHash]) {
if targets.is_empty() {
return
}
cdebug!(SYNC, "Remove download targets: {:?}", targets);
for hash in targets {
if let Some(index) = self.targets.iter().position(|t| t.hash == *hash) {
self.targets.remove(index);
}
self.downloading.remove(hash);
self.downloaded.remove(hash);
}
// XXX: It can be slow.
self.states.retain(|hash, _| !targets.contains(hash));
self.targets.retain(|hash| !targets.contains(hash));
self.states.shrink_to_fit();
self.targets.shrink_to_fit();
self.downloading.shrink_to_fit();
self.downloaded.shrink_to_fit();
}

pub fn reset_downloading(&mut self, hashes: &[BlockHash]) {
cdebug!(SYNC, "Remove downloading by timeout {:?}", hashes);
for hash in hashes {
self.downloading.remove(&hash);
if let Some(state) = self.states.get_mut(hash) {
if *state == State::Downloading {
*state = State::Queued;
}
}
}
self.downloading.shrink_to_fit();
}

pub fn drain(&mut self) -> Vec<(BlockHash, Vec<UnverifiedTransaction>)> {
let mut result = Vec::new();
for t in &self.targets {
if let Some(body) = self.downloaded.remove(&t.hash) {
result.push((t.hash, body));
} else {
break
for hash in &self.targets {
let entry = self.states.entry(*hash);
let state = match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
state @ State::Downloaded {
..
} => replace(state, State::Drained),
_ => break,
},
};
match state {
State::Downloaded {
transactions,
} => {
result.push((*hash, transactions));
}
_ => unreachable!(),
}
}
self.downloaded.shrink_to_fit();
self.targets.drain(0..result.len());
self.targets.shrink_to_fit();
result
}

pub fn re_request(
&mut self,
hash: BlockHash,
is_empty: bool,
remains: Vec<(BlockHash, Vec<UnverifiedTransaction>)>,
) {
let mut new_targets = vec![Target {
hash,
is_empty,
}];
new_targets.extend(remains.into_iter().map(|(hash, transactions)| {
let is_empty = transactions.is_empty();
self.downloaded.insert(hash, transactions);
Target {
hash,
is_empty,
}
}));
new_targets.append(&mut self.targets);
self.targets = new_targets;
pub fn re_request(&mut self, hash: BlockHash, remains: Vec<(BlockHash, Vec<UnverifiedTransaction>)>) {
#[inline]
fn insert(states: &mut HashMap<BlockHash, State>, hash: BlockHash, state: State) {
let old = states.insert(hash, state);
debug_assert_ne!(None, old);
}
// The implementation of extend method allocates an additional memory for new items.
// However, our implementation guarantees that new items are already in the map and it just
// update the states. So iterating over new items and calling the insert method is faster
// than using the extend method and uses less memory.
for (hash, transactions) in remains {
insert(&mut self.states, hash, State::Downloaded {
transactions,
});
}
insert(&mut self.states, hash, State::Queued);
}
}
36 changes: 22 additions & 14 deletions sync/src/block/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,17 @@ impl Extension {
}
}
let mut body_downloader = BodyDownloader::default();
for neighbors in hollow_headers.windows(2).rev() {
let child = &neighbors[0];
cdebug!(SYNC, "Adding block #{} (hash: {}) for initial body download target", child.number(), child.hash());
body_downloader.add_target(child);
}
let new_targets = hollow_headers
.windows(2)
.rev()
.map(|neighbors| {
let child = &neighbors[0];
let hash = child.hash();
cdebug!(SYNC, "Adding block #{} (hash: {}) for initial body download target", child.number(), hash);
hash
})
.collect();
body_downloader.add_targets(new_targets);
cinfo!(SYNC, "Sync extension initialized");
Extension {
state,
Expand Down Expand Up @@ -715,15 +721,13 @@ impl Extension {
.into_iter()
.filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none())
.collect(); // FIXME: No need to collect here if self is not borrowed.
for header in headers {
self.body_downloader.add_target(&header.decode());
}
self.body_downloader.add_targets(headers.into_iter().map(|header| header.hash()).collect());
}
}

fn new_blocks(&mut self, imported: Vec<BlockHash>, invalid: Vec<BlockHash>) {
self.body_downloader.remove_target(&imported);
self.body_downloader.remove_target(&invalid);
self.body_downloader.remove_targets(&imported);
self.body_downloader.remove_targets(&invalid);

self.send_status_broadcast();
}
Expand Down Expand Up @@ -1038,6 +1042,7 @@ impl Extension {
}

fn import_blocks(&mut self, blocks: Vec<(BlockHash, Vec<UnverifiedTransaction>)>) {
let mut imported = Vec::new();
let mut remains = Vec::new();
let mut error_target = None;
for (hash, transactions) in blocks {
Expand All @@ -1051,7 +1056,7 @@ impl Extension {
skewed_merkle_root(BLAKE_NULL_RLP, transactions.iter().map(Encodable::rlp_bytes));
if *header.transactions_root() != calculated_transactions_root {
cwarn!(SYNC, "Received corrupted body for ${}({}", header.number(), hash);
error_target = Some((hash, transactions.is_empty()));
error_target = Some(hash);
continue
}

Expand All @@ -1072,12 +1077,15 @@ impl Extension {
cwarn!(SYNC, "Cannot import block({}): {:?}", hash, err);
break
}
_ => {}
Ok(_) => {
imported.push(hash);
}
}
}
if let Some((hash, is_empty)) = error_target {
self.body_downloader.re_request(hash, is_empty, remains);
if let Some(hash) = error_target {
self.body_downloader.re_request(hash, remains);
}
self.body_downloader.remove_targets(&imported);
}

fn on_body_response(&mut self, hashes: Vec<BlockHash>, bodies: Vec<Vec<UnverifiedTransaction>>) {
Expand Down