From fc6f222890699e03f53b0dc54ab31b6f62c1b9d1 Mon Sep 17 00:00:00 2001 From: Hwanseo Choi Date: Fri, 3 Apr 2026 08:56:58 -0700 Subject: [PATCH 1/3] Fix orchestration reconnect recovery --- apps/web/src/orchestrationRecovery.ts | 6 +++- apps/web/src/routes/__root.tsx | 43 +++++++++++++++++---------- apps/web/src/wsRpcClient.ts | 33 +++++++++++++++----- apps/web/src/wsTransport.test.ts | 3 ++ apps/web/src/wsTransport.ts | 15 +++++++++- 5 files changed, 74 insertions(+), 26 deletions(-) diff --git a/apps/web/src/orchestrationRecovery.ts b/apps/web/src/orchestrationRecovery.ts index 5af48f85b9..7c9c33e8ea 100644 --- a/apps/web/src/orchestrationRecovery.ts +++ b/apps/web/src/orchestrationRecovery.ts @@ -1,4 +1,8 @@ -export type OrchestrationRecoveryReason = "bootstrap" | "sequence-gap" | "replay-failed"; +export type OrchestrationRecoveryReason = + | "bootstrap" + | "sequence-gap" + | "resubscribe" + | "replay-failed"; export interface OrchestrationRecoveryPhase { kind: "snapshot" | "replay"; diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index 3377e4bb44..908ffaf78d 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -423,8 +423,8 @@ function EventRouter() { queueMicrotask(flushPendingDomainEvents); }; - const recoverFromSequenceGap = async (): Promise => { - if (!recovery.beginReplayRecovery("sequence-gap")) { + const runReplayRecovery = async (reason: "sequence-gap" | "resubscribe"): Promise => { + if (!recovery.beginReplayRecovery(reason)) { return; } @@ -441,7 +441,7 @@ function EventRouter() { } if (!disposed && recovery.completeReplayRecovery()) { - void recoverFromSequenceGap(); + void runReplayRecovery(reason); } }; @@ -471,7 +471,7 @@ function EventRouter() { syncServerReadModel(snapshot); reconcileSnapshotDerivedState(); if (recovery.completeSnapshotRecovery(snapshot.snapshotSequence)) { - void recoverFromSequenceGap(); + void runReplayRecovery("sequence-gap"); } } } catch { @@ -488,18 +488,29 @@ function EventRouter() { const fallbackToSnapshotRecovery = async (): Promise => { await runSnapshotRecovery("replay-failed"); }; - const unsubDomainEvent = api.orchestration.onDomainEvent((event) => { - const action = recovery.classifyDomainEvent(event.sequence); - if (action === "apply") { - pendingDomainEvents.push(event); - schedulePendingDomainEventFlush(); - return; - } - if (action === "recover") { - flushPendingDomainEvents(); - void recoverFromSequenceGap(); - } - }); + const unsubDomainEvent = getWsRpcClient().orchestration.onDomainEvent( + (event) => { + const action = recovery.classifyDomainEvent(event.sequence); + if (action === "apply") { + pendingDomainEvents.push(event); + schedulePendingDomainEventFlush(); + return; + } + if (action === "recover") { + flushPendingDomainEvents(); + void runReplayRecovery("sequence-gap"); + } + }, + { + onResubscribe: () => { + if (disposed) { + return; + } + flushPendingDomainEvents(); + void runReplayRecovery("resubscribe"); + }, + }, + ); const unsubTerminalEvent = api.terminal.onEvent((event) => { const thread = useStore.getState().threads.find((entry) => entry.id === event.threadId); if (thread && thread.archivedAt !== null) { diff --git a/apps/web/src/wsRpcClient.ts b/apps/web/src/wsRpcClient.ts index 60f51ba707..79f5f6da33 100644 --- a/apps/web/src/wsRpcClient.ts +++ b/apps/web/src/wsRpcClient.ts @@ -16,6 +16,10 @@ type RpcTag = keyof WsRpcProtocolClient & string; type RpcMethod = WsRpcProtocolClient[TTag]; type RpcInput = Parameters>[0]; +interface StreamSubscriptionOptions { + readonly onResubscribe?: () => void; +} + type RpcUnaryMethod = RpcMethod extends (input: any, options?: any) => Effect.Effect ? (input: RpcInput) => Promise @@ -28,7 +32,7 @@ type RpcUnaryNoArgMethod = type RpcStreamMethod = RpcMethod extends (input: any, options?: any) => Stream.Stream - ? (listener: (event: TEvent) => void) => () => void + ? (listener: (event: TEvent) => void, options?: StreamSubscriptionOptions) => () => void : never; interface GitRunStackedActionOptions { @@ -120,8 +124,12 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { clear: (input) => transport.request((client) => client[WS_METHODS.terminalClear](input)), restart: (input) => transport.request((client) => client[WS_METHODS.terminalRestart](input)), close: (input) => transport.request((client) => client[WS_METHODS.terminalClose](input)), - onEvent: (listener) => - transport.subscribe((client) => client[WS_METHODS.subscribeTerminalEvents]({}), listener), + onEvent: (listener, options) => + transport.subscribe( + (client) => client[WS_METHODS.subscribeTerminalEvents]({}), + listener, + options, + ), }, projects: { searchEntries: (input) => @@ -179,10 +187,18 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { getSettings: () => transport.request((client) => client[WS_METHODS.serverGetSettings]({})), updateSettings: (patch) => transport.request((client) => client[WS_METHODS.serverUpdateSettings]({ patch })), - subscribeConfig: (listener) => - transport.subscribe((client) => client[WS_METHODS.subscribeServerConfig]({}), listener), - subscribeLifecycle: (listener) => - transport.subscribe((client) => client[WS_METHODS.subscribeServerLifecycle]({}), listener), + subscribeConfig: (listener, options) => + transport.subscribe( + (client) => client[WS_METHODS.subscribeServerConfig]({}), + listener, + options, + ), + subscribeLifecycle: (listener, options) => + transport.subscribe( + (client) => client[WS_METHODS.subscribeServerLifecycle]({}), + listener, + options, + ), }, orchestration: { getSnapshot: () => @@ -197,10 +213,11 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { transport .request((client) => client[ORCHESTRATION_WS_METHODS.replayEvents](input)) .then((events) => [...events]), - onDomainEvent: (listener) => + onDomainEvent: (listener, options) => transport.subscribe( (client) => client[WS_METHODS.subscribeOrchestrationDomainEvents]({}), listener, + options, ), }, }; diff --git a/apps/web/src/wsTransport.test.ts b/apps/web/src/wsTransport.test.ts index 2f64f2a692..b5cbb767e7 100644 --- a/apps/web/src/wsTransport.test.ts +++ b/apps/web/src/wsTransport.test.ts @@ -250,10 +250,12 @@ describe("WsTransport", () => { it("re-subscribes stream listeners after the stream exits", async () => { const transport = new WsTransport("ws://localhost:3020"); const listener = vi.fn(); + const onResubscribe = vi.fn(); const unsubscribe = transport.subscribe( (client) => client[WS_METHODS.subscribeServerLifecycle]({}), listener, + { onResubscribe }, ); await waitFor(() => { expect(sockets).toHaveLength(1); @@ -301,6 +303,7 @@ describe("WsTransport", () => { .find((message) => message._tag === "Request" && message.id !== firstRequest.id); expect(nextRequest).toBeDefined(); }); + expect(onResubscribe).toHaveBeenCalledOnce(); const secondRequest = socket.sent .map((message) => JSON.parse(message) as { _tag?: string; id?: string; tag?: string }) diff --git a/apps/web/src/wsTransport.ts b/apps/web/src/wsTransport.ts index 70042261d5..aed100242f 100644 --- a/apps/web/src/wsTransport.ts +++ b/apps/web/src/wsTransport.ts @@ -9,6 +9,7 @@ import { RpcClient } from "effect/unstable/rpc"; interface SubscribeOptions { readonly retryDelay?: Duration.Input; + readonly onResubscribe?: () => void; } interface RequestOptions { @@ -82,9 +83,21 @@ export class WsTransport { } let active = true; + let firstAttempt = true; const retryDelayMs = options?.retryDelay ?? DEFAULT_SUBSCRIPTION_RETRY_DELAY_MS; const cancel = this.runtime.runCallback( - Effect.promise(() => this.clientPromise).pipe( + Effect.sync(() => { + if (firstAttempt) { + firstAttempt = false; + return; + } + try { + options?.onResubscribe?.(); + } catch { + // Swallow reconnect hook errors so the stream can recover. + } + }).pipe( + Effect.andThen(Effect.promise(() => this.clientPromise)), Effect.flatMap((client) => Stream.runForEach(connect(client), (value) => Effect.sync(() => { From e58393ee9835a1047a27932e1b9d9e18b4589f3f Mon Sep 17 00:00:00 2001 From: Hwanseo Choi Date: Fri, 3 Apr 2026 10:44:11 -0700 Subject: [PATCH 2/3] Preserve NativeApi orchestration subscription abstraction --- apps/web/src/routes/__root.tsx | 2 +- apps/web/src/wsNativeApi.test.ts | 14 ++++++++++++++ apps/web/src/wsNativeApi.ts | 3 ++- packages/contracts/src/ipc.ts | 7 ++++++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index 908ffaf78d..bb9e5fdf71 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -488,7 +488,7 @@ function EventRouter() { const fallbackToSnapshotRecovery = async (): Promise => { await runSnapshotRecovery("replay-failed"); }; - const unsubDomainEvent = getWsRpcClient().orchestration.onDomainEvent( + const unsubDomainEvent = api.orchestration.onDomainEvent( (event) => { const action = recovery.classifyDomainEvent(event.sequence); if (action === "apply") { diff --git a/apps/web/src/wsNativeApi.test.ts b/apps/web/src/wsNativeApi.test.ts index 47f78f967f..cfa6ca6942 100644 --- a/apps/web/src/wsNativeApi.test.ts +++ b/apps/web/src/wsNativeApi.test.ts @@ -243,6 +243,20 @@ describe("wsNativeApi", () => { expect(onDomainEvent).toHaveBeenCalledWith(orchestrationEvent); }); + it("forwards orchestration stream subscription options to the RPC client", async () => { + const { createWsNativeApi } = await import("./wsNativeApi"); + + const api = createWsNativeApi(); + const onDomainEvent = vi.fn(); + const onResubscribe = vi.fn(); + + api.orchestration.onDomainEvent(onDomainEvent, { onResubscribe }); + + expect(rpcClientMock.orchestration.onDomainEvent).toHaveBeenCalledWith(onDomainEvent, { + onResubscribe, + }); + }); + it("sends orchestration dispatch commands as the direct RPC payload", async () => { rpcClientMock.orchestration.dispatchCommand.mockResolvedValue({ sequence: 1 }); const { createWsNativeApi } = await import("./wsNativeApi"); diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index 31160dfa1c..6c9e07cc75 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -98,7 +98,8 @@ export function createWsNativeApi(): NativeApi { rpcClient.orchestration .replayEvents({ fromSequenceExclusive }) .then((events) => [...events]), - onDomainEvent: (callback) => rpcClient.orchestration.onDomainEvent(callback), + onDomainEvent: (callback, options) => + rpcClient.orchestration.onDomainEvent(callback, options), }, }; diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index 3114f6f5be..57a1c4c3dc 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -180,6 +180,11 @@ export interface NativeApi { input: OrchestrationGetFullThreadDiffInput, ) => Promise; replayEvents: (fromSequenceExclusive: number) => Promise; - onDomainEvent: (callback: (event: OrchestrationEvent) => void) => () => void; + onDomainEvent: ( + callback: (event: OrchestrationEvent) => void, + options?: { + onResubscribe?: () => void; + }, + ) => () => void; }; } From 96c1f15f8ecb6440ae8c81374d25d1d4662f8db2 Mon Sep 17 00:00:00 2001 From: Hwanseo Choi Date: Fri, 3 Apr 2026 13:55:14 -0700 Subject: [PATCH 3/3] Fix ws transport resubscribe detection --- apps/web/src/wsTransport.test.ts | 46 ++++++++++++++++++++++++++++++++ apps/web/src/wsTransport.ts | 6 ++--- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/apps/web/src/wsTransport.test.ts b/apps/web/src/wsTransport.test.ts index b5cbb767e7..0f664d898b 100644 --- a/apps/web/src/wsTransport.test.ts +++ b/apps/web/src/wsTransport.test.ts @@ -342,6 +342,52 @@ describe("WsTransport", () => { await transport.dispose(); }); + it("does not fire onResubscribe when the first stream attempt exits before any value", async () => { + const transport = new WsTransport("ws://localhost:3020"); + const listener = vi.fn(); + const onResubscribe = vi.fn(); + + const unsubscribe = transport.subscribe( + (client) => client[WS_METHODS.subscribeServerLifecycle]({}), + listener, + { onResubscribe }, + ); + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + + await waitFor(() => { + expect(socket.sent).toHaveLength(1); + }); + + const firstRequest = JSON.parse(socket.sent[0] ?? "{}") as { id: string }; + socket.serverMessage( + JSON.stringify({ + _tag: "Exit", + requestId: firstRequest.id, + exit: { + _tag: "Success", + value: null, + }, + }), + ); + + await waitFor(() => { + const nextRequest = socket.sent + .map((message) => JSON.parse(message) as { _tag?: string; id?: string }) + .find((message) => message._tag === "Request" && message.id !== firstRequest.id); + expect(nextRequest).toBeDefined(); + }); + expect(onResubscribe).not.toHaveBeenCalled(); + expect(listener).not.toHaveBeenCalled(); + + unsubscribe(); + await transport.dispose(); + }); + it("streams finite request events without re-subscribing", async () => { const transport = new WsTransport("ws://localhost:3020"); const listener = vi.fn(); diff --git a/apps/web/src/wsTransport.ts b/apps/web/src/wsTransport.ts index aed100242f..fb0bc7dba4 100644 --- a/apps/web/src/wsTransport.ts +++ b/apps/web/src/wsTransport.ts @@ -83,12 +83,11 @@ export class WsTransport { } let active = true; - let firstAttempt = true; + let hasReceivedValue = false; const retryDelayMs = options?.retryDelay ?? DEFAULT_SUBSCRIPTION_RETRY_DELAY_MS; const cancel = this.runtime.runCallback( Effect.sync(() => { - if (firstAttempt) { - firstAttempt = false; + if (!hasReceivedValue) { return; } try { @@ -104,6 +103,7 @@ export class WsTransport { if (!active) { return; } + hasReceivedValue = true; try { listener(value); } catch {