Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 115 additions & 64 deletions crates/client/src/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ pub(crate) fn truncate_to_chars(s: String, max: usize) -> String {
}
}

/// Log a `tracing::warn!` if `r` is `Err`, otherwise drop the success value.
///
/// Used in this listener hot loop to replace bare `.ok();` calls on
/// "fire-and-forget" sends — broker `do_send`, topic `broadcast`, persistence
/// `do_send`, etc. The previous pattern silently dropped these failures, so a
/// dead broker or broken topic would cause the listener to keep running
/// without any record. We still proceed (the listener should not abort on a
/// downstream failure), but field logs now surface the issue.
///
/// See issue #253.
fn warn_if_err<T, E: std::fmt::Debug>(r: Result<T, E>, context: &'static str) {
if let Err(e) = r {
tracing::warn!(?e, "{context}");
}
}

/// Context passed to listener tasks with all the actor addresses they need.
pub struct ListenerCtx {
pub event_state: Addr<willow_actor::StateActor<willow_state::ServerState>>,
Expand Down Expand Up @@ -117,9 +133,11 @@ async fn topic_listener_loop<T: TopicHandle, E: TopicEvents>(
}
})
.await;
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::PeerConnected(id)))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::PeerConnected(id))),
"event_broker.do_send Publish(PeerConnected)",
);
// Re-broadcast local profile so the newly joined peer gets
// our display name even if they missed the initial broadcast.
if let Some(cb) = &ctx.on_neighbor_up {
Expand All @@ -132,9 +150,11 @@ async fn topic_listener_loop<T: TopicHandle, E: TopicEvents>(
c.peers.retain(|p| p != &id2);
})
.await;
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id)))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::PeerDisconnected(id))),
"event_broker.do_send Publish(PeerDisconnected)",
);
}
}
}
Expand Down Expand Up @@ -192,12 +212,17 @@ async fn try_insert_event(ctx: &ListenerCtx, event: willow_state::Event) {

// Persist and emit for each applied event.
for ev in &all_applied {
ctx.persistence
.do_send(persistence_actor::PersistEvent { event: ev.clone() })
.ok();
warn_if_err(
ctx.persistence
.do_send(persistence_actor::PersistEvent { event: ev.clone() }),
"persistence.do_send PersistEvent",
);
let client_events = mutations::derive_client_events(ev);
for e in client_events {
ctx.event_broker.do_send(willow_actor::Publish(e)).ok();
warn_if_err(
ctx.event_broker.do_send(willow_actor::Publish(e)),
"event_broker.do_send Publish(derived ClientEvent)",
);
}
}
// Persist the state snapshot so messages survive a page reload
Expand Down Expand Up @@ -239,12 +264,14 @@ async fn process_received_message<T: TopicHandle>(
p.names.insert(peer_id, display_name);
})
.await;
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::ProfileUpdated {
peer_id: profile.peer_id,
display_name: profile.display_name,
}))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::ProfileUpdated {
peer_id: profile.peer_id,
display_name: profile.display_name,
})),
"event_broker.do_send Publish(ProfileUpdated from profile broadcast)",
);
return;
}
Err(willow_identity::IdentityError::PeerMismatch { claimed, signer }) => {
Expand Down Expand Up @@ -314,11 +341,13 @@ async fn process_received_message<T: TopicHandle>(
let _is_synced =
willow_actor::state::select(&ctx.dag, |ds| ds.managed.is_synced()).await;

ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::SyncCompleted {
ops_applied: count,
}))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::SyncCompleted {
ops_applied: count,
})),
"event_broker.do_send Publish(SyncCompleted)",
);
}
}
crate::ops::WireMessage::SyncRequest { state_hash, .. } => {
Expand All @@ -340,7 +369,10 @@ async fn process_received_message<T: TopicHandle>(
if !events.is_empty() {
let msg = crate::ops::WireMessage::SyncBatch { events };
if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) {
topic.broadcast(bytes::Bytes::from(data)).await.ok();
warn_if_err(
topic.broadcast(bytes::Bytes::from(data)).await,
"topic.broadcast SyncBatch (SyncRequest reply)",
);
}
}
}
Expand Down Expand Up @@ -377,12 +409,14 @@ async fn process_received_message<T: TopicHandle>(
v.participants.entry(ch).or_default().insert(peer_id);
})
.await;
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceJoined {
channel_id,
peer_id,
}))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceJoined {
channel_id,
peer_id,
})),
"event_broker.do_send Publish(VoiceJoined)",
);
}
crate::ops::WireMessage::VoiceLeave {
channel_id,
Expand All @@ -395,26 +429,30 @@ async fn process_received_message<T: TopicHandle>(
}
})
.await;
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceLeft {
channel_id,
peer_id,
}))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceLeft {
channel_id,
peer_id,
})),
"event_broker.do_send Publish(VoiceLeft)",
);
}
crate::ops::WireMessage::VoiceSignal {
channel_id,
target_peer,
signal,
} => {
if target_peer == ctx.identity.endpoint_id() {
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceSignal {
channel_id,
from_peer: signer,
signal,
}))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceSignal {
channel_id,
from_peer: signer,
signal,
})),
"event_broker.do_send Publish(VoiceSignal)",
);
}
}
crate::ops::WireMessage::JoinRequest { link_id, peer_id } => {
Expand Down Expand Up @@ -506,7 +544,10 @@ async fn process_received_message<T: TopicHandle>(
invite_data,
};
if let Some(data) = crate::ops::pack_wire(&msg, &ctx.identity) {
topic.broadcast(bytes::Bytes::from(data)).await.ok();
warn_if_err(
topic.broadcast(bytes::Bytes::from(data)).await,
"topic.broadcast JoinResponse",
);
}

// Grant SendMessages permission to the joining peer so
Expand Down Expand Up @@ -542,17 +583,22 @@ async fn process_received_message<T: TopicHandle>(
})
.await;
// Persist.
ctx.persistence
.do_send(crate::persistence_actor::PersistEvent {
event: event.clone(),
})
.ok();
warn_if_err(
ctx.persistence
.do_send(crate::persistence_actor::PersistEvent {
event: event.clone(),
}),
"persistence.do_send PersistEvent (GrantPermission)",
);
// Broadcast to other peers.
if let Some(data) = crate::ops::pack_wire(
&crate::ops::WireMessage::Event(event),
&ctx.identity,
) {
topic.broadcast(bytes::Bytes::from(data)).await.ok();
warn_if_err(
topic.broadcast(bytes::Bytes::from(data)).await,
"topic.broadcast Event(GrantPermission)",
);
}
}
}
Expand All @@ -563,23 +609,26 @@ async fn process_received_message<T: TopicHandle>(
invite_data,
} => {
if target_peer == ctx.identity.endpoint_id() {
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::JoinLinkResponse {
invite_data,
}))
.ok();
warn_if_err(
ctx.event_broker.do_send(willow_actor::Publish(
ClientEvent::JoinLinkResponse { invite_data },
)),
"event_broker.do_send Publish(JoinLinkResponse)",
);
}
}
crate::ops::WireMessage::JoinDenied {
target_peer,
reason,
} => {
if target_peer == ctx.identity.endpoint_id() {
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied {
reason,
}))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::JoinLinkDenied {
reason,
})),
"event_broker.do_send Publish(JoinLinkDenied)",
);
}
}
// TopicAnnounce is consumed by the relay; clients ignore it.
Expand All @@ -603,12 +652,14 @@ async fn process_received_message<T: TopicHandle>(
p.names.insert(peer_id, name);
})
.await;
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::ProfileUpdated {
peer_id,
display_name,
}))
.ok();
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::ProfileUpdated {
peer_id,
display_name,
})),
"event_broker.do_send Publish(ProfileUpdated from ProfileAnnounce)",
);
}
// Worker messages travel on the _willow_workers topic and are never
// delivered to the client's server-ops listener.
Expand Down