Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion desktop/scripts/check-file-sizes.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const overrides = new Map([
["src/features/messages/ui/MessageComposer.tsx", 710], // media upload handlers (paste, drop, dialog) + channelId reset effect + edit mode (pre-fill, save, cancel, escape) + composer autofocus (#572)
["src/features/settings/ui/SettingsView.tsx", 600],
["src/features/sidebar/ui/AppSidebar.tsx", 860], // channels + forums creation forms + Pulse nav
["src/shared/api/relayClientSession.ts", 930], // durable websocket session manager with reconnect/replay/recovery state + sendTypingIndicator + fetchChannelHistoryBefore + subscribeToChannelLive (huddle TTS) + subscribeToHuddleEvents (huddle indicator) + disconnect() for workspace switch teardown + fetchEvents/subscribeLive/publishEvent for NIP-RS read state + publishUserStatus/subscribeToUserStatusUpdates (NIP-38)
["src/shared/api/relayClientSession.ts", 1040], // durable websocket session manager with reconnect/replay/recovery state + sendTypingIndicator + fetchChannelHistoryBefore + subscribeToChannelLive (huddle TTS) + subscribeToHuddleEvents (huddle indicator) + disconnect() for workspace switch teardown + fetchEvents/subscribeLive/publishEvent for NIP-RS read state + publishUserStatus/subscribeToUserStatusUpdates (NIP-38) + ConnectionState plumbing & stall-watchdog wiring for half-open WS detection (Warp orange-icon case) + terminal session latch (auth rejection no longer racing back to reconnecting) β€” emitter + watchdog + reconnect policy logic extracted to relayConnectionStateEmitter.ts / relayStallWatchdog.ts / relayReconnectPolicy.ts
["src/shared/api/tauri.ts", 1100], // remote agent provider API bindings + canvas API functions
["src-tauri/src/lib.rs", 710], // sprout-media:// proxy + Range headers + Sprout nest init (ensure_nest) in setup() + huddle command registration + PTT global shortcut handler + persona pack commands + app_handle storage for event emission
["src-tauri/src/commands/media.rs", 730], // ffmpeg video transcode + poster frame extraction + run_ffmpeg_with_timeout (find_ffmpeg via resolve_command, is_video_file, transcode_to_mp4, extract_poster_frame, transcode_and_extract_poster) + spawn_blocking wrappers + tests
Expand Down
63 changes: 58 additions & 5 deletions desktop/src/features/workspaces/ui/WorkspaceSwitcher.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { Check, ChevronDown, MoreHorizontal, Plus } from "lucide-react";
import {
Check,
ChevronDown,
MoreHorizontal,
Plus,
WifiOff,
} from "lucide-react";
import * as React from "react";

import type { Workspace } from "@/features/workspaces/types";
Expand All @@ -14,9 +20,24 @@ import {
SidebarMenuButton,
SidebarMenuItem,
} from "@/shared/ui/sidebar";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/shared/ui/tooltip";
import type { ConnectionState } from "@/shared/api/relayClientShared";
import {
isRelayConnectionDegraded,
useRelayConnection,
} from "@/shared/api/useRelayConnection";

import { EditWorkspaceDialog } from "./EditWorkspaceDialog";

const CONNECTION_STATE_LABEL: Record<ConnectionState, string> = {
idle: "Not connected",
connecting: "Connecting…",
connected: "Connected",
reconnecting: "Reconnecting to relay…",
stalled: "Connection lost β€” relay is not responding",
disconnected: "Disconnected from relay",
};

type WorkspaceSwitcherProps = {
activeWorkspace: Workspace | null;
workspaces: Workspace[];
Expand All @@ -40,6 +61,9 @@ export function WorkspaceSwitcher({
const [editingWorkspace, setEditingWorkspace] =
React.useState<Workspace | null>(null);
const [dropdownOpen, setDropdownOpen] = React.useState(false);
const connectionState = useRelayConnection();
const degraded = isRelayConnectionDegraded(connectionState);
const connectionLabel = CONNECTION_STATE_LABEL[connectionState];

return (
<>
Expand All @@ -48,14 +72,43 @@ export function WorkspaceSwitcher({
<DropdownMenu open={dropdownOpen} onOpenChange={setDropdownOpen}>
<DropdownMenuTrigger asChild>
<SidebarMenuButton
aria-label={
degraded
? `${activeWorkspace?.name ?? "Workspace"} β€” ${connectionLabel}`
: undefined
}
className="h-auto gap-2 rounded-xl px-2.5 py-2 data-[state=open]:bg-sidebar-accent"
data-testid="workspace-switcher"
type="button"
>
<span className="flex h-5 w-5 shrink-0 items-center justify-center text-xs leading-none">
🌱
</span>
<span className="min-w-0 flex-1 truncate text-sm font-medium">
{degraded ? (
<Tooltip>
<TooltipTrigger asChild>
<span
aria-hidden="false"
className="flex h-5 w-5 shrink-0 animate-pulse items-center justify-center text-destructive"
data-testid="relay-connection-warning"
role="img"
>
<WifiOff className="h-4 w-4" />
</span>
</TooltipTrigger>
<TooltipContent side="bottom">
{connectionLabel}
</TooltipContent>
</Tooltip>
) : (
<span className="flex h-5 w-5 shrink-0 items-center justify-center text-xs leading-none">
🌱
</span>
)}
<span
className={
degraded
? "min-w-0 flex-1 truncate text-sm font-medium text-destructive animate-pulse"
: "min-w-0 flex-1 truncate text-sm font-medium"
}
>
{activeWorkspace?.name ?? "No workspace"}
</span>
<ChevronDown className="h-3.5 w-3.5 shrink-0 text-sidebar-foreground/50" />
Expand Down
111 changes: 108 additions & 3 deletions desktop/src/shared/api/relayClientSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,43 @@ import {
import {
getTextPayload,
sortEvents,
type ConnectionState,
type PendingEvent,
type RelaySubscription,
type RelaySubscriptionFilter,
} from "@/shared/api/relayClientShared";
import { RelayConnectionStateEmitter } from "@/shared/api/relayConnectionStateEmitter";
import {
shouldRefuseConnect,
shouldScheduleReconnect,
} from "@/shared/api/relayReconnectPolicy";
import { RelayStallWatchdog } from "@/shared/api/relayStallWatchdog";
import { buildThreadReferenceTags } from "@/features/messages/lib/threading";

const RECONNECT_BASE_DELAY_MS = 1_000,
RECONNECT_MAX_DELAY_MS = 30_000;
const RECONNECT_REPLAY_SKEW_SECS = 5,
EVENT_BATCH_MS = 16;

/**
* Application-level liveness probe.
*
* Tungstenite auto-pongs and the OS keeps the TCP socket open, so a
* half-open WS (Warp's orange-icon state, an asleep VPN, etc.) presents as
* "fully connected" to the WS layer indefinitely β€” no Close, no Error.
*
* We work around that by periodically sending a cheap NIP-01 `REQ` with
* `limit: 0` and waiting for the matching `EOSE`. A single missed probe
* (no EOSE within `STALL_PROBE_TIMEOUT_MS`) β€” or a send-side failure on the
* probe itself β€” flips state to `stalled` and force-resets the socket so
* the existing reconnect path runs.
*
* The filter intentionally matches nothing real so the relay only ever
* answers with EOSE.
*/
const STALL_PROBE_INTERVAL_MS = 20_000;
const STALL_PROBE_TIMEOUT_MS = 10_000;

export class RelayClient {
private wsId: number | null = null;
private relayUrl: string | null = null;
Expand All @@ -49,6 +75,29 @@ export class RelayClient {
private notifyReconnectListeners = false;
private onMessageChannel: Channel<unknown> | null = null;

/**
* Sticky terminal flag. Set when `resetConnection` is called with
* `reconnect: false` (today: auth rejection). Acts as a hard guard against
* the reconnect-timer / retry-wrapper paths racing back to "reconnecting"
* after we've already declared the session dead.
*
* Cleared only on explicit user re-engagement: `disconnect()` (workspace
* switch β€” the singleton is being reused for a different workspace) and
* `preconnect()` (caller is asking us to come back up).
*/
private terminal = false;

private connectionStateEmitter = new RelayConnectionStateEmitter("idle");
private stallWatchdog = new RelayStallWatchdog({
intervalMs: STALL_PROBE_INTERVAL_MS,
probeTimeoutMs: STALL_PROBE_TIMEOUT_MS,
sendRaw: (payload) => this.sendRaw(payload),
onStall: (error) => {
this.connectionStateEmitter.set("stalled");
this.resetConnection(error);
},
});

/**
* Cleanly tear down the connection without scheduling a reconnect.
* Used during workspace switches to reset the singleton before the
Expand All @@ -61,10 +110,13 @@ export class RelayClient {
window.clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
this.stallWatchdog.stop();
this.keepAliveRequested = false;
this.relayUrl = null;
this.hasConnectedOnce = false;
this.notifyReconnectListeners = false;
this.terminal = false;
this.connectionStateEmitter.set("idle");

if (this.wsId !== null) {
void invoke("plugin:websocket|disconnect", { id: this.wsId }).catch(
Expand Down Expand Up @@ -101,6 +153,7 @@ export class RelayClient {
}
this.eventBuffer = [];
this.reconnectListeners.clear();
this.connectionStateEmitter.clear();
this.onMessageChannel = null;
this.reconnectDelayMs = RECONNECT_BASE_DELAY_MS;
}
Expand Down Expand Up @@ -334,6 +387,9 @@ export class RelayClient {
}

async preconnect() {
// Explicit re-engagement. If the session went terminal (auth rejection)
// the caller is asking us to try again, so clear the latch.
this.terminal = false;
this.keepAliveRequested = true;
await this.ensureConnected();
}
Expand All @@ -346,7 +402,30 @@ export class RelayClient {
};
}

/** Current connection state β€” synchronous read. */
getConnectionState(): ConnectionState {
return this.connectionStateEmitter.get();
}

/**
* Subscribe to connection-state transitions. The listener is invoked
* immediately with the current state so callers don't need a separate
* `getConnectionState()` call to seed their UI.
*/
subscribeToConnectionState(listener: (state: ConnectionState) => void) {
return this.connectionStateEmitter.subscribe(listener);
}

private async ensureConnected() {
if (shouldRefuseConnect({ terminal: this.terminal })) {
// Session is terminal (e.g. relay rejected auth). Refuse to connect
// until an explicit re-engagement (disconnect()/preconnect()) clears
// the flag. Without this, the reconnect timer's catch handler β€” and
// the retry wrappers in publishEvent / sendRawWithReconnectRetry β€”
// would race the terminal "disconnected" state back to "reconnecting".
throw new Error("Relay session is terminal; cannot reconnect.");
}

if (this.connectPromise) {
return this.connectPromise;
}
Expand All @@ -373,6 +452,10 @@ export class RelayClient {
}

private async connect() {
this.connectionStateEmitter.set(
this.hasConnectedOnce ? "reconnecting" : "connecting",
);

if (!this.relayUrl) {
this.relayUrl = await getRelayWsUrl();
}
Expand Down Expand Up @@ -406,6 +489,8 @@ export class RelayClient {

this.reconnectDelayMs = RECONNECT_BASE_DELAY_MS;
await this.replayLiveSubscriptions();
this.connectionStateEmitter.set("connected");
this.stallWatchdog.start();
this.emitReconnectIfNeeded();
}

Expand Down Expand Up @@ -724,6 +809,12 @@ export class RelayClient {
}

private handleEose(subId: string) {
if (this.stallWatchdog.handleEose(subId)) {
// Probe round-trip succeeded β€” silently CLOSE the sub.
void this.closeSubscription(subId).catch(() => {});
return;
}

const subscription = this.subscriptions.get(subId);
if (!subscription) {
return;
Expand Down Expand Up @@ -827,9 +918,13 @@ export class RelayClient {

private scheduleReconnect() {
if (
this.reconnectTimeout ||
this.wsId !== null ||
(!this.keepAliveRequested && !this.hasLiveSubscriptions())
!shouldScheduleReconnect({
terminal: this.terminal,
hasPendingReconnect: this.reconnectTimeout !== null,
hasLiveSocket: this.wsId !== null,
keepAliveRequested: this.keepAliveRequested,
hasLiveSubscriptions: this.hasLiveSubscriptions(),
})
) {
return;
}
Expand Down Expand Up @@ -875,10 +970,20 @@ export class RelayClient {
},
) {
this.onMessageChannel = null;
this.stallWatchdog.stop();
if (this.flushTimeout !== null) window.clearTimeout(this.flushTimeout);
this.flushTimeout = null;
this.eventBuffer = [];

if (options?.reconnect === false) {
this.terminal = true;
this.connectionStateEmitter.set("disconnected");
} else if (this.connectionStateEmitter.get() !== "stalled") {
// Stall is a stronger signal than a generic drop; keep it until the
// reconnect timer transitions us back to "reconnecting" in connect().
this.connectionStateEmitter.set("reconnecting");
}

if (options?.reconnect !== false && this.hasConnectedOnce) {
this.notifyReconnectListeners = true;
}
Expand Down
16 changes: 16 additions & 0 deletions desktop/src/shared/api/relayClientShared.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import assert from "node:assert/strict";
import test from "node:test";

import { isRelayConnectionDegraded } from "./relayClientShared.ts";

test("isRelayConnectionDegraded β€” healthy states are not degraded", () => {
assert.equal(isRelayConnectionDegraded("idle"), false);
assert.equal(isRelayConnectionDegraded("connecting"), false);
assert.equal(isRelayConnectionDegraded("connected"), false);
});

test("isRelayConnectionDegraded β€” non-healthy states are degraded", () => {
assert.equal(isRelayConnectionDegraded("reconnecting"), true);
assert.equal(isRelayConnectionDegraded("stalled"), true);
assert.equal(isRelayConnectionDegraded("disconnected"), true);
});
29 changes: 29 additions & 0 deletions desktop/src/shared/api/relayClientShared.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
import type { RelayEvent } from "@/shared/api/types";

/**
* Observable connection state for the relay singleton.
*
* - `idle` β€” never tried to connect yet (post-init, pre-workspace).
* - `connecting` β€” initial socket + AUTH handshake in flight.
* - `connected` β€” socket open and AUTH'd.
* - `reconnecting` β€” socket dropped, waiting for the backoff timer.
* - `stalled` β€” socket is *open* per the WS layer but no inbound frames
* for a long time (half-open / Warp split-brain). We
* surface this so the UI can warn even though tungstenite
* hasn't reported anything wrong yet.
* - `disconnected` β€” final/terminal disconnect (auth rejected, workspace
* switch, etc.) β€” no auto-reconnect scheduled.
*/
export type ConnectionState =
| "idle"
| "connecting"
| "connected"
| "reconnecting"
| "stalled"
| "disconnected";

/** True when the UI should surface a "connection lost" indicator. */
export function isRelayConnectionDegraded(state: ConnectionState): boolean {
return (
state === "reconnecting" || state === "stalled" || state === "disconnected"
);
}

export type RelaySubscriptionFilter = {
kinds: number[];
limit: number;
Expand Down
Loading
Loading