diff --git a/Cargo.lock b/Cargo.lock index 8e9c1ce4..1bd5dbdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6131,6 +6131,7 @@ dependencies = [ "iroh-base", "serde", "sha2 0.10.9", + "tracing", "willow-identity", ] diff --git a/Cargo.toml b/Cargo.toml index 8b88a208..a47e8d7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,3 +40,6 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Synchronization primitives (poison-free, WASM-compatible) parking_lot = "0.12" + +[workspace.lints.clippy] +let_underscore_must_use = "deny" diff --git a/crates/actor/src/addr.rs b/crates/actor/src/addr.rs index 1673e714..22005777 100644 --- a/crates/actor/src/addr.rs +++ b/crates/actor/src/addr.rs @@ -39,7 +39,7 @@ impl Addr { let envelope = envelope::envelope_send(msg); // Channel could close between our check and send — that's fine, // the message is lost (same as any async channel race). - let _ = self.tx.send(envelope); + self.tx.send(envelope).ok(); Ok(()) } diff --git a/crates/actor/src/context.rs b/crates/actor/src/context.rs index 64544cb3..2244ae7b 100644 --- a/crates/actor/src/context.rs +++ b/crates/actor/src/context.rs @@ -77,7 +77,7 @@ impl Context { } None => { let envelope = envelope::envelope_stream_finished::(); - let _ = tx.send(envelope); + tx.send(envelope).ok(); break; } } @@ -138,7 +138,7 @@ impl Context { } let envelope = envelope::envelope_send::(msg); - let _ = tx.send(envelope); + tx.send(envelope).ok(); }); TimerHandle { cancelled } diff --git a/crates/actor/src/debounce.rs b/crates/actor/src/debounce.rs index 4041eeea..f9490905 100644 --- a/crates/actor/src/debounce.rs +++ b/crates/actor/src/debounce.rs @@ -72,7 +72,7 @@ impl + Send + 'static> Handler> for Debounce< impl + Send + 'static> Handler for Debounce { fn handle(&mut self, _msg: Flush, _ctx: &mut Context) -> impl Future + Send { if let Some(pending) = self.pending.take() { - let _ = self.target.do_send(pending); + self.target.do_send(pending).ok(); } async {} } @@ -120,7 +120,7 @@ impl + Send + 'static> Handler> for Throttle< ctx: &mut Context, ) -> impl Future + Send { if !self.cooling_down { - let _ = self.target.do_send(msg.0); + self.target.do_send(msg.0).ok(); self.cooling_down = true; self._timer = Some(ctx.run_after(self.interval, CooldownExpired)); } else { @@ -139,7 +139,7 @@ impl + Send + 'static> Handler for Thro self.cooling_down = false; self._timer = None; if let Some(pending) = self.pending.take() { - let _ = self.target.do_send(pending); + self.target.do_send(pending).ok(); self.cooling_down = true; self._timer = Some(ctx.run_after(self.interval, CooldownExpired)); } diff --git a/crates/actor/src/derived.rs b/crates/actor/src/derived.rs index 0e0c4236..b2878d62 100644 --- a/crates/actor/src/derived.rs +++ b/crates/actor/src/derived.rs @@ -126,7 +126,7 @@ impl Actor for DerivedA crate::runtime::spawn(async move { let snapshot = sources.snapshot().await; let value = selector(&snapshot); - let _ = addr.do_send(UpdateCache(value)); + addr.do_send(UpdateCache(value)).ok(); }); async {} @@ -151,7 +151,7 @@ impl Handler crate::runtime::spawn(async move { let snapshot = sources.snapshot().await; let value = selector(&snapshot); - let _ = addr.do_send(UpdateCache(value)); + addr.do_send(UpdateCache(value)).ok(); }); async {} } @@ -241,7 +241,7 @@ impl From<&Addr| { Box::pin(async move { let result = actor.handle(msg, ctx).await; - let _ = reply_tx.send(result); + reply_tx.send(result).ok(); }) }) } diff --git a/crates/actor/src/fsm.rs b/crates/actor/src/fsm.rs index f7a3f849..4e359059 100644 --- a/crates/actor/src/fsm.rs +++ b/crates/actor/src/fsm.rs @@ -157,7 +157,7 @@ impl From<&Addr>> for StateRef { StateRef::new( Arc::new(move |recipient| { - let _ = addr_sub.do_send(Subscribe(recipient)); + addr_sub.do_send(Subscribe(recipient)).ok(); }), Arc::new(move || { let addr = addr_get.clone(); diff --git a/crates/actor/src/mailbox.rs b/crates/actor/src/mailbox.rs index 803f0201..20586d85 100644 --- a/crates/actor/src/mailbox.rs +++ b/crates/actor/src/mailbox.rs @@ -57,5 +57,5 @@ pub async fn run_mailbox( actor.stopped().await; trace!("actor stopped"); - let _ = done.send(()); + done.send(()).ok(); } diff --git a/crates/actor/src/runtime.rs b/crates/actor/src/runtime.rs index 22d50113..943a8e74 100644 --- a/crates/actor/src/runtime.rs +++ b/crates/actor/src/runtime.rs @@ -165,7 +165,7 @@ impl OneshotTx { // futures_channel::oneshot::Sender::send returns Err(T) on failure, // but we need Err(T) for our API. The cancellation error doesn't // carry the value, so we reconstruct it. - self.0.send(val).map_err(|e| e) + self.0.send(val) } } } diff --git a/crates/actor/src/state.rs b/crates/actor/src/state.rs index d85a5b43..120af24a 100644 --- a/crates/actor/src/state.rs +++ b/crates/actor/src/state.rs @@ -205,7 +205,7 @@ where A: Handler, { let recipient: Recipient = subscriber.into(); - let _ = state.do_send(Subscribe(recipient)); + state.do_send(Subscribe(recipient)).ok(); } // ───── StateRef ─────────────────────────────────────────────────────────── @@ -285,7 +285,7 @@ impl From<&Addr>> for StateRef { Self { subscribe_fn: Arc::new(move |recipient| { - let _ = addr_sub.do_send(Subscribe(recipient)); + addr_sub.do_send(Subscribe(recipient)).ok(); }), get_fn: Arc::new(move || { let addr = addr_get.clone(); diff --git a/crates/actor/src/supervisor.rs b/crates/actor/src/supervisor.rs index 6f93804b..ab73e9fd 100644 --- a/crates/actor/src/supervisor.rs +++ b/crates/actor/src/supervisor.rs @@ -124,5 +124,5 @@ async fn run_mailbox_inline( } actor.stopped().await; - let _ = done.send(()); + done.send(()).ok(); } diff --git a/crates/actor/src/system.rs b/crates/actor/src/system.rs index b47ef62a..49c37781 100644 --- a/crates/actor/src/system.rs +++ b/crates/actor/src/system.rs @@ -151,7 +151,7 @@ impl System { }; for rx in done_rxs { - let _ = rx.recv().await; + rx.recv().await.ok(); } } } @@ -202,17 +202,19 @@ impl SystemHandle { Box::new(move || { stop.store(true, Ordering::SeqCst); let noop: BoxEnvelope = Box::new(|_actor, _ctx| Box::pin(async {})); - let _ = tx.send(noop); + tx.send(noop).ok(); }) as Box }; // Register for shutdown tracking (fire-and-forget). // FIFO ordering guarantees this is processed before any // subsequent Shutdown message. - let _ = self.system_addr.do_send(Register(ActorEntry { - signal_stop, - done_rx: Some(done_rx), - })); + self.system_addr + .do_send(Register(ActorEntry { + signal_stop, + done_rx: Some(done_rx), + })) + .ok(); addr } diff --git a/crates/agent/src/tools.rs b/crates/agent/src/tools.rs index 0a9cb431..05913b7c 100644 --- a/crates/agent/src/tools.rs +++ b/crates/agent/src/tools.rs @@ -595,7 +595,7 @@ impl WillowToolRouter { let p: AuthorizeWorkersParams = parse_args(&args)?; let eids = parse_endpoint_ids(&p.worker_peer_ids) .map_err(|e| ErrorData::invalid_params(e, None))?; - let _ = self.client.authorize_workers(&eids).await; + self.client.authorize_workers(&eids).await.ok(); success_json(serde_json::json!({"success": true})) } diff --git a/crates/client/src/accessors.rs b/crates/client/src/accessors.rs index 929faee1..d3d32675 100644 --- a/crates/client/src/accessors.rs +++ b/crates/client/src/accessors.rs @@ -1,4 +1,5 @@ use super::*; +use crate::util; impl ClientHandle { pub fn identity(&self) -> Identity { @@ -67,18 +68,28 @@ impl ClientHandle { pub async fn event_messages(&self, channel_id: &str) -> Vec { let cid = channel_id.to_string(); - willow_actor::state::select(&self.event_state_addr, move |es| { - es.messages - .iter() - .filter(|m| m.channel_id == cid && !m.deleted) - .cloned() - .collect() + let addr = self.event_state_addr.clone(); + util::with_timeout("event_messages", async move { + willow_actor::state::select(&addr, move |es| { + es.messages + .iter() + .filter(|m| m.channel_id == cid && !m.deleted) + .cloned() + .collect() + }) + .await }) .await + .unwrap_or_default() } pub async fn peers(&self) -> Vec { - willow_actor::state::select(&self.chat_meta_addr, |c| c.peers.clone()).await + let addr = self.chat_meta_addr.clone(); + util::with_timeout("peers", async move { + willow_actor::state::select(&addr, |c| c.peers.clone()).await + }) + .await + .unwrap_or_default() } pub async fn server_members(&self) -> Vec<(willow_identity::EndpointId, String, bool)> { @@ -94,7 +105,12 @@ impl ClientHandle { } pub async fn is_connected(&self) -> bool { - willow_actor::state::select(&self.network_meta_addr, |n| n.connected).await + let addr = self.network_meta_addr.clone(); + util::with_timeout("is_connected", async move { + willow_actor::state::select(&addr, |n| n.connected).await + }) + .await + .unwrap_or(false) } pub async fn has_permission( @@ -104,10 +120,12 @@ impl ClientHandle { ) -> bool { let pid = *peer_id; let p = *perm; - willow_actor::state::select(&self.event_state_addr, move |es| { - es.has_permission(&pid, &p) + let addr = self.event_state_addr.clone(); + util::with_timeout("has_permission", async move { + willow_actor::state::select(&addr, move |es| es.has_permission(&pid, &p)).await }) .await + .unwrap_or(false) } pub async fn roles_data(&self) -> Vec<(String, String, Vec)> { @@ -127,7 +145,12 @@ impl ClientHandle { /// Check if a peer is an admin. pub async fn is_admin(&self, peer_id: &willow_identity::EndpointId) -> bool { let pid = *peer_id; - willow_actor::state::select(&self.event_state_addr, move |es| es.is_admin(&pid)).await + let addr = self.event_state_addr.clone(); + util::with_timeout("is_admin", async move { + willow_actor::state::select(&addr, move |es| es.is_admin(&pid)).await + }) + .await + .unwrap_or(false) } /// Get the set of admin EndpointIds. diff --git a/crates/client/src/joining.rs b/crates/client/src/joining.rs index b9c9cf73..0b8b386f 100644 --- a/crates/client/src/joining.rs +++ b/crates/client/src/joining.rs @@ -143,11 +143,11 @@ impl ClientHandle { } // Open event store on persistence actor. - let _ = self - .persistence_addr + self.persistence_addr .do_send(persistence_actor::OpenEventStore { server_id: accepted.server_id.clone(), - }); + }) + .ok(); // Request sync. let msg = ops::WireMessage::SyncRequest { diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index b7ed97cc..4efbb23b 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -61,6 +61,12 @@ pub enum ClientError { /// joiner from the rest of the network. #[error("malformed invite: {0}")] MalformedInvite(String), + /// An actor call did not complete within the allowed timeout. + /// + /// The label names the call site so callers can distinguish which + /// actor became unresponsive without needing to inspect backtraces. + #[error("actor call timed out: {0}")] + ActorTimeout(&'static str), } /// Helper to bridge `Broker` into an async stream receiver. @@ -86,7 +92,7 @@ pub mod event_receiver { willow_actor::runtime::channel(willow_actor::runtime::DEFAULT_MAILBOX_CAPACITY); let addr = system.spawn(ForwarderActor { tx }); let recipient = addr.into(); - let _ = broker.ask(BrokerSubscribe(recipient)).await; + broker.ask(BrokerSubscribe(recipient)).await.ok(); Self { rx } } @@ -114,7 +120,7 @@ pub mod event_receiver { msg: ClientEvent, _ctx: &mut Context, ) -> impl std::future::Future + Send { - let _ = self.tx.send(msg); + self.tx.send(msg).ok(); async {} } } @@ -423,9 +429,11 @@ impl ClientHandle { let event_broker = system.spawn(willow_actor::Broker::::new()); // Open event store on the persistence actor if we have an active server. if let Some(sid) = &state.active_server { - let _ = persistence_addr.do_send(persistence_actor::OpenEventStore { - server_id: sid.clone(), - }); + persistence_addr + .do_send(persistence_actor::OpenEventStore { + server_id: sid.clone(), + }) + .ok(); } // DAG starts empty for loaded servers. It will be populated via // sync when connect() is called — the sync batch delivers the full @@ -615,6 +623,44 @@ fn load_identity() -> Identity { identity } +/// Reconcile a topic map from raw string channel IDs to typed +/// [`willow_messaging::ChannelId`] values. +/// +/// Entries whose channel ID string is not a valid UUID are skipped with a +/// warning log instead of being silently replaced by a randomly generated +/// UUID, which would cause the client to diverge from the rest of the +/// network (issue #141). +/// +/// This function is a pre-wired utility for callers that deserialize topic +/// maps from wire data (e.g. `accept_invite`, `joining`). It is not yet +/// called from all such sites — integration into the full invite/join flow +/// is tracked as follow-up work. +/// +/// # Arguments +/// +/// * `raw` — map from channel-ID string to an arbitrary value `V` +/// +/// # Returns +/// +/// A new map with only the entries whose key parsed successfully. +#[allow(dead_code)] +pub fn reconcile_topic_map( + raw: &std::collections::HashMap, +) -> std::collections::HashMap { + let mut out = std::collections::HashMap::new(); + for (id_str, value) in raw { + let cid = match uuid::Uuid::parse_str(id_str) { + Ok(u) => willow_messaging::ChannelId(u), + Err(e) => { + tracing::warn!(id_str, error = %e, "skipping unparseable channel id in reconcile_topic_map"); + continue; + } + }; + out.insert(cid, value.clone()); + } + out +} + /// Parse a permission string into a [`willow_state::Permission`]. #[allow(dead_code)] fn parse_permission(s: &str) -> anyhow::Result { @@ -1050,6 +1096,44 @@ mod tests { ); } + /// Regression test for issue #141: `reconcile_topic_map` must skip + /// entries whose channel ID is not a valid UUID instead of silently + /// substituting a randomly generated UUID, which would corrupt the + /// topic map with an ID that no peer has ever seen. + #[test] + fn reconcile_topic_map_skips_malformed_id() { + use std::collections::HashMap; + + let mut raw: HashMap = HashMap::new(); + + // A valid UUID entry — should be retained. + let good_id = uuid::Uuid::new_v4().to_string(); + raw.insert(good_id.clone(), "general".to_string()); + + // A malformed entry — must not appear in the output. + raw.insert("not-a-uuid".to_string(), "corrupted".to_string()); + raw.insert("also!bad@id".to_string(), "also-corrupted".to_string()); + + let result = reconcile_topic_map(&raw); + + // Only the valid entry should survive. + assert_eq!( + result.len(), + 1, + "malformed IDs must be dropped, not included" + ); + + let expected_cid = willow_messaging::ChannelId(uuid::Uuid::parse_str(&good_id).unwrap()); + assert!( + result.contains_key(&expected_cid), + "the valid channel ID must be present in the output" + ); + assert_eq!( + result[&expected_cid], "general", + "the value associated with the valid ID must be preserved" + ); + } + /// Regression test for issue #114: switching `join_links` to /// `parking_lot::Mutex` makes lock poisoning impossible by /// construction. After a panic in one task that holds the lock, a diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs index bc5c23df..a141a6cc 100644 --- a/crates/client/src/listeners.rs +++ b/crates/client/src/listeners.rs @@ -77,9 +77,9 @@ async fn topic_listener_loop( } }) .await; - let _ = ctx - .event_broker - .do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))); + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))) + .ok(); } GossipEvent::NeighborDown(id) => { let id2 = id; @@ -87,9 +87,9 @@ async fn topic_listener_loop( c.peers.retain(|p| p != &id2); }) .await; - let _ = ctx - .event_broker - .do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))); + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))) + .ok(); } } } @@ -147,12 +147,12 @@ async fn try_insert_event(ctx: &ListenerCtx, event: willow_state::Event) { // Persist and emit for each applied event. for ev in &all_applied { - let _ = ctx - .persistence - .do_send(persistence_actor::PersistEvent { event: ev.clone() }); + ctx.persistence + .do_send(persistence_actor::PersistEvent { event: ev.clone() }) + .ok(); let client_events = mutations::derive_client_events(ev); for e in client_events { - let _ = ctx.event_broker.do_send(willow_actor::Publish(e)); + ctx.event_broker.do_send(willow_actor::Publish(e)).ok(); } } } @@ -176,12 +176,12 @@ async fn process_received_message( p.names.insert(peer_id, display_name); }) .await; - let _ = ctx - .event_broker + ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { peer_id: profile.peer_id, display_name: profile.display_name, - })); + })) + .ok(); return; } @@ -237,11 +237,11 @@ async fn process_received_message( let _is_synced = willow_actor::state::select(&ctx.dag, |ds| ds.managed.is_synced()).await; - let _ = - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::SyncCompleted { - ops_applied: count, - })); + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::SyncCompleted { + ops_applied: count, + })) + .ok(); } } crate::ops::WireMessage::SyncRequest { state_hash, .. } => { @@ -263,7 +263,7 @@ async fn process_received_message( if !events.is_empty() { let msg = crate::ops::WireMessage::SyncBatch { events }; if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) { - let _ = topic.broadcast(bytes::Bytes::from(data)).await; + topic.broadcast(bytes::Bytes::from(data)).await.ok(); } } } @@ -290,12 +290,12 @@ async fn process_received_message( v.participants.entry(ch).or_default().insert(peer_id); }) .await; - let _ = ctx - .event_broker + ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { channel_id, peer_id, - })); + })) + .ok(); } crate::ops::WireMessage::VoiceLeave { channel_id, @@ -308,12 +308,12 @@ async fn process_received_message( } }) .await; - let _ = ctx - .event_broker + ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::VoiceLeft { channel_id, peer_id, - })); + })) + .ok(); } crate::ops::WireMessage::VoiceSignal { channel_id, @@ -321,13 +321,13 @@ async fn process_received_message( signal, } => { if target_peer == ctx.identity.endpoint_id() { - let _ = ctx - .event_broker + ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { channel_id, from_peer: signer, signal, - })); + })) + .ok(); } } crate::ops::WireMessage::JoinRequest { link_id, peer_id } => { @@ -398,7 +398,7 @@ async fn process_received_message( invite_data, }; if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) { - let _ = topic.broadcast(bytes::Bytes::from(data)).await; + topic.broadcast(bytes::Bytes::from(data)).await.ok(); } // Grant SendMessages permission to the joining peer so @@ -432,17 +432,17 @@ async fn process_received_message( }) .await; // Persist. - let _ = ctx - .persistence + ctx.persistence .do_send(crate::persistence_actor::PersistEvent { event: event.clone(), - }); + }) + .ok(); // Broadcast to other peers. if let Some(data) = crate::ops::pack_wire( &crate::ops::WireMessage::Event(event), &ctx.identity, ) { - let _ = topic.broadcast(bytes::Bytes::from(data)).await; + topic.broadcast(bytes::Bytes::from(data)).await.ok(); } } } @@ -453,9 +453,11 @@ async fn process_received_message( invite_data, } => { if target_peer == ctx.identity.endpoint_id() { - let _ = ctx.event_broker.do_send(willow_actor::Publish( - ClientEvent::JoinLinkResponse { invite_data }, - )); + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::JoinLinkResponse { + invite_data, + })) + .ok(); } } crate::ops::WireMessage::JoinDenied { @@ -463,14 +465,17 @@ async fn process_received_message( reason, } => { if target_peer == ctx.identity.endpoint_id() { - let _ = - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied { - reason, - })); + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied { + reason, + })) + .ok(); } } // TopicAnnounce is consumed by the relay; clients ignore it. crate::ops::WireMessage::TopicAnnounce { .. } => {} + // Worker messages travel on the _willow_workers topic and are never + // delivered to the client's server-ops listener. + crate::ops::WireMessage::Worker(_) => {} } } diff --git a/crates/client/src/mutations.rs b/crates/client/src/mutations.rs index ae0e89d6..744c4a79 100644 --- a/crates/client/src/mutations.rs +++ b/crates/client/src/mutations.rs @@ -104,24 +104,34 @@ impl ClientMutations { pub(crate) async fn build_event(&self, kind: EventKind) -> anyhow::Result { let identity = self.identity.clone(); let ts = util::current_time_ms(); - willow_actor::state::mutate(&self.dag, move |ds| { - ds.managed - .create_and_insert(&identity, kind, ts) - .map_err(|e| anyhow::anyhow!("DAG insert failed: {e:?}")) + let dag = self.dag.clone(); + util::with_timeout("build_event", async move { + willow_actor::state::mutate(&dag, move |ds| { + ds.managed + .create_and_insert(&identity, kind, ts) + .map_err(|e| anyhow::anyhow!("DAG insert failed: {e:?}")) + }) + .await }) .await + .map_err(|e| anyhow::anyhow!("{e}"))? } /// Resolve channel name → channel ID via event state. pub(crate) async fn resolve_channel_id(&self, channel: &str) -> anyhow::Result { let ch = channel.to_string(); - let channel_id = willow_actor::state::select(&self.event_state, move |es| { - es.channels - .iter() - .find(|(_, c)| c.name == ch) - .map(|(id, _)| id.clone()) + let event_state = self.event_state.clone(); + let channel_id = util::with_timeout("resolve_channel_id", async move { + willow_actor::state::select(&event_state, move |es| { + es.channels + .iter() + .find(|(_, c)| c.name == ch) + .map(|(id, _)| id.clone()) + }) + .await }) - .await; + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; channel_id.ok_or_else(|| anyhow::anyhow!("channel not found: {channel}")) } @@ -130,17 +140,38 @@ impl ClientMutations { /// already applied the event to its internal state atomically. pub(crate) async fn apply_event(&self, event: &willow_state::Event) { // Sync event_state mirror from ManagedDag's authoritative state. - let state = willow_actor::state::select(&self.dag, |ds| ds.managed.state().clone()).await; - willow_actor::state::mutate(&self.event_state, move |es| { - *es = state; + let dag = self.dag.clone(); + let state = match util::with_timeout("apply_event/select_dag", async move { + willow_actor::state::select(&dag, |ds| ds.managed.state().clone()).await }) - .await; - let _ = self.persistence.do_send(persistence_actor::PersistEvent { - event: event.clone(), - }); + .await + { + Ok(s) => s, + Err(e) => { + tracing::warn!(error = %e, "apply_event: dag state read timed out, skipping event_state sync"); + return; + } + }; + let event_state = self.event_state.clone(); + if util::with_timeout("apply_event/mutate_event_state", async move { + willow_actor::state::mutate(&event_state, move |es| { + *es = state; + }) + .await + }) + .await + .is_err() + { + tracing::warn!("apply_event: event_state mutate timed out"); + } + self.persistence + .do_send(persistence_actor::PersistEvent { + event: event.clone(), + }) + .ok(); let client_events = derive_client_events(event); for e in client_events { - let _ = self.event_broker.do_send(Publish(e)); + self.event_broker.do_send(Publish(e)).ok(); } } @@ -168,14 +199,14 @@ impl ClientMutations { { if let Ok(rt) = tokio::runtime::Handle::try_current() { rt.spawn(async move { - let _ = handle.broadcast(bytes).await; + handle.broadcast(bytes).await.ok(); }); } } #[cfg(target_arch = "wasm32")] { wasm_bindgen_futures::spawn_local(async move { - let _ = handle.broadcast(bytes).await; + handle.broadcast(bytes).await.ok(); }); } } @@ -566,9 +597,9 @@ impl ClientMutations { } }) .await; - let _ = self - .event_broker - .do_send(Publish(ClientEvent::PeerConnected(peer_id))); + self.event_broker + .do_send(Publish(ClientEvent::PeerConnected(peer_id))) + .ok(); } /// Track a peer as offline. @@ -577,9 +608,9 @@ impl ClientMutations { c.peers.retain(|p| p != &peer_id); }) .await; - let _ = self - .event_broker - .do_send(Publish(ClientEvent::PeerDisconnected(peer_id))); + self.event_broker + .do_send(Publish(ClientEvent::PeerDisconnected(peer_id))) + .ok(); } /// Update a peer's display name from a profile broadcast. @@ -589,12 +620,12 @@ impl ClientMutations { p.names.insert(peer_id, name); }) .await; - let _ = self - .event_broker + self.event_broker .do_send(Publish(ClientEvent::ProfileUpdated { peer_id, display_name, - })); + })) + .ok(); } /// Record a typing indicator from a peer. @@ -628,10 +659,12 @@ impl ClientMutations { v.participants.entry(ch).or_default().insert(peer_id); }) .await; - let _ = self.event_broker.do_send(Publish(ClientEvent::VoiceJoined { - channel_id, - peer_id, - })); + self.event_broker + .do_send(Publish(ClientEvent::VoiceJoined { + channel_id, + peer_id, + })) + .ok(); } /// Handle a voice leave event from a peer. @@ -643,10 +676,12 @@ impl ClientMutations { } }) .await; - let _ = self.event_broker.do_send(Publish(ClientEvent::VoiceLeft { - channel_id, - peer_id, - })); + self.event_broker + .do_send(Publish(ClientEvent::VoiceLeft { + channel_id, + peer_id, + })) + .ok(); } } diff --git a/crates/client/src/servers.rs b/crates/client/src/servers.rs index b24beb75..189bad5e 100644 --- a/crates/client/src/servers.rs +++ b/crates/client/src/servers.rs @@ -140,11 +140,11 @@ impl ClientHandle { self.mutation_handle.apply_event(&event).await; // Open event store on persistence actor. - let _ = self - .persistence_addr + self.persistence_addr .do_send(persistence_actor::OpenEventStore { server_id: server_id.clone(), - }); + }) + .ok(); Ok(server_id) } diff --git a/crates/client/src/storage.rs b/crates/client/src/storage.rs index de847406..a46f8b3a 100644 --- a/crates/client/src/storage.rs +++ b/crates/client/src/storage.rs @@ -206,7 +206,7 @@ pub struct StoredMessage { #[cfg(not(target_arch = "wasm32"))] pub fn open_message_db() -> Option { let dir = data_dir(); - let _ = std::fs::create_dir_all(&dir); + std::fs::create_dir_all(&dir).ok(); let path = dir.join("messages.db"); MessageDb::open(&path) } @@ -241,23 +241,23 @@ impl MessageDb { ) .ok()?; // Migration: add msg_id column if it doesn't exist (existing DBs). - let _ = - conn.execute_batch("ALTER TABLE messages ADD COLUMN msg_id TEXT NOT NULL DEFAULT '';"); + conn.execute_batch("ALTER TABLE messages ADD COLUMN msg_id TEXT NOT NULL DEFAULT '';") + .ok(); Some(Self { conn }) } /// Insert a message, deduplicating by msg_id. pub fn insert(&self, msg: &StoredMessage) { if msg.msg_id.is_empty() { - let _ = self.conn.execute( + self.conn.execute( "INSERT INTO messages (topic, author, body, is_local, timestamp_ms) VALUES (?1, ?2, ?3, ?4, ?5)", rusqlite::params![msg.topic, msg.author, msg.body, msg.is_local as i32, msg.timestamp_ms], - ); + ).ok(); } else { - let _ = self.conn.execute( + self.conn.execute( "INSERT OR IGNORE INTO messages (topic, author, body, is_local, timestamp_ms, msg_id) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", rusqlite::params![msg.topic, msg.author, msg.body, msg.is_local as i32, msg.timestamp_ms, msg.msg_id], - ); + ).ok(); } } @@ -391,7 +391,7 @@ pub fn open_message_db() -> Option { #[allow(dead_code)] pub fn save_download(filename: &str, data: &[u8]) -> Option { let dir = dirs::download_dir().unwrap_or_else(|| data_dir().join("downloads")); - let _ = std::fs::create_dir_all(&dir); + std::fs::create_dir_all(&dir).ok(); let path = dir.join(filename); std::fs::write(&path, data).ok()?; Some(path) @@ -409,8 +409,8 @@ fn data_dir() -> std::path::PathBuf { #[cfg(not(target_arch = "wasm32"))] fn save_raw(key: &str, data: &[u8]) { let dir = data_dir(); - let _ = std::fs::create_dir_all(&dir); - let _ = std::fs::write(dir.join(format!("{key}.bin")), data); + std::fs::create_dir_all(&dir).ok(); + std::fs::write(dir.join(format!("{key}.bin")), data).ok(); } #[cfg(not(target_arch = "wasm32"))] diff --git a/crates/client/src/util.rs b/crates/client/src/util.rs index fa95d4e6..34293e8e 100644 --- a/crates/client/src/util.rs +++ b/crates/client/src/util.rs @@ -2,6 +2,36 @@ //! //! Shared helper functions for the Willow client. +use std::time::Duration; + +/// Default timeout for actor calls (native only; WASM awaits without timeout). +pub const ACTOR_CALL_TIMEOUT: Duration = Duration::from_secs(5); + +/// Run `f` with a timeout, returning `Err(ClientError::ActorTimeout(label))` +/// if it does not complete within [`ACTOR_CALL_TIMEOUT`]. +/// +/// On WASM there are no tokio timers, so the future is simply awaited with +/// no timeout applied. +pub async fn with_timeout(label: &'static str, f: F) -> Result +where + F: std::future::Future, +{ + #[cfg(not(target_arch = "wasm32"))] + { + tokio::time::timeout(ACTOR_CALL_TIMEOUT, f) + .await + .map_err(|_| { + tracing::warn!(label, "actor call timed out"); + crate::ClientError::ActorTimeout(label) + }) + } + #[cfg(target_arch = "wasm32")] + { + let _ = label; // WASM has no tokio timers — await without timeout for now. + Ok(f.await) + } +} + /// Truncate a peer ID for display. pub fn truncate_peer_id(s: &str) -> String { if s.len() > 12 { @@ -79,4 +109,16 @@ mod tests { // 25 hours = 90000 seconds = 90000000 ms -> wraps to 01:00 assert_eq!(format_timestamp(90_000_000), "01:00"); } + + /// `with_timeout` must return `Err(ClientError::ActorTimeout)` when the + /// wrapped future never resolves. + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test] + async fn with_timeout_fires_on_stall() { + let result = with_timeout("test_label", std::future::pending::<()>()).await; + assert!( + matches!(result, Err(crate::ClientError::ActorTimeout("test_label"))), + "expected ActorTimeout, got {result:?}" + ); + } } diff --git a/crates/common/src/wire.rs b/crates/common/src/wire.rs index 4e9ba317..b13bb384 100644 --- a/crates/common/src/wire.rs +++ b/crates/common/src/wire.rs @@ -75,6 +75,12 @@ pub enum WireMessage { /// Topic name strings (e.g. "{server_id}/{channel_name}"). topics: Vec, }, + /// A signed worker node message (announcement, departure, request, or response). + /// + /// Worker gossip messages travel on the `_willow_workers` topic. + /// They are wrapped in this variant so they share the same Ed25519-signed + /// envelope as all other gossipsub messages. + Worker(crate::WorkerWireMessage), } /// WebRTC signaling payload for voice chat negotiation. @@ -200,6 +206,34 @@ mod tests { assert!(unpack_wire(&data).is_none()); } + #[test] + fn pack_unpack_worker_message_round_trip() { + use crate::{WorkerAnnouncement, WorkerRoleInfo, WorkerWireMessage}; + let id = Identity::generate(); + let announcement = WorkerAnnouncement { + peer_id: id.endpoint_id(), + role: WorkerRoleInfo::Replay { + servers_loaded: 2, + events_buffered: 100, + max_events: 1000, + }, + servers: vec!["srv-abc".to_string()], + timestamp: 12345, + }; + let msg = WireMessage::Worker(WorkerWireMessage::Announcement(announcement.clone())); + let data = pack_wire(&msg, &id).unwrap(); + let (decoded, signer) = unpack_wire(&data).unwrap(); + assert_eq!(signer, id.endpoint_id()); + match decoded { + WireMessage::Worker(WorkerWireMessage::Announcement(a)) => { + assert_eq!(a.peer_id, announcement.peer_id); + assert_eq!(a.servers, announcement.servers); + assert_eq!(a.timestamp, announcement.timestamp); + } + _ => panic!("expected Worker(Announcement)"), + } + } + #[test] fn empty_data_fails_unpack() { assert!(unpack_wire(&[]).is_none()); diff --git a/crates/common/src/worker_types.rs b/crates/common/src/worker_types.rs index 97f215a1..e30f3eca 100644 --- a/crates/common/src/worker_types.rs +++ b/crates/common/src/worker_types.rs @@ -52,10 +52,11 @@ pub struct WorkerAnnouncement { /// Top-level wire message for the `_willow_workers` gossipsub topic. /// -/// **Security note:** These messages are not signed. A malicious peer on -/// the gossipsub topic can forge Announcements, Requests, and Responses. -/// Future work should add Ed25519 signatures (similar to `pack_wire`/ -/// `unpack_wire` for server ops) or authenticated gossipsub channels. +/// **Security note:** These messages are signed with Ed25519. All messages +/// are wrapped in a [`crate::WireMessage::Worker`] variant and signed via +/// [`crate::pack_wire`] before broadcast. Recipients verify signatures via +/// [`crate::unpack_wire`], which returns an error if the signature is invalid. +/// Unsigned, tampered, or wrong-variant messages are rejected. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum WorkerWireMessage { /// Periodic heartbeat. diff --git a/crates/crypto/src/lib.rs b/crates/crypto/src/lib.rs index 5bd30236..48cce831 100644 --- a/crates/crypto/src/lib.rs +++ b/crates/crypto/src/lib.rs @@ -406,6 +406,159 @@ pub fn decrypt_channel_key( Ok(ChannelKey(key_bytes)) } +// ───── RatchetCache ───────────────────────────────────────────────────────── + +/// A per-channel cache of derived ratchet keys that avoids O(counter) replay +/// cost on every decryption. +/// +/// [`derive_message_key`] replays the ratchet from counter=1 on every call, +/// which costs 2 HKDF-Expand operations per step. For a receiver on a chatty +/// channel this repeated work accumulates quickly — 1M counter steps cost +/// roughly 1 second of CPU time. +/// +/// `RatchetCache` remembers the last-derived keys and the ratchet state at the +/// highest cached counter per epoch. When asked for a key at counter `c` it +/// checks the cache first; on a miss it resumes from the highest cached entry +/// rather than rewinding all the way to counter=1. +/// +/// The cache is bounded to `max_entries` entries. When the bound is exceeded +/// the lowest-counter entry is evicted (oldest messages are least likely to be +/// needed again). +/// +/// This type is local to the receiver and has no wire-format impact. +/// +/// # Example +/// +/// ``` +/// use willow_crypto::{generate_channel_key, RatchetCache}; +/// +/// let key = generate_channel_key(); +/// let mut cache = RatchetCache::new(128); +/// +/// // First call replays the ratchet; subsequent calls for the same +/// // (epoch, counter) return the cached key without HKDF work. +/// let k1 = cache.derive_or_cached(&key, 0, 5); +/// let k2 = cache.derive_or_cached(&key, 0, 5); +/// assert_eq!(k1.as_bytes(), k2.as_bytes()); +/// ``` +pub struct RatchetCache { + /// Cached message keys keyed by `(epoch, counter)`. + cache: std::collections::BTreeMap<(u32, u64), ChannelKey>, + /// Saved ratchet state at the highest counter per epoch, so we can + /// advance forward without rewinding. + /// + /// Value is `(counter_at_save, ratchet_ready_to_produce_counter+1)`. + ratchet_states: std::collections::HashMap, + max_entries: usize, +} + +impl std::fmt::Debug for RatchetCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RatchetCache") + .field("entries", &self.cache.len()) + .field("max_entries", &self.max_entries) + .finish() + } +} + +impl RatchetCache { + /// Create a new cache with a maximum of `max_entries` cached keys. + pub fn new(max_entries: usize) -> Self { + Self { + cache: std::collections::BTreeMap::new(), + ratchet_states: std::collections::HashMap::new(), + max_entries, + } + } + + /// Return the message key for `(epoch, target_counter)`, deriving and + /// caching it if not already present. + /// + /// Uses any previously saved ratchet state for `epoch` to resume from the + /// highest already-computed counter rather than replaying from counter=1. + /// + /// `target_counter = 0` is the sentinel for "no ratchet" and returns the + /// channel key directly without advancing the ratchet. + pub fn derive_or_cached( + &mut self, + channel_key: &ChannelKey, + epoch: u32, + target_counter: u64, + ) -> ChannelKey { + // counter=0 is the "no ratchet" sentinel — return the channel key as-is. + if target_counter == 0 { + return channel_key.clone(); + } + + // Cache hit: return immediately without any HKDF work. + if let Some(key) = self.cache.get(&(epoch, target_counter)) { + return key.clone(); + } + + // Choose the best starting point: either a saved ratchet state for + // this epoch (if its counter is below the target) or a fresh ratchet. + let mut ratchet = match self.ratchet_states.get(&epoch) { + Some((saved_counter, saved_ratchet)) if *saved_counter < target_counter => { + saved_ratchet.clone() + } + _ => KeyRatchet::new(channel_key, epoch), + }; + + // Advance the ratchet until we hit target_counter, caching each key + // along the way so future calls within this range are O(1). + let mut result: Option = None; + loop { + let (key, _ep, counter) = ratchet.next_key(); + + // Evict oldest entry when the cache is full. + if self.cache.len() >= self.max_entries { + if let Some(oldest) = self.cache.keys().next().copied() { + self.cache.remove(&oldest); + } + } + self.cache.insert((epoch, counter), key.clone()); + + if counter == target_counter { + result = Some(key); + } + + if counter >= target_counter { + // Save ratchet state only at the final step — the ratchet is + // now positioned to produce counter+1 on the next call. + let should_save = match self.ratchet_states.get(&epoch) { + Some((prev, _)) => counter > *prev, + None => true, + }; + if should_save { + self.ratchet_states.insert(epoch, (counter, ratchet)); + } + break; + } + } + + // result is always Some here: the loop exits only when counter == target_counter, + // at which point result was set. The unwrap_or_else is a safety net. + result.unwrap_or_else(|| derive_message_key(channel_key, epoch, target_counter)) + } + + /// Invalidate all cached keys for `epoch` (e.g., on key rotation / + /// re-seed). The next call for this epoch will replay from counter=1. + pub fn evict_epoch(&mut self, epoch: u32) { + self.cache.retain(|(e, _), _| *e != epoch); + self.ratchet_states.remove(&epoch); + } + + /// Number of entries currently in the cache. + pub fn len(&self) -> usize { + self.cache.len() + } + + /// Returns `true` if the cache contains no entries. + pub fn is_empty(&self) -> bool { + self.cache.is_empty() + } +} + // ───── Tests ──────────────────────────────────────────────────────────────── #[cfg(test)] @@ -875,4 +1028,201 @@ mod tests { let decrypted = open_content_bounded(&sealed, &key, 0).unwrap(); assert_eq!(decrypted, content); } + + // ── Issue #120: RatchetCache tests ───────────────────────────── + + /// `derive_or_cached` must return the same key as `derive_message_key` + /// for a given (epoch, counter). + #[test] + fn ratchet_cache_matches_derive_message_key() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(128); + + for counter in 1u64..=10 { + let cached = cache.derive_or_cached(&key, 0, counter); + let direct = derive_message_key(&key, 0, counter); + assert_eq!( + cached.as_bytes(), + direct.as_bytes(), + "mismatch at counter {counter}" + ); + } + } + + /// A second call for the same (epoch, counter) returns the cached value + /// without re-deriving it (cache hit). + #[test] + fn ratchet_cache_hit_returns_same_key() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(128); + + let first = cache.derive_or_cached(&key, 0, 5); + let second = cache.derive_or_cached(&key, 0, 5); + assert_eq!(first.as_bytes(), second.as_bytes()); + } + + /// After warming the cache to counter N, requesting counter N+1 should + /// cost only one HKDF step, not N+1 steps. + #[test] + fn ratchet_cache_advances_incrementally() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(256); + + // Warm to counter 50. + let _ = cache.derive_or_cached(&key, 0, 50); + + // All intermediate keys should now be cached. + for c in 1u64..=50 { + assert!( + cache.cache.contains_key(&(0, c)), + "counter {c} should be in cache" + ); + } + + // Requesting counter 51 should produce the correct key. + let k51_cached = cache.derive_or_cached(&key, 0, 51); + let k51_direct = derive_message_key(&key, 0, 51); + assert_eq!(k51_cached.as_bytes(), k51_direct.as_bytes()); + } + + /// Keys from different epochs must not collide. + #[test] + fn ratchet_cache_epoch_isolation() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(128); + + let k_epoch0 = cache.derive_or_cached(&key, 0, 1); + let k_epoch1 = cache.derive_or_cached(&key, 1, 1); + assert_ne!( + k_epoch0.as_bytes(), + k_epoch1.as_bytes(), + "keys from different epochs must differ" + ); + } + + /// `evict_epoch` removes all cached entries for that epoch and lets the + /// next call start fresh. + #[test] + fn ratchet_cache_evict_epoch() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(128); + + // Populate epoch 0 and epoch 1. + let _ = cache.derive_or_cached(&key, 0, 5); + let _ = cache.derive_or_cached(&key, 1, 5); + + assert!(cache.cache.contains_key(&(0, 5))); + assert!(cache.cache.contains_key(&(1, 5))); + + cache.evict_epoch(0); + + assert!( + !cache.cache.contains_key(&(0, 5)), + "epoch 0 entry should be evicted" + ); + assert!( + cache.cache.contains_key(&(1, 5)), + "epoch 1 entry should survive" + ); + + // After eviction, derive_or_cached must still return the correct key. + let fresh = cache.derive_or_cached(&key, 0, 5); + let direct = derive_message_key(&key, 0, 5); + assert_eq!(fresh.as_bytes(), direct.as_bytes()); + } + + /// The cache must not grow beyond `max_entries`. + #[test] + fn ratchet_cache_respects_max_entries() { + let key = generate_channel_key(); + let max = 10usize; + let mut cache = RatchetCache::new(max); + + // Derive 20 keys sequentially; the cache should stay at <= max. + for c in 1u64..=20 { + let _ = cache.derive_or_cached(&key, 0, c); + assert!( + cache.len() <= max, + "cache size {} exceeded max {max} at counter {c}", + cache.len() + ); + } + } + + /// `is_empty` and `len` report correct values. + #[test] + fn ratchet_cache_len_and_is_empty() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(128); + + assert!(cache.is_empty()); + assert_eq!(cache.len(), 0); + + let _ = cache.derive_or_cached(&key, 0, 3); + assert!(!cache.is_empty()); + // deriving counter 3 from scratch advances through counters 1, 2, 3 + assert_eq!(cache.len(), 3); + } + + /// `RatchetCache` must be `Send + Sync` for use in async contexts. + #[test] + fn ratchet_cache_is_send_and_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + } + + /// Second call for the same counter is O(1): it should be at least + /// 100x faster than the first call (warmup), validating the cache + /// hit path as described in issue #120. + #[test] + fn cached_derive_is_fast_after_warmup() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(2048); + + // Warm the cache up to counter 1_000. + let warmup_start = std::time::Instant::now(); + let _ = cache.derive_or_cached(&key, 0, 1_000); + let warmup = warmup_start.elapsed(); + + // Repeat derivation — should be essentially free (cache hit). + let repeat_start = std::time::Instant::now(); + let _ = cache.derive_or_cached(&key, 0, 1_000); + let repeat = repeat_start.elapsed(); + + assert!( + repeat < warmup / 100, + "cache hit ({repeat:?}) should be much faster than warmup ({warmup:?})" + ); + } + + /// `counter = 0` is the "no ratchet" sentinel — must return the channel + /// key unchanged without advancing the ratchet. + #[test] + fn ratchet_cache_counter_zero_returns_channel_key() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(64); + let result = cache.derive_or_cached(&key, 0, 0); + assert_eq!(result, key, "counter=0 should return the channel key as-is"); + // Cache should remain empty — no ratchet work done. + assert!(cache.is_empty()); + } + + /// Requesting a counter lower than the highest previously derived counter + /// (backwards/out-of-order request) correctly falls back to a fresh + /// ratchet replay and still returns the right key. + #[test] + fn ratchet_cache_backwards_request_returns_correct_key() { + let key = generate_channel_key(); + let mut cache = RatchetCache::new(256); + + // Advance to counter 50. + let k50 = cache.derive_or_cached(&key, 0, 50); + let k50_direct = derive_message_key(&key, 0, 50); + assert_eq!(k50, k50_direct); + + // Now request counter 10, which is below the saved ratchet state. + let k10 = cache.derive_or_cached(&key, 0, 10); + let k10_direct = derive_message_key(&key, 0, 10); + assert_eq!(k10, k10_direct, "out-of-order request returned wrong key"); + } } diff --git a/crates/identity/src/lib.rs b/crates/identity/src/lib.rs index d1945821..02ef5654 100644 --- a/crates/identity/src/lib.rs +++ b/crates/identity/src/lib.rs @@ -275,11 +275,11 @@ impl Identity { let write_result = identity.with_secret_bytes(|bytes| -> std::io::Result<()> { file.write_all(bytes) }); if let Err(e) = write_result { - let _ = fs::remove_file(&tmp_path); + fs::remove_file(&tmp_path).ok(); return Err(IdentityError::Other(format!("write temp key file: {e}"))); } if let Err(e) = file.sync_all() { - let _ = fs::remove_file(&tmp_path); + fs::remove_file(&tmp_path).ok(); return Err(IdentityError::Other(format!("fsync temp key file: {e}"))); } // Drop the file handle before rename — Windows requires it, @@ -292,13 +292,13 @@ impl Identity { { use std::os::unix::fs::PermissionsExt; if let Err(e) = fs::set_permissions(&tmp_path, fs::Permissions::from_mode(0o600)) { - let _ = fs::remove_file(&tmp_path); + fs::remove_file(&tmp_path).ok(); return Err(IdentityError::Other(format!("chmod temp key file: {e}"))); } } if let Err(e) = fs::rename(&tmp_path, path) { - let _ = fs::remove_file(&tmp_path); + fs::remove_file(&tmp_path).ok(); return Err(IdentityError::Other(format!("rename key file: {e}"))); } Ok(()) diff --git a/crates/messaging/src/store.rs b/crates/messaging/src/store.rs index 89cff13c..978f60d0 100644 --- a/crates/messaging/src/store.rs +++ b/crates/messaging/src/store.rs @@ -6,9 +6,9 @@ //! must implement. [`InMemoryStore`] is a simple reference implementation that //! keeps everything in RAM — perfect for tests and lightweight nodes. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; -use crate::{ChannelId, Message, MessageId}; +use crate::{hlc::HlcTimestamp, ChannelId, Message, MessageId}; /// Errors that can occur during storage operations. #[derive(Debug, thiserror::Error)] @@ -48,8 +48,9 @@ pub trait MessageStore: Send + Sync { /// A simple in-memory message store. /// -/// Messages are stored in a `HashMap` keyed by [`MessageId`] and indexed by -/// [`ChannelId`] for fast channel listing. +/// Messages are stored in a `HashMap` keyed by [`MessageId`] for O(1) lookup, +/// and indexed by [`ChannelId`] using a `BTreeMap>` +/// for naturally sorted channel iteration without per-insert sorting. /// /// **Not persistent** — all data is lost when the process exits. Use this for /// testing or as a starting point before implementing a disk-backed store. @@ -74,8 +75,13 @@ pub trait MessageStore: Send + Sync { pub struct InMemoryStore { /// All messages keyed by their unique ID. messages: HashMap, - /// Index: channel ID → ordered list of message IDs. - channel_index: HashMap>, + /// Index: channel ID → BTreeMap of HLC timestamp → message IDs. + /// + /// Using `BTreeMap` gives naturally sorted iteration by timestamp, so + /// `insert` is O(log N) instead of the previous O(N log N) sort-on-every-insert. + /// The `Vec` value handles the (rare) case where two messages share + /// the exact same HLC timestamp. + channel_index: HashMap>>, } impl InMemoryStore { @@ -93,18 +99,16 @@ impl MessageStore for InMemoryStore { let id = message.id.clone(); let channel_id = message.channel_id.clone(); + let hlc = message.hlc; self.messages.insert(id.clone(), message); - let ids = self.channel_index.entry(channel_id).or_default(); - ids.push(id); - - // Keep the channel index sorted by HLC timestamp. - ids.sort_by(|a, b| { - let ma = &self.messages[a]; - let mb = &self.messages[b]; - ma.hlc.cmp(&mb.hlc) - }); + self.channel_index + .entry(channel_id) + .or_default() + .entry(hlc) + .or_default() + .push(id); Ok(()) } @@ -116,10 +120,13 @@ impl MessageStore for InMemoryStore { } fn list_channel(&self, channel_id: &ChannelId) -> Vec<&Message> { - self.channel_index - .get(channel_id) - .map(|ids| ids.iter().filter_map(|id| self.messages.get(id)).collect()) - .unwrap_or_default() + match self.channel_index.get(channel_id) { + None => Vec::new(), + Some(by_ts) => by_ts + .values() + .flat_map(|ids| ids.iter().filter_map(|id| self.messages.get(id))) + .collect(), + } } fn len(&self) -> usize { @@ -231,4 +238,77 @@ mod tests { assert!(!store.is_empty()); assert_eq!(store.len(), 1); } + + #[test] + fn messages_returned_in_hlc_order_after_random_insertion() { + let mut store = InMemoryStore::new(); + let channel = ChannelId::new(); + let peer = Identity::generate().endpoint_id(); + + // Construct messages with explicit out-of-order HLC timestamps. + let make_msg = |millis: u64, counter: u32| -> Message { + let mut m = Message::text(channel.clone(), peer, "test", &mut HLC::new()); + m.hlc = HlcTimestamp { millis, counter }; + m + }; + + let msg_t5 = make_msg(1000, 5); + let msg_t1 = make_msg(1000, 1); + let msg_t3 = make_msg(1000, 3); + let msg_t200 = make_msg(2000, 0); + let msg_t0 = make_msg(500, 0); + + // Insert in deliberately scrambled order. + store.insert(msg_t5.clone()).unwrap(); + store.insert(msg_t200.clone()).unwrap(); + store.insert(msg_t1.clone()).unwrap(); + store.insert(msg_t0.clone()).unwrap(); + store.insert(msg_t3.clone()).unwrap(); + + let listed = store.list_channel(&channel); + assert_eq!(listed.len(), 5); + + // Verify strictly ascending HLC order. + for window in listed.windows(2) { + assert!( + window[0].hlc <= window[1].hlc, + "expected {:?} <= {:?}", + window[0].hlc, + window[1].hlc + ); + } + + // Verify the exact order: t0, t1, t3, t5, t200. + assert_eq!(listed[0].id, msg_t0.id); + assert_eq!(listed[1].id, msg_t1.id); + assert_eq!(listed[2].id, msg_t3.id); + assert_eq!(listed[3].id, msg_t5.id); + assert_eq!(listed[4].id, msg_t200.id); + } + + #[test] + fn duplicate_hlc_timestamps_handled_gracefully() { + let mut store = InMemoryStore::new(); + let channel = ChannelId::new(); + let peer = Identity::generate().endpoint_id(); + + let ts = HlcTimestamp { + millis: 9999, + counter: 0, + }; + + // Two different messages sharing the exact same HLC timestamp. + let mut m1 = Message::text(channel.clone(), peer, "first", &mut HLC::new()); + m1.hlc = ts; + let mut m2 = Message::text(channel.clone(), peer, "second", &mut HLC::new()); + m2.hlc = ts; + + store.insert(m1).unwrap(); + store.insert(m2).unwrap(); + + let listed = store.list_channel(&channel); + assert_eq!(listed.len(), 2, "both messages must be present"); + assert_eq!(listed[0].hlc, ts); + assert_eq!(listed[1].hlc, ts); + } } diff --git a/crates/network/src/iroh.rs b/crates/network/src/iroh.rs index 4ccd3dce..52caac39 100644 --- a/crates/network/src/iroh.rs +++ b/crates/network/src/iroh.rs @@ -342,12 +342,6 @@ impl Network for IrohNetwork { &self.blob_store } - async fn connection_events(&self) -> ConnectionEventStream { - // Placeholder: return a stream that never yields. - // Full implementation would monitor endpoint relay and direct connection state. - Box::pin(futures_lite::stream::pending()) - } - async fn shutdown(&self) -> Result<()> { // Drop all subscriptions first. { diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs index edc87880..a92c8974 100644 --- a/crates/network/src/lib.rs +++ b/crates/network/src/lib.rs @@ -27,6 +27,5 @@ pub use topics::{ channel_topic, topic_id, voice_topic, PROFILES_TOPIC, SERVER_OPS_TOPIC, WORKERS_TOPIC, }; pub use traits::{ - BlobHash, BlobStore, ConnectionEvent, ConnectionEventStream, GossipEvent, GossipMessage, - Network, TopicEvents, TopicHandle, + BlobHash, BlobStore, GossipEvent, GossipMessage, Network, TopicEvents, TopicHandle, }; diff --git a/crates/network/src/mem.rs b/crates/network/src/mem.rs index db05b359..cb4abe41 100644 --- a/crates/network/src/mem.rs +++ b/crates/network/src/mem.rs @@ -375,11 +375,6 @@ impl Network for MemNetwork { &self.blobs } - async fn connection_events(&self) -> ConnectionEventStream { - // MemNetwork is always "connected" — return a stream that never yields. - Box::pin(futures_lite::stream::pending()) - } - async fn shutdown(&self) -> Result<()> { let subs = self.subscriptions.lock().unwrap().clone(); for topic in subs { diff --git a/crates/network/src/traits.rs b/crates/network/src/traits.rs index 45e5ad23..c5ca7383 100644 --- a/crates/network/src/traits.rs +++ b/crates/network/src/traits.rs @@ -112,25 +112,6 @@ pub trait BlobStore: Send + Sync { async fn store_size(&self) -> Option; } -// ───── Connection events ──────────────────────────────────────────────────── - -/// Network connectivity events. -#[derive(Debug, Clone)] -pub enum ConnectionEvent { - /// Connected to a relay server. - RelayConnected, - /// Disconnected from the relay server. - RelayDisconnected, - /// A direct QUIC connection was established to a peer. - DirectConnected(EndpointId), - /// A direct QUIC connection to a peer was lost. - DirectDisconnected(EndpointId), -} - -/// A stream of connection events. -pub type ConnectionEventStream = - std::pin::Pin + Send>>; - // ───── Network ────────────────────────────────────────────────────────────── /// Top-level network handle. Assembled once, passed to client/workers. @@ -160,8 +141,9 @@ pub trait Network: Send + Sync + 'static { /// Access the blob store. fn blobs(&self) -> &dyn BlobStore; - /// Stream of connectivity events (relay up/down, peer connects). - async fn connection_events(&self) -> ConnectionEventStream; + // TODO(#119): add connection_events() — stream relay up/down and direct + // peer connect/disconnect events so the client can surface connectivity + // status in the UI. /// Gracefully shut down the network. async fn shutdown(&self) -> Result<()>; diff --git a/crates/state/Cargo.toml b/crates/state/Cargo.toml index df4261a5..0c1725e1 100644 --- a/crates/state/Cargo.toml +++ b/crates/state/Cargo.toml @@ -7,6 +7,7 @@ description = "Pure deterministic event-sourced state machine for the Willow P2P [dependencies] bincode = { workspace = true } +tracing = { workspace = true } iroh-base = { workspace = true, features = ["key"] } serde = { workspace = true } sha2 = "0.10" diff --git a/crates/state/src/materialize.rs b/crates/state/src/materialize.rs index d18e9edb..a225eedb 100644 --- a/crates/state/src/materialize.rs +++ b/crates/state/src/materialize.rs @@ -5,7 +5,7 @@ //! [`apply_event`], producing identical output on all peers given the //! same DAG contents. -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use willow_identity::EndpointId; @@ -306,6 +306,13 @@ fn apply_mutation(state: &mut ServerState, event: &Event) -> ApplyResult { EventKind::DeleteChannel { channel_id } => { state.channels.remove(channel_id); state.messages.retain(|m| m.channel_id != *channel_id); + // Rebuild message_index because retain may have shifted indexes. + state.message_index = state + .messages + .iter() + .enumerate() + .map(|(i, m)| (m.id, i)) + .collect::>(); } EventKind::RenameChannel { @@ -392,6 +399,7 @@ fn apply_mutation(state: &mut ServerState, event: &Event) -> ApplyResult { body, reply_to, } => { + let idx = state.messages.len(); state.messages.push(ChatMessage { id: event.hash, channel_id: channel_id.clone(), @@ -403,32 +411,39 @@ fn apply_mutation(state: &mut ServerState, event: &Event) -> ApplyResult { reactions: BTreeMap::new(), reply_to: *reply_to, }); + state.message_index.insert(event.hash, idx); } EventKind::EditMessage { message_id, new_body, } => { - if let Some(msg) = state.messages.iter_mut().find(|m| m.id == *message_id) { - msg.body = new_body.clone(); - msg.edited = true; + if let Some(&idx) = state.message_index.get(message_id) { + if let Some(msg) = state.messages.get_mut(idx) { + msg.body = new_body.clone(); + msg.edited = true; + } } } EventKind::DeleteMessage { message_id } => { - if let Some(msg) = state.messages.iter_mut().find(|m| m.id == *message_id) { - msg.deleted = true; - msg.body = "[message deleted]".to_string(); - msg.reactions.clear(); + if let Some(&idx) = state.message_index.get(message_id) { + if let Some(msg) = state.messages.get_mut(idx) { + msg.deleted = true; + msg.body = "[message deleted]".to_string(); + msg.reactions.clear(); + } } } EventKind::Reaction { message_id, emoji } => { - if let Some(msg) = state.messages.iter_mut().find(|m| m.id == *message_id) { - msg.reactions - .entry(emoji.clone()) - .or_default() - .insert(event.author); + if let Some(&idx) = state.message_index.get(message_id) { + if let Some(msg) = state.messages.get_mut(idx) { + msg.reactions + .entry(emoji.clone()) + .or_default() + .insert(event.author); + } } } diff --git a/crates/state/src/server.rs b/crates/state/src/server.rs index 7312c418..57d2dd83 100644 --- a/crates/state/src/server.rs +++ b/crates/state/src/server.rs @@ -4,7 +4,7 @@ //! governance state, and profiles. It is derived from a [`EventDag`](crate::dag::EventDag) //! via [`materialize`](crate::materialize::materialize). -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use serde::{Deserialize, Serialize}; use willow_identity::EndpointId; @@ -69,6 +69,23 @@ pub struct ServerState { /// guarantee idempotency — applying the same event twice is a no-op. #[serde(default, skip)] pub applied_events: BTreeSet, + + // -- Fast-lookup indexes -- + /// Maps each message's [`EventHash`] to its index in [`messages`]. + /// + /// Kept in sync with every insertion into `messages` so that + /// `EditMessage`, `DeleteMessage`, and `Reaction` handlers can find + /// their target in O(1) instead of O(N). + /// + /// This field is excluded from serialization (`#[serde(skip)]`). It is + /// always empty on a freshly deserialized `ServerState` and is rebuilt + /// incrementally by each subsequent `apply_event` call, or all at once + /// by a full `materialize()`. Code that deserializes `ServerState` and + /// then calls `apply_incremental` without a prior full materialize will + /// silently no-op on `EditMessage`/`DeleteMessage`/`Reaction` events until + /// the index is warmed. The intended usage is always through `materialize`. + #[serde(default, skip)] + pub message_index: HashMap, } impl ServerState { @@ -104,6 +121,7 @@ impl ServerState { channel_keys: BTreeMap::new(), pending_proposals: BTreeMap::new(), applied_events: BTreeSet::new(), + message_index: HashMap::new(), } } diff --git a/crates/state/src/sync.rs b/crates/state/src/sync.rs index e3a4b960..278f4ac5 100644 --- a/crates/state/src/sync.rs +++ b/crates/state/src/sync.rs @@ -181,7 +181,14 @@ impl PendingBuffer { .push(event); self.cached_count += 1; if let Some(limit) = self.max_pending { - self.evict_to(limit); + let evicted = self.evict_to(limit); + if evicted > 0 { + tracing::warn!( + evicted, + buffer_size = self.cached_count, + "pending buffer at capacity; dropped oldest events" + ); + } } } diff --git a/crates/state/src/tests.rs b/crates/state/src/tests.rs index dddc3b09..04c8b08f 100644 --- a/crates/state/src/tests.rs +++ b/crates/state/src/tests.rs @@ -2689,3 +2689,238 @@ fn create_and_insert_succeeds_with_permission() { result.err() ); } + +// ── Issue #122: O(1) message lookup via message_index ─────────────────── + +#[test] +fn message_index_populated_on_insert() { + let admin = Identity::generate(); + let mut dag = test_dag(&admin); + + let msg = do_emit( + &mut dag, + &admin, + EventKind::Message { + channel_id: "ch".into(), + body: "hello".into(), + reply_to: None, + }, + ); + + let state = materialize(&dag); + assert_eq!(state.message_index.len(), 1); + assert_eq!(state.message_index[&msg.hash], 0); +} + +#[test] +fn message_index_reaction_is_fast_with_many_messages() { + // Insert many messages then apply a reaction — verify the index is + // correct and apply_incremental finds the right message. + use crate::materialize::apply_incremental; + + let admin = Identity::generate(); + let mut dag = test_dag(&admin); + + // Insert 1000 messages. + let mut last_hash = None; + for i in 0..1000u32 { + let e = do_emit( + &mut dag, + &admin, + EventKind::Message { + channel_id: "ch".into(), + body: format!("msg {i}"), + reply_to: None, + }, + ); + if i == 0 { + last_hash = Some(e.hash); + } + } + + let mut state = materialize(&dag); + assert_eq!(state.messages.len(), 1000); + // Index must be fully populated. + assert_eq!(state.message_index.len(), 1000); + + // Apply a reaction to the first message. + let target = last_hash.unwrap(); + let reaction = crate::event::Event::new( + &admin, + 1002, + EventHash::ZERO, + vec![], + EventKind::Reaction { + message_id: target, + emoji: "🚀".into(), + }, + 0, + ); + let result = apply_incremental(&mut state, &reaction); + assert_eq!(result, crate::materialize::ApplyResult::Applied); + // The first message should now have a reaction. + let idx = state.message_index[&target]; + assert!( + state.messages[idx].reactions.contains_key("🚀"), + "reaction should be on the correct message" + ); +} + +#[test] +fn message_index_stable_after_delete_channel() { + // DeleteChannel removes messages via retain() and rebuilds the index. + // Subsequent operations on surviving messages must still work correctly. + use crate::materialize::apply_incremental; + + let admin = Identity::generate(); + let mut dag = test_dag(&admin); + + do_emit( + &mut dag, + &admin, + EventKind::CreateChannel { + name: "ch1".into(), + channel_id: "ch-1".into(), + kind: crate::types::ChannelKind::Text, + }, + ); + do_emit( + &mut dag, + &admin, + EventKind::CreateChannel { + name: "ch2".into(), + channel_id: "ch-2".into(), + kind: crate::types::ChannelKind::Text, + }, + ); + + // Message in ch-1 (index 0). + do_emit( + &mut dag, + &admin, + EventKind::Message { + channel_id: "ch-1".into(), + body: "in ch1".into(), + reply_to: None, + }, + ); + // Message in ch-2 (index 1). + let msg_ch2 = do_emit( + &mut dag, + &admin, + EventKind::Message { + channel_id: "ch-2".into(), + body: "in ch2".into(), + reply_to: None, + }, + ); + + // Delete ch-1 — ch-2's message shifts from index 1 to index 0. + do_emit( + &mut dag, + &admin, + EventKind::DeleteChannel { + channel_id: "ch-1".into(), + }, + ); + + let mut state = materialize(&dag); + assert_eq!(state.messages.len(), 1); + // After rebuild, msg_ch2 must be at index 0. + assert_eq!(state.message_index[&msg_ch2.hash], 0); + + // Apply an edit to the surviving message — must succeed. + let edit = crate::event::Event::new( + &admin, + 100, + EventHash::ZERO, + vec![], + EventKind::EditMessage { + message_id: msg_ch2.hash, + new_body: "edited".into(), + }, + 0, + ); + let result = apply_incremental(&mut state, &edit); + assert_eq!(result, crate::materialize::ApplyResult::Applied); + assert_eq!(state.messages[0].body, "edited"); +} + +// ── Issue #123: PendingBuffer eviction logging ────────────────────────── + +#[test] +fn pending_buffer_eviction_reduces_count_to_cap() { + use crate::sync::PendingBuffer; + + // Insert more events than the cap and verify cached_count stays <= cap. + let id = Identity::generate(); + let cap = 10usize; + let mut buf = PendingBuffer::with_capacity(cap); + + for i in 0u64..50 { + let mut hash_bytes = [0u8; 32]; + hash_bytes[..8].copy_from_slice(&i.to_le_bytes()); + let unique_prev = EventHash(hash_bytes); + let event = crate::event::Event::new( + &id, + i + 1, + unique_prev, + vec![], + EventKind::SetProfile { + display_name: format!("n{i}"), + }, + 0, + ); + buf.buffer_for_prev(unique_prev, event); + // After each insertion, count must never exceed the cap. + assert!( + buf.pending_count() <= cap, + "pending_count {} exceeded cap {} after insertion {}", + buf.pending_count(), + cap, + i + ); + } + assert_eq!(buf.pending_count(), cap); +} + +// ── Issue #122: DeleteMessage path uses message_index ───────────────────── + +#[test] +fn message_index_delete_message_marks_deleted() { + use crate::materialize::apply_incremental; + + let admin = Identity::generate(); + let mut dag = test_dag(&admin); + + let msg = do_emit( + &mut dag, + &admin, + EventKind::Message { + channel_id: "ch".into(), + body: "to be deleted".into(), + reply_to: None, + }, + ); + + let mut state = materialize(&dag); + assert!(!state.messages[0].deleted); + + // Apply DeleteMessage — must use the index to find it. + let del = Event::new( + &admin, + 2, + EventHash::ZERO, + vec![], + EventKind::DeleteMessage { + message_id: msg.hash, + }, + 0, + ); + let result = apply_incremental(&mut state, &del); + assert_eq!(result, crate::materialize::ApplyResult::Applied); + assert!( + state.messages[0].deleted, + "message should be marked deleted" + ); +} diff --git a/crates/web/src/app.rs b/crates/web/src/app.rs index c9298bbb..35df6dc1 100644 --- a/crates/web/src/app.rs +++ b/crates/web/src/app.rs @@ -19,15 +19,17 @@ use crate::voice::VoiceManager; // fn play_notification_sound() { ... } fn init_theme() { - let _ = js_sys::eval( + js_sys::eval( r#"var t=localStorage.getItem('willow-theme')||'dark';document.documentElement.setAttribute('data-theme',t);"#, - ); + ) + .ok(); } pub fn toggle_theme() { - let _ = js_sys::eval( + js_sys::eval( r#"var h=document.documentElement;var c=h.getAttribute('data-theme')||'dark';var n=c==='dark'?'light':'dark';h.setAttribute('data-theme',n);localStorage.setItem('willow-theme',n);"#, - ); + ) + .ok(); } /// How many milliseconds to wait before clearing the loading state automatically. @@ -168,8 +170,9 @@ pub fn App() -> impl IntoView { }, ); if let Some(window) = web_sys::window() { - let _ = window - .add_event_listener_with_callback("keydown", closure.as_ref().unchecked_ref()); + window + .add_event_listener_with_callback("keydown", closure.as_ref().unchecked_ref()) + .ok(); } closure.forget(); } @@ -223,8 +226,9 @@ pub fn App() -> impl IntoView { }, ); if let Some(window) = web_sys::window() { - let _ = window - .add_event_listener_with_callback("hashchange", closure.as_ref().unchecked_ref()); + window + .add_event_listener_with_callback("hashchange", closure.as_ref().unchecked_ref()) + .ok(); } closure.forget(); } @@ -607,7 +611,7 @@ pub fn App() -> impl IntoView { write.voice.set_voice_channel_name.set(String::new()); write.ui.set_show_call_page.set(false); }); - let _ = promise.then2(&on_success, &on_error); + drop(promise.then2(&on_success, &on_error)); on_success.forget(); on_error.forget(); } @@ -695,10 +699,10 @@ pub fn App() -> impl IntoView { impl IntoView { local_display_name={let s: Signal = Signal::from(display_name); s} on_message_click=Callback::new(move |msg: DisplayMessage| { write.chat.set_replying_to.set(Some(msg)); - let _ = js_sys::eval("setTimeout(function(){var i=document.querySelector('.input-area input,.input-area textarea');if(i)i.focus();},50)"); + js_sys::eval("setTimeout(function(){var i=document.querySelector('.input-area input,.input-area textarea');if(i)i.focus();},50)").ok(); }) on_edit=Callback::new(move |msg: DisplayMessage| { write.chat.set_editing.set(Some(msg)); - let _ = js_sys::eval("setTimeout(function(){var i=document.querySelector('.input-area input,.input-area textarea');if(i)i.focus();},50)"); + js_sys::eval("setTimeout(function(){var i=document.querySelector('.input-area input,.input-area textarea');if(i)i.focus();},50)").ok(); }) on_delete=Callback::new(del_msg2) on_react=Callback::new(react2) @@ -814,7 +818,7 @@ pub fn App() -> impl IntoView { #[allow(clippy::await_holding_refcell_ref)] pub async fn handle_voice_create_offer(vm: VoiceManagerHandle, peer_id: String) { let mut mgr = vm.borrow_mut(); - let _ = mgr.create_offer(&peer_id).await; + mgr.create_offer(&peer_id).await.ok(); } /// Helper to handle an incoming WebRTC offer. @@ -824,7 +828,7 @@ pub async fn handle_voice_create_offer(vm: VoiceManagerHandle, peer_id: String) #[allow(clippy::await_holding_refcell_ref)] pub async fn handle_voice_offer(vm: VoiceManagerHandle, from: String, sdp: String) { let mut mgr = vm.borrow_mut(); - let _ = mgr.handle_offer(&from, &sdp).await; + mgr.handle_offer(&from, &sdp).await.ok(); } /// Helper to handle an incoming WebRTC answer. @@ -834,5 +838,5 @@ pub async fn handle_voice_offer(vm: VoiceManagerHandle, from: String, sdp: Strin #[allow(clippy::await_holding_refcell_ref)] pub async fn handle_voice_answer(vm: VoiceManagerHandle, from: String, sdp: String) { let mgr = vm.borrow(); - let _ = mgr.handle_answer(&from, &sdp).await; + mgr.handle_answer(&from, &sdp).await.ok(); } diff --git a/crates/web/src/components/add_server.rs b/crates/web/src/components/add_server.rs index ef0d753f..c08484c1 100644 --- a/crates/web/src/components/add_server.rs +++ b/crates/web/src/components/add_server.rs @@ -43,7 +43,7 @@ pub fn AddServerPanel(on_done: impl Fn(()) + Send + Clone + 'static) -> impl Int match h.create_server(&n).await { Ok(_) => { if !dn.trim().is_empty() { - let _ = h.set_server_display_name(dn.trim()).await; + h.set_server_display_name(dn.trim()).await.ok(); } done_cb(()); } @@ -115,7 +115,7 @@ pub fn AddServerPanel(on_done: impl Fn(()) + Send + Clone + 'static) -> impl Int match h.accept_invite(&code).await { Ok(()) => { if !name.trim().is_empty() { - let _ = h.set_server_display_name(name.trim()).await; + h.set_server_display_name(name.trim()).await.ok(); } set_join_code.set(String::new()); set_join_step.set(false); diff --git a/crates/web/src/components/call_page.rs b/crates/web/src/components/call_page.rs index 71db17db..1c71c9b5 100644 --- a/crates/web/src/components/call_page.rs +++ b/crates/web/src/components/call_page.rs @@ -124,7 +124,10 @@ pub fn CallPage( } // MUST call getUserMedia synchronously in click handler for gesture. - let window = web_sys::window().unwrap(); + let Some(window) = web_sys::window() else { + tracing::error!("camera click: no window"); + return; + }; let navigator = window.navigator(); let Ok(media_devices) = navigator.media_devices() else { tracing::error!("No media devices available"); @@ -155,7 +158,7 @@ pub fn CallPage( let on_error = wasm_bindgen::closure::Closure::once(move |_err: wasm_bindgen::JsValue| { tracing::error!("Camera access denied"); }); - let _ = promise.then2(&on_success, &on_error); + drop(promise.then2(&on_success, &on_error)); on_success.forget(); on_error.forget(); }; @@ -181,7 +184,10 @@ pub fn CallPage( } // MUST call getDisplayMedia synchronously in click handler for gesture. - let window = web_sys::window().unwrap(); + let Some(window) = web_sys::window() else { + tracing::error!("screen share click: no window"); + return; + }; let navigator = window.navigator(); let Ok(media_devices) = navigator.media_devices() else { tracing::error!("No media devices available"); @@ -224,7 +230,7 @@ pub fn CallPage( let on_error = wasm_bindgen::closure::Closure::once(move |_err: wasm_bindgen::JsValue| { tracing::error!("Screen share denied or cancelled"); }); - let _ = promise.then2(&on_success, &on_error); + drop(promise.then2(&on_success, &on_error)); on_success.forget(); on_error.forget(); }; diff --git a/crates/web/src/components/file_share.rs b/crates/web/src/components/file_share.rs index 1036a647..cb002b05 100644 --- a/crates/web/src/components/file_share.rs +++ b/crates/web/src/components/file_share.rs @@ -44,8 +44,11 @@ pub fn FileShareButton(channel: ReadSignal) -> impl IntoView { let size = file.size() as u64; if size > MAX_FILE_SIZE { - let window = web_sys::window().unwrap(); - let _ = window.alert_with_message("File is too large. Maximum size is 256 KB."); + if let Some(window) = web_sys::window() { + window + .alert_with_message("File is too large. Maximum size is 256 KB.") + .ok(); + } return; } @@ -53,25 +56,43 @@ pub fn FileShareButton(channel: ReadSignal) -> impl IntoView { let ch = channel.get_untracked(); let handle_inner = handle_change.clone(); - let reader = web_sys::FileReader::new().unwrap(); + let Ok(reader) = web_sys::FileReader::new() else { + tracing::error!("FileShareButton: FileReader::new failed"); + return; + }; let reader_clone = reader.clone(); let cb = Closure::once(move || { - let result = reader_clone.result().unwrap(); - let array_buf = result.dyn_into::().unwrap(); + let result = match reader_clone.result() { + Ok(r) => r, + Err(e) => { + tracing::error!("FileReader result error: {e:?}"); + return; + } + }; + let array_buf = match result.dyn_into::() { + Ok(b) => b, + Err(_) => { + tracing::error!("FileReader result was not an ArrayBuffer"); + return; + } + }; let uint8 = js_sys::Uint8Array::new(&array_buf); let data = uint8.to_vec(); wasm_bindgen_futures::spawn_local(async move { if let Err(e) = handle_inner.share_file_inline(&ch, &filename, &data).await { - let window = web_sys::window().unwrap(); - let _ = window.alert_with_message(&format!("Failed to share file: {e}")); + if let Some(window) = web_sys::window() { + window + .alert_with_message(&format!("Failed to share file: {e}")) + .ok(); + } } }); }); reader.set_onloadend(Some(cb.as_ref().unchecked_ref())); - let _ = reader.read_as_array_buffer(&file); + reader.read_as_array_buffer(&file).ok(); // Intentional leak: the FileReader callback must outlive this scope. // Since file picks are infrequent, the leak is acceptable. cb.forget(); @@ -125,22 +146,36 @@ pub fn FileCard(filename: String, data: Vec) -> impl IntoView { let parts = js_sys::Array::new(); parts.push(&array.buffer()); - let blob = web_sys::Blob::new_with_u8_array_sequence(&parts).unwrap(); - let url = web_sys::Url::create_object_url_with_blob(&blob).unwrap(); - - let window = web_sys::window().unwrap(); - let document = window.document().unwrap(); - let a = document.create_element("a").unwrap(); - a.set_attribute("href", &url).unwrap(); - a.set_attribute("download", &fname_download).unwrap(); - a.set_attribute("style", "display:none").unwrap(); - document.body().unwrap().append_child(&a).unwrap(); - - let html_a: web_sys::HtmlElement = a.clone().dyn_into().unwrap(); - html_a.click(); + let Ok(blob) = web_sys::Blob::new_with_u8_array_sequence(&parts) else { + tracing::error!("FileCard: Blob::new failed"); + return; + }; + let Ok(url) = web_sys::Url::create_object_url_with_blob(&blob) else { + tracing::error!("FileCard: create_object_url failed"); + return; + }; - document.body().unwrap().remove_child(&a).unwrap(); - web_sys::Url::revoke_object_url(&url).unwrap(); + let Some(window) = web_sys::window() else { + return; + }; + let Some(document) = window.document() else { + return; + }; + let Ok(a) = document.create_element("a") else { + return; + }; + a.set_attribute("href", &url).ok(); + a.set_attribute("download", &fname_download).ok(); + a.set_attribute("style", "display:none").ok(); + if let Some(body) = document.body() { + body.append_child(&a).ok(); + use wasm_bindgen::JsCast; + if let Ok(html_a) = a.clone().dyn_into::() { + html_a.click(); + } + body.remove_child(&a).ok(); + } + web_sys::Url::revoke_object_url(&url).ok(); }; view! { diff --git a/crates/web/src/components/member_list.rs b/crates/web/src/components/member_list.rs index 7a02f011..3939ebf1 100644 --- a/crates/web/src/components/member_list.rs +++ b/crates/web/src/components/member_list.rs @@ -192,7 +192,7 @@ pub fn MemberList( if let Some(eid) = parse_eid(&pt) { let ht = ht.clone(); wasm_bindgen_futures::spawn_local(async move { - let _ = ht.propose_grant_admin(eid).await; + ht.propose_grant_admin(eid).await.ok(); }); } }>"Trust" @@ -200,7 +200,7 @@ pub fn MemberList( if let Some(eid) = parse_eid(&pu) { let hu = hu.clone(); wasm_bindgen_futures::spawn_local(async move { - let _ = hu.propose_revoke_admin(eid).await; + hu.propose_revoke_admin(eid).await.ok(); }); } }>"Untrust" @@ -248,7 +248,7 @@ pub fn MemberList( if let Some(eid) = parse_eid(&pid) { let hk = handle_kick_confirm.clone(); wasm_bindgen_futures::spawn_local(async move { - let _ = hk.propose_kick_member(eid).await; + hk.propose_kick_member(eid).await.ok(); }); } } diff --git a/crates/web/src/components/message.rs b/crates/web/src/components/message.rs index eb8b62d2..79424ff5 100644 --- a/crates/web/src/components/message.rs +++ b/crates/web/src/components/message.rs @@ -71,15 +71,15 @@ fn download_blob(data: &[u8], filename: &str) { let Ok(a) = document.create_element("a") else { return; }; - let _ = a.set_attribute("href", &url); - let _ = a.set_attribute("download", filename); - let _ = a.set_attribute("style", "display:none"); - let _ = body.append_child(&a); + a.set_attribute("href", &url).ok(); + a.set_attribute("download", filename).ok(); + a.set_attribute("style", "display:none").ok(); + body.append_child(&a).ok(); if let Ok(html_a) = a.clone().dyn_into::() { html_a.click(); } - let _ = body.remove_child(&a); - let _ = web_sys::Url::revoke_object_url(&url); + body.remove_child(&a).ok(); + web_sys::Url::revoke_object_url(&url).ok(); } /// Extract URLs from text. Returns (segments, image_urls). diff --git a/crates/web/src/components/participant_tile.rs b/crates/web/src/components/participant_tile.rs index 177aadd5..98c82d53 100644 --- a/crates/web/src/components/participant_tile.rs +++ b/crates/web/src/components/participant_tile.rs @@ -74,7 +74,7 @@ pub fn ParticipantTile( leptos::prelude::set_timeout( move || { media_el.set_src_object(Some(&media_stream)); - let _ = media_el.play(); + media_el.play().ok(); }, std::time::Duration::ZERO, ); diff --git a/crates/web/src/voice.rs b/crates/web/src/voice.rs index 1577940b..ece50b08 100644 --- a/crates/web/src/voice.rs +++ b/crates/web/src/voice.rs @@ -146,13 +146,17 @@ impl SpeakingDetector { } }) as Box); - let window = web_sys::window().unwrap(); - let id = window - .set_interval_with_callback_and_timeout_and_arguments_0( - closure.as_ref().unchecked_ref(), - 60, - ) - .unwrap(); + let Some(window) = web_sys::window() else { + tracing::error!("SpeakingDetector::start_polling: no window"); + return; + }; + let Ok(id) = window.set_interval_with_callback_and_timeout_and_arguments_0( + closure.as_ref().unchecked_ref(), + 60, + ) else { + tracing::error!("SpeakingDetector::start_polling: set_interval failed"); + return; + }; // Intentional leak: the closure must outlive the interval. closure.forget(); @@ -363,7 +367,10 @@ impl VoiceManager { let stream: MediaStream = if streams.length() > 0 { streams.get(0).unchecked_into() } else { - let s = MediaStream::new().unwrap(); + let Ok(s) = MediaStream::new() else { + tracing::warn!("ontrack: MediaStream::new failed"); + return; + }; s.add_track(&track); s }; @@ -502,7 +509,10 @@ impl VoiceManager { remote_peer: &str, ) -> Result<(&PeerConnectionState, Option), String> { if self.connections.contains_key(remote_peer) { - let state = self.connections.get(remote_peer).unwrap(); + let state = self + .connections + .get(remote_peer) + .expect("key just inserted"); return Ok((state, None)); } @@ -521,7 +531,10 @@ impl VoiceManager { PeerConnectionState { pc, making_offer }, ); - let state = self.connections.get(remote_peer).unwrap(); + let state = self + .connections + .get(remote_peer) + .expect("key just inserted"); Ok((state, video_sender)) } diff --git a/crates/worker/src/actors/heartbeat.rs b/crates/worker/src/actors/heartbeat.rs index cff39f77..0ab9d46d 100644 --- a/crates/worker/src/actors/heartbeat.rs +++ b/crates/worker/src/actors/heartbeat.rs @@ -14,6 +14,7 @@ use crate::types::{WorkerAnnouncement, WorkerWireMessage}; /// Heartbeat actor that periodically queries state and broadcasts announcements. pub struct HeartbeatActor { peer_id: EndpointId, + identity: willow_identity::Identity, interval: Duration, state_addr: Addr, topic: T, @@ -26,9 +27,11 @@ impl HeartbeatActor { interval: Duration, state_addr: Addr, topic: T, + identity: willow_identity::Identity, ) -> Self { Self { peer_id, + identity, interval, state_addr, topic, @@ -52,11 +55,13 @@ impl Actor for HeartbeatActor { fn stopped(&mut self) -> impl std::future::Future + Send { debug!("heartbeat actor shutting down"); let peer_id = self.peer_id; + let identity = self.identity.clone(); let topic = self.topic.clone(); async move { // Send departure before exiting. let departure = WorkerWireMessage::Departure { peer_id }; - if let Ok(bytes) = bincode::serialize(&departure) { + let wire = willow_common::WireMessage::Worker(departure); + if let Some(bytes) = willow_common::pack_wire(&wire, &identity) { if let Err(e) = topic.broadcast(bytes::Bytes::from(bytes)).await { warn!(%e, "failed to send departure message"); } @@ -73,6 +78,7 @@ impl Handler for HeartbeatActor { ) -> impl std::future::Future + Send { let state_addr = self.state_addr.clone(); let peer_id = self.peer_id; + let identity = self.identity.clone(); let topic = self.topic.clone(); async move { @@ -95,8 +101,9 @@ impl Handler for HeartbeatActor { }; let msg = WorkerWireMessage::Announcement(announcement); - if let Ok(bytes) = bincode::serialize(&msg) { - let _ = topic.broadcast(bytes::Bytes::from(bytes)).await; + let wire = willow_common::WireMessage::Worker(msg); + if let Some(bytes) = willow_common::pack_wire(&wire, &identity) { + topic.broadcast(bytes::Bytes::from(bytes)).await.ok(); } } } @@ -149,11 +156,13 @@ mod tests { }); let test_peer = net_a.id(); + let test_identity = willow_identity::Identity::generate(); let _hb = system.spawn(HeartbeatActor::new( test_peer, Duration::from_millis(50), state_addr, sender_a, + test_identity, )); // Wait for at least 1 announcement — drain neighbor events first. @@ -168,12 +177,12 @@ mod tests { } }; - let decoded: WorkerWireMessage = bincode::deserialize(&data).unwrap(); - match decoded { - WorkerWireMessage::Announcement(a) => { + let (wire, _signer) = willow_common::unpack_wire(&data).expect("must decode signed wire"); + match wire { + willow_common::WireMessage::Worker(WorkerWireMessage::Announcement(a)) => { assert_eq!(a.peer_id, test_peer); } - _ => panic!("expected Announcement"), + other => panic!("expected Worker(Announcement), got {:?}", other), } system.shutdown().await; @@ -189,7 +198,11 @@ mod tests { break msg.content; } }; - let decoded: WorkerWireMessage = bincode::deserialize(&departure_data).unwrap(); - assert!(matches!(decoded, WorkerWireMessage::Departure { .. })); + let (wire, _signer) = + willow_common::unpack_wire(&departure_data).expect("must decode signed departure"); + assert!(matches!( + wire, + willow_common::WireMessage::Worker(WorkerWireMessage::Departure { .. }) + )); } } diff --git a/crates/worker/src/actors/network.rs b/crates/worker/src/actors/network.rs index 850543b9..950149c3 100644 --- a/crates/worker/src/actors/network.rs +++ b/crates/worker/src/actors/network.rs @@ -16,6 +16,7 @@ use crate::types::WorkerWireMessage; pub struct NetworkActor { state_addr: Addr, local_peer_id: EndpointId, + identity: willow_identity::Identity, events: Option, /// Optional SERVER_OPS topic events stream. ops_events: Option, @@ -31,10 +32,12 @@ impl NetworkActor { state_addr: Addr, local_peer_id: EndpointId, reply_topic: T, + identity: willow_identity::Identity, ) -> Self { Self { state_addr, local_peer_id, + identity, events: Some(events), ops_events: None, reply_topic, @@ -80,7 +83,7 @@ impl Actor for NetworkActor< willow_actor::runtime::spawn(async move { // Wait for StateActor to be ready before draining events. if let Some(ref mut rx) = ready { - let _ = rx.wait_for(|v| *v).await; + rx.wait_for(|v| *v).await.ok(); } while let Some(Ok(event)) = events.next().await { if addr.do_send(GossipEventMsg(event)).is_err() { @@ -96,7 +99,7 @@ impl Actor for NetworkActor< willow_actor::runtime::spawn(async move { // Wait for StateActor to be ready before draining events. if let Some(ref mut rx) = ready { - let _ = rx.wait_for(|v| *v).await; + rx.wait_for(|v| *v).await.ok(); } while let Some(Ok(event)) = ops_events.next().await { if addr.do_send(ServerOpsEventMsg(event)).is_err() { @@ -119,6 +122,7 @@ impl Handler ) -> impl std::future::Future + Send { let state_addr = self.state_addr.clone(); let local_peer_id = self.local_peer_id; + let identity = self.identity.clone(); let event = msg.0; let reply_topic = self.reply_topic.clone(); @@ -138,8 +142,9 @@ impl Handler target_peer: requester, payload: Box::new(response), }; - if let Ok(bytes) = bincode::serialize(&reply) { - let _ = reply_topic.broadcast(bytes::Bytes::from(bytes)).await; + let wire = willow_common::WireMessage::Worker(reply); + if let Some(bytes) = willow_common::pack_wire(&wire, &identity) { + reply_topic.broadcast(bytes::Bytes::from(bytes)).await.ok(); } } } @@ -148,7 +153,7 @@ impl Handler match parse_server_message(&msg.content) { ServerMessageAction::Events(events) => { for event in events { - let _ = state_addr.do_send(EventMsg(event)); + state_addr.do_send(EventMsg(event)).ok(); } } ServerMessageAction::Ignore => {} @@ -176,7 +181,7 @@ impl Handler { for event in events { - let _ = state_addr.do_send(EventMsg(event)); + state_addr.do_send(EventMsg(event)).ok(); } } ServerMessageAction::Ignore => {} @@ -207,9 +212,14 @@ pub enum WorkerMessageAction { /// This is a pure function — no I/O, no channels — so it's easily /// testable. The caller handles the actual I/O. pub fn parse_worker_message(data: &[u8], local_peer_id: &EndpointId) -> WorkerMessageAction { - let msg = match willow_transport::unpack::(data) { - Ok(m) => m, - Err(e) => return WorkerMessageAction::DeserializeError(e.to_string()), + let msg = match willow_common::unpack_wire(data) { + Some((willow_common::WireMessage::Worker(m), _signer)) => m, + Some(_) => return WorkerMessageAction::Ignore, + None => { + return WorkerMessageAction::DeserializeError( + "invalid or unsigned worker message".to_string(), + ) + } }; match msg { @@ -264,7 +274,7 @@ pub fn parse_server_message(data: &[u8]) -> ServerMessageAction { #[cfg(test)] mod tests { use super::*; - use willow_common::{WorkerRequest, WorkerResponse}; + use willow_common::{WorkerRequest, WorkerResponse, WorkerWireMessage}; use willow_identity::Identity; use willow_state::HeadsSummary; @@ -272,8 +282,13 @@ mod tests { Identity::generate().endpoint_id() } + fn pack_worker(msg: WorkerWireMessage, signer: &Identity) -> Vec { + willow_common::pack_wire(&willow_common::WireMessage::Worker(msg), signer).unwrap() + } + #[test] fn parse_worker_request_targeted_at_us() { + let signer = Identity::generate(); let my_id = gen_id(); let msg = WorkerWireMessage::Request { request_id: "req-1".to_string(), @@ -283,7 +298,7 @@ mod tests { heads: HeadsSummary::default(), }, }; - let data = bincode::serialize(&msg).unwrap(); + let data = pack_worker(msg, &signer); match parse_worker_message(&data, &my_id) { WorkerMessageAction::HandleRequest { request_id, .. } => { @@ -296,6 +311,7 @@ mod tests { #[test] fn sync_request_accepted_regardless_of_target() { // Sync requests are broadcast — accepted even if target_peer differs. + let signer = Identity::generate(); let my_id = gen_id(); let other_id = gen_id(); let msg = WorkerWireMessage::Request { @@ -306,7 +322,7 @@ mod tests { heads: HeadsSummary::default(), }, }; - let data = bincode::serialize(&msg).unwrap(); + let data = pack_worker(msg, &signer); assert!(matches!( parse_worker_message(&data, &my_id), @@ -317,6 +333,7 @@ mod tests { #[test] fn history_request_not_for_us_ignored() { // Non-Sync requests targeted at another peer are ignored. + let signer = Identity::generate(); let my_id = gen_id(); let other_id = gen_id(); let msg = WorkerWireMessage::Request { @@ -329,7 +346,7 @@ mod tests { limit: 50, }, }; - let data = bincode::serialize(&msg).unwrap(); + let data = pack_worker(msg, &signer); assert!(matches!( parse_worker_message(&data, &my_id), @@ -339,6 +356,7 @@ mod tests { #[test] fn parse_worker_announcement_ignored() { + let signer = Identity::generate(); let my_id = gen_id(); let msg = WorkerWireMessage::Announcement(willow_common::WorkerAnnouncement { peer_id: gen_id(), @@ -350,7 +368,7 @@ mod tests { servers: vec![], timestamp: 0, }); - let data = bincode::serialize(&msg).unwrap(); + let data = pack_worker(msg, &signer); assert!(matches!( parse_worker_message(&data, &my_id), @@ -360,9 +378,10 @@ mod tests { #[test] fn parse_worker_departure_ignored() { + let signer = Identity::generate(); let my_id = gen_id(); let msg = WorkerWireMessage::Departure { peer_id: gen_id() }; - let data = bincode::serialize(&msg).unwrap(); + let data = pack_worker(msg, &signer); assert!(matches!( parse_worker_message(&data, &my_id), @@ -372,6 +391,7 @@ mod tests { #[test] fn parse_worker_response_ignored() { + let signer = Identity::generate(); let my_id = gen_id(); let msg = WorkerWireMessage::Response { request_id: "r1".to_string(), @@ -380,8 +400,54 @@ mod tests { reason: "test".to_string(), }), }; + let data = pack_worker(msg, &signer); + + assert!(matches!( + parse_worker_message(&data, &my_id), + WorkerMessageAction::Ignore + )); + } + + #[test] + fn unsigned_bytes_rejected() { + // Raw bincode (old unsigned path) must be rejected. + let my_id = gen_id(); + let msg = WorkerWireMessage::Announcement(willow_common::WorkerAnnouncement { + peer_id: gen_id(), + role: willow_common::WorkerRoleInfo::Replay { + servers_loaded: 0, + events_buffered: 0, + max_events: 1000, + }, + servers: vec![], + timestamp: 0, + }); let data = bincode::serialize(&msg).unwrap(); + assert!(matches!( + parse_worker_message(&data, &my_id), + WorkerMessageAction::DeserializeError(_) + )); + } + #[test] + fn non_worker_wire_message_ignored() { + // A signed WireMessage that is not a Worker variant → Ignore. + let id = Identity::generate(); + let my_id = gen_id(); + let event = willow_state::Event::new( + &id, + 1, + willow_state::EventHash::ZERO, + vec![], + willow_state::EventKind::Message { + channel_id: "ch".to_string(), + body: "hi".to_string(), + reply_to: None, + }, + 1000, + ); + let data = + willow_common::pack_wire(&willow_common::WireMessage::Event(event), &id).unwrap(); assert!(matches!( parse_worker_message(&data, &my_id), WorkerMessageAction::Ignore diff --git a/crates/worker/src/actors/state.rs b/crates/worker/src/actors/state.rs index 2e7e530d..2ccd8e6d 100644 --- a/crates/worker/src/actors/state.rs +++ b/crates/worker/src/actors/state.rs @@ -27,7 +27,7 @@ impl Actor for StateActor { ) -> impl std::future::Future + Send { debug!("state actor started"); if let Some(ready) = self.ready.take() { - let _ = ready.send(true); + ready.send(true).ok(); } async {} } diff --git a/crates/worker/src/actors/sync.rs b/crates/worker/src/actors/sync.rs index f4556ec2..697412fb 100644 --- a/crates/worker/src/actors/sync.rs +++ b/crates/worker/src/actors/sync.rs @@ -13,6 +13,7 @@ use crate::types::{WorkerRequest, WorkerWireMessage}; /// Sync actor that periodically queries state hashes and broadcasts sync requests. pub struct SyncActor { peer_id: willow_identity::EndpointId, + identity: willow_identity::Identity, interval: Duration, state_addr: Addr, topic: T, @@ -25,9 +26,11 @@ impl SyncActor { interval: Duration, state_addr: Addr, topic: T, + identity: willow_identity::Identity, ) -> Self { Self { peer_id, + identity, interval, state_addr, topic, @@ -57,6 +60,7 @@ impl Handler for SyncActor { ) -> impl std::future::Future + Send { let state_addr = self.state_addr.clone(); let peer_id = self.peer_id; + let identity = self.identity.clone(); let topic = self.topic.clone(); async move { @@ -76,8 +80,9 @@ impl Handler for SyncActor { target_peer: peer_id, payload: WorkerRequest::Sync { server_id, heads }, }; - if let Ok(bytes) = bincode::serialize(&msg) { - let _ = topic.broadcast(bytes::Bytes::from(bytes)).await; + let wire = willow_common::WireMessage::Worker(msg); + if let Some(bytes) = willow_common::pack_wire(&wire, &identity) { + topic.broadcast(bytes::Bytes::from(bytes)).await.ok(); } } } @@ -136,11 +141,13 @@ mod tests { ready: None, }); + let sync_identity = Identity::generate(); let addr = system.spawn(SyncActor::new( - Identity::generate().endpoint_id(), + sync_identity.endpoint_id(), Duration::from_secs(60), state_addr, sender, + sync_identity, )); assert!(addr.is_alive()); diff --git a/crates/worker/src/runtime.rs b/crates/worker/src/runtime.rs index 7a433dd1..6e46b175 100644 --- a/crates/worker/src/runtime.rs +++ b/crates/worker/src/runtime.rs @@ -50,6 +50,7 @@ pub async fn run( state_addr.clone(), peer_id, workers_sender.clone(), + identity.clone(), ) .with_ops_events(ops_events) .with_ready_signal(ready_rx), @@ -60,6 +61,7 @@ pub async fn run( Duration::from_secs(10), state_addr.clone(), workers_sender.clone(), + identity.clone(), )); let _sync = system.spawn(SyncActor::new( @@ -67,6 +69,7 @@ pub async fn run( Duration::from_secs(config.sync_interval_secs), state_addr, workers_sender, + identity, )); // Wait for shutdown. diff --git a/crates/worker/tests/integration.rs b/crates/worker/tests/integration.rs index ada0e0dd..bb36ddc1 100644 --- a/crates/worker/tests/integration.rs +++ b/crates/worker/tests/integration.rs @@ -181,11 +181,13 @@ async fn heartbeat_and_state_actor_interaction() { }); let test_worker_id = net_a.id(); + let hb_identity = Identity::generate(); let _hb = system.spawn(HeartbeatActor::new( test_worker_id, Duration::from_millis(50), state_addr.clone(), sender_a, + hb_identity, )); // Wait for a heartbeat — drain neighbor events first. @@ -200,9 +202,9 @@ async fn heartbeat_and_state_actor_interaction() { } }; - let decoded: willow_common::WorkerWireMessage = bincode::deserialize(&data).unwrap(); - match decoded { - willow_common::WorkerWireMessage::Announcement(a) => { + let (wire, _) = willow_common::unpack_wire(&data).expect("signed announcement"); + match wire { + willow_common::WireMessage::Worker(willow_common::WorkerWireMessage::Announcement(a)) => { assert_eq!(a.peer_id, test_worker_id); match a.role { WorkerRoleInfo::Replay { @@ -211,7 +213,7 @@ async fn heartbeat_and_state_actor_interaction() { _ => panic!("expected Replay"), } } - _ => panic!("expected Announcement"), + _ => panic!("expected Worker(Announcement)"), } system.shutdown().await; @@ -314,11 +316,13 @@ async fn graceful_shutdown_sends_departure() { }); let departing_id = net_a.id(); + let dep_identity = Identity::generate(); let _hb = system.spawn(HeartbeatActor::new( departing_id, Duration::from_secs(60), // Long interval — won't fire naturally state_addr, sender_a, + dep_identity, )); // Immediately shut down. @@ -335,12 +339,14 @@ async fn graceful_shutdown_sends_departure() { } }; - let decoded: willow_common::WorkerWireMessage = bincode::deserialize(&departure_data).unwrap(); - match decoded { - willow_common::WorkerWireMessage::Departure { peer_id } => { + let (wire, _) = willow_common::unpack_wire(&departure_data).expect("signed departure"); + match wire { + willow_common::WireMessage::Worker(willow_common::WorkerWireMessage::Departure { + peer_id, + }) => { assert_eq!(peer_id, departing_id); } - _ => panic!("expected Departure"), + _ => panic!("expected Worker(Departure)"), } } @@ -369,17 +375,20 @@ async fn full_actor_orchestration_without_network() { }); let orch_id = net_a.id(); + let orch_identity = Identity::generate(); let _hb = system.spawn(HeartbeatActor::new( orch_id, Duration::from_millis(50), state_addr.clone(), sender_a.clone(), + orch_identity.clone(), )); let _sync = system.spawn(SyncActor::new( orch_id, Duration::from_millis(80), state_addr.clone(), sender_a, + orch_identity, )); // Ingest some events. @@ -410,12 +419,14 @@ async fn full_actor_orchestration_without_network() { if let Ok(Some(Ok(willow_network::GossipEvent::Received(msg)))) = tokio::time::timeout(Duration::from_millis(30), events_b.next()).await { - if let Ok(decoded) = - bincode::deserialize::(&msg.content) + if let Some(( + willow_common::WireMessage::Worker(willow_common::WorkerWireMessage::Announcement( + _, + )), + _, + )) = willow_common::unpack_wire(&msg.content) { - if matches!(decoded, willow_common::WorkerWireMessage::Announcement(_)) { - announcement_count += 1; - } + announcement_count += 1; } } } @@ -525,8 +536,14 @@ async fn server_ops_events_forwarded_to_state() { let ops_events = MockTopicEvents { rx: ops_rx }; let _network = system.spawn( - NetworkActor::new(workers_events, state_addr.clone(), peer_id, MockTopicHandle) - .with_ops_events(ops_events), + NetworkActor::new( + workers_events, + state_addr.clone(), + peer_id, + MockTopicHandle, + worker_id.clone(), + ) + .with_ops_events(ops_events), ); // Allow the actor to start. @@ -663,6 +680,7 @@ async fn pre_buffered_events_wait_for_state_ready_signal() { state_addr.clone(), peer_id, MockTopicHandle, + worker_id.clone(), ) .with_ops_events(MockTopicEvents { rx: ops_rx }) .with_ready_signal(ready_rx), @@ -712,6 +730,7 @@ async fn network_actor_drains_immediately_without_ready_signal() { state_addr.clone(), peer_id, MockTopicHandle, + worker_id.clone(), ) .with_ops_events(MockTopicEvents { rx: ops_rx }), // No ready signal — drain starts immediately.