diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index 76248ca88f28..2daa29fde8f6 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -9,6 +9,8 @@ export function makeRuntime(service: ServiceMap.Service, layer: L return { runSync: (fn: (svc: S) => Effect.Effect) => getRuntime().runSync(service.use(fn)), + runPromiseExit: (fn: (svc: S) => Effect.Effect, options?: Effect.RunOptions) => + getRuntime().runPromiseExit(service.use(fn), options), runPromise: (fn: (svc: S) => Effect.Effect, options?: Effect.RunOptions) => getRuntime().runPromise(service.use(fn), options), runFork: (fn: (svc: S) => Effect.Effect) => getRuntime().runFork(service.use(fn)), diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index f9ee5654903a..223e71639cc8 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -14,10 +14,10 @@ import { Agent } from "@/agent/agent" import { Plugin } from "@/plugin" import { Config } from "@/config/config" import { NotFoundError } from "@/storage/db" -import { ProviderTransform } from "@/provider/transform" import { ModelID, ProviderID } from "@/provider/schema" -import { Effect, Layer, ServiceMap } from "effect" +import { Cause, Effect, Exit, Layer, ServiceMap } from "effect" import { makeRuntime } from "@/effect/run-service" +import { isOverflow as overflow } from "./overflow" export namespace SessionCompaction { const log = Log.create({ service: "session.compaction" }) @@ -31,7 +31,6 @@ export namespace SessionCompaction { ), } - const COMPACTION_BUFFER = 20_000 export const PRUNE_MINIMUM = 20_000 export const PRUNE_PROTECT = 40_000 const PRUNE_PROTECTED_TOOLS = ["skill"] @@ -64,7 +63,7 @@ export namespace SessionCompaction { export const layer: Layer.Layer< Service, never, - Bus.Service | Config.Service | Session.Service | Agent.Service | 
Plugin.Service + Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service | SessionProcessor.Service > = Layer.effect( Service, Effect.gen(function* () { @@ -73,26 +72,13 @@ export namespace SessionCompaction { const session = yield* Session.Service const agents = yield* Agent.Service const plugin = yield* Plugin.Service + const processors = yield* SessionProcessor.Service const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: { tokens: MessageV2.Assistant["tokens"] model: Provider.Model }) { - const cfg = yield* config.get() - if (cfg.compaction?.auto === false) return false - const context = input.model.limit.context - if (context === 0) return false - - const count = - input.tokens.total || - input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write - - const reserved = - cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model)) - const usable = input.model.limit.input - ? input.model.limit.input - reserved - : context - ProviderTransform.maxOutputTokens(input.model) - return count >= usable + return overflow({ cfg: yield* config.get(), tokens: input.tokens, model: input.model }) }) // goes backwards through parts until there are PRUNE_PROTECT tokens worth of tool @@ -181,38 +167,6 @@ export namespace SessionCompaction { ? 
Provider.getModel(agent.model.providerID, agent.model.modelID) : Provider.getModel(userMessage.model.providerID, userMessage.model.modelID), ) - const msg = (yield* session.updateMessage({ - id: MessageID.ascending(), - role: "assistant", - parentID: input.parentID, - sessionID: input.sessionID, - mode: "compaction", - agent: "compaction", - variant: userMessage.variant, - summary: true, - path: { - cwd: Instance.directory, - root: Instance.worktree, - }, - cost: 0, - tokens: { - output: 0, - input: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, - modelID: model.id, - providerID: model.providerID, - time: { - created: Date.now(), - }, - })) as MessageV2.Assistant - const processor = SessionProcessor.create({ - assistantMessage: msg, - sessionID: input.sessionID, - model, - abort: input.abort, - }) // Allow plugins to inject context or replace compaction prompt. const compacting = yield* plugin.trigger( "experimental.session.compacting", @@ -251,8 +205,47 @@ When constructing the summary, try to stick to this template: const msgs = structuredClone(messages) yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true })) - const result = yield* Effect.promise(() => - processor.process({ + const msg = (yield* session.updateMessage({ + id: MessageID.ascending(), + role: "assistant", + parentID: input.parentID, + sessionID: input.sessionID, + mode: "compaction", + agent: "compaction", + variant: userMessage.variant, + summary: true, + path: { + cwd: Instance.directory, + root: Instance.worktree, + }, + cost: 0, + tokens: { + output: 0, + input: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: model.id, + providerID: model.providerID, + time: { + created: Date.now(), + }, + })) as MessageV2.Assistant + const processor = yield* processors.create({ + assistantMessage: msg, + sessionID: input.sessionID, + model, + 
abort: input.abort, + }) + const cancel = Effect.fn("SessionCompaction.cancel")(function* () { + if (!input.abort.aborted || msg.time.completed) return + msg.error = msg.error ?? new MessageV2.AbortedError({ message: "Aborted" }).toObject() + msg.finish = msg.finish ?? "error" + msg.time.completed = Date.now() + yield* session.updateMessage(msg) + }) + const result = yield* processor + .process({ user: userMessage, agent, abort: input.abort, @@ -267,8 +260,8 @@ When constructing the summary, try to stick to this template: }, ], model, - }), - ) + }) + .pipe(Effect.ensuring(cancel())) if (result === "compact") { processor.message.error = new MessageV2.ContextOverflowError({ @@ -383,6 +376,7 @@ When constructing the summary, try to stick to this template: Effect.sync(() => layer.pipe( Layer.provide(Session.defaultLayer), + Layer.provide(SessionProcessor.defaultLayer), Layer.provide(Agent.defaultLayer), Layer.provide(Plugin.defaultLayer), Layer.provide(Bus.layer), @@ -391,7 +385,7 @@ When constructing the summary, try to stick to this template: ), ) - const { runPromise } = makeRuntime(Service, defaultLayer) + const { runPromise, runPromiseExit } = makeRuntime(Service, defaultLayer) export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) { return runPromise((svc) => svc.isOverflow(input)) @@ -409,7 +403,12 @@ When constructing the summary, try to stick to this template: auto: boolean overflow?: boolean }) { - return runPromise((svc) => svc.process(input)) + const exit = await runPromiseExit((svc) => svc.process(input), { signal: input.abort }) + if (Exit.isFailure(exit)) { + if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) return "stop" + throw Cause.squash(exit.cause) + } + return exit.value } export const create = fn( diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index ed82ebc59219..7c67c1b3f626 100644 --- a/packages/opencode/src/session/llm.ts +++ 
b/packages/opencode/src/session/llm.ts @@ -1,5 +1,7 @@ import { Provider } from "@/provider/provider" import { Log } from "@/util/log" +import { Effect, Layer, ServiceMap } from "effect" +import * as Stream from "effect/Stream" import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai" import { mergeDeep, pipe } from "remeda" import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider" @@ -34,6 +36,35 @@ export namespace LLM { toolChoice?: "auto" | "required" | "none" } + export type Event = Awaited>["fullStream"] extends AsyncIterable ? T : never + + export interface Interface { + readonly stream: (input: StreamInput) => Stream.Stream + } + + export class Service extends ServiceMap.Service()("@opencode/LLM") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + return Service.of({ + stream(input) { + return Stream.unwrap( + Effect.promise(() => LLM.stream(input)).pipe( + Effect.map((result) => + Stream.fromAsyncIterable(result.fullStream, (err) => err).pipe( + Stream.mapEffect((event) => Effect.succeed(event)), + ), + ), + ), + ) + }, + }) + }), + ) + + export const defaultLayer = layer + export async function stream(input: StreamInput) { const l = log .clone() diff --git a/packages/opencode/src/session/overflow.ts b/packages/opencode/src/session/overflow.ts new file mode 100644 index 000000000000..f0e52565d81f --- /dev/null +++ b/packages/opencode/src/session/overflow.ts @@ -0,0 +1,22 @@ +import type { Config } from "@/config/config" +import type { Provider } from "@/provider/provider" +import { ProviderTransform } from "@/provider/transform" +import type { MessageV2 } from "./message-v2" + +const COMPACTION_BUFFER = 20_000 + +export function isOverflow(input: { cfg: Config.Info; tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) { + if (input.cfg.compaction?.auto === false) return false + const context = input.model.limit.context + if (context === 0) return false + + const 
count = + input.tokens.total || input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write + + const reserved = + input.cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model)) + const usable = input.model.limit.input + ? input.model.limit.input - reserved + : context - ProviderTransform.maxOutputTokens(input.model) + return count >= usable +} diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 84ea76656857..d2459cd8ba5a 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -1,430 +1,554 @@ -import { MessageV2 } from "./message-v2" -import { Log } from "@/util/log" -import { Session } from "." +import { Cause, Effect, Exit, Layer, ServiceMap } from "effect" +import * as Stream from "effect/Stream" import { Agent } from "@/agent/agent" -import { Snapshot } from "@/snapshot" -import { SessionSummary } from "./summary" import { Bus } from "@/bus" +import { makeRuntime } from "@/effect/run-service" +import { Config } from "@/config/config" +import { Permission } from "@/permission" +import { Plugin } from "@/plugin" +import { Snapshot } from "@/snapshot" +import { Log } from "@/util/log" +import { Session } from "." 
+import { LLM } from "./llm" +import { MessageV2 } from "./message-v2" +import { isOverflow } from "./overflow" +import { PartID } from "./schema" +import type { SessionID } from "./schema" import { SessionRetry } from "./retry" import { SessionStatus } from "./status" -import { Plugin } from "@/plugin" +import { SessionSummary } from "./summary" import type { Provider } from "@/provider/provider" -import { LLM } from "./llm" -import { Config } from "@/config/config" -import { SessionCompaction } from "./compaction" -import { Permission } from "@/permission" import { Question } from "@/question" -import { PartID } from "./schema" -import type { SessionID, MessageID } from "./schema" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 const log = Log.create({ service: "session.processor" }) - export type Info = Awaited> - export type Result = Awaited> + export type Result = "compact" | "stop" | "continue" + + export type Event = LLM.Event + + export interface Handle { + readonly message: MessageV2.Assistant + readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined + readonly abort: () => Effect.Effect + readonly process: (streamInput: LLM.StreamInput) => Effect.Effect + } + + export interface Info { + readonly message: MessageV2.Assistant + readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined + readonly process: (streamInput: LLM.StreamInput) => Promise + } - export function create(input: { + type Input = { assistantMessage: MessageV2.Assistant sessionID: SessionID model: Provider.Model abort: AbortSignal - }) { - const toolcalls: Record = {} - let snapshot: string | undefined - let blocked = false - let attempt = 0 - let needsCompaction = false - - const result = { - get message() { - return input.assistantMessage - }, - partFromToolCall(toolCallID: string) { - return toolcalls[toolCallID] - }, - async process(streamInput: LLM.StreamInput) { - log.info("process") - needsCompaction = false - const 
shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true - while (true) { - try { - let currentText: MessageV2.TextPart | undefined - let reasoningMap: Record = {} - const stream = await LLM.stream(streamInput) - - for await (const value of stream.fullStream) { - input.abort.throwIfAborted() - switch (value.type) { - case "start": - await SessionStatus.set(input.sessionID, { type: "busy" }) - break - - case "reasoning-start": - if (value.id in reasoningMap) { - continue - } - const reasoningPart = { - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "reasoning" as const, - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - reasoningMap[value.id] = reasoningPart - await Session.updatePart(reasoningPart) - break - - case "reasoning-delta": - if (value.id in reasoningMap) { - const part = reasoningMap[value.id] - part.text += value.text - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: part.sessionID, - messageID: part.messageID, - partID: part.id, - field: "text", - delta: value.text, - }) - } - break - - case "reasoning-end": - if (value.id in reasoningMap) { - const part = reasoningMap[value.id] - part.text = part.text.trimEnd() - - part.time = { - ...part.time, - end: Date.now(), - } - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePart(part) - delete reasoningMap[value.id] - } - break - - case "tool-input-start": - const part = await Session.updatePart({ - id: toolcalls[value.id]?.id ?? 
PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "tool", - tool: value.toolName, - callID: value.id, - state: { - status: "pending", - input: {}, - raw: "", - }, - }) - toolcalls[value.id] = part as MessageV2.ToolPart - break - - case "tool-input-delta": - break - - case "tool-input-end": - break - - case "tool-call": { - const match = toolcalls[value.toolCallId] - if (match) { - const part = await Session.updatePart({ - ...match, - tool: value.toolName, - state: { - status: "running", - input: value.input, - time: { - start: Date.now(), - }, - }, - metadata: value.providerMetadata, - }) - toolcalls[value.toolCallId] = part as MessageV2.ToolPart - - const parts = await MessageV2.parts(input.assistantMessage.id) - const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) - - if ( - lastThree.length === DOOM_LOOP_THRESHOLD && - lastThree.every( - (p) => - p.type === "tool" && - p.tool === value.toolName && - p.state.status !== "pending" && - JSON.stringify(p.state.input) === JSON.stringify(value.input), - ) - ) { - const agent = await Agent.get(input.assistantMessage.agent) - await Permission.ask({ - permission: "doom_loop", - patterns: [value.toolName], - sessionID: input.assistantMessage.sessionID, - metadata: { - tool: value.toolName, - input: value.input, - }, - always: [value.toolName], - ruleset: agent.permission, - }) - } - } - break - } - case "tool-result": { - const match = toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "completed", - input: value.input ?? 
match.state.input, - output: value.output.output, - metadata: value.output.metadata, - title: value.output.title, - time: { - start: match.state.time.start, - end: Date.now(), - }, - attachments: value.output.attachments, - }, - }) - - delete toolcalls[value.toolCallId] - } - break - } + } - case "tool-error": { - const match = toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "error", - input: value.input ?? match.state.input, - error: value.error instanceof Error ? value.error.message : String(value.error), - time: { - start: match.state.time.start, - end: Date.now(), - }, - }, - }) - - if ( - value.error instanceof Permission.RejectedError || - value.error instanceof Question.RejectedError - ) { - blocked = shouldBreak - } - delete toolcalls[value.toolCallId] - } - break - } - case "error": - throw value.error + export interface Interface { + readonly create: (input: Input) => Effect.Effect + } - case "start-step": - snapshot = await Snapshot.track() - await Session.updatePart({ - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - snapshot, - type: "step-start", - }) - break + interface ProcessorContext extends Input { + toolcalls: Record + shouldBreak: boolean + snapshot: string | undefined + blocked: boolean + needsCompaction: boolean + currentText: MessageV2.TextPart | undefined + reasoningMap: Record + } - case "finish-step": - const usage = Session.getUsage({ - model: input.model, - usage: value.usage, - metadata: value.providerMetadata, - }) - input.assistantMessage.finish = value.finishReason - input.assistantMessage.cost += usage.cost - input.assistantMessage.tokens = usage.tokens - await Session.updatePart({ - id: PartID.ascending(), - reason: value.finishReason, - snapshot: await Snapshot.track(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "step-finish", - tokens: 
usage.tokens, - cost: usage.cost, - }) - await Session.updateMessage(input.assistantMessage) - if (snapshot) { - const patch = await Snapshot.patch(snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - snapshot = undefined - } - SessionSummary.summarize({ - sessionID: input.sessionID, - messageID: input.assistantMessage.parentID, - }) - if ( - !input.assistantMessage.summary && - (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model })) - ) { - needsCompaction = true - } - break - - case "text-start": - currentText = { - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "text", - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - await Session.updatePart(currentText) - break - - case "text-delta": - if (currentText) { - currentText.text += value.text - if (value.providerMetadata) currentText.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: currentText.sessionID, - messageID: currentText.messageID, - partID: currentText.id, - field: "text", - delta: value.text, - }) - } - break - - case "text-end": - if (currentText) { - currentText.text = currentText.text.trimEnd() - const textOutput = await Plugin.trigger( - "experimental.text.complete", - { - sessionID: input.sessionID, - messageID: input.assistantMessage.id, - partID: currentText.id, - }, - { text: currentText.text }, - ) - currentText.text = textOutput.text - currentText.time = { - start: Date.now(), - end: Date.now(), - } - if (value.providerMetadata) currentText.metadata = value.providerMetadata - await Session.updatePart(currentText) - } - currentText = undefined - break - - case "finish": - break - - default: - log.info("unhandled", { - ...value, - }) - continue + type 
StreamEvent = Event + + export class Service extends ServiceMap.Service()("@opencode/SessionProcessor") {} + + export const layer: Layer.Layer< + Service, + never, + | Session.Service + | Config.Service + | Bus.Service + | Snapshot.Service + | Agent.Service + | LLM.Service + | Permission.Service + | Plugin.Service + | SessionStatus.Service + > = Layer.effect( + Service, + Effect.gen(function* () { + const session = yield* Session.Service + const config = yield* Config.Service + const bus = yield* Bus.Service + const snapshot = yield* Snapshot.Service + const agents = yield* Agent.Service + const llm = yield* LLM.Service + const permission = yield* Permission.Service + const plugin = yield* Plugin.Service + const status = yield* SessionStatus.Service + + const create = Effect.fn("SessionProcessor.create")(function* (input: Input) { + const ctx: ProcessorContext = { + assistantMessage: input.assistantMessage, + sessionID: input.sessionID, + model: input.model, + abort: input.abort, + toolcalls: {}, + shouldBreak: false, + snapshot: undefined, + blocked: false, + needsCompaction: false, + currentText: undefined, + reasoningMap: {}, + } + + const parse = (e: unknown) => + MessageV2.fromError(e, { + providerID: input.model.providerID, + aborted: input.abort.aborted, + }) + + const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) { + switch (value.type) { + case "start": + yield* status.set(ctx.sessionID, { type: "busy" }) + return + + case "reasoning-start": + if (value.id in ctx.reasoningMap) return + ctx.reasoningMap[value.id] = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "reasoning", + text: "", + time: { start: Date.now() }, + metadata: value.providerMetadata, + } + yield* session.updatePart(ctx.reasoningMap[value.id]) + return + + case "reasoning-delta": + if (!(value.id in ctx.reasoningMap)) return + ctx.reasoningMap[value.id].text += value.text + if 
(value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata + yield* session.updatePartDelta({ + sessionID: ctx.reasoningMap[value.id].sessionID, + messageID: ctx.reasoningMap[value.id].messageID, + partID: ctx.reasoningMap[value.id].id, + field: "text", + delta: value.text, + }) + return + + case "reasoning-end": + if (!(value.id in ctx.reasoningMap)) return + ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text.trimEnd() + ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() } + if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata + yield* session.updatePart(ctx.reasoningMap[value.id]) + delete ctx.reasoningMap[value.id] + return + + case "tool-input-start": + ctx.toolcalls[value.id] = (yield* session.updatePart({ + id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "tool", + tool: value.toolName, + callID: value.id, + state: { status: "pending", input: {}, raw: "" }, + })) as MessageV2.ToolPart + return + + case "tool-input-delta": + return + + case "tool-input-end": + return + + case "tool-call": { + const match = ctx.toolcalls[value.toolCallId] + if (!match) return + ctx.toolcalls[value.toolCallId] = (yield* session.updatePart({ + ...match, + tool: value.toolName, + state: { status: "running", input: value.input, time: { start: Date.now() } }, + metadata: value.providerMetadata, + })) as MessageV2.ToolPart + + const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) + const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD) + + if ( + recentParts.length !== DOOM_LOOP_THRESHOLD || + !recentParts.every( + (part) => + part.type === "tool" && + part.tool === value.toolName && + part.state.status !== "pending" && + JSON.stringify(part.state.input) === JSON.stringify(value.input), + ) + ) { + return } - if (needsCompaction) break + + const 
agent = yield* agents.get(ctx.assistantMessage.agent) + yield* permission.ask({ + permission: "doom_loop", + patterns: [value.toolName], + sessionID: ctx.assistantMessage.sessionID, + metadata: { tool: value.toolName, input: value.input }, + always: [value.toolName], + ruleset: agent.permission, + }) + return } - } catch (e: any) { - log.error("process", { - error: e, - stack: JSON.stringify(e.stack), - }) - const error = MessageV2.fromError(e, { providerID: input.model.providerID, aborted: input.abort.aborted }) - if (MessageV2.ContextOverflowError.isInstance(error)) { - needsCompaction = true - Bus.publish(Session.Event.Error, { - sessionID: input.sessionID, - error, + + case "tool-result": { + const match = ctx.toolcalls[value.toolCallId] + if (!match || match.state.status !== "running") return + yield* session.updatePart({ + ...match, + state: { + status: "completed", + input: value.input ?? match.state.input, + output: value.output.output, + metadata: value.output.metadata, + title: value.output.title, + time: { start: match.state.time.start, end: Date.now() }, + attachments: value.output.attachments, + }, }) - } else { - const retry = SessionRetry.retryable(error) - if (retry !== undefined) { - attempt++ - const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) - await SessionStatus.set(input.sessionID, { - type: "retry", - attempt, - message: retry, - next: Date.now() + delay, - }) - await SessionRetry.sleep(delay, input.abort).catch(() => {}) - continue + delete ctx.toolcalls[value.toolCallId] + return + } + + case "tool-error": { + const match = ctx.toolcalls[value.toolCallId] + if (!match || match.state.status !== "running") return + yield* session.updatePart({ + ...match, + state: { + status: "error", + input: value.input ?? match.state.input, + error: value.error instanceof Error ? 
value.error.message : String(value.error), + time: { start: match.state.time.start, end: Date.now() }, + }, + }) + if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) { + ctx.blocked = ctx.shouldBreak } - input.assistantMessage.error = error - Bus.publish(Session.Event.Error, { - sessionID: input.assistantMessage.sessionID, - error: input.assistantMessage.error, + delete ctx.toolcalls[value.toolCallId] + return + } + + case "error": + throw value.error + + case "start-step": + ctx.snapshot = yield* snapshot.track() + yield* session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + snapshot: ctx.snapshot, + type: "step-start", + }) + return + + case "finish-step": { + const usage = Session.getUsage({ + model: ctx.model, + usage: value.usage, + metadata: value.providerMetadata, }) - await SessionStatus.set(input.sessionID, { type: "idle" }) + ctx.assistantMessage.finish = value.finishReason + ctx.assistantMessage.cost += usage.cost + ctx.assistantMessage.tokens = usage.tokens + yield* session.updatePart({ + id: PartID.ascending(), + reason: value.finishReason, + snapshot: yield* snapshot.track(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + yield* session.updateMessage(ctx.assistantMessage) + if (ctx.snapshot) { + const patch = yield* snapshot.patch(ctx.snapshot) + if (patch.files.length) { + yield* session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + yield* Effect.promise(() => + SessionSummary.summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }), + ).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach) + if ( + 
!ctx.assistantMessage.summary && + isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) + ) { + ctx.needsCompaction = true + } + return } + + case "text-start": + ctx.currentText = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "text", + text: "", + time: { start: Date.now() }, + metadata: value.providerMetadata, + } + yield* session.updatePart(ctx.currentText) + return + + case "text-delta": + if (!ctx.currentText) return + ctx.currentText.text += value.text + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + yield* session.updatePartDelta({ + sessionID: ctx.currentText.sessionID, + messageID: ctx.currentText.messageID, + partID: ctx.currentText.id, + field: "text", + delta: value.text, + }) + return + + case "text-end": + if (!ctx.currentText) return + ctx.currentText.text = ctx.currentText.text.trimEnd() + ctx.currentText.text = (yield* plugin.trigger( + "experimental.text.complete", + { + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.id, + partID: ctx.currentText.id, + }, + { text: ctx.currentText.text }, + )).text + ctx.currentText.time = { start: Date.now(), end: Date.now() } + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + yield* session.updatePart(ctx.currentText) + ctx.currentText = undefined + return + + case "finish": + return + + default: + log.info("unhandled", { ...value }) + return } - if (snapshot) { - const patch = await Snapshot.patch(snapshot) + }) + + const cleanup = Effect.fn("SessionProcessor.cleanup")(function* () { + if (ctx.snapshot) { + const patch = yield* snapshot.patch(ctx.snapshot) if (patch.files.length) { - await Session.updatePart({ + yield* session.updatePart({ id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, type: "patch", hash: patch.hash, files: 
patch.files, }) } - snapshot = undefined + ctx.snapshot = undefined } - const p = await MessageV2.parts(input.assistantMessage.id) - for (const part of p) { - if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { - await Session.updatePart({ - ...part, - state: { - ...part.state, - status: "error", - error: "Tool execution aborted", - time: { - start: Date.now(), - end: Date.now(), - }, - }, - }) - } + + if (ctx.currentText) { + const end = Date.now() + ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end } + yield* session.updatePart(ctx.currentText) + ctx.currentText = undefined + } + + for (const part of Object.values(ctx.reasoningMap)) { + const end = Date.now() + yield* session.updatePart({ + ...part, + time: { start: part.time.start ?? end, end }, + }) } - input.assistantMessage.time.completed = Date.now() - await Session.updateMessage(input.assistantMessage) - if (needsCompaction) return "compact" - if (blocked) return "stop" - if (input.assistantMessage.error) return "stop" + ctx.reasoningMap = {} + + const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) + for (const part of parts) { + if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue + yield* session.updatePart({ + ...part, + state: { + ...part.state, + status: "error", + error: "Tool execution aborted", + time: { start: Date.now(), end: Date.now() }, + }, + }) + } + ctx.assistantMessage.time.completed = Date.now() + yield* session.updateMessage(ctx.assistantMessage) + }) + + const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) { + log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) }) + const error = parse(e) + if (MessageV2.ContextOverflowError.isInstance(error)) { + ctx.needsCompaction = true + yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error }) + return + } + ctx.assistantMessage.error = error 
+ yield* bus.publish(Session.Event.Error, { + sessionID: ctx.assistantMessage.sessionID, + error: ctx.assistantMessage.error, + }) + yield* status.set(ctx.sessionID, { type: "idle" }) + }) + + const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) { + log.info("process") + ctx.needsCompaction = false + ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true + + yield* Effect.gen(function* () { + ctx.currentText = undefined + ctx.reasoningMap = {} + const stream = llm.stream(streamInput) + + yield* stream.pipe( + Stream.tap((event) => + Effect.gen(function* () { + input.abort.throwIfAborted() + yield* handleEvent(event) + }), + ), + Stream.takeUntil(() => ctx.needsCompaction), + Stream.runDrain, + ) + }).pipe( + Effect.catchCauseIf( + (cause) => !Cause.hasInterruptsOnly(cause), + (cause) => Effect.fail(Cause.squash(cause)), + ), + Effect.retry( + SessionRetry.policy({ + parse, + set: (info) => + status.set(ctx.sessionID, { + type: "retry", + attempt: info.attempt, + message: info.message, + next: info.next, + }), + }), + ), + Effect.catchCause((cause) => + Cause.hasInterruptsOnly(cause) + ? 
halt(new DOMException("Aborted", "AbortError")) + : halt(Cause.squash(cause)), + ), + Effect.ensuring(cleanup()), + ) + + if (input.abort.aborted && !ctx.assistantMessage.error) { + yield* abort() + } + if (ctx.needsCompaction) return "compact" + if (ctx.blocked || ctx.assistantMessage.error || input.abort.aborted) return "stop" return "continue" + }) + + const abort = Effect.fn("SessionProcessor.abort")(() => + Effect.gen(function* () { + if (!ctx.assistantMessage.error) { + yield* halt(new DOMException("Aborted", "AbortError")) + } + if (!ctx.assistantMessage.time.completed) { + yield* cleanup() + return + } + yield* session.updateMessage(ctx.assistantMessage) + }), + ) + + return { + get message() { + return ctx.assistantMessage + }, + partFromToolCall(toolCallID: string) { + return ctx.toolcalls[toolCallID] + }, + abort, + process, + } satisfies Handle + }) + + return Service.of({ create }) + }), + ) + + export const defaultLayer = Layer.unwrap( + Effect.sync(() => + layer.pipe( + Layer.provide(Session.defaultLayer), + Layer.provide(Snapshot.defaultLayer), + Layer.provide(Agent.defaultLayer), + Layer.provide(LLM.defaultLayer), + Layer.provide(Permission.layer), + Layer.provide(Plugin.defaultLayer), + Layer.provide(SessionStatus.layer.pipe(Layer.provide(Bus.layer))), + Layer.provide(Bus.layer), + Layer.provide(Config.defaultLayer), + ), + ), + ) + + const { runPromise } = makeRuntime(Service, defaultLayer) + + export async function create(input: Input): Promise { + const hit = await runPromise((svc) => svc.create(input)) + return { + get message() { + return hit.message + }, + partFromToolCall(toolCallID: string) { + return hit.partFromToolCall(toolCallID) + }, + async process(streamInput: LLM.StreamInput) { + const exit = await Effect.runPromiseExit(hit.process(streamInput), { signal: input.abort }) + if (Exit.isFailure(exit)) { + if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) { + await Effect.runPromise(hit.abort()) + return "stop" + } + throw 
Cause.squash(exit.cause) } + return exit.value }, } - return result } } diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index dd74b83f50f2..acc9f635953c 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -594,7 +594,7 @@ export namespace SessionPrompt { session, }) - const processor = SessionProcessor.create({ + const processor = await SessionProcessor.create({ assistantMessage: (await Session.updateMessage({ id: MessageID.ascending(), parentID: lastUser.id, diff --git a/packages/opencode/src/session/retry.ts b/packages/opencode/src/session/retry.ts index 6d057f539f81..8ba48375bcfe 100644 --- a/packages/opencode/src/session/retry.ts +++ b/packages/opencode/src/session/retry.ts @@ -1,28 +1,18 @@ import type { NamedError } from "@opencode-ai/util/error" +import { Cause, Clock, Duration, Effect, Schedule } from "effect" import { MessageV2 } from "./message-v2" import { iife } from "@/util/iife" export namespace SessionRetry { + export type Err = ReturnType + export const RETRY_INITIAL_DELAY = 2000 export const RETRY_BACKOFF_FACTOR = 2 export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout - export async function sleep(ms: number, signal: AbortSignal): Promise { - return new Promise((resolve, reject) => { - const abortHandler = () => { - clearTimeout(timeout) - reject(new DOMException("Aborted", "AbortError")) - } - const timeout = setTimeout( - () => { - signal.removeEventListener("abort", abortHandler) - resolve() - }, - Math.min(ms, RETRY_MAX_DELAY), - ) - signal.addEventListener("abort", abortHandler, { once: true }) - }) + function cap(ms: number) { + return Math.min(ms, RETRY_MAX_DELAY) } export function delay(attempt: number, error?: MessageV2.APIError) { @@ -33,7 +23,7 @@ export namespace SessionRetry { if (retryAfterMs) { const parsedMs = 
Number.parseFloat(retryAfterMs) if (!Number.isNaN(parsedMs)) { - return parsedMs + return cap(parsedMs) } } @@ -42,23 +32,23 @@ export namespace SessionRetry { const parsedSeconds = Number.parseFloat(retryAfter) if (!Number.isNaN(parsedSeconds)) { // convert seconds to milliseconds - return Math.ceil(parsedSeconds * 1000) + return cap(Math.ceil(parsedSeconds * 1000)) } // Try parsing as HTTP date format const parsed = Date.parse(retryAfter) - Date.now() if (!Number.isNaN(parsed) && parsed > 0) { - return Math.ceil(parsed) + return cap(Math.ceil(parsed)) } } - return RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1) + return cap(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1)) } } - return Math.min(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1), RETRY_MAX_DELAY_NO_HEADERS) + return cap(Math.min(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1), RETRY_MAX_DELAY_NO_HEADERS)) } - export function retryable(error: ReturnType) { + export function retryable(error: Err) { // context overflow errors should not be retried if (MessageV2.ContextOverflowError.isInstance(error)) return undefined if (MessageV2.APIError.isInstance(error)) { @@ -80,22 +70,37 @@ export namespace SessionRetry { return undefined } }) - try { - if (!json || typeof json !== "object") return undefined - const code = typeof json.code === "string" ? json.code : "" + if (!json || typeof json !== "object") return undefined + const code = typeof json.code === "string" ? 
json.code : "" - if (json.type === "error" && json.error?.type === "too_many_requests") { - return "Too Many Requests" - } - if (code.includes("exhausted") || code.includes("unavailable")) { - return "Provider is overloaded" - } - if (json.type === "error" && json.error?.code?.includes("rate_limit")) { - return "Rate Limited" - } - return JSON.stringify(json) - } catch { - return undefined + if (json.type === "error" && json.error?.type === "too_many_requests") { + return "Too Many Requests" + } + if (code.includes("exhausted") || code.includes("unavailable")) { + return "Provider is overloaded" + } + if (json.type === "error" && typeof json.error?.code === "string" && json.error.code.includes("rate_limit")) { + return "Rate Limited" } + return undefined + } + + export function policy(opts: { + parse: (error: unknown) => Err + set: (input: { attempt: number; message: string; next: number }) => Effect.Effect + }) { + return Schedule.fromStepWithMetadata( + Effect.succeed((meta: Schedule.InputMetadata) => { + const error = opts.parse(meta.input) + const message = retryable(error) + if (!message) return Cause.done(meta.attempt) + return Effect.gen(function* () { + const wait = delay(meta.attempt, MessageV2.APIError.isInstance(error) ? 
error : undefined) + const now = yield* Clock.currentTimeMillis + yield* opts.set({ attempt: meta.attempt, message, next: now + wait }) + return [meta.attempt, Duration.millis(wait)] as [number, Duration.Duration] + }) + }), + ) } } diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts index 9d5f7eeb8adb..9c8559c35a2c 100644 --- a/packages/opencode/test/session/compaction.test.ts +++ b/packages/opencode/test/session/compaction.test.ts @@ -1,18 +1,28 @@ import { afterEach, describe, expect, mock, spyOn, test } from "bun:test" +import { APICallError } from "ai" +import { Cause, Effect, Exit, Layer, ManagedRuntime } from "effect" +import * as Stream from "effect/Stream" import path from "path" import { Bus } from "../../src/bus" +import { Config } from "../../src/config/config" +import { Agent } from "../../src/agent/agent" +import { LLM } from "../../src/session/llm" import { SessionCompaction } from "../../src/session/compaction" import { Token } from "../../src/util/token" import { Instance } from "../../src/project/instance" import { Log } from "../../src/util/log" +import { Permission } from "../../src/permission" +import { Plugin } from "../../src/plugin" import { tmpdir } from "../fixture/fixture" import { Session } from "../../src/session" import { MessageV2 } from "../../src/session/message-v2" import { MessageID, PartID, SessionID } from "../../src/session/schema" +import { SessionStatus } from "../../src/session/status" import { ModelID, ProviderID } from "../../src/provider/schema" import type { Provider } from "../../src/provider/provider" import * as ProviderModule from "../../src/provider/provider" import * as SessionProcessorModule from "../../src/session/processor" +import { Snapshot } from "../../src/snapshot" Log.init({ print: false }) @@ -121,12 +131,13 @@ async function tool(sessionID: SessionID, messageID: MessageID, tool: string, ou function fake( input: Parameters<(typeof 
SessionProcessorModule.SessionProcessor)["create"]>[0], result: "continue" | "compact", -): ReturnType<(typeof SessionProcessorModule.SessionProcessor)["create"]> { +) { const msg = input.assistantMessage return { get message() { return msg }, + abort: Effect.fn("TestSessionProcessor.abort")(() => Effect.void), partFromToolCall() { return { id: PartID.ascending(), @@ -138,10 +149,74 @@ function fake( state: { status: "pending", input: {}, raw: "" }, } }, - process: async () => result, + process: Effect.fn("TestSessionProcessor.process")(() => Effect.succeed(result)), + } satisfies SessionProcessorModule.SessionProcessor.Handle +} + +function layer(result: "continue" | "compact") { + return Layer.succeed( + SessionProcessorModule.SessionProcessor.Service, + SessionProcessorModule.SessionProcessor.Service.of({ + create: Effect.fn("TestSessionProcessor.create")((input) => Effect.succeed(fake(input, result))), + }), + ) +} + +function runtime(result: "continue" | "compact", plugin = Plugin.defaultLayer) { + const bus = Bus.layer + return ManagedRuntime.make( + Layer.mergeAll(SessionCompaction.layer, bus).pipe( + Layer.provide(Session.defaultLayer), + Layer.provide(layer(result)), + Layer.provide(Agent.defaultLayer), + Layer.provide(plugin), + Layer.provide(bus), + Layer.provide(Config.defaultLayer), + ), + ) +} + +function llm() { + const queue: Array< + Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) + > = [] + + return { + push(stream: Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream)) { + queue.push(stream) + }, + layer: Layer.succeed( + LLM.Service, + LLM.Service.of({ + stream: (input) => { + const item = queue.shift() ?? Stream.empty + const stream = typeof item === "function" ? 
item(input) : item + return stream.pipe(Stream.mapEffect((event) => Effect.succeed(event))) + }, + }), + ), } } +function liveRuntime(layer: Layer.Layer) { + const bus = Bus.layer + const status = SessionStatus.layer.pipe(Layer.provide(bus)) + const processor = SessionProcessorModule.SessionProcessor.layer + return ManagedRuntime.make( + Layer.mergeAll(SessionCompaction.layer.pipe(Layer.provide(processor)), processor, bus, status).pipe( + Layer.provide(Session.defaultLayer), + Layer.provide(Snapshot.defaultLayer), + Layer.provide(layer), + Layer.provide(Permission.layer), + Layer.provide(Agent.defaultLayer), + Layer.provide(Plugin.defaultLayer), + Layer.provide(status), + Layer.provide(bus), + Layer.provide(Config.defaultLayer), + ), + ) +} + function wait(ms = 50) { return new Promise((resolve) => setTimeout(resolve, ms)) } @@ -154,6 +229,17 @@ function defer() { return { promise, resolve } } +function plugin(ready: ReturnType) { + return Layer.mock(Plugin.Service)({ + trigger: (name: Name, _input: Input, output: Output) => { + if (name !== "experimental.session.compacting") return Effect.succeed(output) + return Effect.sync(() => ready.resolve()).pipe(Effect.andThen(Effect.never), Effect.as(output)) + }, + list: () => Effect.succeed([]), + init: () => Effect.void, + }) +} + describe("session.compaction.isOverflow", () => { test("returns true when token count exceeds usable context", async () => { await using tmp = await tmpdir() @@ -429,37 +515,49 @@ describe("session.compaction.process", () => { directory: tmp.path, fn: async () => { spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) - spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue")) const session = await Session.create({}) const msg = await user(session.id, "hello") const msgs = await Session.messages({ sessionID: session.id }) const done = defer() let seen = false - const unsub = 
Bus.subscribe(SessionCompaction.Event.Compacted, (evt) => { - if (evt.properties.sessionID !== session.id) return - seen = true - done.resolve() - }) - - const result = await SessionCompaction.process({ - parentID: msg.id, - messages: msgs, - sessionID: session.id, - abort: new AbortController().signal, - auto: false, - }) - - await Promise.race([ - done.promise, - wait(500).then(() => { - throw new Error("timed out waiting for compacted event") - }), - ]) - unsub() - - expect(result).toBe("continue") - expect(seen).toBe(true) + const rt = runtime("continue") + let unsub: (() => void) | undefined + try { + unsub = await rt.runPromise( + Bus.Service.use((svc) => + svc.subscribeCallback(SessionCompaction.Event.Compacted, (evt) => { + if (evt.properties.sessionID !== session.id) return + seen = true + done.resolve() + }), + ), + ) + + const result = await rt.runPromise( + SessionCompaction.Service.use((svc) => + svc.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + abort: new AbortController().signal, + auto: false, + }), + ), + ) + + await Promise.race([ + done.promise, + wait(500).then(() => { + throw new Error("timed out waiting for compacted event") + }), + ]) + expect(result).toBe("continue") + expect(seen).toBe(true) + } finally { + unsub?.() + await rt.dispose() + } }, }) }) @@ -470,27 +568,36 @@ describe("session.compaction.process", () => { directory: tmp.path, fn: async () => { spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) - spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "compact")) const session = await Session.create({}) const msg = await user(session.id, "hello") - const result = await SessionCompaction.process({ - parentID: msg.id, - messages: await Session.messages({ sessionID: session.id }), - sessionID: session.id, - abort: new AbortController().signal, - auto: false, - }) - - const summary = (await 
Session.messages({ sessionID: session.id })).find( - (msg) => msg.info.role === "assistant" && msg.info.summary, - ) - - expect(result).toBe("stop") - expect(summary?.info.role).toBe("assistant") - if (summary?.info.role === "assistant") { - expect(summary.info.finish).toBe("error") - expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact") + const rt = runtime("compact") + try { + const msgs = await Session.messages({ sessionID: session.id }) + const result = await rt.runPromise( + SessionCompaction.Service.use((svc) => + svc.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + abort: new AbortController().signal, + auto: false, + }), + ), + ) + + const summary = (await Session.messages({ sessionID: session.id })).find( + (msg) => msg.info.role === "assistant" && msg.info.summary, + ) + + expect(result).toBe("stop") + expect(summary?.info.role).toBe("assistant") + if (summary?.info.role === "assistant") { + expect(summary.info.finish).toBe("error") + expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact") + } + } finally { + await rt.dispose() } }, }) @@ -502,30 +609,38 @@ describe("session.compaction.process", () => { directory: tmp.path, fn: async () => { spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) - spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue")) const session = await Session.create({}) const msg = await user(session.id, "hello") - - const result = await SessionCompaction.process({ - parentID: msg.id, - messages: await Session.messages({ sessionID: session.id }), - sessionID: session.id, - abort: new AbortController().signal, - auto: true, - }) - - const msgs = await Session.messages({ sessionID: session.id }) - const last = msgs.at(-1) - - expect(result).toBe("continue") - expect(last?.info.role).toBe("user") - expect(last?.parts[0]).toMatchObject({ - 
type: "text", - synthetic: true, - }) - if (last?.parts[0]?.type === "text") { - expect(last.parts[0].text).toContain("Continue if you have next steps") + const rt = runtime("continue") + try { + const msgs = await Session.messages({ sessionID: session.id }) + const result = await rt.runPromise( + SessionCompaction.Service.use((svc) => + svc.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + abort: new AbortController().signal, + auto: true, + }), + ), + ) + + const all = await Session.messages({ sessionID: session.id }) + const last = all.at(-1) + + expect(result).toBe("continue") + expect(last?.info.role).toBe("user") + expect(last?.parts[0]).toMatchObject({ + type: "text", + synthetic: true, + }) + if (last?.parts[0]?.type === "text") { + expect(last.parts[0].text).toContain("Continue if you have next steps") + } + } finally { + await rt.dispose() } }, }) @@ -537,7 +652,6 @@ describe("session.compaction.process", () => { directory: tmp.path, fn: async () => { spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) - spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue")) const session = await Session.create({}) await user(session.id, "root") @@ -552,24 +666,33 @@ describe("session.compaction.process", () => { url: "https://example.com/cat.png", }) const msg = await user(session.id, "current") - - const result = await SessionCompaction.process({ - parentID: msg.id, - messages: await Session.messages({ sessionID: session.id }), - sessionID: session.id, - abort: new AbortController().signal, - auto: true, - overflow: true, - }) - - const last = (await Session.messages({ sessionID: session.id })).at(-1) - - expect(result).toBe("continue") - expect(last?.info.role).toBe("user") - expect(last?.parts.some((part) => part.type === "file")).toBe(false) - expect( - last?.parts.some((part) => part.type === "text" && 
part.text.includes("Attached image/png: cat.png")), - ).toBe(true) + const rt = runtime("continue") + try { + const msgs = await Session.messages({ sessionID: session.id }) + const result = await rt.runPromise( + SessionCompaction.Service.use((svc) => + svc.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + abort: new AbortController().signal, + auto: true, + overflow: true, + }), + ), + ) + + const last = (await Session.messages({ sessionID: session.id })).at(-1) + + expect(result).toBe("continue") + expect(last?.info.role).toBe("user") + expect(last?.parts.some((part) => part.type === "file")).toBe(false) + expect( + last?.parts.some((part) => part.type === "text" && part.text.includes("Attached image/png: cat.png")), + ).toBe(true) + } finally { + await rt.dispose() + } }, }) }) @@ -580,27 +703,191 @@ describe("session.compaction.process", () => { directory: tmp.path, fn: async () => { spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) - spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue")) const session = await Session.create({}) await user(session.id, "earlier") const msg = await user(session.id, "current") - const result = await SessionCompaction.process({ - parentID: msg.id, - messages: await Session.messages({ sessionID: session.id }), - sessionID: session.id, - abort: new AbortController().signal, - auto: true, - overflow: true, - }) + const rt = runtime("continue") + try { + const msgs = await Session.messages({ sessionID: session.id }) + const result = await rt.runPromise( + SessionCompaction.Service.use((svc) => + svc.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + abort: new AbortController().signal, + auto: true, + overflow: true, + }), + ), + ) + + const last = (await Session.messages({ sessionID: session.id })).at(-1) + + expect(result).toBe("continue") + 
expect(last?.info.role).toBe("user") + if (last?.parts[0]?.type === "text") { + expect(last.parts[0].text).toContain("previous request exceeded the provider's size limit") + } + } finally { + await rt.dispose() + } + }, + }) + }) + + test("stops quickly when aborted during retry backoff", async () => { + const stub = llm() + const ready = defer() + stub.push( + Stream.fromAsyncIterable( + { + async *[Symbol.asyncIterator]() { + yield { type: "start" } as LLM.Event + throw new APICallError({ + message: "boom", + url: "https://example.com/v1/chat/completions", + requestBodyValues: {}, + statusCode: 503, + responseHeaders: { "retry-after-ms": "10000" }, + responseBody: '{"error":"boom"}', + isRetryable: true, + }) + }, + }, + (err) => err, + ), + ) + + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) + + const session = await Session.create({}) + const msg = await user(session.id, "hello") + const msgs = await Session.messages({ sessionID: session.id }) + const abort = new AbortController() + const rt = liveRuntime(stub.layer) + let off: (() => void) | undefined + let run: Promise<"continue" | "stop"> | undefined + try { + off = await rt.runPromise( + Bus.Service.use((svc) => + svc.subscribeCallback(SessionStatus.Event.Status, (evt) => { + if (evt.properties.sessionID !== session.id) return + if (evt.properties.status.type !== "retry") return + ready.resolve() + }), + ), + ) + + run = rt + .runPromiseExit( + SessionCompaction.Service.use((svc) => + svc.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + abort: abort.signal, + auto: false, + }), + ), + { signal: abort.signal }, + ) + .then((exit) => { + if (Exit.isFailure(exit)) { + if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop" + throw Cause.squash(exit.cause) + } + return exit.value + }) + 
+ await Promise.race([ + ready.promise, + wait(1000).then(() => { + throw new Error("timed out waiting for retry status") + }), + ]) + + const start = Date.now() + abort.abort() + const result = await Promise.race([ + run.then((value) => ({ kind: "done" as const, value, ms: Date.now() - start })), + wait(250).then(() => ({ kind: "timeout" as const })), + ]) + + expect(result.kind).toBe("done") + if (result.kind === "done") { + expect(result.value).toBe("stop") + expect(result.ms).toBeLessThan(250) + } + } finally { + off?.() + abort.abort() + await rt.dispose() + await run?.catch(() => undefined) + } + }, + }) + }) + + test("does not leave a summary assistant when aborted before processor setup", async () => { + const ready = defer() - const last = (await Session.messages({ sessionID: session.id })).at(-1) + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 })) - expect(result).toBe("continue") - expect(last?.info.role).toBe("user") - if (last?.parts[0]?.type === "text") { - expect(last.parts[0].text).toContain("previous request exceeded the provider's size limit") + const session = await Session.create({}) + const msg = await user(session.id, "hello") + const msgs = await Session.messages({ sessionID: session.id }) + const abort = new AbortController() + const rt = runtime("continue", plugin(ready)) + let run: Promise<"continue" | "stop"> | undefined + try { + run = rt + .runPromiseExit( + SessionCompaction.Service.use((svc) => + svc.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + abort: abort.signal, + auto: false, + }), + ), + { signal: abort.signal }, + ) + .then((exit) => { + if (Exit.isFailure(exit)) { + if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop" + throw Cause.squash(exit.cause) + } + return exit.value + }) + + await 
Promise.race([ + ready.promise, + wait(1000).then(() => { + throw new Error("timed out waiting for compaction hook") + }), + ]) + + abort.abort() + expect(await run).toBe("stop") + + const all = await Session.messages({ sessionID: session.id }) + expect(all.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(false) + } finally { + abort.abort() + await rt.dispose() + await run?.catch(() => undefined) } }, }) diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts new file mode 100644 index 000000000000..cd9d97e15fdd --- /dev/null +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -0,0 +1,838 @@ +import { NodeFileSystem } from "@effect/platform-node" +import { expect } from "bun:test" +import { APICallError } from "ai" +import { Effect, Layer, ServiceMap } from "effect" +import * as Stream from "effect/Stream" +import path from "path" +import type { Agent } from "../../src/agent/agent" +import { Agent as AgentSvc } from "../../src/agent/agent" +import { Bus } from "../../src/bus" +import { Config } from "../../src/config/config" +import { Permission } from "../../src/permission" +import { Plugin } from "../../src/plugin" +import { Instance } from "../../src/project/instance" +import type { Provider } from "../../src/provider/provider" +import { ModelID, ProviderID } from "../../src/provider/schema" +import { Session } from "../../src/session" +import { LLM } from "../../src/session/llm" +import { MessageV2 } from "../../src/session/message-v2" +import { SessionProcessor } from "../../src/session/processor" +import { MessageID, PartID, SessionID } from "../../src/session/schema" +import { SessionStatus } from "../../src/session/status" +import { Snapshot } from "../../src/snapshot" +import { Log } from "../../src/util/log" +import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" +import { provideTmpdirInstance } from "../fixture/fixture" +import { 
testEffect } from "../lib/effect" + +Log.init({ print: false }) + +const ref = { + providerID: ProviderID.make("test"), + modelID: ModelID.make("test-model"), +} + +type Script = Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) + +class TestLLM extends ServiceMap.Service< + TestLLM, + { + readonly push: (stream: Script) => Effect.Effect + readonly reply: (...items: LLM.Event[]) => Effect.Effect + readonly calls: Effect.Effect + readonly inputs: Effect.Effect + } +>()("@test/SessionProcessorLLM") {} + +function stream(...items: LLM.Event[]) { + return Stream.make(...items) +} + +function usage(input = 1, output = 1, total = input + output) { + return { + inputTokens: input, + outputTokens: output, + totalTokens: total, + inputTokenDetails: { + noCacheTokens: undefined, + cacheReadTokens: undefined, + cacheWriteTokens: undefined, + }, + outputTokenDetails: { + textTokens: undefined, + reasoningTokens: undefined, + }, + } +} + +function start(): LLM.Event { + return { type: "start" } +} + +function textStart(id = "t"): LLM.Event { + return { type: "text-start", id } +} + +function textDelta(id: string, text: string): LLM.Event { + return { type: "text-delta", id, text } +} + +function textEnd(id = "t"): LLM.Event { + return { type: "text-end", id } +} + +function reasoningStart(id: string): LLM.Event { + return { type: "reasoning-start", id } +} + +function reasoningDelta(id: string, text: string): LLM.Event { + return { type: "reasoning-delta", id, text } +} + +function reasoningEnd(id: string): LLM.Event { + return { type: "reasoning-end", id } +} + +function finishStep(): LLM.Event { + return { + type: "finish-step", + finishReason: "stop", + rawFinishReason: "stop", + response: { id: "res", modelId: "test-model", timestamp: new Date() }, + providerMetadata: undefined, + usage: usage(), + } +} + +function finish(): LLM.Event { + return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() } +} + +function toolInputStart(id: 
string, toolName: string): LLM.Event { + return { type: "tool-input-start", id, toolName } +} + +function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event { + return { type: "tool-call", toolCallId, toolName, input } +} + +function fail(err: E, ...items: LLM.Event[]) { + return stream(...items).pipe(Stream.concat(Stream.fail(err))) +} + +function wait(abort: AbortSignal) { + return Effect.promise( + () => + new Promise((done) => { + abort.addEventListener("abort", () => done(), { once: true }) + }), + ) +} + +function hang(input: LLM.StreamInput, ...items: LLM.Event[]) { + return stream(...items).pipe( + Stream.concat( + Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))), + ), + ) +} + +function model(context: number): Provider.Model { + return { + id: "test-model", + providerID: "test", + name: "Test", + limit: { context, output: 10 }, + cost: { input: 0, output: 0, cache: { read: 0, write: 0 } }, + capabilities: { + toolcall: true, + attachment: false, + reasoning: false, + temperature: true, + input: { text: true, image: false, audio: false, video: false }, + output: { text: true, image: false, audio: false, video: false }, + }, + api: { npm: "@ai-sdk/anthropic" }, + options: {}, + } as Provider.Model +} + +function agent(): Agent.Info { + return { + name: "build", + mode: "primary", + options: {}, + permission: [{ permission: "*", pattern: "*", action: "allow" }], + } +} + +function defer() { + let resolve!: (value: T | PromiseLike) => void + const promise = new Promise((done) => { + resolve = done + }) + return { promise, resolve } +} + +const user = Effect.fn("TestSession.user")(function* (sessionID: SessionID, text: string) { + const session = yield* Session.Service + const msg = yield* session.updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID, + agent: "build", + model: ref, + time: { created: Date.now() }, + }) + yield* session.updatePart({ + id: 
PartID.ascending(), + messageID: msg.id, + sessionID, + type: "text", + text, + }) + return msg +}) + +const assistant = Effect.fn("TestSession.assistant")(function* ( + sessionID: SessionID, + parentID: MessageID, + root: string, +) { + const session = yield* Session.Service + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + role: "assistant", + sessionID, + mode: "build", + agent: "build", + path: { cwd: root, root }, + cost: 0, + tokens: { + total: 0, + input: 0, + output: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: ref.modelID, + providerID: ref.providerID, + parentID, + time: { created: Date.now() }, + finish: "end_turn", + } + yield* session.updateMessage(msg) + return msg +}) + +const llm = Layer.unwrap( + Effect.gen(function* () { + const queue: Script[] = [] + const inputs: LLM.StreamInput[] = [] + let calls = 0 + + const push = Effect.fn("TestLLM.push")((item: Script) => { + queue.push(item) + return Effect.void + }) + + const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items))) + return Layer.mergeAll( + Layer.succeed( + LLM.Service, + LLM.Service.of({ + stream: (input) => { + calls += 1 + inputs.push(input) + const item = queue.shift() ?? Stream.empty + return typeof item === "function" ? 
item(input) : item + }, + }), + ), + Layer.succeed( + TestLLM, + TestLLM.of({ + push, + reply, + calls: Effect.sync(() => calls), + inputs: Effect.sync(() => [...inputs]), + }), + ), + ) + }), +) + +const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) +const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) +const deps = Layer.mergeAll( + Session.defaultLayer, + Snapshot.defaultLayer, + AgentSvc.defaultLayer, + Permission.layer, + Plugin.defaultLayer, + Config.defaultLayer, + status, + llm, +).pipe(Layer.provideMerge(infra)) +const env = SessionProcessor.layer.pipe(Layer.provideMerge(deps)) + +const it = testEffect(env) + +it.effect("session.processor effect tests capture llm input cleanly", () => { + return provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + + yield* test.reply(start(), textStart(), textDelta("t", "hello"), textEnd(), finishStep(), finish()) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "hi") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(100) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + abort: abort.signal, + }) + + const input = { + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "hi" }], + tools: {}, + } satisfies LLM.StreamInput + + const value = yield* handle.process(input) + const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const calls = yield* test.calls + const inputs = yield* test.inputs + + 
expect(value).toBe("continue") + expect(calls).toBe(1) + expect(inputs).toHaveLength(1) + expect(inputs[0].messages).toStrictEqual([{ role: "user", content: "hi" }]) + expect(parts.some((part) => part.type === "text" && part.text === "hello")).toBe(true) + }), + { git: true }, + ) +}) + +it.effect("session.processor effect tests stop after token overflow requests compaction", () => { + return provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + + yield* test.reply( + start(), + { + type: "finish-step", + finishReason: "stop", + rawFinishReason: "stop", + response: { id: "res", modelId: "test-model", timestamp: new Date() }, + providerMetadata: undefined, + usage: usage(100, 0, 100), + }, + textStart(), + textDelta("t", "after"), + textEnd(), + ) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "compact") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(20) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + abort: abort.signal, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "compact" }], + tools: {}, + }) + + const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + + expect(value).toBe("compact") + expect(parts.some((part) => part.type === "text")).toBe(false) + expect(parts.some((part) => part.type === "step-finish")).toBe(true) + }), + { git: true }, + ) +}) + +it.effect("session.processor effect tests reset reasoning state across 
retries", () => { + return provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + + yield* test.push( + fail( + new APICallError({ + message: "boom", + url: "https://example.com/v1/chat/completions", + requestBodyValues: {}, + statusCode: 503, + responseHeaders: { "retry-after-ms": "0" }, + responseBody: '{"error":"boom"}', + isRetryable: true, + }), + start(), + reasoningStart("r"), + reasoningDelta("r", "one"), + ), + ) + + yield* test.reply( + start(), + reasoningStart("r"), + reasoningDelta("r", "two"), + reasoningEnd("r"), + finishStep(), + finish(), + ) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "reason") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(100) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + abort: abort.signal, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "reason" }], + tools: {}, + }) + + const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning") + + expect(value).toBe("continue") + expect(yield* test.calls).toBe(2) + expect(reasoning.some((part) => part.text === "two")).toBe(true) + expect(reasoning.some((part) => part.text === "onetwo")).toBe(false) + }), + { git: true }, + ) +}) + +it.effect("session.processor effect tests do not retry unknown json errors", () => { + return provideTmpdirInstance( + 
(dir) => + Effect.gen(function* () { + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + + yield* test.push(fail({ error: { message: "no_kv_space" } }, start())) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "json") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(100) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + abort: abort.signal, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "json" }], + tools: {}, + }) + + expect(value).toBe("stop") + expect(yield* test.calls).toBe(1) + expect(yield* test.inputs).toHaveLength(1) + expect(handle.message.error?.name).toBe("UnknownError") + }), + { git: true }, + ) +}) + +it.effect("session.processor effect tests retry recognized structured json errors", () => { + return provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + + yield* test.push(fail({ type: "error", error: { type: "too_many_requests" } }, start())) + yield* test.reply(start(), textStart(), textDelta("t", "after"), textEnd(), finishStep(), finish()) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "retry json") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(100) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + 
model: mdl, + abort: abort.signal, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "retry json" }], + tools: {}, + }) + + const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + + expect(value).toBe("continue") + expect(yield* test.calls).toBe(2) + expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true) + expect(handle.message.error).toBeUndefined() + }), + { git: true }, + ) +}) + +it.effect("session.processor effect tests publish retry status updates", () => { + return provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + const bus = yield* Bus.Service + + yield* test.push( + fail( + new APICallError({ + message: "boom", + url: "https://example.com/v1/chat/completions", + requestBodyValues: {}, + statusCode: 503, + responseHeaders: { "retry-after-ms": "0" }, + responseBody: '{"error":"boom"}', + isRetryable: true, + }), + start(), + ), + ) + yield* test.reply(start(), finishStep(), finish()) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "retry") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(100) + const states: number[] = [] + const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => { + if (evt.properties.sessionID !== chat.id) return + if (evt.properties.status.type === "retry") states.push(evt.properties.status.attempt) + }) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, 
+ abort: abort.signal, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "retry" }], + tools: {}, + }) + + off() + + expect(value).toBe("continue") + expect(yield* test.calls).toBe(2) + expect(states).toStrictEqual([1]) + }), + { git: true }, + ) +}) + +it.effect("session.processor effect tests compact on structured context overflow", () => { + return provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + + yield* test.push(fail({ type: "error", error: { code: "context_length_exceeded" } }, start())) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "compact json") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(100) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + abort: abort.signal, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "compact json" }], + tools: {}, + }) + + expect(value).toBe("compact") + expect(yield* test.calls).toBe(1) + expect(handle.message.error).toBeUndefined() + }), + { git: true }, + ) +}) + +it.effect("session.processor effect tests mark pending tools as aborted on cleanup", () => { + return 
provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const ready = defer() + const seen = defer() + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + + yield* test.push((input) => + hang(input, start(), toolInputStart("tool-1", "bash"), toolCall("tool-1", "bash", { cmd: "pwd" })).pipe( + Stream.tap((event) => (event.type === "tool-call" ? Effect.sync(() => ready.resolve()) : Effect.void)), + ), + ) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "tool abort") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(100) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + abort: abort.signal, + }) + + const run = Effect.runPromise( + handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "tool abort" }], + tools: {}, + }), + ) + + yield* Effect.promise(() => ready.promise) + abort.abort() + + const value = yield* Effect.promise(() => run) + const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") + + expect(value).toBe("stop") + expect(yield* test.calls).toBe(1) + expect(tool?.state.status).toBe("error") + if (tool?.state.status === "error") { + expect(tool.state.error).toBe("Tool execution aborted") + expect(tool.state.time.end).toBeDefined() + } + }), + { git: true }, + ) +}) + +it.effect("session.processor effect tests record aborted errors and idle state", () => { + return provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const 
ready = defer() + const seen = defer() + const test = yield* TestLLM + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + const bus = yield* Bus.Service + const status = yield* SessionStatus.Service + + yield* test.push((input) => + hang(input, start()).pipe( + Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), + ), + ) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "abort") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const abort = new AbortController() + const mdl = model(100) + const errs: string[] = [] + const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => { + if (evt.properties.sessionID !== chat.id) return + if (!evt.properties.error) return + errs.push(evt.properties.error.name) + seen.resolve() + }) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + abort: abort.signal, + }) + + const run = Effect.runPromise( + handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + abort: abort.signal, + messages: [{ role: "user", content: "abort" }], + tools: {}, + }), + ) + + yield* Effect.promise(() => ready.promise) + abort.abort() + + const value = yield* Effect.promise(() => run) + yield* Effect.promise(() => seen.promise) + const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) + const state = yield* status.get(chat.id) + off() + + expect(value).toBe("stop") + expect(handle.message.error?.name).toBe("MessageAbortedError") + expect(stored.info.role).toBe("assistant") + if (stored.info.role === "assistant") { + expect(stored.info.error?.name).toBe("MessageAbortedError") + 
+        }
+        expect(state).toMatchObject({ type: "idle" })
+        expect(errs).toContain("MessageAbortedError")
+      }),
+    { git: true },
+  )
+})
diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts
index 7d1d42905792..51d2e11941ae 100644
--- a/packages/opencode/test/session/prompt.test.ts
+++ b/packages/opencode/test/session/prompt.test.ts
@@ -12,6 +12,83 @@ import { tmpdir } from "../fixture/fixture"
 
 Log.init({ print: false })
 
+function defer<T = void>() {
+  let resolve!: (value: T | PromiseLike<T>) => void
+  const promise = new Promise<T>((done) => {
+    resolve = done
+  })
+  return { promise, resolve }
+}
+
+function chat(text: string) {
+  const payload =
+    [
+      `data: ${JSON.stringify({
+        id: "chatcmpl-1",
+        object: "chat.completion.chunk",
+        choices: [{ delta: { role: "assistant" } }],
+      })}`,
+      `data: ${JSON.stringify({
+        id: "chatcmpl-1",
+        object: "chat.completion.chunk",
+        choices: [{ delta: { content: text } }],
+      })}`,
+      `data: ${JSON.stringify({
+        id: "chatcmpl-1",
+        object: "chat.completion.chunk",
+        choices: [{ delta: {}, finish_reason: "stop" }],
+      })}`,
+      "data: [DONE]",
+    ].join("\n\n") + "\n\n"
+
+  const encoder = new TextEncoder()
+  return new ReadableStream({
+    start(ctrl) {
+      ctrl.enqueue(encoder.encode(payload))
+      ctrl.close()
+    },
+  })
+}
+
+function hanging(ready: () => void) {
+  const encoder = new TextEncoder()
+  let timer: ReturnType<typeof setTimeout> | undefined
+  const first =
+    `data: ${JSON.stringify({
+      id: "chatcmpl-1",
+      object: "chat.completion.chunk",
+      choices: [{ delta: { role: "assistant" } }],
+    })}` + "\n\n"
+  const rest =
+    [
+      `data: ${JSON.stringify({
+        id: "chatcmpl-1",
+        object: "chat.completion.chunk",
+        choices: [{ delta: { content: "late" } }],
+      })}`,
+      `data: ${JSON.stringify({
+        id: "chatcmpl-1",
+        object: "chat.completion.chunk",
+        choices: [{ delta: {}, finish_reason: "stop" }],
+      })}`,
+      "data: [DONE]",
+    ].join("\n\n") + "\n\n"
+
+  return new ReadableStream({
+    start(ctrl) {
ctrl.enqueue(encoder.encode(first)) + ready() + timer = setTimeout(() => { + ctrl.enqueue(encoder.encode(rest)) + ctrl.close() + }, 10000) + }, + cancel() { + if (timer) clearTimeout(timer) + }, + }) +} + describe("session.prompt missing file", () => { test("does not fail the prompt when a file part is missing", async () => { await using tmp = await tmpdir({ @@ -149,6 +226,159 @@ describe("session.prompt special characters", () => { }) }) +describe("session.prompt regression", () => { + test("does not loop empty assistant turns for a simple reply", async () => { + let calls = 0 + const server = Bun.serve({ + port: 0, + fetch(req) { + const url = new URL(req.url) + if (!url.pathname.endsWith("/chat/completions")) { + return new Response("not found", { status: 404 }) + } + calls++ + return new Response(chat("packages/opencode/src/session/processor.ts"), { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }) + }, + }) + + try { + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["alibaba"], + provider: { + alibaba: { + options: { + apiKey: "test-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + agent: { + build: { + model: "alibaba/qwen-plus", + }, + }, + }), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({ title: "Prompt regression" }) + const result = await SessionPrompt.prompt({ + sessionID: session.id, + agent: "build", + parts: [{ type: "text", text: "Where is SessionProcessor?" 
}], + }) + + expect(result.info.role).toBe("assistant") + expect(result.parts.some((part) => part.type === "text" && part.text.includes("processor.ts"))).toBe(true) + + const msgs = await Session.messages({ sessionID: session.id }) + expect(msgs.filter((msg) => msg.info.role === "assistant")).toHaveLength(1) + expect(calls).toBe(1) + }, + }) + } finally { + server.stop(true) + } + }) + + test("records aborted errors when prompt is cancelled mid-stream", async () => { + const ready = defer() + const server = Bun.serve({ + port: 0, + fetch(req) { + const url = new URL(req.url) + if (!url.pathname.endsWith("/chat/completions")) { + return new Response("not found", { status: 404 }) + } + return new Response( + hanging(() => ready.resolve()), + { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }, + ) + }, + }) + + try { + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["alibaba"], + provider: { + alibaba: { + options: { + apiKey: "test-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + agent: { + build: { + model: "alibaba/qwen-plus", + }, + }, + }), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({ title: "Prompt cancel regression" }) + const run = SessionPrompt.prompt({ + sessionID: session.id, + agent: "build", + parts: [{ type: "text", text: "Cancel me" }], + }) + + await ready.promise + await SessionPrompt.cancel(session.id) + + const result = await Promise.race([ + run, + new Promise((_, reject) => + setTimeout(() => reject(new Error("timed out waiting for cancel")), 1000), + ), + ]) + + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") { + expect(result.info.error?.name).toBe("MessageAbortedError") + } + + const msgs = await Session.messages({ sessionID: 
session.id }) + const last = msgs.findLast((msg) => msg.info.role === "assistant") + expect(last?.info.role).toBe("assistant") + if (last?.info.role === "assistant") { + expect(last.info.error?.name).toBe("MessageAbortedError") + } + }, + }) + } finally { + server.stop(true) + } + }) +}) + describe("session.prompt agent variant", () => { test("applies agent variant only when using agent model", async () => { const prev = process.env.OPENAI_API_KEY diff --git a/packages/opencode/test/session/retry.test.ts b/packages/opencode/test/session/retry.test.ts index a61c44262fad..dfeb7e9a40c4 100644 --- a/packages/opencode/test/session/retry.test.ts +++ b/packages/opencode/test/session/retry.test.ts @@ -2,9 +2,14 @@ import { describe, expect, test } from "bun:test" import type { NamedError } from "@opencode-ai/util/error" import { APICallError } from "ai" import { setTimeout as sleep } from "node:timers/promises" +import { Effect, Schedule } from "effect" import { SessionRetry } from "../../src/session/retry" import { MessageV2 } from "../../src/session/message-v2" import { ProviderID } from "../../src/provider/schema" +import { SessionID } from "../../src/session/schema" +import { SessionStatus } from "../../src/session/status" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" const providerID = ProviderID.make("test") @@ -69,24 +74,47 @@ describe("session.retry.delay", () => { expect(SessionRetry.delay(1, longError)).toBe(700000) }) - test("sleep caps delay to max 32-bit signed integer to avoid TimeoutOverflowWarning", async () => { - const controller = new AbortController() - - const warnings: string[] = [] - const originalWarn = process.emitWarning - process.emitWarning = (warning: string | Error) => { - warnings.push(typeof warning === "string" ? 
warning : warning.message) - } - - const promise = SessionRetry.sleep(2_560_914_000, controller.signal) - controller.abort() - - try { - await promise - } catch {} - - process.emitWarning = originalWarn - expect(warnings.some((w) => w.includes("TimeoutOverflowWarning"))).toBe(false) + test("caps oversized header delays to the runtime timer limit", () => { + const error = apiError({ "retry-after-ms": "999999999999" }) + expect(SessionRetry.delay(1, error)).toBe(SessionRetry.RETRY_MAX_DELAY) + }) + + test("policy updates retry status and increments attempts", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const sessionID = SessionID.make("session-retry-test") + const error = apiError({ "retry-after-ms": "0" }) + + await Effect.runPromise( + Effect.gen(function* () { + const step = yield* Schedule.toStepWithMetadata( + SessionRetry.policy({ + parse: (err) => err as MessageV2.APIError, + set: (info) => + Effect.promise(() => + SessionStatus.set(sessionID, { + type: "retry", + attempt: info.attempt, + message: info.message, + next: info.next, + }), + ), + }), + ) + yield* step(error) + yield* step(error) + }), + ) + + expect(await SessionStatus.get(sessionID)).toMatchObject({ + type: "retry", + attempt: 2, + message: "boom", + }) + }, + }) }) }) @@ -101,9 +129,9 @@ describe("session.retry.retryable", () => { expect(SessionRetry.retryable(error)).toBe("Provider is overloaded") }) - test("handles json messages without code", () => { + test("does not retry unknown json messages", () => { const error = wrap(JSON.stringify({ error: { message: "no_kv_space" } })) - expect(SessionRetry.retryable(error)).toBe(`{"error":{"message":"no_kv_space"}}`) + expect(SessionRetry.retryable(error)).toBeUndefined() }) test("does not throw on numeric error codes", () => {