From f96427426059962df32e3cdca171e0a736f80671 Mon Sep 17 00:00:00 2001
From: Claude
Date: Tue, 28 Apr 2026 16:28:55 +0000
Subject: [PATCH 1/7] chore: open auto-fix batch claude/friendly-maxwell-bVlpW

From 56bd43375211e000b806e834ca7b481130dc8b4a Mon Sep 17 00:00:00 2001
From: intendednull
Date: Tue, 28 Apr 2026 09:31:20 -0700
Subject: [PATCH 2/7] build(docker): pin trunk to 0.21.14 in web image (#478)

Mirror deploy.yml pin. Eliminates supply-chain drift from unpinned
`cargo install trunk`.

Refs #475

Co-authored-by: Claude
---
 docker/web.Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docker/web.Dockerfile b/docker/web.Dockerfile
index fa9c3c15..e61a53c2 100644
--- a/docker/web.Dockerfile
+++ b/docker/web.Dockerfile
@@ -1,7 +1,7 @@
 # rust:1.95-slim-bookworm pinned 2026-04-28; bump via `docker buildx imagetools inspect rust:1.95-slim-bookworm`
 FROM rust:1.95-slim-bookworm@sha256:caaf9ca7acd474892186860307d6f28e51fdbc1a4eada459fcff81517cf46a36 AS builder
 RUN rustup target add wasm32-unknown-unknown
-RUN cargo install trunk
+RUN cargo install trunk --version 0.21.14 --locked
 WORKDIR /build
 COPY . .
 RUN cd crates/web && trunk build --release

From f2b6446573582627a7fb5f5d0b423a2c668eb9a5 Mon Sep 17 00:00:00 2001
From: intendednull
Date: Tue, 28 Apr 2026 09:44:23 -0700
Subject: [PATCH 3/7] fix(client): log dropped fire-and-forget sends in
 listeners hot loop (#479)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Replaces 16 trailing `.ok();` swallows on broker `do_send`, topic
`broadcast`, and persistence `do_send` calls in
`crates/client/src/listeners.rs` with a private
`warn_if_err(Result, &'static str)` helper that logs
`tracing::warn!(?e, "{context}")` on `Err` and drops the success value.
Each call site gets a distinct diagnostic context string.

The single remaining `.ok()` (GrantPermission branch) is intentional —
it coerces `Result` to `Option` for `if let Some(event) = ...`, not a
swallowed fire-and-forget send.

Refs #253
---
 crates/client/src/listeners.rs | 179 +++++++++++++++++++++------------
 1 file changed, 115 insertions(+), 64 deletions(-)

diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs
index b2543223..5dd01455 100644
--- a/crates/client/src/listeners.rs
+++ b/crates/client/src/listeners.rs
@@ -42,6 +42,22 @@ pub(crate) fn truncate_to_chars(s: String, max: usize) -> String {
     }
 }

+/// Log a `tracing::warn!` if `r` is `Err`, otherwise drop the success value.
+///
+/// Used in this listener hot loop to replace bare `.ok();` calls on
+/// "fire-and-forget" sends — broker `do_send`, topic `broadcast`, persistence
+/// `do_send`, etc. The previous pattern silently dropped these failures, so a
+/// dead broker or broken topic would cause the listener to keep running
+/// without any record. We still proceed (the listener should not abort on a
+/// downstream failure), but field logs now surface the issue.
+///
+/// See issue #253.
+fn warn_if_err<T, E: std::fmt::Debug>(r: Result<T, E>, context: &'static str) {
+    if let Err(e) = r {
+        tracing::warn!(?e, "{context}");
+    }
+}
+
 /// Context passed to listener tasks with all the actor addresses they need.
pub struct ListenerCtx { pub event_state: Addr>, @@ -117,9 +133,11 @@ async fn topic_listener_loop( } }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))), + "event_broker.do_send Publish(PeerConnected)", + ); // Re-broadcast local profile so the newly joined peer gets // our display name even if they missed the initial broadcast. if let Some(cb) = &ctx.on_neighbor_up { @@ -132,9 +150,11 @@ async fn topic_listener_loop( c.peers.retain(|p| p != &id2); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))), + "event_broker.do_send Publish(PeerDisconnected)", + ); } } } @@ -192,12 +212,17 @@ async fn try_insert_event(ctx: &ListenerCtx, event: willow_state::Event) { // Persist and emit for each applied event. for ev in &all_applied { - ctx.persistence - .do_send(persistence_actor::PersistEvent { event: ev.clone() }) - .ok(); + warn_if_err( + ctx.persistence + .do_send(persistence_actor::PersistEvent { event: ev.clone() }), + "persistence.do_send PersistEvent", + ); let client_events = mutations::derive_client_events(ev); for e in client_events { - ctx.event_broker.do_send(willow_actor::Publish(e)).ok(); + warn_if_err( + ctx.event_broker.do_send(willow_actor::Publish(e)), + "event_broker.do_send Publish(derived ClientEvent)", + ); } } // Persist the state snapshot so messages survive a page reload @@ -239,12 +264,14 @@ async fn process_received_message( p.names.insert(peer_id, display_name); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { - peer_id: profile.peer_id, - display_name: profile.display_name, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { + peer_id: profile.peer_id, + display_name: profile.display_name, + })), + "event_broker.do_send Publish(ProfileUpdated from profile broadcast)", + ); return; } Err(willow_identity::IdentityError::PeerMismatch { claimed, signer }) => { @@ -314,11 +341,13 @@ async fn process_received_message( let _is_synced = willow_actor::state::select(&ctx.dag, |ds| ds.managed.is_synced()).await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::SyncCompleted { - ops_applied: count, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::SyncCompleted { + ops_applied: count, + })), + "event_broker.do_send Publish(SyncCompleted)", + ); } } crate::ops::WireMessage::SyncRequest { state_hash, .. 
} => { @@ -340,7 +369,10 @@ async fn process_received_message( if !events.is_empty() { let msg = crate::ops::WireMessage::SyncBatch { events }; if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) { - topic.broadcast(bytes::Bytes::from(data)).await.ok(); + warn_if_err( + topic.broadcast(bytes::Bytes::from(data)).await, + "topic.broadcast SyncBatch (SyncRequest reply)", + ); } } } @@ -377,12 +409,14 @@ async fn process_received_message( v.participants.entry(ch).or_default().insert(peer_id); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { - channel_id, - peer_id, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { + channel_id, + peer_id, + })), + "event_broker.do_send Publish(VoiceJoined)", + ); } crate::ops::WireMessage::VoiceLeave { channel_id, @@ -395,12 +429,14 @@ async fn process_received_message( } }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceLeft { - channel_id, - peer_id, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceLeft { + channel_id, + peer_id, + })), + "event_broker.do_send Publish(VoiceLeft)", + ); } crate::ops::WireMessage::VoiceSignal { channel_id, @@ -408,13 +444,15 @@ async fn process_received_message( signal, } => { if target_peer == ctx.identity.endpoint_id() { - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { - channel_id, - from_peer: signer, - signal, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { + channel_id, + from_peer: signer, + signal, + })), + "event_broker.do_send Publish(VoiceSignal)", + ); } } crate::ops::WireMessage::JoinRequest { link_id, peer_id } => { @@ -506,7 +544,10 @@ async fn process_received_message( invite_data, }; if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) { - topic.broadcast(bytes::Bytes::from(data)).await.ok(); + warn_if_err( + topic.broadcast(bytes::Bytes::from(data)).await, + "topic.broadcast JoinResponse", + ); } // Grant SendMessages permission to the joining peer so @@ -542,17 +583,22 @@ async fn process_received_message( }) .await; // Persist. - ctx.persistence - .do_send(crate::persistence_actor::PersistEvent { - event: event.clone(), - }) - .ok(); + warn_if_err( + ctx.persistence + .do_send(crate::persistence_actor::PersistEvent { + event: event.clone(), + }), + "persistence.do_send PersistEvent (GrantPermission)", + ); // Broadcast to other peers. 
                        if let Some(data) = crate::ops::pack_wire(
                            &crate::ops::WireMessage::Event(event),
                            &ctx.identity,
                        ) {
-                           topic.broadcast(bytes::Bytes::from(data)).await.ok();
+                           warn_if_err(
+                               topic.broadcast(bytes::Bytes::from(data)).await,
+                               "topic.broadcast Event(GrantPermission)",
+                           );
                        }
                    }
                }
@@ -563,11 +609,12 @@
             invite_data,
         } => {
             if target_peer == ctx.identity.endpoint_id() {
-                ctx.event_broker
-                    .do_send(willow_actor::Publish(ClientEvent::JoinLinkResponse {
-                        invite_data,
-                    }))
-                    .ok();
+                warn_if_err(
+                    ctx.event_broker.do_send(willow_actor::Publish(
+                        ClientEvent::JoinLinkResponse { invite_data },
+                    )),
+                    "event_broker.do_send Publish(JoinLinkResponse)",
+                );
             }
         }
         crate::ops::WireMessage::JoinDenied {
             reason,
         } => {
             if target_peer == ctx.identity.endpoint_id() {
-                ctx.event_broker
-                    .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied {
-                        reason,
-                    }))
-                    .ok();
+                warn_if_err(
+                    ctx.event_broker
+                        .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied {
+                            reason,
+                        })),
+                    "event_broker.do_send Publish(JoinLinkDenied)",
+                );
             }
         }
         // TopicAnnounce is consumed by the relay; clients ignore it.
@@ -603,12 +652,14 @@
                 p.names.insert(peer_id, name);
             })
             .await;
-            ctx.event_broker
-                .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated {
-                    peer_id,
-                    display_name,
-                }))
-                .ok();
+            warn_if_err(
+                ctx.event_broker
+                    .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated {
+                        peer_id,
+                        display_name,
+                    })),
+                "event_broker.do_send Publish(ProfileUpdated from ProfileAnnounce)",
+            );
         }
         // Worker messages travel on the _willow_workers topic and are never
         // delivered to the client's server-ops listener.

From 49855e2bdea44512026219943ccc3ac3bf680588 Mon Sep 17 00:00:00 2001
From: intendednull
Date: Tue, 28 Apr 2026 10:00:02 -0700
Subject: [PATCH 4/7] fix(web): route component handler errors through
 warn_and_toast_with (#480)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

PR #443 closed #350 by routing client-mutation errors in handlers.rs
through warn_and_toast / warn_and_toast_with so the user sees a toast
instead of the action silently vanishing. The pattern was not propagated
to component-internal handlers — 11 sites across roles.rs, settings.rs,
sync_queue_view.rs, and channel_sidebar.rs still discarded the
anyhow::Result with `let _ = h.foo(...).await`.

Migrate all 11 sites:

- roles.rs: create role, set permission, assign role, delete role
- settings.rs: set server display name, mute server
- sync_queue_view.rs: retry queue
- channel_sidebar.rs: create channel, create voice channel, delete
  channel, mute channel

Each site now captures `let toasts = use_context::<ToastStack>()` on the
outer reactive frame (before `wasm_bindgen_futures::spawn_local`, which
strips the reactive owner), moves it into the async block, and dispatches
`crate::handlers::warn_and_toast_with("<context>", &e, toasts.as_ref())`
on Err.

`rg -n 'let _ = h\.[a-z_]+\(.*\)\.await' crates/web/src/components/`
goes from 11 hits to 0.
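The migrated shape at one call site (create role in roles.rs; the other
ten sites follow the same pattern):

    // before — the anyhow::Result was discarded, so failures vanished
    let _ = h.create_role(&name).await;

    // after — toast stack captured on the reactive frame, error surfaced
    let toasts = use_context::<ToastStack>();
    wasm_bindgen_futures::spawn_local(async move {
        if let Err(e) = h.create_role(&name).await {
            crate::handlers::warn_and_toast_with("create role", &e, toasts.as_ref());
        }
    });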
Refs #476
---
 crates/web/src/components/channel_sidebar.rs | 43 +++++++++++++++++---
 crates/web/src/components/roles.rs           | 37 ++++++++++++++---
 crates/web/src/components/settings.rs        | 22 ++++++++--
 crates/web/src/components/sync_queue_view.rs |  8 +++-
 4 files changed, 96 insertions(+), 14 deletions(-)

diff --git a/crates/web/src/components/channel_sidebar.rs b/crates/web/src/components/channel_sidebar.rs
index c580bc35..83a9af36 100644
--- a/crates/web/src/components/channel_sidebar.rs
+++ b/crates/web/src/components/channel_sidebar.rs
@@ -15,7 +15,7 @@ use leptos::prelude::*;
 use crate::app::WebClientHandle;
 use crate::components::{
     ConfirmDialog, ContextMenu, StatusDot, StatusDotBorder, StatusDotSize, TempChannelCreateForm,
-    VoiceControls, TEMP_DEFAULT_DAYS,
+    ToastStack, VoiceControls, TEMP_DEFAULT_DAYS,
 };
 use crate::icons;
@@ -185,13 +185,29 @@ pub fn ChannelSidebar(
                 } else if let Some(kind) = kind {
                     let h = handle_create.clone();
                     let name_owned = name.clone();
+                    // Capture toast stack on the outer reactive frame —
+                    // `spawn_local` strips the owner so `use_context`
+                    // inside the async block would return None.
+                    let toasts = use_context::<ToastStack>();
                     wasm_bindgen_futures::spawn_local(async move {
                         match kind {
                             willow_state::ChannelKind::Voice => {
-                                let _ = h.create_voice_channel(&name_owned).await;
+                                if let Err(e) = h.create_voice_channel(&name_owned).await {
+                                    crate::handlers::warn_and_toast_with(
+                                        "create voice channel",
+                                        &e,
+                                        toasts.as_ref(),
+                                    );
+                                }
                             }
                             _ => {
-                                let _ = h.create_channel(&name_owned).await;
+                                if let Err(e) = h.create_channel(&name_owned).await {
+                                    crate::handlers::warn_and_toast_with(
+                                        "create channel",
+                                        &e,
+                                        toasts.as_ref(),
+                                    );
+                                }
                             }
                         }
                     });
@@ -658,8 +674,15 @@ pub fn ChannelSidebar(
             on_confirm=Callback::new(move |_| {
                 if let Some(name) = pending_del_channel.get_untracked() {
                     let h = handle_del_confirm.clone();
+                    let toasts = use_context::<ToastStack>();
                     wasm_bindgen_futures::spawn_local(async move {
-                        let _ = h.delete_channel(&name).await;
+                        if let Err(e) = h.delete_channel(&name).await {
+                            crate::handlers::warn_and_toast_with(
+                                "delete channel",
+                                &e,
+                                toasts.as_ref(),
+                            );
+                        }
                     });
                 }
                 set_pending_del_channel.set(None);
@@ -974,8 +997,18 @@ fn render_channel_row(
                 let channel = name_for_mute.clone();
                 let h = handle.clone();
                 let target = !is_muted.get_untracked();
+                // Capture toast stack on the outer reactive frame —
+                // `spawn_local` strips the owner so `use_context`
+                // inside the async block would return None.
+                let toasts = use_context::<ToastStack>();
                 wasm_bindgen_futures::spawn_local(async move {
-                    let _ = h.mutate_channel_mute(&channel, target).await;
+                    if let Err(e) = h.mutate_channel_mute(&channel, target).await {
+                        crate::handlers::warn_and_toast_with(
+                            "mute channel",
+                            &e,
+                            toasts.as_ref(),
+                        );
+                    }
                 });
             }
         >

diff --git a/crates/web/src/components/roles.rs b/crates/web/src/components/roles.rs
index 59f0dfb0..8db97e4a 100644
--- a/crates/web/src/components/roles.rs
+++ b/crates/web/src/components/roles.rs
@@ -1,7 +1,7 @@
 use leptos::prelude::*;

 use crate::app::WebClientHandle;
-use crate::components::ConfirmDialog;
+use crate::components::{ConfirmDialog, ToastStack};
 use crate::state::AppState;

 /// List of all permission names that can be toggled on a role.
@@ -52,8 +52,14 @@ pub fn RoleManager(
         let name = name.trim().to_string();
         if !name.is_empty() {
             let h = handle_create.clone();
+            // Capture toast stack on the outer reactive frame —
+            // `spawn_local` strips the owner so `use_context` inside
+            // the async block would return None.
+            let toasts = use_context::<ToastStack>();
             wasm_bindgen_futures::spawn_local(async move {
-                let _ = h.create_role(&name).await;
+                if let Err(e) = h.create_role(&name).await {
+                    crate::handlers::warn_and_toast_with("create role", &e, toasts.as_ref());
+                }
             });
         }
         set_new_name.set(String::new());
@@ -197,6 +203,7 @@ pub fn RoleManager(
                             let h = hp_t.clone();
                             let rid = rid_t.clone();
                             let perm = perm_toggle.clone();
+                            let toasts = use_context::<ToastStack>();
                             wasm_bindgen_futures::spawn_local(async move {
                                 // Names come from PERMISSION_NAMES which is
                                 // kept in sync with willow_state::Permission;
@@ -204,7 +211,13 @@
                                 if let Some(parsed) =
                                     willow_state::Permission::from_name(&perm)
                                 {
-                                    let _ = h.set_permission(&rid, parsed, granted).await;
+                                    if let Err(e) = h.set_permission(&rid, parsed, granted).await {
+                                        crate::handlers::warn_and_toast_with(
+                                            "set permission",
+                                            &e,
+                                            toasts.as_ref(),
+                                        );
+                                    }
                                 }
                             });
                         }
@@ -242,8 +255,15 @@
                 if let Ok(eid) = pid.trim().parse::() {
                     let h = ha.clone();
                     let r = rid.clone();
+                    let toasts = use_context::<ToastStack>();
                     wasm_bindgen_futures::spawn_local(async move {
-                        let _ = h.assign_role(eid, &r).await;
+                        if let Err(e) = h.assign_role(eid, &r).await {
+                            crate::handlers::warn_and_toast_with(
+                                "assign role",
+                                &e,
+                                toasts.as_ref(),
+                            );
+                        }
                     });
                 }
                 set_assign_peer.set(String::new());
@@ -287,8 +307,15 @@
             on_confirm=Callback::new(move |_| {
                 if let Some((rid, _)) = pending_del_role.get_untracked() {
                     let h = handle_del_confirm.clone();
+                    let toasts = use_context::<ToastStack>();
                     wasm_bindgen_futures::spawn_local(async move {
-                        let _ = h.delete_role(&rid).await;
+                        if let Err(e) = h.delete_role(&rid).await {
+                            crate::handlers::warn_and_toast_with(
+                                "delete role",
+                                &e,
+                                toasts.as_ref(),
+                            );
+                        }
                     });
                 }
                 set_pending_del_role.set(None);

diff --git a/crates/web/src/components/settings.rs b/crates/web/src/components/settings.rs
index 849eead0..37d5f7b4 100644
--- a/crates/web/src/components/settings.rs
+++ b/crates/web/src/components/settings.rs
@@ -2,7 +2,7 @@ use leptos::prelude::*;
 use willow_client::presence::{PresenceOverride, PresenceState};

 use crate::app::WebClientHandle;
-use crate::components::{RoleManager, StatusDot, StatusDotBorder, StatusDotSize};
+use crate::components::{RoleManager, StatusDot, StatusDotBorder, StatusDotSize, ToastStack};
 use crate::icons;
 use crate::state::{AppState, SettingsTab};
 use crate::util::copy_to_clipboard;
@@ -54,8 +54,18 @@ pub fn SettingsPanel(
         if !name.trim().is_empty() {
             let h = handle_save.clone();
             let name = name.trim().to_string();
+            // Capture toast stack on the outer reactive frame —
+            // `spawn_local` strips the owner so `use_context` inside
+            // the async block would return None.
+            let toasts = use_context::<ToastStack>();
             wasm_bindgen_futures::spawn_local(async move {
-                let _ = h.set_server_display_name(&name).await;
+                if let Err(e) = h.set_server_display_name(&name).await {
+                    crate::handlers::warn_and_toast_with(
+                        "set server display name",
+                        &e,
+                        toasts.as_ref(),
+                    );
+                }
             });
         }
         set_status_msg.set("Saved.".to_string());
@@ -435,8 +445,14 @@ fn NotificationsTabPlaceholder() -> impl IntoView {
             let target = !local_muted.get_untracked();
             set_local_muted.set(target);
             let h = handle.clone();
+            // Capture toast stack on the outer reactive frame —
+            // `spawn_local` strips the owner so `use_context` inside
+            // the async block would return None.
+            let toasts = use_context::<ToastStack>();
             wasm_bindgen_futures::spawn_local(async move {
-                let _ = h.mutate_grove_mute(target).await;
+                if let Err(e) = h.mutate_grove_mute(target).await {
+                    crate::handlers::warn_and_toast_with("mute server", &e, toasts.as_ref());
+                }
             });
         }
     };

diff --git a/crates/web/src/components/sync_queue_view.rs b/crates/web/src/components/sync_queue_view.rs
index 70090c5f..d39fbb6f 100644
--- a/crates/web/src/components/sync_queue_view.rs
+++ b/crates/web/src/components/sync_queue_view.rs
@@ -76,8 +76,14 @@ pub fn SyncQueueView() -> impl IntoView {
             busy.set(false);
             return;
         };
+        // Capture toast stack on the outer reactive frame —
+        // `spawn_local` strips the owner so `use_context` inside the
+        // async block would return None.
+        let toasts = use_context::<ToastStack>();
         wasm_bindgen_futures::spawn_local(async move {
-            let _ = h.retry_queue().await;
+            if let Err(e) = h.retry_queue().await {
+                crate::handlers::warn_and_toast_with("retry queue", &e, toasts.as_ref());
+            }
             busy.set(false);
         });
     };

From e805a8321eb6d4e68582bb160227a8a1ac405011 Mon Sep 17 00:00:00 2001
From: intendednull
Date: Tue, 28 Apr 2026 10:18:44 -0700
Subject: [PATCH 5/7] test(actor): cover shutdown ordering and broker fanout
 (#482)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add 4 inline #[tokio::test]s to crates/actor/src/lib.rs covering
behaviors the audit (issue #232) called out as missing:

- system_shutdown_terminates_ctx_spawned_child: ctx.spawn children are
  tracked by the system and stopped on shutdown.
- system_shutdown_awaits_in_flight_handler: shutdown blocks until every
  actor's mailbox loop runs to completion (parent waits for child).
  Uses a oneshot ready-signal — no sleep-for-propagation.
- broker_delivers_to_many_subscribers: fanout to N=5 subscribers, with
  ask() round-trips on each subscriber as a deterministic FIFO barrier.
  Also asserts no replay to late subscribers.
- broker_slow_subscriber_does_not_block_others: a blocked Handler on
  one subscriber must not delay delivery to another. Uses
  tokio::sync::Notify for release coordination.

Test count: 90 -> 94 (well above the issue's >10 default-run target).

Note on the issue's other prescription — splitting tests/performance.rs
into actor.rs (correctness) + performance.rs (timing): every test in
the file is a genuine performance/throughput case (assert ops_per_sec >
5_000.0, 10k-iteration loops, propagation-latency thresholds,
multi-source benchmarks). There are no correctness-only tests lurking
in there to extract; splitting would be churn without benefit. Inline
tests in src/ already cover the correctness counterparts (e.g.
broker_publish_to_subscribers in broker.rs vs perf_broker_fanout in
performance.rs).
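The ask-as-FIFO-barrier idiom the fanout test relies on, condensed
(Ping, Evt, and count are the names defined in the new test):

    broker.do_send(Publish(Evt)).unwrap();
    // Mailboxes are FIFO: by the time ask(Ping) resolves, every envelope
    // queued on this subscriber before the Ping — including the broker's
    // Evt fanout do_send — has already been handled.
    addr.ask(Ping).await.unwrap();
    assert_eq!(count.load(Ordering::SeqCst), 1);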
Refs #232

Co-authored-by: Claude
---
 crates/actor/src/lib.rs | 275 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 275 insertions(+)

diff --git a/crates/actor/src/lib.rs b/crates/actor/src/lib.rs
index e9f85b79..b5338ed0 100644
--- a/crates/actor/src/lib.rs
+++ b/crates/actor/src/lib.rs
@@ -1140,4 +1140,279 @@ mod tests {

         system.shutdown().await;
     }
+
+    // ───── Shutdown ordering: system waits for children spawned via ctx ────
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn system_shutdown_terminates_ctx_spawned_child() {
+        // Children spawned via Context::spawn are tracked by the system and
+        // must be stopped when the system shuts down. Without this guarantee,
+        // child actors leak past their parent.
+        struct ParentActor;
+        impl Actor for ParentActor {}
+
+        struct SpawnChild;
+        impl Message for SpawnChild {
+            type Result = Addr<CounterActor>;
+        }
+        impl Handler<SpawnChild> for ParentActor {
+            fn handle(
+                &mut self,
+                _msg: SpawnChild,
+                ctx: &mut Context<Self>,
+            ) -> impl std::future::Future<Output = Addr<CounterActor>> + Send {
+                let child = ctx.spawn(CounterActor::new());
+                async move { child }
+            }
+        }
+
+        let system = System::new();
+        let parent = system.spawn(ParentActor);
+        let child = parent.ask(SpawnChild).await.unwrap();
+
+        assert!(parent.is_alive(), "parent must be alive before shutdown");
+        assert!(child.is_alive(), "child must be alive before shutdown");
+
+        system.shutdown().await;
+
+        // After shutdown(), both parent AND child must be stopped — system
+        // is the registry root, ctx.spawn registers with the same system.
+        assert!(
+            !parent.is_alive(),
+            "parent should be stopped after shutdown"
+        );
+        assert!(!child.is_alive(), "child should be stopped after shutdown");
+    }
+
+    // ───── Shutdown waits for in-flight handler to finish ─────────────────
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn system_shutdown_awaits_in_flight_handler() {
+        // system.shutdown() must not return until each tracked actor has
+        // run its mailbox loop to completion. We prove this by having a
+        // handler signal "started" via a oneshot, then sleep, then increment
+        // a flag. After shutdown() returns, the flag MUST be observed set —
+        // proving shutdown awaited the in-flight handler.
+        use std::sync::atomic::{AtomicBool, Ordering};
+        use tokio::sync::oneshot;
+
+        struct LongHandlerActor {
+            started_tx: Option<oneshot::Sender<()>>,
+            finished: Arc<AtomicBool>,
+        }
+        impl Actor for LongHandlerActor {}
+
+        struct DoWork;
+        impl Message for DoWork {
+            type Result = ();
+        }
+        impl Handler<DoWork> for LongHandlerActor {
+            async fn handle(&mut self, _msg: DoWork, _ctx: &mut Context<Self>) {
+                if let Some(tx) = self.started_tx.take() {
+                    let _ = tx.send(());
+                }
+                runtime::sleep(Duration::from_millis(80)).await;
+                self.finished.store(true, Ordering::SeqCst);
+            }
+        }
+
+        let system = System::new();
+        let (tx, rx) = oneshot::channel::<()>();
+        let finished = Arc::new(AtomicBool::new(false));
+        let addr = system.spawn(LongHandlerActor {
+            started_tx: Some(tx),
+            finished: finished.clone(),
+        });
+
+        addr.do_send(DoWork).unwrap();
+
+        // Wait until the handler has started — deterministic, no sleep.
+        rx.await.expect("handler must signal started");
+
+        // Now request shutdown. It must wait for the in-flight handler.
+        system.shutdown().await;
+
+        // After shutdown returns, the handler must have completed.
+        assert!(
+            finished.load(Ordering::SeqCst),
+            "shutdown returned before in-flight handler finished — \
+             parent must wait for child"
+        );
+    }
+
+    // ───── Broker: delivery to multiple (>2) subscribers ──────────────────
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn broker_delivers_to_many_subscribers() {
+        // Broker fans out a single Publish to N subscribers. Existing
+        // tests only cover 2 subscribers. This validates the core fanout
+        // semantics with N=5 and uses an ask round-trip on each subscriber
+        // as a deterministic barrier (no sleep-wait for propagation).
+        use crate::broker::{Broker, BrokerSubscribe, Publish};
+
+        #[derive(Clone)]
+        struct Evt;
+        impl Message for Evt {
+            type Result = ();
+        }
+
+        struct EvtCounter {
+            count: Arc<AtomicU32>,
+        }
+        impl Actor for EvtCounter {}
+        impl Handler<Evt> for EvtCounter {
+            async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
+                self.count.fetch_add(1, Ordering::SeqCst);
+            }
+        }
+
+        // Ping = no-op message used as a FIFO barrier on each subscriber.
+        // When `ask(Ping)` resolves, the actor has processed every envelope
+        // queued before it — including any prior Publish-fanout deliveries.
+        struct Ping;
+        impl Message for Ping {
+            type Result = ();
+        }
+        impl Handler<Ping> for EvtCounter {
+            async fn handle(&mut self, _msg: Ping, _ctx: &mut Context<Self>) {}
+        }
+
+        let system = System::new();
+        let broker = system.spawn(Broker::<Evt>::new());
+
+        let n = 5usize;
+        let mut subs: Vec<(Addr<EvtCounter>, Arc<AtomicU32>)> = Vec::with_capacity(n);
+        for _ in 0..n {
+            let count = Arc::new(AtomicU32::new(0));
+            let addr = system.spawn(EvtCounter {
+                count: count.clone(),
+            });
+            // ask() ensures the subscription is registered before we publish.
+            broker
+                .ask(BrokerSubscribe(addr.clone().into()))
+                .await
+                .unwrap();
+            subs.push((addr, count));
+        }
+
+        broker.do_send(Publish(Evt)).unwrap();
+
+        // Barrier strategy:
+        // 1. ask() the broker to drain its mailbox through the Publish.
+        //    Use a fresh dummy subscriber — re-subscribing an existing
+        //    one wouldn't change semantics here.
+        // 2. ask() each subscriber a Ping; FIFO on the mailbox guarantees
+        //    the Evt do_send queued by the broker is processed before
+        //    this Ping resolves.
+        let dummy_count = Arc::new(AtomicU32::new(0));
+        let dummy = system.spawn(EvtCounter {
+            count: dummy_count.clone(),
+        });
+        broker
+            .ask(BrokerSubscribe(dummy.clone().into()))
+            .await
+            .unwrap();
+        // dummy subscribed AFTER the Publish, so it should not have
+        // received it — sanity check that broker doesn't replay.
+        dummy.ask(Ping).await.unwrap();
+        assert_eq!(
+            dummy_count.load(Ordering::SeqCst),
+            0,
+            "broker must not replay events to late subscribers"
+        );
+
+        for (i, (addr, count)) in subs.iter().enumerate() {
+            addr.ask(Ping).await.unwrap();
+            assert_eq!(
+                count.load(Ordering::SeqCst),
+                1,
+                "subscriber {i} should have received exactly 1 Evt"
+            );
+        }
+
+        system.shutdown().await;
+    }
+
+    // ───── Broker: slow subscriber doesn't block other deliveries ─────────
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn broker_slow_subscriber_does_not_block_others() {
+        // Broker uses do_send (fire-and-forget) per subscriber, so a slow
+        // handler on one subscriber must not delay delivery to another.
+        use crate::broker::{Broker, BrokerSubscribe, Publish};
+        use tokio::sync::Notify;
+
+        #[derive(Clone)]
+        struct Evt;
+        impl Message for Evt {
+            type Result = ();
+        }
+
+        struct SlowSub {
+            release: Arc<Notify>,
+            received: Arc<AtomicU32>,
+        }
+        impl Actor for SlowSub {}
+        impl Handler<Evt> for SlowSub {
+            async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
+                // Block until we're explicitly released. Without per-subscriber
+                // isolation, this would prevent the fast subscriber from
+                // receiving its delivery.
+                self.release.notified().await;
+                self.received.fetch_add(1, Ordering::SeqCst);
+            }
+        }
+
+        struct FastSub {
+            received: Arc<Notify>,
+        }
+        impl Actor for FastSub {}
+        impl Handler<Evt> for FastSub {
+            async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
+                self.received.notify_one();
+            }
+        }
+
+        let system = System::new();
+        let broker = system.spawn(Broker::<Evt>::new());
+
+        let slow_release = Arc::new(Notify::new());
+        let slow_received = Arc::new(AtomicU32::new(0));
+        let slow = system.spawn(SlowSub {
+            release: slow_release.clone(),
+            received: slow_received.clone(),
+        });
+
+        let fast_received = Arc::new(Notify::new());
+        let fast = system.spawn(FastSub {
+            received: fast_received.clone(),
+        });
+
+        broker.ask(BrokerSubscribe(slow.into())).await.unwrap();
+        broker.ask(BrokerSubscribe(fast.into())).await.unwrap();
+
+        broker.do_send(Publish(Evt)).unwrap();
+
+        // Fast subscriber must receive without waiting on slow one.
+        // Bounded wait is generous (2s) but the actual notify fires
+        // within microseconds in practice — no flake.
+        tokio::time::timeout(Duration::from_secs(2), fast_received.notified())
+            .await
+            .expect("fast subscriber should receive while slow is blocked");
+
+        // Slow subscriber is still blocked — release it and confirm.
+        assert_eq!(slow_received.load(Ordering::SeqCst), 0);
+        slow_release.notify_one();
+
+        // Now slow handler completes.
+        let deadline = std::time::Instant::now() + Duration::from_secs(2);
+        while slow_received.load(Ordering::SeqCst) == 0 {
+            if std::time::Instant::now() >= deadline {
+                panic!("slow subscriber never received after release");
+            }
+            runtime::sleep(Duration::from_millis(5)).await;
+        }
+
+        system.shutdown().await;
+    }
 }

From 04b2b93608cd49e5fdc16f17ddaa4f9e215e0627 Mon Sep 17 00:00:00 2001
From: intendednull
Date: Tue, 28 Apr 2026 10:32:23 -0700
Subject: [PATCH 6/7] test(client): cover actions.rs translation logic (#483)

Refs #420
---
 crates/client/src/actions.rs       |  17 ++
 crates/client/src/lib.rs           |   4 +
 crates/client/src/tests/actions.rs | 292 +++++++++++++++++++++++++++++
 3 files changed, 313 insertions(+)
 create mode 100644 crates/client/src/tests/actions.rs

diff --git a/crates/client/src/actions.rs b/crates/client/src/actions.rs
index 31123f92..cfd918d5 100644
--- a/crates/client/src/actions.rs
+++ b/crates/client/src/actions.rs
@@ -1,3 +1,20 @@
+//! UI-facing action methods on [`ClientHandle`].
+//!
+//! Most entry points in this module are thin pass-throughs that forward
+//! their arguments to the corresponding method on
+//! [`crate::mutations::ClientMutations`]. Their behaviour is exercised
+//! through the mutation handle directly in `tests/multi_peer_sync.rs`,
+//! `tests/trust_flow.rs`, `tests/ephemeral.rs`, and the inline `tests`
+//! module at the bottom of `lib.rs`. State-machine-level invariants are
+//! covered by `crates/state/src/tests.rs`.
+//!
+//! Methods that do non-trivial translation work *before* delegating —
+//! validation (`share_file_inline`), ID minting (`create_voice_channel`),
+//! direct event assembly with no mutation-handle helper
+//! (`set_permission`, `assign_role`), or derived-view composition
+//! (`pinned_message_ids`, `pinned_messages`, `is_pinned`) — are covered
+//! at the client tier in `tests/actions.rs`.
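+//!
+//! A concrete example of that translation work, as exercised by the new
+//! tests (sketch; the exact byte-for-byte shape is asserted in
+//! `tests/actions.rs`):
+//!
+//! ```ignore
+//! // share_file_inline enforces the 256 KiB cap, then packs the bytes
+//! // as "[file:NAME:BASE64]" before delegating to the mutation handle:
+//! client.share_file_inline("general", "note.txt", b"hi").await?;
+//! // resulting message body: "[file:note.txt:aGk=]"
+//! ```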
+ use super::*; impl ClientHandle { diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 3199884d..c5ff1fa1 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -68,6 +68,10 @@ mod tests_profile_view; #[path = "tests/ephemeral.rs"] mod tests_ephemeral; +#[cfg(test)] +#[path = "tests/actions.rs"] +mod tests_actions; + /// How long a typing indicator remains visible after the last typing event, in milliseconds. pub const TYPING_INDICATOR_TTL_MS: u64 = 5_000; diff --git a/crates/client/src/tests/actions.rs b/crates/client/src/tests/actions.rs new file mode 100644 index 00000000..22c55ab1 --- /dev/null +++ b/crates/client/src/tests/actions.rs @@ -0,0 +1,292 @@ +//! Tests for `crates/client/src/actions.rs`. +//! +//! `actions.rs` is mostly a thin pass-through layer that forwards UI +//! calls to [`crate::mutations::ClientMutations`] (whose own behaviour is +//! covered elsewhere — see `multi_peer_sync.rs`, `trust_flow.rs`, the +//! `tests` block at the bottom of `lib.rs`, and the state-machine tests +//! in `crates/state/src/tests.rs`). The only paths that warrant a +//! dedicated test at this tier are the ones that do real translation +//! work *before* delegating: validation, ID minting, derived-view +//! composition. +//! +//! What this file covers: +//! +//! * [`ClientHandle::share_file_inline`] — 256 KB size cap and +//! `[file:NAME:BASE64]` body shape. +//! * [`ClientHandle::create_voice_channel`] — UUID minting and +//! `ChannelKind::Voice` is recorded on the materialized channel. +//! * [`ClientHandle::set_permission`] — translates `(role, perm, +//! granted)` into a `SetPermission` event that lands on the role's +//! permission set, including the revoke (`granted = false`) branch. +//! * [`ClientHandle::assign_role`] — translates `(peer, role)` into an +//! `AssignRole` event that lands on the member's role set. +//! * [`ClientHandle::pinned_message_ids`] / `pinned_messages` / +//! `is_pinned` — channel-name → channel-id lookup, ordering, +//! composition, and missing-channel handling. +//! +//! Pure pass-throughs (e.g. `send_message`, `create_channel`, +//! `propose_revoke_admin`, `mutate_channel_mute`, …) are intentionally +//! NOT re-tested here: their behaviour is exercised through the +//! mutation handle directly in the modules listed above. + +use crate::test_client; +use willow_state::{ChannelKind, Permission}; + +/// `share_file_inline` rejects payloads larger than the 256 KiB cap and +/// does not enqueue any message in that case. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn share_file_inline_rejects_oversized_payload() { + let (client, _rx) = test_client(); + + // 256 KiB + 1 byte — one byte over the documented cap. + let oversized = vec![0u8; 256 * 1024 + 1]; + let err = client + .share_file_inline("general", "big.bin", &oversized) + .await + .expect_err("oversized payload must be rejected"); + assert!( + err.to_string().contains("file too large"), + "error must mention size limit, got: {err}" + ); + + // The rejected call must not have produced a message. + let msgs = client.messages("general").await; + assert!( + msgs.is_empty(), + "no message should have been sent for a rejected file" + ); +} + +/// `share_file_inline` formats the body as `[file:NAME:BASE64(DATA)]` +/// when the payload fits, and the encoded bytes round-trip. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn share_file_inline_emits_base64_encoded_body() { + let (client, _rx) = test_client(); + + let data: &[u8] = b"hello, willow!"; + client + .share_file_inline("general", "note.txt", data) + .await + .expect("inline share must succeed under cap"); + // Let the actor system apply the message event. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let msgs = client.messages("general").await; + let body = &msgs.last().expect("message must be recorded").body; + let expected_prefix = "[file:note.txt:"; + assert!( + body.starts_with(expected_prefix) && body.ends_with(']'), + "body must use [file:NAME:BASE64] shape, got: {body}" + ); + let encoded = &body[expected_prefix.len()..body.len() - 1]; + let decoded = crate::base64::decode(encoded).expect("body payload must be valid base64"); + assert_eq!(decoded, data, "round-trip of inlined bytes must match"); +} + +/// `create_voice_channel` mints a fresh channel with `ChannelKind::Voice`. +/// The mutation handle exposes no `create_voice_channel`, so this whole +/// path lives in `actions.rs` and needs its own coverage. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn create_voice_channel_records_voice_kind() { + let (client, _rx) = test_client(); + + client + .create_voice_channel("lounge") + .await + .expect("voice channel creation must succeed"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let kinds = client.channel_kinds().await; + let lounge = kinds + .iter() + .find(|(name, _)| name == "lounge") + .expect("lounge channel must exist"); + assert!( + matches!(lounge.1, ChannelKind::Voice), + "lounge must be a Voice channel, got {:?}", + lounge.1 + ); +} + +/// `set_permission(granted = true)` adds the permission to the named +/// role's permission set; `set_permission(granted = false)` removes it. +/// `actions.rs::set_permission` builds the `SetPermission` event itself +/// (no equivalent exists on the mutation handle), so both branches need +/// direct coverage here. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn set_permission_grants_then_revokes_on_role() { + let (client, _rx) = test_client(); + + // Create a role we can mutate. `create_role` mints the UUID, so we + // have to discover the assigned id from materialized state. + client + .create_role("Moderator") + .await + .expect("role creation must succeed"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let role_id = { + let snap = client.state_snapshot().await; + snap.roles + .values() + .find(|r| r.name == "Moderator") + .expect("Moderator role must exist") + .id + .clone() + }; + + // Grant. + client + .set_permission(&role_id, Permission::ManageChannels, true) + .await + .expect("granting permission on owner-authored role must succeed"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let granted_perms = client + .state_snapshot() + .await + .roles + .get(&role_id) + .expect("role must still exist") + .permissions + .clone(); + assert!( + granted_perms.contains(&Permission::ManageChannels), + "role must hold ManageChannels after grant, got {granted_perms:?}" + ); + + // Revoke (granted = false branch). 
+ client + .set_permission(&role_id, Permission::ManageChannels, false) + .await + .expect("revoking permission must succeed"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let revoked_perms = client + .state_snapshot() + .await + .roles + .get(&role_id) + .expect("role must still exist") + .permissions + .clone(); + assert!( + !revoked_perms.contains(&Permission::ManageChannels), + "role must lack ManageChannels after revoke, got {revoked_perms:?}" + ); +} + +/// `assign_role` puts the role id into the target member's `roles` set. +/// Like `set_permission`, this entry point assembles the event itself +/// rather than delegating to a mutation-handle helper. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn assign_role_adds_role_to_member() { + let (client, _rx) = test_client(); + let me = client.identity.endpoint_id(); + + client + .create_role("Moderator") + .await + .expect("role creation must succeed"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let role_id = { + let snap = client.state_snapshot().await; + snap.roles + .values() + .find(|r| r.name == "Moderator") + .expect("Moderator role must exist") + .id + .clone() + }; + + client + .assign_role(me, &role_id) + .await + .expect("assigning role to self (owner) must succeed"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let snap = client.state_snapshot().await; + let member = snap + .members + .get(&me) + .expect("local peer must be a member of its own server"); + assert!( + member.roles.contains(&role_id), + "member must have role assigned, got roles {:?}", + member.roles + ); +} + +/// `pinned_message_ids` returns an empty vec for an unknown channel +/// rather than panicking — the channel-name → channel-id lookup falls +/// back to a default-empty channel id and the subsequent `channels.get` +/// returns `None`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pinned_message_ids_empty_for_unknown_channel() { + let (client, _rx) = test_client(); + let ids = client.pinned_message_ids("does-not-exist").await; + assert!( + ids.is_empty(), + "unknown channel must yield no pinned ids, got {ids:?}" + ); + + let msgs = client.pinned_messages("does-not-exist").await; + assert!( + msgs.is_empty(), + "unknown channel must yield no pinned messages" + ); +} + +/// End-to-end pin lifecycle exercises the composition chain inside +/// `actions.rs`: `pin_message` (delegated) → `pinned_message_ids` +/// (channel-name lookup + sort) → `pinned_messages` (filter messages +/// view) and `is_pinned` (membership test). Unpinning then has to +/// remove the entry from each of those derived views. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pin_message_flows_through_pinned_views() { + let (client, _rx) = test_client(); + + // Author a message we can pin. + client.send_message("general", "pin me").await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let msg = client + .messages("general") + .await + .into_iter() + .find(|m| m.body == "pin me") + .expect("authored message must be present"); + let msg_hash: willow_state::EventHash = + msg.id.parse().expect("DisplayMessage.id is hex EventHash"); + + // Initially nothing is pinned. + assert!(client.pinned_message_ids("general").await.is_empty()); + assert!(!client.is_pinned("general", &msg_hash).await); + assert!(client.pinned_messages("general").await.is_empty()); + + // Pin it. 
+    client
+        .pin_message("general", &msg_hash)
+        .await
+        .expect("owner pin must succeed");
+    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+
+    let ids = client.pinned_message_ids("general").await;
+    assert_eq!(ids, vec![msg_hash], "pinned id list must contain the pin");
+    assert!(client.is_pinned("general", &msg_hash).await);
+
+    let pinned = client.pinned_messages("general").await;
+    assert_eq!(pinned.len(), 1, "pinned messages must surface one entry");
+    assert_eq!(pinned[0].body, "pin me");
+
+    // Unpin and re-check every derived view drops it.
+    client
+        .unpin_message("general", &msg_hash)
+        .await
+        .expect("owner unpin must succeed");
+    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+
+    assert!(client.pinned_message_ids("general").await.is_empty());
+    assert!(!client.is_pinned("general", &msg_hash).await);
+    assert!(client.pinned_messages("general").await.is_empty());
+}

From 3df1b33ea46a212ed577fff970752b96c81bb4f7 Mon Sep 17 00:00:00 2001
From: Claude
Date: Tue, 28 Apr 2026 17:33:59 +0000
Subject: [PATCH 7/7] chore(skill): worktree-fallback +
 stale-audit-with-residual-gap
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fold lessons from auto-fix batch claude/friendly-maxwell-bVlpW into the
resolving-issues skill:

1. Sandbox signing-service rejects worktree-path commits (hit by 2 of 7
   implementers this run); document the canonical-dir fallback in Setup.
2. New "stale-audit-with-residual-gap" path on the implementer agent —
   when an audit's literal premise is outdated but a residual concern
   remains, narrow scope and ship the residual; keep the issue closer
   under Fixes.
3. Already-fixed-upstream path: clarify completed vs not_planned choice
   based on whether the audit's intent still holds.
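Item 1's fallback, sketched as shell (repo path, branch name, and file
paths are illustrative only — not prescribed by the skill):

    # worktree commit rejected by the signing service: "400 missing source"
    cd /home/user/repo                 # canonical dir, inside /home/user/
    git fetch origin issue-123
    git checkout issue-123             # same branch the worktree was on
    cp -r .claude/worktrees/issue-123/crates/foo/src/. crates/foo/src/
    git add -A && git commit -m "fix: ..." && git push origin issue-123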
---
 .claude/skills/resolving-issues/SKILL.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/.claude/skills/resolving-issues/SKILL.md b/.claude/skills/resolving-issues/SKILL.md
index a2769ffc..7802cda4 100644
--- a/.claude/skills/resolving-issues/SKILL.md
+++ b/.claude/skills/resolving-issues/SKILL.md
@@ -82,7 +82,8 @@ Fresh agent per issue, scoped to one issue + master branch ref. Steps:
 9. **Merge gate:** if sub-PR CI runs (rare — only when workflow `branches: [main]` filter matches), wait for green. If CI doesn't run (sub-PR base ≠ main is the common case), local `just check` green from step 7 IS the gate. Merge with `mcp__github__merge_pull_request` `merge_method: squash`.
 10. CI red after one fix attempt OR local `just check` red OR mid-fix block → **file a follow-up GH issue** (caveman body, link the original issue + cite the blocker), then **close the sub-PR** (don't leave it as a draft for someone to resume). The next scheduled run will see the follow-up issue in the queue and pick it up. Return control to coordinator.
 11. Tear down worktree on merge OR on close-after-blocker.
-12. **Already-fixed-upstream path:** if pre-flight investigation (e.g. `cargo audit`, file-state grep, `cargo tree`) shows the issue was resolved by a recently-merged upstream PR, do NOT open a dead sub-PR. Leave a caveman comment on the original issue naming the upstream PR + the fix location, close the issue as `completed`, tear down the worktree, report back. Coordinator records this under `## Already-Fixed` in the master PR — NOT under `Fixes`.
+12. **Already-fixed-upstream path:** if pre-flight investigation (e.g. `cargo audit`, file-state grep, `cargo tree`) shows the issue was resolved by a recently-merged upstream PR, do NOT open a dead sub-PR. Leave a caveman comment on the original issue naming the upstream PR + the fix location, close the issue (`completed` if the audit's intent now holds; `not_planned` if the audit's premise is moot — e.g. the targeted code was deleted), tear down the worktree, report back. Coordinator records this under `## Already-Fixed` in the master PR — NOT under `Fixes`.
+13. **Stale-audit-with-residual-gap path:** if pre-flight investigation shows the audit's literal premise is stale (e.g. "zero tests" — but a later PR added some) but its underlying concern is partially valid (some specific gap remains), narrow scope to the residual gap and ship that. Note the audit's stale framing + cite the upstream PR that resolved most of it in the sub-PR body. Coordinator still records under `Fixes #N` because the audit issue is the right closer.

 ## Lessons Learned

@@ -130,6 +131,7 @@ Never defer skill edits to a follow-up — they ship with the run that surfaced

 - Pre-worktree: `git stash` or `git restore` main dir; `.claude/worktrees/` in `.gitignore`.
 - Worktree per issue, branched off master branch. Tear down after sub-PR merges or parks as draft.
+- **Sandbox-specific worktree fallback.** Some sandboxed environments run a commit-signing service that rejects sources outside `/home/user/` with `400 missing source`. If `git commit` inside a worktree fails this way, do NOT fight it: keep the worktree's diff intact, switch to the canonical repo dir on the same branch (`git checkout <branch>` after `git fetch origin <branch>`), copy or stage the modified files, commit + push from there. End state matches the worktree-based flow. Tear down the worktree the same way regardless.

 ## Quality