-
Notifications
You must be signed in to change notification settings - Fork 15
chore(code): backward-compat cloud queue drain for old agents #2023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1039,6 +1039,87 @@ describe("SessionService", () => { | |
| }); | ||
| }); | ||
|
|
||
| it("upgrades status to connected on turn_complete when run_started was never received", async () => { | ||
| const service = getSessionService(); | ||
| mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); | ||
| const queuedMessage = { | ||
| id: "q-1", | ||
| content: "follow up", | ||
| queuedAt: 1700000000, | ||
| }; | ||
| // Session starts disconnected — simulates an old agent that never | ||
| // emitted _posthog/run_started. | ||
| const sessionWithQueue = createMockSession({ | ||
| taskRunId: "run-123", | ||
| taskId: "task-123", | ||
| status: "disconnected", | ||
| isCloud: true, | ||
| cloudStatus: "in_progress", | ||
| events: [], | ||
| messageQueue: [queuedMessage], | ||
| }); | ||
| // After the turn_complete handler flips status to "connected", | ||
| // sendQueuedCloudMessages reads the session again via | ||
| // getSessionByTaskId. We return the disconnected version first | ||
| // (for the turn_complete handler) then the connected version | ||
| // (for the queue dispatcher's canSendNow check). | ||
| const connectedSession = createMockSession({ | ||
| ...sessionWithQueue, | ||
| status: "connected", | ||
| }); | ||
| mockSessionStoreSetters.getSessions.mockReturnValue({ | ||
| "run-123": sessionWithQueue, | ||
| }); | ||
| mockSessionStoreSetters.getSessionByTaskId | ||
| .mockReturnValueOnce(sessionWithQueue) | ||
| .mockReturnValue(connectedSession); | ||
| mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); | ||
| mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); | ||
| mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); | ||
| mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); | ||
| mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ | ||
| success: true, | ||
| result: { stopReason: "end_turn" }, | ||
| }); | ||
|
|
||
| const turnCompleteEvent = { | ||
| type: "acp_message" as const, | ||
| ts: 1700000001, | ||
| message: { | ||
| jsonrpc: "2.0" as const, | ||
| method: "_posthog/turn_complete", | ||
| params: { sessionId: "acp-session", stopReason: "end_turn" }, | ||
| }, | ||
| }; | ||
| mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); | ||
|
|
||
| service.watchCloudTask( | ||
| "task-123", | ||
| "run-123", | ||
| "https://api.anthropic.com", | ||
| 123, | ||
| undefined, | ||
| "https://logs.example.com/run-123", | ||
| ); | ||
|
|
||
| await vi.waitFor(() => { | ||
| expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( | ||
| "run-123", | ||
| { status: "connected" }, | ||
| ); | ||
| }); | ||
|
|
||
| await vi.waitFor(() => { | ||
| expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( | ||
| expect.objectContaining({ | ||
| taskId: "task-123", | ||
| method: "user_message", | ||
| params: expect.objectContaining({ content: "follow up" }), | ||
| }), | ||
| ); | ||
| }); | ||
| }); | ||
|
|
||
| it("clears isPromptPending from structured turn completion logs on hydration", async () => { | ||
| const service = getSessionService(); | ||
| const hydratedSession = createMockSession({ | ||
|
Comment on lines
+1042
to
1125
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Prompt To Fix With AIThis is a comment left during a code review.
Path: apps/code/src/renderer/features/sessions/service/service.test.ts
Line: 1042-1125
Comment:
Opportunity for a parameterised test — the new test is structurally very similar to the existing "flushes queued cloud messages on _posthog/turn_complete" test (line 921): both set up a `turn_complete` event with a queued message and assert the queue is drained. The only dimension that varies is the starting `status` (`"connected"` vs `"disconnected"`) and the expected side effect of the `updateSession` call. Per the team's "we always prefer parameterised tests" rule, these two cases could be collapsed into a single `it.each` table, keeping both scenarios and removing the duplicated fixture setup.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1087,16 +1087,25 @@ export class SessionService { | |||||||||||||||||||||
| isNotification(msg.method, POSTHOG_NOTIFICATIONS.TURN_COMPLETE) | ||||||||||||||||||||||
| ) { | ||||||||||||||||||||||
| const session = sessionStoreSetters.getSessions()[taskRunId]; | ||||||||||||||||||||||
| if (session?.isCloud && session.messageQueue.length > 0) { | ||||||||||||||||||||||
| const taskId = session.taskId; | ||||||||||||||||||||||
| setTimeout(() => { | ||||||||||||||||||||||
| this.sendQueuedCloudMessages(taskId).catch((err) => | ||||||||||||||||||||||
| log.error("turn_complete-driven cloud queue flush failed", { | ||||||||||||||||||||||
| taskId, | ||||||||||||||||||||||
| error: err, | ||||||||||||||||||||||
| }), | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
| }, 0); | ||||||||||||||||||||||
| if (session?.isCloud) { | ||||||||||||||||||||||
| // Backward compat: treat turn_complete as an implicit run_started | ||||||||||||||||||||||
| // for agents that predate the run_started notification. | ||||||||||||||||||||||
| if (session.status !== "connected") { | ||||||||||||||||||||||
| sessionStoreSetters.updateSession(taskRunId, { | ||||||||||||||||||||||
| status: "connected", | ||||||||||||||||||||||
| }); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
Comment on lines
+1093
to
+1097
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Prompt To Fix With AIThis is a comment left during a code review.
Path: apps/code/src/renderer/features/sessions/service/service.ts
Line: 1093-1097
Comment:
The condition `session.status !== "connected"` is broader than the stated intent. The PR description says to upgrade status specifically when the session is "still disconnected", but this guard also fires for `"error"` and `"connecting"` states. A log-tailed `turn_complete` arriving while the session is in an `"error"` state would unexpectedly flip it back to `"connected"` and trigger a queue drain against a broken session. Narrowing to `=== "disconnected"` matches the prose description and avoids the unintended reset.
```suggestion
if (session.status === "disconnected") {
sessionStoreSetters.updateSession(taskRunId, {
status: "connected",
});
}
```
How can I resolve this? If you propose a fix, please make it concise. |
||||||||||||||||||||||
| if (session.messageQueue.length > 0) { | ||||||||||||||||||||||
| const taskId = session.taskId; | ||||||||||||||||||||||
| setTimeout(() => { | ||||||||||||||||||||||
| this.sendQueuedCloudMessages(taskId).catch((err) => | ||||||||||||||||||||||
| log.error("turn_complete-driven cloud queue flush failed", { | ||||||||||||||||||||||
| taskId, | ||||||||||||||||||||||
| error: err, | ||||||||||||||||||||||
| }), | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
| }, 0); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mockReturnValueOnce(sessionWithQueue)is consumed "for the turn_complete handler", but the handler reads fromgetSessions()(notgetSessionByTaskId). ThemockReturnValueOnceis actually consumed by some earlier call insidewatchCloudTaskbeforesendQueuedCloudMessagesruns. The comment should clarify what actually consumes the first return value, or the mock chain should be simplified to justmockReturnValue(connectedSession)if the first entry is a setup detail unrelated to the backward-compat path.Prompt To Fix With AI