From 900e12fcd3ea4e0c4b35ad211864dc109a01f138 Mon Sep 17 00:00:00 2001 From: Sami Jawhar Date: Wed, 8 Apr 2026 21:52:33 +0000 Subject: [PATCH] fix(core): harden prompt loop and async session handling --- .../cli/cmd/tui/component/prompt/index.tsx | 9 +- .../opencode/src/server/routes/session.ts | 25 ++- packages/opencode/src/session/message-v2.ts | 33 +++- packages/opencode/src/session/processor.ts | 9 +- packages/opencode/src/session/prompt.ts | 72 +++++++- .../test/server/session-messages.test.ts | 2 + packages/opencode/test/session/cancel.test.ts | 154 ++++++++++++++++++ .../test/session/fixtures/skewed-messages.ts | 84 ++++++++++ .../opencode/test/session/message-v2.test.ts | 48 ++++++ packages/opencode/test/session/prompt.test.ts | 103 +++++++++++- 10 files changed, 514 insertions(+), 25 deletions(-) create mode 100644 packages/opencode/test/session/cancel.test.ts create mode 100644 packages/opencode/test/session/fixtures/skewed-messages.ts diff --git a/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx b/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx index 747c61fd0bf9..e7796d5101d7 100644 --- a/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx +++ b/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx @@ -13,6 +13,7 @@ import { useSync } from "@tui/context/sync" import { MessageID, PartID } from "@/session/schema" import { createStore, produce } from "solid-js/store" import { useKeybind } from "@tui/context/keybind" +import { Log } from "@/util/log" import { usePromptHistory, type PromptInfo } from "./history" import { assign } from "./part" import { usePromptStash } from "./stash" @@ -277,7 +278,7 @@ export function Prompt(props: PromptProps) { if (store.interrupt >= 2) { sdk.client.session.abort({ sessionID: props.sessionID, - }) + }).catch(() => {}) setStore("interrupt", 0) } dialog.clear() @@ -608,10 +609,10 @@ export function Prompt(props: PromptProps) { }) if (res.error) { - console.log("Creating a session failed:", res.error) + Log.Default.error("session creation failed", { error: res.error }) toast.show({ - message: "Creating a session failed. Open console for more details.", + message: "Creating a session failed", variant: "error", }) @@ -687,7 +688,7 @@ export function Prompt(props: PromptProps) { id: PartID.ascending(), ...x, })), - }) + }).catch(() => {}) } else { sdk.client.session .prompt({ diff --git a/packages/opencode/src/server/routes/session.ts b/packages/opencode/src/server/routes/session.ts index b4c13bca0f11..e3a0dd8ab1f2 100644 --- a/packages/opencode/src/server/routes/session.ts +++ b/packages/opencode/src/server/routes/session.ts @@ -21,6 +21,7 @@ import { errors } from "../error" import { lazy } from "../../util/lazy" import { Bus } from "../../bus" import { NamedError } from "@opencode-ai/util/error" +import { Instance } from "@/project/instance" const log = Log.create({ service: "server" }) @@ -194,7 +195,7 @@ export const SessionRoutes = lazy(() => description: "Create a new OpenCode session for interacting with AI assistants and managing conversations.", operationId: "session.create", responses: { - ...errors(400), + ...errors(400, 409), 200: { description: "Successfully created session", content: { @@ -381,7 +382,7 @@ export const SessionRoutes = lazy(() => }), ), async (c) => { - await SessionPrompt.cancel(c.req.valid("param").sessionID) + await SessionPrompt.cancel(c.req.valid("param").sessionID).catch(() => {}) return c.json(true) }, ) @@ -843,19 +844,25 @@ export const SessionRoutes = lazy(() => ), validator("json", SessionPrompt.PromptInput.omit({ sessionID: true })), async (c) => { - c.status(204) - c.header("Content-Type", "application/json") - return stream(c, async () => { - const sessionID = c.req.valid("param").sessionID - const body = c.req.valid("json") + const sessionID = c.req.valid("param").sessionID + const body = c.req.valid("json") + const run = Instance.bind(() => { SessionPrompt.prompt({ ...body, sessionID }).catch((err) => { - log.error("prompt_async failed", { sessionID, error: err }) + log.error("prompt_async failed", { + sessionID, + error: err, + stack: err instanceof Error ? err.stack : undefined, + }) + const error = MessageV2.fromError(err, { providerID: "unknown" as any }) Bus.publish(Session.Event.Error, { sessionID, - error: new NamedError.Unknown({ message: err instanceof Error ? err.message : String(err) }).toObject(), + error, }) }) }) + run() + c.status(204) + return c.body(null) }, ) .post( diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index e8aab62d8423..bfca82cd11ae 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -696,6 +696,10 @@ export namespace MessageV2 { ) { continue } + // Skip incomplete assistant messages (no finish, no error, and no meaningful parts) + if (!msg.info.finish && !msg.info.error && !msg.parts.some((part) => part.type !== "step-start")) { + continue + } const assistantMessage: UIMessage = { id: msg.info.id, role: "assistant", @@ -972,7 +976,7 @@ export namespace MessageV2 { }, { cause: e }, ).toObject() - case APICallError.isInstance(e): + case APICallError.isInstance(e): { const parsed = ProviderError.parseAPICallError({ providerID: ctx.providerID, error: e, @@ -998,6 +1002,33 @@ export namespace MessageV2 { }, { cause: e }, ).toObject() + } + case e instanceof DOMException && e.name === "TimeoutError": + return new MessageV2.APIError( + { + message: "Request timed out", + isRetryable: true, + }, + { cause: e }, + ).toObject() + case e instanceof Error && + "code" in e && + typeof (e as SystemError).code === "string" && + ["ECONNREFUSED", "ENOTFOUND", "ETIMEDOUT", "EPIPE", "ECONNABORTED", "EHOSTUNREACH"].includes( + (e as SystemError).code ?? "", + ): + return new MessageV2.APIError( + { + message: `Network error: ${(e as SystemError).message}`, + isRetryable: true, + metadata: { + code: (e as SystemError).code ?? "", + syscall: (e as SystemError).syscall ?? "", + message: (e as SystemError).message ?? "", + }, + }, + { cause: e }, + ).toObject() case e instanceof Error: return new NamedError.Unknown({ message: errorMessage(e) }, { cause: e }).toObject() default: diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 146c73f27712..7789b48fd8ad 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -414,7 +414,14 @@ export namespace SessionProcessor { }) const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) { - log.error("process", { error: e, stack: e instanceof Error ? e.stack : undefined }) + log.error("process", { + error: e, + stack: e instanceof Error ? e.stack : undefined, + providerID: ctx.assistantMessage.providerID, + modelID: ctx.assistantMessage.modelID, + sessionID: ctx.sessionID, + agent: ctx.assistantMessage.agent, + }) const error = parse(e) if (MessageV2.ContextOverflowError.isInstance(error)) { ctx.needsCompaction = true diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index c29733999214..1fc7b81b20e0 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -946,7 +946,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the }) const createUserMessage = Effect.fn("SessionPrompt.createUserMessage")(function* (input: PromptInput) { - const agentName = input.agent || (yield* agents.defaultAgent()) + const agentName = input.agent ?? (yield* lastPrimaryAgent(input.sessionID)) const ag = yield* agents.get(agentName) if (!ag) { const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) @@ -1308,6 +1308,26 @@ NOTE: At any point in time through this workflow you should feel free to ask the function* (input: PromptInput) { const session = yield* sessions.get(input.sessionID) yield* Effect.promise(() => SessionRevert.cleanup(session)) + + const text = input.parts.find((part): part is MessageV2.TextPart => part.type === "text" && !part.synthetic) + const slash = text?.text.trim() + if (slash?.startsWith("/") && !slash.startsWith("//")) { + const body = slash.slice(1) + const at = body.search(/\s/) + const cmd = at === -1 ? body : body.slice(0, at) + const args = at === -1 ? "" : body.slice(at + 1).trim() + return yield* command({ + sessionID: input.sessionID, + messageID: input.messageID, + agent: input.agent, + model: input.model ? `${input.model.providerID}/${input.model.modelID}` : undefined, + variant: input.variant, + command: cmd, + arguments: args, + parts: input.parts.filter((part) => part.type === "file"), + }) + } + const message = yield* createUserMessage(input) yield* sessions.touch(input.sessionID) @@ -1372,12 +1392,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the // Keep the loop running so tool results can be sent back to the model. const hasToolCalls = lastAssistantMsg?.parts.some((part) => part.type === "tool") ?? false - if ( - lastAssistant?.finish && - !["tool-calls"].includes(lastAssistant.finish) && - !hasToolCalls && - lastUser.id < lastAssistant.id - ) { + if (shouldExitLoop(lastUser, lastAssistant, lastFinished, hasToolCalls)) { log.info("exiting loop", { sessionID }) break } @@ -1873,6 +1888,49 @@ NOTE: At any point in time through this workflow you should feel free to ask the return runPromise((svc) => svc.command(CommandInput.parse(input))) } + const lastPrimaryAgent = Effect.fnUntraced(function* (sessionID: SessionID) { + const msgs = yield* Effect.promise(async () => { + const result: MessageV2.WithParts[] = [] + for await (const item of MessageV2.stream(sessionID)) result.push(item) + return result + }) + let name: string | undefined + for (const item of msgs) { + if (item.info.role !== "user") continue + if (!item.info.agent) continue + const agent = yield* Effect.promise(() => Agent.get(item.info.agent).catch(() => undefined)) + if (!agent || agent.hidden || agent.mode === "subagent") continue + name = agent.name + } + if (name) return name + return yield* Effect.promise(() => Agent.defaultAgent()) + }) + + export function shouldExitLoop( + lastUser: MessageV2.User | undefined, + lastAssistant: MessageV2.Assistant | undefined, + lastFinished?: MessageV2.Assistant, + hasToolCalls = false, + ): boolean { + if (!lastUser || hasToolCalls) return false + const done = lastFinished ?? lastAssistant + if (!done?.finish) return false + if (["tool-calls", "unknown"].includes(done.finish)) return false + if (!done.parentID) return true + return done.parentID === lastUser.id + } + + export function shouldWrapSystemReminder( + msg: MessageV2.User | MessageV2.Assistant, + idx: number, + lastFinished: MessageV2.Assistant | undefined, + finishedIdx: number, + ): boolean { + if (msg.role !== "user") return false + if (!lastFinished) return false + return idx > finishedIdx + } + /** @internal Exported for testing */ export function createStructuredOutputTool(input: { schema: Record diff --git a/packages/opencode/test/server/session-messages.test.ts b/packages/opencode/test/server/session-messages.test.ts index 89e6fba5c5fd..10482c7178fe 100644 --- a/packages/opencode/test/server/session-messages.test.ts +++ b/packages/opencode/test/server/session-messages.test.ts @@ -155,5 +155,7 @@ describe("session.prompt_async error handling", () => { const route = src.slice(start, end) expect(route).toContain(".catch(") expect(route).toContain("Bus.publish(Session.Event.Error") + expect(route).toContain("Instance.bind(") + expect(route).not.toContain("return stream(c") }) }) diff --git a/packages/opencode/test/session/cancel.test.ts b/packages/opencode/test/session/cancel.test.ts new file mode 100644 index 000000000000..d82d6a6193e9 --- /dev/null +++ b/packages/opencode/test/session/cancel.test.ts @@ -0,0 +1,154 @@ +import path from "path" +import { describe, expect, test } from "bun:test" +import { Instance } from "../../src/project/instance" +import { Session } from "../../src/session" +import { SessionPrompt } from "../../src/session/prompt" +import { SessionID } from "../../src/session/schema" +import { Log } from "../../src/util/log" +import { tmpdir } from "../fixture/fixture" + +Log.init({ print: false }) + +function defer() { + let resolve!: (value: T | PromiseLike) => void + const promise = new Promise((done) => { + resolve = done + }) + return { promise, resolve } +} + +function hanging(ready: () => void) { + const encoder = new TextEncoder() + let timer: ReturnType | undefined + const first = + `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: { role: "assistant" } }], + })}` + "\n\n" + const rest = + [ + `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: { content: "late" } }], + })}`, + `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: {}, finish_reason: "stop" }], + })}`, + "data: [DONE]", + ].join("\n\n") + "\n\n" + + return new ReadableStream({ + start(ctrl) { + ctrl.enqueue(encoder.encode(first)) + ready() + timer = setTimeout(() => { + ctrl.enqueue(encoder.encode(rest)) + ctrl.close() + }, 10000) + }, + cancel() { + if (timer) clearTimeout(timer) + }, + }) +} + +describe("SessionPrompt.cancel", () => { + test("does not produce unhandled rejections", async () => { + const rejections: unknown[] = [] + const handler = (e: unknown) => rejections.push(e) + process.on("unhandledRejection", handler) + const ready = defer() + const server = Bun.serve({ + port: 0, + fetch(req) { + const url = new URL(req.url) + if (!url.pathname.endsWith("/chat/completions")) { + return new Response("not found", { status: 404 }) + } + return new Response( + hanging(() => ready.resolve()), + { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }, + ) + }, + }) + + try { + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["alibaba"], + provider: { + alibaba: { + options: { + apiKey: "test-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + agent: { + build: { + model: "alibaba/qwen-plus", + }, + }, + }), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({ title: "Prompt cancel regression" }) + const run = SessionPrompt.prompt({ + sessionID: session.id, + agent: "build", + parts: [{ type: "text", text: "Cancel me" }], + }) + + await ready.promise + await SessionPrompt.cancel(session.id) + + const result = await Promise.race([ + run, + new Promise((_, reject) => + setTimeout(() => reject(new Error("timed out waiting for cancel")), 1000), + ), + ]) + + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") { + expect(result.info.error?.name).toBe("MessageAbortedError") + } + + await new Promise((r) => setTimeout(r, 50)) + expect(rejections).toEqual([]) + }, + }) + } finally { + server.stop(true) + process.off("unhandledRejection", handler) + } + }) + + test("cancel on non-existent session does not throw", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + await SessionPrompt.cancel(SessionID.make("ses_nonexistent")) + }, + }) + }) +}) diff --git a/packages/opencode/test/session/fixtures/skewed-messages.ts b/packages/opencode/test/session/fixtures/skewed-messages.ts new file mode 100644 index 000000000000..e30b4f95ec39 --- /dev/null +++ b/packages/opencode/test/session/fixtures/skewed-messages.ts @@ -0,0 +1,84 @@ +import { Identifier } from "../../../src/id/id" +import { MessageV2 } from "../../../src/session/message-v2" +import { MessageID, SessionID } from "../../../src/session/schema" +import { ModelID, ProviderID } from "../../../src/provider/schema" + +export function makeUser( + id: MessageID, + opts?: Partial, +): MessageV2.User { + return { + id, + sessionID: SessionID.make("test-session"), + role: "user", + time: { created: Date.now() }, + agent: "default", + model: { + providerID: ProviderID.openai, + modelID: ModelID.make("gpt-4"), + }, + ...opts, + } +} + +export function makeAssistant( + id: MessageID, + parentID: MessageID, + opts?: Partial, +): MessageV2.Assistant { + return { + id, + sessionID: SessionID.make("test-session"), + role: "assistant", + parentID, + time: { created: Date.now() }, + modelID: ModelID.make("gpt-4"), + providerID: ProviderID.openai, + mode: "default", + agent: "default", + path: { + cwd: "/tmp", + root: "/tmp", + }, + cost: 0, + tokens: { + input: 0, + output: 0, + reasoning: 0, + cache: { + read: 0, + write: 0, + }, + }, + finish: "stop", + ...opts, + } +} + +export function aheadPair(): { + user: MessageV2.User + assistant: MessageV2.Assistant +} { + const now = Date.now() + const userID = MessageID.make(Identifier.create("message", false, now + 60_000)) + const assistantID = MessageID.make(Identifier.create("message", false, now)) + + return { + user: makeUser(userID), + assistant: makeAssistant(assistantID, userID), + } +} + +export function behindPair(): { + user: MessageV2.User + assistant: MessageV2.Assistant +} { + const now = Date.now() + const userID = MessageID.make(Identifier.create("message", false, now - 60_000)) + const assistantID = MessageID.make(Identifier.create("message", false, now)) + + return { + user: makeUser(userID), + assistant: makeAssistant(assistantID, userID), + } +} diff --git a/packages/opencode/test/session/message-v2.test.ts b/packages/opencode/test/session/message-v2.test.ts index 3634d6fb7ec8..33d3ed373514 100644 --- a/packages/opencode/test/session/message-v2.test.ts +++ b/packages/opencode/test/session/message-v2.test.ts @@ -644,6 +644,54 @@ describe("session.message-v2.toModelMessage", () => { ]) }) + test("toModelMessages skips assistant messages with no finish and no error", async () => { + const userID = "m-user" + const assistantID = "m-assistant" + + const input: MessageV2.WithParts[] = [ + { + info: userInfo(userID), + parts: [ + { + ...basePart(userID, "u1"), + type: "text", + text: "hello", + }, + ] as MessageV2.Part[], + }, + { + info: assistantInfo(assistantID, userID), + parts: [ + { + ...basePart(assistantID, "a1"), + type: "step-start", + }, + ] as MessageV2.Part[], + }, + { + info: userInfo("m-user-2"), + parts: [ + { + ...basePart("m-user-2", "u2"), + type: "text", + text: "follow up", + }, + ] as MessageV2.Part[], + }, + ] + + expect(await MessageV2.toModelMessages(input, model)).toStrictEqual([ + { + role: "user", + content: [{ type: "text", text: "hello" }], + }, + { + role: "user", + content: [{ type: "text", text: "follow up" }], + }, + ]) + }) + test("splits assistant messages on step-start boundaries", async () => { const assistantID = "m-assistant" diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index bf7b99ef2e23..3ca1d25b8c17 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -9,6 +9,8 @@ import { MessageV2 } from "../../src/session/message-v2" import { SessionPrompt } from "../../src/session/prompt" import { Log } from "../../src/util/log" import { tmpdir } from "../fixture/fixture" +import { MessageID } from "../../src/session/schema" +import { behindPair, makeUser, makeAssistant } from "./fixtures/skewed-messages" Log.init({ print: false }) @@ -447,6 +449,46 @@ describe("session.prompt agent variant", () => { }) describe("session.agent-resolution", () => { + test("reuses the last primary agent when agent is omitted", async () => { + await using tmp = await tmpdir({ + git: true, + config: { + default_agent: "sisyphus", + agent: { + hephaestus: { + model: "openai/gpt-5.2", + }, + sisyphus: { + model: "openai/gpt-5.2", + }, + }, + }, + }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + + const first = await SessionPrompt.prompt({ + sessionID: session.id, + agent: "hephaestus", + noReply: true, + parts: [{ type: "text", text: "first" }], + }) + if (first.info.role !== "user") throw new Error("expected user message") + expect(first.info.agent).toBe("hephaestus") + + const second = await SessionPrompt.prompt({ + sessionID: session.id, + noReply: true, + parts: [{ type: "text", text: "second" }], + }) + if (second.info.role !== "user") throw new Error("expected user message") + expect(second.info.agent).toBe("hephaestus") + }, + }) + }) + test("unknown agent throws typed error", async () => { await using tmp = await tmpdir({ git: true }) await Instance.provide({ @@ -501,10 +543,10 @@ describe("session.agent-resolution", () => { directory: tmp.path, fn: async () => { const session = await Session.create({}) - const err = await SessionPrompt.command({ + const err = await SessionPrompt.prompt({ sessionID: session.id, - command: "nonexistent-command-xyz", - arguments: "", + noReply: true, + parts: [{ type: "text", text: "/nonexistent-command-xyz" }], }).then( () => undefined, (e) => e, @@ -520,3 +562,58 @@ describe("session.agent-resolution", () => { }) }, 30000) }) + +describe("shouldExitLoop", () => { + const user = { id: MessageID.make("msg-user-1") } as any + const assistant = (parentID: string | undefined, finish: string | undefined) => + ({ + role: "assistant", + finish, + parentID: parentID ? MessageID.make(parentID) : undefined, + }) as any + + test("normal exit: parentID matches, finish=end_turn \u2192 true", () => { + expect(SessionPrompt.shouldExitLoop(user, assistant("msg-user-1", "end_turn"))).toBe(true) + }) + + test("clock-skew exit: parentID matches, finish=stop \u2192 true", () => { + expect(SessionPrompt.shouldExitLoop(user, assistant("msg-user-1", "stop"))).toBe(true) + }) + + test("tool-calls: finish=tool-calls \u2192 false", () => { + expect(SessionPrompt.shouldExitLoop(user, assistant("msg-user-1", "tool-calls"))).toBe(false) + }) + + test("unknown: finish=unknown \u2192 false", () => { + expect(SessionPrompt.shouldExitLoop(user, assistant("msg-user-1", "unknown"))).toBe(false) + }) + + test("no assistant: lastAssistant=undefined \u2192 false", () => { + expect(SessionPrompt.shouldExitLoop(user, undefined)).toBe(false) + }) + + test("no finish: finish=undefined \u2192 false", () => { + expect(SessionPrompt.shouldExitLoop(user, assistant("msg-user-1", undefined))).toBe(false) + }) + + test("parentID mismatch: assistant.parentID !== user.id \u2192 false", () => { + expect(SessionPrompt.shouldExitLoop(user, assistant("msg-other-user", "end_turn"))).toBe(false) + }) + + test("no user: lastUser=undefined \u2192 false", () => { + expect(SessionPrompt.shouldExitLoop(undefined, assistant("msg-user-1", "end_turn"))).toBe(false) + }) + + test("should return true when assistant has missing parentID (fail-safe)", () => { + expect(SessionPrompt.shouldExitLoop(user, assistant(undefined, "end_turn"))).toBe(true) + }) +}) + +describe("system-reminder wrapping", () => { + test("wraps queued user messages based on position, not ID ordering", () => { + const { user, assistant } = behindPair() + expect(user.id < assistant.id).toBe(true) + expect(SessionPrompt.shouldWrapSystemReminder(user, 2, assistant, 1)).toBe(true) + expect(SessionPrompt.shouldWrapSystemReminder(user, 1, assistant, 1)).toBe(false) + }) +})