diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 2d67fda0..3a4e2cdb 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -3,10 +3,44 @@ //! Stores all events indefinitely and serves paginated history queries. //! Events are keyed by their content hash and indexed by author/seq for //! DAG-aware pagination. +//! +//! ## Durability +//! +//! On open, the store enables WAL journaling, `synchronous = FULL`, and +//! foreign-key enforcement to provide durability across crashes. WAL is +//! incompatible with in-memory databases, so it is skipped automatically +//! when the path is `:memory:`. +//! +//! ## Schema versioning +//! +//! All `CREATE TABLE` / `CREATE INDEX` statements live in the [`MIGRATIONS`] +//! slice. On open, the store records applied migration versions in a +//! `schema_version` table and applies any pending migrations inside a +//! transaction. New migrations should be appended to [`MIGRATIONS`] and +//! never reordered or rewritten so existing databases stay consistent. use rusqlite::{params, Connection}; use willow_state::{Event, EventKind, HeadsSummary}; +/// Ordered list of schema migrations. Each entry is run inside its own +/// transaction the first time the database is opened. Once a migration is +/// shipped, never edit or reorder it — only append new entries. +const MIGRATIONS: &[&str] = &[ + // Migration 1: initial schema (the events table and its indexes). + "CREATE TABLE IF NOT EXISTS events ( + hash BLOB PRIMARY KEY, + server_id TEXT NOT NULL, + channel_id TEXT NOT NULL DEFAULT '', + author BLOB NOT NULL, + seq INTEGER NOT NULL, + timestamp_hint_ms INTEGER NOT NULL, + event_data BLOB NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_events_server ON events(server_id); + CREATE INDEX IF NOT EXISTS idx_events_channel ON events(server_id, channel_id); + CREATE INDEX IF NOT EXISTS idx_events_author_seq ON events(author, seq);", +]; + /// SQLite-backed event store. pub struct StorageEventStore { conn: Connection, @@ -14,47 +48,131 @@ pub struct StorageEventStore { impl StorageEventStore { /// Open or create the database at `path`. + /// + /// Enables WAL journaling (skipped for `:memory:` databases), + /// `synchronous = FULL`, and foreign-key enforcement, then runs any + /// pending schema migrations. pub fn open(path: &str) -> anyhow::Result { - let conn = if path == ":memory:" { + let is_memory = path == ":memory:"; + let conn = if is_memory { Connection::open_in_memory()? } else { Connection::open(path)? }; - conn.execute_batch( - "CREATE TABLE IF NOT EXISTS events ( - hash BLOB PRIMARY KEY, - server_id TEXT NOT NULL, - channel_id TEXT NOT NULL DEFAULT '', - author BLOB NOT NULL, - seq INTEGER NOT NULL, - timestamp_hint_ms INTEGER NOT NULL, - event_data BLOB NOT NULL - ); - CREATE INDEX IF NOT EXISTS idx_events_server ON events(server_id); - CREATE INDEX IF NOT EXISTS idx_events_channel ON events(server_id, channel_id); - CREATE INDEX IF NOT EXISTS idx_events_author_seq ON events(author, seq);", + + // Durability pragmas. WAL is a no-op for in-memory databases and + // SQLite ignores the request — skip it explicitly so we can assert + // on the resulting journal_mode in tests without surprises. + if !is_memory { + conn.pragma_update(None, "journal_mode", "WAL")?; + } + conn.pragma_update(None, "synchronous", "FULL")?; + conn.pragma_update(None, "foreign_keys", "ON")?; + + let store = Self { conn }; + store.run_migrations()?; + Ok(store) + } + + /// Apply any migrations from [`MIGRATIONS`] that haven't been recorded + /// yet. Each pending migration runs inside its own transaction along + /// with the `schema_version` insert, so a crash mid-migration leaves the + /// database either fully migrated or untouched. + fn run_migrations(&self) -> anyhow::Result<()> { + self.conn.execute_batch( + "CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER PRIMARY KEY, + applied_at_ms INTEGER NOT NULL + );", )?; - Ok(Self { conn }) + + let current: i64 = self + .conn + .query_row( + "SELECT COALESCE(MAX(version), 0) FROM schema_version", + [], + |row| row.get(0), + ) + .unwrap_or(0); + + for (idx, sql) in MIGRATIONS.iter().enumerate() { + let version = (idx + 1) as i64; + if version <= current { + continue; + } + let tx = self.conn.unchecked_transaction()?; + tx.execute_batch(sql)?; + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0); + tx.execute( + "INSERT INTO schema_version (version, applied_at_ms) VALUES (?1, ?2)", + params![version, now_ms], + )?; + tx.commit()?; + } + Ok(()) } /// Store an event. Deduplicates by event hash. Returns true if inserted. + /// + /// Cross-call duplicates (same hash already in the DB) are silently + /// ignored and reported as `Ok(false)`. For higher throughput, batch + /// many events into a single [`Self::store_events`] call. pub fn store_event(&self, server_id: &str, event: &Event) -> anyhow::Result { - let channel_id = extract_channel_id(event); - let event_data = bincode::serialize(event)?; - let rows = self.conn.execute( - "INSERT OR IGNORE INTO events (hash, server_id, channel_id, author, seq, timestamp_hint_ms, event_data) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", - params![ - event.hash.0.as_slice(), - server_id, - channel_id, - event.author.as_bytes(), - event.seq as i64, - event.timestamp_hint_ms as i64, - event_data, - ], - )?; - Ok(rows > 0) + let tx = self.conn.unchecked_transaction()?; + let inserted = insert_event_or_ignore(&tx, server_id, event)?; + tx.commit()?; + Ok(inserted) + } + + /// Store a batch of events atomically inside a single transaction. + /// + /// Returns the number of rows actually inserted; duplicates already + /// present in the database are silently ignored. The whole batch is + /// committed at once, so a single fsync flushes every event in the + /// batch — this is much faster than calling [`Self::store_event`] in a + /// loop. + /// + /// If any insert fails (for example, two events in the *same* batch + /// share a primary key, or another constraint is violated), the entire + /// transaction is rolled back and no events from the batch are + /// persisted. To deduplicate across calls only — and roll back the + /// batch on intra-batch duplicates — pass each event at most once. + pub fn store_events(&self, events: &[(String, Event)]) -> anyhow::Result { + if events.is_empty() { + return Ok(0); + } + let tx = self.conn.unchecked_transaction()?; + let mut inserted = 0usize; + { + // Plain INSERT (no OR IGNORE): a duplicate hash inside the + // batch raises a UNIQUE constraint error and rolls back the + // whole transaction. Cross-call duplicates are still tolerated + // via the per-row helper used by `store_event`, which uses + // INSERT OR IGNORE for that case. + let mut stmt = tx.prepare( + "INSERT INTO events (hash, server_id, channel_id, author, seq, timestamp_hint_ms, event_data) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + )?; + for (server_id, event) in events { + let channel_id = extract_channel_id(event); + let event_data = bincode::serialize(event)?; + let rows = stmt.execute(params![ + event.hash.0.as_slice(), + server_id, + channel_id, + event.author.as_bytes(), + event.seq as i64, + event.timestamp_hint_ms as i64, + event_data, + ])?; + inserted += rows; + } + } + tx.commit()?; + Ok(inserted) } /// Query events for a server, optionally filtered by channel, paginated @@ -291,6 +409,31 @@ impl StorageEventStore { } } +/// Insert one event into the open transaction, ignoring rows whose hash +/// already exists. Returns `true` if a row was actually inserted. +fn insert_event_or_ignore( + tx: &rusqlite::Transaction<'_>, + server_id: &str, + event: &Event, +) -> anyhow::Result { + let channel_id = extract_channel_id(event); + let event_data = bincode::serialize(event)?; + let rows = tx.execute( + "INSERT OR IGNORE INTO events (hash, server_id, channel_id, author, seq, timestamp_hint_ms, event_data) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + event.hash.0.as_slice(), + server_id, + channel_id, + event.author.as_bytes(), + event.seq as i64, + event.timestamp_hint_ms as i64, + event_data, + ], + )?; + Ok(rows > 0) +} + /// Extract channel_id from an event, if applicable. fn extract_channel_id(event: &Event) -> String { match &event.kind { @@ -606,4 +749,173 @@ mod tests { "sync_since should also skip corrupt events" ); } + + // --------------------------------------------------------------------- + // Durability + schema versioning tests + // --------------------------------------------------------------------- + + /// Helper: open a fresh file-backed store inside a tempdir. + fn open_file_store() -> (tempfile::TempDir, StorageEventStore) { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("storage.db"); + let store = StorageEventStore::open(path.to_str().unwrap()).unwrap(); + (dir, store) + } + + #[test] + fn wal_mode_is_enabled() { + let (_dir, store) = open_file_store(); + let mode: String = store + .conn + .query_row("PRAGMA journal_mode", [], |row| row.get(0)) + .unwrap(); + assert_eq!(mode.to_lowercase(), "wal"); + } + + #[test] + fn synchronous_full_is_enabled() { + let (_dir, store) = open_file_store(); + let sync: i64 = store + .conn + .query_row("PRAGMA synchronous", [], |row| row.get(0)) + .unwrap(); + // SQLite reports synchronous as an integer: 0=OFF, 1=NORMAL, 2=FULL, + // 3=EXTRA. FULL is what we asked for. + assert_eq!(sync, 2); + } + + #[test] + fn foreign_keys_enabled() { + let (_dir, store) = open_file_store(); + let fk: i64 = store + .conn + .query_row("PRAGMA foreign_keys", [], |row| row.get(0)) + .unwrap(); + assert_eq!(fk, 1); + } + + #[test] + fn schema_version_table_exists_after_open() { + let (_dir, store) = open_file_store(); + let versions: Vec = store + .conn + .prepare("SELECT version FROM schema_version ORDER BY version ASC") + .unwrap() + .query_map([], |row| row.get(0)) + .unwrap() + .map(|r| r.unwrap()) + .collect(); + assert!( + !versions.is_empty(), + "schema_version table should be populated after open" + ); + assert!( + versions.contains(&1), + "migration 1 (initial schema) must be recorded" + ); + } + + #[test] + fn migrations_are_idempotent() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("storage.db"); + let path_str = path.to_str().unwrap(); + + // Open once. + { + let _store = StorageEventStore::open(path_str).unwrap(); + } + // Open again — must not duplicate version rows or error. + let store = StorageEventStore::open(path_str).unwrap(); + + let count: i64 = store + .conn + .query_row( + "SELECT COUNT(*) FROM schema_version WHERE version = 1", + [], + |row| row.get(0), + ) + .unwrap(); + assert_eq!( + count, 1, + "reopening the database must not duplicate the schema_version row" + ); + + let total: i64 = store + .conn + .query_row("SELECT COUNT(*) FROM schema_version", [], |row| row.get(0)) + .unwrap(); + assert_eq!( + total, + MIGRATIONS.len() as i64, + "schema_version row count must match the number of migrations" + ); + } + + #[test] + fn batched_store_inserts_all_rows() { + let (_dir, store) = open_file_store(); + let (id, genesis) = setup_identity_and_genesis("general"); + + let mut prev = genesis.hash; + let mut batch = Vec::new(); + for seq in 2..=6 { + let e = make_message(&id, seq, prev, "general"); + prev = e.hash; + batch.push(("srv-1".to_string(), e)); + } + + let inserted = store.store_events(&batch).unwrap(); + assert_eq!(inserted, 5); + assert_eq!(store.count().unwrap(), 5); + } + + #[test] + fn batched_store_is_atomic() { + let (_dir, store) = open_file_store(); + let (id, genesis) = setup_identity_and_genesis("general"); + + // Three valid events; the third one shares its primary key with the + // first, which forces a UNIQUE-constraint violation mid-batch. + let e1 = make_message(&id, 2, genesis.hash, "general"); + let e2 = make_message(&id, 3, e1.hash, "general"); + let dup = e1.clone(); + + let batch = vec![ + ("srv-1".to_string(), e1), + ("srv-1".to_string(), e2), + ("srv-1".to_string(), dup), + ]; + let result = store.store_events(&batch); + assert!( + result.is_err(), + "duplicate primary key in the batch must surface an error" + ); + + // Critical: every row in the batch must be rolled back. + assert_eq!( + store.count().unwrap(), + 0, + "failed batch must leave the database untouched" + ); + } + + #[test] + fn store_event_after_failed_batch_still_works() { + // Sanity check that a failed batch doesn't poison the connection. + let (_dir, store) = open_file_store(); + let (id, genesis) = setup_identity_and_genesis("general"); + + let e1 = make_message(&id, 2, genesis.hash, "general"); + let dup = e1.clone(); + let _ = store.store_events(&[ + ("srv-1".to_string(), e1.clone()), + ("srv-1".to_string(), dup), + ]); + assert_eq!(store.count().unwrap(), 0); + + // Subsequent single-event store should succeed. + assert!(store.store_event("srv-1", &e1).unwrap()); + assert_eq!(store.count().unwrap(), 1); + } }