diff --git a/crates/state/src/sync.rs b/crates/state/src/sync.rs index 9cd5561e..233e14bc 100644 --- a/crates/state/src/sync.rs +++ b/crates/state/src/sync.rs @@ -141,6 +141,20 @@ pub const DEFAULT_PENDING_MAX_AGE_MS: u64 = 60 * 60 * 1000; /// Default maximum number of pending events across all prev-hash buckets. pub const DEFAULT_PENDING_MAX_ENTRIES: usize = 10_000; +/// Default divisor used to derive a per-author sub-cap from the total +/// `max_entries`. With the default total of 10_000 and a divisor of 50, +/// each author may hold at most 200 pending entries, so a single +/// misbehaving signer cannot consume the whole buffer (SEC-V-08). +pub const DEFAULT_PENDING_PER_AUTHOR_DIVISOR: usize = 50; + +/// Compute the default per-author sub-cap from a total `max_entries`. +/// +/// Returns at least `1` so the buffer remains usable when callers +/// configure absurdly small totals (e.g. in tests). +fn default_per_author_cap(max_entries: usize) -> usize { + (max_entries / DEFAULT_PENDING_PER_AUTHOR_DIVISOR).max(1) +} + /// A single pending entry — the event plus the time it was buffered. /// /// `inserted_at_ms` is `None` when the caller did not supply a timestamp @@ -181,11 +195,24 @@ pub struct PendingBuffer { /// Optional maximum number of pending events. When set, /// inserts auto-evict the oldest entries to stay within limit. max_entries: Option<usize>, + /// Optional per-author sub-cap. When set, a single author may hold at + /// most this many pending entries; further inserts from that author + /// are dropped without disturbing other authors' buckets (SEC-V-08). + /// Derived from `max_entries` by default — see + /// [`DEFAULT_PENDING_PER_AUTHOR_DIVISOR`]. + max_per_author: Option<usize>, /// Optional maximum age in ms before an entry is evicted. Only applies /// to entries inserted with a timestamp via [`buffer_for_prev_at`]. max_age_ms: Option<u64>, /// Cached total count of pending events for O(1) lookups. 
cached_count: usize, + /// Per-author pending counts, used to enforce `max_per_author`. + /// Kept in sync with `waiting_on_prev` on every insert/resolve/evict. + per_author_count: BTreeMap<EndpointId, usize>, + /// Authors we have already warned about hitting their sub-cap. Used to + /// rate-limit the per-author drop warning so one chatty offender does + /// not flood logs (mirrors the `warned_full` pattern in the relay). + warned_full_authors: BTreeSet<EndpointId>, } impl PendingBuffer { @@ -200,9 +227,14 @@ impl PendingBuffer { /// the buffer automatically evicts entries to stay within bounds. No /// age-based eviction is applied — use [`PendingBuffer::with_limits`] /// if you also want timeout-based eviction. + /// + /// A per-author sub-cap is derived automatically from `max_entries` + /// (see [`DEFAULT_PENDING_PER_AUTHOR_DIVISOR`]) so a single misbehaving + /// signer cannot consume the whole buffer (SEC-V-08). pub fn with_capacity(max_entries: usize) -> Self { Self { max_entries: Some(max_entries), + max_per_author: Some(default_per_author_cap(max_entries)), ..Self::default() } } @@ -212,14 +244,26 @@ impl PendingBuffer { /// /// Sensible defaults are [`DEFAULT_PENDING_MAX_ENTRIES`] and /// [`DEFAULT_PENDING_MAX_AGE_MS`]. + /// + /// A per-author sub-cap is derived automatically from `max_entries` + /// — see [`PendingBuffer::with_capacity`]. pub fn with_limits(max_entries: usize, max_age_ms: u64) -> Self { Self { max_entries: Some(max_entries), + max_per_author: Some(default_per_author_cap(max_entries)), max_age_ms: Some(max_age_ms), ..Self::default() } } + /// Override the per-author sub-cap. Mainly intended for tests; the + /// default derived by [`PendingBuffer::with_capacity`] / + /// [`PendingBuffer::with_limits`] is correct for production callers. + pub fn with_per_author_cap(mut self, max_per_author: usize) -> Self { + self.max_per_author = Some(max_per_author); + self + } + /// Buffer an event that's waiting for a prev hash to arrive. 
/// /// Legacy entry point: entries inserted this way have no timestamp and @@ -240,6 +284,27 @@ impl PendingBuffer { } fn insert_entry(&mut self, prev_hash: EventHash, event: Event, inserted_at_ms: Option<u64>) { + let author = event.author; + + // Per-author sub-cap (SEC-V-08): one signer cannot fill the + // whole buffer with unresolved events. Drop the new event when + // its author is already at the sub-cap so other authors retain + // room. Warn at most once per author per buffer instance. + if let Some(per_author_limit) = self.max_per_author { + let current = self.per_author_count.get(&author).copied().unwrap_or(0); + if current >= per_author_limit { + if self.warned_full_authors.insert(author) { + tracing::warn!( + author = %author, + per_author_cap = per_author_limit, + event_hash = %event.hash, + "pending buffer: per-author sub-cap reached; dropping further events from this author" + ); + } + return; + } + } + self.waiting_on_prev .entry(prev_hash) .or_default() .push(PendingEntry { event, inserted_at_ms, }); self.cached_count += 1; + *self.per_author_count.entry(author).or_insert(0) += 1; + if let Some(limit) = self.max_entries { let evicted = self.evict_to(limit); if evicted > 0 { } } + /// Decrement the per-author count for `author`, removing the entry + /// when it hits zero. Saturating subtraction keeps the bookkeeping + /// safe even if a caller double-resolves the same event. + fn decrement_author(&mut self, author: &EndpointId) { + if let Some(count) = self.per_author_count.get_mut(author) { + *count = count.saturating_sub(1); + if *count == 0 { + self.per_author_count.remove(author); + } + } + } + /// Record a cross-author dep that we don't have yet. 
pub fn record_missing_dep(&mut self, hash: EventHash) { self.missing_deps.insert(hash); @@ -274,6 +353,9 @@ impl PendingBuffer { .remove(inserted_hash) .unwrap_or_default(); self.cached_count = self.cached_count.saturating_sub(entries.len()); + for entry in &entries { + self.decrement_author(&entry.event.author); + } entries.into_iter().map(|e| e.event).collect() } @@ -296,6 +378,7 @@ impl PendingBuffer { None => return 0, }; let mut evicted = 0usize; + let mut evicted_authors: Vec<EndpointId> = Vec::new(); let mut empty_keys: Vec<EventHash> = Vec::new(); for (prev_hash, entries) in self.waiting_on_prev.iter_mut() { entries.retain(|entry| { @@ -311,6 +394,7 @@ impl PendingBuffer { "pending buffer: evicting aged-out event" ); evicted += 1; + evicted_authors.push(entry.event.author); false } else { true @@ -324,6 +408,9 @@ impl PendingBuffer { self.waiting_on_prev.remove(&k); } self.cached_count = self.cached_count.saturating_sub(evicted); + for author in evicted_authors { + self.decrement_author(&author); + } evicted } @@ -347,6 +434,13 @@ impl PendingBuffer { let n = entries.len(); evicted += n; self.cached_count = self.cached_count.saturating_sub(n); + // Keep per-author bookkeeping in sync with the + // bucket we just removed (SEC-V-08). + let evicted_authors: Vec<EndpointId> = + entries.iter().map(|e| e.event.author).collect(); + for author in evicted_authors { + self.decrement_author(&author); + } } } else { break; } @@ -690,7 +784,10 @@ mod tests { #[test] fn pending_buffer_auto_evicts_when_limit_exceeded() { - let mut buf = PendingBuffer::with_capacity(50); + // Use a generous per-author cap so this test exercises the + // global capacity-eviction path rather than the SEC-V-08 + // per-author sub-cap. + let mut buf = PendingBuffer::with_capacity(50).with_per_author_cap(1_000); let id = Identity::generate(); // Buffer 100 events with unique prev hashes (simulating gaps). 
for i in 0u64..100 { @@ -1166,11 +1263,15 @@ mod tests { /// After filling to `max_entries + 1` the oldest entry is evicted /// so the count stays at the configured capacity. + /// + /// The per-author sub-cap is widened so this test exercises the + /// global capacity path independently of SEC-V-08. #[test] fn capacity_eviction_drops_oldest_when_exceeded() { let id = Identity::generate(); let max_entries = 5usize; - let mut buf = PendingBuffer::with_limits(max_entries, 60_000); + let mut buf = + PendingBuffer::with_limits(max_entries, 60_000).with_per_author_cap(max_entries + 10); for i in 0..max_entries as u64 { let (prev, event) = make_pending_event(&id, i); @@ -1189,11 +1290,15 @@ mod tests { } /// `pending_count()` accurately reflects eviction activity. + /// + /// Uses an explicit per-author cap so the test focuses on the + /// interaction between age- and capacity-based eviction (not + /// SEC-V-08 sub-cap behaviour, which is covered separately). #[test] fn pending_count_reflects_both_eviction_policies() { let id = Identity::generate(); let max_age_ms = 1_000u64; - let mut buf = PendingBuffer::with_limits(4, max_age_ms); + let mut buf = PendingBuffer::with_limits(4, max_age_ms).with_per_author_cap(100); // Four fresh entries at t=0 → fills capacity exactly. for i in 0..4u64 { @@ -1239,4 +1344,169 @@ mod tests { buf.buffer_for_prev_at(prev, event, 42); assert_eq!(buf.pending_count(), 1); } + + // ── SEC-V-08: per-author sub-cap on pending buffer ────────────── + + /// Helper: build a pending event whose author is `id` and whose + /// `prev` hash is unique (so each event lands in its own bucket). 
+ fn make_pending_event_for(id: &Identity, seed: u64) -> (EventHash, Event) { + let mut hash_bytes = [0u8; 32]; + hash_bytes[..8].copy_from_slice(&seed.to_le_bytes()); + let prev = EventHash(hash_bytes); + let event = Event::new( + id, + seed + 2, + prev, + vec![], + EventKind::SetProfile { + display_name: format!("a-{seed}"), + }, + 0, + ); + (prev, event) + } + + /// One author cannot fill the whole buffer: events past the + /// per-author sub-cap from a single signer are dropped while other + /// authors keep their slots. + #[test] + fn per_author_cap_isolates_one_signer() { + let attacker = Identity::generate(); + let victim = Identity::generate(); + + let per_author = 3usize; + let mut buf = PendingBuffer::with_capacity(1_000).with_per_author_cap(per_author); + + // Attacker tries to insert 10 events — only `per_author` survive. + for i in 0..10u64 { + let (prev, event) = make_pending_event_for(&attacker, i); + buf.buffer_for_prev(prev, event); + } + assert_eq!( + buf.pending_count(), + per_author, + "attacker must be capped to the per-author limit" + ); + + // Victim can still insert events — their bucket is untouched. + for i in 100..103u64 { + let (prev, event) = make_pending_event_for(&victim, i); + buf.buffer_for_prev(prev, event); + } + assert_eq!( + buf.pending_count(), + per_author + 3, + "victim's slots must not be consumed by the attacker" + ); + } + + /// `with_capacity` and `with_limits` derive the per-author cap from + /// the total `max_entries` using `DEFAULT_PENDING_PER_AUTHOR_DIVISOR`. + #[test] + fn per_author_cap_defaults_to_total_over_divisor() { + let id = Identity::generate(); + let total = 5_000usize; + let expected_cap = total / DEFAULT_PENDING_PER_AUTHOR_DIVISOR; // 100 + + let mut buf = PendingBuffer::with_capacity(total); + + // Fill well past the expected cap from one author. 
+ for i in 0..(expected_cap as u64 + 50) { + let (prev, event) = make_pending_event_for(&id, i); + buf.buffer_for_prev(prev, event); + } + + // Per-author count must clamp at the derived sub-cap, not the + // 5000-event global cap. + assert_eq!( + buf.pending_count(), + expected_cap, + "default per-author cap should be max_entries / DEFAULT_PENDING_PER_AUTHOR_DIVISOR" + ); + } + + /// Tiny totals still yield a usable per-author cap (>= 1) — the + /// `(max / divisor).max(1)` floor keeps tests configurable. + #[test] + fn per_author_cap_floor_is_one() { + let id = Identity::generate(); + // 5 / 50 == 0; floor brings it back to 1. + let mut buf = PendingBuffer::with_capacity(5); + let (prev_a, event_a) = make_pending_event_for(&id, 0); + buf.buffer_for_prev(prev_a, event_a); + + let (prev_b, event_b) = make_pending_event_for(&id, 1); + buf.buffer_for_prev(prev_b, event_b); + + assert_eq!( + buf.pending_count(), + 1, + "per-author cap should never collapse to zero" + ); + } + + /// The first time an author hits the sub-cap, we record it in + /// `warned_full_authors` so subsequent drops from the same author + /// don't re-emit a warning. Other authors still get their own first + /// warning. + #[test] + fn per_author_cap_warns_once_per_author() { + let attacker = Identity::generate(); + let other = Identity::generate(); + + let mut buf = PendingBuffer::with_capacity(1_000).with_per_author_cap(1); + + // First insert succeeds. + let (prev_a, event_a) = make_pending_event_for(&attacker, 0); + buf.buffer_for_prev(prev_a, event_a); + assert!(buf.warned_full_authors.is_empty()); + + // Second insert from same author hits the cap → warn once. 
+ let (prev_b, event_b) = make_pending_event_for(&attacker, 1); + buf.buffer_for_prev(prev_b, event_b); + assert!(buf.warned_full_authors.contains(&attacker.endpoint_id())); + assert_eq!(buf.warned_full_authors.len(), 1); + + // Third insert from same author also drops, but the bookkeeping + // set still has exactly one entry for that author. + let (prev_c, event_c) = make_pending_event_for(&attacker, 2); + buf.buffer_for_prev(prev_c, event_c); + assert_eq!(buf.warned_full_authors.len(), 1); + + // Different author → independent warning lifecycle. + let (prev_d, event_d) = make_pending_event_for(&other, 0); + buf.buffer_for_prev(prev_d, event_d); + let (prev_e, event_e) = make_pending_event_for(&other, 1); + buf.buffer_for_prev(prev_e, event_e); + assert!(buf.warned_full_authors.contains(&other.endpoint_id())); + assert_eq!(buf.warned_full_authors.len(), 2); + } + + /// Resolving an author's events frees their slot so subsequent + /// inserts from that author are accepted again. + #[test] + fn per_author_cap_frees_on_resolve() { + let id = Identity::generate(); + let mut buf = PendingBuffer::with_capacity(1_000).with_per_author_cap(2); + + let (prev_a, event_a) = make_pending_event_for(&id, 0); + buf.buffer_for_prev(prev_a, event_a); + let (prev_b, event_b) = make_pending_event_for(&id, 1); + buf.buffer_for_prev(prev_b, event_b); + assert_eq!(buf.pending_count(), 2); + + // Cap reached — extra is dropped. + let (prev_c, event_c) = make_pending_event_for(&id, 2); + buf.buffer_for_prev(prev_c, event_c); + assert_eq!(buf.pending_count(), 2); + + // Resolve one of the existing entries — frees a slot. + let _ = buf.resolve(&prev_a); + assert_eq!(buf.pending_count(), 1); + + // Now a new event from the same author fits again. 
+ let (prev_d, event_d) = make_pending_event_for(&id, 3); + buf.buffer_for_prev(prev_d, event_d); + assert_eq!(buf.pending_count(), 2); + } } diff --git a/crates/state/src/tests.rs b/crates/state/src/tests.rs index aa4272a0..50f77dc2 100644 --- a/crates/state/src/tests.rs +++ b/crates/state/src/tests.rs @@ -2430,8 +2430,11 @@ fn deep_pending_chain_does_not_stack_overflow() { let genesis_hash = managed.dag().genesis().unwrap().hash; - // Build a chain of 3000 events. - let chain_len = 3_000; + // Build a chain of 1500 events. Kept below the per-author sub-cap + // (max_entries / DEFAULT_PENDING_PER_AUTHOR_DIVISOR == 2000) so the + // SEC-V-08 cap doesn't drop chain links — this test is about + // iterative resolution, not capacity policy. + let chain_len = 1_500; let mut events = Vec::with_capacity(chain_len); let mut prev = genesis_hash; for seq_offset in 0..chain_len { @@ -2874,9 +2877,11 @@ fn pending_buffer_eviction_reduces_count_to_cap() { use crate::sync::PendingBuffer; // Insert more events than the cap and verify cached_count stays <= cap. + // Override the SEC-V-08 per-author sub-cap so this test focuses on + // global capacity-eviction behaviour. let id = Identity::generate(); let cap = 10usize; - let mut buf = PendingBuffer::with_capacity(cap); + let mut buf = PendingBuffer::with_capacity(cap).with_per_author_cap(1_000); for i in 0u64..50 { let mut hash_bytes = [0u8; 32];