From 1a3a872e7ca8058e91fac4b5308d965746a0aef6 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Tue, 5 May 2026 10:19:51 +0200 Subject: [PATCH] chore(code): treat turn_complete as implicit run_started for old agents Cloud sandboxes running agent versions that predate the _posthog/run_started notification never flip session.status to "connected", leaving queued follow-up messages stuck. Upgrade status on the first turn_complete when run_started hasn't been received so the queue drains normally. --- .../features/sessions/service/service.test.ts | 81 +++++++++++++++++++ .../features/sessions/service/service.ts | 29 ++++--- 2 files changed, 100 insertions(+), 10 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 1408ed18e..f896448be 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1039,6 +1039,87 @@ describe("SessionService", () => { }); }); + it("upgrades status to connected on turn_complete when run_started was never received", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + // Session starts disconnected — simulates an old agent that never + // emitted _posthog/run_started. + const sessionWithQueue = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + cloudStatus: "in_progress", + events: [], + messageQueue: [queuedMessage], + }); + // After the turn_complete handler flips status to "connected", + // sendQueuedCloudMessages reads the session again via + // getSessionByTaskId. We return the disconnected version first + // (for the turn_complete handler) then the connected version + // (for the queue dispatcher's canSendNow check). + const connectedSession = createMockSession({ + ...sessionWithQueue, + status: "connected", + }); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": sessionWithQueue, + }); + mockSessionStoreSetters.getSessionByTaskId + .mockReturnValueOnce(sessionWithQueue) + .mockReturnValue(connectedSession); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); + + const turnCompleteEvent = { + type: "acp_message" as const, + ts: 1700000001, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + { status: "connected" }, + ); + }); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-123", + method: "user_message", + params: expect.objectContaining({ content: "follow up" }), + }), + ); + }); + }); + it("clears isPromptPending from structured turn completion logs on hydration", async () => { const service = getSessionService(); const hydratedSession = createMockSession({ diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 4636440a2..28486f737 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1087,16 +1087,25 @@ export class SessionService { isNotification(msg.method, POSTHOG_NOTIFICATIONS.TURN_COMPLETE) ) { const session = sessionStoreSetters.getSessions()[taskRunId]; - if (session?.isCloud && session.messageQueue.length > 0) { - const taskId = session.taskId; - setTimeout(() => { - this.sendQueuedCloudMessages(taskId).catch((err) => - log.error("turn_complete-driven cloud queue flush failed", { - taskId, - error: err, - }), - ); - }, 0); + if (session?.isCloud) { + // Backward compat: treat turn_complete as an implicit run_started + // for agents that predate the run_started notification. + if (session.status !== "connected") { + sessionStoreSetters.updateSession(taskRunId, { + status: "connected", + }); + } + if (session.messageQueue.length > 0) { + const taskId = session.taskId; + setTimeout(() => { + this.sendQueuedCloudMessages(taskId).catch((err) => + log.error("turn_complete-driven cloud queue flush failed", { + taskId, + error: err, + }), + ); + }, 0); + } } } }