diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index ebd65bb26da1..26f6e5fefa52 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -20,7 +20,7 @@ import { } from "@agentclientprotocol/sdk" import { Log } from "../util/log" import { ACPSessionManager } from "./session" -import type { ACPConfig, ACPSessionState } from "./types" +import type { ACPConfig } from "./types" import { Provider } from "../provider/provider" import { Agent as AgentModule } from "../agent/agent" import { Installation } from "@/installation" @@ -29,7 +29,7 @@ import { Config } from "@/config/config" import { Todo } from "@/session/todo" import { z } from "zod" import { LoadAPIKeyError } from "ai" -import type { OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2" +import type { Event, OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2" import { applyPatch } from "diff" export namespace ACP { @@ -47,304 +47,354 @@ export namespace ACP { private connection: AgentSideConnection private config: ACPConfig private sdk: OpencodeClient - private sessionManager + private sessionManager: ACPSessionManager + private eventAbort = new AbortController() + private eventStarted = false + private permissionQueues = new Map>() + private permissionOptions: PermissionOption[] = [ + { optionId: "once", kind: "allow_once", name: "Allow once" }, + { optionId: "always", kind: "allow_always", name: "Always allow" }, + { optionId: "reject", kind: "reject_once", name: "Reject" }, + ] constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) + this.startEventSubscription() } - private setupEventSubscriptions(session: ACPSessionState) { - const sessionId = session.id - const directory = session.cwd + private startEventSubscription() { + if (this.eventStarted) return + this.eventStarted = true + this.runEventSubscription().catch((error) => { + if (this.eventAbort.signal.aborted) return + log.error("event subscription failed", { error }) + }) + } - const options: PermissionOption[] = [ - { optionId: "once", kind: "allow_once", name: "Allow once" }, - { optionId: "always", kind: "allow_always", name: "Always allow" }, - { optionId: "reject", kind: "reject_once", name: "Reject" }, - ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { + private async runEventSubscription() { + while (true) { + if (this.eventAbort.signal.aborted) return + const events = await this.sdk.global.event({ + signal: this.eventAbort.signal, + }) for await (const event of events.stream) { - switch (event.type) { - case "permission.asked": - try { - const permission = event.properties - const res = await this.connection - .requestPermission({ - sessionId, - toolCall: { - toolCallId: permission.tool?.callID ?? permission.id, - status: "pending", - title: permission.permission, - rawInput: permission.metadata, - kind: toToolKind(permission.permission), - locations: toLocations(permission.permission, permission.metadata), - }, - options, - }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, - permissionID: permission.id, - sessionID: permission.sessionID, - }) - await this.config.sdk.permission.reply({ - requestID: permission.id, - reply: "reject", - directory, - }) - return + if (this.eventAbort.signal.aborted) return + const payload = (event as any)?.payload + if (!payload) continue + await this.handleEvent(payload as Event).catch((error) => { + log.error("failed to handle event", { error, type: payload.type }) + }) + } + } + } + + private async handleEvent(event: Event) { + switch (event.type) { + case "permission.asked": { + const permission = event.properties + const session = this.sessionManager.tryGet(permission.sessionID) + if (!session) return + + const prev = this.permissionQueues.get(permission.sessionID) ?? Promise.resolve() + const next = prev + .then(async () => { + const directory = session.cwd + + const res = await this.connection + .requestPermission({ + sessionId: permission.sessionID, + toolCall: { + toolCallId: permission.tool?.callID ?? permission.id, + status: "pending", + title: permission.permission, + rawInput: permission.metadata, + kind: toToolKind(permission.permission), + locations: toLocations(permission.permission, permission.metadata), + }, + options: this.permissionOptions, + }) + .catch(async (error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, }) - if (!res) return - if (res.outcome.outcome !== "selected") { - await this.config.sdk.permission.reply({ + await this.sdk.permission.reply({ requestID: permission.id, reply: "reject", directory, }) - return - } - if (res.outcome.optionId !== "reject" && permission.permission == "edit") { - const metadata = permission.metadata || {} - const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : "" - const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : "" - - const content = await Bun.file(filepath).text() - const newContent = getNewContent(content, diff) - - if (newContent) { - this.connection.writeTextFile({ - sessionId: sessionId, - path: filepath, - content: newContent, - }) - } - } - await this.config.sdk.permission.reply({ + return undefined + }) + + if (!res) return + if (res.outcome.outcome !== "selected") { + await this.sdk.permission.reply({ requestID: permission.id, - reply: res.outcome.optionId as "once" | "always" | "reject", + reply: "reject", directory, }) - } catch (err) { - log.error("unexpected error when handling permission", { error: err }) - } finally { - break + return } - case "message.part.updated": - log.info("message part updated", { event: event.properties }) - try { - const props = event.properties - const { part } = props - - const message = await this.config.sdk.session - .message( - { - sessionID: part.sessionID, - messageID: part.messageID, - directory, + if (res.outcome.optionId !== "reject" && permission.permission == "edit") { + const metadata = permission.metadata || {} + const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : "" + const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : "" + + const content = await Bun.file(filepath).text() + const newContent = getNewContent(content, diff) + + if (newContent) { + this.connection.writeTextFile({ + sessionId: session.id, + path: filepath, + content: newContent, + }) + } + } + + await this.sdk.permission.reply({ + requestID: permission.id, + reply: res.outcome.optionId as "once" | "always" | "reject", + directory, + }) + }) + .catch((error) => { + log.error("failed to handle permission", { error, permissionID: permission.id }) + }) + .finally(() => { + if (this.permissionQueues.get(permission.sessionID) === next) { + this.permissionQueues.delete(permission.sessionID) + } + }) + this.permissionQueues.set(permission.sessionID, next) + return + } + + case "message.part.updated": { + log.info("message part updated", { event: event.properties }) + const props = event.properties + const part = props.part + const session = this.sessionManager.tryGet(part.sessionID) + if (!session) return + const sessionId = session.id + const directory = session.cwd + + const message = await this.sdk.session + .message( + { + sessionID: part.sessionID, + messageID: part.messageID, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((error) => { + log.error("unexpected error when fetching message", { error }) + return undefined + }) + + if (!message || message.info.role !== "assistant") return + + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), + status: "pending", + locations: [], + rawInput: {}, }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined }) + .catch((error) => { + log.error("failed to send tool pending to ACP", { error }) + }) + return - if (!message || message.info.role !== "assistant") return - - if (part.type === "tool") { - switch (part.state.status) { - case "pending": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: part.callID, - title: part.tool, - kind: toToolKind(part.tool), - status: "pending", - locations: [], - rawInput: {}, - }, - }) - .catch((err) => { - log.error("failed to send tool pending to ACP", { error: err }) - }) - break - case "running": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - }, - }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) - break - case "completed": - const kind = toToolKind(part.tool) - const content: ToolCallContent[] = [ - { - type: "content", - content: { - type: "text", - text: part.state.output, - }, - }, - ] - - if (kind === "edit") { - const input = part.state.input - const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" - const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" - const newText = - typeof input["newString"] === "string" - ? input["newString"] - : typeof input["content"] === "string" - ? input["content"] - : "" - content.push({ - type: "diff", - path: filePath, - oldText, - newText, - }) - } - - if (part.tool === "todowrite") { - const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) - if (parsedTodos.success) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.data.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) - } else { - log.error("failed to parse todo output", { error: parsedTodos.error }) - } - } - - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawInput: part.state.input, - rawOutput: { - output: part.state.output, - metadata: part.state.metadata, - }, - }, - }) - .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) - }) - break - case "error": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - kind: toToolKind(part.tool), - title: part.tool, - rawInput: part.state.input, - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, - }, - ], - rawOutput: { - error: part.state.error, - }, - }, - }) - .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) - }) - break - } - } else if (part.type === "text") { - const delta = props.delta - if (delta && part.synthetic !== true) { + case "running": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + kind: toToolKind(part.tool), + title: part.tool, + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((error) => { + log.error("failed to send tool in_progress to ACP", { error }) + }) + return + + case "completed": { + const kind = toToolKind(part.tool) + const content: ToolCallContent[] = [ + { + type: "content", + content: { + type: "text", + text: part.state.output, + }, + }, + ] + + if (kind === "edit") { + const input = part.state.input + const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" + const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" + const newText = + typeof input["newString"] === "string" + ? input["newString"] + : typeof input["content"] === "string" + ? input["content"] + : "" + content.push({ + type: "diff", + path: filePath, + oldText, + newText, + }) + } + + if (part.tool === "todowrite") { + const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) + if (parsedTodos.success) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: delta, - }, + sessionUpdate: "plan", + entries: parsedTodos.data.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), }, }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) + .catch((error) => { + log.error("failed to send session update for todo", { error }) }) + } else { + log.error("failed to parse todo output", { error: parsedTodos.error }) } - } else if (part.type === "reasoning") { - const delta = props.delta - if (delta) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", + } + + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawInput: part.state.input, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((error) => { + log.error("failed to send tool completed to ACP", { error }) + }) + return + } + case "error": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + kind: toToolKind(part.tool), + title: part.tool, + rawInput: part.state.input, + content: [ + { + type: "content", content: { type: "text", - text: delta, + text: part.state.error, }, }, - }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) - } - } - } finally { - break - } + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((error) => { + log.error("failed to send tool error to ACP", { error }) + }) + return + } + } + + if (part.type === "text") { + const delta = props.delta + if (delta && part.synthetic !== true) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((error) => { + log.error("failed to send text to ACP", { error }) + }) + } + return } + + if (part.type === "reasoning") { + const delta = props.delta + if (delta) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((error) => { + log.error("failed to send reasoning to ACP", { error }) + }) + } + } + return } - }) + } } async initialize(params: InitializeRequest): Promise { @@ -409,8 +459,6 @@ export namespace ACP { sessionId, }) - this.setupEventSubscriptions(state) - return { sessionId, models: load.models, @@ -436,7 +484,7 @@ export namespace ACP { const model = await defaultModel(this.config, directory) // Store ACP session state - const state = await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model) + await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model) log.info("load_session", { sessionId, mcpServers: params.mcpServers.length }) @@ -446,8 +494,6 @@ export namespace ACP { sessionId, }) - this.setupEventSubscriptions(state) - // Replay session history const messages = await this.sdk.session .messages( diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 70b658347055..151fa5646ba8 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -13,6 +13,10 @@ export class ACPSessionManager { this.sdk = sdk } + tryGet(sessionId: string): ACPSessionState | undefined { + return this.sessions.get(sessionId) + } + async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise { const session = await this.sdk.session .create( diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts new file mode 100644 index 000000000000..8e139ff59732 --- /dev/null +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -0,0 +1,436 @@ +import { describe, expect, test } from "bun:test" +import { ACP } from "../../src/acp/agent" +import type { AgentSideConnection } from "@agentclientprotocol/sdk" +import type { Event } from "@opencode-ai/sdk/v2" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +type SessionUpdateParams = Parameters[0] +type RequestPermissionParams = Parameters[0] +type RequestPermissionResult = Awaited> + +type GlobalEventEnvelope = { + directory?: string + payload?: Event +} + +type EventController = { + push: (event: GlobalEventEnvelope) => void + close: () => void +} + +function createEventStream() { + const queue: GlobalEventEnvelope[] = [] + const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = [] + const state = { closed: false } + + const push = (event: GlobalEventEnvelope) => { + const waiter = waiters.shift() + if (waiter) { + waiter(event) + return + } + queue.push(event) + } + + const close = () => { + state.closed = true + for (const waiter of waiters.splice(0)) { + waiter(undefined) + } + } + + const stream = async function* (signal?: AbortSignal) { + while (true) { + if (signal?.aborted) return + const next = queue.shift() + if (next) { + yield next + continue + } + if (state.closed) return + const value = await new Promise((resolve) => { + waiters.push(resolve) + if (!signal) return + signal.addEventListener("abort", () => resolve(undefined), { once: true }) + }) + if (!value) return + yield value + } + } + + return { controller: { push, close } satisfies EventController, stream } +} + +function createFakeAgent() { + const updates = new Map() + const chunks = new Map() + const record = (sessionId: string, type: string) => { + const list = updates.get(sessionId) ?? [] + list.push(type) + updates.set(sessionId, list) + } + + const connection = { + async sessionUpdate(params: SessionUpdateParams) { + const update = params.update + const type = update?.sessionUpdate ?? "unknown" + record(params.sessionId, type) + if (update?.sessionUpdate === "agent_message_chunk") { + const content = update.content + if (content?.type !== "text") return + if (typeof content.text !== "string") return + chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text) + } + }, + async requestPermission(_params: RequestPermissionParams): Promise { + return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult + }, + } as unknown as AgentSideConnection + + const { controller, stream } = createEventStream() + const calls = { + eventSubscribe: 0, + sessionCreate: 0, + } + + const sdk = { + global: { + event: async (opts?: { signal?: AbortSignal }) => { + calls.eventSubscribe++ + return { stream: stream(opts?.signal) } + }, + }, + session: { + create: async (_params?: any) => { + calls.sessionCreate++ + return { + data: { + id: `ses_${calls.sessionCreate}`, + time: { created: new Date().toISOString() }, + }, + } + }, + get: async (_params?: any) => { + return { + data: { + id: "ses_1", + time: { created: new Date().toISOString() }, + }, + } + }, + messages: async () => { + return { data: [] } + }, + message: async () => { + return { + data: { + info: { + role: "assistant", + }, + }, + } + }, + }, + permission: { + respond: async () => { + return { data: true } + }, + }, + config: { + providers: async () => { + return { + data: { + providers: [ + { + id: "opencode", + name: "opencode", + models: { + "big-pickle": { id: "big-pickle", name: "big-pickle" }, + }, + }, + ], + }, + } + }, + }, + app: { + agents: async () => { + return { + data: [ + { + name: "build", + description: "build", + mode: "agent", + }, + ], + } + }, + }, + command: { + list: async () => { + return { data: [] } + }, + }, + mcp: { + add: async () => { + return { data: true } + }, + }, + } as any + + const agent = new ACP.Agent(connection, { + sdk, + defaultModel: { providerID: "opencode", modelID: "big-pickle" }, + } as any) + + const stop = () => { + controller.close() + ;(agent as any).eventAbort.abort() + } + + return { agent, controller, calls, updates, chunks, stop, sdk, connection } +} + +describe("acp.agent event subscription", () => { + test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, updates, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionB, + messageID: "msg_1", + type: "text", + synthetic: false, + }, + delta: "hello", + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 10)) + + expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) + expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) + + stop() + }, + }) + }) + + test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + const tokenA = ["ALPHA_", "111", "_X"] + const tokenB = ["BETA_", "222", "_Y"] + + const push = (sessionId: string, messageID: string, delta: string) => { + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionId, + messageID, + type: "text", + synthetic: false, + }, + delta, + }, + }, + } as any) + } + + push(sessionA, "msg_a", tokenA[0]) + push(sessionB, "msg_b", tokenB[0]) + push(sessionA, "msg_a", tokenA[1]) + push(sessionB, "msg_b", tokenB[1]) + push(sessionA, "msg_a", tokenA[2]) + push(sessionB, "msg_b", tokenB[2]) + + await new Promise((r) => setTimeout(r, 20)) + + const a = chunks.get(sessionA) ?? "" + const b = chunks.get(sessionB) ?? "" + + expect(a).toContain(tokenA.join("")) + expect(b).toContain(tokenB.join("")) + for (const part of tokenB) expect(a).not.toContain(part) + for (const part of tokenA) expect(b).not.toContain(part) + + stop() + }, + }) + }) + + test("does not create additional event subscriptions on repeated loadSession()", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, calls, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + + expect(calls.eventSubscribe).toBe(1) + + stop() + }, + }) + }) + + test("permission.asked events are handled and replied", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const permissionReplies: string[] = [] + const { agent, controller, stop, sdk } = createFakeAgent() + sdk.permission.reply = async (params: any) => { + permissionReplies.push(params.requestID) + return { data: true } + } + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "permission.asked", + properties: { + id: "perm_1", + sessionID: sessionA, + permission: "bash", + patterns: ["*"], + metadata: {}, + always: [], + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 20)) + + expect(permissionReplies).toContain("perm_1") + + stop() + }, + }) + }) + + test("permission prompt on session A does not block message updates for session B", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const permissionReplies: string[] = [] + let resolvePermissionA: (() => void) | undefined + const permissionABlocking = new Promise((r) => { + resolvePermissionA = r + }) + + const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent() + + // Make permission request for session A block until we release it + const originalRequestPermission = connection.requestPermission.bind(connection) + let permissionCalls = 0 + connection.requestPermission = async (params: RequestPermissionParams) => { + permissionCalls++ + if (params.sessionId.endsWith("1")) { + await permissionABlocking + } + return originalRequestPermission(params) + } + + sdk.permission.reply = async (params: any) => { + permissionReplies.push(params.requestID) + return { data: true } + } + + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + // Push permission.asked for session A (will block) + controller.push({ + directory: cwd, + payload: { + type: "permission.asked", + properties: { + id: "perm_a", + sessionID: sessionA, + permission: "bash", + patterns: ["*"], + metadata: {}, + always: [], + }, + }, + } as any) + + // Give time for permission handling to start + await new Promise((r) => setTimeout(r, 10)) + + // Push message for session B while A's permission is pending + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionB, + messageID: "msg_b", + type: "text", + synthetic: false, + }, + delta: "session_b_message", + }, + }, + } as any) + + // Wait for session B's message to be processed + await new Promise((r) => setTimeout(r, 20)) + + // Session B should have received message even though A's permission is still pending + expect(chunks.get(sessionB) ?? "").toContain("session_b_message") + expect(permissionReplies).not.toContain("perm_a") + + // Release session A's permission + resolvePermissionA!() + await new Promise((r) => setTimeout(r, 20)) + + // Now session A's permission should be replied + expect(permissionReplies).toContain("perm_a") + + stop() + }, + }) + }) +})