Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ smallvec = "1.4.0"
structopt = "0.3.14"
tempfile = "3.1.0"
thiserror = "1.0.18"
tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp", "time", "blocking"] }
tokio = { version = "0.2.20", features = ["blocking", "macros", "rt-threaded", "sync", "tcp", "time"] }
tokio-openssl = "0.4.0"
tokio-serde = { version = "0.6.1", features = ["messagepack"] }
tokio-util = { version = "0.3.1", features = ["codec"] }
Expand All @@ -80,13 +80,14 @@ wasmi = "0.6.2"

[dev-dependencies]
assert_matches = "1.3.0"
criterion = "0.3.3"
fake_instant = "0.4.0"
lazy_static = "1"
pnet = "0.26.0"
proptest = "0.9.4"
fake_instant = "0.4.0"
rand_xorshift = { version = "~0.2.0" }
rand_core = "0.5.1"
criterion = "0.3.3"
tokio = { version = "0.2.20", features = ["test-util"] }

[features]
vendored-openssl = ['openssl/vendored']
Expand Down
1 change: 1 addition & 0 deletions node/src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod contract_runtime;
pub(crate) mod deploy_buffer;
pub(crate) mod deploy_gossiper;
// The `in_memory_network` is public for use in doctests.
#[cfg(test)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why this is desired, but I believe I removed it because it made it not show up in the docs, IIRC. Also, I do not think doctests are executed in this case.

Correct me if I am from, but this should be conditional on both test and doctest? (#[cfg(any(test, doctest))])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the consensus so far appears to be that we shouldn't add modules/types to public APIs unless required. After the proposed style guide is finalised, if we do prefer to extend APIs just to support docs/doctests we can change this, but for now I'd rather keep the test-only code test feature-gated.

If we take that view, I don't think there's any point to changing this to #[cfg(any(test, doctest))] since doctests won't run this code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doctest will cause it to run with cargo test AFAIK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It tried and it seems like it doesn't.

pub mod in_memory_network;
pub(crate) mod metrics;
pub(crate) mod pinger;
Expand Down
147 changes: 11 additions & 136 deletions node/src/components/deploy_gossiper.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::{
collections::HashSet,
fmt::{self, Display, Formatter},
time::Duration,
};
mod event;
mod message;
mod tests;

use std::{collections::HashSet, time::Duration};

use futures::FutureExt;
use rand::Rng;
use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use tracing::{debug, error, warn};
use tracing::{debug, error};

use crate::{
components::{
Expand All @@ -21,10 +20,13 @@ use crate::{
EffectBuilder, EffectExt, Effects,
},
types::{Deploy, DeployHash},
utils::{DisplayIter, GossipAction, GossipTable},
utils::{GossipAction, GossipTable},
GossipTableConfig,
};

pub use event::Event;
pub use message::Message;

trait ReactorEvent:
From<Event> + From<NetworkRequest<NodeId, Message>> + From<StorageRequest<Storage>> + Send
{
Expand All @@ -35,133 +37,6 @@ impl<T> ReactorEvent for T where
{
}

/// `DeployGossiper` events.
#[derive(Debug)]
pub enum Event {
/// A new deploy has been received to be gossiped.
DeployReceived { deploy: Box<Deploy> },
/// The network component gossiped to the included peers.
GossipedTo {
deploy_hash: DeployHash,
peers: HashSet<NodeId>,
},
/// The timeout for waiting for a gossip response has elapsed and we should check the response
/// arrived.
CheckGossipTimeout {
deploy_hash: DeployHash,
peer: NodeId,
},
/// The timeout for waiting for the full deploy body has elapsed and we should check the
/// response arrived.
CheckGetFromPeerTimeout {
deploy_hash: DeployHash,
peer: NodeId,
},
/// An incoming gossip network message.
MessageReceived { sender: NodeId, message: Message },
/// The result of the `DeployGossiper` putting a deploy to the storage component. If the
/// result is `Ok`, the deploy hash should be gossiped onwards.
PutToStoreResult {
deploy_hash: DeployHash,
maybe_sender: Option<NodeId>,
result: storage::Result<bool>,
},
/// The result of the `DeployGossiper` getting a deploy from the storage component. If the
/// result is `Ok`, the deploy should be sent to the requesting peer.
GetFromStoreResult {
deploy_hash: DeployHash,
requester: NodeId,
result: Box<storage::Result<Deploy>>,
},
}

impl Display for Event {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
Event::DeployReceived { deploy } => {
write!(formatter, "new deploy received: {}", deploy.id())
}
Event::GossipedTo { deploy_hash, peers } => write!(
formatter,
"gossiped {} to {}",
deploy_hash,
DisplayIter::new(peers)
),
Event::CheckGossipTimeout { deploy_hash, peer } => write!(
formatter,
"check gossip timeout for {} with {}",
deploy_hash, peer
),
Event::CheckGetFromPeerTimeout { deploy_hash, peer } => write!(
formatter,
"check get from peer timeout for {} with {}",
deploy_hash, peer
),
Event::MessageReceived { sender, message } => {
write!(formatter, "{} received from {}", message, sender)
}
Event::PutToStoreResult {
deploy_hash,
result,
..
} => {
if result.is_ok() {
write!(formatter, "put {} to store", deploy_hash)
} else {
write!(formatter, "failed to put {} to store", deploy_hash)
}
}
Event::GetFromStoreResult {
deploy_hash,
result,
..
} => {
if result.is_ok() {
write!(formatter, "got {} from store", deploy_hash)
} else {
write!(formatter, "failed to get {} from store", deploy_hash)
}
}
}
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Message {
/// Gossiped out to random peers to notify them of a `Deploy` we hold.
Gossip(DeployHash),
/// Response to a `Gossip` message. If `is_already_held` is false, the recipient should treat
/// this as a `GetRequest` and send a `GetResponse` containing the `Deploy`.
GossipResponse {
deploy_hash: DeployHash,
is_already_held: bool,
},
/// Sent if a `Deploy` fails to arrive, either after sending a `GossipResponse` with
/// `is_already_held` set to false, or after a previous `GetRequest`.
GetRequest(DeployHash),
/// Sent in response to a `GetRequest`, or to a peer which responded to gossip indicating it
/// didn't already hold the full `Deploy`.
GetResponse(Box<Deploy>),
}

impl Display for Message {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
Message::Gossip(deploy_hash) => write!(formatter, "gossip({})", deploy_hash),
Message::GossipResponse {
deploy_hash,
is_already_held,
} => write!(
formatter,
"gossip-response({}, {})",
deploy_hash, is_already_held
),
Message::GetRequest(deploy_hash) => write!(formatter, "get-request({})", deploy_hash),
Message::GetResponse(deploy) => write!(formatter, "get-response({})", deploy.id()),
}
}
}

/// The component which gossips `Deploy`s to peers and handles incoming `Deploy`s which have been
/// gossiped to it.
#[derive(Debug)]
Expand Down Expand Up @@ -222,7 +97,7 @@ impl DeployGossiper {
// in the entry being removed.
if peers.is_empty() {
self.table.pause(&deploy_hash);
warn!(
debug!(
"paused gossiping {} since no more peers to gossip to",
deploy_hash
);
Expand Down
102 changes: 102 additions & 0 deletions node/src/components/deploy_gossiper/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::{
collections::HashSet,
fmt::{self, Display, Formatter},
};

use super::Message;
use crate::{
components::{small_network::NodeId, storage},
types::{Deploy, DeployHash},
utils::DisplayIter,
};

/// `DeployGossiper` events.
#[derive(Debug)]
pub enum Event {
/// A new deploy has been received to be gossiped.
DeployReceived { deploy: Box<Deploy> },
/// The network component gossiped to the included peers.
GossipedTo {
deploy_hash: DeployHash,
peers: HashSet<NodeId>,
},
/// The timeout for waiting for a gossip response has elapsed and we should check the response
/// arrived.
CheckGossipTimeout {
deploy_hash: DeployHash,
peer: NodeId,
},
/// The timeout for waiting for the full deploy body has elapsed and we should check the
/// response arrived.
CheckGetFromPeerTimeout {
deploy_hash: DeployHash,
peer: NodeId,
},
/// An incoming gossip network message.
MessageReceived { sender: NodeId, message: Message },
/// The result of the `DeployGossiper` putting a deploy to the storage component. If the
/// result is `Ok`, the deploy hash should be gossiped onwards.
PutToStoreResult {
deploy_hash: DeployHash,
maybe_sender: Option<NodeId>,
result: storage::Result<bool>,
},
/// The result of the `DeployGossiper` getting a deploy from the storage component. If the
/// result is `Ok`, the deploy should be sent to the requesting peer.
GetFromStoreResult {
deploy_hash: DeployHash,
requester: NodeId,
result: Box<storage::Result<Deploy>>,
},
}

impl Display for Event {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
Event::DeployReceived { deploy } => {
write!(formatter, "new deploy received: {}", deploy.id())
}
Event::GossipedTo { deploy_hash, peers } => write!(
formatter,
"gossiped {} to {}",
deploy_hash,
DisplayIter::new(peers)
),
Event::CheckGossipTimeout { deploy_hash, peer } => write!(
formatter,
"check gossip timeout for {} with {}",
deploy_hash, peer
),
Event::CheckGetFromPeerTimeout { deploy_hash, peer } => write!(
formatter,
"check get from peer timeout for {} with {}",
deploy_hash, peer
),
Event::MessageReceived { sender, message } => {
write!(formatter, "{} received from {}", message, sender)
}
Event::PutToStoreResult {
deploy_hash,
result,
..
} => {
if result.is_ok() {
write!(formatter, "put {} to store", deploy_hash)
} else {
write!(formatter, "failed to put {} to store", deploy_hash)
}
}
Event::GetFromStoreResult {
deploy_hash,
result,
..
} => {
if result.is_ok() {
write!(formatter, "got {} from store", deploy_hash)
} else {
write!(formatter, "failed to get {} from store", deploy_hash)
}
}
}
}
}
41 changes: 41 additions & 0 deletions node/src/components/deploy_gossiper/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::fmt::{self, Display, Formatter};

use serde::{Deserialize, Serialize};

use crate::types::{Deploy, DeployHash};

#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Message {
/// Gossiped out to random peers to notify them of a `Deploy` we hold.
Gossip(DeployHash),
/// Response to a `Gossip` message. If `is_already_held` is false, the recipient should treat
/// this as a `GetRequest` and send a `GetResponse` containing the `Deploy`.
GossipResponse {
deploy_hash: DeployHash,
is_already_held: bool,
},
/// Sent if a `Deploy` fails to arrive, either after sending a `GossipResponse` with
/// `is_already_held` set to false, or after a previous `GetRequest`.
GetRequest(DeployHash),
/// Sent in response to a `GetRequest`, or to a peer which responded to gossip indicating it
/// didn't already hold the full `Deploy`.
GetResponse(Box<Deploy>),
}

impl Display for Message {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
Message::Gossip(deploy_hash) => write!(formatter, "gossip({})", deploy_hash),
Message::GossipResponse {
deploy_hash,
is_already_held,
} => write!(
formatter,
"gossip-response({}, {})",
deploy_hash, is_already_held
),
Message::GetRequest(deploy_hash) => write!(formatter, "get-request({})", deploy_hash),
Message::GetResponse(deploy) => write!(formatter, "get-response({})", deploy.id()),
}
}
}
Loading