From 40cdfa1169e434accf4e66dd77ff84fc127a578c Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 16:43:43 +0000 Subject: [PATCH] fix(client): log dropped fire-and-forget sends in listener loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces 16 trailing `.ok();` swallows in `crates/client/src/listeners.rs` with a private `warn_if_err(Result, &'static str)` helper that emits `tracing::warn!(?e, "{context}")` on `Err` and drops the success value. Previously, when the broker stopped or a topic broadcast failed, the listener silently kept running with no record. With this change the listener still proceeds (a downstream send failure should not abort the loop), but field logs now surface the issue with a diagnostic context string that names the specific call site. Migrated sites span: - `event_broker.do_send(Publish(...))` for PeerConnected / PeerDisconnected / derived ClientEvent / ProfileUpdated / SyncCompleted / VoiceJoined / VoiceLeft / VoiceSignal / JoinLinkResponse / JoinLinkDenied - `topic.broadcast(...)` for SyncBatch (SyncRequest reply), JoinResponse, Event(GrantPermission) - `persistence.do_send(PersistEvent { ... })` for both the inline DAG-apply loop and the GrantPermission branch One `.ok()` site at line 573 is intentionally left as-is: it converts a `Result` from `ds.managed.create_and_insert(...)` to an `Option` consumed by the surrounding `if let Some(event) = ...`. That is `Result -> Option` coercion, not a swallowed fire-and-forget send, so the helper does not apply. Helper lives in `listeners.rs` only; no propagation to other files — issue #253 is scoped to this hot loop. A separate audit can decide whether other crates' `.ok();` sites warrant the same treatment. 
Refs #253 --- crates/client/src/listeners.rs | 179 +++++++++++++++++++++------------ 1 file changed, 115 insertions(+), 64 deletions(-) diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs index b2543223..5dd01455 100644 --- a/crates/client/src/listeners.rs +++ b/crates/client/src/listeners.rs @@ -42,6 +42,22 @@ pub(crate) fn truncate_to_chars(s: String, max: usize) -> String { } } +/// Log a `tracing::warn!` if `r` is `Err`, otherwise drop the success value. +/// +/// Used in this listener hot loop to replace bare `.ok();` calls on +/// "fire-and-forget" sends — broker `do_send`, topic `broadcast`, persistence +/// `do_send`, etc. The previous pattern silently dropped these failures, so a +/// dead broker or broken topic would cause the listener to keep running +/// without any record. We still proceed (the listener should not abort on a +/// downstream failure), but field logs now surface the issue. +/// +/// See issue #253. +fn warn_if_err<T, E: std::fmt::Debug>(r: Result<T, E>, context: &'static str) { + if let Err(e) = r { + tracing::warn!(?e, "{context}"); + } +} + /// Context passed to listener tasks with all the actor addresses they need. pub struct ListenerCtx { pub event_state: Addr>, @@ -117,9 +133,11 @@ async fn topic_listener_loop( } }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))), + "event_broker.do_send Publish(PeerConnected)", + ); // Re-broadcast local profile so the newly joined peer gets // our display name even if they missed the initial broadcast. 
if let Some(cb) = &ctx.on_neighbor_up { @@ -132,9 +150,11 @@ async fn topic_listener_loop( c.peers.retain(|p| p != &id2); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))), + "event_broker.do_send Publish(PeerDisconnected)", + ); } } } @@ -192,12 +212,17 @@ async fn try_insert_event(ctx: &ListenerCtx, event: willow_state::Event) { // Persist and emit for each applied event. for ev in &all_applied { - ctx.persistence - .do_send(persistence_actor::PersistEvent { event: ev.clone() }) - .ok(); + warn_if_err( + ctx.persistence + .do_send(persistence_actor::PersistEvent { event: ev.clone() }), + "persistence.do_send PersistEvent", + ); let client_events = mutations::derive_client_events(ev); for e in client_events { - ctx.event_broker.do_send(willow_actor::Publish(e)).ok(); + warn_if_err( + ctx.event_broker.do_send(willow_actor::Publish(e)), + "event_broker.do_send Publish(derived ClientEvent)", + ); } } // Persist the state snapshot so messages survive a page reload @@ -239,12 +264,14 @@ async fn process_received_message( p.names.insert(peer_id, display_name); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { - peer_id: profile.peer_id, - display_name: profile.display_name, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { + peer_id: profile.peer_id, + display_name: profile.display_name, + })), + "event_broker.do_send Publish(ProfileUpdated from profile broadcast)", + ); return; } Err(willow_identity::IdentityError::PeerMismatch { claimed, signer }) => { @@ -314,11 +341,13 @@ async fn process_received_message( let _is_synced = willow_actor::state::select(&ctx.dag, |ds| ds.managed.is_synced()).await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::SyncCompleted { - ops_applied: count, - })) - 
.ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::SyncCompleted { + ops_applied: count, + })), + "event_broker.do_send Publish(SyncCompleted)", + ); } } crate::ops::WireMessage::SyncRequest { state_hash, .. } => { @@ -340,7 +369,10 @@ async fn process_received_message( if !events.is_empty() { let msg = crate::ops::WireMessage::SyncBatch { events }; if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) { - topic.broadcast(bytes::Bytes::from(data)).await.ok(); + warn_if_err( + topic.broadcast(bytes::Bytes::from(data)).await, + "topic.broadcast SyncBatch (SyncRequest reply)", + ); } } } @@ -377,12 +409,14 @@ async fn process_received_message( v.participants.entry(ch).or_default().insert(peer_id); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { - channel_id, - peer_id, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { + channel_id, + peer_id, + })), + "event_broker.do_send Publish(VoiceJoined)", + ); } crate::ops::WireMessage::VoiceLeave { channel_id, @@ -395,12 +429,14 @@ async fn process_received_message( } }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceLeft { - channel_id, - peer_id, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceLeft { + channel_id, + peer_id, + })), + "event_broker.do_send Publish(VoiceLeft)", + ); } crate::ops::WireMessage::VoiceSignal { channel_id, @@ -408,13 +444,15 @@ async fn process_received_message( signal, } => { if target_peer == ctx.identity.endpoint_id() { - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { - channel_id, - from_peer: signer, - signal, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { + channel_id, + from_peer: signer, + signal, + })), + "event_broker.do_send Publish(VoiceSignal)", + ); } } 
crate::ops::WireMessage::JoinRequest { link_id, peer_id } => { @@ -506,7 +544,10 @@ async fn process_received_message( invite_data, }; if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) { - topic.broadcast(bytes::Bytes::from(data)).await.ok(); + warn_if_err( + topic.broadcast(bytes::Bytes::from(data)).await, + "topic.broadcast JoinResponse", + ); } // Grant SendMessages permission to the joining peer so @@ -542,17 +583,22 @@ async fn process_received_message( }) .await; // Persist. - ctx.persistence - .do_send(crate::persistence_actor::PersistEvent { - event: event.clone(), - }) - .ok(); + warn_if_err( + ctx.persistence + .do_send(crate::persistence_actor::PersistEvent { + event: event.clone(), + }), + "persistence.do_send PersistEvent (GrantPermission)", + ); // Broadcast to other peers. if let Some(data) = crate::ops::pack_wire( &crate::ops::WireMessage::Event(event), &ctx.identity, ) { - topic.broadcast(bytes::Bytes::from(data)).await.ok(); + warn_if_err( + topic.broadcast(bytes::Bytes::from(data)).await, + "topic.broadcast Event(GrantPermission)", + ); } } } @@ -563,11 +609,12 @@ async fn process_received_message( invite_data, } => { if target_peer == ctx.identity.endpoint_id() { - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::JoinLinkResponse { - invite_data, - })) - .ok(); + warn_if_err( + ctx.event_broker.do_send(willow_actor::Publish( + ClientEvent::JoinLinkResponse { invite_data }, + )), + "event_broker.do_send Publish(JoinLinkResponse)", + ); } } crate::ops::WireMessage::JoinDenied { @@ -575,11 +622,13 @@ async fn process_received_message( reason, } => { if target_peer == ctx.identity.endpoint_id() { - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied { - reason, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied { + reason, + })), + "event_broker.do_send Publish(JoinLinkDenied)", + ); } } // TopicAnnounce is consumed by the relay; clients 
ignore it. @@ -603,12 +652,14 @@ async fn process_received_message( p.names.insert(peer_id, name); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { - peer_id, - display_name, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { + peer_id, + display_name, + })), + "event_broker.do_send Publish(ProfileUpdated from ProfileAnnounce)", + ); } // Worker messages travel on the _willow_workers topic and are never // delivered to the client's server-ops listener.