diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index f609986b5837..68bd57ed0cbf 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -7,7 +7,7 @@ import { Observability } from "./oltp" export const memoMap = Layer.makeMemoMapUnsafe() -function attach(effect: Effect.Effect): Effect.Effect { +export function attach(effect: Effect.Effect): Effect.Effect { try { const ctx = Instance.current return Effect.provideService(effect, InstanceRef, ctx) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 7f0a014ab249..c3275ad72926 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -45,7 +45,7 @@ import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect" import { InstanceState } from "@/effect/instance-state" -import { makeRuntime } from "@/effect/run-service" +import { attach, makeRuntime } from "@/effect/run-service" import { TaskTool } from "@/tool/task" import { SessionRunState } from "./run-state" @@ -62,6 +62,64 @@ IMPORTANT: const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested structured output. You MUST use the StructuredOutput tool to provide your final response. Do NOT respond with plain text - you MUST call the StructuredOutput tool with your answer formatted according to the schema.` +/** + * Bridges an AI SDK Promise-based `execute` callback to Effect with graceful + * cancel semantics. + * + * On the happy path: runs `before`, awaits `execute()`, then `finalize(result)` + * and returns the output. + * + * On cancel mid-flight: the `onInterrupt` finalizer re-awaits the same in-flight + * native Promise uninterruptibly, runs `finalize` again on the eventual result, + * and posts it via `onCancel` (the processor side channel). This is what lets + * cancelled bash surface its truncated output through the normal completion + * path instead of getting stamped as aborted by processor cleanup. + * + * The returned Promise always resolves with a finalized output when one is + * available (even on interrupt), so the SDK reports the tool as successfully + * completed rather than as a tool-error. + * + * `attach` captures the current Instance context via InstanceRef so the + * onInterrupt finalizer — which runs outside the AsyncLocalStorage chain + * `execute()` is called from — can still resolve it through the ServiceMap. + */ +function runToolExecute(options: { + signal: AbortSignal | undefined + before: Effect.Effect + execute: () => Promise + finalize: (result: Raw) => Effect.Effect + onCancel: (output: Output) => Effect.Effect +}): Promise { + let pending: Promise | undefined + let rescued: Output | undefined + const wait = Effect.promise(() => pending!) + + const program = Effect.gen(function* () { + yield* options.before + pending = options.execute() + const result = yield* wait + return yield* options.finalize(result) + }).pipe( + // On interrupt, re-await the in-flight Promise uninterruptibly (finalizers + // always are), finalize it the same way, and post through the side channel. + // Stash the output so catchCause below can surface it instead of the cause. + Effect.onInterrupt(() => + Effect.gen(function* () { + if (pending === undefined) return + const result = yield* wait + const output = yield* options.finalize(result) + rescued = output + yield* options.onCancel(output) + }).pipe(Effect.catchCause(() => Effect.void)), + ), + Effect.catchCause((cause) => + Effect.suspend(() => (rescued !== undefined ? Effect.succeed(rescued!) : Effect.failCause(cause))), + ), + ) + + return Effect.runPromise(attach(program as Effect.Effect), { signal: options.signal }) +} + export namespace SessionPrompt { const log = Log.create({ service: "session.prompt" }) @@ -396,35 +454,28 @@ NOTE: At any point in time through this workflow you should feel free to ask the description: item.description, inputSchema: jsonSchema(schema as any), execute(args, options) { - return Effect.runPromise( - Effect.gen(function* () { - const ctx = context(args, options) - yield* plugin.trigger( - "tool.execute.before", - { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID }, - { args }, - ) - const result = yield* Effect.promise(() => item.execute(args, ctx)) - const output = { - ...result, - attachments: result.attachments?.map((attachment) => ({ - ...attachment, - id: PartID.ascending(), - sessionID: ctx.sessionID, - messageID: input.processor.message.id, - })), - } - yield* plugin.trigger( - "tool.execute.after", - { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args }, - output, - ) - if (options.abortSignal?.aborted) { - yield* input.processor.completeToolCall(options.toolCallId, output) - } - return output - }), - ) + const ctx = context(args, options) + const meta = { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID } + return runToolExecute({ + signal: options.abortSignal, + before: plugin.trigger("tool.execute.before", meta, { args }), + execute: () => item.execute(args, ctx), + finalize: (result) => + Effect.gen(function* () { + const output = { + ...result, + attachments: result.attachments?.map((attachment) => ({ + ...attachment, + id: PartID.ascending(), + sessionID: ctx.sessionID, + messageID: input.processor.message.id, + })), + } + yield* plugin.trigger("tool.execute.after", { ...meta, args }, output) + return output + }), + onCancel: (output) => input.processor.completeToolCall(options.toolCallId, output), + }) }, }) } @@ -436,74 +487,64 @@ NOTE: At any point in time through this workflow you should feel free to ask the const schema = yield* Effect.promise(() => Promise.resolve(asSchema(item.inputSchema).jsonSchema)) const transformed = ProviderTransform.schema(input.model, schema) item.inputSchema = jsonSchema(transformed) - item.execute = (args, opts) => - Effect.runPromise( - Effect.gen(function* () { - const ctx = context(args, opts) - yield* plugin.trigger( - "tool.execute.before", - { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId }, - { args }, - ) + item.execute = (args, opts) => { + const ctx = context(args, opts) + const meta = { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId } + type Raw = Awaited>> + return runToolExecute({ + signal: opts.abortSignal, + before: Effect.gen(function* () { + yield* plugin.trigger("tool.execute.before", meta, { args }) yield* Effect.promise(() => ctx.ask({ permission: key, metadata: {}, patterns: ["*"], always: ["*"] })) - const result: Awaited>> = yield* Effect.promise(() => - execute(args, opts), - ) - yield* plugin.trigger( - "tool.execute.after", - { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId, args }, - result, - ) - - const textParts: string[] = [] - const attachments: Omit[] = [] - for (const contentItem of result.content) { - if (contentItem.type === "text") textParts.push(contentItem.text) - else if (contentItem.type === "image") { - attachments.push({ - type: "file", - mime: contentItem.mimeType, - url: `data:${contentItem.mimeType};base64,${contentItem.data}`, - }) - } else if (contentItem.type === "resource") { - const { resource } = contentItem - if (resource.text) textParts.push(resource.text) - if (resource.blob) { + }), + execute: (): Promise => execute(args, opts), + finalize: (result) => + Effect.gen(function* () { + yield* plugin.trigger("tool.execute.after", { ...meta, args }, result) + const textParts: string[] = [] + const attachments: Omit[] = [] + for (const contentItem of result.content) { + if (contentItem.type === "text") textParts.push(contentItem.text) + else if (contentItem.type === "image") { attachments.push({ type: "file", - mime: resource.mimeType ?? "application/octet-stream", - url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`, - filename: resource.uri, + mime: contentItem.mimeType, + url: `data:${contentItem.mimeType};base64,${contentItem.data}`, }) + } else if (contentItem.type === "resource") { + const { resource } = contentItem + if (resource.text) textParts.push(resource.text) + if (resource.blob) { + attachments.push({ + type: "file", + mime: resource.mimeType ?? "application/octet-stream", + url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`, + filename: resource.uri, + }) + } } } - } - - const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent) - const metadata = { - ...(result.metadata ?? {}), - truncated: truncated.truncated, - ...(truncated.truncated && { outputPath: truncated.outputPath }), - } - - const output = { - title: "", - metadata, - output: truncated.content, - attachments: attachments.map((attachment) => ({ - ...attachment, - id: PartID.ascending(), - sessionID: ctx.sessionID, - messageID: input.processor.message.id, - })), - content: result.content, - } - if (opts.abortSignal?.aborted) { - yield* input.processor.completeToolCall(opts.toolCallId, output) - } - return output - }), - ) + const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent) + return { + title: "", + metadata: { + ...(result.metadata ?? {}), + truncated: truncated.truncated, + ...(truncated.truncated && { outputPath: truncated.outputPath }), + }, + output: truncated.content, + attachments: attachments.map((attachment) => ({ + ...attachment, + id: PartID.ascending(), + sessionID: ctx.sessionID, + messageID: input.processor.message.id, + })), + content: result.content, + } + }), + onCancel: (output) => input.processor.completeToolCall(opts.toolCallId, output), + }) + } tools[key] = item }