From 9ecf822b83f781d8b11c431517e1771e6209c485 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 27 Mar 2026 17:29:57 -0400 Subject: [PATCH 1/5] refactor(session): effectify SessionCompaction service Migrate SessionCompaction to Effect service pattern with Bus.Service and Config.Service as layer dependencies. Bus.publish for Compacted event now goes through the Effect service instead of raw call. --- packages/opencode/src/session/compaction.ts | 573 +++++++++++--------- 1 file changed, 322 insertions(+), 251 deletions(-) diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index d352d4f07919..2fa3b5992233 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -16,6 +16,8 @@ import { Config } from "@/config/config" import { NotFoundError } from "@/storage/db" import { ProviderTransform } from "@/provider/transform" import { ModelID, ProviderID } from "@/provider/schema" +import { Effect, Layer, ServiceMap } from "effect" +import { makeRuntime } from "@/effect/run-service" export namespace SessionCompaction { const log = Log.create({ service: "session.compaction" }) @@ -30,153 +32,188 @@ export namespace SessionCompaction { } const COMPACTION_BUFFER = 20_000 + export const PRUNE_MINIMUM = 20_000 + export const PRUNE_PROTECT = 40_000 + const PRUNE_PROTECTED_TOOLS = ["skill"] - export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) { - const config = await Config.get() - if (config.compaction?.auto === false) return false - const context = input.model.limit.context - if (context === 0) return false - - const count = - input.tokens.total || - input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write - - const reserved = - config.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model)) - const usable = input.model.limit.input - ? input.model.limit.input - reserved - : context - ProviderTransform.maxOutputTokens(input.model) - return count >= usable + export interface Interface { + readonly isOverflow: (input: { + tokens: MessageV2.Assistant["tokens"] + model: Provider.Model + }) => Effect.Effect + readonly prune: (input: { sessionID: SessionID }) => Effect.Effect + readonly process: (input: { + parentID: MessageID + messages: MessageV2.WithParts[] + sessionID: SessionID + abort: AbortSignal + auto: boolean + overflow?: boolean + }) => Effect.Effect<"continue" | "stop"> + readonly create: (input: { + sessionID: SessionID + agent: string + model: { providerID: ProviderID; modelID: ModelID } + auto: boolean + overflow?: boolean + }) => Effect.Effect } - export const PRUNE_MINIMUM = 20_000 - export const PRUNE_PROTECT = 40_000 + export class Service extends ServiceMap.Service()("@opencode/SessionCompaction") {} - const PRUNE_PROTECTED_TOOLS = ["skill"] + export const layer: Layer.Layer = Layer.effect( + Service, + Effect.gen(function* () { + const bus = yield* Bus.Service + const config = yield* Config.Service - // goes backwards through parts until there are 40_000 tokens worth of tool - // calls. then erases output of previous tool calls. idea is to throw away old - // tool calls that are no longer relevant. - export async function prune(input: { sessionID: SessionID }) { - const config = await Config.get() - if (config.compaction?.prune === false) return - log.info("pruning") - const msgs = await Session.messages({ sessionID: input.sessionID }).catch((err) => { - if (NotFoundError.isInstance(err)) return undefined - throw err - }) - if (!msgs) return - let total = 0 - let pruned = 0 - const toPrune = [] - let turns = 0 - - loop: for (let msgIndex = msgs.length - 1; msgIndex >= 0; msgIndex--) { - const msg = msgs[msgIndex] - if (msg.info.role === "user") turns++ - if (turns < 2) continue - if (msg.info.role === "assistant" && msg.info.summary) break loop - for (let partIndex = msg.parts.length - 1; partIndex >= 0; partIndex--) { - const part = msg.parts[partIndex] - if (part.type === "tool") - if (part.state.status === "completed") { - if (PRUNE_PROTECTED_TOOLS.includes(part.tool)) continue - - if (part.state.time.compacted) break loop - const estimate = Token.estimate(part.state.output) - total += estimate - if (total > PRUNE_PROTECT) { - pruned += estimate - toPrune.push(part) + const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: { + tokens: MessageV2.Assistant["tokens"] + model: Provider.Model + }) { + const cfg = yield* config.get() + if (cfg.compaction?.auto === false) return false + const context = input.model.limit.context + if (context === 0) return false + + const count = + input.tokens.total || + input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write + + const reserved = + cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model)) + const usable = input.model.limit.input + ? input.model.limit.input - reserved + : context - ProviderTransform.maxOutputTokens(input.model) + return count >= usable + }) + + // goes backwards through parts until there are PRUNE_PROTECT tokens worth of tool + // calls, then erases output of older tool calls to free context space + const prune = Effect.fn("SessionCompaction.prune")(function* (input: { sessionID: SessionID }) { + const cfg = yield* config.get() + if (cfg.compaction?.prune === false) return + log.info("pruning") + + const msgs = yield* Effect.promise(() => + Session.messages({ sessionID: input.sessionID }).catch((err) => { + if (NotFoundError.isInstance(err)) return undefined + throw err + }), + ) + if (!msgs) return + + let total = 0 + let pruned = 0 + const toPrune: MessageV2.ToolPart[] = [] + let turns = 0 + + loop: for (let msgIndex = msgs.length - 1; msgIndex >= 0; msgIndex--) { + const msg = msgs[msgIndex] + if (msg.info.role === "user") turns++ + if (turns < 2) continue + if (msg.info.role === "assistant" && msg.info.summary) break loop + for (let partIndex = msg.parts.length - 1; partIndex >= 0; partIndex--) { + const part = msg.parts[partIndex] + if (part.type === "tool") + if (part.state.status === "completed") { + if (PRUNE_PROTECTED_TOOLS.includes(part.tool)) continue + if (part.state.time.compacted) break loop + const estimate = Token.estimate(part.state.output) + total += estimate + if (total > PRUNE_PROTECT) { + pruned += estimate + toPrune.push(part) + } + } + } + } + + log.info("found", { pruned, total }) + if (pruned > PRUNE_MINIMUM) { + for (const part of toPrune) { + if (part.state.status === "completed") { + part.state.time.compacted = Date.now() + yield* Effect.promise(() => Session.updatePart(part)) } } - } - } - log.info("found", { pruned, total }) - if (pruned > PRUNE_MINIMUM) { - for (const part of toPrune) { - if (part.state.status === "completed") { - part.state.time.compacted = Date.now() - await Session.updatePart(part) + log.info("pruned", { count: toPrune.length }) } - } - log.info("pruned", { count: toPrune.length }) - } - } + }) - export async function process(input: { - parentID: MessageID - messages: MessageV2.WithParts[] - sessionID: SessionID - abort: AbortSignal - auto: boolean - overflow?: boolean - }) { - const userMessage = input.messages.findLast((m) => m.info.id === input.parentID)!.info as MessageV2.User - - let messages = input.messages - let replay: MessageV2.WithParts | undefined - if (input.overflow) { - const idx = input.messages.findIndex((m) => m.info.id === input.parentID) - for (let i = idx - 1; i >= 0; i--) { - const msg = input.messages[i] - if (msg.info.role === "user" && !msg.parts.some((p) => p.type === "compaction")) { - replay = msg - messages = input.messages.slice(0, i) - break + const processCompaction = Effect.fn("SessionCompaction.process")(function* (input: { + parentID: MessageID + messages: MessageV2.WithParts[] + sessionID: SessionID + abort: AbortSignal + auto: boolean + overflow?: boolean + }) { + const userMessage = input.messages.findLast((m) => m.info.id === input.parentID)!.info as MessageV2.User + + let messages = input.messages + let replay: MessageV2.WithParts | undefined + if (input.overflow) { + const idx = input.messages.findIndex((m) => m.info.id === input.parentID) + for (let i = idx - 1; i >= 0; i--) { + const msg = input.messages[i] + if (msg.info.role === "user" && !msg.parts.some((p) => p.type === "compaction")) { + replay = msg + messages = input.messages.slice(0, i) + break + } + } + const hasContent = + replay && messages.some((m) => m.info.role === "user" && !m.parts.some((p) => p.type === "compaction")) + if (!hasContent) { + replay = undefined + messages = input.messages + } } - } - const hasContent = - replay && messages.some((m) => m.info.role === "user" && !m.parts.some((p) => p.type === "compaction")) - if (!hasContent) { - replay = undefined - messages = input.messages - } - } - - const agent = await Agent.get("compaction") - const model = agent.model - ? await Provider.getModel(agent.model.providerID, agent.model.modelID) - : await Provider.getModel(userMessage.model.providerID, userMessage.model.modelID) - const msg = (await Session.updateMessage({ - id: MessageID.ascending(), - role: "assistant", - parentID: input.parentID, - sessionID: input.sessionID, - mode: "compaction", - agent: "compaction", - variant: userMessage.variant, - summary: true, - path: { - cwd: Instance.directory, - root: Instance.worktree, - }, - cost: 0, - tokens: { - output: 0, - input: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, - modelID: model.id, - providerID: model.providerID, - time: { - created: Date.now(), - }, - })) as MessageV2.Assistant - const processor = SessionProcessor.create({ - assistantMessage: msg, - sessionID: input.sessionID, - model, - abort: input.abort, - }) - // Allow plugins to inject context or replace compaction prompt - const compacting = await Plugin.trigger( - "experimental.session.compacting", - { sessionID: input.sessionID }, - { context: [], prompt: undefined }, - ) - const defaultPrompt = `Provide a detailed prompt for continuing our conversation above. + + const result = yield* Effect.promise(async (): Promise<"continue" | "stop"> => { + const agent = await Agent.get("compaction") + const model = agent.model + ? await Provider.getModel(agent.model.providerID, agent.model.modelID) + : await Provider.getModel(userMessage.model.providerID, userMessage.model.modelID) + const msg = (await Session.updateMessage({ + id: MessageID.ascending(), + role: "assistant", + parentID: input.parentID, + sessionID: input.sessionID, + mode: "compaction", + agent: "compaction", + variant: userMessage.variant, + summary: true, + path: { + cwd: Instance.directory, + root: Instance.worktree, + }, + cost: 0, + tokens: { + output: 0, + input: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: model.id, + providerID: model.providerID, + time: { + created: Date.now(), + }, + })) as MessageV2.Assistant + const processor = SessionProcessor.create({ + assistantMessage: msg, + sessionID: input.sessionID, + model, + abort: input.abort, + }) + const compacting = await Plugin.trigger( + "experimental.session.compacting", + { sessionID: input.sessionID }, + { context: [], prompt: undefined }, + ) + const defaultPrompt = `Provide a detailed prompt for continuing our conversation above. Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next. The summary that you construct will be used so that another agent can read it and continue the work. @@ -204,133 +241,167 @@ When constructing the summary, try to stick to this template: [Construct a structured list of relevant files that have been read, edited, or created that pertain to the task at hand. If all the files in a directory are relevant, include the path to the directory.] ---` - const promptText = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n") - const msgs = structuredClone(messages) - await Plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) - const result = await processor.process({ - user: userMessage, - agent, - abort: input.abort, - sessionID: input.sessionID, - tools: {}, - system: [], - messages: [ - ...(await MessageV2.toModelMessages(msgs, model, { stripMedia: true })), - { - role: "user", - content: [ - { - type: "text", - text: promptText, - }, - ], - }, - ], - model, - }) - - if (result === "compact") { - processor.message.error = new MessageV2.ContextOverflowError({ - message: replay - ? "Conversation history too large to compact - exceeds model context limit" - : "Session too large to compact - context exceeds model limit even after stripping media", - }).toObject() - processor.message.finish = "error" - await Session.updateMessage(processor.message) - return "stop" - } - - if (result === "continue" && input.auto) { - if (replay) { - const original = replay.info as MessageV2.User - const replayMsg = await Session.updateMessage({ - id: MessageID.ascending(), - role: "user", - sessionID: input.sessionID, - time: { created: Date.now() }, - agent: original.agent, - model: original.model, - format: original.format, - tools: original.tools, - system: original.system, - variant: original.variant, + const promptText = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n") + const msgs = structuredClone(messages) + await Plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) + const result = await processor.process({ + user: userMessage, + agent, + abort: input.abort, + sessionID: input.sessionID, + tools: {}, + system: [], + messages: [ + ...(await MessageV2.toModelMessages(msgs, model, { stripMedia: true })), + { + role: "user", + content: [{ type: "text", text: promptText }], + }, + ], + model, + }) + + if (result === "compact") { + processor.message.error = new MessageV2.ContextOverflowError({ + message: replay + ? "Conversation history too large to compact - exceeds model context limit" + : "Session too large to compact - context exceeds model limit even after stripping media", + }).toObject() + processor.message.finish = "error" + await Session.updateMessage(processor.message) + return "stop" + } + + if (result === "continue" && input.auto) { + if (replay) { + const original = replay.info as MessageV2.User + const replayMsg = await Session.updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID: input.sessionID, + time: { created: Date.now() }, + agent: original.agent, + model: original.model, + format: original.format, + tools: original.tools, + system: original.system, + variant: original.variant, + }) + for (const part of replay.parts) { + if (part.type === "compaction") continue + const replayPart = + part.type === "file" && MessageV2.isMedia(part.mime) + ? { type: "text" as const, text: `[Attached ${part.mime}: ${part.filename ?? "file"}]` } + : part + await Session.updatePart({ + ...replayPart, + id: PartID.ascending(), + messageID: replayMsg.id, + sessionID: input.sessionID, + }) + } + } else { + const continueMsg = await Session.updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID: input.sessionID, + time: { created: Date.now() }, + agent: userMessage.agent, + model: userMessage.model, + }) + const text = + (input.overflow + ? "The previous request exceeded the provider's size limit due to large media attachments. The conversation was compacted and media files were removed from context. If the user was asking about attached images or files, explain that the attachments were too large to process and suggest they try again with smaller or fewer files.\n\n" + : "") + + "Continue if you have next steps, or stop and ask for clarification if you are unsure how to proceed." + await Session.updatePart({ + id: PartID.ascending(), + messageID: continueMsg.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text, + time: { + start: Date.now(), + end: Date.now(), + }, + }) + } + } + if (processor.message.error) return "stop" as const + return "continue" as const }) - for (const part of replay.parts) { - if (part.type === "compaction") continue - const replayPart = - part.type === "file" && MessageV2.isMedia(part.mime) - ? { type: "text" as const, text: `[Attached ${part.mime}: ${part.filename ?? "file"}]` } - : part + if (result === "continue") yield* bus.publish(Event.Compacted, { sessionID: input.sessionID }) + return result + }) + + const create = Effect.fn("SessionCompaction.create")(function* (input: { + sessionID: SessionID + agent: string + model: { providerID: ProviderID; modelID: ModelID } + auto: boolean + overflow?: boolean + }) { + yield* Effect.promise(async () => { + const msg = await Session.updateMessage({ + id: MessageID.ascending(), + role: "user", + model: input.model, + sessionID: input.sessionID, + agent: input.agent, + time: { created: Date.now() }, + }) await Session.updatePart({ - ...replayPart, id: PartID.ascending(), - messageID: replayMsg.id, - sessionID: input.sessionID, + messageID: msg.id, + sessionID: msg.sessionID, + type: "compaction", + auto: input.auto, + overflow: input.overflow, }) - } - } else { - const continueMsg = await Session.updateMessage({ - id: MessageID.ascending(), - role: "user", - sessionID: input.sessionID, - time: { created: Date.now() }, - agent: userMessage.agent, - model: userMessage.model, }) - const text = - (input.overflow - ? "The previous request exceeded the provider's size limit due to large media attachments. The conversation was compacted and media files were removed from context. If the user was asking about attached images or files, explain that the attachments were too large to process and suggest they try again with smaller or fewer files.\n\n" - : "") + - "Continue if you have next steps, or stop and ask for clarification if you are unsure how to proceed." - await Session.updatePart({ - id: PartID.ascending(), - messageID: continueMsg.id, - sessionID: input.sessionID, - type: "text", - synthetic: true, - text, - time: { - start: Date.now(), - end: Date.now(), - }, - }) - } - } - if (processor.message.error) return "stop" - Bus.publish(Event.Compacted, { sessionID: input.sessionID }) - return "continue" + }) + + return Service.of({ + isOverflow, + prune, + process: processCompaction, + create, + }) + }), + ) + + export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Config.defaultLayer)) + + const { runPromise } = makeRuntime(Service, defaultLayer) + + export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) { + return runPromise((svc) => svc.isOverflow(input)) + } + + export async function prune(input: { sessionID: SessionID }) { + return runPromise((svc) => svc.prune(input)) + } + + export async function process(input: { + parentID: MessageID + messages: MessageV2.WithParts[] + sessionID: SessionID + abort: AbortSignal + auto: boolean + overflow?: boolean + }) { + return runPromise((svc) => svc.process(input)) } export const create = fn( z.object({ sessionID: SessionID.zod, agent: z.string(), - model: z.object({ - providerID: ProviderID.zod, - modelID: ModelID.zod, - }), + model: z.object({ providerID: ProviderID.zod, modelID: ModelID.zod }), auto: z.boolean(), overflow: z.boolean().optional(), }), - async (input) => { - const msg = await Session.updateMessage({ - id: MessageID.ascending(), - role: "user", - model: input.model, - sessionID: input.sessionID, - agent: input.agent, - time: { - created: Date.now(), - }, - }) - await Session.updatePart({ - id: PartID.ascending(), - messageID: msg.id, - sessionID: msg.sessionID, - type: "compaction", - auto: input.auto, - overflow: input.overflow, - }) - }, + (input) => runPromise((svc) => svc.create(input)), ) } From 316915329f9c002d3126ba511d47025f2fffd7cd Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 27 Mar 2026 20:35:49 -0400 Subject: [PATCH 2/5] refactor(session): extract handleEvent and cleanup from SessionProcessor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure structural refactor — no Effect yet. Extracts the switch statement into a standalone handleEvent function and the cleanup section into a cleanup function. ProcessorContext object shares mutable state via getters/setters. Zero behavior change (verified by two review agents). --- packages/opencode/src/session/processor.ts | 673 ++++++++++----------- 1 file changed, 336 insertions(+), 337 deletions(-) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 84ea76656857..94fda16a360d 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -24,6 +24,318 @@ export namespace SessionProcessor { export type Info = Awaited> export type Result = Awaited> + interface ProcessorContext { + assistantMessage: MessageV2.Assistant + sessionID: SessionID + model: Provider.Model + abort: AbortSignal + toolcalls: Record + shouldBreak: boolean + snapshot: string | undefined + blocked: boolean + needsCompaction: boolean + currentText: MessageV2.TextPart | undefined + reasoningMap: Record + } + + type StreamResult = Awaited> + type StreamEvent = StreamResult["fullStream"] extends AsyncIterable ? T : never + + async function handleEvent(value: StreamEvent, ctx: ProcessorContext) { + switch (value.type) { + case "start": + await SessionStatus.set(ctx.sessionID, { type: "busy" }) + break + + case "reasoning-start": + if (value.id in ctx.reasoningMap) break + const reasoningPart = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "reasoning" as const, + text: "", + time: { start: Date.now() }, + metadata: value.providerMetadata, + } + ctx.reasoningMap[value.id] = reasoningPart + await Session.updatePart(reasoningPart) + break + + case "reasoning-delta": + if (value.id in ctx.reasoningMap) { + const part = ctx.reasoningMap[value.id] + part.text += value.text + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: part.sessionID, + messageID: part.messageID, + partID: part.id, + field: "text", + delta: value.text, + }) + } + break + + case "reasoning-end": + if (value.id in ctx.reasoningMap) { + const part = ctx.reasoningMap[value.id] + part.text = part.text.trimEnd() + part.time = { ...part.time, end: Date.now() } + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePart(part) + delete ctx.reasoningMap[value.id] + } + break + + case "tool-input-start": { + const part = await Session.updatePart({ + id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "tool", + tool: value.toolName, + callID: value.id, + state: { status: "pending", input: {}, raw: "" }, + }) + ctx.toolcalls[value.id] = part as MessageV2.ToolPart + break + } + + case "tool-input-delta": + break + + case "tool-input-end": + break + + case "tool-call": { + const match = ctx.toolcalls[value.toolCallId] + if (match) { + const part = await Session.updatePart({ + ...match, + tool: value.toolName, + state: { status: "running", input: value.input, time: { start: Date.now() } }, + metadata: value.providerMetadata, + }) + ctx.toolcalls[value.toolCallId] = part as MessageV2.ToolPart + + const parts = await MessageV2.parts(ctx.assistantMessage.id) + const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) + + if ( + lastThree.length === DOOM_LOOP_THRESHOLD && + lastThree.every( + (p) => + p.type === "tool" && + p.tool === value.toolName && + p.state.status !== "pending" && + JSON.stringify(p.state.input) === JSON.stringify(value.input), + ) + ) { + const agent = await Agent.get(ctx.assistantMessage.agent) + await Permission.ask({ + permission: "doom_loop", + patterns: [value.toolName], + sessionID: ctx.assistantMessage.sessionID, + metadata: { tool: value.toolName, input: value.input }, + always: [value.toolName], + ruleset: agent.permission, + }) + } + } + break + } + + case "tool-result": { + const match = ctx.toolcalls[value.toolCallId] + if (match && match.state.status === "running") { + await Session.updatePart({ + ...match, + state: { + status: "completed", + input: value.input ?? match.state.input, + output: value.output.output, + metadata: value.output.metadata, + title: value.output.title, + time: { start: match.state.time.start, end: Date.now() }, + attachments: value.output.attachments, + }, + }) + delete ctx.toolcalls[value.toolCallId] + } + break + } + + case "tool-error": { + const match = ctx.toolcalls[value.toolCallId] + if (match && match.state.status === "running") { + await Session.updatePart({ + ...match, + state: { + status: "error", + input: value.input ?? match.state.input, + error: value.error instanceof Error ? value.error.message : String(value.error), + time: { start: match.state.time.start, end: Date.now() }, + }, + }) + if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) { + ctx.blocked = ctx.shouldBreak + } + delete ctx.toolcalls[value.toolCallId] + } + break + } + + case "error": + throw value.error + + case "start-step": + ctx.snapshot = await Snapshot.track() + await Session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + snapshot: ctx.snapshot, + type: "step-start", + }) + break + + case "finish-step": { + const usage = Session.getUsage({ + model: ctx.model, + usage: value.usage, + metadata: value.providerMetadata, + }) + ctx.assistantMessage.finish = value.finishReason + ctx.assistantMessage.cost += usage.cost + ctx.assistantMessage.tokens = usage.tokens + await Session.updatePart({ + id: PartID.ascending(), + reason: value.finishReason, + snapshot: await Snapshot.track(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + await Session.updateMessage(ctx.assistantMessage) + if (ctx.snapshot) { + const patch = await Snapshot.patch(ctx.snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + SessionSummary.summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }) + if ( + !ctx.assistantMessage.summary && + (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: ctx.model })) + ) { + ctx.needsCompaction = true + } + break + } + + case "text-start": + ctx.currentText = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "text", + text: "", + time: { start: Date.now() }, + metadata: value.providerMetadata, + } + await Session.updatePart(ctx.currentText) + break + + case "text-delta": + if (ctx.currentText) { + ctx.currentText.text += value.text + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: ctx.currentText.sessionID, + messageID: ctx.currentText.messageID, + partID: ctx.currentText.id, + field: "text", + delta: value.text, + }) + } + break + + case "text-end": + if (ctx.currentText) { + ctx.currentText.text = ctx.currentText.text.trimEnd() + const textOutput = await Plugin.trigger( + "experimental.text.complete", + { + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.id, + partID: ctx.currentText.id, + }, + { text: ctx.currentText.text }, + ) + ctx.currentText.text = textOutput.text + ctx.currentText.time = { start: Date.now(), end: Date.now() } + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + await Session.updatePart(ctx.currentText) + } + ctx.currentText = undefined + break + + case "finish": + break + + default: + log.info("unhandled", { ...value }) + break + } + } + + async function cleanup(ctx: ProcessorContext) { + if (ctx.snapshot) { + const patch = await Snapshot.patch(ctx.snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + const parts = await MessageV2.parts(ctx.assistantMessage.id) + for (const part of parts) { + if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { + await Session.updatePart({ + ...part, + state: { + ...part.state, + status: "error", + error: "Tool execution aborted", + time: { start: Date.now(), end: Date.now() }, + }, + }) + } + } + ctx.assistantMessage.time.completed = Date.now() + await Session.updateMessage(ctx.assistantMessage) + } + export function create(input: { assistantMessage: MessageV2.Assistant sessionID: SessionID @@ -47,322 +359,41 @@ export namespace SessionProcessor { log.info("process") needsCompaction = false const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true + + const ctx: ProcessorContext = { + assistantMessage: input.assistantMessage, + sessionID: input.sessionID, + model: input.model, + abort: input.abort, + toolcalls, + shouldBreak, + get snapshot() { return snapshot }, + set snapshot(v) { snapshot = v }, + get blocked() { return blocked }, + set blocked(v) { blocked = v }, + get needsCompaction() { return needsCompaction }, + set needsCompaction(v) { needsCompaction = v }, + currentText: undefined, + reasoningMap: {}, + } + while (true) { try { - let currentText: MessageV2.TextPart | undefined - let reasoningMap: Record = {} + ctx.currentText = undefined + ctx.reasoningMap = {} const stream = await LLM.stream(streamInput) for await (const value of stream.fullStream) { input.abort.throwIfAborted() - switch (value.type) { - case "start": - await SessionStatus.set(input.sessionID, { type: "busy" }) - break - - case "reasoning-start": - if (value.id in reasoningMap) { - continue - } - const reasoningPart = { - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "reasoning" as const, - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - reasoningMap[value.id] = reasoningPart - await Session.updatePart(reasoningPart) - break - - case "reasoning-delta": - if (value.id in reasoningMap) { - const part = reasoningMap[value.id] - part.text += value.text - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: part.sessionID, - messageID: part.messageID, - partID: part.id, - field: "text", - delta: value.text, - }) - } - break - - case "reasoning-end": - if (value.id in reasoningMap) { - const part = reasoningMap[value.id] - part.text = part.text.trimEnd() - - part.time = { - ...part.time, - end: Date.now(), - } - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePart(part) - delete reasoningMap[value.id] - } - break - - case "tool-input-start": - const part = await Session.updatePart({ - id: toolcalls[value.id]?.id ?? PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "tool", - tool: value.toolName, - callID: value.id, - state: { - status: "pending", - input: {}, - raw: "", - }, - }) - toolcalls[value.id] = part as MessageV2.ToolPart - break - - case "tool-input-delta": - break - - case "tool-input-end": - break - - case "tool-call": { - const match = toolcalls[value.toolCallId] - if (match) { - const part = await Session.updatePart({ - ...match, - tool: value.toolName, - state: { - status: "running", - input: value.input, - time: { - start: Date.now(), - }, - }, - metadata: value.providerMetadata, - }) - toolcalls[value.toolCallId] = part as MessageV2.ToolPart - - const parts = await MessageV2.parts(input.assistantMessage.id) - const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) - - if ( - lastThree.length === DOOM_LOOP_THRESHOLD && - lastThree.every( - (p) => - p.type === "tool" && - p.tool === value.toolName && - p.state.status !== "pending" && - JSON.stringify(p.state.input) === JSON.stringify(value.input), - ) - ) { - const agent = await Agent.get(input.assistantMessage.agent) - await Permission.ask({ - permission: "doom_loop", - patterns: [value.toolName], - sessionID: input.assistantMessage.sessionID, - metadata: { - tool: value.toolName, - input: value.input, - }, - always: [value.toolName], - ruleset: agent.permission, - }) - } - } - break - } - case "tool-result": { - const match = toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "completed", - input: value.input ?? match.state.input, - output: value.output.output, - metadata: value.output.metadata, - title: value.output.title, - time: { - start: match.state.time.start, - end: Date.now(), - }, - attachments: value.output.attachments, - }, - }) - - delete toolcalls[value.toolCallId] - } - break - } - - case "tool-error": { - const match = toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "error", - input: value.input ?? match.state.input, - error: value.error instanceof Error ? value.error.message : String(value.error), - time: { - start: match.state.time.start, - end: Date.now(), - }, - }, - }) - - if ( - value.error instanceof Permission.RejectedError || - value.error instanceof Question.RejectedError - ) { - blocked = shouldBreak - } - delete toolcalls[value.toolCallId] - } - break - } - case "error": - throw value.error - - case "start-step": - snapshot = await Snapshot.track() - await Session.updatePart({ - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - snapshot, - type: "step-start", - }) - break - - case "finish-step": - const usage = Session.getUsage({ - model: input.model, - usage: value.usage, - metadata: value.providerMetadata, - }) - input.assistantMessage.finish = value.finishReason - input.assistantMessage.cost += usage.cost - input.assistantMessage.tokens = usage.tokens - await Session.updatePart({ - id: PartID.ascending(), - reason: value.finishReason, - snapshot: await Snapshot.track(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "step-finish", - tokens: usage.tokens, - cost: usage.cost, - }) - await Session.updateMessage(input.assistantMessage) - if (snapshot) { - const patch = await Snapshot.patch(snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - snapshot = undefined - } - SessionSummary.summarize({ - sessionID: input.sessionID, - messageID: input.assistantMessage.parentID, - }) - if ( - !input.assistantMessage.summary && - (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model })) - ) { - needsCompaction = true - } - break - - case "text-start": - currentText = { - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "text", - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - await Session.updatePart(currentText) - break - - case "text-delta": - if (currentText) { - currentText.text += value.text - if (value.providerMetadata) currentText.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: currentText.sessionID, - messageID: currentText.messageID, - partID: currentText.id, - field: "text", - delta: value.text, - }) - } - break - - case "text-end": - if (currentText) { - currentText.text = currentText.text.trimEnd() - const textOutput = await Plugin.trigger( - "experimental.text.complete", - { - sessionID: input.sessionID, - messageID: input.assistantMessage.id, - partID: currentText.id, - }, - { text: currentText.text }, - ) - currentText.text = textOutput.text - currentText.time = { - start: Date.now(), - end: Date.now(), - } - if (value.providerMetadata) currentText.metadata = value.providerMetadata - await Session.updatePart(currentText) - } - currentText = undefined - break - - case "finish": - break - - default: - log.info("unhandled", { - ...value, - }) - continue - } + await handleEvent(value, ctx) if (needsCompaction) break } } catch (e: any) { - log.error("process", { - error: e, - stack: JSON.stringify(e.stack), - }) + log.error("process", { error: e, stack: JSON.stringify(e.stack) }) const error = MessageV2.fromError(e, { providerID: input.model.providerID, aborted: input.abort.aborted }) if (MessageV2.ContextOverflowError.isInstance(error)) { needsCompaction = true - Bus.publish(Session.Event.Error, { - sessionID: input.sessionID, - error, - }) + Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error }) } else { const retry = SessionRetry.retryable(error) if (retry !== undefined) { @@ -385,39 +416,7 @@ export namespace SessionProcessor { await SessionStatus.set(input.sessionID, { type: "idle" }) } } - if (snapshot) { - const patch = await Snapshot.patch(snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - snapshot = undefined - } - const p = await MessageV2.parts(input.assistantMessage.id) - for (const part of p) { - if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { - await Session.updatePart({ - ...part, - state: { - ...part.state, - status: "error", - error: "Tool execution aborted", - time: { - start: Date.now(), - end: Date.now(), - }, - }, - }) - } - } - input.assistantMessage.time.completed = Date.now() - await Session.updateMessage(input.assistantMessage) + await cleanup(ctx) if (needsCompaction) return "compact" if (blocked) return "stop" if (input.assistantMessage.error) return "stop" From c66be848e11361e4114f390148ef451f7cfe5be4 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 27 Mar 2026 20:44:13 -0400 Subject: [PATCH 3/5] refactor(session): convert SessionProcessor to Effect internals Replace the while(true) + for-await + try/catch pattern with: - Stream.fromAsyncIterable + Stream.runForEachWhile for event consumption - Recursive Effect for retry logic (preserves SessionRetry.delay backoff) - Effect.ensuring for cleanup guarantees - Effect.catch for error classification process() still returns Promise externally (via Effect.runPromise) so callers don't change. Event handling and cleanup logic unchanged. --- packages/opencode/src/session/processor.ts | 634 +++++++++++---------- 1 file changed, 328 insertions(+), 306 deletions(-) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 94fda16a360d..0fb4954b19e0 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -16,6 +16,8 @@ import { Permission } from "@/permission" import { Question } from "@/question" import { PartID } from "./schema" import type { SessionID, MessageID } from "./schema" +import { Effect } from "effect" +import * as Stream from "effect/Stream" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 @@ -41,299 +43,303 @@ export namespace SessionProcessor { type StreamResult = Awaited> type StreamEvent = StreamResult["fullStream"] extends AsyncIterable ? T : never - async function handleEvent(value: StreamEvent, ctx: ProcessorContext) { - switch (value.type) { - case "start": - await SessionStatus.set(ctx.sessionID, { type: "busy" }) - break - - case "reasoning-start": - if (value.id in ctx.reasoningMap) break - const reasoningPart = { - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "reasoning" as const, - text: "", - time: { start: Date.now() }, - metadata: value.providerMetadata, - } - ctx.reasoningMap[value.id] = reasoningPart - await Session.updatePart(reasoningPart) - break - - case "reasoning-delta": - if (value.id in ctx.reasoningMap) { - const part = ctx.reasoningMap[value.id] - part.text += value.text - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: part.sessionID, - messageID: part.messageID, - partID: part.id, - field: "text", - delta: value.text, - }) - } - break - - case "reasoning-end": - if (value.id in ctx.reasoningMap) { - const part = ctx.reasoningMap[value.id] - part.text = part.text.trimEnd() - part.time = { ...part.time, end: Date.now() } - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePart(part) - delete ctx.reasoningMap[value.id] - } - break - - case "tool-input-start": { - const part = await Session.updatePart({ - id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "tool", - tool: value.toolName, - callID: value.id, - state: { status: "pending", input: {}, raw: "" }, - }) - ctx.toolcalls[value.id] = part as MessageV2.ToolPart - break - } + function handleEvent(value: StreamEvent, ctx: ProcessorContext) { + return Effect.promise(async () => { + switch (value.type) { + case "start": + await SessionStatus.set(ctx.sessionID, { type: "busy" }) + break - case "tool-input-delta": - break + case "reasoning-start": + if (value.id in ctx.reasoningMap) break + const reasoningPart = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "reasoning" as const, + text: "", + time: { start: Date.now() }, + metadata: value.providerMetadata, + } + ctx.reasoningMap[value.id] = reasoningPart + await Session.updatePart(reasoningPart) + break + + case "reasoning-delta": + if (value.id in ctx.reasoningMap) { + const part = ctx.reasoningMap[value.id] + part.text += value.text + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: part.sessionID, + messageID: part.messageID, + partID: part.id, + field: "text", + delta: value.text, + }) + } + break - case "tool-input-end": - break + case "reasoning-end": + if (value.id in ctx.reasoningMap) { + const part = ctx.reasoningMap[value.id] + part.text = part.text.trimEnd() + part.time = { ...part.time, end: Date.now() } + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePart(part) + delete ctx.reasoningMap[value.id] + } + break - case "tool-call": { - const match = ctx.toolcalls[value.toolCallId] - if (match) { + case "tool-input-start": { const part = await Session.updatePart({ - ...match, + id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "tool", tool: value.toolName, - state: { status: "running", input: value.input, time: { start: Date.now() } }, - metadata: value.providerMetadata, + callID: value.id, + state: { status: "pending", input: {}, raw: "" }, }) - ctx.toolcalls[value.toolCallId] = part as MessageV2.ToolPart + ctx.toolcalls[value.id] = part as MessageV2.ToolPart + break + } - const parts = await MessageV2.parts(ctx.assistantMessage.id) - const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) + case "tool-input-delta": + break - if ( - lastThree.length === DOOM_LOOP_THRESHOLD && - lastThree.every( - (p) => - p.type === "tool" && - p.tool === value.toolName && - p.state.status !== "pending" && - JSON.stringify(p.state.input) === JSON.stringify(value.input), - ) - ) { - const agent = await Agent.get(ctx.assistantMessage.agent) - await Permission.ask({ - permission: "doom_loop", - patterns: [value.toolName], - sessionID: ctx.assistantMessage.sessionID, - metadata: { tool: value.toolName, input: value.input }, - always: [value.toolName], - ruleset: agent.permission, + case "tool-input-end": + break + + case "tool-call": { + const match = ctx.toolcalls[value.toolCallId] + if (match) { + const part = await Session.updatePart({ + ...match, + tool: value.toolName, + state: { status: "running", input: value.input, time: { start: Date.now() } }, + metadata: value.providerMetadata, }) + ctx.toolcalls[value.toolCallId] = part as MessageV2.ToolPart + + const parts = await MessageV2.parts(ctx.assistantMessage.id) + const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) + + if ( + lastThree.length === DOOM_LOOP_THRESHOLD && + lastThree.every( + (p) => + p.type === "tool" && + p.tool === value.toolName && + p.state.status !== "pending" && + JSON.stringify(p.state.input) === JSON.stringify(value.input), + ) + ) { + const agent = await Agent.get(ctx.assistantMessage.agent) + await Permission.ask({ + permission: "doom_loop", + patterns: [value.toolName], + sessionID: ctx.assistantMessage.sessionID, + metadata: { tool: value.toolName, input: value.input }, + always: [value.toolName], + ruleset: agent.permission, + }) + } } + break + } + + case "tool-result": { + const match = ctx.toolcalls[value.toolCallId] + if (match && match.state.status === "running") { + await Session.updatePart({ + ...match, + state: { + status: "completed", + input: value.input ?? match.state.input, + output: value.output.output, + metadata: value.output.metadata, + title: value.output.title, + time: { start: match.state.time.start, end: Date.now() }, + attachments: value.output.attachments, + }, + }) + delete ctx.toolcalls[value.toolCallId] + } + break } - break - } - case "tool-result": { - const match = ctx.toolcalls[value.toolCallId] - if (match && match.state.status === "running") { + case "tool-error": { + const match = ctx.toolcalls[value.toolCallId] + if (match && match.state.status === "running") { + await Session.updatePart({ + ...match, + state: { + status: "error", + input: value.input ?? match.state.input, + error: value.error instanceof Error ? value.error.message : String(value.error), + time: { start: match.state.time.start, end: Date.now() }, + }, + }) + if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) { + ctx.blocked = ctx.shouldBreak + } + delete ctx.toolcalls[value.toolCallId] + } + break + } + + case "error": + throw value.error + + case "start-step": + ctx.snapshot = await Snapshot.track() await Session.updatePart({ - ...match, - state: { - status: "completed", - input: value.input ?? match.state.input, - output: value.output.output, - metadata: value.output.metadata, - title: value.output.title, - time: { start: match.state.time.start, end: Date.now() }, - attachments: value.output.attachments, - }, + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + snapshot: ctx.snapshot, + type: "step-start", }) - delete ctx.toolcalls[value.toolCallId] - } - break - } + break - case "tool-error": { - const match = ctx.toolcalls[value.toolCallId] - if (match && match.state.status === "running") { + case "finish-step": { + const usage = Session.getUsage({ + model: ctx.model, + usage: value.usage, + metadata: value.providerMetadata, + }) + ctx.assistantMessage.finish = value.finishReason + ctx.assistantMessage.cost += usage.cost + ctx.assistantMessage.tokens = usage.tokens await Session.updatePart({ - ...match, - state: { - status: "error", - input: value.input ?? match.state.input, - error: value.error instanceof Error ? value.error.message : String(value.error), - time: { start: match.state.time.start, end: Date.now() }, - }, + id: PartID.ascending(), + reason: value.finishReason, + snapshot: await Snapshot.track(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, }) - if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) { - ctx.blocked = ctx.shouldBreak + await Session.updateMessage(ctx.assistantMessage) + if (ctx.snapshot) { + const patch = await Snapshot.patch(ctx.snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + SessionSummary.summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }) + if ( + !ctx.assistantMessage.summary && + (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: ctx.model })) + ) { + ctx.needsCompaction = true } - delete ctx.toolcalls[value.toolCallId] + break } - break - } - case "error": - throw value.error - - case "start-step": - ctx.snapshot = await Snapshot.track() - await Session.updatePart({ - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.sessionID, - snapshot: ctx.snapshot, - type: "step-start", - }) - break + case "text-start": + ctx.currentText = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "text", + text: "", + time: { start: Date.now() }, + metadata: value.providerMetadata, + } + await Session.updatePart(ctx.currentText) + break - case "finish-step": { - const usage = Session.getUsage({ - model: ctx.model, - usage: value.usage, - metadata: value.providerMetadata, - }) - ctx.assistantMessage.finish = value.finishReason - ctx.assistantMessage.cost += usage.cost - ctx.assistantMessage.tokens = usage.tokens - await Session.updatePart({ - id: PartID.ascending(), - reason: value.finishReason, - snapshot: await Snapshot.track(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "step-finish", - tokens: usage.tokens, - cost: usage.cost, - }) - await Session.updateMessage(ctx.assistantMessage) - if (ctx.snapshot) { - const patch = await Snapshot.patch(ctx.snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, + case "text-delta": + if (ctx.currentText) { + ctx.currentText.text += value.text + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: ctx.currentText.sessionID, + messageID: ctx.currentText.messageID, + partID: ctx.currentText.id, + field: "text", + delta: value.text, }) } - ctx.snapshot = undefined - } - SessionSummary.summarize({ - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.parentID, - }) - if ( - !ctx.assistantMessage.summary && - (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: ctx.model })) - ) { - ctx.needsCompaction = true - } - break - } + break - case "text-start": - ctx.currentText = { - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "text", - text: "", - time: { start: Date.now() }, - metadata: value.providerMetadata, - } - await Session.updatePart(ctx.currentText) - break - - case "text-delta": - if (ctx.currentText) { - ctx.currentText.text += value.text - if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: ctx.currentText.sessionID, - messageID: ctx.currentText.messageID, - partID: ctx.currentText.id, - field: "text", - delta: value.text, - }) - } - break - - case "text-end": - if (ctx.currentText) { - ctx.currentText.text = ctx.currentText.text.trimEnd() - const textOutput = await Plugin.trigger( - "experimental.text.complete", - { - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.id, - partID: ctx.currentText.id, - }, - { text: ctx.currentText.text }, - ) - ctx.currentText.text = textOutput.text - ctx.currentText.time = { start: Date.now(), end: Date.now() } - if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata - await Session.updatePart(ctx.currentText) - } - ctx.currentText = undefined - break + case "text-end": + if (ctx.currentText) { + ctx.currentText.text = ctx.currentText.text.trimEnd() + const textOutput = await Plugin.trigger( + "experimental.text.complete", + { + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.id, + partID: ctx.currentText.id, + }, + { text: ctx.currentText.text }, + ) + ctx.currentText.text = textOutput.text + ctx.currentText.time = { start: Date.now(), end: Date.now() } + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + await Session.updatePart(ctx.currentText) + } + ctx.currentText = undefined + break - case "finish": - break + case "finish": + break - default: - log.info("unhandled", { ...value }) - break - } + default: + log.info("unhandled", { ...value }) + break + } + }) } - async function cleanup(ctx: ProcessorContext) { - if (ctx.snapshot) { - const patch = await Snapshot.patch(ctx.snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) + function cleanupEffect(ctx: ProcessorContext) { + return Effect.promise(async () => { + if (ctx.snapshot) { + const patch = await Snapshot.patch(ctx.snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined } - ctx.snapshot = undefined - } - const parts = await MessageV2.parts(ctx.assistantMessage.id) - for (const part of parts) { - if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { - await Session.updatePart({ - ...part, - state: { - ...part.state, - status: "error", - error: "Tool execution aborted", - time: { start: Date.now(), end: Date.now() }, - }, - }) + const parts = await MessageV2.parts(ctx.assistantMessage.id) + for (const part of parts) { + if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { + await Session.updatePart({ + ...part, + state: { + ...part.state, + status: "error", + error: "Tool execution aborted", + time: { start: Date.now(), end: Date.now() }, + }, + }) + } } - } - ctx.assistantMessage.time.completed = Date.now() - await Session.updateMessage(ctx.assistantMessage) + ctx.assistantMessage.time.completed = Date.now() + await Session.updateMessage(ctx.assistantMessage) + }) } export function create(input: { @@ -355,7 +361,7 @@ export namespace SessionProcessor { partFromToolCall(toolCallID: string) { return toolcalls[toolCallID] }, - async process(streamInput: LLM.StreamInput) { + async process(streamInput: LLM.StreamInput): Promise<"compact" | "stop" | "continue"> { log.info("process") needsCompaction = false const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true @@ -377,51 +383,67 @@ export namespace SessionProcessor { reasoningMap: {}, } - while (true) { - try { - ctx.currentText = undefined - ctx.reasoningMap = {} - const stream = await LLM.stream(streamInput) + const consumeStream = Effect.gen(function* () { + ctx.currentText = undefined + ctx.reasoningMap = {} + const stream = yield* Effect.promise(() => LLM.stream(streamInput)) - for await (const value of stream.fullStream) { - input.abort.throwIfAborted() - await handleEvent(value, ctx) - if (needsCompaction) break - } - } catch (e: any) { - log.error("process", { error: e, stack: JSON.stringify(e.stack) }) - const error = MessageV2.fromError(e, { providerID: input.model.providerID, aborted: input.abort.aborted }) - if (MessageV2.ContextOverflowError.isInstance(error)) { - needsCompaction = true - Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error }) - } else { - const retry = SessionRetry.retryable(error) - if (retry !== undefined) { - attempt++ - const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) - await SessionStatus.set(input.sessionID, { - type: "retry", - attempt, - message: retry, - next: Date.now() + delay, + yield* Stream.fromAsyncIterable(stream.fullStream, (e) => e as Error).pipe( + Stream.runForEachWhile((event) => + Effect.gen(function* () { + input.abort.throwIfAborted() + yield* handleEvent(event, ctx) + return !needsCompaction + }), + ), + ) + }) + + const loop: Effect.Effect = consumeStream.pipe( + Effect.catch((e: unknown) => + Effect.gen(function* () { + log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) }) + const error = MessageV2.fromError(e, { + providerID: input.model.providerID, + aborted: input.abort.aborted, + }) + if (MessageV2.ContextOverflowError.isInstance(error)) { + needsCompaction = true + Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error }) + } else { + const retry = SessionRetry.retryable(error) + if (retry !== undefined) { + attempt++ + const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) + yield* Effect.promise(() => + SessionStatus.set(input.sessionID, { + type: "retry", + attempt, + message: retry, + next: Date.now() + delay, + }), + ) + yield* Effect.promise(() => SessionRetry.sleep(delay, input.abort).catch(() => {})) + yield* loop + return + } + input.assistantMessage.error = error + Bus.publish(Session.Event.Error, { + sessionID: input.assistantMessage.sessionID, + error: input.assistantMessage.error, }) - await SessionRetry.sleep(delay, input.abort).catch(() => {}) - continue + yield* Effect.promise(() => SessionStatus.set(input.sessionID, { type: "idle" })) } - input.assistantMessage.error = error - Bus.publish(Session.Event.Error, { - sessionID: input.assistantMessage.sessionID, - error: input.assistantMessage.error, - }) - await SessionStatus.set(input.sessionID, { type: "idle" }) - } - } - await cleanup(ctx) - if (needsCompaction) return "compact" - if (blocked) return "stop" - if (input.assistantMessage.error) return "stop" - return "continue" - } + }), + ), + ) + + await Effect.runPromise(loop.pipe(Effect.ensuring(cleanupEffect(ctx)))) + + if (needsCompaction) return "compact" + if (blocked) return "stop" + if (input.assistantMessage.error) return "stop" + return "continue" }, } return result From 6fbaed263636f909b34ecbdec0060775da1c2683 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 27 Mar 2026 20:50:40 -0400 Subject: [PATCH 4/5] refactor(session): deepen compaction effect migration --- packages/opencode/specs/effect-migration.md | 21 +- packages/opencode/src/session/compaction.ts | 299 +++++++------- packages/opencode/src/session/processor.ts | 105 +++-- .../opencode/test/session/compaction.test.ts | 382 +++++++++++++++++- 4 files changed, 618 insertions(+), 189 deletions(-) diff --git a/packages/opencode/specs/effect-migration.md b/packages/opencode/specs/effect-migration.md index f4acc6e52e0b..7b128db0e170 100644 --- a/packages/opencode/specs/effect-migration.md +++ b/packages/opencode/specs/effect-migration.md @@ -59,9 +59,10 @@ Rules: - Keep everything in one namespace, one file — no separate `service.ts` / `index.ts` split - `runPromise` goes inside the namespace (not exported unless tests need it) -- Facade functions are plain `async function` — no `fn()` wrappers +- Public async facades are usually plain `async function`; use `fn(...)` when the facade also needs validation - Use `Effect.fn("Namespace.method")` for all Effect functions (for tracing) - No `Layer.fresh` — InstanceState handles per-directory isolation +- Inside another Effect service, depend on `Foo.Service` directly instead of calling `Foo.get()`-style async facades ## Schema → Zod interop @@ -164,6 +165,20 @@ Prefer these first: - `Config` for effect-native configuration reads - `Clock` / `DateTime` for time reads inside effects +## Use collaborators directly + +Inside an effectified service, prefer another service's `Service` + `layer` over its exported async facade. + +- In the layer, `yield* Foo.Service` and provide `Foo.layer` or `Foo.defaultLayer` +- Inside Effect code, call `yield* foo.method(...)` +- Do not call `Foo.method(...)` when that function is the `runPromise` async facade +- Keep the async facade for non-Effect callers and legacy boundaries + +```ts +const foo = yield * Foo.Service +const item = yield * foo.get(id) +``` + ## Child processes For child process work in services, yield `ChildProcessSpawner.ChildProcessSpawner` in the layer and use `ChildProcess.make(...)`. @@ -212,8 +227,8 @@ Fully migrated (single namespace, InstanceState where needed, flattened facade): Still open and likely worth migrating: -- [ ] `Session` +- [x] `Session` - [ ] `SessionProcessor` - [ ] `SessionPrompt` -- [ ] `SessionCompaction` +- [x] `SessionCompaction` - [ ] `Provider` diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 2fa3b5992233..608d1e53766f 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -61,11 +61,18 @@ export namespace SessionCompaction { export class Service extends ServiceMap.Service()("@opencode/SessionCompaction") {} - export const layer: Layer.Layer = Layer.effect( + export const layer: Layer.Layer< + Service, + never, + Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service + > = Layer.effect( Service, Effect.gen(function* () { const bus = yield* Bus.Service const config = yield* Config.Service + const session = yield* Session.Service + const agents = yield* Agent.Service + const plugin = yield* Plugin.Service const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: { tokens: MessageV2.Assistant["tokens"] @@ -95,12 +102,9 @@ export namespace SessionCompaction { if (cfg.compaction?.prune === false) return log.info("pruning") - const msgs = yield* Effect.promise(() => - Session.messages({ sessionID: input.sessionID }).catch((err) => { - if (NotFoundError.isInstance(err)) return undefined - throw err - }), - ) + const msgs = yield* session + .messages({ sessionID: input.sessionID }) + .pipe(Effect.catchIf(NotFoundError.isInstance, () => Effect.succeed(undefined))) if (!msgs) return let total = 0 @@ -134,7 +138,7 @@ export namespace SessionCompaction { for (const part of toPrune) { if (part.state.status === "completed") { part.state.time.compacted = Date.now() - yield* Effect.promise(() => Session.updatePart(part)) + yield* session.updatePart(part) } } log.info("pruned", { count: toPrune.length }) @@ -171,49 +175,50 @@ export namespace SessionCompaction { } } - const result = yield* Effect.promise(async (): Promise<"continue" | "stop"> => { - const agent = await Agent.get("compaction") - const model = agent.model - ? await Provider.getModel(agent.model.providerID, agent.model.modelID) - : await Provider.getModel(userMessage.model.providerID, userMessage.model.modelID) - const msg = (await Session.updateMessage({ - id: MessageID.ascending(), - role: "assistant", - parentID: input.parentID, - sessionID: input.sessionID, - mode: "compaction", - agent: "compaction", - variant: userMessage.variant, - summary: true, - path: { - cwd: Instance.directory, - root: Instance.worktree, - }, - cost: 0, - tokens: { - output: 0, - input: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, - modelID: model.id, - providerID: model.providerID, - time: { - created: Date.now(), - }, - })) as MessageV2.Assistant - const processor = SessionProcessor.create({ - assistantMessage: msg, - sessionID: input.sessionID, - model, - abort: input.abort, - }) - const compacting = await Plugin.trigger( - "experimental.session.compacting", - { sessionID: input.sessionID }, - { context: [], prompt: undefined }, - ) - const defaultPrompt = `Provide a detailed prompt for continuing our conversation above. + const agent = yield* agents.get("compaction") + const model = yield* Effect.promise(() => + agent.model + ? Provider.getModel(agent.model.providerID, agent.model.modelID) + : Provider.getModel(userMessage.model.providerID, userMessage.model.modelID), + ) + const msg = (yield* session.updateMessage({ + id: MessageID.ascending(), + role: "assistant", + parentID: input.parentID, + sessionID: input.sessionID, + mode: "compaction", + agent: "compaction", + variant: userMessage.variant, + summary: true, + path: { + cwd: Instance.directory, + root: Instance.worktree, + }, + cost: 0, + tokens: { + output: 0, + input: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: model.id, + providerID: model.providerID, + time: { + created: Date.now(), + }, + })) as MessageV2.Assistant + const processor = SessionProcessor.create({ + assistantMessage: msg, + sessionID: input.sessionID, + model, + abort: input.abort, + }) + const compacting = yield* plugin.trigger( + "experimental.session.compacting", + { sessionID: input.sessionID }, + { context: [], prompt: undefined }, + ) + const defaultPrompt = `Provide a detailed prompt for continuing our conversation above. Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next. The summary that you construct will be used so that another agent can read it and continue the work. @@ -241,10 +246,12 @@ When constructing the summary, try to stick to this template: [Construct a structured list of relevant files that have been read, edited, or created that pertain to the task at hand. If all the files in a directory are relevant, include the path to the directory.] ---` - const promptText = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n") - const msgs = structuredClone(messages) - await Plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) - const result = await processor.process({ + const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n") + const msgs = structuredClone(messages) + yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) + const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true })) + const result = yield* Effect.promise(() => + processor.process({ user: userMessage, agent, abort: input.abort, @@ -252,85 +259,87 @@ When constructing the summary, try to stick to this template: tools: {}, system: [], messages: [ - ...(await MessageV2.toModelMessages(msgs, model, { stripMedia: true })), + ...modelMessages, { role: "user", - content: [{ type: "text", text: promptText }], + content: [{ type: "text", text: prompt }], }, ], model, - }) - - if (result === "compact") { - processor.message.error = new MessageV2.ContextOverflowError({ - message: replay - ? "Conversation history too large to compact - exceeds model context limit" - : "Session too large to compact - context exceeds model limit even after stripping media", - }).toObject() - processor.message.finish = "error" - await Session.updateMessage(processor.message) - return "stop" - } + }), + ) - if (result === "continue" && input.auto) { - if (replay) { - const original = replay.info as MessageV2.User - const replayMsg = await Session.updateMessage({ - id: MessageID.ascending(), - role: "user", - sessionID: input.sessionID, - time: { created: Date.now() }, - agent: original.agent, - model: original.model, - format: original.format, - tools: original.tools, - system: original.system, - variant: original.variant, - }) - for (const part of replay.parts) { - if (part.type === "compaction") continue - const replayPart = - part.type === "file" && MessageV2.isMedia(part.mime) - ? { type: "text" as const, text: `[Attached ${part.mime}: ${part.filename ?? "file"}]` } - : part - await Session.updatePart({ - ...replayPart, - id: PartID.ascending(), - messageID: replayMsg.id, - sessionID: input.sessionID, - }) - } - } else { - const continueMsg = await Session.updateMessage({ - id: MessageID.ascending(), - role: "user", - sessionID: input.sessionID, - time: { created: Date.now() }, - agent: userMessage.agent, - model: userMessage.model, - }) - const text = - (input.overflow - ? "The previous request exceeded the provider's size limit due to large media attachments. The conversation was compacted and media files were removed from context. If the user was asking about attached images or files, explain that the attachments were too large to process and suggest they try again with smaller or fewer files.\n\n" - : "") + - "Continue if you have next steps, or stop and ask for clarification if you are unsure how to proceed." - await Session.updatePart({ + if (result === "compact") { + processor.message.error = new MessageV2.ContextOverflowError({ + message: replay + ? "Conversation history too large to compact - exceeds model context limit" + : "Session too large to compact - context exceeds model limit even after stripping media", + }).toObject() + processor.message.finish = "error" + yield* session.updateMessage(processor.message) + return "stop" + } + + if (result === "continue" && input.auto) { + if (replay) { + const original = replay.info as MessageV2.User + const replayMsg = yield* session.updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID: input.sessionID, + time: { created: Date.now() }, + agent: original.agent, + model: original.model, + format: original.format, + tools: original.tools, + system: original.system, + variant: original.variant, + }) + for (const part of replay.parts) { + if (part.type === "compaction") continue + const replayPart = + part.type === "file" && MessageV2.isMedia(part.mime) + ? { type: "text" as const, text: `[Attached ${part.mime}: ${part.filename ?? "file"}]` } + : part + yield* session.updatePart({ + ...replayPart, id: PartID.ascending(), - messageID: continueMsg.id, + messageID: replayMsg.id, sessionID: input.sessionID, - type: "text", - synthetic: true, - text, - time: { - start: Date.now(), - end: Date.now(), - }, }) } } - if (processor.message.error) return "stop" as const - return "continue" as const - }) + + if (!replay) { + const continueMsg = yield* session.updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID: input.sessionID, + time: { created: Date.now() }, + agent: userMessage.agent, + model: userMessage.model, + }) + const text = + (input.overflow + ? "The previous request exceeded the provider's size limit due to large media attachments. The conversation was compacted and media files were removed from context. If the user was asking about attached images or files, explain that the attachments were too large to process and suggest they try again with smaller or fewer files.\n\n" + : "") + + "Continue if you have next steps, or stop and ask for clarification if you are unsure how to proceed." + yield* session.updatePart({ + id: PartID.ascending(), + messageID: continueMsg.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text, + time: { + start: Date.now(), + end: Date.now(), + }, + }) + } + } + + if (processor.message.error) return "stop" if (result === "continue") yield* bus.publish(Event.Compacted, { sessionID: input.sessionID }) return result }) @@ -342,23 +351,21 @@ When constructing the summary, try to stick to this template: auto: boolean overflow?: boolean }) { - yield* Effect.promise(async () => { - const msg = await Session.updateMessage({ - id: MessageID.ascending(), - role: "user", - model: input.model, - sessionID: input.sessionID, - agent: input.agent, - time: { created: Date.now() }, - }) - await Session.updatePart({ - id: PartID.ascending(), - messageID: msg.id, - sessionID: msg.sessionID, - type: "compaction", - auto: input.auto, - overflow: input.overflow, - }) + const msg = yield* session.updateMessage({ + id: MessageID.ascending(), + role: "user", + model: input.model, + sessionID: input.sessionID, + agent: input.agent, + time: { created: Date.now() }, + }) + yield* session.updatePart({ + id: PartID.ascending(), + messageID: msg.id, + sessionID: msg.sessionID, + type: "compaction", + auto: input.auto, + overflow: input.overflow, }) }) @@ -371,7 +378,17 @@ When constructing the summary, try to stick to this template: }), ) - export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Config.defaultLayer)) + export const defaultLayer = Layer.unwrap( + Effect.sync(() => + layer.pipe( + Layer.provide(Session.defaultLayer), + Layer.provide(Agent.defaultLayer), + Layer.provide(Plugin.defaultLayer), + Layer.provide(Bus.layer), + Layer.provide(Config.defaultLayer), + ), + ), + ) const { runPromise } = makeRuntime(Service, defaultLayer) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 0fb4954b19e0..477bd4d58110 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -16,7 +16,7 @@ import { Permission } from "@/permission" import { Question } from "@/question" import { PartID } from "./schema" import type { SessionID, MessageID } from "./schema" -import { Effect } from "effect" +import { Cause, Effect } from "effect" import * as Stream from "effect/Stream" export namespace SessionProcessor { @@ -373,12 +373,24 @@ export namespace SessionProcessor { abort: input.abort, toolcalls, shouldBreak, - get snapshot() { return snapshot }, - set snapshot(v) { snapshot = v }, - get blocked() { return blocked }, - set blocked(v) { blocked = v }, - get needsCompaction() { return needsCompaction }, - set needsCompaction(v) { needsCompaction = v }, + get snapshot() { + return snapshot + }, + set snapshot(v) { + snapshot = v + }, + get blocked() { + return blocked + }, + set blocked(v) { + blocked = v + }, + get needsCompaction() { + return needsCompaction + }, + set needsCompaction(v) { + needsCompaction = v + }, currentText: undefined, reasoningMap: {}, } @@ -388,7 +400,7 @@ export namespace SessionProcessor { ctx.reasoningMap = {} const stream = yield* Effect.promise(() => LLM.stream(streamInput)) - yield* Stream.fromAsyncIterable(stream.fullStream, (e) => e as Error).pipe( + yield* Stream.fromAsyncIterable(stream.fullStream, (e) => e).pipe( Stream.runForEachWhile((event) => Effect.gen(function* () { input.abort.throwIfAborted() @@ -399,43 +411,48 @@ export namespace SessionProcessor { ) }) - const loop: Effect.Effect = consumeStream.pipe( - Effect.catch((e: unknown) => - Effect.gen(function* () { - log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) }) - const error = MessageV2.fromError(e, { - providerID: input.model.providerID, - aborted: input.abort.aborted, - }) - if (MessageV2.ContextOverflowError.isInstance(error)) { - needsCompaction = true - Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error }) - } else { - const retry = SessionRetry.retryable(error) - if (retry !== undefined) { - attempt++ - const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) - yield* Effect.promise(() => - SessionStatus.set(input.sessionID, { - type: "retry", - attempt, - message: retry, - next: Date.now() + delay, - }), - ) - yield* Effect.promise(() => SessionRetry.sleep(delay, input.abort).catch(() => {})) - yield* loop - return - } - input.assistantMessage.error = error - Bus.publish(Session.Event.Error, { - sessionID: input.assistantMessage.sessionID, - error: input.assistantMessage.error, - }) - yield* Effect.promise(() => SessionStatus.set(input.sessionID, { type: "idle" })) + const errorHandler = (e: unknown): Effect.Effect => + Effect.gen(function* () { + log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) }) + const error = MessageV2.fromError(e, { + providerID: input.model.providerID, + aborted: input.abort.aborted, + }) + if (MessageV2.ContextOverflowError.isInstance(error)) { + needsCompaction = true + Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error }) + } else { + const retry = SessionRetry.retryable(error) + if (retry !== undefined) { + attempt++ + const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) + yield* Effect.promise(() => + SessionStatus.set(input.sessionID, { + type: "retry", + attempt, + message: retry, + next: Date.now() + delay, + }), + ) + yield* Effect.promise(() => SessionRetry.sleep(delay, input.abort).catch(() => {})) + yield* loop + return } - }), - ), + input.assistantMessage.error = error + Bus.publish(Session.Event.Error, { + sessionID: input.assistantMessage.sessionID, + error: input.assistantMessage.error, + }) + yield* Effect.promise(() => SessionStatus.set(input.sessionID, { type: "idle" })) + } + }) + + const loop: Effect.Effect = consumeStream.pipe( + Effect.catchCause((cause) => { + const reason = cause.reasons[0] + const e = Cause.isDieReason(reason) ? reason.defect : Cause.isFailReason(reason) ? reason.error : cause + return errorHandler(e) + }), ) await Effect.runPromise(loop.pipe(Effect.ensuring(cleanupEffect(ctx)))) diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts index 452926d12e1b..9d5f7eeb8adb 100644 --- a/packages/opencode/test/session/compaction.test.ts +++ b/packages/opencode/test/session/compaction.test.ts @@ -1,15 +1,30 @@ -import { describe, expect, test } from "bun:test" +import { afterEach, describe, expect, mock, spyOn, test } from "bun:test" import path from "path" +import { Bus } from "../../src/bus" import { SessionCompaction } from "../../src/session/compaction" import { Token } from "../../src/util/token" import { Instance } from "../../src/project/instance" import { Log } from "../../src/util/log" import { tmpdir } from "../fixture/fixture" import { Session } from "../../src/session" +import { MessageV2 } from "../../src/session/message-v2" +import { MessageID, PartID, SessionID } from "../../src/session/schema" +import { ModelID, ProviderID } from "../../src/provider/schema" import type { Provider } from "../../src/provider/provider" +import * as ProviderModule from "../../src/provider/provider" +import * as SessionProcessorModule from "../../src/session/processor" Log.init({ print: false }) +const ref = { + providerID: ProviderID.make("test"), + modelID: ModelID.make("test-model"), +} + +afterEach(() => { + mock.restore() +}) + function createModel(opts: { context: number output: number @@ -40,6 +55,105 @@ function createModel(opts: { } as Provider.Model } +async function user(sessionID: SessionID, text: string) { + const msg = await Session.updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID, + agent: "build", + model: ref, + time: { created: Date.now() }, + }) + await Session.updatePart({ + id: PartID.ascending(), + messageID: msg.id, + sessionID, + type: "text", + text, + }) + return msg +} + +async function assistant(sessionID: SessionID, parentID: MessageID, root: string) { + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + role: "assistant", + sessionID, + mode: "build", + agent: "build", + path: { cwd: root, root }, + cost: 0, + tokens: { + output: 0, + input: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: ref.modelID, + providerID: ref.providerID, + parentID, + time: { created: Date.now() }, + finish: "end_turn", + } + await Session.updateMessage(msg) + return msg +} + +async function tool(sessionID: SessionID, messageID: MessageID, tool: string, output: string) { + return Session.updatePart({ + id: PartID.ascending(), + messageID, + sessionID, + type: "tool", + callID: crypto.randomUUID(), + tool, + state: { + status: "completed", + input: {}, + output, + title: "done", + metadata: {}, + time: { start: Date.now(), end: Date.now() }, + }, + }) +} + +function fake( + input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0], + result: "continue" | "compact", +): ReturnType<(typeof SessionProcessorModule.SessionProcessor)["create"]> { + const msg = input.assistantMessage + return { + get message() { + return msg + }, + partFromToolCall() { + return { + id: PartID.ascending(), + messageID: msg.id, + sessionID: msg.sessionID, + type: "tool", + callID: "fake", + tool: "fake", + state: { status: "pending", input: {}, raw: "" }, + } + }, + process: async () => result, + } +} + +function wait(ms = 50) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +function defer() { + let resolve!: () => void + const promise = new Promise((done) => { + resolve = done + }) + return { promise, resolve } +} + describe("session.compaction.isOverflow", () => { test("returns true when token count exceeds usable context", async () => { await using tmp = await tmpdir() @@ -227,6 +341,272 @@ describe("session.compaction.isOverflow", () => { }) }) +describe("session.compaction.create", () => { + test("creates a compaction user message and part", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + + await SessionCompaction.create({ + sessionID: session.id, + agent: "build", + model: ref, + auto: true, + overflow: true, + }) + + const msgs = await Session.messages({ sessionID: session.id }) + expect(msgs).toHaveLength(1) + expect(msgs[0].info.role).toBe("user") + expect(msgs[0].parts).toHaveLength(1) + expect(msgs[0].parts[0]).toMatchObject({ + type: "compaction", + auto: true, + overflow: true, + }) + }, + }) + }) +}) + +describe("session.compaction.prune", () => { + test("compacts old completed tool output", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const a = await user(session.id, "first") + const b = await assistant(session.id, a.id, tmp.path) + await tool(session.id, b.id, "bash", "x".repeat(200_000)) + await user(session.id, "second") + await user(session.id, "third") + + await SessionCompaction.prune({ sessionID: session.id }) + + const msgs = await Session.messages({ sessionID: session.id }) + const part = msgs.flatMap((msg) => msg.parts).find((part) => part.type === "tool") + expect(part?.type).toBe("tool") + expect(part?.state.status).toBe("completed") + if (part?.type === "tool" && part.state.status === "completed") { + expect(part.state.time.compacted).toBeNumber() + } + }, + }) + }) + + test("skips protected skill tool output", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const a = await user(session.id, "first") + const b = await assistant(session.id, a.id, tmp.path) + await tool(session.id, b.id, "skill", "x".repeat(200_000)) + await user(session.id, "second") + await user(session.id, "third") + + await SessionCompaction.prune({ sessionID: session.id }) + + const msgs = await Session.messages({ sessionID: session.id }) + const part = msgs.flatMap((msg) => msg.parts).find((part) => part.type === "tool") + expect(part?.type).toBe("tool") + if (part?.type === "tool" && part.state.status === "completed") { + expect(part.state.time.compacted).toBeUndefined() + } + }, + }) + }) +}) + +describe("session.compaction.process", () => { + test("publishes compacted event on continue", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) + spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue")) + + const session = await Session.create({}) + const msg = await user(session.id, "hello") + const msgs = await Session.messages({ sessionID: session.id }) + const done = defer() + let seen = false + const unsub = Bus.subscribe(SessionCompaction.Event.Compacted, (evt) => { + if (evt.properties.sessionID !== session.id) return + seen = true + done.resolve() + }) + + const result = await SessionCompaction.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + abort: new AbortController().signal, + auto: false, + }) + + await Promise.race([ + done.promise, + wait(500).then(() => { + throw new Error("timed out waiting for compacted event") + }), + ]) + unsub() + + expect(result).toBe("continue") + expect(seen).toBe(true) + }, + }) + }) + + test("marks summary message as errored on compact result", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) + spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "compact")) + + const session = await Session.create({}) + const msg = await user(session.id, "hello") + const result = await SessionCompaction.process({ + parentID: msg.id, + messages: await Session.messages({ sessionID: session.id }), + sessionID: session.id, + abort: new AbortController().signal, + auto: false, + }) + + const summary = (await Session.messages({ sessionID: session.id })).find( + (msg) => msg.info.role === "assistant" && msg.info.summary, + ) + + expect(result).toBe("stop") + expect(summary?.info.role).toBe("assistant") + if (summary?.info.role === "assistant") { + expect(summary.info.finish).toBe("error") + expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact") + } + }, + }) + }) + + test("adds synthetic continue prompt when auto is enabled", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) + spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue")) + + const session = await Session.create({}) + const msg = await user(session.id, "hello") + + const result = await SessionCompaction.process({ + parentID: msg.id, + messages: await Session.messages({ sessionID: session.id }), + sessionID: session.id, + abort: new AbortController().signal, + auto: true, + }) + + const msgs = await Session.messages({ sessionID: session.id }) + const last = msgs.at(-1) + + expect(result).toBe("continue") + expect(last?.info.role).toBe("user") + expect(last?.parts[0]).toMatchObject({ + type: "text", + synthetic: true, + }) + if (last?.parts[0]?.type === "text") { + expect(last.parts[0].text).toContain("Continue if you have next steps") + } + }, + }) + }) + + test("replays the prior user turn on overflow when earlier context exists", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) + spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue")) + + const session = await Session.create({}) + await user(session.id, "root") + const replay = await user(session.id, "image") + await Session.updatePart({ + id: PartID.ascending(), + messageID: replay.id, + sessionID: session.id, + type: "file", + mime: "image/png", + filename: "cat.png", + url: "https://example.com/cat.png", + }) + const msg = await user(session.id, "current") + + const result = await SessionCompaction.process({ + parentID: msg.id, + messages: await Session.messages({ sessionID: session.id }), + sessionID: session.id, + abort: new AbortController().signal, + auto: true, + overflow: true, + }) + + const last = (await Session.messages({ sessionID: session.id })).at(-1) + + expect(result).toBe("continue") + expect(last?.info.role).toBe("user") + expect(last?.parts.some((part) => part.type === "file")).toBe(false) + expect( + last?.parts.some((part) => part.type === "text" && part.text.includes("Attached image/png: cat.png")), + ).toBe(true) + }, + }) + }) + + test("falls back to overflow guidance when no replayable turn exists", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) + spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue")) + + const session = await Session.create({}) + await user(session.id, "earlier") + const msg = await user(session.id, "current") + + const result = await SessionCompaction.process({ + parentID: msg.id, + messages: await Session.messages({ sessionID: session.id }), + sessionID: session.id, + abort: new AbortController().signal, + auto: true, + overflow: true, + }) + + const last = (await Session.messages({ sessionID: session.id })).at(-1) + + expect(result).toBe("continue") + expect(last?.info.role).toBe("user") + if (last?.parts[0]?.type === "text") { + expect(last.parts[0].text).toContain("previous request exceeded the provider's size limit") + } + }, + }) + }) +}) + describe("util.token.estimate", () => { test("estimates tokens from text (4 chars per token)", () => { const text = "x".repeat(4000) From c532d208c50a753b79f7889e19117f34e75685e4 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 27 Mar 2026 20:58:46 -0400 Subject: [PATCH 5/5] refactor(session): narrow compaction PR scope --- packages/opencode/specs/effect-migration.md | 21 +- packages/opencode/src/session/compaction.ts | 1 + packages/opencode/src/session/processor.ts | 762 ++++++++++---------- 3 files changed, 366 insertions(+), 418 deletions(-) diff --git a/packages/opencode/specs/effect-migration.md b/packages/opencode/specs/effect-migration.md index 7b128db0e170..f4acc6e52e0b 100644 --- a/packages/opencode/specs/effect-migration.md +++ b/packages/opencode/specs/effect-migration.md @@ -59,10 +59,9 @@ Rules: - Keep everything in one namespace, one file — no separate `service.ts` / `index.ts` split - `runPromise` goes inside the namespace (not exported unless tests need it) -- Public async facades are usually plain `async function`; use `fn(...)` when the facade also needs validation +- Facade functions are plain `async function` — no `fn()` wrappers - Use `Effect.fn("Namespace.method")` for all Effect functions (for tracing) - No `Layer.fresh` — InstanceState handles per-directory isolation -- Inside another Effect service, depend on `Foo.Service` directly instead of calling `Foo.get()`-style async facades ## Schema → Zod interop @@ -165,20 +164,6 @@ Prefer these first: - `Config` for effect-native configuration reads - `Clock` / `DateTime` for time reads inside effects -## Use collaborators directly - -Inside an effectified service, prefer another service's `Service` + `layer` over its exported async facade. - -- In the layer, `yield* Foo.Service` and provide `Foo.layer` or `Foo.defaultLayer` -- Inside Effect code, call `yield* foo.method(...)` -- Do not call `Foo.method(...)` when that function is the `runPromise` async facade -- Keep the async facade for non-Effect callers and legacy boundaries - -```ts -const foo = yield * Foo.Service -const item = yield * foo.get(id) -``` - ## Child processes For child process work in services, yield `ChildProcessSpawner.ChildProcessSpawner` in the layer and use `ChildProcess.make(...)`. @@ -227,8 +212,8 @@ Fully migrated (single namespace, InstanceState where needed, flattened facade): Still open and likely worth migrating: -- [x] `Session` +- [ ] `Session` - [ ] `SessionProcessor` - [ ] `SessionPrompt` -- [x] `SessionCompaction` +- [ ] `SessionCompaction` - [ ] `Provider` diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 608d1e53766f..f9ee5654903a 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -213,6 +213,7 @@ export namespace SessionCompaction { model, abort: input.abort, }) + // Allow plugins to inject context or replace compaction prompt. const compacting = yield* plugin.trigger( "experimental.session.compacting", { sessionID: input.sessionID }, diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 477bd4d58110..84ea76656857 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -16,8 +16,6 @@ import { Permission } from "@/permission" import { Question } from "@/question" import { PartID } from "./schema" import type { SessionID, MessageID } from "./schema" -import { Cause, Effect } from "effect" -import * as Stream from "effect/Stream" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 @@ -26,322 +24,6 @@ export namespace SessionProcessor { export type Info = Awaited> export type Result = Awaited> - interface ProcessorContext { - assistantMessage: MessageV2.Assistant - sessionID: SessionID - model: Provider.Model - abort: AbortSignal - toolcalls: Record - shouldBreak: boolean - snapshot: string | undefined - blocked: boolean - needsCompaction: boolean - currentText: MessageV2.TextPart | undefined - reasoningMap: Record - } - - type StreamResult = Awaited> - type StreamEvent = StreamResult["fullStream"] extends AsyncIterable ? T : never - - function handleEvent(value: StreamEvent, ctx: ProcessorContext) { - return Effect.promise(async () => { - switch (value.type) { - case "start": - await SessionStatus.set(ctx.sessionID, { type: "busy" }) - break - - case "reasoning-start": - if (value.id in ctx.reasoningMap) break - const reasoningPart = { - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "reasoning" as const, - text: "", - time: { start: Date.now() }, - metadata: value.providerMetadata, - } - ctx.reasoningMap[value.id] = reasoningPart - await Session.updatePart(reasoningPart) - break - - case "reasoning-delta": - if (value.id in ctx.reasoningMap) { - const part = ctx.reasoningMap[value.id] - part.text += value.text - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: part.sessionID, - messageID: part.messageID, - partID: part.id, - field: "text", - delta: value.text, - }) - } - break - - case "reasoning-end": - if (value.id in ctx.reasoningMap) { - const part = ctx.reasoningMap[value.id] - part.text = part.text.trimEnd() - part.time = { ...part.time, end: Date.now() } - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePart(part) - delete ctx.reasoningMap[value.id] - } - break - - case "tool-input-start": { - const part = await Session.updatePart({ - id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "tool", - tool: value.toolName, - callID: value.id, - state: { status: "pending", input: {}, raw: "" }, - }) - ctx.toolcalls[value.id] = part as MessageV2.ToolPart - break - } - - case "tool-input-delta": - break - - case "tool-input-end": - break - - case "tool-call": { - const match = ctx.toolcalls[value.toolCallId] - if (match) { - const part = await Session.updatePart({ - ...match, - tool: value.toolName, - state: { status: "running", input: value.input, time: { start: Date.now() } }, - metadata: value.providerMetadata, - }) - ctx.toolcalls[value.toolCallId] = part as MessageV2.ToolPart - - const parts = await MessageV2.parts(ctx.assistantMessage.id) - const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) - - if ( - lastThree.length === DOOM_LOOP_THRESHOLD && - lastThree.every( - (p) => - p.type === "tool" && - p.tool === value.toolName && - p.state.status !== "pending" && - JSON.stringify(p.state.input) === JSON.stringify(value.input), - ) - ) { - const agent = await Agent.get(ctx.assistantMessage.agent) - await Permission.ask({ - permission: "doom_loop", - patterns: [value.toolName], - sessionID: ctx.assistantMessage.sessionID, - metadata: { tool: value.toolName, input: value.input }, - always: [value.toolName], - ruleset: agent.permission, - }) - } - } - break - } - - case "tool-result": { - const match = ctx.toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "completed", - input: value.input ?? match.state.input, - output: value.output.output, - metadata: value.output.metadata, - title: value.output.title, - time: { start: match.state.time.start, end: Date.now() }, - attachments: value.output.attachments, - }, - }) - delete ctx.toolcalls[value.toolCallId] - } - break - } - - case "tool-error": { - const match = ctx.toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "error", - input: value.input ?? match.state.input, - error: value.error instanceof Error ? value.error.message : String(value.error), - time: { start: match.state.time.start, end: Date.now() }, - }, - }) - if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) { - ctx.blocked = ctx.shouldBreak - } - delete ctx.toolcalls[value.toolCallId] - } - break - } - - case "error": - throw value.error - - case "start-step": - ctx.snapshot = await Snapshot.track() - await Session.updatePart({ - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.sessionID, - snapshot: ctx.snapshot, - type: "step-start", - }) - break - - case "finish-step": { - const usage = Session.getUsage({ - model: ctx.model, - usage: value.usage, - metadata: value.providerMetadata, - }) - ctx.assistantMessage.finish = value.finishReason - ctx.assistantMessage.cost += usage.cost - ctx.assistantMessage.tokens = usage.tokens - await Session.updatePart({ - id: PartID.ascending(), - reason: value.finishReason, - snapshot: await Snapshot.track(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "step-finish", - tokens: usage.tokens, - cost: usage.cost, - }) - await Session.updateMessage(ctx.assistantMessage) - if (ctx.snapshot) { - const patch = await Snapshot.patch(ctx.snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - ctx.snapshot = undefined - } - SessionSummary.summarize({ - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.parentID, - }) - if ( - !ctx.assistantMessage.summary && - (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: ctx.model })) - ) { - ctx.needsCompaction = true - } - break - } - - case "text-start": - ctx.currentText = { - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "text", - text: "", - time: { start: Date.now() }, - metadata: value.providerMetadata, - } - await Session.updatePart(ctx.currentText) - break - - case "text-delta": - if (ctx.currentText) { - ctx.currentText.text += value.text - if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: ctx.currentText.sessionID, - messageID: ctx.currentText.messageID, - partID: ctx.currentText.id, - field: "text", - delta: value.text, - }) - } - break - - case "text-end": - if (ctx.currentText) { - ctx.currentText.text = ctx.currentText.text.trimEnd() - const textOutput = await Plugin.trigger( - "experimental.text.complete", - { - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.id, - partID: ctx.currentText.id, - }, - { text: ctx.currentText.text }, - ) - ctx.currentText.text = textOutput.text - ctx.currentText.time = { start: Date.now(), end: Date.now() } - if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata - await Session.updatePart(ctx.currentText) - } - ctx.currentText = undefined - break - - case "finish": - break - - default: - log.info("unhandled", { ...value }) - break - } - }) - } - - function cleanupEffect(ctx: ProcessorContext) { - return Effect.promise(async () => { - if (ctx.snapshot) { - const patch = await Snapshot.patch(ctx.snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - ctx.snapshot = undefined - } - const parts = await MessageV2.parts(ctx.assistantMessage.id) - for (const part of parts) { - if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { - await Session.updatePart({ - ...part, - state: { - ...part.state, - status: "error", - error: "Tool execution aborted", - time: { start: Date.now(), end: Date.now() }, - }, - }) - } - } - ctx.assistantMessage.time.completed = Date.now() - await Session.updateMessage(ctx.assistantMessage) - }) - } - export function create(input: { assistantMessage: MessageV2.Assistant sessionID: SessionID @@ -361,106 +43,386 @@ export namespace SessionProcessor { partFromToolCall(toolCallID: string) { return toolcalls[toolCallID] }, - async process(streamInput: LLM.StreamInput): Promise<"compact" | "stop" | "continue"> { + async process(streamInput: LLM.StreamInput) { log.info("process") needsCompaction = false const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true - - const ctx: ProcessorContext = { - assistantMessage: input.assistantMessage, - sessionID: input.sessionID, - model: input.model, - abort: input.abort, - toolcalls, - shouldBreak, - get snapshot() { - return snapshot - }, - set snapshot(v) { - snapshot = v - }, - get blocked() { - return blocked - }, - set blocked(v) { - blocked = v - }, - get needsCompaction() { - return needsCompaction - }, - set needsCompaction(v) { - needsCompaction = v - }, - currentText: undefined, - reasoningMap: {}, - } - - const consumeStream = Effect.gen(function* () { - ctx.currentText = undefined - ctx.reasoningMap = {} - const stream = yield* Effect.promise(() => LLM.stream(streamInput)) - - yield* Stream.fromAsyncIterable(stream.fullStream, (e) => e).pipe( - Stream.runForEachWhile((event) => - Effect.gen(function* () { - input.abort.throwIfAborted() - yield* handleEvent(event, ctx) - return !needsCompaction - }), - ), - ) - }) - - const errorHandler = (e: unknown): Effect.Effect => - Effect.gen(function* () { - log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) }) - const error = MessageV2.fromError(e, { - providerID: input.model.providerID, - aborted: input.abort.aborted, + while (true) { + try { + let currentText: MessageV2.TextPart | undefined + let reasoningMap: Record = {} + const stream = await LLM.stream(streamInput) + + for await (const value of stream.fullStream) { + input.abort.throwIfAborted() + switch (value.type) { + case "start": + await SessionStatus.set(input.sessionID, { type: "busy" }) + break + + case "reasoning-start": + if (value.id in reasoningMap) { + continue + } + const reasoningPart = { + id: PartID.ascending(), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "reasoning" as const, + text: "", + time: { + start: Date.now(), + }, + metadata: value.providerMetadata, + } + reasoningMap[value.id] = reasoningPart + await Session.updatePart(reasoningPart) + break + + case "reasoning-delta": + if (value.id in reasoningMap) { + const part = reasoningMap[value.id] + part.text += value.text + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: part.sessionID, + messageID: part.messageID, + partID: part.id, + field: "text", + delta: value.text, + }) + } + break + + case "reasoning-end": + if (value.id in reasoningMap) { + const part = reasoningMap[value.id] + part.text = part.text.trimEnd() + + part.time = { + ...part.time, + end: Date.now(), + } + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePart(part) + delete reasoningMap[value.id] + } + break + + case "tool-input-start": + const part = await Session.updatePart({ + id: toolcalls[value.id]?.id ?? PartID.ascending(), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "tool", + tool: value.toolName, + callID: value.id, + state: { + status: "pending", + input: {}, + raw: "", + }, + }) + toolcalls[value.id] = part as MessageV2.ToolPart + break + + case "tool-input-delta": + break + + case "tool-input-end": + break + + case "tool-call": { + const match = toolcalls[value.toolCallId] + if (match) { + const part = await Session.updatePart({ + ...match, + tool: value.toolName, + state: { + status: "running", + input: value.input, + time: { + start: Date.now(), + }, + }, + metadata: value.providerMetadata, + }) + toolcalls[value.toolCallId] = part as MessageV2.ToolPart + + const parts = await MessageV2.parts(input.assistantMessage.id) + const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) + + if ( + lastThree.length === DOOM_LOOP_THRESHOLD && + lastThree.every( + (p) => + p.type === "tool" && + p.tool === value.toolName && + p.state.status !== "pending" && + JSON.stringify(p.state.input) === JSON.stringify(value.input), + ) + ) { + const agent = await Agent.get(input.assistantMessage.agent) + await Permission.ask({ + permission: "doom_loop", + patterns: [value.toolName], + sessionID: input.assistantMessage.sessionID, + metadata: { + tool: value.toolName, + input: value.input, + }, + always: [value.toolName], + ruleset: agent.permission, + }) + } + } + break + } + case "tool-result": { + const match = toolcalls[value.toolCallId] + if (match && match.state.status === "running") { + await Session.updatePart({ + ...match, + state: { + status: "completed", + input: value.input ?? match.state.input, + output: value.output.output, + metadata: value.output.metadata, + title: value.output.title, + time: { + start: match.state.time.start, + end: Date.now(), + }, + attachments: value.output.attachments, + }, + }) + + delete toolcalls[value.toolCallId] + } + break + } + + case "tool-error": { + const match = toolcalls[value.toolCallId] + if (match && match.state.status === "running") { + await Session.updatePart({ + ...match, + state: { + status: "error", + input: value.input ?? match.state.input, + error: value.error instanceof Error ? value.error.message : String(value.error), + time: { + start: match.state.time.start, + end: Date.now(), + }, + }, + }) + + if ( + value.error instanceof Permission.RejectedError || + value.error instanceof Question.RejectedError + ) { + blocked = shouldBreak + } + delete toolcalls[value.toolCallId] + } + break + } + case "error": + throw value.error + + case "start-step": + snapshot = await Snapshot.track() + await Session.updatePart({ + id: PartID.ascending(), + messageID: input.assistantMessage.id, + sessionID: input.sessionID, + snapshot, + type: "step-start", + }) + break + + case "finish-step": + const usage = Session.getUsage({ + model: input.model, + usage: value.usage, + metadata: value.providerMetadata, + }) + input.assistantMessage.finish = value.finishReason + input.assistantMessage.cost += usage.cost + input.assistantMessage.tokens = usage.tokens + await Session.updatePart({ + id: PartID.ascending(), + reason: value.finishReason, + snapshot: await Snapshot.track(), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + await Session.updateMessage(input.assistantMessage) + if (snapshot) { + const patch = await Snapshot.patch(snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: PartID.ascending(), + messageID: input.assistantMessage.id, + sessionID: input.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + snapshot = undefined + } + SessionSummary.summarize({ + sessionID: input.sessionID, + messageID: input.assistantMessage.parentID, + }) + if ( + !input.assistantMessage.summary && + (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model })) + ) { + needsCompaction = true + } + break + + case "text-start": + currentText = { + id: PartID.ascending(), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "text", + text: "", + time: { + start: Date.now(), + }, + metadata: value.providerMetadata, + } + await Session.updatePart(currentText) + break + + case "text-delta": + if (currentText) { + currentText.text += value.text + if (value.providerMetadata) currentText.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: currentText.sessionID, + messageID: currentText.messageID, + partID: currentText.id, + field: "text", + delta: value.text, + }) + } + break + + case "text-end": + if (currentText) { + currentText.text = currentText.text.trimEnd() + const textOutput = await Plugin.trigger( + "experimental.text.complete", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + partID: currentText.id, + }, + { text: currentText.text }, + ) + currentText.text = textOutput.text + currentText.time = { + start: Date.now(), + end: Date.now(), + } + if (value.providerMetadata) currentText.metadata = value.providerMetadata + await Session.updatePart(currentText) + } + currentText = undefined + break + + case "finish": + break + + default: + log.info("unhandled", { + ...value, + }) + continue + } + if (needsCompaction) break + } + } catch (e: any) { + log.error("process", { + error: e, + stack: JSON.stringify(e.stack), }) + const error = MessageV2.fromError(e, { providerID: input.model.providerID, aborted: input.abort.aborted }) if (MessageV2.ContextOverflowError.isInstance(error)) { needsCompaction = true - Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error }) + Bus.publish(Session.Event.Error, { + sessionID: input.sessionID, + error, + }) } else { const retry = SessionRetry.retryable(error) if (retry !== undefined) { attempt++ const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) - yield* Effect.promise(() => - SessionStatus.set(input.sessionID, { - type: "retry", - attempt, - message: retry, - next: Date.now() + delay, - }), - ) - yield* Effect.promise(() => SessionRetry.sleep(delay, input.abort).catch(() => {})) - yield* loop - return + await SessionStatus.set(input.sessionID, { + type: "retry", + attempt, + message: retry, + next: Date.now() + delay, + }) + await SessionRetry.sleep(delay, input.abort).catch(() => {}) + continue } input.assistantMessage.error = error Bus.publish(Session.Event.Error, { sessionID: input.assistantMessage.sessionID, error: input.assistantMessage.error, }) - yield* Effect.promise(() => SessionStatus.set(input.sessionID, { type: "idle" })) + await SessionStatus.set(input.sessionID, { type: "idle" }) } - }) - - const loop: Effect.Effect = consumeStream.pipe( - Effect.catchCause((cause) => { - const reason = cause.reasons[0] - const e = Cause.isDieReason(reason) ? reason.defect : Cause.isFailReason(reason) ? reason.error : cause - return errorHandler(e) - }), - ) - - await Effect.runPromise(loop.pipe(Effect.ensuring(cleanupEffect(ctx)))) - - if (needsCompaction) return "compact" - if (blocked) return "stop" - if (input.assistantMessage.error) return "stop" - return "continue" + } + if (snapshot) { + const patch = await Snapshot.patch(snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: PartID.ascending(), + messageID: input.assistantMessage.id, + sessionID: input.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + snapshot = undefined + } + const p = await MessageV2.parts(input.assistantMessage.id) + for (const part of p) { + if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { + await Session.updatePart({ + ...part, + state: { + ...part.state, + status: "error", + error: "Tool execution aborted", + time: { + start: Date.now(), + end: Date.now(), + }, + }, + }) + } + } + input.assistantMessage.time.completed = Date.now() + await Session.updateMessage(input.assistantMessage) + if (needsCompaction) return "compact" + if (blocked) return "stop" + if (input.assistantMessage.error) return "stop" + return "continue" + } }, } return result