diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index dffb76a66..fd488be86 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -2,6 +2,7 @@ mod operations; mod subtree; #[cfg(test)] mod tests; +mod transaction; use std::{collections::HashMap, path::Path, rc::Rc}; @@ -10,11 +11,16 @@ use merk::{self, Merk}; use rs_merkle::{algorithms::Sha256, Hasher, MerkleTree}; use serde::{Deserialize, Serialize}; use storage::{ - rocksdb_storage::{PrefixedRocksDbStorage, PrefixedRocksDbStorageError}, - Storage, + rocksdb_storage::{ + OptimisticTransactionDBTransaction, PrefixedRocksDbStorage, PrefixedRocksDbStorageError, + }, + Storage, Transaction, }; pub use subtree::Element; +// use crate::transaction::GroveDbTransaction; +// pub use transaction::GroveDbTransaction; + /// A key to store serialized data about subtree prefixes to restore HADS /// structure const SUBTREES_SERIALIZED_KEY: &[u8] = b"subtreesSerialized"; @@ -37,6 +43,11 @@ pub enum Error { StorageError(#[from] PrefixedRocksDbStorageError), #[error("data corruption error: {0}")] CorruptedData(String), + #[error( + "db is in readonly mode due to the active transaction. Please provide transaction or \ + commit it" + )] + DbIsInReadonlyMode, } pub struct PathQuery<'a> { @@ -63,13 +74,39 @@ pub struct GroveDb { root_leaf_keys: HashMap, usize>, subtrees: HashMap, Merk>, meta_storage: PrefixedRocksDbStorage, - db: Rc, + db: Rc, + // Locks the database for writes during the transaction + is_readonly: bool, + // Temp trees used for writes during transaction + temp_root_tree: MerkleTree, + temp_root_leaf_keys: HashMap, usize>, + temp_subtrees: HashMap, Merk>, } impl GroveDb { + pub fn new( + root_tree: MerkleTree, + root_leaf_keys: HashMap, usize>, + subtrees: HashMap, Merk>, + meta_storage: PrefixedRocksDbStorage, + db: Rc, + ) -> Self { + Self { + root_tree, + root_leaf_keys, + subtrees, + meta_storage, + db, + temp_root_tree: MerkleTree::new(), + temp_root_leaf_keys: HashMap::new(), + temp_subtrees: HashMap::new(), + is_readonly: false, + } + } + pub fn open>(path: P) -> Result { let db = Rc::new( - storage::rocksdb_storage::DB::open_cf_descriptors( + storage::rocksdb_storage::OptimisticTransactionDB::open_cf_descriptors( &storage::rocksdb_storage::default_db_opts(), path, storage::rocksdb_storage::column_families(), @@ -104,35 +141,70 @@ impl GroveDb { HashMap::new() }; - Ok(GroveDb { - root_tree: Self::build_root_tree(&subtrees, &root_leaf_keys), - db, - subtrees, + Ok(GroveDb::new( + Self::build_root_tree(&subtrees, &root_leaf_keys), root_leaf_keys, + subtrees, meta_storage, - }) + db, + )) } - pub fn checkpoint>(&self, path: P) -> Result { - storage::rocksdb_storage::Checkpoint::new(&self.db) - .and_then(|x| x.create_checkpoint(&path)) - .map_err(PrefixedRocksDbStorageError::RocksDbError)?; - GroveDb::open(path) - } + // TODO: Checkpoints are currently not implemented for the transactional DB + // pub fn checkpoint>(&self, path: P) -> Result { + // // let snapshot = self.db.transaction().snapshot(); + // + // storage::rocksdb_storage::Checkpoint::new(&self.db) + // .and_then(|x| x.create_checkpoint(&path)) + // .map_err(PrefixedRocksDbStorageError::RocksDbError)?; + // GroveDb::open(path) + // } + + fn store_subtrees_keys_data( + &self, + db_transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result<(), Error> { + let subtrees = match db_transaction { + None => &self.subtrees, + Some(_) => &self.temp_subtrees, + }; + + let prefixes: Vec> = subtrees.keys().map(|x| x.clone()).collect(); + + // TODO: make StorageOrTransaction which will has the access to either storage + // or transaction + match db_transaction { + None => { + self.meta_storage.put_meta( + SUBTREES_SERIALIZED_KEY, + &bincode::serialize(&prefixes).map_err(|_| { + Error::CorruptedData(String::from("unable to serialize prefixes")) + })?, + )?; + self.meta_storage.put_meta( + ROOT_LEAFS_SERIALIZED_KEY, + &bincode::serialize(&self.temp_root_leaf_keys).map_err(|_| { + Error::CorruptedData(String::from("unable to serialize root leafs")) + })?, + )?; + } + Some(tx) => { + let transaction = self.meta_storage.transaction(tx); + transaction.put_meta( + SUBTREES_SERIALIZED_KEY, + &bincode::serialize(&prefixes).map_err(|_| { + Error::CorruptedData(String::from("unable to serialize prefixes")) + })?, + )?; + transaction.put_meta( + ROOT_LEAFS_SERIALIZED_KEY, + &bincode::serialize(&self.root_leaf_keys).map_err(|_| { + Error::CorruptedData(String::from("unable to serialize root leafs")) + })?, + )?; + } + } - fn store_subtrees_keys_data(&self) -> Result<(), Error> { - let prefixes: Vec> = self.subtrees.keys().map(|x| x.clone()).collect(); - self.meta_storage.put_meta( - SUBTREES_SERIALIZED_KEY, - &bincode::serialize(&prefixes) - .map_err(|_| Error::CorruptedData(String::from("unable to serialize prefixes")))?, - )?; - self.meta_storage.put_meta( - ROOT_LEAFS_SERIALIZED_KEY, - &bincode::serialize(&self.root_leaf_keys).map_err(|_| { - Error::CorruptedData(String::from("unable to serialize root leafs")) - })?, - )?; Ok(()) } @@ -151,36 +223,61 @@ impl GroveDb { res } - pub fn elements_iterator(&self, path: &[&[u8]]) -> Result { - let merk = self - .subtrees + pub fn elements_iterator( + &self, + path: &[&[u8]], + transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result { + let subtrees = match transaction { + None => &self.subtrees, + Some(_) => &self.temp_subtrees, + }; + + let merk = subtrees .get(&Self::compress_subtree_key(path, None)) .ok_or(Error::InvalidPath("no subtree found under that path"))?; Ok(Element::iterator(merk.raw_iter())) } /// Method to propagate updated subtree root hashes up to GroveDB root - fn propagate_changes(&mut self, path: &[&[u8]]) -> Result<(), Error> { + fn propagate_changes<'a: 'b, 'b>( + &'a mut self, + path: &[&[u8]], + transaction: Option<&'b ::DBTransaction<'b>>, + ) -> Result<(), Error> { + let subtrees = match transaction { + None => &mut self.subtrees, + Some(_) => &mut self.temp_subtrees, + }; + + let root_leaf_keys = match transaction { + None => &mut self.root_leaf_keys, + Some(_) => &mut self.temp_root_leaf_keys, + }; + let mut split_path = path.split_last(); // Go up until only one element in path, which means a key of a root tree while let Some((key, path_slice)) = split_path { if path_slice.is_empty() { // Hit the root tree - self.root_tree = Self::build_root_tree(&self.subtrees, &self.root_leaf_keys); + match transaction { + None => self.root_tree = Self::build_root_tree(&subtrees, &root_leaf_keys), + Some(_) => { + self.temp_root_tree = Self::build_root_tree(&subtrees, &root_leaf_keys) + } + }; break; } else { let compressed_path_upper_tree = Self::compress_subtree_key(path_slice, None); let compressed_path_subtree = Self::compress_subtree_key(path_slice, Some(key)); - let subtree = self - .subtrees + let subtree = subtrees .get(&compressed_path_subtree) .ok_or(Error::InvalidPath("no subtree found under that path"))?; let element = Element::Tree(subtree.root_hash()); - let upper_tree = self - .subtrees + let upper_tree = subtrees .get_mut(&compressed_path_upper_tree) .ok_or(Error::InvalidPath("no subtree found under that path"))?; - element.insert(upper_tree, key.to_vec())?; + element.insert(upper_tree, key.to_vec(), transaction)?; split_path = path_slice.split_last(); } } @@ -215,4 +312,94 @@ impl GroveDb { pub fn flush(&self) -> Result<(), Error> { Ok(self.meta_storage.flush()?) } + + /// Returns a clone of reference counter to the underlying db storage. + /// Useful when working with transactions. For more details, please + /// refer to the [`GroveDb::start_transaction`] examples section. + pub fn storage(&self) -> Rc { + self.db.clone() + } + + /// Starts database transaction. Please note that you have to start + /// underlying storage transaction manually. + /// + /// ## Examples: + /// ``` + /// # use grovedb::{Element, Error, GroveDb}; + /// # use rs_merkle::{MerkleTree, MerkleProof, algorithms::Sha256, Hasher, utils}; + /// # use std::convert::TryFrom; + /// # use tempdir::TempDir; + /// # + /// # fn main() -> Result<(), Box> { + /// const TEST_LEAF: &[u8] = b"test_leaf"; + /// + /// let tmp_dir = TempDir::new("db").unwrap(); + /// let mut db = GroveDb::open(tmp_dir.path())?; + /// db.insert(&[], TEST_LEAF.to_vec(), Element::empty_tree(), None)?; + /// + /// let storage = db.storage(); + /// let db_transaction = storage.transaction(); + /// db.start_transaction(); + /// + /// let subtree_key = b"subtree_key".to_vec(); + /// db.insert( + /// &[TEST_LEAF], + /// subtree_key.clone(), + /// Element::empty_tree(), + /// Some(&db_transaction), + /// )?; + /// + /// // This action exists only inside the transaction for now + /// let result = db.get(&[TEST_LEAF], &subtree_key, None); + /// assert!(matches!(result, Err(Error::InvalidPath(_)))); + /// + /// // To access values inside the transaction, transaction needs to be passed to the `db::get` + /// let result_with_transaction = db.get(&[TEST_LEAF], &subtree_key, Some(&db_transaction))?; + /// assert_eq!(result_with_transaction, Element::empty_tree()); + /// + /// // After transaction is committed, the value from it can be accessed normally. + /// db.commit_transaction(db_transaction); + /// let result = db.get(&[TEST_LEAF], &subtree_key, None)?; + /// assert_eq!(result, Element::empty_tree()); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn start_transaction(&mut self) -> Result<(), Error> { + if self.is_readonly { + return Err(Error::DbIsInReadonlyMode); + } + // Locking all writes outside of the transaction + self.is_readonly = true; + + // Cloning all the trees to maintain original state before the transaction + self.temp_root_tree = self.root_tree.clone(); + self.temp_root_leaf_keys = self.root_leaf_keys.clone(); + self.temp_subtrees = self.subtrees.clone(); + + Ok(()) + } + + /// Commits previously started db transaction. For more details on the + /// transaction usage, please check [`GroveDb::start_transaction`] + pub fn commit_transaction( + &mut self, + db_transaction: OptimisticTransactionDBTransaction, + ) -> Result<(), Error> { + // Enabling writes again + self.is_readonly = false; + + // Copying all changes that were made during the transaction into the db + self.root_tree = self.temp_root_tree.clone(); + self.root_leaf_keys = self.temp_root_leaf_keys.drain().collect(); + self.subtrees = self.temp_subtrees.drain().collect(); + + // TODO: root tree actually does support transactions, so this + // code can be reworked to account for that + self.temp_root_tree = MerkleTree::new(); + + Ok(db_transaction + .commit() + .map_err(PrefixedRocksDbStorageError::RocksDbError)?) + } } diff --git a/grovedb/src/operations/delete.rs b/grovedb/src/operations/delete.rs index 080b33f24..4cccafcc0 100644 --- a/grovedb/src/operations/delete.rs +++ b/grovedb/src/operations/delete.rs @@ -1,31 +1,55 @@ +use storage::rocksdb_storage::OptimisticTransactionDBTransaction; + use crate::{Element, Error, GroveDb}; impl GroveDb { - pub fn delete(&mut self, path: &[&[u8]], key: Vec) -> Result<(), Error> { - let element = self.get_raw(path, &key)?; + pub fn delete( + &mut self, + path: &[&[u8]], + key: Vec, + transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result<(), Error> { + if let None = transaction { + if self.is_readonly { + return Err(Error::DbIsInReadonlyMode); + } + } if path.is_empty() { // Attempt to delete a root tree leaf Err(Error::InvalidPath( "root tree leafs currently cannot be deleted", )) } else { - let mut merk = self - .subtrees - .get_mut(&Self::compress_subtree_key(path, None)) - .ok_or(Error::InvalidPath("no subtree found under that path"))?; - Element::delete(&mut merk, key.clone())?; + let element = self.get_raw(path, &key, transaction)?; + { + let subtrees = match transaction { + None => &mut self.subtrees, + Some(_) => &mut self.temp_subtrees, + }; + + let mut merk = subtrees + .get_mut(&Self::compress_subtree_key(path, None)) + .ok_or(Error::InvalidPath("no subtree found under that path"))?; + Element::delete(&mut merk, key.clone(), transaction)?; + } + if let Element::Tree(_) = element { // TODO: dumb traversal should not be tolerated let mut concat_path: Vec> = path.iter().map(|x| x.to_vec()).collect(); concat_path.push(key); - let subtrees_paths = self.find_subtrees(concat_path)?; + let subtrees_paths = self.find_subtrees(concat_path, transaction)?; + let subtrees = match transaction { + None => &mut self.subtrees, + Some(_) => &mut self.temp_subtrees, + }; + for subtree_path in subtrees_paths { // TODO: eventually we need to do something about this nested slices let subtree_path_ref: Vec<&[u8]> = subtree_path.iter().map(|x| x.as_slice()).collect(); let prefix = Self::compress_subtree_key(&subtree_path_ref, None); - if let Some(subtree) = self.subtrees.remove(&prefix) { - subtree.clear().map_err(|e| { + if let Some(mut subtree) = subtrees.remove(&prefix) { + subtree.clear(transaction).map_err(|e| { Error::CorruptedData(format!( "unable to cleanup tree from storage: {}", e @@ -34,7 +58,7 @@ impl GroveDb { } } } - self.propagate_changes(path)?; + self.propagate_changes(path, transaction)?; Ok(()) } } @@ -43,14 +67,18 @@ impl GroveDb { /// Finds keys which are trees for a given subtree recursively. /// One element means a key of a `merk`, n > 1 elements mean relative path /// for a deeply nested subtree. - pub(crate) fn find_subtrees(&self, path: Vec>) -> Result>>, Error> { + pub(crate) fn find_subtrees( + &self, + path: Vec>, + transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result>>, Error> { let mut queue: Vec>> = vec![path.clone()]; let mut result: Vec>> = vec![path.clone()]; while let Some(q) = queue.pop() { // TODO: eventually we need to do something about this nested slices let q_ref: Vec<&[u8]> = q.iter().map(|x| x.as_slice()).collect(); - let mut iter = self.elements_iterator(&q_ref)?; + let mut iter = self.elements_iterator(&q_ref, transaction)?; while let Some((key, value)) = iter.next()? { match value { Element::Tree(_) => { diff --git a/grovedb/src/operations/get.rs b/grovedb/src/operations/get.rs index 4097aace3..9dd9f2275 100644 --- a/grovedb/src/operations/get.rs +++ b/grovedb/src/operations/get.rs @@ -1,19 +1,32 @@ use std::collections::HashSet; +use storage::rocksdb_storage::OptimisticTransactionDBTransaction; + use crate::{Element, Error, GroveDb, PathQuery}; /// Limit of possible indirections pub(crate) const MAX_REFERENCE_HOPS: usize = 10; impl GroveDb { - pub fn get(&self, path: &[&[u8]], key: &[u8]) -> Result { - match self.get_raw(path, key)? { - Element::Reference(reference_path) => self.follow_reference(reference_path), + pub fn get( + &self, + path: &[&[u8]], + key: &[u8], + transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result { + match self.get_raw(path, key, transaction)? { + Element::Reference(reference_path) => { + self.follow_reference(reference_path, transaction) + } other => Ok(other), } } - fn follow_reference(&self, mut path: Vec>) -> Result { + fn follow_reference( + &self, + mut path: Vec>, + transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result { let mut hops_left = MAX_REFERENCE_HOPS; let mut current_element; let mut visited = HashSet::new(); @@ -30,6 +43,7 @@ impl GroveDb { .collect::>() .as_slice(), key, + transaction, )?; } else { return Err(Error::InvalidPath("empty path")); @@ -45,19 +59,36 @@ impl GroveDb { } /// Get tree item without following references - pub(super) fn get_raw(&self, path: &[&[u8]], key: &[u8]) -> Result { - let merk = self - .subtrees + pub(super) fn get_raw( + &self, + path: &[&[u8]], + key: &[u8], + transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result { + let subtrees = match transaction { + None => &self.subtrees, + Some(_) => &self.temp_subtrees, + }; + + let merk = subtrees .get(&Self::compress_subtree_key(path, None)) .ok_or(Error::InvalidPath("no subtree found under that path"))?; Element::get(&merk, key) } - pub fn get_query(&mut self, path_queries: &[PathQuery]) -> Result, Error> { + pub fn get_query( + &mut self, + path_queries: &[PathQuery], + transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result, Error> { + let subtrees = match transaction { + None => &self.subtrees, + Some(_) => &self.temp_subtrees, + }; + let mut result = Vec::new(); for query in path_queries { - let merk = self - .subtrees + let merk = subtrees .get(&Self::compress_subtree_key(query.path, None)) .ok_or(Error::InvalidPath("no subtree found under that path"))?; let subtree_results = Element::get_query(merk, &query.query)?; diff --git a/grovedb/src/operations/insert.rs b/grovedb/src/operations/insert.rs index 2042e1fb7..ae314b6b8 100644 --- a/grovedb/src/operations/insert.rs +++ b/grovedb/src/operations/insert.rs @@ -1,13 +1,13 @@ use std::rc::Rc; -use storage::rocksdb_storage; +use storage::{rocksdb_storage, Storage}; use crate::{Element, Error, GroveDb, Merk, PrefixedRocksDbStorage}; /// A helper function that builds a prefix for a key under a path and opens a /// Merk instance. fn create_merk_with_prefix( - db: Rc, + db: Rc, path: &[&[u8]], key: &[u8], ) -> Result<(Vec, Merk), Error> { @@ -20,15 +20,32 @@ fn create_merk_with_prefix( } impl GroveDb { - pub fn insert(&mut self, path: &[&[u8]], key: Vec, element: Element) -> Result<(), Error> { + pub fn insert<'a: 'b, 'b>( + &'a mut self, + path: &[&[u8]], + key: Vec, + element: Element, + transaction: Option<&'b ::DBTransaction<'b>>, + ) -> Result<(), Error> { + if let None = transaction { + if self.is_readonly { + return Err(Error::DbIsInReadonlyMode); + } + } + + let subtrees = match transaction { + None => &mut self.subtrees, + Some(_) => &mut self.temp_subtrees, + }; + match element { Element::Tree(_) => { if path.is_empty() { - self.add_root_leaf(&key)?; + self.add_root_leaf(&key, transaction)?; } else { - self.add_non_root_subtree(path, key)?; + self.add_non_root_subtree(path, key, transaction)?; } - self.store_subtrees_keys_data()?; + self.store_subtrees_keys_data(transaction)?; } _ => { // If path is empty that means there is an attempt to insert something into a @@ -39,65 +56,103 @@ impl GroveDb { )); } // Get a Merk by a path - let mut merk = self - .subtrees + let mut merk = subtrees .get_mut(&Self::compress_subtree_key(path, None)) .ok_or(Error::InvalidPath("no subtree found under that path"))?; - element.insert(&mut merk, key)?; - self.propagate_changes(path)?; + element.insert(&mut merk, key, transaction)?; + self.propagate_changes(path, transaction)?; } } Ok(()) } /// Add subtree to the root tree - fn add_root_leaf(&mut self, key: &[u8]) -> Result<(), Error> { + fn add_root_leaf<'a: 'b, 'b>( + &'a mut self, + key: &[u8], + transaction: Option<&'b ::DBTransaction<'b>>, + ) -> Result<(), Error> { + if let None = transaction { + if self.is_readonly { + return Err(Error::DbIsInReadonlyMode); + } + } + + let subtrees = match transaction { + None => &mut self.subtrees, + Some(_) => &mut self.temp_subtrees, + }; + + let root_leaf_keys = match transaction { + None => &mut self.root_leaf_keys, + Some(_) => &mut self.temp_root_leaf_keys, + }; + + let root_tree = match transaction { + None => &mut self.root_tree, + Some(_) => &mut self.temp_root_tree, + }; // Open Merk and put handle into `subtrees` dictionary accessible by its // compressed path let (subtree_prefix, subtree_merk) = create_merk_with_prefix(self.db.clone(), &[], &key)?; - self.subtrees.insert(subtree_prefix.clone(), subtree_merk); + subtrees.insert(subtree_prefix.clone(), subtree_merk); // Update root leafs index to persist rs-merkle structure later - if self.root_leaf_keys.get(&subtree_prefix).is_none() { - self.root_leaf_keys - .insert(subtree_prefix, self.root_tree.leaves_len()); + if root_leaf_keys.get(&subtree_prefix).is_none() { + root_leaf_keys.insert(subtree_prefix, root_tree.leaves_len()); } - self.propagate_changes(&[&key])?; + self.propagate_changes(&[&key], transaction)?; Ok(()) } // Add subtree to another subtree. - fn add_non_root_subtree(&mut self, path: &[&[u8]], key: Vec) -> Result<(), Error> { + fn add_non_root_subtree<'a: 'b, 'b>( + &'a mut self, + path: &[&[u8]], + key: Vec, + transaction: Option<&'b ::DBTransaction<'b>>, + ) -> Result<(), Error> { + if let None = transaction { + if self.is_readonly { + return Err(Error::DbIsInReadonlyMode); + } + } + + let subtrees = match transaction { + None => &mut self.subtrees, + Some(_) => &mut self.temp_subtrees, + }; + let compressed_path = Self::compress_subtree_key(path, None); // First, check if a subtree exists to create a new subtree under it - self.subtrees + subtrees .get(&compressed_path) .ok_or(Error::InvalidPath("no subtree found under that path"))?; let (subtree_prefix, subtree_merk) = create_merk_with_prefix(self.db.clone(), path, &key)?; // Set tree value as a a subtree root hash let element = Element::Tree(subtree_merk.root_hash()); - self.subtrees.insert(subtree_prefix, subtree_merk); + subtrees.insert(subtree_prefix, subtree_merk); // Had to take merk from `subtrees` once again to solve multiple &mut s - let mut merk = self - .subtrees + let mut merk = subtrees .get_mut(&compressed_path) .expect("merk object must exist in `subtrees`"); // need to mark key as taken in the upper tree - element.insert(&mut merk, key)?; - self.propagate_changes(path)?; + element.insert(&mut merk, key, transaction)?; + self.propagate_changes(path, transaction)?; Ok(()) } - pub fn insert_if_not_exists( + pub fn insert_if_not_exists<'a: 'b, 'b>( &mut self, path: &[&[u8]], key: Vec, element: Element, + transaction: Option<&'b ::DBTransaction<'b>>, ) -> Result { - if self.get(path, &key).is_ok() { + if self.get(path, &key, transaction).is_ok() { return Ok(false); } - match self.insert(path, key, element) { + match self.insert(path, key, element, transaction) { Ok(_) => Ok(true), Err(e) => Err(e), } diff --git a/grovedb/src/subtree.rs b/grovedb/src/subtree.rs index 4f0a8c2b5..9b1d4a83c 100644 --- a/grovedb/src/subtree.rs +++ b/grovedb/src/subtree.rs @@ -10,8 +10,11 @@ use merk::{ }; use serde::{Deserialize, Serialize}; use storage::{ - rocksdb_storage::{PrefixedRocksDbStorage, RawPrefixedIterator}, - RawIterator, Store, + rocksdb_storage::{ + OptimisticTransactionDBTransaction, PrefixedRocksDbStorage, + RawPrefixedTransactionalIterator, + }, + RawIterator, Storage, Store, }; use crate::{Error, Merk}; @@ -36,10 +39,14 @@ impl Element { } /// Delete an element from Merk under a key - pub fn delete(merk: &mut Merk, key: Vec) -> Result<(), Error> { + pub fn delete( + merk: &mut Merk, + key: Vec, + transaction: Option<&OptimisticTransactionDBTransaction>, + ) -> Result<(), Error> { // TODO: delete references on this element let batch = [(key, Op::Delete)]; - merk.apply(&batch, &[]) + merk.apply(&batch, &[], transaction) .map_err(|e| Error::CorruptedData(e.to_string())) } @@ -99,30 +106,34 @@ impl Element { /// Insert an element in Merk under a key; path should be resolved and /// proper Merk should be loaded by this moment - pub fn insert( - &self, + /// If transaction is not passed, the batch will be written immediately. + /// If transaction is passed, the operation will be committed on the + /// transaction commit. + pub fn insert<'a: 'b, 'b>( + &'a self, merk: &mut Merk, key: Vec, + transaction: Option<&'b ::DBTransaction<'b>>, ) -> Result<(), Error> { - let batch = + let batch_operations = [( key, Op::Put(bincode::serialize(self).map_err(|_| { Error::CorruptedData(String::from("unable to serialize element")) })?), )]; - merk.apply(&batch, &[]) + merk.apply(&batch_operations, &[], transaction) .map_err(|e| Error::CorruptedData(e.to_string())) } - pub fn iterator(mut raw_iter: RawPrefixedIterator) -> ElementsIterator { + pub fn iterator(mut raw_iter: RawPrefixedTransactionalIterator) -> ElementsIterator { raw_iter.seek_to_first(); ElementsIterator { raw_iter } } } pub struct ElementsIterator<'a> { - raw_iter: RawPrefixedIterator<'a>, + raw_iter: RawPrefixedTransactionalIterator<'a>, } fn raw_decode(bytes: &[u8]) -> Result { @@ -159,10 +170,10 @@ mod tests { fn test_success_insert() { let mut merk = TempMerk::new(); Element::empty_tree() - .insert(&mut merk, b"mykey".to_vec()) + .insert(&mut merk, b"mykey".to_vec(), None) .expect("expected successful insertion"); Element::Item(b"value".to_vec()) - .insert(&mut merk, b"another-key".to_vec()) + .insert(&mut merk, b"another-key".to_vec(), None) .expect("expected successful insertion 2"); assert_eq!( @@ -175,16 +186,16 @@ mod tests { fn test_get_query() { let mut merk = TempMerk::new(); Element::Item(b"ayyd".to_vec()) - .insert(&mut merk, b"d".to_vec()) + .insert(&mut merk, b"d".to_vec(), None) .expect("expected successful insertion"); Element::Item(b"ayyc".to_vec()) - .insert(&mut merk, b"c".to_vec()) + .insert(&mut merk, b"c".to_vec(), None) .expect("expected successful insertion"); Element::Item(b"ayya".to_vec()) - .insert(&mut merk, b"a".to_vec()) + .insert(&mut merk, b"a".to_vec(), None) .expect("expected successful insertion"); Element::Item(b"ayyb".to_vec()) - .insert(&mut merk, b"b".to_vec()) + .insert(&mut merk, b"b".to_vec(), None) .expect("expected successful insertion"); // Test queries by key diff --git a/grovedb/src/tests.rs b/grovedb/src/tests.rs index 2ea807d0d..937471e7f 100644 --- a/grovedb/src/tests.rs +++ b/grovedb/src/tests.rs @@ -1,8 +1,12 @@ -use std::ops::{Deref, DerefMut}; +use std::{ + ops::{Deref, DerefMut}, + option::Option::None, +}; use merk::test_utils::TempMerk; use tempdir::TempDir; +// use test::RunIgnored::No; use super::*; const TEST_LEAF: &[u8] = b"test_leaf"; @@ -40,9 +44,9 @@ fn make_grovedb() -> TempGroveDb { } fn add_test_leafs(db: &mut GroveDb) { - db.insert(&[], TEST_LEAF.to_vec(), Element::empty_tree()) + db.insert(&[], TEST_LEAF.to_vec(), Element::empty_tree(), None) .expect("successful root tree leaf insert"); - db.insert(&[], ANOTHER_TEST_LEAF.to_vec(), Element::empty_tree()) + db.insert(&[], ANOTHER_TEST_LEAF.to_vec(), Element::empty_tree(), None) .expect("successful root tree leaf 2 insert"); } @@ -56,10 +60,10 @@ fn test_init() { fn test_insert_value_to_merk() { let mut db = make_grovedb(); let element = Element::Item(b"ayy".to_vec()); - db.insert(&[TEST_LEAF], b"key".to_vec(), element.clone()) + db.insert(&[TEST_LEAF], b"key".to_vec(), element.clone(), None) .expect("successful insert"); assert_eq!( - db.get(&[TEST_LEAF], b"key").expect("succesful get"), + db.get(&[TEST_LEAF], b"key", None).expect("succesful get"), element ); } @@ -70,13 +74,18 @@ fn test_insert_value_to_subtree() { let element = Element::Item(b"ayy".to_vec()); // Insert a subtree first - db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) .expect("successful subtree insert"); // Insert an element into subtree - db.insert(&[TEST_LEAF, b"key1"], b"key2".to_vec(), element.clone()) - .expect("successful value insert"); + db.insert( + &[TEST_LEAF, b"key1"], + b"key2".to_vec(), + element.clone(), + None, + ) + .expect("successful value insert"); assert_eq!( - db.get(&[TEST_LEAF, b"key1"], b"key2") + db.get(&[TEST_LEAF, b"key1"], b"key2", None) .expect("succesful get"), element ); @@ -89,12 +98,13 @@ fn test_changes_propagated() { let element = Element::Item(b"ayy".to_vec()); // Insert some nested subtrees - db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) .expect("successful subtree 1 insert"); db.insert( &[TEST_LEAF, b"key1"], b"key2".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree 2 insert"); // Insert an element into subtree @@ -102,10 +112,11 @@ fn test_changes_propagated() { &[TEST_LEAF, b"key1", b"key2"], b"key3".to_vec(), element.clone(), + None, ) .expect("successful value insert"); assert_eq!( - db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3") + db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3", None) .expect("succesful get"), element ); @@ -122,16 +133,22 @@ fn test_follow_references() { &[TEST_LEAF], b"reference_key".to_vec(), Element::Reference(vec![TEST_LEAF.to_vec(), b"key2".to_vec(), b"key3".to_vec()]), + None, ) .expect("successful reference insert"); // Insert an item to refer to - db.insert(&[TEST_LEAF], b"key2".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key2".to_vec(), Element::empty_tree(), None) .expect("successful subtree 1 insert"); - db.insert(&[TEST_LEAF, b"key2"], b"key3".to_vec(), element.clone()) - .expect("successful value insert"); + db.insert( + &[TEST_LEAF, b"key2"], + b"key3".to_vec(), + element.clone(), + None, + ) + .expect("successful value insert"); assert_eq!( - db.get(&[TEST_LEAF], b"reference_key") + db.get(&[TEST_LEAF], b"reference_key", None) .expect("succesful get"), element ); @@ -145,6 +162,7 @@ fn test_cyclic_references() { &[TEST_LEAF], b"reference_key_1".to_vec(), Element::Reference(vec![TEST_LEAF.to_vec(), b"reference_key_2".to_vec()]), + None, ) .expect("successful reference 1 insert"); @@ -152,11 +170,12 @@ fn test_cyclic_references() { &[TEST_LEAF], b"reference_key_2".to_vec(), Element::Reference(vec![TEST_LEAF.to_vec(), b"reference_key_1".to_vec()]), + None, ) .expect("successful reference 2 insert"); assert!(matches!( - db.get(&[TEST_LEAF], b"reference_key_1").unwrap_err(), + db.get(&[TEST_LEAF], b"reference_key_1", None).unwrap_err(), Error::CyclicReference )); } @@ -172,6 +191,7 @@ fn test_too_many_indirections() { &[TEST_LEAF], b"key0".to_vec(), Element::Item(b"oops".to_vec()), + None, ) .expect("successful item insert"); @@ -180,12 +200,13 @@ fn test_too_many_indirections() { &[TEST_LEAF], keygen(i), Element::Reference(vec![TEST_LEAF.to_vec(), keygen(i - 1)]), + None, ) .expect("successful reference insert"); } assert!(matches!( - db.get(&[TEST_LEAF], &keygen(MAX_REFERENCE_HOPS + 1)) + db.get(&[TEST_LEAF], &keygen(MAX_REFERENCE_HOPS + 1), None) .unwrap_err(), Error::ReferenceLimit )); @@ -201,12 +222,13 @@ fn test_tree_structure_is_presistent() { add_test_leafs(&mut db); // Insert some nested subtrees - db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) .expect("successful subtree 1 insert"); db.insert( &[TEST_LEAF, b"key1"], b"key2".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree 2 insert"); // Insert an element into subtree @@ -214,10 +236,11 @@ fn test_tree_structure_is_presistent() { &[TEST_LEAF, b"key1", b"key2"], b"key3".to_vec(), element.clone(), + None, ) .expect("successful value insert"); assert_eq!( - db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3") + db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3", None) .expect("succesful get 1"), element ); @@ -225,11 +248,13 @@ fn test_tree_structure_is_presistent() { // Open a persisted GroveDB let db = GroveDb::open(tmp_dir).unwrap(); assert_eq!( - db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3") + db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3", None) .expect("succesful get 2"), element ); - assert!(db.get(&[TEST_LEAF, b"key1", b"key2"], b"key4").is_err()); + assert!(db + .get(&[TEST_LEAF, b"key1", b"key2"], b"key4", None) + .is_err()); } #[test] @@ -260,13 +285,19 @@ fn test_proof_construction() { let mut temp_db = make_grovedb(); // Insert level 1 nodes temp_db - .insert(&[TEST_LEAF], b"innertree".to_vec(), Element::empty_tree()) + .insert( + &[TEST_LEAF], + b"innertree".to_vec(), + Element::empty_tree(), + None, + ) .expect("successful subtree insert"); temp_db .insert( &[ANOTHER_TEST_LEAF], b"innertree2".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree insert"); temp_db @@ -274,6 +305,7 @@ fn test_proof_construction() { &[ANOTHER_TEST_LEAF], b"innertree3".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree insert"); // Insert level 2 nodes @@ -282,6 +314,7 @@ fn test_proof_construction() { &[TEST_LEAF, b"innertree"], b"key1".to_vec(), Element::Item(b"value1".to_vec()), + None, ) .expect("successful subtree insert"); temp_db @@ -289,6 +322,7 @@ fn test_proof_construction() { &[TEST_LEAF, b"innertree"], b"key2".to_vec(), Element::Item(b"value2".to_vec()), + None, ) .expect("successful subtree insert"); temp_db @@ -296,6 +330,7 @@ fn test_proof_construction() { &[ANOTHER_TEST_LEAF, b"innertree2"], b"key3".to_vec(), Element::Item(b"value3".to_vec()), + None, ) .expect("successful subtree insert"); temp_db @@ -303,6 +338,7 @@ fn test_proof_construction() { &[ANOTHER_TEST_LEAF, b"innertree3"], b"key4".to_vec(), Element::Item(b"value4".to_vec()), + None, ) .expect("successful subtree insert"); @@ -310,26 +346,26 @@ fn test_proof_construction() { // Insert level 2 nodes let mut inner_tree = TempMerk::new(); let value_one = Element::Item(b"value1".to_vec()); - value_one.insert(&mut inner_tree, b"key1".to_vec()); + value_one.insert(&mut inner_tree, b"key1".to_vec(), None); let value_two = Element::Item(b"value2".to_vec()); - value_two.insert(&mut inner_tree, b"key2".to_vec()); + value_two.insert(&mut inner_tree, b"key2".to_vec(), None); let mut inner_tree_2 = TempMerk::new(); let value_three = Element::Item(b"value3".to_vec()); - value_three.insert(&mut inner_tree_2, b"key3".to_vec()); + value_three.insert(&mut inner_tree_2, b"key3".to_vec(), None); let mut inner_tree_3 = TempMerk::new(); let value_four = Element::Item(b"value4".to_vec()); - value_four.insert(&mut inner_tree_3, b"key4".to_vec()); + value_four.insert(&mut inner_tree_3, b"key4".to_vec(), None); // Insert level 1 nodes let mut test_leaf = TempMerk::new(); let inner_tree_root = Element::Tree(inner_tree.root_hash()); - inner_tree_root.insert(&mut test_leaf, b"innertree".to_vec()); + inner_tree_root.insert(&mut test_leaf, b"innertree".to_vec(), None); let mut another_test_leaf = TempMerk::new(); let inner_tree_2_root = Element::Tree(inner_tree_2.root_hash()); - inner_tree_2_root.insert(&mut another_test_leaf, b"innertree2".to_vec()); + inner_tree_2_root.insert(&mut another_test_leaf, b"innertree2".to_vec(), None); let inner_tree_3_root = Element::Tree(inner_tree_3.root_hash()); - inner_tree_3_root.insert(&mut another_test_leaf, b"innertree3".to_vec()); + inner_tree_3_root.insert(&mut another_test_leaf, b"innertree3".to_vec(), None); // Insert root nodes let leaves = [test_leaf.root_hash(), another_test_leaf.root_hash()]; let root_tree = MerkleTree::::from_leaves(&leaves); @@ -468,13 +504,19 @@ fn test_successful_proof_verification() { let mut temp_db = make_grovedb(); // Insert level 1 nodes temp_db - .insert(&[TEST_LEAF], b"innertree".to_vec(), Element::empty_tree()) + .insert( + &[TEST_LEAF], + b"innertree".to_vec(), + Element::empty_tree(), + None, + ) .expect("successful subtree insert"); temp_db .insert( &[ANOTHER_TEST_LEAF], b"innertree2".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree insert"); temp_db @@ -482,6 +524,7 @@ fn test_successful_proof_verification() { &[ANOTHER_TEST_LEAF], b"innertree3".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree insert"); // Insert level 2 nodes @@ -490,6 +533,7 @@ fn test_successful_proof_verification() { &[TEST_LEAF, b"innertree"], b"key1".to_vec(), Element::Item(b"value1".to_vec()), + None, ) .expect("successful subtree insert"); temp_db @@ -497,6 +541,7 @@ fn test_successful_proof_verification() { &[TEST_LEAF, b"innertree"], b"key2".to_vec(), Element::Item(b"value2".to_vec()), + None, ) .expect("successful subtree insert"); temp_db @@ -504,6 +549,7 @@ fn test_successful_proof_verification() { &[ANOTHER_TEST_LEAF, b"innertree2"], b"key3".to_vec(), Element::Item(b"value3".to_vec()), + None, ) .expect("successful subtree insert"); temp_db @@ -511,6 +557,7 @@ fn test_successful_proof_verification() { &[ANOTHER_TEST_LEAF, b"innertree3"], b"key4".to_vec(), Element::Item(b"value4".to_vec()), + None, ) .expect("successful subtree insert"); @@ -570,104 +617,208 @@ fn test_successful_proof_verification() { assert_eq!(elem, Element::Item(b"value3".to_vec())); } +// #[test] +// fn test_checkpoint() { +// let mut db = make_grovedb(); +// let element1 = Element::Item(b"ayy".to_vec()); +// +// db.insert(&[], b"key1".to_vec(), Element::empty_tree()) +// .expect("cannot insert a subtree 1 into GroveDB"); +// db.insert(&[b"key1"], b"key2".to_vec(), Element::empty_tree()) +// .expect("cannot insert a subtree 2 into GroveDB"); +// db.insert(&[b"key1", b"key2"], b"key3".to_vec(), element1.clone()) +// .expect("cannot insert an item into GroveDB"); +// +// assert_eq!( +// db.get(&[b"key1", b"key2"], b"key3") +// .expect("cannot get from grovedb"), +// element1 +// ); +// +// let checkpoint_tempdir = TempDir::new("checkpoint").expect("cannot open +// tempdir"); let mut checkpoint = db +// .checkpoint(checkpoint_tempdir.path().join("checkpoint")) +// .expect("cannot create a checkpoint"); +// +// assert_eq!( +// db.get(&[b"key1", b"key2"], b"key3") +// .expect("cannot get from grovedb"), +// element1 +// ); +// assert_eq!( +// checkpoint +// .get(&[b"key1", b"key2"], b"key3") +// .expect("cannot get from checkpoint"), +// element1 +// ); +// +// let element2 = Element::Item(b"ayy2".to_vec()); +// let element3 = Element::Item(b"ayy3".to_vec()); +// +// checkpoint +// .insert(&[b"key1"], b"key4".to_vec(), element2.clone()) +// .expect("cannot insert into checkpoint"); +// +// db.insert(&[b"key1"], b"key4".to_vec(), element3.clone()) +// .expect("cannot insert into GroveDB"); +// +// assert_eq!( +// checkpoint +// .get(&[b"key1"], b"key4") +// .expect("cannot get from checkpoint"), +// element2, +// ); +// +// assert_eq!( +// db.get(&[b"key1"], b"key4") +// .expect("cannot get from GroveDB"), +// element3 +// ); +// +// checkpoint +// .insert(&[b"key1"], b"key5".to_vec(), element3.clone()) +// .expect("cannot insert into checkpoint"); +// +// db.insert(&[b"key1"], b"key6".to_vec(), element3.clone()) +// .expect("cannot insert into GroveDB"); +// +// assert!(matches!( +// checkpoint.get(&[b"key1"], b"key6"), +// Err(Error::InvalidPath(_)) +// )); +// +// assert!(matches!( +// db.get(&[b"key1"], b"key5"), +// Err(Error::InvalidPath(_)) +// )); +// } + #[test] -fn test_checkpoint() { +fn test_insert_if_not_exists() { let mut db = make_grovedb(); - let element1 = Element::Item(b"ayy".to_vec()); - db.insert(&[], b"key1".to_vec(), Element::empty_tree()) - .expect("cannot insert a subtree 1 into GroveDB"); - db.insert(&[b"key1"], b"key2".to_vec(), Element::empty_tree()) - .expect("cannot insert a subtree 2 into GroveDB"); - db.insert(&[b"key1", b"key2"], b"key3".to_vec(), element1.clone()) - .expect("cannot insert an item into GroveDB"); + // Insert twice at the same path assert_eq!( - db.get(&[b"key1", b"key2"], b"key3") - .expect("cannot get from grovedb"), - element1 + db.insert_if_not_exists(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) + .expect("Provided valid path"), + true ); - - let checkpoint_tempdir = TempDir::new("checkpoint").expect("cannot open tempdir"); - let mut checkpoint = db - .checkpoint(checkpoint_tempdir.path().join("checkpoint")) - .expect("cannot create a checkpoint"); - assert_eq!( - db.get(&[b"key1", b"key2"], b"key3") - .expect("cannot get from grovedb"), - element1 + db.insert_if_not_exists(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) + .expect("Provided valid path"), + false ); - assert_eq!( - checkpoint - .get(&[b"key1", b"key2"], b"key3") - .expect("cannot get from checkpoint"), - element1 + + // Should propagate errors from insertion + let result = db.insert_if_not_exists( + &[TEST_LEAF, b"unknown"], + b"key1".to_vec(), + Element::empty_tree(), + None, ); + assert!(matches!(result, Err(Error::InvalidPath(_)))); +} - let element2 = Element::Item(b"ayy2".to_vec()); - let element3 = Element::Item(b"ayy3".to_vec()); +#[test] +fn transaction_insert_item_with_transaction_should_use_transaction() { + let item_key = b"key3".to_vec(); - checkpoint - .insert(&[b"key1"], b"key4".to_vec(), element2.clone()) - .expect("cannot insert into checkpoint"); + let mut db = make_grovedb(); + db.start_transaction(); + let storage = db.storage(); + let transaction = storage.transaction(); - db.insert(&[b"key1"], b"key4".to_vec(), element3.clone()) - .expect("cannot insert into GroveDB"); + // Check that there's no such key in the DB + let result = db.get(&[TEST_LEAF], &item_key, None); + assert!(matches!(result, Err(Error::InvalidPath(_)))); - assert_eq!( - checkpoint - .get(&[b"key1"], b"key4") - .expect("cannot get from checkpoint"), - element2, - ); + let element1 = Element::Item(b"ayy".to_vec()); - assert_eq!( - db.get(&[b"key1"], b"key4") - .expect("cannot get from GroveDB"), - element3 - ); + db.insert( + &[TEST_LEAF], + item_key.clone(), + element1.clone(), + Some(&transaction), + ) + .expect("cannot insert an item into GroveDB"); - checkpoint - .insert(&[b"key1"], b"key5".to_vec(), element3.clone()) - .expect("cannot insert into checkpoint"); + // The key was inserted inside the transaction, so it shouldn't be possible + // to get it back without committing or using transaction + let result = db.get(&[TEST_LEAF], &item_key, None); + assert!(matches!(result, Err(Error::InvalidPath(_)))); + // Check that the element can be retrieved when transaction is passed + let result_with_transaction = db + .get(&[TEST_LEAF], &item_key, Some(&transaction)) + .expect("Expected to work"); + assert_eq!(result_with_transaction, Element::Item(b"ayy".to_vec())); + + // Test that commit works + // transaction.commit(); + db.commit_transaction(transaction); + + // Check that the change was committed + let result = db + .get(&[TEST_LEAF], &item_key, None) + .expect("Expected transaction to work"); + assert_eq!(result, Element::Item(b"ayy".to_vec())); +} - db.insert(&[b"key1"], b"key6".to_vec(), element3.clone()) - .expect("cannot insert into GroveDB"); +#[test] +fn transaction_insert_tree_with_transaction_should_use_transaction() { + let subtree_key = b"subtree_key".to_vec(); - assert!(matches!( - checkpoint.get(&[b"key1"], b"key6"), - Err(Error::InvalidPath(_)) - )); + let mut db = make_grovedb(); + let storage = db.storage(); + let db_transaction = storage.transaction(); + db.start_transaction(); - assert!(matches!( - db.get(&[b"key1"], b"key5"), - Err(Error::InvalidPath(_)) - )); + // Check that there's no such key in the DB + let result = db.get(&[TEST_LEAF], &subtree_key, None); + assert!(matches!(result, Err(Error::InvalidPath(_)))); + + db.insert( + &[TEST_LEAF], + subtree_key.clone(), + Element::empty_tree(), + Some(&db_transaction), + ) + .expect("cannot insert an item into GroveDB"); + + let result = db.get(&[TEST_LEAF], &subtree_key, None); + assert!(matches!(result, Err(Error::InvalidPath(_)))); + + let result_with_transaction = db + .get(&[TEST_LEAF], &subtree_key, Some(&db_transaction)) + .expect("Expected to work"); + assert_eq!(result_with_transaction, Element::empty_tree()); + + db.commit_transaction(db_transaction); + + let result = db + .get(&[TEST_LEAF], &subtree_key, None) + .expect("Expected transaction to work"); + assert_eq!(result, Element::empty_tree()); } #[test] -fn test_insert_if_not_exists() { +fn transaction_insert_should_return_error_when_trying_to_insert_while_transaction_is_in_process() { + let item_key = b"key3".to_vec(); + let mut db = make_grovedb(); + db.start_transaction(); + let storage = db.storage(); + let transaction = storage.transaction(); - // Insert twice at the same path - assert_eq!( - db.insert_if_not_exists(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) - .expect("Provided valid path"), - true - ); - assert_eq!( - db.insert_if_not_exists(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) - .expect("Provided valid path"), - false - ); + let element1 = Element::Item(b"ayy".to_vec()); - // Should propagate errors from insertion - let result = db.insert_if_not_exists( - &[TEST_LEAF, b"unknown"], - b"key1".to_vec(), - Element::empty_tree(), - ); - assert!(matches!(result, Err(Error::InvalidPath(_)))); + let result = db.insert(&[TEST_LEAF], item_key.clone(), element1.clone(), None); + assert!(matches!(result, Err(Error::DbIsInReadonlyMode))); + + db.commit_transaction(transaction); + + // Check that writes are unlocked after the transaction is committed + let result = db.insert(&[TEST_LEAF], item_key.clone(), element1.clone(), None); + assert!(matches!(result, Ok(()))); } #[test] @@ -677,12 +828,18 @@ fn test_subtree_pairs_iterator() { let element2 = Element::Item(b"lmao".to_vec()); // Insert some nested subtrees - db.insert(&[TEST_LEAF], b"subtree1".to_vec(), Element::empty_tree()) - .expect("successful subtree 1 insert"); + db.insert( + &[TEST_LEAF], + b"subtree1".to_vec(), + Element::empty_tree(), + None, + ) + .expect("successful subtree 1 insert"); db.insert( &[TEST_LEAF, b"subtree1"], b"subtree11".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree 2 insert"); // Insert an element into subtree @@ -690,10 +847,11 @@ fn test_subtree_pairs_iterator() { &[TEST_LEAF, b"subtree1", b"subtree11"], b"key1".to_vec(), element.clone(), + None, ) .expect("successful value insert"); assert_eq!( - db.get(&[TEST_LEAF, b"subtree1", b"subtree11"], b"key1") + db.get(&[TEST_LEAF, b"subtree1", b"subtree11"], b"key1", None) .expect("succesful get 1"), element ); @@ -701,26 +859,34 @@ fn test_subtree_pairs_iterator() { &[TEST_LEAF, b"subtree1", b"subtree11"], b"key0".to_vec(), element.clone(), + None, ) .expect("successful value insert"); db.insert( &[TEST_LEAF, b"subtree1"], b"subtree12".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree 3 insert"); - db.insert(&[TEST_LEAF, b"subtree1"], b"key1".to_vec(), element.clone()) - .expect("succesful value insert"); + db.insert( + &[TEST_LEAF, b"subtree1"], + b"key1".to_vec(), + element.clone(), + None, + ) + .expect("succesful value insert"); db.insert( &[TEST_LEAF, b"subtree1"], b"key2".to_vec(), element2.clone(), + None, ) .expect("succesful value insert"); // Iterate over subtree1 to see if keys of other subtrees messed up let mut iter = db - .elements_iterator(&[TEST_LEAF, b"subtree1"]) + .elements_iterator(&[TEST_LEAF, b"subtree1"], None) .expect("cannot create iterator"); assert_eq!(iter.next().unwrap(), Some((b"key1".to_vec(), element))); assert_eq!(iter.next().unwrap(), Some((b"key2".to_vec(), element2))); @@ -751,12 +917,12 @@ fn test_compress_path_not_possible_collision() { fn test_element_deletion() { let mut db = make_grovedb(); let element = Element::Item(b"ayy".to_vec()); - db.insert(&[TEST_LEAF], b"key".to_vec(), element.clone()) + db.insert(&[TEST_LEAF], b"key".to_vec(), element.clone(), None) .expect("successful insert"); let root_hash = db.root_tree.root().unwrap(); - assert!(db.delete(&[TEST_LEAF], b"key".to_vec()).is_ok(),); + assert!(db.delete(&[TEST_LEAF], b"key".to_vec(), None).is_ok()); assert!(matches!( - db.get(&[TEST_LEAF], b"key"), + db.get(&[TEST_LEAF], b"key", None), Err(Error::InvalidPath(_)) )); assert_ne!(root_hash, db.root_tree.root().unwrap()); @@ -767,12 +933,13 @@ fn test_find_subtrees() { let element = Element::Item(b"ayy".to_vec()); let mut db = make_grovedb(); // Insert some nested subtrees - db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) .expect("successful subtree 1 insert"); db.insert( &[TEST_LEAF, b"key1"], b"key2".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree 2 insert"); // Insert an element into subtree @@ -780,12 +947,13 @@ fn test_find_subtrees() { &[TEST_LEAF, b"key1", b"key2"], b"key3".to_vec(), element.clone(), + None, ) .expect("successful value insert"); - db.insert(&[TEST_LEAF], b"key4".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key4".to_vec(), Element::empty_tree(), None) .expect("successful subtree 3 insert"); let subtrees = db - .find_subtrees(vec![TEST_LEAF.to_vec()]) + .find_subtrees(vec![TEST_LEAF.to_vec()], None) .expect("cannot get subtrees"); assert_eq!( vec![ @@ -803,12 +971,13 @@ fn test_subtree_deletion() { let element = Element::Item(b"ayy".to_vec()); let mut db = make_grovedb(); // Insert some nested subtrees - db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) .expect("successful subtree 1 insert"); db.insert( &[TEST_LEAF, b"key1"], b"key2".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree 2 insert"); // Insert an element into subtree @@ -816,20 +985,21 @@ fn test_subtree_deletion() { &[TEST_LEAF, b"key1", b"key2"], b"key3".to_vec(), element.clone(), + None, ) .expect("successful value insert"); - db.insert(&[TEST_LEAF], b"key4".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key4".to_vec(), Element::empty_tree(), None) .expect("successful subtree 3 insert"); let root_hash = db.root_tree.root().unwrap(); - db.delete(&[TEST_LEAF], b"key1".to_vec()) + db.delete(&[TEST_LEAF], b"key1".to_vec(), None) .expect("unable to delete subtree"); assert!(matches!( - db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3"), + db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3", None), Err(Error::InvalidPath(_)) )); - assert_eq!(db.subtrees.len(), 3); // TEST_LEAF, ANOTHER_TEST_LEAF and TEST_LEAF.key4 stay - assert!(db.get(&[TEST_LEAF], b"key4").is_ok()); + assert_eq!(db.subtrees.len(), 3); // TEST_LEAF, ANOTHER_TEST_LEAF TEST_LEAF.key4 stay + assert!(db.get(&[TEST_LEAF], b"key4", None).is_ok()); assert_ne!(root_hash, db.root_tree.root().unwrap()); } @@ -838,33 +1008,37 @@ fn test_get_query() { let mut db = make_grovedb(); // Insert a couple of subtrees first - db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) .expect("successful subtree insert"); - db.insert(&[TEST_LEAF], b"key2".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key2".to_vec(), Element::empty_tree(), None) .expect("successful subtree insert"); // Insert some elements into subtree db.insert( &[TEST_LEAF, b"key1"], b"key3".to_vec(), Element::Item(b"ayya".to_vec()), + None, ) .expect("successful value insert"); db.insert( &[TEST_LEAF, b"key1"], b"key4".to_vec(), Element::Item(b"ayyb".to_vec()), + None, ) .expect("successful value insert"); db.insert( &[TEST_LEAF, b"key1"], b"key5".to_vec(), Element::Item(b"ayyc".to_vec()), + None, ) .expect("successful value insert"); db.insert( &[TEST_LEAF, b"key2"], b"key6".to_vec(), Element::Item(b"ayyd".to_vec()), + None, ) .expect("successful value insert"); @@ -879,7 +1053,7 @@ fn test_get_query() { let path_query2 = PathQuery::new(&path2, query2); assert_eq!( - db.get_query(&[path_query1, path_query2]) + db.get_query(&[path_query1, path_query2], None) .expect("expected successful get_query"), vec![ subtree::Element::Item(b"ayya".to_vec()), @@ -894,12 +1068,13 @@ fn test_aux_uses_separate_cf() { let element = Element::Item(b"ayy".to_vec()); let mut db = make_grovedb(); // Insert some nested subtrees - db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree()) + db.insert(&[TEST_LEAF], b"key1".to_vec(), Element::empty_tree(), None) .expect("successful subtree 1 insert"); db.insert( &[TEST_LEAF, b"key1"], b"key2".to_vec(), Element::empty_tree(), + None, ) .expect("successful subtree 2 insert"); // Insert an element into subtree @@ -907,6 +1082,7 @@ fn test_aux_uses_separate_cf() { &[TEST_LEAF, b"key1", b"key2"], b"key3".to_vec(), element.clone(), + None, ) .expect("successful value insert"); @@ -916,7 +1092,7 @@ fn test_aux_uses_separate_cf() { db.delete_aux(b"key3").expect("cannot delete from aux"); assert_eq!( - db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3") + db.get(&[TEST_LEAF, b"key1", b"key2"], b"key3", None) .expect("cannot get element"), element ); @@ -928,6 +1104,6 @@ fn test_aux_uses_separate_cf() { db.get_aux(b"key2").expect("cannot get from aux"), Some(b"b".to_vec()) ); - assert_eq!(db.get_aux(b"key3").expect("cannot get from aux"), None,); + assert_eq!(db.get_aux(b"key3").expect("cannot get from aux"), None); assert_eq!(db.get_aux(b"key4").expect("cannot get from aux"), None); } diff --git a/grovedb/src/transaction.rs b/grovedb/src/transaction.rs new file mode 100644 index 000000000..8b311c106 --- /dev/null +++ b/grovedb/src/transaction.rs @@ -0,0 +1,102 @@ +use std::collections::HashMap; + +use merk::Merk; +use rs_merkle::{algorithms::Sha256, MerkleTree}; +use storage::rocksdb_storage::{OptimisticTransactionDBTransaction, PrefixedRocksDbStorage}; + +// use super::subtree; + +// pub struct GroveDbTransaction<'a, 'db> { +// grove_db: &'a mut GroveDb, +// db: Rc, +// transaction: Option<::DBTransaction<'db>> } +// +// impl<'a, 'db> GroveDbTransaction<'a, 'db> { +// pub fn new(grove_db: &'a mut GroveDb, db: +// Rc) -> Self { let +// kek = Self { grove_db, db, transaction: None +// }; +// kek.start() +// } +// +// fn start(mut self) -> Self { +// self.transaction = Some(self.db.transaction()); +// self +// } +// +// pub fn insert( +// &mut self, +// path: &[&[u8]], +// key: Vec, +// mut element: subtree::Element, +// ) -> Result<(), Error> { +// self.grove_db.insert(path, key, element, self.transaction.as_ref()) +// } +// +// pub fn insert_if_not_exists( +// &mut self, +// path: &[&[u8]], +// key: Vec, +// element: subtree::Element, +// ) -> Result { +// self.grove_db.insert_if_not_exists(path, key, element, +// self.transaction.as_ref()) } +// +// // pub fn get(&self, path: &[&[u8]], key: &[u8]) -> +// Result { // self.grove_db.get(path, key, +// Some(&self.transaction)) // } +// +// // /// Commits and consumes the transaction +// // pub fn commit(self) -> Result<(), Error> { +// // +// self.transaction.commit().map_err(Into::::into)? +// ; // Ok(()) +// // } +// // +// // /// Rolls back the transaction +// // pub fn rollback(&self) -> Result<(), Error> { +// // +// self.transaction.rollback().map_err(Into:::: +// into)?; // Ok(()) +// // } +// } + +pub struct GroveDbTransaction<'a> { + db_transaction: OptimisticTransactionDBTransaction<'a>, + root_tree: MerkleTree, + root_leaf_keys: HashMap, usize>, + subtrees: HashMap, Merk>, +} + +impl<'a> GroveDbTransaction<'a> { + pub fn new( + db_transaction: OptimisticTransactionDBTransaction<'a>, + root_tree: MerkleTree, + root_leaf_keys: HashMap, usize>, + subtrees: HashMap, Merk>, + ) -> Self { + Self { + db_transaction, + root_tree, + root_leaf_keys, + subtrees, + } + } + + pub fn root_leaf_keys_mut(&mut self) -> &mut HashMap, usize> { + &mut self.root_leaf_keys + } + + pub fn root_leaf_keys(&self) -> &HashMap, usize> { + &self.root_leaf_keys + } + + pub fn db_transaction(&self) -> &OptimisticTransactionDBTransaction { + &self.db_transaction + } + + pub fn commit_db(self) -> Result<(), storage::rocksdb_storage::Error> { + self.db_transaction.commit() + } +} diff --git a/merk/Cargo.toml b/merk/Cargo.toml index 493fad326..b82930f21 100644 --- a/merk/Cargo.toml +++ b/merk/Cargo.toml @@ -10,7 +10,7 @@ license = "MIT" tempdir = "0.3.7" storage = { path = "../storage" } thiserror = "1.0.30" -rocksdb = "0.17.0" +rocksdb = { git = "https://github.com/yiyuanliu/rust-rocksdb", branch = "transaction" } anyhow = "1.0.51" failure = "0.1.8" diff --git a/merk/src/merk/chunks.rs b/merk/src/merk/chunks.rs index 55a9aaa21..6e5396ec8 100644 --- a/merk/src/merk/chunks.rs +++ b/merk/src/merk/chunks.rs @@ -193,7 +193,7 @@ mod tests { fn len_small() { let mut merk = TempMerk::new(); let batch = make_batch_seq(1..256); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let chunks = merk.chunks().unwrap(); assert_eq!(chunks.len(), 1); @@ -204,7 +204,7 @@ mod tests { fn len_big() { let mut merk = TempMerk::new(); let batch = make_batch_seq(1..10_000); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let chunks = merk.chunks().unwrap(); assert_eq!(chunks.len(), 129); @@ -215,7 +215,7 @@ mod tests { fn generate_and_verify_chunks() { let mut merk = TempMerk::new(); let batch = make_batch_seq(1..10_000); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let mut chunks = merk.chunks().unwrap().into_iter().map(Result::unwrap); @@ -241,7 +241,7 @@ mod tests { let mut merk = Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let batch = make_batch_seq(1..10); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); merk.chunks() .unwrap() @@ -288,7 +288,7 @@ mod tests { fn random_access_chunks() { let mut merk = TempMerk::new(); let batch = make_batch_seq(1..111); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let chunks = merk .chunks() @@ -322,7 +322,7 @@ mod tests { fn test_chunk_index_oob() { let mut merk = TempMerk::new(); let batch = make_batch_seq(1..42); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let mut producer = merk.chunks().unwrap(); let _chunk = producer.chunk(50000).unwrap(); @@ -332,7 +332,7 @@ mod tests { fn test_chunk_index_gt_1_access() { let mut merk = TempMerk::new(); let batch = make_batch_seq(1..513); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let mut producer = merk.chunks().unwrap(); println!("length: {}", producer.len()); @@ -413,7 +413,7 @@ mod tests { fn test_next_chunk_index_oob() { let mut merk = TempMerk::new(); let batch = make_batch_seq(1..42); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let mut producer = merk.chunks().unwrap(); let _chunk1 = producer.next_chunk(); diff --git a/merk/src/merk/mod.rs b/merk/src/merk/mod.rs index 7d2a62401..99b64ad00 100644 --- a/merk/src/merk/mod.rs +++ b/merk/src/merk/mod.rs @@ -4,7 +4,7 @@ pub mod chunks; use std::{cell::Cell, cmp::Ordering, collections::LinkedList}; use anyhow::{anyhow, bail, Result}; -use storage::{self, Batch, RawIterator, Storage, Store}; +use storage::{self, rocksdb_storage::PrefixedRocksDbStorage, Batch, RawIterator, Storage, Store}; use crate::{ proofs::{encode_into, query::QueryItem, Query}, @@ -41,10 +41,10 @@ where } /// Deletes tree data - pub fn clear(self) -> Result<()> { + pub fn clear<'a>(&'a mut self, transaction: Option<&'a S::DBTransaction<'a>>) -> Result<()> { let mut iter = self.raw_iter(); iter.seek_to_first(); - let mut to_delete = self.storage.new_batch()?; + let mut to_delete = self.storage.new_batch(transaction)?; while iter.valid() { if let Some(key) = iter.key() { to_delete.delete(key); @@ -52,6 +52,7 @@ where iter.next(); } self.storage.commit_batch(to_delete)?; + self.tree.set(None); Ok(()) } @@ -127,7 +128,7 @@ where /// # Example /// ``` /// # let mut store = merk::test_utils::TempMerk::new(); - /// # store.apply(&[(vec![4,5,6], Op::Put(vec![0]))], &[]).unwrap(); + /// # store.apply(&[(vec![4,5,6], Op::Put(vec![0]))], &[], None).unwrap(); /// /// use merk::Op; /// @@ -135,9 +136,14 @@ where /// (vec![1, 2, 3], Op::Put(vec![4, 5, 6])), // puts value [4,5,6] to key[1,2,3] /// (vec![4, 5, 6], Op::Delete), // deletes key [4,5,6] /// ]; - /// store.apply(batch, &[]).unwrap(); + /// store.apply(batch, &[], None).unwrap(); /// ``` - pub fn apply(&mut self, batch: &MerkBatch, aux: &MerkBatch) -> Result<()> { + pub fn apply<'a: 'b, 'b>( + &'a mut self, + batch: &MerkBatch, + aux: &MerkBatch, + transaction: Option<&'b S::DBTransaction<'b>>, + ) -> Result<()> { // ensure keys in batch are sorted and unique let mut maybe_prev_key: Option> = None; for (key, _) in batch.iter() { @@ -151,7 +157,7 @@ where maybe_prev_key = Some(key.to_vec()); } - unsafe { self.apply_unchecked(batch, aux) } + unsafe { self.apply_unchecked(batch, aux, transaction) } } /// Applies a batch of operations (puts and deletes) to the tree. @@ -165,7 +171,7 @@ where /// # Example /// ``` /// # let mut store = merk::test_utils::TempMerk::new(); - /// # store.apply(&[(vec![4,5,6], Op::Put(vec![0]))], &[]).unwrap(); + /// # store.apply(&[(vec![4,5,6], Op::Put(vec![0]))], &[], None).unwrap(); /// /// use merk::Op; /// @@ -173,9 +179,14 @@ where /// (vec![1, 2, 3], Op::Put(vec![4, 5, 6])), // puts value [4,5,6] to key [1,2,3] /// (vec![4, 5, 6], Op::Delete), // deletes key [4,5,6] /// ]; - /// unsafe { store.apply_unchecked(batch, &[]).unwrap() }; + /// unsafe { store.apply_unchecked(batch, &[], None).unwrap() }; /// ``` - pub unsafe fn apply_unchecked(&mut self, batch: &MerkBatch, aux: &MerkBatch) -> Result<()> { + pub unsafe fn apply_unchecked<'a: 'b, 'b>( + &'a mut self, + batch: &MerkBatch, + aux: &MerkBatch, + transaction: Option<&'b S::DBTransaction<'b>>, + ) -> Result<()> { let maybe_walker = self .tree .take() @@ -186,7 +197,7 @@ where self.tree.set(maybe_tree); // commit changes to db - self.commit(deleted_keys, aux) + self.commit(deleted_keys, aux, transaction) } /// Creates a Merkle proof for the list of queried keys. For each key in the @@ -236,8 +247,13 @@ where }) } - pub fn commit(&mut self, deleted_keys: LinkedList>, aux: &MerkBatch) -> Result<()> { - let mut batch = self.storage.new_batch()?; + pub fn commit<'a: 'b, 'b>( + &'a mut self, + deleted_keys: LinkedList>, + aux: &MerkBatch, + transaction: Option<&'b S::DBTransaction<'b>>, + ) -> Result<()> { + let mut batch = self.storage.new_batch(transaction)?; let mut to_batch = self.use_tree_mut(|maybe_tree| -> UseTreeMutResult { // TODO: concurrent commit if let Some(tree) = maybe_tree { @@ -329,6 +345,23 @@ where } } +impl Clone for Merk { + fn clone(&self) -> Self { + let tree_clone = match self.tree.take() { + None => None, + Some(tree) => { + let clone = tree.clone(); + self.tree.set(Some(tree)); + Some(clone) + } + }; + Self { + tree: Cell::new(tree_clone), + storage: self.storage.clone(), + } + } +} + // TODO: get rid of Fetch/source and use GroveDB storage abstraction #[derive(Debug)] pub struct MerkSource<'a, S: Storage> { @@ -387,8 +420,11 @@ impl Commit for MerkCommitter { #[cfg(test)] mod test { + use rocksdb::{DBRawIteratorWithThreadMode, OptimisticTransactionDB}; use storage::{ - rocksdb_storage::{default_rocksdb, PrefixedRocksDbStorage}, + rocksdb_storage::{ + default_rocksdb, PrefixedRocksDbStorage, RawPrefixedTransactionalIterator, + }, RawIterator, }; use tempdir::TempDir; @@ -396,6 +432,8 @@ mod test { use super::{Merk, MerkSource, RefWalker}; use crate::{test_utils::*, Op}; + type OptimisticTransactionDBRawIterator<'a> = + DBRawIteratorWithThreadMode<'a, OptimisticTransactionDB>; // TODO: Close and then reopen test fn assert_invariants(merk: &TempMerk) { @@ -410,7 +448,7 @@ mod test { let batch_size = 20; let mut merk = TempMerk::new(); let batch = make_batch_seq(0..batch_size); - merk.apply(&batch, &[]).expect("apply failed"); + merk.apply(&batch, &[], None).expect("apply failed"); assert_invariants(&merk); assert_eq!( @@ -428,11 +466,11 @@ mod test { let mut merk = TempMerk::new(); let batch = make_batch_seq(0..batch_size); - merk.apply(&batch, &[]).expect("apply failed"); + merk.apply(&batch, &[], None).expect("apply failed"); assert_invariants(&merk); let batch = make_batch_seq(batch_size..(batch_size * 2)); - merk.apply(&batch, &[]).expect("apply failed"); + merk.apply(&batch, &[], None).expect("apply failed"); assert_invariants(&merk); } @@ -445,7 +483,7 @@ mod test { for i in 0..(tree_size / batch_size) { println!("i:{}", i); let batch = make_batch_rand(batch_size, i); - merk.apply(&batch, &[]).expect("apply failed"); + merk.apply(&batch, &[], None).expect("apply failed"); } } @@ -454,10 +492,10 @@ mod test { let mut merk = TempMerk::new(); let batch = make_batch_rand(10, 1); - merk.apply(&batch, &[]).expect("apply failed"); + merk.apply(&batch, &[], None).expect("apply failed"); let key = batch.first().unwrap().0.clone(); - merk.apply(&[(key.clone(), Op::Delete)], &[]).unwrap(); + merk.apply(&[(key.clone(), Op::Delete)], &[], None).unwrap(); let value = merk.inner.get(key.as_slice()).unwrap(); assert!(value.is_none()); @@ -466,7 +504,7 @@ mod test { #[test] fn aux_data() { let mut merk = TempMerk::new(); - merk.apply(&[], &[(vec![1, 2, 3], Op::Put(vec![4, 5, 6]))]) + merk.apply(&[], &[(vec![1, 2, 3], Op::Put(vec![4, 5, 6]))], None) .expect("apply failed"); let val = merk.get_aux(&[1, 2, 3]).unwrap(); assert_eq!(val, Some(vec![4, 5, 6])); @@ -479,12 +517,13 @@ mod test { merk.apply( &[(vec![0], Op::Put(vec![1]))], &[(vec![2], Op::Put(vec![3]))], + None, ) .expect("apply failed"); // make enough changes so that main column family gets auto-flushed for i in 0..250 { - merk.apply(&make_batch_seq(i * 2_000..(i + 1) * 2_000), &[]) + merk.apply(&make_batch_seq(i * 2_000..(i + 1) * 2_000), &[], None) .expect("apply failed"); } merk.crash(); @@ -500,7 +539,7 @@ mod test { assert!(merk.get(&[1, 2, 3]).unwrap().is_none()); // cached - merk.apply(&[(vec![5, 5, 5], Op::Put(vec![]))], &[]) + merk.apply(&[(vec![5, 5, 5], Op::Put(vec![]))], &[], None) .unwrap(); assert!(merk.get(&[1, 2, 3]).unwrap().is_none()); @@ -512,6 +551,7 @@ mod test { (vec![2, 2, 2], Op::Put(vec![])), ], &[], + None, ) .unwrap(); assert!(merk.get(&[3, 3, 3]).unwrap().is_none()); @@ -535,7 +575,7 @@ mod test { let mut merk = Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let batch = make_batch_seq(1..10_000); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let mut tree = merk.tree.take().unwrap(); let walker = RefWalker::new(&mut tree, merk.source()); @@ -557,7 +597,10 @@ mod test { #[test] fn reopen_iter() { - fn collect(iter: &mut impl RawIterator, nodes: &mut Vec<(Vec, Vec)>) { + fn collect( + iter: &mut RawPrefixedTransactionalIterator, + nodes: &mut Vec<(Vec, Vec)>, + ) { while iter.valid() { nodes.push((iter.key().unwrap().to_vec(), iter.value().unwrap().to_vec())); iter.next(); @@ -570,7 +613,7 @@ mod test { let mut merk = Merk::open(PrefixedRocksDbStorage::new(db, Vec::new()).unwrap()).unwrap(); let batch = make_batch_seq(1..10_000); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let mut nodes = vec![]; collect(&mut merk.raw_iter(), &mut nodes); diff --git a/merk/src/proofs/chunk.rs b/merk/src/proofs/chunk.rs index f0495878c..cbaaf23f4 100644 --- a/merk/src/proofs/chunk.rs +++ b/merk/src/proofs/chunk.rs @@ -407,7 +407,7 @@ mod tests { fn leaf_chunk_roundtrip() { let mut merk = TempMerk::new(); let batch = make_batch_seq(0..31); - merk.apply(batch.as_slice(), &[]).unwrap(); + merk.apply(batch.as_slice(), &[], None).unwrap(); let root_node = merk.tree.take(); let root_key = root_node.as_ref().unwrap().key().to_vec(); diff --git a/merk/src/test_utils/crash_merk.rs b/merk/src/test_utils/crash_merk.rs index df007831b..09048f4da 100644 --- a/merk/src/test_utils/crash_merk.rs +++ b/merk/src/test_utils/crash_merk.rs @@ -14,7 +14,7 @@ use crate::Merk; pub struct CrashMerk { merk: Merk, path: Option, - _db: Rc, + _db: Rc, } impl CrashMerk { @@ -61,7 +61,7 @@ mod tests { #[ignore] // currently this still works because we enabled the WAL fn crash() { let mut merk = CrashMerk::open().expect("failed to open merk"); - merk.apply(&[(vec![1, 2, 3], Op::Put(vec![4, 5, 6]))], &[]) + merk.apply(&[(vec![1, 2, 3], Op::Put(vec![4, 5, 6]))], &[], None) .expect("apply failed"); merk.crash(); diff --git a/merk/src/test_utils/temp_merk.rs b/merk/src/test_utils/temp_merk.rs index fc49a27e5..d784c4945 100644 --- a/merk/src/test_utils/temp_merk.rs +++ b/merk/src/test_utils/temp_merk.rs @@ -12,7 +12,7 @@ use crate::Merk; pub struct TempMerk { pub inner: Merk, pub path: TempDir, - _db: Rc, + _db: Rc, } impl TempMerk { diff --git a/merk/src/tree/kv.rs b/merk/src/tree/kv.rs index 8307d5b03..95801551f 100644 --- a/merk/src/tree/kv.rs +++ b/merk/src/tree/kv.rs @@ -10,6 +10,7 @@ use super::hash::{kv_hash, Hash, HASH_LENGTH, NULL_HASH}; // field and value field. /// Contains a key/value pair, and the hash of the key/value pair. +#[derive(Clone)] pub struct KV { pub(super) key: Vec, pub(super) value: Vec, diff --git a/merk/src/tree/link.rs b/merk/src/tree/link.rs index d02820ea8..b3ad2058c 100644 --- a/merk/src/tree/link.rs +++ b/merk/src/tree/link.rs @@ -11,6 +11,7 @@ use super::{hash::Hash, Tree}; /// Represents a reference to a child tree node. Links may or may not contain /// the child's `Tree` instance (storing its key if not). +#[derive(Clone)] pub enum Link { /// Represents a child tree node which has been pruned from memory, only /// retaining a reference to it (its key). The child node can always be diff --git a/merk/src/tree/mod.rs b/merk/src/tree/mod.rs index a6dbad230..0ece56160 100644 --- a/merk/src/tree/mod.rs +++ b/merk/src/tree/mod.rs @@ -25,7 +25,7 @@ pub use walk::{Fetch, RefWalker, Walker}; // relevant methods /// The fields of the `Tree` type, stored on the heap. -#[derive(Encode, Decode)] +#[derive(Clone, Encode, Decode)] struct TreeInner { left: Option, right: Option, @@ -37,7 +37,7 @@ struct TreeInner { /// Trees' inner fields are stored on the heap so that nodes can recursively /// link to each other, and so we can detach nodes from their parents, then /// reattach without allocating or freeing heap memory. -#[derive(Encode, Decode)] +#[derive(Clone, Encode, Decode)] pub struct Tree { inner: Box, } diff --git a/merk/src/tree/ops.rs b/merk/src/tree/ops.rs index 7df0301ae..cd53d7e66 100644 --- a/merk/src/tree/ops.rs +++ b/merk/src/tree/ops.rs @@ -97,9 +97,11 @@ where // TODO: take from batch so we don't have to clone let mid_tree = Tree::new(mid_key.to_vec(), mid_value.to_vec()); let mid_walker = Walker::new(mid_tree, PanicSource {}); + + // use walker, ignore deleted_keys since it should be empty Ok(mid_walker .recurse(batch, mid_index, true)? - .0 // use walker, ignore deleted_keys since it should be empty + .0 .map(|w| w.into_inner())) } diff --git a/node-grove/src/lib.rs b/node-grove/src/lib.rs index 48bf89285..007256b15 100644 --- a/node-grove/src/lib.rs +++ b/node-grove/src/lib.rs @@ -1,6 +1,6 @@ mod converter; -use std::{path::Path, sync::mpsc, thread}; +use std::{option::Option::None, path::Path, sync::mpsc, thread}; use grovedb::GroveDb; use neon::prelude::*; @@ -123,7 +123,7 @@ impl GroveDbWrapper { db.send_to_db_thread(move |grove_db: &mut GroveDb, channel| { let path_slice: Vec<&[u8]> = path.iter().map(|fragment| fragment.as_slice()).collect(); - let result = grove_db.get(&path_slice, &key); + let result = grove_db.get(&path_slice, &key, None); channel.send(move |mut task_context| { let callback = js_callback.into_inner(&mut task_context); @@ -169,7 +169,8 @@ impl GroveDbWrapper { db.send_to_db_thread(move |grove_db: &mut GroveDb, channel| { let path_slice: Vec<&[u8]> = path.iter().map(|fragment| fragment.as_slice()).collect(); - let result = grove_db.insert(&path_slice, key, element); + // TODO: IMPLEMENT BINDINGS FOR THE TRANSACTION + let result = grove_db.insert(&path_slice, key, element, None); channel.send(move |mut task_context| { let callback = js_callback.into_inner(&mut task_context); diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 1eddc5da1..b10b5d9a0 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] num_cpus = "1.13.0" -rocksdb = "0.17.0" +rocksdb = { git = "https://github.com/yiyuanliu/rust-rocksdb", branch = "transaction" } tempdir = "0.3.7" thiserror = "1.0.30" diff --git a/storage/src/lib.rs b/storage/src/lib.rs index c88461179..bb7040c92 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,6 +1,9 @@ #![feature(generic_associated_types)] pub mod rocksdb_storage; +// Marker trait for underlying DB transactions +pub trait DBTransaction<'a> {} + /// `Storage` is able to store and retrieve arbitrary bytes by key pub trait Storage { /// Storage error type @@ -15,6 +18,13 @@ pub trait Storage { where Self: 'a; + type StorageTransaction<'a>: Transaction + where + Self: 'a; + type DBTransaction<'a>: DBTransaction<'a> + where + Self: 'a; + /// Put `value` into data storage with `key` fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; @@ -52,7 +62,10 @@ pub trait Storage { fn get_meta(&self, key: &[u8]) -> Result>, Self::Error>; /// Initialize a new batch - fn new_batch<'a>(&'a self) -> Result, Self::Error>; + fn new_batch<'a: 'b, 'b>( + &'a self, + transaction: Option<&'b Self::DBTransaction<'b>>, + ) -> Result, Self::Error>; /// Commits changes from batch into storage fn commit_batch<'a>(&'a self, batch: Self::Batch<'a>) -> Result<(), Self::Error>; @@ -62,6 +75,9 @@ pub trait Storage { /// Get raw iterator over storage fn raw_iter<'a>(&'a self) -> Self::RawIterator<'a>; + + /// Starts DB transaction + fn transaction<'a>(&'a self, tx: &'a Self::DBTransaction<'a>) -> Self::StorageTransaction<'a>; } impl<'b, S: Storage> Storage for &'b S { @@ -69,11 +85,19 @@ impl<'b, S: Storage> Storage for &'b S { where 'b: 'a, = S::Batch<'a>; + type DBTransaction<'a> + where + 'b: 'a, + = S::DBTransaction<'a>; type Error = S::Error; type RawIterator<'a> where 'b: 'a, = S::RawIterator<'a>; + type StorageTransaction<'a> + where + 'b: 'a, + = S::StorageTransaction<'a>; fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { (*self).put(key, value) @@ -87,6 +111,10 @@ impl<'b, S: Storage> Storage for &'b S { (*self).put_root(key, value) } + fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + (*self).put_meta(key, value) + } + fn delete(&self, key: &[u8]) -> Result<(), Self::Error> { (*self).delete(key) } @@ -99,6 +127,10 @@ impl<'b, S: Storage> Storage for &'b S { (*self).delete_root(key) } + fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error> { + (*self).delete_meta(key) + } + fn get(&self, key: &[u8]) -> Result>, Self::Error> { (*self).get(key) } @@ -111,20 +143,15 @@ impl<'b, S: Storage> Storage for &'b S { (*self).get_root(key) } - fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { - (*self).put_meta(key, value) - } - - fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error> { - (*self).delete_meta(key) - } - fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> { (*self).get_meta(key) } - fn new_batch<'a>(&'a self) -> Result, Self::Error> { - (*self).new_batch() + fn new_batch<'a: 'c, 'c>( + &'a self, + transaction: Option<&'c Self::DBTransaction<'c>>, + ) -> Result, Self::Error> { + (*self).new_batch(transaction) } fn commit_batch<'a>(&'a self, batch: Self::Batch<'a>) -> Result<(), Self::Error> { @@ -138,6 +165,13 @@ impl<'b, S: Storage> Storage for &'b S { fn raw_iter<'a>(&'a self) -> Self::RawIterator<'a> { (*self).raw_iter() } + + fn transaction<'a>( + &'a self, + transaction: &'a Self::DBTransaction<'a>, + ) -> Self::StorageTransaction<'a> { + (*self).transaction(transaction) + } } pub trait Batch { @@ -170,6 +204,52 @@ pub trait RawIterator { fn valid(&self) -> bool; } +/// Please note that the `Transaction` trait is used to access the underlying +/// transaction through the storage, but many storages can share the same DB +/// transaction. Thus, the storage itself can not commit the transaction, and +/// transaction should be committed by its original opener - GroveDB instance in +/// our case. +pub trait Transaction { + /// Storage error type + type Error: std::error::Error + Send + Sync + 'static; + + /// Put `value` into data storage with `key` + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; + + /// Put `value` into auxiliary data storage with `key` + fn put_aux(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; + + /// Put `value` into trees roots storage with `key` + fn put_root(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; + + /// Put `value` into GroveDB metadata storage with `key` + fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>; + + /// Delete entry with `key` from data storage + fn delete(&self, key: &[u8]) -> Result<(), Self::Error>; + + /// Delete entry with `key` from auxiliary data storage + fn delete_aux(&self, key: &[u8]) -> Result<(), Self::Error>; + + /// Delete entry with `key` from trees roots storage + fn delete_root(&self, key: &[u8]) -> Result<(), Self::Error>; + + /// Delete entry with `key` from GroveDB metadata storage + fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error>; + + /// Get entry by `key` from data storage + fn get(&self, key: &[u8]) -> Result>, Self::Error>; + + /// Get entry by `key` from auxiliary data storage + fn get_aux(&self, key: &[u8]) -> Result>, Self::Error>; + + /// Get entry by `key` from trees roots storage + fn get_root(&self, key: &[u8]) -> Result>, Self::Error>; + + /// Get entry by `key` from GroveDB metadata storage + fn get_meta(&self, key: &[u8]) -> Result>, Self::Error>; +} + /// The `Store` trait allows to store its implementor by key using a storage `S` /// or to delete it. pub trait Store diff --git a/storage/src/rocksdb_storage/batch.rs b/storage/src/rocksdb_storage/batch.rs new file mode 100644 index 000000000..30b429216 --- /dev/null +++ b/storage/src/rocksdb_storage/batch.rs @@ -0,0 +1,146 @@ +use rocksdb::{ColumnFamily, OptimisticTransactionDB}; + +use super::make_prefixed_key; +use crate::Batch; + +/// Wrapper to RocksDB batch +pub struct PrefixedRocksDbBatch<'a> { + pub prefix: Vec, + pub batch: rocksdb::WriteBatchWithTransaction, + pub cf_aux: &'a ColumnFamily, + pub cf_roots: &'a ColumnFamily, +} + +impl<'a> Batch for PrefixedRocksDbBatch<'a> { + fn put(&mut self, key: &[u8], value: &[u8]) { + self.batch + .put(make_prefixed_key(self.prefix.clone(), key), value) + } + + fn put_aux(&mut self, key: &[u8], value: &[u8]) { + self.batch.put_cf( + self.cf_aux, + make_prefixed_key(self.prefix.clone(), key), + value, + ) + } + + fn put_root(&mut self, key: &[u8], value: &[u8]) { + self.batch.put_cf( + self.cf_roots, + make_prefixed_key(self.prefix.clone(), key), + value, + ) + } + + fn delete(&mut self, key: &[u8]) { + self.batch + .delete(make_prefixed_key(self.prefix.clone(), key)) + } + + fn delete_aux(&mut self, key: &[u8]) { + self.batch + .delete_cf(self.cf_aux, make_prefixed_key(self.prefix.clone(), key)) + } + + fn delete_root(&mut self, key: &[u8]) { + self.batch + .delete_cf(self.cf_roots, make_prefixed_key(self.prefix.clone(), key)) + } +} + +/// Wrapper to RocksDB batch +pub struct PrefixedTransactionalRocksDbBatch<'a> { + pub prefix: Vec, + pub cf_aux: &'a ColumnFamily, + pub cf_roots: &'a ColumnFamily, + pub transaction: &'a rocksdb::Transaction<'a, OptimisticTransactionDB>, +} + +// TODO: don't ignore errors +impl<'a> Batch for PrefixedTransactionalRocksDbBatch<'a> { + fn put(&mut self, key: &[u8], value: &[u8]) { + self.transaction + .put(make_prefixed_key(self.prefix.clone(), key), value); + } + + fn put_aux(&mut self, key: &[u8], value: &[u8]) { + self.transaction.put_cf( + self.cf_aux, + make_prefixed_key(self.prefix.clone(), key), + value, + ); + } + + fn put_root(&mut self, key: &[u8], value: &[u8]) { + self.transaction.put_cf( + self.cf_roots, + make_prefixed_key(self.prefix.clone(), key), + value, + ); + } + + fn delete(&mut self, key: &[u8]) { + self.transaction + .delete(make_prefixed_key(self.prefix.clone(), key)); + } + + fn delete_aux(&mut self, key: &[u8]) { + self.transaction + .delete_cf(self.cf_aux, make_prefixed_key(self.prefix.clone(), key)); + } + + fn delete_root(&mut self, key: &[u8]) { + self.transaction + .delete_cf(self.cf_roots, make_prefixed_key(self.prefix.clone(), key)); + } +} + +pub enum OrBatch<'a> { + Batch(PrefixedRocksDbBatch<'a>), + TransactionalBatch(PrefixedTransactionalRocksDbBatch<'a>), +} + +impl<'a> Batch for OrBatch<'a> { + fn put(&mut self, key: &[u8], value: &[u8]) { + match self { + Self::TransactionalBatch(batch) => batch.put(key, value), + Self::Batch(batch) => batch.put(key, value), + } + } + + fn put_aux(&mut self, key: &[u8], value: &[u8]) { + match self { + Self::TransactionalBatch(batch) => batch.put_aux(key, value), + Self::Batch(batch) => batch.put_aux(key, value), + } + } + + fn put_root(&mut self, key: &[u8], value: &[u8]) { + match self { + Self::TransactionalBatch(batch) => batch.put_root(key, value), + Self::Batch(batch) => batch.put_root(key, value), + } + } + + fn delete(&mut self, key: &[u8]) { + match self { + Self::TransactionalBatch(batch) => batch.delete(key), + Self::Batch(batch) => batch.delete(key), + } + } + + fn delete_aux(&mut self, key: &[u8]) { + match self { + Self::TransactionalBatch(batch) => batch.delete_aux(key), + Self::Batch(batch) => batch.delete_aux(key), + } + } + + fn delete_root(&mut self, key: &[u8]) { + match self { + Self::TransactionalBatch(batch) => batch.delete_root(key), + Self::Batch(batch) => batch.delete_root(key), + } + } +} diff --git a/storage/src/rocksdb_storage.rs b/storage/src/rocksdb_storage/mod.rs similarity index 63% rename from storage/src/rocksdb_storage.rs rename to storage/src/rocksdb_storage/mod.rs index 3d2d80456..158560364 100644 --- a/storage/src/rocksdb_storage.rs +++ b/storage/src/rocksdb_storage/mod.rs @@ -1,10 +1,19 @@ //! Storage implementation using RocksDB use std::{path::Path, rc::Rc}; -pub use rocksdb::{checkpoint::Checkpoint, Error, DB}; -use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, WriteBatch}; +pub use rocksdb::{checkpoint::Checkpoint, Error, OptimisticTransactionDB}; +use rocksdb::{ColumnFamilyDescriptor, DBRawIterator, DBRawIteratorWithThreadMode}; -use crate::{Batch, RawIterator, Storage}; +use crate::{DBTransaction, RawIterator}; + +mod batch; +mod storage; +mod transaction; + +pub use batch::PrefixedRocksDbBatch; +pub use transaction::PrefixedRocksDbTransaction; + +pub use self::storage::{PrefixedRocksDbStorage, PrefixedRocksDbStorageError}; const AUX_CF_NAME: &str = "aux"; const ROOTS_CF_NAME: &str = "roots"; @@ -22,6 +31,10 @@ pub fn default_db_opts() -> rocksdb::Options { opts } +pub type OptimisticTransactionDBTransaction<'a> = rocksdb::Transaction<'a, OptimisticTransactionDB>; + +impl<'a> DBTransaction<'a> for OptimisticTransactionDBTransaction<'a> {} + /// RocksDB column families pub fn column_families() -> Vec { vec![ @@ -32,10 +45,14 @@ pub fn column_families() -> Vec { } /// Create RocksDB with default settings -pub fn default_rocksdb(path: &Path) -> Rc { +pub fn default_rocksdb(path: &Path) -> Rc { Rc::new( - rocksdb::DB::open_cf_descriptors(&default_db_opts(), &path, column_families()) - .expect("cannot create rocksdb"), + rocksdb::OptimisticTransactionDB::open_cf_descriptors( + &default_db_opts(), + &path, + column_families(), + ) + .expect("cannot create rocksdb"), ) } @@ -45,156 +62,52 @@ fn make_prefixed_key(prefix: Vec, key: &[u8]) -> Vec { prefixed_key } -/// RocksDB wrapper to store items with prefixes -pub struct PrefixedRocksDbStorage { - db: Rc, - prefix: Vec, -} - -#[derive(thiserror::Error, Debug)] -pub enum PrefixedRocksDbStorageError { - #[error("column family not found: {0}")] - ColumnFamilyNotFound(&'static str), - #[error(transparent)] - RocksDbError(#[from] rocksdb::Error), -} - -impl PrefixedRocksDbStorage { - /// Wraps RocksDB to prepend prefixes to each operation - pub fn new(db: Rc, prefix: Vec) -> Result { - Ok(PrefixedRocksDbStorage { prefix, db }) - } - - /// Get auxiliary data column family - fn cf_aux(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { - self.db - .cf_handle(AUX_CF_NAME) - .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( - AUX_CF_NAME, - )) - } - - /// Get trees roots data column family - fn cf_roots(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { - self.db - .cf_handle(ROOTS_CF_NAME) - .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( - ROOTS_CF_NAME, - )) - } - - /// Get metadata column family - fn cf_meta(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { - self.db - .cf_handle(META_CF_NAME) - .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( - META_CF_NAME, - )) - } +pub struct RawPrefixedTransactionalIterator<'a> { + rocksdb_iterator: DBRawIteratorWithThreadMode<'a, OptimisticTransactionDB>, + prefix: &'a [u8], } -impl Storage for PrefixedRocksDbStorage { - type Batch<'a> = PrefixedRocksDbBatch<'a>; - type Error = PrefixedRocksDbStorageError; - type RawIterator<'a> = RawPrefixedIterator<'a>; - - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { - self.db - .put(make_prefixed_key(self.prefix.clone(), key), value)?; - Ok(()) - } - - fn put_aux(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { - self.db.put_cf( - self.cf_aux()?, - make_prefixed_key(self.prefix.clone(), key), - value, - )?; - Ok(()) - } - - fn put_root(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { - self.db.put_cf( - self.cf_roots()?, - make_prefixed_key(self.prefix.clone(), key), - value, - )?; - Ok(()) - } - - fn delete(&self, key: &[u8]) -> Result<(), Self::Error> { - self.db - .delete(make_prefixed_key(self.prefix.clone(), key))?; - Ok(()) - } - - fn delete_aux(&self, key: &[u8]) -> Result<(), Self::Error> { - self.db - .delete_cf(self.cf_aux()?, make_prefixed_key(self.prefix.clone(), key))?; - Ok(()) - } - - fn delete_root(&self, key: &[u8]) -> Result<(), Self::Error> { - self.db.delete_cf( - self.cf_roots()?, - make_prefixed_key(self.prefix.clone(), key), - )?; - Ok(()) - } - - fn get(&self, key: &[u8]) -> Result>, Self::Error> { - Ok(self.db.get(make_prefixed_key(self.prefix.clone(), key))?) - } - - fn get_aux(&self, key: &[u8]) -> Result>, Self::Error> { - Ok(self - .db - .get_cf(self.cf_aux()?, make_prefixed_key(self.prefix.clone(), key))?) - } - - fn get_root(&self, key: &[u8]) -> Result>, Self::Error> { - Ok(self.db.get_cf( - self.cf_roots()?, - make_prefixed_key(self.prefix.clone(), key), - )?) - } - - fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { - Ok(self.db.put_cf(self.cf_meta()?, key, value)?) +impl RawIterator for RawPrefixedTransactionalIterator<'_> { + fn seek_to_first(&mut self) { + self.rocksdb_iterator.seek(self.prefix); } - fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error> { - Ok(self.db.delete_cf(self.cf_meta()?, key)?) + fn seek(&mut self, key: &[u8]) { + self.rocksdb_iterator + .seek(make_prefixed_key(self.prefix.to_vec(), key)); } - fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> { - Ok(self.db.get_cf(self.cf_meta()?, key)?) + fn next(&mut self) { + self.rocksdb_iterator.next(); } - fn new_batch<'a>(&'a self) -> Result, Self::Error> { - Ok(PrefixedRocksDbBatch { - prefix: self.prefix.clone(), - batch: WriteBatch::default(), - cf_aux: self.cf_aux()?, - cf_roots: self.cf_roots()?, - }) + fn prev(&mut self) { + self.rocksdb_iterator.prev(); } - fn commit_batch<'a>(&'a self, batch: Self::Batch<'a>) -> Result<(), Self::Error> { - self.db.write(batch.batch)?; - Ok(()) + fn value(&self) -> Option<&[u8]> { + if self.valid() { + self.rocksdb_iterator.value() + } else { + None + } } - fn flush(&self) -> Result<(), Self::Error> { - self.db.flush()?; - Ok(()) + fn key(&self) -> Option<&[u8]> { + if self.valid() { + self.rocksdb_iterator + .key() + .map(|k| k.split_at(self.prefix.len()).1) + } else { + None + } } - fn raw_iter<'a>(&'a self) -> Self::RawIterator<'a> { - RawPrefixedIterator { - rocksdb_iterator: self.db.raw_iterator(), - prefix: &self.prefix, - } + fn valid(&self) -> bool { + self.rocksdb_iterator + .key() + .map(|k| k.starts_with(self.prefix)) + .unwrap_or(false) } } @@ -247,52 +160,6 @@ impl RawIterator for RawPrefixedIterator<'_> { } } -/// Wrapper to RocksDB batch -pub struct PrefixedRocksDbBatch<'a> { - prefix: Vec, - batch: rocksdb::WriteBatch, - cf_aux: &'a ColumnFamily, - cf_roots: &'a ColumnFamily, -} - -impl<'a> Batch for PrefixedRocksDbBatch<'a> { - fn put(&mut self, key: &[u8], value: &[u8]) { - self.batch - .put(make_prefixed_key(self.prefix.clone(), key), value) - } - - fn put_aux(&mut self, key: &[u8], value: &[u8]) { - self.batch.put_cf( - self.cf_aux, - make_prefixed_key(self.prefix.clone(), key), - value, - ) - } - - fn put_root(&mut self, key: &[u8], value: &[u8]) { - self.batch.put_cf( - self.cf_roots, - make_prefixed_key(self.prefix.clone(), key), - value, - ) - } - - fn delete(&mut self, key: &[u8]) { - self.batch - .delete(make_prefixed_key(self.prefix.clone(), key)) - } - - fn delete_aux(&mut self, key: &[u8]) { - self.batch - .delete_cf(self.cf_aux, make_prefixed_key(self.prefix.clone(), key)) - } - - fn delete_root(&mut self, key: &[u8]) { - self.batch - .delete_cf(self.cf_roots, make_prefixed_key(self.prefix.clone(), key)) - } -} - #[cfg(test)] mod tests { use std::ops::Deref; @@ -300,6 +167,7 @@ mod tests { use tempdir::TempDir; use super::*; + use crate::{Batch, Storage, Transaction}; struct TempPrefixedStorage { storage: PrefixedRocksDbStorage, @@ -487,7 +355,7 @@ mod tests { #[test] fn test_batch() { let storage = TempPrefixedStorage::new(); - let mut batch = storage.new_batch().expect("cannot create batch"); + let mut batch = storage.new_batch(None).expect("cannot create batch"); batch.put(b"key1", b"value1"); batch.put(b"key2", b"value2"); batch.put_root(b"root", b"yeet"); diff --git a/storage/src/rocksdb_storage/storage.rs b/storage/src/rocksdb_storage/storage.rs new file mode 100644 index 000000000..75c2491fc --- /dev/null +++ b/storage/src/rocksdb_storage/storage.rs @@ -0,0 +1,340 @@ +use std::rc::Rc; + +use rocksdb::WriteBatchWithTransaction; + +use super::{ + make_prefixed_key, PrefixedRocksDbBatch, PrefixedRocksDbTransaction, + RawPrefixedTransactionalIterator, AUX_CF_NAME, META_CF_NAME, ROOTS_CF_NAME, +}; +use crate::{ + rocksdb_storage::{ + batch::{OrBatch, PrefixedTransactionalRocksDbBatch}, + OptimisticTransactionDBTransaction, + }, + Storage, +}; + +/// RocksDB wrapper to store items with prefixes +#[derive(Clone)] +pub struct PrefixedRocksDbStorage { + pub(crate) db: Rc, + prefix: Vec, +} + +#[derive(thiserror::Error, Debug)] +pub enum PrefixedRocksDbStorageError { + #[error("column family not found: {0}")] + ColumnFamilyNotFound(&'static str), + #[error(transparent)] + RocksDbError(#[from] rocksdb::Error), +} + +impl PrefixedRocksDbStorage { + /// Wraps RocksDB to prepend prefixes to each operation + pub fn new( + db: Rc, + prefix: Vec, + ) -> Result { + Ok(PrefixedRocksDbStorage { prefix, db }) + } + + /// Get auxiliary data column family + fn cf_aux(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(AUX_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + AUX_CF_NAME, + )) + } + + /// Get trees roots data column family + fn cf_roots(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(ROOTS_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + ROOTS_CF_NAME, + )) + } + + /// Get metadata column family + fn cf_meta(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(META_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + META_CF_NAME, + )) + } +} + +impl Storage for PrefixedRocksDbStorage { + type Batch<'a> = OrBatch<'a>; + type DBTransaction<'a> = OptimisticTransactionDBTransaction<'a>; + type Error = PrefixedRocksDbStorageError; + type RawIterator<'a> = RawPrefixedTransactionalIterator<'a>; + type StorageTransaction<'a> = PrefixedRocksDbTransaction<'a>; + + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.db + .put(make_prefixed_key(self.prefix.clone(), key), value)?; + Ok(()) + } + + fn put_aux(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.db.put_cf( + self.cf_aux()?, + make_prefixed_key(self.prefix.clone(), key), + value, + )?; + Ok(()) + } + + fn put_root(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.db.put_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + value, + )?; + Ok(()) + } + + fn delete(&self, key: &[u8]) -> Result<(), Self::Error> { + self.db + .delete(make_prefixed_key(self.prefix.clone(), key))?; + Ok(()) + } + + fn delete_aux(&self, key: &[u8]) -> Result<(), Self::Error> { + self.db + .delete_cf(self.cf_aux()?, make_prefixed_key(self.prefix.clone(), key))?; + Ok(()) + } + + fn delete_root(&self, key: &[u8]) -> Result<(), Self::Error> { + self.db.delete_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + )?; + Ok(()) + } + + fn get(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self.db.get(make_prefixed_key(self.prefix.clone(), key))?) + } + + fn get_aux(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self + .db + .get_cf(self.cf_aux()?, make_prefixed_key(self.prefix.clone(), key))?) + } + + fn get_root(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self.db.get_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + )?) + } + + fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + Ok(self.db.put_cf(self.cf_meta()?, key, value)?) + } + + fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error> { + Ok(self.db.delete_cf(self.cf_meta()?, key)?) + } + + fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self.db.get_cf(self.cf_meta()?, key)?) + } + + fn new_batch<'a: 'b, 'b>( + &'a self, + transaction: Option<&'b OptimisticTransactionDBTransaction>, + ) -> Result, Self::Error> { + match transaction { + Some(tx) => Ok(OrBatch::TransactionalBatch( + PrefixedTransactionalRocksDbBatch { + prefix: self.prefix.clone(), + transaction: tx, + cf_aux: self.cf_aux()?, + cf_roots: self.cf_roots()?, + }, + )), + None => Ok(OrBatch::Batch(PrefixedRocksDbBatch { + prefix: self.prefix.clone(), + batch: WriteBatchWithTransaction::::default(), + cf_aux: self.cf_aux()?, + cf_roots: self.cf_roots()?, + })), + } + } + + fn commit_batch<'a>(&'a self, batch: Self::Batch<'a>) -> Result<(), Self::Error> { + // Do nothing if transaction exists, as the transaction must be explicitly + // committed by its creator + match batch { + OrBatch::TransactionalBatch(_) => {} + OrBatch::Batch(batch) => self.db.write(batch.batch)?, + } + Ok(()) + } + + fn flush(&self) -> Result<(), Self::Error> { + self.db.flush()?; + Ok(()) + } + + fn raw_iter<'a>(&'a self) -> Self::RawIterator<'a> { + RawPrefixedTransactionalIterator { + rocksdb_iterator: self.db.raw_iterator(), + prefix: &self.prefix, + } + } + + fn transaction<'a>( + &'a self, + db_transaction: &'a OptimisticTransactionDBTransaction, + ) -> Self::StorageTransaction<'a> { + PrefixedRocksDbTransaction::new(db_transaction, self.prefix.clone(), &self.db) + } +} + +// pub struct TransactionalPrefixedRocksDbStorage<'a> { +// storage: &'a PrefixedRocksDbStorage, +// transaction: Option>, +// } +// +// impl<'a> TransactionalPrefixedRocksDbStorage<'a> { +// pub fn new( +// storage: &'a PrefixedRocksDbStorage, +// db_transaction: Option<&'a OptimisticTransactionDBTransaction>, +// ) -> Self { +// Self { +// storage, +// transaction: db_transaction.map(|tx| storage.transaction(tx)) +// } +// } +// } +// +// impl<'b> Storage for TransactionalPrefixedRocksDbStorage<'b> { +// type Error = PrefixedRocksDbStorageError; +// type Batch<'a> +// where +// 'b: 'a, +// = PrefixedRocksDbBatch<'a>; +// type RawIterator<'a> +// where +// 'b: 'a, +// = DBRawTransactionIterator<'a>; +// type StorageTransaction<'a> +// where +// 'b: 'a, +// = PrefixedRocksDbTransaction<'a>; +// type DBTransaction<'a> +// where +// 'b: 'a, +// = OptimisticTransactionDBTransaction<'a>; +// +// fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { +// match &self.transaction { +// None => self.storage.put(key, value), +// Some(tx) => tx.put(key, value), +// } +// } +// +// fn put_aux(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { +// match &self.transaction { +// None => self.storage.put_aux(key, value), +// Some(tx) => tx.put_aux(key, value), +// } +// } +// +// fn put_root(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { +// match &self.transaction { +// None => self.storage.put_root(key, value), +// Some(tx) => tx.put_root(key, value), +// } +// } +// +// fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { +// match &self.transaction { +// None => self.storage.put_meta(key, value), +// Some(tx) => tx.put_meta(key, value), +// } +// } +// +// fn delete(&self, key: &[u8]) -> Result<(), Self::Error> { +// match &self.transaction { +// None => self.storage.delete(key), +// Some(tx) => tx.delete(key), +// } +// } +// +// fn delete_aux(&self, key: &[u8]) -> Result<(), Self::Error> { +// match &self.transaction { +// None => self.storage.delete_aux(key), +// Some(tx) => tx.delete_aux(key), +// } +// } +// +// fn delete_root(&self, key: &[u8]) -> Result<(), Self::Error> { +// match &self.transaction { +// None => self.storage.delete_root(key), +// Some(tx) => tx.delete_root(key), +// } +// } +// +// fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error> { +// match &self.transaction { +// None => self.storage.delete_meta(key), +// Some(tx) => tx.delete_meta(key), +// } +// } +// +// fn get(&self, key: &[u8]) -> Result>, Self::Error> { +// match &self.transaction { +// None => self.storage.get(key), +// Some(tx) => tx.get(key), +// } +// } +// +// fn get_aux(&self, key: &[u8]) -> Result>, Self::Error> { +// match &self.transaction { +// None => self.storage.get_aux(key), +// Some(tx) => tx.get_aux(key), +// } +// } +// +// fn get_root(&self, key: &[u8]) -> Result>, Self::Error> { +// match &self.transaction { +// None => self.storage.get_root(key), +// Some(tx) => tx.get_root(key), +// } +// } +// +// fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> { +// match &self.transaction { +// None => self.storage.get_meta(key), +// Some(tx) => tx.get_meta(key), +// } +// } +// +// fn new_batch<'a>(&'a self) -> Result, Self::Error> { +// self.storage.new_batch() +// } +// +// fn commit_batch<'a>(&'a self, batch: Self::Batch<'a>) -> Result<(), +// Self::Error> { self.storage.commit_batch(batch) +// } +// +// fn flush(&self) -> Result<(), Self::Error> { +// self.storage.flush() +// } +// +// fn raw_iter<'a>(&'a self) -> Self::RawIterator<'a> { +// self.storage.raw_iter() +// } +// +// fn transaction<'a>(&'a self, tx: &'a Self::DBTransaction<'a>) -> +// Self::StorageTransaction<'a> { self.storage.transaction(tx) +// } +// } diff --git a/storage/src/rocksdb_storage/transaction.rs b/storage/src/rocksdb_storage/transaction.rs new file mode 100644 index 000000000..b744e2a9e --- /dev/null +++ b/storage/src/rocksdb_storage/transaction.rs @@ -0,0 +1,132 @@ +use rocksdb::OptimisticTransactionDB; + +use super::{ + make_prefixed_key, PrefixedRocksDbStorageError, AUX_CF_NAME, META_CF_NAME, ROOTS_CF_NAME, +}; +use crate::Transaction; + +pub struct PrefixedRocksDbTransaction<'a> { + transaction: &'a rocksdb::Transaction<'a, OptimisticTransactionDB>, + prefix: Vec, + pub(crate) db: &'a OptimisticTransactionDB, +} +// TODO: Implement snapshots for transactions +impl<'a> PrefixedRocksDbTransaction<'a> { + pub fn new( + transaction: &'a rocksdb::Transaction<'a, OptimisticTransactionDB>, + prefix: Vec, + db: &'a OptimisticTransactionDB, + ) -> Self { + Self { + transaction, + prefix, + db, + } + } + + /// Get auxiliary data column family + fn cf_aux(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(AUX_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + AUX_CF_NAME, + )) + } + + /// Get trees roots data column family + fn cf_roots(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(ROOTS_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + ROOTS_CF_NAME, + )) + } + + /// Get metadata column family + fn cf_meta(&self) -> Result<&rocksdb::ColumnFamily, PrefixedRocksDbStorageError> { + self.db + .cf_handle(META_CF_NAME) + .ok_or(PrefixedRocksDbStorageError::ColumnFamilyNotFound( + META_CF_NAME, + )) + } +} + +impl Transaction for PrefixedRocksDbTransaction<'_> { + type Error = PrefixedRocksDbStorageError; + + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.transaction + .put(make_prefixed_key(self.prefix.clone(), key), value)?; + Ok(()) + } + + fn put_aux(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.transaction.put_cf( + self.cf_aux()?, + make_prefixed_key(self.prefix.clone(), key), + value, + )?; + Ok(()) + } + + fn put_root(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + self.transaction.put_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + value, + )?; + Ok(()) + } + + fn put_meta(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> { + Ok(self.transaction.put_cf(self.cf_meta()?, key, value)?) + } + + fn delete(&self, key: &[u8]) -> Result<(), Self::Error> { + self.transaction + .delete(make_prefixed_key(self.prefix.clone(), key))?; + Ok(()) + } + + fn delete_aux(&self, key: &[u8]) -> Result<(), Self::Error> { + self.transaction + .delete_cf(self.cf_aux()?, make_prefixed_key(self.prefix.clone(), key))?; + Ok(()) + } + + fn delete_root(&self, key: &[u8]) -> Result<(), Self::Error> { + self.transaction.delete_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + )?; + Ok(()) + } + + fn delete_meta(&self, key: &[u8]) -> Result<(), Self::Error> { + Ok(self.transaction.delete_cf(self.cf_meta()?, key)?) + } + + fn get(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self + .transaction + .get(make_prefixed_key(self.prefix.clone(), key))?) + } + + fn get_aux(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self + .transaction + .get_cf(self.cf_aux()?, make_prefixed_key(self.prefix.clone(), key))?) + } + + fn get_root(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self.transaction.get_cf( + self.cf_roots()?, + make_prefixed_key(self.prefix.clone(), key), + )?) + } + + fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self.transaction.get_cf(self.cf_meta()?, key)?) + } +}