chore(code): backward-compat cloud queue drain for old agents#2023
chore(code): backward-compat cloud queue drain for old agents#2023VojtechBartos merged 1 commit intomainfrom
Conversation
Cloud sandboxes running agent versions that predate the _posthog/run_started notification never flip session.status to "connected", leaving queued follow-up messages stuck. Upgrade status on the first turn_complete when run_started hasn't been received so the queue drains normally.
Prompt To Fix All With AIFix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
apps/code/src/renderer/features/sessions/service/service.ts:1093-1097
The condition `session.status !== "connected"` is broader than the stated intent. The PR description says to upgrade status specifically when the session is "still disconnected", but this guard also fires for `"error"` and `"connecting"` states. A log-tailed `turn_complete` arriving while the session is in an `"error"` state would unexpectedly flip it back to `"connected"` and trigger a queue drain against a broken session. Narrowing to `=== "disconnected"` matches the prose description and avoids the unintended reset.
```suggestion
if (session.status === "disconnected") {
sessionStoreSetters.updateSession(taskRunId, {
status: "connected",
});
}
```
### Issue 2 of 3
apps/code/src/renderer/features/sessions/service/service.test.ts:1055-1064
Misleading mock comment — the comment says the first `mockReturnValueOnce(sessionWithQueue)` is consumed "for the turn_complete handler", but the handler reads from `getSessions()` (not `getSessionByTaskId`). The `mockReturnValueOnce` is actually consumed by some earlier call inside `watchCloudTask` before `sendQueuedCloudMessages` runs. The comment should clarify what actually consumes the first return value, or the mock chain should be simplified to just `mockReturnValue(connectedSession)` if the first entry is a setup detail unrelated to the backward-compat path.
### Issue 3 of 3
apps/code/src/renderer/features/sessions/service/service.test.ts:1042-1125
Opportunity for a parameterised test — the new test is structurally very similar to the existing "flushes queued cloud messages on _posthog/turn_complete" test (line 921): both set up a `turn_complete` event with a queued message and assert the queue is drained. The only dimension that varies is the starting `status` (`"connected"` vs `"disconnected"`) and the expected side effect of the `updateSession` call. Per the team's "we always prefer parameterised tests" rule, these two cases could be collapsed into a single `it.each` table, keeping both scenarios and removing the duplicated fixture setup.
Reviews (1): Last reviewed commit: "chore(code): treat turn_complete as impl..." | Re-trigger Greptile |
| if (session.status !== "connected") { | ||
| sessionStoreSetters.updateSession(taskRunId, { | ||
| status: "connected", | ||
| }); | ||
| } |
There was a problem hiding this comment.
The condition
session.status !== "connected" is broader than the stated intent. The PR description says to upgrade status specifically when the session is "still disconnected", but this guard also fires for "error" and "connecting" states. A log-tailed turn_complete arriving while the session is in an "error" state would unexpectedly flip it back to "connected" and trigger a queue drain against a broken session. Narrowing to === "disconnected" matches the prose description and avoids the unintended reset.
| if (session.status !== "connected") { | |
| sessionStoreSetters.updateSession(taskRunId, { | |
| status: "connected", | |
| }); | |
| } | |
| if (session.status === "disconnected") { | |
| sessionStoreSetters.updateSession(taskRunId, { | |
| status: "connected", | |
| }); | |
| } |
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/code/src/renderer/features/sessions/service/service.ts
Line: 1093-1097
Comment:
The condition `session.status !== "connected"` is broader than the stated intent. The PR description says to upgrade status specifically when the session is "still disconnected", but this guard also fires for `"error"` and `"connecting"` states. A log-tailed `turn_complete` arriving while the session is in an `"error"` state would unexpectedly flip it back to `"connected"` and trigger a queue drain against a broken session. Narrowing to `=== "disconnected"` matches the prose description and avoids the unintended reset.
```suggestion
if (session.status === "disconnected") {
sessionStoreSetters.updateSession(taskRunId, {
status: "connected",
});
}
```
How can I resolve this? If you propose a fix, please make it concise.| status: "disconnected", | ||
| isCloud: true, | ||
| cloudStatus: "in_progress", | ||
| events: [], | ||
| messageQueue: [queuedMessage], | ||
| }); | ||
| // After the turn_complete handler flips status to "connected", | ||
| // sendQueuedCloudMessages reads the session again via | ||
| // getSessionByTaskId. We return the disconnected version first | ||
| // (for the turn_complete handler) then the connected version |
There was a problem hiding this comment.
Misleading mock comment — the comment says the first
mockReturnValueOnce(sessionWithQueue) is consumed "for the turn_complete handler", but the handler reads from getSessions() (not getSessionByTaskId). The mockReturnValueOnce is actually consumed by some earlier call inside watchCloudTask before sendQueuedCloudMessages runs. The comment should clarify what actually consumes the first return value, or the mock chain should be simplified to just mockReturnValue(connectedSession) if the first entry is a setup detail unrelated to the backward-compat path.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/code/src/renderer/features/sessions/service/service.test.ts
Line: 1055-1064
Comment:
Misleading mock comment — the comment says the first `mockReturnValueOnce(sessionWithQueue)` is consumed "for the turn_complete handler", but the handler reads from `getSessions()` (not `getSessionByTaskId`). The `mockReturnValueOnce` is actually consumed by some earlier call inside `watchCloudTask` before `sendQueuedCloudMessages` runs. The comment should clarify what actually consumes the first return value, or the mock chain should be simplified to just `mockReturnValue(connectedSession)` if the first entry is a setup detail unrelated to the backward-compat path.
How can I resolve this? If you propose a fix, please make it concise.| it("upgrades status to connected on turn_complete when run_started was never received", async () => { | ||
| const service = getSessionService(); | ||
| mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); | ||
| const queuedMessage = { | ||
| id: "q-1", | ||
| content: "follow up", | ||
| queuedAt: 1700000000, | ||
| }; | ||
| // Session starts disconnected — simulates an old agent that never | ||
| // emitted _posthog/run_started. | ||
| const sessionWithQueue = createMockSession({ | ||
| taskRunId: "run-123", | ||
| taskId: "task-123", | ||
| status: "disconnected", | ||
| isCloud: true, | ||
| cloudStatus: "in_progress", | ||
| events: [], | ||
| messageQueue: [queuedMessage], | ||
| }); | ||
| // After the turn_complete handler flips status to "connected", | ||
| // sendQueuedCloudMessages reads the session again via | ||
| // getSessionByTaskId. We return the disconnected version first | ||
| // (for the turn_complete handler) then the connected version | ||
| // (for the queue dispatcher's canSendNow check). | ||
| const connectedSession = createMockSession({ | ||
| ...sessionWithQueue, | ||
| status: "connected", | ||
| }); | ||
| mockSessionStoreSetters.getSessions.mockReturnValue({ | ||
| "run-123": sessionWithQueue, | ||
| }); | ||
| mockSessionStoreSetters.getSessionByTaskId | ||
| .mockReturnValueOnce(sessionWithQueue) | ||
| .mockReturnValue(connectedSession); | ||
| mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); | ||
| mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); | ||
| mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); | ||
| mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); | ||
| mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ | ||
| success: true, | ||
| result: { stopReason: "end_turn" }, | ||
| }); | ||
|
|
||
| const turnCompleteEvent = { | ||
| type: "acp_message" as const, | ||
| ts: 1700000001, | ||
| message: { | ||
| jsonrpc: "2.0" as const, | ||
| method: "_posthog/turn_complete", | ||
| params: { sessionId: "acp-session", stopReason: "end_turn" }, | ||
| }, | ||
| }; | ||
| mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); | ||
|
|
||
| service.watchCloudTask( | ||
| "task-123", | ||
| "run-123", | ||
| "https://api.anthropic.com", | ||
| 123, | ||
| undefined, | ||
| "https://logs.example.com/run-123", | ||
| ); | ||
|
|
||
| await vi.waitFor(() => { | ||
| expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( | ||
| "run-123", | ||
| { status: "connected" }, | ||
| ); | ||
| }); | ||
|
|
||
| await vi.waitFor(() => { | ||
| expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( | ||
| expect.objectContaining({ | ||
| taskId: "task-123", | ||
| method: "user_message", | ||
| params: expect.objectContaining({ content: "follow up" }), | ||
| }), | ||
| ); | ||
| }); | ||
| }); | ||
|
|
||
| it("clears isPromptPending from structured turn completion logs on hydration", async () => { | ||
| const service = getSessionService(); | ||
| const hydratedSession = createMockSession({ |
There was a problem hiding this comment.
Opportunity for a parameterised test — the new test is structurally very similar to the existing "flushes queued cloud messages on _posthog/turn_complete" test (line 921): both set up a
turn_complete event with a queued message and assert the queue is drained. The only dimension that varies is the starting status ("connected" vs "disconnected") and the expected side effect of the updateSession call. Per the team's "we always prefer parameterised tests" rule, these two cases could be collapsed into a single it.each table, keeping both scenarios and removing the duplicated fixture setup.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/code/src/renderer/features/sessions/service/service.test.ts
Line: 1042-1125
Comment:
Opportunity for a parameterised test — the new test is structurally very similar to the existing "flushes queued cloud messages on _posthog/turn_complete" test (line 921): both set up a `turn_complete` event with a queued message and assert the queue is drained. The only dimension that varies is the starting `status` (`"connected"` vs `"disconnected"`) and the expected side effect of the `updateSession` call. Per the team's "we always prefer parameterised tests" rule, these two cases could be collapsed into a single `it.each` table, keeping both scenarios and removing the duplicated fixture setup.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Problem
PR #1905 gates cloud message queue dispatch on
_posthog/run_startedand_posthog/turn_completenotifications. The sandbox base image hasn't been rebuilt since April 28 and runs agent v2.3.425, which doesn't emit_posthog/run_started. This meanssession.statusnever flips to"connected"and queued follow-up messages are stuck forever.Solution
Treat
_posthog/turn_completeas an implicitrun_started— if the session is still disconnected when a turn completes, upgrade status to"connected"before draining the queue. No-op for new agents that already emittedrun_started.Test plan
turn_completewithout priorrun_startedupgrades status and drains queueCreated with PostHog Code