From a853d46ada9c9cb9ddd2932f8f5ad94bfa19e99d Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Wed, 15 Jul 2020 18:30:26 +0200 Subject: [PATCH 1/8] Create a `StorageAnnouncement::StoredDeploy` --- node/src/effect.rs | 17 +++++++++++++++-- node/src/effect/announcements.rs | 9 +++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/node/src/effect.rs b/node/src/effect.rs index 01052933a2..57b4c9ced1 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -85,7 +85,7 @@ use crate::{ types::{Deploy, ExecutedBlock, ProtoBlock}, Chainspec, }; -use announcements::{ApiServerAnnouncement, NetworkAnnouncement}; +use announcements::{ApiServerAnnouncement, NetworkAnnouncement, StorageAnnouncement}; use requests::{ContractRuntimeRequest, DeployQueueRequest, NetworkRequest, StorageRequest}; /// A pinned, boxed future that produces one or more events. @@ -371,7 +371,7 @@ impl EffectBuilder { .await } - /// Announce that a network message has been received. + /// Announces that a network message has been received. pub(crate) async fn announce_message_received(self, sender: I, payload: P) where REv: From>, @@ -397,6 +397,19 @@ impl EffectBuilder { .await; } + /// Announces that a (not necessarily new) deploy has been added to the store. + pub(crate) async fn announce_deploy_stored(self, deploy_hash: D) + where + REv: From>, + { + self.0 + .schedule( + StorageAnnouncement::StoredDeploy { deploy_hash }, + QueueKind::Regular, + ) + .await; + } + /// Puts the given block into the linear block store. // TODO: remove once method is used. #[allow(dead_code)] diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index 74dc020eb9..833b965796 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -54,3 +54,12 @@ impl Display for ApiServerAnnouncement { } } } +/// A storage layer announcement. +#[derive(Debug)] +pub enum StorageAnnouncement { + /// A deploy has been stored. + StoredDeploy { + /// ID or "hash" of the deploy that was added to the store. + deploy_hash: D, + }, +} From b3634e37f5f8727009163634085a982de8a916c5 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Wed, 15 Jul 2020 20:14:44 +0200 Subject: [PATCH 2/8] Announce stored deploys by hash --- node/src/components/storage.rs | 17 ++++++++++++++--- node/src/effect/announcements.rs | 13 +++++++++++++ node/src/reactor/initializer.rs | 14 ++++++++++++++ node/src/reactor/validator.rs | 13 +++++++++++-- 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index 389c4fee34..3b1ce27928 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -20,7 +20,10 @@ use tokio::task; use crate::{ components::Component, - effect::{requests::StorageRequest, EffectBuilder, EffectExt, Effects}, + effect::{ + announcements::StorageAnnouncement, requests::StorageRequest, EffectBuilder, EffectExt, + Effects, + }, types::{Block, Deploy}, }; // Seems to be a false positive. @@ -99,12 +102,13 @@ impl Component for S where S: StorageType, Self: Sized + 'static, + REv: From::Deploy as Value>::Id>> + Send, { type Event = StorageRequest; fn handle_event( &mut self, - _effect_builder: EffectBuilder, + effect_builder: EffectBuilder, _rng: &mut R, event: Self::Event, ) -> Effects { @@ -148,10 +152,17 @@ where StorageRequest::PutDeploy { deploy, responder } => { let deploy_store = self.deploy_store(); async move { + let deploy_id = *deploy.id(); + let result = task::spawn_blocking(move || deploy_store.put(*deploy)) .await .expect("should run"); - responder.respond(result).await + + // Tell the requestor that saved the deploy we're good. + responder.respond(result).await; + + // Now that we have stored the deploy, we also want to announce it. + effect_builder.announce_deploy_stored(deploy_id).await; } .ignore() } diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index 833b965796..d33887c35c 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -63,3 +63,16 @@ pub enum StorageAnnouncement { deploy_hash: D, }, } + +impl Display for StorageAnnouncement +where + D: Display, +{ + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + StorageAnnouncement::StoredDeploy { deploy_hash } => { + write!(formatter, "stored deploy {}", deploy_hash) + } + } + } +} diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index b9dd9473cf..136e6ad8c5 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -8,6 +8,7 @@ use std::{ use derive_more::From; use rand::Rng; use thiserror::Error; +use tracing::debug; use crate::{ components::{ @@ -17,10 +18,12 @@ use crate::{ Component, }, effect::{ + announcements::StorageAnnouncement, requests::{ContractRuntimeRequest, StorageRequest}, EffectBuilder, Effects, }, reactor::{self, validator, EventQueueHandle}, + types::DeployHash, }; /// Top-level event for the reactor. @@ -35,6 +38,10 @@ pub enum Event { #[from] Storage(StorageRequest), + /// Storage announcement. + #[from] + StorageAnnouncement(StorageAnnouncement), + /// Contract runtime event. #[from] ContractRuntime(contract_runtime::Event), @@ -51,6 +58,9 @@ impl Display for Event { match self { Event::Chainspec(event) => write!(formatter, "chainspec: {}", event), Event::Storage(event) => write!(formatter, "storage: {}", event), + Event::StorageAnnouncement(announcement) => { + write!(formatter, "storage announcement: {}", announcement) + } Event::ContractRuntime(event) => write!(formatter, "contract runtime: {}", event), } } @@ -139,6 +149,10 @@ impl reactor::Reactor for Reactor { Event::Storage, self.storage.handle_event(effect_builder, rng, event), ), + Event::StorageAnnouncement(announcement) => { + debug!(%announcement, "ignoring storing announcement"); + Effects::new() + } Event::ContractRuntime(event) => reactor::wrap_effects( Event::ContractRuntime, self.contract_runtime diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 82c2569958..6a3330ad59 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -10,6 +10,7 @@ use std::fmt::{self, Display, Formatter}; use derive_more::From; use rand::Rng; use serde::{Deserialize, Serialize}; +use tracing::warn; use crate::{ components::{ @@ -23,7 +24,7 @@ use crate::{ Component, }, effect::{ - announcements::{ApiServerAnnouncement, NetworkAnnouncement}, + announcements::{ApiServerAnnouncement, NetworkAnnouncement, StorageAnnouncement}, requests::{ ApiRequest, ContractRuntimeRequest, DeployQueueRequest, NetworkRequest, StorageRequest, }, @@ -31,7 +32,7 @@ use crate::{ }, reactor::{self, initializer, EventQueueHandle}, small_network::{self, NodeId}, - types::Timestamp, + types::{DeployHash, Timestamp}, SmallNetwork, }; pub use config::Config; @@ -102,6 +103,9 @@ pub enum Event { /// Network announcement. #[from] NetworkAnnouncement(NetworkAnnouncement), + /// Storage announcement. + #[from] + StorageAnnouncement(StorageAnnouncement), /// API server announcement. #[from] ApiServerAnnouncement(ApiServerAnnouncement), @@ -152,6 +156,7 @@ impl Display for Event { Event::DeployQueueRequest(req) => write!(f, "deploy queue request: {}", req), Event::NetworkAnnouncement(ann) => write!(f, "network announcement: {}", ann), Event::ApiServerAnnouncement(ann) => write!(f, "api server announcement: {}", ann), + Event::StorageAnnouncement(ann) => write!(f, "storage announcement: {}", ann), } } } @@ -271,6 +276,10 @@ impl reactor::Reactor for Reactor { Event::DeployQueueRequest(req) => { self.dispatch_event(effect_builder, rng, Event::DeployQueue(req.into())) } + Event::StorageAnnouncement(ann) => { + warn!(%ann, "dropped storage announcement"); + Effects::new() + } // Announcements: Event::NetworkAnnouncement(NetworkAnnouncement::MessageReceived { From 9344e2fa2a83363c7fbf51d0c6d725bc03c7d7e7 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Thu, 16 Jul 2020 12:43:09 +0200 Subject: [PATCH 3/8] Fixed typos in formatting/comments --- node/src/components/storage.rs | 2 +- node/src/effect/announcements.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index 3b1ce27928..c437702360 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -158,7 +158,7 @@ where .await .expect("should run"); - // Tell the requestor that saved the deploy we're good. + // Tell the requestor the result of storing the deploy. responder.respond(result).await; // Now that we have stored the deploy, we also want to announce it. diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index d33887c35c..fe1a71f591 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -54,6 +54,7 @@ impl Display for ApiServerAnnouncement { } } } + /// A storage layer announcement. #[derive(Debug)] pub enum StorageAnnouncement { From 9992b2ec1bdc7674b2f55cd1ba7871a9983662d6 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Thu, 16 Jul 2020 12:43:22 +0200 Subject: [PATCH 4/8] Only announce new deploys when they have been stored successfully --- node/src/components/storage.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index c437702360..63820e2d62 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -161,8 +161,10 @@ where // Tell the requestor the result of storing the deploy. responder.respond(result).await; - // Now that we have stored the deploy, we also want to announce it. - effect_builder.announce_deploy_stored(deploy_id).await; + if result.is_ok() { + // Now that we have stored the deploy, we also want to announce it. + effect_builder.announce_deploy_stored(deploy_id).await; + } } .ignore() } From 34a472483224b122780c2c2c7f7ddca737d63603 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Thu, 16 Jul 2020 12:45:18 +0200 Subject: [PATCH 5/8] Fix ordering of match arms in validator event processing --- node/src/reactor/validator.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 6a3330ad59..5dc46a1035 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -276,10 +276,6 @@ impl reactor::Reactor for Reactor { Event::DeployQueueRequest(req) => { self.dispatch_event(effect_builder, rng, Event::DeployQueue(req.into())) } - Event::StorageAnnouncement(ann) => { - warn!(%ann, "dropped storage announcement"); - Effects::new() - } // Announcements: Event::NetworkAnnouncement(NetworkAnnouncement::MessageReceived { @@ -308,6 +304,10 @@ impl reactor::Reactor for Reactor { let event = deploy_gossiper::Event::DeployReceived { deploy }; self.dispatch_event(effect_builder, rng, Event::DeployGossiper(event)) } + Event::StorageAnnouncement(ann) => { + warn!(%ann, "dropped storage announcement"); + Effects::new() + } } } } From 40a52cb1025e8dfec10040200a80befd03a3e46c Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Thu, 16 Jul 2020 14:38:24 +0200 Subject: [PATCH 6/8] Make storage announcement generic over storage trait instead of just the deploy hash --- node/src/components/storage.rs | 6 ++++-- node/src/effect.rs | 5 +++-- node/src/effect/announcements.rs | 14 +++++++++----- node/src/reactor/initializer.rs | 11 ++++------- node/src/reactor/validator.rs | 4 ++-- 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index 63820e2d62..51fa4b0b9f 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -102,7 +102,7 @@ impl Component for S where S: StorageType, Self: Sized + 'static, - REv: From::Deploy as Value>::Id>> + Send, + REv: From> + Send, { type Event = StorageRequest; @@ -158,10 +158,12 @@ where .await .expect("should run"); + let was_ok = result.is_ok(); + // Tell the requestor the result of storing the deploy. responder.respond(result).await; - if result.is_ok() { + if was_ok { // Now that we have stored the deploy, we also want to announce it. effect_builder.announce_deploy_stored(deploy_id).await; } diff --git a/node/src/effect.rs b/node/src/effect.rs index 57b4c9ced1..a35d0f6ca1 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -398,9 +398,10 @@ impl EffectBuilder { } /// Announces that a (not necessarily new) deploy has been added to the store. - pub(crate) async fn announce_deploy_stored(self, deploy_hash: D) + pub(crate) async fn announce_deploy_stored(self, deploy_hash: ::Id) where - REv: From>, + S: StorageType, + REv: From>, { self.0 .schedule( diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index fe1a71f591..1041551726 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -5,7 +5,10 @@ use std::fmt::{self, Display, Formatter}; -use crate::types::Deploy; +use crate::{ + components::storage::{StorageType, Value}, + types::Deploy, +}; /// A networking layer announcement. #[derive(Debug)] @@ -57,17 +60,18 @@ impl Display for ApiServerAnnouncement { /// A storage layer announcement. #[derive(Debug)] -pub enum StorageAnnouncement { +pub enum StorageAnnouncement { /// A deploy has been stored. StoredDeploy { /// ID or "hash" of the deploy that was added to the store. - deploy_hash: D, + deploy_hash: ::Id, }, } -impl Display for StorageAnnouncement +impl Display for StorageAnnouncement where - D: Display, + S: StorageType, + ::Id: Display, { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { match self { diff --git a/node/src/reactor/initializer.rs b/node/src/reactor/initializer.rs index 136e6ad8c5..cc7b4fbe89 100644 --- a/node/src/reactor/initializer.rs +++ b/node/src/reactor/initializer.rs @@ -23,7 +23,6 @@ use crate::{ EffectBuilder, Effects, }, reactor::{self, validator, EventQueueHandle}, - types::DeployHash, }; /// Top-level event for the reactor. @@ -40,7 +39,7 @@ pub enum Event { /// Storage announcement. #[from] - StorageAnnouncement(StorageAnnouncement), + StorageAnnouncement(StorageAnnouncement), /// Contract runtime event. #[from] @@ -58,9 +57,7 @@ impl Display for Event { match self { Event::Chainspec(event) => write!(formatter, "chainspec: {}", event), Event::Storage(event) => write!(formatter, "storage: {}", event), - Event::StorageAnnouncement(announcement) => { - write!(formatter, "storage announcement: {}", announcement) - } + Event::StorageAnnouncement(ann) => write!(formatter, "storage announcement: {}", ann), Event::ContractRuntime(event) => write!(formatter, "contract runtime: {}", event), } } @@ -149,8 +146,8 @@ impl reactor::Reactor for Reactor { Event::Storage, self.storage.handle_event(effect_builder, rng, event), ), - Event::StorageAnnouncement(announcement) => { - debug!(%announcement, "ignoring storing announcement"); + Event::StorageAnnouncement(ann) => { + debug!(%ann, "ignoring storing announcement"); Effects::new() } Event::ContractRuntime(event) => reactor::wrap_effects( diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 5dc46a1035..6bfcc7678b 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -32,7 +32,7 @@ use crate::{ }, reactor::{self, initializer, EventQueueHandle}, small_network::{self, NodeId}, - types::{DeployHash, Timestamp}, + types::Timestamp, SmallNetwork, }; pub use config::Config; @@ -105,7 +105,7 @@ pub enum Event { NetworkAnnouncement(NetworkAnnouncement), /// Storage announcement. #[from] - StorageAnnouncement(StorageAnnouncement), + StorageAnnouncement(StorageAnnouncement), /// API server announcement. #[from] ApiServerAnnouncement(ApiServerAnnouncement), From 7a20317bb26abe4e309c401fe03e0116ccd7d94b Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Thu, 16 Jul 2020 14:42:22 +0200 Subject: [PATCH 7/8] Also store deploy header --- node/src/components/storage.rs | 5 ++++- node/src/effect.rs | 12 +++++++++--- node/src/effect/announcements.rs | 5 ++++- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index 51fa4b0b9f..6060532d1e 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -153,6 +153,7 @@ where let deploy_store = self.deploy_store(); async move { let deploy_id = *deploy.id(); + let deploy_header = deploy.header().clone(); let result = task::spawn_blocking(move || deploy_store.put(*deploy)) .await @@ -165,7 +166,9 @@ where if was_ok { // Now that we have stored the deploy, we also want to announce it. - effect_builder.announce_deploy_stored(deploy_id).await; + effect_builder + .announce_deploy_stored(deploy_id, deploy_header) + .await; } } .ignore() diff --git a/node/src/effect.rs b/node/src/effect.rs index a35d0f6ca1..e55a615428 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -398,14 +398,20 @@ impl EffectBuilder { } /// Announces that a (not necessarily new) deploy has been added to the store. - pub(crate) async fn announce_deploy_stored(self, deploy_hash: ::Id) - where + pub(crate) async fn announce_deploy_stored( + self, + deploy_hash: ::Id, + deploy_header: ::Header, + ) where S: StorageType, REv: From>, { self.0 .schedule( - StorageAnnouncement::StoredDeploy { deploy_hash }, + StorageAnnouncement::StoredDeploy { + deploy_hash, + deploy_header, + }, QueueKind::Regular, ) .await; diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index 1041551726..5c729bdc96 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -65,6 +65,9 @@ pub enum StorageAnnouncement { StoredDeploy { /// ID or "hash" of the deploy that was added to the store. deploy_hash: ::Id, + + /// The header of the deploy that was added to the store. + deploy_header: ::Header, }, } @@ -75,7 +78,7 @@ where { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { match self { - StorageAnnouncement::StoredDeploy { deploy_hash } => { + StorageAnnouncement::StoredDeploy { deploy_hash, .. } => { write!(formatter, "stored deploy {}", deploy_hash) } } From f1d03c5ce16696c5154cb955cafb6f961c06bdf2 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Thu, 16 Jul 2020 14:51:35 +0200 Subject: [PATCH 8/8] Pass in reference to full deploy instead of pieces to avoid inconsistencies when creating announcement --- node/src/components/storage.rs | 8 +++----- node/src/effect.rs | 26 ++++++++++++-------------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index 6060532d1e..aab5bd442e 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -152,8 +152,8 @@ where StorageRequest::PutDeploy { deploy, responder } => { let deploy_store = self.deploy_store(); async move { - let deploy_id = *deploy.id(); - let deploy_header = deploy.header().clone(); + // Create the effect, but do not return it. + let announce_success = effect_builder.announce_deploy_stored(&deploy); let result = task::spawn_blocking(move || deploy_store.put(*deploy)) .await @@ -166,9 +166,7 @@ where if was_ok { // Now that we have stored the deploy, we also want to announce it. - effect_builder - .announce_deploy_stored(deploy_id, deploy_header) - .await; + announce_success.await; } } .ignore() diff --git a/node/src/effect.rs b/node/src/effect.rs index e55a615428..fc8a9f6b38 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -398,23 +398,21 @@ impl EffectBuilder { } /// Announces that a (not necessarily new) deploy has been added to the store. - pub(crate) async fn announce_deploy_stored( - self, - deploy_hash: ::Id, - deploy_header: ::Header, - ) where + pub(crate) fn announce_deploy_stored(self, deploy: &S::Deploy) -> impl Future + where S: StorageType, REv: From>, { - self.0 - .schedule( - StorageAnnouncement::StoredDeploy { - deploy_hash, - deploy_header, - }, - QueueKind::Regular, - ) - .await; + let deploy_hash = *deploy.id(); + let deploy_header = deploy.header().clone(); + + self.0.schedule( + StorageAnnouncement::StoredDeploy { + deploy_hash, + deploy_header, + }, + QueueKind::Regular, + ) } /// Puts the given block into the linear block store.