diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index 389c4fee34..aab5bd442e 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -20,7 +20,10 @@ use tokio::task; use crate::{ components::Component, - effect::{requests::StorageRequest, EffectBuilder, EffectExt, Effects}, + effect::{ + announcements::StorageAnnouncement, requests::StorageRequest, EffectBuilder, EffectExt, + Effects, + }, types::{Block, Deploy}, }; // Seems to be a false positive. @@ -99,12 +102,13 @@ impl Component for S where S: StorageType, Self: Sized + 'static, + REv: From> + Send, { type Event = StorageRequest; fn handle_event( &mut self, - _effect_builder: EffectBuilder, + effect_builder: EffectBuilder, _rng: &mut R, event: Self::Event, ) -> Effects { @@ -148,10 +152,22 @@ where StorageRequest::PutDeploy { deploy, responder } => { let deploy_store = self.deploy_store(); async move { + // Create the effect, but do not return it. + let announce_success = effect_builder.announce_deploy_stored(&deploy); + let result = task::spawn_blocking(move || deploy_store.put(*deploy)) .await .expect("should run"); - responder.respond(result).await + + let was_ok = result.is_ok(); + + // Tell the requestor the result of storing the deploy. + responder.respond(result).await; + + if was_ok { + // Now that we have stored the deploy, we also want to announce it. + announce_success.await; + } } .ignore() } diff --git a/node/src/effect.rs b/node/src/effect.rs index 01052933a2..fc8a9f6b38 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -85,7 +85,7 @@ use crate::{ types::{Deploy, ExecutedBlock, ProtoBlock}, Chainspec, }; -use announcements::{ApiServerAnnouncement, NetworkAnnouncement}; +use announcements::{ApiServerAnnouncement, NetworkAnnouncement, StorageAnnouncement}; use requests::{ContractRuntimeRequest, DeployQueueRequest, NetworkRequest, StorageRequest}; /// A pinned, boxed future that produces one or more events. @@ -371,7 +371,7 @@ impl EffectBuilder { .await } - /// Announce that a network message has been received. + /// Announces that a network message has been received. pub(crate) async fn announce_message_received(self, sender: I, payload: P) where REv: From>, @@ -397,6 +397,24 @@ impl EffectBuilder { .await; } + /// Announces that a (not necessarily new) deploy has been added to the store. + pub(crate) fn announce_deploy_stored(self, deploy: &S::Deploy) -> impl Future + where + S: StorageType, + REv: From>, + { + let deploy_hash = *deploy.id(); + let deploy_header = deploy.header().clone(); + + self.0.schedule( + StorageAnnouncement::StoredDeploy { + deploy_hash, + deploy_header, + }, + QueueKind::Regular, + ) + } + /// Puts the given block into the linear block store. // TODO: remove once method is used. #[allow(dead_code)] diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index 74dc020eb9..5c729bdc96 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -5,7 +5,10 @@ use std::fmt::{self, Display, Formatter}; -use crate::types::Deploy; +use crate::{ + components::storage::{StorageType, Value}, + types::Deploy, +}; /// A networking layer announcement. #[derive(Debug)] @@ -54,3 +57,30 @@ impl Display for ApiServerAnnouncement { } } } + +/// A storage layer announcement. +#[derive(Debug)] +pub enum StorageAnnouncement { + /// A deploy has been stored. + StoredDeploy { + /// ID or "hash" of the deploy that was added to the store. + deploy_hash: ::Id, + + /// The header of the deploy that was added to the store. + deploy_header: ::Header, + }, +} + +impl Display for StorageAnnouncement +where + S: StorageType, + ::Id: Display, +{ + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + StorageAnnouncement::StoredDeploy { deploy_hash, .. } => { + write!(formatter, "stored deploy {}", deploy_hash) + } + } + } +} diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index b9dd9473cf..cc7b4fbe89 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -8,6 +8,7 @@ use std::{ use derive_more::From; use rand::Rng; use thiserror::Error; +use tracing::debug; use crate::{ components::{ @@ -17,6 +18,7 @@ use crate::{ Component, }, effect::{ + announcements::StorageAnnouncement, requests::{ContractRuntimeRequest, StorageRequest}, EffectBuilder, Effects, }, @@ -35,6 +37,10 @@ pub enum Event { #[from] Storage(StorageRequest), + /// Storage announcement. + #[from] + StorageAnnouncement(StorageAnnouncement), + /// Contract runtime event. #[from] ContractRuntime(contract_runtime::Event), @@ -51,6 +57,7 @@ impl Display for Event { match self { Event::Chainspec(event) => write!(formatter, "chainspec: {}", event), Event::Storage(event) => write!(formatter, "storage: {}", event), + Event::StorageAnnouncement(ann) => write!(formatter, "storage announcement: {}", ann), Event::ContractRuntime(event) => write!(formatter, "contract runtime: {}", event), } } @@ -139,6 +146,10 @@ impl reactor::Reactor for Reactor { Event::Storage, self.storage.handle_event(effect_builder, rng, event), ), + Event::StorageAnnouncement(ann) => { + debug!(%ann, "ignoring storing announcement"); + Effects::new() + } Event::ContractRuntime(event) => reactor::wrap_effects( Event::ContractRuntime, self.contract_runtime diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 82c2569958..6bfcc7678b 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -10,6 +10,7 @@ use std::fmt::{self, Display, Formatter}; use derive_more::From; use rand::Rng; use serde::{Deserialize, Serialize}; +use tracing::warn; use crate::{ components::{ @@ -23,7 +24,7 @@ use crate::{ Component, }, effect::{ - announcements::{ApiServerAnnouncement, NetworkAnnouncement}, + announcements::{ApiServerAnnouncement, NetworkAnnouncement, StorageAnnouncement}, requests::{ ApiRequest, ContractRuntimeRequest, DeployQueueRequest, NetworkRequest, StorageRequest, }, @@ -102,6 +103,9 @@ pub enum Event { /// Network announcement. #[from] NetworkAnnouncement(NetworkAnnouncement), + /// Storage announcement. + #[from] + StorageAnnouncement(StorageAnnouncement), /// API server announcement. #[from] ApiServerAnnouncement(ApiServerAnnouncement), @@ -152,6 +156,7 @@ impl Display for Event { Event::DeployQueueRequest(req) => write!(f, "deploy queue request: {}", req), Event::NetworkAnnouncement(ann) => write!(f, "network announcement: {}", ann), Event::ApiServerAnnouncement(ann) => write!(f, "api server announcement: {}", ann), + Event::StorageAnnouncement(ann) => write!(f, "storage announcement: {}", ann), } } } @@ -299,6 +304,10 @@ impl reactor::Reactor for Reactor { let event = deploy_gossiper::Event::DeployReceived { deploy }; self.dispatch_event(effect_builder, rng, Event::DeployGossiper(event)) } + Event::StorageAnnouncement(ann) => { + warn!(%ann, "dropped storage announcement"); + Effects::new() + } } } }