From af2fb461382bde3c2148c76cf09f374e8b95130d Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Tue, 5 May 2026 14:03:08 +0100 Subject: [PATCH 1/2] fix(agent): keep idle event streams alive --- .../agent/src/server/agent-server.test.ts | 58 +++++++++++++++++ packages/agent/src/server/agent-server.ts | 64 +++++++++++++------ 2 files changed, 104 insertions(+), 18 deletions(-) diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index c487f8a35..dec96daf8 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -274,6 +274,64 @@ describe("AgentServer HTTP Mode", () => { expect(response.status).toBe(200); expect(response.headers.get("content-type")).toBe("text/event-stream"); }, 20000); + + it("emits transport keepalive comments while idle", async () => { + const keepaliveCallback: { current: (() => void) | null } = { + current: null, + }; + const setIntervalSpy = vi + .spyOn(globalThis, "setInterval") + .mockImplementation( + (callback: (_: undefined) => void, timeout?: number) => { + if (timeout === 25_000) { + keepaliveCallback.current = () => callback(undefined); + } + return setTimeout(() => undefined, 60_000); + }, + ); + + let reader: ReadableStreamDefaultReader | null = null; + try { + await createServer().start(); + const token = createToken(); + + const response = await fetch(`http://localhost:${port}/events`, { + headers: { Authorization: `Bearer ${token}` }, + }); + + expect(response.status).toBe(200); + expect(response.body).not.toBeNull(); + reader = response.body?.getReader() ?? null; + expect(reader).not.toBeNull(); + if (!reader) { + throw new Error("Expected SSE response body reader"); + } + + await vi.waitFor(() => + expect(keepaliveCallback.current).not.toBeNull(), + ); + const emitKeepalive = keepaliveCallback.current; + if (!emitKeepalive) { + throw new Error("Expected keepalive callback to be registered"); + } + emitKeepalive(); + + const decoder = new TextDecoder(); + let streamText = ""; + for (let attempts = 0; attempts < 5; attempts++) { + const { done, value } = await reader.read(); + if (done) break; + streamText += decoder.decode(value, { stream: true }); + if (streamText.includes(": keepalive\n\n")) break; + } + + expect(streamText).toContain(": keepalive\n\n"); + expect(streamText).not.toContain('"type":"keepalive"'); + } finally { + await reader?.cancel(); + setIntervalSpy.mockRestore(); + } + }, 20000); }); describe("POST /command", () => { diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 18acf4506..73aa2a530 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -73,6 +73,8 @@ const errorWithClassificationSchema = z.object({ type MessageCallback = (message: unknown) => void; +const SSE_KEEPALIVE_INTERVAL_MS = 25_000; + class NdJsonTap { private decoder = new TextDecoder(); private buffer = ""; @@ -329,20 +331,23 @@ export class AgentServer { ); } + let keepaliveInterval: ReturnType | null = null; + const clearKeepalive = (): void => { + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; + } + }; + const stream = new ReadableStream({ start: async (controller) => { const sseController: SseController = { send: (data: unknown) => { - try { - controller.enqueue( - new TextEncoder().encode(`data: ${JSON.stringify(data)}\n\n`), - ); - } catch { - this.detachSseController(sseController); - } + enqueueSseFrame(`data: ${JSON.stringify(data)}\n\n`); }, close: () => { try { + clearKeepalive(); controller.close(); } catch { this.detachSseController(sseController); @@ -350,20 +355,43 @@ export class AgentServer { }, }; - if (!this.session || this.session.payload.run_id !== payload.run_id) { - await this.initializeSession(payload, sseController); - } else { - this.session.sseController = sseController; - this.session.hasDesktopConnected = true; - this.replayPendingEvents(); - } + const encoder = new TextEncoder(); + const enqueueSseFrame = (frame: string): void => { + try { + controller.enqueue(encoder.encode(frame)); + } catch { + clearKeepalive(); + this.detachSseController(sseController); + } + }; - this.sendSseEvent(sseController, { - type: "connected", - run_id: payload.run_id, - }); + keepaliveInterval = setInterval(() => { + enqueueSseFrame(": keepalive\n\n"); + }, SSE_KEEPALIVE_INTERVAL_MS); + + try { + if ( + !this.session || + this.session.payload.run_id !== payload.run_id + ) { + await this.initializeSession(payload, sseController); + } else { + this.session.sseController = sseController; + this.session.hasDesktopConnected = true; + this.replayPendingEvents(); + } + + this.sendSseEvent(sseController, { + type: "connected", + run_id: payload.run_id, + }); + } catch (error) { + clearKeepalive(); + throw error; + } }, cancel: () => { + clearKeepalive(); this.logger.debug("SSE connection closed"); if (this.session?.sseController) { this.session.sseController = null; From 3a7a5cc3ea9d40398afd8c4cb53ddfe26536cbe8 Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Tue, 5 May 2026 14:20:27 +0100 Subject: [PATCH 2/2] fix(agent): address event stream review feedback --- .../agent/src/server/agent-server.test.ts | 4 +-- packages/agent/src/server/agent-server.ts | 32 +++++++++++-------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index dec96daf8..9bbc755ca 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -13,7 +13,7 @@ import { import { createTestRepo, type TestRepo } from "../test/fixtures/api"; import { createPostHogHandlers } from "../test/mocks/msw-handlers"; import type { TaskRun } from "../types"; -import { AgentServer } from "./agent-server"; +import { AgentServer, SSE_KEEPALIVE_INTERVAL_MS } from "./agent-server"; import { type JwtPayload, SANDBOX_CONNECTION_AUDIENCE } from "./jwt"; interface TestableServer { @@ -283,7 +283,7 @@ describe("AgentServer HTTP Mode", () => { .spyOn(globalThis, "setInterval") .mockImplementation( (callback: (_: undefined) => void, timeout?: number) => { - if (timeout === 25_000) { + if (timeout === SSE_KEEPALIVE_INTERVAL_MS) { keepaliveCallback.current = () => callback(undefined); } return setTimeout(() => undefined, 60_000); diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 73aa2a530..357ceef72 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -73,7 +73,7 @@ const errorWithClassificationSchema = z.object({ type MessageCallback = (message: unknown) => void; -const SSE_KEEPALIVE_INTERVAL_MS = 25_000; +export const SSE_KEEPALIVE_INTERVAL_MS = 25_000; class NdJsonTap { private decoder = new TextDecoder(); @@ -341,7 +341,23 @@ export class AgentServer { const stream = new ReadableStream({ start: async (controller) => { - const sseController: SseController = { + let sseController: SseController | null = null; + const encoder = new TextEncoder(); + const detachCurrentSseController = (): void => { + if (sseController) { + this.detachSseController(sseController); + } + }; + const enqueueSseFrame = (frame: string): void => { + try { + controller.enqueue(encoder.encode(frame)); + } catch { + clearKeepalive(); + detachCurrentSseController(); + } + }; + + sseController = { send: (data: unknown) => { enqueueSseFrame(`data: ${JSON.stringify(data)}\n\n`); }, @@ -350,21 +366,11 @@ export class AgentServer { clearKeepalive(); controller.close(); } catch { - this.detachSseController(sseController); + detachCurrentSseController(); } }, }; - const encoder = new TextEncoder(); - const enqueueSseFrame = (frame: string): void => { - try { - controller.enqueue(encoder.encode(frame)); - } catch { - clearKeepalive(); - this.detachSseController(sseController); - } - }; - keepaliveInterval = setInterval(() => { enqueueSseFrame(": keepalive\n\n"); }, SSE_KEEPALIVE_INTERVAL_MS);