From d69414114ef128390d04e0e740525c59f718918f Mon Sep 17 00:00:00 2001 From: nymius <155548262+nymius@users.noreply.github.com> Date: Sun, 29 Sep 2024 17:44:29 -0300 Subject: [PATCH 1/3] feat(store): add StoreError enum StoreError is an ergonomic way to join all the errors that may occur while opening a changeset datafile. It implements the following variants: - EntryIter: any error related to the decoding of the changesets of the file. - Io: errors related with file management internals. - InvalidMagicBytes: error caused by mismatching with the expected magic bytes. --- crates/file_store/src/lib.rs | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/crates/file_store/src/lib.rs b/crates/file_store/src/lib.rs index 7c943ca20..9c2cef09c 100644 --- a/crates/file_store/src/lib.rs +++ b/crates/file_store/src/lib.rs @@ -40,3 +40,51 @@ impl From for FileError { } impl std::error::Error for FileError {} + +/// An error while opening or creating the file store +#[derive(Debug)] +pub enum StoreError { + /// Entry iter error + EntryIter { + /// Index that caused the error + index: usize, + /// Iter error + iter: IterError, + /// Amount of bytes read so far + bytes_read: u64, + }, + /// IO error, this may mean that the file is too short. + Io(io::Error), + /// Magic bytes do not match what is expected. 
+ InvalidMagicBytes { got: Vec, expected: Vec }, +} + +impl core::fmt::Display for StoreError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::EntryIter { + index, + iter, + bytes_read, + } => write!( + f, + "{}: changeset index={}, bytes read={}", + iter, index, bytes_read + ), + Self::Io(e) => write!(f, "io error trying to read file: {}", e), + Self::InvalidMagicBytes { got, expected } => write!( + f, + "file has invalid magic bytes: expected={:?} got={:?}", + expected, got, + ), + } + } +} + +impl std::error::Error for StoreError {} + +impl From for StoreError { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} From 0e2acdfb22b4d797fa23448d6af8fa0e82042d24 Mon Sep 17 00:00:00 2001 From: nymius <155548262+nymius@users.noreply.github.com> Date: Sun, 29 Sep 2024 17:50:34 -0300 Subject: [PATCH 2/3] feat(store): add Store::reopen method This new method ensures the integrity of the changesets while opening the file storing them and also emplaces the file pointer at the EOF to avoid overwriting any changeset and start appending new changesets right away. --- crates/file_store/src/store.rs | 58 +++++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs index 62c3d91b6..a935159c9 100644 --- a/crates/file_store/src/store.rs +++ b/crates/file_store/src/store.rs @@ -1,4 +1,4 @@ -use crate::{bincode_options, EntryIter, FileError, IterError}; +use crate::{bincode_options, EntryIter, FileError, IterError, StoreError}; use bdk_chain::Merge; use bincode::Options; use std::{ @@ -92,6 +92,62 @@ where }) } + /// Open an existing [`Store`], read its content, and return it ready to start receiving new + /// changesets. + /// + /// Use [`create_new`] to create a new `Store`. 
+ /// + /// # Errors + /// + /// If the prefixed bytes of the opened file does not match the provided `magic`, the + /// [`StoreError::InvalidMagicBytes`] error variant will be returned. + /// + /// If there is an error while decoding the changesets stored, the [`StoreError::EntryIter`] + /// error variant will be returned, with the index of the failing changeset and the error it + /// caused. + /// + /// [`create_new`]: Store::create_new + pub fn reopen
<P>
(magic: &[u8], file_path: P) -> Result + where + P: AsRef, + { + let mut f = OpenOptions::new().read(true).write(true).open(file_path)?; + + let mut magic_buf = vec![0_u8; magic.len()]; + f.read_exact(&mut magic_buf)?; + if magic_buf != magic { + return Err(StoreError::InvalidMagicBytes { + got: magic_buf, + expected: magic.to_vec(), + }); + } + + let mut store = Self { + magic_len: magic.len(), + db_file: f, + marker: Default::default(), + }; + + let mut index: usize = 0; + let mut error: Option = None; + for (idx, next_changeset) in store.iter_changesets().enumerate() { + if let Err(iter_error) = next_changeset { + index = idx; + error = Some(iter_error); + }; + } + + if let Some(iter) = error { + return Err(StoreError::EntryIter { + index, + iter, + bytes_read: store.db_file.stream_position()?, + }); + } + + Ok(store) + } + /// Attempt to open existing [`Store`] file; create it if the file is non-existent. /// /// Internally, this calls either [`open`] or [`create_new`]. From 59da99a1e9b5971fe74e4f189434a53a5fb8f3cd Mon Sep 17 00:00:00 2001 From: nymius <155548262+nymius@users.noreply.github.com> Date: Sun, 29 Sep 2024 17:54:49 -0300 Subject: [PATCH 3/3] test(store): add tests for Store::reopen The following tests have been added: - reopen_recovers_state_after_last_write: check Store::reopen recovers the changeset stored previously in the file. - fail_to_reopen_if_write_is_short: check Store::reopen reads correct changesets and fails to read failing one, retrieving the needed data with StoreError::EntryIter to recover the underlying file to a working state. - reopen_fails_to_read_if_invalid_bytes: check Store::reopen recognizes garbage data as well as Store::open. 
--- crates/file_store/src/store.rs | 167 +++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs index a935159c9..be92b14ed 100644 --- a/crates/file_store/src/store.rs +++ b/crates/file_store/src/store.rs @@ -494,4 +494,171 @@ mod test { assert_eq!(aggregation, exp_aggregation); } } + + #[test] + fn reopen_recovers_state_after_last_write() { + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + + let changeset1 = TestChangeSet::from(["1".into(), "2".into(), "3".into()]); + let changeset2 = TestChangeSet::from(["4".into(), "5".into(), "6".into()]); + + { + // create new db + let mut db = Store::::create_new(&TEST_MAGIC_BYTES, &file_path) + .expect("must create"); + + // append first changeset to db + db.append_changeset(&changeset1).expect("must succeed"); + } + + { + // open db + let mut db = Store::::reopen(&TEST_MAGIC_BYTES, &file_path) + .expect("failed to load db"); + + // now append the second changeset + db.append_changeset(&changeset2).expect("must succeed"); + + // Retrieve stored changesets from the database + let stored_changesets = db + .aggregate_changesets() + .expect("must succeed") + .expect("must succeed"); + + // expected changeset must be changeset2 + changeset1 + let mut expected_changeset = changeset2.clone(); + expected_changeset.extend(changeset1); + + // Assert that stored_changesets matches expected_changeset but not changeset2 + assert_eq!(stored_changesets, expected_changeset); + assert_ne!(stored_changesets, changeset2); + } + + // Open the store again to verify file pointer position at the EOF + let mut db = Store::::reopen(&TEST_MAGIC_BYTES, &file_path) + .expect("failed to load db"); + + // get the current position of file pointer just after loading store + let current_pointer = db.db_file.stream_position().expect("must suceed"); + + // get pointer to last position for the loaded db + let 
expected_pointer = db.db_file.seek(io::SeekFrom::End(0)).expect("must succeed"); + + // both should match + assert_eq!(current_pointer, expected_pointer); + } + + #[test] + fn fail_to_reopen_if_write_is_short() { + let temp_dir = tempfile::tempdir().unwrap(); + + let changesets = [ + TestChangeSet::from(["1".into()]), + TestChangeSet::from(["2".into(), "3".into()]), + TestChangeSet::from(["4".into(), "5".into(), "6".into()]), + ]; + let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]); + let last_changeset_bytes = bincode_options().serialize(&last_changeset).unwrap(); + + for short_write_len in 1..last_changeset_bytes.len() - 1 { + let file_path = temp_dir.path().join(format!("{}.dat", short_write_len)); + + // simulate creating a file, writing data where the last write is incomplete + { + let mut db = + Store::::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap(); + for changeset in &changesets { + db.append_changeset(changeset).unwrap(); + } + // this is the incomplete write + db.db_file + .write_all(&last_changeset_bytes[..short_write_len]) + .unwrap(); + } + + // Reopen file and fail, but recover once file is truncated to valid bytes + { + match Store::::reopen(&TEST_MAGIC_BYTES, &file_path) { + Err(StoreError::EntryIter { + index, bytes_read, .. 
+ }) => { + // Open file again and truncate file to valid content + let mut f = OpenOptions::new() + .read(true) + .write(true) + .open(&file_path) + .expect("should open"); + f.set_len(bytes_read) + .expect("should truncate the file length to bytes_read"); + f.seek(io::SeekFrom::End(0)) + .expect("should position the file pointer to the new EOF"); + + // Once file is truncated reopen file again + let mut db = Store::::reopen(&TEST_MAGIC_BYTES, &file_path) + .expect("should not fail now"); + let exp_aggregation = db + .iter_changesets() + .take(index) + .map(|r| r.expect("must read valid changeset")) + .fold(TestChangeSet::default(), |mut acc, v| { + Merge::merge(&mut acc, v); + acc + }); + + assert_eq!( + exp_aggregation, + changesets + .iter() + .cloned() + .reduce(|mut acc, cs| { + Merge::merge(&mut acc, cs); + acc + }) + .expect("should merge normally"), + "should recover all changesets that are written in full", + ); + + db.db_file.write_all(&last_changeset_bytes).unwrap(); + } + _ => panic!("reopen must fail to read"), + } + } + + // load file again - this time we should successfully read all changesets + { + let mut db = Store::::reopen(&TEST_MAGIC_BYTES, &file_path).unwrap(); + let aggregated_changesets = db + .aggregate_changesets() + .expect("aggregating all changesets should succeed"); + assert_eq!( + aggregated_changesets, + changesets + .iter() + .cloned() + .chain(core::iter::once(last_changeset.clone())) + .reduce(|mut acc, cs| { + Merge::merge(&mut acc, cs); + acc + }), + "should recover all changesets", + ); + } + } + } + + #[test] + #[should_panic( + expected = "Byte 255 is treated as an extension point; it should not be encoding anything." 
+ )] + fn reopen_fails_to_read_if_invalid_bytes() { + // initial data to write to file (magic bytes + invalid data) + let mut data = [255_u8; 2000]; + data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES); + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(&data).expect("should write"); + + Store::::reopen(&TEST_MAGIC_BYTES, file.path()).unwrap(); + } }