Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 129 additions & 49 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cause, Effect, Layer, ServiceMap } from "effect"
import { Cause, Deferred, Effect, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
Expand All @@ -18,6 +18,7 @@ import { SessionStatus } from "./status"
import { SessionSummary } from "./summary"
import type { Provider } from "@/provider/provider"
import { Question } from "@/question"
import { errorMessage } from "@/util/error"
import { isRecord } from "@/util/record"

export namespace SessionProcessor {
Expand All @@ -30,7 +31,19 @@ export namespace SessionProcessor {

export interface Handle {
readonly message: MessageV2.Assistant
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
readonly updateToolCall: (
toolCallID: string,
update: (part: MessageV2.ToolPart) => MessageV2.ToolPart,
) => Effect.Effect<MessageV2.ToolPart | undefined>
readonly completeToolCall: (
toolCallID: string,
output: {
title: string
metadata: Record<string, any>
output: string
attachments?: MessageV2.FilePart[]
},
) => Effect.Effect<void>
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
}

Expand All @@ -44,8 +57,15 @@ export namespace SessionProcessor {
readonly create: (input: Input) => Effect.Effect<Handle>
}

type ToolCall = {
partID: MessageV2.ToolPart["id"]
messageID: MessageV2.ToolPart["messageID"]
sessionID: MessageV2.ToolPart["sessionID"]
done: Deferred.Deferred<void>
}

interface ProcessorContext extends Input {
toolcalls: Record<string, MessageV2.ToolPart>
toolcalls: Record<string, ToolCall>
shouldBreak: boolean
snapshot: string | undefined
blocked: boolean
Expand Down Expand Up @@ -108,6 +128,88 @@ export namespace SessionProcessor {
aborted,
})

const settleToolCall = Effect.fn("SessionProcessor.settleToolCall")(function* (toolCallID: string) {
const done = ctx.toolcalls[toolCallID]?.done
delete ctx.toolcalls[toolCallID]
if (done) yield* Deferred.succeed(done, undefined).pipe(Effect.ignore)
})

const readToolCall = Effect.fn("SessionProcessor.readToolCall")(function* (toolCallID: string) {
const call = ctx.toolcalls[toolCallID]
if (!call) return
const part = yield* session.getPart({
partID: call.partID,
messageID: call.messageID,
sessionID: call.sessionID,
})
if (!part || part.type !== "tool") {
delete ctx.toolcalls[toolCallID]
return
}
return { call, part }
})

const updateToolCall = Effect.fn("SessionProcessor.updateToolCall")(function* (
toolCallID: string,
update: (part: MessageV2.ToolPart) => MessageV2.ToolPart,
) {
const match = yield* readToolCall(toolCallID)
if (!match) return
const part = yield* session.updatePart(update(match.part))
ctx.toolcalls[toolCallID] = {
...match.call,
partID: part.id,
messageID: part.messageID,
sessionID: part.sessionID,
}
return part
})

const completeToolCall = Effect.fn("SessionProcessor.completeToolCall")(function* (
toolCallID: string,
output: {
title: string
metadata: Record<string, any>
output: string
attachments?: MessageV2.FilePart[]
},
) {
const match = yield* readToolCall(toolCallID)
if (!match || match.part.state.status !== "running") return
yield* session.updatePart({
...match.part,
state: {
status: "completed",
input: match.part.state.input,
output: output.output,
metadata: output.metadata,
title: output.title,
time: { start: match.part.state.time.start, end: Date.now() },
attachments: output.attachments,
},
})
yield* settleToolCall(toolCallID)
})

const failToolCall = Effect.fn("SessionProcessor.failToolCall")(function* (toolCallID: string, error: unknown) {
const match = yield* readToolCall(toolCallID)
if (!match || match.part.state.status !== "running") return false
yield* session.updatePart({
...match.part,
state: {
status: "error",
input: match.part.state.input,
error: errorMessage(error),
time: { start: match.part.state.time.start, end: Date.now() },
},
})
if (error instanceof Permission.RejectedError || error instanceof Question.RejectedError) {
ctx.blocked = ctx.shouldBreak
}
yield* settleToolCall(toolCallID)
return true
})

const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
switch (value.type) {
case "start":
Expand Down Expand Up @@ -154,8 +256,8 @@ export namespace SessionProcessor {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
ctx.toolcalls[value.id] = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(),
const part = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "tool",
Expand All @@ -164,6 +266,12 @@ export namespace SessionProcessor {
state: { status: "pending", input: {}, raw: "" },
metadata: value.providerExecuted ? { providerExecuted: true } : undefined,
} satisfies MessageV2.ToolPart)
ctx.toolcalls[value.id] = {
done: yield* Deferred.make<void>(),
partID: part.id,
messageID: part.messageID,
sessionID: part.sessionID,
}
return

case "tool-input-delta":
Expand All @@ -176,14 +284,7 @@ export namespace SessionProcessor {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
const pointer = ctx.toolcalls[value.toolCallId]
const match = yield* session.getPart({
partID: pointer.id,
messageID: pointer.messageID,
sessionID: pointer.sessionID,
})
if (!match || match.type !== "tool") return
ctx.toolcalls[value.toolCallId] = yield* session.updatePart({
yield* updateToolCall(value.toolCallId, (match) => ({
...match,
tool: value.toolName,
state: {
Expand All @@ -195,7 +296,7 @@ export namespace SessionProcessor {
metadata: match.metadata?.providerExecuted
? { ...value.providerMetadata, providerExecuted: true }
: value.providerMetadata,
} satisfies MessageV2.ToolPart)
}))

const parts = MessageV2.parts(ctx.assistantMessage.id)
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
Expand Down Expand Up @@ -226,41 +327,12 @@ export namespace SessionProcessor {
}

case "tool-result": {
const match = ctx.toolcalls[value.toolCallId]
if (!match || match.state.status !== "running") return
yield* session.updatePart({
...match,
state: {
status: "completed",
input: value.input ?? match.state.input,
output: value.output.output,
metadata: value.output.metadata,
title: value.output.title,
time: { start: match.state.time.start, end: Date.now() },
attachments: value.output.attachments,
},
})
delete ctx.toolcalls[value.toolCallId]
yield* completeToolCall(value.toolCallId, value.output)
return
}

case "tool-error": {
const match = ctx.toolcalls[value.toolCallId]
if (!match || match.state.status !== "running") return

yield* session.updatePart({
...match,
state: {
status: "error",
input: value.input ?? match.state.input,
error: value.error instanceof Error ? value.error.message : String(value.error),
time: { start: match.state.time.start, end: Date.now() },
},
})
if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) {
ctx.blocked = ctx.shouldBreak
}
delete ctx.toolcalls[value.toolCallId]
yield* failToolCall(value.toolCallId, value.error)
return
}

Expand Down Expand Up @@ -413,7 +485,16 @@ export namespace SessionProcessor {
}
ctx.reasoningMap = {}

for (const part of Object.values(ctx.toolcalls)) {
yield* Effect.forEach(
Object.values(ctx.toolcalls),
(call) => Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore),
{ concurrency: "unbounded" },
)

for (const toolCallID of Object.keys(ctx.toolcalls)) {
const match = yield* readToolCall(toolCallID)
if (!match) continue
const part = match.part
const end = Date.now()
const metadata = "metadata" in part.state && isRecord(part.state.metadata) ? part.state.metadata : {}
yield* session.updatePart({
Expand Down Expand Up @@ -503,9 +584,8 @@ export namespace SessionProcessor {
get message() {
return ctx.assistantMessage
},
partFromToolCall(toolCallID: string) {
return ctx.toolcalls[toolCallID]
},
updateToolCall,
completeToolCall,
process,
} satisfies Handle
})
Expand Down
20 changes: 13 additions & 7 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
model: Provider.Model
session: Session.Info
tools?: Record<string, boolean>
processor: Pick<SessionProcessor.Handle, "message" | "partFromToolCall">
processor: Pick<SessionProcessor.Handle, "message" | "updateToolCall" | "completeToolCall">
bypassAgentCheck: boolean
messages: MessageV2.WithParts[]
}) {
Expand All @@ -405,10 +405,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
messages: input.messages,
metadata: (val) =>
Effect.runPromise(
Effect.gen(function* () {
const match = input.processor.partFromToolCall(options.toolCallId)
if (!match || !["running", "pending"].includes(match.state.status)) return
yield* sessions.updatePart({
input.processor.updateToolCall(options.toolCallId, (match) => {
if (!["running", "pending"].includes(match.state.status)) return match
return {
...match,
state: {
title: val.title,
Expand All @@ -417,7 +416,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
input: args,
time: { start: Date.now() },
},
})
}
}),
),
ask: (req) =>
Expand Down Expand Up @@ -465,6 +464,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args },
output,
)
if (options.abortSignal?.aborted) {
yield* input.processor.completeToolCall(options.toolCallId, output)
}
return output
}),
)
Expand Down Expand Up @@ -529,7 +531,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
...(truncated.truncated && { outputPath: truncated.outputPath }),
}

return {
const output = {
title: "",
metadata,
output: truncated.content,
Expand All @@ -541,6 +543,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the
})),
content: result.content,
}
if (opts.abortSignal?.aborted) {
yield* input.processor.completeToolCall(opts.toolCallId, output)
}
return output
}),
)
tools[key] = item
Expand Down
13 changes: 2 additions & 11 deletions packages/opencode/test/session/compaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,8 @@ function fake(
get message() {
return msg
},
partFromToolCall() {
return {
id: PartID.ascending(),
messageID: msg.id,
sessionID: msg.sessionID,
type: "tool",
callID: "fake",
tool: "fake",
state: { status: "pending", input: {}, raw: "" },
}
},
updateToolCall: Effect.fn("TestSessionProcessor.updateToolCall")(() => Effect.succeed(undefined)),
completeToolCall: Effect.fn("TestSessionProcessor.completeToolCall")(() => Effect.void),
process: Effect.fn("TestSessionProcessor.process")(() => Effect.succeed(result)),
} satisfies SessionProcessorModule.SessionProcessor.Handle
}
Expand Down
Loading
Loading