diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 377f805d3f1d..fc52838bc977 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -483,7 +483,7 @@ export namespace SessionProcessor { const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) { log.error("process", { error: e, stack: e instanceof Error ? e.stack : undefined }) const error = parse(e) - if (MessageV2.ContextOverflowError.isInstance(error)) { + if (MessageV2.ContextOverflowError.isInstance(error) || e instanceof RepetitionError) { ctx.needsCompaction = true yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error }) return diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 7f1740850dc2..9c1bf16d9369 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -126,7 +126,7 @@ export namespace SessionPrompt { yield* status.set(sessionID, { type: "idle" }) }), onBusy: status.set(sessionID, { type: "busy" }), - onInterrupt: lastAssistant(sessionID), + onInterrupt: latestAssistant(sessionID), busy: () => { throw new Session.BusyError(sessionID) }, @@ -250,6 +250,17 @@ export namespace SessionPrompt { ) }) + const sanitize = (msgs: MessageV2.WithParts[]) => { + const drop = msgs.filter((msg) => msg.info.role === "assistant" && msg.info.summary !== true && !msg.info.finish) + if (drop.length === 0) return msgs + const ids = new Set(drop.map((msg) => msg.info.id)) + log.warn("ignoring incomplete assistant messages", { + sessionID: drop[0]?.info.sessionID, + ids: [...ids], + }) + return msgs.filter((msg) => !ids.has(msg.info.id)) + } + const insertReminders = Effect.fn("SessionPrompt.insertReminders")(function* (input: { messages: MessageV2.WithParts[] agent: Agent.Info @@ -1324,7 +1335,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the }, ) - const lastAssistant = (sessionID: SessionID) => + const latestAssistant: (sessionID: SessionID) => Effect.Effect = (sessionID) => Effect.promise(async () => { let latest: MessageV2.WithParts | undefined for await (const item of MessageV2.stream(sessionID)) { @@ -1346,7 +1357,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the yield* status.set(sessionID, { type: "busy" }) log.info("loop", { step, sessionID }) - let msgs = yield* MessageV2.filterCompactedEffect(sessionID) + let msgs = sanitize(yield* MessageV2.filterCompactedEffect(sessionID)) let lastUser: MessageV2.User | undefined let lastAssistant: MessageV2.Assistant | undefined @@ -1378,7 +1389,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the lastUser.id < lastAssistant.id ) { log.info("exiting loop", { sessionID }) - break + return lastAssistantMsg ?? (yield* latestAssistant(sessionID)) } step++ @@ -1563,7 +1574,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the } yield* compaction.prune({ sessionID }).pipe(Effect.ignore, Effect.forkIn(scope)) - return yield* lastAssistant(sessionID) + const msgs = sanitize(yield* MessageV2.filterCompactedEffect(sessionID)) + return msgs.findLast((msg) => msg.info.role === "assistant") ?? (yield* latestAssistant(sessionID)) }, ) diff --git a/packages/opencode/test/scenario/harness.ts b/packages/opencode/test/scenario/harness.ts index 2e1680b9acf1..5f8fa87745a1 100644 --- a/packages/opencode/test/scenario/harness.ts +++ b/packages/opencode/test/scenario/harness.ts @@ -30,8 +30,6 @@ import { Todo } from "../../src/session/todo" import { ToolRegistry } from "../../src/tool/registry" import { Truncate } from "../../src/tool/truncate" import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" -import { Question } from "../../src/question" -import { Todo } from "../../src/session/todo" import { Instance } from "../../src/project/instance" import { provideTmpdirInstance } from "../fixture/fixture" import { testEffect } from "../lib/effect" diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 0fc25c1a6b41..5f1912c6087d 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -544,6 +544,49 @@ it.live("session.processor effect tests compact on structured context overflow", ), ) +it.live("session.processor effect tests compact on repetition loops in text output", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + yield* llm.push(reply().text("".repeat(1200)).stop()) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "glm loop") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "glm loop" }], + tools: {}, + }) + + expect(value).toBe("compact") + expect(yield* llm.calls).toBe(1) + expect(handle.message.error).toBeUndefined() + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + it.live("session.processor effect tests mark pending tools as aborted on cleanup", () => provideTmpdirServer( ({ dir, llm }) => diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 17689cf274ec..c2029e7da42f 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -284,6 +284,34 @@ const seed = Effect.fn("test.seed")(function* (sessionID: SessionID, opts?: { fi return { user: msg, assistant } }) +const broken = Effect.fn("test.broken")(function* (sessionID: SessionID, text: string, parentID?: MessageID) { + const session = yield* Session.Service + const id = parentID ?? (yield* user(sessionID, "hello")).id + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + role: "assistant", + parentID: id, + sessionID, + mode: "build", + agent: "build", + cost: 0, + path: { cwd: "/tmp", root: "/tmp" }, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + modelID: ref.modelID, + providerID: ref.providerID, + time: { created: Date.now(), completed: Date.now() }, + } + yield* session.updateMessage(msg) + yield* session.updatePart({ + id: PartID.ascending(), + messageID: msg.id, + sessionID, + type: "text", + text, + }) + return msg +}) + const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) => Effect.gen(function* () { const session = yield* Session.Service @@ -325,6 +353,54 @@ it.live("loop exits immediately when last assistant has stop finish", () => ), ) +it.live("loop ignores unfinished assistant pollution when building the next turn", () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Pinned", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + const msg = yield* user(chat.id, "check PR #412") + yield* broken( + chat.id, + "`apps/liff` `StatusPage` `LiffProvider` `Wave 1B` `apps/liff` `StatusPage` `LiffProvider`", + msg.id, + ) + yield* llm.text("PR #412 is still open.") + + const result = yield* prompt.loop({ sessionID: chat.id }) + expect(result.info.role).toBe("assistant") + expect(result.parts.some((part) => part.type === "text" && part.text === "PR #412 is still open.")).toBe(true) + + const [hit] = yield* llm.inputs + expect(JSON.stringify(hit)).not.toContain("apps/liff") + expect(JSON.stringify(hit)).not.toContain("LiffProvider") + }), + { git: true, config: providerCfg }, + ), +) + +it.live("loop returns the last finished assistant when a stale unfinished assistant trails it", () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + const seeded = yield* seed(chat.id, { finish: "stop" }) + yield* broken(chat.id, "stale broken thinking output", seeded.user.id) + + const result = yield* prompt.loop({ sessionID: chat.id }) + expect(result.info.role).toBe("assistant") + expect(result.info.id).toBe(seeded.assistant.id) + expect(result.parts.some((part) => part.type === "text" && part.text === "hi there")).toBe(true) + expect(yield* llm.calls).toBe(0) + }), + { git: true, config: providerCfg }, + ), +) + it.live("loop calls LLM and returns assistant message", () => provideTmpdirServer( Effect.fnUntraced(function* ({ llm }) {