diff --git a/crates/actor/src/lib.rs b/crates/actor/src/lib.rs
index e9f85b79..b5338ed0 100644
--- a/crates/actor/src/lib.rs
+++ b/crates/actor/src/lib.rs
@@ -1140,4 +1140,279 @@ mod tests {
         system.shutdown().await;
     }
+
+    // ───── Shutdown ordering: system waits for children spawned via ctx ────
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn system_shutdown_terminates_ctx_spawned_child() {
+        // Children spawned via Context::spawn are tracked by the system and
+        // must be stopped when the system shuts down. Without this guarantee,
+        // child actors leak past their parent.
+        struct ParentActor;
+        impl Actor for ParentActor {}
+
+        struct SpawnChild;
+        impl Message for SpawnChild {
+            type Result = Addr<CounterActor>;
+        }
+        impl Handler<SpawnChild> for ParentActor {
+            fn handle(
+                &mut self,
+                _msg: SpawnChild,
+                ctx: &mut Context<Self>,
+            ) -> impl std::future::Future<Output = Addr<CounterActor>> + Send {
+                let child = ctx.spawn(CounterActor::new());
+                async move { child }
+            }
+        }
+
+        let system = System::new();
+        let parent = system.spawn(ParentActor);
+        let child = parent.ask(SpawnChild).await.unwrap();
+
+        assert!(parent.is_alive(), "parent must be alive before shutdown");
+        assert!(child.is_alive(), "child must be alive before shutdown");
+
+        system.shutdown().await;
+
+        // After shutdown(), both parent AND child must be stopped — system
+        // is the registry root, ctx.spawn registers with the same system.
+        assert!(
+            !parent.is_alive(),
+            "parent should be stopped after shutdown"
+        );
+        assert!(!child.is_alive(), "child should be stopped after shutdown");
+    }
+
+    // ───── Shutdown waits for in-flight handler to finish ─────────────────
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn system_shutdown_awaits_in_flight_handler() {
+        // system.shutdown() must not return until each tracked actor has
+        // run its mailbox loop to completion. We prove this by having a
+        // handler signal "started" via a oneshot, then sleep, then increment
+        // a flag. After shutdown() returns, the flag MUST be observed set —
+        // proving shutdown awaited the in-flight handler.
+        use std::sync::atomic::{AtomicBool, Ordering};
+        use tokio::sync::oneshot;
+
+        struct LongHandlerActor {
+            started_tx: Option<oneshot::Sender<()>>,
+            finished: Arc<AtomicBool>,
+        }
+        impl Actor for LongHandlerActor {}
+
+        struct DoWork;
+        impl Message for DoWork {
+            type Result = ();
+        }
+        impl Handler<DoWork> for LongHandlerActor {
+            async fn handle(&mut self, _msg: DoWork, _ctx: &mut Context<Self>) {
+                if let Some(tx) = self.started_tx.take() {
+                    let _ = tx.send(());
+                }
+                runtime::sleep(Duration::from_millis(80)).await;
+                self.finished.store(true, Ordering::SeqCst);
+            }
+        }
+
+        let system = System::new();
+        let (tx, rx) = oneshot::channel::<()>();
+        let finished = Arc::new(AtomicBool::new(false));
+        let addr = system.spawn(LongHandlerActor {
+            started_tx: Some(tx),
+            finished: finished.clone(),
+        });
+
+        addr.do_send(DoWork).unwrap();
+
+        // Wait until the handler has started — deterministic, no sleep.
+        rx.await.expect("handler must signal started");
+
+        // Now request shutdown. It must wait for the in-flight handler.
+        system.shutdown().await;
+
+        // After shutdown returns, the handler must have completed.
+        assert!(
+            finished.load(Ordering::SeqCst),
+            "shutdown returned before in-flight handler finished — \
+             shutdown must await in-flight handlers"
+        );
+    }
+
+    // ───── Broker: delivery to multiple (>2) subscribers ──────────────────
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn broker_delivers_to_many_subscribers() {
+        // Broker fans out a single Publish to N subscribers. Existing
+        // tests only cover 2 subscribers. This validates the core fanout
+        // semantics with N=5 and uses an ask round-trip on each subscriber
+        // as a deterministic barrier (no sleep-wait for propagation).
+        use crate::broker::{Broker, BrokerSubscribe, Publish};
+        use std::sync::atomic::{AtomicU32, Ordering};
+
+        #[derive(Clone)]
+        struct Evt;
+        impl Message for Evt {
+            type Result = ();
+        }
+
+        struct EvtCounter {
+            count: Arc<AtomicU32>,
+        }
+        impl Actor for EvtCounter {}
+        impl Handler<Evt> for EvtCounter {
+            async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
+                self.count.fetch_add(1, Ordering::SeqCst);
+            }
+        }
+
+        // Ping = no-op message used as a FIFO barrier on each subscriber.
+        // When `ask(Ping)` resolves, the actor has processed every envelope
+        // queued before it — including any prior Publish-fanout deliveries.
+        struct Ping;
+        impl Message for Ping {
+            type Result = ();
+        }
+        impl Handler<Ping> for EvtCounter {
+            async fn handle(&mut self, _msg: Ping, _ctx: &mut Context<Self>) {}
+        }
+
+        let system = System::new();
+        let broker = system.spawn(Broker::<Evt>::new());
+
+        let n = 5usize;
+        let mut subs: Vec<(Addr<EvtCounter>, Arc<AtomicU32>)> = Vec::with_capacity(n);
+        for _ in 0..n {
+            let count = Arc::new(AtomicU32::new(0));
+            let addr = system.spawn(EvtCounter {
+                count: count.clone(),
+            });
+            // ask() ensures the subscription is registered before we publish.
+            broker
+                .ask(BrokerSubscribe(addr.clone().into()))
+                .await
+                .unwrap();
+            subs.push((addr, count));
+        }
+
+        broker.do_send(Publish(Evt)).unwrap();
+
+        // Barrier strategy:
+        // 1. ask() the broker to drain its mailbox through the Publish.
+        //    Use a fresh dummy subscriber — re-subscribing an existing
+        //    one wouldn't change semantics here.
+        // 2. ask() each subscriber a Ping; FIFO on the mailbox guarantees
+        //    the Evt do_send queued by the broker is processed before
+        //    this Ping resolves.
+        let dummy_count = Arc::new(AtomicU32::new(0));
+        let dummy = system.spawn(EvtCounter {
+            count: dummy_count.clone(),
+        });
+        broker
+            .ask(BrokerSubscribe(dummy.clone().into()))
+            .await
+            .unwrap();
+        // dummy subscribed AFTER the Publish, so it should not have
+        // received it — sanity check that broker doesn't replay.
+        dummy.ask(Ping).await.unwrap();
+        assert_eq!(
+            dummy_count.load(Ordering::SeqCst),
+            0,
+            "broker must not replay events to late subscribers"
+        );
+
+        for (i, (addr, count)) in subs.iter().enumerate() {
+            addr.ask(Ping).await.unwrap();
+            assert_eq!(
+                count.load(Ordering::SeqCst),
+                1,
+                "subscriber {i} should have received exactly 1 Evt"
+            );
+        }
+
+        system.shutdown().await;
+    }
+
+    // ───── Broker: slow subscriber doesn't block other deliveries ─────────
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn broker_slow_subscriber_does_not_block_others() {
+        // Broker uses do_send (fire-and-forget) per subscriber, so a slow
+        // handler on one subscriber must not delay delivery to another.
+        use crate::broker::{Broker, BrokerSubscribe, Publish};
+        use std::sync::atomic::{AtomicU32, Ordering};
+        use tokio::sync::Notify;
+
+        #[derive(Clone)]
+        struct Evt;
+        impl Message for Evt {
+            type Result = ();
+        }
+
+        struct SlowSub {
+            release: Arc<Notify>,
+            received: Arc<AtomicU32>,
+        }
+        impl Actor for SlowSub {}
+        impl Handler<Evt> for SlowSub {
+            async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
+                // Block until we're explicitly released. Without per-subscriber
+                // isolation, this would prevent the fast subscriber from
+                // receiving its delivery.
+                self.release.notified().await;
+                self.received.fetch_add(1, Ordering::SeqCst);
+            }
+        }
+
+        struct FastSub {
+            received: Arc<Notify>,
+        }
+        impl Actor for FastSub {}
+        impl Handler<Evt> for FastSub {
+            async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
+                self.received.notify_one();
+            }
+        }
+
+        let system = System::new();
+        let broker = system.spawn(Broker::<Evt>::new());
+
+        let slow_release = Arc::new(Notify::new());
+        let slow_received = Arc::new(AtomicU32::new(0));
+        let slow = system.spawn(SlowSub {
+            release: slow_release.clone(),
+            received: slow_received.clone(),
+        });
+
+        let fast_received = Arc::new(Notify::new());
+        let fast = system.spawn(FastSub {
+            received: fast_received.clone(),
+        });
+
+        broker.ask(BrokerSubscribe(slow.into())).await.unwrap();
+        broker.ask(BrokerSubscribe(fast.into())).await.unwrap();
+
+        broker.do_send(Publish(Evt)).unwrap();
+
+        // Fast subscriber must receive without waiting on slow one.
+        // Bounded wait is generous (2s) but the actual notify fires
+        // within microseconds in practice — no flake.
+        tokio::time::timeout(Duration::from_secs(2), fast_received.notified())
+            .await
+            .expect("fast subscriber should receive while slow is blocked");
+
+        // Slow subscriber is still blocked — release it and confirm.
+        assert_eq!(slow_received.load(Ordering::SeqCst), 0);
+        slow_release.notify_one();
+
+        // Now slow handler completes.
+        let deadline = std::time::Instant::now() + Duration::from_secs(2);
+        while slow_received.load(Ordering::SeqCst) == 0 {
+            if std::time::Instant::now() >= deadline {
+                panic!("slow subscriber never received after release");
+            }
+            runtime::sleep(Duration::from_millis(5)).await;
+        }
+
+        system.shutdown().await;
+    }
 }