From ad127563fba18463dbc69c6c0d36f44b49fb469e Mon Sep 17 00:00:00 2001 From: Tyler Longwell <109685178+tlongwell-block@users.noreply.github.com> Date: Tue, 19 May 2026 22:47:26 -0400 Subject: [PATCH 1/2] feat(relay): surface connection loss in the workspace header MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an observable `ConnectionState` to the relay singleton and a status indicator next to the workspace name so users can tell when the relay is unreachable. Addresses Alec's bug report about Warp's half-connected state silently breaking Sprout. The half-open case (Warp orange icon, asleep VPN, etc.) is the interesting one: tungstenite auto-pongs and the OS keeps the TCP socket open, so the WS layer reports nothing wrong. We add an app-level liveness probe — a periodic NIP-01 REQ with a filter that matches nothing real — and treat a missing EOSE within ~30s as a stalled socket. When stalled, the watchdog tears down the WS so the existing reconnect path runs. UI: when state is `reconnecting`, `stalled`, or `disconnected` (after a 2s debounce to avoid flashing on brief blips), the 🌱 emoji is replaced with a pulsing red WifiOff icon, the workspace name turns red, and a tooltip explains what's happening. Split out of relayClientSession.ts: - relayConnectionStateEmitter.ts — pure observable - relayStallWatchdog.ts — probe + onStall callback Tests: 16 new unit tests covering the emitter, watchdog probe shape, EOSE handling, timeout/send-failure stalls, stop() cancellation, and idempotent start().
Refs Sprout bug: relay connection loss warning Signed-off-by: Tyler Longwell <109685178+tlongwell-block@users.noreply.github.com> Co-authored-by: Dawn (sprout agent) --- desktop/scripts/check-file-sizes.mjs | 2 +- .../workspaces/ui/WorkspaceSwitcher.tsx | 63 ++++++++- desktop/src/shared/api/relayClientSession.ts | 70 ++++++++++ .../src/shared/api/relayClientShared.test.mjs | 16 +++ desktop/src/shared/api/relayClientShared.ts | 29 ++++ .../api/relayConnectionStateEmitter.test.mjs | 79 +++++++++++ .../shared/api/relayConnectionStateEmitter.ts | 58 ++++++++ .../shared/api/relayStallWatchdog.test.mjs | 125 +++++++++++++++++ desktop/src/shared/api/relayStallWatchdog.ts | 130 ++++++++++++++++++ desktop/src/shared/api/useRelayConnection.ts | 64 +++++++++ 10 files changed, 630 insertions(+), 6 deletions(-) create mode 100644 desktop/src/shared/api/relayClientShared.test.mjs create mode 100644 desktop/src/shared/api/relayConnectionStateEmitter.test.mjs create mode 100644 desktop/src/shared/api/relayConnectionStateEmitter.ts create mode 100644 desktop/src/shared/api/relayStallWatchdog.test.mjs create mode 100644 desktop/src/shared/api/relayStallWatchdog.ts create mode 100644 desktop/src/shared/api/useRelayConnection.ts diff --git a/desktop/scripts/check-file-sizes.mjs b/desktop/scripts/check-file-sizes.mjs index 54c924c11..03ba56797 100644 --- a/desktop/scripts/check-file-sizes.mjs +++ b/desktop/scripts/check-file-sizes.mjs @@ -43,7 +43,7 @@ const overrides = new Map([ ["src/features/messages/ui/MessageComposer.tsx", 710], // media upload handlers (paste, drop, dialog) + channelId reset effect + edit mode (pre-fill, save, cancel, escape) + composer autofocus (#572) ["src/features/settings/ui/SettingsView.tsx", 600], ["src/features/sidebar/ui/AppSidebar.tsx", 860], // channels + forums creation forms + Pulse nav - ["src/shared/api/relayClientSession.ts", 930], // durable websocket session manager with reconnect/replay/recovery state + sendTypingIndicator + 
fetchChannelHistoryBefore + subscribeToChannelLive (huddle TTS) + subscribeToHuddleEvents (huddle indicator) + disconnect() for workspace switch teardown + fetchEvents/subscribeLive/publishEvent for NIP-RS read state + publishUserStatus/subscribeToUserStatusUpdates (NIP-38) + ["src/shared/api/relayClientSession.ts", 1000], // durable websocket session manager with reconnect/replay/recovery state + sendTypingIndicator + fetchChannelHistoryBefore + subscribeToChannelLive (huddle TTS) + subscribeToHuddleEvents (huddle indicator) + disconnect() for workspace switch teardown + fetchEvents/subscribeLive/publishEvent for NIP-RS read state + publishUserStatus/subscribeToUserStatusUpdates (NIP-38) + ConnectionState plumbing & stall-watchdog wiring for half-open WS detection (Warp orange-icon case) β€” emitter + watchdog logic extracted to relayConnectionStateEmitter.ts / relayStallWatchdog.ts ["src/shared/api/tauri.ts", 1100], // remote agent provider API bindings + canvas API functions ["src-tauri/src/lib.rs", 710], // sprout-media:// proxy + Range headers + Sprout nest init (ensure_nest) in setup() + huddle command registration + PTT global shortcut handler + persona pack commands + app_handle storage for event emission ["src-tauri/src/commands/media.rs", 730], // ffmpeg video transcode + poster frame extraction + run_ffmpeg_with_timeout (find_ffmpeg via resolve_command, is_video_file, transcode_to_mp4, extract_poster_frame, transcode_and_extract_poster) + spawn_blocking wrappers + tests diff --git a/desktop/src/features/workspaces/ui/WorkspaceSwitcher.tsx b/desktop/src/features/workspaces/ui/WorkspaceSwitcher.tsx index b51b59def..18c47fc12 100644 --- a/desktop/src/features/workspaces/ui/WorkspaceSwitcher.tsx +++ b/desktop/src/features/workspaces/ui/WorkspaceSwitcher.tsx @@ -1,4 +1,10 @@ -import { Check, ChevronDown, MoreHorizontal, Plus } from "lucide-react"; +import { + Check, + ChevronDown, + MoreHorizontal, + Plus, + WifiOff, +} from "lucide-react"; import * as React 
from "react"; import type { Workspace } from "@/features/workspaces/types"; @@ -14,9 +20,24 @@ import { SidebarMenuButton, SidebarMenuItem, } from "@/shared/ui/sidebar"; +import { Tooltip, TooltipContent, TooltipTrigger } from "@/shared/ui/tooltip"; +import type { ConnectionState } from "@/shared/api/relayClientShared"; +import { + isRelayConnectionDegraded, + useRelayConnection, +} from "@/shared/api/useRelayConnection"; import { EditWorkspaceDialog } from "./EditWorkspaceDialog"; +const CONNECTION_STATE_LABEL: Record = { + idle: "Not connected", + connecting: "Connecting…", + connected: "Connected", + reconnecting: "Reconnecting to relay…", + stalled: "Connection lost β€” relay is not responding", + disconnected: "Disconnected from relay", +}; + type WorkspaceSwitcherProps = { activeWorkspace: Workspace | null; workspaces: Workspace[]; @@ -40,6 +61,9 @@ export function WorkspaceSwitcher({ const [editingWorkspace, setEditingWorkspace] = React.useState(null); const [dropdownOpen, setDropdownOpen] = React.useState(false); + const connectionState = useRelayConnection(); + const degraded = isRelayConnectionDegraded(connectionState); + const connectionLabel = CONNECTION_STATE_LABEL[connectionState]; return ( <> @@ -48,14 +72,43 @@ export function WorkspaceSwitcher({ - - 🌱 - - + {degraded ? ( + + + + + + + + {connectionLabel} + + + ) : ( + + 🌱 + + )} + {activeWorkspace?.name ?? 
"No workspace"} diff --git a/desktop/src/shared/api/relayClientSession.ts b/desktop/src/shared/api/relayClientSession.ts index 2d846d473..8f134652e 100644 --- a/desktop/src/shared/api/relayClientSession.ts +++ b/desktop/src/shared/api/relayClientSession.ts @@ -16,10 +16,13 @@ import { import { getTextPayload, sortEvents, + type ConnectionState, type PendingEvent, type RelaySubscription, type RelaySubscriptionFilter, } from "@/shared/api/relayClientShared"; +import { RelayConnectionStateEmitter } from "@/shared/api/relayConnectionStateEmitter"; +import { RelayStallWatchdog } from "@/shared/api/relayStallWatchdog"; import { buildThreadReferenceTags } from "@/features/messages/lib/threading"; const RECONNECT_BASE_DELAY_MS = 1_000, @@ -27,6 +30,24 @@ const RECONNECT_BASE_DELAY_MS = 1_000, const RECONNECT_REPLAY_SKEW_SECS = 5, EVENT_BATCH_MS = 16; +/** + * Application-level liveness probe. + * + * Tungstenite auto-pongs and the OS keeps the TCP socket open, so a + * half-open WS (Warp's orange-icon state, an asleep VPN, etc.) presents as + * "fully connected" to the WS layer indefinitely β€” no Close, no Error. + * + * We work around that by periodically sending a cheap NIP-01 `REQ` with + * `limit: 0` and waiting for the matching `EOSE`. Two consecutive misses + * (β‰ˆ STALL_PROBE_INTERVAL_MS + STALL_PROBE_TIMEOUT_MS) flips state to + * `stalled` and force-resets the socket so the existing reconnect path runs. + * + * The filter intentionally matches nothing real so the relay only ever + * answers with EOSE. 
+ */ +const STALL_PROBE_INTERVAL_MS = 20_000; +const STALL_PROBE_TIMEOUT_MS = 10_000; + export class RelayClient { private wsId: number | null = null; private relayUrl: string | null = null; @@ -49,6 +70,17 @@ export class RelayClient { private notifyReconnectListeners = false; private onMessageChannel: Channel | null = null; + private connectionStateEmitter = new RelayConnectionStateEmitter("idle"); + private stallWatchdog = new RelayStallWatchdog({ + intervalMs: STALL_PROBE_INTERVAL_MS, + probeTimeoutMs: STALL_PROBE_TIMEOUT_MS, + sendRaw: (payload) => this.sendRaw(payload), + onStall: (error) => { + this.connectionStateEmitter.set("stalled"); + this.resetConnection(error); + }, + }); + /** * Cleanly tear down the connection without scheduling a reconnect. * Used during workspace switches to reset the singleton before the @@ -61,10 +93,12 @@ export class RelayClient { window.clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } + this.stallWatchdog.stop(); this.keepAliveRequested = false; this.relayUrl = null; this.hasConnectedOnce = false; this.notifyReconnectListeners = false; + this.connectionStateEmitter.set("idle"); if (this.wsId !== null) { void invoke("plugin:websocket|disconnect", { id: this.wsId }).catch( @@ -101,6 +135,7 @@ export class RelayClient { } this.eventBuffer = []; this.reconnectListeners.clear(); + this.connectionStateEmitter.clear(); this.onMessageChannel = null; this.reconnectDelayMs = RECONNECT_BASE_DELAY_MS; } @@ -346,6 +381,20 @@ export class RelayClient { }; } + /** Current connection state β€” synchronous read. */ + getConnectionState(): ConnectionState { + return this.connectionStateEmitter.get(); + } + + /** + * Subscribe to connection-state transitions. The listener is invoked + * immediately with the current state so callers don't need a separate + * `getConnectionState()` call to seed their UI. 
+ */ + subscribeToConnectionState(listener: (state: ConnectionState) => void) { + return this.connectionStateEmitter.subscribe(listener); + } + private async ensureConnected() { if (this.connectPromise) { return this.connectPromise; @@ -373,6 +422,10 @@ export class RelayClient { } private async connect() { + this.connectionStateEmitter.set( + this.hasConnectedOnce ? "reconnecting" : "connecting", + ); + if (!this.relayUrl) { this.relayUrl = await getRelayWsUrl(); } @@ -406,6 +459,8 @@ export class RelayClient { this.reconnectDelayMs = RECONNECT_BASE_DELAY_MS; await this.replayLiveSubscriptions(); + this.connectionStateEmitter.set("connected"); + this.stallWatchdog.start(); this.emitReconnectIfNeeded(); } @@ -724,6 +779,12 @@ export class RelayClient { } private handleEose(subId: string) { + if (this.stallWatchdog.handleEose(subId)) { + // Probe round-trip succeeded β€” silently CLOSE the sub. + void this.closeSubscription(subId).catch(() => {}); + return; + } + const subscription = this.subscriptions.get(subId); if (!subscription) { return; @@ -875,10 +936,19 @@ export class RelayClient { }, ) { this.onMessageChannel = null; + this.stallWatchdog.stop(); if (this.flushTimeout !== null) window.clearTimeout(this.flushTimeout); this.flushTimeout = null; this.eventBuffer = []; + if (options?.reconnect === false) { + this.connectionStateEmitter.set("disconnected"); + } else if (this.connectionStateEmitter.get() !== "stalled") { + // Stall is a stronger signal than a generic drop; keep it until the + // reconnect timer transitions us back to "reconnecting" in connect(). 
+ this.connectionStateEmitter.set("reconnecting"); + } + if (options?.reconnect !== false && this.hasConnectedOnce) { this.notifyReconnectListeners = true; } diff --git a/desktop/src/shared/api/relayClientShared.test.mjs b/desktop/src/shared/api/relayClientShared.test.mjs new file mode 100644 index 000000000..de3180e8a --- /dev/null +++ b/desktop/src/shared/api/relayClientShared.test.mjs @@ -0,0 +1,16 @@ +import assert from "node:assert/strict"; +import test from "node:test"; + +import { isRelayConnectionDegraded } from "./relayClientShared.ts"; + +test("isRelayConnectionDegraded β€” healthy states are not degraded", () => { + assert.equal(isRelayConnectionDegraded("idle"), false); + assert.equal(isRelayConnectionDegraded("connecting"), false); + assert.equal(isRelayConnectionDegraded("connected"), false); +}); + +test("isRelayConnectionDegraded β€” non-healthy states are degraded", () => { + assert.equal(isRelayConnectionDegraded("reconnecting"), true); + assert.equal(isRelayConnectionDegraded("stalled"), true); + assert.equal(isRelayConnectionDegraded("disconnected"), true); +}); diff --git a/desktop/src/shared/api/relayClientShared.ts b/desktop/src/shared/api/relayClientShared.ts index c6cfcbf47..996c54154 100644 --- a/desktop/src/shared/api/relayClientShared.ts +++ b/desktop/src/shared/api/relayClientShared.ts @@ -1,5 +1,34 @@ import type { RelayEvent } from "@/shared/api/types"; +/** + * Observable connection state for the relay singleton. + * + * - `idle` β€” never tried to connect yet (post-init, pre-workspace). + * - `connecting` β€” initial socket + AUTH handshake in flight. + * - `connected` β€” socket open and AUTH'd. + * - `reconnecting` β€” socket dropped, waiting for the backoff timer. + * - `stalled` β€” socket is *open* per the WS layer but no inbound frames + * for a long time (half-open / Warp split-brain). We + * surface this so the UI can warn even though tungstenite + * hasn't reported anything wrong yet. 
+ * - `disconnected` β€” final/terminal disconnect (auth rejected, workspace + * switch, etc.) β€” no auto-reconnect scheduled. + */ +export type ConnectionState = + | "idle" + | "connecting" + | "connected" + | "reconnecting" + | "stalled" + | "disconnected"; + +/** True when the UI should surface a "connection lost" indicator. */ +export function isRelayConnectionDegraded(state: ConnectionState): boolean { + return ( + state === "reconnecting" || state === "stalled" || state === "disconnected" + ); +} + export type RelaySubscriptionFilter = { kinds: number[]; limit: number; diff --git a/desktop/src/shared/api/relayConnectionStateEmitter.test.mjs b/desktop/src/shared/api/relayConnectionStateEmitter.test.mjs new file mode 100644 index 000000000..b13b37ae8 --- /dev/null +++ b/desktop/src/shared/api/relayConnectionStateEmitter.test.mjs @@ -0,0 +1,79 @@ +import assert from "node:assert/strict"; +import test from "node:test"; + +import { RelayConnectionStateEmitter } from "./relayConnectionStateEmitter.ts"; + +test("starts in the constructor-provided state", () => { + const e = new RelayConnectionStateEmitter("connecting"); + assert.equal(e.get(), "connecting"); +}); + +test("defaults to 'idle' when no initial state given", () => { + const e = new RelayConnectionStateEmitter(); + assert.equal(e.get(), "idle"); +}); + +test("set() notifies subscribers and updates state", () => { + const e = new RelayConnectionStateEmitter("connecting"); + const seen = []; + e.subscribe((s) => seen.push(s)); + // Initial replay: subscriber sees current state. + assert.deepEqual(seen, ["connecting"]); + + e.set("connected"); + assert.deepEqual(seen, ["connecting", "connected"]); + assert.equal(e.get(), "connected"); +}); + +test("set() is a no-op when state is unchanged", () => { + const e = new RelayConnectionStateEmitter("connected"); + const seen = []; + e.subscribe((s) => seen.push(s)); + assert.deepEqual(seen, ["connected"]); + + e.set("connected"); + // No duplicate emission. 
+ assert.deepEqual(seen, ["connected"]); +}); + +test("unsubscribe stops further notifications", () => { + const e = new RelayConnectionStateEmitter("idle"); + const seen = []; + const unsub = e.subscribe((s) => seen.push(s)); + unsub(); + e.set("connecting"); + assert.deepEqual(seen, ["idle"]); +}); + +test("listener exceptions do not break other listeners", () => { + const e = new RelayConnectionStateEmitter("idle"); + const seenA = []; + const seenB = []; + // Quiet the expected error log. + const originalError = console.error; + const errors = []; + console.error = (...args) => errors.push(args); + try { + e.subscribe(() => { + throw new Error("boom A"); + }); + e.subscribe((s) => seenA.push(s)); + e.subscribe((s) => seenB.push(s)); + e.set("connected"); + } finally { + console.error = originalError; + } + assert.deepEqual(seenA, ["idle", "connected"]); + assert.deepEqual(seenB, ["idle", "connected"]); + // One during subscribe-replay, one during the set() emit. + assert.ok(errors.length >= 2, "expected console.error to be called"); +}); + +test("clear() drops listeners", () => { + const e = new RelayConnectionStateEmitter("idle"); + const seen = []; + e.subscribe((s) => seen.push(s)); + e.clear(); + e.set("connecting"); + assert.deepEqual(seen, ["idle"]); +}); diff --git a/desktop/src/shared/api/relayConnectionStateEmitter.ts b/desktop/src/shared/api/relayConnectionStateEmitter.ts new file mode 100644 index 000000000..e3773546d --- /dev/null +++ b/desktop/src/shared/api/relayConnectionStateEmitter.ts @@ -0,0 +1,58 @@ +import type { ConnectionState } from "@/shared/api/relayClientShared"; + +/** + * Small observable for `ConnectionState`. Kept separate from the relay client + * so the file-size budget stays sane and so the contract is unit-testable + * without dragging the whole session manager into scope. + * + * Semantics: + * - `set(next)` is a no-op if `next === current` (no duplicate events). 
+ * - New subscribers receive the current state immediately (synchronously), + * so React hooks don't need a separate "getState" call to seed their UI. + * - Listener exceptions are caught and logged, never propagated back into + * the caller of `set`. + */ +export class RelayConnectionStateEmitter { + private state: ConnectionState; + private listeners = new Set<(state: ConnectionState) => void>(); + + constructor(initial: ConnectionState = "idle") { + this.state = initial; + } + + get(): ConnectionState { + return this.state; + } + + set(next: ConnectionState): void { + if (this.state === next) { + return; + } + this.state = next; + for (const listener of this.listeners) { + try { + listener(next); + } catch (error) { + console.error("Failed to deliver relay connection state", error); + } + } + } + + subscribe(listener: (state: ConnectionState) => void): () => void { + this.listeners.add(listener); + try { + listener(this.state); + } catch (error) { + console.error("Failed to deliver initial relay connection state", error); + } + return () => { + this.listeners.delete(listener); + }; + } + + /** Drop all listeners. Called on workspace teardown to match the legacy + * pattern used for the reconnect listener set. */ + clear(): void { + this.listeners.clear(); + } +} diff --git a/desktop/src/shared/api/relayStallWatchdog.test.mjs b/desktop/src/shared/api/relayStallWatchdog.test.mjs new file mode 100644 index 000000000..510552af4 --- /dev/null +++ b/desktop/src/shared/api/relayStallWatchdog.test.mjs @@ -0,0 +1,125 @@ +import assert from "node:assert/strict"; +import test from "node:test"; + +import { RelayStallWatchdog } from "./relayStallWatchdog.ts"; + +// Shim `window` to expose the timer + crypto APIs the watchdog uses. The +// real RelayClient runs in a Tauri WebView where `window` exists; under +// node:test we wire it to the same globals. 
+if (typeof globalThis.window === "undefined") { + globalThis.window = { + setInterval: (...args) => setInterval(...args), + clearInterval: (id) => clearInterval(id), + setTimeout: (...args) => setTimeout(...args), + clearTimeout: (id) => clearTimeout(id), + }; +} + +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +function makeWatchdog(overrides = {}) { + const sends = []; + const stalls = []; + const wd = new RelayStallWatchdog({ + intervalMs: overrides.intervalMs ?? 30, + probeTimeoutMs: overrides.probeTimeoutMs ?? 30, + sendRaw: + overrides.sendRaw ?? + (async (payload) => { + sends.push(payload); + }), + onStall: (err) => { + stalls.push(err); + }, + now: overrides.now, + }); + return { wd, sends, stalls }; +} + +test("first probe carries the expected NIP-01 REQ shape", async () => { + const { wd, sends } = makeWatchdog(); + wd.start(); + // Wait until a probe is observed. + for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5); + wd.stop(); + assert.equal(sends.length, 1); + const [verb, subId, filter] = sends[0]; + assert.equal(verb, "REQ"); + assert.match(subId, /^probe-/); + assert.deepEqual(filter.kinds, [9999]); + assert.equal(filter.limit, 0); + assert.ok(typeof filter.since === "number"); +}); + +test("EOSE for the current probe clears in-flight + lets the next probe fire", async () => { + const { wd, sends, stalls } = makeWatchdog(); + wd.start(); + for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5); + const firstSubId = sends[0][1]; + // Resolve the probe. + assert.equal(wd.handleEose(firstSubId), true); + // Within the next interval+probe window, another probe should fire. 
+ for (let i = 0; i < 50 && sends.length < 2; i++) await sleep(5); + wd.stop(); + assert.ok(sends.length >= 2, `expected β‰₯2 probes, got ${sends.length}`); + assert.equal(stalls.length, 0, "no stall expected when EOSE arrives"); +}); + +test("EOSE for a non-probe subId returns false", () => { + const { wd } = makeWatchdog(); + assert.equal(wd.handleEose("live-abc"), false); +}); + +test("timeout without EOSE triggers onStall", async () => { + const { wd, stalls } = makeWatchdog(); + wd.start(); + // intervalMs (30) before first send + probeTimeoutMs (30) β€” wait a bit + // past their sum. + for (let i = 0; i < 50 && stalls.length === 0; i++) await sleep(10); + wd.stop(); + assert.ok(stalls.length >= 1, "expected at least one stall"); + assert.match(stalls[0].message, /stalled/i); +}); + +test("send-side failure triggers onStall immediately", async () => { + const { wd, stalls } = makeWatchdog({ + sendRaw: async () => { + throw new Error("ws is dead"); + }, + }); + wd.start(); + for (let i = 0; i < 50 && stalls.length === 0; i++) await sleep(5); + wd.stop(); + assert.ok(stalls.length >= 1, "expected stall on send failure"); + assert.match(stalls[0].message, /ws is dead/); +}); + +test("stop() cancels a pending stall timeout", async () => { + const { wd, sends, stalls } = makeWatchdog(); + wd.start(); + for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5); + // Probe is in-flight; stop before it can time out. + wd.stop(); + // Wait well past the timeout window. + await sleep(80); + assert.equal(stalls.length, 0, "stop() should cancel the pending stall"); +}); + +test("start() is idempotent β€” does not create duplicate intervals", async () => { + const { wd, sends } = makeWatchdog({ intervalMs: 25, probeTimeoutMs: 200 }); + wd.start(); + wd.start(); + wd.start(); + // Allow one probe to fire and resolve it so the *next* probe can fire if + // the interval was somehow doubled. 
+ for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5); + wd.handleEose(sends[0][1]); + // Within one more interval window, exactly one more probe should fire + // (not two), which is the contract for `start()` being idempotent. + await sleep(45); + wd.stop(); + assert.ok( + sends.length <= 2, + `expected ≀2 probes despite triple-start(), got ${sends.length}`, + ); +}); diff --git a/desktop/src/shared/api/relayStallWatchdog.ts b/desktop/src/shared/api/relayStallWatchdog.ts new file mode 100644 index 000000000..7fb64cd9f --- /dev/null +++ b/desktop/src/shared/api/relayStallWatchdog.ts @@ -0,0 +1,130 @@ +/** + * Application-level liveness probe for the relay WebSocket. + * + * Tungstenite auto-pongs and the OS keeps the TCP socket open, so a + * half-open WS (Warp's orange-icon state, an asleep VPN, etc.) presents as + * "fully connected" to the WS layer indefinitely β€” no Close, no Error. + * + * We work around that by periodically sending a cheap NIP-01 `REQ` with a + * filter that matches nothing real (kind 9999, far-future `since`) and + * waiting for the matching `EOSE`. If the relay doesn't answer within + * `probeTimeoutMs` (or the send itself fails), `onStall` is invoked with an + * `Error` describing the failure. The relay client then force-resets the + * socket so its existing reconnect path runs. + * + * The watchdog has no opinion on connection state or reconnects; it just + * detects that the socket is unhealthy and reports it. + */ +export type RelayStallWatchdogConfig = { + intervalMs: number; + probeTimeoutMs: number; + /** Send a raw NIP-01 frame. Returns the same promise as the WS layer. */ + sendRaw: (payload: unknown[]) => Promise; + /** Called once when a stall is detected. The watchdog stops itself first. */ + onStall: (error: Error) => void; + /** Optional override for tests. 
*/ + now?: () => number; +}; + +export class RelayStallWatchdog { + private readonly intervalMs: number; + private readonly probeTimeoutMs: number; + private readonly sendRaw: (payload: unknown[]) => Promise; + private readonly onStall: (error: Error) => void; + private readonly now: () => number; + + private intervalHandle: number | null = null; + private probeTimeoutHandle: number | null = null; + private currentProbeSubId: string | null = null; + + constructor(config: RelayStallWatchdogConfig) { + this.intervalMs = config.intervalMs; + this.probeTimeoutMs = config.probeTimeoutMs; + this.sendRaw = config.sendRaw; + this.onStall = config.onStall; + this.now = config.now ?? (() => Math.floor(Date.now() / 1_000)); + } + + /** Idempotent. Safe to call from `connect()` completion. */ + start(): void { + if (this.intervalHandle !== null) { + return; + } + this.intervalHandle = window.setInterval( + () => this.sendProbe(), + this.intervalMs, + ); + } + + /** Idempotent. Clears any in-flight probe + the interval. */ + stop(): void { + if (this.intervalHandle !== null) { + window.clearInterval(this.intervalHandle); + this.intervalHandle = null; + } + if (this.probeTimeoutHandle !== null) { + window.clearTimeout(this.probeTimeoutHandle); + this.probeTimeoutHandle = null; + } + this.currentProbeSubId = null; + } + + /** + * Called from the relay client's `handleEose` to satisfy the in-flight + * probe. Returns `true` if the subId belonged to the watchdog (and the + * caller should not look it up in the normal subscription map). 
+ */ + handleEose(subId: string): boolean { + if (subId !== this.currentProbeSubId) { + return false; + } + if (this.probeTimeoutHandle !== null) { + window.clearTimeout(this.probeTimeoutHandle); + this.probeTimeoutHandle = null; + } + this.currentProbeSubId = null; + return true; + } + + private sendProbe(): void { + if (this.probeTimeoutHandle !== null) { + // A probe is still outstanding β€” don't pile on; the timeout handler + // will declare the stall when it fires. + return; + } + + const subId = `probe-${crypto.randomUUID()}`; + this.currentProbeSubId = subId; + this.probeTimeoutHandle = window.setTimeout(() => { + this.probeTimeoutHandle = null; + this.currentProbeSubId = null; + this.fail( + new Error("Relay socket stalled β€” no response to liveness probe."), + ); + }, this.probeTimeoutMs); + + const farFuture = this.now() + 86_400; + void this.sendRaw([ + "REQ", + subId, + { kinds: [9999], limit: 0, since: farFuture }, + ]).catch((error) => { + // Send failed β†’ the socket is dead. + if (this.probeTimeoutHandle !== null) { + window.clearTimeout(this.probeTimeoutHandle); + this.probeTimeoutHandle = null; + } + this.currentProbeSubId = null; + this.fail( + error instanceof Error + ? error + : new Error("Relay socket stalled β€” probe send failed."), + ); + }); + } + + private fail(error: Error): void { + this.stop(); + this.onStall(error); + } +} diff --git a/desktop/src/shared/api/useRelayConnection.ts b/desktop/src/shared/api/useRelayConnection.ts new file mode 100644 index 000000000..e852cd287 --- /dev/null +++ b/desktop/src/shared/api/useRelayConnection.ts @@ -0,0 +1,64 @@ +import * as React from "react"; + +import { relayClient } from "@/shared/api/relayClient"; +import { + isRelayConnectionDegraded, + type ConnectionState, +} from "@/shared/api/relayClientShared"; + +export { isRelayConnectionDegraded }; + +/** + * Subscribe to the relay singleton's connection state with a debounce on + * "transient" transitions. + * + * Why debounce? 
In normal use the socket can flap to `reconnecting` for a + * second or two between events (initial AUTH, brief network blips) β€” we don't + * want a red "connection lost" banner painting itself for every blink. We + * only surface non-healthy states once they've persisted past + * `degradedAfterMs` (default 2 seconds). `connected` / `idle` are reported + * immediately so the UI clears the warning the moment things recover. + */ +export function useRelayConnection(options?: { + /** Min ms a non-healthy state must persist before being reported. */ + degradedAfterMs?: number; +}): ConnectionState { + const degradedAfterMs = options?.degradedAfterMs ?? 2_000; + const [state, setState] = React.useState(() => + relayClient.getConnectionState(), + ); + + React.useEffect(() => { + let pendingTimer: number | null = null; + + const clearPending = () => { + if (pendingTimer !== null) { + window.clearTimeout(pendingTimer); + pendingTimer = null; + } + }; + + const unsubscribe = relayClient.subscribeToConnectionState((next) => { + clearPending(); + + if (next === "connected" || next === "idle" || next === "disconnected") { + // Healthy or terminal β€” report immediately. + setState(next); + return; + } + + // Transient degraded states β€” wait before showing the user a warning. + pendingTimer = window.setTimeout(() => { + pendingTimer = null; + setState(next); + }, degradedAfterMs); + }); + + return () => { + clearPending(); + unsubscribe(); + }; + }, [degradedAfterMs]); + + return state; +} From 5ef8aff670570be49b949c282fb5e0d353d38d37 Mon Sep 17 00:00:00 2001 From: Tyler Longwell <109685178+tlongwell-block@users.noreply.github.com> Date: Tue, 19 May 2026 22:47:41 -0400 Subject: [PATCH 2/2] fix(relay): make terminal disconnect actually terminal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Max's review of #623 caught a correctness bug. 
When the relay rejects AUTH (kind:22242 OK=false), handleOk calls resetConnection(err, { reconnect: false }) which sets state to 'disconnected' and clears the reconnect timer. BUT — the reconnect timer's catch handler (`void this.ensureConnected().catch(() => this.scheduleReconnect())`) and the retry wrappers in publishEvent / sendRawWithReconnectRetry would then immediately race the disconnected state back to 'reconnecting'. Same hazard from any future call that goes through ensureConnected(). Fix: add a sticky 'terminal' latch. - Set in resetConnection when reconnect:false. - Guards scheduleReconnect() (no new timers) and ensureConnected() (throws a terminal error). - Cleared on explicit re-engagement: disconnect() (workspace switch) and preconnect() (caller is asking us to come back up). While here: - Extract the reconnect/refuse predicates to a pure relayReconnectPolicy module so the state-machine rules live in one legible place — also addresses Max's elegance note about distributing edge cases across connect/reset/scheduleReconnect. - Fix the stale 'two consecutive misses' doc comment — implementation stalls on a single missed probe (or send failure) within the STALL_PROBE_TIMEOUT_MS window. 8 new unit tests cover the policy: baseline schedules, terminal wins over every other reason, pending timer suppresses, live socket suppresses, no-subs+no-keepalive idles, keep-alive alone is enough, and shouldRefuseConnect mirrors terminal.
Signed-off-by: Tyler Longwell <109685178+tlongwell-block@users.noreply.github.com> Co-authored-by: Dawn (sprout agent) --- desktop/scripts/check-file-sizes.mjs | 2 +- desktop/src/shared/api/relayClientSession.ts | 47 +++++++++-- .../shared/api/relayReconnectPolicy.test.mjs | 83 +++++++++++++++++++ .../src/shared/api/relayReconnectPolicy.ts | 43 ++++++++++ 4 files changed, 168 insertions(+), 7 deletions(-) create mode 100644 desktop/src/shared/api/relayReconnectPolicy.test.mjs create mode 100644 desktop/src/shared/api/relayReconnectPolicy.ts diff --git a/desktop/scripts/check-file-sizes.mjs b/desktop/scripts/check-file-sizes.mjs index 03ba56797..965abcffe 100644 --- a/desktop/scripts/check-file-sizes.mjs +++ b/desktop/scripts/check-file-sizes.mjs @@ -43,7 +43,7 @@ const overrides = new Map([ ["src/features/messages/ui/MessageComposer.tsx", 710], // media upload handlers (paste, drop, dialog) + channelId reset effect + edit mode (pre-fill, save, cancel, escape) + composer autofocus (#572) ["src/features/settings/ui/SettingsView.tsx", 600], ["src/features/sidebar/ui/AppSidebar.tsx", 860], // channels + forums creation forms + Pulse nav - ["src/shared/api/relayClientSession.ts", 1000], // durable websocket session manager with reconnect/replay/recovery state + sendTypingIndicator + fetchChannelHistoryBefore + subscribeToChannelLive (huddle TTS) + subscribeToHuddleEvents (huddle indicator) + disconnect() for workspace switch teardown + fetchEvents/subscribeLive/publishEvent for NIP-RS read state + publishUserStatus/subscribeToUserStatusUpdates (NIP-38) + ConnectionState plumbing & stall-watchdog wiring for half-open WS detection (Warp orange-icon case) — emitter + watchdog logic extracted to relayConnectionStateEmitter.ts / relayStallWatchdog.ts + ["src/shared/api/relayClientSession.ts", 1040], // durable websocket session manager with reconnect/replay/recovery state + sendTypingIndicator + fetchChannelHistoryBefore + subscribeToChannelLive (huddle TTS) +
subscribeToHuddleEvents (huddle indicator) + disconnect() for workspace switch teardown + fetchEvents/subscribeLive/publishEvent for NIP-RS read state + publishUserStatus/subscribeToUserStatusUpdates (NIP-38) + ConnectionState plumbing & stall-watchdog wiring for half-open WS detection (Warp orange-icon case) + terminal session latch (auth rejection no longer racing back to reconnecting) — emitter + watchdog + reconnect policy logic extracted to relayConnectionStateEmitter.ts / relayStallWatchdog.ts / relayReconnectPolicy.ts ["src/shared/api/tauri.ts", 1100], // remote agent provider API bindings + canvas API functions ["src-tauri/src/lib.rs", 710], // sprout-media:// proxy + Range headers + Sprout nest init (ensure_nest) in setup() + huddle command registration + PTT global shortcut handler + persona pack commands + app_handle storage for event emission ["src-tauri/src/commands/media.rs", 730], // ffmpeg video transcode + poster frame extraction + run_ffmpeg_with_timeout (find_ffmpeg via resolve_command, is_video_file, transcode_to_mp4, extract_poster_frame, transcode_and_extract_poster) + spawn_blocking wrappers + tests diff --git a/desktop/src/shared/api/relayClientSession.ts b/desktop/src/shared/api/relayClientSession.ts index 8f134652e..ef29b440a 100644 --- a/desktop/src/shared/api/relayClientSession.ts +++ b/desktop/src/shared/api/relayClientSession.ts @@ -22,6 +22,10 @@ import { type RelaySubscriptionFilter, } from "@/shared/api/relayClientShared"; import { RelayConnectionStateEmitter } from "@/shared/api/relayConnectionStateEmitter"; +import { + shouldRefuseConnect, + shouldScheduleReconnect, +} from "@/shared/api/relayReconnectPolicy"; import { RelayStallWatchdog } from "@/shared/api/relayStallWatchdog"; import { buildThreadReferenceTags } from "@/features/messages/lib/threading"; @@ -38,9 +42,10 @@ const RECONNECT_REPLAY_SKEW_SECS = 5, * "fully connected" to the WS layer indefinitely — no Close, no Error.
* * We work around that by periodically sending a cheap NIP-01 `REQ` with - * `limit: 0` and waiting for the matching `EOSE`. Two consecutive misses - * (≈ STALL_PROBE_INTERVAL_MS + STALL_PROBE_TIMEOUT_MS) flips state to - * `stalled` and force-resets the socket so the existing reconnect path runs. + * `limit: 0` and waiting for the matching `EOSE`. A single missed probe + * (no EOSE within `STALL_PROBE_TIMEOUT_MS`) — or a send-side failure on the + * probe itself — flips state to `stalled` and force-resets the socket so + * the existing reconnect path runs. * * The filter intentionally matches nothing real so the relay only ever * answers with EOSE. @@ -70,6 +75,18 @@ export class RelayClient { private notifyReconnectListeners = false; private onMessageChannel: Channel | null = null; + /** + * Sticky terminal flag. Set when `resetConnection` is called with + * `reconnect: false` (today: auth rejection). Acts as a hard guard against + * the reconnect-timer / retry-wrapper paths racing back to "reconnecting" + * after we've already declared the session dead. + * + * Cleared only on explicit user re-engagement: `disconnect()` (workspace + * switch — the singleton is being reused for a different workspace) and + * `preconnect()` (caller is asking us to come back up). + */ + private terminal = false; + private connectionStateEmitter = new RelayConnectionStateEmitter("idle"); private stallWatchdog = new RelayStallWatchdog({ intervalMs: STALL_PROBE_INTERVAL_MS, @@ -98,6 +115,7 @@ export class RelayClient { this.relayUrl = null; this.hasConnectedOnce = false; this.notifyReconnectListeners = false; + this.terminal = false; this.connectionStateEmitter.set("idle"); if (this.wsId !== null) { @@ -369,6 +387,9 @@ export class RelayClient { } async preconnect() { + // Explicit re-engagement. If the session went terminal (auth rejection) + // the caller is asking us to try again, so clear the latch.
+ this.terminal = false; this.keepAliveRequested = true; await this.ensureConnected(); } @@ -396,6 +417,15 @@ export class RelayClient { } private async ensureConnected() { + if (shouldRefuseConnect({ terminal: this.terminal })) { + // Session is terminal (e.g. relay rejected auth). Refuse to connect + // until an explicit re-engagement (disconnect()/preconnect()) clears + // the flag. Without this, the reconnect timer's catch handler — and + // the retry wrappers in publishEvent / sendRawWithReconnectRetry — + // would race the terminal "disconnected" state back to "reconnecting". + throw new Error("Relay session is terminal; cannot reconnect."); + } + if (this.connectPromise) { return this.connectPromise; } @@ -888,9 +918,13 @@ export class RelayClient { private scheduleReconnect() { if ( - this.reconnectTimeout || - this.wsId !== null || - (!this.keepAliveRequested && !this.hasLiveSubscriptions()) + !shouldScheduleReconnect({ + terminal: this.terminal, + hasPendingReconnect: this.reconnectTimeout !== null, + hasLiveSocket: this.wsId !== null, + keepAliveRequested: this.keepAliveRequested, + hasLiveSubscriptions: this.hasLiveSubscriptions(), + }) ) { return; } @@ -942,6 +976,7 @@ export class RelayClient { this.eventBuffer = []; if (options?.reconnect === false) { + this.terminal = true; this.connectionStateEmitter.set("disconnected"); } else if (this.connectionStateEmitter.get() !== "stalled") { // Stall is a stronger signal than a generic drop; keep it until the diff --git a/desktop/src/shared/api/relayReconnectPolicy.test.mjs b/desktop/src/shared/api/relayReconnectPolicy.test.mjs new file mode 100644 index 000000000..5a2625347 --- /dev/null +++ b/desktop/src/shared/api/relayReconnectPolicy.test.mjs @@ -0,0 +1,83 @@ +import assert from "node:assert/strict"; +import test from "node:test"; + +import { + shouldRefuseConnect, + shouldScheduleReconnect, +} from "./relayReconnectPolicy.ts"; + +// The "happy" baseline that *should* schedule a reconnect: not
terminal, +// no pending timer, no live socket, and at least something keeping the +// session alive (a live subscription, in this case). +const baseline = Object.freeze({ + terminal: false, + hasPendingReconnect: false, + hasLiveSocket: false, + keepAliveRequested: false, + hasLiveSubscriptions: true, +}); + +test("baseline scenario schedules a reconnect", () => { + assert.equal(shouldScheduleReconnect({ ...baseline }), true); +}); + +test("terminal session refuses to schedule (Max's auth-rejection scenario)", () => { + assert.equal(shouldScheduleReconnect({ ...baseline, terminal: true }), false); +}); + +test("terminal beats every other reason to reconnect", () => { + // Even with every "yes please reconnect" predicate flipped on, terminal + // wins. This is the critical guarantee against the reconnect timer's + // catch handler resurrecting a dead session. + assert.equal( + shouldScheduleReconnect({ + terminal: true, + hasPendingReconnect: false, + hasLiveSocket: false, + keepAliveRequested: true, + hasLiveSubscriptions: true, + }), + false, + ); +}); + +test("pending reconnect timer suppresses scheduling another", () => { + assert.equal( + shouldScheduleReconnect({ ...baseline, hasPendingReconnect: true }), + false, + ); +}); + +test("live socket suppresses scheduling", () => { + assert.equal( + shouldScheduleReconnect({ ...baseline, hasLiveSocket: true }), + false, + ); +}); + +test("no live subs and no keep-alive → don't keep an idle socket up", () => { + assert.equal( + shouldScheduleReconnect({ + ...baseline, + hasLiveSubscriptions: false, + keepAliveRequested: false, + }), + false, + ); +}); + +test("keep-alive alone is enough to schedule", () => { + assert.equal( + shouldScheduleReconnect({ + ...baseline, + hasLiveSubscriptions: false, + keepAliveRequested: true, + }), + true, + ); +}); + +test("shouldRefuseConnect mirrors terminal", () => { + assert.equal(shouldRefuseConnect({ terminal: false }), false); + assert.equal(shouldRefuseConnect({ terminal:
true }), true); +}); diff --git a/desktop/src/shared/api/relayReconnectPolicy.ts b/desktop/src/shared/api/relayReconnectPolicy.ts new file mode 100644 index 000000000..bcdb53fc1 --- /dev/null +++ b/desktop/src/shared/api/relayReconnectPolicy.ts @@ -0,0 +1,43 @@ +/** + * Pure helpers for the relay reconnect policy. + * + * Extracted from `RelayClient` so the decision rules — when to schedule a + * reconnect, when to refuse to connect — live in one legible place that + * unit tests can reach without booting the WS layer. + * + * The rules: + * + * - **Terminal sessions never reconnect.** When the relay has explicitly + * rejected us (today: kind:22242 AUTH OK=false) the session is dead + * until the user re-engages (workspace switch or explicit preconnect). + * This guards the reconnect-timer catch handler — and the retry wrappers + * in `publishEvent` / `sendRawWithReconnectRetry` — from racing the + * `disconnected` state back to `reconnecting`. + * + * - **No-op when a reconnect is already scheduled or in progress.** + * A pending timer or a live `wsId` means we have nothing to do. + * + * - **No reconnect needed when nothing wants the socket.** No live + * subscription, no `keepAliveRequested` from `preconnect()` → don't + * keep an idle socket. + */ +export type RelayReconnectInputs = { + terminal: boolean; + hasPendingReconnect: boolean; + hasLiveSocket: boolean; + keepAliveRequested: boolean; + hasLiveSubscriptions: boolean; +}; + +export function shouldScheduleReconnect(inputs: RelayReconnectInputs): boolean { + if (inputs.terminal) return false; + if (inputs.hasPendingReconnect) return false; + if (inputs.hasLiveSocket) return false; + if (!inputs.keepAliveRequested && !inputs.hasLiveSubscriptions) return false; + return true; +} + +/** Whether `ensureConnected()` should refuse with a terminal error. */ +export function shouldRefuseConnect(inputs: { terminal: boolean }): boolean { + return inputs.terminal; +}