diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 59e59db3..a3fc1292 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -1256,6 +1256,26 @@ pub async fn test_client_on_hub( mod tests { use super::*; + /// Poll a derived view until `predicate` returns `true`, yielding + /// between attempts. Used in place of fixed `sleep(50ms)` waits after + /// mutations on a *source* actor whose change must propagate through + /// a `DerivedActor` recompute (Notify → spawn → UpdateCache hop). + /// Panics on a 5-second deadline so a real regression surfaces fast. + async fn await_view(mut probe: F) + where + F: FnMut() -> Fut, + Fut: std::future::Future, + { + let deadline = std::time::Duration::from_secs(5); + let ok = tokio::time::timeout(deadline, async { + while !probe().await { + tokio::task::yield_now().await; + } + }) + .await; + assert!(ok.is_ok(), "derived view did not converge within 5s"); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn actor_system_creates_and_responds() { let (client, _rx) = test_client(); @@ -1267,8 +1287,9 @@ mod tests { async fn send_message_and_read_back() { let (client, _rx) = test_client(); client.send_message("general", "hello").await.unwrap(); - // Give actor time to process - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + // send_message awaits state::mutate(event_state) inside apply_event, + // and messages() reads the same actor — the FIFO mailbox guarantees + // the mutation is visible without a wall-clock wait. let msgs = client.messages("general").await; assert!(!msgs.is_empty()); assert_eq!(msgs.last().unwrap().body, "hello"); @@ -1314,18 +1335,19 @@ mod tests { .send_message("general", "hello server2") .await .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - // Verify the message is on the current (server2) state. + // Verify the message is on the current (server2) state. send_message + // awaits the event_state mutation; messages() reads the same actor. let msgs = client.messages("general").await; assert!( msgs.iter().any(|m| m.body == "hello server2"), "message should be on server2" ); - // Switch back to server1. + // Switch back to server1. switch_server awaits every actor mutation + // it issues (dag, event_state, server_registry, chat_meta), so the + // subsequent messages() read sees the restored state immediately. client.switch_server(&server1_id).await; - tokio::time::sleep(std::time::Duration::from_millis(50)).await; // After switching, messages should NOT contain server2's message. let msgs = client.messages("general").await; @@ -1357,12 +1379,11 @@ mod tests { "Bob should not have SendMessages before invite" ); - // Alice generates an invite for Bob. + // Alice generates an invite for Bob. The GrantPermission event is + // built and applied via apply_event, which awaits the event_state + // mutate; the subsequent select() on the same actor sees the grant. alice.generate_invite(&bob_id).await.unwrap(); - // Give the actor system a tick to process the mutation. - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - // Now Bob should have SendMessages permission in Alice's state. let has_after = willow_actor::state::select(&alice.event_state_addr, move |es| { es.has_permission(&bob_id, &willow_state::Permission::SendMessages) @@ -1847,8 +1868,13 @@ mod tests { client .set_self_presence(presence::PresenceOverride::Away) .await; - // Give the derived view a tick to recompute. - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + // The derived presence view must recompute (Notify → selector → + // UpdateCache) before reflecting the source mutation. Poll the + // cached value rather than hard-waiting on wall-clock time. + await_view(|| async { + client.view_handle.presence.get().await.self_state == presence::PresenceState::Away + }) + .await; let after = client.view_handle.presence.get().await; assert_eq!(after.self_state, presence::PresenceState::Away); @@ -1862,7 +1888,12 @@ mod tests { let bob = willow_identity::Identity::generate().endpoint_id(); client.mutations().peer_connected(bob).await; - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + // Wait for the derived presence view to pick up bob from the + // updated chat_meta source — same race the old sleep papered over. + await_view(|| async { + client.observe_peer_presence(bob).await == presence::PresenceState::Here + }) + .await; let state = client.observe_peer_presence(bob).await; assert_eq!(state, presence::PresenceState::Here); @@ -1890,9 +1921,15 @@ mod tests { connect::tick_once_for_test(&client.presence_meta_addr, &client.chat_meta_addr).await; // Drop bob offline and advance a few ticks. client.mutations().peer_disconnected(bob).await; - // Allow the derived view to recompute. - tokio::time::sleep(std::time::Duration::from_millis(50)).await; // queue > 0 + reachable = false ⇒ Queued before gone threshold. + // Poll the derived view until it reflects the source mutation. + await_view(|| async { + matches!( + client.observe_peer_presence(bob).await, + presence::PresenceState::Queued(_) + ) + }) + .await; let before = client.observe_peer_presence(bob).await; assert!( matches!(before, presence::PresenceState::Queued(_)), @@ -1903,8 +1940,11 @@ mod tests { for _ in 0..6 { connect::tick_once_for_test(&client.presence_meta_addr, &client.chat_meta_addr).await; } - // Let the derived view settle after the mutation burst. - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + // Wait for the derived view to settle on Gone after the burst. + await_view(|| async { + client.observe_peer_presence(bob).await == presence::PresenceState::Gone + }) + .await; let pm = willow_actor::state::get(&client.presence_meta_addr).await; let elapsed = pm .now @@ -1972,8 +2012,10 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn mutate_channel_mute_emits_event_and_flips_stats() { let (client, _broker) = test_client(); + // create_channel awaits apply_event (event_state mutate). No need + // to wait for broker delivery here — subscribe_events comes after, + // so the prior CreateChannel publish cannot leak into the receiver. client.create_channel("quiet").await.unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; let mut rx = client.subscribe_events().await; client.mutate_channel_mute("quiet", true).await.unwrap(); @@ -2057,8 +2099,10 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn mutate_channel_mute_toggle_off_clears_set() { let (client, _broker) = test_client(); + // Both create_channel and the two mutate_channel_mute calls await + // their apply_event → state::mutate(event_state), so the final get + // sees the toggled-off set without a wall-clock delay. client.create_channel("noisy").await.unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; client.mutate_channel_mute("noisy", true).await.unwrap(); client.mutate_channel_mute("noisy", false).await.unwrap();