From 72b4ed8f0d1569f421dcb26ea3829bdc4619d758 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 8 Apr 2026 05:39:02 +0000 Subject: [PATCH] Fix 3 bugs: bounded actor mailboxes, worker startup race, stale admin signal (#78, #79, #81) - #78: Replace unbounded actor mailbox channels with bounded (capacity 10,000) to prevent OOM DoS from message flooding. Add spawn_with_capacity() API, diagnostic logging for dropped messages, and shutdown deadlock protection. - #79: Add watch-channel ready signal between StateActor and NetworkActor so gossip drain tasks wait for StateActor initialization before processing events. - #81: Replace get_untracked() with reactive get() for peer_id in member_list.rs permission checks (lines 46, 124, 178, 227) for consistent reactive patterns. https://claude.ai/code/session_0138qk6y8FcdTUbrVFaH7FXH --- crates/actor/src/lib.rs | 106 +++++++++++++ crates/actor/src/runtime.rs | 52 +++++-- crates/actor/src/supervisor.rs | 2 +- crates/actor/src/system.rs | 19 ++- crates/client/src/lib.rs | 3 +- crates/web/src/components/member_list.rs | 8 +- crates/web/tests/browser.rs | 95 ++++++++++++ crates/worker/src/actors/heartbeat.rs | 1 + crates/worker/src/actors/network.rs | 23 +++ crates/worker/src/actors/state.rs | 11 ++ crates/worker/src/actors/sync.rs | 1 + crates/worker/src/runtime.rs | 11 +- crates/worker/tests/integration.rs | 188 +++++++++++++++++++++++ 13 files changed, 494 insertions(+), 26 deletions(-) diff --git a/crates/actor/src/lib.rs b/crates/actor/src/lib.rs index fc9aede4..873cf3f5 100644 --- a/crates/actor/src/lib.rs +++ b/crates/actor/src/lib.rs @@ -606,6 +606,112 @@ mod tests { assert!(tx.try_send(3).is_err()); } + // ───── Bounded mailbox (issue #78) ─────────────────────────────────── + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn mailbox_drops_messages_when_capacity_exceeded() { + // Issue #78: Actor mailboxes must be bounded to prevent OOM DoS. + // With unbounded channels, send() would never return Err due to + // capacity — only if the receiver is closed. With bounded channels, + // send() returns Err when the mailbox is full. + + // Test directly at the channel level — this is deterministic. + let (tx, _rx) = runtime::channel::(4); + assert!(tx.send(1).is_ok()); + assert!(tx.send(2).is_ok()); + assert!(tx.send(3).is_ok()); + assert!(tx.send(4).is_ok()); + // Channel is full — this should fail. + assert!( + tx.send(5).is_err(), + "expected mailbox full error, but send succeeded (unbounded!)" + ); + + // Now test via actor spawn_with_capacity — use a slow actor to + // ensure we can fill the mailbox. + struct SlowActor; + impl Actor for SlowActor {} + + struct SlowMsg; + impl Message for SlowMsg { + type Result = (); + } + impl Handler for SlowActor { + fn handle( + &mut self, + _msg: SlowMsg, + _ctx: &mut Context, + ) -> impl std::future::Future + Send { + async { + runtime::sleep(Duration::from_millis(200)).await; + } + } + } + + let system = System::new(); + let addr = system.handle().spawn_with_capacity(SlowActor, 4); + + // Send one message to trigger processing (actor will be busy 200ms). + addr.do_send(SlowMsg).unwrap(); + // Let the actor dequeue it and begin handling. + runtime::sleep(Duration::from_millis(30)).await; + + // Now fill the remaining mailbox capacity (4 slots). + for _ in 0..4 { + let _ = addr.do_send(SlowMsg); + } + + // The mailbox should now be full — next send should fail. + let result = addr.do_send(SlowMsg); + assert!( + result.is_err(), + "expected mailbox full error, but send succeeded (unbounded!)" + ); + + system.shutdown().await; + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn shutdown_succeeds_with_full_mailbox() { + // Ensure shutdown doesn't deadlock when a mailbox is full. + // The stop flag (AtomicBool) is set even if the noop wake-up + // message is dropped due to a full mailbox. + struct SlowActor; + impl Actor for SlowActor {} + + struct Block; + impl Message for Block { + type Result = (); + } + impl Handler for SlowActor { + fn handle( + &mut self, + _msg: Block, + _ctx: &mut Context, + ) -> impl std::future::Future + Send { + async { + runtime::sleep(Duration::from_millis(50)).await; + } + } + } + + let system = System::new(); + let addr = system.handle().spawn_with_capacity(SlowActor, 2); + + // Fill the mailbox. + for _ in 0..4 { + let _ = addr.do_send(Block); + } + + // Shutdown should complete even though the noop wake-up may + // be dropped due to the full mailbox. + let shutdown = tokio::time::timeout(Duration::from_secs(5), system.shutdown()); + assert!( + shutdown.await.is_ok(), + "shutdown deadlocked with full mailbox" + ); + } + // ───── Supervision ───────────────────────────────────────────────────── #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/crates/actor/src/runtime.rs b/crates/actor/src/runtime.rs index a8a71f58..22d50113 100644 --- a/crates/actor/src/runtime.rs +++ b/crates/actor/src/runtime.rs @@ -28,14 +28,23 @@ pub async fn sleep(duration: Duration) { gloo_timers::future::sleep(duration).await; } -// ───── Unbounded MPSC channel ────────────────────────────────────────────── +// ───── Default mailbox capacity ─────────────────────────────────────────── -/// Sender half of an unbounded MPSC channel. +/// Default capacity for actor mailbox channels. +/// +/// This prevents unbounded memory growth from message flooding (OOM DoS). +/// Actors whose mailbox reaches this limit will have messages dropped with +/// a warning log. +pub const DEFAULT_MAILBOX_CAPACITY: usize = 10_000; + +// ───── Bounded MPSC channel (actor mailbox) ────────────────────────────── + +/// Sender half of a bounded MPSC channel used for actor mailboxes. #[cfg(not(target_arch = "wasm32"))] -pub struct Sender(tokio::sync::mpsc::UnboundedSender); +pub struct Sender(tokio::sync::mpsc::Sender); #[cfg(target_arch = "wasm32")] -pub struct Sender(futures_channel::mpsc::UnboundedSender); +pub struct Sender(futures_channel::mpsc::Sender); impl Clone for Sender { fn clone(&self) -> Self { @@ -44,16 +53,27 @@ impl Clone for Sender { } impl Sender { - /// Send a value. Returns `Err` if the receiver is closed. + /// Try to send a value. Returns `Err(val)` if the channel is full or closed. pub fn send(&self, val: T) -> Result<(), T> { #[cfg(not(target_arch = "wasm32"))] { - self.0.send(val).map_err(|e| e.0) + self.0.try_send(val).map_err(|e| { + if matches!(e, tokio::sync::mpsc::error::TrySendError::Full(_)) { + tracing::warn!("actor mailbox full, dropping message"); + } + e.into_inner() + }) } #[cfg(target_arch = "wasm32")] { - self.0.unbounded_send(val).map_err(|e| e.into_inner()) + let mut sender = self.0.clone(); + sender.try_send(val).map_err(|e| { + if e.is_full() { + tracing::warn!("actor mailbox full, dropping message"); + } + e.into_inner() + }) } } @@ -63,12 +83,12 @@ impl Sender { } } -/// Receiver half of an unbounded MPSC channel. +/// Receiver half of a bounded MPSC channel used for actor mailboxes. #[cfg(not(target_arch = "wasm32"))] -pub struct Receiver(tokio::sync::mpsc::UnboundedReceiver); +pub struct Receiver(tokio::sync::mpsc::Receiver); #[cfg(target_arch = "wasm32")] -pub struct Receiver(futures_channel::mpsc::UnboundedReceiver); +pub struct Receiver(futures_channel::mpsc::Receiver); impl Receiver { /// Wait for the next value. Returns `None` when all senders are dropped. @@ -104,17 +124,21 @@ impl Receiver { } } -/// Create an unbounded MPSC channel. -pub fn unbounded_channel() -> (Sender, Receiver) { +/// Create a bounded MPSC channel with the given capacity. +/// +/// This replaces the former `unbounded_channel()` to prevent OOM DoS attacks +/// from message flooding. When the channel is full, `Sender::send()` returns +/// an error and the message is dropped. +pub fn channel(capacity: usize) -> (Sender, Receiver) { #[cfg(not(target_arch = "wasm32"))] { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = tokio::sync::mpsc::channel(capacity); (Sender(tx), Receiver(rx)) } #[cfg(target_arch = "wasm32")] { - let (tx, rx) = futures_channel::mpsc::unbounded(); + let (tx, rx) = futures_channel::mpsc::channel(capacity); (Sender(tx), Receiver(rx)) } } diff --git a/crates/actor/src/supervisor.rs b/crates/actor/src/supervisor.rs index 3b2389ac..6f93804b 100644 --- a/crates/actor/src/supervisor.rs +++ b/crates/actor/src/supervisor.rs @@ -35,7 +35,7 @@ pub(crate) fn spawn_supervised( policy: RestartPolicy, system: SystemHandle, ) -> Addr { - let (tx, rx) = runtime::unbounded_channel::>(); + let (tx, rx) = runtime::channel::>(runtime::DEFAULT_MAILBOX_CAPACITY); let addr = Addr::new(tx.clone()); runtime::spawn(supervisor_loop(actor, policy, rx, tx, system, addr.clone())); diff --git a/crates/actor/src/system.rs b/crates/actor/src/system.rs index 12076bcd..b47ef62a 100644 --- a/crates/actor/src/system.rs +++ b/crates/actor/src/system.rs @@ -12,7 +12,7 @@ use crate::addr::Addr; use crate::context::Context; use crate::envelope::BoxEnvelope; use crate::mailbox; -use crate::runtime::{self, OneshotRx, Sender}; +use crate::runtime::{self, OneshotRx, Sender, DEFAULT_MAILBOX_CAPACITY}; // ───── SystemActor (internal) ────────────────────────────────────────── @@ -102,7 +102,7 @@ impl System { pub fn new() -> Self { // Bootstrap: spawn the SystemActor directly (it can't // register itself — it IS the registry). - let (tx, rx) = runtime::unbounded_channel(); + let (tx, rx) = runtime::channel(DEFAULT_MAILBOX_CAPACITY); let addr = Addr::new(tx.clone()); let stop = Arc::new(AtomicBool::new(false)); let (done_tx, _done_rx) = runtime::oneshot(); @@ -174,7 +174,16 @@ impl SystemHandle { /// Spawn a top-level actor and return its address. pub fn spawn(&self, actor: A) -> Addr { - let (tx, rx) = runtime::unbounded_channel(); + self.spawn_with_capacity(actor, DEFAULT_MAILBOX_CAPACITY) + } + + /// Spawn a top-level actor with a custom mailbox capacity. + /// + /// Use this when an actor has different backpressure needs than the + /// default. Also useful in tests to verify bounded mailbox behavior + /// with a small capacity. + pub fn spawn_with_capacity(&self, actor: A, capacity: usize) -> Addr { + let (tx, rx) = runtime::channel(capacity); let addr = Addr::new(tx.clone()); let stop = Arc::new(AtomicBool::new(false)); let (done_tx, done_rx) = runtime::oneshot(); @@ -184,7 +193,9 @@ impl SystemHandle { runtime::spawn(mailbox::run_mailbox(actor, ctx, rx, stop.clone(), done_tx)); // Create a stop signal that sets the flag AND sends a no-op envelope - // to wake up the mailbox if it's blocked on recv(). + // to wake up the mailbox if it's blocked on recv(). If the mailbox + // is full the noop is dropped, but the stop flag is still set — the + // actor will notice it on the next message or idle cycle. let signal_stop = { let stop = stop.clone(); let tx = tx; diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 4ae12a5a..9e40724f 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -66,7 +66,8 @@ pub mod event_receiver { broker: &Addr>, system: &willow_actor::SystemHandle, ) -> Self { - let (tx, rx) = willow_actor::runtime::unbounded_channel(); + let (tx, rx) = + willow_actor::runtime::channel(willow_actor::runtime::DEFAULT_MAILBOX_CAPACITY); let addr = system.spawn(ForwarderActor { tx }); let recipient = addr.into(); let _ = broker.ask(BrokerSubscribe(recipient)).await; diff --git a/crates/web/src/components/member_list.rs b/crates/web/src/components/member_list.rs index c032d88c..7a02f011 100644 --- a/crates/web/src/components/member_list.rs +++ b/crates/web/src/components/member_list.rs @@ -43,7 +43,7 @@ pub fn MemberList( .iter() .filter(|(pid, _, _)| { sync_providers.contains(pid) - && pid != &peer_id.get_untracked() + && pid != &peer_id.get() && *pid != owner_str }) .cloned() @@ -121,7 +121,7 @@ pub fn MemberList( .filter(|(pid, _, _)| { // Exclude workers from the members section. !sync_providers.contains(pid) - || pid == &peer_id.get_untracked() + || pid == &peer_id.get() || *pid == owner_str }) .collect::>() @@ -175,7 +175,7 @@ pub fn MemberList( let hu = handle_untrust.clone(); let pk = pid_kick.clone(); move || { - let is_admin = app_state.server.admin_ids.get().contains(&peer_id.get_untracked()); + let is_admin = app_state.server.admin_ids.get().contains(&peer_id.get()); if is_self() || !is_admin { None } else { @@ -224,7 +224,7 @@ pub fn MemberList( let sync_providers = app_state.server.sync_provider_ids.get(); let non_worker_count = all.iter().filter(|(pid, _, _)| { !sync_providers.contains(pid) - || pid == &peer_id.get_untracked() + || pid == &peer_id.get() || *pid == owner_str }).count(); if non_worker_count == 0 { diff --git a/crates/web/tests/browser.rs b/crates/web/tests/browser.rs index 7121c795..57298822 100644 --- a/crates/web/tests/browser.rs +++ b/crates/web/tests/browser.rs @@ -3632,3 +3632,98 @@ async fn dropdown_delete_has_danger_class() { "danger button should say 'Delete'" ); } + +// ── Admin Signal Reactivity Tests (Issue #81) ─────────────────────────────── + +#[wasm_bindgen_test] +async fn admin_buttons_hide_when_admin_status_revoked() { + // Issue #81: Admin buttons must reactively hide when admin_ids changes. + // Tests that using get() (not get_untracked()) for admin_ids makes the + // UI update when admin status is revoked. + let (admin_ids, set_admin_ids) = + signal(std::collections::HashSet::from(["peer-a".to_string()])); + let peer_id = "peer-a".to_string(); + + let container = mount_test(move || { + let pid = peer_id.clone(); + view! { + {move || { + let is_admin = admin_ids.get().contains(&pid); + if is_admin { + Some(view! { +
+ + +
+ }) + } else { + None + } + }} + } + }); + + tick().await; + + // Admin buttons should be visible. + assert!( + query(&container, ".admin-actions").is_some(), + "admin buttons should be visible when peer is admin" + ); + + // Revoke admin status. + set_admin_ids.set(std::collections::HashSet::new()); + tick().await; + + // Admin buttons should now be hidden. + assert!( + query(&container, ".admin-actions").is_none(), + "admin buttons should hide after admin status revoked" + ); +} + +#[wasm_bindgen_test] +async fn admin_buttons_respond_to_peer_id_change() { + // Issue #81: Using get() on peer_id (instead of get_untracked()) ensures + // the UI updates when the local peer_id signal changes. + let admin_set = std::collections::HashSet::from(["peer-a".to_string()]); + let (admin_ids, _) = signal(admin_set); + let (peer_id, set_peer_id) = signal("peer-a".to_string()); + + let container = mount_test(move || { + view! { + {move || { + let is_admin = admin_ids.get().contains(&peer_id.get()); + if is_admin { + Some(view! { +
+ +
+ }) + } else { + None + } + }} + } + }); + + tick().await; + + // Initially peer-a is admin — buttons visible. + assert!( + query(&container, ".admin-actions").is_some(), + "admin buttons should be visible for peer-a" + ); + + // Change peer_id to peer-b (not in admin set). + set_peer_id.set("peer-b".to_string()); + tick().await; + + // Buttons should now be hidden because peer-b is not admin. + // With get_untracked(), this would NOT update — the stale value + // "peer-a" would still be checked against admin_ids. + assert!( + query(&container, ".admin-actions").is_none(), + "admin buttons should hide when peer_id changes to non-admin" + ); +} diff --git a/crates/worker/src/actors/heartbeat.rs b/crates/worker/src/actors/heartbeat.rs index 50dde1ec..cff39f77 100644 --- a/crates/worker/src/actors/heartbeat.rs +++ b/crates/worker/src/actors/heartbeat.rs @@ -145,6 +145,7 @@ mod tests { let state_addr = system.spawn(StateActor { role: Box::new(TestRole), + ready: None, }); let test_peer = net_a.id(); diff --git a/crates/worker/src/actors/network.rs b/crates/worker/src/actors/network.rs index bf17f567..c64f596e 100644 --- a/crates/worker/src/actors/network.rs +++ b/crates/worker/src/actors/network.rs @@ -20,6 +20,9 @@ pub struct NetworkActor { /// Optional SERVER_OPS topic events stream. ops_events: Option, reply_topic: T, + /// Optional ready signal — drain tasks wait for `true` before pulling events. + /// Uses `watch` so late subscribers see the value even if StateActor started first. + ready: Option>, } impl NetworkActor { @@ -35,6 +38,7 @@ impl NetworkActor { events: Some(events), ops_events: None, reply_topic, + ready: None, } } @@ -44,6 +48,13 @@ impl NetworkActor { self.ops_events = Some(ops_events); self } + + /// Attach a ready signal. Drain tasks will wait for `true` before pulling + /// events, ensuring the `StateActor` has completed initialization. + pub fn with_ready_signal(mut self, ready: tokio::sync::watch::Receiver) -> Self { + self.ready = Some(ready); + self + } } /// Internal message wrapping a gossip event for the network actor. @@ -60,10 +71,17 @@ impl willow_actor::Message for ServerOpsEventMsg { impl Actor for NetworkActor { fn started(&mut self, ctx: &mut Context) -> impl std::future::Future + Send { + let ready = self.ready.take(); + // Spawn a task that drains WORKERS topic events. if let Some(mut events) = self.events.take() { let addr = ctx.address(); + let mut ready = ready.clone(); willow_actor::runtime::spawn(async move { + // Wait for StateActor to be ready before draining events. + if let Some(ref mut rx) = ready { + let _ = rx.wait_for(|v| *v).await; + } while let Some(Ok(event)) = events.next().await { if addr.do_send(GossipEventMsg(event)).is_err() { break; @@ -74,7 +92,12 @@ impl Actor for NetworkActor< // Spawn a second task that drains SERVER_OPS topic events. if let Some(mut ops_events) = self.ops_events.take() { let addr = ctx.address(); + let mut ready = ready; willow_actor::runtime::spawn(async move { + // Wait for StateActor to be ready before draining events. + if let Some(ref mut rx) = ready { + let _ = rx.wait_for(|v| *v).await; + } while let Some(Ok(event)) = ops_events.next().await { if addr.do_send(ServerOpsEventMsg(event)).is_err() { break; diff --git a/crates/worker/src/actors/state.rs b/crates/worker/src/actors/state.rs index 91553c69..2e7e530d 100644 --- a/crates/worker/src/actors/state.rs +++ b/crates/worker/src/actors/state.rs @@ -14,6 +14,10 @@ use crate::WorkerRole; /// The state actor holds the worker's mutable role and processes messages sequentially. pub struct StateActor { pub role: Box, + /// Optional ready signal — set to `true` when `started()` completes so + /// other actors (e.g. `NetworkActor`) can wait before draining events. + /// Uses `watch` channel so late subscribers see the value immediately. + pub ready: Option>, } impl Actor for StateActor { @@ -22,6 +26,9 @@ impl Actor for StateActor { _ctx: &mut Context, ) -> impl std::future::Future + Send { debug!("state actor started"); + if let Some(ready) = self.ready.take() { + let _ = ready.send(true); + } async {} } @@ -147,6 +154,7 @@ mod tests { let system = System::new(); let addr = system.spawn(StateActor { role: Box::new(TestRole::new()), + ready: None, }); for _ in 0..3 { @@ -169,6 +177,7 @@ mod tests { let system = System::new(); let addr = system.spawn(StateActor { role: Box::new(TestRole::new()), + ready: None, }); // Sync request. @@ -207,6 +216,7 @@ mod tests { let system = System::new(); let addr = system.spawn(StateActor { role: Box::new(TestRole::new()), + ready: None, }); assert!(addr.is_alive()); @@ -225,6 +235,7 @@ mod tests { let system = System::new(); let addr = system.spawn(StateActor { role: Box::new(TestRole::new()), + ready: None, }); let mut futs = vec![]; diff --git a/crates/worker/src/actors/sync.rs b/crates/worker/src/actors/sync.rs index efc68614..f4556ec2 100644 --- a/crates/worker/src/actors/sync.rs +++ b/crates/worker/src/actors/sync.rs @@ -133,6 +133,7 @@ mod tests { let system = System::new(); let state_addr = system.spawn(StateActor { role: Box::new(TestSyncRole), + ready: None, }); let addr = system.spawn(SyncActor::new( diff --git a/crates/worker/src/runtime.rs b/crates/worker/src/runtime.rs index 0b592e39..7a433dd1 100644 --- a/crates/worker/src/runtime.rs +++ b/crates/worker/src/runtime.rs @@ -34,9 +34,15 @@ pub async fn run( let (_ops_sender, ops_events) = network.subscribe(ops_topic_id, vec![]).await?; // Create actor system and spawn actors. + // The ready signal ensures NetworkActor waits for StateActor to + // complete initialization before draining gossip events. let system = System::new(); + let (ready_tx, ready_rx) = tokio::sync::watch::channel(false); - let state_addr = system.spawn(StateActor { role }); + let state_addr = system.spawn(StateActor { + role, + ready: Some(ready_tx), + }); let _network = system.spawn( NetworkActor::new( @@ -45,7 +51,8 @@ pub async fn run( peer_id, workers_sender.clone(), ) - .with_ops_events(ops_events), + .with_ops_events(ops_events) + .with_ready_signal(ready_rx), ); let _heartbeat = system.spawn(HeartbeatActor::new( diff --git a/crates/worker/tests/integration.rs b/crates/worker/tests/integration.rs index b00fb7b6..3a506ee1 100644 --- a/crates/worker/tests/integration.rs +++ b/crates/worker/tests/integration.rs @@ -95,6 +95,7 @@ async fn state_actor_with_replay_role_full_flow() { let system = System::new(); let addr = system.spawn(StateActor { role: Box::new(TestReplayRole::new("srv-1", 100)), + ready: None, }); // 1. Ingest 5 events. @@ -176,6 +177,7 @@ async fn heartbeat_and_state_actor_interaction() { let state_addr = system.spawn(StateActor { role: Box::new(TestReplayRole::new("srv-1", 100)), + ready: None, }); let test_worker_id = net_a.id(); @@ -220,6 +222,7 @@ async fn concurrent_requests_all_resolve() { let system = System::new(); let addr = system.spawn(StateActor { role: Box::new(TestReplayRole::new("srv-1", 100)), + ready: None, }); // Fire 50 concurrent requests. @@ -249,6 +252,7 @@ async fn events_applied_then_queried_via_request() { let system = System::new(); let addr = system.spawn(StateActor { role: Box::new(TestReplayRole::new("srv-1", 5)), + ready: None, }); // Ingest 10 events into a buffer of size 5. @@ -306,6 +310,7 @@ async fn graceful_shutdown_sends_departure() { let state_addr = system.spawn(StateActor { role: Box::new(TestReplayRole::new("srv-1", 100)), + ready: None, }); let departing_id = net_a.id(); @@ -360,6 +365,7 @@ async fn full_actor_orchestration_without_network() { let state_addr = system.spawn(StateActor { role: Box::new(TestReplayRole::new("srv-1", 100)), + ready: None, }); let orch_id = net_a.id(); @@ -512,6 +518,7 @@ async fn server_ops_events_forwarded_to_state() { // State actor with a replay role that tracks ingested events. let state_addr = system.spawn(StateActor { role: Box::new(TestReplayRole::new("srv-1", 100)), + ready: None, }); // Create channels for both WORKERS and SERVER_OPS streams. @@ -578,3 +585,184 @@ async fn server_ops_events_forwarded_to_state() { system.shutdown().await; } + +/// Issue #79: Verify that the ready signal prevents NetworkActor from draining +/// gossip events before StateActor has completed initialization. +/// +/// Without the ready signal, pre-buffered events could arrive at StateActor +/// before its `started()` hook completes. The fix gates the drain tasks on +/// a `watch` channel that StateActor sets to `true` after `started()`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pre_buffered_events_wait_for_state_ready_signal() { + let system = System::new(); + let (ready_tx, ready_rx) = tokio::sync::watch::channel(false); + + // State actor — we pass the ready sender so it fires on started(). + let state_addr = system.spawn(StateActor { + role: Box::new(TestReplayRole::new("srv-1", 100)), + ready: Some(ready_tx), + }); + + // Pre-buffer 3 signed events in the ops channel BEFORE spawning NetworkActor. + let (_workers_tx, workers_rx) = tokio::sync::mpsc::channel::(16); + let (ops_tx, ops_rx) = tokio::sync::mpsc::channel::(16); + + let sender_id = Identity::generate(); + + // EventDag requires CreateServer as genesis event (seq=1). + let genesis = Event::new( + &sender_id, + 1, + EventHash::ZERO, + vec![], + EventKind::CreateServer { + name: "srv-1".to_string(), + }, + 0, + ); + let mut prev_hash = genesis.hash; + + let genesis_data = + willow_common::pack_wire(&willow_common::WireMessage::Event(genesis), &sender_id).unwrap(); + ops_tx + .send(GossipEvent::Received(GossipMessage { + content: bytes::Bytes::from(genesis_data), + sender: sender_id.endpoint_id(), + })) + .await + .unwrap(); + + // Buffer 2 more events (messages). + for seq in 2..=3 { + let event = Event::new( + &sender_id, + seq, + prev_hash, + vec![], + EventKind::Message { + channel_id: "general".to_string(), + body: format!("msg-{seq}"), + reply_to: None, + }, + seq * 1000, + ); + prev_hash = event.hash; + + let data = willow_common::pack_wire(&willow_common::WireMessage::Event(event), &sender_id) + .unwrap(); + + ops_tx + .send(GossipEvent::Received(GossipMessage { + content: bytes::Bytes::from(data), + sender: sender_id.endpoint_id(), + })) + .await + .unwrap(); + } + + let worker_id = Identity::generate(); + let peer_id = worker_id.endpoint_id(); + + // Spawn NetworkActor with the ready signal — drain tasks should wait. + let _network = system.spawn( + NetworkActor::new( + MockTopicEvents { rx: workers_rx }, + state_addr.clone(), + peer_id, + MockTopicHandle, + ) + .with_ops_events(MockTopicEvents { rx: ops_rx }) + .with_ready_signal(ready_rx), + ); + + // StateActor's started() fires the watch channel, so the + // drain tasks should begin and process all pre-buffered events. + tokio::time::sleep(Duration::from_millis(200)).await; + + // Verify all 3 pre-buffered events were processed by StateActor. + let info = state_addr.ask(GetRoleInfoMsg).await.unwrap(); + match info { + WorkerRoleInfo::Replay { + events_buffered, .. + } => { + assert_eq!( + events_buffered, 3, + "all pre-buffered events should have been processed after ready signal" + ); + } + _ => panic!("expected Replay"), + } + + system.shutdown().await; +} + +/// Issue #79: Verify that without a ready signal, NetworkActor drains +/// immediately (backward compatibility for tests and simple setups). +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn network_actor_drains_immediately_without_ready_signal() { + let system = System::new(); + + let state_addr = system.spawn(StateActor { + role: Box::new(TestReplayRole::new("srv-1", 100)), + ready: None, + }); + + let (_workers_tx, workers_rx) = tokio::sync::mpsc::channel::(16); + let (ops_tx, ops_rx) = tokio::sync::mpsc::channel::(16); + + let worker_id = Identity::generate(); + let peer_id = worker_id.endpoint_id(); + + let _network = system.spawn( + NetworkActor::new( + MockTopicEvents { rx: workers_rx }, + state_addr.clone(), + peer_id, + MockTopicHandle, + ) + .with_ops_events(MockTopicEvents { rx: ops_rx }), + // No ready signal — drain starts immediately. + ); + + // Allow actors to start. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Send a CreateServer event (required as genesis for EventDag). + let sender_id = Identity::generate(); + let event = Event::new( + &sender_id, + 1, + EventHash::ZERO, + vec![], + EventKind::CreateServer { + name: "srv-1".to_string(), + }, + 0, + ); + let data = + willow_common::pack_wire(&willow_common::WireMessage::Event(event), &sender_id).unwrap(); + ops_tx + .send(GossipEvent::Received(GossipMessage { + content: bytes::Bytes::from(data), + sender: sender_id.endpoint_id(), + })) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let info = state_addr.ask(GetRoleInfoMsg).await.unwrap(); + match info { + WorkerRoleInfo::Replay { + events_buffered, .. + } => { + assert_eq!( + events_buffered, 1, + "event should be processed without ready signal" + ); + } + _ => panic!("expected Replay"), + } + + system.shutdown().await; +}