Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions crates/client/src/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::mutations;
use crate::persistence_actor;
use crate::state_actors;
use willow_actor::Addr;
use willow_common::SYNC_BATCH_LIMIT;
use willow_identity::EndpointId;
use willow_network::traits::TopicHandle;
use willow_network::traits::{GossipEvent, TopicEvents};
Expand Down Expand Up @@ -253,12 +254,13 @@ async fn process_received_message<T: TopicHandle>(
}
crate::ops::WireMessage::SyncBatch { events: batch } => {
// Reject oversized batches to prevent memory exhaustion.
const MAX_SYNC_BATCH_SIZE: usize = 10_000;
if batch.len() > MAX_SYNC_BATCH_SIZE {
// Bound matches the producer side in `willow-storage`; the
// shared constant lives in `willow_common::SYNC_BATCH_LIMIT`.
if batch.len() > SYNC_BATCH_LIMIT {
tracing::warn!(
size = batch.len(),
"rejecting oversized sync batch (max {})",
MAX_SYNC_BATCH_SIZE
SYNC_BATCH_LIMIT
);
return;
}
Expand Down
15 changes: 15 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,18 @@ pub mod worker_types;

pub use wire::*;
pub use worker_types::*;

/// Maximum number of events allowed in a single sync batch.
///
/// Single source of truth shared by:
/// - `willow-storage` — caps batches produced by `sync_since` / `history`
/// so the SQLite-backed store cannot OOM a peer.
/// - `willow-client` — rejects oversized inbound `SyncBatch` wire messages
/// so a hostile peer cannot OOM us.
///
/// Both sides MUST agree on this value: if production exceeds validation,
/// honest peers reject honest batches; if validation exceeds production,
/// the validation cap is dead code. Keeping the constant here in
/// `willow-common` (already a dep of both crates) guarantees they stay
/// aligned at compile time.
pub const SYNC_BATCH_LIMIT: usize = 10_000;
1 change: 1 addition & 0 deletions crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ willow-worker = { path = "../worker" }
willow-state = { path = "../state" }
willow-identity = { path = "../identity" }
willow-network = { path = "../network" }
willow-common = { path = "../common" }
rusqlite = { version = "0.31", features = ["bundled"] }
clap = { version = "4", features = ["derive"] }
anyhow = { workspace = true }
Expand Down
16 changes: 7 additions & 9 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! never reordered or rewritten so existing databases stay consistent.

use rusqlite::{params, Connection};
use willow_common::SYNC_BATCH_LIMIT;
use willow_state::{Event, EventKind, HeadsSummary};

/// Ordered list of schema migrations. Each entry is run inside its own
Expand Down Expand Up @@ -188,7 +189,7 @@ impl StorageEventStore {
before: Option<&HeadsSummary>,
limit: u32,
) -> anyhow::Result<(Vec<Event>, bool)> {
let capped = (limit as usize).min(Self::SYNC_BATCH_LIMIT);
let capped = (limit as usize).min(SYNC_BATCH_LIMIT);
let fetch_limit = capped + 1;

// Build the query dynamically based on filters.
Expand Down Expand Up @@ -284,8 +285,8 @@ impl StorageEventStore {
/// doesn't know the author at all), return events with seq > their_seq.
/// If heads is empty, returns all events for the server.
/// Maximum events returned in a single sync batch to prevent OOM.
const SYNC_BATCH_LIMIT: usize = 10_000;

/// Defined once in `willow_common::SYNC_BATCH_LIMIT` so storage
/// production and client validation cannot drift apart.
pub fn sync_since(&self, server_id: &str, heads: &HeadsSummary) -> anyhow::Result<Vec<Event>> {
if heads.heads.is_empty() {
// New peer — send up to SYNC_BATCH_LIMIT events for this server.
Expand All @@ -294,7 +295,7 @@ impl StorageEventStore {
)?;
let rows: Vec<Vec<u8>> = stmt
.query_map(
rusqlite::params![server_id, Self::SYNC_BATCH_LIMIT as i64],
rusqlite::params![server_id, SYNC_BATCH_LIMIT as i64],
|row| row.get(0),
)?
.filter_map(|r| match r {
Expand Down Expand Up @@ -343,10 +344,7 @@ impl StorageEventStore {
conditions.push(format!("author NOT IN ({known_authors})"));

sql.push_str(&conditions.join(" OR "));
sql.push_str(&format!(
") ORDER BY seq ASC LIMIT {}",
Self::SYNC_BATCH_LIMIT
));
sql.push_str(&format!(") ORDER BY seq ASC LIMIT {SYNC_BATCH_LIMIT}"));

let mut stmt = self.conn.prepare(&sql)?;

Expand Down Expand Up @@ -558,7 +556,7 @@ mod tests {
.unwrap();
assert_eq!(events.len(), 5);
assert!(!has_more);
assert!(events.len() <= StorageEventStore::SYNC_BATCH_LIMIT);
assert!(events.len() <= SYNC_BATCH_LIMIT);
}

#[test]
Expand Down