Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 11 additions & 33 deletions node/src/components/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use crate::{
components::storage::{self, Storage},
crypto::hash::Digest,
effect::{
requests::{ApiRequest, DeployGossiperRequest, StorageRequest},
announcements::ApiServerAnnouncement,
requests::{ApiRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
reactor::QueueKind,
Expand All @@ -66,7 +67,7 @@ impl ApiServer {

impl<REv> Component<REv> for ApiServer
where
REv: From<StorageRequest<Storage>> + From<DeployGossiperRequest> + Send,
REv: From<StorageRequest<Storage>> + From<ApiServerAnnouncement> + Send,
{
type Event = Event;

Expand All @@ -77,13 +78,11 @@ where
event: Self::Event,
) -> Effects<Self::Event> {
match event {
Event::ApiRequest(ApiRequest::SubmitDeploy { deploy, responder }) => effect_builder
.put_deploy_to_storage(*deploy.clone())
.event(move |result| Event::PutDeployResult {
deploy,
result,
main_responder: responder,
}),
Event::ApiRequest(ApiRequest::SubmitDeploy { deploy, responder }) => {
let mut effects = effect_builder.announce_deploy_received(deploy).ignore();
effects.extend(responder.respond(()).ignore());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I had said it already so I just repeat myself for the sake of increasing the bus factor – in Scala we were telling the user whether the deploy had an invalid signature, had too many deploy dependencies, invalid TTL etc. In general, stuff that we could easily check and would deem the deploy invalid.

Doesn't need to be done now, just saying so that we don't forget.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And of course, it shouldn't be gossiped if there's anything wrong with it.

effects
}
Event::ApiRequest(ApiRequest::GetDeploy { hash, responder }) => effect_builder
.get_deploy_from_storage(hash)
.event(move |result| Event::GetDeployResult {
Expand All @@ -97,18 +96,6 @@ where
result: Box::new(result),
main_responder: responder,
}),
Event::PutDeployResult {
deploy,
result,
main_responder,
} => {
let cloned_deploy = deploy.clone();
let mut effects = main_responder
.respond(result.map_err(|error| (*deploy, error)))
.ignore();
effects.extend(effect_builder.gossip_deploy(cloned_deploy).ignore());
effects
}
Event::GetDeployResult {
hash: _,
result,
Expand Down Expand Up @@ -187,7 +174,7 @@ where
}
};

let result = effect_builder
effect_builder
.make_request(
|responder| ApiRequest::SubmitDeploy {
deploy: Box::new(deploy),
Expand All @@ -197,17 +184,8 @@ where
)
.await;

match result {
Ok(()) => {
let json = reply::json(&"");
Ok(reply::with_status(json, StatusCode::OK))
}
Err((deploy, error)) => {
let error_reply = format!("Failed to store {}: {}", deploy.id(), error);
let json = reply::json(&error_reply);
Ok(reply::with_status(json, StatusCode::BAD_REQUEST))
}
}
let json = reply::json(&"");
Ok(reply::with_status(json, StatusCode::OK))
}

async fn parse_get_request<REv>(
Expand Down
8 changes: 0 additions & 8 deletions node/src/components/api_server/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ use crate::{
pub enum Event {
#[from]
ApiRequest(ApiRequest),
PutDeployResult {
deploy: Box<Deploy>,
result: storage::Result<()>,
main_responder: Responder<Result<(), (Deploy, storage::Error)>>,
},
GetDeployResult {
hash: DeployHash,
result: Box<storage::Result<Deploy>>,
Expand All @@ -32,9 +27,6 @@ impl Display for Event {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
match self {
Event::ApiRequest(request) => write!(formatter, "{}", request),
Event::PutDeployResult { result, .. } => {
write!(formatter, "PutDeployResult: {:?}", result)
}
Event::GetDeployResult { hash, result, .. } => {
write!(formatter, "GetDeployResult for {}: {:?}", hash, result)
}
Expand Down
3 changes: 1 addition & 2 deletions node/src/components/chainspec_handler/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ fn string_to_array(input: String) -> Result<[u8; ACCOUNT_HASH_LENGTH], Error> {
mod tests {
use std::path::PathBuf;

use super::chainspec::rewrite_with_absolute_paths;
use super::*;
use super::{chainspec::rewrite_with_absolute_paths, *};

const PRODUCTION_DIR: &str = "resources/production";
const EXAMPLE_DIR: &str = "resources/example";
Expand Down
58 changes: 31 additions & 27 deletions node/src/components/deploy_gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
Component,
},
effect::{
requests::{DeployGossiperRequest, NetworkRequest, StorageRequest},
requests::{NetworkRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
types::{Deploy, DeployHash},
Expand All @@ -38,8 +38,8 @@ impl<T> ReactorEvent for T where
/// `DeployGossiper` events.
#[derive(Debug)]
pub enum Event {
/// A request to begin gossiping a new deploy received from a client.
Request(DeployGossiperRequest),
/// A new deploy has been received to be gossiped.
DeployReceived { deploy: Box<Deploy> },
/// The network component gossiped to the included peers.
GossipedTo {
deploy_hash: DeployHash,
Expand All @@ -63,7 +63,7 @@ pub enum Event {
/// result is `Ok`, the deploy hash should be gossiped onwards.
PutToStoreResult {
deploy_hash: DeployHash,
sender: NodeId,
maybe_sender: Option<NodeId>,
result: storage::Result<()>,
},
/// The result of the `DeployGossiper` getting a deploy from the storage component. If the
Expand All @@ -78,7 +78,9 @@ pub enum Event {
impl Display for Event {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
Event::Request(request) => write!(formatter, "{}", request),
Event::DeployReceived { deploy } => {
write!(formatter, "new deploy received: {}", deploy.id())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
write!(formatter, "new deploy received: {}", deploy.id())
write!(formatter, "New deploy received: {}", deploy.id())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I prefer your suggestion, but @marc-casperlabs prefers all lowercase, so we agreed to go with his style preference at the start. Now that the EE codebase is ported we have a mix. I'll leave this as is for now until/if we want to refactor all such log statements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we collect such decisions here?
https://casperlabs.atlassian.net/wiki/spaces/REL/pages/16842753/Coding+Standards
I often wonder about such details as well, and usually just make some choice, which is probably not always in line with the rest of the code. I'd love to have a central place where I can look these things up.

}
Event::GossipedTo { deploy_hash, peers } => write!(
formatter,
"gossiped {} to {}",
Expand Down Expand Up @@ -178,22 +180,21 @@ impl DeployGossiper {
}
}

/// Handles a new deploy received from a client by starting to gossip it.
fn handle_put_from_client<REv: ReactorEvent>(
/// Handles a new deploy received from somewhere other than a peer (e.g. the HTTP API server).
fn handle_deploy_received<REv: ReactorEvent>(
&mut self,
effect_builder: EffectBuilder<REv>,
deploy: Deploy,
) -> Effects<Event> {
if let Some(should_gossip) = self.table.new_complete_data(deploy.id(), None) {
self.gossip(
effect_builder,
*deploy.id(),
should_gossip.count,
should_gossip.exclude_peers,
)
} else {
Effects::new() // we already completed gossiping this deploy
}
// Put the deploy to the storage component.
let deploy_hash = *deploy.id();
effect_builder
.put_deploy_to_storage(deploy)
.event(move |result| Event::PutToStoreResult {
deploy_hash,
maybe_sender: None,
result,
})
}

/// Gossips the given deploy hash to `count` random peers excluding the indicated ones.
Expand Down Expand Up @@ -408,26 +409,27 @@ impl DeployGossiper {
deploy: Deploy,
sender: NodeId,
) -> Effects<Event> {
// Put the deploy to the storage component, and potentially start gossiping about it.
// Put the deploy to the storage component.
let deploy_hash = *deploy.id();
effect_builder
.put_deploy_to_storage(deploy)
.event(move |result| Event::PutToStoreResult {
deploy_hash,
sender,
maybe_sender: Some(sender),
result,
})
}

/// Handles the `Ok` case for a `Result` of attempting to put the deploy to the storage
/// component having received it from the sender.
fn put_to_store<REv: ReactorEvent>(
/// component having received it from the sender (for the `Some` case) or from our own HTTP API
/// server (the `None` case).
fn handle_put_to_store_success<REv: ReactorEvent>(
&mut self,
effect_builder: EffectBuilder<REv>,
deploy_hash: DeployHash,
sender: NodeId,
maybe_sender: Option<NodeId>,
) -> Effects<Event> {
if let Some(should_gossip) = self.table.new_complete_data(&deploy_hash, Some(sender)) {
if let Some(should_gossip) = self.table.new_complete_data(&deploy_hash, maybe_sender) {
self.gossip(
effect_builder,
deploy_hash,
Expand Down Expand Up @@ -496,8 +498,8 @@ where
) -> Effects<Self::Event> {
debug!(?event, "handling event");
match event {
Event::Request(DeployGossiperRequest::PutFromClient { deploy }) => {
self.handle_put_from_client(effect_builder, *deploy)
Event::DeployReceived { deploy } => {
self.handle_deploy_received(effect_builder, *deploy)
}
Event::GossipedTo { deploy_hash, peers } => {
self.gossiped_to(effect_builder, deploy_hash, peers)
Expand Down Expand Up @@ -530,10 +532,12 @@ where
},
Event::PutToStoreResult {
deploy_hash,
sender,
maybe_sender,
result,
} => match result {
Ok(()) => self.put_to_store(effect_builder, deploy_hash, sender),
Ok(()) => {
self.handle_put_to_store_success(effect_builder, deploy_hash, maybe_sender)
}
Err(error) => self.failed_to_put_to_store(deploy_hash, error),
},
Event::GetFromStoreResult {
Expand Down
29 changes: 14 additions & 15 deletions node/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ use crate::{
contract_runtime::core::engine_state::{self, genesis::GenesisResult},
storage::{self, StorageType, Value},
},
effect::requests::DeployGossiperRequest,
reactor::{EventQueueHandle, QueueKind},
types::{Deploy, ExecutedBlock, ProtoBlock},
Chainspec,
};
use announcements::NetworkAnnouncement;
use announcements::{ApiServerAnnouncement, NetworkAnnouncement};
use requests::{ContractRuntimeRequest, NetworkRequest, StorageRequest};

/// A pinned, boxed future that produces one or more events.
Expand Down Expand Up @@ -384,6 +383,19 @@ impl<REv> EffectBuilder<REv> {
.await;
}

/// Announce that the HTTP API server has received a deploy.
pub(crate) async fn announce_deploy_received(self, deploy: Box<Deploy>)
where
REv: From<ApiServerAnnouncement>,
{
self.0
.schedule(
ApiServerAnnouncement::DeployReceived { deploy },
QueueKind::Api,
)
.await;
}

/// Puts the given block into the linear block store.
// TODO: remove once method is used.
#[allow(dead_code)]
Expand Down Expand Up @@ -513,19 +525,6 @@ impl<REv> EffectBuilder<REv> {
.await
}

/// Passes the given deploy to the `DeployGossiper` component to be gossiped.
pub(crate) async fn gossip_deploy(self, deploy: Box<Deploy>)
where
REv: From<DeployGossiperRequest>,
{
self.0
.schedule(
DeployGossiperRequest::PutFromClient { deploy },
QueueKind::Regular,
)
.await;
}

/// Passes the timestamp of a future block for which deploys are to be proposed.
// TODO: Add an argument (`BlockContext`?) that contains all information necessary to select
// deploys, e.g. the ancestors' deploys.
Expand Down
24 changes: 24 additions & 0 deletions node/src/effect/announcements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

use std::fmt::{self, Display, Formatter};

use crate::types::Deploy;

/// A networking layer announcement.
#[derive(Debug)]
#[must_use]
pub enum NetworkAnnouncement<I, P> {
/// A payload message has been received from a peer.
MessageReceived {
Expand All @@ -30,3 +33,24 @@ where
}
}
}

/// An HTTP API server announcement.
#[derive(Debug)]
#[must_use]
pub enum ApiServerAnnouncement {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have must_use feature for all events and announcements? Even if the stop-gap solution would be to panic/unimplemented/todo! on all the match arms, I think it will be harder to miss than not handling the event/announcement and silently leave the events in the queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do this, but I wonder how appropriate it is for announcements? From the docs in effect.rs: "the component will never expect an answer for these and does not rely on them being handled." But for now, I doubt we'll be wanting to announce things which aren't handled by some component, so I'll make the suggested change.

/// A new deploy received.
DeployReceived {
/// The received deploy.
deploy: Box<Deploy>,
},
}

impl Display for ApiServerAnnouncement {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
ApiServerAnnouncement::DeployReceived { deploy } => {
write!(formatter, "api server received {}", deploy.id())
}
}
}
}
Loading