diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs index b2543223..5dd01455 100644 --- a/crates/client/src/listeners.rs +++ b/crates/client/src/listeners.rs @@ -42,6 +42,22 @@ pub(crate) fn truncate_to_chars(s: String, max: usize) -> String { } } +/// Log a `tracing::warn!` if `r` is `Err`, otherwise drop the success value. +/// +/// Used in this listener hot loop to replace bare `.ok();` calls on +/// "fire-and-forget" sends — broker `do_send`, topic `broadcast`, persistence +/// `do_send`, etc. The previous pattern silently dropped these failures, so a +/// dead broker or broken topic would cause the listener to keep running +/// without any record. We still proceed (the listener should not abort on a +/// downstream failure), but field logs now surface the issue. +/// +/// See issue #253. +fn warn_if_err(r: Result, context: &'static str) { + if let Err(e) = r { + tracing::warn!(?e, "{context}"); + } +} + /// Context passed to listener tasks with all the actor addresses they need. pub struct ListenerCtx { pub event_state: Addr>, @@ -117,9 +133,11 @@ async fn topic_listener_loop( } }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))), + "event_broker.do_send Publish(PeerConnected)", + ); // Re-broadcast local profile so the newly joined peer gets // our display name even if they missed the initial broadcast. if let Some(cb) = &ctx.on_neighbor_up { @@ -132,9 +150,11 @@ async fn topic_listener_loop( c.peers.retain(|p| p != &id2); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))), + "event_broker.do_send Publish(PeerDisconnected)", + ); } } } @@ -192,12 +212,17 @@ async fn try_insert_event(ctx: &ListenerCtx, event: willow_state::Event) { // Persist and emit for each applied event. for ev in &all_applied { - ctx.persistence - .do_send(persistence_actor::PersistEvent { event: ev.clone() }) - .ok(); + warn_if_err( + ctx.persistence + .do_send(persistence_actor::PersistEvent { event: ev.clone() }), + "persistence.do_send PersistEvent", + ); let client_events = mutations::derive_client_events(ev); for e in client_events { - ctx.event_broker.do_send(willow_actor::Publish(e)).ok(); + warn_if_err( + ctx.event_broker.do_send(willow_actor::Publish(e)), + "event_broker.do_send Publish(derived ClientEvent)", + ); } } // Persist the state snapshot so messages survive a page reload @@ -239,12 +264,14 @@ async fn process_received_message( p.names.insert(peer_id, display_name); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { - peer_id: profile.peer_id, - display_name: profile.display_name, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { + peer_id: profile.peer_id, + display_name: profile.display_name, + })), + "event_broker.do_send Publish(ProfileUpdated from profile broadcast)", + ); return; } Err(willow_identity::IdentityError::PeerMismatch { claimed, signer }) => { @@ -314,11 +341,13 @@ async fn process_received_message( let _is_synced = willow_actor::state::select(&ctx.dag, |ds| ds.managed.is_synced()).await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::SyncCompleted { - ops_applied: count, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::SyncCompleted { + ops_applied: count, + })), + "event_broker.do_send Publish(SyncCompleted)", + ); } } crate::ops::WireMessage::SyncRequest { state_hash, .. } => { @@ -340,7 +369,10 @@ async fn process_received_message( if !events.is_empty() { let msg = crate::ops::WireMessage::SyncBatch { events }; if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) { - topic.broadcast(bytes::Bytes::from(data)).await.ok(); + warn_if_err( + topic.broadcast(bytes::Bytes::from(data)).await, + "topic.broadcast SyncBatch (SyncRequest reply)", + ); } } } @@ -377,12 +409,14 @@ async fn process_received_message( v.participants.entry(ch).or_default().insert(peer_id); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { - channel_id, - peer_id, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { + channel_id, + peer_id, + })), + "event_broker.do_send Publish(VoiceJoined)", + ); } crate::ops::WireMessage::VoiceLeave { channel_id, @@ -395,12 +429,14 @@ async fn process_received_message( } }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceLeft { - channel_id, - peer_id, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceLeft { + channel_id, + peer_id, + })), + "event_broker.do_send Publish(VoiceLeft)", + ); } crate::ops::WireMessage::VoiceSignal { channel_id, @@ -408,13 +444,15 @@ async fn process_received_message( signal, } => { if target_peer == ctx.identity.endpoint_id() { - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { - channel_id, - from_peer: signer, - signal, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { + channel_id, + from_peer: signer, + signal, + })), + "event_broker.do_send Publish(VoiceSignal)", + ); } } crate::ops::WireMessage::JoinRequest { link_id, peer_id } => { @@ -506,7 +544,10 @@ async fn process_received_message( invite_data, }; if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) { - topic.broadcast(bytes::Bytes::from(data)).await.ok(); + warn_if_err( + topic.broadcast(bytes::Bytes::from(data)).await, + "topic.broadcast JoinResponse", + ); } // Grant SendMessages permission to the joining peer so @@ -542,17 +583,22 @@ async fn process_received_message( }) .await; // Persist. - ctx.persistence - .do_send(crate::persistence_actor::PersistEvent { - event: event.clone(), - }) - .ok(); + warn_if_err( + ctx.persistence + .do_send(crate::persistence_actor::PersistEvent { + event: event.clone(), + }), + "persistence.do_send PersistEvent (GrantPermission)", + ); // Broadcast to other peers. if let Some(data) = crate::ops::pack_wire( &crate::ops::WireMessage::Event(event), &ctx.identity, ) { - topic.broadcast(bytes::Bytes::from(data)).await.ok(); + warn_if_err( + topic.broadcast(bytes::Bytes::from(data)).await, + "topic.broadcast Event(GrantPermission)", + ); } } } @@ -563,11 +609,12 @@ async fn process_received_message( invite_data, } => { if target_peer == ctx.identity.endpoint_id() { - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::JoinLinkResponse { - invite_data, - })) - .ok(); + warn_if_err( + ctx.event_broker.do_send(willow_actor::Publish( + ClientEvent::JoinLinkResponse { invite_data }, + )), + "event_broker.do_send Publish(JoinLinkResponse)", + ); } } crate::ops::WireMessage::JoinDenied { @@ -575,11 +622,13 @@ async fn process_received_message( reason, } => { if target_peer == ctx.identity.endpoint_id() { - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied { - reason, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied { + reason, + })), + "event_broker.do_send Publish(JoinLinkDenied)", + ); } } // TopicAnnounce is consumed by the relay; clients ignore it. @@ -603,12 +652,14 @@ async fn process_received_message( p.names.insert(peer_id, name); }) .await; - ctx.event_broker - .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { - peer_id, - display_name, - })) - .ok(); + warn_if_err( + ctx.event_broker + .do_send(willow_actor::Publish(ClientEvent::ProfileUpdated { + peer_id, + display_name, + })), + "event_broker.do_send Publish(ProfileUpdated from ProfileAnnounce)", + ); } // Worker messages travel on the _willow_workers topic and are never // delivered to the client's server-ops listener.