diff --git a/Cargo.toml b/Cargo.toml index e831b78e..5e753afb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,8 +32,6 @@ flat-tree = "6" merkle-tree-stream = "0.12" pretty-hash = "0.4" rand = "0.8" -random-access-memory = "3" -random-access-storage = "5" sha2 = "0.10" futures = "0.3" crc32fast = "1" @@ -41,12 +39,28 @@ intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } async-broadcast = { version = "0.7.1", optional = true } async-lock = {version = "3.4.0", optional = true } +futures-lite = "2.6.1" [dependencies.hypercore_schema] version = "0.2.0" -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] -random-access-disk = { version = "3", default-features = false } +[dependencies.hypercore-protocol] +optional = true +path = "../protocol/" + +[dependencies.hypercore_handshake] +optional = true +path = "../handshake/" + +[dependencies.random-access-storage] +path = "../ram/storage" + +[dependencies.random-access-memory] +path = "../ram/mem/" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.random-access-disk] +path = "../ram/disk" +default-features = false [dev-dependencies] anyhow = "1.0.70" @@ -65,11 +79,10 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] default = ["tokio", "sparse", "replication", "cache"] -replication = ["dep:async-broadcast"] +replication = ["dep:async-broadcast", "dep:hypercore-protocol", "dep:hypercore_handshake"] shared-core = ["replication", "dep:async-lock"] sparse = ["random-access-disk/sparse"] tokio = ["random-access-disk/tokio"] -async-std = ["random-access-disk/async-std"] cache = ["moka"] # Used only in interoperability tests under tests/js-interop which use the javascript version of hypercore # to verify that this crate works. 
To run them, use: diff --git a/benches/disk.rs b/benches/disk.rs index 3292df03..60d01c69 100644 --- a/benches/disk.rs +++ b/benches/disk.rs @@ -1,7 +1,5 @@ use std::time::{Duration, Instant}; -#[cfg(feature = "async-std")] -use criterion::async_executor::AsyncStdExecutor; use criterion::{Criterion, black_box, criterion_group, criterion_main}; use hypercore::{Hypercore, HypercoreBuilder, HypercoreError, Storage}; use tempfile::Builder as TempfileBuilder; @@ -10,11 +8,6 @@ fn bench_create_disk(c: &mut Criterion) { let mut group = c.benchmark_group("slow_call"); group.measurement_time(Duration::from_secs(20)); - #[cfg(feature = "async-std")] - group.bench_function("create_disk", move |b| { - b.to_async(AsyncStdExecutor) - .iter(|| create_hypercore("create")); - }); #[cfg(feature = "tokio")] group.bench_function("create_disk", move |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -51,10 +44,6 @@ fn bench_write_disk(c: &mut Criterion) { let mut group = c.benchmark_group("slow_call"); group.measurement_time(Duration::from_secs(20)); - #[cfg(feature = "async-std")] - group.bench_function("write disk", |b| { - b.to_async(AsyncStdExecutor).iter_custom(write_disk); - }); #[cfg(feature = "tokio")] group.bench_function("write disk", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -76,10 +65,6 @@ fn bench_read_disk(c: &mut Criterion) { let mut group = c.benchmark_group("slow_call"); group.measurement_time(Duration::from_secs(20)); - #[cfg(feature = "async-std")] - group.bench_function("read disk", |b| { - b.to_async(AsyncStdExecutor).iter_custom(read_disk); - }); #[cfg(feature = "tokio")] group.bench_function("read disk", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -104,10 +89,6 @@ fn bench_clear_disk(c: &mut Criterion) { let mut group = c.benchmark_group("slow_call"); group.measurement_time(Duration::from_secs(20)); - #[cfg(feature = "async-std")] - group.bench_function("clear disk", |b| { - 
b.to_async(AsyncStdExecutor).iter_custom(clear_disk); - }); #[cfg(feature = "tokio")] group.bench_function("clear disk", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/benches/memory.rs b/benches/memory.rs index ac8015a5..1cb9c73a 100644 --- a/benches/memory.rs +++ b/benches/memory.rs @@ -1,16 +1,10 @@ -use std::time::{Duration, Instant}; +use std::{sync::Arc, time::{Duration, Instant}}; -#[cfg(feature = "async-std")] -use criterion::async_executor::AsyncStdExecutor; use criterion::{Criterion, black_box, criterion_group, criterion_main}; use hypercore::{Hypercore, HypercoreBuilder, HypercoreError, Storage}; use random_access_memory::RandomAccessMemory; fn bench_create_memory(c: &mut Criterion) { - #[cfg(feature = "async-std")] - c.bench_function("create memory", |b| { - b.to_async(AsyncStdExecutor).iter(|| create_hypercore(1024)); - }); #[cfg(feature = "tokio")] c.bench_function("create memory", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -25,7 +19,7 @@ async fn create_hypercore(page_size: usize) -> Result let storage = Storage::open( |_| { Box::pin(async move { - Ok(Box::new(RandomAccessMemory::new(page_size)) as Box) + Ok(Arc::new(RandomAccessMemory::new(page_size)) as Arc) }) }, false, @@ -44,7 +38,7 @@ async fn create_hypercore(page_size: usize) -> Result let storage = Storage::open( |_| { Box::pin(async move { - Ok(Box::new(RandomAccessMemory::new(page_size)) as Box) + Ok(Arc::new(RandomAccessMemory::new(page_size)) as Arc) }) }, false, @@ -54,10 +48,6 @@ async fn create_hypercore(page_size: usize) -> Result } fn bench_write_memory(c: &mut Criterion) { - #[cfg(feature = "async-std")] - c.bench_function("write memory", |b| { - b.to_async(AsyncStdExecutor).iter_custom(write_memory); - }); #[cfg(feature = "tokio")] c.bench_function("write memory", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -76,10 +66,6 @@ async fn write_memory(iters: u64) -> Duration { } fn bench_read_memory(c: &mut Criterion) { - #[cfg(feature 
= "async-std")] - c.bench_function("read memory", |b| { - b.to_async(AsyncStdExecutor).iter_custom(read_memory); - }); #[cfg(feature = "tokio")] c.bench_function("read memory", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -101,10 +87,6 @@ async fn read_memory(iters: u64) -> Duration { } fn bench_clear_memory(c: &mut Criterion) { - #[cfg(feature = "async-std")] - c.bench_function("clear memory", |b| { - b.to_async(AsyncStdExecutor).iter_custom(clear_memory); - }); #[cfg(feature = "tokio")] c.bench_function("clear memory", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/examples/disk.rs b/examples/disk.rs index d99b7a10..4bb67446 100644 --- a/examples/disk.rs +++ b/examples/disk.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "async-std")] -use async_std::main as async_main; use hypercore::{HypercoreBuilder, HypercoreError, Storage}; use tempfile::Builder; #[cfg(feature = "tokio")] diff --git a/examples/memory.rs b/examples/memory.rs index a510ed6d..d61362c6 100644 --- a/examples/memory.rs +++ b/examples/memory.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "async-std")] -use async_std::main as async_main; use hypercore::{HypercoreBuilder, HypercoreError, Storage}; #[cfg(feature = "tokio")] use tokio::main as async_main; diff --git a/examples/replication.rs b/examples/replication.rs index f2943796..01f8270e 100644 --- a/examples/replication.rs +++ b/examples/replication.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "async-std")] -use async_std::main as async_main; use hypercore::{Hypercore, HypercoreBuilder, HypercoreError, PartialKeypair, Storage}; use hypercore_schema::{RequestBlock, RequestUpgrade}; use tempfile::Builder; diff --git a/src/common/error.rs b/src/common/error.rs index 89ec0b37..dc53b400 100644 --- a/src/common/error.rs +++ b/src/common/error.rs @@ -1,4 +1,5 @@ use compact_encoding::EncodingError; +use random_access_storage::RandomAccessError; use thiserror::Error; use crate::Store; @@ -58,6 +59,10 @@ pub enum HypercoreError { #[source] source: 
std::io::Error, }, + + #[cfg(feature = "replication")] + #[error("hypercore_protocol Error")] + Protocol(#[from] hypercore_protocol::Error), } impl From for HypercoreError { @@ -76,3 +81,33 @@ impl From for HypercoreError { } } } + +impl From for HypercoreError { + fn from(value: RandomAccessError) -> Self { + map_random_access_err(value) + } +} + +pub(crate) fn map_random_access_err(err: RandomAccessError) -> HypercoreError { + match err { + RandomAccessError::IO { + return_code, + context, + source, + } => HypercoreError::IO { + context: Some(format!( + "RandomAccess IO error. Context: {context:?}, return_code: {return_code:?}", + )), + source, + }, + RandomAccessError::OutOfBounds { + offset, + end, + length, + } => HypercoreError::InvalidOperation { + context: format!( + "RandomAccess out of bounds. Offset: {offset}, end: {end:?}, length: {length}", + ), + }, + } +} diff --git a/src/core/inner.rs b/src/core/inner.rs new file mode 100644 index 00000000..91a34aa2 --- /dev/null +++ b/src/core/inner.rs @@ -0,0 +1,822 @@ +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use ed25519_dalek::Signature; +use futures::future::Either; +use random_access_storage::BoxFuture; +use std::sync::Mutex; +use tracing::instrument; + +use crate::{ + bitfield::Bitfield, + common::{BitfieldUpdate, HypercoreError, NodeByteRange, StoreInfo, ValuelessProof}, + crypto::{PartialKeypair, generate_signing_key}, + data::BlockStore, + oplog::{Header, MAX_OPLOG_ENTRIES_BYTE_SIZE, Oplog}, + storage::Storage, + tree::{MerkleTree, MerkleTreeChangeset}, +}; +use hypercore_schema::{Proof, RequestBlock, RequestSeek, RequestUpgrade}; + +use super::{AppendOutcome, HypercoreOptions, Info}; + +#[derive(Debug)] +pub(crate) struct HypercoreInnerInner { + pub(crate) key_pair: PartialKeypair, + pub(crate) storage: Storage, + pub(crate) oplog: Oplog, + pub(crate) tree: MerkleTree, + pub(crate) block_store: BlockStore, + pub(crate) bitfield: Bitfield, + pub(crate) 
skip_flush_count: u8, + pub(crate) header: Header, + #[cfg(feature = "replication")] + pub(crate) events: crate::replication::events::Events, +} + +impl HypercoreInnerInner { + pub(crate) async fn new( + storage: Storage, + mut options: HypercoreOptions, + ) -> Result { + let key_pair: Option = if options.open { + if options.key_pair.is_some() { + return Err(HypercoreError::BadArgument { + context: "Key pair can not be used when building an openable hypercore" + .to_string(), + }); + } + None + } else { + Some(options.key_pair.take().unwrap_or_else(|| { + let signing_key = generate_signing_key(); + PartialKeypair { + public: signing_key.verifying_key(), + secret: Some(signing_key), + } + })) + }; + + // Open/create oplog + let mut oplog_open_outcome = match Oplog::open(&key_pair, None)? { + Either::Right(value) => value, + Either::Left(instruction) => { + let info = storage.read_info(instruction).await?; + match Oplog::open(&key_pair, Some(info))? { + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: "Could not open oplog".to_string(), + }); + } + } + } + }; + storage + .flush_infos(Vec::from(oplog_open_outcome.infos_to_flush)) + .await?; + + // Open/create tree + let mut tree = match MerkleTree::open( + &oplog_open_outcome.header.tree, + None, + #[cfg(feature = "cache")] + &options.node_cache_options, + )? { + Either::Right(value) => value, + Either::Left(instructions) => { + let infos = storage.read_infos(Vec::from(instructions)).await?; + match MerkleTree::open( + &oplog_open_outcome.header.tree, + Some(&infos), + #[cfg(feature = "cache")] + &options.node_cache_options, + )? 
{ + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: "Could not open tree".to_string(), + }); + } + } + } + }; + + // Create block store instance + let block_store = BlockStore::default(); + + // Open bitfield + let mut bitfield = match Bitfield::open(None) { + Either::Right(value) => value, + Either::Left(instruction) => { + let info = storage.read_info(instruction).await?; + match Bitfield::open(Some(info)) { + Either::Right(value) => value, + Either::Left(instruction) => { + let info = storage.read_info(instruction).await?; + match Bitfield::open(Some(info)) { + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: "Could not open bitfield".to_string(), + }); + } + } + } + } + } + }; + + // Process entries stored only to the oplog and not yet flushed into bitfield or tree + if let Some(entries) = oplog_open_outcome.entries { + for entry in entries.iter() { + for node in &entry.tree_nodes { + tree.add_node(node.clone()); + } + + if let Some(bitfield_update) = &entry.bitfield { + bitfield.update(bitfield_update); + update_contiguous_length( + &mut oplog_open_outcome.header, + &bitfield, + bitfield_update, + ); + } + if let Some(tree_upgrade) = &entry.tree_upgrade { + let mut changeset = + match tree.truncate(tree_upgrade.length, tree_upgrade.fork, None)? { + Either::Right(value) => value, + Either::Left(instructions) => { + let infos = storage.read_infos(Vec::from(instructions)).await?; + match tree.truncate( + tree_upgrade.length, + tree_upgrade.fork, + Some(&infos), + )? 
{ + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: format!( + "Could not truncate tree to length {}", + tree_upgrade.length + ), + }); + } + } + } + }; + changeset.ancestors = tree_upgrade.ancestors; + changeset.hash = Some(changeset.hash()); + changeset.signature = + Some(Signature::try_from(&*tree_upgrade.signature).map_err(|_| { + HypercoreError::InvalidSignature { + context: "Could not parse changeset signature".to_string(), + } + })?); + + oplog_open_outcome.oplog.update_header_with_changeset( + &changeset, + None, + &mut oplog_open_outcome.header, + )?; + + tree.commit(changeset)?; + } + } + } + + let oplog = oplog_open_outcome.oplog; + let header = oplog_open_outcome.header; + let key_pair = header.key_pair.clone(); + + Ok(Self { + key_pair, + storage, + oplog, + tree, + block_store, + bitfield, + header, + skip_flush_count: 0, + #[cfg(feature = "replication")] + events: crate::replication::events::Events::new(), + }) + } + + pub(crate) fn info(&self) -> Info { + Info { + length: self.tree.length, + byte_length: self.tree.byte_length, + contiguous_length: self.header.hints.contiguous_length, + fork: self.tree.fork, + writeable: self.key_pair.secret.is_some(), + } + } + + pub(crate) fn key_pair(&self) -> &PartialKeypair { + &self.key_pair + } + + #[instrument(ret, skip(self))] + pub(crate) fn has(&self, index: u64) -> bool { + self.bitfield.get(index) + } + + #[cfg(feature = "replication")] + pub(crate) fn event_subscribe( + &self, + ) -> async_broadcast::Receiver { + self.events.channel.new_receiver() + } + + pub(crate) fn append_outcome(&self) -> AppendOutcome { + AppendOutcome { + length: self.tree.length, + byte_length: self.tree.byte_length, + } + } + + pub(crate) fn should_flush_bitfield_and_tree_and_oplog(&mut self) -> bool { + if self.skip_flush_count == 0 + || self.oplog.entries_byte_length >= MAX_OPLOG_ENTRIES_BYTE_SIZE + { + self.skip_flush_count = 3; + true + } else { + 
self.skip_flush_count -= 1; + false + } + } + + pub(crate) fn flush_bitfield_and_tree_and_oplog( + &mut self, + clear_traces: bool, + ) -> BoxFuture> { + let mut infos = vec![]; + infos.extend(self.bitfield.flush()); + infos.extend(self.tree.flush()); + match self.oplog.flush(&self.header, clear_traces) { + Ok(opinfo) => infos.extend(opinfo), + Err(e) => return Box::pin(async { Err(e) }), + } + + self.storage.flush_infos(infos) + } + + pub(crate) async fn verify_proof( + &self, + proof: &Proof, + ) -> Result { + match self.tree.verify_proof(proof, &self.key_pair.public, None)? { + Either::Right(value) => Ok(value), + Either::Left(instructions) => { + let infos = self + .storage + .read_infos_to_vec(Vec::from(instructions)) + .await?; + match self + .tree + .verify_proof(proof, &self.key_pair.public, Some(&infos))? + { + Either::Right(value) => Ok(value), + Either::Left(_) => Err(HypercoreError::InvalidOperation { + context: "Could not verify proof from tree".to_string(), + }), + } + } + } + } + + #[instrument(err, skip(self))] + pub(crate) async fn missing_nodes_from_merkle_tree_index( + &self, + merkle_tree_index: u64, + ) -> Result { + match self.tree.missing_nodes(merkle_tree_index, None)? { + Either::Right(value) => Ok(value), + Either::Left(instructions) => { + let mut instructions = instructions; + let mut infos: Vec = vec![]; + loop { + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); + match self.tree.missing_nodes(merkle_tree_index, Some(&infos))? { + Either::Right(value) => { + return Ok(value); + } + Either::Left(new_instructions) => { + instructions = new_instructions; + } + } + } + } + } + } + + pub(crate) async fn byte_range( + &self, + index: u64, + initial_infos: Option<&[StoreInfo]>, + ) -> Result { + match self.tree.byte_range(index, initial_infos)? 
{ + Either::Right(value) => Ok(value), + Either::Left(instructions) => { + let mut instructions = instructions; + let mut infos: Vec = vec![]; + loop { + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); + match self.tree.byte_range(index, Some(&infos))? { + Either::Right(value) => { + return Ok(value); + } + Either::Left(new_instructions) => { + instructions = new_instructions; + } + } + } + } + } + } + + pub(crate) async fn create_valueless_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> Result { + match self.tree.create_valueless_proof( + block.as_ref(), + hash.as_ref(), + seek.as_ref(), + upgrade.as_ref(), + None, + )? { + Either::Right(value) => Ok(value), + Either::Left(instructions) => { + let mut instructions = instructions; + let mut infos: Vec = vec![]; + loop { + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); + match self.tree.create_valueless_proof( + block.as_ref(), + hash.as_ref(), + seek.as_ref(), + upgrade.as_ref(), + Some(&infos), + )? 
{ + Either::Right(value) => { + return Ok(value); + } + Either::Left(new_instructions) => { + instructions = new_instructions; + } + } + } + } + } + } +} + +#[derive(Debug)] +pub(crate) struct HypercoreInner { + pub(crate) inner: Arc>, +} + +impl HypercoreInner { + pub(crate) async fn new( + storage: Storage, + options: HypercoreOptions, + ) -> Result { + Ok(Self { + inner: Arc::new(Mutex::new( + HypercoreInnerInner::new(storage, options).await?, + )), + }) + } + pub(crate) fn info(&self) -> Info { + self.inner.lock().unwrap().info() + } + pub(crate) fn key_pair(&self) -> PartialKeypair { + self.inner.lock().unwrap().key_pair().clone() + } + + pub(crate) fn has(&self, index: u64) -> bool { + self.inner.lock().unwrap().has(index) + } + + #[cfg(feature = "replication")] + pub(crate) fn event_subscribe( + &self, + ) -> async_broadcast::Receiver { + self.inner.lock().unwrap().event_subscribe() + } + pub(crate) fn append_outcome(&self) -> AppendOutcome { + self.inner.lock().unwrap().append_outcome() + } + pub(crate) fn should_flush_bitfield_and_tree_and_oplog(&self) -> bool { + self.inner + .lock() + .unwrap() + .should_flush_bitfield_and_tree_and_oplog() + } + pub(crate) fn flush_bitfield_and_tree_and_oplog( + &self, + clear_traces: bool, + ) -> BoxFuture> { + self.inner + .lock() + .unwrap() + .flush_bitfield_and_tree_and_oplog(clear_traces) + } + + pub(crate) fn verify_proof(&self, proof: Proof) -> VerifyProofFuture { + VerifyProofFuture { + inner: self.inner.clone(), + proof, + infos: None, + pending_read: None, + } + } + pub(crate) fn missing_nodes_from_merkle_tree_index( + &self, + merkle_tree_index: u64, + ) -> MissingNodesFuture { + MissingNodesFuture { + inner: self.inner.clone(), + merkle_tree_index, + infos: Vec::new(), + pending_read: None, + } + } + pub(crate) fn byte_range(&self, index: u64, initial_infos: Vec) -> ByteRangeFuture { + ByteRangeFuture { + inner: self.inner.clone(), + index, + infos: initial_infos, + pending_read: None, + } + } + + pub(crate) 
fn get(&self, index: u64) -> GetFuture { + GetFuture { + inner: self.inner.clone(), + index, + byte_range_fut: None, + byte_range: None, + block_read_fut: None, + } + } + + pub(crate) fn create_valueless_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> ValuelessProofFuture { + ValuelessProofFuture { + inner: self.inner.clone(), + block, + hash, + seek, + upgrade, + infos: Vec::new(), + pending_read: None, + } + } +} + +pub(crate) struct GetFuture { + inner: Arc>, + index: u64, + // Phase 1: resolve byte range + byte_range_fut: Option, + // Phase 2: read block from storage (only needed if block_store has no cached value) + byte_range: Option, + block_read_fut: Option>>, +} + +impl Future for GetFuture { + type Output = Result>, HypercoreError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // TODO: we really need to generalize the Either response stack + loop { + // Phase 2: storage read for block data (highest priority when active). + if let Some(fut) = this.block_read_fut.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(info)) => { + this.block_read_fut = None; + let inner = this.inner.lock().unwrap(); + let byte_range = this.byte_range.as_ref().unwrap(); + return match inner.block_store.read(byte_range, Some(info)) { + Either::Right(data) => Poll::Ready(Ok(Some(data))), + Either::Left(_) => Poll::Ready(Err(HypercoreError::InvalidOperation { + context: "Could not read block storage range".to_string(), + })), + }; + } + } + } + + // Phase 1: resolve the byte range. 
+ if let Some(fut) = this.byte_range_fut.as_mut() { + match Pin::new(fut).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(byte_range)) => { + this.byte_range_fut = None; + let inner = this.inner.lock().unwrap(); + match inner.block_store.read(&byte_range, None) { + Either::Right(data) => return Poll::Ready(Ok(Some(data))), + Either::Left(instruction) => { + let storage = inner.storage.clone(); + this.block_read_fut = Some(storage.read_info(instruction)); + this.byte_range = Some(byte_range); + // Loop to poll block_read_fut immediately. + } + } + } + } + continue; + } + + // Initial: check bitfield, then start byte range resolution. + { + let inner = this.inner.lock().unwrap(); + if !inner.bitfield.get(this.index) { + #[cfg(feature = "replication")] + inner.events.send_on_get(this.index); + return Poll::Ready(Ok(None)); + } + } + this.byte_range_fut = Some(ByteRangeFuture { + inner: this.inner.clone(), + index: this.index, + infos: Vec::new(), + pending_read: None, + }); + // Loop to poll byte_range_fut immediately. + } + } +} + +pub(crate) struct MissingNodesFuture { + inner: Arc>, + merkle_tree_index: u64, + infos: Vec, + pending_read: Option, HypercoreError>>>, +} + +impl Future for MissingNodesFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + if let Some(fut) = this.pending_read.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(new_infos)) => { + this.infos.extend(new_infos); + this.pending_read = None; + } + } + } + + let result = { + let inner = this.inner.lock().unwrap(); + let infos_opt = if this.infos.is_empty() { + None + } else { + Some(this.infos.as_slice()) + }; + inner.tree.missing_nodes(this.merkle_tree_index, infos_opt) + // Lock is dropped here. 
+ }; + + match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), + Ok(Either::Left(instructions)) => { + let storage = this.inner.lock().unwrap().storage.clone(); + this.pending_read = Some(storage.read_infos_to_vec(Vec::from(instructions))); + } + } + } + } +} + +pub(crate) struct ByteRangeFuture { + inner: Arc>, + index: u64, + infos: Vec, + pending_read: Option, HypercoreError>>>, +} + +impl Future for ByteRangeFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + if let Some(fut) = this.pending_read.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(new_infos)) => { + this.infos.extend(new_infos); + this.pending_read = None; + } + } + } + + let result = { + let inner = this.inner.lock().unwrap(); + let infos_opt = if this.infos.is_empty() { + None + } else { + Some(this.infos.as_slice()) + }; + inner.tree.byte_range(this.index, infos_opt) + // Lock is dropped here. + }; + + match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), + Ok(Either::Left(instructions)) => { + let storage = this.inner.lock().unwrap().storage.clone(); + this.pending_read = Some(storage.read_infos_to_vec(Vec::from(instructions))); + } + } + } + } +} + +pub(crate) struct VerifyProofFuture { + inner: Arc>, + proof: Proof, + // None = first attempt (no read done yet), Some = read completed + infos: Option>, + pending_read: Option, HypercoreError>>>, +} + +impl Future for VerifyProofFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + // Phase 1: if there's a pending storage read, drive it to completion. 
+ if let Some(fut) = this.pending_read.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(infos)) => { + this.infos = Some(infos); + this.pending_read = None; + // Fall through to retry verify_proof. + } + } + } + + // Phase 2: call tree.verify_proof synchronously under the lock. + let result = { + let inner = this.inner.lock().unwrap(); + let public_key = inner.key_pair.public; + let infos_opt = this.infos.as_deref(); + inner.tree.verify_proof(&this.proof, &public_key, infos_opt) + // Lock is dropped here. + }; + + match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), + Ok(Either::Left(_)) if this.infos.is_some() => { + // We already read infos and still got Left — the proof can't + // be satisfied, which is an error. + return Poll::Ready(Err(HypercoreError::InvalidOperation { + context: "Could not verify proof from tree".to_string(), + })); + } + Ok(Either::Left(instructions)) => { + let storage = this.inner.lock().unwrap().storage.clone(); + this.pending_read = Some(storage.read_infos_to_vec(Vec::from(instructions))); + // Loop to poll the new future immediately. + } + } + } + } +} + +pub(crate) struct ValuelessProofFuture { + inner: Arc>, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + infos: Vec, + pending_read: Option, HypercoreError>>>, +} + +impl Future for ValuelessProofFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // ValuelessProofFuture is Unpin (all fields are Unpin), so this is safe. + let this = self.get_mut(); + + loop { + // Phase 1: if there's a pending storage read, drive it to completion. 
+ if let Some(fut) = this.pending_read.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(new_infos)) => { + this.infos.extend(new_infos); + this.pending_read = None; + // Fall through to retry create_valueless_proof. + } + } + } + + // Phase 2: call tree.create_valueless_proof synchronously under the lock. + let result = { + let inner = this.inner.lock().unwrap(); + let infos_opt = if this.infos.is_empty() { + None + } else { + Some(this.infos.as_slice()) + }; + inner.tree.create_valueless_proof( + this.block.as_ref(), + this.hash.as_ref(), + this.seek.as_ref(), + this.upgrade.as_ref(), + infos_opt, + ) + // Lock is dropped here. + }; + + match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), + Ok(Either::Left(instructions)) => { + // Need more nodes from storage. Clone storage (cheap Arc clone) + // outside the lock so we don't hold it across the async read. + let storage = this.inner.lock().unwrap().storage.clone(); + this.pending_read = Some(storage.read_infos_to_vec(Vec::from(instructions))); + // Loop to poll the new future immediately. + } + } + } + } +} + +pub(crate) fn update_contiguous_length( + header: &mut Header, + bitfield: &Bitfield, + bitfield_update: &BitfieldUpdate, +) { + let end = bitfield_update.start + bitfield_update.length; + let mut c = header.hints.contiguous_length; + if bitfield_update.drop { + if c <= end && c > bitfield_update.start { + c = bitfield_update.start; + } + } else if c <= end && c >= bitfield_update.start { + c = end; + while bitfield.get(c) { + c += 1; + } + } + + if c != header.hints.contiguous_length { + header.hints.contiguous_length = c; + } +} diff --git a/src/core.rs b/src/core/mod.rs similarity index 50% rename from src/core.rs rename to src/core/mod.rs index 3adc9a50..6dfd4bd1 100644 --- a/src/core.rs +++ b/src/core/mod.rs @@ -1,24 +1,24 @@ //! 
Hypercore's main abstraction. Exposes an append-only, secure log structure. -use ed25519_dalek::Signature; +mod inner; + use futures::future::Either; -use std::convert::TryFrom; -use std::fmt::Debug; +#[cfg(feature = "replication")] +use std::sync::Mutex; use tracing::instrument; #[cfg(feature = "cache")] use crate::common::cache::CacheOptions; use crate::{ - bitfield::Bitfield, - common::{BitfieldUpdate, HypercoreError, NodeByteRange, StoreInfo, ValuelessProof}, - crypto::{PartialKeypair, generate_signing_key}, - data::BlockStore, - oplog::{Header, MAX_OPLOG_ENTRIES_BYTE_SIZE, Oplog}, + common::{BitfieldUpdate, HypercoreError, StoreInfo}, + core::inner::HypercoreInnerInner, + crypto::PartialKeypair, storage::Storage, - tree::{MerkleTree, MerkleTreeChangeset}, }; - use hypercore_schema::{Proof, RequestBlock, RequestSeek, RequestUpgrade}; +pub(crate) use inner::HypercoreInner; +use inner::update_contiguous_length; + #[derive(Debug)] pub(crate) struct HypercoreOptions { pub(crate) key_pair: Option, @@ -38,19 +38,18 @@ impl HypercoreOptions { } } +macro_rules! ininner { + ($self:expr) => { + $self.inner.inner.lock().unwrap() + }; +} + /// Hypercore is an append-only log structure. 
#[derive(Debug)] pub struct Hypercore { - pub(crate) key_pair: PartialKeypair, - pub(crate) storage: Storage, - pub(crate) oplog: Oplog, - pub(crate) tree: MerkleTree, - pub(crate) block_store: BlockStore, - pub(crate) bitfield: Bitfield, - skip_flush_count: u8, // autoFlush in Javascript - header: Header, + pub(crate) inner: HypercoreInner, #[cfg(feature = "replication")] - events: crate::replication::events::Events, + pub(crate) peers: Vec>, } /// Response from append, matches that of the Javascript result @@ -81,189 +80,19 @@ pub struct Info { impl Hypercore { /// Creates/opens new hypercore using given storage and options pub(crate) async fn new( - mut storage: Storage, - mut options: HypercoreOptions, + storage: Storage, + options: HypercoreOptions, ) -> Result { - let key_pair: Option = if options.open { - if options.key_pair.is_some() { - return Err(HypercoreError::BadArgument { - context: "Key pair can not be used when building an openable hypercore" - .to_string(), - }); - } - None - } else { - Some(options.key_pair.take().unwrap_or_else(|| { - let signing_key = generate_signing_key(); - PartialKeypair { - public: signing_key.verifying_key(), - secret: Some(signing_key), - } - })) - }; - - // Open/create oplog - let mut oplog_open_outcome = match Oplog::open(&key_pair, None)? { - Either::Right(value) => value, - Either::Left(instruction) => { - let info = storage.read_info(instruction).await?; - match Oplog::open(&key_pair, Some(info))? { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: "Could not open oplog".to_string(), - }); - } - } - } - }; - storage - .flush_infos(&oplog_open_outcome.infos_to_flush) - .await?; - - // Open/create tree - let mut tree = match MerkleTree::open( - &oplog_open_outcome.header.tree, - None, - #[cfg(feature = "cache")] - &options.node_cache_options, - )? 
{ - Either::Right(value) => value, - Either::Left(instructions) => { - let infos = storage.read_infos(&instructions).await?; - match MerkleTree::open( - &oplog_open_outcome.header.tree, - Some(&infos), - #[cfg(feature = "cache")] - &options.node_cache_options, - )? { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: "Could not open tree".to_string(), - }); - } - } - } - }; - - // Create block store instance - let block_store = BlockStore::default(); - - // Open bitfield - let mut bitfield = match Bitfield::open(None) { - Either::Right(value) => value, - Either::Left(instruction) => { - let info = storage.read_info(instruction).await?; - match Bitfield::open(Some(info)) { - Either::Right(value) => value, - Either::Left(instruction) => { - let info = storage.read_info(instruction).await?; - match Bitfield::open(Some(info)) { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: "Could not open bitfield".to_string(), - }); - } - } - } - } - } - }; - - // Process entries stored only to the oplog and not yet flushed into bitfield or tree - if let Some(entries) = oplog_open_outcome.entries { - for entry in entries.iter() { - for node in &entry.tree_nodes { - tree.add_node(node.clone()); - } - - if let Some(bitfield_update) = &entry.bitfield { - bitfield.update(bitfield_update); - update_contiguous_length( - &mut oplog_open_outcome.header, - &bitfield, - bitfield_update, - ); - } - if let Some(tree_upgrade) = &entry.tree_upgrade { - // TODO: Generalize Either response stack - let mut changeset = - match tree.truncate(tree_upgrade.length, tree_upgrade.fork, None)? { - Either::Right(value) => value, - Either::Left(instructions) => { - let infos = storage.read_infos(&instructions).await?; - match tree.truncate( - tree_upgrade.length, - tree_upgrade.fork, - Some(&infos), - )? 
{ - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: format!( - "Could not truncate tree to length {}", - tree_upgrade.length - ), - }); - } - } - } - }; - changeset.ancestors = tree_upgrade.ancestors; - changeset.hash = Some(changeset.hash()); - changeset.signature = - Some(Signature::try_from(&*tree_upgrade.signature).map_err(|_| { - HypercoreError::InvalidSignature { - context: "Could not parse changeset signature".to_string(), - } - })?); - - // Update the header with this changeset to make in-memory value match that - // of the stored value. - oplog_open_outcome.oplog.update_header_with_changeset( - &changeset, - None, - &mut oplog_open_outcome.header, - )?; - - // TODO: Skip reorg hints for now, seems to only have to do with replication - // addReorgHint(header.hints.reorgs, tree, batch) - - // Commit changeset to in-memory tree - tree.commit(changeset)?; - } - } - } - - let oplog = oplog_open_outcome.oplog; - let header = oplog_open_outcome.header; - let key_pair = header.key_pair.clone(); - Ok(Hypercore { - key_pair, - storage, - oplog, - tree, - block_store, - bitfield, - header, - skip_flush_count: 0, + inner: HypercoreInner::new(storage, options).await?, #[cfg(feature = "replication")] - events: crate::replication::events::Events::new(), + peers: Default::default(), }) } /// Gets basic info about the Hypercore pub fn info(&self) -> Info { - Info { - length: self.tree.length, - byte_length: self.tree.byte_length, - contiguous_length: self.header.hints.contiguous_length, - fork: self.tree.fork, - writeable: self.key_pair.secret.is_some(), - } + self.inner.info() } /// Appends a data slice to the hypercore. 
@@ -278,120 +107,99 @@ impl Hypercore { &mut self, batch: B, ) -> Result { - let secret_key = match &self.key_pair.secret { + let secret_key = match self.inner.key_pair().secret { Some(key) => key, None => return Err(HypercoreError::NotWritable), }; - if !batch.as_ref().is_empty() { - // Create a changeset for the tree - let mut changeset = self.tree.changeset(); - let mut batch_length: usize = 0; - for data in batch.as_ref().iter() { - batch_length += changeset.append(data.as_ref()); - } - changeset.hash_and_sign(secret_key); + if batch.as_ref().is_empty() { + return Ok(self.inner.append_outcome()); + } + // Create a changeset for the tree + let mut changeset = ininner!(self).tree.changeset(); + let mut batch_length: usize = 0; + for data in batch.as_ref().iter() { + batch_length += changeset.append(data.as_ref()); + } + changeset.hash_and_sign(&secret_key); - // Write the received data to the block store - let info = - self.block_store - .append_batch(batch.as_ref(), batch_length, self.tree.byte_length); - self.storage.flush_info(info).await?; + // Write the received data to the block store + let byte_length = ininner!(self).tree.byte_length; + let info = + ininner!(self) + .block_store + .append_batch(batch.as_ref(), batch_length, byte_length); + { ininner!(self).storage.flush_info(info) }.await?; - // Append the changeset to the Oplog - let bitfield_update = BitfieldUpdate { - drop: false, - start: changeset.ancestors, - length: changeset.batch_length, - }; - let outcome = self.oplog.append_changeset( - &changeset, - Some(bitfield_update.clone()), - false, - &self.header, - )?; - self.storage.flush_infos(&outcome.infos_to_flush).await?; - self.header = outcome.header; - - // Write to bitfield - self.bitfield.update(&bitfield_update); + // Append the changeset to the Oplog + let bitfield_update = BitfieldUpdate { + drop: false, + start: changeset.ancestors, + length: changeset.batch_length, + }; + let outcome = { + let HypercoreInnerInner { oplog, header, .. 
} = &mut *ininner!(self); + oplog.append_changeset(&changeset, Some(bitfield_update.clone()), false, header)? + }; + { + ininner!(self) + .storage + .flush_infos(Vec::from(outcome.infos_to_flush)) + } + .await?; + ininner!(self).header = outcome.header; - // Contiguous length is known only now - update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update); + // Write to bitfield + ininner!(self).bitfield.update(&bitfield_update); - // Commit changeset to in-memory tree - self.tree.commit(changeset)?; + // Contiguous length is known only now + { + let HypercoreInnerInner { + bitfield, header, .. + } = &mut *ininner!(self); + update_contiguous_length(header, bitfield, &bitfield_update); + } - // Now ready to flush - if self.should_flush_bitfield_and_tree_and_oplog() { - self.flush_bitfield_and_tree_and_oplog(false).await?; - } + // Commit changeset to in-memory tree + ininner!(self).tree.commit(changeset)?; - #[cfg(feature = "replication")] - { - use tracing::trace; + // Now ready to flush + if self.inner.should_flush_bitfield_and_tree_and_oplog() { + self.inner.flush_bitfield_and_tree_and_oplog(false).await?; + } - trace!(bitfield_update = ?bitfield_update, "Hppercore.append_batch emit DataUpgrade & Have"); - let _ = self.events.send(crate::replication::events::DataUpgrade {}); - let _ = self - .events - .send(crate::replication::events::Have::from(&bitfield_update)); - } + #[cfg(feature = "replication")] + { + use tracing::trace; + + trace!(bitfield_update = ?bitfield_update, "Hppercore.append_batch emit DataUpgrade & Have"); + let _ = ininner!(self) + .events + .send(crate::replication::events::DataUpgrade {}); + let _ = ininner!(self) + .events + .send(crate::replication::events::Have::from(&bitfield_update)); } - // Return the new value - Ok(AppendOutcome { - length: self.tree.length, - byte_length: self.tree.byte_length, - }) + Ok(self.inner.append_outcome()) } #[cfg(feature = "replication")] /// Subscribe to core events relevant to replication 
pub fn event_subscribe(&self) -> async_broadcast::Receiver { - self.events.channel.new_receiver() + self.inner.event_subscribe() } /// Check if core has the block at the given `index` locally #[instrument(ret, skip(self))] pub fn has(&self, index: u64) -> bool { - self.bitfield.get(index) + self.inner.has(index) } /// Read value at given index, if any. #[instrument(err, skip(self))] - pub async fn get(&mut self, index: u64) -> Result>, HypercoreError> { - if !self.bitfield.get(index) { - #[cfg(feature = "replication")] - // if not in this core, emit Event::Get(index) - { - use tracing::trace; - - trace!(index = index, "Hppercore emit 'get' event"); - self.events.send_on_get(index); - } - return Ok(None); - } - - let byte_range = self.byte_range(index, None).await?; - - // TODO: Generalize Either response stack - let data = match self.block_store.read(&byte_range, None) { - Either::Right(value) => value, - Either::Left(instruction) => { - let info = self.storage.read_info(instruction).await?; - match self.block_store.read(&byte_range, Some(info)) { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: "Could not read block storage range".to_string(), - }); - } - } - } - }; - - Ok(Some(data.to_vec())) + pub async fn get(&self, index: u64) -> Result>, HypercoreError> { + Ok(self.inner.get(index).await?.map(|b| b.into_vec())) } /// Clear data for entries between start and end (exclusive) indexes. 
@@ -402,37 +210,47 @@ impl Hypercore { return Ok(()); } // Write to oplog - let infos_to_flush = self.oplog.clear(start, end)?; - self.storage.flush_infos(&infos_to_flush).await?; + let infos_to_flush = ininner!(self).oplog.clear(start, end)?; + { + ininner!(self) + .storage + .flush_infos(Vec::from(infos_to_flush)) + } + .await?; // Set bitfield - self.bitfield.set_range(start, end - start, false); + ininner!(self).bitfield.set_range(start, end - start, false); // Set contiguous length - if start < self.header.hints.contiguous_length { - self.header.hints.contiguous_length = start; + if start < ininner!(self).header.hints.contiguous_length { + ininner!(self).header.hints.contiguous_length = start; } // Find the biggest hole that can be punched into the data - let start = if let Some(index) = self.bitfield.last_index_of(true, start) { + let start = if let Some(index) = ininner!(self).bitfield.last_index_of(true, start) { index + 1 } else { 0 }; - let end = if let Some(index) = self.bitfield.index_of(true, end) { + let end = if let Some(index) = ininner!(self).bitfield.index_of(true, end) { index } else { - self.tree.length + ininner!(self).tree.length }; // Find byte offset for first value let mut infos: Vec = Vec::new(); - let clear_offset = match self.tree.byte_offset(start, None)? { + let clear_offset = match { ininner!(self).tree.byte_offset(start, None)? } { Either::Right(value) => value, Either::Left(instructions) => { - let new_infos = self.storage.read_infos_to_vec(&instructions).await?; + let new_infos = { + ininner!(self) + .storage + .read_infos_to_vec(Vec::from(instructions)) + } + .await?; infos.extend(new_infos); - match self.tree.byte_offset(start, Some(&infos))? { + match ininner!(self).tree.byte_offset(start, Some(&infos))? 
{ Either::Right(value) => value, Either::Left(_) => { return Err(HypercoreError::InvalidOperation { @@ -444,37 +262,38 @@ impl Hypercore { }; // Find byte range for last value - let last_byte_range = self.byte_range(end - 1, Some(&infos)).await?; + let last_byte_range = self.inner.byte_range(end - 1, infos).await?; let clear_length = (last_byte_range.index + last_byte_range.length) - clear_offset; // Clear blocks - let info_to_flush = self.block_store.clear(clear_offset, clear_length); - self.storage.flush_info(info_to_flush).await?; + let info_to_flush = ininner!(self).block_store.clear(clear_offset, clear_length); + { ininner!(self).storage.flush_info(info_to_flush) }.await?; // Now ready to flush - if self.should_flush_bitfield_and_tree_and_oplog() { - self.flush_bitfield_and_tree_and_oplog(false).await?; + if self.inner.should_flush_bitfield_and_tree_and_oplog() { + self.inner.flush_bitfield_and_tree_and_oplog(false).await?; } Ok(()) } /// Access the key pair. - pub fn key_pair(&self) -> &PartialKeypair { - &self.key_pair + pub fn key_pair(&self) -> PartialKeypair { + self.inner.key_pair() } /// Create a proof for given request #[instrument(err, skip_all)] pub async fn create_proof( - &mut self, + &self, block: Option, hash: Option, seek: Option, upgrade: Option, ) -> Result, HypercoreError> { let valueless_proof = self + .inner .create_valueless_proof(block, hash, seek, upgrade) .await?; let value: Option> = if let Some(block) = valueless_proof.block.as_ref() { @@ -494,12 +313,13 @@ impl Hypercore { /// Verify and apply proof received from peer, returns true if changed, false if not /// possible to apply. 
#[instrument(skip_all)] - pub async fn verify_and_apply_proof(&mut self, proof: &Proof) -> Result { - if proof.fork != self.tree.fork { + pub async fn verify_and_apply_proof(&self, proof: &Proof) -> Result { + if proof.fork != ininner!(self).tree.fork { return Ok(false); } - let changeset = self.verify_proof(proof).await?; - if !self.tree.commitable(&changeset) { + // TODO rm clone pass as owned + let changeset = self.inner.verify_proof(proof.clone()).await?; + if !ininner!(self).tree.commitable(&changeset) { return Ok(false); } @@ -507,35 +327,40 @@ impl Hypercore { // here we do only one. _verifyShared groups together many subsequent changesets into a single // oplog push, and then flushes in the end only for the whole group. let bitfield_update: Option = if let Some(block) = &proof.block.as_ref() { - let byte_offset = - match self + let byte_offset = match { + ininner!(self) .tree .byte_offset_in_changeset(block.index, &changeset, None)? - { - Either::Right(value) => value, - Either::Left(instructions) => { - let infos = self.storage.read_infos_to_vec(&instructions).await?; - match self.tree.byte_offset_in_changeset( - block.index, - &changeset, - Some(&infos), - )? { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: format!( - "Could not read offset for index {} from tree", - block.index - ), - }); - } + } { + Either::Right(value) => value, + Either::Left(instructions) => { + let infos = { + ininner!(self) + .storage + .read_infos_to_vec(Vec::from(instructions)) + } + .await?; + match ininner!(self).tree.byte_offset_in_changeset( + block.index, + &changeset, + Some(&infos), + )? 
{ + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: format!( + "Could not read offset for index {} from tree", + block.index + ), + }); } } - }; + } + }; // Write the value to the block store - let info_to_flush = self.block_store.put(&block.value, byte_offset); - self.storage.flush_info(info_to_flush).await?; + let info_to_flush = ininner!(self).block_store.put(&block.value, byte_offset); + { ininner!(self).storage.flush_info(info_to_flush) }.await?; // Return a bitfield update for the given value Some(BitfieldUpdate { @@ -549,41 +374,51 @@ impl Hypercore { }; // Append the changeset to the Oplog - let outcome = self.oplog.append_changeset( - &changeset, - bitfield_update.clone(), - false, - &self.header, - )?; - self.storage.flush_infos(&outcome.infos_to_flush).await?; - self.header = outcome.header; + let outcome = { + let HypercoreInnerInner { oplog, header, .. } = &mut *ininner!(self); + oplog.append_changeset(&changeset, bitfield_update.clone(), false, header)? + }; + { + ininner!(self) + .storage + .flush_infos(Vec::from(outcome.infos_to_flush)) + } + .await?; + ininner!(self).header = outcome.header; if let Some(bitfield_update) = &bitfield_update { // Write to bitfield - self.bitfield.update(bitfield_update); + ininner!(self).bitfield.update(bitfield_update); // Contiguous length is known only now - update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update); + { + let HypercoreInnerInner { + bitfield, header, .. 
+ } = &mut *ininner!(self); + update_contiguous_length(header, bitfield, &bitfield_update); + } } // Commit changeset to in-memory tree - self.tree.commit(changeset)?; + ininner!(self).tree.commit(changeset)?; // Now ready to flush - if self.should_flush_bitfield_and_tree_and_oplog() { - self.flush_bitfield_and_tree_and_oplog(false).await?; + if self.inner.should_flush_bitfield_and_tree_and_oplog() { + self.inner.flush_bitfield_and_tree_and_oplog(false).await?; } #[cfg(feature = "replication")] { if proof.upgrade.is_some() { // Notify replicator if we receieved an upgrade - let _ = self.events.send(crate::replication::events::DataUpgrade {}); + let _ = ininner!(self) + .events + .send(crate::replication::events::DataUpgrade {}); } // Notify replicator if we receieved a bitfield update if let Some(ref bitfield) = bitfield_update { - let _ = self + let _ = ininner!(self) .events .send(crate::replication::events::Have::from(bitfield)); } @@ -594,35 +429,22 @@ impl Hypercore { /// Used to fill the nodes field of a `RequestBlock` during /// synchronization. #[instrument(err, skip(self))] - pub async fn missing_nodes(&mut self, index: u64) -> Result { - self.missing_nodes_from_merkle_tree_index(index * 2).await + pub async fn missing_nodes(&self, index: u64) -> Result { + self.inner + .missing_nodes_from_merkle_tree_index(index * 2) + .await } - /// Get missing nodes using a merkle tree index. Advanced variant of missing_nodex + /// Get missing nodes using a merkle tree index. Advanced variant of missing_nodes /// that allow for special cases of searching directly from the merkle tree. #[instrument(err, skip(self))] pub async fn missing_nodes_from_merkle_tree_index( - &mut self, + &self, merkle_tree_index: u64, ) -> Result { - match self.tree.missing_nodes(merkle_tree_index, None)? 
{ - Either::Right(value) => Ok(value), - Either::Left(instructions) => { - let mut instructions = instructions; - let mut infos: Vec = vec![]; - loop { - infos.extend(self.storage.read_infos_to_vec(&instructions).await?); - match self.tree.missing_nodes(merkle_tree_index, Some(&infos))? { - Either::Right(value) => { - return Ok(value); - } - Either::Left(new_instructions) => { - instructions = new_instructions; - } - } - } - } - } + self.inner + .missing_nodes_from_merkle_tree_index(merkle_tree_index) + .await } /// Makes the hypercore read-only by deleting the secret key. Returns true if the @@ -631,157 +453,26 @@ impl Hypercore { /// been stored. #[instrument(err, skip_all)] pub async fn make_read_only(&mut self) -> Result { - if self.key_pair.secret.is_some() { - self.key_pair.secret = None; - self.header.key_pair.secret = None; + if ininner!(self).key_pair.secret.is_some() { + ininner!(self).key_pair.secret = None; + ininner!(self).header.key_pair.secret = None; // Need to flush clearing traces to make sure both oplog slots are cleared - self.flush_bitfield_and_tree_and_oplog(true).await?; + { ininner!(self).flush_bitfield_and_tree_and_oplog(true) }.await?; Ok(true) } else { Ok(false) } } - - async fn byte_range( - &mut self, - index: u64, - initial_infos: Option<&[StoreInfo]>, - ) -> Result { - match self.tree.byte_range(index, initial_infos)? { - Either::Right(value) => Ok(value), - Either::Left(instructions) => { - let mut instructions = instructions; - let mut infos: Vec = vec![]; - loop { - infos.extend(self.storage.read_infos_to_vec(&instructions).await?); - match self.tree.byte_range(index, Some(&infos))? 
{ - Either::Right(value) => { - return Ok(value); - } - Either::Left(new_instructions) => { - instructions = new_instructions; - } - } - } - } - } - } - - async fn create_valueless_proof( - &mut self, - block: Option, - hash: Option, - seek: Option, - upgrade: Option, - ) -> Result { - match self.tree.create_valueless_proof( - block.as_ref(), - hash.as_ref(), - seek.as_ref(), - upgrade.as_ref(), - None, - )? { - Either::Right(value) => Ok(value), - Either::Left(instructions) => { - let mut instructions = instructions; - let mut infos: Vec = vec![]; - loop { - infos.extend(self.storage.read_infos_to_vec(&instructions).await?); - match self.tree.create_valueless_proof( - block.as_ref(), - hash.as_ref(), - seek.as_ref(), - upgrade.as_ref(), - Some(&infos), - )? { - Either::Right(value) => { - return Ok(value); - } - Either::Left(new_instructions) => { - instructions = new_instructions; - } - } - } - } - } - } - - /// Verify a proof received from a peer. Returns a changeset that should be - /// applied. - async fn verify_proof(&mut self, proof: &Proof) -> Result { - match self.tree.verify_proof(proof, &self.key_pair.public, None)? { - Either::Right(value) => Ok(value), - Either::Left(instructions) => { - let infos = self.storage.read_infos_to_vec(&instructions).await?; - match self - .tree - .verify_proof(proof, &self.key_pair.public, Some(&infos))? 
- { - Either::Right(value) => Ok(value), - Either::Left(_) => Err(HypercoreError::InvalidOperation { - context: "Could not verify proof from tree".to_string(), - }), - } - } - } - } - - fn should_flush_bitfield_and_tree_and_oplog(&mut self) -> bool { - if self.skip_flush_count == 0 - || self.oplog.entries_byte_length >= MAX_OPLOG_ENTRIES_BYTE_SIZE - { - self.skip_flush_count = 3; - true - } else { - self.skip_flush_count -= 1; - false - } - } - - async fn flush_bitfield_and_tree_and_oplog( - &mut self, - clear_traces: bool, - ) -> Result<(), HypercoreError> { - let infos = self.bitfield.flush(); - self.storage.flush_infos(&infos).await?; - let infos = self.tree.flush(); - self.storage.flush_infos(&infos).await?; - let infos = self.oplog.flush(&self.header, clear_traces)?; - self.storage.flush_infos(&infos).await?; - Ok(()) - } -} - -fn update_contiguous_length( - header: &mut Header, - bitfield: &Bitfield, - bitfield_update: &BitfieldUpdate, -) { - let end = bitfield_update.start + bitfield_update.length; - let mut c = header.hints.contiguous_length; - if bitfield_update.drop { - if c <= end && c > bitfield_update.start { - c = bitfield_update.start; - } - } else if c <= end && c >= bitfield_update.start { - c = end; - while bitfield.get(c) { - c += 1; - } - } - - if c != header.hints.contiguous_length { - header.hints.contiguous_length = c; - } } #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::crypto::{PartialKeypair, generate_signing_key}; #[async_std::test] async fn core_create_proof_block_only() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof(Some(RequestBlock { index: 4, nodes: 2 }), None, None, None) @@ -799,7 +490,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let 
hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 0 }), @@ -830,7 +521,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_upgrade_and_additional() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 0 }), @@ -862,7 +553,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_upgrade_from_existing_state() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 1, nodes: 0 }), @@ -893,7 +584,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_upgrade_from_existing_state_with_additional() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 1, nodes: 0 }), @@ -924,7 +615,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_1_no_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 2 }), @@ -946,7 +637,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_2_no_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 2 }), @@ -968,7 +659,7 @@ pub(crate) mod tests { #[async_std::test] async fn 
core_create_proof_block_and_seek_3_no_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 2 }), @@ -992,7 +683,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_to_tree_no_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(16).await?; + let hypercore = create_hypercore_with_data(16).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 0, nodes: 4 }), @@ -1017,7 +708,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_with_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 2 }), @@ -1047,7 +738,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_seek_with_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( None, @@ -1076,7 +767,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_verify_proof_invalid_signature() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; // Invalid clone hypercore with a different public key let mut hypercore_clone = create_hypercore_with_data(0).await?; let proof = hypercore @@ -1102,11 +793,11 @@ pub(crate) mod tests { #[async_std::test] async fn core_verify_and_apply_proof() -> Result<(), HypercoreError> { - let mut main = create_hypercore_with_data(10).await?; + let main = create_hypercore_with_data(10).await?; let mut clone = create_hypercore_with_data_and_key_pair( 0, 
PartialKeypair { - public: main.key_pair.public, + public: ininner!(main).key_pair.public, secret: None, }, ) diff --git a/src/lib.rs b/src/lib.rs index c2a4f4d0..d0a752ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,12 @@ -#![forbid( - unsafe_code, - future_incompatible, - rust_2018_idioms, - rust_2018_compatibility, - missing_debug_implementations, - missing_docs -)] +#![forbid(unsafe_code, future_incompatible)] #![doc(test(attr(deny(warnings))))] #![warn( unreachable_pub, redundant_lifetimes, non_local_definitions, + missing_debug_implementations, + //missing_docs, + clippy::unused_async, clippy::needless_pass_by_value, clippy::needless_pass_by_ref_mut, clippy::enum_glob_use @@ -51,10 +47,6 @@ //! # tokio_test::block_on(async { //! # example().await; //! # }); -//! # #[cfg(feature = "async-std")] -//! # async_std::task::block_on(async { -//! # example().await; -//! # }); //! # async fn example() { //! use hypercore::{HypercoreBuilder, Storage}; //! diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 35843736..a16df414 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -3,17 +3,27 @@ pub mod events; #[cfg(feature = "shared-core")] pub mod shared_core; +use futures::Stream; +use futures_lite::future::FutureExt; +use hypercore_handshake::CipherTrait; +use hypercore_protocol::{Protocol, discovery_key}; #[cfg(feature = "shared-core")] pub use shared_core::SharedCore; +use tracing::{error, trace, warn}; -use crate::{AppendOutcome, HypercoreError, Info, PartialKeypair}; +use crate::{AppendOutcome, Hypercore, HypercoreError, Info, PartialKeypair}; use hypercore_schema::{Proof, RequestBlock, RequestSeek, RequestUpgrade}; pub use events::Event; use async_broadcast::Receiver; -use std::future::Future; +use std::{ + future::Future, + pin::Pin, + sync::Mutex, + task::{Context, Poll}, +}; /// Methods related to just this core's information pub trait CoreInfo { @@ -90,3 +100,97 @@ pub trait CoreMethods: CoreInfo { batch: B, ) -> impl 
Future> + Send; } + +pub struct Peer { + protocol: Protocol, + _pending_open: Option>>>>, +} + +impl std::fmt::Debug for Peer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Peer") + .field("protocol", &self.protocol) + //.field("pending_open", &self.pending_open) + .finish() + } +} +impl Peer { + fn new(protocol: Protocol) -> Self { + Self { + protocol, + _pending_open: Default::default(), + } + } + + fn _poll_peer( + &mut self, + core: &Hypercore, + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(mut fut) = self._pending_open.take() { + match fut.poll(cx) { + Poll::Ready(res) => match res { + Ok(_) => { + trace!("protocol opened"); + } + Err(e) => { + error!(error =? e, "protocol open failed"); + } + }, + Poll::Pending => { + _ = self._pending_open.insert(fut); + return Poll::Pending; + } + } + } + let event = match Pin::new(&mut self.protocol).poll_next(cx) { + Poll::Ready(res) => match res { + Some(Ok(e)) => e, + Some(Err(e)) => return Poll::Ready(Err(e.into())), + None => return Poll::Pending, + }, + Poll::Pending => todo!(), + }; + match event { + hypercore_protocol::Event::Handshake(_) => { + if self.protocol.is_initiator() { + let key = core.key_pair().public.to_bytes(); + self._pending_open = Some(Box::pin(self.protocol.open(key))); + } + } + hypercore_protocol::Event::DiscoveryKey(dkey) => { + let key = core.key_pair().public.to_bytes(); + let this_dkey = discovery_key(&key); + if this_dkey == dkey { + self._pending_open = Some(Box::pin(self.protocol.open(key))); + } else { + warn!("Got discovery key for different core: {dkey:?}"); + } + } + hypercore_protocol::Event::Channel(_channel) => todo!(), + hypercore_protocol::Event::Close(_) => {} + _ => todo!(), + } + todo!() + } +} + +impl Hypercore { + pub fn replicate(&mut self, stream: impl CipherTrait + 'static) { + let protocol = Protocol::new(Box::new(stream)); + self.peers.push(Mutex::new(Peer::new(protocol))); + } +} + +impl Stream for Hypercore { + type 
Item = Result<(), HypercoreError>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + //for peer in self.peers.iter() { + // if let Poll::Ready(_) = peer.lock().unwrap().poll_peer(self.deref_mut(), cx) { + // cx.waker().wake_by_ref(); + // } + //} + Poll::Pending + } +} diff --git a/src/replication/shared_core.rs b/src/replication/shared_core.rs index 80418062..2d54e781 100644 --- a/src/replication/shared_core.rs +++ b/src/replication/shared_core.rs @@ -146,7 +146,7 @@ mod tests { let clone = create_hypercore_with_data_and_key_pair( 0, PartialKeypair { - public: main.key_pair.public, + public: main.inner.key_pair.public, secret: None, }, ) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 749192b9..aeab3b18 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,10 +4,10 @@ use futures::future::FutureExt; #[cfg(not(target_arch = "wasm32"))] use random_access_disk::RandomAccessDisk; use random_access_memory::RandomAccessMemory; -use random_access_storage::{RandomAccess, RandomAccessError}; -use std::fmt::Debug; +use random_access_storage::{BoxFuture, RandomAccess, RandomAccessError}; #[cfg(not(target_arch = "wasm32"))] use std::path::PathBuf; +use std::{fmt::Debug, sync::Arc}; use tracing::instrument; use crate::{ @@ -16,40 +16,16 @@ use crate::{ }; /// Supertrait for Storage -pub trait StorageTraits: RandomAccess + Debug {} -impl StorageTraits for T {} +pub trait StorageTraits: RandomAccess + Debug + Send + Sync {} +impl StorageTraits for T {} /// Save data to a desired storage backend. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Storage { - tree: Box, - data: Box, - bitfield: Box, - oplog: Box, -} - -pub(crate) fn map_random_access_err(err: RandomAccessError) -> HypercoreError { - match err { - RandomAccessError::IO { - return_code, - context, - source, - } => HypercoreError::IO { - context: Some(format!( - "RandomAccess IO error. 
Context: {context:?}, return_code: {return_code:?}", - )), - source, - }, - RandomAccessError::OutOfBounds { - offset, - end, - length, - } => HypercoreError::InvalidOperation { - context: format!( - "RandomAccess out of bounds. Offset: {offset}, end: {end:?}, length: {length}", - ), - }, - } + tree: Arc, + data: Arc, + bitfield: Arc, + oplog: Arc, } impl Storage { @@ -60,182 +36,182 @@ impl Storage { Store, ) -> std::pin::Pin< Box< - dyn std::future::Future< - Output = Result, RandomAccessError>, - > + Send, + dyn std::future::Future, RandomAccessError>> + + Send, >, >, { - let mut tree = create(Store::Tree).await.map_err(map_random_access_err)?; - let mut data = create(Store::Data).await.map_err(map_random_access_err)?; - let mut bitfield = create(Store::Bitfield) - .await - .map_err(map_random_access_err)?; - let mut oplog = create(Store::Oplog).await.map_err(map_random_access_err)?; + let tree: Arc = create(Store::Tree).await?; + let data: Arc = create(Store::Data).await?; + let bitfield: Arc = create(Store::Bitfield).await?; + let oplog: Arc = create(Store::Oplog).await?; if overwrite { - if tree.len().await.map_err(map_random_access_err)? > 0 { - tree.truncate(0).await.map_err(map_random_access_err)?; + if tree.len() > 0 { + tree.truncate(0).await?; } - if data.len().await.map_err(map_random_access_err)? > 0 { - data.truncate(0).await.map_err(map_random_access_err)?; + if data.len() > 0 { + data.truncate(0).await?; } - if bitfield.len().await.map_err(map_random_access_err)? > 0 { - bitfield.truncate(0).await.map_err(map_random_access_err)?; + if bitfield.len() > 0 { + bitfield.truncate(0).await?; } - if oplog.len().await.map_err(map_random_access_err)? > 0 { - oplog.truncate(0).await.map_err(map_random_access_err)?; + if oplog.len() > 0 { + oplog.truncate(0).await?; } } - let instance = Self { + Ok(Self { tree, data, bitfield, oplog, - }; - - Ok(instance) + }) } - /// Read info from store based on given instruction. Convenience method to `read_infos`. 
- pub(crate) async fn read_info( - &mut self, + /// Read info from store based on given instruction. + pub(crate) fn read_info( + &self, info_instruction: StoreInfoInstruction, - ) -> Result { - let mut infos = self.read_infos_to_vec(&[info_instruction]).await?; - Ok(infos - .pop() - .expect("Should have gotten one info with one instruction")) + ) -> BoxFuture> { + let fut = self.read_infos_to_vec(vec![info_instruction]); + Box::pin(async move { + Ok(fut + .await? + .pop() + .expect("Should have gotten one info with one instruction")) + }) } /// Read infos from stores based on given instructions - pub(crate) async fn read_infos( - &mut self, - info_instructions: &[StoreInfoInstruction], - ) -> Result, HypercoreError> { - let infos = self.read_infos_to_vec(info_instructions).await?; - Ok(infos.into_boxed_slice()) + pub(crate) fn read_infos( + &self, + info_instructions: Vec, + ) -> BoxFuture, HypercoreError>> { + let fut = self.read_infos_to_vec(info_instructions); + Box::pin(async move { Ok(fut.await?.into_boxed_slice()) }) } /// Reads infos but retains them as a Vec - pub(crate) async fn read_infos_to_vec( - &mut self, - info_instructions: &[StoreInfoInstruction], - ) -> Result, HypercoreError> { - if info_instructions.is_empty() { - return Ok(vec![]); - } - let mut current_store: Store = info_instructions[0].store.clone(); - let mut storage = self.get_random_access_mut(¤t_store); - let mut infos: Vec = Vec::with_capacity(info_instructions.len()); - for instruction in info_instructions.iter() { - if instruction.store != current_store { - current_store = instruction.store.clone(); - storage = self.get_random_access_mut(¤t_store); + pub(crate) fn read_infos_to_vec( + &self, + info_instructions: Vec, + ) -> BoxFuture, HypercoreError>> { + let storage = self.clone(); + let instructions = info_instructions; // TODO rm + Box::pin(async move { + if instructions.is_empty() { + return Ok(vec![]); } - match instruction.info_type { - StoreInfoType::Content => { - let 
read_length = match instruction.length {
- Some(length) => length,
- None => storage.len().await.map_err(map_random_access_err)?,
- };
- let read_result = storage.read(instruction.index, read_length).await;
- let info: StoreInfo = match read_result {
- Ok(buf) => Ok(StoreInfo::new_content(
+ let mut current_store: Store = instructions[0].store.clone();
+ let mut ra: Arc = storage.get_random_access(&current_store).clone();
+ let mut infos: Vec = Vec::with_capacity(instructions.len());
+ for instruction in instructions.iter() {
+ if instruction.store != current_store {
+ current_store = instruction.store.clone();
+ ra = storage.get_random_access(&current_store).clone();
+ }
+ match instruction.info_type {
+ StoreInfoType::Content => {
+ let read_length = match instruction.length {
+ Some(length) => length,
+ None => ra.len(),
+ };
+ let read_result = ra.read(instruction.index, read_length).await;
+ let info: StoreInfo = match read_result {
+ Ok(buf) => Ok(StoreInfo::new_content(
+ instruction.store.clone(),
+ instruction.index,
+ &buf,
+ )),
+ Err(RandomAccessError::OutOfBounds { length, .. }) => {
+ if instruction.allow_miss {
+ Ok(StoreInfo::new_content_miss(
+ instruction.store.clone(),
+ instruction.index,
+ ))
+ } else {
+ Err(HypercoreError::InvalidOperation {
+ context: format!(
+ "Could not read from store {}, index {} / length {} is out of bounds for store length {}",
+ current_store, instruction.index, read_length, length
+ ),
+ })
+ }
+ }
+ Err(e) => Err(HypercoreError::from(e)),
+ }?;
+ infos.push(info);
+ }
+ StoreInfoType::Size => {
+ let length = ra.len();
+ infos.push(StoreInfo::new_size(
instruction.store.clone(),
instruction.index,
- &buf,
- )),
- Err(RandomAccessError::OutOfBounds { length, ..
}) => {
- if instruction.allow_miss {
- Ok(StoreInfo::new_content_miss(
- instruction.store.clone(),
- instruction.index,
- ))
- } else {
- Err(HypercoreError::InvalidOperation {
- context: format!(
- "Could not read from store {}, index {} / length {} is out of bounds for store length {}",
- current_store, instruction.index, read_length, length
- ),
- })
- }
- }
- Err(e) => Err(map_random_access_err(e)),
- }?;
- infos.push(info);
- }
- StoreInfoType::Size => {
- let length = storage.len().await.map_err(map_random_access_err)?;
- infos.push(StoreInfo::new_size(
- instruction.store.clone(),
- instruction.index,
- length - instruction.index,
- ));
+ length - instruction.index,
+ ));
+ }
}
}
- }
- Ok(infos)
+ Ok(infos)
+ })
}
- /// Flush info to storage. Convenience method to `flush_infos`.
- pub(crate) async fn flush_info(&mut self, slice: StoreInfo) -> Result<(), HypercoreError> {
- self.flush_infos(&[slice]).await
+ /// Flush info to storage.
+ pub(crate) fn flush_info(&self, info: StoreInfo) -> BoxFuture> {
+ self.flush_infos(vec![info])
}
/// Flush infos to storage
- pub(crate) async fn flush_infos(&mut self, infos: &[StoreInfo]) -> Result<(), HypercoreError> {
- if infos.is_empty() {
- return Ok(());
- }
- let mut current_store: Store = infos[0].store.clone();
- let mut storage = self.get_random_access_mut(&current_store);
- for info in infos.iter() {
- if info.store != current_store {
- current_store = info.store.clone();
- storage = self.get_random_access_mut(&current_store);
+ pub(crate) fn flush_infos(
+ &self,
+ infos: Vec,
+ ) -> BoxFuture> {
+ let storage = self.clone();
+ Box::pin(async move {
+ if infos.is_empty() {
+ return Ok(());
}
- match info.info_type {
- StoreInfoType::Content => {
- if !info.miss {
- if let Some(data) = &info.data {
- storage
- .write(info.index, data)
- .await
- .map_err(map_random_access_err)?;
- }
- } else {
- storage
- .del(
+ let mut current_store: Store = infos[0].store.clone();
+ let mut ra: Arc =
storage.get_random_access(&current_store).clone();
+ for info in infos.iter() {
+ if info.store != current_store {
+ current_store = info.store.clone();
+ ra = storage.get_random_access(&current_store).clone();
+ }
+ match info.info_type {
+ StoreInfoType::Content => {
+ if !info.miss {
+ if let Some(data) = &info.data {
+ ra.write(info.index, data).await?;
+ }
+ } else {
+ ra.del(
info.index,
info.length.expect("When deleting, length must be given"),
)
- .await
- .map_err(map_random_access_err)?;
+ .await?;
+ }
}
- }
- StoreInfoType::Size => {
- if info.miss {
- storage
- .truncate(info.index)
- .await
- .map_err(map_random_access_err)?;
- } else {
- panic!("Flushing a size that isn't miss, is not supported");
+ StoreInfoType::Size => {
+ if info.miss {
+ ra.truncate(info.index).await?;
+ } else {
+ panic!("Flushing a size that isn't miss, is not supported");
+ }
}
}
}
- }
- Ok(())
+ Ok(())
+ })
}
- fn get_random_access_mut(&mut self, store: &Store) -> &mut Box {
+ fn get_random_access(&self, store: &Store) -> &Arc {
match store {
- Store::Tree => &mut self.tree,
- Store::Data => &mut self.data,
- Store::Bitfield => &mut self.bitfield,
- Store::Oplog => &mut self.oplog,
+ Store::Tree => &self.tree,
+ Store::Data => &self.data,
+ Store::Bitfield => &self.bitfield,
+ Store::Oplog => &self.oplog,
}
}
@@ -243,10 +219,8 @@ impl Storage {
#[instrument(err)]
pub async fn new_memory() -> Result {
let create = |_| {
- async { Ok(Box::new(RandomAccessMemory::default()) as Box) }
- .boxed()
+ async { Ok(Arc::new(RandomAccessMemory::default()) as Arc) }.boxed()
};
- // No reason to overwrite, as this is a new memory segment
Self::open(create, false).await
}
@@ -264,8 +238,8 @@ impl Storage {
Store::Oplog => "oplog",
};
Ok(
- Box::new(RandomAccessDisk::open(dir.as_path().join(name)).await?)
+ as Arc, ) } .boxed() @@ -273,3 +247,67 @@ impl Storage { Self::open(storage, overwrite).await } } + +#[cfg(test)] +mod test { + use super::*; + use crate::common::{StoreInfo, StoreInfoInstruction}; + + #[tokio::test] + async fn test_futures_are_owned() -> Result<(), Box> { + let fut = { + let storage = Storage::new_memory().await?; + + let data = b"hello hypercore"; + + // Write to tree store + storage.flush_info(StoreInfo::new_content(Store::Tree, 0, data)) + }; + fut.await?; + + Ok(()) + } + + #[tokio::test] + async fn test_storage() -> Result<(), Box> { + let storage = Storage::new_memory().await?; + + let data = b"hello hypercore"; + + // Write to tree store + storage + .flush_info(StoreInfo::new_content(Store::Tree, 0, data)) + .await?; + + // Read it back + let info = storage + .read_info(StoreInfoInstruction::new_content( + Store::Tree, + 0, + data.len() as u64, + )) + .await?; + + assert_eq!(info.data.as_deref(), Some(data.as_slice())); + + // Write to two different stores, read back together + storage + .flush_infos(vec![ + StoreInfo::new_content(Store::Data, 0, b"block0"), + StoreInfo::new_content(Store::Bitfield, 0, b"bits"), + ]) + .await?; + + let infos = storage + .read_infos(vec![ + StoreInfoInstruction::new_content(Store::Data, 0, 6), + StoreInfoInstruction::new_content(Store::Bitfield, 0, 4), + ]) + .await?; + + assert_eq!(infos[0].data.as_deref(), Some(b"block0".as_slice())); + assert_eq!(infos[1].data.as_deref(), Some(b"bits".as_slice())); + + Ok(()) + } +} diff --git a/tests/core.rs b/tests/core.rs index 7b39a76b..0595dc15 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -6,8 +6,6 @@ use hypercore::{HypercoreBuilder, Storage}; use tempfile::Builder; use test_log::test; -#[cfg(feature = "async-std")] -use async_std::test as async_test; #[cfg(feature = "tokio")] use tokio::test as async_test; @@ -74,7 +72,7 @@ async fn hypercore_make_read_only() -> Result<()> { &write_key_pair.secret.as_ref().unwrap().to_bytes()[16..], )); - let mut hypercore = 
open_hypercore(&dir.path().to_string_lossy()).await?; + let hypercore = open_hypercore(&dir.path().to_string_lossy()).await?; assert_eq!(&hypercore.get(0).await?.unwrap(), b"Hello"); assert_eq!(&hypercore.get(1).await?.unwrap(), b"World!"); Ok(()) diff --git a/tests/js_interop.rs b/tests/js_interop.rs index 5d02d737..c4f25640 100644 --- a/tests/js_interop.rs +++ b/tests/js_interop.rs @@ -7,8 +7,6 @@ use common::{create_hypercore, create_hypercore_hash, open_hypercore}; use js::{cleanup, install, js_run_step, prepare_test_set}; use test_log::test; -#[cfg(feature = "async-std")] -use async_std::test as async_test; #[cfg(feature = "tokio")] use tokio::test as async_test; diff --git a/tests/model.rs b/tests/model.rs index 86a74657..0ff804c2 100644 --- a/tests/model.rs +++ b/tests/model.rs @@ -43,12 +43,6 @@ proptest! { ..Default::default() })] - #[test] - #[cfg(feature = "async-std")] - fn implementation_matches_model(ops: Vec) { - assert!(async_std::task::block_on(assert_implementation_matches_model(ops))); - } - #[test] #[cfg(feature = "tokio")] fn implementation_matches_model(ops: Vec) {