From 1edffd4ece0ac4268cc31229aeaf3b9b4414c797 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sat, 21 Feb 2026 15:49:28 -0500 Subject: [PATCH 01/26] WIP add replication deps --- Cargo.toml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e831b78e..16490ba8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,10 +41,20 @@ intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } async-broadcast = { version = "0.7.1", optional = true } async-lock = {version = "3.4.0", optional = true } +futures-lite = "2.6.1" [dependencies.hypercore_schema] version = "0.2.0" +[dependencies.hypercore-protocol] +optional = true +path = "../protocol/" + +[dependencies.hypercore_handshake] +optional = true +path = "../handshake/" + + [target.'cfg(not(target_arch = "wasm32"))'.dependencies] random-access-disk = { version = "3", default-features = false } @@ -65,7 +75,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] default = ["tokio", "sparse", "replication", "cache"] -replication = ["dep:async-broadcast"] +replication = ["dep:async-broadcast", "dep:hypercore-protocol", "dep:hypercore_handshake"] shared-core = ["replication", "dep:async-lock"] sparse = ["random-access-disk/sparse"] tokio = ["random-access-disk/tokio"] From 65571709ecd846d7570b58cffeb71c112d788b55 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sat, 21 Feb 2026 15:49:42 -0500 Subject: [PATCH 02/26] Add error from hc proto for replication --- src/common/error.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/common/error.rs b/src/common/error.rs index 89ec0b37..6f5edcdc 100644 --- a/src/common/error.rs +++ b/src/common/error.rs @@ -58,6 +58,10 @@ pub enum HypercoreError { #[source] source: std::io::Error, }, + + #[cfg(feature = "replication")] + #[error("hypercore_protocol Error")] + Protocol(#[from] hypercore_protocol::Error), } impl From for HypercoreError { From 159e380907887f5bb2e93710d071d43bd1117a85 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sun, 22 Feb 2026 14:17:52 -0500 Subject: [PATCH 03/26] Add peers to core --- src/core.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/core.rs b/src/core.rs index 3adc9a50..8b7dee08 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,8 +1,9 @@ //! Hypercore's main abstraction. Exposes an append-only, secure log structure. use ed25519_dalek::Signature; use futures::future::Either; -use std::convert::TryFrom; use std::fmt::Debug; +#[cfg(feature = "replication")] +use std::sync::Mutex; use tracing::instrument; #[cfg(feature = "cache")] @@ -51,6 +52,8 @@ pub struct Hypercore { header: Header, #[cfg(feature = "replication")] events: crate::replication::events::Events, + #[cfg(feature = "replication")] + pub(crate) peers: Vec>, } /// Response from append, matches that of the Javascript result @@ -252,6 +255,8 @@ impl Hypercore { skip_flush_count: 0, #[cfg(feature = "replication")] events: crate::replication::events::Events::new(), + #[cfg(feature = "replication")] + peers: Default::default(), }) } From 90646e0f55290a3ffbf25df535de61128fe8e34d Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 23 Feb 2026 15:30:16 -0500 Subject: [PATCH 04/26] We're in 2024 now --- src/lib.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c2a4f4d0..0c7c938c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,11 @@ -#![forbid( - unsafe_code, - future_incompatible, - rust_2018_idioms, - rust_2018_compatibility, - missing_debug_implementations, - missing_docs -)] +#![forbid(unsafe_code, future_incompatible)] #![doc(test(attr(deny(warnings))))] #![warn( unreachable_pub, redundant_lifetimes, non_local_definitions, + missing_debug_implementations, + missing_docs, clippy::needless_pass_by_value, clippy::needless_pass_by_ref_mut, clippy::enum_glob_use From d0ecc05e7bf202269aed7869c747e09716463b93 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 23 Feb 2026 15:30:38 -0500 Subject: [PATCH 05/26] WIP revisit after refactoring await all the `&mut self` See issue #148 --- src/replication/mod.rs | 109 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 2 deletions(-) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 35843736..cda96fe1 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -3,17 +3,27 @@ pub mod events; #[cfg(feature = "shared-core")] pub mod shared_core; +use futures::Stream; +use futures_lite::future::FutureExt; +use hypercore_handshake::CipherTrait; +use hypercore_protocol::{Protocol, discovery_key}; #[cfg(feature = "shared-core")] pub use shared_core::SharedCore; +use tracing::{error, trace, warn}; -use crate::{AppendOutcome, HypercoreError, Info, PartialKeypair}; +use crate::{AppendOutcome, Hypercore, HypercoreError, Info, PartialKeypair}; use hypercore_schema::{Proof, RequestBlock, RequestSeek, RequestUpgrade}; pub use events::Event; use async_broadcast::Receiver; -use std::future::Future; +use std::{ + future::Future, + pin::Pin, + sync::Mutex, + task::{Context, Poll}, +}; /// Methods related to just this core's information pub trait CoreInfo { @@ -90,3 +100,98 @@ pub trait CoreMethods: CoreInfo { batch: B, ) -> impl Future> + Send; } + +pub struct Peer { + protocol: Protocol, + pending_open: Option>>>>, +} + +impl std::fmt::Debug for Peer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Peer") + .field("protocol", &self.protocol) + //.field("pending_open", &self.pending_open) + .finish() + } +} +impl Peer { + fn new(protocol: Protocol) -> Self { + Self { + protocol, + pending_open: Default::default(), + } + } + + fn poll_peer( + &mut self, + core: &mut Hypercore, + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(mut fut) = self.pending_open.take() { + match fut.poll(cx) { + Poll::Ready(res) => match res { + Ok(_) => { + trace!("protocol opened"); + } + Err(e) => { + error!(error =? e, "protocol open failed"); + } + }, + Poll::Pending => { + _ = self.pending_open.insert(fut); + return Poll::Pending; + } + } + } + let event = match Pin::new(&mut self.protocol).poll_next(cx) { + Poll::Ready(res) => match res { + Some(Ok(e)) => e, + Some(Err(e)) => return Poll::Ready(Err(e.into())), + None => return Poll::Pending, + }, + Poll::Pending => todo!(), + }; + match event { + hypercore_protocol::Event::Handshake(_) => { + if self.protocol.is_initiator() { + let key = core.key_pair().public.to_bytes(); + self.pending_open = Some(Box::pin(self.protocol.open(key))); + } + } + hypercore_protocol::Event::DiscoveryKey(dkey) => { + let key = core.key_pair().public.to_bytes(); + let this_dkey = discovery_key(&key); + if this_dkey == dkey { + self.pending_open = Some(Box::pin(self.protocol.open(key))); + } else { + warn!("Got discovery key for different core: {dkey:?}"); + } + } + hypercore_protocol::Event::Channel(channel) => todo!(), + hypercore_protocol::Event::Close(_) => {} + _ => todo!(), + } + todo!() + } +} + +impl Hypercore { + pub fn replicate(&mut self, stream: impl CipherTrait + 'static) { + let protocol = Protocol::new(Box::new(stream)); + self.peers.push(Mutex::new(Peer::new(protocol))); + } +} + +impl Stream for Hypercore { + type Item = Result<(), HypercoreError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use std::ops::DerefMut; + for peer in self.peers.iter() { + if let Poll::Ready(_) = peer.lock().unwrap().poll_peer(self.deref_mut(), cx) { + cx.waker().wake_by_ref(); + } + } + Poll::Pending + } +} From 9c9fea0ad63af0bfbc172363850efafe8eaa2404 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 00:56:32 -0500 Subject: [PATCH 06/26] WIP update RandomAccess.len --- Cargo.toml | 13 ++++++++----- src/storage/mod.rs | 12 ++++++------ 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 16490ba8..5e753afb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,8 +32,6 @@ flat-tree = "6" merkle-tree-stream = "0.12" pretty-hash = "0.4" rand = "0.8" -random-access-memory = "3" -random-access-storage = "5" sha2 = "0.10" futures = "0.3" crc32fast = "1" @@ -54,9 +52,15 @@ path = "../protocol/" optional = true path = "../handshake/" +[dependencies.random-access-storage] +path = "../ram/storage" -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] -random-access-disk = { version = "3", default-features = false } +[dependencies.random-access-memory] +path = "../ram/mem/" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.random-access-disk] +path = "../ram/disk" +default-features = false [dev-dependencies] anyhow = "1.0.70" @@ -79,7 +83,6 @@ replication = ["dep:async-broadcast", "dep:hypercore-protocol", "dep:hypercore_h shared-core = ["replication", "dep:async-lock"] sparse = ["random-access-disk/sparse"] tokio = ["random-access-disk/tokio"] -async-std = ["random-access-disk/async-std"] cache = ["moka"] # Used only in interoperability tests under tests/js-interop which use the javascript version of hypercore # to verify that this crate works. To run them, use: diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 749192b9..547273b1 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -74,16 +74,16 @@ impl Storage { let mut oplog = create(Store::Oplog).await.map_err(map_random_access_err)?; if overwrite { - if tree.len().await.map_err(map_random_access_err)? > 0 { + if tree.len() > 0 { tree.truncate(0).await.map_err(map_random_access_err)?; } - if data.len().await.map_err(map_random_access_err)? > 0 { + if data.len() > 0 { data.truncate(0).await.map_err(map_random_access_err)?; } - if bitfield.len().await.map_err(map_random_access_err)? > 0 { + if bitfield.len() > 0 { bitfield.truncate(0).await.map_err(map_random_access_err)?; } - if oplog.len().await.map_err(map_random_access_err)? > 0 { + if oplog.len() > 0 { oplog.truncate(0).await.map_err(map_random_access_err)?; } } @@ -138,7 +138,7 @@ impl Storage { StoreInfoType::Content => { let read_length = match instruction.length { Some(length) => length, - None => storage.len().await.map_err(map_random_access_err)?, + None => storage.len(), }; let read_result = storage.read(instruction.index, read_length).await; let info: StoreInfo = match read_result { @@ -167,7 +167,7 @@ impl Storage { infos.push(info); } StoreInfoType::Size => { - let length = storage.len().await.map_err(map_random_access_err)?; + let length = storage.len(); infos.push(StoreInfo::new_size( instruction.store.clone(), instruction.index, From f008b017fe50e445ef88e185ffa2d4822a15cddc Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 01:26:18 -0500 Subject: [PATCH 07/26] rm async_std --- examples/disk.rs | 2 -- examples/memory.rs | 2 -- examples/replication.rs | 2 -- src/lib.rs | 4 ---- tests/core.rs | 2 -- tests/js_interop.rs | 2 -- tests/model.rs | 6 ------ 7 files changed, 20 deletions(-) diff --git a/examples/disk.rs b/examples/disk.rs index d99b7a10..4bb67446 100644 --- a/examples/disk.rs +++ b/examples/disk.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "async-std")] -use async_std::main as async_main; use hypercore::{HypercoreBuilder, HypercoreError, Storage}; use tempfile::Builder; #[cfg(feature = "tokio")] diff --git a/examples/memory.rs b/examples/memory.rs index a510ed6d..d61362c6 100644 --- a/examples/memory.rs +++ b/examples/memory.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "async-std")] -use async_std::main as async_main; use hypercore::{HypercoreBuilder, HypercoreError, Storage}; #[cfg(feature = "tokio")] use tokio::main as async_main; diff --git a/examples/replication.rs b/examples/replication.rs index f2943796..01f8270e 100644 --- a/examples/replication.rs +++ b/examples/replication.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "async-std")] -use async_std::main as async_main; use hypercore::{Hypercore, HypercoreBuilder, HypercoreError, PartialKeypair, Storage}; use hypercore_schema::{RequestBlock, RequestUpgrade}; use tempfile::Builder; diff --git a/src/lib.rs b/src/lib.rs index 0c7c938c..d80b1aa3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,10 +46,6 @@ //! # tokio_test::block_on(async { //! # example().await; //! # }); -//! # #[cfg(feature = "async-std")] -//! # async_std::task::block_on(async { -//! # example().await; -//! # }); //! # async fn example() { //! use hypercore::{HypercoreBuilder, Storage}; //! diff --git a/tests/core.rs b/tests/core.rs index 7b39a76b..09557930 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -6,8 +6,6 @@ use hypercore::{HypercoreBuilder, Storage}; use tempfile::Builder; use test_log::test; -#[cfg(feature = "async-std")] -use async_std::test as async_test; #[cfg(feature = "tokio")] use tokio::test as async_test; diff --git a/tests/js_interop.rs b/tests/js_interop.rs index 5d02d737..c4f25640 100644 --- a/tests/js_interop.rs +++ b/tests/js_interop.rs @@ -7,8 +7,6 @@ use common::{create_hypercore, create_hypercore_hash, open_hypercore}; use js::{cleanup, install, js_run_step, prepare_test_set}; use test_log::test; -#[cfg(feature = "async-std")] -use async_std::test as async_test; #[cfg(feature = "tokio")] use tokio::test as async_test; diff --git a/tests/model.rs b/tests/model.rs index 86a74657..0ff804c2 100644 --- a/tests/model.rs +++ b/tests/model.rs @@ -43,12 +43,6 @@ proptest! { ..Default::default() })] - #[test] - #[cfg(feature = "async-std")] - fn implementation_matches_model(ops: Vec) { - assert!(async_std::task::block_on(assert_implementation_matches_model(ops))); - } - #[test] #[cfg(feature = "tokio")] fn implementation_matches_model(ops: Vec) { From b1fc656bb9e78b1a6308b21e13cfdfa9cfd6dd55 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 01:26:31 -0500 Subject: [PATCH 08/26] Add get_random_access --- src/storage/mod.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 547273b1..efd87d7f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -127,12 +127,12 @@ impl Storage { return Ok(vec![]); } let mut current_store: Store = info_instructions[0].store.clone(); - let mut storage = self.get_random_access_mut(¤t_store); + let mut storage = self.get_random_access(¤t_store); let mut infos: Vec = Vec::with_capacity(info_instructions.len()); for instruction in info_instructions.iter() { if instruction.store != current_store { current_store = instruction.store.clone(); - storage = self.get_random_access_mut(¤t_store); + storage = self.get_random_access(¤t_store); } match instruction.info_type { StoreInfoType::Content => { @@ -238,6 +238,14 @@ impl Storage { Store::Oplog => &mut self.oplog, } } + fn get_random_access(&self, store: &Store) -> &Box { + match store { + Store::Tree => &self.tree, + Store::Data => &self.data, + Store::Bitfield => &self.bitfield, + Store::Oplog => &self.oplog, + } + } /// New storage backed by a `RandomAccessMemory` instance. #[instrument(err)] From 997bc4e2a72a76dbfab3ff3bf397819382002f4d Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 14:51:01 -0500 Subject: [PATCH 09/26] RMME --- src/lib.rs | 2 +- src/replication/mod.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d80b1aa3..1d039d9c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ redundant_lifetimes, non_local_definitions, missing_debug_implementations, - missing_docs, + //missing_docs, clippy::needless_pass_by_value, clippy::needless_pass_by_ref_mut, clippy::enum_glob_use diff --git a/src/replication/mod.rs b/src/replication/mod.rs index cda96fe1..cb4876c2 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -187,11 +187,11 @@ impl Stream for Hypercore { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use std::ops::DerefMut; - for peer in self.peers.iter() { - if let Poll::Ready(_) = peer.lock().unwrap().poll_peer(self.deref_mut(), cx) { - cx.waker().wake_by_ref(); - } - } + //for peer in self.peers.iter() { + // if let Poll::Ready(_) = peer.lock().unwrap().poll_peer(self.deref_mut(), cx) { + // cx.waker().wake_by_ref(); + // } + //} Poll::Pending } } From f59da0fe5df5cb1fea2d394f6fe0cb56e59d188d Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 14:52:18 -0500 Subject: [PATCH 10/26] lints --- src/replication/mod.rs | 6 +++--- src/storage/mod.rs | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index cb4876c2..87eacda9 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -167,7 +167,7 @@ impl Peer { warn!("Got discovery key for different core: {dkey:?}"); } } - hypercore_protocol::Event::Channel(channel) => todo!(), + hypercore_protocol::Event::Channel(_channel) => todo!(), hypercore_protocol::Event::Close(_) => {} _ => todo!(), } @@ -185,8 +185,8 @@ impl Hypercore { impl Stream for Hypercore { type Item = Result<(), HypercoreError>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - use std::ops::DerefMut; + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + //for peer in self.peers.iter() { // if let Poll::Ready(_) = peer.lock().unwrap().poll_peer(self.deref_mut(), cx) { // cx.waker().wake_by_ref(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index efd87d7f..e42d502a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -66,12 +66,12 @@ impl Storage { >, >, { - let mut tree = create(Store::Tree).await.map_err(map_random_access_err)?; - let mut data = create(Store::Data).await.map_err(map_random_access_err)?; - let mut bitfield = create(Store::Bitfield) + let tree = create(Store::Tree).await.map_err(map_random_access_err)?; + let data = create(Store::Data).await.map_err(map_random_access_err)?; + let bitfield = create(Store::Bitfield) .await .map_err(map_random_access_err)?; - let mut oplog = create(Store::Oplog).await.map_err(map_random_access_err)?; + let oplog = create(Store::Oplog).await.map_err(map_random_access_err)?; if overwrite { if tree.len() > 0 { From e1d1600afb85c7ad6cb0641192fa9b70e2dd4e01 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 14:53:51 -0500 Subject: [PATCH 11/26] rm async-std --- benches/disk.rs | 19 ------------------- benches/memory.rs | 18 ------------------ 2 files changed, 37 deletions(-) diff --git a/benches/disk.rs b/benches/disk.rs index 3292df03..60d01c69 100644 --- a/benches/disk.rs +++ b/benches/disk.rs @@ -1,7 +1,5 @@ use std::time::{Duration, Instant}; -#[cfg(feature = "async-std")] -use criterion::async_executor::AsyncStdExecutor; use criterion::{Criterion, black_box, criterion_group, criterion_main}; use hypercore::{Hypercore, HypercoreBuilder, HypercoreError, Storage}; use tempfile::Builder as TempfileBuilder; @@ -10,11 +8,6 @@ fn bench_create_disk(c: &mut Criterion) { let mut group = c.benchmark_group("slow_call"); group.measurement_time(Duration::from_secs(20)); - #[cfg(feature = "async-std")] - group.bench_function("create_disk", move |b| { - b.to_async(AsyncStdExecutor) - .iter(|| create_hypercore("create")); - }); #[cfg(feature = "tokio")] group.bench_function("create_disk", move |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -51,10 +44,6 @@ fn bench_write_disk(c: &mut Criterion) { let mut group = c.benchmark_group("slow_call"); group.measurement_time(Duration::from_secs(20)); - #[cfg(feature = "async-std")] - group.bench_function("write disk", |b| { - b.to_async(AsyncStdExecutor).iter_custom(write_disk); - }); #[cfg(feature = "tokio")] group.bench_function("write disk", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -76,10 +65,6 @@ fn bench_read_disk(c: &mut Criterion) { let mut group = c.benchmark_group("slow_call"); group.measurement_time(Duration::from_secs(20)); - #[cfg(feature = "async-std")] - group.bench_function("read disk", |b| { - b.to_async(AsyncStdExecutor).iter_custom(read_disk); - }); #[cfg(feature = "tokio")] group.bench_function("read disk", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -104,10 +89,6 @@ fn bench_clear_disk(c: &mut Criterion) { let mut group = c.benchmark_group("slow_call"); group.measurement_time(Duration::from_secs(20)); - #[cfg(feature = "async-std")] - group.bench_function("clear disk", |b| { - b.to_async(AsyncStdExecutor).iter_custom(clear_disk); - }); #[cfg(feature = "tokio")] group.bench_function("clear disk", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/benches/memory.rs b/benches/memory.rs index ac8015a5..e098ccbe 100644 --- a/benches/memory.rs +++ b/benches/memory.rs @@ -1,16 +1,10 @@ use std::time::{Duration, Instant}; -#[cfg(feature = "async-std")] -use criterion::async_executor::AsyncStdExecutor; use criterion::{Criterion, black_box, criterion_group, criterion_main}; use hypercore::{Hypercore, HypercoreBuilder, HypercoreError, Storage}; use random_access_memory::RandomAccessMemory; fn bench_create_memory(c: &mut Criterion) { - #[cfg(feature = "async-std")] - c.bench_function("create memory", |b| { - b.to_async(AsyncStdExecutor).iter(|| create_hypercore(1024)); - }); #[cfg(feature = "tokio")] c.bench_function("create memory", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -54,10 +48,6 @@ async fn create_hypercore(page_size: usize) -> Result } fn bench_write_memory(c: &mut Criterion) { - #[cfg(feature = "async-std")] - c.bench_function("write memory", |b| { - b.to_async(AsyncStdExecutor).iter_custom(write_memory); - }); #[cfg(feature = "tokio")] c.bench_function("write memory", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -76,10 +66,6 @@ async fn write_memory(iters: u64) -> Duration { } fn bench_read_memory(c: &mut Criterion) { - #[cfg(feature = "async-std")] - c.bench_function("read memory", |b| { - b.to_async(AsyncStdExecutor).iter_custom(read_memory); - }); #[cfg(feature = "tokio")] c.bench_function("read memory", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -101,10 +87,6 @@ async fn read_memory(iters: u64) -> Duration { } fn bench_clear_memory(c: &mut Criterion) { - #[cfg(feature = "async-std")] - c.bench_function("clear memory", |b| { - b.to_async(AsyncStdExecutor).iter_custom(clear_memory); - }); #[cfg(feature = "tokio")] c.bench_function("clear memory", |b| { let rt = tokio::runtime::Runtime::new().unwrap(); From 805ef1b9d479ac1f2c7d838516e258ede0825323 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 15:02:45 -0500 Subject: [PATCH 12/26] fix the needless_pas_by_ref_mut --- src/core.rs | 42 +++++++++++++++++++++--------------------- src/replication/mod.rs | 3 +-- src/storage/mod.rs | 22 +++++++--------------- tests/core.rs | 2 +- 4 files changed, 30 insertions(+), 39 deletions(-) diff --git a/src/core.rs b/src/core.rs index 8b7dee08..5570248a 100644 --- a/src/core.rs +++ b/src/core.rs @@ -84,7 +84,7 @@ pub struct Info { impl Hypercore { /// Creates/opens new hypercore using given storage and options pub(crate) async fn new( - mut storage: Storage, + storage: Storage, mut options: HypercoreOptions, ) -> Result { let key_pair: Option = if options.open { @@ -365,7 +365,7 @@ impl Hypercore { /// Read value at given index, if any. #[instrument(err, skip(self))] - pub async fn get(&mut self, index: u64) -> Result>, HypercoreError> { + pub async fn get(&self, index: u64) -> Result>, HypercoreError> { if !self.bitfield.get(index) { #[cfg(feature = "replication")] // if not in this core, emit Event::Get(index) @@ -473,7 +473,7 @@ impl Hypercore { /// Create a proof for given request #[instrument(err, skip_all)] pub async fn create_proof( - &mut self, + &self, block: Option, hash: Option, seek: Option, @@ -599,7 +599,7 @@ impl Hypercore { /// Used to fill the nodes field of a `RequestBlock` during /// synchronization. #[instrument(err, skip(self))] - pub async fn missing_nodes(&mut self, index: u64) -> Result { + pub async fn missing_nodes(&self, index: u64) -> Result { self.missing_nodes_from_merkle_tree_index(index * 2).await } @@ -607,7 +607,7 @@ impl Hypercore { /// that allow for special cases of searching directly from the merkle tree. #[instrument(err, skip(self))] pub async fn missing_nodes_from_merkle_tree_index( - &mut self, + &self, merkle_tree_index: u64, ) -> Result { match self.tree.missing_nodes(merkle_tree_index, None)? { @@ -648,7 +648,7 @@ impl Hypercore { } async fn byte_range( - &mut self, + &self, index: u64, initial_infos: Option<&[StoreInfo]>, ) -> Result { @@ -673,7 +673,7 @@ impl Hypercore { } async fn create_valueless_proof( - &mut self, + &self, block: Option, hash: Option, seek: Option, @@ -713,7 +713,7 @@ impl Hypercore { /// Verify a proof received from a peer. Returns a changeset that should be /// applied. - async fn verify_proof(&mut self, proof: &Proof) -> Result { + async fn verify_proof(&self, proof: &Proof) -> Result { match self.tree.verify_proof(proof, &self.key_pair.public, None)? { Either::Right(value) => Ok(value), Either::Left(instructions) => { @@ -786,7 +786,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_only() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof(Some(RequestBlock { index: 4, nodes: 2 }), None, None, None) @@ -804,7 +804,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 0 }), @@ -835,7 +835,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_upgrade_and_additional() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 0 }), @@ -867,7 +867,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_upgrade_from_existing_state() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 1, nodes: 0 }), @@ -898,7 +898,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_upgrade_from_existing_state_with_additional() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 1, nodes: 0 }), @@ -929,7 +929,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_1_no_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 2 }), @@ -951,7 +951,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_2_no_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 2 }), @@ -973,7 +973,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_3_no_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 2 }), @@ -997,7 +997,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_to_tree_no_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(16).await?; + let hypercore = create_hypercore_with_data(16).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 0, nodes: 4 }), @@ -1022,7 +1022,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_block_and_seek_with_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( Some(RequestBlock { index: 4, nodes: 2 }), @@ -1052,7 +1052,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_create_proof_seek_with_upgrade() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; let proof = hypercore .create_proof( None, @@ -1081,7 +1081,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_verify_proof_invalid_signature() -> Result<(), HypercoreError> { - let mut hypercore = create_hypercore_with_data(10).await?; + let hypercore = create_hypercore_with_data(10).await?; // Invalid clone hypercore with a different public key let mut hypercore_clone = create_hypercore_with_data(0).await?; let proof = hypercore @@ -1107,7 +1107,7 @@ pub(crate) mod tests { #[async_std::test] async fn core_verify_and_apply_proof() -> Result<(), HypercoreError> { - let mut main = create_hypercore_with_data(10).await?; + let main = create_hypercore_with_data(10).await?; let mut clone = create_hypercore_with_data_and_key_pair( 0, PartialKeypair { diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 87eacda9..3d198bbc 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -124,7 +124,7 @@ impl Peer { fn poll_peer( &mut self, - core: &mut Hypercore, + core: &Hypercore, cx: &mut Context<'_>, ) -> Poll> { if let Some(mut fut) = self.pending_open.take() { @@ -186,7 +186,6 @@ impl Stream for Hypercore { type Item = Result<(), HypercoreError>; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - //for peer in self.peers.iter() { // if let Poll::Ready(_) = peer.lock().unwrap().poll_peer(self.deref_mut(), cx) { // cx.waker().wake_by_ref(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index e42d502a..b812d98e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -100,7 +100,7 @@ impl Storage { /// Read info from store based on given instruction. Convenience method to `read_infos`. pub(crate) async fn read_info( - &mut self, + &self, info_instruction: StoreInfoInstruction, ) -> Result { let mut infos = self.read_infos_to_vec(&[info_instruction]).await?; @@ -111,7 +111,7 @@ impl Storage { /// Read infos from stores based on given instructions pub(crate) async fn read_infos( - &mut self, + &self, info_instructions: &[StoreInfoInstruction], ) -> Result, HypercoreError> { let infos = self.read_infos_to_vec(info_instructions).await?; @@ -120,7 +120,7 @@ impl Storage { /// Reads infos but retains them as a Vec pub(crate) async fn read_infos_to_vec( - &mut self, + &self, info_instructions: &[StoreInfoInstruction], ) -> Result, HypercoreError> { if info_instructions.is_empty() { @@ -180,21 +180,21 @@ impl Storage { } /// Flush info to storage. Convenience method to `flush_infos`. - pub(crate) async fn flush_info(&mut self, slice: StoreInfo) -> Result<(), HypercoreError> { + pub(crate) async fn flush_info(&self, slice: StoreInfo) -> Result<(), HypercoreError> { self.flush_infos(&[slice]).await } /// Flush infos to storage - pub(crate) async fn flush_infos(&mut self, infos: &[StoreInfo]) -> Result<(), HypercoreError> { + pub(crate) async fn flush_infos(&self, infos: &[StoreInfo]) -> Result<(), HypercoreError> { if infos.is_empty() { return Ok(()); } let mut current_store: Store = infos[0].store.clone(); - let mut storage = self.get_random_access_mut(¤t_store); + let mut storage = self.get_random_access(¤t_store); for info in infos.iter() { if info.store != current_store { current_store = info.store.clone(); - storage = self.get_random_access_mut(¤t_store); + storage = self.get_random_access(¤t_store); } match info.info_type { StoreInfoType::Content => { @@ -230,14 +230,6 @@ impl Storage { Ok(()) } - fn get_random_access_mut(&mut self, store: &Store) -> &mut Box { - match store { - Store::Tree => &mut self.tree, - Store::Data => &mut self.data, - Store::Bitfield => &mut self.bitfield, - Store::Oplog => &mut self.oplog, - } - } fn get_random_access(&self, store: &Store) -> &Box { match store { Store::Tree => &self.tree, diff --git a/tests/core.rs b/tests/core.rs index 09557930..0595dc15 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -72,7 +72,7 @@ async fn hypercore_make_read_only() -> Result<()> { &write_key_pair.secret.as_ref().unwrap().to_bytes()[16..], )); - let mut hypercore = open_hypercore(&dir.path().to_string_lossy()).await?; + let hypercore = open_hypercore(&dir.path().to_string_lossy()).await?; assert_eq!(&hypercore.get(0).await?.unwrap(), b"Hello"); assert_eq!(&hypercore.get(1).await?.unwrap(), b"World!"); Ok(()) From a79047d088d555a4f769fa4db92451d9ce29127f Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 15:49:02 -0500 Subject: [PATCH 13/26] unnest --- src/core.rs | 105 +++++++++++++++++++++++++++------------------------- 1 file changed, 54 insertions(+), 51 deletions(-) diff --git a/src/core.rs b/src/core.rs index 5570248a..2dcaf081 100644 --- a/src/core.rs +++ b/src/core.rs @@ -277,6 +277,12 @@ impl Hypercore { self.append_batch(&[data]).await } + fn append_outcome(&self) -> AppendOutcome { + AppendOutcome { + length: self.tree.length, + byte_length: self.tree.byte_length, + } + } /// Appends a given batch of data slices to the hypercore. #[instrument(err, skip_all, fields(batch_len = batch.as_ref().len()))] pub async fn append_batch, B: AsRef<[A]>>( @@ -288,67 +294,64 @@ impl Hypercore { None => return Err(HypercoreError::NotWritable), }; - if !batch.as_ref().is_empty() { - // Create a changeset for the tree - let mut changeset = self.tree.changeset(); - let mut batch_length: usize = 0; - for data in batch.as_ref().iter() { - batch_length += changeset.append(data.as_ref()); - } - changeset.hash_and_sign(secret_key); + if batch.as_ref().is_empty() { + return Ok(self.append_outcome()); + } + // Create a changeset for the tree + let mut changeset = self.tree.changeset(); + let mut batch_length: usize = 0; + for data in batch.as_ref().iter() { + batch_length += changeset.append(data.as_ref()); + } + changeset.hash_and_sign(secret_key); - // Write the received data to the block store - let info = - self.block_store - .append_batch(batch.as_ref(), batch_length, self.tree.byte_length); - self.storage.flush_info(info).await?; + // Write the received data to the block store + let info = + self.block_store + .append_batch(batch.as_ref(), batch_length, self.tree.byte_length); + self.storage.flush_info(info).await?; - // Append the changeset to the Oplog - let bitfield_update = BitfieldUpdate { - drop: false, - start: changeset.ancestors, - length: changeset.batch_length, - }; - let outcome = self.oplog.append_changeset( - &changeset, - Some(bitfield_update.clone()), - false, - &self.header, - )?; - self.storage.flush_infos(&outcome.infos_to_flush).await?; - self.header = outcome.header; + // Append the changeset to the Oplog + let bitfield_update = BitfieldUpdate { + drop: false, + start: changeset.ancestors, + length: changeset.batch_length, + }; + let outcome = self.oplog.append_changeset( + &changeset, + Some(bitfield_update.clone()), + false, + &self.header, + )?; + self.storage.flush_infos(&outcome.infos_to_flush).await?; + self.header = outcome.header; - // Write to bitfield - self.bitfield.update(&bitfield_update); + // Write to bitfield + self.bitfield.update(&bitfield_update); - // Contiguous length is known only now - update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update); + // Contiguous length is known only now + update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update); - // Commit changeset to in-memory tree - self.tree.commit(changeset)?; + // Commit changeset to in-memory tree + self.tree.commit(changeset)?; - // Now ready to flush - if self.should_flush_bitfield_and_tree_and_oplog() { - self.flush_bitfield_and_tree_and_oplog(false).await?; - } + // Now ready to flush + if self.should_flush_bitfield_and_tree_and_oplog() { + self.flush_bitfield_and_tree_and_oplog(false).await?; + } - #[cfg(feature = "replication")] - { - use tracing::trace; + #[cfg(feature = "replication")] + { + use tracing::trace; - trace!(bitfield_update = ?bitfield_update, "Hppercore.append_batch emit DataUpgrade & Have"); - let _ = self.events.send(crate::replication::events::DataUpgrade {}); - let _ = self - .events - .send(crate::replication::events::Have::from(&bitfield_update)); - } + trace!(bitfield_update = ?bitfield_update, "Hppercore.append_batch emit DataUpgrade & Have"); + let _ = self.events.send(crate::replication::events::DataUpgrade {}); + let _ = self + .events + .send(crate::replication::events::Have::from(&bitfield_update)); } - // Return the new value - Ok(AppendOutcome { - length: self.tree.length, - byte_length: self.tree.byte_length, - }) + Ok(self.append_outcome()) } #[cfg(feature = "replication")] From ddfeeba233f155e532179494e5b4c5c8e33dfa9d Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 24 Feb 2026 15:57:45 -0500 Subject: [PATCH 14/26] ignore for now --- src/replication/mod.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 3d198bbc..a16df414 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -103,7 +103,7 @@ pub trait CoreMethods: CoreInfo { pub struct Peer { protocol: Protocol, - pending_open: Option>>>>, + _pending_open: Option>>>>, } impl std::fmt::Debug for Peer { @@ -118,16 +118,16 @@ impl Peer { fn new(protocol: Protocol) -> Self { Self { protocol, - pending_open: Default::default(), + _pending_open: Default::default(), } } - fn poll_peer( + fn _poll_peer( &mut self, core: &Hypercore, cx: &mut Context<'_>, ) -> Poll> { - if let Some(mut fut) = self.pending_open.take() { + if let Some(mut fut) = self._pending_open.take() { match fut.poll(cx) { Poll::Ready(res) => match res { Ok(_) => { @@ -138,7 +138,7 @@ impl Peer { } }, Poll::Pending => { - _ = self.pending_open.insert(fut); + _ = self._pending_open.insert(fut); return Poll::Pending; } } @@ -155,14 +155,14 @@ impl Peer { hypercore_protocol::Event::Handshake(_) => { if self.protocol.is_initiator() { let key = core.key_pair().public.to_bytes(); - self.pending_open = Some(Box::pin(self.protocol.open(key))); + self._pending_open = Some(Box::pin(self.protocol.open(key))); } } hypercore_protocol::Event::DiscoveryKey(dkey) => { let key = core.key_pair().public.to_bytes(); let this_dkey = discovery_key(&key); if this_dkey == dkey { - self.pending_open = Some(Box::pin(self.protocol.open(key))); + self._pending_open = Some(Box::pin(self.protocol.open(key))); } else { warn!("Got discovery key for different core: {dkey:?}"); } From 14fdd8bd4a482681f80a8c99bd45fd6bea77c0de Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 25 Feb 2026 17:31:14 -0500 Subject: [PATCH 15/26] Add From for HypercoreError --- src/common/error.rs | 31 ++++++++++++++++++++++++ src/storage/mod.rs | 57 ++++++++++----------------------------------- 2 files changed, 43 insertions(+), 45 deletions(-) diff --git a/src/common/error.rs b/src/common/error.rs index 6f5edcdc..dc53b400 100644 --- a/src/common/error.rs +++ b/src/common/error.rs @@ -1,4 +1,5 @@ use compact_encoding::EncodingError; +use random_access_storage::RandomAccessError; use thiserror::Error; use crate::Store; @@ -80,3 +81,33 @@ impl From for HypercoreError { } } } + +impl From for HypercoreError { + fn from(value: RandomAccessError) -> Self { + map_random_access_err(value) + } +} + +pub(crate) fn map_random_access_err(err: RandomAccessError) -> HypercoreError { + match err { + RandomAccessError::IO { + return_code, + context, + source, + } => HypercoreError::IO { + context: Some(format!( + "RandomAccess IO error. Context: {context:?}, return_code: {return_code:?}", + )), + source, + }, + RandomAccessError::OutOfBounds { + offset, + end, + length, + } => HypercoreError::InvalidOperation { + context: format!( + "RandomAccess out of bounds. Offset: {offset}, end: {end:?}, length: {length}", + ), + }, + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b812d98e..2def4c2e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -28,30 +28,6 @@ pub struct Storage { oplog: Box, } -pub(crate) fn map_random_access_err(err: RandomAccessError) -> HypercoreError { - match err { - RandomAccessError::IO { - return_code, - context, - source, - } => HypercoreError::IO { - context: Some(format!( - "RandomAccess IO error. Context: {context:?}, return_code: {return_code:?}", - )), - source, - }, - RandomAccessError::OutOfBounds { - offset, - end, - length, - } => HypercoreError::InvalidOperation { - context: format!( - "RandomAccess out of bounds. Offset: {offset}, end: {end:?}, length: {length}", - ), - }, - } -} - impl Storage { /// Create a new instance. Takes a callback to create new storage instances and overwrite flag. pub async fn open(create: Cb, overwrite: bool) -> Result @@ -66,25 +42,23 @@ impl Storage { >, >, { - let tree = create(Store::Tree).await.map_err(map_random_access_err)?; - let data = create(Store::Data).await.map_err(map_random_access_err)?; - let bitfield = create(Store::Bitfield) - .await - .map_err(map_random_access_err)?; - let oplog = create(Store::Oplog).await.map_err(map_random_access_err)?; + let tree = create(Store::Tree).await?; + let data = create(Store::Data).await?; + let bitfield = create(Store::Bitfield).await?; + let oplog = create(Store::Oplog).await?; if overwrite { if tree.len() > 0 { - tree.truncate(0).await.map_err(map_random_access_err)?; + tree.truncate(0).await?; } if data.len() > 0 { - data.truncate(0).await.map_err(map_random_access_err)?; + data.truncate(0).await?; } if bitfield.len() > 0 { - bitfield.truncate(0).await.map_err(map_random_access_err)?; + bitfield.truncate(0).await?; } if oplog.len() > 0 { - oplog.truncate(0).await.map_err(map_random_access_err)?; + oplog.truncate(0).await?; } } @@ -162,7 +136,7 @@ impl Storage { }) } } - Err(e) => Err(map_random_access_err(e)), + Err(e) => Err(e.into()), }?; infos.push(info); } @@ -200,10 +174,7 @@ impl Storage { StoreInfoType::Content => { if !info.miss { if let Some(data) = &info.data { - storage - .write(info.index, data) - .await - .map_err(map_random_access_err)?; + storage.write(info.index, data).await?; } } else { storage @@ -211,16 +182,12 @@ impl Storage { info.index, info.length.expect("When deleting, length must be given"), ) - .await - .map_err(map_random_access_err)?; + .await?; } } StoreInfoType::Size => { if info.miss { - storage - .truncate(info.index) - .await - .map_err(map_random_access_err)?; + storage.truncate(info.index).await?; } else { panic!("Flushing a size that isn't miss, is not supported"); } From be5fbdd87eb4e8136cfd65768083b9a2e2feb093 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 25 Feb 2026 17:31:39 -0500 Subject: [PATCH 16/26] Add storage test --- src/storage/mod.rs | 49 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2def4c2e..ac88adaf 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -240,3 +240,52 @@ impl Storage { Self::open(storage, overwrite).await } } + +#[cfg(test)] +mod test { + use super::*; + use crate::common::{StoreInfo, StoreInfoInstruction}; + + #[tokio::test] + async fn test_storage() -> Result<(), Box> { + let storage = Storage::new_memory().await?; + + let data = b"hello hypercore"; + + // Write to tree store + storage + .flush_info(StoreInfo::new_content(Store::Tree, 0, data)) + .await?; + + // Read it back + let info = storage + .read_info(StoreInfoInstruction::new_content( + Store::Tree, + 0, + data.len() as u64, + )) + .await?; + + assert_eq!(info.data.as_deref(), Some(data.as_slice())); + + // Write to two different stores, read back together + storage + .flush_infos(&[ + StoreInfo::new_content(Store::Data, 0, b"block0"), + StoreInfo::new_content(Store::Bitfield, 0, b"bits"), + ]) + .await?; + + let infos = storage + .read_infos(&[ + StoreInfoInstruction::new_content(Store::Data, 0, 6), + StoreInfoInstruction::new_content(Store::Bitfield, 0, 4), + ]) + .await?; + + assert_eq!(infos[0].data.as_deref(), Some(b"block0".as_slice())); + assert_eq!(infos[1].data.as_deref(), Some(b"bits".as_slice())); + + Ok(()) + } +} From fa282a320a0963ea57de6ad22b02949631b972ee Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 25 Feb 2026 17:57:50 -0500 Subject: [PATCH 17/26] Make flush_infos take Vec not slice --- src/core.rs | 18 +++++++++++------- src/storage/mod.rs | 6 +++--- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/core.rs b/src/core.rs index 2dcaf081..4fb81a8b 100644 --- a/src/core.rs +++ b/src/core.rs @@ -121,7 +121,7 @@ impl Hypercore { } }; storage - .flush_infos(&oplog_open_outcome.infos_to_flush) + .flush_infos(Vec::from(oplog_open_outcome.infos_to_flush)) .await?; // Open/create tree @@ -323,7 +323,9 @@ impl Hypercore { false, &self.header, )?; - self.storage.flush_infos(&outcome.infos_to_flush).await?; + self.storage + .flush_infos(Vec::from(outcome.infos_to_flush)) + .await?; self.header = outcome.header; // Write to bitfield @@ -411,7 +413,7 @@ impl Hypercore { } // Write to oplog let infos_to_flush = self.oplog.clear(start, end)?; - self.storage.flush_infos(&infos_to_flush).await?; + self.storage.flush_infos(Vec::from(infos_to_flush)).await?; // Set bitfield self.bitfield.set_range(start, end - start, false); @@ -563,7 +565,9 @@ impl Hypercore { false, &self.header, )?; - self.storage.flush_infos(&outcome.infos_to_flush).await?; + self.storage + .flush_infos(Vec::from(outcome.infos_to_flush)) + .await?; self.header = outcome.header; if let Some(bitfield_update) = &bitfield_update { @@ -751,11 +755,11 @@ impl Hypercore { clear_traces: bool, ) -> Result<(), HypercoreError> { let infos = self.bitfield.flush(); - self.storage.flush_infos(&infos).await?; + self.storage.flush_infos(Vec::from(infos)).await?; let infos = self.tree.flush(); - self.storage.flush_infos(&infos).await?; + self.storage.flush_infos(Vec::from(infos)).await?; let infos = self.oplog.flush(&self.header, clear_traces)?; - self.storage.flush_infos(&infos).await?; + self.storage.flush_infos(Vec::from(infos)).await?; Ok(()) } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index ac88adaf..4244395a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -155,11 +155,11 @@ impl Storage { /// Flush info to storage. Convenience method to `flush_infos`. pub(crate) async fn flush_info(&self, slice: StoreInfo) -> Result<(), HypercoreError> { - self.flush_infos(&[slice]).await + self.flush_infos(vec![slice]).await } /// Flush infos to storage - pub(crate) async fn flush_infos(&self, infos: &[StoreInfo]) -> Result<(), HypercoreError> { + pub(crate) async fn flush_infos(&self, infos: Vec) -> Result<(), HypercoreError> { if infos.is_empty() { return Ok(()); } @@ -270,7 +270,7 @@ mod test { // Write to two different stores, read back together storage - .flush_infos(&[ + .flush_infos(vec![ StoreInfo::new_content(Store::Data, 0, b"block0"), StoreInfo::new_content(Store::Bitfield, 0, b"bits"), ]) From 659d6a244df3e061d5fe44646eb308924226268e Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 25 Feb 2026 18:22:11 -0500 Subject: [PATCH 18/26] Make Storage returnc owned futures --- benches/memory.rs | 6 +- src/core.rs | 37 +++++-- src/storage/mod.rs | 249 +++++++++++++++++++++++---------------------- 3 files changed, 160 insertions(+), 132 deletions(-) diff --git a/benches/memory.rs b/benches/memory.rs index e098ccbe..1cb9c73a 100644 --- a/benches/memory.rs +++ b/benches/memory.rs @@ -1,4 +1,4 @@ -use std::time::{Duration, Instant}; +use std::{sync::Arc, time::{Duration, Instant}}; use criterion::{Criterion, black_box, criterion_group, criterion_main}; use hypercore::{Hypercore, HypercoreBuilder, HypercoreError, Storage}; @@ -19,7 +19,7 @@ async fn create_hypercore(page_size: usize) -> Result let storage = Storage::open( |_| { Box::pin(async move { - Ok(Box::new(RandomAccessMemory::new(page_size)) as Box) + Ok(Arc::new(RandomAccessMemory::new(page_size)) as Arc) }) }, false, @@ -38,7 +38,7 @@ async fn create_hypercore(page_size: usize) -> Result let storage = Storage::open( |_| { Box::pin(async move { - Ok(Box::new(RandomAccessMemory::new(page_size)) as Box) + Ok(Arc::new(RandomAccessMemory::new(page_size)) as Arc) }) }, false, diff --git a/src/core.rs b/src/core.rs index 4fb81a8b..e677ebff 100644 --- a/src/core.rs +++ b/src/core.rs @@ -133,7 +133,7 @@ impl Hypercore { )? { Either::Right(value) => value, Either::Left(instructions) => { - let infos = storage.read_infos(&instructions).await?; + let infos = storage.read_infos(Vec::from(instructions)).await?; match MerkleTree::open( &oplog_open_outcome.header.tree, Some(&infos), @@ -196,7 +196,7 @@ impl Hypercore { match tree.truncate(tree_upgrade.length, tree_upgrade.fork, None)? { Either::Right(value) => value, Either::Left(instructions) => { - let infos = storage.read_infos(&instructions).await?; + let infos = storage.read_infos(Vec::from(instructions)).await?; match tree.truncate( tree_upgrade.length, tree_upgrade.fork, @@ -440,7 +440,10 @@ impl Hypercore { let clear_offset = match self.tree.byte_offset(start, None)? { Either::Right(value) => value, Either::Left(instructions) => { - let new_infos = self.storage.read_infos_to_vec(&instructions).await?; + let new_infos = self + .storage + .read_infos_to_vec(Vec::from(instructions)) + .await?; infos.extend(new_infos); match self.tree.byte_offset(start, Some(&infos))? { Either::Right(value) => value, @@ -524,7 +527,10 @@ impl Hypercore { { Either::Right(value) => value, Either::Left(instructions) => { - let infos = self.storage.read_infos_to_vec(&instructions).await?; + let infos = self + .storage + .read_infos_to_vec(Vec::from(instructions)) + .await?; match self.tree.byte_offset_in_changeset( block.index, &changeset, @@ -623,7 +629,11 @@ impl Hypercore { let mut instructions = instructions; let mut infos: Vec = vec![]; loop { - infos.extend(self.storage.read_infos_to_vec(&instructions).await?); + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); match self.tree.missing_nodes(merkle_tree_index, Some(&infos))? { Either::Right(value) => { return Ok(value); @@ -665,7 +675,11 @@ impl Hypercore { let mut instructions = instructions; let mut infos: Vec = vec![]; loop { - infos.extend(self.storage.read_infos_to_vec(&instructions).await?); + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); match self.tree.byte_range(index, Some(&infos))? { Either::Right(value) => { return Ok(value); @@ -698,7 +712,11 @@ impl Hypercore { let mut instructions = instructions; let mut infos: Vec = vec![]; loop { - infos.extend(self.storage.read_infos_to_vec(&instructions).await?); + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); match self.tree.create_valueless_proof( block.as_ref(), hash.as_ref(), @@ -724,7 +742,10 @@ impl Hypercore { match self.tree.verify_proof(proof, &self.key_pair.public, None)? { Either::Right(value) => Ok(value), Either::Left(instructions) => { - let infos = self.storage.read_infos_to_vec(&instructions).await?; + let infos = self + .storage + .read_infos_to_vec(Vec::from(instructions)) + .await?; match self .tree .verify_proof(proof, &self.key_pair.public, Some(&infos))? diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 4244395a..d9def7a6 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,10 +4,10 @@ use futures::future::FutureExt; #[cfg(not(target_arch = "wasm32"))] use random_access_disk::RandomAccessDisk; use random_access_memory::RandomAccessMemory; -use random_access_storage::{RandomAccess, RandomAccessError}; -use std::fmt::Debug; +use random_access_storage::{BoxFuture, RandomAccess, RandomAccessError}; #[cfg(not(target_arch = "wasm32"))] use std::path::PathBuf; +use std::{fmt::Debug, sync::Arc}; use tracing::instrument; use crate::{ @@ -16,16 +16,16 @@ use crate::{ }; /// Supertrait for Storage -pub trait StorageTraits: RandomAccess + Debug {} -impl StorageTraits for T {} +pub trait StorageTraits: RandomAccess + Debug + Send + Sync {} +impl StorageTraits for T {} /// Save data to a desired storage backend. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Storage { - tree: Box, - data: Box, - bitfield: Box, - oplog: Box, + tree: Arc, + data: Arc, + bitfield: Arc, + oplog: Arc, } impl Storage { @@ -36,16 +36,15 @@ impl Storage { Store, ) -> std::pin::Pin< Box< - dyn std::future::Future< - Output = Result, RandomAccessError>, - > + Send, + dyn std::future::Future, RandomAccessError>> + + Send, >, >, { - let tree = create(Store::Tree).await?; - let data = create(Store::Data).await?; - let bitfield = create(Store::Bitfield).await?; - let oplog = create(Store::Oplog).await?; + let tree: Arc = create(Store::Tree).await?; + let data: Arc = create(Store::Data).await?; + let bitfield: Arc = create(Store::Bitfield).await?; + let oplog: Arc = create(Store::Oplog).await?; if overwrite { if tree.len() > 0 { @@ -62,142 +61,152 @@ impl Storage { } } - let instance = Self { + Ok(Self { tree, data, bitfield, oplog, - }; - - Ok(instance) + }) } - /// Read info from store based on given instruction. Convenience method to `read_infos`. - pub(crate) async fn read_info( + /// Read info from store based on given instruction. + pub(crate) fn read_info( &self, info_instruction: StoreInfoInstruction, - ) -> Result { - let mut infos = self.read_infos_to_vec(&[info_instruction]).await?; - Ok(infos - .pop() - .expect("Should have gotten one info with one instruction")) + ) -> BoxFuture> { + let fut = self.read_infos_to_vec(vec![info_instruction]); + Box::pin(async move { + Ok(fut + .await? + .pop() + .expect("Should have gotten one info with one instruction")) + }) } /// Read infos from stores based on given instructions - pub(crate) async fn read_infos( + pub(crate) fn read_infos( &self, - info_instructions: &[StoreInfoInstruction], - ) -> Result, HypercoreError> { - let infos = self.read_infos_to_vec(info_instructions).await?; - Ok(infos.into_boxed_slice()) + info_instructions: Vec, + ) -> BoxFuture, HypercoreError>> { + let fut = self.read_infos_to_vec(info_instructions); + Box::pin(async move { Ok(fut.await?.into_boxed_slice()) }) } /// Reads infos but retains them as a Vec - pub(crate) async fn read_infos_to_vec( + pub(crate) fn read_infos_to_vec( &self, - info_instructions: &[StoreInfoInstruction], - ) -> Result, HypercoreError> { - if info_instructions.is_empty() { - return Ok(vec![]); - } - let mut current_store: Store = info_instructions[0].store.clone(); - let mut storage = self.get_random_access(¤t_store); - let mut infos: Vec = Vec::with_capacity(info_instructions.len()); - for instruction in info_instructions.iter() { - if instruction.store != current_store { - current_store = instruction.store.clone(); - storage = self.get_random_access(¤t_store); + info_instructions: Vec, + ) -> BoxFuture, HypercoreError>> { + let storage = self.clone(); + let instructions = info_instructions; // TODO rm + Box::pin(async move { + if instructions.is_empty() { + return Ok(vec![]); } - match instruction.info_type { - StoreInfoType::Content => { - let read_length = match instruction.length { - Some(length) => length, - None => storage.len(), - }; - let read_result = storage.read(instruction.index, read_length).await; - let info: StoreInfo = match read_result { - Ok(buf) => Ok(StoreInfo::new_content( + let mut current_store: Store = instructions[0].store.clone(); + let mut ra: Arc = storage.get_random_access(¤t_store).clone(); + let mut infos: Vec = Vec::with_capacity(instructions.len()); + for instruction in instructions.iter() { + if instruction.store != current_store { + current_store = instruction.store.clone(); + ra = storage.get_random_access(¤t_store).clone(); + } + match instruction.info_type { + StoreInfoType::Content => { + let read_length = match instruction.length { + Some(length) => length, + None => ra.len(), + }; + let read_result = ra.read(instruction.index, read_length).await; + let info: StoreInfo = match read_result { + Ok(buf) => Ok(StoreInfo::new_content( + instruction.store.clone(), + instruction.index, + &buf, + )), + Err(RandomAccessError::OutOfBounds { length, .. }) => { + if instruction.allow_miss { + Ok(StoreInfo::new_content_miss( + instruction.store.clone(), + instruction.index, + )) + } else { + Err(HypercoreError::InvalidOperation { + context: format!( + "Could not read from store {}, index {} / length {} is out of bounds for store length {}", + current_store, instruction.index, read_length, length + ), + }) + } + } + Err(e) => Err(HypercoreError::from(e)), + }?; + infos.push(info); + } + StoreInfoType::Size => { + let length = ra.len(); + infos.push(StoreInfo::new_size( instruction.store.clone(), instruction.index, - &buf, - )), - Err(RandomAccessError::OutOfBounds { length, .. }) => { - if instruction.allow_miss { - Ok(StoreInfo::new_content_miss( - instruction.store.clone(), - instruction.index, - )) - } else { - Err(HypercoreError::InvalidOperation { - context: format!( - "Could not read from store {}, index {} / length {} is out of bounds for store length {}", - current_store, instruction.index, read_length, length - ), - }) - } - } - Err(e) => Err(e.into()), - }?; - infos.push(info); - } - StoreInfoType::Size => { - let length = storage.len(); - infos.push(StoreInfo::new_size( - instruction.store.clone(), - instruction.index, - length - instruction.index, - )); + length - instruction.index, + )); + } } } - } - Ok(infos) + Ok(infos) + }) } - /// Flush info to storage. Convenience method to `flush_infos`. - pub(crate) async fn flush_info(&self, slice: StoreInfo) -> Result<(), HypercoreError> { - self.flush_infos(vec![slice]).await + /// Flush info to storage. + pub(crate) fn flush_info(&self, info: StoreInfo) -> BoxFuture> { + self.flush_infos(vec![info]) } /// Flush infos to storage - pub(crate) async fn flush_infos(&self, infos: Vec) -> Result<(), HypercoreError> { - if infos.is_empty() { - return Ok(()); - } - let mut current_store: Store = infos[0].store.clone(); - let mut storage = self.get_random_access(¤t_store); - for info in infos.iter() { - if info.store != current_store { - current_store = info.store.clone(); - storage = self.get_random_access(¤t_store); + pub(crate) fn flush_infos( + &self, + infos: Vec, + ) -> BoxFuture> { + let storage = self.clone(); + Box::pin(async move { + if infos.is_empty() { + return Ok(()); } - match info.info_type { - StoreInfoType::Content => { - if !info.miss { - if let Some(data) = &info.data { - storage.write(info.index, data).await?; - } - } else { - storage - .del( + let mut current_store: Store = infos[0].store.clone(); + let mut ra: Arc = storage.get_random_access(¤t_store).clone(); + for info in infos.iter() { + if info.store != current_store { + current_store = info.store.clone(); + ra = storage.get_random_access(¤t_store).clone(); + } + match info.info_type { + StoreInfoType::Content => { + if !info.miss { + if let Some(data) = &info.data { + ra.write(info.index, data).await?; + } + } else { + ra.del( info.index, info.length.expect("When deleting, length must be given"), ) .await?; + } } - } - StoreInfoType::Size => { - if info.miss { - storage.truncate(info.index).await?; - } else { - panic!("Flushing a size that isn't miss, is not supported"); + StoreInfoType::Size => { + if info.miss { + ra.truncate(info.index).await?; + } else { + panic!("Flushing a size that isn't miss, is not supported"); + } } } } - } - Ok(()) + Ok(()) + }) } - fn get_random_access(&self, store: &Store) -> &Box { + fn get_random_access(&self, store: &Store) -> &Arc { match store { Store::Tree => &self.tree, Store::Data => &self.data, @@ -210,10 +219,8 @@ impl Storage { #[instrument(err)] pub async fn new_memory() -> Result { let create = |_| { - async { Ok(Box::new(RandomAccessMemory::default()) as Box) } - .boxed() + async { Ok(Arc::new(RandomAccessMemory::default()) as Arc) }.boxed() }; - // No reason to overwrite, as this is a new memory segment Self::open(create, false).await } @@ -231,8 +238,8 @@ impl Storage { Store::Oplog => "oplog", }; Ok( - Box::new(RandomAccessDisk::open(dir.as_path().join(name)).await?) - as Box, + Arc::new(RandomAccessDisk::open(dir.as_path().join(name)).await?) + as Arc, ) } .boxed() @@ -277,7 +284,7 @@ mod test { .await?; let infos = storage - .read_infos(&[ + .read_infos(vec![ StoreInfoInstruction::new_content(Store::Data, 0, 6), StoreInfoInstruction::new_content(Store::Bitfield, 0, 4), ]) From 397670059f99fe404d996cb0c4cd92255759f314 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Thu, 26 Feb 2026 00:55:18 -0500 Subject: [PATCH 19/26] Split core into two parts --- src/core/inner.rs | 407 +++++++++++++++++++++++++ src/{core.rs => core/mod.rs} | 538 +++++++-------------------------- src/replication/shared_core.rs | 2 +- src/storage/mod.rs | 15 + 4 files changed, 531 insertions(+), 431 deletions(-) create mode 100644 src/core/inner.rs rename src/{core.rs => core/mod.rs} (59%) diff --git a/src/core/inner.rs b/src/core/inner.rs new file mode 100644 index 00000000..cda5dedf --- /dev/null +++ b/src/core/inner.rs @@ -0,0 +1,407 @@ +use ed25519_dalek::Signature; +use futures::future::Either; +use tracing::instrument; + +use crate::{ + bitfield::Bitfield, + common::{BitfieldUpdate, HypercoreError, NodeByteRange, StoreInfo, ValuelessProof}, + crypto::{PartialKeypair, generate_signing_key}, + data::BlockStore, + oplog::{Header, MAX_OPLOG_ENTRIES_BYTE_SIZE, Oplog}, + storage::Storage, + tree::{MerkleTree, MerkleTreeChangeset}, +}; +use hypercore_schema::{Proof, RequestBlock, RequestSeek, RequestUpgrade}; + +use super::{AppendOutcome, HypercoreOptions, Info}; + +#[derive(Debug)] +pub(crate) struct HypercoreInner { + pub(crate) key_pair: PartialKeypair, + pub(crate) storage: Storage, + pub(crate) oplog: Oplog, + pub(crate) tree: MerkleTree, + pub(crate) block_store: BlockStore, + pub(crate) bitfield: Bitfield, + pub(crate) skip_flush_count: u8, + pub(crate) header: Header, + #[cfg(feature = "replication")] + pub(crate) events: crate::replication::events::Events, +} + +impl HypercoreInner { + pub(crate) async fn new( + storage: Storage, + mut options: HypercoreOptions, + ) -> Result { + let key_pair: Option = if options.open { + if options.key_pair.is_some() { + return Err(HypercoreError::BadArgument { + context: "Key pair can not be used when building an openable hypercore" + .to_string(), + }); + } + None + } else { + Some(options.key_pair.take().unwrap_or_else(|| { + let signing_key = generate_signing_key(); + PartialKeypair { + public: signing_key.verifying_key(), + secret: Some(signing_key), + } + })) + }; + + // Open/create oplog + let mut oplog_open_outcome = match Oplog::open(&key_pair, None)? { + Either::Right(value) => value, + Either::Left(instruction) => { + let info = storage.read_info(instruction).await?; + match Oplog::open(&key_pair, Some(info))? { + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: "Could not open oplog".to_string(), + }); + } + } + } + }; + storage + .flush_infos(Vec::from(oplog_open_outcome.infos_to_flush)) + .await?; + + // Open/create tree + let mut tree = match MerkleTree::open( + &oplog_open_outcome.header.tree, + None, + #[cfg(feature = "cache")] + &options.node_cache_options, + )? { + Either::Right(value) => value, + Either::Left(instructions) => { + let infos = storage.read_infos(Vec::from(instructions)).await?; + match MerkleTree::open( + &oplog_open_outcome.header.tree, + Some(&infos), + #[cfg(feature = "cache")] + &options.node_cache_options, + )? { + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: "Could not open tree".to_string(), + }); + } + } + } + }; + + // Create block store instance + let block_store = BlockStore::default(); + + // Open bitfield + let mut bitfield = match Bitfield::open(None) { + Either::Right(value) => value, + Either::Left(instruction) => { + let info = storage.read_info(instruction).await?; + match Bitfield::open(Some(info)) { + Either::Right(value) => value, + Either::Left(instruction) => { + let info = storage.read_info(instruction).await?; + match Bitfield::open(Some(info)) { + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: "Could not open bitfield".to_string(), + }); + } + } + } + } + } + }; + + // Process entries stored only to the oplog and not yet flushed into bitfield or tree + if let Some(entries) = oplog_open_outcome.entries { + for entry in entries.iter() { + for node in &entry.tree_nodes { + tree.add_node(node.clone()); + } + + if let Some(bitfield_update) = &entry.bitfield { + bitfield.update(bitfield_update); + update_contiguous_length( + &mut oplog_open_outcome.header, + &bitfield, + bitfield_update, + ); + } + if let Some(tree_upgrade) = &entry.tree_upgrade { + let mut changeset = + match tree.truncate(tree_upgrade.length, tree_upgrade.fork, None)? { + Either::Right(value) => value, + Either::Left(instructions) => { + let infos = + storage.read_infos(Vec::from(instructions)).await?; + match tree.truncate( + tree_upgrade.length, + tree_upgrade.fork, + Some(&infos), + )? { + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: format!( + "Could not truncate tree to length {}", + tree_upgrade.length + ), + }); + } + } + } + }; + changeset.ancestors = tree_upgrade.ancestors; + changeset.hash = Some(changeset.hash()); + changeset.signature = + Some(Signature::try_from(&*tree_upgrade.signature).map_err(|_| { + HypercoreError::InvalidSignature { + context: "Could not parse changeset signature".to_string(), + } + })?); + + oplog_open_outcome.oplog.update_header_with_changeset( + &changeset, + None, + &mut oplog_open_outcome.header, + )?; + + tree.commit(changeset)?; + } + } + } + + let oplog = oplog_open_outcome.oplog; + let header = oplog_open_outcome.header; + let key_pair = header.key_pair.clone(); + + Ok(Self { + key_pair, + storage, + oplog, + tree, + block_store, + bitfield, + header, + skip_flush_count: 0, + #[cfg(feature = "replication")] + events: crate::replication::events::Events::new(), + }) + } + + pub(crate) fn info(&self) -> Info { + Info { + length: self.tree.length, + byte_length: self.tree.byte_length, + contiguous_length: self.header.hints.contiguous_length, + fork: self.tree.fork, + writeable: self.key_pair.secret.is_some(), + } + } + + pub(crate) fn key_pair(&self) -> &PartialKeypair { + &self.key_pair + } + + #[instrument(ret, skip(self))] + pub(crate) fn has(&self, index: u64) -> bool { + self.bitfield.get(index) + } + + #[cfg(feature = "replication")] + pub(crate) fn event_subscribe( + &self, + ) -> async_broadcast::Receiver { + self.events.channel.new_receiver() + } + + pub(crate) fn append_outcome(&self) -> AppendOutcome { + AppendOutcome { + length: self.tree.length, + byte_length: self.tree.byte_length, + } + } + + pub(crate) fn should_flush_bitfield_and_tree_and_oplog(&mut self) -> bool { + if self.skip_flush_count == 0 + || self.oplog.entries_byte_length >= MAX_OPLOG_ENTRIES_BYTE_SIZE + { + self.skip_flush_count = 3; + true + } else { + self.skip_flush_count -= 1; + false + } + } + + pub(crate) async fn flush_bitfield_and_tree_and_oplog( + &mut self, + clear_traces: bool, + ) -> Result<(), HypercoreError> { + let infos = self.bitfield.flush(); + self.storage.flush_infos(Vec::from(infos)).await?; + let infos = self.tree.flush(); + self.storage.flush_infos(Vec::from(infos)).await?; + let infos = self.oplog.flush(&self.header, clear_traces)?; + self.storage.flush_infos(Vec::from(infos)).await?; + Ok(()) + } + + pub(crate) async fn verify_proof( + &self, + proof: &Proof, + ) -> Result { + match self.tree.verify_proof(proof, &self.key_pair.public, None)? { + Either::Right(value) => Ok(value), + Either::Left(instructions) => { + let infos = self + .storage + .read_infos_to_vec(Vec::from(instructions)) + .await?; + match self + .tree + .verify_proof(proof, &self.key_pair.public, Some(&infos))? + { + Either::Right(value) => Ok(value), + Either::Left(_) => Err(HypercoreError::InvalidOperation { + context: "Could not verify proof from tree".to_string(), + }), + } + } + } + } + + #[instrument(err, skip(self))] + pub(crate) async fn missing_nodes_from_merkle_tree_index( + &self, + merkle_tree_index: u64, + ) -> Result { + match self.tree.missing_nodes(merkle_tree_index, None)? { + Either::Right(value) => Ok(value), + Either::Left(instructions) => { + let mut instructions = instructions; + let mut infos: Vec = vec![]; + loop { + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); + match self.tree.missing_nodes(merkle_tree_index, Some(&infos))? { + Either::Right(value) => { + return Ok(value); + } + Either::Left(new_instructions) => { + instructions = new_instructions; + } + } + } + } + } + } + + pub(crate) async fn byte_range( + &self, + index: u64, + initial_infos: Option<&[StoreInfo]>, + ) -> Result { + match self.tree.byte_range(index, initial_infos)? { + Either::Right(value) => Ok(value), + Either::Left(instructions) => { + let mut instructions = instructions; + let mut infos: Vec = vec![]; + loop { + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); + match self.tree.byte_range(index, Some(&infos))? { + Either::Right(value) => { + return Ok(value); + } + Either::Left(new_instructions) => { + instructions = new_instructions; + } + } + } + } + } + } + + pub(crate) async fn create_valueless_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> Result { + match self.tree.create_valueless_proof( + block.as_ref(), + hash.as_ref(), + seek.as_ref(), + upgrade.as_ref(), + None, + )? { + Either::Right(value) => Ok(value), + Either::Left(instructions) => { + let mut instructions = instructions; + let mut infos: Vec = vec![]; + loop { + infos.extend( + self.storage + .read_infos_to_vec(Vec::from(instructions)) + .await?, + ); + match self.tree.create_valueless_proof( + block.as_ref(), + hash.as_ref(), + seek.as_ref(), + upgrade.as_ref(), + Some(&infos), + )? { + Either::Right(value) => { + return Ok(value); + } + Either::Left(new_instructions) => { + instructions = new_instructions; + } + } + } + } + } + } +} + +pub(crate) fn update_contiguous_length( + header: &mut Header, + bitfield: &Bitfield, + bitfield_update: &BitfieldUpdate, +) { + let end = bitfield_update.start + bitfield_update.length; + let mut c = header.hints.contiguous_length; + if bitfield_update.drop { + if c <= end && c > bitfield_update.start { + c = bitfield_update.start; + } + } else if c <= end && c >= bitfield_update.start { + c = end; + while bitfield.get(c) { + c += 1; + } + } + + if c != header.hints.contiguous_length { + header.hints.contiguous_length = c; + } +} diff --git a/src/core.rs b/src/core/mod.rs similarity index 59% rename from src/core.rs rename to src/core/mod.rs index e677ebff..6b473851 100644 --- a/src/core.rs +++ b/src/core/mod.rs @@ -1,7 +1,7 @@ //! Hypercore's main abstraction. Exposes an append-only, secure log structure. -use ed25519_dalek::Signature; +mod inner; + use futures::future::Either; -use std::fmt::Debug; #[cfg(feature = "replication")] use std::sync::Mutex; use tracing::instrument; @@ -9,17 +9,15 @@ use tracing::instrument; #[cfg(feature = "cache")] use crate::common::cache::CacheOptions; use crate::{ - bitfield::Bitfield, - common::{BitfieldUpdate, HypercoreError, NodeByteRange, StoreInfo, ValuelessProof}, - crypto::{PartialKeypair, generate_signing_key}, - data::BlockStore, - oplog::{Header, MAX_OPLOG_ENTRIES_BYTE_SIZE, Oplog}, + common::{BitfieldUpdate, HypercoreError, StoreInfo}, + crypto::PartialKeypair, storage::Storage, - tree::{MerkleTree, MerkleTreeChangeset}, }; - use hypercore_schema::{Proof, RequestBlock, RequestSeek, RequestUpgrade}; +pub(crate) use inner::HypercoreInner; +use inner::update_contiguous_length; + #[derive(Debug)] pub(crate) struct HypercoreOptions { pub(crate) key_pair: Option, @@ -42,16 +40,7 @@ impl HypercoreOptions { /// Hypercore is an append-only log structure. #[derive(Debug)] pub struct Hypercore { - pub(crate) key_pair: PartialKeypair, - pub(crate) storage: Storage, - pub(crate) oplog: Oplog, - pub(crate) tree: MerkleTree, - pub(crate) block_store: BlockStore, - pub(crate) bitfield: Bitfield, - skip_flush_count: u8, // autoFlush in Javascript - header: Header, - #[cfg(feature = "replication")] - events: crate::replication::events::Events, + pub(crate) inner: HypercoreInner, #[cfg(feature = "replication")] pub(crate) peers: Vec>, } @@ -85,176 +74,10 @@ impl Hypercore { /// Creates/opens new hypercore using given storage and options pub(crate) async fn new( storage: Storage, - mut options: HypercoreOptions, + options: HypercoreOptions, ) -> Result { - let key_pair: Option = if options.open { - if options.key_pair.is_some() { - return Err(HypercoreError::BadArgument { - context: "Key pair can not be used when building an openable hypercore" - .to_string(), - }); - } - None - } else { - Some(options.key_pair.take().unwrap_or_else(|| { - let signing_key = generate_signing_key(); - PartialKeypair { - public: signing_key.verifying_key(), - secret: Some(signing_key), - } - })) - }; - - // Open/create oplog - let mut oplog_open_outcome = match Oplog::open(&key_pair, None)? { - Either::Right(value) => value, - Either::Left(instruction) => { - let info = storage.read_info(instruction).await?; - match Oplog::open(&key_pair, Some(info))? { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: "Could not open oplog".to_string(), - }); - } - } - } - }; - storage - .flush_infos(Vec::from(oplog_open_outcome.infos_to_flush)) - .await?; - - // Open/create tree - let mut tree = match MerkleTree::open( - &oplog_open_outcome.header.tree, - None, - #[cfg(feature = "cache")] - &options.node_cache_options, - )? { - Either::Right(value) => value, - Either::Left(instructions) => { - let infos = storage.read_infos(Vec::from(instructions)).await?; - match MerkleTree::open( - &oplog_open_outcome.header.tree, - Some(&infos), - #[cfg(feature = "cache")] - &options.node_cache_options, - )? { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: "Could not open tree".to_string(), - }); - } - } - } - }; - - // Create block store instance - let block_store = BlockStore::default(); - - // Open bitfield - let mut bitfield = match Bitfield::open(None) { - Either::Right(value) => value, - Either::Left(instruction) => { - let info = storage.read_info(instruction).await?; - match Bitfield::open(Some(info)) { - Either::Right(value) => value, - Either::Left(instruction) => { - let info = storage.read_info(instruction).await?; - match Bitfield::open(Some(info)) { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: "Could not open bitfield".to_string(), - }); - } - } - } - } - } - }; - - // Process entries stored only to the oplog and not yet flushed into bitfield or tree - if let Some(entries) = oplog_open_outcome.entries { - for entry in entries.iter() { - for node in &entry.tree_nodes { - tree.add_node(node.clone()); - } - - if let Some(bitfield_update) = &entry.bitfield { - bitfield.update(bitfield_update); - update_contiguous_length( - &mut oplog_open_outcome.header, - &bitfield, - bitfield_update, - ); - } - if let Some(tree_upgrade) = &entry.tree_upgrade { - // TODO: Generalize Either response stack - let mut changeset = - match tree.truncate(tree_upgrade.length, tree_upgrade.fork, None)? { - Either::Right(value) => value, - Either::Left(instructions) => { - let infos = storage.read_infos(Vec::from(instructions)).await?; - match tree.truncate( - tree_upgrade.length, - tree_upgrade.fork, - Some(&infos), - )? { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: format!( - "Could not truncate tree to length {}", - tree_upgrade.length - ), - }); - } - } - } - }; - changeset.ancestors = tree_upgrade.ancestors; - changeset.hash = Some(changeset.hash()); - changeset.signature = - Some(Signature::try_from(&*tree_upgrade.signature).map_err(|_| { - HypercoreError::InvalidSignature { - context: "Could not parse changeset signature".to_string(), - } - })?); - - // Update the header with this changeset to make in-memory value match that - // of the stored value. - oplog_open_outcome.oplog.update_header_with_changeset( - &changeset, - None, - &mut oplog_open_outcome.header, - )?; - - // TODO: Skip reorg hints for now, seems to only have to do with replication - // addReorgHint(header.hints.reorgs, tree, batch) - - // Commit changeset to in-memory tree - tree.commit(changeset)?; - } - } - } - - let oplog = oplog_open_outcome.oplog; - let header = oplog_open_outcome.header; - let key_pair = header.key_pair.clone(); - Ok(Hypercore { - key_pair, - storage, - oplog, - tree, - block_store, - bitfield, - header, - skip_flush_count: 0, - #[cfg(feature = "replication")] - events: crate::replication::events::Events::new(), + inner: HypercoreInner::new(storage, options).await?, #[cfg(feature = "replication")] peers: Default::default(), }) @@ -262,13 +85,7 @@ impl Hypercore { /// Gets basic info about the Hypercore pub fn info(&self) -> Info { - Info { - length: self.tree.length, - byte_length: self.tree.byte_length, - contiguous_length: self.header.hints.contiguous_length, - fork: self.tree.fork, - writeable: self.key_pair.secret.is_some(), - } + self.inner.info() } /// Appends a data slice to the hypercore. @@ -277,28 +94,22 @@ impl Hypercore { self.append_batch(&[data]).await } - fn append_outcome(&self) -> AppendOutcome { - AppendOutcome { - length: self.tree.length, - byte_length: self.tree.byte_length, - } - } /// Appends a given batch of data slices to the hypercore. #[instrument(err, skip_all, fields(batch_len = batch.as_ref().len()))] pub async fn append_batch, B: AsRef<[A]>>( &mut self, batch: B, ) -> Result { - let secret_key = match &self.key_pair.secret { + let secret_key = match &self.inner.key_pair.secret { Some(key) => key, None => return Err(HypercoreError::NotWritable), }; if batch.as_ref().is_empty() { - return Ok(self.append_outcome()); + return Ok(self.inner.append_outcome()); } // Create a changeset for the tree - let mut changeset = self.tree.changeset(); + let mut changeset = self.inner.tree.changeset(); let mut batch_length: usize = 0; for data in batch.as_ref().iter() { batch_length += changeset.append(data.as_ref()); @@ -306,10 +117,11 @@ impl Hypercore { changeset.hash_and_sign(secret_key); // Write the received data to the block store - let info = - self.block_store - .append_batch(batch.as_ref(), batch_length, self.tree.byte_length); - self.storage.flush_info(info).await?; + let info = self + .inner + .block_store + .append_batch(batch.as_ref(), batch_length, self.inner.tree.byte_length); + self.inner.storage.flush_info(info).await?; // Append the changeset to the Oplog let bitfield_update = BitfieldUpdate { @@ -317,29 +129,34 @@ impl Hypercore { start: changeset.ancestors, length: changeset.batch_length, }; - let outcome = self.oplog.append_changeset( + let outcome = self.inner.oplog.append_changeset( &changeset, Some(bitfield_update.clone()), false, - &self.header, + &self.inner.header, )?; - self.storage + self.inner + .storage .flush_infos(Vec::from(outcome.infos_to_flush)) .await?; - self.header = outcome.header; + self.inner.header = outcome.header; // Write to bitfield - self.bitfield.update(&bitfield_update); + self.inner.bitfield.update(&bitfield_update); // Contiguous length is known only now - update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update); + update_contiguous_length( + &mut self.inner.header, + &self.inner.bitfield, + &bitfield_update, + ); // Commit changeset to in-memory tree - self.tree.commit(changeset)?; + self.inner.tree.commit(changeset)?; // Now ready to flush - if self.should_flush_bitfield_and_tree_and_oplog() { - self.flush_bitfield_and_tree_and_oplog(false).await?; + if self.inner.should_flush_bitfield_and_tree_and_oplog() { + self.inner.flush_bitfield_and_tree_and_oplog(false).await?; } #[cfg(feature = "replication")] @@ -347,50 +164,54 @@ impl Hypercore { use tracing::trace; trace!(bitfield_update = ?bitfield_update, "Hppercore.append_batch emit DataUpgrade & Have"); - let _ = self.events.send(crate::replication::events::DataUpgrade {}); let _ = self + .inner + .events + .send(crate::replication::events::DataUpgrade {}); + let _ = self + .inner .events .send(crate::replication::events::Have::from(&bitfield_update)); } - Ok(self.append_outcome()) + Ok(self.inner.append_outcome()) } #[cfg(feature = "replication")] /// Subscribe to core events relevant to replication pub fn event_subscribe(&self) -> async_broadcast::Receiver { - self.events.channel.new_receiver() + self.inner.event_subscribe() } /// Check if core has the block at the given `index` locally #[instrument(ret, skip(self))] pub fn has(&self, index: u64) -> bool { - self.bitfield.get(index) + self.inner.has(index) } /// Read value at given index, if any. #[instrument(err, skip(self))] pub async fn get(&self, index: u64) -> Result>, HypercoreError> { - if !self.bitfield.get(index) { + if !self.inner.bitfield.get(index) { #[cfg(feature = "replication")] // if not in this core, emit Event::Get(index) { use tracing::trace; trace!(index = index, "Hppercore emit 'get' event"); - self.events.send_on_get(index); + self.inner.events.send_on_get(index); } return Ok(None); } - let byte_range = self.byte_range(index, None).await?; + let byte_range = self.inner.byte_range(index, None).await?; // TODO: Generalize Either response stack - let data = match self.block_store.read(&byte_range, None) { + let data = match self.inner.block_store.read(&byte_range, None) { Either::Right(value) => value, Either::Left(instruction) => { - let info = self.storage.read_info(instruction).await?; - match self.block_store.read(&byte_range, Some(info)) { + let info = self.inner.storage.read_info(instruction).await?; + match self.inner.block_store.read(&byte_range, Some(info)) { Either::Right(value) => value, Either::Left(_) => { return Err(HypercoreError::InvalidOperation { @@ -412,40 +233,44 @@ impl Hypercore { return Ok(()); } // Write to oplog - let infos_to_flush = self.oplog.clear(start, end)?; - self.storage.flush_infos(Vec::from(infos_to_flush)).await?; + let infos_to_flush = self.inner.oplog.clear(start, end)?; + self.inner + .storage + .flush_infos(Vec::from(infos_to_flush)) + .await?; // Set bitfield - self.bitfield.set_range(start, end - start, false); + self.inner.bitfield.set_range(start, end - start, false); // Set contiguous length - if start < self.header.hints.contiguous_length { - self.header.hints.contiguous_length = start; + if start < self.inner.header.hints.contiguous_length { + self.inner.header.hints.contiguous_length = start; } // Find the biggest hole that can be punched into the data - let start = if let Some(index) = self.bitfield.last_index_of(true, start) { + let start = if let Some(index) = self.inner.bitfield.last_index_of(true, start) { index + 1 } else { 0 }; - let end = if let Some(index) = self.bitfield.index_of(true, end) { + let end = if let Some(index) = self.inner.bitfield.index_of(true, end) { index } else { - self.tree.length + self.inner.tree.length }; // Find byte offset for first value let mut infos: Vec = Vec::new(); - let clear_offset = match self.tree.byte_offset(start, None)? { + let clear_offset = match self.inner.tree.byte_offset(start, None)? { Either::Right(value) => value, Either::Left(instructions) => { let new_infos = self + .inner .storage .read_infos_to_vec(Vec::from(instructions)) .await?; infos.extend(new_infos); - match self.tree.byte_offset(start, Some(&infos))? { + match self.inner.tree.byte_offset(start, Some(&infos))? { Either::Right(value) => value, Either::Left(_) => { return Err(HypercoreError::InvalidOperation { @@ -457,17 +282,17 @@ impl Hypercore { }; // Find byte range for last value - let last_byte_range = self.byte_range(end - 1, Some(&infos)).await?; + let last_byte_range = self.inner.byte_range(end - 1, Some(&infos)).await?; let clear_length = (last_byte_range.index + last_byte_range.length) - clear_offset; // Clear blocks - let info_to_flush = self.block_store.clear(clear_offset, clear_length); - self.storage.flush_info(info_to_flush).await?; + let info_to_flush = self.inner.block_store.clear(clear_offset, clear_length); + self.inner.storage.flush_info(info_to_flush).await?; // Now ready to flush - if self.should_flush_bitfield_and_tree_and_oplog() { - self.flush_bitfield_and_tree_and_oplog(false).await?; + if self.inner.should_flush_bitfield_and_tree_and_oplog() { + self.inner.flush_bitfield_and_tree_and_oplog(false).await?; } Ok(()) @@ -475,7 +300,7 @@ impl Hypercore { /// Access the key pair. pub fn key_pair(&self) -> &PartialKeypair { - &self.key_pair + self.inner.key_pair() } /// Create a proof for given request @@ -488,6 +313,7 @@ impl Hypercore { upgrade: Option, ) -> Result, HypercoreError> { let valueless_proof = self + .inner .create_valueless_proof(block, hash, seek, upgrade) .await?; let value: Option> = if let Some(block) = valueless_proof.block.as_ref() { @@ -508,11 +334,11 @@ impl Hypercore { /// possible to apply. #[instrument(skip_all)] pub async fn verify_and_apply_proof(&mut self, proof: &Proof) -> Result { - if proof.fork != self.tree.fork { + if proof.fork != self.inner.tree.fork { return Ok(false); } - let changeset = self.verify_proof(proof).await?; - if !self.tree.commitable(&changeset) { + let changeset = self.inner.verify_proof(proof).await?; + if !self.inner.tree.commitable(&changeset) { return Ok(false); } @@ -522,16 +348,18 @@ impl Hypercore { let bitfield_update: Option = if let Some(block) = &proof.block.as_ref() { let byte_offset = match self + .inner .tree .byte_offset_in_changeset(block.index, &changeset, None)? { Either::Right(value) => value, Either::Left(instructions) => { let infos = self + .inner .storage .read_infos_to_vec(Vec::from(instructions)) .await?; - match self.tree.byte_offset_in_changeset( + match self.inner.tree.byte_offset_in_changeset( block.index, &changeset, Some(&infos), @@ -550,8 +378,8 @@ impl Hypercore { }; // Write the value to the block store - let info_to_flush = self.block_store.put(&block.value, byte_offset); - self.storage.flush_info(info_to_flush).await?; + let info_to_flush = self.inner.block_store.put(&block.value, byte_offset); + self.inner.storage.flush_info(info_to_flush).await?; // Return a bitfield update for the given value Some(BitfieldUpdate { @@ -565,43 +393,52 @@ impl Hypercore { }; // Append the changeset to the Oplog - let outcome = self.oplog.append_changeset( + let outcome = self.inner.oplog.append_changeset( &changeset, bitfield_update.clone(), false, - &self.header, + &self.inner.header, )?; - self.storage + self.inner + .storage .flush_infos(Vec::from(outcome.infos_to_flush)) .await?; - self.header = outcome.header; + self.inner.header = outcome.header; if let Some(bitfield_update) = &bitfield_update { // Write to bitfield - self.bitfield.update(bitfield_update); + self.inner.bitfield.update(bitfield_update); // Contiguous length is known only now - update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update); + update_contiguous_length( + &mut self.inner.header, + &self.inner.bitfield, + bitfield_update, + ); } // Commit changeset to in-memory tree - self.tree.commit(changeset)?; + self.inner.tree.commit(changeset)?; // Now ready to flush - if self.should_flush_bitfield_and_tree_and_oplog() { - self.flush_bitfield_and_tree_and_oplog(false).await?; + if self.inner.should_flush_bitfield_and_tree_and_oplog() { + self.inner.flush_bitfield_and_tree_and_oplog(false).await?; } #[cfg(feature = "replication")] { if proof.upgrade.is_some() { // Notify replicator if we receieved an upgrade - let _ = self.events.send(crate::replication::events::DataUpgrade {}); + let _ = self + .inner + .events + .send(crate::replication::events::DataUpgrade {}); } // Notify replicator if we receieved a bitfield update if let Some(ref bitfield) = bitfield_update { let _ = self + .inner .events .send(crate::replication::events::Have::from(bitfield)); } @@ -613,38 +450,21 @@ impl Hypercore { /// synchronization. #[instrument(err, skip(self))] pub async fn missing_nodes(&self, index: u64) -> Result { - self.missing_nodes_from_merkle_tree_index(index * 2).await + self.inner + .missing_nodes_from_merkle_tree_index(index * 2) + .await } - /// Get missing nodes using a merkle tree index. Advanced variant of missing_nodex + /// Get missing nodes using a merkle tree index. Advanced variant of missing_nodes /// that allow for special cases of searching directly from the merkle tree. #[instrument(err, skip(self))] pub async fn missing_nodes_from_merkle_tree_index( &self, merkle_tree_index: u64, ) -> Result { - match self.tree.missing_nodes(merkle_tree_index, None)? { - Either::Right(value) => Ok(value), - Either::Left(instructions) => { - let mut instructions = instructions; - let mut infos: Vec = vec![]; - loop { - infos.extend( - self.storage - .read_infos_to_vec(Vec::from(instructions)) - .await?, - ); - match self.tree.missing_nodes(merkle_tree_index, Some(&infos))? { - Either::Right(value) => { - return Ok(value); - } - Either::Left(new_instructions) => { - instructions = new_instructions; - } - } - } - } - } + self.inner + .missing_nodes_from_merkle_tree_index(merkle_tree_index) + .await } /// Makes the hypercore read-only by deleting the secret key. Returns true if the @@ -653,164 +473,22 @@ impl Hypercore { /// been stored. #[instrument(err, skip_all)] pub async fn make_read_only(&mut self) -> Result { - if self.key_pair.secret.is_some() { - self.key_pair.secret = None; - self.header.key_pair.secret = None; + if self.inner.key_pair.secret.is_some() { + self.inner.key_pair.secret = None; + self.inner.header.key_pair.secret = None; // Need to flush clearing traces to make sure both oplog slots are cleared - self.flush_bitfield_and_tree_and_oplog(true).await?; + self.inner.flush_bitfield_and_tree_and_oplog(true).await?; Ok(true) } else { Ok(false) } } - - async fn byte_range( - &self, - index: u64, - initial_infos: Option<&[StoreInfo]>, - ) -> Result { - match self.tree.byte_range(index, initial_infos)? { - Either::Right(value) => Ok(value), - Either::Left(instructions) => { - let mut instructions = instructions; - let mut infos: Vec = vec![]; - loop { - infos.extend( - self.storage - .read_infos_to_vec(Vec::from(instructions)) - .await?, - ); - match self.tree.byte_range(index, Some(&infos))? { - Either::Right(value) => { - return Ok(value); - } - Either::Left(new_instructions) => { - instructions = new_instructions; - } - } - } - } - } - } - - async fn create_valueless_proof( - &self, - block: Option, - hash: Option, - seek: Option, - upgrade: Option, - ) -> Result { - match self.tree.create_valueless_proof( - block.as_ref(), - hash.as_ref(), - seek.as_ref(), - upgrade.as_ref(), - None, - )? { - Either::Right(value) => Ok(value), - Either::Left(instructions) => { - let mut instructions = instructions; - let mut infos: Vec = vec![]; - loop { - infos.extend( - self.storage - .read_infos_to_vec(Vec::from(instructions)) - .await?, - ); - match self.tree.create_valueless_proof( - block.as_ref(), - hash.as_ref(), - seek.as_ref(), - upgrade.as_ref(), - Some(&infos), - )? { - Either::Right(value) => { - return Ok(value); - } - Either::Left(new_instructions) => { - instructions = new_instructions; - } - } - } - } - } - } - - /// Verify a proof received from a peer. Returns a changeset that should be - /// applied. - async fn verify_proof(&self, proof: &Proof) -> Result { - match self.tree.verify_proof(proof, &self.key_pair.public, None)? { - Either::Right(value) => Ok(value), - Either::Left(instructions) => { - let infos = self - .storage - .read_infos_to_vec(Vec::from(instructions)) - .await?; - match self - .tree - .verify_proof(proof, &self.key_pair.public, Some(&infos))? - { - Either::Right(value) => Ok(value), - Either::Left(_) => Err(HypercoreError::InvalidOperation { - context: "Could not verify proof from tree".to_string(), - }), - } - } - } - } - - fn should_flush_bitfield_and_tree_and_oplog(&mut self) -> bool { - if self.skip_flush_count == 0 - || self.oplog.entries_byte_length >= MAX_OPLOG_ENTRIES_BYTE_SIZE - { - self.skip_flush_count = 3; - true - } else { - self.skip_flush_count -= 1; - false - } - } - - async fn flush_bitfield_and_tree_and_oplog( - &mut self, - clear_traces: bool, - ) -> Result<(), HypercoreError> { - let infos = self.bitfield.flush(); - self.storage.flush_infos(Vec::from(infos)).await?; - let infos = self.tree.flush(); - self.storage.flush_infos(Vec::from(infos)).await?; - let infos = self.oplog.flush(&self.header, clear_traces)?; - self.storage.flush_infos(Vec::from(infos)).await?; - Ok(()) - } -} - -fn update_contiguous_length( - header: &mut Header, - bitfield: &Bitfield, - bitfield_update: &BitfieldUpdate, -) { - let end = bitfield_update.start + bitfield_update.length; - let mut c = header.hints.contiguous_length; - if bitfield_update.drop { - if c <= end && c > bitfield_update.start { - c = bitfield_update.start; - } - } else if c <= end && c >= bitfield_update.start { - c = end; - while bitfield.get(c) { - c += 1; - } - } - - if c != header.hints.contiguous_length { - header.hints.contiguous_length = c; - } } #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::crypto::{PartialKeypair, generate_signing_key}; #[async_std::test] async fn core_create_proof_block_only() -> Result<(), HypercoreError> { @@ -1139,7 +817,7 @@ pub(crate) mod tests { let mut clone = create_hypercore_with_data_and_key_pair( 0, PartialKeypair { - public: main.key_pair.public, + public: main.inner.key_pair.public, secret: None, }, ) diff --git a/src/replication/shared_core.rs b/src/replication/shared_core.rs index 80418062..2d54e781 100644 --- a/src/replication/shared_core.rs +++ b/src/replication/shared_core.rs @@ -146,7 +146,7 @@ mod tests { let clone = create_hypercore_with_data_and_key_pair( 0, PartialKeypair { - public: main.key_pair.public, + public: main.inner.key_pair.public, secret: None, }, ) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index d9def7a6..aeab3b18 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -253,6 +253,21 @@ mod test { use super::*; use crate::common::{StoreInfo, StoreInfoInstruction}; + #[tokio::test] + async fn test_futures_are_owned() -> Result<(), Box> { + let fut = { + let storage = Storage::new_memory().await?; + + let data = b"hello hypercore"; + + // Write to tree store + storage.flush_info(StoreInfo::new_content(Store::Tree, 0, data)) + }; + fut.await?; + + Ok(()) + } + #[tokio::test] async fn test_storage() -> Result<(), Box> { let storage = Storage::new_memory().await?; From fb0a1806fa5ccbba5a8c6fdcbfb1bde4fc3d10bd Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 27 Feb 2026 18:31:52 -0500 Subject: [PATCH 20/26] impl create_valueless_proof on Inner2 --- src/core/inner.rs | 126 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 114 insertions(+), 12 deletions(-) diff --git a/src/core/inner.rs b/src/core/inner.rs index cda5dedf..c5e10314 100644 --- a/src/core/inner.rs +++ b/src/core/inner.rs @@ -1,10 +1,22 @@ +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + use ed25519_dalek::Signature; use futures::future::Either; +use random_access_storage::BoxFuture; +use std::sync::Mutex; use tracing::instrument; use crate::{ bitfield::Bitfield, - common::{BitfieldUpdate, HypercoreError, NodeByteRange, StoreInfo, ValuelessProof}, + common::{ + BitfieldUpdate, HypercoreError, NodeByteRange, StoreInfo, + ValuelessProof, + }, crypto::{PartialKeypair, generate_signing_key}, data::BlockStore, oplog::{Header, MAX_OPLOG_ENTRIES_BYTE_SIZE, Oplog}, @@ -142,8 +154,7 @@ impl HypercoreInner { match tree.truncate(tree_upgrade.length, tree_upgrade.fork, None)? { Either::Right(value) => value, Either::Left(instructions) => { - let infos = - storage.read_infos(Vec::from(instructions)).await?; + let infos = storage.read_infos(Vec::from(instructions)).await?; match tree.truncate( tree_upgrade.length, tree_upgrade.fork, @@ -244,17 +255,19 @@ impl HypercoreInner { } } - pub(crate) async fn flush_bitfield_and_tree_and_oplog( + pub(crate) fn flush_bitfield_and_tree_and_oplog( &mut self, clear_traces: bool, - ) -> Result<(), HypercoreError> { - let infos = self.bitfield.flush(); - self.storage.flush_infos(Vec::from(infos)).await?; - let infos = self.tree.flush(); - self.storage.flush_infos(Vec::from(infos)).await?; - let infos = self.oplog.flush(&self.header, clear_traces)?; - self.storage.flush_infos(Vec::from(infos)).await?; - Ok(()) + ) -> BoxFuture> { + let mut infos = vec![]; + infos.extend(self.bitfield.flush()); + infos.extend(self.tree.flush()); + match self.oplog.flush(&self.header, clear_traces) { + Ok(opinfo) => infos.extend(opinfo), + Err(e) => return Box::pin(async { Err(e) }), + } + + self.storage.flush_infos(infos) } pub(crate) async fn verify_proof( @@ -383,6 +396,95 @@ impl HypercoreInner { } } +pub(crate) struct Inner2 { + pub(crate) inner: Arc>, +} + +impl Inner2 { + pub(crate) fn create_valueless_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> ValuelessProofFuture { + ValuelessProofFuture { + inner: self.inner.clone(), + block, + hash, + seek, + upgrade, + infos: Vec::new(), + pending_read: None, + } + } +} + +pub(crate) struct ValuelessProofFuture { + inner: Arc>, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + infos: Vec, + pending_read: Option, HypercoreError>>>, +} + +impl Future for ValuelessProofFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // ValuelessProofFuture is Unpin (all fields are Unpin), so this is safe. + let this = self.get_mut(); + + loop { + // Phase 1: if there's a pending storage read, drive it to completion. + if let Some(fut) = this.pending_read.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(new_infos)) => { + this.infos.extend(new_infos); + this.pending_read = None; + // Fall through to retry create_valueless_proof. + } + } + } + + // Phase 2: call tree.create_valueless_proof synchronously under the lock. + let result = { + let inner = this.inner.lock().unwrap(); + let infos_opt = if this.infos.is_empty() { + None + } else { + Some(this.infos.as_slice()) + }; + inner.tree.create_valueless_proof( + this.block.as_ref(), + this.hash.as_ref(), + this.seek.as_ref(), + this.upgrade.as_ref(), + infos_opt, + ) + // Lock is dropped here. + }; + + match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), + Ok(Either::Left(instructions)) => { + // Need more nodes from storage. Clone storage (cheap Arc clone) + // outside the lock so we don't hold it across the async read. + let storage = this.inner.lock().unwrap().storage.clone(); + this.pending_read = + Some(storage.read_infos_to_vec(Vec::from(instructions))); + // Loop to poll the new future immediately. + } + } + } + } +} + pub(crate) fn update_contiguous_length( header: &mut Header, bitfield: &Bitfield, From f5253a3d90de381d81f288690721930e86f12fa2 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 2 Mar 2026 13:39:30 -0500 Subject: [PATCH 21/26] add Inner2.verify_proof --- src/core/inner.rs | 67 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/src/core/inner.rs b/src/core/inner.rs index c5e10314..108f8f97 100644 --- a/src/core/inner.rs +++ b/src/core/inner.rs @@ -401,6 +401,15 @@ pub(crate) struct Inner2 { } impl Inner2 { + pub(crate) fn verify_proof(&self, proof: Proof) -> VerifyProofFuture { + VerifyProofFuture { + inner: self.inner.clone(), + proof, + infos: None, + pending_read: None, + } + } + pub(crate) fn create_valueless_proof( &self, block: Option, @@ -420,6 +429,64 @@ impl Inner2 { } } +pub(crate) struct VerifyProofFuture { + inner: Arc>, + proof: Proof, + // None = first attempt (no read done yet), Some = read completed + infos: Option>, + pending_read: Option, HypercoreError>>>, +} + +impl Future for VerifyProofFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + // Phase 1: if there's a pending storage read, drive it to completion. + if let Some(fut) = this.pending_read.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(infos)) => { + this.infos = Some(infos); + this.pending_read = None; + // Fall through to retry verify_proof. + } + } + } + + // Phase 2: call tree.verify_proof synchronously under the lock. + let result = { + let inner = this.inner.lock().unwrap(); + let public_key = inner.key_pair.public; + let infos_opt = this.infos.as_deref(); + inner.tree.verify_proof(&this.proof, &public_key, infos_opt) + // Lock is dropped here. + }; + + match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), + Ok(Either::Left(_)) if this.infos.is_some() => { + // We already read infos and still got Left — the proof can't + // be satisfied, which is an error. + return Poll::Ready(Err(HypercoreError::InvalidOperation { + context: "Could not verify proof from tree".to_string(), + })); + } + Ok(Either::Left(instructions)) => { + let storage = this.inner.lock().unwrap().storage.clone(); + this.pending_read = + Some(storage.read_infos_to_vec(Vec::from(instructions))); + // Loop to poll the new future immediately. + } + } + } + } +} + pub(crate) struct ValuelessProofFuture { inner: Arc>, block: Option, From 0757cc355e1c402d3a73c498abe954b7a868cd84 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 2 Mar 2026 14:29:49 -0500 Subject: [PATCH 22/26] Add Inner2.byte_range --- src/core/inner.rs | 62 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/src/core/inner.rs b/src/core/inner.rs index 108f8f97..12d3c56e 100644 --- a/src/core/inner.rs +++ b/src/core/inner.rs @@ -401,6 +401,19 @@ pub(crate) struct Inner2 { } impl Inner2 { + pub(crate) fn byte_range( + &self, + index: u64, + initial_infos: Vec, + ) -> ByteRangeFuture { + ByteRangeFuture { + inner: self.inner.clone(), + index, + infos: initial_infos, + pending_read: None, + } + } + pub(crate) fn verify_proof(&self, proof: Proof) -> VerifyProofFuture { VerifyProofFuture { inner: self.inner.clone(), @@ -429,6 +442,55 @@ impl Inner2 { } } +pub(crate) struct ByteRangeFuture { + inner: Arc>, + index: u64, + infos: Vec, + pending_read: Option, HypercoreError>>>, +} + +impl Future for ByteRangeFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + if let Some(fut) = this.pending_read.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(new_infos)) => { + this.infos.extend(new_infos); + this.pending_read = None; + } + } + } + + let result = { + let inner = this.inner.lock().unwrap(); + let infos_opt = if this.infos.is_empty() { + None + } else { + Some(this.infos.as_slice()) + }; + inner.tree.byte_range(this.index, infos_opt) + // Lock is dropped here. + }; + + match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), + Ok(Either::Left(instructions)) => { + let storage = this.inner.lock().unwrap().storage.clone(); + this.pending_read = + Some(storage.read_infos_to_vec(Vec::from(instructions))); + } + } + } + } +} + pub(crate) struct VerifyProofFuture { inner: Arc>, proof: Proof, From 9423cc5480cee4dfbefb3d6a8520b4e0d13c4aa4 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 2 Mar 2026 21:38:55 -0500 Subject: [PATCH 23/26] Start using owned futures --- src/core/inner.rs | 151 +++++++++++++++++++++++++++++++++++++--------- src/core/mod.rs | 141 ++++++++++++++++++++++--------------------- 2 files changed, 195 insertions(+), 97 deletions(-) diff --git a/src/core/inner.rs b/src/core/inner.rs index 12d3c56e..e9fea21b 100644 --- a/src/core/inner.rs +++ b/src/core/inner.rs @@ -13,10 +13,7 @@ use tracing::instrument; use crate::{ bitfield::Bitfield, - common::{ - BitfieldUpdate, HypercoreError, NodeByteRange, StoreInfo, - ValuelessProof, - }, + common::{BitfieldUpdate, HypercoreError, NodeByteRange, StoreInfo, ValuelessProof}, crypto::{PartialKeypair, generate_signing_key}, data::BlockStore, oplog::{Header, MAX_OPLOG_ENTRIES_BYTE_SIZE, Oplog}, @@ -28,7 +25,7 @@ use hypercore_schema::{Proof, RequestBlock, RequestSeek, RequestUpgrade}; use super::{AppendOutcome, HypercoreOptions, Info}; #[derive(Debug)] -pub(crate) struct HypercoreInner { +pub(crate) struct HypercoreInnerInner { pub(crate) key_pair: PartialKeypair, pub(crate) storage: Storage, pub(crate) oplog: Oplog, @@ -41,7 +38,7 @@ pub(crate) struct HypercoreInner { pub(crate) events: crate::replication::events::Events, } -impl HypercoreInner { +impl HypercoreInnerInner { pub(crate) async fn new( storage: Storage, mut options: HypercoreOptions, @@ -396,22 +393,56 @@ impl HypercoreInner { } } -pub(crate) struct Inner2 { - pub(crate) inner: Arc>, +#[derive(Debug)] +pub(crate) struct HypercoreInner { + pub(crate) inner: Arc>, } -impl Inner2 { - pub(crate) fn byte_range( +impl HypercoreInner { + pub(crate) async fn new( + storage: Storage, + options: HypercoreOptions, + ) -> Result { + Ok(Self { + inner: Arc::new(Mutex::new( + HypercoreInnerInner::new(storage, options).await?, + )), + }) + } + pub(crate) fn info(&self) -> Info { + self.inner.lock().unwrap().info() + } + pub(crate) fn key_pair(&self) -> PartialKeypair { + self.inner.lock().unwrap().key_pair().clone() + } + + pub(crate) fn has(&self, index: u64) -> bool { + self.inner.lock().unwrap().has(index) + } + + #[cfg(feature = "replication")] + pub(crate) fn event_subscribe( &self, - index: u64, - initial_infos: Vec, - ) -> ByteRangeFuture { - ByteRangeFuture { - inner: self.inner.clone(), - index, - infos: initial_infos, - pending_read: None, - } + ) -> async_broadcast::Receiver { + self.inner.lock().unwrap().event_subscribe() + } + pub(crate) fn append_outcome(&self) -> AppendOutcome { + self.inner.lock().unwrap().append_outcome() + } + pub(crate) fn should_flush_bitfield_and_tree_and_oplog(&self) -> bool { + self.inner + .lock() + .unwrap() + .should_flush_bitfield_and_tree_and_oplog() + } + pub(crate) fn flush_bitfield_and_tree_and_oplog( + &self, + clear_traces: bool, + ) -> BoxFuture> { + self.inner + .lock() + .unwrap() + .flush_bitfield_and_tree_and_oplog(clear_traces) } pub(crate) fn verify_proof(&self, proof: Proof) -> VerifyProofFuture { @@ -422,6 +453,25 @@ impl Inner2 { pending_read: None, } } + pub(crate) fn missing_nodes_from_merkle_tree_index( + &self, + merkle_tree_index: u64, + ) -> MissingNodesFuture { + MissingNodesFuture { + inner: self.inner.clone(), + merkle_tree_index, + infos: Vec::new(), + pending_read: None, + } + } + pub(crate) fn byte_range(&self, index: u64, initial_infos: Vec) -> ByteRangeFuture { + ByteRangeFuture { + inner: self.inner.clone(), + index, + infos: initial_infos, + pending_read: None, + } + } pub(crate) fn create_valueless_proof( &self, @@ -442,8 +492,56 @@ impl Inner2 { } } +pub(crate) struct MissingNodesFuture { + inner: Arc>, + merkle_tree_index: u64, + infos: Vec, + pending_read: Option, HypercoreError>>>, +} + +impl Future for MissingNodesFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + if let Some(fut) = this.pending_read.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(new_infos)) => { + this.infos.extend(new_infos); + this.pending_read = None; + } + } + } + + let result = { + let inner = this.inner.lock().unwrap(); + let infos_opt = if this.infos.is_empty() { + None + } else { + Some(this.infos.as_slice()) + }; + inner.tree.missing_nodes(this.merkle_tree_index, infos_opt) + // Lock is dropped here. + }; + + match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), + Ok(Either::Left(instructions)) => { + let storage = this.inner.lock().unwrap().storage.clone(); + this.pending_read = Some(storage.read_infos_to_vec(Vec::from(instructions))); + } + } + } + } +} + pub(crate) struct ByteRangeFuture { - inner: Arc>, + inner: Arc>, index: u64, infos: Vec, pending_read: Option, HypercoreError>>>, @@ -483,8 +581,7 @@ impl Future for ByteRangeFuture { Ok(Either::Right(value)) => return Poll::Ready(Ok(value)), Ok(Either::Left(instructions)) => { let storage = this.inner.lock().unwrap().storage.clone(); - this.pending_read = - Some(storage.read_infos_to_vec(Vec::from(instructions))); + this.pending_read = Some(storage.read_infos_to_vec(Vec::from(instructions))); } } } @@ -492,7 +589,7 @@ impl Future for ByteRangeFuture { } pub(crate) struct VerifyProofFuture { - inner: Arc>, + inner: Arc>, proof: Proof, // None = first attempt (no read done yet), Some = read completed infos: Option>, @@ -540,8 +637,7 @@ impl Future for VerifyProofFuture { } Ok(Either::Left(instructions)) => { let storage = this.inner.lock().unwrap().storage.clone(); - this.pending_read = - Some(storage.read_infos_to_vec(Vec::from(instructions))); + this.pending_read = Some(storage.read_infos_to_vec(Vec::from(instructions))); // Loop to poll the new future immediately. } } @@ -550,7 +646,7 @@ impl Future for VerifyProofFuture { } pub(crate) struct ValuelessProofFuture { - inner: Arc>, + inner: Arc>, block: Option, hash: Option, seek: Option, @@ -605,8 +701,7 @@ impl Future for ValuelessProofFuture { // Need more nodes from storage. Clone storage (cheap Arc clone) // outside the lock so we don't hold it across the async read. let storage = this.inner.lock().unwrap().storage.clone(); - this.pending_read = - Some(storage.read_infos_to_vec(Vec::from(instructions))); + this.pending_read = Some(storage.read_infos_to_vec(Vec::from(instructions))); // Loop to poll the new future immediately. } } diff --git a/src/core/mod.rs b/src/core/mod.rs index 6b473851..5c189e8f 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -37,6 +37,12 @@ impl HypercoreOptions { } } +macro_rules! ininner { + ($self:expr) => { + $self.inner.inner.lock().unwrap() + }; +} + /// Hypercore is an append-only log structure. #[derive(Debug)] pub struct Hypercore { @@ -100,7 +106,7 @@ impl Hypercore { &mut self, batch: B, ) -> Result { - let secret_key = match &self.inner.key_pair.secret { + let secret_key = match self.inner.key_pair().secret { Some(key) => key, None => return Err(HypercoreError::NotWritable), }; @@ -109,19 +115,20 @@ impl Hypercore { return Ok(self.inner.append_outcome()); } // Create a changeset for the tree - let mut changeset = self.inner.tree.changeset(); + let mut changeset = ininner!(self).tree.changeset(); let mut batch_length: usize = 0; for data in batch.as_ref().iter() { batch_length += changeset.append(data.as_ref()); } - changeset.hash_and_sign(secret_key); + changeset.hash_and_sign(&secret_key); // Write the received data to the block store - let info = self - .inner - .block_store - .append_batch(batch.as_ref(), batch_length, self.inner.tree.byte_length); - self.inner.storage.flush_info(info).await?; + let info = ininner!(self).block_store.append_batch( + batch.as_ref(), + batch_length, + ininner!(self).tree.byte_length, + ); + ininner!(self).storage.flush_info(info).await?; // Append the changeset to the Oplog let bitfield_update = BitfieldUpdate { @@ -129,30 +136,30 @@ impl Hypercore { start: changeset.ancestors, length: changeset.batch_length, }; - let outcome = self.inner.oplog.append_changeset( + let outcome = ininner!(self).oplog.append_changeset( &changeset, Some(bitfield_update.clone()), false, - &self.inner.header, + &ininner!(self).header, )?; - self.inner + ininner!(self) .storage .flush_infos(Vec::from(outcome.infos_to_flush)) .await?; - self.inner.header = outcome.header; + ininner!(self).header = outcome.header; // Write to bitfield - self.inner.bitfield.update(&bitfield_update); + ininner!(self).bitfield.update(&bitfield_update); // Contiguous length is known only now update_contiguous_length( - &mut self.inner.header, - &self.inner.bitfield, + &mut ininner!(self).header, + &ininner!(self).bitfield, &bitfield_update, ); // Commit changeset to in-memory tree - self.inner.tree.commit(changeset)?; + ininner!(self).tree.commit(changeset)?; // Now ready to flush if self.inner.should_flush_bitfield_and_tree_and_oplog() { @@ -164,12 +171,10 @@ impl Hypercore { use tracing::trace; trace!(bitfield_update = ?bitfield_update, "Hppercore.append_batch emit DataUpgrade & Have"); - let _ = self - .inner + let _ = ininner!(self) .events .send(crate::replication::events::DataUpgrade {}); - let _ = self - .inner + let _ = ininner!(self) .events .send(crate::replication::events::Have::from(&bitfield_update)); } @@ -192,26 +197,26 @@ impl Hypercore { /// Read value at given index, if any. #[instrument(err, skip(self))] pub async fn get(&self, index: u64) -> Result>, HypercoreError> { - if !self.inner.bitfield.get(index) { + if !ininner!(self).bitfield.get(index) { #[cfg(feature = "replication")] // if not in this core, emit Event::Get(index) { use tracing::trace; trace!(index = index, "Hppercore emit 'get' event"); - self.inner.events.send_on_get(index); + ininner!(self).events.send_on_get(index); } return Ok(None); } - let byte_range = self.inner.byte_range(index, None).await?; + let byte_range = self.inner.byte_range(index, Vec::new()).await?; // TODO: Generalize Either response stack - let data = match self.inner.block_store.read(&byte_range, None) { + let data = match ininner!(self).block_store.read(&byte_range, None) { Either::Right(value) => value, Either::Left(instruction) => { - let info = self.inner.storage.read_info(instruction).await?; - match self.inner.block_store.read(&byte_range, Some(info)) { + let info = ininner!(self).storage.read_info(instruction).await?; + match ininner!(self).block_store.read(&byte_range, Some(info)) { Either::Right(value) => value, Either::Left(_) => { return Err(HypercoreError::InvalidOperation { @@ -233,44 +238,43 @@ impl Hypercore { return Ok(()); } // Write to oplog - let infos_to_flush = self.inner.oplog.clear(start, end)?; - self.inner + let infos_to_flush = ininner!(self).oplog.clear(start, end)?; + ininner!(self) .storage .flush_infos(Vec::from(infos_to_flush)) .await?; // Set bitfield - self.inner.bitfield.set_range(start, end - start, false); + ininner!(self).bitfield.set_range(start, end - start, false); // Set contiguous length - if start < self.inner.header.hints.contiguous_length { - self.inner.header.hints.contiguous_length = start; + if start < ininner!(self).header.hints.contiguous_length { + ininner!(self).header.hints.contiguous_length = start; } // Find the biggest hole that can be punched into the data - let start = if let Some(index) = self.inner.bitfield.last_index_of(true, start) { + let start = if let Some(index) = ininner!(self).bitfield.last_index_of(true, start) { index + 1 } else { 0 }; - let end = if let Some(index) = self.inner.bitfield.index_of(true, end) { + let end = if let Some(index) = ininner!(self).bitfield.index_of(true, end) { index } else { - self.inner.tree.length + ininner!(self).tree.length }; // Find byte offset for first value let mut infos: Vec = Vec::new(); - let clear_offset = match self.inner.tree.byte_offset(start, None)? { + let clear_offset = match ininner!(self).tree.byte_offset(start, None)? { Either::Right(value) => value, Either::Left(instructions) => { - let new_infos = self - .inner + let new_infos = ininner!(self) .storage .read_infos_to_vec(Vec::from(instructions)) .await?; infos.extend(new_infos); - match self.inner.tree.byte_offset(start, Some(&infos))? { + match ininner!(self).tree.byte_offset(start, Some(&infos))? { Either::Right(value) => value, Either::Left(_) => { return Err(HypercoreError::InvalidOperation { @@ -282,13 +286,13 @@ impl Hypercore { }; // Find byte range for last value - let last_byte_range = self.inner.byte_range(end - 1, Some(&infos)).await?; + let last_byte_range = self.inner.byte_range(end - 1, infos).await?; let clear_length = (last_byte_range.index + last_byte_range.length) - clear_offset; // Clear blocks - let info_to_flush = self.inner.block_store.clear(clear_offset, clear_length); - self.inner.storage.flush_info(info_to_flush).await?; + let info_to_flush = ininner!(self).block_store.clear(clear_offset, clear_length); + ininner!(self).storage.flush_info(info_to_flush).await?; // Now ready to flush if self.inner.should_flush_bitfield_and_tree_and_oplog() { @@ -299,7 +303,7 @@ impl Hypercore { } /// Access the key pair. - pub fn key_pair(&self) -> &PartialKeypair { + pub fn key_pair(&self) -> PartialKeypair { self.inner.key_pair() } @@ -334,11 +338,12 @@ impl Hypercore { /// possible to apply. #[instrument(skip_all)] pub async fn verify_and_apply_proof(&mut self, proof: &Proof) -> Result { - if proof.fork != self.inner.tree.fork { + if proof.fork != ininner!(self).tree.fork { return Ok(false); } - let changeset = self.inner.verify_proof(proof).await?; - if !self.inner.tree.commitable(&changeset) { + // TODO rm clone pass as owned + let changeset = self.inner.verify_proof(proof.clone()).await?; + if !ininner!(self).tree.commitable(&changeset) { return Ok(false); } @@ -347,19 +352,17 @@ impl Hypercore { // oplog push, and then flushes in the end only for the whole group. let bitfield_update: Option = if let Some(block) = &proof.block.as_ref() { let byte_offset = - match self - .inner + match ininner!(self) .tree .byte_offset_in_changeset(block.index, &changeset, None)? { Either::Right(value) => value, Either::Left(instructions) => { - let infos = self - .inner + let infos = ininner!(self) .storage .read_infos_to_vec(Vec::from(instructions)) .await?; - match self.inner.tree.byte_offset_in_changeset( + match ininner!(self).tree.byte_offset_in_changeset( block.index, &changeset, Some(&infos), @@ -378,8 +381,8 @@ impl Hypercore { }; // Write the value to the block store - let info_to_flush = self.inner.block_store.put(&block.value, byte_offset); - self.inner.storage.flush_info(info_to_flush).await?; + let info_to_flush = ininner!(self).block_store.put(&block.value, byte_offset); + ininner!(self).storage.flush_info(info_to_flush).await?; // Return a bitfield update for the given value Some(BitfieldUpdate { @@ -393,32 +396,32 @@ impl Hypercore { }; // Append the changeset to the Oplog - let outcome = self.inner.oplog.append_changeset( + let outcome = ininner!(self).oplog.append_changeset( &changeset, bitfield_update.clone(), false, - &self.inner.header, + &ininner!(self).header, )?; - self.inner + ininner!(self) .storage .flush_infos(Vec::from(outcome.infos_to_flush)) .await?; - self.inner.header = outcome.header; + ininner!(self).header = outcome.header; if let Some(bitfield_update) = &bitfield_update { // Write to bitfield - self.inner.bitfield.update(bitfield_update); + ininner!(self).bitfield.update(bitfield_update); // Contiguous length is known only now update_contiguous_length( - &mut self.inner.header, - &self.inner.bitfield, + &mut ininner!(self).header, + &ininner!(self).bitfield, bitfield_update, ); } // Commit changeset to in-memory tree - self.inner.tree.commit(changeset)?; + ininner!(self).tree.commit(changeset)?; // Now ready to flush if self.inner.should_flush_bitfield_and_tree_and_oplog() { @@ -429,16 +432,14 @@ impl Hypercore { { if proof.upgrade.is_some() { // Notify replicator if we receieved an upgrade - let _ = self - .inner + let _ = ininner!(self) .events .send(crate::replication::events::DataUpgrade {}); } // Notify replicator if we receieved a bitfield update if let Some(ref bitfield) = bitfield_update { - let _ = self - .inner + let _ = ininner!(self) .events .send(crate::replication::events::Have::from(bitfield)); } @@ -473,11 +474,13 @@ impl Hypercore { /// been stored. #[instrument(err, skip_all)] pub async fn make_read_only(&mut self) -> Result { - if self.inner.key_pair.secret.is_some() { - self.inner.key_pair.secret = None; - self.inner.header.key_pair.secret = None; + if ininner!(self).key_pair.secret.is_some() { + ininner!(self).key_pair.secret = None; + ininner!(self).header.key_pair.secret = None; // Need to flush clearing traces to make sure both oplog slots are cleared - self.inner.flush_bitfield_and_tree_and_oplog(true).await?; + ininner!(self) + .flush_bitfield_and_tree_and_oplog(true) + .await?; Ok(true) } else { Ok(false) @@ -817,7 +820,7 @@ pub(crate) mod tests { let mut clone = create_hypercore_with_data_and_key_pair( 0, PartialKeypair { - public: main.inner.key_pair.public, + public: ininner!(main).key_pair.public, secret: None, }, ) From d16687fe65817169e159434dac30f2e2d45b23c9 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 2 Mar 2026 21:39:55 -0500 Subject: [PATCH 24/26] add lint for unused async --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 1d039d9c..d0a752ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ non_local_definitions, missing_debug_implementations, //missing_docs, + clippy::unused_async, clippy::needless_pass_by_value, clippy::needless_pass_by_ref_mut, clippy::enum_glob_use From a0c3e4cc790a34fc47bba63da3c0c1ceb508a1a8 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 3 Mar 2026 01:12:16 -0500 Subject: [PATCH 25/26] Fix mutex issues --- src/core/mod.rs | 157 +++++++++++++++++++++++++----------------------- 1 file changed, 82 insertions(+), 75 deletions(-) diff --git a/src/core/mod.rs b/src/core/mod.rs index 5c189e8f..987b1522 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -10,6 +10,7 @@ use tracing::instrument; use crate::common::cache::CacheOptions; use crate::{ common::{BitfieldUpdate, HypercoreError, StoreInfo}, + core::inner::HypercoreInnerInner, crypto::PartialKeypair, storage::Storage, }; @@ -123,12 +124,12 @@ impl Hypercore { changeset.hash_and_sign(&secret_key); // Write the received data to the block store - let info = ininner!(self).block_store.append_batch( - batch.as_ref(), - batch_length, - ininner!(self).tree.byte_length, - ); - ininner!(self).storage.flush_info(info).await?; + let byte_length = ininner!(self).tree.byte_length; + let info = + ininner!(self) + .block_store + .append_batch(batch.as_ref(), batch_length, byte_length); + { ininner!(self).storage.flush_info(info) }.await?; // Append the changeset to the Oplog let bitfield_update = BitfieldUpdate { @@ -136,27 +137,28 @@ impl Hypercore { start: changeset.ancestors, length: changeset.batch_length, }; - let outcome = ininner!(self).oplog.append_changeset( - &changeset, - Some(bitfield_update.clone()), - false, - &ininner!(self).header, - )?; - ininner!(self) - .storage - .flush_infos(Vec::from(outcome.infos_to_flush)) - .await?; + let outcome = { + let HypercoreInnerInner { oplog, header, .. } = &mut *ininner!(self); + oplog.append_changeset(&changeset, Some(bitfield_update.clone()), false, header)? + }; + { + ininner!(self) + .storage + .flush_infos(Vec::from(outcome.infos_to_flush)) + } + .await?; ininner!(self).header = outcome.header; // Write to bitfield ininner!(self).bitfield.update(&bitfield_update); // Contiguous length is known only now - update_contiguous_length( - &mut ininner!(self).header, - &ininner!(self).bitfield, - &bitfield_update, - ); + { + let HypercoreInnerInner { + bitfield, header, .. + } = &mut *ininner!(self); + update_contiguous_length(header, bitfield, &bitfield_update); + } // Commit changeset to in-memory tree ininner!(self).tree.commit(changeset)?; @@ -212,10 +214,10 @@ impl Hypercore { let byte_range = self.inner.byte_range(index, Vec::new()).await?; // TODO: Generalize Either response stack - let data = match ininner!(self).block_store.read(&byte_range, None) { + let data = match { ininner!(self).block_store.read(&byte_range, None) } { Either::Right(value) => value, Either::Left(instruction) => { - let info = ininner!(self).storage.read_info(instruction).await?; + let info = { ininner!(self).storage.read_info(instruction) }.await?; match ininner!(self).block_store.read(&byte_range, Some(info)) { Either::Right(value) => value, Either::Left(_) => { @@ -239,10 +241,12 @@ impl Hypercore { } // Write to oplog let infos_to_flush = ininner!(self).oplog.clear(start, end)?; - ininner!(self) - .storage - .flush_infos(Vec::from(infos_to_flush)) - .await?; + { + ininner!(self) + .storage + .flush_infos(Vec::from(infos_to_flush)) + } + .await?; // Set bitfield ininner!(self).bitfield.set_range(start, end - start, false); @@ -266,13 +270,15 @@ impl Hypercore { // Find byte offset for first value let mut infos: Vec = Vec::new(); - let clear_offset = match ininner!(self).tree.byte_offset(start, None)? { + let clear_offset = match { ininner!(self).tree.byte_offset(start, None)? } { Either::Right(value) => value, Either::Left(instructions) => { - let new_infos = ininner!(self) - .storage - .read_infos_to_vec(Vec::from(instructions)) - .await?; + let new_infos = { + ininner!(self) + .storage + .read_infos_to_vec(Vec::from(instructions)) + } + .await?; infos.extend(new_infos); match ininner!(self).tree.byte_offset(start, Some(&infos))? { Either::Right(value) => value, @@ -292,7 +298,7 @@ impl Hypercore { // Clear blocks let info_to_flush = ininner!(self).block_store.clear(clear_offset, clear_length); - ininner!(self).storage.flush_info(info_to_flush).await?; + { ininner!(self).storage.flush_info(info_to_flush) }.await?; // Now ready to flush if self.inner.should_flush_bitfield_and_tree_and_oplog() { @@ -337,7 +343,7 @@ impl Hypercore { /// Verify and apply proof received from peer, returns true if changed, false if not /// possible to apply. #[instrument(skip_all)] - pub async fn verify_and_apply_proof(&mut self, proof: &Proof) -> Result { + pub async fn verify_and_apply_proof(&self, proof: &Proof) -> Result { if proof.fork != ininner!(self).tree.fork { return Ok(false); } @@ -351,38 +357,40 @@ impl Hypercore { // here we do only one. _verifyShared groups together many subsequent changesets into a single // oplog push, and then flushes in the end only for the whole group. let bitfield_update: Option = if let Some(block) = &proof.block.as_ref() { - let byte_offset = - match ininner!(self) + let byte_offset = match { + ininner!(self) .tree .byte_offset_in_changeset(block.index, &changeset, None)? - { - Either::Right(value) => value, - Either::Left(instructions) => { - let infos = ininner!(self) + } { + Either::Right(value) => value, + Either::Left(instructions) => { + let infos = { + ininner!(self) .storage .read_infos_to_vec(Vec::from(instructions)) - .await?; - match ininner!(self).tree.byte_offset_in_changeset( - block.index, - &changeset, - Some(&infos), - )? { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: format!( - "Could not read offset for index {} from tree", - block.index - ), - }); - } + } + .await?; + match ininner!(self).tree.byte_offset_in_changeset( + block.index, + &changeset, + Some(&infos), + )? { + Either::Right(value) => value, + Either::Left(_) => { + return Err(HypercoreError::InvalidOperation { + context: format!( + "Could not read offset for index {} from tree", + block.index + ), + }); } } - }; + } + }; // Write the value to the block store let info_to_flush = ininner!(self).block_store.put(&block.value, byte_offset); - ininner!(self).storage.flush_info(info_to_flush).await?; + { ininner!(self).storage.flush_info(info_to_flush) }.await?; // Return a bitfield update for the given value Some(BitfieldUpdate { @@ -396,16 +404,16 @@ impl Hypercore { }; // Append the changeset to the Oplog - let outcome = ininner!(self).oplog.append_changeset( - &changeset, - bitfield_update.clone(), - false, - &ininner!(self).header, - )?; - ininner!(self) - .storage - .flush_infos(Vec::from(outcome.infos_to_flush)) - .await?; + let outcome = { + let HypercoreInnerInner { oplog, header, .. } = &mut *ininner!(self); + oplog.append_changeset(&changeset, bitfield_update.clone(), false, header)? + }; + { + ininner!(self) + .storage + .flush_infos(Vec::from(outcome.infos_to_flush)) + } + .await?; ininner!(self).header = outcome.header; if let Some(bitfield_update) = &bitfield_update { @@ -413,11 +421,12 @@ impl Hypercore { ininner!(self).bitfield.update(bitfield_update); // Contiguous length is known only now - update_contiguous_length( - &mut ininner!(self).header, - &ininner!(self).bitfield, - bitfield_update, - ); + { + let HypercoreInnerInner { + bitfield, header, .. + } = &mut *ininner!(self); + update_contiguous_length(header, bitfield, &bitfield_update); + } } // Commit changeset to in-memory tree @@ -478,9 +487,7 @@ impl Hypercore { ininner!(self).key_pair.secret = None; ininner!(self).header.key_pair.secret = None; // Need to flush clearing traces to make sure both oplog slots are cleared - ininner!(self) - .flush_bitfield_and_tree_and_oplog(true) - .await?; + { ininner!(self).flush_bitfield_and_tree_and_oplog(true) }.await?; Ok(true) } else { Ok(false) From cf81436edc59445ffc1abdc4332b78ada8a0e358 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Thu, 5 Mar 2026 12:42:33 -0500 Subject: [PATCH 26/26] implement Hypercore.get as owned future --- src/core/inner.rs | 89 +++++++++++++++++++++++++++++++++++++++++++++++ src/core/mod.rs | 32 +---------------- 2 files changed, 90 insertions(+), 31 deletions(-) diff --git a/src/core/inner.rs b/src/core/inner.rs index e9fea21b..91a34aa2 100644 --- a/src/core/inner.rs +++ b/src/core/inner.rs @@ -473,6 +473,16 @@ impl HypercoreInner { } } + pub(crate) fn get(&self, index: u64) -> GetFuture { + GetFuture { + inner: self.inner.clone(), + index, + byte_range_fut: None, + byte_range: None, + block_read_fut: None, + } + } + pub(crate) fn create_valueless_proof( &self, block: Option, @@ -492,6 +502,85 @@ impl HypercoreInner { } } +pub(crate) struct GetFuture { + inner: Arc>, + index: u64, + // Phase 1: resolve byte range + byte_range_fut: Option, + // Phase 2: read block from storage (only needed if block_store has no cached value) + byte_range: Option, + block_read_fut: Option>>, +} + +impl Future for GetFuture { + type Output = Result>, HypercoreError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // TODO: we really need to generalize the Either response stack + loop { + // Phase 2: storage read for block data (highest priority when active). + if let Some(fut) = this.block_read_fut.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(info)) => { + this.block_read_fut = None; + let inner = this.inner.lock().unwrap(); + let byte_range = this.byte_range.as_ref().unwrap(); + return match inner.block_store.read(byte_range, Some(info)) { + Either::Right(data) => Poll::Ready(Ok(Some(data))), + Either::Left(_) => Poll::Ready(Err(HypercoreError::InvalidOperation { + context: "Could not read block storage range".to_string(), + })), + }; + } + } + } + + // Phase 1: resolve the byte range. + if let Some(fut) = this.byte_range_fut.as_mut() { + match Pin::new(fut).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(byte_range)) => { + this.byte_range_fut = None; + let inner = this.inner.lock().unwrap(); + match inner.block_store.read(&byte_range, None) { + Either::Right(data) => return Poll::Ready(Ok(Some(data))), + Either::Left(instruction) => { + let storage = inner.storage.clone(); + this.block_read_fut = Some(storage.read_info(instruction)); + this.byte_range = Some(byte_range); + // Loop to poll block_read_fut immediately. + } + } + } + } + continue; + } + + // Initial: check bitfield, then start byte range resolution. + { + let inner = this.inner.lock().unwrap(); + if !inner.bitfield.get(this.index) { + #[cfg(feature = "replication")] + inner.events.send_on_get(this.index); + return Poll::Ready(Ok(None)); + } + } + this.byte_range_fut = Some(ByteRangeFuture { + inner: this.inner.clone(), + index: this.index, + infos: Vec::new(), + pending_read: None, + }); + // Loop to poll byte_range_fut immediately. + } + } +} + pub(crate) struct MissingNodesFuture { inner: Arc>, merkle_tree_index: u64, diff --git a/src/core/mod.rs b/src/core/mod.rs index 987b1522..6dfd4bd1 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -199,37 +199,7 @@ impl Hypercore { /// Read value at given index, if any. #[instrument(err, skip(self))] pub async fn get(&self, index: u64) -> Result>, HypercoreError> { - if !ininner!(self).bitfield.get(index) { - #[cfg(feature = "replication")] - // if not in this core, emit Event::Get(index) - { - use tracing::trace; - - trace!(index = index, "Hppercore emit 'get' event"); - ininner!(self).events.send_on_get(index); - } - return Ok(None); - } - - let byte_range = self.inner.byte_range(index, Vec::new()).await?; - - // TODO: Generalize Either response stack - let data = match { ininner!(self).block_store.read(&byte_range, None) } { - Either::Right(value) => value, - Either::Left(instruction) => { - let info = { ininner!(self).storage.read_info(instruction) }.await?; - match ininner!(self).block_store.read(&byte_range, Some(info)) { - Either::Right(value) => value, - Either::Left(_) => { - return Err(HypercoreError::InvalidOperation { - context: "Could not read block storage range".to_string(), - }); - } - } - } - }; - - Ok(Some(data.to_vec())) + Ok(self.inner.get(index).await?.map(|b| b.into_vec())) } /// Clear data for entries between start and end (exclusive) indexes.