Skip to content
22 changes: 19 additions & 3 deletions node/src/components/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use tokio::task;

use crate::{
components::Component,
effect::{requests::StorageRequest, EffectBuilder, EffectExt, Effects},
effect::{
announcements::StorageAnnouncement, requests::StorageRequest, EffectBuilder, EffectExt,
Effects,
},
types::{Block, Deploy},
};
// Seems to be a false positive.
Expand Down Expand Up @@ -99,12 +102,13 @@ impl<REv, S> Component<REv> for S
where
S: StorageType,
Self: Sized + 'static,
REv: From<StorageAnnouncement<S>> + Send,
{
type Event = StorageRequest<Self>;

fn handle_event<R: Rng + ?Sized>(
&mut self,
_effect_builder: EffectBuilder<REv>,
effect_builder: EffectBuilder<REv>,
_rng: &mut R,
event: Self::Event,
) -> Effects<Self::Event> {
Expand Down Expand Up @@ -148,10 +152,22 @@ where
StorageRequest::PutDeploy { deploy, responder } => {
let deploy_store = self.deploy_store();
async move {
// Create the effect, but do not return it.
let announce_success = effect_builder.announce_deploy_stored(&deploy);

let result = task::spawn_blocking(move || deploy_store.put(*deploy))
.await
.expect("should run");
responder.respond(result).await

let was_ok = result.is_ok();

// Tell the requestor the result of storing the deploy.
responder.respond(result).await;

if was_ok {
// Now that we have stored the deploy, we also want to announce it.
announce_success.await;
}
}
.ignore()
}
Expand Down
22 changes: 20 additions & 2 deletions node/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ use crate::{
types::{Deploy, ExecutedBlock, ProtoBlock},
Chainspec,
};
use announcements::{ApiServerAnnouncement, NetworkAnnouncement};
use announcements::{ApiServerAnnouncement, NetworkAnnouncement, StorageAnnouncement};
use requests::{ContractRuntimeRequest, DeployQueueRequest, NetworkRequest, StorageRequest};

/// A pinned, boxed future that produces one or more events.
Expand Down Expand Up @@ -371,7 +371,7 @@ impl<REv> EffectBuilder<REv> {
.await
}

/// Announce that a network message has been received.
/// Announces that a network message has been received.
pub(crate) async fn announce_message_received<I, P>(self, sender: I, payload: P)
where
REv: From<NetworkAnnouncement<I, P>>,
Expand All @@ -397,6 +397,24 @@ impl<REv> EffectBuilder<REv> {
.await;
}

/// Announces that a (not necessarily new) deploy has been added to the store.
pub(crate) fn announce_deploy_stored<S>(self, deploy: &S::Deploy) -> impl Future<Output = ()>
where
S: StorageType,
REv: From<StorageAnnouncement<S>>,
{
let deploy_hash = *deploy.id();
let deploy_header = deploy.header().clone();

self.0.schedule(
StorageAnnouncement::StoredDeploy {
deploy_hash,
deploy_header,
},
QueueKind::Regular,
)
}

/// Puts the given block into the linear block store.
// TODO: remove once method is used.
#[allow(dead_code)]
Expand Down
32 changes: 31 additions & 1 deletion node/src/effect/announcements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

use std::fmt::{self, Display, Formatter};

use crate::types::Deploy;
use crate::{
components::storage::{StorageType, Value},
types::Deploy,
};

/// A networking layer announcement.
#[derive(Debug)]
Expand Down Expand Up @@ -54,3 +57,30 @@ impl Display for ApiServerAnnouncement {
}
}
}

/// A storage layer announcement.
#[derive(Debug)]
pub enum StorageAnnouncement<S: StorageType> {
/// A deploy has been stored.
StoredDeploy {
/// ID or "hash" of the deploy that was added to the store.
deploy_hash: <S::Deploy as Value>::Id,

/// The header of the deploy that was added to the store.
deploy_header: <S::Deploy as Value>::Header,
},
}

impl<S> Display for StorageAnnouncement<S>
where
S: StorageType,
<S::Deploy as Value>::Id: Display,
{
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
StorageAnnouncement::StoredDeploy { deploy_hash, .. } => {
write!(formatter, "stored deploy {}", deploy_hash)
}
}
}
}
11 changes: 11 additions & 0 deletions node/src/reactor/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
use derive_more::From;
use rand::Rng;
use thiserror::Error;
use tracing::debug;

use crate::{
components::{
Expand All @@ -17,6 +18,7 @@ use crate::{
Component,
},
effect::{
announcements::StorageAnnouncement,
requests::{ContractRuntimeRequest, StorageRequest},
EffectBuilder, Effects,
},
Expand All @@ -35,6 +37,10 @@ pub enum Event {
#[from]
Storage(StorageRequest<Storage>),

/// Storage announcement.
#[from]
StorageAnnouncement(StorageAnnouncement<Storage>),

/// Contract runtime event.
#[from]
ContractRuntime(contract_runtime::Event),
Expand All @@ -51,6 +57,7 @@ impl Display for Event {
match self {
Event::Chainspec(event) => write!(formatter, "chainspec: {}", event),
Event::Storage(event) => write!(formatter, "storage: {}", event),
Event::StorageAnnouncement(ann) => write!(formatter, "storage announcement: {}", ann),
Event::ContractRuntime(event) => write!(formatter, "contract runtime: {}", event),
}
}
Expand Down Expand Up @@ -139,6 +146,10 @@ impl reactor::Reactor for Reactor {
Event::Storage,
self.storage.handle_event(effect_builder, rng, event),
),
Event::StorageAnnouncement(ann) => {
debug!(%ann, "ignoring storing announcement");
Effects::new()
}
Event::ContractRuntime(event) => reactor::wrap_effects(
Event::ContractRuntime,
self.contract_runtime
Expand Down
11 changes: 10 additions & 1 deletion node/src/reactor/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::fmt::{self, Display, Formatter};
use derive_more::From;
use rand::Rng;
use serde::{Deserialize, Serialize};
use tracing::warn;

use crate::{
components::{
Expand All @@ -23,7 +24,7 @@ use crate::{
Component,
},
effect::{
announcements::{ApiServerAnnouncement, NetworkAnnouncement},
announcements::{ApiServerAnnouncement, NetworkAnnouncement, StorageAnnouncement},
requests::{
ApiRequest, ContractRuntimeRequest, DeployQueueRequest, NetworkRequest, StorageRequest,
},
Expand Down Expand Up @@ -102,6 +103,9 @@ pub enum Event {
/// Network announcement.
#[from]
NetworkAnnouncement(NetworkAnnouncement<NodeId, Message>),
/// Storage announcement.
#[from]
StorageAnnouncement(StorageAnnouncement<Storage>),
/// API server announcement.
#[from]
ApiServerAnnouncement(ApiServerAnnouncement),
Expand Down Expand Up @@ -152,6 +156,7 @@ impl Display for Event {
Event::DeployQueueRequest(req) => write!(f, "deploy queue request: {}", req),
Event::NetworkAnnouncement(ann) => write!(f, "network announcement: {}", ann),
Event::ApiServerAnnouncement(ann) => write!(f, "api server announcement: {}", ann),
Event::StorageAnnouncement(ann) => write!(f, "storage announcement: {}", ann),
}
}
}
Expand Down Expand Up @@ -299,6 +304,10 @@ impl reactor::Reactor for Reactor {
let event = deploy_gossiper::Event::DeployReceived { deploy };
self.dispatch_event(effect_builder, rng, Event::DeployGossiper(event))
}
Event::StorageAnnouncement(ann) => {
warn!(%ann, "dropped storage announcement");
Effects::new()
}
}
}
}
Expand Down