From b7162593b11df3ba4ff6648e858140a39c5a37f9 Mon Sep 17 00:00:00 2001 From: LydiaCai <1125862926@qq.com> Date: Tue, 7 Apr 2026 16:26:11 +0800 Subject: [PATCH] fix(acp): serialize session updates to prevent out-of-order messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All ACP sessionUpdate calls are now routed through a promise chain (`sendUpdate`) that guarantees ordering. After each prompt completes, a reconciliation step fetches the final assistant message and delivers any text/reasoning content that was missed by SSE delta events — fixing a race where streaming events arrive after end_turn. Also adds handling for the `question.asked` event via the ACP permission flow, and updates the question tool to use a two-step async pattern. --- packages/opencode/src/acp/agent.ts | 420 ++++++++++++++----- packages/opencode/src/session/processor.ts | 10 +- packages/opencode/src/tool/question.ts | 4 +- packages/opencode/test/tool/question.test.ts | 1 + packages/slack/package.json | 1 + 5 files changed, 336 insertions(+), 100 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 6e87e7642d65..06ae00efa026 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -48,6 +48,7 @@ import { z } from "zod" import { LoadAPIKeyError } from "ai" import type { AssistantMessage, Event, OpencodeClient, SessionMessageResponse, ToolPart } from "@opencode-ai/sdk/v2" import { applyPatch } from "diff" +import { Question } from "@/question" type ModeOption = { id: string; name: string; description?: string } type ModelOption = { modelId: string; name: string } @@ -77,7 +78,7 @@ export namespace ACP { } async function sendUsageUpdate( - connection: AgentSideConnection, + agent: Agent, sdk: OpencodeClient, sessionID: string, directory: string, @@ -111,8 +112,7 @@ export namespace ACP { const used = msg.tokens.input + (msg.tokens.cache?.read ?? 0) const totalCost = assistantMessages.reduce((sum, m) => sum + m.info.cost, 0) - await connection - .sessionUpdate({ + await agent.sendSessionUpdate({ sessionId: sessionID, update: { sessionUpdate: "usage_update", @@ -121,9 +121,6 @@ export namespace ACP { cost: { amount: totalCost, currency: "USD" }, }, }) - .catch((error) => { - log.error("failed to send usage update", { error }) - }) } export async function init({ sdk: _sdk }: { sdk: OpencodeClient }) { @@ -144,12 +141,90 @@ export namespace ACP { private bashSnapshots = new Map() private toolStarts = new Set() private permissionQueues = new Map>() + private questionQueues = new Map>() private permissionOptions: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] + // Promise chain to serialize all sessionUpdate calls, ensuring ordering. + private sendChain: Promise = Promise.resolve() + + // Track characters sent per partID via delta events, for post-prompt reconciliation. + private sentDeltaChars = new Map() + // Parts that have been reconciled — delta handler must skip these to avoid duplicates. + private reconciledParts = new Set() + + private sendUpdate( + params: Parameters[0], + ): Promise { + const next = this.sendChain.then(() => + this.connection.sessionUpdate(params).catch((error) => { + log.error("failed to send session update to ACP", { error }) + }), + ) + this.sendChain = next.catch(() => {}) + return next + } + + private drainUpdates(): Promise { + return this.sendChain + } + + /** + * After prompt() returns, fetch the complete assistant message and send any + * text/reasoning content that was not already delivered via delta events. + * This handles the race where SSE events haven't arrived before end_turn. + */ + private async reconcileAssistantMessage(sessionId: string, directory: string): Promise { + const messages = await this.sdk.session + .messages({ sessionID: sessionId, directory }, { throwOnError: true }) + .then((x) => x.data) + .catch((error) => { + log.error("failed to fetch messages for reconciliation", { error }) + return undefined + }) + if (!messages) return + + const lastAssistant = messages.findLast((m) => m.info.role === "assistant") + if (!lastAssistant) return + + for (const part of lastAssistant.parts) { + if (part.type === "text" && part.text) { + // Mark reconciled BEFORE reading sentDeltaChars, so any delta events + // arriving during the await below will be skipped. + this.reconciledParts.add(part.id) + const sent = this.sentDeltaChars.get(part.id) ?? 0 + this.sentDeltaChars.delete(part.id) + if (sent < part.text.length) { + const remaining = part.text.substring(sent) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: remaining }, + }, + }) + } + } else if (part.type === "reasoning" && part.text) { + this.reconciledParts.add(part.id) + const sent = this.sentDeltaChars.get(part.id) ?? 0 + this.sentDeltaChars.delete(part.id) + if (sent < part.text.length) { + const remaining = part.text.substring(sent) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: remaining }, + }, + }) + } + } + } + } + constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config @@ -267,6 +342,200 @@ export namespace ACP { return } + case "question.asked": { + const question = event.properties + const session = this.sessionManager.tryGet(question.sessionID) + if (!session) return + + const prev = this.questionQueues.get(question.sessionID) ?? Promise.resolve() + const next = prev + .then(async () => { + const directory = session.cwd + + // Build flattened options for all questions: q{questionIndex}_opt{optionIndex} + const options: PermissionOption[] = [] + for (let qIdx = 0; qIdx < question.questions.length; qIdx++) { + const q = question.questions[qIdx] + for (let optIdx = 0; optIdx < q.options.length; optIdx++) { + options.push({ + optionId: `q${qIdx}_opt${optIdx}`, + kind: "allow_once" as const, + name: q.options[optIdx].label, + }) + } + } + + // Build title from first question (or combine headers) + const firstQ = question.questions[0] + const title = firstQ.header ?? firstQ.question + + // Build rawInput with questions array (matching cc.json format) + const rawInput = { + questions: question.questions.map((q: Question.Info) => ({ + question: q.question, + header: q.header, + options: q.options, + multiSelect: q.multiple ?? false, + })), + } + + // Build _meta with full questions array (matching cc.json format) + const metaQuestions = question.questions.map((q: Question.Info) => ({ + question: q.question, + header: q.header, + options: q.options, + multiSelect: q.multiple ?? false, + })) + + const res = await this.connection + .requestPermission({ + sessionId: question.sessionID, + toolCall: { + toolCallId: question.tool?.callID ?? question.id, + title, + rawInput, + }, + options, + _meta: { + askUserQuestion: { + questions: metaQuestions, + }, + }, + }) + .catch(async (error) => { + log.error("failed to request permission for question from ACP", { + error, + questionID: question.id, + sessionID: question.sessionID, + }) + await this.sdk.question.reject({ + requestID: question.id, + directory, + }) + return undefined + }) + + if (!res) { + await this.sdk.question.reject({ + requestID: question.id, + directory, + }) + return + } + + if (res.outcome.outcome == "cancelled") { + await this.sdk.question.reject({ + requestID: question.id, + directory, + }) + + // Send tool_call_update to client + await this.sendUpdate({ + sessionId: question.sessionID, + update: { + sessionUpdate: "tool_call_update", + toolCallId: question.tool?.callID ?? question.id, + status: "failed", + content: [ + { + type: "content", + content: { + type: "text", + text: "```\nTool permission request failed: Error: Question cancelled\n```", + }, + }, + ], + _meta: { + opencode: { + toolName: "AskUserQuestion", + }, + }, + }, + }) + return + } + + if (res.outcome.outcome !== "selected") { + await this.sdk.question.reject({ + requestID: question.id, + directory, + }) + return + } + + // Parse response to build answers array + const answers: string[][] = [] + + // Check for _meta.answers format: { "question text": ["answer1", "answer2"] } + const metaAnswers = (res._meta as Record | undefined)?.answers as + | Record + | undefined + if (metaAnswers && typeof metaAnswers === "object") { + for (const questionItem of question.questions) { + const key = questionItem.header + ? `${questionItem.header}: ${questionItem.question}` + : questionItem.question + const answer = metaAnswers[key] ?? metaAnswers[questionItem.question] ?? [] + answers.push(Array.isArray(answer) ? answer : [answer]) + } + } else { + // Parse optionId(s) in format: q{qIdx}_opt{optIdx} or q{qIdx}_other + const optionId = res.outcome.optionId + + // Initialize answers array with empty arrays for each question + for (let i = 0; i < question.questions.length; i++) { + answers.push([]) + } + + // Parse the optionId - could be single or multiple (comma-separated) + const selectedIds = optionId.split(",").map((s: string) => s.trim()) + for (const selId of selectedIds) { + const match = selId.match(/^q(\d+)_(opt(\d+)|other(?::(.*))?)?$/) + if (match) { + const qIdx = parseInt(match[1], 10) + if (qIdx < question.questions.length) { + if (match[2]?.startsWith("opt")) { + const optIdx = parseInt(match[3], 10) + const q = question.questions[qIdx] + if (optIdx < q.options.length) { + answers[qIdx].push(q.options[optIdx].label) + } + } else if (match[2]?.startsWith("other")) { + // Custom "Other" answer: q{idx}_other:customText + const customText = match[4] ?? "" + answers[qIdx].push(customText) + } + } + } + } + } + + // Send all answers at once + await this.sdk.question + .reply( + { + requestID: question.id, + answers, + directory, + }, + { throwOnError: true }, + ) + .catch((error) => { + log.error("failed to reply to question", { error, questionID: question.id }) + }) + }) + .catch((error: unknown) => { + log.error("failed to handle question", { error, questionID: question.id }) + }) + .finally(() => { + if (this.questionQueues.get(question.sessionID) === next) { + this.questionQueues.delete(question.sessionID) + } + }) + this.questionQueues.set(question.sessionID, next) + return + } + case "message.part.updated": { log.info("message part updated", { event: event.properties }) const props = event.properties @@ -290,8 +559,7 @@ export namespace ACP { const hash = Hash.fast(output) if (part.tool === "bash") { if (this.bashSnapshots.get(part.callID) === hash) { - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -303,9 +571,6 @@ export namespace ACP { rawInput: part.state.input, }, }) - .catch((error) => { - log.error("failed to send tool in_progress to ACP", { error }) - }) return } this.bashSnapshots.set(part.callID, hash) @@ -318,8 +583,7 @@ export namespace ACP { }, }) } - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -332,9 +596,6 @@ export namespace ACP { ...(content.length > 0 && { content }), }, }) - .catch((error) => { - log.error("failed to send tool in_progress to ACP", { error }) - }) return case "completed": { @@ -372,8 +633,7 @@ export namespace ACP { if (part.tool === "todowrite") { const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) if (parsedTodos.success) { - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "plan", @@ -388,16 +648,12 @@ export namespace ACP { }), }, }) - .catch((error) => { - log.error("failed to send session update for todo", { error }) - }) } else { log.error("failed to parse todo output", { error: parsedTodos.error }) } } - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -413,16 +669,12 @@ export namespace ACP { }, }, }) - .catch((error) => { - log.error("failed to send tool completed to ACP", { error }) - }) return } case "error": this.toolStarts.delete(part.callID) this.bashSnapshots.delete(part.callID) - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -446,9 +698,6 @@ export namespace ACP { }, }, }) - .catch((error) => { - log.error("failed to send tool error to ACP", { error }) - }) return } } @@ -494,9 +743,12 @@ export namespace ACP { const part = message.parts.find((p) => p.id === props.partID) if (!part) return + // Skip deltas for parts already reconciled after prompt() returned. + if (this.reconciledParts.has(props.partID)) return + if (part.type === "text" && props.field === "text" && part.ignored !== true) { - await this.connection - .sessionUpdate({ + this.sentDeltaChars.set(props.partID, (this.sentDeltaChars.get(props.partID) ?? 0) + props.delta.length) + await this.sendUpdate({ sessionId, update: { sessionUpdate: "agent_message_chunk", @@ -507,15 +759,12 @@ export namespace ACP { }, }, }) - .catch((error) => { - log.error("failed to send text delta to ACP", { error }) - }) return } if (part.type === "reasoning" && props.field === "text") { - await this.connection - .sessionUpdate({ + this.sentDeltaChars.set(props.partID, (this.sentDeltaChars.get(props.partID) ?? 0) + props.delta.length) + await this.sendUpdate({ sessionId, update: { sessionUpdate: "agent_thought_chunk", @@ -526,9 +775,6 @@ export namespace ACP { }, }, }) - .catch((error) => { - log.error("failed to send reasoning delta to ACP", { error }) - }) } return } @@ -676,7 +922,7 @@ export namespace ACP { await this.processMessage(msg) } - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await sendUsageUpdate(this, this.sdk, sessionId, directory) return result } catch (e) { @@ -786,7 +1032,7 @@ export namespace ACP { await this.processMessage(msg) } - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await sendUsageUpdate(this, this.sdk, sessionId, directory) return mode } catch (e) { @@ -817,7 +1063,7 @@ export namespace ACP { sessionId, }) - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await sendUsageUpdate(this, this.sdk, sessionId, directory) return result } catch (e) { @@ -855,8 +1101,7 @@ export namespace ACP { }, }) } - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -869,9 +1114,6 @@ export namespace ACP { ...(runningContent.length > 0 && { content: runningContent }), }, }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) break case "completed": this.toolStarts.delete(part.callID) @@ -908,8 +1150,7 @@ export namespace ACP { if (part.tool === "todowrite") { const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) if (parsedTodos.success) { - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "plan", @@ -924,16 +1165,12 @@ export namespace ACP { }), }, }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) } else { log.error("failed to parse todo output", { error: parsedTodos.error }) } } - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -949,15 +1186,11 @@ export namespace ACP { }, }, }) - .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) - }) break case "error": this.toolStarts.delete(part.callID) this.bashSnapshots.delete(part.callID) - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -981,16 +1214,12 @@ export namespace ACP { }, }, }) - .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) - }) break } } else if (part.type === "text") { if (part.text) { const audience: Role[] | undefined = part.synthetic ? ["assistant"] : part.ignored ? ["user"] : undefined - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: message.info.role === "user" ? "user_message_chunk" : "agent_message_chunk", @@ -1002,9 +1231,6 @@ export namespace ACP { }, }, }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) - }) } } else if (part.type === "file") { // Replay file attachments as appropriate ACP content blocks. @@ -1021,8 +1247,7 @@ export namespace ACP { if (url.startsWith("file://")) { // Local file reference - send as resource_link - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: messageChunk, @@ -1030,9 +1255,6 @@ export namespace ACP { content: { type: "resource_link", uri: url, name: filename, mimeType: mime }, }, }) - .catch((err) => { - log.error("failed to send resource_link to ACP", { error: err }) - }) } else if (url.startsWith("data:")) { // Embedded content - parse data URL and send as appropriate block type const base64Match = url.match(/^data:([^;]+);base64,(.*)$/) @@ -1043,8 +1265,7 @@ export namespace ACP { if (effectiveMime.startsWith("image/")) { // Image - send as image block - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: messageChunk, @@ -1057,9 +1278,6 @@ export namespace ACP { }, }, }) - .catch((err) => { - log.error("failed to send image to ACP", { error: err }) - }) } else { // Non-image: text types get decoded, binary types stay as blob const isText = effectiveMime.startsWith("text/") || effectiveMime === "application/json" @@ -1072,8 +1290,7 @@ export namespace ACP { } : { uri: fileUri, mimeType: effectiveMime, blob: base64Data } - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: messageChunk, @@ -1081,16 +1298,12 @@ export namespace ACP { content: { type: "resource", resource }, }, }) - .catch((err) => { - log.error("failed to send resource to ACP", { error: err }) - }) } } // URLs that don't match file:// or data: are skipped (unsupported) } else if (part.type === "reasoning") { if (part.text) { - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "agent_thought_chunk", @@ -1101,9 +1314,6 @@ export namespace ACP { }, }, }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) } } } @@ -1120,8 +1330,7 @@ export namespace ACP { private async toolStart(sessionId: string, part: ToolPart) { if (this.toolStarts.has(part.callID)) return this.toolStarts.add(part.callID) - await this.connection - .sessionUpdate({ + await this.sendUpdate({ sessionId, update: { sessionUpdate: "tool_call", @@ -1133,9 +1342,6 @@ export namespace ACP { rawInput: {}, }, }) - .catch((error) => { - log.error("failed to send tool pending to ACP", { error }) - }) } private async loadAvailableModes(directory: string): Promise { @@ -1259,7 +1465,7 @@ export namespace ACP { ) setTimeout(() => { - this.connection.sessionUpdate({ + this.sendUpdate({ sessionId, update: { sessionUpdate: "available_commands_update", @@ -1358,6 +1564,10 @@ export namespace ACP { } async prompt(params: PromptRequest) { + // Clear stale reconciliation state from previous prompt. + this.reconciledParts.clear() + this.sentDeltaChars.clear() + const sessionID = params.sessionId const session = this.sessionManager.get(sessionID) const directory = session.cwd @@ -1486,7 +1696,9 @@ export namespace ACP { }) const msg = response.data?.info - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.drainUpdates() + await this.reconcileAssistantMessage(sessionID, directory) + await sendUsageUpdate(this, this.sdk, sessionID, directory) return { stopReason: "end_turn" as const, @@ -1509,7 +1721,9 @@ export namespace ACP { }) const msg = response.data?.info - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.drainUpdates() + await this.reconcileAssistantMessage(sessionID, directory) + await sendUsageUpdate(this, this.sdk, sessionID, directory) return { stopReason: "end_turn" as const, @@ -1532,7 +1746,9 @@ export namespace ACP { break } - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.drainUpdates() + await this.reconcileAssistantMessage(sessionID, directory) + await sendUsageUpdate(this, this.sdk, sessionID, directory) return { stopReason: "end_turn" as const, @@ -1550,6 +1766,14 @@ export namespace ACP { { throwOnError: true }, ) } + + hasSession(sessionID: string): boolean { + return this.sessionManager.tryGet(sessionID) !== undefined + } + + async sendSessionUpdate(params: Parameters[0]) { + await this.sendUpdate(params) + } } function toToolKind(toolName: string): ToolKind { diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 146c73f27712..0bfd88e2af9c 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -484,7 +484,15 @@ export namespace SessionProcessor { yield* abort() } if (ctx.needsCompaction) return "compact" - if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop" + if (ctx.blocked || ctx.assistantMessage.error || aborted) { + log.info("process stop", { + sessionID: ctx.sessionID, + blocked: ctx.blocked, + error: ctx.assistantMessage.error, + aborted, + }) + return "stop" + } return "continue" }).pipe(Effect.onInterrupt(() => abort().pipe(Effect.asVoid))) }) diff --git a/packages/opencode/src/tool/question.ts b/packages/opencode/src/tool/question.ts index dd99688880ea..291b8cbfa103 100644 --- a/packages/opencode/src/tool/question.ts +++ b/packages/opencode/src/tool/question.ts @@ -21,7 +21,7 @@ export const QuestionTool = Tool.defineEffect, ctx: Tool.Context) { - const answers = await question + const result = await question .ask({ sessionID: ctx.sessionID, questions: params.questions, @@ -29,6 +29,8 @@ export const QuestionTool = Tool.defineEffect `"${q.question}"="${answers[i]?.length ? answers[i].join(", ") : "Unanswered"}"`) .join(", ") diff --git a/packages/opencode/test/tool/question.test.ts b/packages/opencode/test/tool/question.test.ts index f1d9492ca8c4..42a7c0abb28b 100644 --- a/packages/opencode/test/tool/question.test.ts +++ b/packages/opencode/test/tool/question.test.ts @@ -17,6 +17,7 @@ const ctx = { messages: [], metadata: () => {}, ask: async () => {}, + question: async () => [], } const it = testEffect(Layer.mergeAll(Question.defaultLayer, CrossSpawnSpawner.defaultLayer)) diff --git a/packages/slack/package.json b/packages/slack/package.json index dad6eac206b3..58d46b9378d4 100644 --- a/packages/slack/package.json +++ b/packages/slack/package.json @@ -12,6 +12,7 @@ "@slack/bolt": "^3.17.1" }, "devDependencies": { + "@types/cross-spawn": "catalog:", "@types/node": "catalog:", "typescript": "catalog:", "@typescript/native-preview": "catalog:"