Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/openhuman/socket/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ pub(super) enum ConnectionOutcome {
Lost(String),
/// Connection failed during handshake (triggers increment of backoff).
Failed(String),
/// The server rejected the auth token (`Socket.IO connect error: Invalid
/// token`). The loop must refresh the session token before retrying —
/// retrying with the same stale token would 401 on every attempt and
/// eventually produce a spurious Sentry "sustained outage" event.
InvalidToken,
}

#[cfg(test)]
Expand All @@ -44,11 +49,13 @@ mod tests {
let a = ConnectionOutcome::Shutdown;
let b = ConnectionOutcome::Lost("net".into());
let c = ConnectionOutcome::Failed("tls".into());
for outcome in [a, b, c] {
let d = ConnectionOutcome::InvalidToken;
for outcome in [a, b, c, d] {
match outcome {
ConnectionOutcome::Shutdown => {}
ConnectionOutcome::Lost(reason) => assert!(!reason.is_empty()),
ConnectionOutcome::Failed(reason) => assert!(!reason.is_empty()),
ConnectionOutcome::InvalidToken => {}
}
}
}
Expand All @@ -65,5 +72,10 @@ mod tests {
} else {
panic!("expected Failed");
}
// InvalidToken carries no reason string — it is self-descriptive.
matches!(
ConnectionOutcome::InvalidToken,
ConnectionOutcome::InvalidToken
);
}
}
130 changes: 128 additions & 2 deletions src/openhuman/socket/ws_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const FAIL_ESCALATE_THRESHOLD: u32 = 5;
/// Background loop that manages the WebSocket connection and reconnection.
pub(super) async fn ws_loop(
url: String,
token: String,
mut token: String,
shared: Arc<SharedState>,
mut emit_rx: mpsc::UnboundedReceiver<String>,
mut shutdown_rx: watch::Receiver<bool>,
Expand Down Expand Up @@ -127,6 +127,63 @@ pub(super) async fn ws_loop(
log_connection_failure(consecutive_failures, &reason);
// keep growing backoff
}
ConnectionOutcome::InvalidToken => {
// The server explicitly rejected our auth token. Retrying
// with the same stale credential would 401 on every attempt
// and, after `FAIL_ESCALATE_THRESHOLD` retries, produce a
// spurious "sustained outage" Sentry event (TAURI-RUST-9C).
//
// Strategy: fetch a fresh token from the credential store. If
// we get a *different* token back, reset the failure streak and
// backoff (this is an auth issue, not a transport outage) and
// reconnect immediately with the new credential.
//
// If the store returns None *or* the same token (still stale),
// fall through the normal `Failed` accounting so the
// sustained-outage Sentry escalation still fires correctly;
// avoid resetting counters before we know the refresh succeeded.
log::warn!(
"[socket] Server rejected token (Invalid token) — attempting token refresh before retry"
);

*shared.status.write() = ConnectionStatus::Disconnected;
*shared.socket_id.write() = None;
emit_state_change(&shared);

if *shutdown_rx.borrow() {
break;
}

match refresh_session_token().await {
Some(fresh) if fresh != token => {
// A genuinely new token — reset retry state and
// reconnect immediately.
consecutive_failures = 0;
backoff = Duration::from_millis(1000);
log::info!(
"[socket] Token refreshed (len={}) — retrying connection immediately",
fresh.len()
);
token = fresh;
// Skip the normal backoff sleep below — reconnect
// right away with the new token.
continue;
}
Some(_) | None => {
log::error!(
"[socket] Token refresh did not yield a new valid session token; \
will retry with backoff"
);
// Treat as a regular failure so the Sentry escalation
// path still fires if this persists.
consecutive_failures = consecutive_failures.saturating_add(1);
log_connection_failure(
consecutive_failures,
"SIO CONNECT: Socket.IO connect error: Invalid token (refresh failed)",
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
}
}

*shared.status.write() = ConnectionStatus::Disconnected;
Expand All @@ -153,6 +210,49 @@ pub(super) async fn ws_loop(
emit_state_change(&shared);
}

// ---------------------------------------------------------------------------
// Token refresh helper
// ---------------------------------------------------------------------------

/// Re-read the session token from the credential store.
///
/// Loads the config via `load_config_with_timeout` and then asks
/// `get_session_token` for the stored token.
///
/// Returns `Some(token)` (untrimmed) when the stored token contains at least
/// one non-whitespace character. Returns `None`, after logging a warning,
/// when:
/// - the config cannot be loaded,
/// - reading the token fails,
/// - no token is stored, or
/// - the stored token is blank.
///
/// NOTE(review): the original comment states this is intentionally the same
/// code path as `handle_connect_with_session`; that function is not visible
/// here — confirm the two stay in sync when the auth plumbing changes.
async fn refresh_session_token() -> Option<String> {
    log::debug!("[socket] refresh_session_token: loading config");

    // A config-load failure is logged and converted to an early `None`.
    let config = crate::openhuman::config::ops::load_config_with_timeout()
        .await
        .map_err(|e| log::warn!("[socket] refresh_session_token: config load failed: {e}"))
        .ok()?;

    let stored = match crate::api::jwt::get_session_token(&config) {
        Err(e) => {
            log::warn!("[socket] refresh_session_token: read error: {e}");
            return None;
        }
        Ok(stored) => stored,
    };

    let token = match stored {
        None => {
            log::warn!("[socket] refresh_session_token: no session token in store");
            return None;
        }
        Some(token) => token,
    };

    // Whitespace-only tokens are treated the same as a missing token.
    if token.trim().is_empty() {
        log::warn!("[socket] refresh_session_token: stored token is blank");
        return None;
    }

    // Only the length is logged, never the credential itself.
    log::debug!(
        "[socket] refresh_session_token: got fresh token (len={})",
        token.len()
    );
    Some(token)
}

// ---------------------------------------------------------------------------
// Failure logging
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -276,7 +376,21 @@ async fn run_connection(
.await
{
Ok(Ok(data)) => data,
Ok(Err(e)) => return ConnectionOutcome::Failed(format!("SIO CONNECT: {e}")),
Ok(Err(e)) => {
// Detect the "Invalid token" connect-error from the server
// (type 44 with `{"message":"Invalid token"}`) and surface it
// as `InvalidToken` so the reconnect loop can refresh the
// credential instead of retrying with the same stale token.
// The error string was formatted by `read_sio_connect_ack` as
// "Socket.IO connect error: <message>"; we match on the
// canonical backend message case-insensitively so minor
// casing changes don't re-open the bug.
if is_invalid_token_error(&e) {
log::warn!("[socket] SIO CONNECT rejected with invalid-token error: {e}");
return ConnectionOutcome::InvalidToken;
}
return ConnectionOutcome::Failed(format!("SIO CONNECT: {e}"));
}
Err(_) => {
return ConnectionOutcome::Failed("Timeout waiting for SIO CONNECT ACK".into())
}
Expand Down Expand Up @@ -361,6 +475,18 @@ async fn run_connection(
// Handshake helpers
// ---------------------------------------------------------------------------

/// Report whether a Socket.IO connect-error string is an "Invalid token"
/// rejection.
///
/// The check is a case-insensitive substring search: `error_msg` is
/// lower-cased and tested for the phrase `invalid token` anywhere inside it.
///
/// Per the surrounding code comments, `read_sio_connect_ack` renders the
/// error as `"Socket.IO connect error: <backend message>"` and the backend
/// currently sends `{"message":"Invalid token"}`. Ignoring case means a minor
/// casing change on the backend does not silently reopen the stale-token
/// retry loop.
fn is_invalid_token_error(error_msg: &str) -> bool {
    const NEEDLE: &str = "invalid token";
    let lowered = error_msg.to_lowercase();
    lowered.contains(NEEDLE)
}

/// Read the Engine.IO OPEN packet (type 0) from the WebSocket.
///
/// Format: `0{"sid":"...","upgrades":[],"pingInterval":25000,"pingTimeout":20000}`
Expand Down
116 changes: 91 additions & 25 deletions src/openhuman/socket/ws_loop_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,97 @@ fn make_shared() -> Arc<SharedState> {
})
}

// ── Invalid-token detection (the fix for TAURI-RUST-9C) ─────────

/// `is_invalid_token_error` must accept the wire shape the backend sends
/// (`"Socket.IO connect error: Invalid token"`) regardless of letter casing,
/// so a minor casing change on the backend cannot reopen the bug.
#[test]
fn is_invalid_token_error_matches_backend_message() {
    let rejections = [
        // Exact casing the backend currently sends.
        "Socket.IO connect error: Invalid token",
        // Casing variants that must still be detected.
        "Socket.IO connect error: invalid token",
        "Socket.IO connect error: INVALID TOKEN",
    ];

    for message in rejections {
        assert!(
            is_invalid_token_error(message),
            "expected an invalid-token match for {message:?}"
        );
    }
}

/// Handshake failures that are not token rejections (other connect errors,
/// timeouts, the empty string, generic server errors) must not be classified
/// as invalid-token errors.
#[test]
fn is_invalid_token_error_does_not_match_other_errors() {
    let unrelated = [
        "Socket.IO connect error: nope",
        "Timeout waiting for SIO CONNECT ACK",
        "WebSocket connect: IO error: timeout",
        "",
        "internal server error",
    ];

    for message in unrelated {
        assert!(
            !is_invalid_token_error(message),
            "did not expect an invalid-token match for {message:?}"
        );
    }
}

/// Regression guard for TAURI-RUST-9C.
///
/// When the server answers the Socket.IO CONNECT with
/// `44{"message":"Invalid token"}`, `run_connection` must return
/// `ConnectionOutcome::InvalidToken` rather than `Failed`, so the reconnect
/// loop refreshes the credential instead of retrying with the same stale
/// token.
///
/// The scenario is driven end-to-end against a local mock WebSocket server
/// that replays the handshake and then sends the rejection payload.
#[tokio::test]
async fn run_connection_returns_invalid_token_on_auth_reject() {
    use tokio::net::TcpListener;
    use tokio_tungstenite::accept_async;

    // Frames the mock server sends, in order.
    const EIO_OPEN: &str =
        r#"0{"sid":"s","upgrades":[],"pingInterval":25000,"pingTimeout":20000}"#;
    const AUTH_REJECT: &str = r#"44{"message":"Invalid token"}"#;

    // Port 0 lets the OS pick a free port for the mock server.
    let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
    let addr = listener.local_addr().expect("addr");

    tokio::spawn(async move {
        let (tcp, _peer) = listener.accept().await.expect("accept");
        let socket = accept_async(tcp).await.expect("ws accept");
        let (mut sink, mut source) = socket.split();

        // 1. Engine.IO OPEN.
        let _ = sink.send(WsMessage::Text(EIO_OPEN.to_string())).await;

        // 2. Consume the client's Socket.IO CONNECT frame.
        let _ = source.next().await;

        // 3. Reject the connection with the stale-token payload, then close.
        let _ = sink.send(WsMessage::Text(AUTH_REJECT.to_string())).await;
        let _ = sink.close().await;
    });

    let shared = make_shared();
    *shared.status.write() = ConnectionStatus::Disconnected;

    // The unused channel halves stay bound so the channels remain open for
    // the duration of the call.
    let (_emit_tx, mut emit_rx) = mpsc::unbounded_channel::<String>();
    let (_shutdown_tx, mut shutdown_rx) = watch::channel(false);
    let (internal_tx, _internal_rx) = mpsc::unbounded_channel::<String>();

    let mut ws_url = crate::api::socket::websocket_url(&format!("http://{addr}"));
    let outcome = run_connection(
        &mut ws_url,
        "stale-jwt",
        &shared,
        &mut emit_rx,
        &mut shutdown_rx,
        &internal_tx,
    )
    .await;

    let rejected_as_invalid_token = matches!(outcome, ConnectionOutcome::InvalidToken);
    assert!(
        rejected_as_invalid_token,
        "expected InvalidToken for `44{{\"message\":\"Invalid token\"}}` server response"
    );
}

// ── Redirect resolution (the real fix for OPENHUMAN-TAURI-9X) ──

#[test]
Expand Down Expand Up @@ -340,31 +431,6 @@ fn sustained_outage_for_network_unreachable_classifies_as_expected() {
);
}

/// Regression guard for TAURI-RUST-4ZD.
///
/// A TLS handshake aborted mid-flight (by the peer, a firewall, antivirus,
/// or a corporate TLS proxy) surfaces from `run_connection` as
/// `WebSocket connect: TLS error: native-tls error: unexpected EOF during
/// handshake`. The message built by the sustained-outage escalation for that
/// reason must classify as an expected network-unreachable error, so an
/// affected client logs a warn breadcrumb instead of raising a Sentry event.
///
/// The older `"tls handshake"` substring does not match this rendering
/// because the two words are not contiguous; this test pins the dedicated
/// `"unexpected eof during handshake"` anchor to the emit site.
#[test]
fn sustained_outage_for_tls_handshake_eof_classifies_as_expected() {
    use crate::core::observability::{expected_error_kind, ExpectedErrorKind};

    const REASON: &str =
        "WebSocket connect: TLS error: native-tls error: unexpected EOF during handshake";

    // Same shape as the message the sustained-outage escalation emits.
    let detailed = format!(
        "[socket] Connection failed (sustained outage after {FAIL_ESCALATE_THRESHOLD} attempts): {REASON}"
    );

    let classification = expected_error_kind(&detailed);
    assert_eq!(
        classification,
        Some(ExpectedErrorKind::NetworkUnreachable),
        "TLS-handshake-EOF shape must classify as expected; got message: {detailed}"
    );
}

/// Counterpart: a genuine outage that lacks any of the transport-level
/// markers (e.g. a server-side HTTP 500 wrapped by tungstenite) must
/// still surface as an actionable Sentry event — i.e. not classify as
Expand Down
Loading