From ea19ee79660ee7179aa5e84147a3ab6d2460cf81 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 9 Apr 2026 16:05:17 -0400 Subject: [PATCH] refactor(session): use onInterrupt finalizer for cancelled tool output Wire the AI SDK's abortSignal into the tool fiber via runPromiseExit's signal option so interruption is first-class, and move the "finalize on cancel" path into an Effect.onInterrupt finalizer that re-awaits the still-running native Promise uninterruptibly, builds the output, and posts it through completeToolCall. Replaces the imperative `if (options.abortSignal?.aborted)` tail check with structural interruption handling. When the fiber is interrupted, the finalizer captures the truncated bash output (or MCP tool result) and the .then on runPromiseExit resolves the SDK's Promise with the captured value instead of propagating the interrupt cause as a rejection, so the tool is reported as successfully completed rather than as a tool-error. InstanceRef is provided on the tool fiber so InstanceState.context resolves through ServiceMap rather than falling through to the AsyncLocalStorage, which the onInterrupt finalizer runs outside of. --- packages/opencode/src/effect/run-service.ts | 2 +- packages/opencode/src/session/prompt.ts | 225 ++++++++++++-------- 2 files changed, 134 insertions(+), 93 deletions(-) diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index f609986b5837..68bd57ed0cbf 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -7,7 +7,7 @@ import { Observability } from "./oltp" export const memoMap = Layer.makeMemoMapUnsafe() -function attach(effect: Effect.Effect): Effect.Effect { +export function attach(effect: Effect.Effect): Effect.Effect { try { const ctx = Instance.current return Effect.provideService(effect, InstanceRef, ctx) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 7f0a014ab249..c3275ad72926 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -45,7 +45,7 @@ import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect" import { InstanceState } from "@/effect/instance-state" -import { makeRuntime } from "@/effect/run-service" +import { attach, makeRuntime } from "@/effect/run-service" import { TaskTool } from "@/tool/task" import { SessionRunState } from "./run-state" @@ -62,6 +62,64 @@ IMPORTANT: const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested structured output. You MUST use the StructuredOutput tool to provide your final response. Do NOT respond with plain text - you MUST call the StructuredOutput tool with your answer formatted according to the schema.` +/** + * Bridges an AI SDK Promise-based `execute` callback to Effect with graceful + * cancel semantics. + * + * On the happy path: runs `before`, awaits `execute()`, then `finalize(result)` + * and returns the output. + * + * On cancel mid-flight: the `onInterrupt` finalizer re-awaits the same in-flight + * native Promise uninterruptibly, runs `finalize` again on the eventual result, + * and posts it via `onCancel` (the processor side channel). This is what lets + * cancelled bash surface its truncated output through the normal completion + * path instead of getting stamped as aborted by processor cleanup. + * + * The returned Promise always resolves with a finalized output when one is + * available (even on interrupt), so the SDK reports the tool as successfully + * completed rather than as a tool-error. + * + * `attach` captures the current Instance context via InstanceRef so the + * onInterrupt finalizer — which runs outside the AsyncLocalStorage chain + * `execute()` is called from — can still resolve it through the ServiceMap. + */ +function runToolExecute(options: { + signal: AbortSignal | undefined + before: Effect.Effect + execute: () => Promise + finalize: (result: Raw) => Effect.Effect + onCancel: (output: Output) => Effect.Effect +}): Promise { + let pending: Promise | undefined + let rescued: Output | undefined + const wait = Effect.promise(() => pending!) + + const program = Effect.gen(function* () { + yield* options.before + pending = options.execute() + const result = yield* wait + return yield* options.finalize(result) + }).pipe( + // On interrupt, re-await the in-flight Promise uninterruptibly (finalizers + // always are), finalize it the same way, and post through the side channel. + // Stash the output so catchCause below can surface it instead of the cause. + Effect.onInterrupt(() => + Effect.gen(function* () { + if (pending === undefined) return + const result = yield* wait + const output = yield* options.finalize(result) + rescued = output + yield* options.onCancel(output) + }).pipe(Effect.catchCause(() => Effect.void)), + ), + Effect.catchCause((cause) => + Effect.suspend(() => (rescued !== undefined ? Effect.succeed(rescued!) : Effect.failCause(cause))), + ), + ) + + return Effect.runPromise(attach(program as Effect.Effect), { signal: options.signal }) +} + export namespace SessionPrompt { const log = Log.create({ service: "session.prompt" }) @@ -396,35 +454,28 @@ NOTE: At any point in time through this workflow you should feel free to ask the description: item.description, inputSchema: jsonSchema(schema as any), execute(args, options) { - return Effect.runPromise( - Effect.gen(function* () { - const ctx = context(args, options) - yield* plugin.trigger( - "tool.execute.before", - { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID }, - { args }, - ) - const result = yield* Effect.promise(() => item.execute(args, ctx)) - const output = { - ...result, - attachments: result.attachments?.map((attachment) => ({ - ...attachment, - id: PartID.ascending(), - sessionID: ctx.sessionID, - messageID: input.processor.message.id, - })), - } - yield* plugin.trigger( - "tool.execute.after", - { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args }, - output, - ) - if (options.abortSignal?.aborted) { - yield* input.processor.completeToolCall(options.toolCallId, output) - } - return output - }), - ) + const ctx = context(args, options) + const meta = { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID } + return runToolExecute({ + signal: options.abortSignal, + before: plugin.trigger("tool.execute.before", meta, { args }), + execute: () => item.execute(args, ctx), + finalize: (result) => + Effect.gen(function* () { + const output = { + ...result, + attachments: result.attachments?.map((attachment) => ({ + ...attachment, + id: PartID.ascending(), + sessionID: ctx.sessionID, + messageID: input.processor.message.id, + })), + } + yield* plugin.trigger("tool.execute.after", { ...meta, args }, output) + return output + }), + onCancel: (output) => input.processor.completeToolCall(options.toolCallId, output), + }) }, }) } @@ -436,74 +487,64 @@ NOTE: At any point in time through this workflow you should feel free to ask the const schema = yield* Effect.promise(() => Promise.resolve(asSchema(item.inputSchema).jsonSchema)) const transformed = ProviderTransform.schema(input.model, schema) item.inputSchema = jsonSchema(transformed) - item.execute = (args, opts) => - Effect.runPromise( - Effect.gen(function* () { - const ctx = context(args, opts) - yield* plugin.trigger( - "tool.execute.before", - { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId }, - { args }, - ) + item.execute = (args, opts) => { + const ctx = context(args, opts) + const meta = { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId } + type Raw = Awaited>> + return runToolExecute({ + signal: opts.abortSignal, + before: Effect.gen(function* () { + yield* plugin.trigger("tool.execute.before", meta, { args }) yield* Effect.promise(() => ctx.ask({ permission: key, metadata: {}, patterns: ["*"], always: ["*"] })) - const result: Awaited>> = yield* Effect.promise(() => - execute(args, opts), - ) - yield* plugin.trigger( - "tool.execute.after", - { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId, args }, - result, - ) - - const textParts: string[] = [] - const attachments: Omit[] = [] - for (const contentItem of result.content) { - if (contentItem.type === "text") textParts.push(contentItem.text) - else if (contentItem.type === "image") { - attachments.push({ - type: "file", - mime: contentItem.mimeType, - url: `data:${contentItem.mimeType};base64,${contentItem.data}`, - }) - } else if (contentItem.type === "resource") { - const { resource } = contentItem - if (resource.text) textParts.push(resource.text) - if (resource.blob) { + }), + execute: (): Promise => execute(args, opts), + finalize: (result) => + Effect.gen(function* () { + yield* plugin.trigger("tool.execute.after", { ...meta, args }, result) + const textParts: string[] = [] + const attachments: Omit[] = [] + for (const contentItem of result.content) { + if (contentItem.type === "text") textParts.push(contentItem.text) + else if (contentItem.type === "image") { attachments.push({ type: "file", - mime: resource.mimeType ?? "application/octet-stream", - url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`, - filename: resource.uri, + mime: contentItem.mimeType, + url: `data:${contentItem.mimeType};base64,${contentItem.data}`, }) + } else if (contentItem.type === "resource") { + const { resource } = contentItem + if (resource.text) textParts.push(resource.text) + if (resource.blob) { + attachments.push({ + type: "file", + mime: resource.mimeType ?? "application/octet-stream", + url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`, + filename: resource.uri, + }) + } } } - } - - const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent) - const metadata = { - ...(result.metadata ?? {}), - truncated: truncated.truncated, - ...(truncated.truncated && { outputPath: truncated.outputPath }), - } - - const output = { - title: "", - metadata, - output: truncated.content, - attachments: attachments.map((attachment) => ({ - ...attachment, - id: PartID.ascending(), - sessionID: ctx.sessionID, - messageID: input.processor.message.id, - })), - content: result.content, - } - if (opts.abortSignal?.aborted) { - yield* input.processor.completeToolCall(opts.toolCallId, output) - } - return output - }), - ) + const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent) + return { + title: "", + metadata: { + ...(result.metadata ?? {}), + truncated: truncated.truncated, + ...(truncated.truncated && { outputPath: truncated.outputPath }), + }, + output: truncated.content, + attachments: attachments.map((attachment) => ({ + ...attachment, + id: PartID.ascending(), + sessionID: ctx.sessionID, + messageID: input.processor.message.id, + })), + content: result.content, + } + }), + onCancel: (output) => input.processor.completeToolCall(opts.toolCallId, output), + }) + } tools[key] = item }