diff --git a/.claude/skills/resolving-issues/SKILL.md b/.claude/skills/resolving-issues/SKILL.md index e69f8634..99ee970f 100644 --- a/.claude/skills/resolving-issues/SKILL.md +++ b/.claude/skills/resolving-issues/SKILL.md @@ -111,6 +111,10 @@ Fresh agent per issue, scoped to one issue + master branch ref. Steps: 8. **Commit + push.** Use `caveman:caveman-commit` for the message. Conventional Commits format. `Refs #N` (NOT `Fixes` — that lives only on the master PR). Push directly to origin master branch. + **Never push `wip:` / `chore: checkpoint` / "work-in-progress" commits to the master branch.** Make all your edits, run the local gate, then commit ONCE with the proper Conventional Commits message. Coordinator's master PR body assembles `Fixes #N` rows from per-commit messages — `wip:` rows look like junk to a human reviewer and force a finalize-implementer detour to squash + force-push (real cost: ~10–15 min of cargo lock contention while the rescue agent re-runs gates, plus a `--force-with-lease` against a branch that may have already accumulated more commits from later dispatches if the coordinator misjudges sequencing). If you genuinely need intermediate checkpoints (long sessions, sandbox interference per next bullet), make them in a Pattern B local feature branch and squash via `git merge --no-ff` when done — never push intermediate commits to master. + + **Sandbox `git reset --hard origin/` interference (known hazard).** Some sandboxed environments run a periodic `git reset --hard origin/` between tool invocations that silently rolls back uncommitted edits — visible as `Edit`/`Write` results vanishing between cargo commands, or as the working tree being clean when you expected staged changes. Detection: run `git status` after a tool call you expected to leave changes; if it's clean and the file content matches origin, the sandbox wiped it. 
Recovery: apply edits and `git add -A && git commit` in a tight single-shell pipeline (one `bash -c` invocation) before the next tool call lands. If you accumulate commits-as-checkpoints this way, follow the no-wip-commits rule above by squashing at the end via `git reset --soft && git commit -m "" && git push --force-with-lease`. Note the sandbox-interference workaround in the commit body so the human can audit. + 9. **Mid-fix block** (CI red on the local gate that won't resolve, brainstorm reveals deeper structural issue, fix demands cross-cutting refactor): **abort the dispatch.** `git checkout ` + `git reset --hard origin/` to drop any local work. File a follow-up GH issue (caveman body, link original + cite the blocker). Return to coordinator. The follow-up issue is the durable handoff for the next scheduled run. 10. **Already-fixed-upstream path:** if pre-flight investigation (e.g. `cargo audit`, file-state grep, `cargo tree`) shows the issue was resolved by a recently-merged upstream PR, do NOT make a no-op commit. Leave a caveman comment on the original issue naming the upstream PR + the fix location, close the issue (`completed` if the audit's intent now holds — the upstream fix solved it for us; `not_planned` if the audit's premise is moot — e.g. the targeted code was deleted), report back. Coordinator records under `## Already-Fixed` in the master PR — NOT under `Fixes`. 
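The single-invocation recovery pipeline described in the sandbox-interference bullet can be sketched as follows. This is a minimal illustration in a throwaway repository, not the skill's literal procedure: the temp-repo setup, file name, and commit message are all invented for the demo, and the final `git push` / end-of-session squash are omitted because the sketch has no remote.

```shell
set -eu

# Throwaway repo standing in for the project checkout (illustrative only).
repo=$(mktemp -d)
cd "$repo"
git init -q
git config user.email ci@example.invalid
git config user.name "ci"
git commit -q --allow-empty -m "chore: baseline"

# An edit that must survive until it is committed.
echo "edit that must survive the sandbox reset" > src.txt

# Detection + recovery in ONE shell invocation, so a periodic
# `git reset --hard` between tool calls cannot land mid-pipeline:
# non-empty `git status --porcelain` output means uncommitted changes exist.
if [ -n "$(git status --porcelain)" ]; then
  git add -A
  git commit -q -m "fix: apply pending edits" \
    -m "Sandbox-interference workaround: edits staged and committed in a single shell invocation."
fi
```

In the real workflow the push (and, if checkpoints accumulated, the `git reset --soft` squash) would run inside the same invocation for the same reason: nothing uncommitted may exist when the shell exits.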
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 45893c93..d0843dc2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -182,4 +182,6 @@ jobs: --ignore RUSTSEC-2025-0141 `# bincode 1.x unmaintained (#247)` \ --ignore RUSTSEC-2024-0436 `# paste unmaintained, via leptos+iroh (#316)` \ --ignore RUSTSEC-2024-0370 `# proc-macro-error unmaintained, via iroh-blobs->genawaiter (#317)` \ - --ignore RUSTSEC-2023-0089 `# atomic-polyfill unmaintained, via iroh->postcard->heapless (#318)` + --ignore RUSTSEC-2023-0089 `# atomic-polyfill unmaintained, via iroh->postcard->heapless (#318)` \ + --ignore RUSTSEC-2026-0119 `# hickory-proto O(n^2) name compression, via iroh-relay (#508)` \ + --ignore RUSTSEC-2026-0120 `# hickory-net NSEC3 unbounded loop, via iroh-relay (#509)` diff --git a/crates/client/src/connect.rs b/crates/client/src/connect.rs index 092affbd..6796dfe0 100644 --- a/crates/client/src/connect.rs +++ b/crates/client/src/connect.rs @@ -184,6 +184,7 @@ impl ClientHandle { event_broker: self.event_broker.clone(), identity: self.identity.clone(), join_links: Arc::clone(&self.join_links), + pending_joins: Arc::clone(&self.pending_joins), dag: self.dag_addr.clone(), server_registry: self.server_registry_addr.clone(), on_neighbor_up: None, diff --git a/crates/client/src/joining.rs b/crates/client/src/joining.rs index 79f11f0a..5c26e5ee 100644 --- a/crates/client/src/joining.rs +++ b/crates/client/src/joining.rs @@ -216,7 +216,23 @@ impl ClientHandle { self.mutation_handle.broadcast_on_topic(topic, data); } - pub fn send_join_request(&self, link_id: &str) { + /// Broadcast a `JoinRequest` to the inviter for `link_id` and + /// record the pending attempt so subsequent `JoinResponse` / + /// `JoinDenied` messages can be authenticated against the expected + /// `inviter_peer_id`. 
+ /// + /// `inviter_peer_id` is taken straight from the + /// [`ops::JoinToken::inviter_peer_id`] the caller decoded; the + /// listener uses it to drop spoofed responses signed by anyone else + /// (issue #309 / SEC-A-07). + pub fn send_join_request(&self, link_id: &str, inviter_peer_id: willow_identity::EndpointId) { + // Record the pending attempt BEFORE broadcasting. If the inviter + // races us and replies before we've populated the map, the + // listener would otherwise drop the legitimate response with a + // "no outstanding join request" debug log. + self.pending_joins + .lock() + .insert(link_id.to_string(), inviter_peer_id); let msg = ops::WireMessage::JoinRequest { link_id: link_id.to_string(), peer_id: self.identity.endpoint_id(), diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 479889b2..089e002c 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -302,6 +302,25 @@ pub struct ClientHandle { // docs/specs/2026-04-26-state-management-model-design.md § 4 and § F4. // Single guard; deferred to keep this PR scoped. pub(crate) join_links: Arc>>, + /// In-flight join attempts initiated by this client, keyed by the + /// `link_id` of the outgoing [`ops::WireMessage::JoinRequest`]. The + /// value is the `EndpointId` of the inviter the request was sent to + /// (extracted from the `JoinToken`). Listeners use this map to verify + /// that incoming `JoinResponse` / `JoinDenied` messages were signed + /// by the expected inviter — without it, any signer with a guessed + /// `target_peer` could spoof a denial or trigger redundant decryption + /// work on the requester (see issue #309 / SEC-A-07). + /// + /// Entries are inserted by `send_join_request` and removed when a + /// matching response/denial arrives. Stale entries persist until the + /// process restarts; the worst case is a small bounded leak + /// proportional to the number of join links a user clicks without + /// ever receiving a reply. 
+ // state: lock-ok — same rationale as `join_links`; tiny map, rarely + // mutated, actor migration tracked alongside `join_links` in + // docs/specs/2026-04-26-state-management-model-design.md § F4. + pub(crate) pending_joins: + Arc>>, /// Bootstrap peers for gossip topic subscriptions. pub bootstrap_peers: Vec, /// The per-author Merkle-DAG actor — source of truth for all events. @@ -340,6 +359,7 @@ impl Clone for ClientHandle { persistence_addr: self.persistence_addr.clone(), persistence_enabled: self.persistence_enabled, join_links: Arc::clone(&self.join_links), + pending_joins: Arc::clone(&self.pending_joins), bootstrap_peers: self.bootstrap_peers.clone(), dag_addr: self.dag_addr.clone(), view_handle: self.view_handle.clone(), @@ -774,6 +794,7 @@ impl ClientHandle { )); let topics: Arc>> = Arc::new(RwLock::new(HashMap::new())); let join_links = Arc::new(parking_lot::Mutex::new(Vec::new())); + let pending_joins = Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())); // Build StateRefs for derived actor sources. let event_ref = willow_actor::state::StateRef::from(&event_state_addr); @@ -931,6 +952,7 @@ impl ClientHandle { persistence_addr, persistence_enabled, join_links, + pending_joins, bootstrap_peers: config.bootstrap_peers, dag_addr: dag_addr.clone(), view_handle, @@ -1119,6 +1141,7 @@ pub fn test_client() -> ( >, > = Arc::new(RwLock::new(HashMap::new())); let join_links = Arc::new(parking_lot::Mutex::new(Vec::new())); + let pending_joins = Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())); // Build StateRefs and derived views. 
let event_ref = willow_actor::state::StateRef::from(&event_state_addr); @@ -1272,6 +1295,7 @@ pub fn test_client() -> ( persistence_addr, persistence_enabled: false, join_links, + pending_joins, bootstrap_peers: vec![], dag_addr: dag_addr.clone(), view_handle, diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs index ed3113a6..dbea60d6 100644 --- a/crates/client/src/listeners.rs +++ b/crates/client/src/listeners.rs @@ -31,6 +31,14 @@ pub(crate) const MAX_DISPLAY_NAME_LEN: usize = 128; /// as [`MAX_DISPLAY_NAME_LEN`]. pub(crate) const MAX_TYPING_CHANNEL_LEN: usize = 128; +/// Maximum length (in `char`s) for an attacker-supplied `reason` string on +/// [`crate::ops::WireMessage::JoinDenied`] before it is propagated to the +/// UI via [`crate::events::ClientEvent::JoinLinkDenied`]. The reason is +/// shown to the joining user verbatim, so it is both a phishing surface +/// and a memory-amplification vector — bound it tightly. See `[SEC-A-07]` +/// (issue #309). +pub(crate) const MAX_JOIN_DENIED_REASON_LEN: usize = 256; + /// Maximum number of distinct `channel_id`s tracked in /// [`state_actors::VoiceState::participants`]. Caps the outer dimension of /// the map so a malicious peer cannot flood `VoiceJoin` with fresh @@ -57,6 +65,18 @@ pub(crate) fn truncate_to_chars(s: String, max: usize) -> String { } } +/// Sanitize an attacker-supplied `JoinDenied.reason` string before it is +/// surfaced to the UI: strip control characters (anything matching +/// [`char::is_control`]) and cap the remaining length at +/// [`MAX_JOIN_DENIED_REASON_LEN`]. Stripping control characters first +/// means the cap counts only displayable code points, so the user sees +/// at most `MAX_JOIN_DENIED_REASON_LEN` real characters. See `[SEC-A-07]` +/// (issue #309). 
+pub(crate) fn sanitize_reason(s: String) -> String { + let stripped: String = s.chars().filter(|c| !c.is_control()).collect(); + truncate_to_chars(stripped, MAX_JOIN_DENIED_REASON_LEN) +} + /// Log a `tracing::warn!` if `r` is `Err`, otherwise drop the success value. /// /// Used in this listener hot loop to replace bare `.ok();` calls on @@ -88,6 +108,11 @@ pub struct ListenerCtx { // state: lock-ok — mirrors `ClientHandle.join_links`; actor migration // tracked in docs/specs/2026-04-26-state-management-model-design.md § F4. pub join_links: Arc>>, + /// Mirrors `ClientHandle.pending_joins`; required for the SEC-A-07 + /// signer-binding check on `JoinResponse` / `JoinDenied` (issue + /// #309). + // state: lock-ok — same rationale as `join_links`. + pub pending_joins: Arc>>, pub dag: Addr>, pub server_registry: Addr>, /// Optional callback invoked on `NeighborUp`. Used by the profile topic @@ -110,6 +135,7 @@ impl Clone for ListenerCtx { event_broker: self.event_broker.clone(), identity: self.identity.clone(), join_links: Arc::clone(&self.join_links), + pending_joins: Arc::clone(&self.pending_joins), dag: self.dag.clone(), server_registry: self.server_registry.clone(), on_neighbor_up: self.on_neighbor_up.clone(), @@ -623,6 +649,7 @@ async fn process_received_message( }; if let Some(invite_data) = invite_result { let msg = crate::ops::WireMessage::JoinResponse { + link_id: link_id.clone(), target_peer: peer_endpoint, invite_data, }; @@ -688,31 +715,97 @@ async fn process_received_message( } } crate::ops::WireMessage::JoinResponse { + link_id, target_peer, invite_data, } => { - if target_peer == ctx.identity.endpoint_id() { - warn_if_err( - ctx.event_broker.do_send(willow_actor::Publish( - ClientEvent::JoinLinkResponse { invite_data }, - )), - "event_broker.do_send Publish(JoinLinkResponse)", + // Three independent gates — all must pass before we surface + // the response to the UI: + // 1. `target_peer` is us (cheap rejection of unrelated traffic). + // 2. 
We have an outstanding join attempt for this `link_id`. + // 3. The packet was signed by the inviter we sent the + // original `JoinRequest` to. + // Without (3), any peer that observed the `JoinRequest` (or + // simply guessed our `target_peer`) could spoof a response + // and force us to attempt invite decryption — a DoS surface + // even though `invite_data` is encrypted to our pubkey. See + // issue #309 / SEC-A-07. + if target_peer != ctx.identity.endpoint_id() { + return; + } + let expected_inviter = { + let mut pending = ctx.pending_joins.lock(); + pending.remove(&link_id) + }; + let Some(expected_inviter) = expected_inviter else { + tracing::debug!( + %signer, %link_id, + "dropping JoinResponse: no outstanding join request for this link_id" ); + return; + }; + if signer != expected_inviter { + tracing::debug!( + %signer, %expected_inviter, %link_id, + "dropping JoinResponse: signer is not the expected inviter" + ); + // Reinsert so a later legitimate response from the real + // inviter still resolves the join attempt. + ctx.pending_joins.lock().insert(link_id, expected_inviter); + return; } + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::JoinLinkResponse { + invite_data, + })), + "event_broker.do_send Publish(JoinLinkResponse)", + ); } crate::ops::WireMessage::JoinDenied { + link_id, target_peer, reason, } => { - if target_peer == ctx.identity.endpoint_id() { - warn_if_err( - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied { - reason, - })), - "event_broker.do_send Publish(JoinLinkDenied)", + // Same three-gate check as `JoinResponse`. The unencrypted + // `reason` makes spoofing especially dangerous here: an + // attacker could inject a phishing string ("invite link + // expired — please re-enter your seed phrase at evil.example") + // that the UI shows verbatim. 
We additionally sanitize the + // reason (control chars stripped, length capped) before + // surfacing it, even when the signer check passes — defense + // in depth against a misbehaving but otherwise legitimate + // inviter. See issue #309 / SEC-A-07. + if target_peer != ctx.identity.endpoint_id() { + return; + } + let expected_inviter = { + let mut pending = ctx.pending_joins.lock(); + pending.remove(&link_id) + }; + let Some(expected_inviter) = expected_inviter else { + tracing::debug!( + %signer, %link_id, + "dropping JoinDenied: no outstanding join request for this link_id" + ); + return; + }; + if signer != expected_inviter { + tracing::debug!( + %signer, %expected_inviter, %link_id, + "dropping JoinDenied: signer is not the expected inviter" ); + ctx.pending_joins.lock().insert(link_id, expected_inviter); + return; } + let reason = sanitize_reason(reason); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied { + reason, + })), + "event_broker.do_send Publish(JoinLinkDenied)", + ); } // TopicAnnounce is consumed by the relay; clients ignore it. crate::ops::WireMessage::TopicAnnounce { .. 
} => {} @@ -808,6 +901,7 @@ mod tests { event_broker: client.event_broker.clone(), identity: client.identity.clone(), join_links: Arc::clone(&client.join_links), + pending_joins: Arc::clone(&client.pending_joins), dag: client.dag_addr.clone(), server_registry: client.server_registry_addr.clone(), on_neighbor_up: None, @@ -902,6 +996,7 @@ mod tests { event_broker: client.event_broker.clone(), identity: client.identity.clone(), join_links: Arc::clone(&client.join_links), + pending_joins: Arc::clone(&client.pending_joins), dag: client.dag_addr.clone(), server_registry: client.server_registry_addr.clone(), on_neighbor_up: None, @@ -983,6 +1078,7 @@ mod tests { event_broker: client.event_broker.clone(), identity: client.identity.clone(), join_links: Arc::clone(&client.join_links), + pending_joins: Arc::clone(&client.pending_joins), dag: client.dag_addr.clone(), server_registry: client.server_registry_addr.clone(), on_neighbor_up: None, @@ -1045,6 +1141,7 @@ mod tests { event_broker: client.event_broker.clone(), identity: client.identity.clone(), join_links: Arc::clone(&client.join_links), + pending_joins: Arc::clone(&client.pending_joins), dag: client.dag_addr.clone(), server_registry: client.server_registry_addr.clone(), on_neighbor_up: None, @@ -1095,6 +1192,7 @@ mod tests { event_broker: client.event_broker.clone(), identity: client.identity.clone(), join_links: Arc::clone(&client.join_links), + pending_joins: Arc::clone(&client.pending_joins), dag: client.dag_addr.clone(), server_registry: client.server_registry_addr.clone(), on_neighbor_up: None, @@ -1364,4 +1462,314 @@ mod tests { tokio::time::sleep(std::time::Duration::from_millis(20)).await; } } + + // ─── [SEC-A-07] / #309 — JoinResponse / JoinDenied signer guard ── + // + // Without the fix, the requester accepted any signed packet whose + // `target_peer == self.endpoint_id()` — meaning any peer that could + // observe the topic (or simply guess the requester's `EndpointId`) + // could: + // + // * Spoof a 
`JoinDenied` carrying an attacker-chosen `reason`, + // surfaced verbatim to the user via `ClientEvent::JoinLinkDenied` + // — a phishing surface. + // * Spoof a `JoinResponse` with a bogus `invite_data`, forcing the + // requester to attempt decryption / UI churn — a DoS surface + // even though the payload is encrypted to the requester's pubkey. + // + // The tests below drive `process_received_message` with synthetic + // packets and assert that responses signed by anyone other than the + // inviter the requester sent its `JoinRequest` to are dropped. They + // also cover the `reason`-string sanitization that runs after the + // signer check passes. + + /// Build a `ListenerCtx` from a `test_client()` plus a `MemNetwork` + /// topic handle, mirroring the pattern used by the JoinRequest guard + /// tests above. + async fn make_join_guard_ctx() -> ( + ClientHandle, + ListenerCtx, + ::Topic, + ) { + let (client, _rx) = test_client(); + let hub = willow_network::mem::MemHub::new(); + let net = willow_network::mem::MemNetwork::new(&hub); + let (topic, _events) = net + .subscribe(willow_network::topic_id("sec-a-07-test"), vec![]) + .await + .expect("subscribe must succeed"); + let ctx = ListenerCtx { + event_state: client.event_state_addr.clone(), + chat_meta: client.chat_meta_addr.clone(), + profiles: client.profile_state_addr.clone(), + network: client.network_meta_addr.clone(), + voice: client.voice_state_addr.clone(), + persistence: client.persistence_addr.clone(), + persistence_enabled: false, + event_broker: client.event_broker.clone(), + identity: client.identity.clone(), + join_links: Arc::clone(&client.join_links), + pending_joins: Arc::clone(&client.pending_joins), + dag: client.dag_addr.clone(), + server_registry: client.server_registry_addr.clone(), + on_neighbor_up: None, + }; + (client, ctx, topic) + } + + /// Drain pending `ClientEvent`s from `rx` after a short settle window. 
+ async fn drain_events( + rx: &mut crate::EventReceiver, + settle: std::time::Duration, + ) -> Vec { + tokio::time::sleep(settle).await; + let mut out = Vec::new(); + while let Ok(ev) = + tokio::time::timeout(std::time::Duration::from_millis(5), rx.recv()).await + { + match ev { + Some(e) => out.push(e), + None => break, + } + } + out + } + + // ─── reason-string sanitization (pure-function tests) ──────────── + + #[test] + fn sanitize_reason_strips_control_chars() { + let dirty = "hello\nworld\t\u{7}!\u{1b}[31mred".to_string(); + let clean = sanitize_reason(dirty); + // Control chars stripped; printable text preserved verbatim. + assert_eq!(clean, "helloworld![31mred"); + assert!( + !clean.chars().any(|c| c.is_control()), + "no control chars must survive" + ); + } + + #[test] + fn sanitize_reason_caps_length() { + let huge: String = "a".repeat(MAX_JOIN_DENIED_REASON_LEN * 4); + let clean = sanitize_reason(huge); + assert_eq!(clean.chars().count(), MAX_JOIN_DENIED_REASON_LEN); + } + + #[test] + fn sanitize_reason_caps_after_stripping_controls() { + // Control chars should be removed *first*, so the cap counts only + // the displayable code points the user would actually see. + let mixed: String = "\n".repeat(1000) + &"x".repeat(MAX_JOIN_DENIED_REASON_LEN); + let clean = sanitize_reason(mixed); + assert_eq!(clean.chars().count(), MAX_JOIN_DENIED_REASON_LEN); + assert!(clean.chars().all(|c| c == 'x')); + } + + #[test] + fn sanitize_reason_passes_short_clean_input_unchanged() { + let s = "link expired".to_string(); + assert_eq!(sanitize_reason(s.clone()), s); + } + + // ─── signer-binding guard — the SEC-A-07 fix ───────────────────── + + /// A `JoinDenied` whose `signer` is *not* the inviter the requester + /// sent its `JoinRequest` to must be dropped silently — no + /// `JoinLinkDenied` event surfaces the attacker-controlled `reason`. 
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn join_denied_from_non_inviter_is_dropped() { + let (client, ctx, topic) = make_join_guard_ctx().await; + let mut rx = client.subscribe_events().await; + + // Set up: requester is mid-join, expecting a reply from `alice`. + let alice = willow_identity::Identity::generate(); + let alice_id = alice.endpoint_id(); + let link_id = "test-link-spoof-denied".to_string(); + client + .pending_joins + .lock() + .insert(link_id.clone(), alice_id); + + // Mallory (not Alice) signs a JoinDenied with a phishing reason. + let mallory = willow_identity::Identity::generate(); + let phishing = "Your invite expired. Re-enter your seed phrase at evil.example to recover access."; + let msg = crate::ops::WireMessage::JoinDenied { + link_id: link_id.clone(), + target_peer: client.identity.endpoint_id(), + reason: phishing.to_string(), + }; + let bytes = crate::ops::pack_wire(&msg, &mallory).expect("pack must succeed"); + + process_received_message(&bytes, mallory.endpoint_id(), &ctx, &topic).await; + + let events = drain_events(&mut rx, std::time::Duration::from_millis(50)).await; + assert!( + !events + .iter() + .any(|e| matches!(e, ClientEvent::JoinLinkDenied { .. })), + "spoofed JoinDenied must not surface a JoinLinkDenied event, got: {events:?}" + ); + + // The pending entry must still be there — a later legitimate + // denial from Alice should still resolve the join attempt. + assert_eq!( + client.pending_joins.lock().get(&link_id).copied(), + Some(alice_id), + "rejected JoinDenied must not consume the pending-join entry" + ); + } + + /// A `JoinResponse` whose `signer` is *not* the expected inviter must + /// be dropped silently — no `JoinLinkResponse` event surfaces, so the + /// requester never wastes work attempting to decrypt the bogus + /// payload. 
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn join_response_from_non_inviter_is_dropped() { + let (client, ctx, topic) = make_join_guard_ctx().await; + let mut rx = client.subscribe_events().await; + + let alice = willow_identity::Identity::generate(); + let alice_id = alice.endpoint_id(); + let link_id = "test-link-spoof-resp".to_string(); + client + .pending_joins + .lock() + .insert(link_id.clone(), alice_id); + + let mallory = willow_identity::Identity::generate(); + let msg = crate::ops::WireMessage::JoinResponse { + link_id: link_id.clone(), + target_peer: client.identity.endpoint_id(), + invite_data: "garbage that would fail to decrypt anyway".to_string(), + }; + let bytes = crate::ops::pack_wire(&msg, &mallory).expect("pack must succeed"); + + process_received_message(&bytes, mallory.endpoint_id(), &ctx, &topic).await; + + let events = drain_events(&mut rx, std::time::Duration::from_millis(50)).await; + assert!( + !events + .iter() + .any(|e| matches!(e, ClientEvent::JoinLinkResponse { .. })), + "spoofed JoinResponse must not surface a JoinLinkResponse event, got: {events:?}" + ); + assert_eq!( + client.pending_joins.lock().get(&link_id).copied(), + Some(alice_id), + ); + } + + /// Regression: a `JoinDenied` signed by the actual inviter must + /// surface a `JoinLinkDenied` event with a sanitized `reason`. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn join_denied_from_real_inviter_surfaces_sanitized_event() { + let (client, ctx, topic) = make_join_guard_ctx().await; + let mut rx = client.subscribe_events().await; + + let alice = willow_identity::Identity::generate(); + let alice_id = alice.endpoint_id(); + let link_id = "test-link-real-denied".to_string(); + client + .pending_joins + .lock() + .insert(link_id.clone(), alice_id); + + // Alice's reason carries control chars + ANSI noise we want + // stripped before display. 
+ let raw_reason = "link\u{7} expired\n"; + let msg = crate::ops::WireMessage::JoinDenied { + link_id: link_id.clone(), + target_peer: client.identity.endpoint_id(), + reason: raw_reason.to_string(), + }; + let bytes = crate::ops::pack_wire(&msg, &alice).expect("pack must succeed"); + + process_received_message(&bytes, alice_id, &ctx, &topic).await; + + let events = drain_events(&mut rx, std::time::Duration::from_millis(50)).await; + let denied: Vec<_> = events + .iter() + .filter_map(|e| match e { + ClientEvent::JoinLinkDenied { reason } => Some(reason.clone()), + _ => None, + }) + .collect(); + assert_eq!(denied.len(), 1, "expected exactly one JoinLinkDenied event"); + assert_eq!(denied[0], "link expired"); + assert!( + client.pending_joins.lock().get(&link_id).is_none(), + "legitimate JoinDenied must consume the pending-join entry" + ); + } + + /// Regression: a `JoinResponse` signed by the actual inviter must + /// surface a `JoinLinkResponse` event with the original + /// `invite_data`. 
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn join_response_from_real_inviter_surfaces_event() { + let (client, ctx, topic) = make_join_guard_ctx().await; + let mut rx = client.subscribe_events().await; + + let alice = willow_identity::Identity::generate(); + let alice_id = alice.endpoint_id(); + let link_id = "test-link-real-resp".to_string(); + client + .pending_joins + .lock() + .insert(link_id.clone(), alice_id); + + let invite = "real-invite-payload-base64"; + let msg = crate::ops::WireMessage::JoinResponse { + link_id: link_id.clone(), + target_peer: client.identity.endpoint_id(), + invite_data: invite.to_string(), + }; + let bytes = crate::ops::pack_wire(&msg, &alice).expect("pack must succeed"); + + process_received_message(&bytes, alice_id, &ctx, &topic).await; + + let events = drain_events(&mut rx, std::time::Duration::from_millis(50)).await; + let responses: Vec<_> = events + .iter() + .filter_map(|e| match e { + ClientEvent::JoinLinkResponse { invite_data } => Some(invite_data.clone()), + _ => None, + }) + .collect(); + assert_eq!(responses.len(), 1); + assert_eq!(responses[0], invite); + assert!( + client.pending_joins.lock().get(&link_id).is_none(), + "legitimate JoinResponse must consume the pending-join entry" + ); + } + + /// A `JoinResponse` with no matching `pending_joins` entry must be + /// dropped — for example, if it arrives long after the requester + /// abandoned the join attempt and cleared local state. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn join_response_without_pending_entry_is_dropped() { + let (client, ctx, topic) = make_join_guard_ctx().await; + let mut rx = client.subscribe_events().await; + // Note: no `pending_joins` insert. 
+ + let alice = willow_identity::Identity::generate(); + let msg = crate::ops::WireMessage::JoinResponse { + link_id: "unknown-link".to_string(), + target_peer: client.identity.endpoint_id(), + invite_data: "anything".to_string(), + }; + let bytes = crate::ops::pack_wire(&msg, &alice).expect("pack must succeed"); + + process_received_message(&bytes, alice.endpoint_id(), &ctx, &topic).await; + + let events = drain_events(&mut rx, std::time::Duration::from_millis(50)).await; + assert!( + !events + .iter() + .any(|e| matches!(e, ClientEvent::JoinLinkResponse { .. })), + "JoinResponse without matching pending entry must be ignored" + ); + } } diff --git a/crates/client/src/ops.rs b/crates/client/src/ops.rs index 39273b69..973aea17 100644 --- a/crates/client/src/ops.rs +++ b/crates/client/src/ops.rs @@ -314,6 +314,7 @@ mod tests { let id = Identity::generate(); let joiner = Identity::generate(); let msg = WireMessage::JoinResponse { + link_id: "link-1".to_string(), target_peer: joiner.endpoint_id(), invite_data: "base64inviteblob".to_string(), }; @@ -321,9 +322,11 @@ mod tests { let (decoded, _) = unpack_wire(&data).unwrap(); match decoded { WireMessage::JoinResponse { + link_id, target_peer, invite_data, } => { + assert_eq!(link_id, "link-1"); assert_eq!(target_peer, joiner.endpoint_id()); assert_eq!(invite_data, "base64inviteblob"); } @@ -336,6 +339,7 @@ mod tests { let id = Identity::generate(); let joiner = Identity::generate(); let msg = WireMessage::JoinDenied { + link_id: "link-1".to_string(), target_peer: joiner.endpoint_id(), reason: "link_expired".to_string(), }; @@ -343,9 +347,11 @@ mod tests { let (decoded, _) = unpack_wire(&data).unwrap(); match decoded { WireMessage::JoinDenied { + link_id, target_peer, reason, } => { + assert_eq!(link_id, "link-1"); assert_eq!(target_peer, joiner.endpoint_id()); assert_eq!(reason, "link_expired"); } diff --git a/crates/client/src/search/bootstrap.rs b/crates/client/src/search/bootstrap.rs new file mode 100644 index 
00000000..e040c400 --- /dev/null +++ b/crates/client/src/search/bootstrap.rs @@ -0,0 +1,105 @@ +//! Bootstrap + incremental hooks for the local search index. +//! +//! Replaces the prior signal-driven full-rebuild path in +//! `crates/web/src/app.rs`. Per `local-search.md` §Build behaviour the +//! index is **incremental on arrival, lazy on historical scan** — every +//! send/receive/edit must NOT trigger a `Rebuild` (which destroys the +//! whole index, allocating fresh `Posting` lists for every token of +//! every message). Issue #354 covers the regression where the prior +//! `Effect` kept doing exactly that. +//! +//! Two paths live here: +//! +//! - [`hydrate_index`] — one-shot bootstrap. Walks every channel in the +//! materialized `ServerState` and inserts each non-deleted message +//! into the index. Idempotent (driven by `SearchIndex::insert`'s +//! `by_msg` dedup). +//! - [`index_message`] / [`reindex_message`] — the incremental hooks +//! the UI calls on `ClientEvent::MessageReceived` / +//! `ClientEvent::MessageEdited`. Both look the channel name up from +//! `ServerState::channels` so the `in:#name` operator stays correct. +//! +//! The web crate keeps the wiring (signal subscription + grove-id +//! resolution); this module owns only the data flow into the index. + +use crate::search::{IndexableMessage, SearchIndexHandle}; + +/// Hydrate the index from every channel in the materialized state. +/// +/// Called once on UI bootstrap. Subsequent updates flow through +/// [`index_message`] / [`reindex_message`] / `SearchIndexHandle::remove_message`. +/// +/// `grove_id` is the active grove (server) id — stamped onto every +/// `IndexableMessage` so per-grove opt-out + `in:#grove` filtering work. 
+pub async fn hydrate_index( + client: &crate::ClientHandle, + search: &SearchIndexHandle, + grove_id: Option, +) { + let channels = client.channels().await; + for name in &channels { + let msgs = client.messages(name).await; + for m in msgs { + // Skip deleted rows so the index never carries tombstones. + if m.deleted { + continue; + } + let im = IndexableMessage::from_display_message(&m, name, grove_id.clone(), None); + search.insert(im); + } + } +} + +/// Insert one message by id — incremental hook for +/// `ClientEvent::MessageReceived`. +/// +/// Resolves channel name from `channel_id` via `state_snapshot` so the +/// `in:#name` operator continues to match. The DAG id is what +/// `MessageReceived.channel` carries (see `derive_client_events` in +/// `crates/client/src/mutations.rs`). +pub async fn index_message( + client: &crate::ClientHandle, + search: &SearchIndexHandle, + channel_id: &str, + message_id: &str, + grove_id: Option, +) { + let snap = client.state_snapshot().await; + let Some(channel_name) = snap.channels.get(channel_id).map(|c| c.name.clone()) else { + // Channel id doesn't resolve — happens transiently during + // sync if the message arrives before its CreateChannel event. + // Bootstrap re-walks all channels on the next session start, so + // dropping here is recoverable. + return; + }; + let Some(msg) = client + .messages(&channel_name) + .await + .into_iter() + .find(|m| m.id == message_id) + else { + return; + }; + if msg.deleted { + return; + } + let im = IndexableMessage::from_display_message(&msg, &channel_name, grove_id, None); + search.insert(im); +} + +/// Re-index one message — incremental hook for `ClientEvent::MessageEdited`. +/// +/// `SearchIndex::insert` is idempotent (it short-circuits on +/// `message_id` to avoid double-counting from the legacy rebuild path). +/// Edits must therefore go through `remove_message` first; otherwise +/// the new body never lands. 
+pub async fn reindex_message( + client: &crate::ClientHandle, + search: &SearchIndexHandle, + channel_id: &str, + message_id: &str, + grove_id: Option<String>, +) { + search.remove_message(message_id); + index_message(client, search, channel_id, message_id, grove_id).await; +} diff --git a/crates/client/src/search/mod.rs b/crates/client/src/search/mod.rs index 5e6c5539..b75176d3 100644 --- a/crates/client/src/search/mod.rs +++ b/crates/client/src/search/mod.rs @@ -10,6 +10,7 @@ //! Today (Task 1): [`query`] only. pub mod actor; +pub mod bootstrap; pub mod config; pub mod execute; pub mod handle; @@ -22,6 +23,7 @@ pub mod tokenize; #[cfg(test)] mod tests; +pub use bootstrap::{hydrate_index, index_message, reindex_message}; pub use config::{ clear_all_recents, forget_recent, push_recent, RecentQuery, SearchIndexConfig, MAX_RECENTS, }; diff --git a/crates/client/src/search/tests.rs b/crates/client/src/search/tests.rs index f1fa7432..61eff716 100644 --- a/crates/client/src/search/tests.rs +++ b/crates/client/src/search/tests.rs @@ -1039,3 +1039,162 @@ mod from_display_message_tests { assert_eq!(ix.letter_id.as_deref(), Some("L1")); } } + +/// Bootstrap + incremental hooks (issue #354). Verifies that the +/// hydrate / index / reindex helpers feed the index correctly without +/// invoking the destructive `Rebuild` path. +mod bootstrap_tests { + use super::super::*; + use crate::test_client; + use willow_actor::System; + use willow_state::EventHash; + + /// `hydrate_index` walks every channel and seeds the index with + /// every non-deleted message. Cross-channel content survives — the + /// regression in issue #354 was that the prior signal-driven path + /// only ever held the active channel's messages.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn hydrate_indexes_all_channels() { + let (client, _broker) = test_client(); + + client.send_message("general", "alpha hello").await.unwrap(); + client.create_channel("dev").await.unwrap(); + // create_channel is async — wait for the channel to land before + // sending into it. + for _ in 0..50 { + if client.channels().await.iter().any(|n| n == "dev") { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + client.send_message("dev", "beta progress").await.unwrap(); + // Let the message events apply. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let sys = System::new(); + let search = SearchIndexHandle::new_in_memory(&sys.handle()); + bootstrap::hydrate_index(&client, &search, Some("g0".into())).await; + + // Both channels' content lands in the same index. + let general_hits = search + .query(&parse_query("alpha"), &SearchScope::AllGrovesAndLetters) + .await; + assert_eq!(general_hits.len(), 1, "alpha must be indexed"); + let dev_hits = search + .query(&parse_query("beta"), &SearchScope::AllGrovesAndLetters) + .await; + assert_eq!( + dev_hits.len(), + 1, + "beta from a non-active channel must be indexed" + ); + } + + /// `hydrate_index` is idempotent — `SearchIndex::insert` short- + /// circuits on a known `message_id`, so re-running on the same + /// state must not double-count. 
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn hydrate_is_idempotent() { + let (client, _broker) = test_client(); + + client.send_message("general", "ping").await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let sys = System::new(); + let search = SearchIndexHandle::new_in_memory(&sys.handle()); + bootstrap::hydrate_index(&client, &search, None).await; + let after_first = search.message_count().await; + bootstrap::hydrate_index(&client, &search, None).await; + let after_second = search.message_count().await; + assert_eq!(after_first, after_second, "second hydrate must be no-op"); + } + + /// `index_message` inserts one message by id — the incremental + /// hook the indexer task calls on `ClientEvent::MessageReceived`. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn index_message_inserts_by_id() { + let (client, _broker) = test_client(); + + client.send_message("general", "fresh news").await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let snap = client.state_snapshot().await; + let (channel_id, _) = snap + .channels + .iter() + .find(|(_, c)| c.name == "general") + .expect("general channel must exist"); + let msg = snap + .messages + .iter() + .find(|m| m.body == "fresh news") + .expect("message must be in state"); + let message_id = msg.id.to_string(); + + let sys = System::new(); + let search = SearchIndexHandle::new_in_memory(&sys.handle()); + // Index is empty up front — only the incremental hook runs. + assert_eq!(search.message_count().await, 0); + bootstrap::index_message(&client, &search, channel_id, &message_id, None).await; + assert_eq!(search.message_count().await, 1); + + let hits = search + .query(&parse_query("fresh"), &SearchScope::AllGrovesAndLetters) + .await; + assert_eq!(hits.len(), 1); + } + + /// `reindex_message` removes-then-reinserts so an edited body + /// replaces the old posting. 
Without the explicit remove, + /// `SearchIndex::insert` would short-circuit on the existing + /// `message_id` and the new body would never land. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn reindex_replaces_edited_body() { + let (client, _broker) = test_client(); + + client.send_message("general", "old body").await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let snap = client.state_snapshot().await; + let (channel_id, _) = snap + .channels + .iter() + .find(|(_, c)| c.name == "general") + .expect("general channel must exist"); + let msg = snap + .messages + .iter() + .find(|m| m.body == "old body") + .expect("message must be in state"); + let message_id = msg.id.to_string(); + let event_hash: EventHash = msg.id; + + let sys = System::new(); + let search = SearchIndexHandle::new_in_memory(&sys.handle()); + bootstrap::index_message(&client, &search, channel_id, &message_id, None).await; + let pre = search + .query(&parse_query("old"), &SearchScope::AllGrovesAndLetters) + .await; + assert_eq!(pre.len(), 1, "pre-edit body must be queryable"); + + // Edit the message and let the event apply. 
+ client + .edit_message("general", &event_hash, "shiny new body") + .await + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + bootstrap::reindex_message(&client, &search, channel_id, &message_id, None).await; + let stale = search + .query(&parse_query("old"), &SearchScope::AllGrovesAndLetters) + .await; + assert!( + stale.is_empty(), + "old body must no longer match after reindex" + ); + let fresh = search + .query(&parse_query("shiny"), &SearchScope::AllGrovesAndLetters) + .await; + assert_eq!(fresh.len(), 1, "new body must match after reindex"); + } +} diff --git a/crates/common/src/wire.rs b/crates/common/src/wire.rs index b3bd19b4..1c76602d 100644 --- a/crates/common/src/wire.rs +++ b/crates/common/src/wire.rs @@ -60,12 +60,23 @@ pub enum WireMessage { peer_id: EndpointId, }, /// The inviter's response with an encrypted invite for the requester. + /// + /// `link_id` echoes the originating `JoinRequest.link_id` so the requester + /// can scope the reply to the specific outstanding join attempt and verify + /// that the message `signer` is the inviter it sent the request to. Without + /// this binding any peer with `target_peer` could spoof a response (see + /// issue #309 / SEC-A-07). JoinResponse { + link_id: String, target_peer: EndpointId, invite_data: String, }, /// The inviter denied the join request. + /// + /// `link_id` echoes the originating `JoinRequest.link_id`; see + /// [`WireMessage::JoinResponse`] for the rationale. 
JoinDenied { + link_id: String, target_peer: EndpointId, reason: String, }, @@ -424,6 +435,7 @@ mod tests { let id = Identity::generate(); let target = Identity::generate().endpoint_id(); let msg = WireMessage::JoinResponse { + link_id: "link-xyz".to_string(), target_peer: target, invite_data: "encrypted-invite-payload".to_string(), }; @@ -431,9 +443,11 @@ mod tests { let (decoded, _) = unpack_wire(&data).unwrap(); match decoded { WireMessage::JoinResponse { + link_id, target_peer, invite_data, } => { + assert_eq!(link_id, "link-xyz"); assert_eq!(target_peer, target); assert_eq!(invite_data, "encrypted-invite-payload"); } @@ -446,6 +460,7 @@ mod tests { let id = Identity::generate(); let target = Identity::generate().endpoint_id(); let msg = WireMessage::JoinDenied { + link_id: "link-xyz".to_string(), target_peer: target, reason: "invite expired".to_string(), }; @@ -453,9 +468,11 @@ mod tests { let (decoded, _) = unpack_wire(&data).unwrap(); match decoded { WireMessage::JoinDenied { + link_id, target_peer, reason, } => { + assert_eq!(link_id, "link-xyz"); assert_eq!(target_peer, target); assert_eq!(reason, "invite expired"); } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index fc67bf14..03028aae 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -23,6 +23,22 @@ use rusqlite::{params, Connection}; use willow_common::SYNC_BATCH_LIMIT; use willow_state::{Event, EventKind, HeadsSummary}; +/// Maximum number of `(author, head)` entries accepted from a peer-supplied +/// [`HeadsSummary`] in a single `sync_since` / `history` call. +/// +/// `sync_since` and `history` build their SQL clauses by concatenating one +/// `(author = ? AND seq ?)` fragment per entry. A peer-supplied summary +/// with thousands of entries (still well within the transport envelope cap) +/// would either exceed rusqlite's bind-parameter limit (default 32766) or +/// waste CPU compiling a giant prepared statement. 
+/// +/// 256 is well above any plausible honest server's distinct-author count +/// while keeping the bind-parameter count (~512 per call) and the SQL +/// expression-tree depth safely below SQLite's defaults +/// (32766 binds, depth 1000). Requests over the cap are rejected at the store layer; +/// the caller in `role.rs` maps the error to a `WorkerResponse::Denied`. +pub const MAX_AUTHORS_PER_SYNC: usize = 256; + /// Ordered list of schema migrations. Each entry is run inside its own /// transaction the first time the database is opened. Once a migration is /// shipped, never edit or reorder it — only append new entries. @@ -192,6 +208,19 @@ impl StorageEventStore { before: Option<&HeadsSummary>, limit: u32, ) -> anyhow::Result<(Vec<Event>, bool)> { + // Reject peer-supplied cursors that would balloon SQL construction + // (see `MAX_AUTHORS_PER_SYNC`). Done before any allocation or SQL + // building so a hostile request fails fast. + if let Some(heads) = before { + if heads.heads.len() > MAX_AUTHORS_PER_SYNC { + anyhow::bail!( + "too many heads in history cursor: {} > {}", + heads.heads.len(), + MAX_AUTHORS_PER_SYNC + ); + } + } + let capped = (limit as usize).min(SYNC_BATCH_LIMIT); let fetch_limit = capped + 1; @@ -291,6 +320,17 @@ impl StorageEventStore { /// Defined once in `willow_common::SYNC_BATCH_LIMIT` so storage /// production and client validation cannot drift apart. pub fn sync_since(&self, server_id: &str, heads: &HeadsSummary) -> anyhow::Result<Vec<Event>> { + // Reject peer-supplied summaries that would balloon SQL construction + // (see `MAX_AUTHORS_PER_SYNC`). Done before any allocation or SQL + // building so a hostile request fails fast. + if heads.heads.len() > MAX_AUTHORS_PER_SYNC { + anyhow::bail!( + "too many heads in sync request: {} > {}", + heads.heads.len(), + MAX_AUTHORS_PER_SYNC + ); + } + if heads.heads.is_empty() { + // New peer — send up to SYNC_BATCH_LIMIT events for this server.
let mut stmt = self.conn.prepare( @@ -1169,4 +1209,82 @@ mod tests { "serialized bytes of recovered event must match original after reopen" ); } + + /// Build a `HeadsSummary` with `n` distinct random authors for cap tests. + fn heads_summary_with_authors(n: usize) -> HeadsSummary { + use willow_state::AuthorHead; + let mut heads = std::collections::BTreeMap::new(); + for _ in 0..n { + let id = Identity::generate(); + heads.insert( + id.endpoint_id(), + AuthorHead { + seq: 1, + hash: EventHash::ZERO, + }, + ); + } + HeadsSummary { heads } + } + + /// A peer-supplied `HeadsSummary` with more than `MAX_AUTHORS_PER_SYNC` + /// entries must be rejected by `sync_since` before any SQL is built. + /// Without the cap, the rusqlite bind-parameter limit (default 32766) or + /// CPU spent compiling the giant prepared statement is the failure mode. + #[test] + fn sync_since_rejects_oversize_heads() { + let store = StorageEventStore::open(":memory:").unwrap(); + let oversize = heads_summary_with_authors(MAX_AUTHORS_PER_SYNC + 1); + + let err = store + .sync_since("srv-1", &oversize) + .expect_err("oversize heads must be rejected, not processed"); + let msg = format!("{err}"); + assert!( + msg.contains("too many heads"), + "error message should mention the cap; got: {msg}" + ); + } + + /// `sync_since` must accept exactly `MAX_AUTHORS_PER_SYNC` entries — the + /// cap is inclusive on the legal side. + #[test] + fn sync_since_accepts_exact_cap() { + let store = StorageEventStore::open(":memory:").unwrap(); + let at_cap = heads_summary_with_authors(MAX_AUTHORS_PER_SYNC); + // No events stored, so the result is empty — but it must not error. + let events = store + .sync_since("srv-1", &at_cap) + .expect("exactly MAX_AUTHORS_PER_SYNC entries must be accepted"); + assert!(events.is_empty()); + } + + /// A peer-supplied `before` cursor with more than `MAX_AUTHORS_PER_SYNC` + /// entries must be rejected by `history` before any SQL is built. 
+ #[test] + fn history_rejects_oversize_before_cursor() { + let store = StorageEventStore::open(":memory:").unwrap(); + let oversize = heads_summary_with_authors(MAX_AUTHORS_PER_SYNC + 1); + + let err = store + .history("srv-1", None, Some(&oversize), 10) + .expect_err("oversize cursor must be rejected, not processed"); + let msg = format!("{err}"); + assert!( + msg.contains("too many heads"), + "error message should mention the cap; got: {msg}" + ); + } + + /// `history` must accept exactly `MAX_AUTHORS_PER_SYNC` cursor entries. + #[test] + fn history_accepts_exact_cap_cursor() { + let store = StorageEventStore::open(":memory:").unwrap(); + let at_cap = heads_summary_with_authors(MAX_AUTHORS_PER_SYNC); + let (events, has_more) = store + .history("srv-1", None, Some(&at_cap), 10) + .expect("exactly MAX_AUTHORS_PER_SYNC entries must be accepted"); + assert!(events.is_empty()); + assert!(!has_more); + } } diff --git a/crates/web/src/app.rs b/crates/web/src/app.rs index 0fdddd48..b9444aab 100644 --- a/crates/web/src/app.rs +++ b/crates/web/src/app.rs @@ -365,62 +365,96 @@ pub fn App() -> impl IntoView { // Alt+↑ / Alt+↓ grove switch, `/` focus + ⌘F scope-flip for search). crate::keybindings::install(app_state, write); - // Hydrate the search index from the messages signal. Runs on every - // message-list change; the index itself dedups by message_id so - // repeated rebuilds are idempotent. + // Spawn the search indexer. One long-lived task hydrates the index + // from existing state on entry, then keeps it fresh by subscribing + // to `ClientEvent`s and routing message lifecycle events into the + // index incrementally. Replaces the prior signal-driven full + // `Rebuild` Effect (issue #354) — that destroyed + rebuilt every + // posting on every send/receive/edit AND only fed in the current + // channel's messages, so switching channels wiped the index for + // the previous channel. 
// - // Per `local-search.md` §Build behaviour: incremental on arrival, - // lazy on historical scan. We do a simple full rebuild here in v1 - // since the in-memory backend is fast enough for typical corpora - // (< 10k messages). A future phase can switch to incremental insert - // via `ClientEvent::MessageReceived`. + // Bootstrap timing: `ClientHandle::new` synchronously materializes + // anything in `crate::storage`, so by the time we reach this point + // the on-disk corpus is already in the state actor. Live events + // arriving before the bootstrap walk completes are picked up by + // the subscriber loop below; `SearchIndex::insert` is idempotent + // on `message_id` so a doubled hit is a no-op. + // + // Letter scope (`letter_id`) stays `None` — the active-letter + // signal isn't yet wired into this path; tracked as the follow-up + // referenced from issue #355. { - let messages_sig = app_state.chat.messages; - let current_channel_sig = app_state.chat.current_channel; - let peer_id_sig = app_state.network.peer_id; - let active_server_sig = app_state.server.active_server_id; + let handle = handle.clone(); let search = search_index.clone(); - let write_local = write; - Effect::new(move |_| { - let msgs = messages_sig.get(); - let current_ch = current_channel_sig.get(); - let grove_id = active_server_sig.get(); - let local_peer = peer_id_sig.get(); - - // `IndexableMessage::from_display_message` derives the - // `has_image` / `has_file` / `has_link` operator flags - // from the body so `has:image` / `has:file` / `has:link` - // queries actually match. `letter_id` stays `None` here — - // the active-letter signal isn't yet wired into this - // effect; tracked as a follow-up to issue #355. 
- let grove = if grove_id.is_empty() { - None - } else { - Some(grove_id.clone()) + let active_server_sig = app_state.server.active_server_id; + let set_status = write.search.set_status; + wasm_bindgen_futures::spawn_local(async move { + let grove_id = { + let g = active_server_sig.get_untracked(); + if g.is_empty() { + None + } else { + Some(g) + } }; - let indexable: Vec<willow_client::IndexableMessage> = msgs - .into_iter() - .map(|m| { - willow_client::IndexableMessage::from_display_message( - &m, - &current_ch, - grove.clone(), - None, - ) - }) - .collect(); - // Ignore local peer binding to avoid unused_variables clippy - // noise — we keep the read so the effect subscribes in case - // a later phase filters by author presence. - let _ = local_peer; - let search = search.clone(); - let set_status = write_local.search.set_status; - leptos::task::spawn_local(async move { - search.rebuild(indexable).await; - // `Rebuild` always settles status to `Idle` before the - // future resolves; no need for a second round-trip. - set_status.set(willow_client::SearchIndexBuildStatus::Idle); - }); + + // One-shot hydration over every channel + message + // currently in state. + willow_client::search::hydrate_index(&handle, &search, grove_id.clone()).await; + set_status.set(willow_client::SearchIndexBuildStatus::Idle); + + // Subscribe to live events and update incrementally. + let mut rx = handle.subscribe_events().await; + while let Some(event) = rx.recv().await { + // Re-read grove on each event so a future multi-grove + // client picks up the right id without us caching it. + let grove_id = { + let g = active_server_sig.get_untracked(); + if g.is_empty() { + None + } else { + Some(g) + } + }; + match event { + ClientEvent::MessageReceived { + channel, + message_id, + .. + } => { + willow_client::search::index_message( + &handle, + &search, + &channel, + &message_id, + grove_id, + ) + .await; + } + ClientEvent::MessageEdited { + channel, + message_id, + ..
+ } => { + willow_client::search::reindex_message( + &handle, + &search, + &channel, + &message_id, + grove_id, + ) + .await; + } + ClientEvent::MessageDeleted { message_id, .. } => { + search.remove_message(&message_id); + } + ClientEvent::ChannelDeleted(channel_id) => { + search.remove_channel(&channel_id); + } + _ => {} + } + } }); } @@ -435,6 +469,7 @@ pub fn App() -> impl IntoView { write.ui.set_join_token.set(Some(state::ParsedJoinToken { raw: token_str.clone(), link_id: token.link_id, + inviter_peer_id: token.inviter_peer_id, server_name: token.server_name, inviter_name: token.inviter_name, })); @@ -461,6 +496,7 @@ pub fn App() -> impl IntoView { .set(Some(state::ParsedJoinToken { raw: token_str.clone(), link_id: token.link_id, + inviter_peer_id: token.inviter_peer_id, server_name: token.server_name, inviter_name: token.inviter_name, })); @@ -555,7 +591,8 @@ pub fn App() -> impl IntoView { let status = state_for_events.ui.join_status.get_untracked(); if status == "connecting" { if let Some(token) = state_for_events.ui.join_token.get_untracked() { - handle_for_events.send_join_request(&token.link_id); + handle_for_events + .send_join_request(&token.link_id, token.inviter_peer_id); } } } diff --git a/crates/web/src/components/join_page.rs b/crates/web/src/components/join_page.rs index 2dd967d6..4f6441d2 100644 --- a/crates/web/src/components/join_page.rs +++ b/crates/web/src/components/join_page.rs @@ -36,7 +36,7 @@ pub fn JoinPage() -> impl IntoView { break; } if let Some(t) = token.get_untracked() { - handle_retry.send_join_request(&t.link_id); + handle_retry.send_join_request(&t.link_id, t.inviter_peer_id); } backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF); } @@ -56,7 +56,7 @@ pub fn JoinPage() -> impl IntoView { write.ui.set_join_status.set("connecting".to_string()); // Send initial JoinRequest. 
if let Some(t) = t { - h2.send_join_request(&t.link_id); + h2.send_join_request(&t.link_id, t.inviter_peer_id); } }); } diff --git a/crates/web/src/state.rs b/crates/web/src/state.rs index e5066925..96eb5e47 100644 --- a/crates/web/src/state.rs +++ b/crates/web/src/state.rs @@ -73,6 +73,11 @@ pub struct ParsedJoinToken { /// Original base64 for re-encoding. pub raw: String, pub link_id: String, + /// Endpoint ID of the inviter who minted this link. Required for + /// the SEC-A-07 signer-binding check on `JoinResponse` / + /// `JoinDenied` (issue #309) — must be plumbed through to + /// `send_join_request`. + pub inviter_peer_id: willow_identity::EndpointId, pub server_name: String, pub inviter_name: String, } diff --git a/docs/plans/2026-05-02-issue-354-search-incremental.md b/docs/plans/2026-05-02-issue-354-search-incremental.md new file mode 100644 index 00000000..857f82ba --- /dev/null +++ b/docs/plans/2026-05-02-issue-354-search-incremental.md @@ -0,0 +1,98 @@ +# Issue #354 — search index rebuilt from scratch on every message-list change + +## Problem + +`crates/web/src/app.rs:419` Effect reads `messages_sig.get()` +(current-channel messages) and calls `search.rebuild(indexable)` on every +change. Every send/receive/edit destroys + rebuilds the index +synchronously on the WASM main thread. Switching channels also wipes the +index for the previous channel. + +## Approach + +1. **Drop the rebuild Effect.** Remove the `messages_sig`-driven + indexer at `app.rs:377-425` entirely. The index will be a global, + long-lived structure built incrementally. + +2. **New willow-client helpers (`crates/client/src/search/bootstrap.rs`).** + + - `pub async fn hydrate_index(&handle, &search, grove_id)` — walks + every channel via `client.channels()` + `client.messages(name)`, + builds an `IndexableMessage` per row using + `IndexableMessage::from_display_message`, and calls + `search.insert(...)` (idempotent on `message_id`). Used once at + bootstrap. 
+ + - `pub async fn index_message(&handle, &search, channel_id, + message_id, grove_id)` — looks up channel name from id, fetches + the matching message, builds an `IndexableMessage`, and calls + `search.insert(...)`. Used by the incremental path on + `MessageReceived`. + + - `pub async fn reindex_message(&handle, &search, channel_id, + message_id, grove_id)` — calls `search.remove_message` first so + the new body wins, then re-inserts. Used by the incremental path + on `MessageEdited`. + + Why a new module: the bootstrap and incremental paths are pure + client-tier logic (no DOM). Putting them in willow-client makes + them testable with `test_client()` instead of needing a browser + harness. + +3. **New web-crate indexer task (`crates/web/src/app.rs`).** Replace the + dropped Effect with a single `wasm_bindgen_futures::spawn_local` + task that: + + - Subscribes to `client.subscribe_events()`. + - Reads the active grove from `app_state.server.active_server_id`. + - Calls `hydrate_index(...)` once on entry. (Storage-loaded events + are already materialized synchronously in `ClientHandle::new`, so + anything on disk is hydrated immediately. Live events arriving + before `hydrate_index` returns are picked up by the subscriber + loop — `insert` is idempotent, so a doubled hit is a no-op.) + - Loops on `event_rx.recv()` and dispatches: + - `MessageReceived { channel, message_id, .. }` → `index_message` + - `MessageEdited { channel, message_id, .. }` → `reindex_message` + - `MessageDeleted { channel, message_id }` → `search.remove_message` + - `ChannelDeleted(channel_id)` → `search.remove_channel` + + The grove signal is read fresh on each event so a future + multi-grove client picks up the right id. 
+ +## Tests + +- **client-tier (`crates/client/src/search/tests.rs`)**: new + `bootstrap_tests` module: + - `hydrate_index_inserts_all_channels` — create 2 channels, send N + messages to each via `test_client`, call `hydrate_index`, assert + `message_count == 2N` and querying picks up both channels. + - `hydrate_index_idempotent` — call twice, count stays the same. + - `index_message_inserts_one` — assert single-message insert lands. + - `reindex_message_replaces_body` — edit a message body, assert old + body no longer matches but new body does. + +- **web browser-tier (`crates/web/tests/browser.rs`)** — N/A. The + rebuild Effect lives in `App()` which is wired to a live + `ClientHandle`. We can't realistically mount the full + app under wasm-pack without a fake network. The bootstrap + + incremental-update behaviour is fully covered by client-tier tests + using `test_client()`. Skipping the DOM tier here is consistent with + the test-tier decision tree in `CLAUDE.md`: "Default to lowest tier + covering behaviour." + +## Files touched (target ≤ 5) + +1. `crates/client/src/search/bootstrap.rs` — new +2. `crates/client/src/search/mod.rs` — wire module +3. `crates/client/src/search/tests.rs` — bootstrap tests +4. `crates/web/src/app.rs` — drop rebuild Effect, spawn indexer task + +## Trade-offs + +- **Edits require channel-id → name lookup.** Each `MessageEdited` + triggers a `state_snapshot()` to find the channel name. Cheap; the + snapshot is an `Arc::clone`. Considered caching — premature. +- **Rejected: keep the Effect, gate on first run.** Less robust — + signal-driven first-run still has the wrong subscription target + (only current channel) and doesn't handle edits. +- **Rejected: poll for channels on a timer.** Wastes battery, racy.