Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .claude/skills/resolving-issues/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ Fresh agent per issue, scoped to one issue + master branch ref. Steps:
- **Either pattern, no GitHub PR is opened.** No `mcp__github__create_pull_request` for sub-fixes. The master PR (end of run) is the only GitHub artifact.
5. Apply fix. Add tests at lowest tier covering behavior (see `CLAUDE.md` decision tree).
6. **Scope-creep guard:** if root-cause fix touches > 5 files OR > 200 LOC AND brainstorm in step 3 didn't already approve that scope, return to coordinator with a brainstorm note before pushing. Coordinator decides: split, defer, or proceed. Don't unilaterally balloon a small-scope ticket.

**Mechanical call-site migration is part of the fix, not scope creep.** If the fix changes a small API (e.g. swapping `map.insert(k, v)` for `lru.insert(k, v)` to make a new cap take effect), every call-site rewrite is load-bearing — without them the cap is dead code. Count them in the LOC delta but don't abort just because they push past 200. Justify the count in the brainstorm + commit body so the human can see why the fan-out was unavoidable. Real scope creep = unrelated cleanup, drive-by refactors, "while I'm in here" tweaks — those still abort.
7. **Local merge gate.** Run, in order:
- `cargo fmt --all -- --check` (or `just fmt-check` if available)
- `cargo clippy <scope> --all-targets -- -D warnings` — scope to touched crate(s) for speed; workspace-wide if changes ripple
Expand Down
4 changes: 2 additions & 2 deletions crates/client/src/accessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ impl<N: willow_network::Network> ClientHandle<N> {
let my_id = self.identity.endpoint_id();
willow_actor::state::mutate(&self.network_meta_addr, move |n| {
let now = crate::util::current_time_ms();
n.typing_peers
.retain(|_, (_, ts)| now - *ts < crate::TYPING_INDICATOR_TTL_MS);
// Keep map + recency in lockstep: helper drops both.
n.sweep_typing(now, crate::TYPING_INDICATOR_TTL_MS);
n.typing_peers
.iter()
.filter(|(pid, _)| *pid != &my_id)
Expand Down
40 changes: 31 additions & 9 deletions crates/client/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,29 @@ use willow_network::TopicHandle as _;
/// Spawn the per-connection presence tick driver.
///
/// Advances [`PresenceMeta::now`](state_actors::PresenceMeta) by one
/// every second and refreshes `last_seen` for each reachable peer so
/// their derived state stays `here` while online.
/// every second, refreshes `last_seen` for each reachable peer so their
/// derived state stays `here` while online, and sweeps stale entries
/// from [`NetworkMeta::typing_peers`](state_actors::NetworkMeta) older
/// than [`crate::TYPING_INDICATOR_TTL_MS`]. Piggy-backing the typing
/// sweep on the existing presence cadence (1 Hz) avoids a second timer
/// task: the TTL is 5 s so 1 Hz drains entries with at most 1 s of
/// extra dwell, far below the user-visible threshold. Followup to
/// issue #429 ([SEC-V-05]).
///
/// On native we use `tokio::spawn`; on wasm we use
/// `wasm_bindgen_futures::spawn_local` with `gloo-timers` for sleep.
fn spawn_presence_tick(
presence_meta_addr: willow_actor::Addr<willow_actor::StateActor<state_actors::PresenceMeta>>,
chat_meta_addr: willow_actor::Addr<willow_actor::StateActor<state_actors::ChatMeta>>,
network_meta_addr: willow_actor::Addr<willow_actor::StateActor<state_actors::NetworkMeta>>,
) {
#[cfg(not(target_arch = "wasm32"))]
{
if let Ok(rt) = tokio::runtime::Handle::try_current() {
rt.spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tick_once(&presence_meta_addr, &chat_meta_addr).await;
tick_once(&presence_meta_addr, &chat_meta_addr, &network_meta_addr).await;
}
});
}
Expand All @@ -29,26 +36,29 @@ fn spawn_presence_tick(
wasm_bindgen_futures::spawn_local(async move {
loop {
gloo_timers::future::TimeoutFuture::new(1_000).await;
tick_once(&presence_meta_addr, &chat_meta_addr).await;
tick_once(&presence_meta_addr, &chat_meta_addr, &network_meta_addr).await;
}
});
}
}

/// Single tick: advance `now` and stamp `last_seen` for every peer in
/// `chat_meta.peers`. Kept separate so it can be unit-tested by driving
/// it directly without spawning a timer task.
/// Single tick: advance `now`, stamp `last_seen` for every peer in
/// `chat_meta.peers`, and sweep stale typing entries. Kept separate so
/// it can be unit-tested by driving it directly without spawning a
/// timer task.
#[cfg(any(test, feature = "test-utils"))]
pub async fn tick_once_for_test(
presence_meta_addr: &willow_actor::Addr<willow_actor::StateActor<state_actors::PresenceMeta>>,
chat_meta_addr: &willow_actor::Addr<willow_actor::StateActor<state_actors::ChatMeta>>,
network_meta_addr: &willow_actor::Addr<willow_actor::StateActor<state_actors::NetworkMeta>>,
) {
tick_once(presence_meta_addr, chat_meta_addr).await;
tick_once(presence_meta_addr, chat_meta_addr, network_meta_addr).await;
}

async fn tick_once(
presence_meta_addr: &willow_actor::Addr<willow_actor::StateActor<state_actors::PresenceMeta>>,
chat_meta_addr: &willow_actor::Addr<willow_actor::StateActor<state_actors::ChatMeta>>,
network_meta_addr: &willow_actor::Addr<willow_actor::StateActor<state_actors::NetworkMeta>>,
) {
let reachable = willow_actor::state::select(chat_meta_addr, |c| c.peers.clone()).await;
willow_actor::state::mutate(presence_meta_addr, move |pm| {
Expand All @@ -58,6 +68,14 @@ async fn tick_once(
}
})
.await;
// Drain stale typing entries on the same cadence so the map cannot
// accumulate dead peers indefinitely (#429). The accessor + view
// layers also filter on read, but only the sweep removes entries.
let now_ms = crate::util::current_time_ms();
willow_actor::state::mutate(network_meta_addr, move |n| {
n.sweep_typing(now_ms, crate::TYPING_INDICATOR_TTL_MS);
})
.await;
}

/// Phase 2b — queue tick driver. 1 tick / s. Advances
Expand Down Expand Up @@ -326,7 +344,11 @@ impl<N: willow_network::Network> ClientHandle<N> {
// while reachable. When a peer drops out of `chat_meta.peers`
// their last_seen stays frozen so elapsed = now - last_seen
// climbs past the idle / gone thresholds in due course.
spawn_presence_tick(self.presence_meta_addr.clone(), self.chat_meta_addr.clone());
spawn_presence_tick(
self.presence_meta_addr.clone(),
self.chat_meta_addr.clone(),
self.network_meta_addr.clone(),
);

// Sync-queue tick driver (Phase 2b). Advances `QueueMeta::now`
// and decays `recent_arrivals` entries older than 24 h so the
Expand Down
6 changes: 3 additions & 3 deletions crates/client/src/joining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl<N: willow_network::Network> ClientHandle<N> {
let pid = self.identity.endpoint_id();
let name = name.to_string();
willow_actor::state::mutate(&self.profile_state_addr, move |p| {
p.names.insert(pid, name);
p.insert_name(pid, name);
})
.await;
self.broadcast_profile_via_network();
Expand Down Expand Up @@ -326,8 +326,8 @@ impl<N: willow_network::Network> ClientHandle<N> {
let profiles = willow_actor::state::get(&self.profile_state_addr).await;
willow_actor::state::mutate(&self.network_meta_addr, move |n| {
let now = util::current_time_ms();
n.typing_peers
.retain(|_, (_, ts)| now - *ts < crate::TYPING_INDICATOR_TTL_MS);
// Keep map + recency in lockstep: helper drops both.
n.sweep_typing(now, crate::TYPING_INDICATOR_TTL_MS);
n.typing_peers
.iter()
.filter(|(pid, (ch, _))| ch == &channel && *pid != &my_id)
Expand Down
44 changes: 36 additions & 8 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,11 @@ impl<N: willow_network::Network> ClientHandle<N> {
// Load saved display name, or use config override.
let local_endpoint = identity.endpoint_id();
if let Some(ref name) = config.display_name {
// `state.profiles` is the legacy pre-actor `ProfileStore` —
// intentionally a plain `HashMap`. The total-entry cap lives
// on the actor-side `state_actors::ProfileState` (#429); this
// buffer only holds the local user's own name during init
// and is consumed when the actor is spawned below.
state.profiles.names.insert(local_endpoint, name.clone());
if config.persistence {
storage::save_profile(&storage::LocalProfile {
Expand Down Expand Up @@ -723,10 +728,17 @@ impl<N: willow_network::Network> ClientHandle<N> {
};
system.spawn(willow_actor::StateActor::new(meta))
};
let profile_state_addr =
system.spawn(willow_actor::StateActor::new(state_actors::ProfileState {
names: state.profiles.names.clone(),
}));
let profile_state_addr = {
// Seed the actor via `insert_name` so the LRU recency queue
// mirrors `names`. Iteration order is irrelevant — at startup
// the seed is at most a handful of entries (local user
// profile + any persisted state) far below the cap.
let mut seeded = state_actors::ProfileState::default();
for (pid, name) in state.profiles.names.iter() {
seeded.insert_name(*pid, name.clone());
}
system.spawn(willow_actor::StateActor::new(seeded))
};
let network_meta_addr = system.spawn(willow_actor::StateActor::new(
state_actors::NetworkMeta::default(),
));
Expand Down Expand Up @@ -1076,8 +1088,14 @@ pub fn test_client() -> (
current_channel: state.chat.current_channel.clone(),
peers: state.chat.peers.clone(),
}));
let profile_state_addr = sys.spawn(willow_actor::StateActor::new(state_actors::ProfileState {
names: state.profiles.names.clone(),
let profile_state_addr = sys.spawn(willow_actor::StateActor::new({
// Seed via `insert_name` so the LRU recency queue mirrors
// `names`. See sibling site in `ClientHandle::new` for context.
let mut seeded = state_actors::ProfileState::default();
for (pid, name) in state.profiles.names.iter() {
seeded.insert_name(*pid, name.clone());
}
seeded
}));
let network_meta_addr = sys.spawn(willow_actor::StateActor::new(
state_actors::NetworkMeta::default(),
Expand Down Expand Up @@ -1947,7 +1965,12 @@ mod tests {
client._set_queue_depth(bob, 2).await;

// Advance one tick while reachable — last_seen stays fresh.
connect::tick_once_for_test(&client.presence_meta_addr, &client.chat_meta_addr).await;
connect::tick_once_for_test(
&client.presence_meta_addr,
&client.chat_meta_addr,
&client.network_meta_addr,
)
.await;
// Drop bob offline and advance a few ticks.
client.mutations().peer_disconnected(bob).await;
// queue > 0 + reachable = false ⇒ Queued before gone threshold.
Expand All @@ -1967,7 +1990,12 @@ mod tests {

// Advance past gone_ticks (=5) — tick 6 to guarantee we cross it.
for _ in 0..6 {
connect::tick_once_for_test(&client.presence_meta_addr, &client.chat_meta_addr).await;
connect::tick_once_for_test(
&client.presence_meta_addr,
&client.chat_meta_addr,
&client.network_meta_addr,
)
.await;
}
// Wait for the derived view to settle on Gone after the burst.
await_view(|| async {
Expand Down
6 changes: 3 additions & 3 deletions crates/client/src/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ async fn process_received_message<T: TopicHandle>(
let peer_id = profile.peer_id;
let display_name = profile.display_name.clone();
willow_actor::state::mutate(&ctx.profiles, move |p| {
p.names.insert(peer_id, display_name);
p.insert_name(peer_id, display_name);
})
.await;
warn_if_err(
Expand Down Expand Up @@ -404,7 +404,7 @@ async fn process_received_message<T: TopicHandle>(
}
let now = crate::util::current_time_ms();
willow_actor::state::mutate(&ctx.network, move |n| {
n.typing_peers.insert(signer, (channel, now));
n.insert_typing(signer, channel, now);
})
.await;
let signer2 = signer;
Expand Down Expand Up @@ -732,7 +732,7 @@ async fn process_received_message<T: TopicHandle>(
}
let name = display_name.clone();
willow_actor::state::mutate(&ctx.profiles, move |p| {
p.names.insert(peer_id, name);
p.insert_name(peer_id, name);
})
.await;
warn_if_err(
Expand Down
4 changes: 2 additions & 2 deletions crates/client/src/mutations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ impl<N: willow_network::Network> ClientMutations<N> {
pub async fn update_profile(&self, peer_id: EndpointId, display_name: String) {
let name = display_name.clone();
willow_actor::state::mutate(&self.profiles, move |p| {
p.names.insert(peer_id, name);
p.insert_name(peer_id, name);
})
.await;
self.event_broker
Expand All @@ -772,7 +772,7 @@ impl<N: willow_network::Network> ClientMutations<N> {
pub async fn record_typing(&self, peer_id: EndpointId, channel: String) {
let now = util::current_time_ms();
willow_actor::state::mutate(&self.network, move |n| {
n.typing_peers.insert(peer_id, (channel, now));
n.insert_typing(peer_id, channel, now);
})
.await;
// Also ensure peer is tracked.
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl<N: willow_network::Network> ClientHandle<N> {
let pid = self.identity.endpoint_id();
let n = name.clone();
willow_actor::state::mutate(&self.profile_state_addr, move |p| {
p.names.insert(pid, n);
p.insert_name(pid, n);
})
.await;
// Persist to localStorage so broadcast_profile_via_network() can read
Expand Down
Loading