diff --git a/Cargo.lock b/Cargo.lock index 2d7f3c9e50..14ebc58432 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,6 +33,18 @@ version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" +[[package]] +name = "arrayref" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" + +[[package]] +name = "arrayvec" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" + [[package]] name = "atty" version = "0.2.14" @@ -96,6 +108,17 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "blake2b_simd" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a" +dependencies = [ + "arrayref", + "arrayvec", + "constant_time_eq", +] + [[package]] name = "block-buffer" version = "0.7.3" @@ -161,6 +184,7 @@ dependencies = [ "blake2", "bytes", "derive_more", + "directories", "ed25519-dalek", "either", "enum-iterator", @@ -169,6 +193,8 @@ dependencies = [ "hex", "hex_fmt", "http", + "libc", + "lmdb", "maplit", "openssl", "rand 0.7.3", @@ -180,6 +206,7 @@ dependencies = [ "serde_json", "smallvec", "structopt", + "tempfile", "thiserror", "tokio", "tokio-openssl", @@ -247,6 +274,12 @@ dependencies = [ "bitflags", ] +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + [[package]] name = "core-foundation" version = "0.7.0" @@ -263,6 +296,17 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg 1.0.0", + "cfg-if", + "lazy_static", +] + [[package]] name = "crypto-mac" version = "0.7.0" @@ -317,6 +361,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "directories" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "551a778172a450d7fc12e629ca3b0428d00f6afa9a43da1b630d54604e97371c" +dependencies = [ + "cfg-if", + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e93d7f5705de3e49895a2b5e0b8855a1c27f080192ae9c32a6432d50741a57a" +dependencies = [ + "libc", + "redox_users", + "winapi 0.3.8", +] + [[package]] name = "dtoa" version = "0.4.5" @@ -748,6 +813,28 @@ version = "0.2.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49" +[[package]] +name = "lmdb" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539" +dependencies = [ + "bitflags", + "libc", + "lmdb-sys", +] + +[[package]] +name = "lmdb-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "log" version = "0.3.9" @@ -910,9 +997,9 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.42" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" +checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b" dependencies = [ "autocfg 1.0.0", "num-traits", @@ -920,9 +1007,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" +checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611" dependencies = [ "autocfg 1.0.0", ] @@ -1029,18 +1116,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919" +checksum = "b044170ce52ac41b78bdf855a045f6fe6ba72c293a33a2e3f654642127680563" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d" +checksum = "babd76ce3c0d7c677fd01a3c3f9be24fa760adb73fd6db48f151662c1ec7eaba" dependencies = [ "proc-macro2", "quote", @@ -1295,6 +1382,17 @@ version = "0.1.56" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" +[[package]] +name = "redox_users" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b23093265f8d200fa7b4c2c76297f47e681c655f6f1285a8780d6a022f7431" +dependencies = [ + "getrandom", + "redox_syscall", + "rust-argon2", +] + [[package]] name = "regex" version = "1.3.9" @@ -1325,9 +1423,9 @@ checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" [[package]] name = "remove_dir_all" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ "winapi 0.3.8", ] @@ -1387,6 +1485,18 @@ dependencies = [ "serde", ] +[[package]] +name = "rust-argon2" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017" +dependencies = [ + "base64 0.11.0", + "blake2b_simd", + "constant_time_eq", + "crossbeam-utils", +] + [[package]] name = "ryu" version = "1.0.5" diff --git a/Cargo.toml b/Cargo.toml index c20c74f7be..2f5f214281 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ bincode = "1.2.1" blake2 = { version = "0.8.1", default-features = false } bytes = "0.5.4" derive_more = "0.99.7" +directories = "2.0.2" ed25519-dalek = { version = "1.0.0-pre.3", default-features = false, features = ["rand", "serde", "u64_backend"] } either = "1.5.3" enum-iterator = "0.6.0" @@ -27,6 +28,8 @@ getrandom = "0.1.14" hex = "0.4.2" hex_fmt = "0.3.0" http = "0.2.1" +libc = "0.2.71" +lmdb = "0.8.0" maplit = "1.0.2" openssl = "0.10.29" rand = "0.7.3" @@ -38,6 +41,7 @@ serde-big-array = "0.3.0" serde_json = "1.0.55" smallvec = "1.4.0" structopt = "0.3.14" +tempfile = "3.1.0" thiserror = "1.0.18" tokio = { version = "0.2.20", features = ["macros", "rt-threaded", "sync", "tcp", "time", "blocking"] } tokio-openssl = "0.4.0" diff --git a/README.md b/README.md index 39a95461af..05fc854be2 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,9 @@ cargo run --release -- generate-config > mynode.toml cargo run --release -- validator --config=mynode.toml ``` +**NOTE:** If you want to run multiple instances on the same machine, ensure you modify the `[storage.path]` field of +their configuration files to give each a unique path, or else instances will share database files. + ## Logging Logging can be enabled by setting the environment variable `RUST_LOG`. This can be set to one of the following levels, @@ -64,6 +67,8 @@ There is a minimal client which can be executed to store and retrieve `Deploy`s The client targets the HTTP service of a validator node. This can be configured via the config file for the node, and the actual bound endpoint is displayed as an info-level log message just after node startup. +#### Put a `Deploy` + To create a new random `Deploy` and store it: ``` @@ -73,8 +78,18 @@ cargo run --release --bin=casperlabs-client -- put-deploy http://localhost:7777 On success, the hash identifying the `Deploy` is output as a 64 character hex-encoded string. The `Deploy` will be broadcast immediately to all interconnected validator nodes. +#### Get a `Deploy` + To retrieve that deploy from any node: ``` cargo run --release --bin=casperlabs-client -- get-deploy http://localhost:8888 a555b68c8fed43078db6022a3de83fce97c1d80caf070c3654f9526d149e8182 ``` + +#### List stored `Deploy`s + +To get a list of all stored `Deploy`s' hashes: + +``` +cargo run --release --bin=casperlabs-client -- list-deploys http://localhost:9999 +``` diff --git a/src/apps/client/main.rs b/src/apps/client/main.rs index 494907103f..d72d543426 100644 --- a/src/apps/client/main.rs +++ b/src/apps/client/main.rs @@ -6,7 +6,7 @@ use structopt::StructOpt; use casperlabs_node::types::Deploy; -const DEPLOY_API_PATH: &str = "deploy"; +const DEPLOY_API_PATH: &str = "deploys"; #[derive(Debug, StructOpt)] /// CasperLabs client. @@ -25,6 +25,12 @@ pub enum Args { /// Hex-encoded deploy hash. deploy_hash: String, }, + /// Get the list of all stored deploys' hashes. + ListDeploys { + /// Address of the casperlabs-node HTTP service to contact. Example format: + /// http://localhost:7777 + node_address: String, + }, } #[tokio::main] @@ -35,6 +41,7 @@ async fn main() -> anyhow::Result<()> { node_address, deploy_hash, } => get_deploy(node_address, deploy_hash).await, + Args::ListDeploys { node_address } => list_deploys(node_address).await, } } @@ -71,3 +78,13 @@ async fn get_deploy(node_address: String, deploy_hash: String) -> anyhow::Result Ok(()) } + +async fn list_deploys(node_address: String) -> anyhow::Result<()> { + let url = format!("{}/{}", node_address, DEPLOY_API_PATH); + let body = reqwest::get(&url).await?.bytes().await?; + + let json_encoded = str::from_utf8(body.as_ref())?; + println!("{}", json_encoded); + + Ok(()) +} diff --git a/src/apps/node/cli.rs b/src/apps/node/cli.rs index f994686dbb..a50ca7a144 100644 --- a/src/apps/node/cli.rs +++ b/src/apps/node/cli.rs @@ -59,14 +59,14 @@ impl Cli { io::stdout().write_all(cfg_str.as_bytes())?; } Cli::Validator { config } => { + logging::init()?; // We load the specified config, if any, otherwise use defaults. let cfg = config .map(config::load_from_file) .transpose()? .unwrap_or_default(); - logging::init()?; - reactor::validator::run(cfg.validator_net, cfg.http_server).await? + reactor::validator::run(cfg.validator_net, cfg.http_server, cfg.storage).await? } } Ok(()) diff --git a/src/apps/node/config.rs b/src/apps/node/config.rs index 8a590df877..5bff7dceb0 100644 --- a/src/apps/node/config.rs +++ b/src/apps/node/config.rs @@ -23,7 +23,8 @@ use anyhow::Context; use serde::{Deserialize, Serialize}; use casperlabs_node::{ - ApiServerConfig, SmallNetworkConfig, ROOT_PUBLIC_LISTENING_PORT, ROOT_VALIDATOR_LISTENING_PORT, + ApiServerConfig, SmallNetworkConfig, StorageConfig, ROOT_PUBLIC_LISTENING_PORT, + ROOT_VALIDATOR_LISTENING_PORT, }; /// Root configuration. @@ -35,6 +36,8 @@ pub struct Config { pub public_net: SmallNetworkConfig, /// Network configuration for the HTTP API. pub http_server: ApiServerConfig, + /// On-disk storage configuration. + pub storage: StorageConfig, } impl Default for Config { @@ -43,6 +46,7 @@ impl Default for Config { validator_net: SmallNetworkConfig::default_on_port(ROOT_VALIDATOR_LISTENING_PORT), public_net: SmallNetworkConfig::default_on_port(ROOT_PUBLIC_LISTENING_PORT), http_server: ApiServerConfig::default(), + storage: StorageConfig::default(), } } } @@ -50,11 +54,13 @@ impl Default for Config { /// Loads a TOML-formatted configuration from a given file. pub fn load_from_file>(config_path: P) -> anyhow::Result { let path_ref = config_path.as_ref(); - Ok(toml::from_str( - &fs::read_to_string(path_ref) - .with_context(|| format!("Failed to read configuration file {:?}", path_ref))?, - ) - .with_context(|| format!("Failed to parse configuration file {:?}", path_ref))?) + let config: Config = + toml::from_str(&fs::read_to_string(path_ref).with_context(|| { + format!("Failed to read configuration file {}", path_ref.display()) + })?) + .with_context(|| format!("Failed to parse configuration file {}", path_ref.display()))?; + config.storage.check_sizes(); + Ok(config) } /// Creates a TOML-formatted string from a given configuration. diff --git a/src/components/api_server.rs b/src/components/api_server.rs index f1987f5621..3bf92ca0db 100644 --- a/src/components/api_server.rs +++ b/src/components/api_server.rs @@ -6,11 +6,21 @@ //! //! This module currently provides both halves of what is required for an API server: An abstract //! API Server that handles API requests and an external service endpoint based on HTTP+JSON. +//! +//! API +//! * To store a deploy, send an HTTP POST request to "/deploys" where the body is the +//! JSON-serialized deploy. The response will be the deploy's hash (hex-encoded) or an error +//! message on failure. +//! * To retrieve a deploy, send an HTTP GET request to "/deploys/" where is the +//! hex-encoded deploy hash. The response will be the JSON-serialized deploy, "null" if the +//! deploy doesn't exist or an error message on failure.. +//! * To list all stored deploy hashes, send an HTTP GET request to "/deploys". The response will +//! be the JSON-serialized list of hex-encoded deploy hashes or an error message on failure. mod config; mod event; -use std::{net::SocketAddr, str}; +use std::{error::Error as StdError, net::SocketAddr, str}; use bytes::Bytes; use http::Response; @@ -27,7 +37,7 @@ use warp::{ use super::Component; use crate::{ - components::storage::Storage, + components::storage::{self, Storage}, crypto::hash::Digest, effect::{ requests::{ApiRequest, DeployBroadcasterRequest, StorageRequest}, @@ -39,7 +49,7 @@ use crate::{ pub use config::Config; pub(crate) use event::Event; -const DEPLOY_API_PATH: &str = "deploy"; +const DEPLOYS_API_PATH: &str = "deploys"; pub(crate) struct ApiServer {} @@ -49,7 +59,7 @@ impl ApiServer { effect_builder: EffectBuilder, ) -> (Self, Multiple>) where - REv: From + From + Send, + REv: From + From + From> + Send, { let effects = Multiple::new(); let api_server = ApiServer {}; @@ -62,7 +72,7 @@ impl ApiServer { impl Component for ApiServer where - REv: From> + Send + From, + REv: From> + From + Send, { type Event = Event; @@ -92,18 +102,28 @@ where result: Box::new(result), main_responder: responder, }), + Event::ApiRequest(ApiRequest::ListDeploys { responder }) => effect_builder + .list_deploys() + .event(move |result| Event::ListDeploysResult { + result: Box::new(result), + main_responder: responder, + }), Event::PutDeployResult { deploy, result, main_responder, } => main_responder - .respond(result.map_err(|error| (*deploy, error.to_string()))) + .respond(result.map_err(|error| (*deploy, error))) .ignore(), Event::GetDeployResult { hash: _, result, main_responder, - } => main_responder.respond(result.ok()).ignore(), + } => main_responder.respond(*result).ignore(), + Event::ListDeploysResult { + result, + main_responder, + } => main_responder.respond(*result).ignore(), } } } @@ -111,15 +131,15 @@ where /// Run the HTTP server. async fn run_server(config: Config, effect_builder: EffectBuilder) where - REv: From + From + Send, + REv: From + From + From> + Send, { let post_deploy = warp::post() - .and(warp::path(DEPLOY_API_PATH)) + .and(warp::path(DEPLOYS_API_PATH)) .and(body::bytes()) .and_then(move |encoded_deploy| parse_post_request(effect_builder, encoded_deploy)); let get_deploy = warp::get() - .and(warp::path(DEPLOY_API_PATH)) + .and(warp::path(DEPLOYS_API_PATH)) .and(warp::path::tail()) .and_then(move |hex_digest| parse_get_request(effect_builder, hex_digest)); @@ -173,7 +193,7 @@ where } }; - let reply = effect_builder + let result = effect_builder .make_request( |responder| ApiRequest::SubmitDeploy { deploy: Box::new(deploy), @@ -182,16 +202,81 @@ where QueueKind::Api, ) .await; - let json = reply::json(&reply); - Ok(reply::with_status(json, StatusCode::OK)) + + match result { + Ok(()) => { + let json = reply::json(&""); + Ok(reply::with_status(json, StatusCode::OK)) + } + Err((deploy, error)) => { + let error_reply = format!("Failed to store {}: {}", deploy.id(), error); + let json = reply::json(&error_reply); + Ok(reply::with_status(json, StatusCode::BAD_REQUEST)) + } + } } async fn parse_get_request( + effect_builder: EffectBuilder, + tail: Tail, +) -> Result, Rejection> +where + REv: From + From + From> + Send, +{ + if tail.as_str().is_empty() { + handle_list_deploys_request(effect_builder).await + } else { + handle_get_deploy_request(effect_builder, tail).await + } +} + +async fn handle_list_deploys_request( + effect_builder: EffectBuilder, +) -> Result, Rejection> +where + REv: From + From + From> + Send, +{ + let result = effect_builder + .make_request( + |responder| ApiRequest::ListDeploys { responder }, + QueueKind::Api, + ) + .await; + let error_body = |error: &dyn std::error::Error| -> String { + format!( + r#""Internal server error listing deploys. Error: {}""#, + error + ) + }; + + // TODO - paginate these? + let (body, status) = match result { + Ok(deploy_hashes) => { + let hex_hashes = deploy_hashes + .into_iter() + .map(|deploy_hash| hex::encode(deploy_hash.inner())) + .collect::>(); + match serde_json::to_string(&hex_hashes) { + Ok(body) => (body, StatusCode::OK), + Err(error) => (error_body(&error), StatusCode::INTERNAL_SERVER_ERROR), + } + } + Err(error) => (error_body(&error), StatusCode::INTERNAL_SERVER_ERROR), + }; + + Ok(Response::builder() + .header("content-type", "application/json") + .status(status) + .body(body) + .unwrap()) +} + +async fn handle_get_deploy_request( effect_builder: EffectBuilder, hex_digest: Tail, ) -> Result, Rejection> where - REv: From + From + Send, + REv: From + From + From> + Send, { let digest = match Digest::from_hex(hex_digest.as_str()) { Ok(digest) => digest, @@ -211,21 +296,36 @@ where } }; - let reply = effect_builder + let result = effect_builder .make_request( - move |responder| ApiRequest::GetDeploy { + |responder| ApiRequest::GetDeploy { hash: DeployHash::new(digest), responder, }, QueueKind::Api, ) - .await - .and_then(|deploy| deploy.to_json().ok()) - .unwrap_or_else(|| "null".to_string()); + .await; + + let error_body = |error: &dyn StdError| -> String { + format!( + r#""Internal server error retrieving {}. Error: {}""#, + hex_digest.as_str(), + error + ) + }; + + let (body, status) = match result { + Ok(deploy) => match deploy.to_json() { + Ok(deploy_as_json) => (deploy_as_json, StatusCode::OK), + Err(error) => (error_body(&error), StatusCode::INTERNAL_SERVER_ERROR), + }, + Err(storage::Error::NotFound) => ("null".to_string(), StatusCode::OK), + Err(error) => (error_body(&error), StatusCode::INTERNAL_SERVER_ERROR), + }; Ok(Response::builder() .header("content-type", "application/json") - .status(StatusCode::OK) - .body(reply) + .status(status) + .body(body) .unwrap()) } diff --git a/src/components/api_server/event.rs b/src/components/api_server/event.rs index e69c38acf0..dea2a9035d 100644 --- a/src/components/api_server/event.rs +++ b/src/components/api_server/event.rs @@ -3,7 +3,7 @@ use std::fmt::{self, Display, Formatter}; use derive_more::From; use crate::{ - components::storage::InMemResult, + components::storage, effect::{requests::ApiRequest, Responder}, types::{Deploy, DeployHash}, }; @@ -14,13 +14,17 @@ pub(crate) enum Event { ApiRequest(ApiRequest), PutDeployResult { deploy: Box, - result: InMemResult<()>, - main_responder: Responder>, + result: storage::Result<()>, + main_responder: Responder>, }, GetDeployResult { hash: DeployHash, - result: Box>, - main_responder: Responder>, + result: Box>, + main_responder: Responder>, + }, + ListDeploysResult { + result: Box>>, + main_responder: Responder>>, }, } @@ -28,15 +32,15 @@ impl Display for Event { fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { match self { Event::ApiRequest(request) => write!(formatter, "{}", request), - Event::PutDeployResult { deploy, result, .. } => write!( - formatter, - "PutDeployResult for {}: {:?}", - deploy.id(), - result - ), + Event::PutDeployResult { result, .. } => { + write!(formatter, "PutDeployResult: {:?}", result) + } Event::GetDeployResult { hash, result, .. } => { write!(formatter, "GetDeployResult for {}: {:?}", hash, result) } + Event::ListDeploysResult { result, .. } => { + write!(formatter, "ListDeployResult: {:?}", result) + } } } } diff --git a/src/components/deploy_broadcaster.rs b/src/components/deploy_broadcaster.rs index 2c93d3be99..bdcaf30295 100644 --- a/src/components/deploy_broadcaster.rs +++ b/src/components/deploy_broadcaster.rs @@ -1,7 +1,4 @@ -use std::{ - collections::HashSet, - fmt::{self, Display, Formatter}, -}; +use std::fmt::{self, Display, Formatter}; use rand::Rng; use serde::{Deserialize, Serialize}; @@ -12,7 +9,7 @@ use crate::{ requests::{DeployBroadcasterRequest, NetworkRequest, StorageRequest}, Effect, EffectBuilder, EffectExt, Multiple, }, - types::{Deploy, DeployHash}, + types::Deploy, }; /// Deploy broadcaster events. @@ -55,16 +52,11 @@ impl Display for Message { /// The component which broadcasts `Deploy`s to peers and handles incoming `Deploy`s which have been /// broadcast to it. #[derive(Default)] -pub(crate) struct DeployBroadcaster { - /// The IDs of all the `Deploy`s stored locally. - ids: HashSet, -} +pub(crate) struct DeployBroadcaster {} impl DeployBroadcaster { pub(crate) fn new() -> Self { - DeployBroadcaster { - ids: HashSet::new(), - } + DeployBroadcaster {} } } @@ -83,7 +75,6 @@ where match event { Event::Request(DeployBroadcasterRequest::PutFromClient { deploy }) => { // Incoming HTTP request - broadcast the `Deploy`. - self.ids.insert(*deploy.id()); effect_builder .broadcast_message(Message::Put(deploy)) .ignore() @@ -93,7 +84,6 @@ where sender: _, } => { // Incoming broadcast message - just store the `Deploy`. - self.ids.insert(*deploy.id()); effect_builder.put_deploy(*deploy).ignore() } } diff --git a/src/components/small_network/config.rs b/src/components/small_network/config.rs index 3d98ea56a2..d191b3da38 100644 --- a/src/components/small_network/config.rs +++ b/src/components/small_network/config.rs @@ -5,7 +5,7 @@ use std::{ use serde::{Deserialize, Serialize}; -/// Small network configuration +/// Small network configuration. #[derive(Debug, Deserialize, Serialize)] pub struct Config { /// Interface to bind to. diff --git a/src/components/storage.rs b/src/components/storage.rs index e26f198fb1..436e610ff3 100644 --- a/src/components/storage.rs +++ b/src/components/storage.rs @@ -1,10 +1,12 @@ +mod config; mod error; mod in_mem_store; +mod lmdb_store; mod store; use std::{ - error::Error as StdError, fmt::{Debug, Display}, + fs, hash::Hash, sync::Arc, }; @@ -18,22 +20,56 @@ use crate::{ effect::{requests::StorageRequest, Effect, EffectBuilder, EffectExt, Multiple}, types::{Block, Deploy}, }; -pub(crate) use error::{InMemError, InMemResult}; +// Seems to be a false positive. +#[allow(unreachable_pub)] +pub use config::Config; +// Seems to be a false positive. +#[allow(unreachable_pub)] +pub use error::Error; +pub(crate) use error::Result; use in_mem_store::InMemStore; -pub(crate) use store::Store; +use lmdb_store::LmdbStore; +use store::Store; -pub(crate) type Storage = InMemStorage; +pub(crate) type Storage = LmdbStorage; + +const BLOCK_STORE_FILENAME: &str = "block_store.db"; +const DEPLOY_STORE_FILENAME: &str = "deploy_store.db"; /// Trait defining the API for a value able to be held within the storage component. pub(crate) trait Value: Clone + Serialize + DeserializeOwned + Send + Sync + Debug + Display { - type Id: Copy + Clone + Ord + PartialOrd + Eq + PartialEq + Hash + Debug + Display + Send + Sync; + type Id: Copy + + Clone + + Ord + + PartialOrd + + Eq + + PartialEq + + Hash + + Debug + + Display + + Serialize + + DeserializeOwned + + Send + + Sync; /// A relatively small portion of the value, representing header info or metadata. - type Header: Clone + Ord + PartialOrd + Eq + PartialEq + Hash + Debug + Display + Send + Sync; + type Header: Clone + + Ord + + PartialOrd + + Eq + + PartialEq + + Hash + + Debug + + Display + + Serialize + + DeserializeOwned + + Send + + Sync; fn id(&self) -> &Self::Id; fn header(&self) -> &Self::Header; + fn take_header(self) -> Self::Header; } /// Trait which will handle management of the various storage sub-components. @@ -41,19 +77,14 @@ pub(crate) trait Value: /// If this trait is ultimately only used for testing scenarios, we shouldn't need to expose it to /// the reactor - it can simply use a concrete type which implements this trait. pub(crate) trait StorageType { - type BlockStore: Store + Send + Sync; - type DeployStore: Store + Send + Sync; - type Error: StdError - + Clone - + Serialize - + DeserializeOwned - + Send - + Sync - + From<::Error> - + From<::Error>; + type Block: Value; + type Deploy: Value; - fn block_store(&self) -> Arc; - fn deploy_store(&self) -> Arc; + fn block_store(&self) -> Arc>; + fn deploy_store(&self) -> Arc>; + fn new(config: Config) -> Result + where + Self: Sized; } impl Component for S @@ -65,7 +96,7 @@ where fn handle_event( &mut self, - _eb: EffectBuilder, + _effect_builder: EffectBuilder, _rng: &mut R, event: Self::Event, ) -> Multiple> { @@ -143,37 +174,83 @@ where } .ignore() } + StorageRequest::ListDeploys { responder } => { + let deploy_store = self.deploy_store(); + async move { + let result = task::spawn_blocking(move || deploy_store.ids()) + .await + .expect("should run"); + responder.respond(result).await + } + .ignore() + } } } } -// Concrete type of `Storage` - backed by in-memory block store only for now, but will eventually -// also hold in-mem versions of wasm-store, deploy-store, etc. +// Concrete type of `Storage` backed by in-memory stores. #[derive(Debug)] pub(crate) struct InMemStorage { block_store: Arc>, deploy_store: Arc>, } -impl InMemStorage { - pub(crate) fn new() -> Self { - InMemStorage { +#[allow(trivial_casts)] +impl StorageType for InMemStorage { + type Block = B; + type Deploy = D; + + fn block_store(&self) -> Arc> { + Arc::clone(&self.block_store) as Arc> + } + + fn new(_config: Config) -> Result { + Ok(InMemStorage { block_store: Arc::new(InMemStore::new()), deploy_store: Arc::new(InMemStore::new()), - } + }) + } + + fn deploy_store(&self) -> Arc> { + Arc::clone(&self.deploy_store) as Arc> } } -impl StorageType for InMemStorage { - type BlockStore = InMemStore; - type DeployStore = InMemStore; - type Error = InMemError; +// Concrete type of `Storage` backed by LMDB stores. +#[derive(Debug)] +pub(crate) struct LmdbStorage { + block_store: Arc>, + deploy_store: Arc>, +} + +#[allow(trivial_casts)] +impl StorageType for LmdbStorage { + type Block = B; + type Deploy = D; + + fn new(config: Config) -> Result { + fs::create_dir_all(&config.path).map_err(|error| Error::CreateDir { + dir: config.path.display().to_string(), + source: error, + })?; + + let block_store_path = config.path.join(BLOCK_STORE_FILENAME); + let deploy_store_path = config.path.join(DEPLOY_STORE_FILENAME); + + let block_store = LmdbStore::new(block_store_path, config.max_block_store_size)?; + let deploy_store = LmdbStore::new(deploy_store_path, config.max_deploy_store_size)?; + + Ok(LmdbStorage { + block_store: Arc::new(block_store), + deploy_store: Arc::new(deploy_store), + }) + } - fn block_store(&self) -> Arc { - Arc::clone(&self.block_store) + fn block_store(&self) -> Arc> { + Arc::clone(&self.block_store) as Arc> } - fn deploy_store(&self) -> Arc { - Arc::clone(&self.deploy_store) + fn deploy_store(&self) -> Arc> { + Arc::clone(&self.deploy_store) as Arc> } } diff --git a/src/components/storage/config.rs b/src/components/storage/config.rs new file mode 100644 index 0000000000..7272409114 --- /dev/null +++ b/src/components/storage/config.rs @@ -0,0 +1,112 @@ +use std::{io, path::PathBuf}; + +use directories::ProjectDirs; +use libc::{self, _SC_PAGESIZE}; +use serde::{Deserialize, Serialize}; +use tempfile::TempDir; +use tracing::warn; + +const QUALIFIER: &str = "io"; +const ORGANIZATION: &str = "CasperLabs"; +const APPLICATION: &str = "casperlabs-node"; + +const DEFAULT_MAX_BLOCK_STORE_SIZE: usize = 483_183_820_800; // 450 GiB +const DEFAULT_MAX_DEPLOY_STORE_SIZE: usize = 322_122_547_200; // 300 GiB + +const DEFAULT_TEST_MAX_DB_SIZE: usize = 52_428_800; // 50 MiB + +/// On-disk storage configuration. +#[derive(Debug, Deserialize, Serialize)] +pub struct Config { + /// The path to the folder where any files created or read by the storage component will exist. + /// + /// If the folder doesn't exist, it and any required parents will be created. + /// + /// Defaults to: + /// * Linux: `$XDG_DATA_HOME/casperlabs-node` or `$HOME/.local/share/casperlabs-node`, e.g. + /// /home/alice/.local/share/casperlabs-node + /// * macOS: `$HOME/Library/Application Support/io.CasperLabs.casperlabs-node`, e.g. + /// /Users/Alice/Library/Application Support/io.CasperLabs.casperlabs-node + /// * Windows: `{FOLDERID_RoamingAppData}\CasperLabs\casperlabs-node\data` e.g. + /// C:\Users\Alice\AppData\Roaming\CasperLabs\casperlabs-node\data + pub path: PathBuf, + /// Sets the maximum size of the database to use for the block store. + /// + /// Defaults to 483,183,820,800 == 450 GiB. + /// + /// The size should be a multiple of the OS page size. + pub max_block_store_size: usize, + /// Sets the maximum size of the database to use for the deploy store. + /// + /// Defaults to 322,122,547,200 == 300 GiB. + /// + /// The size should be a multiple of the OS page size. + pub max_deploy_store_size: usize, +} + +impl Config { + /// Returns a default `Config` suitable for tests, along with a `TempDir` which must be kept + /// alive for the duration of the test since its destructor removes the dir from the filesystem. + #[allow(unused)] + pub(crate) fn default_for_tests() -> (Self, TempDir) { + let tempdir = tempfile::tempdir().expect("should get tempdir"); + let path = tempdir.path().to_path_buf(); + + let config = Config { + path, + max_block_store_size: DEFAULT_TEST_MAX_DB_SIZE, + max_deploy_store_size: DEFAULT_TEST_MAX_DB_SIZE, + }; + (config, tempdir) + } + + /// Prints a warning if any max DB size is not a multiple of the OS page size. + pub fn check_sizes(&self) { + let page_size = get_page_size().unwrap_or(1); + if self.max_block_store_size % page_size != 0 { + warn!( + "max block store DB size {} is not multiple of system page size {}", + self.max_block_store_size, page_size + ); + } + if self.max_deploy_store_size % page_size != 0 { + warn!( + "max deploy store DB size {} is not multiple of system page size {}", + self.max_deploy_store_size, page_size + ); + } + } +} + +impl Default for Config { + fn default() -> Self { + let path = ProjectDirs::from(QUALIFIER, ORGANIZATION, APPLICATION) + .map(|project_dirs| project_dirs.data_dir().to_path_buf()) + .unwrap_or_else(|| { + warn!("failed to get project dir - falling back to current dir"); + PathBuf::from(".") + }); + + let config = Config { + path, + max_block_store_size: DEFAULT_MAX_BLOCK_STORE_SIZE, + max_deploy_store_size: DEFAULT_MAX_DEPLOY_STORE_SIZE, + }; + + config.check_sizes(); + config + } +} + +/// Returns OS page size +fn get_page_size() -> Result { + // https://www.gnu.org/software/libc/manual/html_node/Sysconf.html + let value = unsafe { libc::sysconf(_SC_PAGESIZE) }; + + if value < 0 { + warn!("unable to get system page size"); + return Err(io::Error::last_os_error()); + } + + Ok(value as usize) +} diff --git a/src/components/storage/error.rs b/src/components/storage/error.rs index ec7594685c..fb13e7e3ed 100644 --- a/src/components/storage/error.rs +++ b/src/components/storage/error.rs @@ -1,4 +1,60 @@ -mod in_mem_error; +use std::{ + error::Error as StdError, fmt::Debug, io, result::Result as StdResult, sync::PoisonError, +}; -pub(crate) use in_mem_error::InMemError; -pub(crate) type InMemResult = Result; +use thiserror::Error; + +use super::in_mem_store::PoisonedLock; + +pub(crate) type Result = StdResult; + +/// Error returned by the storage component. +#[derive(Debug, Error)] +pub enum Error { + /// Failed to create the given directory. + #[error("failed to create {dir}: {source}")] + CreateDir { + /// The path of directory which was attempted to be created. + dir: String, + /// Underlying IO error. + source: io::Error, + }, + + /// Failed to serialize data. + #[error("serialization: {0}")] + Serialization(#[source] bincode::ErrorKind), + + /// Failed to deserialize data. + #[error("deserialization: {0}")] + Deserialization(#[source] bincode::ErrorKind), + + /// Requested value not found. + #[error("value not found")] + NotFound, + + /// Internal storage component error. + #[error("internal: {0}")] + Internal(Box), +} + +impl Error { + pub(crate) fn from_serialization(error: bincode::ErrorKind) -> Self { + Error::Serialization(error) + } + + pub(crate) fn from_deserialization(error: bincode::ErrorKind) -> Self { + Error::Deserialization(error) + } +} + +impl From for Error { + fn from(error: lmdb::Error) -> Self { + Error::Internal(Box::new(error)) + } +} + +impl From> for Error { + fn from(_error: PoisonError) -> Self { + Error::Internal(Box::new(PoisonedLock {})) + } +} diff --git a/src/components/storage/error/in_mem_error.rs b/src/components/storage/error/in_mem_error.rs deleted file mode 100644 index 2bdd48ab21..0000000000 --- a/src/components/storage/error/in_mem_error.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::sync::PoisonError; - -use serde::{Deserialize, Serialize}; -use thiserror::Error; - -#[derive(Debug, Error, Clone, Serialize, Deserialize)] -pub(crate) enum InMemError { - #[error("value not found")] - ValueNotFound, - #[error("poisoned lock")] - PoisonedLock, -} - -impl From> for InMemError { - fn from(_error: PoisonError) -> Self { - InMemError::PoisonedLock - } -} diff --git a/src/components/storage/in_mem_store.rs b/src/components/storage/in_mem_store.rs index 61e341b593..443c4fb014 100644 --- a/src/components/storage/in_mem_store.rs +++ b/src/components/storage/in_mem_store.rs @@ -4,11 +4,17 @@ use std::{ sync::RwLock, }; -use super::{InMemError, InMemResult, Store, Value}; +use thiserror::Error; + +use super::{Error, Result, Store, Value}; + +#[derive(Error, Debug)] +#[error("poisoned lock")] +pub(super) struct PoisonedLock {} /// In-memory version of a store. #[derive(Debug)] -pub(crate) struct InMemStore { +pub(super) struct InMemStore { inner: RwLock>, } @@ -22,29 +28,32 @@ impl InMemStore { impl Store for InMemStore { type Value = V; - type Error = InMemError; - fn put(&self, value: V) -> InMemResult<()> { + fn put(&self, value: V) -> Result<()> { if let Entry::Vacant(entry) = self.inner.write()?.entry(*value.id()) { - entry.insert(value); + entry.insert(value.clone()); } Ok(()) } - fn get(&self, id: &V::Id) -> InMemResult { + fn get(&self, id: &V::Id) -> Result { self.inner .read()? .get(id) .cloned() - .ok_or_else(|| InMemError::ValueNotFound) + .ok_or_else(|| Error::NotFound) } - fn get_header(&self, id: &V::Id) -> InMemResult { + fn get_header(&self, id: &V::Id) -> Result { self.inner .read()? .get(id) .map(Value::header) .cloned() - .ok_or_else(|| InMemError::ValueNotFound) + .ok_or_else(|| Error::NotFound) + } + + fn ids(&self) -> Result> { + Ok(self.inner.read()?.keys().cloned().collect()) } } diff --git a/src/components/storage/lmdb_store.rs b/src/components/storage/lmdb_store.rs new file mode 100644 index 0000000000..5f2338fc38 --- /dev/null +++ b/src/components/storage/lmdb_store.rs @@ -0,0 +1,87 @@ +use std::{fmt::Debug, marker::PhantomData, path::Path}; + +use lmdb::{ + self, Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags, +}; +use tracing::info; + +use super::{Error, Result, Store, Value}; + +/// LMDB version of a store. +#[derive(Debug)] +pub(super) struct LmdbStore { + env: Environment, + db: Database, + _phantom: PhantomData, +} + +impl LmdbStore { + pub(crate) fn new>(db_path: P, max_size: usize) -> Result { + let env = Environment::new() + .set_flags(EnvironmentFlags::NO_SUB_DIR) + .set_map_size(max_size) + .open(db_path.as_ref())?; + let db = env.create_db(None, DatabaseFlags::empty())?; + info!("opened DB at {}", db_path.as_ref().display()); + + Ok(LmdbStore { + env, + db, + _phantom: PhantomData, + }) + } +} + +impl LmdbStore { + fn get_value(&self, id: &V::Id) -> Result { + let id = bincode::serialize(id).map_err(|error| Error::from_serialization(*error))?; + let txn = self.env.begin_ro_txn()?; + let serialized_value = match txn.get(self.db, &id) { + Ok(value) => value, + Err(lmdb::Error::NotFound) => return Err(Error::NotFound), + Err(error) => return Err(error.into()), + }; + let value = bincode::deserialize(serialized_value) + .map_err(|error| Error::from_deserialization(*error))?; + txn.commit()?; + Ok(value) + } +} + +impl Store for LmdbStore { + type Value = V; + + fn put(&self, value: V) -> Result<()> { + let id = + bincode::serialize(value.id()).map_err(|error| Error::from_serialization(*error))?; + let serialized_value = + bincode::serialize(&value).map_err(|error| Error::from_serialization(*error))?; + let mut txn = self.env.begin_rw_txn()?; + txn.put(self.db, &id, &serialized_value, WriteFlags::empty())?; + txn.commit()?; + Ok(()) + } + + fn get(&self, id: &V::Id) -> Result { + self.get_value(id) + } + + fn get_header(&self, id: &V::Id) -> Result { + self.get_value(id).map(|value| value.take_header()) + } + + fn ids(&self) -> Result> { + let txn = self.env.begin_ro_txn()?; + let mut ids = vec![]; + { + let mut cursor = txn.open_ro_cursor(self.db)?; + for (id, _value) in cursor.iter() { + let id: V::Id = bincode::deserialize(id) + .map_err(|error| Error::from_deserialization(*error))?; + ids.push(id); + } + } + txn.commit()?; + Ok(ids) + } +} diff --git a/src/components/storage/store.rs b/src/components/storage/store.rs index e721301923..ce0e451a07 100644 --- a/src/components/storage/store.rs +++ b/src/components/storage/store.rs @@ -1,18 +1,13 @@ -use std::error::Error as StdError; - -use serde::{de::DeserializeOwned, Serialize}; - -use super::Value; +use super::{Result, Value}; /// Trait defining the API for a store managed by the storage component. -pub(crate) trait Store { +pub(crate) trait Store: Send + Sync { type Value: Value; - type Error: StdError + Clone + Serialize + DeserializeOwned + Send + Sync; - fn put(&self, block: Self::Value) -> Result<(), Self::Error>; - fn get(&self, id: &::Id) -> Result; - fn get_header( - &self, - id: &::Id, - ) -> Result<::Header, Self::Error>; + fn put(&self, block: Self::Value) -> Result<()>; + fn get(&self, id: &::Id) -> Result; + fn get_header(&self, id: &::Id) + -> Result<::Header>; + /// Returns a copy of all IDs held by the store. + fn ids(&self) -> Result::Id>>; } diff --git a/src/effect.rs b/src/effect.rs index 0310cc62e1..c4646e00cb 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -51,7 +51,7 @@ use smallvec::{smallvec, SmallVec}; use tracing::error; use crate::{ - components::storage::{StorageType, Store, Value}, + components::storage::{self, StorageType, Value}, effect::requests::DeployBroadcasterRequest, reactor::{EventQueueHandle, QueueKind}, types::Deploy, @@ -320,10 +320,7 @@ impl EffectBuilder { /// Puts the given block into the linear block store. Returns true on success. // TODO: remove once method is used. #[allow(dead_code)] - pub(crate) async fn put_block( - self, - block: ::Value, - ) -> Result<(), ::Error> + pub(crate) async fn put_block(self, block: S::Block) -> storage::Result<()> where S: StorageType + 'static, REv: From>, @@ -343,8 +340,8 @@ impl EffectBuilder { #[allow(dead_code)] pub(crate) async fn get_block( self, - block_hash: <::Value as Value>::Id, - ) -> Result<::Value, ::Error> + block_hash: ::Id, + ) -> storage::Result where S: StorageType + 'static, REv: From>, @@ -364,8 +361,8 @@ impl EffectBuilder { #[allow(dead_code)] pub(crate) async fn get_block_header( self, - block_hash: <::Value as Value>::Id, - ) -> Result<<::Value as Value>::Header, ::Error> + block_hash: ::Id, + ) -> storage::Result<::Header> where S: StorageType + 'static, REv: From>, @@ -381,10 +378,7 @@ impl EffectBuilder { } /// Puts the given deploy into the deploy store. Returns true on success. - pub(crate) async fn put_deploy( - self, - deploy: ::Value, - ) -> Result<(), ::Error> + pub(crate) async fn put_deploy(self, deploy: S::Deploy) -> storage::Result<()> where S: StorageType + 'static, REv: From>, @@ -402,8 +396,8 @@ impl EffectBuilder { /// Gets the requested deploy from the deploy store. pub(crate) async fn get_deploy( self, - deploy_hash: <::Value as Value>::Id, - ) -> Result<::Value, ::Error> + deploy_hash: ::Id, + ) -> storage::Result where S: StorageType + 'static, REv: From>, @@ -423,8 +417,8 @@ impl EffectBuilder { #[allow(dead_code)] pub(crate) async fn get_deploy_header( self, - deploy_hash: <::Value as Value>::Id, - ) -> Result<<::Value as Value>::Header, ::Error> + deploy_hash: ::Id, + ) -> storage::Result<::Header> where S: StorageType + 'static, REv: From>, @@ -439,6 +433,19 @@ impl EffectBuilder { .await } + /// Lists all deploy hashes held in the deploy store. + pub(crate) async fn list_deploys(self) -> storage::Result::Id>> + where + S: StorageType + 'static, + REv: From>, + { + self.make_request( + |responder| StorageRequest::ListDeploys { responder }, + QueueKind::Regular, + ) + .await + } + /// Passes the given deploy to the `DeployBroadcaster` component to be broadcast. pub(crate) async fn broadcast_deploy(self, deploy: Box) where diff --git a/src/effect/requests.rs b/src/effect/requests.rs index 296351aa01..10252fdc5e 100644 --- a/src/effect/requests.rs +++ b/src/effect/requests.rs @@ -2,7 +2,7 @@ use std::fmt::{self, Debug, Display, Formatter}; use super::Responder; use crate::{ - components::storage::{StorageType, Store, Value}, + components::storage::{self, StorageType, Value}, types::{Deploy, DeployHash}, }; @@ -70,48 +70,40 @@ where #[allow(clippy::type_complexity)] // TODO: remove once all variants are used. #[allow(dead_code)] -pub(crate) enum StorageRequest { +pub(crate) enum StorageRequest { /// Store given block. PutBlock { - block: Box<::Value>, - responder: Responder::Error>>, + block: Box, + responder: Responder>, }, /// Retrieve block with given hash. GetBlock { - block_hash: <::Value as Value>::Id, - responder: - Responder::Value, ::Error>>, + block_hash: ::Id, + responder: Responder>, }, /// Retrieve block header with given hash. GetBlockHeader { - block_hash: <::Value as Value>::Id, - responder: Responder< - Result< - <::Value as Value>::Header, - ::Error, - >, - >, + block_hash: ::Id, + responder: Responder::Header>>, }, /// Store given deploy. PutDeploy { - deploy: Box<::Value>, - responder: Responder::Error>>, + deploy: Box, + responder: Responder>, }, /// Retrieve deploy with given hash. GetDeploy { - deploy_hash: <::Value as Value>::Id, - responder: - Responder::Value, ::Error>>, + deploy_hash: ::Id, + responder: Responder>, }, /// Retrieve deploy header with given hash. GetDeployHeader { - deploy_hash: <::Value as Value>::Id, - responder: Responder< - Result< - <::Value as Value>::Header, - ::Error, - >, - >, + deploy_hash: ::Id, + responder: Responder::Header>>, + }, + /// List all deploy hashes. + ListDeploys { + responder: Responder::Id>>>, }, } @@ -130,6 +122,7 @@ impl Display for StorageRequest { StorageRequest::GetDeployHeader { deploy_hash, .. } => { write!(formatter, "get {}", deploy_hash) } + StorageRequest::ListDeploys { .. } => write!(formatter, "list deploys"), } } } @@ -145,12 +138,16 @@ pub(crate) enum ApiRequest { /// Returns the deploy along with an error message if it could not be stored. SubmitDeploy { deploy: Box, - responder: Responder>, + responder: Responder>, }, /// Return the specified deploy if it exists, else `None`. GetDeploy { hash: DeployHash, - responder: Responder>, + responder: Responder>, + }, + /// Return the list of all deploy hashes stored on this node. + ListDeploys { + responder: Responder, storage::Error>>, }, } @@ -159,6 +156,7 @@ impl Display for ApiRequest { match self { ApiRequest::SubmitDeploy { deploy, .. } => write!(formatter, "submit {}", *deploy), ApiRequest::GetDeploy { hash, .. } => write!(formatter, "get {}", hash), + ApiRequest::ListDeploys { .. } => write!(formatter, "list deploys"), } } } diff --git a/src/lib.rs b/src/lib.rs index eec3fef2f7..946a33dbd1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,7 +33,8 @@ mod utils; pub(crate) use components::small_network::{self, SmallNetwork}; pub use components::{ api_server::Config as ApiServerConfig, - small_network::{Config as SmallNetworkConfig, Error}, + small_network::{Config as SmallNetworkConfig, Error as SmallNetworkError}, + storage::{Config as StorageConfig, Error as StorageError}, }; /// The default listening port for the root node of the validator network. diff --git a/src/reactor.rs b/src/reactor.rs index 37f84c0107..864218f672 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -39,7 +39,7 @@ use tracing::{debug, info, trace, warn}; use crate::{ effect::{Effect, EffectBuilder, Multiple}, utils::{self, WeightedRoundRobin}, - ApiServerConfig, SmallNetworkConfig, + ApiServerConfig, SmallNetworkConfig, StorageConfig, }; pub use error::Error; pub(crate) use error::Result; @@ -116,6 +116,7 @@ pub(crate) trait Reactor: Sized { fn new( validator_network_config: SmallNetworkConfig, api_server_config: ApiServerConfig, + storage_config: StorageConfig, event_queue: EventQueueHandle, ) -> Result<(Self, Multiple>)>; } @@ -131,6 +132,7 @@ pub(crate) trait Reactor: Sized { async fn run( validator_network_config: SmallNetworkConfig, api_server_config: ApiServerConfig, + storage_config: StorageConfig, ) -> Result<()> { let event_size = mem::size_of::(); // Check if the event is of a reasonable size. This only emits a runtime warning at startup @@ -149,8 +151,12 @@ async fn run( let scheduler = utils::leak(scheduler); let event_queue = EventQueueHandle::new(scheduler); - let (mut reactor, initial_effects) = - R::new(validator_network_config, api_server_config, event_queue)?; + let (mut reactor, initial_effects) = R::new( + validator_network_config, + api_server_config, + storage_config, + event_queue, + )?; // Run all effects from component instantiation. process_effects(scheduler, initial_effects).await; diff --git a/src/reactor/error.rs b/src/reactor/error.rs index 146d544f49..550511cfb2 100644 --- a/src/reactor/error.rs +++ b/src/reactor/error.rs @@ -2,7 +2,7 @@ use std::result; use thiserror::Error; -use crate::components::small_network; +use crate::components::{small_network, storage}; pub(crate) type Result = result::Result; @@ -12,4 +12,7 @@ pub enum Error { /// `SmallNetwork` component error. #[error("small network error: {0}")] SmallNetwork(#[from] small_network::Error), + /// `Storage` component error. + #[error("storage error: {0}")] + Storage(#[from] storage::Error), } diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index 68a487775c..9252a7931d 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -15,7 +15,7 @@ use crate::{ consensus::{self, Consensus}, deploy_broadcaster::{self, DeployBroadcaster}, pinger::{self, Pinger}, - storage::Storage, + storage::{Storage, StorageType}, Component, }, effect::{ @@ -25,7 +25,7 @@ use crate::{ }, reactor::{self, EventQueueHandle, Result}, small_network::{self, NodeId}, - ApiServerConfig, SmallNetwork, SmallNetworkConfig, + ApiServerConfig, SmallNetwork, SmallNetworkConfig, StorageConfig, }; #[derive(Debug, Clone, From, Serialize, Deserialize)] @@ -121,12 +121,13 @@ impl reactor::Reactor for Reactor { fn new( validator_network_config: SmallNetworkConfig, api_server_config: ApiServerConfig, + storage_config: StorageConfig, event_queue: EventQueueHandle, ) -> Result<(Self, Multiple>)> { let effect_builder = EffectBuilder::new(event_queue); let (net, net_effects) = SmallNetwork::new(event_queue, validator_network_config)?; let (pinger, pinger_effects) = Pinger::new(effect_builder); - let storage = Storage::new(); + let storage = Storage::new(storage_config)?; let (api_server, api_server_effects) = ApiServer::new(api_server_config, effect_builder); let (consensus, consensus_effects) = Consensus::new(effect_builder); let deploy_broadcaster = DeployBroadcaster::new(); @@ -246,6 +247,7 @@ impl Display for Event { pub async fn run( validator_network_config: SmallNetworkConfig, api_server_config: ApiServerConfig, + storage_config: StorageConfig, ) -> Result<()> { - super::run::(validator_network_config, api_server_config).await + super::run::(validator_network_config, api_server_config, storage_config).await } diff --git a/src/types/block.rs b/src/types/block.rs index 5069686c6f..9a29943dd1 100644 --- a/src/types/block.rs +++ b/src/types/block.rs @@ -106,6 +106,10 @@ impl Value for Block { fn header(&self) -> &Self::Header { &self.header } + + fn take_header(self) -> Self::Header { + self.header + } } impl Display for Block { diff --git a/src/types/deploy.rs b/src/types/deploy.rs index 0bb22ad314..d509f5f43f 100644 --- a/src/types/deploy.rs +++ b/src/types/deploy.rs @@ -181,6 +181,10 @@ impl Value for Deploy { fn header(&self) -> &Self::Header { &self.header } + + fn take_header(self) -> Self::Header { + self.header + } } impl Display for Deploy {