From 7ade3bded4bc466bc5c35b6cab7936aabe1a5954 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Fri, 10 Jul 2020 13:16:07 +0100 Subject: [PATCH 1/3] NDRS-119: update API server to announce new deploy --- node/src/components/api_server.rs | 44 ++++----------- node/src/components/api_server/event.rs | 8 --- .../components/chainspec_handler/config.rs | 3 +- node/src/components/consensus.rs | 2 +- node/src/components/deploy_gossiper.rs | 54 ++++++++++--------- node/src/effect.rs | 29 +++++----- node/src/effect/announcements.rs | 22 ++++++++ node/src/effect/requests.rs | 30 ++--------- node/src/reactor/validator.rs | 28 +++++----- 9 files changed, 95 insertions(+), 125 deletions(-) diff --git a/node/src/components/api_server.rs b/node/src/components/api_server.rs index afd3367d4f..18525414ee 100644 --- a/node/src/components/api_server.rs +++ b/node/src/components/api_server.rs @@ -40,7 +40,8 @@ use crate::{ components::storage::{self, Storage}, crypto::hash::Digest, effect::{ - requests::{ApiRequest, DeployGossiperRequest, StorageRequest}, + announcements::ApiServerAnnouncement, + requests::{ApiRequest, StorageRequest}, EffectBuilder, EffectExt, Effects, }, reactor::QueueKind, @@ -66,7 +67,7 @@ impl ApiServer { impl Component for ApiServer where - REv: From> + From + Send, + REv: From> + From + Send, { type Event = Event; @@ -77,13 +78,11 @@ where event: Self::Event, ) -> Effects { match event { - Event::ApiRequest(ApiRequest::SubmitDeploy { deploy, responder }) => effect_builder - .put_deploy_to_storage(*deploy.clone()) - .event(move |result| Event::PutDeployResult { - deploy, - result, - main_responder: responder, - }), + Event::ApiRequest(ApiRequest::SubmitDeploy { deploy, responder }) => { + let mut effects = effect_builder.announce_deploy_received(deploy).ignore(); + effects.extend(responder.respond(()).ignore()); + effects + } Event::ApiRequest(ApiRequest::GetDeploy { hash, responder }) => effect_builder .get_deploy_from_storage(hash) .event(move |result| Event::GetDeployResult { @@ -97,18 +96,6 @@ where result: Box::new(result), main_responder: responder, }), - Event::PutDeployResult { - deploy, - result, - main_responder, - } => { - let cloned_deploy = deploy.clone(); - let mut effects = main_responder - .respond(result.map_err(|error| (*deploy, error))) - .ignore(); - effects.extend(effect_builder.gossip_deploy(cloned_deploy).ignore()); - effects - } Event::GetDeployResult { hash: _, result, @@ -187,7 +174,7 @@ where } }; - let result = effect_builder + effect_builder .make_request( |responder| ApiRequest::SubmitDeploy { deploy: Box::new(deploy), @@ -197,17 +184,8 @@ where ) .await; - match result { - Ok(()) => { - let json = reply::json(&""); - Ok(reply::with_status(json, StatusCode::OK)) - } - Err((deploy, error)) => { - let error_reply = format!("Failed to store {}: {}", deploy.id(), error); - let json = reply::json(&error_reply); - Ok(reply::with_status(json, StatusCode::BAD_REQUEST)) - } - } + let json = reply::json(&""); + Ok(reply::with_status(json, StatusCode::OK)) } async fn parse_get_request( diff --git a/node/src/components/api_server/event.rs b/node/src/components/api_server/event.rs index e05106842c..a5f695e44f 100644 --- a/node/src/components/api_server/event.rs +++ b/node/src/components/api_server/event.rs @@ -12,11 +12,6 @@ use crate::{ pub enum Event { #[from] ApiRequest(ApiRequest), - PutDeployResult { - deploy: Box, - result: storage::Result<()>, - main_responder: Responder>, - }, GetDeployResult { hash: DeployHash, result: Box>, @@ -32,9 +27,6 @@ impl Display for Event { fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { match self { Event::ApiRequest(request) => write!(formatter, "{}", request), - Event::PutDeployResult { result, .. } => { - write!(formatter, "PutDeployResult: {:?}", result) - } Event::GetDeployResult { hash, result, .. } => { write!(formatter, "GetDeployResult for {}: {:?}", hash, result) } diff --git a/node/src/components/chainspec_handler/config.rs b/node/src/components/chainspec_handler/config.rs index df391028e9..692fedede1 100644 --- a/node/src/components/chainspec_handler/config.rs +++ b/node/src/components/chainspec_handler/config.rs @@ -350,8 +350,7 @@ fn string_to_array(input: String) -> Result<[u8; ACCOUNT_HASH_LENGTH], Error> { mod tests { use std::path::PathBuf; - use super::chainspec::rewrite_with_absolute_paths; - use super::*; + use super::{chainspec::rewrite_with_absolute_paths, *}; const PRODUCTION_DIR: &str = "resources/production"; const EXAMPLE_DIR: &str = "resources/example"; diff --git a/node/src/components/consensus.rs b/node/src/components/consensus.rs index 6206f043d1..3ff618fba3 100644 --- a/node/src/components/consensus.rs +++ b/node/src/components/consensus.rs @@ -130,7 +130,7 @@ where event: Self::Event, ) -> Effects { match event { - Event::Timer { .. } => todo!(), + Event::Timer { .. } => Effects::new(), Event::MessageReceived { sender, msg } => { let ConsensusMessage { era_id, payload } = msg; self.delegate_to_era(era_id, effect_builder, move |consensus| { diff --git a/node/src/components/deploy_gossiper.rs b/node/src/components/deploy_gossiper.rs index 340a0782dd..7d893a4296 100644 --- a/node/src/components/deploy_gossiper.rs +++ b/node/src/components/deploy_gossiper.rs @@ -17,7 +17,7 @@ use crate::{ Component, }, effect::{ - requests::{DeployGossiperRequest, NetworkRequest, StorageRequest}, + requests::{NetworkRequest, StorageRequest}, EffectBuilder, EffectExt, Effects, }, types::{Deploy, DeployHash}, @@ -38,8 +38,8 @@ impl ReactorEvent for T where /// `DeployGossiper` events. #[derive(Debug)] pub enum Event { - /// A request to begin gossiping a new deploy received from a client. - Request(DeployGossiperRequest), + /// A new deploy has been received to be gossiped. + DeployReceived { deploy: Box }, /// The network component gossiped to the included peers. GossipedTo { deploy_hash: DeployHash, @@ -63,7 +63,7 @@ pub enum Event { /// result is `Ok`, the deploy hash should be gossiped onwards. PutToStoreResult { deploy_hash: DeployHash, - sender: NodeId, + maybe_sender: Option, result: storage::Result<()>, }, /// The result of the `DeployGossiper` getting a deploy from the storage component. If the @@ -78,7 +78,9 @@ pub enum Event { impl Display for Event { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { match self { - Event::Request(request) => write!(formatter, "{}", request), + Event::DeployReceived { deploy } => { + write!(formatter, "new deploy received: {}", deploy.id()) + } Event::GossipedTo { deploy_hash, peers } => write!( formatter, "gossiped {} to {}", @@ -178,22 +180,21 @@ impl DeployGossiper { } } - /// Handles a new deploy received from a client by starting to gossip it. - fn handle_put_from_client( + /// Handles a new deploy received from somewhere other than a peer (e.g. the HTTP API server). + fn handle_deploy_received( &mut self, effect_builder: EffectBuilder, deploy: Deploy, ) -> Effects { - if let Some(should_gossip) = self.table.new_complete_data(deploy.id(), None) { - self.gossip( - effect_builder, - *deploy.id(), - should_gossip.count, - should_gossip.exclude_peers, - ) - } else { - Effects::new() // we already completed gossiping this deploy - } + // Put the deploy to the storage component. + let deploy_hash = *deploy.id(); + effect_builder + .put_deploy_to_storage(deploy) + .event(move |result| Event::PutToStoreResult { + deploy_hash, + maybe_sender: None, + result, + }) } /// Gossips the given deploy hash to `count` random peers excluding the indicated ones. @@ -408,26 +409,27 @@ impl DeployGossiper { deploy: Deploy, sender: NodeId, ) -> Effects { - // Put the deploy to the storage component, and potentially start gossiping about it. + // Put the deploy to the storage component. let deploy_hash = *deploy.id(); effect_builder .put_deploy_to_storage(deploy) .event(move |result| Event::PutToStoreResult { deploy_hash, - sender, + maybe_sender: Some(sender), result, }) } /// Handles the `Ok` case for a `Result` of attempting to put the deploy to the storage - /// component having received it from the sender. + /// component having received it from the sender (for the `Some` case) or from our own HTTP API + /// server (the `None` case). fn put_to_store( &mut self, effect_builder: EffectBuilder, deploy_hash: DeployHash, - sender: NodeId, + maybe_sender: Option, ) -> Effects { - if let Some(should_gossip) = self.table.new_complete_data(&deploy_hash, Some(sender)) { + if let Some(should_gossip) = self.table.new_complete_data(&deploy_hash, maybe_sender) { self.gossip( effect_builder, deploy_hash, @@ -496,8 +498,8 @@ where ) -> Effects { debug!(?event, "handling event"); match event { - Event::Request(DeployGossiperRequest::PutFromClient { deploy }) => { - self.handle_put_from_client(effect_builder, *deploy) + Event::DeployReceived { deploy } => { + self.handle_deploy_received(effect_builder, *deploy) } Event::GossipedTo { deploy_hash, peers } => { self.gossiped_to(effect_builder, deploy_hash, peers) @@ -530,10 +532,10 @@ where }, Event::PutToStoreResult { deploy_hash, - sender, + maybe_sender, result, } => match result { - Ok(()) => self.put_to_store(effect_builder, deploy_hash, sender), + Ok(()) => self.put_to_store(effect_builder, deploy_hash, maybe_sender), Err(error) => self.failed_to_put_to_store(deploy_hash, error), }, Event::GetFromStoreResult { diff --git a/node/src/effect.rs b/node/src/effect.rs index 4d335d079f..804f0bdfad 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -80,12 +80,11 @@ use crate::{ contract_runtime::core::engine_state::{self, genesis::GenesisResult}, storage::{self, StorageType, Value}, }, - effect::requests::DeployGossiperRequest, reactor::{EventQueueHandle, QueueKind}, types::{Deploy, ExecutedBlock, ProtoBlock}, Chainspec, }; -use announcements::NetworkAnnouncement; +use announcements::{ApiServerAnnouncement, NetworkAnnouncement}; use requests::{ContractRuntimeRequest, NetworkRequest, StorageRequest}; /// A pinned, boxed future that produces one or more events. @@ -384,6 +383,19 @@ impl EffectBuilder { .await; } + /// Announce that the HTTP API server has received a deploy. + pub(crate) async fn announce_deploy_received(self, deploy: Box) + where + REv: From, + { + self.0 + .schedule( + ApiServerAnnouncement::DeployReceived { deploy }, + QueueKind::Api, + ) + .await; + } + /// Puts the given block into the linear block store. // TODO: remove once method is used. #[allow(dead_code)] @@ -513,19 +525,6 @@ impl EffectBuilder { .await } - /// Passes the given deploy to the `DeployGossiper` component to be gossiped. - pub(crate) async fn gossip_deploy(self, deploy: Box) - where - REv: From, - { - self.0 - .schedule( - DeployGossiperRequest::PutFromClient { deploy }, - QueueKind::Regular, - ) - .await; - } - /// Passes the timestamp of a future block for which deploys are to be proposed. // TODO: Add an argument (`BlockContext`?) that contains all information necessary to select // deploys, e.g. the ancestors' deploys. diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index 0b8781f7c2..f94339c3b9 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -5,6 +5,8 @@ use std::fmt::{self, Display, Formatter}; +use crate::types::Deploy; + /// A networking layer announcement. #[derive(Debug)] pub enum NetworkAnnouncement { @@ -30,3 +32,23 @@ where } } } + +/// An HTTP API server announcement. +#[derive(Debug)] +pub enum ApiServerAnnouncement { + /// A new deploy received. + DeployReceived { + /// The received deploy. + deploy: Box, + }, +} + +impl Display for ApiServerAnnouncement { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + ApiServerAnnouncement::DeployReceived { deploy } => { + write!(formatter, "api server received {}", deploy.id()) + } + } + } +} diff --git a/node/src/effect/requests.rs b/node/src/effect/requests.rs index 7dee33dfcc..f62e92d241 100644 --- a/node/src/effect/requests.rs +++ b/node/src/effect/requests.rs @@ -271,14 +271,12 @@ impl Display for DeployQueueRequest { /// transport. #[derive(Debug)] pub enum ApiRequest { - /// Submit a deploy for storing. - /// - /// Returns the deploy along with an error message if it could not be stored. + /// Submit a deploy to be announced. SubmitDeploy { - /// The deploy to be stored. + /// The deploy to be announced. deploy: Box, - /// Responder to call with the result. - responder: Responder>, + /// Responder to call. + responder: Responder<()>, }, /// Return the specified deploy if it exists, else `None`. GetDeploy { @@ -327,23 +325,3 @@ impl Display for ContractRuntimeRequest { } } } - -/// Requests for the deploy broadcaster. -#[derive(Debug)] -pub enum DeployGossiperRequest { - /// A new `Deploy` received from a client via the HTTP server component. - PutFromClient { - /// The received deploy. - deploy: Box, - }, -} - -impl Display for DeployGossiperRequest { - fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - match self { - DeployGossiperRequest::PutFromClient { deploy, .. } => { - write!(formatter, "put from client: {}", deploy.id()) - } - } - } -} diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 667deba558..00adff00e9 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -24,8 +24,8 @@ use crate::{ Component, }, effect::{ - announcements::NetworkAnnouncement, - requests::{ApiRequest, DeployGossiperRequest, NetworkRequest, StorageRequest}, + announcements::{ApiServerAnnouncement, NetworkAnnouncement}, + requests::{ApiRequest, NetworkRequest, StorageRequest}, EffectBuilder, Effects, }, reactor::{self, initializer, EventQueueHandle}, @@ -82,6 +82,9 @@ pub enum Event { /// Deploy gossiper event. #[from] DeployGossiper(deploy_gossiper::Event), + /// Contract runtime event. + #[from] + ContractRuntime(contract_runtime::Event), // Requests /// Network request. @@ -92,11 +95,9 @@ pub enum Event { /// Network announcement. #[from] NetworkAnnouncement(NetworkAnnouncement), - - // Contract Runtime - /// Contract runtime event. + /// API server announcement. #[from] - ContractRuntime(contract_runtime::Event), + ApiServerAnnouncement(ApiServerAnnouncement), } impl From for Event { @@ -123,12 +124,6 @@ impl From> for Event { } } -impl From for Event { - fn from(request: DeployGossiperRequest) -> Self { - Event::DeployGossiper(deploy_gossiper::Event::Request(request)) - } -} - impl Display for Event { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -138,9 +133,10 @@ impl Display for Event { Event::ApiServer(event) => write!(f, "api server: {}", event), Event::Consensus(event) => write!(f, "consensus: {}", event), Event::DeployGossiper(event) => write!(f, "deploy gossiper: {}", event), + Event::ContractRuntime(event) => write!(f, "contract runtime: {}", event), Event::NetworkRequest(req) => write!(f, "network request: {}", req), Event::NetworkAnnouncement(ann) => write!(f, "network announcement: {}", ann), - Event::ContractRuntime(event) => write!(f, "contract runtime: {}", event), + Event::ApiServerAnnouncement(ann) => write!(f, "api server announcement: {}", ann), } } } @@ -248,6 +244,7 @@ impl reactor::Reactor for Reactor { self.deploy_gossiper .handle_event(effect_builder, &mut self.rng, event), ), + Event::ContractRuntime(event) => todo!("handle contract runtime event: {:?}", event), // Requests: Event::NetworkRequest(req) => self.dispatch_event( @@ -278,7 +275,10 @@ impl reactor::Reactor for Reactor { // Any incoming message is one for the pinger. self.dispatch_event(effect_builder, reactor_event) } - Event::ContractRuntime(event) => todo!("handle contract runtime event: {:?}", event), + Event::ApiServerAnnouncement(ApiServerAnnouncement::DeployReceived { deploy }) => { + let event = deploy_gossiper::Event::DeployReceived { deploy }; + self.dispatch_event(effect_builder, Event::DeployGossiper(event)) + } } } } From a43dd0cca982d09e9d32ffd3bbdfac8e3826e294 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Fri, 10 Jul 2020 18:32:33 +0100 Subject: [PATCH 2/3] NDRS-119: fix to match updated 'dispatch_event' --- node/src/reactor/validator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 2c8d6232d6..14a324e944 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -271,7 +271,7 @@ impl reactor::Reactor for Reactor { } Event::ApiServerAnnouncement(ApiServerAnnouncement::DeployReceived { deploy }) => { let event = deploy_gossiper::Event::DeployReceived { deploy }; - self.dispatch_event(effect_builder, Event::DeployGossiper(event)) + self.dispatch_event(effect_builder, rng, Event::DeployGossiper(event)) } } } From c44ab6408f15373f4e73d710363c6b2ee7fd78b4 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Sun, 12 Jul 2020 11:55:46 +0100 Subject: [PATCH 3/3] NDRS-119: minor updates to deploy gossiper --- node/src/components/deploy_gossiper.rs | 6 ++++-- node/src/effect/announcements.rs | 2 ++ node/src/effect/requests.rs | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/node/src/components/deploy_gossiper.rs b/node/src/components/deploy_gossiper.rs index 7d893a4296..2bc4e7783c 100644 --- a/node/src/components/deploy_gossiper.rs +++ b/node/src/components/deploy_gossiper.rs @@ -423,7 +423,7 @@ impl DeployGossiper { /// Handles the `Ok` case for a `Result` of attempting to put the deploy to the storage /// component having received it from the sender (for the `Some` case) or from our own HTTP API /// server (the `None` case). - fn put_to_store( + fn handle_put_to_store_success( &mut self, effect_builder: EffectBuilder, deploy_hash: DeployHash, @@ -535,7 +535,9 @@ where maybe_sender, result, } => match result { - Ok(()) => self.put_to_store(effect_builder, deploy_hash, maybe_sender), + Ok(()) => { + self.handle_put_to_store_success(effect_builder, deploy_hash, maybe_sender) + } Err(error) => self.failed_to_put_to_store(deploy_hash, error), }, Event::GetFromStoreResult { diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index f94339c3b9..74dc020eb9 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -9,6 +9,7 @@ use crate::types::Deploy; /// A networking layer announcement. #[derive(Debug)] +#[must_use] pub enum NetworkAnnouncement { /// A payload message has been received from a peer. MessageReceived { @@ -35,6 +36,7 @@ where /// An HTTP API server announcement. #[derive(Debug)] +#[must_use] pub enum ApiServerAnnouncement { /// A new deploy received. DeployReceived { diff --git a/node/src/effect/requests.rs b/node/src/effect/requests.rs index f62e92d241..1b7be2fb23 100644 --- a/node/src/effect/requests.rs +++ b/node/src/effect/requests.rs @@ -22,6 +22,7 @@ use crate::{ /// A networking request. #[derive(Debug)] +#[must_use] pub enum NetworkRequest { /// Send a message on the network to a specific peer. SendMessage { @@ -113,6 +114,7 @@ where // TODO: remove once all variants are used. /// A storage request. #[allow(dead_code)] +#[must_use] pub enum StorageRequest { /// Store given block. PutBlock { @@ -208,6 +210,7 @@ impl Display for StorageRequest { #[allow(dead_code)] // FIXME: Remove once in use. /// Deploy-queue related requests. #[derive(Debug)] +#[must_use] pub(crate) enum DeployQueueRequest { /// Add a deploy to the queue for inclusion into an upcoming block. QueueDeploy { @@ -270,6 +273,7 @@ impl Display for DeployQueueRequest { /// An API request is an abstract request that does not concern itself with serialization or /// transport. #[derive(Debug)] +#[must_use] pub enum ApiRequest { /// Submit a deploy to be announced. SubmitDeploy { @@ -304,6 +308,7 @@ impl Display for ApiRequest { /// A contract runtime request. #[derive(Debug)] +#[must_use] pub enum ContractRuntimeRequest { /// Commit genesis chainspec. CommitGenesis {