Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/opencode/src/effect/run-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Observability } from "./oltp"

export const memoMap = Layer.makeMemoMapUnsafe()

function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
try {
const ctx = Instance.current
return Effect.provideService(effect, InstanceRef, ctx)
Expand Down
225 changes: 133 additions & 92 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import { decodeDataUrl } from "@/util/data-url"
import { Process } from "@/util/process"
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { attach, makeRuntime } from "@/effect/run-service"
import { TaskTool } from "@/tool/task"
import { SessionRunState } from "./run-state"

Expand All @@ -62,6 +62,64 @@ IMPORTANT:

const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested structured output. You MUST use the StructuredOutput tool to provide your final response. Do NOT respond with plain text - you MUST call the StructuredOutput tool with your answer formatted according to the schema.`

/**
* Bridges an AI SDK Promise-based `execute` callback to Effect with graceful
* cancel semantics.
*
* On the happy path: runs `before`, awaits `execute()`, then `finalize(result)`
* and returns the output.
*
* On cancel mid-flight: the `onInterrupt` finalizer re-awaits the same in-flight
* native Promise uninterruptibly, runs `finalize` again on the eventual result,
* and posts it via `onCancel` (the processor side channel). This is what lets
* cancelled bash surface its truncated output through the normal completion
* path instead of getting stamped as aborted by processor cleanup.
*
* The returned Promise always resolves with a finalized output when one is
* available (even on interrupt), so the SDK reports the tool as successfully
* completed rather than as a tool-error.
*
* `attach` captures the current Instance context via InstanceRef so the
* onInterrupt finalizer — which runs outside the AsyncLocalStorage chain
* `execute()` is called from — can still resolve it through the ServiceMap.
*/
function runToolExecute<Raw, Output>(options: {
signal: AbortSignal | undefined
before: Effect.Effect<unknown, any, any>
execute: () => Promise<Raw>
finalize: (result: Raw) => Effect.Effect<Output, any, any>
onCancel: (output: Output) => Effect.Effect<unknown, any, any>
}): Promise<Output> {
let pending: Promise<Raw> | undefined
let rescued: Output | undefined
const wait = Effect.promise(() => pending!)

const program = Effect.gen(function* () {
yield* options.before
pending = options.execute()
const result = yield* wait
return yield* options.finalize(result)
}).pipe(
// On interrupt, re-await the in-flight Promise uninterruptibly (finalizers
// always are), finalize it the same way, and post through the side channel.
// Stash the output so catchCause below can surface it instead of the cause.
Effect.onInterrupt(() =>
Effect.gen(function* () {
if (pending === undefined) return
const result = yield* wait
const output = yield* options.finalize(result)
rescued = output
yield* options.onCancel(output)
}).pipe(Effect.catchCause(() => Effect.void)),
),
Effect.catchCause((cause) =>
Effect.suspend(() => (rescued !== undefined ? Effect.succeed(rescued!) : Effect.failCause(cause))),
),
)

return Effect.runPromise(attach(program as Effect.Effect<Output>), { signal: options.signal })
}

export namespace SessionPrompt {
const log = Log.create({ service: "session.prompt" })

Expand Down Expand Up @@ -396,35 +454,28 @@ NOTE: At any point in time through this workflow you should feel free to ask the
description: item.description,
inputSchema: jsonSchema(schema as any),
execute(args, options) {
return Effect.runPromise(
Effect.gen(function* () {
const ctx = context(args, options)
yield* plugin.trigger(
"tool.execute.before",
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID },
{ args },
)
const result = yield* Effect.promise(() => item.execute(args, ctx))
const output = {
...result,
attachments: result.attachments?.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
})),
}
yield* plugin.trigger(
"tool.execute.after",
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args },
output,
)
if (options.abortSignal?.aborted) {
yield* input.processor.completeToolCall(options.toolCallId, output)
}
return output
}),
)
const ctx = context(args, options)
const meta = { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID }
return runToolExecute({
signal: options.abortSignal,
before: plugin.trigger("tool.execute.before", meta, { args }),
execute: () => item.execute(args, ctx),
finalize: (result) =>
Effect.gen(function* () {
const output = {
...result,
attachments: result.attachments?.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
})),
}
yield* plugin.trigger("tool.execute.after", { ...meta, args }, output)
return output
}),
onCancel: (output) => input.processor.completeToolCall(options.toolCallId, output),
})
},
})
}
Expand All @@ -436,74 +487,64 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const schema = yield* Effect.promise(() => Promise.resolve(asSchema(item.inputSchema).jsonSchema))
const transformed = ProviderTransform.schema(input.model, schema)
item.inputSchema = jsonSchema(transformed)
item.execute = (args, opts) =>
Effect.runPromise(
Effect.gen(function* () {
const ctx = context(args, opts)
yield* plugin.trigger(
"tool.execute.before",
{ tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId },
{ args },
)
item.execute = (args, opts) => {
const ctx = context(args, opts)
const meta = { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId }
type Raw = Awaited<ReturnType<NonNullable<typeof execute>>>
return runToolExecute({
signal: opts.abortSignal,
before: Effect.gen(function* () {
yield* plugin.trigger("tool.execute.before", meta, { args })
yield* Effect.promise(() => ctx.ask({ permission: key, metadata: {}, patterns: ["*"], always: ["*"] }))
const result: Awaited<ReturnType<NonNullable<typeof execute>>> = yield* Effect.promise(() =>
execute(args, opts),
)
yield* plugin.trigger(
"tool.execute.after",
{ tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId, args },
result,
)

const textParts: string[] = []
const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
for (const contentItem of result.content) {
if (contentItem.type === "text") textParts.push(contentItem.text)
else if (contentItem.type === "image") {
attachments.push({
type: "file",
mime: contentItem.mimeType,
url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
})
} else if (contentItem.type === "resource") {
const { resource } = contentItem
if (resource.text) textParts.push(resource.text)
if (resource.blob) {
}),
execute: (): Promise<Raw> => execute(args, opts),
finalize: (result) =>
Effect.gen(function* () {
yield* plugin.trigger("tool.execute.after", { ...meta, args }, result)
const textParts: string[] = []
const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
for (const contentItem of result.content) {
if (contentItem.type === "text") textParts.push(contentItem.text)
else if (contentItem.type === "image") {
attachments.push({
type: "file",
mime: resource.mimeType ?? "application/octet-stream",
url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
filename: resource.uri,
mime: contentItem.mimeType,
url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
})
} else if (contentItem.type === "resource") {
const { resource } = contentItem
if (resource.text) textParts.push(resource.text)
if (resource.blob) {
attachments.push({
type: "file",
mime: resource.mimeType ?? "application/octet-stream",
url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
filename: resource.uri,
})
}
}
}
}

const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
const metadata = {
...(result.metadata ?? {}),
truncated: truncated.truncated,
...(truncated.truncated && { outputPath: truncated.outputPath }),
}

const output = {
title: "",
metadata,
output: truncated.content,
attachments: attachments.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
})),
content: result.content,
}
if (opts.abortSignal?.aborted) {
yield* input.processor.completeToolCall(opts.toolCallId, output)
}
return output
}),
)
const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
return {
title: "",
metadata: {
...(result.metadata ?? {}),
truncated: truncated.truncated,
...(truncated.truncated && { outputPath: truncated.outputPath }),
},
output: truncated.content,
attachments: attachments.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
})),
content: result.content,
}
}),
onCancel: (output) => input.processor.completeToolCall(opts.toolCallId, output),
})
}
tools[key] = item
}

Expand Down
Loading