diff --git a/desktop/playwright.config.ts b/desktop/playwright.config.ts index cbae8475b..5c3bddd24 100644 --- a/desktop/playwright.config.ts +++ b/desktop/playwright.config.ts @@ -24,6 +24,7 @@ export default defineConfig({ "**/channel-browser.spec.ts", "**/messaging.spec.ts", "**/mentions.spec.ts", + "**/relay-reconnect.spec.ts", "**/workflows.spec.ts", ], use: { diff --git a/desktop/src/shared/api/relayClientSession.ts b/desktop/src/shared/api/relayClientSession.ts index ef29b440a..1482dfa35 100644 --- a/desktop/src/shared/api/relayClientSession.ts +++ b/desktop/src/shared/api/relayClientSession.ts @@ -30,28 +30,16 @@ import { RelayStallWatchdog } from "@/shared/api/relayStallWatchdog"; import { buildThreadReferenceTags } from "@/features/messages/lib/threading"; const RECONNECT_BASE_DELAY_MS = 1_000, - RECONNECT_MAX_DELAY_MS = 30_000; -const RECONNECT_REPLAY_SKEW_SECS = 5, + RECONNECT_MAX_DELAY_MS = 30_000, + RECONNECT_REPLAY_SKEW_SECS = 5, EVENT_BATCH_MS = 16; /** - * Application-level liveness probe. - * - * Tungstenite auto-pongs and the OS keeps the TCP socket open, so a - * half-open WS (Warp's orange-icon state, an asleep VPN, etc.) presents as - * "fully connected" to the WS layer indefinitely — no Close, no Error. - * - * We work around that by periodically sending a cheap NIP-01 `REQ` with - * `limit: 0` and waiting for the matching `EOSE`. A single missed probe - * (no EOSE within `STALL_PROBE_TIMEOUT_MS`) — or a send-side failure on the - * probe itself — flips state to `stalled` and force-resets the socket so - * the existing reconnect path runs. - * - * The filter intentionally matches nothing real so the relay only ever - * answers with EOSE. + * Passive liveness check. The relay sends heartbeat pings every 30s; if no + * inbound frame arrives for two heartbeat windows, treat the socket as stalled. */ -const STALL_PROBE_INTERVAL_MS = 20_000; -const STALL_PROBE_TIMEOUT_MS = 10_000; +const STALL_CHECK_INTERVAL_MS = 10_000; +const STALL_IDLE_TIMEOUT_MS = 60_000; export class RelayClient { private wsId: number | null = null; @@ -74,6 +62,7 @@ export class RelayClient { private hasConnectedOnce = false; private notifyReconnectListeners = false; private onMessageChannel: Channel | null = null; + private connectionGeneration = 0; /** * Sticky terminal flag. Set when `resetConnection` is called with @@ -89,9 +78,8 @@ export class RelayClient { private connectionStateEmitter = new RelayConnectionStateEmitter("idle"); private stallWatchdog = new RelayStallWatchdog({ - intervalMs: STALL_PROBE_INTERVAL_MS, - probeTimeoutMs: STALL_PROBE_TIMEOUT_MS, - sendRaw: (payload) => this.sendRaw(payload), + intervalMs: STALL_CHECK_INTERVAL_MS, + idleTimeoutMs: STALL_IDLE_TIMEOUT_MS, onStall: (error) => { this.connectionStateEmitter.set("stalled"); this.resetConnection(error); @@ -111,6 +99,7 @@ export class RelayClient { this.reconnectTimeout = null; } this.stallWatchdog.stop(); + this.connectionGeneration++; this.keepAliveRequested = false; this.relayUrl = null; this.hasConnectedOnce = false; @@ -460,8 +449,9 @@ export class RelayClient { this.relayUrl = await getRelayWsUrl(); } + const generation = ++this.connectionGeneration; this.onMessageChannel = new Channel((message) => { - void this.handleWsMessage(message); + void this.handleWsMessage(message, generation); }); this.wsId = await invoke("plugin:websocket|connect", { @@ -687,7 +677,10 @@ export class RelayClient { }); } - private async handleWsMessage(message: unknown) { + private async handleWsMessage(message: unknown, generation: number) { + if (generation !== this.connectionGeneration) return; + this.stallWatchdog.recordInbound(); + if ( typeof message === "object" && message !== null && @@ -726,10 +719,9 @@ export class RelayClient { const [type, ...rest] = data; if (type === "AUTH" && typeof rest[0] === "string") { - await this.handleAuthChallenge(rest[0]); + await this.handleAuthChallenge(rest[0], generation); return; } - if (type === "EVENT" && typeof rest[0] === "string" && rest[1]) { this.handleEvent(rest[0], rest[1] as RelayEvent); return; @@ -753,7 +745,7 @@ export class RelayClient { } } - private async handleAuthChallenge(challenge: string) { + private async handleAuthChallenge(challenge: string, generation: number) { if (!this.relayUrl) { this.relayUrl = await getRelayWsUrl(); } @@ -763,7 +755,7 @@ export class RelayClient { relayUrl: this.relayUrl, }); - if (!this.authRequest) { + if (generation !== this.connectionGeneration || !this.authRequest) { return; } @@ -809,12 +801,6 @@ export class RelayClient { } private handleEose(subId: string) { - if (this.stallWatchdog.handleEose(subId)) { - // Probe round-trip succeeded — silently CLOSE the sub. - void this.closeSubscription(subId).catch(() => {}); - return; - } - const subscription = this.subscriptions.get(subId); if (!subscription) { return; @@ -971,6 +957,7 @@ export class RelayClient { ) { this.onMessageChannel = null; this.stallWatchdog.stop(); + this.connectionGeneration++; if (this.flushTimeout !== null) window.clearTimeout(this.flushTimeout); this.flushTimeout = null; this.eventBuffer = []; diff --git a/desktop/src/shared/api/relayStallWatchdog.test.mjs b/desktop/src/shared/api/relayStallWatchdog.test.mjs index 510552af4..b160a9435 100644 --- a/desktop/src/shared/api/relayStallWatchdog.test.mjs +++ b/desktop/src/shared/api/relayStallWatchdog.test.mjs @@ -3,123 +3,104 @@ import test from "node:test"; import { RelayStallWatchdog } from "./relayStallWatchdog.ts"; -// Shim `window` to expose the timer + crypto APIs the watchdog uses. The -// real RelayClient runs in a Tauri WebView where `window` exists; under -// node:test we wire it to the same globals. +// Shim `window` to expose the timer APIs the watchdog uses. The real +// RelayClient runs in a Tauri WebView where `window` exists; under node:test we +// wire it to the same globals. if (typeof globalThis.window === "undefined") { globalThis.window = { setInterval: (...args) => setInterval(...args), clearInterval: (id) => clearInterval(id), - setTimeout: (...args) => setTimeout(...args), - clearTimeout: (id) => clearTimeout(id), }; } const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); function makeWatchdog(overrides = {}) { - const sends = []; const stalls = []; + let now = overrides.now ?? 1; const wd = new RelayStallWatchdog({ - intervalMs: overrides.intervalMs ?? 30, - probeTimeoutMs: overrides.probeTimeoutMs ?? 30, - sendRaw: - overrides.sendRaw ?? - (async (payload) => { - sends.push(payload); - }), + intervalMs: overrides.intervalMs ?? 20, + idleTimeoutMs: overrides.idleTimeoutMs ?? 50, onStall: (err) => { stalls.push(err); }, - now: overrides.now, + now: () => now, }); - return { wd, sends, stalls }; + return { + advance: (ms) => { + now += ms; + }, + setNow: (value) => { + now = value; + }, + stalls, + wd, + }; } -test("first probe carries the expected NIP-01 REQ shape", async () => { - const { wd, sends } = makeWatchdog(); +test("does not send probes while watching for stalls", async () => { + const { wd } = makeWatchdog(); wd.start(); - // Wait until a probe is observed. - for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5); + await sleep(45); wd.stop(); - assert.equal(sends.length, 1); - const [verb, subId, filter] = sends[0]; - assert.equal(verb, "REQ"); - assert.match(subId, /^probe-/); - assert.deepEqual(filter.kinds, [9999]); - assert.equal(filter.limit, 0); - assert.ok(typeof filter.since === "number"); + // The passive watchdog has no send callback by construction. This test is a + // regression guard for the WARP bug: liveness checks must not write to a + // socket already suspected of being half-open. + assert.equal(typeof wd.recordInbound, "function"); }); -test("EOSE for the current probe clears in-flight + lets the next probe fire", async () => { - const { wd, sends, stalls } = makeWatchdog(); +test("idle timeout without inbound frames triggers onStall", async () => { + const { advance, stalls, wd } = makeWatchdog(); wd.start(); - for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5); - const firstSubId = sends[0][1]; - // Resolve the probe. - assert.equal(wd.handleEose(firstSubId), true); - // Within the next interval+probe window, another probe should fire. - for (let i = 0; i < 50 && sends.length < 2; i++) await sleep(5); + advance(60); + for (let i = 0; i < 20 && stalls.length === 0; i++) await sleep(5); wd.stop(); - assert.ok(sends.length >= 2, `expected ≥2 probes, got ${sends.length}`); - assert.equal(stalls.length, 0, "no stall expected when EOSE arrives"); -}); - -test("EOSE for a non-probe subId returns false", () => { - const { wd } = makeWatchdog(); - assert.equal(wd.handleEose("live-abc"), false); + assert.equal(stalls.length, 1); + assert.match(stalls[0].message, /no inbound frames/i); }); -test("timeout without EOSE triggers onStall", async () => { - const { wd, stalls } = makeWatchdog(); +test("inbound frames reset the idle timer", async () => { + const { advance, stalls, wd } = makeWatchdog({ idleTimeoutMs: 50 }); wd.start(); - // intervalMs (30) before first send + probeTimeoutMs (30) — wait a bit - // past their sum. - for (let i = 0; i < 50 && stalls.length === 0; i++) await sleep(10); + advance(40); + wd.recordInbound(); + advance(40); + await sleep(30); + assert.equal( + stalls.length, + 0, + "recent inbound frame should keep socket alive", + ); + advance(20); + for (let i = 0; i < 20 && stalls.length === 0; i++) await sleep(5); wd.stop(); - assert.ok(stalls.length >= 1, "expected at least one stall"); - assert.match(stalls[0].message, /stalled/i); + assert.equal(stalls.length, 1); }); -test("send-side failure triggers onStall immediately", async () => { - const { wd, stalls } = makeWatchdog({ - sendRaw: async () => { - throw new Error("ws is dead"); - }, - }); - wd.start(); - for (let i = 0; i < 50 && stalls.length === 0; i++) await sleep(5); - wd.stop(); - assert.ok(stalls.length >= 1, "expected stall on send failure"); - assert.match(stalls[0].message, /ws is dead/); +test("recordInbound is ignored while stopped", async () => { + const { advance, stalls, wd } = makeWatchdog({ idleTimeoutMs: 50 }); + wd.recordInbound(); + advance(100); + await sleep(30); + assert.equal(stalls.length, 0); }); -test("stop() cancels a pending stall timeout", async () => { - const { wd, sends, stalls } = makeWatchdog(); +test("stop() cancels the idle check", async () => { + const { advance, stalls, wd } = makeWatchdog({ idleTimeoutMs: 50 }); wd.start(); - for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5); - // Probe is in-flight; stop before it can time out. wd.stop(); - // Wait well past the timeout window. - await sleep(80); - assert.equal(stalls.length, 0, "stop() should cancel the pending stall"); + advance(100); + await sleep(35); + assert.equal(stalls.length, 0); }); test("start() is idempotent — does not create duplicate intervals", async () => { - const { wd, sends } = makeWatchdog({ intervalMs: 25, probeTimeoutMs: 200 }); + const { advance, stalls, wd } = makeWatchdog({ idleTimeoutMs: 50 }); wd.start(); wd.start(); wd.start(); - // Allow one probe to fire and resolve it so the *next* probe can fire if - // the interval was somehow doubled. - for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5); - wd.handleEose(sends[0][1]); - // Within one more interval window, exactly one more probe should fire - // (not two), which is the contract for `start()` being idempotent. - await sleep(45); + advance(60); + for (let i = 0; i < 20 && stalls.length === 0; i++) await sleep(5); wd.stop(); - assert.ok( - sends.length <= 2, - `expected ≤2 probes despite triple-start(), got ${sends.length}`, - ); + assert.equal(stalls.length, 1); }); diff --git a/desktop/src/shared/api/relayStallWatchdog.ts b/desktop/src/shared/api/relayStallWatchdog.ts index 7fb64cd9f..27d22c2aa 100644 --- a/desktop/src/shared/api/relayStallWatchdog.ts +++ b/desktop/src/shared/api/relayStallWatchdog.ts @@ -1,25 +1,15 @@ /** - * Application-level liveness probe for the relay WebSocket. + * Passive liveness watchdog for the relay WebSocket. * - * Tungstenite auto-pongs and the OS keeps the TCP socket open, so a - * half-open WS (Warp's orange-icon state, an asleep VPN, etc.) presents as - * "fully connected" to the WS layer indefinitely — no Close, no Error. - * - * We work around that by periodically sending a cheap NIP-01 `REQ` with a - * filter that matches nothing real (kind 9999, far-future `since`) and - * waiting for the matching `EOSE`. If the relay doesn't answer within - * `probeTimeoutMs` (or the send itself fails), `onStall` is invoked with an - * `Error` describing the failure. The relay client then force-resets the - * socket so its existing reconnect path runs. - * - * The watchdog has no opinion on connection state or reconnects; it just - * detects that the socket is unhealthy and reports it. + * This intentionally does not write to the socket. tauri-plugin-websocket + * 2.4.2 holds a global connection-manager mutex while awaiting `send()`, so a + * watchdog probe sent into a half-open TCP path can block future reconnects + * from registering. Instead we rely on inbound relay traffic (including the + * relay's heartbeat pings) as the liveness signal. */ export type RelayStallWatchdogConfig = { intervalMs: number; - probeTimeoutMs: number; - /** Send a raw NIP-01 frame. Returns the same promise as the WS layer. */ - sendRaw: (payload: unknown[]) => Promise; + idleTimeoutMs: number; /** Called once when a stall is detected. The watchdog stops itself first. */ onStall: (error: Error) => void; /** Optional override for tests. */ @@ -28,103 +18,60 @@ export type RelayStallWatchdogConfig = { export class RelayStallWatchdog { private readonly intervalMs: number; - private readonly probeTimeoutMs: number; - private readonly sendRaw: (payload: unknown[]) => Promise; + private readonly idleTimeoutMs: number; private readonly onStall: (error: Error) => void; private readonly now: () => number; private intervalHandle: number | null = null; - private probeTimeoutHandle: number | null = null; - private currentProbeSubId: string | null = null; + private lastInboundAt = 0; constructor(config: RelayStallWatchdogConfig) { this.intervalMs = config.intervalMs; - this.probeTimeoutMs = config.probeTimeoutMs; - this.sendRaw = config.sendRaw; + this.idleTimeoutMs = config.idleTimeoutMs; this.onStall = config.onStall; - this.now = config.now ?? (() => Math.floor(Date.now() / 1_000)); + this.now = config.now ?? (() => Date.now()); } /** Idempotent. Safe to call from `connect()` completion. */ start(): void { + this.lastInboundAt = this.now(); if (this.intervalHandle !== null) { return; } this.intervalHandle = window.setInterval( - () => this.sendProbe(), + () => this.checkIdle(), this.intervalMs, ); } - /** Idempotent. Clears any in-flight probe + the interval. */ + /** Idempotent. Clears the passive idle check interval. */ stop(): void { if (this.intervalHandle !== null) { window.clearInterval(this.intervalHandle); this.intervalHandle = null; } - if (this.probeTimeoutHandle !== null) { - window.clearTimeout(this.probeTimeoutHandle); - this.probeTimeoutHandle = null; - } - this.currentProbeSubId = null; + this.lastInboundAt = 0; } - /** - * Called from the relay client's `handleEose` to satisfy the in-flight - * probe. Returns `true` if the subId belonged to the watchdog (and the - * caller should not look it up in the normal subscription map). - */ - handleEose(subId: string): boolean { - if (subId !== this.currentProbeSubId) { - return false; - } - if (this.probeTimeoutHandle !== null) { - window.clearTimeout(this.probeTimeoutHandle); - this.probeTimeoutHandle = null; + /** Record any inbound WS frame as proof the socket is still alive. */ + recordInbound(): void { + if (this.intervalHandle === null) { + return; } - this.currentProbeSubId = null; - return true; + this.lastInboundAt = this.now(); } - private sendProbe(): void { - if (this.probeTimeoutHandle !== null) { - // A probe is still outstanding — don't pile on; the timeout handler - // will declare the stall when it fires. + private checkIdle(): void { + if (this.lastInboundAt === 0) { + this.lastInboundAt = this.now(); return; } - const subId = `probe-${crypto.randomUUID()}`; - this.currentProbeSubId = subId; - this.probeTimeoutHandle = window.setTimeout(() => { - this.probeTimeoutHandle = null; - this.currentProbeSubId = null; - this.fail( - new Error("Relay socket stalled — no response to liveness probe."), - ); - }, this.probeTimeoutMs); - - const farFuture = this.now() + 86_400; - void this.sendRaw([ - "REQ", - subId, - { kinds: [9999], limit: 0, since: farFuture }, - ]).catch((error) => { - // Send failed → the socket is dead. - if (this.probeTimeoutHandle !== null) { - window.clearTimeout(this.probeTimeoutHandle); - this.probeTimeoutHandle = null; - } - this.currentProbeSubId = null; - this.fail( - error instanceof Error - ? error - : new Error("Relay socket stalled — probe send failed."), - ); - }); - } + if (this.now() - this.lastInboundAt < this.idleTimeoutMs) { + return; + } - private fail(error: Error): void { this.stop(); - this.onStall(error); + this.onStall(new Error("Relay socket stalled — no inbound frames.")); } } diff --git a/desktop/src/testing/e2eBridge.ts b/desktop/src/testing/e2eBridge.ts index 883ab66ab..a2f481ba8 100644 --- a/desktop/src/testing/e2eBridge.ts +++ b/desktop/src/testing/e2eBridge.ts @@ -37,6 +37,7 @@ type E2eConfig = { profileReadDelayMs?: number; profileReadError?: string; profileUpdateError?: string; + stallWebsocketSends?: boolean; }; relayHttpUrl?: string; relayWsUrl?: string; @@ -451,6 +452,7 @@ declare global { payload?: Record, ) => Promise; __SPROUT_E2E_PUSH_MOCK_FEED_ITEM__?: (item: RawFeedItem) => RawFeedItem; + __SPROUT_E2E_SET_STALL_WEBSOCKET_SENDS__?: (stall: boolean) => void; } } @@ -1088,6 +1090,7 @@ const mockChannels: MockChannel[] = [ const mockMessages = new Map(); let mockRelayMembers: RawRelayMember[] = []; const mockSockets = new Map(); +let mockWebsocketSendMutexWedged = false; const realSockets = new Map(); let mockManagedAgents: MockManagedAgent[] = []; let mockPersonas: RawPersona[] = []; @@ -4351,6 +4354,10 @@ async function connectRealSocket(args: { url?: string; onMessage: unknown }) { } async function connectMockSocket(args: { onMessage: unknown }) { + if (mockWebsocketSendMutexWedged) { + return new Promise(() => {}); + } + const wsId = nextSocketId++; const handler = resolveHandler(args.onMessage); @@ -4396,6 +4403,14 @@ function sendToMockSocket(args: { }; }) { const socket = mockSockets.get(args.id); + if ( + getConfig()?.mock?.stallWebsocketSends && + args.message?.type !== "Close" + ) { + mockWebsocketSendMutexWedged = true; + return new Promise(() => {}); + } + if (!socket || !args.message) { return; } @@ -4531,6 +4546,7 @@ export function maybeInstallE2eTauriMocks() { resetMockPersonas(); resetMockTeams(); resetMockWorkflows(); + mockWebsocketSendMutexWedged = false; mockWindows("main"); window.__SPROUT_E2E_COMMANDS__ = []; window.__SPROUT_E2E_WEBVIEW_ZOOM__ = 1; @@ -4585,6 +4601,12 @@ export function maybeInstallE2eTauriMocks() { window.dispatchEvent(new CustomEvent("sprout:e2e-home-feed-updated")); return item; }; + window.__SPROUT_E2E_SET_STALL_WEBSOCKET_SENDS__ = (stall) => { + const config = getConfig(); + if (!config?.mock) return; + config.mock.stallWebsocketSends = stall; + if (!stall) mockWebsocketSendMutexWedged = false; + }; const handleMockCommand = async (command: string, payload: unknown) => { const activeConfig = getConfig(); const identity = getActiveIdentity(activeConfig); diff --git a/desktop/tests/e2e/relay-reconnect.spec.ts b/desktop/tests/e2e/relay-reconnect.spec.ts new file mode 100644 index 000000000..1e9fb7a09 --- /dev/null +++ b/desktop/tests/e2e/relay-reconnect.spec.ts @@ -0,0 +1,50 @@ +import { expect, test } from "@playwright/test"; + +import { installMockBridge } from "../helpers/bridge"; + +async function setMockWebsocketSendsStalled( + page: import("@playwright/test").Page, + stall: boolean, +) { + await page.evaluate((shouldStall) => { + const setter = ( + window as Window & { + __SPROUT_E2E_SET_STALL_WEBSOCKET_SENDS__?: (stall: boolean) => void; + } + ).__SPROUT_E2E_SET_STALL_WEBSOCKET_SENDS__; + if (!setter) { + throw new Error("E2E websocket stall setter is not installed."); + } + setter(shouldStall); + }, stall); +} + +test.beforeEach(async ({ page }) => { + await installMockBridge(page); +}); + +test("passive relay watchdog does not write while the websocket is half-open", async ({ + page, +}) => { + await page.goto("/"); + await page.getByTestId("channel-general").click(); + await expect(page.getByTestId("chat-title")).toHaveText("general"); + await expect(page.getByTestId("message-timeline")).toContainText( + "Welcome to #general", + ); + + await setMockWebsocketSendsStalled(page, true); + + // Wait longer than the old active-probe interval. If the watchdog still + // writes probes, the mocked plugin send would never resolve and mark the + // mock plugin mutex as wedged. Future reconnects would then be unable to + // register, matching the tauri-plugin-websocket failure mode. The passive + // watchdog should perform no writes of its own during this window. + await page.waitForTimeout(22_000); + + await setMockWebsocketSendsStalled(page, false); + const message = `recovered after passive idle ${Date.now()}`; + await page.getByTestId("message-input").fill(message); + await page.getByTestId("send-message").click(); + await expect(page.getByTestId("message-timeline")).toContainText(message); +}); diff --git a/desktop/tests/helpers/bridge.ts b/desktop/tests/helpers/bridge.ts index 4bc6089bd..5bb4d8487 100644 --- a/desktop/tests/helpers/bridge.ts +++ b/desktop/tests/helpers/bridge.ts @@ -58,6 +58,7 @@ type MockBridgeOptions = { profileReadDelayMs?: number; profileReadError?: string; profileUpdateError?: string; + stallWebsocketSends?: boolean; }; type BridgeOptions = {