Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 87 additions & 59 deletions .claude/skills/resolving-issues/SKILL.md

Large diffs are not rendered by default.

275 changes: 275 additions & 0 deletions crates/actor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,4 +1140,279 @@ mod tests {

system.shutdown().await;
}

// ───── Shutdown ordering: system waits for children spawned via ctx ────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn system_shutdown_terminates_ctx_spawned_child() {
    // A child created through `Context::spawn` is registered with the
    // owning system, so tearing the system down must stop the child as
    // well as the parent — otherwise child actors outlive shutdown.
    struct Parent;
    impl Actor for Parent {}

    // Asking the parent this message makes it spawn (and hand back the
    // address of) one CounterActor child via its context.
    struct SpawnChild;
    impl Message for SpawnChild {
        type Result = Addr<CounterActor>;
    }
    impl Handler<SpawnChild> for Parent {
        async fn handle(
            &mut self,
            _msg: SpawnChild,
            ctx: &mut Context<Self>,
        ) -> Addr<CounterActor> {
            ctx.spawn(CounterActor::new())
        }
    }

    let system = System::new();
    let parent_addr = system.spawn(Parent);
    let child_addr = parent_addr.ask(SpawnChild).await.unwrap();

    // Sanity: both actors run until shutdown is requested.
    assert!(parent_addr.is_alive(), "parent must be alive before shutdown");
    assert!(child_addr.is_alive(), "child must be alive before shutdown");

    system.shutdown().await;

    // The system is the registry root and ctx.spawn registers with the
    // same system, so shutdown() must have stopped parent AND child.
    assert!(
        !parent_addr.is_alive(),
        "parent should be stopped after shutdown"
    );
    assert!(!child_addr.is_alive(), "child should be stopped after shutdown");
}

// ───── Shutdown waits for in-flight handler to finish ─────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn system_shutdown_awaits_in_flight_handler() {
    // Contract under test: `System::shutdown` may only return once each
    // tracked actor has run its mailbox loop to completion. The handler
    // below reports "started" over a oneshot, sleeps, then flips a
    // completion flag; if shutdown() returns while the flag is still
    // clear, it raced ahead of an in-flight handler.
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::sync::oneshot;

    struct SlowWorker {
        // Consumed on first delivery to signal "handler is running".
        start_signal: Option<oneshot::Sender<()>>,
        // Set only after the handler's sleep completes.
        done: Arc<AtomicBool>,
    }
    impl Actor for SlowWorker {}

    struct DoWork;
    impl Message for DoWork {
        type Result = ();
    }
    impl Handler<DoWork> for SlowWorker {
        async fn handle(&mut self, _msg: DoWork, _ctx: &mut Context<Self>) {
            if let Some(signal) = self.start_signal.take() {
                let _ = signal.send(());
            }
            runtime::sleep(Duration::from_millis(80)).await;
            self.done.store(true, Ordering::SeqCst);
        }
    }

    let (start_tx, start_rx) = oneshot::channel::<()>();
    let done = Arc::new(AtomicBool::new(false));
    let system = System::new();
    let worker = system.spawn(SlowWorker {
        start_signal: Some(start_tx),
        done: done.clone(),
    });

    worker.do_send(DoWork).unwrap();

    // Deterministic rendezvous — no sleep-based waiting: we block here
    // until the handler is provably mid-flight.
    start_rx.await.expect("handler must signal started");

    // Shutdown while the handler is sleeping; it must be awaited.
    system.shutdown().await;

    // shutdown() returning implies the handler ran to completion.
    assert!(
        done.load(Ordering::SeqCst),
        "shutdown returned before in-flight handler finished — \
         parent must wait for child"
    );
}

// ───── Broker: delivery to multiple (>2) subscribers ──────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn broker_delivers_to_many_subscribers() {
    // Broker fans out a single Publish to N subscribers. Existing
    // tests only cover 2 subscribers. This validates the core fanout
    // semantics with N=5 and uses an ask round-trip on each subscriber
    // as a deterministic barrier (no sleep-wait for propagation).
    use crate::broker::{Broker, BrokerSubscribe, Publish};

    // The published event. Clone is required because the broker hands
    // one copy to every subscriber.
    #[derive(Clone)]
    struct Evt;
    impl Message for Evt {
        type Result = ();
    }

    // Subscriber that counts how many Evt deliveries it receives.
    struct EvtCounter {
        count: Arc<AtomicU32>,
    }
    impl Actor for EvtCounter {}
    impl Handler<Evt> for EvtCounter {
        async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
            self.count.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Ping = no-op message used as a FIFO barrier on each subscriber.
    // When `ask(Ping)` resolves, the actor has processed every envelope
    // queued before it — including any prior Publish-fanout deliveries.
    struct Ping;
    impl Message for Ping {
        type Result = ();
    }
    impl Handler<Ping> for EvtCounter {
        async fn handle(&mut self, _msg: Ping, _ctx: &mut Context<Self>) {}
    }

    let system = System::new();
    let broker = system.spawn(Broker::<Evt>::new());

    let n = 5usize;
    let mut subs: Vec<(Addr<EvtCounter>, Arc<AtomicU32>)> = Vec::with_capacity(n);
    for _ in 0..n {
        let count = Arc::new(AtomicU32::new(0));
        let addr = system.spawn(EvtCounter {
            count: count.clone(),
        });
        // ask() ensures the subscription is registered before we publish.
        broker
            .ask(BrokerSubscribe(addr.clone().into()))
            .await
            .unwrap();
        subs.push((addr, count));
    }

    // Fire-and-forget publish; delivery order/timing is checked below
    // via the barriers rather than by sleeping.
    broker.do_send(Publish(Evt)).unwrap();

    // Barrier strategy:
    // 1. ask() the broker to drain its mailbox through the Publish.
    //    Use a fresh dummy subscriber — re-subscribing an existing
    //    one wouldn't change semantics here.
    // 2. ask() each subscriber a Ping; FIFO on the mailbox guarantees
    //    the Evt do_send queued by the broker is processed before
    //    this Ping resolves.
    let dummy_count = Arc::new(AtomicU32::new(0));
    let dummy = system.spawn(EvtCounter {
        count: dummy_count.clone(),
    });
    broker
        .ask(BrokerSubscribe(dummy.clone().into()))
        .await
        .unwrap();
    // dummy subscribed AFTER the Publish, so it should not have
    // received it — sanity check that broker doesn't replay.
    dummy.ask(Ping).await.unwrap();
    assert_eq!(
        dummy_count.load(Ordering::SeqCst),
        0,
        "broker must not replay events to late subscribers"
    );

    // Each pre-Publish subscriber must see exactly one delivery —
    // no drops, no duplicates.
    for (i, (addr, count)) in subs.iter().enumerate() {
        addr.ask(Ping).await.unwrap();
        assert_eq!(
            count.load(Ordering::SeqCst),
            1,
            "subscriber {i} should have received exactly 1 Evt"
        );
    }

    system.shutdown().await;
}

// ───── Broker: slow subscriber doesn't block other deliveries ─────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn broker_slow_subscriber_does_not_block_others() {
    // Broker uses do_send (fire-and-forget) per subscriber, so a slow
    // handler on one subscriber must not delay delivery to another.
    use crate::broker::{Broker, BrokerSubscribe, Publish};
    use tokio::sync::Notify;

    #[derive(Clone)]
    struct Evt;
    impl Message for Evt {
        type Result = ();
    }

    // Subscriber whose handler parks until explicitly released.
    struct SlowSub {
        release: Arc<Notify>,
        received: Arc<AtomicU32>,
    }
    impl Actor for SlowSub {}
    impl Handler<Evt> for SlowSub {
        async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
            // Block until we're explicitly released. Without per-subscriber
            // isolation, this would prevent the fast subscriber from
            // receiving its delivery.
            self.release.notified().await;
            self.received.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Subscriber that just signals the test the moment it gets the event.
    struct FastSub {
        received: Arc<Notify>,
    }
    impl Actor for FastSub {}
    impl Handler<Evt> for FastSub {
        async fn handle(&mut self, _msg: Evt, _ctx: &mut Context<Self>) {
            // notify_one stores a permit, so this works even if the test
            // hasn't reached its notified().await yet.
            self.received.notify_one();
        }
    }

    let system = System::new();
    let broker = system.spawn(Broker::<Evt>::new());

    let slow_release = Arc::new(Notify::new());
    let slow_received = Arc::new(AtomicU32::new(0));
    let slow = system.spawn(SlowSub {
        release: slow_release.clone(),
        received: slow_received.clone(),
    });

    let fast_received = Arc::new(Notify::new());
    let fast = system.spawn(FastSub {
        received: fast_received.clone(),
    });

    // ask() round-trips guarantee both subscriptions are registered
    // before the publish below.
    broker.ask(BrokerSubscribe(slow.into())).await.unwrap();
    broker.ask(BrokerSubscribe(fast.into())).await.unwrap();

    broker.do_send(Publish(Evt)).unwrap();

    // Fast subscriber must receive without waiting on slow one.
    // Bounded wait is generous (2s) but the actual notify fires
    // within microseconds in practice — no flake.
    tokio::time::timeout(Duration::from_secs(2), fast_received.notified())
        .await
        .expect("fast subscriber should receive while slow is blocked");

    // Slow subscriber is still blocked — release it and confirm.
    // (Guaranteed: its handler is parked on `release`, which nobody
    // has notified yet, so the counter cannot have been incremented.)
    assert_eq!(slow_received.load(Ordering::SeqCst), 0);
    slow_release.notify_one();

    // Now slow handler completes. Poll with a deadline rather than a
    // fixed sleep so the test stays fast in the common case.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while slow_received.load(Ordering::SeqCst) == 0 {
        if std::time::Instant::now() >= deadline {
            panic!("slow subscriber never received after release");
        }
        runtime::sleep(Duration::from_millis(5)).await;
    }

    system.shutdown().await;
}
}
17 changes: 17 additions & 0 deletions crates/client/src/actions.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
//! UI-facing action methods on [`ClientHandle`].
//!
//! Most entry points in this module are thin pass-throughs that forward
//! their arguments to the corresponding method on
//! [`crate::mutations::ClientMutations`]. Their behaviour is exercised
//! through the mutation handle directly in `tests/multi_peer_sync.rs`,
//! `tests/trust_flow.rs`, `tests/ephemeral.rs`, and the inline `tests`
//! module at the bottom of `lib.rs`. State-machine-level invariants are
//! covered by `crates/state/src/tests.rs`.
//!
//! Methods that do non-trivial translation work *before* delegating —
//! validation (`share_file_inline`), ID minting (`create_voice_channel`),
//! direct event assembly with no mutation-handle helper
//! (`set_permission`, `assign_role`), or derived-view composition
//! (`pinned_message_ids`, `pinned_messages`, `is_pinned`) — are covered
//! at the client tier in `tests/actions.rs`.

use super::*;

impl<N: willow_network::Network> ClientHandle<N> {
Expand Down
4 changes: 4 additions & 0 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ mod tests_profile_view;
#[path = "tests/ephemeral.rs"]
mod tests_ephemeral;

#[cfg(test)]
#[path = "tests/actions.rs"]
mod tests_actions;

#[cfg(test)]
#[path = "tests/voice.rs"]
mod tests_voice;
Expand Down
Loading