Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/db/car/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@
//! - CARv2 support
//! - A wrapper that abstracts over car formats for reading.

use crate::ipld::{CidHashMap, CidHashMapEntry};
use crate::{
blocks::{Tipset, TipsetKeys},
utils::encoding::from_slice_with_fallback,
};
use ahash::HashMapExt as _;

use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
Expand All @@ -76,7 +76,6 @@ use positioned_io::ReadAt;
use std::ops::DerefMut;
use std::{
any::Any,
collections::hash_map::Entry::{Occupied, Vacant},
io::{
self, BufReader,
ErrorKind::{InvalidData, UnexpectedEof, Unsupported},
Expand All @@ -86,6 +85,7 @@ use std::{
};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tracing::{debug, trace};
use CidHashMapEntry::{Occupied, Vacant};

/// **Note that all operations on this store are blocking**.
///
Expand All @@ -110,8 +110,8 @@ use tracing::{debug, trace};
/// See [module documentation](mod@self) for more.
pub struct PlainCar<ReaderT> {
reader: ReaderT,
write_cache: RwLock<ahash::HashMap<Cid, Vec<u8>>>,
index: RwLock<ahash::HashMap<Cid, UncompressedBlockDataLocation>>,
write_cache: RwLock<CidHashMap<Vec<u8>>>,
index: RwLock<CidHashMap<UncompressedBlockDataLocation>>,
roots: Vec<Cid>,
}

Expand All @@ -132,7 +132,7 @@ impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
// now create the index
let index =
iter::from_fn(|| read_block_data_location_and_skip(&mut buf_reader).transpose())
.collect::<Result<ahash::HashMap<_, _>, _>>()?;
.collect::<Result<CidHashMap<_>, _>>()?;

match index.len() {
0 => Err(io::Error::new(
Expand All @@ -145,7 +145,7 @@ impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
reader,
index: RwLock::new(index),
roots,
write_cache: RwLock::new(ahash::HashMap::new()),
write_cache: RwLock::new(CidHashMap::new()),
})
}
}
Expand All @@ -162,7 +162,7 @@ impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
/// In an arbitrary order
#[cfg(test)]
pub fn cids(&self) -> Vec<Cid> {
self.index.read().keys().cloned().collect()
self.index.read().keys().collect()
}

pub fn into_dyn(self) -> PlainCar<Box<dyn super::RandomAccessFileReader>> {
Expand Down Expand Up @@ -196,10 +196,10 @@ where
{
#[tracing::instrument(level = "trace", skip(self))]
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
match (self.index.read().get(k), self.write_cache.read().get(k)) {
match (self.index.read().get(*k), self.write_cache.read().get(*k)) {
(Some(_location), Some(_cached)) => {
trace!("evicting from write cache");
Ok(self.write_cache.write().remove(k))
Ok(self.write_cache.write().remove(*k))
}
(Some(UncompressedBlockDataLocation { offset, length }), None) => {
trace!("fetching from disk");
Expand Down Expand Up @@ -253,12 +253,12 @@ pub struct CompressedBlockDataLocation {
/// Note: This could potentially be enhanced with fine-grained read/write
/// locking, however the performance is acceptable for now.
fn handle_write_cache(
write_cache: &mut ahash::HashMap<Cid, Vec<u8>>,
index: &mut ahash::HashMap<Cid, impl Any>,
write_cache: &mut CidHashMap<Vec<u8>>,
index: &mut CidHashMap<impl Any>,
k: &Cid,
block: &[u8],
) -> anyhow::Result<()> {
match (index.get(k), write_cache.entry(*k)) {
match (index.get(*k), write_cache.entry(*k)) {
(None, Occupied(already)) => match already.get() == block {
true => {
trace!("already in cache");
Expand Down
90 changes: 86 additions & 4 deletions src/ipld/cid_hashmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::utils::cid::SmallCid;
use ahash::{HashMap, HashMapExt};
use cid::Cid;
use std::collections::hash_map::{Keys, OccupiedEntry, VacantEntry};

// The size of a CID is 96 bytes. A CID contains:
// - a version
Expand Down Expand Up @@ -53,6 +54,39 @@ impl<V> Iterator for IntoIter<V> {
}
}

pub struct CidHashMapKeys<'a, V> {
keys: Keys<'a, SmallCid, V>,
}

impl<V> Iterator for CidHashMapKeys<'_, V> {
type Item = Cid;

fn next(&mut self) -> Option<Self::Item> {
self.keys.next().map(|small_cid| small_cid.cid())
}
}

pub enum CidHashMapEntry<'a, V> {
Occupied(Occupied<'a, V>),
Vacant(Vacant<'a, V>),
}

pub struct Occupied<'a, V>(OccupiedEntry<'a, SmallCid, V>);

impl<V> Occupied<'_, V> {
pub fn get(&self) -> &V {
self.0.get()
}
}

pub struct Vacant<'a, V>(VacantEntry<'a, SmallCid, V>);

impl<'a, V> Vacant<'a, V> {
pub fn insert(self, value: V) -> &'a mut V {
self.0.insert(value)
}
}

impl<V> CidHashMap<V> {
/// Creates an empty `HashMap` with CID type keys.
pub fn new() -> Self {
Expand Down Expand Up @@ -89,15 +123,31 @@ impl<V> CidHashMap<V> {
pub fn len(&self) -> usize {
self.0.len()
}

/// Gets the given key's corresponding entry in the map for in-place manipulation.
pub fn entry(&mut self, key: Cid) -> CidHashMapEntry<'_, V> {
match self.0.entry(SmallCid::from(key)) {
std::collections::hash_map::Entry::Occupied(occupied) => {
CidHashMapEntry::Occupied(Occupied(occupied))
}
std::collections::hash_map::Entry::Vacant(vacant) => {
CidHashMapEntry::Vacant(Vacant(vacant))
}
}
}

#[cfg(test)]
pub fn keys(&self) -> CidHashMapKeys<'_, V> {
CidHashMapKeys {
keys: self.0.keys(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use cid::{
multihash::{self, MultihashDigest},
Cid,
};
use cid::multihash::{self, MultihashDigest};
use fvm_ipld_encoding::DAG_CBOR;
use quickcheck_macros::quickcheck;

Expand Down Expand Up @@ -175,6 +225,38 @@ mod tests {
assert_eq!(cid_hash_map.len(), hash_map.len());
}

#[quickcheck]
fn check_entry(cid_vector: Vec<(Cid, u64)>, cid: Cid, insert: bool) {
let (mut cid_hash_map, mut hash_map) = generate_hash_maps(cid_vector);
// Insert key half of the time to ensure equal probability of entry being occupied or vacant; occasionally the key will already be present when quickcheck generates the maps, so we also remove the key with 50% probability.
if insert {
cid_hash_map.insert(cid, 0);
hash_map.insert(cid, 0);
} else {
cid_hash_map.remove(cid);
hash_map.remove(&cid);
}
match cid_hash_map.entry(cid) {
CidHashMapEntry::Occupied(occupied) => {
assert_eq!(occupied.get(), hash_map.get(&cid).unwrap());
}
CidHashMapEntry::Vacant(_) => {
assert_eq!(cid_hash_map.get(cid), hash_map.get(&cid));
}
}
}

#[quickcheck]
fn keys(cid_vector: Vec<(Cid, u64)>) {
let (cid_hash_map, hash_map) = generate_hash_maps(cid_vector);
// Hash maps are not required to be ordered, but it is important for vectors, so sort the vectors of keys before comparing.
let mut cid_hash_map = cid_hash_map.keys().collect::<Vec<Cid>>();
cid_hash_map.sort();
let mut hash_map = hash_map.keys().cloned().collect::<Vec<Cid>>();
hash_map.sort();
assert_eq!(cid_hash_map, hash_map);
}

#[quickcheck]
fn cidhashmap_to_hashmap_to_cidhashmap(cid_vector: Vec<(Cid, u64)>) {
let (cid_hash_map, _) = generate_hash_maps(cid_vector);
Expand Down
1 change: 1 addition & 0 deletions src/ipld/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use libipld_core::ipld::Ipld;
pub use util::*;

pub use self::cid_hashmap::CidHashMap;
pub use self::cid_hashmap::CidHashMapEntry;
pub use self::cid_hashset::CidHashSet;
pub use self::frozen_cids::FrozenCids;

Expand Down