diff --git a/node/src/components/api_server.rs b/node/src/components/api_server.rs index afd3367d4f..18525414ee 100644 --- a/node/src/components/api_server.rs +++ b/node/src/components/api_server.rs @@ -40,7 +40,8 @@ use crate::{ components::storage::{self, Storage}, crypto::hash::Digest, effect::{ - requests::{ApiRequest, DeployGossiperRequest, StorageRequest}, + announcements::ApiServerAnnouncement, + requests::{ApiRequest, StorageRequest}, EffectBuilder, EffectExt, Effects, }, reactor::QueueKind, @@ -66,7 +67,7 @@ impl ApiServer { impl Component for ApiServer where - REv: From> + From + Send, + REv: From> + From + Send, { type Event = Event; @@ -77,13 +78,11 @@ where event: Self::Event, ) -> Effects { match event { - Event::ApiRequest(ApiRequest::SubmitDeploy { deploy, responder }) => effect_builder - .put_deploy_to_storage(*deploy.clone()) - .event(move |result| Event::PutDeployResult { - deploy, - result, - main_responder: responder, - }), + Event::ApiRequest(ApiRequest::SubmitDeploy { deploy, responder }) => { + let mut effects = effect_builder.announce_deploy_received(deploy).ignore(); + effects.extend(responder.respond(()).ignore()); + effects + } Event::ApiRequest(ApiRequest::GetDeploy { hash, responder }) => effect_builder .get_deploy_from_storage(hash) .event(move |result| Event::GetDeployResult { @@ -97,18 +96,6 @@ where result: Box::new(result), main_responder: responder, }), - Event::PutDeployResult { - deploy, - result, - main_responder, - } => { - let cloned_deploy = deploy.clone(); - let mut effects = main_responder - .respond(result.map_err(|error| (*deploy, error))) - .ignore(); - effects.extend(effect_builder.gossip_deploy(cloned_deploy).ignore()); - effects - } Event::GetDeployResult { hash: _, result, @@ -187,7 +174,7 @@ where } }; - let result = effect_builder + effect_builder .make_request( |responder| ApiRequest::SubmitDeploy { deploy: Box::new(deploy), @@ -197,17 +184,8 @@ where ) .await; - match result { - Ok(()) => { - let json = reply::json(&""); - Ok(reply::with_status(json, StatusCode::OK)) - } - Err((deploy, error)) => { - let error_reply = format!("Failed to store {}: {}", deploy.id(), error); - let json = reply::json(&error_reply); - Ok(reply::with_status(json, StatusCode::BAD_REQUEST)) - } - } + let json = reply::json(&""); + Ok(reply::with_status(json, StatusCode::OK)) } async fn parse_get_request( diff --git a/node/src/components/api_server/event.rs b/node/src/components/api_server/event.rs index e05106842c..a5f695e44f 100644 --- a/node/src/components/api_server/event.rs +++ b/node/src/components/api_server/event.rs @@ -12,11 +12,6 @@ use crate::{ pub enum Event { #[from] ApiRequest(ApiRequest), - PutDeployResult { - deploy: Box, - result: storage::Result<()>, - main_responder: Responder>, - }, GetDeployResult { hash: DeployHash, result: Box>, @@ -32,9 +27,6 @@ impl Display for Event { fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { match self { Event::ApiRequest(request) => write!(formatter, "{}", request), - Event::PutDeployResult { result, .. } => { - write!(formatter, "PutDeployResult: {:?}", result) - } Event::GetDeployResult { hash, result, .. } => { write!(formatter, "GetDeployResult for {}: {:?}", hash, result) } diff --git a/node/src/components/chainspec_handler/config.rs b/node/src/components/chainspec_handler/config.rs index df391028e9..692fedede1 100644 --- a/node/src/components/chainspec_handler/config.rs +++ b/node/src/components/chainspec_handler/config.rs @@ -350,8 +350,7 @@ fn string_to_array(input: String) -> Result<[u8; ACCOUNT_HASH_LENGTH], Error> { mod tests { use std::path::PathBuf; - use super::chainspec::rewrite_with_absolute_paths; - use super::*; + use super::{chainspec::rewrite_with_absolute_paths, *}; const PRODUCTION_DIR: &str = "resources/production"; const EXAMPLE_DIR: &str = "resources/example"; diff --git a/node/src/components/deploy_gossiper.rs b/node/src/components/deploy_gossiper.rs index 340a0782dd..2bc4e7783c 100644 --- a/node/src/components/deploy_gossiper.rs +++ b/node/src/components/deploy_gossiper.rs @@ -17,7 +17,7 @@ use crate::{ Component, }, effect::{ - requests::{DeployGossiperRequest, NetworkRequest, StorageRequest}, + requests::{NetworkRequest, StorageRequest}, EffectBuilder, EffectExt, Effects, }, types::{Deploy, DeployHash}, @@ -38,8 +38,8 @@ impl ReactorEvent for T where /// `DeployGossiper` events. #[derive(Debug)] pub enum Event { - /// A request to begin gossiping a new deploy received from a client. - Request(DeployGossiperRequest), + /// A new deploy has been received to be gossiped. + DeployReceived { deploy: Box }, /// The network component gossiped to the included peers. GossipedTo { deploy_hash: DeployHash, @@ -63,7 +63,7 @@ pub enum Event { /// result is `Ok`, the deploy hash should be gossiped onwards. PutToStoreResult { deploy_hash: DeployHash, - sender: NodeId, + maybe_sender: Option, result: storage::Result<()>, }, /// The result of the `DeployGossiper` getting a deploy from the storage component. If the @@ -78,7 +78,9 @@ pub enum Event { impl Display for Event { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { match self { - Event::Request(request) => write!(formatter, "{}", request), + Event::DeployReceived { deploy } => { + write!(formatter, "new deploy received: {}", deploy.id()) + } Event::GossipedTo { deploy_hash, peers } => write!( formatter, "gossiped {} to {}", @@ -178,22 +180,21 @@ impl DeployGossiper { } } - /// Handles a new deploy received from a client by starting to gossip it. - fn handle_put_from_client( + /// Handles a new deploy received from somewhere other than a peer (e.g. the HTTP API server). + fn handle_deploy_received( &mut self, effect_builder: EffectBuilder, deploy: Deploy, ) -> Effects { - if let Some(should_gossip) = self.table.new_complete_data(deploy.id(), None) { - self.gossip( - effect_builder, - *deploy.id(), - should_gossip.count, - should_gossip.exclude_peers, - ) - } else { - Effects::new() // we already completed gossiping this deploy - } + // Put the deploy to the storage component. + let deploy_hash = *deploy.id(); + effect_builder + .put_deploy_to_storage(deploy) + .event(move |result| Event::PutToStoreResult { + deploy_hash, + maybe_sender: None, + result, + }) } /// Gossips the given deploy hash to `count` random peers excluding the indicated ones. @@ -408,26 +409,27 @@ impl DeployGossiper { deploy: Deploy, sender: NodeId, ) -> Effects { - // Put the deploy to the storage component, and potentially start gossiping about it. + // Put the deploy to the storage component. let deploy_hash = *deploy.id(); effect_builder .put_deploy_to_storage(deploy) .event(move |result| Event::PutToStoreResult { deploy_hash, - sender, + maybe_sender: Some(sender), result, }) } /// Handles the `Ok` case for a `Result` of attempting to put the deploy to the storage - /// component having received it from the sender. - fn put_to_store( + /// component having received it from the sender (for the `Some` case) or from our own HTTP API + /// server (the `None` case). + fn handle_put_to_store_success( &mut self, effect_builder: EffectBuilder, deploy_hash: DeployHash, - sender: NodeId, + maybe_sender: Option, ) -> Effects { - if let Some(should_gossip) = self.table.new_complete_data(&deploy_hash, Some(sender)) { + if let Some(should_gossip) = self.table.new_complete_data(&deploy_hash, maybe_sender) { self.gossip( effect_builder, deploy_hash, @@ -496,8 +498,8 @@ where ) -> Effects { debug!(?event, "handling event"); match event { - Event::Request(DeployGossiperRequest::PutFromClient { deploy }) => { - self.handle_put_from_client(effect_builder, *deploy) + Event::DeployReceived { deploy } => { + self.handle_deploy_received(effect_builder, *deploy) } Event::GossipedTo { deploy_hash, peers } => { self.gossiped_to(effect_builder, deploy_hash, peers) @@ -530,10 +532,12 @@ where }, Event::PutToStoreResult { deploy_hash, - sender, + maybe_sender, result, } => match result { - Ok(()) => self.put_to_store(effect_builder, deploy_hash, sender), + Ok(()) => { + self.handle_put_to_store_success(effect_builder, deploy_hash, maybe_sender) + } Err(error) => self.failed_to_put_to_store(deploy_hash, error), }, Event::GetFromStoreResult { diff --git a/node/src/effect.rs b/node/src/effect.rs index 4d335d079f..804f0bdfad 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -80,12 +80,11 @@ use crate::{ contract_runtime::core::engine_state::{self, genesis::GenesisResult}, storage::{self, StorageType, Value}, }, - effect::requests::DeployGossiperRequest, reactor::{EventQueueHandle, QueueKind}, types::{Deploy, ExecutedBlock, ProtoBlock}, Chainspec, }; -use announcements::NetworkAnnouncement; +use announcements::{ApiServerAnnouncement, NetworkAnnouncement}; use requests::{ContractRuntimeRequest, NetworkRequest, StorageRequest}; /// A pinned, boxed future that produces one or more events. @@ -384,6 +383,19 @@ impl EffectBuilder { .await; } + /// Announce that the HTTP API server has received a deploy. + pub(crate) async fn announce_deploy_received(self, deploy: Box) + where + REv: From, + { + self.0 + .schedule( + ApiServerAnnouncement::DeployReceived { deploy }, + QueueKind::Api, + ) + .await; + } + /// Puts the given block into the linear block store. // TODO: remove once method is used. #[allow(dead_code)] @@ -513,19 +525,6 @@ impl EffectBuilder { .await } - /// Passes the given deploy to the `DeployGossiper` component to be gossiped. - pub(crate) async fn gossip_deploy(self, deploy: Box) - where - REv: From, - { - self.0 - .schedule( - DeployGossiperRequest::PutFromClient { deploy }, - QueueKind::Regular, - ) - .await; - } - /// Passes the timestamp of a future block for which deploys are to be proposed. // TODO: Add an argument (`BlockContext`?) that contains all information necessary to select // deploys, e.g. the ancestors' deploys. diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index 0b8781f7c2..74dc020eb9 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -5,8 +5,11 @@ use std::fmt::{self, Display, Formatter}; +use crate::types::Deploy; + /// A networking layer announcement. #[derive(Debug)] +#[must_use] pub enum NetworkAnnouncement { /// A payload message has been received from a peer. MessageReceived { @@ -30,3 +33,24 @@ where } } } + +/// An HTTP API server announcement. +#[derive(Debug)] +#[must_use] +pub enum ApiServerAnnouncement { + /// A new deploy received. + DeployReceived { + /// The received deploy. + deploy: Box, + }, +} + +impl Display for ApiServerAnnouncement { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + ApiServerAnnouncement::DeployReceived { deploy } => { + write!(formatter, "api server received {}", deploy.id()) + } + } + } +} diff --git a/node/src/effect/requests.rs b/node/src/effect/requests.rs index 7dee33dfcc..1b7be2fb23 100644 --- a/node/src/effect/requests.rs +++ b/node/src/effect/requests.rs @@ -22,6 +22,7 @@ use crate::{ /// A networking request. #[derive(Debug)] +#[must_use] pub enum NetworkRequest { /// Send a message on the network to a specific peer. SendMessage { @@ -113,6 +114,7 @@ where // TODO: remove once all variants are used. /// A storage request. #[allow(dead_code)] +#[must_use] pub enum StorageRequest { /// Store given block. PutBlock { @@ -208,6 +210,7 @@ impl Display for StorageRequest { #[allow(dead_code)] // FIXME: Remove once in use. /// Deploy-queue related requests. #[derive(Debug)] +#[must_use] pub(crate) enum DeployQueueRequest { /// Add a deploy to the queue for inclusion into an upcoming block. QueueDeploy { @@ -270,15 +273,14 @@ impl Display for DeployQueueRequest { /// An API request is an abstract request that does not concern itself with serialization or /// transport. #[derive(Debug)] +#[must_use] pub enum ApiRequest { - /// Submit a deploy for storing. - /// - /// Returns the deploy along with an error message if it could not be stored. + /// Submit a deploy to be announced. SubmitDeploy { - /// The deploy to be stored. + /// The deploy to be announced. deploy: Box, - /// Responder to call with the result. - responder: Responder>, + /// Responder to call. + responder: Responder<()>, }, /// Return the specified deploy if it exists, else `None`. GetDeploy { @@ -306,6 +308,7 @@ impl Display for ApiRequest { /// A contract runtime request. #[derive(Debug)] +#[must_use] pub enum ContractRuntimeRequest { /// Commit genesis chainspec. CommitGenesis { @@ -327,23 +330,3 @@ impl Display for ContractRuntimeRequest { } } } - -/// Requests for the deploy broadcaster. -#[derive(Debug)] -pub enum DeployGossiperRequest { - /// A new `Deploy` received from a client via the HTTP server component. - PutFromClient { - /// The received deploy. - deploy: Box, - }, -} - -impl Display for DeployGossiperRequest { - fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - match self { - DeployGossiperRequest::PutFromClient { deploy, .. } => { - write!(formatter, "put from client: {}", deploy.id()) - } - } - } -} diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index ba8be6a045..3c535ff984 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -22,8 +22,8 @@ use crate::{ Component, }, effect::{ - announcements::NetworkAnnouncement, - requests::{ApiRequest, DeployGossiperRequest, NetworkRequest, StorageRequest}, + announcements::{ApiServerAnnouncement, NetworkAnnouncement}, + requests::{ApiRequest, NetworkRequest, StorageRequest}, EffectBuilder, Effects, }, reactor::{self, initializer, EventQueueHandle}, @@ -80,6 +80,9 @@ pub enum Event { /// Deploy gossiper event. #[from] DeployGossiper(deploy_gossiper::Event), + /// Contract runtime event. + #[from] + ContractRuntime(contract_runtime::Event), // Requests /// Network request. @@ -90,11 +93,9 @@ pub enum Event { /// Network announcement. #[from] NetworkAnnouncement(NetworkAnnouncement), - - // Contract Runtime - /// Contract runtime event. + /// API server announcement. #[from] - ContractRuntime(contract_runtime::Event), + ApiServerAnnouncement(ApiServerAnnouncement), } impl From for Event { @@ -121,12 +122,6 @@ impl From> for Event { } } -impl From for Event { - fn from(request: DeployGossiperRequest) -> Self { - Event::DeployGossiper(deploy_gossiper::Event::Request(request)) - } -} - impl Display for Event { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -136,9 +131,10 @@ impl Display for Event { Event::ApiServer(event) => write!(f, "api server: {}", event), Event::Consensus(event) => write!(f, "consensus: {}", event), Event::DeployGossiper(event) => write!(f, "deploy gossiper: {}", event), + Event::ContractRuntime(event) => write!(f, "contract runtime: {}", event), Event::NetworkRequest(req) => write!(f, "network request: {}", req), Event::NetworkAnnouncement(ann) => write!(f, "network announcement: {}", ann), - Event::ContractRuntime(event) => write!(f, "contract runtime: {}", event), + Event::ApiServerAnnouncement(ann) => write!(f, "api server announcement: {}", ann), } } } @@ -237,6 +233,7 @@ impl reactor::Reactor for Reactor { self.deploy_gossiper .handle_event(effect_builder, rng, event), ), + Event::ContractRuntime(event) => todo!("handle contract runtime event: {:?}", event), // Requests: Event::NetworkRequest(req) => self.dispatch_event( @@ -268,7 +265,10 @@ impl reactor::Reactor for Reactor { // Any incoming message is one for the pinger. self.dispatch_event(effect_builder, rng, reactor_event) } - Event::ContractRuntime(event) => todo!("handle contract runtime event: {:?}", event), + Event::ApiServerAnnouncement(ApiServerAnnouncement::DeployReceived { deploy }) => { + let event = deploy_gossiper::Event::DeployReceived { deploy }; + self.dispatch_event(effect_builder, rng, Event::DeployGossiper(event)) + } } } }