diff --git a/Cargo.lock b/Cargo.lock index 2cec12c3..235db292 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6166,6 +6166,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid", + "willow-common", "willow-identity", "willow-network", "willow-state", diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs index 27681c1f..a18aa62f 100644 --- a/crates/client/src/listeners.rs +++ b/crates/client/src/listeners.rs @@ -9,6 +9,7 @@ use crate::mutations; use crate::persistence_actor; use crate::state_actors; use willow_actor::Addr; +use willow_common::SYNC_BATCH_LIMIT; use willow_identity::EndpointId; use willow_network::traits::TopicHandle; use willow_network::traits::{GossipEvent, TopicEvents}; @@ -253,12 +254,13 @@ async fn process_received_message( } crate::ops::WireMessage::SyncBatch { events: batch } => { // Reject oversized batches to prevent memory exhaustion. - const MAX_SYNC_BATCH_SIZE: usize = 10_000; - if batch.len() > MAX_SYNC_BATCH_SIZE { + // Bound matches the producer side in `willow-storage`; the + // shared constant lives in `willow_common::SYNC_BATCH_LIMIT`. + if batch.len() > SYNC_BATCH_LIMIT { tracing::warn!( size = batch.len(), "rejecting oversized sync batch (max {})", - MAX_SYNC_BATCH_SIZE + SYNC_BATCH_LIMIT ); return; } diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index bf9519f4..e9ab8c3a 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -9,3 +9,18 @@ pub mod worker_types; pub use wire::*; pub use worker_types::*; + +/// Maximum number of events allowed in a single sync batch. +/// +/// Single source of truth shared by: +/// - `willow-storage` — caps batches produced by `sync_since` / `history` +/// so the SQLite-backed store cannot OOM a peer. +/// - `willow-client` — rejects oversized inbound `SyncBatch` wire messages +/// so a hostile peer cannot OOM us. +/// +/// Both sides MUST agree on this value: if production exceeds validation, +/// honest peers reject honest batches; if validation exceeds production, +/// the validation cap is dead code. Keeping the constant here in +/// `willow-common` (already a dep of both crates) guarantees they stay +/// aligned at compile time. +pub const SYNC_BATCH_LIMIT: usize = 10_000; diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index dcb39872..2c9a004f 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -12,6 +12,7 @@ willow-worker = { path = "../worker" } willow-state = { path = "../state" } willow-identity = { path = "../identity" } willow-network = { path = "../network" } +willow-common = { path = "../common" } rusqlite = { version = "0.31", features = ["bundled"] } clap = { version = "4", features = ["derive"] } anyhow = { workspace = true } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index a5480ed3..42d5e2bf 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -20,6 +20,7 @@ //! never reordered or rewritten so existing databases stay consistent. use rusqlite::{params, Connection}; +use willow_common::SYNC_BATCH_LIMIT; use willow_state::{Event, EventKind, HeadsSummary}; /// Ordered list of schema migrations. Each entry is run inside its own @@ -188,7 +189,7 @@ impl StorageEventStore { before: Option<&HeadsSummary>, limit: u32, ) -> anyhow::Result<(Vec, bool)> { - let capped = (limit as usize).min(Self::SYNC_BATCH_LIMIT); + let capped = (limit as usize).min(SYNC_BATCH_LIMIT); let fetch_limit = capped + 1; // Build the query dynamically based on filters. @@ -284,8 +285,8 @@ impl StorageEventStore { /// doesn't know the author at all), return events with seq > their_seq. /// If heads is empty, returns all events for the server. /// Maximum events returned in a single sync batch to prevent OOM. - const SYNC_BATCH_LIMIT: usize = 10_000; - + /// Defined once in `willow_common::SYNC_BATCH_LIMIT` so storage + /// production and client validation cannot drift apart. pub fn sync_since(&self, server_id: &str, heads: &HeadsSummary) -> anyhow::Result> { if heads.heads.is_empty() { // New peer — send up to SYNC_BATCH_LIMIT events for this server. @@ -294,7 +295,7 @@ impl StorageEventStore { )?; let rows: Vec> = stmt .query_map( - rusqlite::params![server_id, Self::SYNC_BATCH_LIMIT as i64], + rusqlite::params![server_id, SYNC_BATCH_LIMIT as i64], |row| row.get(0), )? .filter_map(|r| match r { @@ -343,10 +344,7 @@ impl StorageEventStore { conditions.push(format!("author NOT IN ({known_authors})")); sql.push_str(&conditions.join(" OR ")); - sql.push_str(&format!( - ") ORDER BY seq ASC LIMIT {}", - Self::SYNC_BATCH_LIMIT - )); + sql.push_str(&format!(") ORDER BY seq ASC LIMIT {SYNC_BATCH_LIMIT}")); let mut stmt = self.conn.prepare(&sql)?; @@ -558,7 +556,7 @@ mod tests { .unwrap(); assert_eq!(events.len(), 5); assert!(!has_more); - assert!(events.len() <= StorageEventStore::SYNC_BATCH_LIMIT); + assert!(events.len() <= SYNC_BATCH_LIMIT); } #[test]