diff --git a/CLAUDE.md b/CLAUDE.md index f390dfaf..7962ec36 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -54,6 +54,10 @@ crates/ ├── channels.rs — Channel/role/member management, invites, trust ├── settings.rs — Settings view systems └── files.rs — File picker systems + +docs/superpowers/ +├── specs/ — Design specs for new features and architecture changes +└── plans/ — Implementation plans referencing the specs ``` ## Build & Test diff --git a/docs/plans/2026-03-30-actor-system.md b/docs/plans/2026-03-30-actor-system.md new file mode 100644 index 00000000..21c41bb8 --- /dev/null +++ b/docs/plans/2026-03-30-actor-system.md @@ -0,0 +1,297 @@ +# Actor System Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace hand-rolled channel/task patterns across worker, client, and web crates with a formalized actor system crate (`willow-actor`) that works on both native and WASM targets, eliminating all locks from the shared state path. + +**Architecture:** A pure infrastructure crate (`willow-actor`) provides `Actor`, `Handler`, `StreamHandler`, `Addr`, `System`, and a thin platform abstraction (tokio on native, futures-channel + gloo-timers on WASM). Worker actors, client state, and web UI event loops are migrated to use these primitives. 
+
+**Tech Stack:** Rust, tokio (native), futures-channel (WASM), wasm-bindgen-futures, gloo-timers, RPITIT (Rust 1.75+)
+
+**Spec:** `docs/specs/2026-03-29-actor-system-design.md`
+
+---
+
+## File Map
+
+### New Crate
+
+```
+crates/actor/
+├── Cargo.toml
+└── src/
+    ├── lib.rs — Public API re-exports
+    ├── actor.rs — Actor, Handler, StreamHandler, Message traits
+    ├── addr.rs — Addr, AnyAddr, Recipient
+    ├── context.rs — Context, interval, stream attachment
+    ├── envelope.rs — BoxEnvelope, type-erased message dispatch
+    ├── mailbox.rs — Unbounded channel recv loop
+    ├── runtime.rs — Platform abstraction (spawn, channels, timers)
+    ├── supervisor.rs — RestartPolicy, supervised spawn
+    ├── system.rs — System, SystemHandle
+    └── error.rs — SendError, AskError
+```
+
+### Modified Files
+
+```
+crates/worker/Cargo.toml — Add willow-actor dependency
+crates/worker/src/actors/mod.rs — Remove StateMsg enum (replaced by per-message types)
+crates/worker/src/actors/state.rs — StateActor with Handler<EventMsg>, Handler<WorkerRequestMsg>, etc.
+crates/worker/src/actors/network.rs — NetworkActor with StreamHandler
+crates/worker/src/actors/heartbeat.rs — HeartbeatActor with ctx.run_interval()
+crates/worker/src/actors/sync.rs — SyncActor with ctx.run_interval()
+crates/worker/src/runtime.rs — Replace manual channel/spawn/join with System
+
+crates/client/Cargo.toml — Add willow-actor dependency
+crates/client/src/lib.rs — Replace Arc<RwLock<SharedState>> with state actor
+crates/client/src/listeners.rs — Replace spawn_topic_listener with StreamHandler actor
+crates/client/src/state.rs — State accessor methods become ask() calls
+crates/client/src/events.rs — Delete ClientEvent enum (replaced by derived state)
+
+crates/web/Cargo.toml — Add willow-actor dependency
+crates/web/src/app.rs — Remove event loop, wire derived signals
+crates/web/src/event_processing.rs — DELETE (replaced by derived state actors)
+crates/web/src/state.rs — Rewrite: derived_signal() calls replace create_signals()
+crates/web/src/components/*.rs — Replace handle state reads with derived signals
+
+Cargo.toml — Add actor to workspace members
+```
+
+---
+
+## Task 1: Runtime Abstraction Module
+
+**Files:**
+- Create: `crates/actor/Cargo.toml`
+- Create: `crates/actor/src/lib.rs`
+- Create: `crates/actor/src/runtime.rs`
+- Create: `crates/actor/src/error.rs`
+- Modify: `Cargo.toml` (workspace members)
+
+The platform abstraction is the foundation everything else builds on. It must compile on both native and `wasm32-unknown-unknown` before any actor types are defined.
+
+- [ ] **Step 1: Create crate skeleton.** `Cargo.toml` with workspace edition/version, dependencies split by target: `tokio` (native), `wasm-bindgen-futures` + `futures-channel` + `gloo-timers` (WASM). Shared deps: `futures-core`, `thiserror`, `tracing`. Add `actor` to workspace `Cargo.toml` members.
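A sketch of the target-split manifest Step 1 describes — crate and dependency names come from the plan; version numbers and feature lists are illustrative assumptions, not pinned choices:

```toml
[package]
name = "willow-actor"
version.workspace = true
edition.workspace = true

# Shared across both targets.
[dependencies]
futures-core = "0.3"
thiserror = "1"
tracing = "0.1"

# Native only: tokio provides spawn, channels, and timers.
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1", features = ["rt", "sync", "time", "macros"] }

# WASM only: spawn_local + futures channels + gloo timers.
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
futures-channel = "0.3"
gloo-timers = { version = "0.3", features = ["futures"] }
```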
+
+- [ ] **Step 2: Implement `runtime.rs`.** Four functions: `spawn()` (tokio::spawn vs spawn_local), `unbounded_channel()` (tokio mpsc vs futures mpsc), `oneshot()` (tokio vs futures), `sleep()` (tokio vs gloo-timers). Define `Sender` / `Receiver` / `OneshotTx` / `OneshotRx` wrapper types that unify the two backends behind a common API. The `Receiver` must implement both `async fn recv() -> Option<M>` (blocks until a message arrives) and `fn try_recv() -> Option<M>` (non-blocking, returns `None` if empty). `try_recv` is required for the mailbox drain loop (Task 2 Step 6). Both `tokio::sync::mpsc::UnboundedReceiver` and `futures::channel::mpsc::UnboundedReceiver` support this.
+
+- [ ] **Step 3: Implement `error.rs`.** `SendError<M>` with a `Closed(M)` variant. `AskError` with `Closed` and `NoResponse` variants. Both derive `Debug`, `thiserror::Error`.
+
+- [ ] **Step 4: Verify dual-target compilation.** Run `cargo check -p willow-actor` (native) and `cargo check -p willow-actor --target wasm32-unknown-unknown` (WASM). Both must pass with zero warnings.
+
+---
+
+## Task 2: Core Actor Traits and Envelope
+
+**Files:**
+- Create: `crates/actor/src/actor.rs`
+- Create: `crates/actor/src/envelope.rs`
+- Create: `crates/actor/src/mailbox.rs`
+
+Defines the trait hierarchy and the type-erased message dispatch mechanism.
+
+- [ ] **Step 1: Define `Message` trait.** `Send + 'static` with `type Result: Send + 'static`. **Convenience:** consider a `#[derive(Message)]` proc macro or a declarative macro `message!(MyMsg => ResponseType)` to reduce boilerplate. Not required for the initial implementation — it can be added later if the manual impls feel verbose.
+
+- [ ] **Step 2: Define `Actor` trait.** `Send + 'static + Sized` with lifecycle hooks using RPITIT (not async_trait), all with default no-op impls:
+  - `started(&mut self, ctx: &mut Context<Self>)` — called once before processing messages
+  - `stopped(&mut self)` — called when the actor is shutting down
+  - `idle(&mut self, ctx: &mut Context<Self>)` — called after the mailbox drains all immediately-available messages (i.e., when `try_recv()` returns empty). Used for batched notification patterns.
+
+- [ ] **Step 3: Define `Handler<M: Message>` trait.** `fn handle(&mut self, msg: M, ctx: &mut Context<Self>) -> impl Future<Output = M::Result> + Send`. Supertrait is `Actor`.
+
+- [ ] **Step 4: Define `StreamHandler` trait.** `handle_stream_item()` and `stream_finished()` with RPITIT. Supertrait is `Actor`.
+
+- [ ] **Step 5: Implement `envelope.rs`.** `BoxEnvelope<A>` type alias: `Box<dyn FnOnce(&mut A, &mut Context<A>) -> BoxFuture<'_, ()> + Send>`. Two factory functions: `envelope_send(msg) -> BoxEnvelope<A>` (fire-and-forget, drops the handler's return value) and `envelope_ask(msg, reply_tx) -> BoxEnvelope<A>` (captures the oneshot sender, sends the return value back). Both wrap the handler call in a closure. **Implementation note:** the generic bounds on these functions are `A: Handler<M>, M: Message` — the monomorphization happens at the call site (`Addr::send`/`Addr::ask`), and the resulting closure is type-erased into the `BoxEnvelope<A>`. This is where per-message-type dispatch is compiled in.
+
+- [ ] **Step 6: Implement `mailbox.rs`.** `async fn run_mailbox<A: Actor>(mut actor: A, ctx: Context<A>, rx: Receiver<BoxEnvelope<A>>, stop: Arc<AtomicBool>, done: OneshotTx<()>)`: the actor is moved in and mutated via `&mut` for its lifetime. The `Context` and `done` oneshot are provided by the caller (System in Task 3, test harness here). Loop structure:
+  1. Call `actor.started(&mut ctx)`
+  2. `recv().await` — blocks until at least one message arrives
+  3. Execute the envelope
+  4. `try_recv()` in a loop — drain all immediately-available messages without yielding, executing each envelope
+  5. 
Call `actor.idle(&mut ctx)` — queue is now empty
+  6. Check stop flag — if set, exit loop
+  7. Go to step 2
+
+  On exit (channel closed or stop flag), call `actor.stopped()`, then signal `done`. For Task 2 tests, construct a minimal `Context` with a dummy `SystemHandle`.
+
+- [ ] **Step 7: Write mailbox-level tests.** Test `run_mailbox` directly by creating a channel, sending `BoxEnvelope`s manually, and verifying the actor processes them. Test that the mailbox loop exits when the sender is dropped. Full `Addr`/`System`-level tests come in Task 3.
+
+---
+
+## Task 3: Addr, Context, and System
+
+**Files:**
+- Create: `crates/actor/src/addr.rs`
+- Create: `crates/actor/src/context.rs`
+- Create: `crates/actor/src/system.rs`
+- Update: `crates/actor/src/lib.rs`
+
+Wires everything together into the public API.
+
+- [ ] **Step 1: Implement `Addr<A>`.** Wraps `runtime::Sender<BoxEnvelope<A>>`. Methods: `send()` wraps the msg in `envelope_send`, sends on the channel. `ask()` creates a oneshot, wraps the msg in `envelope_ask`, sends on the channel, awaits the oneshot receiver. `is_alive()` checks if the channel is open. `Clone` impl.
+
+- [ ] **Step 2: Implement `AnyAddr`.** Type-erased address that can only signal stop (drops a held sender) and check liveness. `From<Addr<A>>` impl.
+
+- [ ] **Step 3: Implement `Context<A>`.** Fields: `addr: Addr<A>`, `system: SystemHandle`, `stop: Arc<AtomicBool>` (shared with the mailbox loop). Methods: `address()`, `stop()` (sets the `AtomicBool` — the mailbox checks it between messages), `system()`, `spawn()` (delegates to the system), `add_stream()` (spawns a task that polls the stream and forwards items as `StreamEnvelope`s — a separate envelope variant that calls `StreamHandler::handle_stream_item` instead of `Handler::handle`; stream end sends a `StreamFinishedEnvelope`), `run_interval()` (spawns a task that sleeps + sends a message on each tick, returns `IntervalHandle` with cancel).
+
+- [ ] **Step 4: Implement `System` / `SystemHandle`.** `System::new()` creates the handle.
`SystemHandle` is `Clone` and holds a list of `AnyAddr`s (for shutdown) plus a list of `OneshotRx<()>` (one per actor, signaled when `run_mailbox` exits). `spawn()` creates a channel and a done oneshot, builds the `Context<A>`, spawns `run_mailbox` via `runtime::spawn`, stores the `OneshotRx`, and returns `Addr<A>`. `shutdown()` drops all tracked addresses (closing mailboxes), then awaits all done oneshots to confirm the actors have stopped.
+
+- [ ] **Step 5: Implement `Recipient<M>`.** Internal `RecipientSender<M>` trait with `send()` and `ask()`. `Addr<A>` implements it for any `A: Handler<M>`. `Recipient<M>` wraps `Box<dyn RecipientSender<M>>`. `From<Addr<A>>` impl.
+
+- [ ] **Step 6: Wire up `lib.rs` re-exports.** Public API: `Actor`, `Handler`, `StreamHandler`, `Message`, `Addr`, `AnyAddr`, `Recipient`, `Context`, `System`, `SystemHandle`, `SendError`, `AskError`, `IntervalHandle`. (`RestartPolicy` added in Task 4.)
+
+- [ ] **Step 7: Write integration tests.** Multi-actor test: spawn two actors, actor A sends to actor B, B replies. Test `StreamHandler` with a `futures::stream::iter`. Test `run_interval` fires the expected number of times. Test shutdown stops all actors. Test `idle()` batching: send N messages at once, verify `idle()` is called once (not N times) by checking a counter in the actor.
+
+- [ ] **Step 8: Verify WASM compilation.** `cargo check -p willow-actor --target wasm32-unknown-unknown`. Add to `just check-wasm`.
+
+---
+
+## Task 4: Supervision
+
+**Files:**
+- Create: `crates/actor/src/supervisor.rs`
+- Update: `crates/actor/src/context.rs`
+
+- [ ] **Step 1: Define `RestartPolicy`.** Enum: `Never`, `OnFailure { max: u32 }`, `Backoff { initial: Duration, max_delay: Duration, max_retries: u32 }`.
+
+- [ ] **Step 2: Implement `Context::spawn_supervised()`.** Takes `child: C` where `C: Actor + Clone` and `policy: RestartPolicy`. The wrapper task owns both the channel receiver and the actor clone.
On restart, it clones the original actor, creates a fresh stop flag, and re-enters `run_mailbox` — but reuses the same channel. The `Addr` returned to callers points at this stable channel, so it remains valid across restarts. Panics are caught via `std::panic::catch_unwind` (requires an `AssertUnwindSafe` wrapper). The wrapper respects `RestartPolicy` limits and backoff delays.
+
+- [ ] **Step 3: Write tests.** Test: an actor that panics after N messages gets restarted up to `max` times. Test: `Never` policy does not restart. Test: `Backoff` delays between restarts.
+
+---
+
+## Task 5: Worker Migration
+
+**Files:**
+- Modify: `crates/worker/Cargo.toml`
+- Modify: `crates/worker/src/actors/mod.rs`
+- Modify: `crates/worker/src/actors/state.rs`
+- Modify: `crates/worker/src/actors/network.rs`
+- Modify: `crates/worker/src/actors/heartbeat.rs`
+- Modify: `crates/worker/src/actors/sync.rs`
+- Modify: `crates/worker/src/runtime.rs`
+
+Migrate the four hand-rolled worker actors to use `willow-actor`. This is the first real consumer and validates the API.
+
+- [ ] **Step 1: Define message types in `actors/mod.rs`.** Replace the `StateMsg` enum with individual message structs: `EventMsg(Event)`, `RequestMsg { req, reply: ... }` → becomes ask-pattern `WorkerRequestMsg(WorkerRequest)` with `type Result = WorkerResponse`, `GetRoleInfoMsg` with `type Result = WorkerRoleInfo`, `GetStateHashesMsg` with `type Result = Vec<(String, StateHash)>`, `ServerDiscoveredMsg { server_id }`. Remove `NetworkOutMsg` (the network actor no longer needs a channel — it holds the TopicHandle directly). Remove `StateMsg::Shutdown` (handled by mailbox close).
+
+- [ ] **Step 2: Rewrite `state.rs` as `StateActor`.** Struct holds `Box<dyn WorkerRole>`. Implement `Actor` (no lifecycle hooks needed). Implement `Handler<EventMsg>`, `Handler<WorkerRequestMsg>`, `Handler<GetRoleInfoMsg>`, `Handler<GetStateHashesMsg>`, `Handler<ServerDiscoveredMsg>`. Each handler is 1-3 lines — it delegates to `self.role`. Remove the manual `run()` function and its `while let` loop.
**Implementation note:** the worker `StateActor` does not need `idle()` or subscriber notifications — it's a simple request-reply actor with no UI signals. The `idle()`/dirty/subscriber pattern is only for `ClientStateActor` in Task 6.
+
+- [ ] **Step 3: Rewrite `network.rs` as `NetworkActor`.** Struct holds `Addr<StateActor>`, `EndpointId`, and `Option<E>` where `E: TopicEvents + 'static` (the `'static` bound is required because `Actor: 'static`; taken in `started()`). `TopicEvents` is not a `Stream` trait — it has an async `next()` method. Write a thin adapter (`TopicEventStream`) that wraps a `TopicEvents` impl into a `futures::Stream` (filtering errors with a warning log). Implement `StreamHandler` — `handle_stream_item` replaces the `while let` loop. Keep `parse_worker_message()` and `parse_server_message()` as pure functions. In `started()`, call `self.events.take().unwrap()` to extract the topic events, wrap them in `TopicEventStream`, and attach via `ctx.add_stream()`. Remove the manual `run()` function. **Implementation note:** `TopicEventStream` can implement `Stream` via `poll_fn` or `async_stream` — wrap the `next().await` call in a `Pin<Box<...>>`. This adapter is also needed by `TopicListenerActor` in Task 6, so consider placing it in `willow-network` (behind the existing `test-utils` feature or a new `stream` feature) to avoid duplication.
+
+- [ ] **Step 4: Rewrite `heartbeat.rs` as `HeartbeatActor`.** Struct holds `EndpointId`, `Addr<StateActor>`, and the `TopicHandle` (owned, not borrowed — the actor owns it for its lifetime). Define a `HeartbeatTick` message. Implement `Handler<HeartbeatTick>` — it queries the state actor via `state_addr.ask(GetRoleInfoMsg)` and broadcasts the announcement. In `started()`, call `ctx.run_interval(duration, || HeartbeatTick)`. Implement `stopped()` to broadcast a departure message via `self.topic.broadcast()` — best-effort (may fail silently if the network is already shut down). Remove `shutdown: watch::Receiver` — the actor stops when its address is dropped.
+
+- [ ] **Step 5: Rewrite `sync.rs` as `SyncActor`.** Same pattern as heartbeat. Define a `SyncTick` message. `Handler<SyncTick>` queries state hashes and broadcasts sync requests. `started()` calls `ctx.run_interval()`. Remove watch-based shutdown.
+
+- [ ] **Step 6: Rewrite `runtime.rs`.** Replace manual channel creation, `tokio::spawn`, the watch channel, and `tokio::join!` with: create a `System`, spawn all four actors, `ctrl_c().await`, `system.shutdown().await`. The function stays generic over `N: Network`.
+
+- [ ] **Step 7: Update existing tests.** Adapt tests in `state.rs`, `network.rs`, `heartbeat.rs` to use the actor system. Tests that sent `StateMsg` directly now use `Addr::send()` / `ask()`. Tests that checked `watch::Receiver` for shutdown now verify the actor stops when the system shuts down.
+
+- [ ] **Step 8: Run `just test-crate worker`.** All existing worker tests must pass. Run `just clippy` — zero warnings.
+
+---
+
+## Task 6: Client Library Migration
+
+**Files:**
+- Modify: `crates/client/Cargo.toml`
+- Modify: `crates/client/src/lib.rs`
+- Modify: `crates/client/src/listeners.rs`
+- Modify: `crates/client/src/state.rs`
+- Modify: `crates/client/src/events.rs`
+- Modify: `crates/client/src/ops.rs`
+- Modify: `crates/client/src/invite.rs`
+- Modify: `crates/client/src/files.rs`
+- Modify: `crates/client/src/worker_cache.rs`
+
+Replace `Arc<RwLock<SharedState>>` and `futures::channel::mpsc` with actors. The state actor becomes the single source of truth, with a subscriber notification mechanism for derived state.
+
+- [ ] **Step 1: Define `ClientStateActor`.** Holds `SharedState` directly (no Arc, no RwLock) plus a `Vec<Recipient<StateChanged>>` subscriber list and a `dirty: bool` flag. Audit all `shared.write()` and `shared.read()` call sites in the client crate to discover the full message set — expect ~10-15 mutation messages and ~5-10 query messages. Define message structs for each.
**Convenience:** to avoid repeating `self.dirty = true` in every mutation handler, add a helper method `fn mutate(&mut self, f: impl FnOnce(&mut SharedState))` that applies the closure and sets `dirty = true`. Mutation handlers become one-liners: `self.mutate(|s| s.peers.push(peer))`. Also implement:
+  - `Subscribe(Recipient<StateChanged>)` — register a new watcher
+  - `ReadState` — type-erased selector query: carries `Box<dyn FnOnce(&SharedState) -> Box<dyn Any + Send> + Send>` and replies with `Box<dyn Any + Send>`. Note: the closure is `FnOnce` because each `ReadState` message is constructed fresh per notification — the derived actor holds its selector as a `Fn` and wraps it in a new `FnOnce` closure for each ask. The derived actor downcasts the `Box<dyn Any + Send>` response to its concrete `T`. This is the one place `Any` is required.
+
+  **Notification batching:** Mutations don't notify subscribers individually. Instead, the state actor uses a `dirty: bool` flag. Each mutation handler sets `dirty = true` but does not send `StateChanged` directly. The mailbox loop is modified to drain: after processing one envelope via `recv().await`, it calls `try_recv()` in a loop to process all immediately-available messages without yielding. Only after the queue is drained does it call the new `Actor::idle()` hook. The `ClientStateActor` implements `idle()`: if `dirty`, send `StateChanged` to all subscribers and reset the flag. This means a burst of N mutations from a sync batch processes all N, then sends a single `StateChanged` round. The `idle()` hook is added to the `Actor` trait in Task 2 with a default no-op.
+
+  **Contract with Task 7:** The `Subscribe`, `StateChanged`, and `ReadState` messages form the interface that `DerivedStateActor` (defined in the web crate) depends on. Design them as a stable API.
+
+- [ ] **Step 2: Define `TopicListenerActor`.** Replaces `spawn_topic_listener`. Implements `StreamHandler` for gossip events. Holds `Addr<ClientStateActor>` and a `TopicHandle`. In `handle_stream_item`, it sends mutations to the state actor.
All state flows through the actor — typing indicators, connection status, and voice state are fields in `SharedState` mutated via messages, not separate event channels.
+
+- [ ] **Step 3: Refactor `ClientHandle`.** Replace `shared: Arc<RwLock<SharedState>>` with `state: Addr<ClientStateActor>`. Also move the lock-wrapped `topics` map into the state actor (or a dedicated topic actor). Remove `event_tx` entirely — derived state actors replace the event channel for all UI updates (including ephemeral state like typing and connection status). **Note:** this makes previously-synchronous state accessors async (`shared.read()` → `state.ask().await`). All callers in `lib.rs`, `ops.rs`, `invite.rs`, `files.rs`, and `worker_cache.rs` must be updated to `.await` state queries. Methods that were `fn` become `async fn`.
+
+- [ ] **Step 4: Update `listeners.rs`.** Replace `spawn_topic_listener()` with spawning a `TopicListenerActor` on the system. Remove the manual `topic_listener_loop`.
+
+- [ ] **Step 5: Delete the `ClientEvent` enum.** Remove `crates/client/src/events.rs` (or gut it). The `ClientEvent` enum, the `event_tx` channel, and all event emission are replaced by state actor mutations + subscriber notifications. Grep for `ClientEvent` across the workspace to find and remove all references.
+
+- [ ] **Step 6: Update client tests.** Tests using the `test_client()` helper need updating — shared state access changes from lock-based to ask-based. Tests that asserted on `ClientEvent` emissions must switch to asserting on state actor state via `ask()`. Verify all `just test-client` tests pass.
+
+- [ ] **Step 7: Run `just test-client` and `just clippy`.** All 93+ client tests must pass with zero warnings.
+
+---
+
+## Task 7: Web UI Migration — Derived State Signals
+
+**Files:**
+- Modify: `crates/web/Cargo.toml`
+- Modify: `crates/web/src/app.rs`
+- Delete: `crates/web/src/event_processing.rs`
+- Rewrite: `crates/web/src/state.rs`
+- Modify: `crates/web/src/components/*.rs` (remove direct handle state reads)
+
+Replaces the `ClientEvent` → `process_event_batch` → signal update pipeline with selector-based derived state actors.
+
+### Actor ↔ Leptos Bridge
+
+**Current flow** (event-driven, pull-based):
+```
+TopicEvents → spawn_local loop → ClientEvent channel → process_event_batch() → WriteSignal::set()
+                                       ↓ reads handle.peers(), handle.messages(), etc.
+                                 (sync reads via Arc<RwLock<SharedState>>)
+```
+
+**New flow** (push-based, selector-driven):
+```
+Network → TopicListenerActor → mutations → ClientStateActor
+                                               ↓ StateChanged notification
+                                     DerivedStateActors (one per signal)
+                                               ↓ selector(state) → compare → signal.set() if changed
+                                     Leptos reactive signals
+```
+
+The `ClientEvent` channel and `process_event_batch` are eliminated for state-derived signals. Each Leptos signal is backed by a `DerivedStateActor` that watches a specific slice of `SharedState` via a selector function.
+
+- [ ] **Step 1: Implement `DerivedStateActor<T>`.** Generic actor parameterized by `T: PartialEq + Clone + Default + Send + 'static`. Fields: `state_addr: Addr<ClientStateActor>`, `selector: Arc<dyn Fn(&SharedState) -> T + Send + Sync>` (must be `Arc` so it can be cloned into each `ReadState` closure — a `Box` borrow would not be `'static`), `cached: Option<T>`, `write: SendWrapper<WriteSignal<T>>` (Leptos's `WriteSignal` is `!Send` — `SendWrapper` makes it `Send`, which is safe on single-threaded WASM). Implements `Handler<StateChanged>`: clones the `Arc` selector, constructs a `ReadState { Box::new(move |state| Box::new(selector(state))) }`, asks the state actor, downcasts the `Box<dyn Any + Send>` response to `T`, compares with `cached`, and calls `self.write.set(new_value)` if different. In `started()`: subscribes to the state actor via `Subscribe`, then immediately asks for the current value to seed the signal.
+
+- [ ] **Step 2: Implement the `derived_signal` helper.** A function in the web crate (not in willow-actor — it depends on Leptos): `fn derived_signal<T>(state_addr, system, selector) -> ReadSignal<T>`. Creates a Leptos signal pair, spawns a `DerivedStateActor`, and returns the read half. This is the primary API for connecting actor state to Leptos.
+
+- [ ] **Step 3: Rewrite `state.rs`.** Replace the current `create_signals()` function (which creates ~30 independent signals) with calls to `derived_signal()`. Each signal maps to a selector:
+  - `channels` → `|s| s.state.channels.clone()`
+  - `peers` → `|s| s.state.chat.peers.clone()`
+  - `display_name` → `|s| s.state.display_name.clone()`
+  - `messages` → `|s| s.state.messages_for(&s.state.current_channel)` — this requires `current_channel` to be in `SharedState`, not a standalone Leptos signal. **Move `current_channel` into `SharedState`** so the selector can read it. Channel switching becomes a state actor mutation (`SetCurrentChannel`), and the messages selector naturally picks up the new channel.
+  - etc.
+
+  Signals that are purely local UI state (e.g., `show_settings`, `show_palette`, `current_tab`) remain regular Leptos signals — no actor needed. The rule: if other selectors depend on it, it belongs in `SharedState`; if nothing else reads it, it stays a local signal.
+
+  **Cloning cost:** Selectors return an owned `T`, which means cloning state out of the actor on every `ReadState`. For small types (`String`, `bool`, small `Vec`s) this is trivial. For expensive types (e.g., a `Vec<Message>` with hundreds of messages), wrap the field in `Arc` inside `SharedState` — then the selector clones the `Arc` (a cheap pointer bump) instead of deep-cloning the data. The `PartialEq` check on `Arc<Vec<Message>>` compares pointers first (fast path: same Arc = no change), falling back to element comparison only when the Arc was replaced. Apply `Arc` wrapping selectively to fields where profiling shows cloning is a bottleneck — don't pre-optimize every field.
+ +- [ ] **Step 4: Update `app.rs`.** Remove the `spawn_local` event loop that drained `ClientEvent`s and called `process_event_batch`. Remove `refresh_all_signals`. The `ClientHandle` connection still happens in a `spawn_local` (network setup is async), but signal updates are now automatic via derived state actors. All state — including typing indicators, connection status, and voice participants — flows through the state actor and derived signals. + +- [ ] **Step 5: Delete `event_processing.rs`.** The entire module is replaced by derived state actors. The `process_event_batch` function, `needs_*_refresh` flags, `ClientEvent` enum, and event-to-signal mapping are all gone. + +- [ ] **Step 6: Update components.** Components that called `handle.peers()`, `handle.messages()`, etc. directly now read from their corresponding derived signal instead. Components that perform actions (send message, create channel) still call `handle.send_message()` etc. — these are now async (Task 6), so sync event handlers (button clicks, key presses) must wrap action calls in `spawn_local`. Grep for `handle.` in components and classify each call: state read → replace with signal, action → wrap in `spawn_local` if in a sync context. + +- [ ] **Step 7: Verify WASM compilation.** `just check-wasm` must pass. + +- [ ] **Step 8: Run `just test-browser`.** All 39+ browser tests must pass. + +--- + +## Task 8: Final Validation + +- [ ] **Step 1: Run `just check`.** Full suite: fmt + clippy + test + WASM. Zero warnings. + +- [ ] **Step 2: Run `just test-scale`.** Verify no performance regression in event throughput or merge benchmarks. + +- [ ] **Step 3: Run `just test-all`.** All 420+ tests pass. + +- [ ] **Step 4: Update `CLAUDE.md`.** Add `crates/actor/` to the repository structure. Update the architecture notes to describe the actor system. 
diff --git a/docs/specs/2026-03-29-actor-system-design.md b/docs/specs/2026-03-29-actor-system-design.md new file mode 100644 index 00000000..a1a16caa --- /dev/null +++ b/docs/specs/2026-03-29-actor-system-design.md @@ -0,0 +1,823 @@ +# Actor System Design Spec + +**Date**: 2026-03-29 +**Status**: Ready for implementation + +## Existing Solutions + +A survey of existing Rust actor crates was conducted to determine whether +an off-the-shelf solution could be adopted. Summary: + +| Crate | Version | WASM | Send req | Supervision | Handlers | Status | +|-------|---------|------|----------|-------------|----------|--------| +| **ractor** | 0.15.12 | **Yes** (`tokio_with_wasm`) | Send+Sync | Yes (Erlang) | Single `Msg` enum | Active (2026-03) | +| **kameo** | 0.19.2 | No (tokio) | Send | Yes (OneForOne) | Per-message `Message` | Active (2025-11) | +| **actix** | 0.13.5 | No (tokio) | Unpin (no Send) | Basic | Per-message `Handler` | Passive | +| **xtra** | 0.6.0 | **Yes** (`wasm_bindgen`) | Send | No | Per-message `Handler` | Low (2024-02) | +| **coerce** | 0.8.11 | No (tokio full) | Send+Sync | Yes | Per-message `Handler` | Dormant (2023) | +| **xactor** | 0.7.11 | No | Send | No | Per-message `Handler` | Dead (2020) | +| **xtor** | 0.9.10 | **Yes** (`wasm_bindgen`) | Send | Yes | Per-message handler | Dead (2022) | +| **stakker** | 0.2.14 | Provisional | Not Send | No | Macro-based callbacks | Niche | + +### ractor — first-class WASM, Erlang-style + +[ractor](https://github.com/slawlor/ractor) (546k downloads, MIT) is +the most actively maintained option and has **first-class WASM support** +with 84 passing browser tests. Key details: + +- **WASM runtime**: Uses `tokio_with_wasm` (a shim that provides + tokio-compatible channels/spawn/timers on `wasm32-unknown-unknown` + backed by the JS event loop). Platform abstraction lives in a + `concurrency` module with three backends: `tokio_primitives`, + `async_std_primitives`, `wasm_browser_primitives`. 
- **Erlang-style API**: Each actor declares a single `type Msg` enum.
+  The `handle()` method pattern-matches on it. State is separated from
+  the handler (`&self` + `&mut State`).
+- **Supervision**: `spawn_linked()` establishes parent-child links.
+  `SupervisionEvent` notifies parents of child panics/deaths. No
+  built-in restart policies — that is left to the `handle_supervisor_evt`
+  impl (like Erlang's custom supervisors).
+- **Request-reply**: `RpcReplyPort` for typed replies. `call()` and
+  `cast()` for the ask/tell patterns.
+
+**Why not adopt ractor directly:**
+
+1. **Single-enum message type**: `type Msg: Message` requires one enum
+   per actor covering all of its message types. Every actor therefore needs a
+   hand-written `match` over its message enum in `handle()`, and adding
+   a new message type requires modifying the enum + the match arm. With
+   per-message `Handler` traits, new message types are additive (just
+   implement another trait). For Willow's actors, which handle 5-10+
+   message types each, the enum approach produces large match blocks.
+2. **Separated `&self` + `&mut State`**: The actor handler is immutable;
+   mutable state lives in a separate `State` type. This is idiomatic
+   Erlang but awkward in Rust — fields that logically belong together
+   (e.g., a `WorkerRole` + its config) are split across two types.
+3. **Hard `Send + Sync` on `Actor`**: Requires all actor types to be
+   `Send + Sync`. This is stricter than necessary — actors are
+   single-owner by design, so `Sync` is never needed.
+4. **`tokio_with_wasm` dependency**: Pulls in a full tokio-compatible
+   shim for WASM. Willow uses iroh (which handles WASM internally) +
+   `wasm_bindgen_futures::spawn_local` — adding another abstraction
+   layer over tokio's API on WASM is unnecessary indirection.
+5. **Heavy dependency tree**: `dashmap`, `bon`, `strum`, `once_cell`,
+   plus the full `tokio_with_wasm` crate on WASM. Willow's actor system
+   needs only channels, oneshot, and spawn.
### xtra — per-message handlers, lightweight
+
+[xtra](https://github.com/Restioson/xtra) (83k downloads, MPL-2.0) is
+the closest match to our desired API shape:
+
+- **Multi-runtime**: tokio, async-std, smol, and `wasm_bindgen` via
+  feature flags. WASM spawns use `wasm_bindgen_futures::spawn_local`.
+- **Per-message `Handler`**: Each message type gets its own `Handler`
+  impl with `type Return`. Request-reply via `Address::send()` returning
+  a `SendFuture` that resolves to the handler's return value.
+- **Lightweight**: core deps are `catty`, `futures-core`, `event-listener`,
+  `spin`. No proc macros required (optional `xtra-macros`).
+- **Actor lifecycle**: `started(&mut self, &Mailbox)` and
+  `stopped(self) -> Self::Stop`.
+- **Address/Mailbox split**: `Address` for sending, `Mailbox` for
+  the actor's receive loop.
+
+**Why not adopt xtra directly:**
+
+1. **No supervision**: No restart policies or supervisor trees. Actors
+   that panic are simply gone.
+2. **No `Recipient` / type-erased message targets**: xtra has
+   `MessageChannel`, but it's less ergonomic than a standalone
+   `Recipient` type for pub-sub patterns.
+3. **No interval support**: No built-in periodic tick mechanism. The
+   heartbeat/sync actors would still need manual timer loops.
+4. **Low activity**: Last release Feb 2024, limited maintenance signal.
+
+### kameo — best API shape, no WASM
+
+[kameo](https://github.com/tqwewe/kameo) (190k downloads, MIT) has the
+cleanest API design, with per-message `Message` trait impls and
+ask/tell naming:
+
+```rust
+impl Message<MyMsg> for MyActor {
+    type Reply = MyReply;
+    async fn handle(&mut self, msg: MyMsg, ctx: &mut Context<..>) -> Self::Reply;
+}
+```
+
+It has OneForOne supervision, stream attachment, and actor linking. But
+it depends on tokio directly, with no WASM runtime support and no feature
+flags for alternative runtimes.
+
+### kameo fork feasibility
+
+A source audit of kameo's tokio coupling reveals it is **shallow and
+concentrated** — 6 primitives across 4 core files. The supervision
+module has zero production tokio usage. Main work: swap `tokio::sync::mpsc`
+for `futures::channel::mpsc` on WASM, `cfg`-gate `spawn_in_thread()`
+and `blocking_send/recv`, replace `tokio::spawn` with `spawn_local`
+(~200 lines).
+
+**Verdict: fork is feasible but not clearly better than writing our own.**
+
+The fork saves ~800 lines of actor machinery but introduces:
+- Ongoing merge burden with an actively evolving upstream (v0.19.2)
+- kameo's `remote` feature (libp2p-based) conflicts with iroh
+- Extra deps: `downcast-rs`, `dyn-clone`, `serde` derive
+- MSRV 1.88.0 (edition 2024)
+
+Writing `willow-actor` from scratch: ~1000-1500 lines for the core.
+Comparable effort, no maintenance burden.
+
+### Iroh integration (complete)
+
+Willow has migrated from libp2p to iroh. The networking layer now uses
+trait-based abstractions (`Network`, `TopicHandle`, `TopicEvents`,
+`BlobStore`) backed by iroh's QUIC transport and `iroh-gossip` for
+broadcast. See `docs/specs/2026-03-29-iroh-migration-design.md`.
+
+**Key facts for the actor system:**
+
+1. **Split runtime.** Tokio is native-only
+   (`cfg(not(target_arch = "wasm32"))`). On WASM, the codebase uses
+   `wasm_bindgen_futures::spawn_local`, `futures::channel::mpsc`, and
+   `gloo-timers`. The actor system needs a thin runtime abstraction for
+   spawn, channels, and timers — just like the original design proposed.
+
+2. **`Send` is required.** The `Network` trait and its associated types
+   require `Send + Sync`. The client currently uses
+   `Arc<Mutex<SharedState>>` (to be replaced by actors in Phase 2). On
+   WASM, everything is trivially `Send` (single-threaded), so this
+   compiles without issue.
+
+3. **`Network` trait is generic.** Workers and client are generic over
+   `N: Network`, with `IrohNetwork` for production and `MemNetwork` for
+   tests.
Actor types should also be generic over `Network` where they
+   interact with gossip, following the same pattern.
+
+4. **`TopicEvents` is a stream.** The `TopicEvents` trait has
+   `async fn next() -> Option<TopicEvent>` — a natural fit for
+   `StreamHandler`. The network actor currently wraps this in a manual
+   `while let` loop; the actor system replaces that.
+
+5. **`TopicHandle` for broadcast.** The heartbeat and sync actors
+   currently take `T: TopicHandle` and call `topic.broadcast()`. With
+   the actor system, they hold the `TopicHandle` as actor state and
+   call it from message handlers.
+
+6. **Shutdown via `watch` channel.** Workers currently use
+   `tokio::sync::watch` for shutdown signaling. The actor system
+   replaces this — dropping all `Addr` handles closes the mailbox,
+   or `Context::stop()` signals graceful shutdown from within.
+
+### Recommendation: build `willow-actor`
+
+No existing crate satisfies all requirements (dual-target, supervision,
+intervals, stream handlers, per-message handlers).
The design below
+combines:
+
+- **xtra/kameo's `Handler` pattern** — per-message-type trait impls
+  with typed returns, not a single enum
+- **Thin runtime abstraction** — `cfg`-switched spawn/channel/timer for
+  tokio (native) vs futures-channel + gloo-timers (WASM)
+- **`Send` unconditionally** — matches the `Network` trait's bounds,
+  compiles on WASM (everything is trivially Send on single-threaded targets)
+- **Supervision, intervals, streams, `Recipient`** — features
+  missing from xtra
+
+---
+
+## Overview
+
+With the iroh migration complete, Willow's networking is now trait-based
+(`Network`, `TopicHandle`, `TopicEvents`) and generic — but the
+concurrency patterns above the network layer are still hand-rolled:
+
+| Layer | Current pattern | Problem |
+|-------|----------------|---------|
+| Worker actors | `tokio::sync::mpsc` + `oneshot` + `watch`, 4 manual loops | Not reusable, manual shutdown via watch channel |
+| Client lib | `Arc<Mutex<SharedState>>` + `futures::channel::mpsc` | Shared mutable state behind locks, monolithic event loop |
+| Web UI | `futures::channel::mpsc` + `spawn_local` | Duplicates client event loop logic |
+
+The worker crate already uses an actor pattern (state, network, heartbeat,
+sync actors communicating via channels), but it's hand-rolled and not
+reusable. The client and web crates reinvent the same pattern: spawn a
+task, create channels, loop on `select!`, handle shutdown.
+
+`willow-actor` formalizes this into a single crate with a thin runtime
+abstraction (tokio on native, futures-channel + gloo-timers on WASM) to
+eliminate the per-crate boilerplate.
+
+## Goals
+
+1. **Dual-target**: native (tokio) + WASM (wasm-bindgen-futures), single API
+2. **Typed mailboxes**: each actor defines its message type, no `Box<dyn Any>`
+3. **Request-reply**: first-class `ask()` with typed responses, no manual oneshot wiring
+4. **Supervision**: restart policies for crashed actors (native), error propagation (WASM)
+5. 
**No locks**: shared state lives inside actors, eliminating `Arc<Mutex<…>>` / `Arc<RwLock<…>>` — access is serialized through message passing
+6. **Lightweight**: `Addr` send path has no dynamic dispatch (type-erased `Recipient` is opt-in)
+
+## Non-Goals
+
+- Distributed actors / remote messaging (iroh gossip handles that)
+- Actor persistence / event sourcing (willow-state handles that)
+- Bevy desktop app (out of scope for this migration)
+
+## Core Types
+
+### Message Trait
+
+```rust
+/// Marker trait for actor messages.
+pub trait Message: Send + 'static {
+    /// The response type for request-reply. Use `()` for fire-and-forget.
+    type Result: Send + 'static;
+}
+```
+
+`Send` is required unconditionally. On WASM (single-threaded), all types
+are trivially `Send`, so this compiles without issue. This matches the
+`Network` trait's bounds (`Send + Sync` on associated types).
+
+### Actor Trait
+
+```rust
+/// An actor processes messages sequentially in its own task.
+pub trait Actor: Send + 'static + Sized {
+    /// Called once when the actor starts, before processing messages.
+    fn started(&mut self, ctx: &mut Context<Self>)
+        -> impl Future<Output = ()> + Send { async {} }
+
+    /// Called when the actor is stopping (mailbox closed or explicit stop).
+    fn stopped(&mut self)
+        -> impl Future<Output = ()> + Send { async {} }
+
+    /// Called after the mailbox drains all immediately-available messages.
+    /// The mailbox processes one message via recv().await, then drains
+    /// remaining messages via try_recv(), then calls idle(). Use this
+    /// for batched notifications — e.g., set a dirty flag in mutation
+    /// handlers, then notify subscribers in idle().
+    fn idle(&mut self, ctx: &mut Context<Self>)
+        -> impl Future<Output = ()> + Send { async {} }
+}
+```
+
+Uses RPITIT (return-position impl trait in trait, stabilized in Rust
+1.75) instead of `async_trait` — avoids the proc macro dependency and
+Box allocation per handler call.
+
+### Handler Trait
+
+```rust
+/// Implement Handler<M> for each message type an actor accepts.
+pub trait Handler<M: Message>: Actor {
+    fn handle(&mut self, msg: M, ctx: &mut Context<Self>)
+        -> impl Future<Output = M::Result> + Send;
+}
+```
+
+An actor can implement `Handler<M>` for multiple message types. Each handler
+is type-checked at compile time.
+
+### Context
+
+```rust
+/// Provided to handlers — gives access to the actor's own address and system.
+pub struct Context<A: Actor> {
+    addr: Addr<A>,
+    system: SystemHandle,
+}
+
+impl<A: Actor> Context<A> {
+    /// Get this actor's own address (for self-sends or passing to children).
+    pub fn address(&self) -> Addr<A> { ... }
+
+    /// Spawn a child actor supervised by this actor.
+    pub fn spawn<C: Actor>(&self, child: C) -> Addr<C> { ... }
+
+    /// Request a graceful stop after the current message finishes.
+    pub fn stop(&mut self) { ... }
+
+    /// Access the actor system (for spawning unrelated actors).
+    pub fn system(&self) -> &SystemHandle { ... }
+}
+```
+
+### Addr (Actor Address / Handle)
+
+```rust
+/// Type-safe handle for sending messages to an actor.
+/// Cheaply cloneable (wraps an Arc'd channel sender).
+pub struct Addr<A: Actor> {
+    tx: MessageSender, // unbounded mpsc sender (type-erased via runtime module)
+    _phantom: PhantomData<A>,
+}
+
+impl<A: Actor> Addr<A> {
+    /// Fire-and-forget: send a message, don't wait for a response.
+    /// Returns Err if the actor's mailbox is closed.
+    pub fn send<M>(&self, msg: M) -> Result<(), SendError<M>>
+    where
+        A: Handler<M>,
+        M: Message,
+    { ... }
+
+    /// Request-reply: send a message and await the response.
+    /// Returns a future that resolves to M::Result.
+    pub fn ask<M>(&self, msg: M) -> impl Future<Output = Result<M::Result, AskError>>
+    where
+        A: Handler<M>,
+        M: Message,
+    { ... }
+
+    /// Check if the actor is still alive.
+    pub fn is_alive(&self) -> bool { ... }
+}
+
+impl<A: Actor> Clone for Addr<A> { ... }
+```
+
+### AnyAddr (Type-Erased Address)
+
+For cases where you need to store addresses of different actor types
+together (e.g. a supervisor tracking children):
+
+```rust
+/// Type-erased actor address. Can send shutdown signals but not typed messages.
+pub struct AnyAddr { ...
}
+
+impl AnyAddr {
+    pub fn stop(&self) { ... }
+    pub fn is_alive(&self) -> bool { ... }
+}
+
+impl<A: Actor> From<Addr<A>> for AnyAddr { ... }
+```
+
+### Recipient (Multi-Actor Message Target)
+
+For when multiple actor types handle the same message and you want to
+abstract over the concrete actor:
+
+```rust
+/// Type-erased handle that can send a specific message type.
+/// Useful for pub-sub patterns where the sender doesn't know the actor type.
+pub struct Recipient<M: Message> {
+    tx: Box<dyn ErasedSender<M> + Send>, // internal trait, not public
+}
+
+impl<M: Message> Recipient<M> {
+    pub fn send(&self, msg: M) -> Result<(), SendError<M>> { ... }
+    pub fn ask(&self, msg: M) -> impl Future<Output = Result<M::Result, AskError>> { ... }
+}
+
+impl<A, M> From<Addr<A>> for Recipient<M>
+where
+    A: Handler<M>,
+    M: Message,
+{ ... }
+```
+
+## Actor System
+
+```rust
+/// The actor system — owns the runtime and tracks all top-level actors.
+pub struct System {
+    handle: SystemHandle,
+}
+
+/// Cheap cloneable handle into the system.
+#[derive(Clone)]
+pub struct SystemHandle { ... }
+
+impl System {
+    /// Create a new actor system.
+    pub fn new() -> Self { ... }
+
+    /// Spawn a top-level actor and return its address.
+    pub fn spawn<A: Actor>(&self, actor: A) -> Addr<A> { ... }
+
+    /// Get a handle that can be passed to other contexts.
+    pub fn handle(&self) -> SystemHandle { ... }
+
+    /// Shut down all actors gracefully.
+    pub async fn shutdown(self) { ... }
+}
+```
+
+## Platform Abstraction
+
+Tokio is native-only. On WASM, the codebase uses `futures` channels and
+`wasm-bindgen-futures`. The actor crate needs a thin `runtime` module
+that abstracts over the three primitives that differ:
+
+```rust
+// crate::runtime (internal)
+
+/// Spawn a future as a background task.
+pub fn spawn<F: Future<Output = ()> + Send + 'static>(fut: F) {
+    #[cfg(not(target_arch = "wasm32"))]
+    { tokio::task::spawn(fut); }
+
+    #[cfg(target_arch = "wasm32")]
+    wasm_bindgen_futures::spawn_local(fut);
+}
+
+/// Unbounded MPSC channel.
+pub fn unbounded_channel<T>() -> (Sender<T>, Receiver<T>) {
+    #[cfg(not(target_arch = "wasm32"))]
+    { /* tokio::sync::mpsc::unbounded_channel */ }
+
+    #[cfg(target_arch = "wasm32")]
+    { /* futures::channel::mpsc::unbounded */ }
+}
+
+/// One-shot channel.
+pub fn oneshot<T>() -> (OneshotTx<T>, OneshotRx<T>) {
+    #[cfg(not(target_arch = "wasm32"))]
+    { /* tokio::sync::oneshot */ }
+
+    #[cfg(target_arch = "wasm32")]
+    { /* futures::channel::oneshot */ }
+}
+
+/// Sleep for a duration.
+pub async fn sleep(duration: Duration) {
+    #[cfg(not(target_arch = "wasm32"))]
+    tokio::time::sleep(duration).await;
+
+    #[cfg(target_arch = "wasm32")]
+    gloo_timers::future::sleep(duration).await;
+}
+```
+
+The `Sender`/`Receiver` types are thin wrappers that unify the
+`tokio::sync::mpsc` and `futures::channel::mpsc` APIs behind a common
+interface. Both are unbounded and have nearly identical semantics.
+
+## Mailbox Internals
+
+Each actor gets a mailbox backed by an unbounded MPSC channel. Messages are
+type-erased inside the mailbox using a closure-based envelope pattern:
+
+```rust
+// Internal — not part of the public API.
+
+type BoxEnvelope<A> =
+    Box<dyn for<'a> FnOnce(&'a mut A, &'a mut Context<A>) -> BoxFuture<'a, ()> + Send + 'static>;
+
+// When Addr.send(msg) is called for M where A: Handler<M>:
+// 1. msg is wrapped in an envelope closure
+// 2. The closure calls A::handle(msg, ctx) when executed
+// 3. For ask(), a oneshot sender is captured in the closure
+//    and the response is sent back through it
+```
+
+This means the channel carries `BoxEnvelope<A>` — one channel per actor,
+handling all message types. No dynamic dispatch on the sender side; the
+dispatch happens once when the envelope is executed.
+
+## Supervision
+
+```rust
+/// Restart policy for supervised actors.
+pub enum RestartPolicy {
+    /// Never restart (default). Errors are logged.
+    Never,
+    /// Restart immediately on panic/error, up to `max` times.
+    OnFailure { max: u32 },
+    /// Restart with exponential backoff.
+    Backoff { initial: Duration, max_delay: Duration, max_retries: u32 },
+}
+
+impl<A: Actor> Context<A> {
+    /// Spawn a supervised child actor.
+    pub fn spawn_supervised<C: Actor>(
+        &self,
+        child: C,
+        policy: RestartPolicy,
+    ) -> Addr<C> { ... }
+}
+```
+
+Panics are caught via `std::panic::catch_unwind`. On WASM, this works
+only if the actor is `UnwindSafe`; otherwise `Never` is the only safe
+option.
+
+## Streams
+
+Actors can subscribe to external event streams (e.g., network events,
+timers) that feed into their mailbox:
+
+```rust
+pub trait StreamHandler<S>: Actor {
+    fn handle_stream_item(&mut self, item: S, ctx: &mut Context<Self>)
+        -> impl Future<Output = ()> + Send;
+
+    /// Called when the stream ends.
+    fn stream_finished(&mut self, _ctx: &mut Context<Self>)
+        -> impl Future<Output = ()> + Send { async {} }
+}
+
+impl<A: Actor> Context<A> {
+    /// Attach a stream to this actor. Items are delivered as messages.
+    pub fn add_stream<S, St>(&mut self, stream: St)
+    where
+        A: StreamHandler<S>,
+        S: 'static + Send,
+        St: Stream<Item = S> + Send + 'static,
+    { ... }
+}
+```
+
+## Intervals
+
+Built-in support for periodic ticks (replaces the manual
+`tokio::select! + sleep` pattern in heartbeat/sync actors):
+
+```rust
+impl<A: Actor> Context<A> {
+    /// Start a periodic interval. Delivers messages to the actor
+    /// on each tick. Returns a handle that can cancel the interval.
+    pub fn run_interval<M: Message<Result = ()>>(
+        &mut self,
+        duration: Duration,
+        msg_factory: impl Fn() -> M + Send + 'static,
+    ) -> IntervalHandle
+    where
+        A: Handler<M>,
+    { ... }
+}
+
+pub struct IntervalHandle { ... }
+impl IntervalHandle {
+    pub fn cancel(self) { ...
}
+}
+```
+
+## Error Handling
+
+```rust
+#[derive(Debug, thiserror::Error)]
+pub enum SendError<M> {
+    #[error("actor mailbox is closed")]
+    Closed(M),
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum AskError {
+    #[error("actor mailbox is closed")]
+    Closed,
+    #[error("actor did not respond (dropped the reply channel)")]
+    NoResponse,
+}
+```
+
+## Crate Structure
+
+```
+crates/actor/
+├── Cargo.toml
+└── src/
+    ├── lib.rs        — public API re-exports
+    ├── actor.rs      — Actor, Handler, StreamHandler, Message traits
+    ├── addr.rs       — Addr, AnyAddr, Recipient
+    ├── context.rs    — Context, interval, stream attachment
+    ├── envelope.rs   — BoxEnvelope, type-erased message dispatch
+    ├── mailbox.rs    — unbounded channel recv loop
+    ├── runtime.rs    — platform abstraction (spawn, channels, timers)
+    ├── supervisor.rs — RestartPolicy, supervised spawn
+    ├── system.rs     — System, SystemHandle
+    └── error.rs      — SendError, AskError
+```
+
+## Dependency Graph
+
+```
+willow-actor (new)
+├── futures-core (Stream trait)
+├── thiserror
+├── tracing
+├── [native] tokio (sync: mpsc, oneshot; time: sleep)
+└── [wasm] wasm-bindgen-futures (spawn_local) + futures-channel (mpsc, oneshot) + gloo-timers (sleep)
+```
+
+No `async-trait` needed — uses RPITIT (Rust 1.75+). `willow-actor` has
+**no dependency on any other willow crate**. It is a pure infrastructure
+crate.
+
+## Migration Path
+
+### Phase 1: Core crate + worker migration
+
+Create `crates/actor/` with the core types. Migrate the worker crate's
+four hand-rolled actor loops to use `willow-actor`. This is the smallest
+useful scope and the cleanest test case — the workers already have
+well-defined actor boundaries.
+
+**Before** (current `crates/worker/src/runtime.rs`):
+```rust
+pub async fn run<N: Network>(role: Box<dyn WorkerRole>, config: WorkerConfig, network: N) {
+    let (state_tx, state_rx) = mpsc::channel::<StateMsg>(256);
+    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
+    let state_handle = tokio::spawn(state::run(role, state_rx));
+    let network_handle = tokio::spawn(network::run(workers_events, state_tx.clone(), peer_id));
+    let heartbeat_handle = tokio::spawn(heartbeat::run(peer_id, ..., workers_sender.clone(), shutdown_rx.clone()));
+    let sync_handle = tokio::spawn(sync::run(peer_id, ..., workers_sender, shutdown_rx));
+    tokio::signal::ctrl_c().await?;
+    let _ = shutdown_tx.send(true);
+    let _ = state_tx.send(StateMsg::Shutdown).await;
+    let _ = tokio::join!(state_handle, network_handle, heartbeat_handle, sync_handle);
+}
+```
+
+**After**:
+```rust
+pub async fn run<N: Network>(role: Box<dyn WorkerRole>, config: WorkerConfig, network: N) {
+    let system = System::new();
+    let state_addr = system.spawn(StateActor { role });
+    let _network = system.spawn(NetworkActor::new(workers_events, state_addr.clone(), peer_id));
+    let _heartbeat = system.spawn(HeartbeatActor::new(peer_id, state_addr.clone(), workers_sender.clone()));
+    let _sync = system.spawn(SyncActor::new(peer_id, state_addr, workers_sender));
+    tokio::signal::ctrl_c().await?;
+    system.shutdown().await;
+}
+```
+
+The `NetworkActor` uses `StreamHandler` to receive from
+`TopicEvents`. The `HeartbeatActor` uses `ctx.run_interval()` instead
+of a manual `tokio::select! + sleep` loop. Shutdown is implicit —
+`system.shutdown()` drops all addresses, closing mailboxes. No more
+`watch` channel.
+
+### Phase 2: Client library
+
+Replace `ClientHandle`'s `Arc<Mutex<SharedState>>` with actors.
+Shared state moves into a state actor — no more locks. The client event
+loop becomes an actor with `StreamHandler` for `TopicEvents`. External
+callers use `Addr` to send commands and `ask()` to query state.
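
To see what `ask()` saves, here is the manual request-reply wiring the hand-rolled loops do today, sketched with std threads and channels (hypothetical `Cmd` variants and names — the real client uses async channels and different message types):

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical command enum mirroring the current hand-rolled pattern:
// each request-reply variant must carry its own reply channel.
enum Cmd {
    SendMessage(String),
    GetPeerCount { reply: mpsc::Sender<usize> },
}

// Stand-in for the client state task: owns its state, drains commands
// sequentially — the same serialization an actor mailbox provides.
fn spawn_state_loop() -> mpsc::Sender<Cmd> {
    let (tx, rx) = mpsc::channel::<Cmd>();
    thread::spawn(move || {
        let mut outbox: Vec<String> = Vec::new();
        let peer_count = 2usize; // placeholder for real peer tracking
        for cmd in rx {
            match cmd {
                Cmd::SendMessage(m) => outbox.push(m),
                Cmd::GetPeerCount { reply } => {
                    let _ = reply.send(peer_count);
                }
            }
        }
    });
    tx
}

fn main() {
    let state = spawn_state_loop();

    // Fire-and-forget — what Addr::send gives for free.
    state.send(Cmd::SendMessage("hello".into())).unwrap();

    // Manual ask: allocate a reply channel, send, block on the answer.
    // With willow-actor this collapses to one typed ask() call.
    let (reply_tx, reply_rx) = mpsc::channel();
    state.send(Cmd::GetPeerCount { reply: reply_tx }).unwrap();
    assert_eq!(reply_rx.recv().unwrap(), 2);
}
```

Every query variant repeats this reply-channel dance; `ask()` moves it into the envelope once, and the per-message `Handler` return type replaces the `Cmd` enum.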
+
+### Phase 3: Web UI
+
+Derived state actors bridge the actor system to Leptos signals. See
+"Reactive Derived State" section below.
+
+## Reactive Derived State
+
+### Problem
+
+The UI needs to reactively update when shared state changes. Naive
+approaches have drawbacks:
+
+- **Pull on every event** (`ask()` in `process_event_batch`): every
+  `ClientEvent` triggers async queries for all state slices, even if
+  most didn't change. Stale data possible between events.
+- **Push full snapshots**: state actor sends complete state to the UI
+  on every mutation. Causes full re-renders, wasteful when only one
+  field changed.
+- **`ClientEvent`-based flags** (current approach): `needs_msg_refresh`,
+  `needs_peer_refresh`, etc. — manually maintained, error-prone, and
+  tightly couples event types to signal updates.
+
+### Solution: Selector-Based Derived State
+
+Inspired by Yewdux's `use_selector` pattern. The state actor is the
+single source of truth. Derived state actors subscribe to state changes,
+run a selector function, and only update their signal when the selected
+value actually changes.
+
+```
+Network → TopicListenerActor → mutations → ClientStateActor
+                                                ↓ idle() notifies after drain
+                                           StateChanged
+                                                ↓
+                 ┌──────────────────────────────┼──────────────────────────────┐
+                 ▼                              ▼                              ▼
+           DerivedActor                   DerivedActor                   DerivedActor
+        selector: peers()              selector: msgs()             selector: channels()
+        cached: [A, B]                 cached: [m1, m2]             cached: [#gen, #dev]
+                 │                              │                              │
+           (if changed)                   (if changed)                   (if changed)
+                 ▼                              ▼                              ▼
+        signal.set([A,B,C])               (no update)                 signal.set(...)
+```
+
+### How it works
+
+1. **`ClientStateActor`** holds `SharedState`, a list of
+   `Recipient<StateChanged>` subscribers, and a `dirty` flag. Mutation
+   handlers set `dirty = true`. The `idle()` hook (called after the
+   mailbox drains all pending messages) checks the flag, sends
+   `StateChanged` to all subscribers if dirty, and resets. This
+   batches a burst of mutations into a single notification round.
+
+2. 
**`DerivedStateActor`** is a generic actor parameterized by:
+   - A selector: `Arc<dyn Fn(&SharedState) -> T + Send + Sync>`
+     (Arc because it's cloned into each `ReadState` closure)
+   - A cached value: `Option<T>` (last known value)
+   - A signal writer (Leptos `WriteSignal<T>` via `SendWrapper`)
+
+3. On receiving `StateChanged`, the derived actor sends a
+   `ReadState(selector)` ask to the state actor, which runs the
+   selector against current state and returns `T`.
+
+4. The derived actor compares the new `T` with its cached value. If
+   different (via `PartialEq`), it updates the signal and caches the
+   new value. If equal, it does nothing — no re-render.
+
+### API sketch (web crate, not in willow-actor)
+
+```rust
+/// Create a derived Leptos signal backed by a state actor selector.
+/// Returns a ReadSignal that updates only when the selected value changes.
+fn derived_signal<T: Clone + Default + PartialEq + Send + Sync + 'static>(
+    state_addr: &Addr<ClientStateActor>,
+    system: &SystemHandle,
+    selector: impl Fn(&SharedState) -> T + Send + Sync + 'static,
+) -> ReadSignal<T> {
+    let (read, write) = create_signal(T::default());
+    system.spawn(DerivedStateActor {
+        state_addr: state_addr.clone(),
+        selector: Arc::new(selector),
+        cached: None,
+        write: SendWrapper::new(write),
+    });
+    // The DerivedStateActor's started() hook immediately asks the state
+    // actor for the current value, seeding the signal. Until that first
+    // ask completes, the signal holds T::default().
+    read
+}
+```
+
+### Ephemeral state is the same pattern
+
+Typing indicators, connection status, voice participants — these are
+all just state owned by an actor. There's no reason to treat them
+differently. Each gets its own actor (or they live as fields in a
+single actor) with the same selector/notify pattern:
+
+- **`ConnectionActor`**: holds connected peers, relay status. Selectors:
+  `|s| s.peers.len()`, `|s| s.connection_status.clone()`
+- **`TypingActor`**: holds typing peer map with expiry. 
Selectors: + `|s| s.typing_in("general")` +- **`VoiceActor`**: holds voice participants, mute/deafen state. + Selectors: `|s| s.participants.clone()` + +Or these can all be fields in `ClientStateActor` — the derived state +pattern works the same regardless. The key insight is that **all UI +state flows through actors with selector-based notification**. No +separate event channels, no special cases. + +### Benefits + +- **No stale data**: signals always reflect the latest state +- **Minimal re-renders**: only updates when the selected slice changes +- **No manual event mapping**: no `needs_msg_refresh` flags, no + `process_event_batch` function matching events to signal updates +- **Decoupled**: adding a new signal is one `derived_signal()` call, + no changes to event processing +- **Uniform**: persistent state, ephemeral state, and UI state all + use the same actor → selector → signal pattern. No special channels + or event types for different categories of state. +- **Eliminates `ClientEvent` entirely**: all state flows through actors. + The `ClientEvent` enum, the event channel, and `process_event_batch` + are all removed. + +### Notification cost + +Notifications are batched via the `Actor::idle()` hook. A burst of +mutations (e.g., a sync batch applying 10 events) is processed in one +drain cycle, then triggers a single `StateChanged` round. Each round +sends N messages to subscribers (one per derived signal) plus N +`ReadState` ask round-trips. With ~15-20 derived signals and in-process +message passing (no I/O, same thread on WASM), this is sub-millisecond. +The PartialEq check prevents signal updates from propagating further. + +### Cloning cost + +Selectors return owned `T`, so each `ReadState` clones data out of the +actor. For expensive fields (e.g., `Vec` with hundreds +of entries), wrap the field in `Arc` inside `SharedState`. The selector +then clones the `Arc` (cheap pointer bump) instead of deep-cloning. 
+`PartialEq` on `Arc<T>` compares pointers first (std specializes this
+for `T: Eq`) — if the `Arc` wasn't replaced, the check is O(1). Apply
+`Arc` wrapping selectively to fields where cloning is measurably
+expensive.
+
+## Decisions
+
+1. **Mailboxes are unbounded.** `send()` returns `Err` only if the
+   actor is dead (mailbox closed). Bounded mailboxes can be added later
+   if backpressure becomes necessary.
+
+2. **FIFO, no priority messages.** Shutdown is achieved by dropping all
+   `Addr` handles (closes the mailbox) or via `Context::stop()`.
+
+3. **Shared state lives in actors.** `Arc<Mutex<SharedState>>` in the
+   client library is replaced by a state actor. External code queries
+   state via `ask()`. This eliminates all locks from the hot path —
+   the actor processes messages sequentially, so no synchronization is
+   needed inside the actor.
+
+4. **Derived state for UI signals.** Leptos signals are updated via
+   selector-based derived state actors, not event batch processing.
+   Signals only re-render when their selected value actually changes.
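
The selector-diff step behind decisions 3 and 4 can be sketched synchronously (illustrative stand-ins — `SharedState` fields and the `Derived` type here are hypothetical, and a counter stands in for `signal.set()`):

```rust
// Sketch of the derived-state diff: re-push to the "signal" only when
// the selected slice of state actually changed. Not Willow's real types.
#[derive(Default)]
struct SharedState {
    peers: Vec<String>,
    messages: Vec<String>,
}

struct Derived<T: PartialEq + Clone> {
    selector: fn(&SharedState) -> T,
    cached: Option<T>,
    updates: u32, // counts the signal.set() calls the UI would see
}

impl<T: PartialEq + Clone> Derived<T> {
    fn on_state_changed(&mut self, state: &SharedState) {
        let next = (self.selector)(state);
        if self.cached.as_ref() != Some(&next) {
            self.cached = Some(next);
            self.updates += 1; // signal.set(...) would go here
        }
        // equal → no re-render
    }
}

fn main() {
    let mut state = SharedState::default();
    let mut peer_count = Derived {
        selector: |s: &SharedState| s.peers.len(),
        cached: None,
        updates: 0,
    };

    peer_count.on_state_changed(&state); // seeds the cache: first update
    state.messages.push("hi".into());
    peer_count.on_state_changed(&state); // peers untouched: no update
    state.peers.push("A".into());
    peer_count.on_state_changed(&state); // selected slice changed: update

    assert_eq!(state.messages.len(), 1);
    assert_eq!(peer_count.updates, 2);
}
```

In the real design the `on_state_changed` body runs inside the derived actor after its `ReadState` ask resolves, and the comparison gates the Leptos `signal.set()` call.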