Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .claude/skills/resolving-issues/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ Fresh agent per issue, scoped to one issue + master branch ref. Steps:

12. **Structural-deps follow-up family path:** dependency-multi-version audits (rand, getrandom, convert_case, bincode, etc.) often look "obvious" but are pinned by transitive crates we don't own — no workspace pin / `[patch]` can collapse them without lying about semver. The first 1–2 finds get individual follow-up trackers. On the **3rd** structural-deps follow-up in this family, file or update a single **upstream-domino meta-tracker issue** instead of another standalone TD-NN follow-up — list the holdout crates, the upstream releases that would unblock each version (e.g. `aes-gcm 0.11` stable, `derive_more 3.x`, `iroh ≥ N`), and link prior individual follow-ups under it. Future runs check the meta-tracker, don't refile the same shape.

**Retroactive meta-tracker fill-in (coordinator-direct, no implementer dispatch).** When a run's triage finds 3+ structural-deps trackers already exist *without* a consolidating meta-tracker, the coordinator files the meta-tracker as part of the step 6 already-fixed sweep — same pattern as closing already-fixed issues, pure metadata work, falls under the "Coordinator never codes" exceptions because no source files are touched. List rows for every active tracker, link them under the meta, comment on each individual tracker citing the meta. Skill compliance is *retroactive*: fix the gap when you spot it, don't leave the next run to re-derive the consolidation. Record the new meta issue under `## Skill Evolution` in the master PR body alongside the lessons.

13. **Report back** to coordinator: commit SHA on master branch, sites touched, anything unusual.

## Lessons Learned
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/base64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub fn decode(input: &str) -> Option<Vec<u8>> {
Some(result)
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;

Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/emoji.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ fn builtin(code: &str) -> Option<&'static str> {
})
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;

Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub async fn download_file(
Ok(blobs.get(hash).await?.map(|b| b.to_vec()))
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;
use willow_network::mem::{MemHub, MemNetwork};
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/invite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub fn endpoint_id_to_ed25519_public(endpoint_id: &willow_identity::EndpointId)
*endpoint_id.as_bytes()
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;
use willow_identity::Identity;
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/joining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl<N: willow_network::Network> ClientHandle<N> {
}
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
//! Tests for the client-side auth guards on invite generation.
//!
Expand Down
28 changes: 17 additions & 11 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,38 +48,42 @@ mod joining;
mod servers;
mod voice;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/trust_flow.rs"]
mod tests_trust_flow;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/multi_peer_sync.rs"]
mod tests_multi_peer_sync;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/queue.rs"]
mod tests_queue;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/profile_view.rs"]
mod tests_profile_view;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/ephemeral.rs"]
mod tests_ephemeral;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/actions.rs"]
mod tests_actions;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/voice.rs"]
mod tests_voice;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/governance.rs"]
mod tests_governance;

#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "tests/sync_reply_cache.rs"]
mod tests_sync_reply_cache;

/// How long a typing indicator remains visible after the last typing event, in milliseconds.
pub const TYPING_INDICATOR_TTL_MS: u64 = 5_000;

Expand Down Expand Up @@ -1036,7 +1040,7 @@ pub fn reconcile_topic_map<V: Clone>(
}

/// Create a test-only ClientHandle without connecting to the network.
#[cfg(any(test, feature = "test-utils"))]
#[cfg(all(not(target_arch = "wasm32"), any(test, feature = "test-utils")))]
pub fn test_client() -> (
ClientHandle<willow_network::mem::MemNetwork>,
willow_actor::Addr<willow_actor::Broker<ClientEvent>>,
Expand All @@ -1055,6 +1059,7 @@ pub fn test_client() -> (
)
.expect("genesis insert must succeed in test helper"),
stashed: HashMap::new(),
sync_reply_cache: None,
};

// Create the general channel in the DAG.
Expand Down Expand Up @@ -1310,7 +1315,7 @@ pub fn test_client() -> (
///
/// Unlike `test_client()`, multiple clients created with the same `hub`
/// can exchange messages through the in-memory gossip mesh.
#[cfg(any(test, feature = "test-utils"))]
#[cfg(all(not(target_arch = "wasm32"), any(test, feature = "test-utils")))]
pub async fn test_client_on_hub(
hub: &std::sync::Arc<willow_network::mem::MemHub>,
) -> (
Expand All @@ -1323,7 +1328,7 @@ pub async fn test_client_on_hub(
(client, broker)
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;

Expand Down Expand Up @@ -1849,6 +1854,7 @@ mod tests {
for event in events_for_b {
ds.managed.insert_and_apply(event).ok();
}
ds.invalidate_sync_reply_cache();
})
.await;
// Sync B's event_state mirror from the DAG.
Expand Down
50 changes: 34 additions & 16 deletions crates/client/src/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,17 @@ async fn topic_listener_loop<T: TopicHandle, E: TopicEvents>(

// ───── DAG helpers ──────────────────────────────────────────────────────────

/// Produce the reply payload for a `WireMessage::SyncRequest`: the first
/// [`SYNC_REPLY_LIMIT`](state_actors::SYNC_REPLY_LIMIT) events of the
/// DAG's deterministic topological sort. The Vec is memoized on
/// `DagState` — built lazily on the first read after an invalidation,
/// and dropped by every successful insertion path on `DagState`.
/// See GEN-08 / issue #268.
pub(crate) async fn compute_sync_reply(
    dag: &Addr<willow_actor::StateActor<state_actors::DagState>>,
) -> Vec<willow_state::Event> {
    // `mutate` rather than `select`: a read may lazily (re)populate the
    // cached reply, so the closure needs mutable access to the state.
    willow_actor::state::mutate(dag, |state| state.sync_reply_events()).await
}

/// Try to insert an event into the DAG. On success, ManagedDag atomically
/// applies it to state and resolves pending events. On chain gap, the
/// event is buffered. Duplicates are silently ignored.
Expand All @@ -221,6 +232,13 @@ async fn try_insert_event(ctx: &ListenerCtx, event: willow_state::Event) {
for r in &outcome.resolved {
all.push(r.clone());
}
// Any event entering the DAG (including chains drained
// from the pending buffer) changes the topological-sort
// prefix, so invalidate the SyncRequest-reply cache.
// See GEN-08 / issue #268.
if outcome.applied.is_some() || !outcome.resolved.is_empty() {
ds.invalidate_sync_reply_cache();
}
(outcome.applied, all)
}
Err(willow_state::InsertError::PrevMismatch {
Expand Down Expand Up @@ -393,20 +411,13 @@ async fn process_received_message<T: TopicHandle>(
}
crate::ops::WireMessage::SyncRequest { state_hash, .. } => {
let _ = state_hash; // Legacy field — can't filter by state hash in DAG model.
// TODO: Migrate clients to worker's heads-based sync protocol
// TODO(#65): Migrate clients to worker's heads-based sync protocol
// (WorkerRequest::Sync { heads }) for efficient delta sync.
// For now, send the first 500 events from topological sort.
// Receiver will dedup via InsertError::Duplicate.
let events: Vec<willow_state::Event> = willow_actor::state::select(&ctx.dag, |ds| {
ds.managed
.dag()
.topological_sort()
.into_iter()
.take(500)
.cloned()
.collect()
})
.await;
// For now, send the first SYNC_REPLY_LIMIT events from
// topological sort. Receiver will dedup via InsertError::Duplicate.
// The reply Vec is cached on `DagState` and invalidated on
// every successful DAG insert; see GEN-08 / issue #268.
let events = compute_sync_reply(&ctx.dag).await;
if !events.is_empty() {
let msg = crate::ops::WireMessage::SyncBatch { events };
if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) {
Expand Down Expand Up @@ -671,7 +682,8 @@ async fn process_received_message<T: TopicHandle>(
let granted_peer = peer_endpoint;
let ts = crate::util::current_time_ms();
let grant_event = willow_actor::state::mutate(&ctx.dag, move |ds| {
ds.managed
let ev = ds
.managed
.create_and_insert(
&identity,
willow_state::EventKind::GrantPermission {
Expand All @@ -680,7 +692,13 @@ async fn process_received_message<T: TopicHandle>(
},
ts,
)
.ok()
.ok();
if ev.is_some() {
// SyncRequest-reply cache must be invalidated on every
// successful insertion path; see GEN-08 / issue #268.
ds.invalidate_sync_reply_cache();
}
ev
})
.await;
if let Some(event) = grant_event {
Expand Down Expand Up @@ -843,7 +861,7 @@ async fn process_received_message<T: TopicHandle>(
}
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
//! Listener tests for the JoinRequest signer guard (SEC-A-03 / #239).
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/mentions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ fn resolve_mention(
None
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;
use std::collections::HashMap;
Expand Down
14 changes: 12 additions & 2 deletions crates/client/src/mutations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ impl<N: willow_network::Network> ClientMutations<N> {
ds.managed
.insert_and_apply(genesis)
.expect("genesis event must insert successfully");
// SyncRequest-reply cache must be invalidated on every
// successful insertion path; see GEN-08 / issue #268.
ds.invalidate_sync_reply_cache();
ds.managed.state().clone()
})
.await;
Expand All @@ -118,9 +121,16 @@ impl<N: willow_network::Network> ClientMutations<N> {
let dag = self.dag.clone();
util::with_timeout("build_event", async move {
willow_actor::state::mutate(&dag, move |ds| {
ds.managed
let result = ds
.managed
.create_and_insert(&identity, kind, ts)
.map_err(|e| anyhow::anyhow!("DAG insert failed: {e:?}"))
.map_err(|e| anyhow::anyhow!("DAG insert failed: {e:?}"));
if result.is_ok() {
// SyncRequest-reply cache must be invalidated on every
// successful insertion path; see GEN-08 / issue #268.
ds.invalidate_sync_reply_cache();
}
result
})
.await
})
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/nickname.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl NicknameStore for MemNicknameStore {
}
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;

Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub const SERVER_OPS_TOPIC: &str = "_willow_server_ops";
/// Global gossipsub topic for profile broadcasts.
pub const PROFILE_TOPIC: &str = "_willow_profiles";

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;
use willow_identity::Identity;
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/presence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl PresenceSnapshot {
}
}

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;

Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub fn derive_late_arrival(

// ───── Tests ─────────────────────────────────────────────────────────────

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;
use std::collections::HashSet;
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod query;
pub mod status;
pub mod tokenize;

#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests;

pub use bootstrap::{hydrate_index, index_message, reindex_message};
Expand Down
7 changes: 7 additions & 0 deletions crates/client/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl<N: willow_network::Network> ClientHandle<N> {
ds.managed =
willow_state::ManagedDag::empty(crate::state_actors::MAX_CLIENT_PENDING);
}
// The cached SyncRequest reply belongs to the *previous*
// server's DAG; the new active DAG has a different topological
// sort. See GEN-08 / issue #268.
ds.invalidate_sync_reply_cache();
ds.managed.state().clone()
})
.await;
Expand Down Expand Up @@ -120,6 +124,9 @@ impl<N: willow_network::Network> ClientHandle<N> {
// Reset managed to empty so seed_genesis creates fresh state.
ds.managed =
willow_state::ManagedDag::empty(crate::state_actors::MAX_CLIENT_PENDING);
// Cached SyncRequest reply belongs to the previous DAG.
// See GEN-08 / issue #268.
ds.invalidate_sync_reply_cache();
})
.await;
}
Expand Down
Loading