Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 225 additions & 38 deletions grovedb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod operations;
mod subtree;
#[cfg(test)]
mod tests;
mod transaction;

use std::{collections::HashMap, path::Path, rc::Rc};

Expand All @@ -10,11 +11,16 @@ use merk::{self, Merk};
use rs_merkle::{algorithms::Sha256, Hasher, MerkleTree};
use serde::{Deserialize, Serialize};
use storage::{
rocksdb_storage::{PrefixedRocksDbStorage, PrefixedRocksDbStorageError},
Storage,
rocksdb_storage::{
OptimisticTransactionDBTransaction, PrefixedRocksDbStorage, PrefixedRocksDbStorageError,
},
Storage, Transaction,
};
pub use subtree::Element;

// use crate::transaction::GroveDbTransaction;
// pub use transaction::GroveDbTransaction;

/// A key to store serialized data about subtree prefixes to restore HADS
/// structure
const SUBTREES_SERIALIZED_KEY: &[u8] = b"subtreesSerialized";
Expand All @@ -37,6 +43,11 @@ pub enum Error {
StorageError(#[from] PrefixedRocksDbStorageError),
#[error("data corruption error: {0}")]
CorruptedData(String),
#[error(
"db is in readonly mode due to the active transaction. Please provide transaction or \
commit it"
)]
DbIsInReadonlyMode,
}

pub struct PathQuery<'a> {
Expand All @@ -63,13 +74,39 @@ pub struct GroveDb {
root_leaf_keys: HashMap<Vec<u8>, usize>,
subtrees: HashMap<Vec<u8>, Merk<PrefixedRocksDbStorage>>,
meta_storage: PrefixedRocksDbStorage,
db: Rc<storage::rocksdb_storage::DB>,
db: Rc<storage::rocksdb_storage::OptimisticTransactionDB>,
// Locks the database for writes during the transaction
is_readonly: bool,
// Temp trees used for writes during transaction
temp_root_tree: MerkleTree<Sha256>,
temp_root_leaf_keys: HashMap<Vec<u8>, usize>,
temp_subtrees: HashMap<Vec<u8>, Merk<PrefixedRocksDbStorage>>,
}

impl GroveDb {
pub fn new(
root_tree: MerkleTree<Sha256>,
root_leaf_keys: HashMap<Vec<u8>, usize>,
subtrees: HashMap<Vec<u8>, Merk<PrefixedRocksDbStorage>>,
meta_storage: PrefixedRocksDbStorage,
db: Rc<storage::rocksdb_storage::OptimisticTransactionDB>,
) -> Self {
Self {
root_tree,
root_leaf_keys,
subtrees,
meta_storage,
db,
temp_root_tree: MerkleTree::new(),
temp_root_leaf_keys: HashMap::new(),
temp_subtrees: HashMap::new(),
is_readonly: false,
}
}

pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let db = Rc::new(
storage::rocksdb_storage::DB::open_cf_descriptors(
storage::rocksdb_storage::OptimisticTransactionDB::open_cf_descriptors(
&storage::rocksdb_storage::default_db_opts(),
path,
storage::rocksdb_storage::column_families(),
Expand Down Expand Up @@ -104,35 +141,70 @@ impl GroveDb {
HashMap::new()
};

Ok(GroveDb {
root_tree: Self::build_root_tree(&subtrees, &root_leaf_keys),
db,
subtrees,
Ok(GroveDb::new(
Self::build_root_tree(&subtrees, &root_leaf_keys),
root_leaf_keys,
subtrees,
meta_storage,
})
db,
))
}

pub fn checkpoint<P: AsRef<Path>>(&self, path: P) -> Result<GroveDb, Error> {
storage::rocksdb_storage::Checkpoint::new(&self.db)
.and_then(|x| x.create_checkpoint(&path))
.map_err(PrefixedRocksDbStorageError::RocksDbError)?;
GroveDb::open(path)
}
// TODO: Checkpoints are currently not implemented for the transactional DB
// pub fn checkpoint<P: AsRef<Path>>(&self, path: P) -> Result<GroveDb, Error> {
// // let snapshot = self.db.transaction().snapshot();
//
// storage::rocksdb_storage::Checkpoint::new(&self.db)
// .and_then(|x| x.create_checkpoint(&path))
// .map_err(PrefixedRocksDbStorageError::RocksDbError)?;
// GroveDb::open(path)
// }

fn store_subtrees_keys_data(
&self,
db_transaction: Option<&OptimisticTransactionDBTransaction>,
) -> Result<(), Error> {
let subtrees = match db_transaction {
None => &self.subtrees,
Some(_) => &self.temp_subtrees,
};

let prefixes: Vec<Vec<u8>> = subtrees.keys().map(|x| x.clone()).collect();

// TODO: make StorageOrTransaction which will has the access to either storage
// or transaction
match db_transaction {
None => {
self.meta_storage.put_meta(
SUBTREES_SERIALIZED_KEY,
&bincode::serialize(&prefixes).map_err(|_| {
Error::CorruptedData(String::from("unable to serialize prefixes"))
})?,
)?;
self.meta_storage.put_meta(
ROOT_LEAFS_SERIALIZED_KEY,
&bincode::serialize(&self.temp_root_leaf_keys).map_err(|_| {
Error::CorruptedData(String::from("unable to serialize root leafs"))
})?,
)?;
}
Some(tx) => {
let transaction = self.meta_storage.transaction(tx);
transaction.put_meta(
SUBTREES_SERIALIZED_KEY,
&bincode::serialize(&prefixes).map_err(|_| {
Error::CorruptedData(String::from("unable to serialize prefixes"))
})?,
)?;
transaction.put_meta(
ROOT_LEAFS_SERIALIZED_KEY,
&bincode::serialize(&self.root_leaf_keys).map_err(|_| {
Error::CorruptedData(String::from("unable to serialize root leafs"))
})?,
)?;
}
}

fn store_subtrees_keys_data(&self) -> Result<(), Error> {
let prefixes: Vec<Vec<u8>> = self.subtrees.keys().map(|x| x.clone()).collect();
self.meta_storage.put_meta(
SUBTREES_SERIALIZED_KEY,
&bincode::serialize(&prefixes)
.map_err(|_| Error::CorruptedData(String::from("unable to serialize prefixes")))?,
)?;
self.meta_storage.put_meta(
ROOT_LEAFS_SERIALIZED_KEY,
&bincode::serialize(&self.root_leaf_keys).map_err(|_| {
Error::CorruptedData(String::from("unable to serialize root leafs"))
})?,
)?;
Ok(())
}

Expand All @@ -151,36 +223,61 @@ impl GroveDb {
res
}

pub fn elements_iterator(&self, path: &[&[u8]]) -> Result<subtree::ElementsIterator, Error> {
let merk = self
.subtrees
pub fn elements_iterator(
&self,
path: &[&[u8]],
transaction: Option<&OptimisticTransactionDBTransaction>,
) -> Result<subtree::ElementsIterator, Error> {
let subtrees = match transaction {
None => &self.subtrees,
Some(_) => &self.temp_subtrees,
};

let merk = subtrees
.get(&Self::compress_subtree_key(path, None))
.ok_or(Error::InvalidPath("no subtree found under that path"))?;
Ok(Element::iterator(merk.raw_iter()))
}

/// Method to propagate updated subtree root hashes up to GroveDB root
fn propagate_changes(&mut self, path: &[&[u8]]) -> Result<(), Error> {
fn propagate_changes<'a: 'b, 'b>(
&'a mut self,
path: &[&[u8]],
transaction: Option<&'b <PrefixedRocksDbStorage as Storage>::DBTransaction<'b>>,
) -> Result<(), Error> {
let subtrees = match transaction {
None => &mut self.subtrees,
Some(_) => &mut self.temp_subtrees,
};

let root_leaf_keys = match transaction {
None => &mut self.root_leaf_keys,
Some(_) => &mut self.temp_root_leaf_keys,
};

let mut split_path = path.split_last();
// Go up until only one element in path, which means a key of a root tree
while let Some((key, path_slice)) = split_path {
if path_slice.is_empty() {
// Hit the root tree
self.root_tree = Self::build_root_tree(&self.subtrees, &self.root_leaf_keys);
match transaction {
None => self.root_tree = Self::build_root_tree(&subtrees, &root_leaf_keys),
Some(_) => {
self.temp_root_tree = Self::build_root_tree(&subtrees, &root_leaf_keys)
}
};
break;
} else {
let compressed_path_upper_tree = Self::compress_subtree_key(path_slice, None);
let compressed_path_subtree = Self::compress_subtree_key(path_slice, Some(key));
let subtree = self
.subtrees
let subtree = subtrees
.get(&compressed_path_subtree)
.ok_or(Error::InvalidPath("no subtree found under that path"))?;
let element = Element::Tree(subtree.root_hash());
let upper_tree = self
.subtrees
let upper_tree = subtrees
.get_mut(&compressed_path_upper_tree)
.ok_or(Error::InvalidPath("no subtree found under that path"))?;
element.insert(upper_tree, key.to_vec())?;
element.insert(upper_tree, key.to_vec(), transaction)?;
split_path = path_slice.split_last();
}
}
Expand Down Expand Up @@ -215,4 +312,94 @@ impl GroveDb {
pub fn flush(&self) -> Result<(), Error> {
Ok(self.meta_storage.flush()?)
}

/// Returns a clone of reference counter to the underlying db storage.
/// Useful when working with transactions. For more details, please
/// refer to the [`GroveDb::start_transaction`] examples section.
pub fn storage(&self) -> Rc<storage::rocksdb_storage::OptimisticTransactionDB> {
self.db.clone()
}

/// Starts database transaction. Please note that you have to start
/// underlying storage transaction manually.
///
/// ## Examples:
/// ```
/// # use grovedb::{Element, Error, GroveDb};
/// # use rs_merkle::{MerkleTree, MerkleProof, algorithms::Sha256, Hasher, utils};
/// # use std::convert::TryFrom;
/// # use tempdir::TempDir;
/// #
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// const TEST_LEAF: &[u8] = b"test_leaf";
///
/// let tmp_dir = TempDir::new("db").unwrap();
/// let mut db = GroveDb::open(tmp_dir.path())?;
/// db.insert(&[], TEST_LEAF.to_vec(), Element::empty_tree(), None)?;
///
/// let storage = db.storage();
/// let db_transaction = storage.transaction();
/// db.start_transaction();
///
/// let subtree_key = b"subtree_key".to_vec();
/// db.insert(
/// &[TEST_LEAF],
/// subtree_key.clone(),
/// Element::empty_tree(),
/// Some(&db_transaction),
/// )?;
///
/// // This action exists only inside the transaction for now
/// let result = db.get(&[TEST_LEAF], &subtree_key, None);
/// assert!(matches!(result, Err(Error::InvalidPath(_))));
///
/// // To access values inside the transaction, transaction needs to be passed to the `db::get`
/// let result_with_transaction = db.get(&[TEST_LEAF], &subtree_key, Some(&db_transaction))?;
/// assert_eq!(result_with_transaction, Element::empty_tree());
///
/// // After transaction is committed, the value from it can be accessed normally.
/// db.commit_transaction(db_transaction);
/// let result = db.get(&[TEST_LEAF], &subtree_key, None)?;
/// assert_eq!(result, Element::empty_tree());
///
/// # Ok(())
/// # }
/// ```
pub fn start_transaction(&mut self) -> Result<(), Error> {
if self.is_readonly {
return Err(Error::DbIsInReadonlyMode);
}
// Locking all writes outside of the transaction
self.is_readonly = true;

// Cloning all the trees to maintain original state before the transaction
self.temp_root_tree = self.root_tree.clone();
self.temp_root_leaf_keys = self.root_leaf_keys.clone();
self.temp_subtrees = self.subtrees.clone();

Ok(())
}

/// Commits previously started db transaction. For more details on the
/// transaction usage, please check [`GroveDb::start_transaction`]
pub fn commit_transaction(
&mut self,
db_transaction: OptimisticTransactionDBTransaction,
) -> Result<(), Error> {
// Enabling writes again
self.is_readonly = false;

// Copying all changes that were made during the transaction into the db
self.root_tree = self.temp_root_tree.clone();
self.root_leaf_keys = self.temp_root_leaf_keys.drain().collect();
self.subtrees = self.temp_subtrees.drain().collect();

// TODO: root tree actually does support transactions, so this
// code can be reworked to account for that
self.temp_root_tree = MerkleTree::new();

Ok(db_transaction
.commit()
.map_err(PrefixedRocksDbStorageError::RocksDbError)?)
}
}
Loading