diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 225961aef05d..2e4d34bfcaf2 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -1,4 +1,4 @@ -import { Cause, Effect, Layer, ServiceMap } from "effect" +import { Cause, Deferred, Effect, Layer, ServiceMap } from "effect" import * as Stream from "effect/Stream" import { Agent } from "@/agent/agent" import { Bus } from "@/bus" @@ -18,6 +18,7 @@ import { SessionStatus } from "./status" import { SessionSummary } from "./summary" import type { Provider } from "@/provider/provider" import { Question } from "@/question" +import { errorMessage } from "@/util/error" import { isRecord } from "@/util/record" export namespace SessionProcessor { @@ -30,7 +31,19 @@ export namespace SessionProcessor { export interface Handle { readonly message: MessageV2.Assistant - readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined + readonly updateToolCall: ( + toolCallID: string, + update: (part: MessageV2.ToolPart) => MessageV2.ToolPart, + ) => Effect.Effect + readonly completeToolCall: ( + toolCallID: string, + output: { + title: string + metadata: Record + output: string + attachments?: MessageV2.FilePart[] + }, + ) => Effect.Effect readonly process: (streamInput: LLM.StreamInput) => Effect.Effect } @@ -44,8 +57,15 @@ export namespace SessionProcessor { readonly create: (input: Input) => Effect.Effect } + type ToolCall = { + partID: MessageV2.ToolPart["id"] + messageID: MessageV2.ToolPart["messageID"] + sessionID: MessageV2.ToolPart["sessionID"] + done: Deferred.Deferred + } + interface ProcessorContext extends Input { - toolcalls: Record + toolcalls: Record shouldBreak: boolean snapshot: string | undefined blocked: boolean @@ -108,6 +128,88 @@ export namespace SessionProcessor { aborted, }) + const settleToolCall = Effect.fn("SessionProcessor.settleToolCall")(function* (toolCallID: string) { + const done = ctx.toolcalls[toolCallID]?.done + delete ctx.toolcalls[toolCallID] + if (done) yield* Deferred.succeed(done, undefined).pipe(Effect.ignore) + }) + + const readToolCall = Effect.fn("SessionProcessor.readToolCall")(function* (toolCallID: string) { + const call = ctx.toolcalls[toolCallID] + if (!call) return + const part = yield* session.getPart({ + partID: call.partID, + messageID: call.messageID, + sessionID: call.sessionID, + }) + if (!part || part.type !== "tool") { + delete ctx.toolcalls[toolCallID] + return + } + return { call, part } + }) + + const updateToolCall = Effect.fn("SessionProcessor.updateToolCall")(function* ( + toolCallID: string, + update: (part: MessageV2.ToolPart) => MessageV2.ToolPart, + ) { + const match = yield* readToolCall(toolCallID) + if (!match) return + const part = yield* session.updatePart(update(match.part)) + ctx.toolcalls[toolCallID] = { + ...match.call, + partID: part.id, + messageID: part.messageID, + sessionID: part.sessionID, + } + return part + }) + + const completeToolCall = Effect.fn("SessionProcessor.completeToolCall")(function* ( + toolCallID: string, + output: { + title: string + metadata: Record + output: string + attachments?: MessageV2.FilePart[] + }, + ) { + const match = yield* readToolCall(toolCallID) + if (!match || match.part.state.status !== "running") return + yield* session.updatePart({ + ...match.part, + state: { + status: "completed", + input: match.part.state.input, + output: output.output, + metadata: output.metadata, + title: output.title, + time: { start: match.part.state.time.start, end: Date.now() }, + attachments: output.attachments, + }, + }) + yield* settleToolCall(toolCallID) + }) + + const failToolCall = Effect.fn("SessionProcessor.failToolCall")(function* (toolCallID: string, error: unknown) { + const match = yield* readToolCall(toolCallID) + if (!match || match.part.state.status !== "running") return false + yield* session.updatePart({ + ...match.part, + state: { + status: "error", + input: match.part.state.input, + error: errorMessage(error), + time: { start: match.part.state.time.start, end: Date.now() }, + }, + }) + if (error instanceof Permission.RejectedError || error instanceof Question.RejectedError) { + ctx.blocked = ctx.shouldBreak + } + yield* settleToolCall(toolCallID) + return true + }) + const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) { switch (value.type) { case "start": @@ -154,8 +256,8 @@ export namespace SessionProcessor { if (ctx.assistantMessage.summary) { throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } - ctx.toolcalls[value.id] = yield* session.updatePart({ - id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(), + const part = yield* session.updatePart({ + id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(), messageID: ctx.assistantMessage.id, sessionID: ctx.assistantMessage.sessionID, type: "tool", @@ -164,6 +266,12 @@ export namespace SessionProcessor { state: { status: "pending", input: {}, raw: "" }, metadata: value.providerExecuted ? { providerExecuted: true } : undefined, } satisfies MessageV2.ToolPart) + ctx.toolcalls[value.id] = { + done: yield* Deferred.make(), + partID: part.id, + messageID: part.messageID, + sessionID: part.sessionID, + } return case "tool-input-delta": @@ -176,14 +284,7 @@ export namespace SessionProcessor { if (ctx.assistantMessage.summary) { throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } - const pointer = ctx.toolcalls[value.toolCallId] - const match = yield* session.getPart({ - partID: pointer.id, - messageID: pointer.messageID, - sessionID: pointer.sessionID, - }) - if (!match || match.type !== "tool") return - ctx.toolcalls[value.toolCallId] = yield* session.updatePart({ + yield* updateToolCall(value.toolCallId, (match) => ({ ...match, tool: value.toolName, state: { @@ -195,7 +296,7 @@ export namespace SessionProcessor { metadata: match.metadata?.providerExecuted ? { ...value.providerMetadata, providerExecuted: true } : value.providerMetadata, - } satisfies MessageV2.ToolPart) + })) const parts = MessageV2.parts(ctx.assistantMessage.id) const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD) @@ -226,41 +327,12 @@ export namespace SessionProcessor { } case "tool-result": { - const match = ctx.toolcalls[value.toolCallId] - if (!match || match.state.status !== "running") return - yield* session.updatePart({ - ...match, - state: { - status: "completed", - input: value.input ?? match.state.input, - output: value.output.output, - metadata: value.output.metadata, - title: value.output.title, - time: { start: match.state.time.start, end: Date.now() }, - attachments: value.output.attachments, - }, - }) - delete ctx.toolcalls[value.toolCallId] + yield* completeToolCall(value.toolCallId, value.output) return } case "tool-error": { - const match = ctx.toolcalls[value.toolCallId] - if (!match || match.state.status !== "running") return - - yield* session.updatePart({ - ...match, - state: { - status: "error", - input: value.input ?? match.state.input, - error: value.error instanceof Error ? value.error.message : String(value.error), - time: { start: match.state.time.start, end: Date.now() }, - }, - }) - if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) { - ctx.blocked = ctx.shouldBreak - } - delete ctx.toolcalls[value.toolCallId] + yield* failToolCall(value.toolCallId, value.error) return } @@ -413,7 +485,16 @@ export namespace SessionProcessor { } ctx.reasoningMap = {} - for (const part of Object.values(ctx.toolcalls)) { + yield* Effect.forEach( + Object.values(ctx.toolcalls), + (call) => Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore), + { concurrency: "unbounded" }, + ) + + for (const toolCallID of Object.keys(ctx.toolcalls)) { + const match = yield* readToolCall(toolCallID) + if (!match) continue + const part = match.part const end = Date.now() const metadata = "metadata" in part.state && isRecord(part.state.metadata) ? part.state.metadata : {} yield* session.updatePart({ @@ -503,9 +584,8 @@ export namespace SessionProcessor { get message() { return ctx.assistantMessage }, - partFromToolCall(toolCallID: string) { - return ctx.toolcalls[toolCallID] - }, + updateToolCall, + completeToolCall, process, } satisfies Handle }) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 19f0850ff4c2..088a367cad5d 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -388,7 +388,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the model: Provider.Model session: Session.Info tools?: Record - processor: Pick + processor: Pick bypassAgentCheck: boolean messages: MessageV2.WithParts[] }) { @@ -405,10 +405,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the messages: input.messages, metadata: (val) => Effect.runPromise( - Effect.gen(function* () { - const match = input.processor.partFromToolCall(options.toolCallId) - if (!match || !["running", "pending"].includes(match.state.status)) return - yield* sessions.updatePart({ + input.processor.updateToolCall(options.toolCallId, (match) => { + if (!["running", "pending"].includes(match.state.status)) return match + return { ...match, state: { title: val.title, @@ -417,7 +416,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the input: args, time: { start: Date.now() }, }, - }) + } }), ), ask: (req) => @@ -465,6 +464,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args }, output, ) + if (options.abortSignal?.aborted) { + yield* input.processor.completeToolCall(options.toolCallId, output) + } return output }), ) @@ -529,7 +531,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the ...(truncated.truncated && { outputPath: truncated.outputPath }), } - return { + const output = { title: "", metadata, output: truncated.content, @@ -541,6 +543,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the })), content: result.content, } + if (opts.abortSignal?.aborted) { + yield* input.processor.completeToolCall(opts.toolCallId, output) + } + return output }), ) tools[key] = item diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts index c37371d9f871..76a83c34da00 100644 --- a/packages/opencode/test/session/compaction.test.ts +++ b/packages/opencode/test/session/compaction.test.ts @@ -139,17 +139,8 @@ function fake( get message() { return msg }, - partFromToolCall() { - return { - id: PartID.ascending(), - messageID: msg.id, - sessionID: msg.sessionID, - type: "tool", - callID: "fake", - tool: "fake", - state: { status: "pending", input: {}, raw: "" }, - } - }, + updateToolCall: Effect.fn("TestSessionProcessor.updateToolCall")(() => Effect.succeed(undefined)), + completeToolCall: Effect.fn("TestSessionProcessor.completeToolCall")(() => Effect.void), process: Effect.fn("TestSessionProcessor.process")(() => Effect.succeed(result)), } satisfies SessionProcessorModule.SessionProcessor.Handle } diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 38d7ed9f5aca..e4c46337c411 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -538,6 +538,93 @@ it.live("failed subtask preserves metadata on error tool state", () => ), ) +it.live( + "running subtask preserves metadata after tool-call transition", + () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.hang + const msg = yield* user(chat.id, "hello") + yield* addSubtask(chat.id, msg.id) + + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + + const tool = yield* Effect.promise(async () => { + const end = Date.now() + 5_000 + while (Date.now() < end) { + const msgs = await Effect.runPromise(MessageV2.filterCompactedEffect(chat.id)) + const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") + const tool = taskMsg?.parts.find((part): part is MessageV2.ToolPart => part.type === "tool") + if (tool?.state.status === "running" && tool.state.metadata?.sessionId) return tool + await new Promise((done) => setTimeout(done, 20)) + } + throw new Error("timed out waiting for running subtask metadata") + }) + + if (tool.state.status !== "running") return + expect(typeof tool.state.metadata?.sessionId).toBe("string") + expect(tool.state.title).toBeDefined() + expect(tool.state.metadata?.model).toBeDefined() + + yield* prompt.cancel(chat.id) + yield* Fiber.await(fiber) + }), + { git: true, config: providerCfg }, + ), + 5_000, +) + +it.live( + "running task tool preserves metadata after tool-call transition", + () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Pinned", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* llm.tool("task", { + description: "inspect bug", + prompt: "look into the cache key path", + subagent_type: "general", + }) + yield* llm.hang + yield* user(chat.id, "hello") + + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + + const tool = yield* Effect.promise(async () => { + const end = Date.now() + 5_000 + while (Date.now() < end) { + const msgs = await Effect.runPromise(MessageV2.filterCompactedEffect(chat.id)) + const assistant = msgs.findLast((item) => item.info.role === "assistant" && item.info.agent === "build") + const tool = assistant?.parts.find( + (part): part is MessageV2.ToolPart => part.type === "tool" && part.tool === "task", + ) + if (tool?.state.status === "running" && tool.state.metadata?.sessionId) return tool + await new Promise((done) => setTimeout(done, 20)) + } + throw new Error("timed out waiting for running task metadata") + }) + + if (tool.state.status !== "running") return + expect(typeof tool.state.metadata?.sessionId).toBe("string") + expect(tool.state.title).toBe("inspect bug") + expect(tool.state.metadata?.model).toBeDefined() + + yield* prompt.cancel(chat.id) + yield* Fiber.await(fiber) + }), + { git: true, config: providerCfg }, + ), + 10_000, +) + it.live( "loop sets status to busy then idle", () => @@ -1173,6 +1260,57 @@ unix( 30_000, ) +unix( + "cancel finalizes interrupted bash tool output through normal truncation", + () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Interrupted bash truncation", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "run bash" }], + }) + + yield* llm.tool("bash", { + command: + 'i=0; while [ "$i" -lt 4000 ]; do printf "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx %05d\\n" "$i"; i=$((i + 1)); done; sleep 30', + description: "Print many lines", + timeout: 30_000, + workdir: path.resolve(dir), + }) + + const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + yield* Effect.sleep(150) + yield* prompt.cancel(chat.id) + + const exit = yield* Fiber.await(run) + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isFailure(exit)) return + + const tool = completedTool(exit.value.parts) + if (!tool) return + + expect(tool.state.metadata.truncated).toBe(true) + expect(typeof tool.state.metadata.outputPath).toBe("string") + expect(tool.state.output).toContain("The tool call succeeded but the output was truncated.") + expect(tool.state.output).toContain("Full output saved to:") + expect(tool.state.output).not.toContain("Tool execution aborted") + }), + { git: true, config: providerCfg }, + ), + 30_000, +) + unix( "cancel interrupts loop queued behind shell", () =>