Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ reqwest = { version = "0.11", optional = true, default-features = false, feature
ureq = { version = "~2.2.0", features = ["json"], optional = true }
futures = { version = "0.3", optional = true }
async-trait = { version = "0.1", optional = true }
rocksdb = { version = "0.14", default-features = false, features = ["snappy"], optional = true }
rocksdb = { version = "0.19", default-features = false, features = ["snappy"], optional = true }
cc = { version = ">=1.0.64", optional = true }
socks = { version = "0.3", optional = true }
lazy_static = { version = "1.4", optional = true }
Expand Down
62 changes: 48 additions & 14 deletions src/blockchain/compact_filters/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ lazy_static! {
static ref SIGNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A008F4D5FAE77031E8AD222030101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
}

// We can't use the question mark in `filter_map` closures because the closure returns
// an option. This is a macro that works like `?` but wraps the error in an option.
macro_rules! check_err {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return Some(Err(e.into())),
}
};
}

pub trait StoreType: Default + fmt::Debug {}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -339,6 +350,7 @@ impl ChainStore<Full> {

let min_height = match iterator
.next()
.transpose()?
.and_then(|(k, _)| k[1..].try_into().ok())
.map(usize::from_be_bytes)
{
Expand Down Expand Up @@ -385,11 +397,12 @@ impl ChainStore<Full> {

log::debug!("Removing items");
batch.delete_range_cf(cf_handle, &from_key, &to_key);
for (_, v) in read_store.iterator_cf_opt(
for item in read_store.iterator_cf_opt(
cf_handle,
opts,
IteratorMode::From(&from_key, Direction::Forward),
) {
let (_, v) = item?;
let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;

batch.delete_cf(
Expand All @@ -404,7 +417,8 @@ impl ChainStore<Full> {
batch.delete_range(&from_key, &to_key);

log::debug!("Copying over new items");
for (k, v) in read_store.iterator_cf(snapshot_cf_handle, IteratorMode::Start) {
for item in read_store.iterator_cf(snapshot_cf_handle, IteratorMode::Start) {
let (k, v) = item?;
batch.put_cf(cf_handle, k, v);
}

Expand Down Expand Up @@ -489,16 +503,19 @@ impl ChainStore<Full> {
// FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
// have the right prefix
iterator
.filter(|(k, _)| k.starts_with(&prefix))
.map(|(k, v)| {
let height: usize = usize::from_be_bytes(
k[1..]
.filter_map(|item| {
let (k, v) = check_err!(item);

if !k.starts_with(&prefix) {
None
} else {
let height: usize = usize::from_be_bytes(check_err!(k[1..]
.try_into()
.map_err(|_| CompactFiltersError::DataCorruption)?,
);
let block = SerializeDb::deserialize(&v)?;
.map_err(|_| CompactFiltersError::DataCorruption)));
let block = check_err!(SerializeDb::deserialize(&v));

Ok((height, block))
Some(Ok((height, block)))
}
})
.collect::<Result<_, _>>()
}
Expand All @@ -514,6 +531,7 @@ impl<T: StoreType> ChainStore<T> {

Ok(iterator
.last()
.transpose()?
.map(|(_, v)| -> Result<_, CompactFiltersError> {
let (_, work): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;

Expand All @@ -532,6 +550,7 @@ impl<T: StoreType> ChainStore<T> {

Ok(iterator
.last()
.transpose()?
.map(|(k, _)| -> Result<_, CompactFiltersError> {
let height = usize::from_be_bytes(
k[1..]
Expand All @@ -554,6 +573,7 @@ impl<T: StoreType> ChainStore<T> {

iterator
.last()
.transpose()?
.map(|(_, v)| -> Result<_, CompactFiltersError> {
let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;

Expand Down Expand Up @@ -690,8 +710,15 @@ impl CfStore {
// FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
// have the right prefix
iterator
.filter(|(k, _)| k.starts_with(&prefix))
.map(|(_, data)| BundleEntry::deserialize(&data))
.filter_map(|item| {
let (k, data) = check_err!(item);

if !k.starts_with(&prefix) {
None
} else {
Some(BundleEntry::deserialize(&data))
}
})
.collect::<Result<_, _>>()
}

Expand All @@ -704,9 +731,16 @@ impl CfStore {
// FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
// have the right prefix
iterator
.filter(|(k, _)| k.starts_with(&prefix))
.filter_map(|item| {
let (k, data) = check_err!(item);

if !k.starts_with(&prefix) {
None
} else {
Some(Ok(check_err!(BundleEntry::deserialize(&data)).1))
}
})
.skip(1)
.map(|(_, data)| Ok::<_, CompactFiltersError>(BundleEntry::deserialize(&data)?.1))
.collect::<Result<_, _>>()
}

Expand Down