Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 63 additions & 19 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,26 @@ pub async fn test_client_on_hub(
mod tests {
use super::*;

/// Poll a derived view until `predicate` returns `true`, yielding
/// between attempts. Used in place of fixed `sleep(50ms)` waits after
/// mutations on a *source* actor whose change must propagate through
/// a `DerivedActor` recompute (Notify → spawn → UpdateCache hop).
/// Panics on a 5-second deadline so a real regression surfaces fast.
async fn await_view<F, Fut>(mut probe: F)
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = bool>,
{
let deadline = std::time::Duration::from_secs(5);
let ok = tokio::time::timeout(deadline, async {
while !probe().await {
tokio::task::yield_now().await;
}
})
.await;
assert!(ok.is_ok(), "derived view did not converge within 5s");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn actor_system_creates_and_responds() {
let (client, _rx) = test_client();
Expand All @@ -1267,8 +1287,9 @@ mod tests {
async fn send_message_and_read_back() {
let (client, _rx) = test_client();
client.send_message("general", "hello").await.unwrap();
// Give actor time to process
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// send_message awaits state::mutate(event_state) inside apply_event,
// and messages() reads the same actor — the FIFO mailbox guarantees
// the mutation is visible without a wall-clock wait.
let msgs = client.messages("general").await;
assert!(!msgs.is_empty());
assert_eq!(msgs.last().unwrap().body, "hello");
Expand Down Expand Up @@ -1314,18 +1335,19 @@ mod tests {
.send_message("general", "hello server2")
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

// Verify the message is on the current (server2) state.
// Verify the message is on the current (server2) state. send_message
// awaits the event_state mutation; messages() reads the same actor.
let msgs = client.messages("general").await;
assert!(
msgs.iter().any(|m| m.body == "hello server2"),
"message should be on server2"
);

// Switch back to server1.
// Switch back to server1. switch_server awaits every actor mutation
// it issues (dag, event_state, server_registry, chat_meta), so the
// subsequent messages() read sees the restored state immediately.
client.switch_server(&server1_id).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

// After switching, messages should NOT contain server2's message.
let msgs = client.messages("general").await;
Expand Down Expand Up @@ -1357,12 +1379,11 @@ mod tests {
"Bob should not have SendMessages before invite"
);

// Alice generates an invite for Bob.
// Alice generates an invite for Bob. The GrantPermission event is
// built and applied via apply_event, which awaits the event_state
// mutate; the subsequent select() on the same actor sees the grant.
alice.generate_invite(&bob_id).await.unwrap();

// Give the actor system a tick to process the mutation.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

// Now Bob should have SendMessages permission in Alice's state.
let has_after = willow_actor::state::select(&alice.event_state_addr, move |es| {
es.has_permission(&bob_id, &willow_state::Permission::SendMessages)
Expand Down Expand Up @@ -1847,8 +1868,13 @@ mod tests {
client
.set_self_presence(presence::PresenceOverride::Away)
.await;
// Give the derived view a tick to recompute.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// The derived presence view must recompute (Notify → selector →
// UpdateCache) before reflecting the source mutation. Poll the
// cached value rather than hard-waiting on wall-clock time.
await_view(|| async {
client.view_handle.presence.get().await.self_state == presence::PresenceState::Away
})
.await;

let after = client.view_handle.presence.get().await;
assert_eq!(after.self_state, presence::PresenceState::Away);
Expand All @@ -1862,7 +1888,12 @@ mod tests {
let bob = willow_identity::Identity::generate().endpoint_id();

client.mutations().peer_connected(bob).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Wait for the derived presence view to pick up bob from the
// updated chat_meta source — same race the old sleep papered over.
await_view(|| async {
client.observe_peer_presence(bob).await == presence::PresenceState::Here
})
.await;

let state = client.observe_peer_presence(bob).await;
assert_eq!(state, presence::PresenceState::Here);
Expand Down Expand Up @@ -1890,9 +1921,15 @@ mod tests {
connect::tick_once_for_test(&client.presence_meta_addr, &client.chat_meta_addr).await;
// Drop bob offline and advance a few ticks.
client.mutations().peer_disconnected(bob).await;
// Allow the derived view to recompute.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// queue > 0 + reachable = false ⇒ Queued before gone threshold.
// Poll the derived view until it reflects the source mutation.
await_view(|| async {
matches!(
client.observe_peer_presence(bob).await,
presence::PresenceState::Queued(_)
)
})
.await;
let before = client.observe_peer_presence(bob).await;
assert!(
matches!(before, presence::PresenceState::Queued(_)),
Expand All @@ -1903,8 +1940,11 @@ mod tests {
for _ in 0..6 {
connect::tick_once_for_test(&client.presence_meta_addr, &client.chat_meta_addr).await;
}
// Let the derived view settle after the mutation burst.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Wait for the derived view to settle on Gone after the burst.
await_view(|| async {
client.observe_peer_presence(bob).await == presence::PresenceState::Gone
})
.await;
let pm = willow_actor::state::get(&client.presence_meta_addr).await;
let elapsed = pm
.now
Expand Down Expand Up @@ -1972,8 +2012,10 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mutate_channel_mute_emits_event_and_flips_stats() {
let (client, _broker) = test_client();
// create_channel awaits apply_event (event_state mutate). No need
// to wait for broker delivery here — subscribe_events comes after,
// so the prior CreateChannel publish cannot leak into the receiver.
client.create_channel("quiet").await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

let mut rx = client.subscribe_events().await;
client.mutate_channel_mute("quiet", true).await.unwrap();
Expand Down Expand Up @@ -2057,8 +2099,10 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mutate_channel_mute_toggle_off_clears_set() {
let (client, _broker) = test_client();
// Both create_channel and the two mutate_channel_mute calls await
// their apply_event → state::mutate(event_state), so the final get
// sees the toggled-off set without a wall-clock delay.
client.create_channel("noisy").await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

client.mutate_channel_mute("noisy", true).await.unwrap();
client.mutate_channel_mute("noisy", false).await.unwrap();
Expand Down