diff --git a/Cargo.toml b/Cargo.toml index 29dd29877..a0b038a7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,6 @@ members = [ "grovedb", "merk", - "node-grove" + "node-grove", + "storage", ] diff --git a/grovedb/Cargo.toml b/grovedb/Cargo.toml index 8f3b067c0..925d26ae7 100644 --- a/grovedb/Cargo.toml +++ b/grovedb/Cargo.toml @@ -10,5 +10,6 @@ thiserror = "1.0.30" tempdir = "0.3.7" bincode = "1.3.3" serde = { version = "1.0.130", features = ["derive"] } +storage = { path = "../storage" } [features] diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index eaaabb150..5c7c769b3 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(trivial_bounds)] mod subtree; #[cfg(test)] mod tests; @@ -9,9 +8,13 @@ use std::{ rc::Rc, }; -use merk::{self, rocksdb, Merk}; +use merk::{self, Merk}; use rs_merkle::{algorithms::Sha256, MerkleTree}; pub use subtree::Element; +use storage::{ + rocksdb_storage::{PrefixedRocksDbStorage, PrefixedRocksDbStorageError}, + Storage, +}; /// Limit of possible indirections const MAX_REFERENCE_HOPS: usize = 10; @@ -24,7 +27,7 @@ const ROOT_LEAFS_SERIALIZED_KEY: &[u8] = b"rootLeafsSerialized"; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("rocksdb error")] - RocksDBError(#[from] merk::rocksdb::Error), + RocksDBError(#[from] PrefixedRocksDbStorageError), #[error("unable to open Merk db")] MerkError(merk::Error), #[error("invalid path: {0}")] @@ -46,57 +49,65 @@ impl From for Error { pub struct GroveDb { root_tree: MerkleTree, root_leaf_keys: HashMap, usize>, - subtrees: HashMap, Merk>, - db: Rc, + subtrees: HashMap, Merk>, + meta_storage: PrefixedRocksDbStorage, + db: Rc, } impl GroveDb { pub fn open>(path: P) -> Result { - let db = Rc::new(rocksdb::DB::open_cf_descriptors( - &Merk::default_db_opts(), - path, - merk::column_families(), - )?); + let db = Rc::new( + storage::rocksdb_storage::DB::open_cf_descriptors( + &storage::rocksdb_storage::default_db_opts(), + path, + storage::rocksdb_storage::column_families(), + ) + .map_err(Into::::into)?, + ); + let meta_storage = PrefixedRocksDbStorage::new(db.clone(), Vec::new())?; let mut subtrees = HashMap::new(); // TODO: owned `get` is not required for deserialization - if let Some(prefixes_serialized) = db.get(SUBTRESS_SERIALIZED_KEY)? { + if let Some(prefixes_serialized) = meta_storage.get_meta(SUBTRESS_SERIALIZED_KEY)? { let subtrees_prefixes: Vec> = bincode::deserialize(&prefixes_serialized)?; for prefix in subtrees_prefixes { - let subtree_merk = Merk::open(db.clone(), prefix.to_vec())?; + let subtree_merk = + Merk::open(PrefixedRocksDbStorage::new(db.clone(), prefix.to_vec())?)?; subtrees.insert(prefix.to_vec(), subtree_merk); } } // TODO: owned `get` is not required for deserialization - let root_leaf_keys: HashMap, usize> = - if let Some(root_leaf_keys_serialized) = db.get(ROOT_LEAFS_SERIALIZED_KEY)? { - bincode::deserialize(&root_leaf_keys_serialized)? - } else { - HashMap::new() - }; + let root_leaf_keys: HashMap, usize> = if let Some(root_leaf_keys_serialized) = + meta_storage.get_meta(ROOT_LEAFS_SERIALIZED_KEY)? + { + bincode::deserialize(&root_leaf_keys_serialized)? 
+ } else { + HashMap::new() + }; Ok(GroveDb { root_tree: Self::build_root_tree(&subtrees, &root_leaf_keys), - db: db.clone(), + db, subtrees, root_leaf_keys, + meta_storage, }) } fn store_subtrees_keys_data(&self) -> Result<(), Error> { let prefixes: Vec> = self.subtrees.keys().map(|x| x.clone()).collect(); - self.db - .put(SUBTRESS_SERIALIZED_KEY, bincode::serialize(&prefixes)?)?; - self.db.put( + self.meta_storage + .put_meta(SUBTRESS_SERIALIZED_KEY, &bincode::serialize(&prefixes)?)?; + self.meta_storage.put_meta( ROOT_LEAFS_SERIALIZED_KEY, - bincode::serialize(&self.root_leaf_keys)?, + &bincode::serialize(&self.root_leaf_keys)?, )?; Ok(()) } fn build_root_tree( - subtrees: &HashMap, Merk>, + subtrees: &HashMap, Merk>, root_leaf_keys: &HashMap, usize>, ) -> MerkleTree { let mut leaf_hashes: Vec<[u8; 32]> = vec![[0; 32]; root_leaf_keys.len()]; @@ -121,13 +132,17 @@ impl GroveDb { match &mut element { Element::Tree(subtree_root_hash) => { // Helper closure to create a new subtree under path + key - let create_subtree_merk = || -> Result<(Vec, Merk), Error> { - let compressed_path_subtree = Self::compress_path(path, Some(&key)); - Ok(( - compressed_path_subtree.clone(), - Merk::open(self.db.clone(), compressed_path_subtree)?, - )) - }; + let create_subtree_merk = + || -> Result<(Vec, Merk), Error> { + let compressed_path_subtree = Self::compress_path(path, Some(&key)); + Ok(( + compressed_path_subtree.clone(), + Merk::open(PrefixedRocksDbStorage::new( + self.db.clone(), + compressed_path_subtree, + )?)?, + )) + }; if path.is_empty() { // Add subtree to the root tree @@ -200,7 +215,7 @@ impl GroveDb { Element::get(&merk, key) } - fn follow_reference<'a>(&self, mut path: Vec>) -> Result { + fn follow_reference(&self, mut path: Vec>) -> Result { let mut hops_left = MAX_REFERENCE_HOPS; let mut current_element; let mut visited = HashSet::new(); diff --git a/grovedb/src/subtree.rs b/grovedb/src/subtree.rs index 6b3c5a36f..f70fc6aef 100644 --- a/grovedb/src/subtree.rs +++ b/grovedb/src/subtree.rs @@ -3,6 +3,7 @@ //! Merk API to GroveDB needs. use merk::Op; use serde::{Deserialize, Serialize}; +use storage::rocksdb_storage::PrefixedRocksDbStorage; use crate::{Error, Merk}; @@ -27,7 +28,7 @@ impl Element { /// Get an element from Merk under a key; path should be resolved and proper /// Merk should be loaded by this moment - pub fn get(merk: &Merk, key: &[u8]) -> Result { + pub fn get(merk: &Merk, key: &[u8]) -> Result { let element = bincode::deserialize( merk.get(&key)? .ok_or(Error::InvalidPath("key not found in Merk"))? 
@@ -38,7 +39,11 @@ impl Element { /// Insert an element in Merk under a key; path should be resolved and /// proper Merk should be loaded by this moment - pub fn insert(&self, merk: &mut Merk, key: Vec) -> Result<(), Error> { + pub fn insert( + &self, + merk: &mut Merk, + key: Vec, + ) -> Result<(), Error> { let batch = [(key, Op::Put(bincode::serialize(self)?))]; merk.apply(&batch, &[]).map_err(|e| e.into()) } diff --git a/merk/Cargo.toml b/merk/Cargo.toml index 738a6cb17..ab4d7f762 100644 --- a/merk/Cargo.toml +++ b/merk/Cargo.toml @@ -8,6 +8,10 @@ license = "MIT" [dependencies] tempdir = "0.3.7" +storage = { path = "../storage" } +thiserror = "1.0.30" +failure = "0.1.8" +rocksdb = "0.17.0" [dependencies.time] version = "0.1.42" @@ -29,10 +33,6 @@ optional = true version = "1.3.2" optional = true -[dependencies.failure] -version = "0.1.6" -optional = true - [dependencies.ed] version = "0.1.6" optional = true @@ -46,13 +46,6 @@ version = "0.8.3" features = ["small_rng"] optional = true -[dependencies.rocksdb] -git = "https://github.com/rust-rocksdb/rust-rocksdb" -version = "0.16.0" -rev = "v0.16.0" -default-features = false -optional = true - [dependencies.jemallocator] version = "0.3.2" features = ["disable_initial_exec_tls"] @@ -61,18 +54,15 @@ optional = true [features] default = ["full", "verify"] full = ["rand", - "rocksdb", - "time", + "time", "hex", "colored", "num_cpus", "byteorder", - "failure", "ed", "blake3", "jemallocator" ] verify = ["ed", - "failure", "blake3" ] diff --git a/merk/src/lib.rs b/merk/src/lib.rs index f64473173..8c3c1cbea 100644 --- a/merk/src/lib.rs +++ b/merk/src/lib.rs @@ -1,18 +1,11 @@ #![feature(map_first_last)] -#[global_allocator] -#[cfg(feature = "full")] -static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; - -#[cfg(feature = "full")] -pub use rocksdb; - /// Error and Result types. mod error; /// The top-level store API. #[cfg(feature = "full")] mod merk; -pub use crate::merk::column_families; + /// Provides a container type that allows temporarily taking ownership of a /// value. // TODO: move this into its own crate @@ -29,8 +22,8 @@ pub use error::{Error, Result}; #[allow(deprecated)] pub use proofs::query::verify_query; pub use proofs::query::{execute_proof, verify}; -pub use tree::{Batch, BatchEntry, Hash, Op, PanicSource, HASH_LENGTH}; +pub use tree::{BatchEntry, Hash, MerkBatch, Op, PanicSource, HASH_LENGTH}; -#[cfg(feature = "full")] -// pub use crate::merk::{chunks, restore, Merk}; -pub use crate::merk::{chunks, Merk}; +// #[cfg(feature = "full")] +// // pub use crate::merk::{chunks, restore, Merk}; +pub use crate::merk::Merk; diff --git a/merk/src/merk/chunks.rs b/merk/src/merk/chunks.rs index 6d6665c5e..b2fa3c5d5 100644 --- a/merk/src/merk/chunks.rs +++ b/merk/src/merk/chunks.rs @@ -1,9 +1,11 @@ //! Provides `ChunkProducer`, which creates chunk proofs for full replication of //! a Merk. +use std::marker::PhantomData; + use ed::Encode; use failure::bail; -use rocksdb::DBRawIterator; +use storage::{RawIterator, Storage}; use super::Merk; use crate::{ @@ -14,17 +16,27 @@ use crate::{ /// A `ChunkProducer` allows the creation of chunk proofs, used for trustlessly /// replicating entire Merk trees. Chunks can be generated on the fly in a /// random order, or iterated in order for slightly better performance. 
-pub struct ChunkProducer<'a> { +pub struct ChunkProducer<'a, S: Storage + 'a> +where + S: Storage, + // crate::error::Error: From, + ::Error: std::error::Error + Sync + Send + 'static, +{ trunk: Vec, chunk_boundaries: Vec>, - raw_iter: DBRawIterator<'a>, + raw_iter: S::RawIterator<'a>, index: usize, } -impl<'a> ChunkProducer<'a> { +impl<'a, S> ChunkProducer<'a, S> +where + S: Storage, + // crate::error::Error: From, + ::Error: std::error::Error + Sync + Send + 'static, +{ /// Creates a new `ChunkProducer` for the given `Merk` instance. In the /// constructor, the first chunk (the "trunk") will be created. - pub fn new(merk: &'a Merk) -> Result { + pub fn new(merk: &'a Merk) -> Result { let (trunk, has_more) = merk.walk(|maybe_walker| match maybe_walker { Some(mut walker) => walker.create_trunk_proof(), None => Ok((vec![], false)), @@ -111,9 +123,14 @@ impl<'a> ChunkProducer<'a> { } } -impl<'a> IntoIterator for ChunkProducer<'a> { - type IntoIter = ChunkIter<'a>; - type Item = as Iterator>::Item; +impl<'a, S> IntoIterator for ChunkProducer<'a, S> +where + S: Storage, + // crate::error::Error: From, + ::Error: std::error::Error + Sync + Send + 'static, +{ + type IntoIter = ChunkIter<'a, S>; + type Item = as Iterator>::Item; fn into_iter(self) -> Self::IntoIter { ChunkIter(self) @@ -123,9 +140,18 @@ impl<'a> IntoIterator for ChunkProducer<'a> { /// A `ChunkIter` iterates through all the chunks for the underlying `Merk` /// instance in order (the first chunk is the "trunk" chunk). Yields `None` /// after all chunks have been yielded. -pub struct ChunkIter<'a>(ChunkProducer<'a>); - -impl<'a> Iterator for ChunkIter<'a> { +pub struct ChunkIter<'a, S>(ChunkProducer<'a, S>) +where + S: Storage, + // crate::error::Error: From, + ::Error: std::error::Error + Sync + Send + 'static; + +impl<'a, S> Iterator for ChunkIter<'a, S> +where + S: Storage, + // crate::error::Error: From, + ::Error: std::error::Error + Sync + Send + 'static, +{ type Item = Result>; fn size_hint(&self) -> (usize, Option) { @@ -141,16 +167,22 @@ impl<'a> Iterator for ChunkIter<'a> { } } -impl Merk { +impl Merk +where + S: Storage, + // crate::error::Error: From, + ::Error: std::error::Error + Sync + Send + 'static, +{ /// Creates a `ChunkProducer` which can return chunk proofs for replicating /// the entire Merk tree. 
- pub fn chunks(&self) -> Result { + pub fn chunks(&self) -> Result> { ChunkProducer::new(self) } } #[cfg(test)] mod tests { + use storage::rocksdb_storage::{default_rocksdb, PrefixedRocksDbStorage}; use tempdir::TempDir; use super::*; @@ -211,7 +243,8 @@ mod tests { let tmp_dir = TempDir::new("chunks_from_reopen").expect("cannot create tempdir"); let original_chunks = { let db = default_rocksdb(tmp_dir.path()); - let mut merk = Merk::open(db, Vec::new()).unwrap(); + let mut merk = + Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let batch = make_batch_seq(1..10); merk.apply(batch.as_slice(), &[]).unwrap(); @@ -224,7 +257,7 @@ mod tests { }; let db = default_rocksdb(tmp_dir.path()); - let merk = Merk::open(db, Vec::new()).unwrap(); + let merk = Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let reopen_chunks = merk.chunks().unwrap().into_iter().map(Result::unwrap); for (original, checkpoint) in original_chunks.zip(reopen_chunks) { diff --git a/merk/src/merk/mod.rs b/merk/src/merk/mod.rs index 9505adbcc..beddb79d1 100644 --- a/merk/src/merk/mod.rs +++ b/merk/src/merk/mod.rs @@ -2,73 +2,49 @@ pub mod chunks; // TODO // pub mod restore; -use std::{ - cell::Cell, - cmp::Ordering, - collections::LinkedList, - path::{Path, PathBuf}, - rc::Rc, -}; +use std::{cell::Cell, cmp::Ordering, collections::LinkedList}; -use failure::bail; -use rocksdb::{checkpoint::Checkpoint, ColumnFamilyDescriptor, WriteBatch}; +use failure::format_err; +use storage::{self, Batch, Storage, Store}; use crate::{ error::Result, proofs::{encode_into, query::QueryItem, Query}, - tree::{Batch, Commit, Fetch, Hash, Link, Op, RefWalker, Tree, Walker, NULL_HASH}, + tree::{Commit, Fetch, Hash, Link, MerkBatch, Op, RefWalker, Tree, Walker, NULL_HASH}, }; const ROOT_KEY_KEY: &[u8] = b"root"; -const AUX_CF_NAME: &str = "aux"; -const INTERNAL_CF_NAME: &str = "internal"; - -pub fn column_families() -> Vec { - vec![ - // TODO: clone opts or take args - ColumnFamilyDescriptor::new(AUX_CF_NAME, Merk::default_db_opts()), - ColumnFamilyDescriptor::new(INTERNAL_CF_NAME, Merk::default_db_opts()), - ] -} /// A handle to a Merkle key/value store backed by RocksDB. -pub struct Merk { +pub struct Merk +where + S: Storage, + crate::error::Error: From, +{ pub(crate) tree: Cell>, - pub(crate) db: Rc, - pub(crate) prefix: Vec, + pub(crate) storage: S, } pub type UseTreeMutResult = Result, Option>)>>; -impl Merk { - pub fn open(db: Rc, prefix: Vec) -> Result { +impl Merk +where + crate::error::Error: From<::Error>, + ::Error: std::error::Error, +{ + pub fn open(storage: S) -> Result> { let mut merk = Merk { tree: Cell::new(None), - db, - prefix, + storage, }; merk.load_root()?; Ok(merk) } - pub fn default_db_opts() -> rocksdb::Options { - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.increase_parallelism(num_cpus::get() as i32); - // opts.set_advise_random_on_open(false); - opts.set_allow_mmap_writes(true); - opts.set_allow_mmap_reads(true); - opts.create_missing_column_families(true); - opts.set_atomic_flush(true); - // TODO: tune - opts - } - /// Gets an auxiliary value. pub fn get_aux(&self, key: &[u8]) -> Result>> { - let aux_cf = self.db.cf_handle(AUX_CF_NAME); - Ok(self.db.get_cf(aux_cf.unwrap(), key)?) + Ok(self.storage.get_aux(key)?) } /// Gets a value for the given key. 
If the key is not found, `None` is @@ -112,7 +88,7 @@ impl Merk { match maybe_child { None => { // fetch from RocksDB - break fetch_node(&self.db, &self.prefix, &key) + break Tree::get(&self.storage, key) .map(|maybe_node| maybe_node.map(|node| f(&node))); } Some(child) => cursor = child, // traverse to child @@ -143,19 +119,19 @@ impl Merk { /// use merk::Op; /// /// let batch = &[ - /// (vec![1, 2, 3], Op::Put(vec![4, 5, 6])), // puts value [4,5,6] to key [1,2,3] + /// (vec![1, 2, 3], Op::Put(vec![4, 5, 6])), // puts value [4,5,6] to key[1,2,3] /// (vec![4, 5, 6], Op::Delete), // deletes key [4,5,6] /// ]; /// store.apply(batch, &[]).unwrap(); /// ``` - pub fn apply(&mut self, batch: &Batch, aux: &Batch) -> Result<()> { + pub fn apply(&mut self, batch: &MerkBatch, aux: &MerkBatch) -> Result<()> { // ensure keys in batch are sorted and unique let mut maybe_prev_key: Option> = None; for (key, _) in batch.iter() { if let Some(prev_key) = maybe_prev_key { match prev_key.cmp(key) { - Ordering::Greater => bail!("Keys in batch must be sorted"), - Ordering::Equal => bail!("Keys in batch must be unique"), + Ordering::Greater => return Err(format_err!("Keys in batch must be sorted")), + Ordering::Equal => return Err(format_err!("Keys in batch must be unique")), _ => (), } } @@ -186,7 +162,7 @@ impl Merk { /// ]; /// unsafe { store.apply_unchecked(batch, &[]).unwrap() }; /// ``` - pub unsafe fn apply_unchecked(&mut self, batch: &Batch, aux: &Batch) -> Result<()> { + pub unsafe fn apply_unchecked(&mut self, batch: &MerkBatch, aux: &MerkBatch) -> Result<()> { let maybe_walker = self .tree .take() @@ -216,18 +192,18 @@ impl Merk { self.prove_unchecked(query) } - /// Creates a Merkle proof for the list of queried keys. For each key in the - /// query, if the key is found in the store then the value will be proven to - /// be in the tree. For each key in the query that does not exist in the - /// tree, its absence will be proven by including boundary keys. - /// + /// Creates a Merkle proof for the list of queried keys. For each key in + /// the query, if the key is found in the store then the value will be + /// proven to be in the tree. For each key in the query that does not + /// exist in the tree, its absence will be proven by including + /// boundary keys. /// The proof returned is in an encoded format which can be verified with /// `merk::verify`. /// /// This is unsafe because the keys in `query` must be sorted and unique - - /// if they are not, there will be undefined behavior. For a safe version of - /// this method which checks to ensure the batch is sorted and unique, see - /// `prove`. + /// if they are not, there will be undefined behavior. For a safe version + /// of this method which checks to ensure the batch is sorted and + /// unique, see `prove`. pub fn prove_unchecked(&self, query: I) -> Result> where Q: Into, @@ -236,10 +212,7 @@ impl Merk { let query_vec: Vec = query.into_iter().map(Into::into).collect(); self.use_tree_mut(|maybe_tree| { - let tree = match maybe_tree { - None => bail!("Cannot create proof for empty tree"), - Some(tree) => tree, - }; + let tree = maybe_tree.ok_or(format_err!("Cannot create proof for empty tree"))?; let mut ref_walker = RefWalker::new(tree, self.source()); let (proof, _) = ref_walker.create_proof(query_vec.as_slice())?; @@ -250,32 +223,22 @@ impl Merk { }) } - pub fn flush(&self) -> Result<()> { - Ok(self.db.flush()?) 
- } - - pub fn commit(&mut self, deleted_keys: LinkedList>, aux: &Batch) -> Result<()> { - let internal_cf = self.db.cf_handle(INTERNAL_CF_NAME).unwrap(); - let aux_cf = self.db.cf_handle(AUX_CF_NAME).unwrap(); - - let mut batch = rocksdb::WriteBatch::default(); + pub fn commit(&mut self, deleted_keys: LinkedList>, aux: &MerkBatch) -> Result<()> { + let mut batch = self.storage.new_batch()?; let mut to_batch = self.use_tree_mut(|maybe_tree| -> UseTreeMutResult { // TODO: concurrent commit - // - let mut prefixed_root = self.prefix.clone(); - prefixed_root.extend_from_slice(ROOT_KEY_KEY); if let Some(tree) = maybe_tree { // TODO: configurable committer let mut committer = MerkCommitter::new(tree.height(), 100); tree.commit(&mut committer)?; // update pointer to root node - batch.put_cf(internal_cf, prefixed_root, tree.key()); + batch.put_root(ROOT_KEY_KEY, tree.key()); Ok(committer.batch) } else { // empty tree, delete pointer to root - batch.delete_cf(internal_cf, prefixed_root); + batch.delete_root(ROOT_KEY_KEY); Ok(vec![]) } })?; @@ -286,31 +249,27 @@ impl Merk { } to_batch.sort_by(|a, b| a.0.cmp(&b.0)); for (key, maybe_value) in to_batch { - let mut prefixed_key = self.prefix.clone(); - prefixed_key.extend_from_slice(&key); if let Some(value) = maybe_value { - batch.put(prefixed_key, value); + batch.put(&key, &value); } else { - batch.delete(prefixed_key); + batch.delete(&key); } } for (key, value) in aux { - let mut prefixed_key = self.prefix.clone(); - prefixed_key.extend_from_slice(&key); match value { - Op::Put(value) => batch.put_cf(aux_cf, prefixed_key, value), - Op::Delete => batch.delete_cf(aux_cf, prefixed_key), + Op::Put(value) => batch.put_aux(key, value), + Op::Delete => batch.delete_aux(key), }; } // write to db - self.write(batch)?; + self.storage.commit_batch(batch)?; Ok(()) } - pub fn walk(&self, f: impl FnOnce(Option>) -> T) -> T { + pub fn walk(&self, f: impl FnOnce(Option>>) -> T) -> T { let mut tree = self.tree.take(); let maybe_walker = tree .as_mut() @@ -320,19 +279,18 @@ impl Merk { res } - pub fn raw_iter(&self) -> rocksdb::DBRawIterator { - self.db.raw_iterator() + pub fn raw_iter<'a>(&'a self) -> S::RawIterator<'a> { + self.storage.raw_iter() } - // pub fn checkpoint>(&self, path: P, prefix: &[u8]) -> - // Result { Checkpoint::new(&self.db)?.create_checkpoint(&path)?; - // Merk::open(path, prefix) - // } + // // pub fn checkpoint>(&self, path: P, prefix: &[u8]) -> + // // Result { Checkpoint::new(&self.db)?.create_checkpoint(&path)?; + // // Merk::open(path, prefix) + // // } - fn source(&self) -> MerkSource { + fn source(&self) -> MerkSource { MerkSource { - db: &self.db, - prefix: &self.prefix, + storage: &self.storage, } } @@ -350,50 +308,40 @@ impl Merk { res } - pub(crate) fn write(&mut self, batch: WriteBatch) -> Result<()> { - let mut opts = rocksdb::WriteOptions::default(); - opts.set_sync(false); - // TODO: disable WAL once we can ensure consistency with transactions - self.db.write_opt(batch, &opts)?; - Ok(()) - } - - pub(crate) fn set_root_key(&mut self, key: Vec) -> Result<()> { - let internal_cf = self.db.cf_handle(INTERNAL_CF_NAME).unwrap(); - let mut batch = WriteBatch::default(); - let mut prefixed_root_key = self.prefix.clone(); - prefixed_root_key.extend_from_slice(ROOT_KEY_KEY); - batch.put_cf(internal_cf, prefixed_root_key, key); - self.write(batch) - } - - pub(crate) fn fetch_node(&self, prefix: &[u8], key: &[u8]) -> Result> { - fetch_node(&self.db, prefix, key) + pub(crate) fn set_root_key(&mut self, key: &[u8]) -> Result<()> { + 
Ok(self.storage.put_root(ROOT_KEY_KEY, key)?) } pub(crate) fn load_root(&mut self) -> Result<()> { - let internal_cf = self.db.cf_handle(INTERNAL_CF_NAME).unwrap(); - let mut prefixed_root_key = self.prefix.clone(); - prefixed_root_key.extend_from_slice(ROOT_KEY_KEY); - let tree = self - .db - .get_pinned_cf(internal_cf, &prefixed_root_key)? - .map(|root_key| fetch_existing_node(&self.db, &self.prefix, &root_key)) - .transpose()?; - self.tree = Cell::new(tree); + if let Some(tree_root_key) = self.storage.get_root(ROOT_KEY_KEY)? { + let tree = Tree::get(&self.storage, &tree_root_key)?; + self.tree = Cell::new(tree); + } Ok(()) } } -#[derive(Clone)] -pub struct MerkSource<'a> { - db: &'a rocksdb::DB, - prefix: &'a [u8], +// TODO: get rid of Fetch/source and use GroveDB storage abstraction +#[derive(Debug)] +pub struct MerkSource<'a, S: Storage> { + storage: &'a S, } -impl<'a> Fetch for MerkSource<'a> { +impl<'a, S: Storage> Clone for MerkSource<'a, S> { + fn clone(&self) -> Self { + MerkSource { + storage: self.storage, + } + } +} + +impl<'a, S: Storage> Fetch for MerkSource<'a, S> +where + crate::error::Error: From<::Error>, + ::Error: std::error::Error, +{ fn fetch(&self, link: &Link) -> Result { - fetch_existing_node(self.db, &self.prefix, link.key()) + Ok(Tree::get(&self.storage, link.key())?.ok_or(format_err!("Key not found"))?) } } @@ -428,26 +376,9 @@ impl Commit for MerkCommitter { } } -fn fetch_node(db: &rocksdb::DB, prefix: &[u8], key: &[u8]) -> Result> { - let mut prefixed_key = prefix.to_vec(); - prefixed_key.extend_from_slice(key); - let bytes = db.get_pinned(&prefixed_key)?; - if let Some(bytes) = bytes { - Ok(Some(Tree::decode(key.to_vec(), &bytes))) - } else { - Ok(None) - } -} - -fn fetch_existing_node(db: &rocksdb::DB, prefix: &[u8], key: &[u8]) -> Result { - match fetch_node(db, prefix, key)? 
{ - None => bail!("key not found: {:?}", key), - Some(node) => Ok(node), - } -} - #[cfg(test)] mod test { + use storage::rocksdb_storage::{default_rocksdb, PrefixedRocksDbStorage}; use tempdir::TempDir; use super::{Merk, MerkSource, RefWalker}; @@ -516,7 +447,7 @@ mod test { let key = batch.first().unwrap().0.clone(); merk.apply(&[(key.clone(), Op::Delete)], &[]).unwrap(); - let value = merk.db.get(key.as_slice()).unwrap(); + let value = merk.inner.get(key.as_slice()).unwrap(); assert!(value.is_none()); } @@ -576,7 +507,10 @@ mod test { #[test] fn reopen() { - fn collect(mut node: RefWalker, nodes: &mut Vec>) { + fn collect( + mut node: RefWalker>, + nodes: &mut Vec>, + ) { nodes.push(node.tree().encode()); node.walk(true).unwrap().map(|c| collect(c, nodes)); node.walk(false).unwrap().map(|c| collect(c, nodes)); @@ -586,7 +520,8 @@ mod test { let original_nodes = { let db = default_rocksdb(tmp_dir.path()); - let mut merk = Merk::open(db, Vec::new()).unwrap(); + let mut merk = + Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let batch = make_batch_seq(1..10_000); merk.apply(batch.as_slice(), &[]).unwrap(); let mut tree = merk.tree.take().unwrap(); @@ -598,7 +533,7 @@ mod test { }; let db = default_rocksdb(tmp_dir.path()); - let merk = Merk::open(db, Vec::new()).unwrap(); + let merk = Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let mut tree = merk.tree.take().unwrap(); let walker = RefWalker::new(&mut tree, merk.source()); @@ -620,7 +555,8 @@ mod test { let original_nodes = { let db = default_rocksdb(tmp_dir.path()); - let mut merk = Merk::open(db, Vec::new()).unwrap(); + let mut merk = + Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let batch = make_batch_seq(1..10_000); merk.apply(batch.as_slice(), &[]).unwrap(); @@ -629,7 +565,7 @@ mod test { nodes }; let db = default_rocksdb(tmp_dir.path()); - let mut merk = Merk::open(db, Vec::new()).unwrap(); + let merk = Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let mut reopen_nodes = vec![]; collect(&mut merk.raw_iter(), &mut reopen_nodes); diff --git a/merk/src/proofs/chunk.rs b/merk/src/proofs/chunk.rs index 40f088e78..35d8e937d 100644 --- a/merk/src/proofs/chunk.rs +++ b/merk/src/proofs/chunk.rs @@ -1,3 +1,4 @@ +use storage::RawIterator; #[cfg(feature = "full")] use { super::tree::{execute, Tree as ProofTree}, @@ -20,7 +21,7 @@ pub const MIN_TRUNK_HEIGHT: usize = 5; impl<'a, S> RefWalker<'a, S> where - S: Fetch + Sized + Send + Clone, + S: Fetch + Sized + Clone, { /// Generates a trunk proof by traversing the tree. /// @@ -125,7 +126,10 @@ where /// /// Advances the iterator for all nodes in the chunk and the `end_key` (if any). 
#[cfg(feature = "full")] -pub(crate) fn get_next_chunk(iter: &mut DBRawIterator, end_key: Option<&[u8]>) -> Result> { +pub(crate) fn get_next_chunk( + iter: &mut impl RawIterator, + end_key: Option<&[u8]>, +) -> Result> { let mut chunk = Vec::with_capacity(512); let mut stack = Vec::with_capacity(32); let mut node = Tree::new(vec![], vec![]); @@ -414,7 +418,7 @@ mod tests { merk.tree.set(root_node); // whole tree as 1 leaf - let mut iter = merk.db.raw_iterator(); + let mut iter = merk.inner.raw_iter(); iter.seek_to_first(); let chunk = get_next_chunk(&mut iter, None).unwrap(); let ops = chunk.into_iter().map(|op| Ok(op)); @@ -425,7 +429,7 @@ mod tests { assert_eq!(counts.kvhash, 0); drop(iter); - let mut iter = merk.db.raw_iterator(); + let mut iter = merk.inner.raw_iter(); iter.seek_to_first(); // left leaf diff --git a/merk/src/proofs/query/mod.rs b/merk/src/proofs/query/mod.rs index c4ae2c675..191e85161 100644 --- a/merk/src/proofs/query/mod.rs +++ b/merk/src/proofs/query/mod.rs @@ -231,7 +231,7 @@ impl Link { impl<'a, S> RefWalker<'a, S> where - S: Fetch + Sized + Send + Clone, + S: Fetch + Sized + Clone, { /// Creates a `Node::KV` from the key/value pair of the root node. pub(crate) fn to_kv_node(&self) -> Node { diff --git a/merk/src/test_utils/crash_merk.rs b/merk/src/test_utils/crash_merk.rs index f1f57db55..6d39bfca6 100644 --- a/merk/src/test_utils/crash_merk.rs +++ b/merk/src/test_utils/crash_merk.rs @@ -1,9 +1,9 @@ use std::{ - fs, - mem::ManuallyDrop, ops::{Deref, DerefMut}, + rc::Rc, }; +use storage::rocksdb_storage::{default_rocksdb, PrefixedRocksDbStorage}; use tempdir::TempDir; use crate::{Merk, Result}; @@ -11,8 +11,9 @@ use crate::{Merk, Result}; /// Wraps a Merk instance and drops it without flushing once it goes out of /// scope. pub struct CrashMerk { - merk: Merk, + merk: Merk, path: Option, + _db: Rc, } impl CrashMerk { @@ -20,11 +21,12 @@ impl CrashMerk { /// does not exist. 
pub fn open() -> Result { let path = TempDir::new("db").expect("cannot create tempdir"); - let db = super::default_rocksdb(path.path()); - let merk = Merk::open(db, Vec::new())?; + let db = default_rocksdb(path.path()); + let merk = Merk::open(PrefixedRocksDbStorage::new(db.clone(), Vec::new()).unwrap())?; Ok(CrashMerk { merk, path: Some(path), + _db: db, }) } @@ -34,15 +36,15 @@ impl CrashMerk { } impl Deref for CrashMerk { - type Target = Merk; + type Target = Merk; - fn deref(&self) -> &Merk { + fn deref(&self) -> &Merk { &self.merk } } impl DerefMut for CrashMerk { - fn deref_mut(&mut self) -> &mut Merk { + fn deref_mut(&mut self) -> &mut Merk { &mut self.merk } } @@ -55,8 +57,6 @@ mod tests { #[test] #[ignore] // currently this still works because we enabled the WAL fn crash() { - let path = std::thread::current().name().unwrap().to_owned(); - let mut merk = CrashMerk::open().expect("failed to open merk"); merk.apply(&[(vec![1, 2, 3], Op::Put(vec![4, 5, 6]))], &[]) .expect("apply failed"); diff --git a/merk/src/test_utils/mod.rs b/merk/src/test_utils/mod.rs index 1956997a5..7c0fca3b4 100644 --- a/merk/src/test_utils/mod.rs +++ b/merk/src/test_utils/mod.rs @@ -6,9 +6,9 @@ use std::{convert::TryInto, ops::Range}; use byteorder::{BigEndian, WriteBytesExt}; pub use crash_merk::CrashMerk; use rand::prelude::*; -pub use temp_merk::{default_rocksdb, TempMerk}; +pub use temp_merk::TempMerk; -use crate::tree::{Batch, BatchEntry, NoopCommit, Op, PanicSource, Tree, Walker}; +use crate::tree::{BatchEntry, MerkBatch, NoopCommit, Op, PanicSource, Tree, Walker}; pub fn assert_tree_invariants(tree: &Tree) { assert!(tree.balance_factor().abs() < 2); @@ -33,7 +33,7 @@ pub fn assert_tree_invariants(tree: &Tree) { } } -pub fn apply_memonly_unchecked(tree: Tree, batch: &Batch) -> Tree { +pub fn apply_memonly_unchecked(tree: Tree, batch: &MerkBatch) -> Tree { let walker = Walker::::new(tree, PanicSource {}); let mut tree = Walker::::apply_to(Some(walker), batch, PanicSource {}) .expect("apply failed") @@ -43,13 +43,13 @@ pub fn apply_memonly_unchecked(tree: Tree, batch: &Batch) -> Tree { tree } -pub fn apply_memonly(tree: Tree, batch: &Batch) -> Tree { +pub fn apply_memonly(tree: Tree, batch: &MerkBatch) -> Tree { let tree = apply_memonly_unchecked(tree, batch); assert_tree_invariants(&tree); tree } -pub fn apply_to_memonly(maybe_tree: Option, batch: &Batch) -> Option { +pub fn apply_to_memonly(maybe_tree: Option, batch: &MerkBatch) -> Option { let maybe_walker = maybe_tree.map(|tree| Walker::::new(tree, PanicSource {})); Walker::::apply_to(maybe_walker, batch, PanicSource {}) .expect("apply failed") diff --git a/merk/src/test_utils/temp_merk.rs b/merk/src/test_utils/temp_merk.rs index c1fc2c06b..147ca0cc2 100644 --- a/merk/src/test_utils/temp_merk.rs +++ b/merk/src/test_utils/temp_merk.rs @@ -4,14 +4,16 @@ use std::{ rc::Rc, }; +use storage::rocksdb_storage::{default_rocksdb, PrefixedRocksDbStorage}; use tempdir::TempDir; use crate::Merk; /// Wraps a Merk instance and deletes it from disk it once it goes out of scope. 
pub struct TempMerk { - pub inner: Merk, + pub inner: Merk, pub path: TempDir, + _db: Rc, } impl TempMerk { @@ -19,30 +21,26 @@ impl TempMerk { pub fn new() -> TempMerk { let path = TempDir::new("db").expect("cannot create tempdir"); let db = default_rocksdb(path.path()); + let inner = PrefixedRocksDbStorage::new(db.clone(), Vec::new()) + .expect("cannot create prefixed storage"); TempMerk { - inner: Merk::open(db, Vec::new()).expect("cannot open Merk"), + inner: Merk::open(inner).expect("cannot open Merk"), path, + _db: db, } } } -pub fn default_rocksdb(path: &Path) -> Rc { - Rc::new( - rocksdb::DB::open_cf_descriptors(&Merk::default_db_opts(), &path, crate::column_families()) - .expect("cannot create rocksdb"), - ) -} - impl Deref for TempMerk { - type Target = Merk; + type Target = Merk; - fn deref(&self) -> &Merk { + fn deref(&self) -> &Merk { &self.inner } } impl DerefMut for TempMerk { - fn deref_mut(&mut self) -> &mut Merk { + fn deref_mut(&mut self) -> &mut Merk { &mut self.inner } } diff --git a/merk/src/tree/encoding.rs b/merk/src/tree/encoding.rs index 84f848376..095a3d370 100644 --- a/merk/src/tree/encoding.rs +++ b/merk/src/tree/encoding.rs @@ -1,6 +1,35 @@ use ed::{Decode, Encode}; +use storage::{Storage, Store}; use super::Tree; +use crate::error::Error; + +impl Store for Tree { + type Error = Error; + + fn encode(&self) -> Vec { + self.encode() + } + + fn decode(bytes: &[u8]) -> Result { + Decode::decode(bytes) + } + + fn get(storage: S, key: &[u8]) -> Result, Self::Error> + where + S: Storage, + Self::Error: From, + { + let mut tree: Option = storage + .get(key)? + .map(|x| ::decode(&x)) + .transpose()?; + if let Some(ref mut t) = tree { + t.set_key(key.to_vec()); + } + Ok(tree) + } +} impl Tree { #[inline] @@ -31,6 +60,7 @@ impl Tree { #[inline] pub fn decode(key: Vec, input: &[u8]) -> Tree { // operation is infallible so it's ok to unwrap + // TODO: how said that its infallible? let mut tree: Tree = Decode::decode(input).unwrap(); tree.inner.kv.key = key; tree diff --git a/merk/src/tree/mod.rs b/merk/src/tree/mod.rs index 7c433f093..a09c0af54 100644 --- a/merk/src/tree/mod.rs +++ b/merk/src/tree/mod.rs @@ -17,7 +17,7 @@ use ed::{Decode, Encode}; pub use hash::{kv_hash, node_hash, Hash, HASH_LENGTH, NULL_HASH}; use kv::KV; pub use link::Link; -pub use ops::{Batch, BatchEntry, Op, PanicSource}; +pub use ops::{BatchEntry, MerkBatch, Op, PanicSource}; pub use walk::{Fetch, RefWalker, Walker}; use super::error::Result; @@ -81,6 +81,10 @@ impl Tree { self.inner.kv.key() } + pub fn set_key(&mut self, key: Vec) { + self.inner.kv.key = key; + } + /// Consumes the tree and returns its root node's key, without having to /// clone or allocate. #[inline] diff --git a/merk/src/tree/ops.rs b/merk/src/tree/ops.rs index 2b3d1cbec..57f71efcd 100644 --- a/merk/src/tree/ops.rs +++ b/merk/src/tree/ops.rs @@ -28,7 +28,7 @@ impl fmt::Debug for Op { pub type BatchEntry = (Vec, Op); /// A mapping of keys and operations. Keys should be sorted and unique. -pub type Batch = [BatchEntry]; +pub type MerkBatch = [BatchEntry]; /// A source of data which panics when called. Useful when creating a store /// which always keeps the state in memory. @@ -42,7 +42,7 @@ impl Fetch for PanicSource { impl Walker where - S: Fetch + Sized + Send + Clone, + S: Fetch + Sized + Clone, { /// Applies a batch of operations, possibly creating a new tree if /// `maybe_tree` is `None`. This is similar to `Walker::apply`, but does @@ -51,7 +51,7 @@ where /// Keys in batch must be sorted and unique. 
pub fn apply_to( maybe_tree: Option, - batch: &Batch, + batch: &MerkBatch, source: S, ) -> Result<(Option, LinkedList>)> { let (maybe_walker, deleted_keys) = if batch.is_empty() { @@ -70,7 +70,7 @@ where /// Builds a `Tree` from a batch of operations. /// /// Keys in batch must be sorted and unique. - fn build(batch: &Batch, source: S) -> Result> { + fn build(batch: &MerkBatch, source: S) -> Result> { if batch.is_empty() { return Ok(None); } @@ -107,7 +107,7 @@ where /// `Walker::apply`_to, but requires a populated tree. /// /// Keys in batch must be sorted and unique. - fn apply(self, batch: &Batch) -> Result<(Option, LinkedList>)> { + fn apply(self, batch: &MerkBatch) -> Result<(Option, LinkedList>)> { // binary search to see if this node's key is in the batch, and to split // into left and right batches let search = batch.binary_search_by(|(key, _op)| key.as_slice().cmp(self.tree().key())); @@ -158,7 +158,7 @@ where /// will be dispatched to workers in other threads. fn recurse( self, - batch: &Batch, + batch: &MerkBatch, mid: usize, exclusive: bool, ) -> Result<(Option, LinkedList>)> { diff --git a/merk/src/tree/walk/mod.rs b/merk/src/tree/walk/mod.rs index 184d0adc5..22ed18602 100644 --- a/merk/src/tree/walk/mod.rs +++ b/merk/src/tree/walk/mod.rs @@ -11,7 +11,7 @@ use crate::{error::Result, owner::Owner}; /// to a pruned node, detaching children as they are traversed. pub struct Walker where - S: Fetch + Sized + Clone + Send, + S: Fetch + Sized + Clone, { tree: Owner, source: S, @@ -19,7 +19,7 @@ where impl Walker where - S: Fetch + Sized + Clone + Send, + S: Fetch + Sized + Clone, { /// Creates a `Walker` with the given tree and source. pub fn new(tree: Tree, source: S) -> Self { @@ -138,7 +138,7 @@ where impl From> for Tree where - S: Fetch + Sized + Clone + Send, + S: Fetch + Sized + Clone, { fn from(walker: Walker) -> Tree { walker.into_inner() diff --git a/merk/src/tree/walk/ref_walker.rs b/merk/src/tree/walk/ref_walker.rs index c33f2d960..a937e4bfa 100644 --- a/merk/src/tree/walk/ref_walker.rs +++ b/merk/src/tree/walk/ref_walker.rs @@ -12,7 +12,7 @@ use crate::error::Result; /// since the last update). pub struct RefWalker<'a, S> where - S: Fetch + Sized + Clone + Send, + S: Fetch + Sized + Clone, { tree: &'a mut Tree, source: S, @@ -20,7 +20,7 @@ where impl<'a, S> RefWalker<'a, S> where - S: Fetch + Sized + Clone + Send, + S: Fetch + Sized + Clone, { /// Creates a `RefWalker` with the given tree and source. 
pub fn new(tree: &'a mut Tree, source: S) -> Self { diff --git a/storage/Cargo.toml b/storage/Cargo.toml new file mode 100644 index 000000000..73fddd70b --- /dev/null +++ b/storage/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "storage" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +num_cpus = "1.13.0" +rocksdb = "0.17.0" +thiserror = "1.0.30" + +[dev-dependencies] +tempdir = "0.3.7" diff --git a/storage/src/lib.rs b/storage/src/lib.rs new file mode 100644 index 000000000..e1ec51f9f --- /dev/null +++ b/storage/src/lib.rs @@ -0,0 +1,212 @@ +#![feature(generic_associated_types)] +pub mod rocksdb_storage; + +/// `Storage` is able to store and retrieve arbitrary bytes by key +pub trait Storage { + /// Storage error type + type Error: std::error::Error + Send + Sync + 'static; + /// Storage batch type + type Batch<'a>: Batch + where + Self: 'a; + /// Storage raw iterator type (to iterate over storage without supplying a + /// key) + type RawIterator<'a>: RawIterator + where + Self: 'a; + + /// Put `value` into data storage with `key` + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; + + /// Put `value` into auxiliary data storage with `key` + fn put_aux(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; + + /// Put `value` into trees roots storage with `key` + fn put_root(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; + + /// Put `value` into GroveDB metadata storage with `key` + fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; + + /// Delete entry with `key` from data storage + fn delete(&self, key: &[u8]) -> Result<(), Self::Error>; + + /// Delete entry with `key` from auxiliary data storage + fn delete_aux(&self, key: &[u8]) -> Result<(), Self::Error>; + + /// Delete entry with `key` from trees roots storage + fn delete_root(&self, key: &[u8]) -> Result<(), Self::Error>; + + /// Delete entry with `key` from GroveDB metadata storage + fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error>; + + /// Get entry by `key` from data storage + fn get(&self, key: &[u8]) -> Result>, Self::Error>; + + /// Get entry by `key` from auxiliary data storage + fn get_aux(&self, key: &[u8]) -> Result>, Self::Error>; + + /// Get entry by `key` from trees roots storage + fn get_root(&self, key: &[u8]) -> Result>, Self::Error>; + + /// Get entry by `key` from GroveDB metadata storage + fn get_meta(&self, key: &[u8]) -> Result>, Self::Error>; + + /// Initialize a new batch + fn new_batch<'a>(&'a self) -> Result, Self::Error>; + + /// Commits changes from batch into storage + fn commit_batch<'a>(&'a self, batch: Self::Batch<'a>) -> Result<(), Self::Error>; + + /// Forces data to be written + fn flush(&self) -> Result<(), Self::Error>; + + /// Get raw iterator over storage + fn raw_iter<'a>(&'a self) -> Self::RawIterator<'a>; +} + +impl<'b, S: Storage> Storage for &'b S { + type Batch<'a> + where + 'b: 'a, + = S::Batch<'a>; + type Error = S::Error; + type RawIterator<'a> + where + 'b: 'a, + = S::RawIterator<'a>; + + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + (*self).put(key, value) + } + + fn put_aux(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + (*self).put_aux(key, value) + } + + fn put_root(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + (*self).put_root(key, value) + } + + fn delete(&self, key: &[u8]) -> Result<(), Self::Error> { + (*self).delete(key) + } + + fn 
delete_aux(&self, key: &[u8]) -> Result<(), Self::Error> { + (*self).delete_aux(key) + } + + fn delete_root(&self, key: &[u8]) -> Result<(), Self::Error> { + (*self).delete_root(key) + } + + fn get(&self, key: &[u8]) -> Result>, Self::Error> { + (*self).get(key) + } + + fn get_aux(&self, key: &[u8]) -> Result>, Self::Error> { + (*self).get_aux(key) + } + + fn get_root(&self, key: &[u8]) -> Result>, Self::Error> { + (*self).get_root(key) + } + + fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + (*self).put_meta(key, value) + } + + fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error> { + (*self).delete_meta(key) + } + + fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> { + (*self).get_meta(key) + } + + fn new_batch<'a>(&'a self) -> Result, Self::Error> { + (*self).new_batch() + } + + fn commit_batch<'a>(&'a self, batch: Self::Batch<'a>) -> Result<(), Self::Error> { + (*self).commit_batch(batch) + } + + fn flush(&self) -> Result<(), Self::Error> { + (*self).flush() + } + + fn raw_iter<'a>(&'a self) -> Self::RawIterator<'a> { + (*self).raw_iter() + } +} + +pub trait Batch { + fn put(&mut self, key: &[u8], value: &[u8]); + + fn put_aux(&mut self, key: &[u8], value: &[u8]); + + fn put_root(&mut self, key: &[u8], value: &[u8]); + + fn delete(&mut self, key: &[u8]); + + fn delete_aux(&mut self, key: &[u8]); + + fn delete_root(&mut self, key: &[u8]); +} + +pub trait RawIterator { + fn seek_to_first(&mut self); + + fn seek(&mut self, key: &[u8]); + + fn next(&mut self); + + fn value(&self) -> Option<&[u8]>; + + fn key(&self) -> Option<&[u8]>; + + fn valid(&self) -> bool; +} + +/// The `Store` trait allows to store its implementor by key using a storage `S` +/// or to delete it. +pub trait Store +where + Self: Sized, +{ + /// Error type for a process of object storing + type Error; + + /// Serialize object into bytes + fn encode(&self) -> Vec; + + /// Deserialize object from bytes + fn decode(bytes: &[u8]) -> Result; + + /// Persist object into storage + fn put(&self, storage: S, key: &[u8]) -> Result<(), Self::Error> + where + S: Storage, + Self::Error: From, + { + Ok(storage.put(key, &self.encode())?) + } + + /// Delete object from storage + fn delete(storage: S, key: &[u8]) -> Result<(), Self::Error> + where + S: Storage, + Self::Error: From, + { + Ok(storage.delete(key)?) + } + + /// Fetch object from storage `S` by `key` + fn get(storage: S, key: &[u8]) -> Result, Self::Error> + where + S: Storage, + Self::Error: From, + { + Ok(storage.get(key)?.map(|x| Self::decode(&x)).transpose()?) + } +} diff --git a/storage/src/rocksdb_storage.rs b/storage/src/rocksdb_storage.rs new file mode 100644 index 000000000..818c94e34 --- /dev/null +++ b/storage/src/rocksdb_storage.rs @@ -0,0 +1,491 @@ +//! 
Storage implementation using RocksDB +use std::{path::Path, rc::Rc}; + +use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, WriteBatch}; +pub use rocksdb::{Error, DB}; + +use crate::{Batch, RawIterator, Storage}; + +const AUX_CF_NAME: &str = "aux"; +const ROOTS_CF_NAME: &str = "roots"; +const META_CF_NAME: &str = "meta"; + +/// RocksDB options +pub fn default_db_opts() -> rocksdb::Options { + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.increase_parallelism(num_cpus::get() as i32); + opts.set_allow_mmap_writes(true); + opts.set_allow_mmap_reads(true); + opts.create_missing_column_families(true); + opts.set_atomic_flush(true); + opts +} + +/// RocksDB column families +pub fn column_families() -> Vec { + vec![ + ColumnFamilyDescriptor::new(AUX_CF_NAME, default_db_opts()), + ColumnFamilyDescriptor::new(ROOTS_CF_NAME, default_db_opts()), + ColumnFamilyDescriptor::new(META_CF_NAME, default_db_opts()), + ] +} + +/// Create RocksDB with default settings +pub fn default_rocksdb(path: &Path) -> Rc { + Rc::new( + rocksdb::DB::open_cf_descriptors(&default_db_opts(), &path, column_families()) + .expect("cannot create rocksdb"), + ) +} + +fn make_prefixed_key(prefix: Vec, key: &[u8]) -> Vec { + let mut prefixed_key = prefix.clone(); + prefixed_key.extend_from_slice(key); + prefixed_key +} + +/// RocksDB wrapper to store items with prefixes +pub struct PrefixedRocksDbStorage { + db: Rc, + prefix: Vec, +} + +#[derive(thiserror::Error, Debug)] +pub enum PrefixedRocksDbStorageError { + #[error("column family not found: {0}")] + ColumnFamilyNotFound(&'static str), + #[error(transparent)] + RocksDbError(#[from] rocksdb::Error), +} + +impl PrefixedRocksDbStorage { + /// Wraps RocksDB to prepend prefixes to each operation + pub fn new(db: Rc, prefix: Vec) -> Result { + Ok(PrefixedRocksDbStorage { prefix, db }) + } + + /// Get auxiliary data column family + fn cf_aux(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(AUX_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + AUX_CF_NAME, + )) + } + + /// Get trees roots data column family + fn cf_roots(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(ROOTS_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + ROOTS_CF_NAME, + )) + } + + /// Get metadata column family + fn cf_meta(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(META_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + META_CF_NAME, + )) + } +} + +impl Storage for PrefixedRocksDbStorage { + type Batch<'a> = PrefixedRocksDbBatch<'a>; + type Error = PrefixedRocksDbStorageError; + type RawIterator<'a> = rocksdb::DBRawIterator<'a>; + + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.db + .put(make_prefixed_key(self.prefix.clone(), key), value)?; + Ok(()) + } + + fn put_aux(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.db.put_cf( + self.cf_aux()?, + make_prefixed_key(self.prefix.clone(), key), + value, + )?; + Ok(()) + } + + fn put_root(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.db.put_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + value, + )?; + Ok(()) + } + + fn delete(&self, key: &[u8]) -> Result<(), Self::Error> { + self.db + .delete(make_prefixed_key(self.prefix.clone(), key))?; + Ok(()) + } + + fn delete_aux(&self, key: &[u8]) -> Result<(), Self::Error> { + 
self.db + .delete_cf(self.cf_aux()?, make_prefixed_key(self.prefix.clone(), key))?; + Ok(()) + } + + fn delete_root(&self, key: &[u8]) -> Result<(), Self::Error> { + self.db.delete_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + )?; + Ok(()) + } + + fn get(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self.db.get(make_prefixed_key(self.prefix.clone(), key))?) + } + + fn get_aux(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self + .db + .get_cf(self.cf_aux()?, make_prefixed_key(self.prefix.clone(), key))?) + } + + fn get_root(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self.db.get_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + )?) + } + + fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + Ok(self.db.put_cf(self.cf_meta()?, key, value)?) + } + + fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error> { + Ok(self.db.delete_cf(self.cf_meta()?, key)?) + } + + fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self.db.get_cf(self.cf_meta()?, key)?) + } + + fn new_batch<'a>(&'a self) -> Result, Self::Error> { + Ok(PrefixedRocksDbBatch { + prefix: self.prefix.clone(), + batch: WriteBatch::default(), + cf_aux: self.cf_aux()?, + cf_roots: self.cf_roots()?, + }) + } + + fn commit_batch<'a>(&'a self, batch: Self::Batch<'a>) -> Result<(), Self::Error> { + self.db.write(batch.batch)?; + Ok(()) + } + + fn flush(&self) -> Result<(), Self::Error> { + self.db.flush()?; + Ok(()) + } + + fn raw_iter<'a>(&'a self) -> Self::RawIterator<'a> { + self.db.raw_iterator() + } +} + +impl RawIterator for rocksdb::DBRawIterator<'_> { + fn seek_to_first(&mut self) { + DBRawIterator::seek_to_first(self) + } + + fn seek(&mut self, key: &[u8]) { + DBRawIterator::seek(self, key) + } + + fn next(&mut self) { + DBRawIterator::next(self) + } + + fn value(&self) -> Option<&[u8]> { + DBRawIterator::value(self) + } + + fn key(&self) -> Option<&[u8]> { + DBRawIterator::key(self) + } + + fn valid(&self) -> bool { + DBRawIterator::valid(self) + } +} + +/// Wrapper to RocksDB batch +pub struct PrefixedRocksDbBatch<'a> { + prefix: Vec, + batch: rocksdb::WriteBatch, + cf_aux: &'a ColumnFamily, + cf_roots: &'a ColumnFamily, +} + +impl<'a> Batch for PrefixedRocksDbBatch<'a> { + fn put(&mut self, key: &[u8], value: &[u8]) { + self.batch + .put(make_prefixed_key(self.prefix.clone(), key), value) + } + + fn put_aux(&mut self, key: &[u8], value: &[u8]) { + self.batch.put_cf( + self.cf_aux, + make_prefixed_key(self.prefix.clone(), key), + value, + ) + } + + fn put_root(&mut self, key: &[u8], value: &[u8]) { + self.batch.put_cf( + self.cf_roots, + make_prefixed_key(self.prefix.clone(), key), + value, + ) + } + + fn delete(&mut self, key: &[u8]) { + self.batch + .delete(make_prefixed_key(self.prefix.clone(), key)) + } + + fn delete_aux(&mut self, key: &[u8]) { + self.batch + .delete_cf(self.cf_aux, make_prefixed_key(self.prefix.clone(), key)) + } + + fn delete_root(&mut self, key: &[u8]) { + self.batch + .delete_cf(self.cf_roots, make_prefixed_key(self.prefix.clone(), key)) + } +} + +#[cfg(test)] +mod tests { + use std::ops::Deref; + + use tempdir::TempDir; + + use super::*; + + struct TempPrefixedStorage { + storage: PrefixedRocksDbStorage, + _tmp_dir: TempDir, + } + + impl Deref for TempPrefixedStorage { + type Target = PrefixedRocksDbStorage; + + fn deref(&self) -> &Self::Target { + &self.storage + } + } + + impl TempPrefixedStorage { + fn new() -> Self { + let tmp_dir = TempDir::new("db").expect("cannot open tempdir"); + 
TempPrefixedStorage { + storage: PrefixedRocksDbStorage::new( + default_rocksdb(tmp_dir.path()), + b"test".to_vec(), + ) + .expect("cannot create prefixed rocksdb storage"), + _tmp_dir: tmp_dir, + } + } + } + + #[test] + fn test_get_put() { + let storage = TempPrefixedStorage::new(); + storage + .put(b"key", b"value") + .expect("cannot put into storage"); + assert_eq!( + storage.get(b"key").expect("cannot get by key").unwrap(), + b"value" + ); + assert_eq!( + storage + .db + .get(b"testkey") + .expect("cannot get by prefixed key") + .unwrap(), + b"value" + ); + } + + #[test] + fn test_get_put_aux() { + let storage = TempPrefixedStorage::new(); + storage + .put_aux(b"key", b"value") + .expect("cannot put into aux storage"); + assert_eq!( + storage.get_aux(b"key").expect("cannot get by key").unwrap(), + b"value" + ); + assert_eq!( + storage + .db + .get_cf(&storage.db.cf_handle(AUX_CF_NAME).unwrap(), b"testkey") + .expect("cannot get by prefixed key") + .unwrap(), + b"value" + ); + } + + #[test] + fn test_get_put_root() { + let storage = TempPrefixedStorage::new(); + storage + .put_root(b"key", b"value") + .expect("cannot put into roots storage"); + assert_eq!( + storage + .get_root(b"key") + .expect("cannot get by key") + .unwrap(), + b"value" + ); + assert_eq!( + storage + .db + .get_cf(&storage.db.cf_handle(ROOTS_CF_NAME).unwrap(), b"testkey") + .expect("cannot get by prefixed key") + .unwrap(), + b"value" + ); + } + + #[test] + fn test_get_put_meta() { + let storage = TempPrefixedStorage::new(); + storage + .put_meta(b"key", b"value") + .expect("cannot put into metadata storage"); + assert_eq!( + storage + .get_meta(b"key") + .expect("cannot get by key") + .unwrap(), + b"value" + ); + + // Note that metadata storage requires no prefixes + + assert!(storage + .db + .get_cf(&storage.db.cf_handle(META_CF_NAME).unwrap(), b"testkey") + .expect("cannot get by prefixed key") + .is_none()); + assert_eq!( + storage + .db + .get_cf(&storage.db.cf_handle(META_CF_NAME).unwrap(), b"key") + .expect("cannot get by prefixed key") + .unwrap(), + b"value" + ); + } + + #[test] + fn test_delete() { + let storage = TempPrefixedStorage::new(); + storage + .put(b"key", b"value") + .expect("cannot put into storage"); + storage.delete(b"key").expect("cannot delete from storage"); + assert!(storage + .db + .get(b"testkey") + .expect("cannot get by prefixed key") + .is_none()); + } + + #[test] + fn test_delete_aux() { + let storage = TempPrefixedStorage::new(); + storage + .put_aux(b"key", b"value") + .expect("cannot put into aux storage"); + storage + .delete_aux(b"key") + .expect("cannot delete from storage"); + assert!(storage + .db + .get_cf(&storage.db.cf_handle(AUX_CF_NAME).unwrap(), b"testkey") + .expect("cannot get by prefixed key") + .is_none()); + } + + #[test] + fn test_delete_root() { + let storage = TempPrefixedStorage::new(); + storage + .put_root(b"key", b"value") + .expect("cannot put into storage"); + storage + .delete_root(b"key") + .expect("cannot delete from storage"); + assert!(storage + .db + .get_cf(&storage.db.cf_handle(ROOTS_CF_NAME).unwrap(), b"testkey") + .expect("cannot get by prefixed key") + .is_none()); + } + + #[test] + fn test_delete_meta() { + let storage = TempPrefixedStorage::new(); + storage + .put_meta(b"key", b"value") + .expect("cannot put into storage"); + storage + .delete_meta(b"key") + .expect("cannot delete from storage"); + assert!(storage + .db + .get_cf(&storage.db.cf_handle(META_CF_NAME).unwrap(), b"key") + .expect("cannot get by prefixed key") + .is_none()); + } + 
+ #[test] + fn test_batch() { + let storage = TempPrefixedStorage::new(); + let mut batch = storage.new_batch().expect("cannot create batch"); + batch.put(b"key1", b"value1"); + batch.put(b"key2", b"value2"); + batch.put_root(b"root", b"yeet"); + storage.commit_batch(batch).expect("cannot commit batch"); + assert_eq!( + storage + .get(b"key1") + .expect("cannot get a value by key1") + .unwrap(), + b"value1" + ); + assert_eq!( + storage + .get(b"key2") + .expect("cannot get a value by key2") + .unwrap(), + b"value2" + ); + assert_eq!( + storage + .get_root(b"root") + .expect("cannot get a root value") + .unwrap(), + b"yeet" + ); + } +}
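
A minimal usage sketch of the new `storage` abstraction introduced by this patch, assembled only from items visible in the diff above (`default_rocksdb`, `PrefixedRocksDbStorage`, the `Storage`/`Batch` traits, and the reworked `Merk::open`). It is illustrative, not code from the repository: the `main` function, the `b"subtree-a"`/`b"subtree-b"` prefixes, and the placeholder values are assumptions made for the example.

```rust
use merk::{Merk, Op};
use storage::rocksdb_storage::{default_rocksdb, PrefixedRocksDbStorage};
use storage::{Batch, Storage};
use tempdir::TempDir;

fn main() {
    let tmp_dir = TempDir::new("storage-example").expect("cannot create tempdir");
    // One shared RocksDB handle; each subtree is isolated by a key prefix.
    let db = default_rocksdb(tmp_dir.path());

    // Plain prefixed storage: keys are transparently prefixed with b"subtree-a".
    let store = PrefixedRocksDbStorage::new(db.clone(), b"subtree-a".to_vec())
        .expect("cannot create prefixed storage");
    store.put(b"key", b"value").expect("put failed");
    assert_eq!(
        store.get(b"key").expect("get failed"),
        Some(b"value".to_vec())
    );

    // Batched writes go through the `Batch` trait and are committed atomically.
    let mut batch = store.new_batch().expect("cannot create batch");
    batch.put(b"k1", b"v1");
    batch.put_root(b"root", b"root-key-placeholder");
    store.commit_batch(batch).expect("cannot commit batch");

    // A Merk tree is now opened over a storage implementation instead of a raw
    // RocksDB handle plus prefix, mirroring the updated tests in this diff.
    let mut merk = Merk::open(
        PrefixedRocksDbStorage::new(db, b"subtree-b".to_vec()).expect("cannot create storage"),
    )
    .expect("cannot open Merk");
    merk.apply(&[(vec![1u8, 2, 3], Op::Put(vec![4, 5, 6]))], &[])
        .expect("apply failed");
}
```

The design choice this reflects: key prefixing and the aux/roots/meta column families now live behind the `Storage` trait in the `storage` crate, so `merk` no longer depends on RocksDB details and GroveDB can hand each subtree its own prefixed view of one shared database.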