Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/agent/src/adapters/claude/claude-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ import type {
SDKMessageFilter,
Session,
ToolUseCache,
ToolUseStreamCache,
} from "./types";

const SESSION_VALIDATION_TIMEOUT_MS = 30_000;
Expand Down Expand Up @@ -145,6 +146,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
readonly adapterName = "claude";
declare session: Session;
toolUseCache: ToolUseCache;
toolUseStreamCache: ToolUseStreamCache;
backgroundTerminals: { [key: string]: BackgroundTerminal } = {};
clientCapabilities?: ClientCapabilities;
private options?: ClaudeAcpAgentOptions;
Expand All @@ -155,6 +157,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
super(client);
this.options = options;
this.toolUseCache = {};
this.toolUseStreamCache = new Map();
this.logger = new Logger({ debug: true, prefix: "[ClaudeAcpAgent]" });
this.enrichment = createEnrichment(options?.posthogApiConfig, this.logger);
}
Expand Down Expand Up @@ -403,6 +406,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
sessionId: params.sessionId,
client: this.client,
toolUseCache: this.toolUseCache,
toolUseStreamCache: this.toolUseStreamCache,
fileContentCache: this.fileContentCache,
enrichedReadCache: this.enrichedReadCache,
logger: this.logger,
Expand Down Expand Up @@ -768,6 +772,11 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
}
throw error;
} finally {
// Drop any leftover streaming-input buffers. Normally cleared per index
// on `content_block_stop`, but a cancelled or errored turn may leave
// entries behind; without this they'd carry over into the next turn
// and collide with new content-block indices.
this.toolUseStreamCache.clear();
if (!handedOff) {
this.session.promptRunning = false;
// Resolve all remaining pending prompts so no callers get stuck.
Expand Down Expand Up @@ -1528,6 +1537,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
sessionId,
client: this.client,
toolUseCache: this.toolUseCache,
toolUseStreamCache: this.toolUseStreamCache,
fileContentCache: this.fileContentCache,
enrichedReadCache: this.enrichedReadCache,
logger: this.logger,
Expand Down
73 changes: 67 additions & 6 deletions packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@ import { POSTHOG_NOTIFICATIONS } from "@/acp-extensions";
import { image, text } from "../../../utils/acp-content";
import { unreachable } from "../../../utils/common";
import type { Logger } from "../../../utils/logger";
import { tryParsePartialJson } from "../../../utils/partial-json";
import { type EnrichedReadCache, registerHookCallback } from "../hooks";
import type { Session, ToolUpdateMeta, ToolUseCache } from "../types";
import type {
Session,
ToolUpdateMeta,
ToolUseCache,
ToolUseStreamCache,
} from "../types";
import {
type ClaudePlanEntry,
planEntries,
Expand Down Expand Up @@ -67,6 +73,8 @@ export interface MessageHandlerContext {
sessionId: string;
client: AgentSideConnection;
toolUseCache: ToolUseCache;
/** Buffers `input_json_delta` partial JSON per content-block index. */
toolUseStreamCache: ToolUseStreamCache;
fileContentCache: { [key: string]: string };
enrichedReadCache?: EnrichedReadCache;
logger: Logger;
Expand Down Expand Up @@ -496,6 +504,7 @@ function streamEventToAcpNotifications(
message: SDKPartialAssistantMessage,
sessionId: string,
toolUseCache: ToolUseCache,
toolUseStreamCache: ToolUseStreamCache,
fileContentCache: { [key: string]: string },
client: AgentSideConnection,
logger: Logger,
Expand All @@ -507,9 +516,16 @@ function streamEventToAcpNotifications(
): SessionNotification[] {
const event = message.event;
switch (event.type) {
case "content_block_start":
case "content_block_start": {
const block = event.content_block;
if (block.type === "tool_use" || block.type === "mcp_tool_use") {
toolUseStreamCache.set(event.index, {
toolUseId: block.id,
partialJson: "",
});
}
return toAcpNotifications(
[event.content_block],
[block],
"assistant",
sessionId,
toolUseCache,
Expand All @@ -523,7 +539,16 @@ function streamEventToAcpNotifications(
undefined,
enrichedReadCache,
);
case "content_block_delta":
}
case "content_block_delta": {
if (event.delta.type === "input_json_delta") {
return inputJsonDeltaToAcpNotifications(
event.index,
event.delta.partial_json,
sessionId,
toolUseStreamCache,
);
}
return toAcpNotifications(
[event.delta],
"assistant",
Expand All @@ -539,10 +564,13 @@ function streamEventToAcpNotifications(
undefined,
enrichedReadCache,
);
}
case "content_block_stop":
toolUseStreamCache.delete(event.index);
return [];
case "message_start":
case "message_delta":
case "message_stop":
case "content_block_stop":
return [];

default:
Expand All @@ -551,6 +579,31 @@ function streamEventToAcpNotifications(
}
}

function inputJsonDeltaToAcpNotifications(
index: number,
partialJson: string,
sessionId: string,
toolUseStreamCache: ToolUseStreamCache,
): SessionNotification[] {
const entry = toolUseStreamCache.get(index);
if (!entry) return [];
entry.partialJson += partialJson;

const parsed = tryParsePartialJson(entry.partialJson);
if (!parsed || typeof parsed !== "object") return [];

return [
{
sessionId,
update: {
sessionUpdate: "tool_call_update" as const,
toolCallId: entry.toolUseId,
rawInput: parsed as Record<string, unknown>,
},
},
];
}

export async function handleSystemMessage(
message: Extract<SDKMessage, { type: "system" }>,
context: MessageHandlerContext,
Expand Down Expand Up @@ -743,13 +796,21 @@ export async function handleStreamEvent(
message: SDKPartialAssistantMessage,
context: MessageHandlerContext,
): Promise<void> {
const { sessionId, client, toolUseCache, fileContentCache, logger } = context;
const {
sessionId,
client,
toolUseCache,
toolUseStreamCache,
fileContentCache,
logger,
} = context;
const parentToolCallId = message.parent_tool_use_id ?? undefined;

for (const notification of streamEventToAcpNotifications(
message,
sessionId,
toolUseCache,
toolUseStreamCache,
fileContentCache,
client,
logger,
Expand Down
10 changes: 10 additions & 0 deletions packages/agent/src/adapters/claude/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ export type ToolUseCache = {
};
};

/**
* Per-content-block-index buffer for tool inputs streamed via
* `input_json_delta` events. Keyed by the Anthropic content block index
* (which resets per assistant message). Cleared on `content_block_stop`.
*/
export type ToolUseStreamCache = Map<
number,
Comment thread
skoob13 marked this conversation as resolved.
{ toolUseId: string; partialJson: string }
>;

export type TerminalInfo = {
terminal_id: string;
};
Expand Down
72 changes: 72 additions & 0 deletions packages/agent/src/utils/partial-json.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { describe, expect, it } from "vitest";
import { tryParsePartialJson } from "./partial-json";

describe("tryParsePartialJson", () => {
it("returns null for empty / whitespace input", () => {
expect(tryParsePartialJson("")).toBeNull();
expect(tryParsePartialJson(" ")).toBeNull();
});

it("parses complete JSON unchanged", () => {
expect(tryParsePartialJson('{"a":1}')).toEqual({ a: 1 });
expect(tryParsePartialJson("[1,2,3]")).toEqual([1, 2, 3]);
expect(tryParsePartialJson('"hello"')).toBe("hello");
});
Comment thread
skoob13 marked this conversation as resolved.

it("closes a single open object", () => {
expect(tryParsePartialJson("{")).toEqual({});
});

it("closes a partial string value and the surrounding object", () => {
expect(tryParsePartialJson('{"command": "call execute-')).toEqual({
command: "call execute-",
});
});

it("closes a complete string value with no closing brace", () => {
expect(tryParsePartialJson('{"command": "tools"')).toEqual({
command: "tools",
});
});

it("strips a trailing comma after a complete entry", () => {
expect(tryParsePartialJson('{"a": 1,')).toEqual({ a: 1 });
});

it("drops a trailing partial key with no value", () => {
expect(tryParsePartialJson('{"a": 1, "b":')).toEqual({ a: 1 });
expect(tryParsePartialJson('{"a": 1, "b"')).toEqual({ a: 1 });
});

it("handles nested objects and arrays mid-stream", () => {
expect(tryParsePartialJson('{"q": {"sql": "SELECT 1')).toEqual({
q: { sql: "SELECT 1" },
});
expect(tryParsePartialJson('{"items": [1, 2, 3')).toEqual({
items: [1, 2, 3],
});
});

it("respects escaped quotes inside strings", () => {
expect(tryParsePartialJson('{"q": "say \\"hi\\"')).toEqual({
q: 'say "hi"',
});
});

it("returns null when nothing parseable can be reconstructed", () => {
// Garbage that can't be balanced into valid JSON.
expect(tryParsePartialJson("not json at all")).toBeNull();
});

it("parses a typical exec command incrementally", () => {
// Simulate growth of a streamed { command: "call dashboard-update {...}" }
expect(tryParsePartialJson('{"command":')).toEqual({});
expect(tryParsePartialJson('{"command": "ca')).toEqual({ command: "ca" });
expect(
tryParsePartialJson('{"command": "call dashboard-update {\\"id\\":'),
).toEqual({ command: 'call dashboard-update {"id":' });
expect(
tryParsePartialJson('{"command": "call dashboard-update {\\"id\\": 1}"}'),
).toEqual({ command: 'call dashboard-update {"id": 1}' });
});
});
68 changes: 68 additions & 0 deletions packages/agent/src/utils/partial-json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Best-effort parser for incomplete JSON streamed via Anthropic's
* `input_json_delta` events. Used to surface tool inputs while they are still
* being generated so the UI can show the args during execution instead of
* waiting for the finalized assistant message.
*
* Strategy: walk the input tracking open `{`/`[` and quote/escape state, then
* try a few completions in order of likelihood (close any open string, drop
* trailing commas/colons or partial keys, then close any open brackets).
* Returns `null` when no completion parses — callers should silently skip
* that delta and wait for more input.
*/
export function tryParsePartialJson(s: string): unknown {
const trimmed = s.trim();
if (!trimmed) return null;

// Fast path: complete JSON.
try {
return JSON.parse(trimmed);
} catch {}

const closers: string[] = [];
let inString = false;
let escaped = false;

for (let i = 0; i < trimmed.length; i++) {
const ch = trimmed[i];
if (inString) {
if (escaped) {
escaped = false;
} else if (ch === "\\") {
escaped = true;
} else if (ch === '"') {
inString = false;
}
continue;
}
if (ch === '"') inString = true;
else if (ch === "{") closers.push("}");
else if (ch === "[") closers.push("]");
else if (ch === "}" || ch === "]") closers.pop();
}

const closeBrackets = (str: string): string => {
let out = str;
for (let i = closers.length - 1; i >= 0; i--) out += closers[i];
return out;
};

const candidates: string[] = [];

// 1. Close any open string + brackets.
const closedString = inString ? `${trimmed}"` : trimmed;
candidates.push(closeBrackets(closedString));

// 2. Drop trailing partial token (comma, colon, or `"key":`/`"key"`)
// and close brackets.
let stripped = closedString.replace(/[,:]\s*$/, "");
stripped = stripped.replace(/,?\s*"[^"]*"\s*:?\s*$/, "");
candidates.push(closeBrackets(stripped));

for (const candidate of candidates) {
try {
return JSON.parse(candidate);
} catch {}
}
return null;
}
Loading