From a0813b22ee8de052763d5a1105fce0993dd51a6b Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 00:25:30 +0000 Subject: [PATCH 1/8] chore: open auto-fix batch 2026-04-28-002530 From 6846ff7c7ff6defc9762065b82824e975e0115cf Mon Sep 17 00:00:00 2001 From: intendednull Date: Mon, 27 Apr 2026 17:45:11 -0700 Subject: [PATCH 2/8] fix(agent): readonly scope must deny join_links (#450) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TokenScope::allows_resource returned true for every URI, so ReadOnly and Messaging tokens could read willow://server/join-links and harvest link_id values. link_id is a single-step bearer credential for WireMessage::JoinRequest, so any client with a low-privilege token could join a server unattended (audit finding AUD-2). Per-scope match now: Full/Admin = all URIs, ReadOnly/Messaging = all except willow://server/join-links, Custom(set) = explicit allowlist (now also gates resources, not just tools — doc updated). Tests added at lowest tier (Rust unit + agent integration): - scopes::tests::readonly_scope_denies_join_links - scopes::tests::messaging_scope_denies_join_links - scopes::tests::full_and_admin_allow_join_links - scopes::tests::custom_resource_allowlist_gates_join_links - e2e::readonly_list_resources_omits_join_links - e2e::readonly_scope_rejects_join_links_read (replaces the prior stub-closure denied_uri_rejects_with_invalid_request — now drives the gate with a real WillowMcpServer) - readonly_token_hides_tools updated to expect join-links denied while every other URI stays visible. Tradeoff: option (a) from the issue tightens Custom(set) to also gate resources, not tools only. Runner-up was leaving Custom unchanged and only filtering join-links in ReadOnly/Messaging — rejected because the issue's preferred patch makes Custom an explicit allowlist for both surfaces, which is the safer default for least-privilege tokens. 
Refs #436 Co-authored-by: Claude --- crates/agent/src/scopes.rs | 77 ++++++++++++++++++++++++--- crates/agent/tests/e2e.rs | 105 ++++++++++++++++++++++++++++--------- 2 files changed, 152 insertions(+), 30 deletions(-) diff --git a/crates/agent/src/scopes.rs b/crates/agent/src/scopes.rs index 7ebc9c34..33efe75e 100644 --- a/crates/agent/src/scopes.rs +++ b/crates/agent/src/scopes.rs @@ -5,19 +5,32 @@ use std::collections::HashSet; +/// URI of the join-links resource. The `link_id` values surfaced through this +/// resource are single-step bearer credentials for `WireMessage::JoinRequest`, +/// so any scope below `Full`/`Admin` MUST be denied access. See issue #436 +/// (audit finding AUD-2). +const JOIN_LINKS_URI: &str = "willow://server/join-links"; + /// Scope of a bearer token, controlling tool and resource access. +/// +/// Resource access is **not** uniform across all scopes: `willow://server/join-links` +/// exposes bearer credentials usable by `WireMessage::JoinRequest`, so it is +/// restricted to the privileged scopes (`Full`, `Admin`) and to `Custom` token +/// allowlists that explicitly include the URI. #[derive(Debug, Clone, Default)] pub enum TokenScope { /// All tools, all resources. #[default] Full, - /// No tools, all resources. + /// No tools, all resources except `willow://server/join-links`. ReadOnly, - /// Messaging tools only, all resources. + /// Messaging tools only, all resources except `willow://server/join-links`. Messaging, /// All tools, all resources (semantically distinct from Full for audit). Admin, - /// Explicit allowlist of tool names. + /// Explicit allowlist of tool *and* resource names. A name in the set + /// authorises either a tool call or a resource read with that exact + /// identifier. Custom(HashSet), } @@ -45,9 +58,16 @@ impl TokenScope { } /// Returns true if the given resource URI is allowed. - pub fn allows_resource(&self, _uri: &str) -> bool { - // All scopes allow all resources. 
- true + /// + /// `willow://server/join-links` exposes single-step bearer credentials and + /// is therefore gated to `Full`/`Admin` (or a `Custom` set that lists it). + /// All other URIs are allowed under `ReadOnly` and `Messaging`. + pub fn allows_resource(&self, uri: &str) -> bool { + match self { + Self::Full | Self::Admin => true, + Self::ReadOnly | Self::Messaging => !uri.starts_with(JOIN_LINKS_URI), + Self::Custom(set) => set.contains(uri), + } } } @@ -73,6 +93,45 @@ mod tests { assert!(scope.allows_resource("willow://server/channels")); } + #[test] + fn readonly_scope_denies_join_links() { + let scope = TokenScope::ReadOnly; + assert!( + !scope.allows_resource("willow://server/join-links"), + "ReadOnly scope must NOT expose join-links: link_id is a bearer credential (issue #436)" + ); + } + + #[test] + fn messaging_scope_denies_join_links() { + let scope = TokenScope::Messaging; + assert!( + !scope.allows_resource("willow://server/join-links"), + "Messaging scope must NOT expose join-links: link_id is a bearer credential (issue #436)" + ); + } + + #[test] + fn full_and_admin_allow_join_links() { + assert!(TokenScope::Full.allows_resource("willow://server/join-links")); + assert!(TokenScope::Admin.allows_resource("willow://server/join-links")); + } + + #[test] + fn custom_resource_allowlist_gates_join_links() { + // Custom set without the URI denies it. + let mut set = HashSet::new(); + set.insert("send_message".to_string()); + let scope = TokenScope::Custom(set); + assert!(!scope.allows_resource("willow://server/join-links")); + + // Custom set including the URI grants it. 
+ let mut set = HashSet::new(); + set.insert("willow://server/join-links".to_string()); + let scope = TokenScope::Custom(set); + assert!(scope.allows_resource("willow://server/join-links")); + } + #[test] fn messaging_allows_only_messaging_tools() { let scope = TokenScope::Messaging; @@ -88,6 +147,7 @@ mod tests { assert!(!scope.allows_tool("kick_member")); assert!(!scope.allows_tool("create_server")); assert!(scope.allows_resource("willow://identity")); + assert!(scope.allows_resource("willow://server/channels")); } #[test] @@ -95,12 +155,17 @@ mod tests { let mut set = HashSet::new(); set.insert("send_message".to_string()); set.insert("react".to_string()); + // Custom is also an explicit resource allowlist now (issue #436), + // so include the identity URI to keep its access in this test. + set.insert("willow://identity".to_string()); let scope = TokenScope::Custom(set); assert!(scope.allows_tool("send_message")); assert!(scope.allows_tool("react")); assert!(!scope.allows_tool("create_channel")); assert!(!scope.allows_tool("kick_member")); assert!(scope.allows_resource("willow://identity")); + // URI not in the set is denied. + assert!(!scope.allows_resource("willow://server/channels")); } #[test] diff --git a/crates/agent/tests/e2e.rs b/crates/agent/tests/e2e.rs index 7591cf44..3c012ea3 100644 --- a/crates/agent/tests/e2e.rs +++ b/crates/agent/tests/e2e.rs @@ -573,17 +573,72 @@ async fn readonly_token_hides_tools() { visible.iter().map(|t| &t.name).collect::>() ); - // Resources should all be visible + // All resources except `willow://server/join-links` are visible. The + // join-links resource exposes single-step bearer credentials usable by + // `WireMessage::JoinRequest`, so it is restricted to `Full`/`Admin` + // (audit finding AUD-2, issue #436). 
let resources = willow_agent::resources::list_resources(); for r in &resources { - assert!( - server.scope.allows_resource(&r.raw.uri), - "ReadOnly should allow resource: {}", - r.raw.uri - ); + if r.raw.uri == "willow://server/join-links" { + assert!( + !server.scope.allows_resource(&r.raw.uri), + "ReadOnly must NOT expose {}: link_id is a bearer credential", + r.raw.uri + ); + } else { + assert!( + server.scope.allows_resource(&r.raw.uri), + "ReadOnly should allow resource: {}", + r.raw.uri + ); + } } } +/// Integration-tier check: under `ReadOnly`, the same filter pipeline used by +/// `WillowMcpServer::list_resources` must omit the join-links entry. +/// +/// `ServerHandler::list_resources` cannot be invoked directly from outside +/// rmcp (the `RequestContext` is not externally constructible), +/// so we replicate the exact filter pipeline that the handler runs: +/// `resources::list_resources().filter(|r| scope.allows_resource(&r.raw.uri))`. +#[tokio::test] +async fn readonly_list_resources_omits_join_links() { + use willow_agent::scopes::TokenScope; + use willow_agent::server::WillowMcpServer; + + let (client, _broker) = test_client(); + let server = WillowMcpServer::with_scope(client.clone(), TokenScope::ReadOnly); + + let visible: Vec = willow_agent::resources::list_resources() + .into_iter() + .filter(|r| server.scope.allows_resource(&r.raw.uri)) + .map(|r| r.raw.uri) + .collect(); + + assert!( + !visible.iter().any(|u| u == "willow://server/join-links"), + "ReadOnly list_resources must omit join-links, got: {visible:?}" + ); + // Sanity: other resources are still listed. + assert!(visible.iter().any(|u| u == "willow://identity")); + assert!(visible.iter().any(|u| u == "willow://server/channels")); + + // `Full` scope must still see join-links (positive control). 
+ let full = WillowMcpServer::with_scope(client, TokenScope::Full); + let visible_full: Vec = willow_agent::resources::list_resources() + .into_iter() + .filter(|r| full.scope.allows_resource(&r.raw.uri)) + .map(|r| r.raw.uri) + .collect(); + assert!( + visible_full + .iter() + .any(|u| u == "willow://server/join-links"), + "Full scope should still list join-links" + ); +} + #[tokio::test] async fn messaging_scope_restricts_tools() { use willow_agent::scopes::TokenScope; @@ -733,31 +788,33 @@ async fn read_resource_allowed_uri_returns_resource() { ); } -/// Test that `WillowMcpServer::read_resource` would reject a URI that the scope -/// denies, with the same error code (`INVALID_REQUEST`) that `call_tool` uses. +/// Test that `WillowMcpServer::read_resource` rejects `willow://server/join-links` +/// for a `ReadOnly`-scoped token with `INVALID_REQUEST`, matching the error +/// code `call_tool` uses when scope blocks a tool. /// -/// `read_resource` on `WillowMcpServer` mirrors `call_tool`'s gate: +/// `read_resource` on `WillowMcpServer` has two layers: /// 1. Scope check — returns `Err(ErrorData)` immediately if the URI is blocked /// 2. Dispatch to `resources::read_resource` — only reached if scope allows /// -/// Today no built-in `TokenScope` variant denies any resource, so we drive the -/// negative path with a stub closure that mimics a future scope returning false -/// for one URI. This pins the contract: blocked reads return `INVALID_REQUEST`, -/// not silently dispatch. Pattern mirrors `readonly_scope_rejects_send_message`. +/// Layer 1 is what this test pins: under `ReadOnly`, join-links is now denied +/// (audit finding AUD-2, issue #436). The `RequestContext` needed +/// by the `ServerHandler` trait method cannot be constructed from outside the +/// rmcp crate, so we replicate the exact gate that `read_resource` runs. 
#[tokio::test] -async fn denied_uri_rejects_with_invalid_request() { +async fn readonly_scope_rejects_join_links_read() { use rmcp::model::ErrorCode; + use willow_agent::scopes::TokenScope; + use willow_agent::server::WillowMcpServer; + + let (client, _broker) = test_client(); + let server = WillowMcpServer::with_scope(client, TokenScope::ReadOnly); - // Stub scope that denies one specific URI — mimics what a future tightened - // scope (e.g. hiding `willow://server/join-links` from messaging tokens) - // would do once `TokenScope::allows_resource` returns something other - // than `true`. let denied_uri = "willow://server/join-links"; - let scope_allows_resource = |uri: &str| uri != denied_uri; + // The real `ReadOnly` scope must deny the join-links URI now. assert!( - !scope_allows_resource(denied_uri), - "stub scope must deny {denied_uri}" + !server.scope.allows_resource(denied_uri), + "ReadOnly must deny {denied_uri} (link_id is a bearer credential)" ); // Replicate exactly what WillowMcpServer::read_resource does before @@ -783,10 +840,10 @@ async fn denied_uri_rejects_with_invalid_request() { "error message should mention the blocked URI" ); - // Sanity: a URI the stub allows must pass the gate. + // Sanity: non-credential URIs still pass under ReadOnly. assert!( - scope_allows_resource("willow://identity"), - "stub scope must allow non-denied URIs" + server.scope.allows_resource("willow://identity"), + "ReadOnly must still allow non-credential URIs" ); } From 5b78e6178e715e932c43164003a7be6f64e11a97 Mon Sep 17 00:00:00 2001 From: intendednull Date: Mon, 27 Apr 2026 18:10:21 -0700 Subject: [PATCH 3/8] fix(state): per-author cap for pending buffer (#451) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SEC-V-08. PendingBuffer had only a global cap (5_000 in client). One chatty signer could fill 100% of the slots with unresolved-prev events and starve legitimate out-of-order events for the whole 1 h eviction window. 
Adds a per-author sub-cap. Default = max_entries / 50 (100 at the client cap of 5_000, 200 with the state-crate default of 10_000), with a floor of 1 so tiny test caps remain usable. When an author is at the sub-cap, further events from that author are dropped without touching other authors' buckets. Bookkeeping mirrors the existing cached_count pattern: per_author_count BTreeMap updated on insert / resolve / evict_expired / evict_to. Warn-once-per-author via warned_full_authors set so a single offender cannot flood logs. Why a config knob and not a hardcoded constant: tests need to override the sub-cap to exercise the global eviction policy in isolation. with_per_author_cap(usize) is the explicit override; production callers stay on the derived default. Runner-up was a hardcoded constant — rejected because three existing tests would either need multi-author rewriting or magic-number tuning, and the override is cheaper to read. Tests at the lowest tier (state crate, sync.rs): * per_author_cap_isolates_one_signer * per_author_cap_defaults_to_total_over_divisor * per_author_cap_floor_is_one * per_author_cap_warns_once_per_author * per_author_cap_frees_on_resolve Adapted three existing capacity-eviction tests to set an explicit sub-cap (so they keep testing the global policy): * sync::tests::pending_buffer_auto_evicts_when_limit_exceeded * sync::tests::capacity_eviction_drops_oldest_when_exceeded * sync::tests::pending_count_reflects_both_eviction_policies * tests::pending_buffer_eviction_reduces_count_to_cap Reduced deep_pending_chain_does_not_stack_overflow chain depth from 3000 to 1500 so it fits within the 100_000-cap'd ManagedDag's per- author budget of 2000. Test still proves the iterative resolution path; capacity policy is validated separately. 
Refs #237 Co-authored-by: Claude --- crates/state/src/sync.rs | 276 +++++++++++++++++++++++++++++++++++++- crates/state/src/tests.rs | 11 +- 2 files changed, 281 insertions(+), 6 deletions(-) diff --git a/crates/state/src/sync.rs b/crates/state/src/sync.rs index 9cd5561e..233e14bc 100644 --- a/crates/state/src/sync.rs +++ b/crates/state/src/sync.rs @@ -141,6 +141,20 @@ pub const DEFAULT_PENDING_MAX_AGE_MS: u64 = 60 * 60 * 1000; /// Default maximum number of pending events across all prev-hash buckets. pub const DEFAULT_PENDING_MAX_ENTRIES: usize = 10_000; +/// Default divisor used to derive a per-author sub-cap from the total +/// `max_entries`. With the default total of 10_000 and a divisor of 50, +/// each author may hold at most 200 pending entries, so a single +/// misbehaving signer cannot consume the whole buffer (SEC-V-08). +pub const DEFAULT_PENDING_PER_AUTHOR_DIVISOR: usize = 50; + +/// Compute the default per-author sub-cap from a total `max_entries`. +/// +/// Returns at least `1` so the buffer remains usable when callers +/// configure absurdly small totals (e.g. in tests). +fn default_per_author_cap(max_entries: usize) -> usize { + (max_entries / DEFAULT_PENDING_PER_AUTHOR_DIVISOR).max(1) +} + /// A single pending entry — the event plus the time it was buffered. /// /// `inserted_at_ms` is `None` when the caller did not supply a timestamp @@ -181,11 +195,24 @@ pub struct PendingBuffer { /// Optional maximum number of pending events. When set, /// inserts auto-evict the oldest entries to stay within limit. max_entries: Option, + /// Optional per-author sub-cap. When set, a single author may hold at + /// most this many pending entries; further inserts from that author + /// are dropped without disturbing other authors' buckets (SEC-V-08). + /// Derived from `max_entries` by default — see + /// [`DEFAULT_PENDING_PER_AUTHOR_DIVISOR`]. + max_per_author: Option, /// Optional maximum age in ms before an entry is evicted. 
Only applies /// to entries inserted with a timestamp via [`buffer_for_prev_at`]. max_age_ms: Option, /// Cached total count of pending events for O(1) lookups. cached_count: usize, + /// Per-author pending counts, used to enforce `max_per_author`. + /// Kept in sync with `waiting_on_prev` on every insert/resolve/evict. + per_author_count: BTreeMap, + /// Authors we have already warned about hitting their sub-cap. Used to + /// rate-limit the per-author drop warning so one chatty offender does + /// not flood logs (mirrors the `warned_full` pattern in the relay). + warned_full_authors: BTreeSet, } impl PendingBuffer { @@ -200,9 +227,14 @@ impl PendingBuffer { /// the buffer automatically evicts entries to stay within bounds. No /// age-based eviction is applied — use [`PendingBuffer::with_limits`] /// if you also want timeout-based eviction. + /// + /// A per-author sub-cap is derived automatically from `max_entries` + /// (see [`DEFAULT_PENDING_PER_AUTHOR_DIVISOR`]) so a single misbehaving + /// signer cannot consume the whole buffer (SEC-V-08). pub fn with_capacity(max_entries: usize) -> Self { Self { max_entries: Some(max_entries), + max_per_author: Some(default_per_author_cap(max_entries)), ..Self::default() } } @@ -212,14 +244,26 @@ impl PendingBuffer { /// /// Sensible defaults are [`DEFAULT_PENDING_MAX_ENTRIES`] and /// [`DEFAULT_PENDING_MAX_AGE_MS`]. + /// + /// A per-author sub-cap is derived automatically from `max_entries` + /// — see [`PendingBuffer::with_capacity`]. pub fn with_limits(max_entries: usize, max_age_ms: u64) -> Self { Self { max_entries: Some(max_entries), + max_per_author: Some(default_per_author_cap(max_entries)), max_age_ms: Some(max_age_ms), ..Self::default() } } + /// Override the per-author sub-cap. Mainly intended for tests; the + /// default derived by [`PendingBuffer::with_capacity`] / + /// [`PendingBuffer::with_limits`] is correct for production callers. 
+ pub fn with_per_author_cap(mut self, max_per_author: usize) -> Self { + self.max_per_author = Some(max_per_author); + self + } + /// Buffer an event that's waiting for a prev hash to arrive. /// /// Legacy entry point: entries inserted this way have no timestamp and @@ -240,6 +284,27 @@ impl PendingBuffer { } fn insert_entry(&mut self, prev_hash: EventHash, event: Event, inserted_at_ms: Option) { + let author = event.author; + + // Per-author sub-cap (SEC-V-08): one signer cannot fill the + // whole buffer with unresolved events. Drop the new event when + // its author is already at the sub-cap so other authors retain + // room. Warn at most once per author per buffer instance. + if let Some(per_author_limit) = self.max_per_author { + let current = self.per_author_count.get(&author).copied().unwrap_or(0); + if current >= per_author_limit { + if self.warned_full_authors.insert(author) { + tracing::warn!( + author = %author, + per_author_cap = per_author_limit, + event_hash = %event.hash, + "pending buffer: per-author sub-cap reached; dropping further events from this author" + ); + } + return; + } + } + self.waiting_on_prev .entry(prev_hash) .or_default() @@ -248,6 +313,8 @@ impl PendingBuffer { inserted_at_ms, }); self.cached_count += 1; + *self.per_author_count.entry(author).or_insert(0) += 1; + if let Some(limit) = self.max_entries { let evicted = self.evict_to(limit); if evicted > 0 { @@ -260,6 +327,18 @@ impl PendingBuffer { } } + /// Decrement the per-author count for `author`, removing the entry + /// when it hits zero. Saturating subtraction keeps the bookkeeping + /// safe even if a caller double-resolves the same event. + fn decrement_author(&mut self, author: &EndpointId) { + if let Some(count) = self.per_author_count.get_mut(author) { + *count = count.saturating_sub(1); + if *count == 0 { + self.per_author_count.remove(author); + } + } + } + /// Record a cross-author dep that we don't have yet. 
pub fn record_missing_dep(&mut self, hash: EventHash) { self.missing_deps.insert(hash); @@ -274,6 +353,9 @@ impl PendingBuffer { .remove(inserted_hash) .unwrap_or_default(); self.cached_count = self.cached_count.saturating_sub(entries.len()); + for entry in &entries { + self.decrement_author(&entry.event.author); + } entries.into_iter().map(|e| e.event).collect() } @@ -296,6 +378,7 @@ impl PendingBuffer { None => return 0, }; let mut evicted = 0usize; + let mut evicted_authors: Vec = Vec::new(); let mut empty_keys: Vec = Vec::new(); for (prev_hash, entries) in self.waiting_on_prev.iter_mut() { entries.retain(|entry| { @@ -311,6 +394,7 @@ impl PendingBuffer { "pending buffer: evicting aged-out event" ); evicted += 1; + evicted_authors.push(entry.event.author); false } else { true @@ -324,6 +408,9 @@ impl PendingBuffer { self.waiting_on_prev.remove(&k); } self.cached_count = self.cached_count.saturating_sub(evicted); + for author in evicted_authors { + self.decrement_author(&author); + } evicted } @@ -347,6 +434,13 @@ impl PendingBuffer { let n = entries.len(); evicted += n; self.cached_count = self.cached_count.saturating_sub(n); + // Keep per-author bookkeeping in sync with the + // bucket we just removed (SEC-V-08). + let evicted_authors: Vec = + entries.iter().map(|e| e.event.author).collect(); + for author in evicted_authors { + self.decrement_author(&author); + } } } else { break; @@ -690,7 +784,10 @@ mod tests { #[test] fn pending_buffer_auto_evicts_when_limit_exceeded() { - let mut buf = PendingBuffer::with_capacity(50); + // Use a generous per-author cap so this test exercises the + // global capacity-eviction path rather than the SEC-V-08 + // per-author sub-cap. + let mut buf = PendingBuffer::with_capacity(50).with_per_author_cap(1_000); let id = Identity::generate(); // Buffer 100 events with unique prev hashes (simulating gaps). 
for i in 0u64..100 { @@ -1166,11 +1263,15 @@ mod tests { /// After filling to `max_entries + 1` the oldest entry is evicted /// so the count stays at the configured capacity. + /// + /// The per-author sub-cap is widened so this test exercises the + /// global capacity path independently of SEC-V-08. #[test] fn capacity_eviction_drops_oldest_when_exceeded() { let id = Identity::generate(); let max_entries = 5usize; - let mut buf = PendingBuffer::with_limits(max_entries, 60_000); + let mut buf = + PendingBuffer::with_limits(max_entries, 60_000).with_per_author_cap(max_entries + 10); for i in 0..max_entries as u64 { let (prev, event) = make_pending_event(&id, i); @@ -1189,11 +1290,15 @@ mod tests { } /// `pending_count()` accurately reflects eviction activity. + /// + /// Uses an explicit per-author cap so the test focuses on the + /// interaction between age- and capacity-based eviction (not + /// SEC-V-08 sub-cap behaviour, which is covered separately). #[test] fn pending_count_reflects_both_eviction_policies() { let id = Identity::generate(); let max_age_ms = 1_000u64; - let mut buf = PendingBuffer::with_limits(4, max_age_ms); + let mut buf = PendingBuffer::with_limits(4, max_age_ms).with_per_author_cap(100); // Four fresh entries at t=0 → fills capacity exactly. for i in 0..4u64 { @@ -1239,4 +1344,169 @@ mod tests { buf.buffer_for_prev_at(prev, event, 42); assert_eq!(buf.pending_count(), 1); } + + // ── SEC-V-08: per-author sub-cap on pending buffer ────────────── + + /// Helper: build a pending event whose author is `id` and whose + /// `prev` hash is unique (so each event lands in its own bucket). 
+ fn make_pending_event_for(id: &Identity, seed: u64) -> (EventHash, Event) { + let mut hash_bytes = [0u8; 32]; + hash_bytes[..8].copy_from_slice(&seed.to_le_bytes()); + let prev = EventHash(hash_bytes); + let event = Event::new( + id, + seed + 2, + prev, + vec![], + EventKind::SetProfile { + display_name: format!("a-{seed}"), + }, + 0, + ); + (prev, event) + } + + /// One author cannot fill the whole buffer: events past the + /// per-author sub-cap from a single signer are dropped while other + /// authors keep their slots. + #[test] + fn per_author_cap_isolates_one_signer() { + let attacker = Identity::generate(); + let victim = Identity::generate(); + + let per_author = 3usize; + let mut buf = PendingBuffer::with_capacity(1_000).with_per_author_cap(per_author); + + // Attacker tries to insert 10 events — only `per_author` survive. + for i in 0..10u64 { + let (prev, event) = make_pending_event_for(&attacker, i); + buf.buffer_for_prev(prev, event); + } + assert_eq!( + buf.pending_count(), + per_author, + "attacker must be capped to the per-author limit" + ); + + // Victim can still insert events — their bucket is untouched. + for i in 100..103u64 { + let (prev, event) = make_pending_event_for(&victim, i); + buf.buffer_for_prev(prev, event); + } + assert_eq!( + buf.pending_count(), + per_author + 3, + "victim's slots must not be consumed by the attacker" + ); + } + + /// `with_capacity` and `with_limits` derive the per-author cap from + /// the total `max_entries` using `DEFAULT_PENDING_PER_AUTHOR_DIVISOR`. + #[test] + fn per_author_cap_defaults_to_total_over_divisor() { + let id = Identity::generate(); + let total = 5_000usize; + let expected_cap = total / DEFAULT_PENDING_PER_AUTHOR_DIVISOR; // 100 + + let mut buf = PendingBuffer::with_capacity(total); + + // Fill well past the expected cap from one author. 
+ for i in 0..(expected_cap as u64 + 50) { + let (prev, event) = make_pending_event_for(&id, i); + buf.buffer_for_prev(prev, event); + } + + // Per-author count must clamp at the derived sub-cap, not the + // 5000-event global cap. + assert_eq!( + buf.pending_count(), + expected_cap, + "default per-author cap should be max_entries / DEFAULT_PENDING_PER_AUTHOR_DIVISOR" + ); + } + + /// Tiny totals still yield a usable per-author cap (>= 1) — the + /// `(max / divisor).max(1)` floor keeps tests configurable. + #[test] + fn per_author_cap_floor_is_one() { + let id = Identity::generate(); + // 5 / 50 == 0; floor brings it back to 1. + let mut buf = PendingBuffer::with_capacity(5); + let (prev_a, event_a) = make_pending_event_for(&id, 0); + buf.buffer_for_prev(prev_a, event_a); + + let (prev_b, event_b) = make_pending_event_for(&id, 1); + buf.buffer_for_prev(prev_b, event_b); + + assert_eq!( + buf.pending_count(), + 1, + "per-author cap should never collapse to zero" + ); + } + + /// The first time an author hits the sub-cap, we record it in + /// `warned_full_authors` so subsequent drops from the same author + /// don't re-emit a warning. Other authors still get their own first + /// warning. + #[test] + fn per_author_cap_warns_once_per_author() { + let attacker = Identity::generate(); + let other = Identity::generate(); + + let mut buf = PendingBuffer::with_capacity(1_000).with_per_author_cap(1); + + // First insert succeeds. + let (prev_a, event_a) = make_pending_event_for(&attacker, 0); + buf.buffer_for_prev(prev_a, event_a); + assert!(buf.warned_full_authors.is_empty()); + + // Second insert from same author hits the cap → warn once. 
+ let (prev_b, event_b) = make_pending_event_for(&attacker, 1); + buf.buffer_for_prev(prev_b, event_b); + assert!(buf.warned_full_authors.contains(&attacker.endpoint_id())); + assert_eq!(buf.warned_full_authors.len(), 1); + + // Third insert from same author also drops, but the bookkeeping + // set still has exactly one entry for that author. + let (prev_c, event_c) = make_pending_event_for(&attacker, 2); + buf.buffer_for_prev(prev_c, event_c); + assert_eq!(buf.warned_full_authors.len(), 1); + + // Different author → independent warning lifecycle. + let (prev_d, event_d) = make_pending_event_for(&other, 0); + buf.buffer_for_prev(prev_d, event_d); + let (prev_e, event_e) = make_pending_event_for(&other, 1); + buf.buffer_for_prev(prev_e, event_e); + assert!(buf.warned_full_authors.contains(&other.endpoint_id())); + assert_eq!(buf.warned_full_authors.len(), 2); + } + + /// Resolving an author's events frees their slot so subsequent + /// inserts from that author are accepted again. + #[test] + fn per_author_cap_frees_on_resolve() { + let id = Identity::generate(); + let mut buf = PendingBuffer::with_capacity(1_000).with_per_author_cap(2); + + let (prev_a, event_a) = make_pending_event_for(&id, 0); + buf.buffer_for_prev(prev_a, event_a); + let (prev_b, event_b) = make_pending_event_for(&id, 1); + buf.buffer_for_prev(prev_b, event_b); + assert_eq!(buf.pending_count(), 2); + + // Cap reached — extra is dropped. + let (prev_c, event_c) = make_pending_event_for(&id, 2); + buf.buffer_for_prev(prev_c, event_c); + assert_eq!(buf.pending_count(), 2); + + // Resolve one of the existing entries — frees a slot. + let _ = buf.resolve(&prev_a); + assert_eq!(buf.pending_count(), 1); + + // Now a new event from the same author fits again. 
+ let (prev_d, event_d) = make_pending_event_for(&id, 3); + buf.buffer_for_prev(prev_d, event_d); + assert_eq!(buf.pending_count(), 2); + } } diff --git a/crates/state/src/tests.rs b/crates/state/src/tests.rs index aa4272a0..50f77dc2 100644 --- a/crates/state/src/tests.rs +++ b/crates/state/src/tests.rs @@ -2430,8 +2430,11 @@ fn deep_pending_chain_does_not_stack_overflow() { let genesis_hash = managed.dag().genesis().unwrap().hash; - // Build a chain of 3000 events. - let chain_len = 3_000; + // Build a chain of 1500 events. Kept below the per-author sub-cap + // (max_entries / DEFAULT_PENDING_PER_AUTHOR_DIVISOR == 2000) so the + // SEC-V-08 cap doesn't drop chain links — this test is about + // iterative resolution, not capacity policy. + let chain_len = 1_500; let mut events = Vec::with_capacity(chain_len); let mut prev = genesis_hash; for seq_offset in 0..chain_len { @@ -2874,9 +2877,11 @@ fn pending_buffer_eviction_reduces_count_to_cap() { use crate::sync::PendingBuffer; // Insert more events than the cap and verify cached_count stays <= cap. + // Override the SEC-V-08 per-author sub-cap so this test focuses on + // global capacity-eviction behaviour. let id = Identity::generate(); let cap = 10usize; - let mut buf = PendingBuffer::with_capacity(cap); + let mut buf = PendingBuffer::with_capacity(cap).with_per_author_cap(1_000); for i in 0u64..50 { let mut hash_bytes = [0u8; 32]; From 7ee7e0afd1b96f68a95f36e57ba8b8de78fbbc83 Mon Sep 17 00:00:00 2001 From: intendednull Date: Mon, 27 Apr 2026 18:33:28 -0700 Subject: [PATCH 4/8] fix(state): cap RotateChannelKey + Event.deps (SEC-V-07) (#452) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Anti-DoS caps for two attacker-controlled vectors that fan out via .clone() into per-peer state. Ref #236. Caps + tier: - MAX_EVENT_DEPS = 64. EventDag::insert rejects oversize deps with InsertError::DepsTooLong. 
Author-side path covered too because every event reaches `insert` before reaching state. - MAX_ENCRYPTED_KEY_BYTES = 128. EventDag::insert rejects oversize per-blob payloads inside RotateChannelKey.encrypted_keys with InsertError::EncryptedKeyTooLarge. One X25519-sealed key fits well under that. - MAX_ENCRYPTED_KEYS_OVER_MEMBERS = 4. apply_event for RotateChannelKey rejects when encrypted_keys.len() exceeds state.members.len() + epsilon. Needs state context, so it lives at apply time alongside the existing member gate. Tier choice: caps that need no state context go at the DAG insert boundary so they fire regardless of which call path produced the event. The member-count cap requires `state.members` and naturally slots into the existing apply_event Rejected branch. Runner-up: stuffing all three into Event::new. Rejected because Event::new is currently infallible and changing it cascades to ~25 production + test callsites for no extra coverage — every event already passes through dag.insert before mutating state. willow-replay matches on InsertError exhaustively; added warn arms for the two new variants. Tests (state crate, 5 added): - dag_insert_rejects_deps_over_cap - dag_insert_accepts_deps_at_cap - dag_insert_rejects_oversized_encrypted_key - apply_rotate_channel_key_rejects_excess_entries_over_member_count - apply_rotate_channel_key_accepts_at_member_count_plus_epsilon Local: cargo fmt --check, cargo clippy --workspace --all-targets -D warnings, cargo test --workspace, cargo check --target wasm32-unknown-unknown all green. 
https://claude.ai/code/session_01VdcwSdvqig423A5CX2fSzR Co-authored-by: Claude --- crates/replay/src/role.rs | 9 ++ crates/state/src/dag.rs | 52 +++++++-- crates/state/src/event.rs | 25 ++++ crates/state/src/materialize.rs | 21 +++- crates/state/src/tests.rs | 196 ++++++++++++++++++++++++++++++++ 5 files changed, 295 insertions(+), 8 deletions(-) diff --git a/crates/replay/src/role.rs b/crates/replay/src/role.rs index 628f7f3d..616f6208 100644 --- a/crates/replay/src/role.rs +++ b/crates/replay/src/role.rs @@ -189,6 +189,15 @@ impl ReplayRole { Err(InsertError::PermissionDenied(reason)) => { warn!(%reason, "rejected event: permission denied"); } + Err(InsertError::DepsTooLong { got, max }) => { + warn!(got, max, "rejected event: deps over cap (SEC-V-07)"); + } + Err(InsertError::EncryptedKeyTooLarge { got, max }) => { + warn!( + got, max, + "rejected event: RotateChannelKey blob over cap (SEC-V-07)", + ); + } } } } diff --git a/crates/state/src/dag.rs b/crates/state/src/dag.rs index b88374e1..8fceefae 100644 --- a/crates/state/src/dag.rs +++ b/crates/state/src/dag.rs @@ -8,7 +8,7 @@ use std::collections::HashMap; use willow_identity::{EndpointId, Identity}; -use crate::event::{Event, EventKind}; +use crate::event::{Event, EventKind, MAX_ENCRYPTED_KEY_BYTES, MAX_EVENT_DEPS}; use crate::hash::EventHash; /// Error returned when inserting an event into the DAG fails. @@ -39,6 +39,13 @@ pub enum InsertError { vote: EventHash, proposal: EventHash, }, + /// `event.deps.len()` exceeds [`crate::event::MAX_EVENT_DEPS`]. + /// Anti-DoS cap; see SEC-V-07 (#236). + DepsTooLong { got: usize, max: usize }, + /// One entry inside `RotateChannelKey.encrypted_keys` carries a + /// blob larger than [`crate::event::MAX_ENCRYPTED_KEY_BYTES`]. + /// Anti-DoS cap; see SEC-V-07 (#236). + EncryptedKeyTooLarge { got: usize, max: usize }, /// Author lacks the required permission for this EventKind. 
PermissionDenied(String), } @@ -74,6 +81,13 @@ impl std::fmt::Display for InsertError { f, "Vote event {vote} must include proposal {proposal} in deps" ), + Self::DepsTooLong { got, max } => { + write!(f, "event.deps too long: {got} entries (max {max})") + } + Self::EncryptedKeyTooLarge { got, max } => write!( + f, + "RotateChannelKey encrypted blob too large: {got} bytes (max {max})" + ), Self::PermissionDenied(reason) => write!(f, "permission denied: {reason}"), } } @@ -118,12 +132,36 @@ impl EventDag { return Err(InsertError::InvalidSignature); } - // 2. Check duplicate. + // 2. Anti-DoS vector caps (SEC-V-07). + // + // A peer holding any permission could otherwise broadcast events + // with pathologically large `deps` or `encrypted_keys` blobs that + // every other peer would clone into a `BTreeMap` during apply. + // Reject at the inbound DAG boundary so over-cap events never + // even reach `applied_events` / `materialize`. + if event.deps.len() > MAX_EVENT_DEPS { + return Err(InsertError::DepsTooLong { + got: event.deps.len(), + max: MAX_EVENT_DEPS, + }); + } + if let EventKind::RotateChannelKey { encrypted_keys, .. } = &event.kind { + for (_, blob) in encrypted_keys { + if blob.len() > MAX_ENCRYPTED_KEY_BYTES { + return Err(InsertError::EncryptedKeyTooLarge { + got: blob.len(), + max: MAX_ENCRYPTED_KEY_BYTES, + }); + } + } + } + + // 3. Check duplicate. if self.events.contains_key(&event.hash) { return Err(InsertError::Duplicate); } - // 3. Genesis check: first event must be CreateServer. + // 4. Genesis check: first event must be CreateServer. // After genesis is set, reject any further CreateServer events. if self.genesis_hash.is_none() { match &event.kind { @@ -143,7 +181,7 @@ impl EventDag { return Err(InsertError::DuplicateGenesis); } - // 4. Check seq: must be latest_seq + 1. + // 5. Check seq: must be latest_seq + 1. 
// This also prevents equivocation: an author cannot insert two // events at the same seq number because only seq = latest + 1 // is accepted. Combined with the prev-hash check below, this @@ -157,7 +195,7 @@ impl EventDag { }); } - // 5. Check prev: must match current head (or ZERO for seq=1). + // 6. Check prev: must match current head (or ZERO for seq=1). let expected_prev = self .heads .get(&event.author) @@ -171,7 +209,7 @@ impl EventDag { }); } - // 6. Governance structural checks: Vote events must causally + // 7. Governance structural checks: Vote events must causally // depend on their proposal (via deps or prev) so topological // sort always places the proposal before the vote. if let EventKind::Vote { proposal, .. } = &event.kind { @@ -183,7 +221,7 @@ impl EventDag { } } - // 7. Insert. + // 8. Insert. let hash = event.hash; let author = event.author; self.events.insert(hash, event); diff --git a/crates/state/src/event.rs b/crates/state/src/event.rs index 885180c8..653dc9f1 100644 --- a/crates/state/src/event.rs +++ b/crates/state/src/event.rs @@ -9,6 +9,31 @@ use willow_identity::{EndpointId, Identity, Signature}; use crate::hash::EventHash; +// ───── Vector caps (anti-DoS) ────────────────────────────────────────────── +// +// These caps bound per-event memory growth so a single misbehaving peer +// holding a permission can't blow up every other peer's heap by emitting +// pathologically large vectors. See SEC-V-07 (#236). + +/// Maximum number of cross-author causal hashes an event may carry in +/// `deps`. Legitimate events reference at most a handful of recent +/// other-author heads; 64 is comfortably above that ceiling and keeps +/// the per-event payload small. +pub const MAX_EVENT_DEPS: usize = 64; + +/// Maximum byte length of a single encrypted-channel-key blob inside +/// `EventKind::RotateChannelKey.encrypted_keys`. 
One X25519-sealed +/// channel key fits well under 128 bytes (32-byte ciphertext + tag + +/// ephemeral pubkey = ~80 bytes); 128 leaves slack without giving a +/// hostile author room to bloat each entry. +pub const MAX_ENCRYPTED_KEY_BYTES: usize = 128; + +/// Slack added to the current member count when capping +/// `RotateChannelKey.encrypted_keys.len()`. The legitimate ceiling is +/// "one entry per current member"; epsilon absorbs benign races between +/// membership changes and key-rotation construction. +pub const MAX_ENCRYPTED_KEYS_OVER_MEMBERS: usize = 4; + // ───── Permission ────────────────────────────────────────────────────────── /// Permission types that can be granted directly by any admin. diff --git a/crates/state/src/materialize.rs b/crates/state/src/materialize.rs index 416e6987..4f057a69 100644 --- a/crates/state/src/materialize.rs +++ b/crates/state/src/materialize.rs @@ -10,7 +10,7 @@ use std::collections::{BTreeMap, BTreeSet}; use willow_identity::EndpointId; use crate::dag::EventDag; -use crate::event::{Event, EventKind, Permission, ProposedAction}; +use crate::event::{Event, EventKind, Permission, ProposedAction, MAX_ENCRYPTED_KEYS_OVER_MEMBERS}; use crate::hash::EventHash; use crate::server::{PendingProposal, ServerState}; use crate::types::{ @@ -631,6 +631,25 @@ fn apply_mutation(state: &mut ServerState, event: &Event) -> ApplyResult { if !state.members.contains_key(&event.author) { return ApplyResult::Rejected(format!("author '{}' is not a member", event.author)); } + // Anti-DoS cap (SEC-V-07). A legitimate RotateChannelKey + // carries at most one entry per current member; epsilon + // absorbs benign races between membership changes and key + // rotation. Anything beyond that is a fabricated-id flood — + // every entry would otherwise `.clone()` into the per-server + // `BTreeMap>` on every peer. 
+ let cap = state + .members + .len() + .saturating_add(MAX_ENCRYPTED_KEYS_OVER_MEMBERS); + if encrypted_keys.len() > cap { + return ApplyResult::Rejected(format!( + "RotateChannelKey: {} encrypted_keys exceeds cap {} (members={} + epsilon={})", + encrypted_keys.len(), + cap, + state.members.len(), + MAX_ENCRYPTED_KEYS_OVER_MEMBERS, + )); + } if !encrypted_keys.is_empty() { let keys = state.channel_keys.entry(channel_id.clone()).or_default(); for (peer_id, key_bytes) in encrypted_keys { diff --git a/crates/state/src/tests.rs b/crates/state/src/tests.rs index 50f77dc2..371d692c 100644 --- a/crates/state/src/tests.rs +++ b/crates/state/src/tests.rs @@ -4516,3 +4516,199 @@ fn set_permission_legacy_unknown_string_drops_silently() { "unknown legacy permission must apply as a no-op" ); } + +// ── Issue #236 (SEC-V-07): vector caps ───────────────────────────────── +// +// `Event.deps` and `RotateChannelKey.encrypted_keys` are both attacker- +// controlled vectors that fan out via `.clone()` into per-peer state. +// Cap them at the inbound DAG boundary (deps + per-blob byte size) and +// at the materializer (encrypted_keys.len() vs current member count). + +#[test] +fn dag_insert_rejects_deps_over_cap() { + use crate::dag::InsertError; + use crate::event::MAX_EVENT_DEPS; + + let admin = Identity::generate(); + let mut dag = test_dag(&admin); + + // Build deps vector one entry over the cap. 
+ let bad_deps: Vec = (0..=MAX_EVENT_DEPS) + .map(|i| EventHash::from_bytes(&i.to_le_bytes())) + .collect(); + assert_eq!(bad_deps.len(), MAX_EVENT_DEPS + 1); + + let bloated = dag.create_event( + &admin, + EventKind::SetProfile { + display_name: "x".into(), + }, + bad_deps, + 0, + ); + let err = dag.insert(bloated).unwrap_err(); + match err { + InsertError::DepsTooLong { got, max } => { + assert_eq!(got, MAX_EVENT_DEPS + 1); + assert_eq!(max, MAX_EVENT_DEPS); + } + other => panic!("expected DepsTooLong, got {other:?}"), + } +} + +#[test] +fn dag_insert_accepts_deps_at_cap() { + use crate::event::MAX_EVENT_DEPS; + + let admin = Identity::generate(); + let mut dag = test_dag(&admin); + + let ok_deps: Vec = (0..MAX_EVENT_DEPS) + .map(|i| EventHash::from_bytes(&i.to_le_bytes())) + .collect(); + assert_eq!(ok_deps.len(), MAX_EVENT_DEPS); + + let event = dag.create_event( + &admin, + EventKind::SetProfile { + display_name: "x".into(), + }, + ok_deps, + 0, + ); + dag.insert(event).expect("deps at cap must be accepted"); +} + +#[test] +fn dag_insert_rejects_oversized_encrypted_key() { + use crate::dag::InsertError; + use crate::event::MAX_ENCRYPTED_KEY_BYTES; + + let admin = Identity::generate(); + let mut dag = test_dag(&admin); + + // One entry whose blob is one byte over the cap. 
+ let too_big = vec![0xab; MAX_ENCRYPTED_KEY_BYTES + 1]; + let bloated = dag.create_event( + &admin, + EventKind::RotateChannelKey { + channel_id: "ch-1".into(), + encrypted_keys: vec![(admin.endpoint_id(), too_big)], + }, + vec![], + 0, + ); + let err = dag.insert(bloated).unwrap_err(); + match err { + InsertError::EncryptedKeyTooLarge { got, max } => { + assert_eq!(got, MAX_ENCRYPTED_KEY_BYTES + 1); + assert_eq!(max, MAX_ENCRYPTED_KEY_BYTES); + } + other => panic!("expected EncryptedKeyTooLarge, got {other:?}"), + } +} + +#[test] +fn apply_rotate_channel_key_rejects_excess_entries_over_member_count() { + use crate::event::MAX_ENCRYPTED_KEYS_OVER_MEMBERS; + use crate::managed::ManagedDag; + use crate::materialize::ApplyResult; + + let admin = Identity::generate(); + let mut managed = ManagedDag::new(&admin, "S", 5000).unwrap(); + + // Create channel. + let create = managed.dag().create_event( + &admin, + EventKind::CreateChannel { + name: "general".into(), + channel_id: "ch-1".into(), + kind: crate::types::ChannelKind::Text, + ephemeral: None, + }, + vec![], + 0, + ); + managed.insert_and_apply(create).unwrap(); + + // Sole admin = 1 member. Cap = 1 + epsilon. Submit (cap + 1) entries. + let member_count = managed.state().members.len(); + assert_eq!(member_count, 1); + let cap = member_count + MAX_ENCRYPTED_KEYS_OVER_MEMBERS; + // Use real generated identities so each EndpointId is a valid curve + // point (`EndpointId::from_bytes` rejects non-curve inputs). 
+ let bloat: Vec<(willow_identity::EndpointId, Vec)> = (0..(cap + 1)) + .map(|_| (Identity::generate().endpoint_id(), vec![0xaa])) + .collect(); + assert_eq!(bloat.len(), cap + 1); + + let rotate = managed.dag().create_event( + &admin, + EventKind::RotateChannelKey { + channel_id: "ch-1".into(), + encrypted_keys: bloat, + }, + vec![], + 10, + ); + let outcome = managed.insert_and_apply(rotate).unwrap(); + assert!( + matches!(outcome.apply_result, Some(ApplyResult::Rejected(_))), + "over-cap rotate must be rejected at apply: {:?}", + outcome.apply_result, + ); + // No channel_keys entry created — state untouched by rejected event. + assert!(!managed.state().channel_keys.contains_key("ch-1")); +} + +#[test] +fn apply_rotate_channel_key_accepts_at_member_count_plus_epsilon() { + use crate::event::MAX_ENCRYPTED_KEYS_OVER_MEMBERS; + use crate::managed::ManagedDag; + use crate::materialize::ApplyResult; + + let admin = Identity::generate(); + let mut managed = ManagedDag::new(&admin, "S", 5000).unwrap(); + + let create = managed.dag().create_event( + &admin, + EventKind::CreateChannel { + name: "general".into(), + channel_id: "ch-1".into(), + kind: crate::types::ChannelKind::Text, + ephemeral: None, + }, + vec![], + 0, + ); + managed.insert_and_apply(create).unwrap(); + + // Cap = members + epsilon. Submit exactly that many — must succeed. + let member_count = managed.state().members.len(); + let cap = member_count + MAX_ENCRYPTED_KEYS_OVER_MEMBERS; + // Use real generated identities so each EndpointId is a valid curve + // point (`EndpointId::from_bytes` rejects non-curve inputs). 
+ let entries: Vec<(willow_identity::EndpointId, Vec)> = (0..cap) + .map(|_| (Identity::generate().endpoint_id(), vec![0xaa])) + .collect(); + + let rotate = managed.dag().create_event( + &admin, + EventKind::RotateChannelKey { + channel_id: "ch-1".into(), + encrypted_keys: entries, + }, + vec![], + 10, + ); + let outcome = managed.insert_and_apply(rotate).unwrap(); + assert!( + matches!(outcome.apply_result, Some(ApplyResult::Applied)), + "boundary case (members + epsilon) must apply: {:?}", + outcome.apply_result, + ); + assert_eq!( + managed.state().channel_keys.get("ch-1").map(|m| m.len()), + Some(cap), + ); +} From 3ee8a98aff351b221dd01c64af683f9f587c791f Mon Sep 17 00:00:00 2001 From: intendednull Date: Mon, 27 Apr 2026 18:57:09 -0700 Subject: [PATCH 5/8] fix(relay): cap TopicAnnounce + per-signer slots (SEC-V-06) (#453) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three layered caps on the TopicAnnounce listener: - MAX_TOPICS_PER_ANNOUNCE = 64 — drop announce before any per-topic work runs. Prior code iterated unbounded; a 256 KB envelope of 2-byte topics meant ~128 000 blake3 hashes per message (CPU amplification). - MAX_TOPICS_PER_SIGNER = 100 with per-signer LRU eviction. Stops one peer from monopolising the global slot table (MAX_TOPICS = 10 000) and starving legitimate clients. Hand-rolled VecDeque + HashMap refcount: insert appends, eviction pops front, refcount==0 triggers network.unsubscribe so the global slot is actually freed. - WARN_RATE_LIMIT = 60s. Replaces the once-per-session warned_full flag so operators see ongoing pressure without log spam. Applied to per-message-cap, global-cap, and per-signer-eviction warns. Refactored listener body around a pure AnnounceState struct that returns TopicActions describing what to do on the network — keeps the state machine unit-testable without driving MemNetwork at 10 000-topic scale. 
Tests added (relay-tier, lowest covering behaviour): - topic_announce_listener_rejects_oversized_announce — integration test: 65-topic announce dropped, sentinel announce still works. - announce_state_per_signer_lru_evicts_oldest — fills 100 topics for one signer, the 101st evicts t0 and subscribes to the new one. - announce_state_per_signer_lru_does_not_starve_other_signers — A fills its quota, B can still subscribe. - announce_state_repeat_announce_promotes_lru_no_resubscribe — LRU touch on re-announce, no network call. - announce_state_shared_topic_refcount_keeps_subscription — refcount keeps subscription alive when one signer evicts. - announce_state_rejects_at_global_cap — fills 10 000 slots across multiple signers, fresh signer's new topic rejected. - should_emit_warn_rate_limits_to_one_per_window — directly exercises the rate-limit helper. - topic_announce_listener_enforces_max_topics_cap removed (its single- announce-of-10001 setup is now blocked by the per-message cap; the global-cap behaviour is exercised at the unit tier instead). Tradeoff considered: enforcing the per-message cap in willow-common's unpack_wire would protect every consumer, but bincode does not expose inline length caps without a custom Visitor and the relay is the only production consumer; defense-in-depth at the wire layer is a follow-up if we add more consumers. Per-message cap lives in the relay listener where the load-bearing work happens. Refs #235 Co-authored-by: Claude --- crates/relay/src/lib.rs | 507 ++++++++++++++++++++++++++++++++------ crates/replay/src/role.rs | 4 +- 2 files changed, 435 insertions(+), 76 deletions(-) diff --git a/crates/relay/src/lib.rs b/crates/relay/src/lib.rs index c187257d..4c2031c6 100644 --- a/crates/relay/src/lib.rs +++ b/crates/relay/src/lib.rs @@ -41,15 +41,16 @@ //! merge/replay correctness. Relayed bytes are never trusted on the //! strength of having passed through this crate. 
-use std::collections::HashSet; +use std::collections::{HashMap, VecDeque}; use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::sync::Semaphore; use tracing::{info, warn}; +use willow_identity::EndpointId; use willow_network::traits::{GossipEvent, TopicEvents}; use willow_network::Network; @@ -85,13 +86,49 @@ pub const BOOTSTRAP_DRAIN_BUFFER_CAP: usize = 8 * 1024; /// Maximum number of distinct channel topics the topic-announce /// listener will subscribe to. Once this cap is reached the listener -/// silently drops further unique announces (after a one-shot warn). +/// drops further unique announces (with rate-limited warns; see +/// [`WARN_RATE_LIMIT`]). pub const MAX_TOPICS: usize = 10_000; /// Maximum length, in bytes, of a topic string accepted from a /// `TopicAnnounce` message. Anything longer is rejected outright. pub const MAX_TOPIC_LEN: usize = 256; +/// Maximum number of topic entries accepted in a single `TopicAnnounce` +/// wire message. Larger announces are rejected outright before any +/// per-topic work runs (validation + blake3 hash). Without this cap a +/// peer can ship a 256 KB envelope containing ~128 000 two-byte topics +/// and force the relay to do O(n) work per message — a CPU-amplification +/// vector. 64 distinct channels per announce is comfortably more than +/// any legitimate client subscribes to in a single batch. +pub const MAX_TOPICS_PER_ANNOUNCE: usize = 64; + +/// Maximum number of distinct topic subscriptions the relay will hold +/// on behalf of a single signer (`EndpointId`). Once a signer reaches +/// this cap, announcing a new topic evicts that signer's least-recently +/// used topic (LRU). This prevents one peer from monopolising the +/// global slot table ([`MAX_TOPICS`]) and starving other clients. 
+pub const MAX_TOPICS_PER_SIGNER: usize = 100; + +/// Minimum gap between repeated cap-hit warns. Replaces the previous +/// once-per-session flag so operators see ongoing pressure without +/// having the log spammed by every announce that overflows the cap. +pub const WARN_RATE_LIMIT: Duration = Duration::from_secs(60); + +/// Returns `true` and updates `last_warn` to `now` iff a warn should be +/// emitted now: either no warn has been emitted yet, or the previous +/// warn is older than `interval`. Pulled out so tests can exercise the +/// rate-limiter directly without poking at the tracing subscriber. +pub fn should_emit_warn(last_warn: &mut Option, now: Instant, interval: Duration) -> bool { + match *last_warn { + Some(prev) if now.duration_since(prev) < interval => false, + _ => { + *last_warn = Some(now); + true + } + } +} + /// Returns `true` iff `s` is a syntactically valid channel topic /// string: non-empty, no longer than [`MAX_TOPIC_LEN`] bytes, and /// composed entirely of ASCII alphanumerics or `_ / : . -`. @@ -390,11 +427,126 @@ pub async fn run_proxy_listener( } } +/// In-memory state for [`topic_announce_listener`]. +/// +/// Pulled out of the listener's stack so the per-message processing +/// step can be exercised by unit tests against a `MemNetwork` without +/// having to drive the full `events` stream. Each field's invariants +/// are documented inline. +#[derive(Default)] +struct AnnounceState { + /// Reference count of how many distinct signers currently hold a + /// subscription on each topic. The relay holds exactly one gossip + /// subscription per topic with a non-zero refcount; the entry is + /// removed (and the subscription torn down) when the count hits 0. + /// Size is bounded by [`MAX_TOPICS`]. + topic_refcount: HashMap, + + /// Per-signer LRU of currently-held topics, oldest at the front. 
+ /// Bounded by [`MAX_TOPICS_PER_SIGNER`] entries per signer; on + /// overflow the front entry is evicted (and its global refcount + /// decremented) before the new topic is inserted at the back. + /// Re-announcing an existing topic promotes it to the back. + signer_topics: HashMap>, + + /// Last time a per-message-cap warn was emitted; throttles the log + /// at [`WARN_RATE_LIMIT`] so an attacker cannot spam the log. + warn_per_msg_last: Option, + /// Last time a global-cap warn was emitted (rate-limited). + warn_global_full_last: Option, + /// Last time a per-signer-eviction warn was emitted (rate-limited). + warn_signer_evict_last: Option, +} + +/// Outcome of processing a single topic for a given signer. The +/// network-level effects (subscribe / unsubscribe) are returned so the +/// caller drives them outside `&mut self`. A single announce can yield +/// up to two actions: an LRU eviction may free a global slot +/// (Unsubscribe) and the new topic may need joining (Subscribe). +#[derive(Debug, Default, PartialEq, Eq)] +struct TopicActions { + /// Topic to unsubscribe from (LRU eviction freed the last reference). + pub unsubscribe: Option, + /// Topic to subscribe to (first reference globally). + pub subscribe: Option, + /// True iff this topic was rejected by the global cap. + pub rejected_global: bool, + /// True iff this topic triggered a per-signer LRU eviction. + pub evicted_for_signer: bool, +} + +impl AnnounceState { + /// Apply one announced topic from `signer`. Returns network-level + /// effects the caller must drive. Pure with respect to the network. + fn process_topic(&mut self, signer: EndpointId, topic_str: &str) -> TopicActions { + let mut actions = TopicActions::default(); + + let entry = self.signer_topics.entry(signer).or_default(); + + // If signer already holds this topic, promote it (LRU touch). 
+ if let Some(pos) = entry.iter().position(|t| t == topic_str) { + if let Some(t) = entry.remove(pos) { + entry.push_back(t); + } + return actions; + } + + // Per-signer cap: evict signer's LRU topic to make room. + if entry.len() >= MAX_TOPICS_PER_SIGNER { + actions.evicted_for_signer = true; + if let Some(oldest) = entry.pop_front() { + if let Some(count) = self.topic_refcount.get_mut(&oldest) { + *count -= 1; + if *count == 0 { + self.topic_refcount.remove(&oldest); + actions.unsubscribe = Some(oldest); + } + } + } + } + + // Global cap: reject if topic is new globally and table is full. + let already_global = self.topic_refcount.contains_key(topic_str); + if !already_global && self.topic_refcount.len() >= MAX_TOPICS { + actions.rejected_global = true; + return actions; + } + + // Accept: append to signer queue and bump global refcount. + let entry = self.signer_topics.entry(signer).or_default(); + entry.push_back(topic_str.to_string()); + let count = self + .topic_refcount + .entry(topic_str.to_string()) + .or_insert(0); + *count += 1; + if *count == 1 { + actions.subscribe = Some(topic_str.to_string()); + } + actions + } +} + /// Listen for `TopicAnnounce` messages on the server-ops topic and /// dynamically subscribe to announced channel topics. Topics are /// validated against [`topic_str_is_valid`] and the number of distinct /// subscribed topics is capped at [`MAX_TOPICS`]. /// +/// Three layered caps protect the relay from CPU amplification and +/// slot-table exhaustion: +/// +/// 1. **[`MAX_TOPICS_PER_ANNOUNCE`]** — per-message ceiling. Announces +/// carrying more entries than this are rejected outright before any +/// per-topic validation or hashing runs. Without this cap an +/// attacker can pack ~128 000 two-byte topics into a 256 KB +/// envelope and force the relay to do O(n) work per message. +/// 2. **[`MAX_TOPICS_PER_SIGNER`]** — per-signer slot cap with LRU +/// eviction. 
Stops one peer from monopolising the global slot +/// table by pumping ~10 000 unique topics into it. +/// 3. **[`MAX_TOPICS`]** — global slot cap. Once reached, additional +/// new topics are dropped and the warn is rate-limited at +/// [`WARN_RATE_LIMIT`]. +/// /// This is a **transport-layer** routine. The relay simply joins the /// announced gossip topics so clients on those topics can reach each /// other through it; it performs no governance check on who is @@ -406,18 +558,37 @@ pub async fn topic_announce_listener(mut events: N::Events, network: N) where N: Network, { - let mut subscribed: HashSet = HashSet::new(); - let mut warned_full = false; + let mut state = AnnounceState::default(); while let Some(Ok(event)) = events.next().await { let GossipEvent::Received(msg) = event else { continue; }; - let Some((willow_common::WireMessage::TopicAnnounce { topics }, _)) = + let Some((willow_common::WireMessage::TopicAnnounce { topics }, signer)) = willow_common::unpack_wire(&msg.content) else { continue; }; + + // Per-message cap: drop the whole announce if it exceeds the + // per-message limit. Done BEFORE any per-topic loop so an + // oversized announce costs O(1) work, not O(n). 
+ if topics.len() > MAX_TOPICS_PER_ANNOUNCE { + if should_emit_warn( + &mut state.warn_per_msg_last, + Instant::now(), + WARN_RATE_LIMIT, + ) { + warn!( + cap = MAX_TOPICS_PER_ANNOUNCE, + count = topics.len(), + %signer, + "rejecting TopicAnnounce exceeding per-message cap" + ); + } + continue; + } + for topic_str in topics { if !topic_str_is_valid(&topic_str) { warn!( @@ -426,30 +597,63 @@ where ); continue; } - if subscribed.contains(&topic_str) { - continue; + + let actions = state.process_topic(signer, &topic_str); + + if actions.evicted_for_signer + && should_emit_warn( + &mut state.warn_signer_evict_last, + Instant::now(), + WARN_RATE_LIMIT, + ) + { + warn!( + cap = MAX_TOPICS_PER_SIGNER, + %signer, + "per-signer topic cap reached; evicting LRU topic" + ); } - if subscribed.len() >= MAX_TOPICS { - if !warned_full { - warn!( - cap = MAX_TOPICS, - "topic subscription cap reached; dropping further announces" - ); - warned_full = true; - } - continue; + if actions.rejected_global + && should_emit_warn( + &mut state.warn_global_full_last, + Instant::now(), + WARN_RATE_LIMIT, + ) + { + warn!( + cap = MAX_TOPICS, + "topic subscription cap reached; dropping further announces" + ); } - subscribed.insert(topic_str.clone()); - let topic_id = willow_network::topic_id(&topic_str); - match network.subscribe(topic_id, vec![]).await { - Ok(_) => { - info!(topic = %topic_str, "subscribed to announced channel topic"); + + // Drive the eviction unsubscribe FIRST so the global slot + // is freed before any subsequent subscribe attempt. 
+ if let Some(topic) = actions.unsubscribe { + let topic_id = willow_network::topic_id(&topic); + match network.unsubscribe(topic_id).await { + Ok(_) => { + info!(topic = %topic, "unsubscribed evicted topic"); + } + Err(e) => { + warn!( + topic = %topic, %e, + "failed to unsubscribe evicted topic" + ); + } } - Err(e) => { - warn!( - topic = %topic_str, %e, - "failed to subscribe to announced topic" - ); + } + if let Some(topic) = actions.subscribe { + let topic_id = willow_network::topic_id(&topic); + match network.subscribe(topic_id, vec![]).await { + Ok(_) => { + info!(topic = %topic, "subscribed to announced channel topic"); + } + Err(e) => { + warn!( + topic = %topic, %e, + "failed to subscribe to announced topic" + ); + } } } } @@ -652,46 +856,45 @@ mod tests { listener.abort(); } - /// topic_announce_listener enforces MAX_TOPICS: after reaching the cap it - /// stops subscribing to additional unique topics. - /// - /// We send MAX_TOPICS + 1 distinct valid topics in a single announce. - /// We confirm: - /// - The first topic ("t0") IS subscribed — observer sees NeighborUp. - /// - The last topic ("t{MAX_TOPICS}") is NOT subscribed — no NeighborUp. + /// topic_announce_listener enforces MAX_TOPICS_PER_ANNOUNCE: an announce + /// carrying more entries than the per-message cap is rejected outright. /// - /// Since the listener processes all topics in one synchronous for loop - /// within a single async task poll, seeing NeighborUp for the first topic - /// guarantees the entire for loop has already run before we check the - /// overflow topic. + /// We send MAX_TOPICS_PER_ANNOUNCE + 1 valid topics in one announce, then + /// — to confirm the listener is still alive and processing — send a + /// follow-up announce with a smaller, valid sentinel batch from the same + /// signer. The sentinel must succeed (NeighborUp on its topic) while the + /// oversized topics must NOT (no NeighborUp). 
#[tokio::test] - async fn topic_announce_listener_enforces_max_topics_cap() { + async fn topic_announce_listener_rejects_oversized_announce() { use willow_network::mem::{MemHub, MemNetwork}; use willow_network::traits::{GossipEvent, Network, TopicEvents}; let hub = MemHub::new(); let announcer_net = MemNetwork::new(&hub); let relay_net = MemNetwork::new(&hub); - let first_observer = MemNetwork::new(&hub); - let overflow_observer = MemNetwork::new(&hub); + let oversized_observer = MemNetwork::new(&hub); + let sentinel_observer = MemNetwork::new(&hub); let announcer_identity = announcer_net.identity().clone(); let ops_topic = willow_network::topic_id("_willow_server_ops"); - // Build MAX_TOPICS + 1 distinct, valid topic strings. - let topics: Vec = (0..=(MAX_TOPICS as u64)).map(|i| format!("t{i}")).collect(); + // Oversized batch: MAX_TOPICS_PER_ANNOUNCE + 1 valid topics. + let oversized: Vec = (0..=(MAX_TOPICS_PER_ANNOUNCE as u64)) + .map(|i| format!("over-{i}")) + .collect(); + let oversized_first_id = willow_network::topic_id(&oversized[0]); - let first_topic_id = willow_network::topic_id(topics.first().unwrap()); - let overflow_topic_id = willow_network::topic_id(topics.last().unwrap()); + // Sentinel: a small valid announce that must still be processed + // (proves the listener didn't crash on the oversized announce). + let sentinel = "sentinel-after-overflow".to_string(); + let sentinel_id = willow_network::topic_id(&sentinel); - // Subscribe observers BEFORE the relay listener runs so NeighborUp - // fires toward the observer when the relay joins each topic. 
- let (_, mut first_events) = first_observer - .subscribe(first_topic_id, vec![]) + let (_, mut oversized_events) = oversized_observer + .subscribe(oversized_first_id, vec![]) .await .unwrap(); - let (_, mut overflow_events) = overflow_observer - .subscribe(overflow_topic_id, vec![]) + let (_, mut sentinel_events) = sentinel_observer + .subscribe(sentinel_id, vec![]) .await .unwrap(); @@ -704,37 +907,34 @@ mod tests { relay_net, )); - // Broadcast all MAX_TOPICS + 1 topics in a single announce. - let data = pack_topic_announce(topics, &announcer_identity); use willow_network::traits::TopicHandle; + // Send the oversized announce — must be rejected. + let data = pack_topic_announce(oversized, &announcer_identity); ops_handle.broadcast(data).await.expect("broadcast failed"); + // Then send the small sentinel announce. + send_announce_and_wait(&ops_handle, vec![sentinel.clone()], &announcer_identity).await; - // Wait for the first-topic observer to see NeighborUp — proves the - // listener ran and subscriptions began. - let first_neighbor = tokio::time::timeout(std::time::Duration::from_secs(5), async { + // Sentinel must arrive — proves listener is still alive and + // processing announces past the rejected one. + let sentinel_neighbor = tokio::time::timeout(std::time::Duration::from_secs(5), async { loop { - match first_events.next().await { - Some(Ok(GossipEvent::NeighborUp(id))) => return id, + match sentinel_events.next().await { + Some(Ok(GossipEvent::NeighborUp(id))) if id == relay_id => return id, Some(_) => continue, - None => panic!("first_observer stream closed unexpectedly"), + None => panic!("sentinel observer stream closed"), } } }) .await - .expect("timed out waiting for relay to subscribe to the first topic"); - assert_eq!( - first_neighbor, relay_id, - "NeighborUp on first topic should be from relay" - ); - - // Give the listener a moment to finish the remainder of the for loop - // and all the async subscribe() calls inside it. 
- tokio::time::sleep(std::time::Duration::from_millis(50)).await; + .expect("timed out waiting for sentinel subscription"); + assert_eq!(sentinel_neighbor, relay_id); - // The overflow topic (index MAX_TOPICS) must NOT be subscribed — no NeighborUp. - let overflow_result = tokio::time::timeout(std::time::Duration::from_millis(100), async { + // The oversized announce's first topic must NOT have produced a + // subscription — checking after the sentinel arrives gives the + // listener ample time to have processed both announces. + let oversized_result = tokio::time::timeout(std::time::Duration::from_millis(100), async { loop { - match overflow_events.next().await { + match oversized_events.next().await { Some(Ok(GossipEvent::NeighborUp(id))) if id == relay_id => return true, Some(_) => continue, None => return false, @@ -742,16 +942,175 @@ } }) .await; - assert!( - overflow_result.is_err() || overflow_result == Ok(false), - "relay must NOT subscribe to the overflow topic (cap at MAX_TOPICS={MAX_TOPICS})" + assert!( + oversized_result.is_err() || oversized_result == Ok(false), + "oversized announce must be rejected; first topic must NOT be subscribed" ); drop(hub); listener.abort(); } + // ── AnnounceState unit tests ──────────────────────────────────────────── + // + // These exercise the per-signer LRU and global slot accounting directly, + // without driving the full listener task. The state machine is the load- + // bearing piece — running it through MemNetwork at MAX_TOPICS scale + // (10 000 entries) is wasteful when the same invariants hold per call. + + fn fresh_signer() -> EndpointId { + willow_identity::Identity::generate().endpoint_id() + } + + #[test] + fn announce_state_per_signer_lru_evicts_oldest() { + // Add MAX_TOPICS_PER_SIGNER topics for one signer, then add one more. + // The oldest must be evicted; the newest must be present.
+ let mut state = AnnounceState::default(); + let signer = fresh_signer(); + for i in 0..MAX_TOPICS_PER_SIGNER as u64 { + let actions = state.process_topic(signer, &format!("t{i}")); + assert_eq!( + actions.subscribe.as_deref(), + Some(format!("t{i}").as_str()), + "first reference should produce Subscribe" + ); + assert!(actions.unsubscribe.is_none()); + assert!(!actions.evicted_for_signer); + assert!(!actions.rejected_global); + } + assert_eq!( + state.signer_topics.get(&signer).map(|q| q.len()), + Some(MAX_TOPICS_PER_SIGNER) + ); + + // The 101st: must evict t0 (oldest) and subscribe to the new one. + let actions = state.process_topic(signer, "t-new"); + assert!(actions.evicted_for_signer); + assert_eq!(actions.unsubscribe.as_deref(), Some("t0")); + assert_eq!(actions.subscribe.as_deref(), Some("t-new")); + assert!(!actions.rejected_global); + + // Signer still holds exactly MAX_TOPICS_PER_SIGNER entries. + let queue = state.signer_topics.get(&signer).unwrap(); + assert_eq!(queue.len(), MAX_TOPICS_PER_SIGNER); + // t0 gone; t-new present. + assert!(!queue.contains(&"t0".to_string())); + assert!(queue.contains(&"t-new".to_string())); + // t0 fully removed from the global table. + assert!(!state.topic_refcount.contains_key("t0")); + } + + #[test] + fn announce_state_per_signer_lru_does_not_starve_other_signers() { + // Signer A pumps MAX_TOPICS_PER_SIGNER topics; Signer B can still + // announce successfully — its quota is independent. + let mut state = AnnounceState::default(); + let signer_a = fresh_signer(); + let signer_b = fresh_signer(); + for i in 0..MAX_TOPICS_PER_SIGNER as u64 { + state.process_topic(signer_a, &format!("a-{i}")); + } + // B's first announce must subscribe (no eviction, no rejection). 
+ let actions = state.process_topic(signer_b, "b-first"); + assert_eq!(actions.subscribe.as_deref(), Some("b-first")); + assert!(!actions.evicted_for_signer); + assert!(!actions.rejected_global); + assert!(actions.unsubscribe.is_none()); + // B's slot is recorded under B (not under A). + assert_eq!(state.signer_topics.get(&signer_b).map(|q| q.len()), Some(1)); + } + + #[test] + fn announce_state_repeat_announce_promotes_lru_no_resubscribe() { + // Re-announcing a topic the signer already holds is a no-op on the + // network and promotes the topic to the back of the LRU queue. + let mut state = AnnounceState::default(); + let signer = fresh_signer(); + state.process_topic(signer, "first"); + state.process_topic(signer, "second"); + let actions = state.process_topic(signer, "first"); + assert!(actions.subscribe.is_none()); + assert!(actions.unsubscribe.is_none()); + assert!(!actions.evicted_for_signer); + // After re-announce, "first" is the most recent entry. + let queue = state.signer_topics.get(&signer).unwrap(); + assert_eq!(queue.back().map(|s| s.as_str()), Some("first")); + assert_eq!(queue.front().map(|s| s.as_str()), Some("second")); + } + + #[test] + fn announce_state_shared_topic_refcount_keeps_subscription() { + // Two signers announce the same topic. Only the first results in a + // Subscribe; both signers contribute to the refcount. Evicting the + // first signer's topic must NOT unsubscribe (the second still holds it). + let mut state = AnnounceState::default(); + let signer_a = fresh_signer(); + let signer_b = fresh_signer(); + let actions = state.process_topic(signer_a, "shared"); + assert_eq!(actions.subscribe.as_deref(), Some("shared")); + let actions = state.process_topic(signer_b, "shared"); + assert!(actions.subscribe.is_none()); + + // Force eviction on signer A by filling its quota. 
+ for i in 0..MAX_TOPICS_PER_SIGNER as u64 { + state.process_topic(signer_a, &format!("filler-{i}")); + } + // "shared" should have been evicted from A but the global subscription + // is still held by B — no Unsubscribe action. + assert!(state.topic_refcount.contains_key("shared")); + assert_eq!(state.topic_refcount.get("shared"), Some(&1)); + } + + #[test] + fn should_emit_warn_rate_limits_to_one_per_window() { + // Two warn attempts inside the window: first true, second false. + // After the window: true again. + let mut last = None; + let t0 = Instant::now(); + let interval = Duration::from_millis(100); + assert!(should_emit_warn(&mut last, t0, interval)); + assert!(!should_emit_warn( + &mut last, + t0 + Duration::from_millis(50), + interval + )); + assert!(should_emit_warn( + &mut last, + t0 + Duration::from_millis(150), + interval + )); + } + + #[test] + fn announce_state_rejects_at_global_cap() { + // Fill the global table by spreading entries across many signers + // (because per-signer cap is lower than global cap). Then a fresh + // signer's new topic must be rejected globally. + let mut state = AnnounceState::default(); + let mut signers = Vec::new(); + let mut idx: u64 = 0; + while state.topic_refcount.len() < MAX_TOPICS { + let s = fresh_signer(); + for _ in 0..MAX_TOPICS_PER_SIGNER { + if state.topic_refcount.len() >= MAX_TOPICS { + break; + } + state.process_topic(s, &format!("g-{idx}")); + idx += 1; + } + signers.push(s); + } + assert_eq!(state.topic_refcount.len(), MAX_TOPICS); + + // Fresh signer's new topic — must be rejected (global table full, + // topic not already present). 
+ let outsider = fresh_signer(); + let actions = state.process_topic(outsider, "new-after-full"); + assert!(actions.rejected_global); + assert!(actions.subscribe.is_none()); + } + // ── topic_str_is_valid unit tests ──────────────────────────────────────── #[test] diff --git a/crates/replay/src/role.rs b/crates/replay/src/role.rs index 616f6208..d51b56a7 100644 --- a/crates/replay/src/role.rs +++ b/crates/replay/src/role.rs @@ -194,8 +194,8 @@ impl ReplayRole { } Err(InsertError::EncryptedKeyTooLarge { got, max }) => { warn!( - got, max, - "rejected event: RotateChannelKey blob over cap (SEC-V-07)", + got, + max, "rejected event: RotateChannelKey blob over cap (SEC-V-07)", ); } } From c4796706e6bf8467e0c1c075513bea7a4030a57e Mon Sep 17 00:00:00 2001 From: intendednull Date: Mon, 27 Apr 2026 19:07:03 -0700 Subject: [PATCH 6/8] chore(docker): pin base images by digest (DEP-02) (#455) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace mutable tags (`rust:latest`, `rust:slim`, `nginxinc/nginx-unprivileged:alpine`) with digest-pinned references so container builds are reproducible and resistant to upstream tag re-push or registry takeover. Pinned (verified via `docker buildx imagetools inspect` 2026-04-28): - rust:1.95-slim-bookworm @sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 - nginxinc/nginx-unprivileged:1.27-alpine @sha256:65e3e85dbaed8ba248841d9d58a899b6197106c23cb0ff1a132b7bfe0547e4c0 Both builder and runtime stages now use the same pinned rust slim image across relay/replay/storage/web; web runtime uses the pinned nginx unprivileged variant. Each `FROM` carries an inline comment recording the version, pin date, and bump command for traceability. Considered alternative: pinning runtime stages to `debian:bookworm-slim` (closer to true minimal base). 
Rejected for this PR — out of scope per issue #313, which asks only to pin the existing `FROM` lines without restructuring the multi-stage layout. Switching runtime base belongs to a separate change. SBOM stage explicitly out of scope per the issue body. Refs #313 Co-authored-by: Claude --- docker/relay.Dockerfile | 6 ++++-- docker/replay.Dockerfile | 6 ++++-- docker/storage.Dockerfile | 6 ++++-- docker/web.Dockerfile | 6 ++++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/docker/relay.Dockerfile b/docker/relay.Dockerfile index c23932a3..c3996940 100644 --- a/docker/relay.Dockerfile +++ b/docker/relay.Dockerfile @@ -1,9 +1,11 @@ -FROM rust:latest AS builder +# rust:1.95-slim-bookworm pinned 2026-04-28; bump via `docker buildx imagetools inspect rust:1.95-slim-bookworm` +FROM rust:1.95-slim-bookworm@sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 AS builder WORKDIR /build COPY . . RUN cargo build --release -p willow-relay -FROM rust:slim +# rust:1.95-slim-bookworm pinned 2026-04-28; bump via `docker buildx imagetools inspect rust:1.95-slim-bookworm` +FROM rust:1.95-slim-bookworm@sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 RUN useradd -r -u 10001 -m -d /home/willow willow \ && mkdir -p /etc/willow /shared \ && chown -R willow:willow /etc/willow /shared diff --git a/docker/replay.Dockerfile b/docker/replay.Dockerfile index bb2c628d..74ed8485 100644 --- a/docker/replay.Dockerfile +++ b/docker/replay.Dockerfile @@ -1,9 +1,11 @@ -FROM rust:latest AS builder +# rust:1.95-slim-bookworm pinned 2026-04-28; bump via `docker buildx imagetools inspect rust:1.95-slim-bookworm` +FROM rust:1.95-slim-bookworm@sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 AS builder WORKDIR /build COPY . . 
RUN cargo build --release -p willow-replay -FROM rust:slim +# rust:1.95-slim-bookworm pinned 2026-04-28; bump via `docker buildx imagetools inspect rust:1.95-slim-bookworm` +FROM rust:1.95-slim-bookworm@sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 RUN useradd -r -u 10001 -m -d /home/willow willow \ && mkdir -p /etc/willow \ && chown -R willow:willow /etc/willow diff --git a/docker/storage.Dockerfile b/docker/storage.Dockerfile index cc6f8c03..660cc53b 100644 --- a/docker/storage.Dockerfile +++ b/docker/storage.Dockerfile @@ -1,9 +1,11 @@ -FROM rust:latest AS builder +# rust:1.95-slim-bookworm pinned 2026-04-28; bump via `docker buildx imagetools inspect rust:1.95-slim-bookworm` +FROM rust:1.95-slim-bookworm@sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 AS builder WORKDIR /build COPY . . RUN cargo build --release -p willow-storage -FROM rust:slim +# rust:1.95-slim-bookworm pinned 2026-04-28; bump via `docker buildx imagetools inspect rust:1.95-slim-bookworm` +FROM rust:1.95-slim-bookworm@sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 RUN useradd -r -u 10001 -m -d /home/willow willow \ && mkdir -p /etc/willow /var/lib/willow \ && chown -R willow:willow /etc/willow /var/lib/willow diff --git a/docker/web.Dockerfile b/docker/web.Dockerfile index 8fe1677c..fa9c3c15 100644 --- a/docker/web.Dockerfile +++ b/docker/web.Dockerfile @@ -1,11 +1,13 @@ -FROM rust:latest AS builder +# rust:1.95-slim-bookworm pinned 2026-04-28; bump via `docker buildx imagetools inspect rust:1.95-slim-bookworm` +FROM rust:1.95-slim-bookworm@sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 AS builder RUN rustup target add wasm32-unknown-unknown RUN cargo install trunk WORKDIR /build COPY . . 
RUN cd crates/web && trunk build --release -FROM nginxinc/nginx-unprivileged:alpine +# nginxinc/nginx-unprivileged:1.27-alpine pinned 2026-04-28; bump via `docker buildx imagetools inspect nginxinc/nginx-unprivileged:1.27-alpine` +FROM nginxinc/nginx-unprivileged:1.27-alpine@sha256:65e3e85dbaed8ba248841d9d58a899b6197106c23cb0ff1a132b7bfe0547e4c0 COPY --from=builder --chown=nginx:nginx /build/crates/web/dist/ /usr/share/nginx/html/ RUN chmod 644 /usr/share/nginx/html/* From 87c1648630190d87c7b4258556687938b003c5ac Mon Sep 17 00:00:00 2001 From: intendednull Date: Mon, 27 Apr 2026 19:12:30 -0700 Subject: [PATCH 7/8] fix(storage): propagate clock error in applied_at_ms (TD-08) (#456) migration insert used `.unwrap_or(0)` on `SystemTime::now` duration. clock pre-1970 means `applied_at_ms = 0` lands in schema_version, silent. rest of crate returns `anyhow::Result`, so just `?` it. only one site in the storage crate, no helper needed. test: extend `schema_version_table_exists_after_open` to read back `applied_at_ms` and assert > 0. pins behaviour against the legacy `0` fallback under normal clock. error-branch test would need time mocking the crate doesn't have, out of scope. TD-08, refs #254 Co-authored-by: Claude --- crates/storage/src/store.rs | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 42d5e2bf..fc67bf14 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -103,10 +103,13 @@ impl StorageEventStore { } let tx = self.conn.unchecked_transaction()?; tx.execute_batch(sql)?; + // Propagate clock errors instead of recording `applied_at_ms = 0` + // when the system clock is somehow set before 1970. The rest of + // this crate returns `anyhow::Result`, so surface the error too. 
let now_ms = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_millis() as i64) - .unwrap_or(0); + .map_err(anyhow::Error::from)? + .as_millis() as i64; tx.execute( "INSERT INTO schema_version (version, applied_at_ms) VALUES (?1, ?2)", params![version, now_ms], ); @@ -831,22 +834,35 @@ mod tests { #[test] fn schema_version_table_exists_after_open() { let (_dir, store) = open_file_store(); - let versions: Vec<i64> = store + let rows: Vec<(i64, i64)> = store .conn - .prepare("SELECT version FROM schema_version ORDER BY version ASC") + .prepare("SELECT version, applied_at_ms FROM schema_version ORDER BY version ASC") .unwrap() - .query_map([], |row| row.get(0)) + .query_map([], |row| Ok((row.get(0)?, row.get(1)?))) .unwrap() .map(|r| r.unwrap()) .collect(); assert!( - !versions.is_empty(), + !rows.is_empty(), "schema_version table should be populated after open" ); + let versions: Vec<i64> = rows.iter().map(|(v, _)| *v).collect(); assert!( versions.contains(&1), "migration 1 (initial schema) must be recorded" ); + // applied_at_ms must reflect the real clock, not the legacy `0` + // fallback. Under a normal system clock (post-1970) it is strictly + // positive. The pre-1970 branch now returns an error instead of + // silently writing 0; testing that path would require time-mocking + // infrastructure this crate doesn't have, which is out of scope. + for (version, applied_at_ms) in &rows { + assert!( + *applied_at_ms > 0, + "migration {version} recorded applied_at_ms={applied_at_ms}, \ expected a positive Unix-ms timestamp", + ); + } } #[test] From d7d26a9b6de93cde81a3ef1b6384df81b5294883 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 02:14:03 +0000 Subject: [PATCH 8/8] chore(skill): pre-dispatch sync, already-fixed path, webhook informational MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit three lessons from batch 2026-04-28-002530: 1.
coordinator must `git fetch + reset` master batch branch before each implementer dispatch — stale local state contaminates the next worktree (#451 implementer found half-applied prior work in fresh worktree). 2. when implementer's pre-flight detects upstream fix already landed (#316/#317/#318 vs PR #402), close issues + caveman-comment, do NOT include in master PR `Fixes` list — record under `## Already-Fixed` instead. 3. github webhook subscriptions for sub-PRs are informational only — implementer owns its merge gate, coordinator must not duplicate that work. --- .claude/skills/resolving-issues/SKILL.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.claude/skills/resolving-issues/SKILL.md b/.claude/skills/resolving-issues/SKILL.md index e8405f0f..a2769ffc 100644 --- a/.claude/skills/resolving-issues/SKILL.md +++ b/.claude/skills/resolving-issues/SKILL.md @@ -51,9 +51,10 @@ All sub-fixes land on one master branch per session. Master PR is opened **only 4. No in-scope issues? Noop. Skip the rest. No master branch created, no PR opened. 5. Create fresh master branch for this session (push, no PR yet — see ### Master branch setup). 6. Per issue, sequential, max 10 per run: + - **Pre-dispatch sync:** before spawning each implementer, `git fetch origin <master-branch>` + `git reset --hard origin/<master-branch>` in the coordinator's checkout. Prior implementers' merges + your own session-open commit must be the worktree base; stale local state poisons the next worktree. - Spawn fresh implementer agent. - Implementer: worktree off master branch → research subagents if needed → fix → tests → sub-PR into master branch → merge gate → squash-merge. - - Track `Fixes #N` for final PR body assembly. + - Track `Fixes #N` for final PR body assembly. **If implementer reports the issue was already fixed upstream and closed it directly with a caveman comment**, do NOT include in `Fixes` list — issue is already closed, `Fixes` keyword would be a no-op or worse a stale link.
Note in `## Already-Fixed` master-PR section instead. - Tear down worktree. - Next issue. 7. Implementer finds related rot? File follow-up issue. @@ -81,6 +82,7 @@ Fresh agent per issue, scoped to one issue + master branch ref. Steps: 9. **Merge gate:** if sub-PR CI runs (rare — only when workflow `branches: [main]` filter matches), wait for green. If CI doesn't run (sub-PR base ≠ main is the common case), local `just check` green from step 7 IS the gate. Merge with `mcp__github__merge_pull_request` `merge_method: squash`. 10. CI red after one fix attempt OR local `just check` red OR mid-fix block → **file a follow-up GH issue** (caveman body, link the original issue + cite the blocker), then **close the sub-PR** (don't leave it as a draft for someone to resume). The next scheduled run will see the follow-up issue in the queue and pick it up. Return control to coordinator. 11. Tear down worktree on merge OR on close-after-blocker. +12. **Already-fixed-upstream path:** if pre-flight investigation (e.g. `cargo audit`, file-state grep, `cargo tree`) shows the issue was resolved by a recently-merged upstream PR, do NOT open a dead sub-PR. Leave a caveman comment on the original issue naming the upstream PR + the fix location, close the issue as `completed`, tear down the worktree, report back. Coordinator records this under `## Already-Fixed` in the master PR — NOT under `Fixes`. ## Lessons Learned @@ -98,6 +100,7 @@ Never defer skill edits to a follow-up — they ship with the run that surfaced - Read, dispatch, monitor. Implementers touch files. - One worktree per issue. Sequential between issues. Tear down after merge or draft-park. - **Exception:** the master branch's own session-open commit + Lessons Learned skill edits (see ## Lessons Learned). Coordinator commits these directly to the master branch. +- **Webhook subscriptions are informational.** When a webhook event arrives for a sub-PR opened by an implementer, the implementer owns its merge gate.
Coordinator does NOT investigate CI / review state — that's the implementer's job, and the implementer is still running. Acknowledge briefly + keep waiting. Only act on the webhook if no implementer is running for that PR (i.e. the implementer already finished and the webhook arrived later as a stale event). ### Sequential between issues - One issue at a time. No parallel implementers.