diff --git a/CHANGELOG.md b/CHANGELOG.md index eb52f0697..adb9fd1f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 saturation, hyperfine spectroscopy, or pulsed protocols become required. ### Fixed +- **WebSocket broadcast handler now handles Lagged events gracefully and sends periodic ping keepalives to prevent dashboard disconnects** — + `handle_ws_client` and `handle_ws_pose_client` in `wifi-densepose-sensing-server` + were treating `RecvError::Lagged` as a fatal error, causing instant disconnect + when clients fell behind the 256-frame broadcast buffer at 10 Hz ingest. + Clients would reconnect, immediately lag again, and rapid-cycle every 2–4 s. + `Lagged` now continues (drops missed frames, logs debug) rather than breaking. + Added 30 s ping keepalive on the sensing handler to prevent proxy idle timeouts. - **Ghost skeletons in live UI with multi-node ESP32 setups** (#420, ADR-082) — `tracker_bridge::tracker_to_person_detections` documented itself as filtering to `is_alive()` tracks but in fact passed every non-Terminated track to the diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index a8b207e47..d6bef9a4b 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -2003,6 +2003,10 @@ async fn handle_ws_client(mut socket: WebSocket, state: SharedState) { info!("WebSocket client connected (sensing)"); + // ADR-044/045: ping/pong keepalive to prevent proxy idle timeouts. + let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30)); + ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { tokio::select! { msg = rx.recv() => { @@ -2012,13 +2016,24 @@ async fn handle_ws_client(mut socket: WebSocket, state: SharedState) { break; } } - Err(_) => break, + // Lagged: client fell behind — skip missed frames, don't disconnect. + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::debug!("WS client lagged by {n} frames, skipping"); + continue; + } + Err(_) => break, // channel closed + } + } + _ = ping_interval.tick() => { + if socket.send(Message::Ping(vec![].into())).await.is_err() { + break; } } msg = socket.recv() => { match msg { Some(Ok(Message::Close(_))) | None => break, - _ => {} // ignore client messages + Some(Ok(Message::Pong(_))) => {} // keepalive response + _ => {} // ignore other client messages } } } @@ -2134,7 +2149,12 @@ async fn handle_ws_pose_client(mut socket: WebSocket, state: SharedState) { } } } - Err(_) => break, + // Lagged: skip missed frames, don't disconnect. + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::debug!("WS pose client lagged by {n} frames, skipping"); + continue; + } + Err(_) => break, // channel closed } } msg = socket.recv() => { @@ -2149,6 +2169,7 @@ async fn handle_ws_pose_client(mut socket: WebSocket, state: SharedState) { } } Some(Ok(Message::Close(_))) | None => break, + Some(Ok(Message::Pong(_))) => {} // keepalive response _ => {} } }