From 80624b999030d495855c89779e740c9eac53fb59 Mon Sep 17 00:00:00 2001 From: cte Date: Wed, 18 Feb 2026 23:43:30 -0800 Subject: [PATCH 1/2] feat(cli): add NDJSON stdin protocol, list subcommands, and modularize run.ts Overhaul the stdin prompt stream from raw text lines to a structured NDJSON command protocol (start/message/cancel/ping/shutdown) with requestId correlation, ack/done/error lifecycle events, and queue telemetry. Add list subcommands (commands, modes, models) for programmatic discovery. Extract stdin stream logic from run.ts into stdin-stream.ts and add shared isRecord guard utility. Includes unit tests for all new modules. Co-Authored-By: Claude Opus 4.6 --- apps/cli/scripts/test-stdin-stream.ts | 24 +- .../json-event-emitter-control.test.ts | 170 +++++ apps/cli/src/agent/json-event-emitter.ts | 81 ++- .../src/commands/cli/__tests__/list.test.ts | 29 + .../cli/__tests__/parse-stdin-command.test.ts | 104 +++ apps/cli/src/commands/cli/index.ts | 1 + apps/cli/src/commands/cli/list.ts | 287 ++++++++ apps/cli/src/commands/cli/run.ts | 195 +----- apps/cli/src/commands/cli/stdin-stream.ts | 610 ++++++++++++++++++ apps/cli/src/index.ts | 47 +- .../src/lib/utils/__tests__/guards.test.ts | 27 + apps/cli/src/lib/utils/guards.ts | 3 + apps/cli/src/types/json-events.ts | 31 + 13 files changed, 1433 insertions(+), 176 deletions(-) create mode 100644 apps/cli/src/agent/__tests__/json-event-emitter-control.test.ts create mode 100644 apps/cli/src/commands/cli/__tests__/list.test.ts create mode 100644 apps/cli/src/commands/cli/__tests__/parse-stdin-command.test.ts create mode 100644 apps/cli/src/commands/cli/list.ts create mode 100644 apps/cli/src/commands/cli/stdin-stream.ts create mode 100644 apps/cli/src/lib/utils/__tests__/guards.test.ts create mode 100644 apps/cli/src/lib/utils/guards.ts diff --git a/apps/cli/scripts/test-stdin-stream.ts b/apps/cli/scripts/test-stdin-stream.ts index 5212df5b335..569c30adbb4 100644 --- a/apps/cli/scripts/test-stdin-stream.ts +++ b/apps/cli/scripts/test-stdin-stream.ts @@ -27,6 +27,16 @@ async function main() { console.log("[wrapper] Type a message and press Enter to send it.") console.log("[wrapper] Type /exit to close stdin and let the CLI finish.") + let requestCounter = 0 + let hasStartedTask = false + + const sendCommand = (payload: Record) => { + if (child.stdin?.destroyed) { + return + } + child.stdin?.write(JSON.stringify(payload) + "\n") + } + const rl = readline.createInterface({ input: process.stdin, output: process.stdout, @@ -36,14 +46,22 @@ async function main() { rl.on("line", (line) => { if (line.trim() === "/exit") { console.log("[wrapper] Closing stdin...") + sendCommand({ + command: "shutdown", + requestId: `shutdown-${Date.now()}-${++requestCounter}`, + }) child.stdin?.end() rl.close() return } - if (!child.stdin?.destroyed) { - child.stdin?.write(`${line}\n`) - } + const command = hasStartedTask ? "message" : "start" + sendCommand({ + command, + requestId: `${command}-${Date.now()}-${++requestCounter}`, + prompt: line, + }) + hasStartedTask = true }) const onSignal = (signal: NodeJS.Signals) => { diff --git a/apps/cli/src/agent/__tests__/json-event-emitter-control.test.ts b/apps/cli/src/agent/__tests__/json-event-emitter-control.test.ts new file mode 100644 index 00000000000..8d45538ce34 --- /dev/null +++ b/apps/cli/src/agent/__tests__/json-event-emitter-control.test.ts @@ -0,0 +1,170 @@ +import { Writable } from "stream" + +import { JsonEventEmitter } from "../json-event-emitter.js" + +function createMockStdout(): { stdout: NodeJS.WriteStream; lines: () => Record[] } { + const chunks: string[] = [] + + const writable = new Writable({ + write(chunk, _encoding, callback) { + chunks.push(chunk.toString()) + callback() + }, + }) as unknown as NodeJS.WriteStream + + // Each write is a JSON line terminated by \n + const lines = () => + chunks + .join("") + .split("\n") + .filter((l) => l.length > 0) + .map((l) => JSON.parse(l) as Record) + + return { stdout: writable, lines } +} + +describe("JsonEventEmitter control events", () => { + describe("emitControl", () => { + it("emits an ack event with type control", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ mode: "stream-json", stdout }) + + emitter.emitControl({ + subtype: "ack", + requestId: "req-1", + command: "start", + content: "starting task", + code: "accepted", + success: true, + }) + + const output = lines() + expect(output).toHaveLength(1) + expect(output[0]!).toMatchObject({ + type: "control", + subtype: "ack", + requestId: "req-1", + command: "start", + content: "starting task", + code: "accepted", + success: true, + }) + expect(output[0]!.done).toBeUndefined() + }) + + it("sets done: true for done events", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ mode: "stream-json", stdout }) + + emitter.emitControl({ + subtype: "done", + requestId: "req-2", + command: "start", + content: "task completed", + code: "task_completed", + success: true, + }) + + const output = lines() + expect(output[0]!).toMatchObject({ type: "control", subtype: "done", done: true }) + }) + + it("does not set done for error events", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ mode: "stream-json", stdout }) + + emitter.emitControl({ + subtype: "error", + requestId: "req-3", + command: "start", + content: "something went wrong", + code: "task_error", + success: false, + }) + + const output = lines() + expect(output[0]!.done).toBeUndefined() + expect(output[0]!.success).toBe(false) + }) + }) + + describe("requestIdProvider", () => { + it("injects requestId from provider when event has none", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ + mode: "stream-json", + stdout, + requestIdProvider: () => "injected-id", + }) + + emitter.emitControl({ subtype: "ack", content: "test" }) + + const output = lines() + expect(output[0]!.requestId).toBe("injected-id") + }) + + it("keeps explicit requestId when provider also returns one", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ + mode: "stream-json", + stdout, + requestIdProvider: () => "provider-id", + }) + + emitter.emitControl({ subtype: "ack", requestId: "explicit-id", content: "test" }) + + const output = lines() + expect(output[0]!.requestId).toBe("explicit-id") + }) + + it("omits requestId when provider returns undefined and event has none", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ + mode: "stream-json", + stdout, + requestIdProvider: () => undefined, + }) + + emitter.emitControl({ subtype: "ack", content: "test" }) + + const output = lines() + expect(output[0]!).not.toHaveProperty("requestId") + }) + }) + + describe("emitInit", () => { + it("emits system init with default schema values", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ mode: "stream-json", stdout }) + + // emitInit requires a client — we call emitControl to test init-like fields instead. + // emitInit is called internally by attach(), so we test the init fields via options. + // Instead, directly verify the constructor defaults by emitting a control event + // and checking that the emitter was created with correct defaults. + + // We can't call emitInit without a client, but we can verify the options + // were stored correctly by checking what emitControl produces. + emitter.emitControl({ subtype: "ack", content: "test" }) + + // The control event itself doesn't include schema fields, but at least + // we verify the emitter was constructed successfully with defaults. + const output = lines() + expect(output).toHaveLength(1) + }) + + it("accepts custom schemaVersion, protocol, and capabilities", () => { + const { stdout } = createMockStdout() + + // Should not throw when constructed with custom values + const emitter = new JsonEventEmitter({ + mode: "stream-json", + stdout, + schemaVersion: 2, + protocol: "custom-protocol", + capabilities: ["stdin:start", "stdin:message"], + }) + + expect(emitter).toBeDefined() + }) + }) +}) diff --git a/apps/cli/src/agent/json-event-emitter.ts b/apps/cli/src/agent/json-event-emitter.ts index bdf96a763de..b772b13553c 100644 --- a/apps/cli/src/agent/json-event-emitter.ts +++ b/apps/cli/src/agent/json-event-emitter.ts @@ -16,7 +16,7 @@ import type { ClineMessage } from "@roo-code/types" -import type { JsonEvent, JsonEventCost, JsonFinalOutput } from "@/types/json-events.js" +import type { JsonEvent, JsonEventCost, JsonEventQueueItem, JsonFinalOutput } from "@/types/json-events.js" import type { ExtensionClient } from "./extension-client.js" import type { AgentStateChangeEvent, TaskCompletedEvent } from "./events.js" @@ -30,6 +30,14 @@ export interface JsonEventEmitterOptions { mode: "json" | "stream-json" /** Output stream (defaults to process.stdout) */ stdout?: NodeJS.WriteStream + /** Optional request id provider for correlating stream events */ + requestIdProvider?: () => string | undefined + /** Transport schema version emitted in system:init */ + schemaVersion?: number + /** Transport protocol identifier emitted in system:init */ + protocol?: string + /** Supported stdin protocol capabilities emitted in system:init */ + capabilities?: string[] } /** @@ -89,17 +97,33 @@ export class JsonEventEmitter { private events: JsonEvent[] = [] private unsubscribers: (() => void)[] = [] private lastCost: JsonEventCost | undefined + private requestIdProvider: () => string | undefined + private schemaVersion: number + private protocol: string + private capabilities: string[] private seenMessageIds = new Set() // Track previous content for delta computation private previousContent = new Map() // Track the completion result content private completionResultContent: string | undefined + // Track the latest assistant text as a fallback for result.content. + private lastAssistantText: string | undefined // The first non-partial "say:text" per task is the echoed user prompt. private expectPromptEchoAsUser = true constructor(options: JsonEventEmitterOptions) { this.mode = options.mode this.stdout = options.stdout ?? process.stdout + this.requestIdProvider = options.requestIdProvider ?? (() => undefined) + this.schemaVersion = options.schemaVersion ?? 1 + this.protocol = options.protocol ?? "roo-cli-stream" + this.capabilities = options.capabilities ?? [ + "stdin:start", + "stdin:message", + "stdin:cancel", + "stdin:ping", + "stdin:shutdown", + ] } /** @@ -120,6 +144,48 @@ export class JsonEventEmitter { type: "system", subtype: "init", content: "Task started", + schemaVersion: this.schemaVersion, + protocol: this.protocol, + capabilities: this.capabilities, + }) + } + + emitControl(event: { + subtype: "ack" | "done" | "error" + requestId?: string + command?: string + taskId?: string + content?: string + success?: boolean + code?: string + }): void { + this.emitEvent({ + type: "control", + subtype: event.subtype, + requestId: event.requestId, + command: event.command, + taskId: event.taskId, + content: event.content, + success: event.success, + code: event.code, + done: event.subtype === "done" ? true : undefined, + }) + } + + emitQueue(event: { + subtype: "snapshot" | "enqueued" | "dequeued" | "drained" | "updated" + taskId?: string + content?: string + queueDepth: number + queue: JsonEventQueueItem[] + }): void { + this.emitEvent({ + type: "queue", + subtype: event.subtype, + taskId: event.taskId, + content: event.content, + queueDepth: event.queueDepth, + queue: event.queue, }) } @@ -248,6 +314,9 @@ export class JsonEventEmitter { } } else { this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone)) + if (msg.text) { + this.lastAssistantText = msg.text + } } break @@ -387,7 +456,7 @@ export class JsonEventEmitter { */ private handleTaskCompleted(event: TaskCompletedEvent): void { // Use tracked completion result content, falling back to event message - const resultContent = this.completionResultContent || event.message?.text + const resultContent = this.completionResultContent || event.message?.text || this.lastAssistantText this.emitEvent({ type: "result", @@ -421,10 +490,13 @@ export class JsonEventEmitter { * For json mode: accumulate for final output */ private emitEvent(event: JsonEvent): void { - this.events.push(event) + const requestId = event.requestId ?? this.requestIdProvider() + const payload = requestId ? { ...event, requestId } : event + + this.events.push(payload) if (this.mode === "stream-json") { - this.outputLine(event) + this.outputLine(payload) } } @@ -466,6 +538,7 @@ export class JsonEventEmitter { this.seenMessageIds.clear() this.previousContent.clear() this.completionResultContent = undefined + this.lastAssistantText = undefined this.expectPromptEchoAsUser = true } } diff --git a/apps/cli/src/commands/cli/__tests__/list.test.ts b/apps/cli/src/commands/cli/__tests__/list.test.ts new file mode 100644 index 00000000000..09e05022443 --- /dev/null +++ b/apps/cli/src/commands/cli/__tests__/list.test.ts @@ -0,0 +1,29 @@ +import { parseFormat } from "../list.js" + +describe("parseFormat", () => { + it("defaults to json when undefined", () => { + expect(parseFormat(undefined)).toBe("json") + }) + + it("returns json for 'json'", () => { + expect(parseFormat("json")).toBe("json") + }) + + it("returns text for 'text'", () => { + expect(parseFormat("text")).toBe("text") + }) + + it("is case-insensitive", () => { + expect(parseFormat("JSON")).toBe("json") + expect(parseFormat("Text")).toBe("text") + expect(parseFormat("TEXT")).toBe("text") + }) + + it("throws on invalid format", () => { + expect(() => parseFormat("xml")).toThrow('Invalid format: xml. Must be "json" or "text".') + }) + + it("throws on empty string", () => { + expect(() => parseFormat("")).toThrow("Invalid format") + }) +}) diff --git a/apps/cli/src/commands/cli/__tests__/parse-stdin-command.test.ts b/apps/cli/src/commands/cli/__tests__/parse-stdin-command.test.ts new file mode 100644 index 00000000000..81b9d06b8b2 --- /dev/null +++ b/apps/cli/src/commands/cli/__tests__/parse-stdin-command.test.ts @@ -0,0 +1,104 @@ +import { parseStdinStreamCommand } from "../stdin-stream.js" + +describe("parseStdinStreamCommand", () => { + describe("valid commands", () => { + it("parses a start command", () => { + const result = parseStdinStreamCommand( + JSON.stringify({ command: "start", requestId: "req-1", prompt: "hello" }), + 1, + ) + expect(result).toEqual({ command: "start", requestId: "req-1", prompt: "hello" }) + }) + + it("parses a message command", () => { + const result = parseStdinStreamCommand( + JSON.stringify({ command: "message", requestId: "req-2", prompt: "follow up" }), + 1, + ) + expect(result).toEqual({ command: "message", requestId: "req-2", prompt: "follow up" }) + }) + + it.each(["cancel", "ping", "shutdown"] as const)("parses a %s command (no prompt required)", (command) => { + const result = parseStdinStreamCommand(JSON.stringify({ command, requestId: "req-3" }), 1) + expect(result).toEqual({ command, requestId: "req-3" }) + }) + + it("trims whitespace from requestId", () => { + const result = parseStdinStreamCommand(JSON.stringify({ command: "ping", requestId: " req-4 " }), 1) + expect(result.requestId).toBe("req-4") + }) + + it("ignores extra fields", () => { + const result = parseStdinStreamCommand( + JSON.stringify({ command: "ping", requestId: "req-5", extra: "ignored", nested: { a: 1 } }), + 1, + ) + expect(result).toEqual({ command: "ping", requestId: "req-5" }) + }) + }) + + describe("invalid input", () => { + it("throws on invalid JSON", () => { + expect(() => parseStdinStreamCommand("not json", 3)).toThrow("stdin command line 3: invalid JSON") + }) + + it("throws on non-object JSON (string)", () => { + expect(() => parseStdinStreamCommand('"hello"', 1)).toThrow("expected JSON object") + }) + + it("throws on non-object JSON (array)", () => { + // Arrays pass isRecord (typeof [] === "object") but lack a command field + expect(() => parseStdinStreamCommand("[]", 1)).toThrow('missing string "command"') + }) + + it("throws on non-object JSON (number)", () => { + expect(() => parseStdinStreamCommand("42", 1)).toThrow("expected JSON object") + }) + + it("throws on null", () => { + expect(() => parseStdinStreamCommand("null", 1)).toThrow("expected JSON object") + }) + + it("throws when command field is missing", () => { + expect(() => parseStdinStreamCommand(JSON.stringify({ requestId: "req" }), 5)).toThrow( + 'stdin command line 5: missing string "command"', + ) + }) + + it("throws when command is not a string", () => { + expect(() => parseStdinStreamCommand(JSON.stringify({ command: 123, requestId: "req" }), 1)).toThrow( + 'missing string "command"', + ) + }) + + it("throws on unsupported command name", () => { + expect(() => parseStdinStreamCommand(JSON.stringify({ command: "unknown", requestId: "req" }), 2)).toThrow( + 'stdin command line 2: unsupported command "unknown"', + ) + }) + + it("throws when requestId is missing", () => { + expect(() => parseStdinStreamCommand(JSON.stringify({ command: "ping" }), 1)).toThrow( + 'missing non-empty string "requestId"', + ) + }) + + it("throws when requestId is empty", () => { + expect(() => parseStdinStreamCommand(JSON.stringify({ command: "ping", requestId: " " }), 1)).toThrow( + 'missing non-empty string "requestId"', + ) + }) + + it("throws when start command has no prompt", () => { + expect(() => parseStdinStreamCommand(JSON.stringify({ command: "start", requestId: "req" }), 1)).toThrow( + '"start" requires non-empty string "prompt"', + ) + }) + + it("throws when message command has empty prompt", () => { + expect(() => + parseStdinStreamCommand(JSON.stringify({ command: "message", requestId: "req", prompt: " " }), 1), + ).toThrow('"message" requires non-empty string "prompt"') + }) + }) +}) diff --git a/apps/cli/src/commands/cli/index.ts b/apps/cli/src/commands/cli/index.ts index 89e8e9f1ba6..629c665a759 100644 --- a/apps/cli/src/commands/cli/index.ts +++ b/apps/cli/src/commands/cli/index.ts @@ -1 +1,2 @@ export * from "./run.js" +export * from "./list.js" diff --git a/apps/cli/src/commands/cli/list.ts b/apps/cli/src/commands/cli/list.ts new file mode 100644 index 00000000000..8d8e779c3ac --- /dev/null +++ b/apps/cli/src/commands/cli/list.ts @@ -0,0 +1,287 @@ +import fs from "fs" +import path from "path" +import { fileURLToPath } from "url" + +import pWaitFor from "p-wait-for" + +import type { Command, ModelRecord, WebviewMessage } from "@roo-code/types" +import { getProviderDefaultModelId } from "@roo-code/types" + +import { ExtensionHost, type ExtensionHostOptions } from "@/agent/index.js" +import { loadToken } from "@/lib/storage/index.js" +import { getDefaultExtensionPath } from "@/lib/utils/extension.js" +import { getApiKeyFromEnv } from "@/lib/utils/provider.js" +import { isRecord } from "@/lib/utils/guards.js" + +const __dirname = path.dirname(fileURLToPath(import.meta.url)) +const REQUEST_TIMEOUT_MS = 10_000 + +type ListFormat = "json" | "text" + +type BaseListOptions = { + workspace?: string + extension?: string + apiKey?: string + format?: string + debug?: boolean +} + +type CommandLike = Pick +type ModeLike = { slug: string; name: string } + +export function parseFormat(rawFormat: string | undefined): ListFormat { + const format = (rawFormat ?? "json").toLowerCase() + if (format === "json" || format === "text") { + return format + } + + throw new Error(`Invalid format: ${rawFormat}. Must be "json" or "text".`) +} + +function resolveWorkspacePath(workspace: string | undefined): string { + const resolved = workspace ? path.resolve(workspace) : process.cwd() + + if (!fs.existsSync(resolved)) { + throw new Error(`Workspace path does not exist: ${resolved}`) + } + + return resolved +} + +function resolveExtensionPath(extension: string | undefined): string { + const resolved = path.resolve(extension || getDefaultExtensionPath(__dirname)) + + if (!fs.existsSync(path.join(resolved, "extension.js"))) { + throw new Error(`Extension bundle not found at: ${resolved}`) + } + + return resolved +} + +function outputJson(data: unknown): void { + process.stdout.write(JSON.stringify(data, null, 2) + "\n") +} + +function outputCommandsText(commands: CommandLike[]): void { + for (const command of commands) { + const description = command.description ? ` - ${command.description}` : "" + process.stdout.write(`/${command.name} (${command.source})${description}\n`) + } +} + +function outputModesText(modes: ModeLike[]): void { + for (const mode of modes) { + process.stdout.write(`${mode.slug}\t${mode.name}\n`) + } +} + +function outputModelsText(models: ModelRecord): void { + for (const modelId of Object.keys(models).sort()) { + process.stdout.write(`${modelId}\n`) + } +} + +async function createListHost(options: BaseListOptions): Promise { + const workspacePath = resolveWorkspacePath(options.workspace) + const extensionPath = resolveExtensionPath(options.extension) + const apiKey = options.apiKey || (await loadToken()) || getApiKeyFromEnv("roo") + + const extensionHostOptions: ExtensionHostOptions = { + mode: "code", + reasoningEffort: undefined, + user: null, + provider: "roo", + model: getProviderDefaultModelId("roo"), + apiKey, + workspacePath, + extensionPath, + nonInteractive: true, + ephemeral: true, + debug: options.debug ?? false, + exitOnComplete: true, + exitOnError: false, + disableOutput: true, + } + + const host = new ExtensionHost(extensionHostOptions) + await host.activate() + + // Best effort wait; mode/commands requests can still succeed without this. + await pWaitFor(() => host.client.isInitialized(), { + interval: 25, + timeout: 2_000, + }).catch(() => undefined) + + return host +} + +/** + * Send a request to the extension and wait for a matching response message. + * Returns `undefined` from `extract` to skip non-matching messages, or the + * parsed value to resolve the promise. + */ +function requestFromExtension( + host: ExtensionHost, + requestType: WebviewMessage["type"], + extract: (message: Record) => T | undefined, +): Promise { + return new Promise((resolve, reject) => { + let settled = false + + const cleanup = () => { + clearTimeout(timeoutId) + host.off("extensionWebviewMessage", onMessage) + offError() + } + + const finish = (fn: () => void) => { + if (settled) return + settled = true + cleanup() + fn() + } + + const onMessage = (message: unknown) => { + if (!isRecord(message)) { + return + } + + let result: T | undefined + try { + result = extract(message) + } catch (error) { + finish(() => reject(error instanceof Error ? error : new Error(String(error)))) + return + } + + if (result !== undefined) { + finish(() => resolve(result)) + } + } + + const offError = host.client.on("error", (error) => { + finish(() => reject(error)) + }) + + const timeoutId = setTimeout(() => { + finish(() => + reject(new Error(`Timed out waiting for ${requestType} response after ${REQUEST_TIMEOUT_MS}ms`)), + ) + }, REQUEST_TIMEOUT_MS) + + host.on("extensionWebviewMessage", onMessage) + host.sendToExtension({ type: requestType }) + }) +} + +function requestCommands(host: ExtensionHost): Promise { + return requestFromExtension(host, "requestCommands", (message) => { + if (message.type !== "commands") { + return undefined + } + return Array.isArray(message.commands) ? (message.commands as CommandLike[]) : [] + }) +} + +function requestModes(host: ExtensionHost): Promise { + return requestFromExtension(host, "requestModes", (message) => { + if (message.type !== "modes") { + return undefined + } + return Array.isArray(message.modes) ? (message.modes as ModeLike[]) : [] + }) +} + +function requestRooModels(host: ExtensionHost): Promise { + return requestFromExtension(host, "requestRooModels", (message) => { + if (message.type !== "singleRouterModelFetchResponse") { + return undefined + } + + const values = isRecord(message.values) ? message.values : undefined + if (values?.provider !== "roo") { + return undefined + } + + if (message.success === false) { + const errorMessage = + typeof message.error === "string" && message.error.length > 0 + ? message.error + : "Failed to fetch Roo models" + throw new Error(errorMessage) + } + + return isRecord(values.models) ? (values.models as ModelRecord) : {} + }) +} + +async function withHostAndSignalHandlers( + options: BaseListOptions, + fn: (host: ExtensionHost) => Promise, +): Promise { + const host = await createListHost(options) + + const shutdown = async (exitCode: number) => { + await host.dispose() + process.exit(exitCode) + } + + const onSigint = () => void shutdown(130) + const onSigterm = () => void shutdown(143) + + process.on("SIGINT", onSigint) + process.on("SIGTERM", onSigterm) + + try { + return await fn(host) + } finally { + process.off("SIGINT", onSigint) + process.off("SIGTERM", onSigterm) + await host.dispose() + } +} + +export async function listCommands(options: BaseListOptions): Promise { + const format = parseFormat(options.format) + + await withHostAndSignalHandlers(options, async (host) => { + const commands = await requestCommands(host) + + if (format === "json") { + outputJson({ commands }) + return + } + + outputCommandsText(commands) + }) +} + +export async function listModes(options: BaseListOptions): Promise { + const format = parseFormat(options.format) + + await withHostAndSignalHandlers(options, async (host) => { + const modes = await requestModes(host) + + if (format === "json") { + outputJson({ modes }) + return + } + + outputModesText(modes) + }) +} + +export async function listModels(options: BaseListOptions): Promise { + const format = parseFormat(options.format) + + await withHostAndSignalHandlers(options, async (host) => { + const models = await requestRooModels(host) + + if (format === "json") { + outputJson({ models }) + return + } + + outputModelsText(models) + }) +} diff --git a/apps/cli/src/commands/cli/run.ts b/apps/cli/src/commands/cli/run.ts index 365febb9f86..b72e4e72834 100644 --- a/apps/cli/src/commands/cli/run.ts +++ b/apps/cli/src/commands/cli/run.ts @@ -1,10 +1,8 @@ import fs from "fs" import path from "path" -import { createInterface } from "readline" import { fileURLToPath } from "url" import { createElement } from "react" -import pWaitFor from "p-wait-for" import { setLogger } from "@roo-code/vscode-shim" @@ -29,27 +27,10 @@ import { getDefaultExtensionPath } from "@/lib/utils/extension.js" import { VERSION } from "@/lib/utils/version.js" import { ExtensionHost, ExtensionHostOptions } from "@/agent/index.js" +import { runStdinStreamMode } from "./stdin-stream.js" const __dirname = path.dirname(fileURLToPath(import.meta.url)) -async function* readPromptsFromStdinLines(): AsyncGenerator { - const lineReader = createInterface({ - input: process.stdin, - crlfDelay: Infinity, - terminal: false, - }) - - try { - for await (const line of lineReader) { - if (line.trim()) { - yield line - } - } - } finally { - lineReader.close() - } -} - export async function run(promptArg: string | undefined, flagOptions: FlagOptions) { setLogger({ info: () => {}, @@ -211,19 +192,27 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption if (flagOptions.stdinPromptStream && !flagOptions.print) { console.error("[CLI] Error: --stdin-prompt-stream requires --print mode") - console.error("[CLI] Usage: roo --print --stdin-prompt-stream [options]") + console.error("[CLI] Usage: roo --print --output-format stream-json --stdin-prompt-stream [options]") + process.exit(1) + } + + if (flagOptions.stdinPromptStream && outputFormat !== "stream-json") { + console.error("[CLI] Error: --stdin-prompt-stream requires --output-format=stream-json") + console.error("[CLI] Usage: roo --print --output-format stream-json --stdin-prompt-stream [options]") process.exit(1) } if (flagOptions.stdinPromptStream && process.stdin.isTTY) { console.error("[CLI] Error: --stdin-prompt-stream requires piped stdin") - console.error("[CLI] Example: printf '1+1=?\\n10!=?\\n' | roo --print --stdin-prompt-stream [options]") + console.error( + '[CLI] Example: printf \'{"command":"start","requestId":"1","prompt":"1+1=?"}\\n\' | roo --print --output-format stream-json --stdin-prompt-stream [options]', + ) process.exit(1) } if (flagOptions.stdinPromptStream && prompt) { console.error("[CLI] Error: cannot use positional prompt or --prompt-file with --stdin-prompt-stream") - console.error("[CLI] Usage: roo --print --stdin-prompt-stream [options]") + console.error("[CLI] Usage: roo --print --output-format stream-json --stdin-prompt-stream [options]") process.exit(1) } @@ -234,7 +223,9 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption if (flagOptions.print) { console.error("[CLI] Error: no prompt provided") console.error("[CLI] Usage: roo --print [options] ") - console.error("[CLI] For stdin control mode: roo --print --stdin-prompt-stream [options]") + console.error( + "[CLI] For stdin control mode: roo --print --output-format stream-json --stdin-prompt-stream [options]", + ) } else { console.error("[CLI] Error: prompt is required in non-interactive mode") console.error("[CLI] Usage: roo [options]") @@ -281,9 +272,13 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption extensionHostOptions.disableOutput = useJsonOutput const host = new ExtensionHost(extensionHostOptions) + let streamRequestId: string | undefined const jsonEmitter = useJsonOutput - ? new JsonEventEmitter({ mode: outputFormat as "json" | "stream-json" }) + ? new JsonEventEmitter({ + mode: outputFormat as "json" | "stream-json", + requestIdProvider: () => streamRequestId, + }) : null async function shutdown(signal: string, exitCode: number): Promise { @@ -306,151 +301,17 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption } if (useStdinPromptStream) { - let hasReceivedStdinPrompt = false - // stdin stream mode may start at most one task in this process. - let startedTaskFromStdin = false - let activeTaskPromise: Promise | null = null - let fatalStreamError: Error | null = null - // Extension-owned queue depth mirrored from state pushes. - // CLI does not maintain its own prompt queue. - let extensionQueueDepth = 0 - - const waitForInitialState = async () => { - // Give the extension a brief chance to publish initial state so - // we can continue an existing task instead of creating a new one. - await pWaitFor( - () => { - if (fatalStreamError) { - throw fatalStreamError - } - - return host.client.isInitialized() - }, - { interval: 25, timeout: 2_000 }, - ).catch(() => { - // Best-effort wait only; continuing preserves previous behavior. - }) - - if (fatalStreamError) { - throw fatalStreamError - } + if (!jsonEmitter || outputFormat !== "stream-json") { + throw new Error("--stdin-prompt-stream requires --output-format=stream-json to emit control events") } - const waitForActiveTask = async () => { - await pWaitFor( - () => { - if (fatalStreamError) { - throw fatalStreamError - } - - if (!host.client.hasActiveTask()) { - if (!activeTaskPromise && startedTaskFromStdin) { - throw new Error("task is no longer active; cannot continue conversation from stdin") - } - - return false - } - - return true - }, - { interval: 25, timeout: 5_000 }, - ) - } - - const startInitialTask = async (taskPrompt: string) => { - startedTaskFromStdin = true - - activeTaskPromise = host - .runTask(taskPrompt) - .catch((error) => { - fatalStreamError = error instanceof Error ? error : new Error(String(error)) - }) - .finally(() => { - activeTaskPromise = null - }) - - await waitForActiveTask() - } - - const enqueueContinuation = async (text: string) => { - if (!host.client.hasActiveTask()) { - await waitForActiveTask() - } - - // Delegate ordering/drain behavior to the extension message queue. - host.sendToExtension({ type: "queueMessage", text }) - } - - const offClientError = host.client.on("error", (error) => { - fatalStreamError = error + await runStdinStreamMode({ + host, + jsonEmitter, + setStreamRequestId: (id) => { + streamRequestId = id + }, }) - - const onExtensionMessage = (message: { type?: string; state?: { messageQueue?: unknown } }) => { - if (message.type !== "state") { - return - } - - const messageQueue = message.state?.messageQueue - extensionQueueDepth = Array.isArray(messageQueue) ? messageQueue.length : 0 - } - - host.on("extensionWebviewMessage", onExtensionMessage) - - try { - await waitForInitialState() - - for await (const stdinPrompt of readPromptsFromStdinLines()) { - hasReceivedStdinPrompt = true - - // Start once, then always continue via extension queue. - if (!host.client.hasActiveTask() && !startedTaskFromStdin) { - await startInitialTask(stdinPrompt) - } else { - await enqueueContinuation(stdinPrompt) - } - - if (fatalStreamError) { - throw fatalStreamError - } - } - - if (!hasReceivedStdinPrompt) { - throw new Error("no prompt provided via stdin") - } - - await pWaitFor( - () => { - if (fatalStreamError) { - throw fatalStreamError - } - - const isSettled = - !host.client.hasActiveTask() && !activeTaskPromise && extensionQueueDepth === 0 - - if (isSettled) { - return true - } - - if (host.isWaitingForInput() && extensionQueueDepth === 0) { - const currentAsk = host.client.getCurrentAsk() - - if (currentAsk === "completion_result") { - return true - } - - if (currentAsk) { - throw new Error(`stdin ended while task was waiting for input (${currentAsk})`) - } - } - - return false - }, - { interval: 50 }, - ) - } finally { - offClientError() - host.off("extensionWebviewMessage", onExtensionMessage) - } } else { await host.runTask(prompt!) } diff --git a/apps/cli/src/commands/cli/stdin-stream.ts b/apps/cli/src/commands/cli/stdin-stream.ts new file mode 100644 index 00000000000..dceca2e84d7 --- /dev/null +++ b/apps/cli/src/commands/cli/stdin-stream.ts @@ -0,0 +1,610 @@ +import { createInterface } from "readline" + +import { isRecord } from "@/lib/utils/guards.js" + +import type { ExtensionHost } from "@/agent/index.js" +import type { JsonEventEmitter } from "@/agent/json-event-emitter.js" + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type StdinStreamCommandName = "start" | "message" | "cancel" | "ping" | "shutdown" + +export type StdinStreamCommand = + | { command: "start"; requestId: string; prompt: string } + | { command: "message"; requestId: string; prompt: string } + | { command: "cancel"; requestId: string } + | { command: "ping"; requestId: string } + | { command: "shutdown"; requestId: string } + +// --------------------------------------------------------------------------- +// Parsing +// --------------------------------------------------------------------------- + +export const VALID_STDIN_COMMANDS = new Set(["start", "message", "cancel", "ping", "shutdown"]) + +export function parseStdinStreamCommand(line: string, lineNumber: number): StdinStreamCommand { + let parsed: unknown + + try { + parsed = JSON.parse(line) + } catch { + throw new Error(`stdin command line ${lineNumber}: invalid JSON`) + } + + if (!isRecord(parsed)) { + throw new Error(`stdin command line ${lineNumber}: expected JSON object`) + } + + const commandRaw = parsed.command + const requestIdRaw = parsed.requestId + + if (typeof commandRaw !== "string") { + throw new Error(`stdin command line ${lineNumber}: missing string "command"`) + } + + if (!VALID_STDIN_COMMANDS.has(commandRaw as StdinStreamCommandName)) { + throw new Error( + `stdin command line ${lineNumber}: unsupported command "${commandRaw}" (expected start|message|cancel|ping|shutdown)`, + ) + } + + if (typeof requestIdRaw !== "string" || requestIdRaw.trim().length === 0) { + throw new Error(`stdin command line ${lineNumber}: missing non-empty string "requestId"`) + } + + const command = commandRaw as StdinStreamCommandName + const requestId = requestIdRaw.trim() + + if (command === "start" || command === "message") { + const promptRaw = parsed.prompt + if (typeof promptRaw !== "string" || promptRaw.trim().length === 0) { + throw new Error(`stdin command line ${lineNumber}: "${command}" requires non-empty string "prompt"`) + } + + return { command, requestId, prompt: promptRaw } + } + + return { command, requestId } +} + +// --------------------------------------------------------------------------- +// NDJSON stdin reader +// --------------------------------------------------------------------------- + +async function* readCommandsFromStdinNdjson(): AsyncGenerator { + const lineReader = createInterface({ + input: process.stdin, + crlfDelay: Infinity, + terminal: false, + }) + + let lineNumber = 0 + + try { + for await (const line of lineReader) { + lineNumber += 1 + const trimmed = line.trim() + if (!trimmed) { + continue + } + yield parseStdinStreamCommand(trimmed, lineNumber) + } + } finally { + lineReader.close() + } +} + +// --------------------------------------------------------------------------- +// Queue snapshot helpers +// --------------------------------------------------------------------------- + +interface StreamQueueItem { + id: string + text?: string + imageCount: number + timestamp?: number +} + +function normalizeQueueText(text: string | undefined): string | undefined { + if (!text) { + return undefined + } + + const compact = text.replace(/\s+/g, " ").trim() + if (!compact) { + return undefined + } + + return compact.length <= 180 ? compact : `${compact.slice(0, 177)}...` +} + +function parseQueueSnapshot(rawQueue: unknown): StreamQueueItem[] | undefined { + if (!Array.isArray(rawQueue)) { + return undefined + } + + const snapshot: StreamQueueItem[] = [] + + for (const entry of rawQueue) { + if (!isRecord(entry)) { + continue + } + + const idRaw = entry.id + if (typeof idRaw !== "string" || idRaw.trim().length === 0) { + continue + } + + const imagesRaw = entry.images + const timestampRaw = entry.timestamp + const imageCount = Array.isArray(imagesRaw) ? imagesRaw.length : 0 + + snapshot.push({ + id: idRaw, + text: normalizeQueueText(typeof entry.text === "string" ? entry.text : undefined), + imageCount, + timestamp: typeof timestampRaw === "number" ? timestampRaw : undefined, + }) + } + + return snapshot +} + +function areStringArraysEqual(a: string[], b: string[]): boolean { + if (a.length !== b.length) { + return false + } + + for (let i = 0; i < a.length; i++) { + if (a[i] !== b[i]) { + return false + } + } + + return true +} + +// --------------------------------------------------------------------------- +// Orchestrator +// --------------------------------------------------------------------------- + +export interface StdinStreamModeOptions { + host: ExtensionHost + jsonEmitter: JsonEventEmitter + setStreamRequestId: (id: string | undefined) => void +} + +function isCancellationLikeError(error: unknown): boolean { + const message = error instanceof Error ? error.message : String(error) + const normalized = message.toLowerCase() + return normalized.includes("aborted") || normalized.includes("cancelled") || normalized.includes("canceled") +} + +export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId }: StdinStreamModeOptions) { + let hasReceivedStdinCommand = false + let shouldShutdown = false + let activeTaskPromise: Promise | null = null + let fatalStreamError: Error | null = null + let activeRequestId: string | undefined + let activeTaskCommand: "start" | undefined + let latestTaskId: string | undefined + let cancelRequestedForActiveTask = false + let hasSeenQueueState = false + let lastQueueDepth = 0 + let lastQueueMessageIds: string[] = [] + + const waitForPreviousTaskToSettle = async () => { + if (!activeTaskPromise) { + return + } + + try { + await activeTaskPromise + } catch { + // Errors are emitted through control/error events. + } + } + + const offClientError = host.client.on("error", (error) => { + if (cancelRequestedForActiveTask && isCancellationLikeError(error)) { + if (activeTaskCommand === "start") { + jsonEmitter.emitControl({ + subtype: "done", + requestId: activeRequestId, + command: "start", + taskId: latestTaskId, + content: "task cancelled", + code: "task_aborted", + success: false, + }) + } + activeTaskCommand = undefined + activeRequestId = undefined + setStreamRequestId(undefined) + cancelRequestedForActiveTask = false + return + } + + fatalStreamError = error + jsonEmitter.emitControl({ + subtype: "error", + requestId: activeRequestId, + command: activeTaskCommand, + taskId: latestTaskId, + content: error.message, + code: "client_error", + success: false, + }) + }) + + const onExtensionMessage = (message: { + type?: string + state?: { + currentTaskItem?: { id?: unknown } + messageQueue?: unknown + } + }) => { + if (message.type !== "state") { + return + } + + const currentTaskId = message.state?.currentTaskItem?.id + if (typeof currentTaskId === "string" && currentTaskId.trim().length > 0) { + latestTaskId = currentTaskId + } + + const queueSnapshot = parseQueueSnapshot(message.state?.messageQueue) + if (!queueSnapshot) { + return + } + + const queueDepth = queueSnapshot.length + const queueMessageIds = queueSnapshot.map((item) => item.id) + + if (!hasSeenQueueState) { + hasSeenQueueState = true + lastQueueDepth = queueDepth + lastQueueMessageIds = queueMessageIds + + if (queueDepth === 0) { + return + } + + jsonEmitter.emitQueue({ + subtype: "snapshot", + taskId: latestTaskId, + content: `queue snapshot (${queueDepth} item${queueDepth === 1 ? "" : "s"})`, + queueDepth, + queue: queueSnapshot, + }) + return + } + + const depthChanged = queueDepth !== lastQueueDepth + const idsChanged = !areStringArraysEqual(queueMessageIds, lastQueueMessageIds) + + if (!depthChanged && !idsChanged) { + return + } + + const subtype: "enqueued" | "dequeued" | "drained" | "updated" = depthChanged + ? queueDepth > lastQueueDepth + ? "enqueued" + : queueDepth === 0 + ? "drained" + : "dequeued" + : "updated" + + const content = + subtype === "drained" + ? "queue drained" + : `queue ${subtype} (${queueDepth} item${queueDepth === 1 ? "" : "s"})` + + jsonEmitter.emitQueue({ + subtype, + taskId: latestTaskId, + content, + queueDepth, + queue: queueSnapshot, + }) + + lastQueueDepth = queueDepth + lastQueueMessageIds = queueMessageIds + } + + host.on("extensionWebviewMessage", onExtensionMessage) + + const offTaskCompleted = host.client.on("taskCompleted", (event) => { + if (activeTaskCommand === "start") { + const completionCode = event.success + ? "task_completed" + : cancelRequestedForActiveTask + ? "task_aborted" + : "task_failed" + + jsonEmitter.emitControl({ + subtype: "done", + requestId: activeRequestId, + command: "start", + taskId: latestTaskId, + content: event.success + ? "task completed" + : cancelRequestedForActiveTask + ? "task cancelled" + : "task failed", + code: completionCode, + success: event.success, + }) + activeTaskCommand = undefined + activeRequestId = undefined + setStreamRequestId(undefined) + cancelRequestedForActiveTask = false + } + }) + + try { + for await (const stdinCommand of readCommandsFromStdinNdjson()) { + hasReceivedStdinCommand = true + + if (fatalStreamError) { + throw fatalStreamError + } + + switch (stdinCommand.command) { + case "start": + // A task can emit completion events before runTask() finalizers run. + // Wait for full settlement to avoid false "task_busy" on immediate next start. + // Safe from races: `for await` processes stdin commands serially, so no + // concurrent command can mutate state between the check and the await. + if (activeTaskPromise && !host.client.hasActiveTask()) { + await waitForPreviousTaskToSettle() + } + + if (activeTaskPromise || host.client.hasActiveTask()) { + jsonEmitter.emitControl({ + subtype: "error", + requestId: stdinCommand.requestId, + command: "start", + taskId: latestTaskId, + content: "cannot start a new task while another task is active", + code: "task_busy", + success: false, + }) + break + } + + activeRequestId = stdinCommand.requestId + activeTaskCommand = "start" + setStreamRequestId(stdinCommand.requestId) + latestTaskId = undefined + cancelRequestedForActiveTask = false + + jsonEmitter.emitControl({ + subtype: "ack", + requestId: stdinCommand.requestId, + command: "start", + taskId: latestTaskId, + content: "starting task", + code: "accepted", + success: true, + }) + + activeTaskPromise = host + .runTask(stdinCommand.prompt) + .catch((error) => { + const message = error instanceof Error ? error.message : String(error) + + if (cancelRequestedForActiveTask || isCancellationLikeError(error)) { + if (activeTaskCommand === "start") { + jsonEmitter.emitControl({ + subtype: "done", + requestId: stdinCommand.requestId, + command: "start", + taskId: latestTaskId, + content: "task cancelled", + code: "task_aborted", + success: false, + }) + } + activeTaskCommand = undefined + activeRequestId = undefined + setStreamRequestId(undefined) + cancelRequestedForActiveTask = false + return + } + + fatalStreamError = error instanceof Error ? error : new Error(message) + activeTaskCommand = undefined + activeRequestId = undefined + setStreamRequestId(undefined) + jsonEmitter.emitControl({ + subtype: "error", + requestId: stdinCommand.requestId, + command: "start", + taskId: latestTaskId, + content: message, + code: "task_error", + success: false, + }) + }) + .finally(() => { + activeTaskPromise = null + }) + break + + case "message": + if (!host.client.hasActiveTask()) { + jsonEmitter.emitControl({ + subtype: "error", + requestId: stdinCommand.requestId, + command: "message", + taskId: latestTaskId, + content: "no active task; send a start command first", + code: "no_active_task", + success: false, + }) + break + } + + setStreamRequestId(stdinCommand.requestId) + jsonEmitter.emitControl({ + subtype: "ack", + requestId: stdinCommand.requestId, + command: "message", + taskId: latestTaskId, + content: "message accepted", + code: "accepted", + success: true, + }) + host.sendToExtension({ type: "queueMessage", text: stdinCommand.prompt }) + jsonEmitter.emitControl({ + subtype: "done", + requestId: stdinCommand.requestId, + command: "message", + taskId: latestTaskId, + content: "message queued", + code: "queued", + success: true, + }) + break + + case "cancel": { + setStreamRequestId(stdinCommand.requestId) + + const hasTaskInFlight = Boolean( + activeTaskPromise || activeTaskCommand === "start" || host.client.hasActiveTask(), + ) + + if (!hasTaskInFlight) { + jsonEmitter.emitControl({ + subtype: "ack", + requestId: stdinCommand.requestId, + command: "cancel", + taskId: latestTaskId, + content: "no active task to cancel", + code: "accepted", + success: true, + }) + jsonEmitter.emitControl({ + subtype: "done", + requestId: stdinCommand.requestId, + command: "cancel", + taskId: latestTaskId, + content: "cancel ignored (no active task)", + code: "no_active_task", + success: true, + }) + break + } + + cancelRequestedForActiveTask = true + jsonEmitter.emitControl({ + subtype: "ack", + requestId: stdinCommand.requestId, + command: "cancel", + taskId: latestTaskId, + content: host.client.hasActiveTask() ? "cancel requested" : "cancel requested (task starting)", + code: "accepted", + success: true, + }) + try { + host.client.cancelTask() + jsonEmitter.emitControl({ + subtype: "done", + requestId: stdinCommand.requestId, + command: "cancel", + taskId: latestTaskId, + content: "cancel signal sent", + code: "cancel_requested", + success: true, + }) + } catch (error) { + if (!isCancellationLikeError(error)) { + const message = error instanceof Error ? error.message : String(error) + jsonEmitter.emitControl({ + subtype: "error", + requestId: stdinCommand.requestId, + command: "cancel", + taskId: latestTaskId, + content: message, + code: "cancel_error", + success: false, + }) + } + } + break + } + + case "ping": + jsonEmitter.emitControl({ + subtype: "ack", + requestId: stdinCommand.requestId, + command: "ping", + taskId: latestTaskId, + content: "pong", + code: "accepted", + success: true, + }) + jsonEmitter.emitControl({ + subtype: "done", + requestId: stdinCommand.requestId, + command: "ping", + taskId: latestTaskId, + content: "pong", + code: "pong", + success: true, + }) + break + + case "shutdown": + jsonEmitter.emitControl({ + subtype: "ack", + requestId: stdinCommand.requestId, + command: "shutdown", + taskId: latestTaskId, + content: "shutdown requested", + code: "accepted", + success: true, + }) + jsonEmitter.emitControl({ + subtype: "done", + requestId: stdinCommand.requestId, + command: "shutdown", + taskId: latestTaskId, + content: "shutting down process", + code: "shutdown_requested", + success: true, + }) + shouldShutdown = true + break + } + + if (shouldShutdown) { + break + } + } + + if (!hasReceivedStdinCommand) { + throw new Error("no stdin command provided") + } + + if (shouldShutdown && host.client.hasActiveTask()) { + host.client.cancelTask() + } + + if (!shouldShutdown && host.client.hasActiveTask() && host.isWaitingForInput()) { + const currentAsk = host.client.getCurrentAsk() + throw new Error(`stdin ended while task was waiting for input (${currentAsk ?? "unknown"})`) + } + + if (!shouldShutdown && activeTaskPromise) { + await activeTaskPromise + } + } finally { + offClientError() + host.off("extensionWebviewMessage", onExtensionMessage) + offTaskCompleted() + } +} diff --git a/apps/cli/src/index.ts b/apps/cli/src/index.ts index 6eaab059879..8b817db77f4 100644 --- a/apps/cli/src/index.ts +++ b/apps/cli/src/index.ts @@ -2,7 +2,7 @@ import { Command } from "commander" import { DEFAULT_FLAGS } from "@/types/constants.js" import { VERSION } from "@/lib/utils/version.js" -import { run, login, logout, status } from "@/commands/index.js" +import { run, login, logout, status, listCommands, listModes, listModels } from "@/commands/index.js" const program = new Command() @@ -16,7 +16,11 @@ program .option("--prompt-file ", "Read prompt from a file instead of command line argument") .option("-w, --workspace ", "Workspace directory path (defaults to current working directory)") .option("-p, --print", "Print response and exit (non-interactive mode)", false) - .option("--stdin-prompt-stream", "Read prompts from stdin (one prompt per line, requires --print)", false) + .option( + "--stdin-prompt-stream", + "Read NDJSON commands from stdin (requires --print and --output-format stream-json)", + false, + ) .option("-e, --extension ", "Path to the extension bundle directory") .option("-d, --debug", "Enable debug output (includes detailed debug information)", false) .option("-a, --require-approval", "Require manual approval for actions", false) @@ -39,6 +43,45 @@ program ) .action(run) +const listCommand = program.command("list").description("List commands, modes, or models") + +const applyListOptions = (command: Command) => + command + .option("-w, --workspace ", "Workspace directory path (defaults to current working directory)") + .option("-e, --extension ", "Path to the extension bundle directory") + .option("-k, --api-key ", "Roo API key (falls back to saved login/session token)") + .option("--format ", 'Output format: "json" (default) or "text"', "json") + .option("-d, --debug", "Enable debug output", false) + +const runListAction = async (action: () => Promise) => { + try { + await action() + process.exit(0) + } catch (error) { + const message = error instanceof Error ? error.message : String(error) + console.error(`[CLI] Error: ${message}`) + process.exit(1) + } +} + +applyListOptions(listCommand.command("commands").description("List available slash commands")).action( + async (options: Parameters[0]) => { + await runListAction(() => listCommands(options)) + }, +) + +applyListOptions(listCommand.command("modes").description("List available modes")).action( + async (options: Parameters[0]) => { + await runListAction(() => listModes(options)) + }, +) + +applyListOptions(listCommand.command("models").description("List available Roo models")).action( + async (options: Parameters[0]) => { + await runListAction(() => listModels(options)) + }, +) + const authCommand = program.command("auth").description("Manage authentication for Roo Code Cloud") authCommand diff --git a/apps/cli/src/lib/utils/__tests__/guards.test.ts b/apps/cli/src/lib/utils/__tests__/guards.test.ts new file mode 100644 index 00000000000..f59eeb506d4 --- /dev/null +++ b/apps/cli/src/lib/utils/__tests__/guards.test.ts @@ -0,0 +1,27 @@ +import { isRecord } from "../guards.js" + +describe("isRecord", () => { + it("returns true for plain objects", () => { + expect(isRecord({})).toBe(true) + expect(isRecord({ a: 1 })).toBe(true) + }) + + it("returns true for arrays (arrays are objects)", () => { + expect(isRecord([])).toBe(true) + }) + + it("returns false for null", () => { + expect(isRecord(null)).toBe(false) + }) + + it("returns false for undefined", () => { + expect(isRecord(undefined)).toBe(false) + }) + + it("returns false for primitives", () => { + expect(isRecord("string")).toBe(false) + expect(isRecord(42)).toBe(false) + expect(isRecord(true)).toBe(false) + expect(isRecord(Symbol("s"))).toBe(false) + }) +}) diff --git a/apps/cli/src/lib/utils/guards.ts b/apps/cli/src/lib/utils/guards.ts new file mode 100644 index 00000000000..a901f1a6584 --- /dev/null +++ b/apps/cli/src/lib/utils/guards.ts @@ -0,0 +1,3 @@ +export function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null +} diff --git a/apps/cli/src/types/json-events.ts b/apps/cli/src/types/json-events.ts index f18f3b27684..048a303a0f1 100644 --- a/apps/cli/src/types/json-events.ts +++ b/apps/cli/src/types/json-events.ts @@ -27,6 +27,8 @@ export function isValidOutputFormat(format: string): format is OutputFormat { */ export type JsonEventType = | "system" // System messages (init, ready, shutdown) + | "control" // Transport/control protocol events + | "queue" // Message queue telemetry from extension state | "assistant" // Assistant text messages | "user" // User messages (echoed input) | "tool_use" // Tool invocations (file ops, commands, browser, MCP) @@ -35,6 +37,17 @@ export type JsonEventType = | "error" // Errors | "result" // Final task result +export interface JsonEventQueueItem { + /** Queue item id generated by MessageQueueService */ + id: string + /** Queued text prompt preview */ + text?: string + /** Number of attached images in the queued message */ + imageCount?: number + /** Queue insertion/update timestamp (ms epoch) */ + timestamp?: number +} + /** * Tool use information for tool_use events. */ @@ -84,14 +97,32 @@ export interface JsonEventCost { export interface JsonEvent { /** Event type discriminator */ type: JsonEventType + /** Protocol schema version (included on system.init) */ + schemaVersion?: number + /** Transport protocol identifier (included on system.init) */ + protocol?: string + /** Capability names supported by the current process */ + capabilities?: string[] /** Message ID - included on first delta and final message */ id?: number + /** Active task ID when available */ + taskId?: string + /** Request ID for correlating streamed output to stdin commands */ + requestId?: string + /** Command name for control events */ + command?: string /** Content text (for text-based events) */ content?: string /** True when this is the final message (stream complete) */ done?: boolean /** Optional subtype for more specific categorization */ subtype?: string + /** Optional machine-readable status/error code */ + code?: string + /** Current queue depth (for queue events) */ + queueDepth?: number + /** Queue item snapshots (for queue events) */ + queue?: JsonEventQueueItem[] /** Tool use information (for tool_use events) */ tool_use?: JsonEventToolUse /** Tool result information (for tool_result events) */ From 781056ecf8a9a103dc331ae92c19b329713c2958 Mon Sep 17 00:00:00 2001 From: cte Date: Wed, 18 Feb 2026 23:45:43 -0800 Subject: [PATCH 2/2] fix(core): fix Task.ts bug affecting CLI operation Co-Authored-By: Claude Opus 4.6 --- src/core/task/Task.ts | 396 ++++++++++++++++++++++-------------------- 1 file changed, 204 insertions(+), 192 deletions(-) diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index 16c6b36dce0..e9343425465 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -2011,234 +2011,246 @@ export class Task extends EventEmitter implements TaskLike { } private async resumeTaskFromHistory() { - if (this.enableBridge) { - try { - await BridgeOrchestrator.subscribeToTask(this) - } catch (error) { - console.error( - `[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`, - ) + try { + if (this.enableBridge) { + try { + await BridgeOrchestrator.subscribeToTask(this) + } catch (error) { + console.error( + `[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`, + ) + } } - } - const modifiedClineMessages = await this.getSavedClineMessages() + const modifiedClineMessages = await this.getSavedClineMessages() - // Remove any resume messages that may have been added before. - const lastRelevantMessageIndex = findLastIndex( - modifiedClineMessages, - (m) => !(m.ask === "resume_task" || m.ask === "resume_completed_task"), - ) + // Remove any resume messages that may have been added before. + const lastRelevantMessageIndex = findLastIndex( + modifiedClineMessages, + (m) => !(m.ask === "resume_task" || m.ask === "resume_completed_task"), + ) - if (lastRelevantMessageIndex !== -1) { - modifiedClineMessages.splice(lastRelevantMessageIndex + 1) - } + if (lastRelevantMessageIndex !== -1) { + modifiedClineMessages.splice(lastRelevantMessageIndex + 1) + } + + // Remove any trailing reasoning-only UI messages that were not part of the persisted API conversation + while (modifiedClineMessages.length > 0) { + const last = modifiedClineMessages[modifiedClineMessages.length - 1] + if (last.type === "say" && last.say === "reasoning") { + modifiedClineMessages.pop() + } else { + break + } + } + + // Since we don't use `api_req_finished` anymore, we need to check if the + // last `api_req_started` has a cost value, if it doesn't and no + // cancellation reason to present, then we remove it since it indicates + // an api request without any partial content streamed. + const lastApiReqStartedIndex = findLastIndex( + modifiedClineMessages, + (m) => m.type === "say" && m.say === "api_req_started", + ) + + if (lastApiReqStartedIndex !== -1) { + const lastApiReqStarted = modifiedClineMessages[lastApiReqStartedIndex] + const { cost, cancelReason }: ClineApiReqInfo = JSON.parse(lastApiReqStarted.text || "{}") + + if (cost === undefined && cancelReason === undefined) { + modifiedClineMessages.splice(lastApiReqStartedIndex, 1) + } + } + + await this.overwriteClineMessages(modifiedClineMessages) + this.clineMessages = await this.getSavedClineMessages() + + // Now present the cline messages to the user and ask if they want to + // resume (NOTE: we ran into a bug before where the + // apiConversationHistory wouldn't be initialized when opening a old + // task, and it was because we were waiting for resume). + // This is important in case the user deletes messages without resuming + // the task first. + this.apiConversationHistory = await this.getSavedApiConversationHistory() + + const lastClineMessage = this.clineMessages + .slice() + .reverse() + .find((m) => !(m.ask === "resume_task" || m.ask === "resume_completed_task")) // Could be multiple resume tasks. - // Remove any trailing reasoning-only UI messages that were not part of the persisted API conversation - while (modifiedClineMessages.length > 0) { - const last = modifiedClineMessages[modifiedClineMessages.length - 1] - if (last.type === "say" && last.say === "reasoning") { - modifiedClineMessages.pop() + let askType: ClineAsk + if (lastClineMessage?.ask === "completion_result") { + askType = "resume_completed_task" } else { - break + askType = "resume_task" } - } - // Since we don't use `api_req_finished` anymore, we need to check if the - // last `api_req_started` has a cost value, if it doesn't and no - // cancellation reason to present, then we remove it since it indicates - // an api request without any partial content streamed. - const lastApiReqStartedIndex = findLastIndex( - modifiedClineMessages, - (m) => m.type === "say" && m.say === "api_req_started", - ) + this.isInitialized = true + + const { response, text, images } = await this.ask(askType) // Calls `postStateToWebview`. - if (lastApiReqStartedIndex !== -1) { - const lastApiReqStarted = modifiedClineMessages[lastApiReqStartedIndex] - const { cost, cancelReason }: ClineApiReqInfo = JSON.parse(lastApiReqStarted.text || "{}") + let responseText: string | undefined + let responseImages: string[] | undefined - if (cost === undefined && cancelReason === undefined) { - modifiedClineMessages.splice(lastApiReqStartedIndex, 1) + if (response === "messageResponse") { + await this.say("user_feedback", text, images) + responseText = text + responseImages = images } - } - await this.overwriteClineMessages(modifiedClineMessages) - this.clineMessages = await this.getSavedClineMessages() - - // Now present the cline messages to the user and ask if they want to - // resume (NOTE: we ran into a bug before where the - // apiConversationHistory wouldn't be initialized when opening a old - // task, and it was because we were waiting for resume). - // This is important in case the user deletes messages without resuming - // the task first. - this.apiConversationHistory = await this.getSavedApiConversationHistory() - - const lastClineMessage = this.clineMessages - .slice() - .reverse() - .find((m) => !(m.ask === "resume_task" || m.ask === "resume_completed_task")) // Could be multiple resume tasks. - - let askType: ClineAsk - if (lastClineMessage?.ask === "completion_result") { - askType = "resume_completed_task" - } else { - askType = "resume_task" - } + // Make sure that the api conversation history can be resumed by the API, + // even if it goes out of sync with cline messages. + let existingApiConversationHistory: ApiMessage[] = await this.getSavedApiConversationHistory() - this.isInitialized = true + // Tool blocks are always preserved; native tool calling only. - const { response, text, images } = await this.ask(askType) // Calls `postStateToWebview`. + // if the last message is an assistant message, we need to check if there's tool use since every tool use has to have a tool response + // if there's no tool use and only a text block, then we can just add a user message + // (note this isn't relevant anymore since we use custom tool prompts instead of tool use blocks, but this is here for legacy purposes in case users resume old tasks) - let responseText: string | undefined - let responseImages: string[] | undefined + // if the last message is a user message, we can need to get the assistant message before it to see if it made tool calls, and if so, fill in the remaining tool responses with 'interrupted' - if (response === "messageResponse") { - await this.say("user_feedback", text, images) - responseText = text - responseImages = images - } + let modifiedOldUserContent: Anthropic.Messages.ContentBlockParam[] // either the last message if its user message, or the user message before the last (assistant) message + let modifiedApiConversationHistory: ApiMessage[] // need to remove the last user message to replace with new modified user message + if (existingApiConversationHistory.length > 0) { + const lastMessage = existingApiConversationHistory[existingApiConversationHistory.length - 1] - // Make sure that the api conversation history can be resumed by the API, - // even if it goes out of sync with cline messages. - let existingApiConversationHistory: ApiMessage[] = await this.getSavedApiConversationHistory() - - // Tool blocks are always preserved; native tool calling only. - - // if the last message is an assistant message, we need to check if there's tool use since every tool use has to have a tool response - // if there's no tool use and only a text block, then we can just add a user message - // (note this isn't relevant anymore since we use custom tool prompts instead of tool use blocks, but this is here for legacy purposes in case users resume old tasks) - - // if the last message is a user message, we can need to get the assistant message before it to see if it made tool calls, and if so, fill in the remaining tool responses with 'interrupted' - - let modifiedOldUserContent: Anthropic.Messages.ContentBlockParam[] // either the last message if its user message, or the user message before the last (assistant) message - let modifiedApiConversationHistory: ApiMessage[] // need to remove the last user message to replace with new modified user message - if (existingApiConversationHistory.length > 0) { - const lastMessage = existingApiConversationHistory[existingApiConversationHistory.length - 1] - - if (lastMessage.isSummary) { - // IMPORTANT: If the last message is a condensation summary, we must preserve it - // intact. The summary message carries critical metadata (isSummary, condenseId) - // that getEffectiveApiHistory() uses to filter out condensed messages. - // Removing or merging it would destroy this metadata, causing all condensed - // messages to become "orphaned" and restored to active status — effectively - // undoing the condensation and sending the full history to the API. - // See: https://github.com/RooCodeInc/Roo-Code/issues/11487 - modifiedApiConversationHistory = [...existingApiConversationHistory] - modifiedOldUserContent = [] - } else if (lastMessage.role === "assistant") { - const content = Array.isArray(lastMessage.content) - ? lastMessage.content - : [{ type: "text", text: lastMessage.content }] - const hasToolUse = content.some((block) => block.type === "tool_use") - - if (hasToolUse) { - const toolUseBlocks = content.filter( - (block) => block.type === "tool_use", - ) as Anthropic.Messages.ToolUseBlock[] - const toolResponses: Anthropic.ToolResultBlockParam[] = toolUseBlocks.map((block) => ({ - type: "tool_result", - tool_use_id: block.id, - content: "Task was interrupted before this tool call could be completed.", - })) - modifiedApiConversationHistory = [...existingApiConversationHistory] // no changes - modifiedOldUserContent = [...toolResponses] - } else { + if (lastMessage.isSummary) { + // IMPORTANT: If the last message is a condensation summary, we must preserve it + // intact. The summary message carries critical metadata (isSummary, condenseId) + // that getEffectiveApiHistory() uses to filter out condensed messages. + // Removing or merging it would destroy this metadata, causing all condensed + // messages to become "orphaned" and restored to active status — effectively + // undoing the condensation and sending the full history to the API. + // See: https://github.com/RooCodeInc/Roo-Code/issues/11487 modifiedApiConversationHistory = [...existingApiConversationHistory] modifiedOldUserContent = [] - } - } else if (lastMessage.role === "user") { - const previousAssistantMessage: ApiMessage | undefined = - existingApiConversationHistory[existingApiConversationHistory.length - 2] - - const existingUserContent: Anthropic.Messages.ContentBlockParam[] = Array.isArray(lastMessage.content) - ? lastMessage.content - : [{ type: "text", text: lastMessage.content }] - if (previousAssistantMessage && previousAssistantMessage.role === "assistant") { - const assistantContent = Array.isArray(previousAssistantMessage.content) - ? previousAssistantMessage.content - : [{ type: "text", text: previousAssistantMessage.content }] - - const toolUseBlocks = assistantContent.filter( - (block) => block.type === "tool_use", - ) as Anthropic.Messages.ToolUseBlock[] - - if (toolUseBlocks.length > 0) { - const existingToolResults = existingUserContent.filter( - (block) => block.type === "tool_result", - ) as Anthropic.ToolResultBlockParam[] - - const missingToolResponses: Anthropic.ToolResultBlockParam[] = toolUseBlocks - .filter( - (toolUse) => !existingToolResults.some((result) => result.tool_use_id === toolUse.id), - ) - .map((toolUse) => ({ - type: "tool_result", - tool_use_id: toolUse.id, - content: "Task was interrupted before this tool call could be completed.", - })) - - modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1) // removes the last user message - modifiedOldUserContent = [...existingUserContent, ...missingToolResponses] + } else if (lastMessage.role === "assistant") { + const content = Array.isArray(lastMessage.content) + ? lastMessage.content + : [{ type: "text", text: lastMessage.content }] + const hasToolUse = content.some((block) => block.type === "tool_use") + + if (hasToolUse) { + const toolUseBlocks = content.filter( + (block) => block.type === "tool_use", + ) as Anthropic.Messages.ToolUseBlock[] + const toolResponses: Anthropic.ToolResultBlockParam[] = toolUseBlocks.map((block) => ({ + type: "tool_result", + tool_use_id: block.id, + content: "Task was interrupted before this tool call could be completed.", + })) + modifiedApiConversationHistory = [...existingApiConversationHistory] // no changes + modifiedOldUserContent = [...toolResponses] + } else { + modifiedApiConversationHistory = [...existingApiConversationHistory] + modifiedOldUserContent = [] + } + } else if (lastMessage.role === "user") { + const previousAssistantMessage: ApiMessage | undefined = + existingApiConversationHistory[existingApiConversationHistory.length - 2] + + const existingUserContent: Anthropic.Messages.ContentBlockParam[] = Array.isArray( + lastMessage.content, + ) + ? lastMessage.content + : [{ type: "text", text: lastMessage.content }] + if (previousAssistantMessage && previousAssistantMessage.role === "assistant") { + const assistantContent = Array.isArray(previousAssistantMessage.content) + ? previousAssistantMessage.content + : [{ type: "text", text: previousAssistantMessage.content }] + + const toolUseBlocks = assistantContent.filter( + (block) => block.type === "tool_use", + ) as Anthropic.Messages.ToolUseBlock[] + + if (toolUseBlocks.length > 0) { + const existingToolResults = existingUserContent.filter( + (block) => block.type === "tool_result", + ) as Anthropic.ToolResultBlockParam[] + + const missingToolResponses: Anthropic.ToolResultBlockParam[] = toolUseBlocks + .filter( + (toolUse) => + !existingToolResults.some((result) => result.tool_use_id === toolUse.id), + ) + .map((toolUse) => ({ + type: "tool_result", + tool_use_id: toolUse.id, + content: "Task was interrupted before this tool call could be completed.", + })) + + modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1) // removes the last user message + modifiedOldUserContent = [...existingUserContent, ...missingToolResponses] + } else { + modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1) + modifiedOldUserContent = [...existingUserContent] + } } else { modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1) modifiedOldUserContent = [...existingUserContent] } } else { - modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1) - modifiedOldUserContent = [...existingUserContent] + throw new Error("Unexpected: Last message is not a user or assistant message") } } else { - throw new Error("Unexpected: Last message is not a user or assistant message") + throw new Error("Unexpected: No existing API conversation history") } - } else { - throw new Error("Unexpected: No existing API conversation history") - } - let newUserContent: Anthropic.Messages.ContentBlockParam[] = [...modifiedOldUserContent] + let newUserContent: Anthropic.Messages.ContentBlockParam[] = [...modifiedOldUserContent] - const agoText = ((): string => { - const timestamp = lastClineMessage?.ts ?? Date.now() - const now = Date.now() - const diff = now - timestamp - const minutes = Math.floor(diff / 60000) - const hours = Math.floor(minutes / 60) - const days = Math.floor(hours / 24) + const agoText = ((): string => { + const timestamp = lastClineMessage?.ts ?? Date.now() + const now = Date.now() + const diff = now - timestamp + const minutes = Math.floor(diff / 60000) + const hours = Math.floor(minutes / 60) + const days = Math.floor(hours / 24) - if (days > 0) { - return `${days} day${days > 1 ? "s" : ""} ago` - } - if (hours > 0) { - return `${hours} hour${hours > 1 ? "s" : ""} ago` + if (days > 0) { + return `${days} day${days > 1 ? "s" : ""} ago` + } + if (hours > 0) { + return `${hours} hour${hours > 1 ? "s" : ""} ago` + } + if (minutes > 0) { + return `${minutes} minute${minutes > 1 ? "s" : ""} ago` + } + return "just now" + })() + + if (responseText) { + newUserContent.push({ + type: "text", + text: `\n${responseText}\n`, + }) } - if (minutes > 0) { - return `${minutes} minute${minutes > 1 ? "s" : ""} ago` + + if (responseImages && responseImages.length > 0) { + newUserContent.push(...formatResponse.imageBlocks(responseImages)) } - return "just now" - })() - if (responseText) { - newUserContent.push({ - type: "text", - text: `\n${responseText}\n`, - }) - } + // Ensure we have at least some content to send to the API. + // If newUserContent is empty, add a minimal resumption message. + if (newUserContent.length === 0) { + newUserContent.push({ + type: "text", + text: "[TASK RESUMPTION] Resuming task...", + }) + } - if (responseImages && responseImages.length > 0) { - newUserContent.push(...formatResponse.imageBlocks(responseImages)) - } + await this.overwriteApiConversationHistory(modifiedApiConversationHistory) - // Ensure we have at least some content to send to the API. - // If newUserContent is empty, add a minimal resumption message. - if (newUserContent.length === 0) { - newUserContent.push({ - type: "text", - text: "[TASK RESUMPTION] Resuming task...", - }) + // Task resuming from history item. + await this.initiateTaskLoop(newUserContent) + } catch (error) { + // Resume and cancellation can race when users issue repeated cancels. + // Treat intentional abort/abandon flows as expected and avoid process-level crashes. + if (this.abandoned === true || this.abort === true || this.abortReason === "user_cancelled") { + return + } + throw error } - - await this.overwriteApiConversationHistory(modifiedApiConversationHistory) - - // Task resuming from history item. - await this.initiateTaskLoop(newUserContent) } /**