From 188c7825e797f30d3ac3734d3cbe11b5d0a68420 Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Mon, 4 May 2026 13:51:01 +0200 Subject: [PATCH 1/2] fix(agent): stream tool input as it arrives during execution Surface tool args while a tool is still running instead of waiting for the finalized assistant message. Accumulate `input_json_delta` events per content-block index, parse the partial JSON best-effort, and emit `tool_call_update`s with the parsed-so-far rawInput. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agent/src/adapters/claude/claude-agent.ts | 10 +++ .../adapters/claude/conversion/sdk-to-acp.ts | 74 +++++++++++++++++-- packages/agent/src/adapters/claude/types.ts | 10 +++ packages/agent/src/utils/partial-json.test.ts | 72 ++++++++++++++++++ packages/agent/src/utils/partial-json.ts | 68 +++++++++++++++++ 5 files changed, 228 insertions(+), 6 deletions(-) create mode 100644 packages/agent/src/utils/partial-json.test.ts create mode 100644 packages/agent/src/utils/partial-json.ts diff --git a/packages/agent/src/adapters/claude/claude-agent.ts b/packages/agent/src/adapters/claude/claude-agent.ts index a715cb510..4210f955d 100644 --- a/packages/agent/src/adapters/claude/claude-agent.ts +++ b/packages/agent/src/adapters/claude/claude-agent.ts @@ -103,6 +103,7 @@ import type { SDKMessageFilter, Session, ToolUseCache, + ToolUseStreamCache, } from "./types"; const SESSION_VALIDATION_TIMEOUT_MS = 30_000; @@ -145,6 +146,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { readonly adapterName = "claude"; declare session: Session; toolUseCache: ToolUseCache; + toolUseStreamCache: ToolUseStreamCache; backgroundTerminals: { [key: string]: BackgroundTerminal } = {}; clientCapabilities?: ClientCapabilities; private options?: ClaudeAcpAgentOptions; @@ -155,6 +157,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { super(client); this.options = options; this.toolUseCache = {}; + this.toolUseStreamCache = new Map(); this.logger = new Logger({ debug: true, prefix: "[ClaudeAcpAgent]" }); this.enrichment = createEnrichment(options?.posthogApiConfig, this.logger); } @@ -403,6 +406,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { sessionId: params.sessionId, client: this.client, toolUseCache: this.toolUseCache, + toolUseStreamCache: this.toolUseStreamCache, fileContentCache: this.fileContentCache, enrichedReadCache: this.enrichedReadCache, logger: this.logger, @@ -768,6 +772,11 @@ export class ClaudeAcpAgent extends BaseAcpAgent { } throw error; } finally { + // Drop any leftover streaming-input buffers. Normally cleared per index + // on `content_block_stop`, but a cancelled or errored turn may leave + // entries behind; without this they'd carry over into the next turn + // and collide with new content-block indices. + this.toolUseStreamCache.clear(); if (!handedOff) { this.session.promptRunning = false; // Resolve all remaining pending prompts so no callers get stuck. @@ -1528,6 +1537,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { sessionId, client: this.client, toolUseCache: this.toolUseCache, + toolUseStreamCache: this.toolUseStreamCache, fileContentCache: this.fileContentCache, enrichedReadCache: this.enrichedReadCache, logger: this.logger, diff --git a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts index 847bf6f6b..75fb07dae 100644 --- a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts +++ b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts @@ -21,8 +21,14 @@ import { POSTHOG_NOTIFICATIONS } from "@/acp-extensions"; import { image, text } from "../../../utils/acp-content"; import { unreachable } from "../../../utils/common"; import type { Logger } from "../../../utils/logger"; +import { tryParsePartialJson } from "../../../utils/partial-json"; import { type EnrichedReadCache, registerHookCallback } from "../hooks"; -import type { Session, ToolUpdateMeta, ToolUseCache } from "../types"; +import type { + Session, + ToolUpdateMeta, + ToolUseCache, + ToolUseStreamCache, +} from "../types"; import { type ClaudePlanEntry, planEntries, @@ -67,6 +73,8 @@ export interface MessageHandlerContext { sessionId: string; client: AgentSideConnection; toolUseCache: ToolUseCache; + /** Buffers `input_json_delta` partial JSON per content-block index. */ + toolUseStreamCache: ToolUseStreamCache; fileContentCache: { [key: string]: string }; enrichedReadCache?: EnrichedReadCache; logger: Logger; @@ -496,6 +504,7 @@ function streamEventToAcpNotifications( message: SDKPartialAssistantMessage, sessionId: string, toolUseCache: ToolUseCache, + toolUseStreamCache: ToolUseStreamCache, fileContentCache: { [key: string]: string }, client: AgentSideConnection, logger: Logger, @@ -507,9 +516,17 @@ function streamEventToAcpNotifications( ): SessionNotification[] { const event = message.event; switch (event.type) { - case "content_block_start": + case "content_block_start": { + const block = event.content_block; + if (block.type === "tool_use" || block.type === "mcp_tool_use") { + toolUseStreamCache.set(event.index, { + toolUseId: block.id, + toolName: block.name, + partialJson: "", + }); + } return toAcpNotifications( - [event.content_block], + [block], "assistant", sessionId, toolUseCache, @@ -523,7 +540,16 @@ function streamEventToAcpNotifications( undefined, enrichedReadCache, ); - case "content_block_delta": + } + case "content_block_delta": { + if (event.delta.type === "input_json_delta") { + return inputJsonDeltaToAcpNotifications( + event.index, + event.delta.partial_json, + sessionId, + toolUseStreamCache, + ); + } return toAcpNotifications( [event.delta], "assistant", @@ -539,10 +565,13 @@ function streamEventToAcpNotifications( undefined, enrichedReadCache, ); + } + case "content_block_stop": + toolUseStreamCache.delete(event.index); + return []; case "message_start": case "message_delta": case "message_stop": - case "content_block_stop": return []; default: @@ -551,6 +580,31 @@ function streamEventToAcpNotifications( } } +function inputJsonDeltaToAcpNotifications( + index: number, + partialJson: string, + sessionId: string, + toolUseStreamCache: ToolUseStreamCache, +): SessionNotification[] { + const entry = toolUseStreamCache.get(index); + if (!entry) return []; + entry.partialJson += partialJson; + + const parsed = tryParsePartialJson(entry.partialJson); + if (!parsed || typeof parsed !== "object") return []; + + return [ + { + sessionId, + update: { + sessionUpdate: "tool_call_update" as const, + toolCallId: entry.toolUseId, + rawInput: parsed as Record, + }, + }, + ]; +} + export async function handleSystemMessage( message: Extract, context: MessageHandlerContext, @@ -743,13 +797,21 @@ export async function handleStreamEvent( message: SDKPartialAssistantMessage, context: MessageHandlerContext, ): Promise { - const { sessionId, client, toolUseCache, fileContentCache, logger } = context; + const { + sessionId, + client, + toolUseCache, + toolUseStreamCache, + fileContentCache, + logger, + } = context; const parentToolCallId = message.parent_tool_use_id ?? undefined; for (const notification of streamEventToAcpNotifications( message, sessionId, toolUseCache, + toolUseStreamCache, fileContentCache, client, logger, diff --git a/packages/agent/src/adapters/claude/types.ts b/packages/agent/src/adapters/claude/types.ts index 1efd77258..2af7e13c5 100644 --- a/packages/agent/src/adapters/claude/types.ts +++ b/packages/agent/src/adapters/claude/types.ts @@ -76,6 +76,16 @@ export type ToolUseCache = { }; }; +/** + * Per-content-block-index buffer for tool inputs streamed via + * `input_json_delta` events. Keyed by the Anthropic content block index + * (which resets per assistant message). Cleared on `content_block_stop`. + */ +export type ToolUseStreamCache = Map< + number, + { toolUseId: string; toolName: string; partialJson: string } +>; + export type TerminalInfo = { terminal_id: string; }; diff --git a/packages/agent/src/utils/partial-json.test.ts b/packages/agent/src/utils/partial-json.test.ts new file mode 100644 index 000000000..b3b7a86e8 --- /dev/null +++ b/packages/agent/src/utils/partial-json.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it } from "vitest"; +import { tryParsePartialJson } from "./partial-json"; + +describe("tryParsePartialJson", () => { + it("returns null for empty / whitespace input", () => { + expect(tryParsePartialJson("")).toBeNull(); + expect(tryParsePartialJson(" ")).toBeNull(); + }); + + it("parses complete JSON unchanged", () => { + expect(tryParsePartialJson('{"a":1}')).toEqual({ a: 1 }); + expect(tryParsePartialJson("[1,2,3]")).toEqual([1, 2, 3]); + expect(tryParsePartialJson('"hello"')).toBe("hello"); + }); + + it("closes a single open object", () => { + expect(tryParsePartialJson("{")).toEqual({}); + }); + + it("closes a partial string value and the surrounding object", () => { + expect(tryParsePartialJson('{"command": "call execute-')).toEqual({ + command: "call execute-", + }); + }); + + it("closes a complete string value with no closing brace", () => { + expect(tryParsePartialJson('{"command": "tools"')).toEqual({ + command: "tools", + }); + }); + + it("strips a trailing comma after a complete entry", () => { + expect(tryParsePartialJson('{"a": 1,')).toEqual({ a: 1 }); + }); + + it("drops a trailing partial key with no value", () => { + expect(tryParsePartialJson('{"a": 1, "b":')).toEqual({ a: 1 }); + expect(tryParsePartialJson('{"a": 1, "b"')).toEqual({ a: 1 }); + }); + + it("handles nested objects and arrays mid-stream", () => { + expect(tryParsePartialJson('{"q": {"sql": "SELECT 1')).toEqual({ + q: { sql: "SELECT 1" }, + }); + expect(tryParsePartialJson('{"items": [1, 2, 3')).toEqual({ + items: [1, 2, 3], + }); + }); + + it("respects escaped quotes inside strings", () => { + expect(tryParsePartialJson('{"q": "say \\"hi\\"')).toEqual({ + q: 'say "hi"', + }); + }); + + it("returns null when nothing parseable can be reconstructed", () => { + // Garbage that can't be balanced into valid JSON. + expect(tryParsePartialJson("not json at all")).toBeNull(); + }); + + it("parses a typical exec command incrementally", () => { + // Simulate growth of a streamed { command: "call dashboard-update {...}" } + expect(tryParsePartialJson('{"command":')).toEqual({}); + expect(tryParsePartialJson('{"command": "ca')).toEqual({ command: "ca" }); + expect( + tryParsePartialJson('{"command": "call dashboard-update {\\"id\\":'), + ).toEqual({ command: 'call dashboard-update {"id":' }); + expect( + tryParsePartialJson('{"command": "call dashboard-update {\\"id\\": 1}"}'), + ).toEqual({ command: 'call dashboard-update {"id": 1}' }); + }); +}); diff --git a/packages/agent/src/utils/partial-json.ts b/packages/agent/src/utils/partial-json.ts new file mode 100644 index 000000000..3529166a0 --- /dev/null +++ b/packages/agent/src/utils/partial-json.ts @@ -0,0 +1,68 @@ +/** + * Best-effort parser for incomplete JSON streamed via Anthropic's + * `input_json_delta` events. Used to surface tool inputs while they are still + * being generated so the UI can show the args during execution instead of + * waiting for the finalized assistant message. + * + * Strategy: walk the input tracking open `{`/`[` and quote/escape state, then + * try a few completions in order of likelihood (close any open string, drop + * trailing commas/colons or partial keys, then close any open brackets). + * Returns `null` when no completion parses — callers should silently skip + * that delta and wait for more input. + */ +export function tryParsePartialJson(s: string): unknown | null { + const trimmed = s.trim(); + if (!trimmed) return null; + + // Fast path: complete JSON. + try { + return JSON.parse(trimmed); + } catch {} + + const closers: string[] = []; + let inString = false; + let escaped = false; + + for (let i = 0; i < trimmed.length; i++) { + const ch = trimmed[i]; + if (inString) { + if (escaped) { + escaped = false; + } else if (ch === "\\") { + escaped = true; + } else if (ch === '"') { + inString = false; + } + continue; + } + if (ch === '"') inString = true; + else if (ch === "{") closers.push("}"); + else if (ch === "[") closers.push("]"); + else if (ch === "}" || ch === "]") closers.pop(); + } + + const closeBrackets = (str: string): string => { + let out = str; + for (let i = closers.length - 1; i >= 0; i--) out += closers[i]; + return out; + }; + + const candidates: string[] = []; + + // 1. Close any open string + brackets. + const closedString = inString ? `${trimmed}"` : trimmed; + candidates.push(closeBrackets(closedString)); + + // 2. Drop trailing partial token (comma, colon, or `"key":`/`"key"`) + // and close brackets. + let stripped = closedString.replace(/[,:]\s*$/, ""); + stripped = stripped.replace(/,?\s*"[^"]*"\s*:?\s*$/, ""); + candidates.push(closeBrackets(stripped)); + + for (const candidate of candidates) { + try { + return JSON.parse(candidate); + } catch {} + } + return null; +} From b0b14118b06610b3605e4c7cea497d480270250e Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Mon, 4 May 2026 13:57:12 +0200 Subject: [PATCH 2/2] fixes --- packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts | 1 - packages/agent/src/adapters/claude/types.ts | 2 +- packages/agent/src/utils/partial-json.ts | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts index 75fb07dae..d926861c7 100644 --- a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts +++ b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts @@ -521,7 +521,6 @@ function streamEventToAcpNotifications( if (block.type === "tool_use" || block.type === "mcp_tool_use") { toolUseStreamCache.set(event.index, { toolUseId: block.id, - toolName: block.name, partialJson: "", }); } diff --git a/packages/agent/src/adapters/claude/types.ts b/packages/agent/src/adapters/claude/types.ts index 2af7e13c5..646a14434 100644 --- a/packages/agent/src/adapters/claude/types.ts +++ b/packages/agent/src/adapters/claude/types.ts @@ -83,7 +83,7 @@ export type ToolUseCache = { */ export type ToolUseStreamCache = Map< number, - { toolUseId: string; toolName: string; partialJson: string } + { toolUseId: string; partialJson: string } >; export type TerminalInfo = { diff --git a/packages/agent/src/utils/partial-json.ts b/packages/agent/src/utils/partial-json.ts index 3529166a0..ffdcc5901 100644 --- a/packages/agent/src/utils/partial-json.ts +++ b/packages/agent/src/utils/partial-json.ts @@ -10,7 +10,7 @@ * Returns `null` when no completion parses — callers should silently skip * that delta and wait for more input. */ -export function tryParsePartialJson(s: string): unknown | null { +export function tryParsePartialJson(s: string): unknown { const trimmed = s.trim(); if (!trimmed) return null;