diff --git a/apps/server/package.json b/apps/server/package.json index 7ed3d16144..46debc0e5c 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -17,6 +17,7 @@ "test": "vitest run" }, "dependencies": { + "node-pty": "^1.1.0", "open": "^10.1.0", "ws": "^8.18.0" }, diff --git a/apps/server/src/git.test.ts b/apps/server/src/git.test.ts index dd73725c03..bb3a48acc8 100644 --- a/apps/server/src/git.test.ts +++ b/apps/server/src/git.test.ts @@ -53,6 +53,21 @@ async function initRepoWithCommit(cwd: string): Promise { // ── Tests ── describe("git integration", () => { + describe("runTerminalCommand", () => { + it("caps captured output when maxOutputBytes is exceeded", async () => { + const result = await runTerminalCommand({ + command: `node -e "process.stdout.write('x'.repeat(2000))"`, + cwd: process.cwd(), + timeoutMs: 10_000, + maxOutputBytes: 128, + }); + + expect(result.code).toBe(0); + expect(result.stdout.length).toBeLessThanOrEqual(128); + expect(result.stderr).toContain("output truncated"); + }); + }); + // ── initGitRepo ── describe("initGitRepo", () => { diff --git a/apps/server/src/git.ts b/apps/server/src/git.ts index 34b3b0a0fa..7e299c6c47 100644 --- a/apps/server/src/git.ts +++ b/apps/server/src/git.ts @@ -12,10 +12,53 @@ import type { GitListBranchesInput, GitListBranchesResult, GitRemoveWorktreeInput, - TerminalCommandInput, - TerminalCommandResult, } from "@t3tools/contracts"; +export interface TerminalCommandInput { + command: string; + cwd: string; + timeoutMs?: number; + maxOutputBytes?: number; +} + +export interface TerminalCommandResult { + stdout: string; + stderr: string; + code: number | null; + signal: NodeJS.Signals | null; + timedOut: boolean; +} + +const DEFAULT_MAX_OUTPUT_BYTES = 1_000_000; + +function appendChunkWithinLimit( + target: string, + currentBytes: number, + chunk: Buffer, + maxBytes: number, +): { + next: string; + nextBytes: number; + truncated: boolean; +} { + const remaining = maxBytes - currentBytes; + if (remaining <= 0) { + return { next: target, nextBytes: currentBytes, truncated: true }; + } + if (chunk.length <= remaining) { + return { + next: `${target}${chunk.toString()}`, + nextBytes: currentBytes + chunk.length, + truncated: false, + }; + } + return { + next: `${target}${chunk.subarray(0, remaining).toString()}`, + nextBytes: currentBytes + remaining, + truncated: true, + }; +} + /** Spawn git directly with an argv array — no shell, no quoting needed. */ function runGit(args: string[], cwd: string, timeoutMs = 30_000): Promise { return new Promise((resolve, reject) => { @@ -25,9 +68,13 @@ function runGit(args: string[], cwd: string, timeoutMs = 30_000): Promise { timedOut = true; @@ -38,10 +85,16 @@ function runGit(args: string[], cwd: string, timeoutMs = 30_000): Promise { - stdout += chunk.toString(); + const appended = appendChunkWithinLimit(stdout, stdoutBytes, chunk, maxOutputBytes); + stdout = appended.next; + stdoutBytes = appended.nextBytes; + outputTruncated = outputTruncated || appended.truncated; }); child.stderr?.on("data", (chunk: Buffer) => { - stderr += chunk.toString(); + const appended = appendChunkWithinLimit(stderr, stderrBytes, chunk, maxOutputBytes); + stderr = appended.next; + stderrBytes = appended.nextBytes; + outputTruncated = outputTruncated || appended.truncated; }); child.on("error", (error) => { clearTimeout(timeout); @@ -49,6 +102,9 @@ function runGit(args: string[], cwd: string, timeoutMs = 30_000): Promise { clearTimeout(timeout); + if (outputTruncated) { + stderr = `${stderr}\n[output truncated at ${maxOutputBytes} bytes]`; + } resolve({ stdout, stderr, code: code ?? null, signal: signal ?? null, timedOut }); }); }); @@ -57,6 +113,7 @@ function runGit(args: string[], cwd: string, timeoutMs = 30_000): Promise { + const maxOutputBytes = input.maxOutputBytes ?? DEFAULT_MAX_OUTPUT_BYTES; const shellPath = process.platform === "win32" ? (process.env.ComSpec ?? "cmd.exe") @@ -74,7 +131,10 @@ export async function runTerminalCommand( let stdout = ""; let stderr = ""; + let stdoutBytes = 0; + let stderrBytes = 0; let timedOut = false; + let outputTruncated = false; const timeout = setTimeout(() => { timedOut = true; @@ -87,11 +147,17 @@ export async function runTerminalCommand( }, input.timeoutMs ?? 30_000); child.stdout?.on("data", (chunk: Buffer) => { - stdout += chunk.toString(); + const appended = appendChunkWithinLimit(stdout, stdoutBytes, chunk, maxOutputBytes); + stdout = appended.next; + stdoutBytes = appended.nextBytes; + outputTruncated = outputTruncated || appended.truncated; }); child.stderr?.on("data", (chunk: Buffer) => { - stderr += chunk.toString(); + const appended = appendChunkWithinLimit(stderr, stderrBytes, chunk, maxOutputBytes); + stderr = appended.next; + stderrBytes = appended.nextBytes; + outputTruncated = outputTruncated || appended.truncated; }); child.on("error", (error) => { @@ -101,6 +167,9 @@ export async function runTerminalCommand( child.on("close", (code, signal) => { clearTimeout(timeout); + if (outputTruncated) { + stderr = `${stderr}\n[output truncated at ${maxOutputBytes} bytes]`; + } resolve({ stdout, stderr, @@ -138,8 +207,8 @@ export async function listGitBranches(input: GitListBranchesInput): Promise void): () => void; + onExit(callback: (event: PtyExitEvent) => void): () => void; +} + +export interface PtySpawnInput { + shell: string; + cwd: string; + cols: number; + rows: number; + env: NodeJS.ProcessEnv; +} + +export interface PtyAdapter { + spawn(input: PtySpawnInput): PtyProcess; +} + +class NodePtyProcess implements PtyProcess { + constructor(private readonly process: nodePty.IPty) {} + + get pid(): number { + return this.process.pid; + } + + write(data: string): void { + this.process.write(data); + } + + resize(cols: number, rows: number): void { + this.process.resize(cols, rows); + } + + kill(signal?: string): void { + this.process.kill(signal); + } + + onData(callback: (data: string) => void): () => void { + const disposable = this.process.onData(callback); + return () => { + disposable.dispose(); + }; + } + + onExit(callback: (event: PtyExitEvent) => void): () => void { + const disposable = this.process.onExit((event) => { + callback({ + exitCode: event.exitCode, + signal: event.signal ?? null, + }); + }); + return () => { + disposable.dispose(); + }; + } +} + +export class NodePtyAdapter implements PtyAdapter { + spawn(input: PtySpawnInput): PtyProcess { + const ptyProcess = nodePty.spawn(input.shell, [], { + cwd: input.cwd, + cols: input.cols, + rows: input.rows, + env: input.env, + name: globalThis.process.platform === "win32" ? "xterm-color" : "xterm-256color", + }); + return new NodePtyProcess(ptyProcess); + } +} diff --git a/apps/server/src/terminalManager.test.ts b/apps/server/src/terminalManager.test.ts new file mode 100644 index 0000000000..281a716ae7 --- /dev/null +++ b/apps/server/src/terminalManager.test.ts @@ -0,0 +1,269 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import type { + TerminalEvent, + TerminalOpenInput, +} from "@t3tools/contracts"; +import { afterEach, describe, expect, it } from "vitest"; + +import type { PtyAdapter, PtyExitEvent, PtyProcess, PtySpawnInput } from "./ptyAdapter"; +import { TerminalManager } from "./terminalManager"; + +class FakePtyProcess implements PtyProcess { + readonly writes: string[] = []; + readonly resizeCalls: Array<{ cols: number; rows: number }> = []; + private readonly dataListeners = new Set<(data: string) => void>(); + private readonly exitListeners = new Set<(event: PtyExitEvent) => void>(); + killed = false; + + constructor(readonly pid: number) {} + + write(data: string): void { + this.writes.push(data); + } + + resize(cols: number, rows: number): void { + this.resizeCalls.push({ cols, rows }); + } + + kill(): void { + this.killed = true; + } + + onData(callback: (data: string) => void): () => void { + this.dataListeners.add(callback); + return () => { + this.dataListeners.delete(callback); + }; + } + + onExit(callback: (event: PtyExitEvent) => void): () => void { + this.exitListeners.add(callback); + return () => { + this.exitListeners.delete(callback); + }; + } + + emitData(data: string): void { + for (const listener of this.dataListeners) { + listener(data); + } + } + + emitExit(event: PtyExitEvent): void { + for (const listener of this.exitListeners) { + listener(event); + } + } +} + +class FakePtyAdapter implements PtyAdapter { + readonly spawnInputs: PtySpawnInput[] = []; + readonly processes: FakePtyProcess[] = []; + private nextPid = 9000; + + spawn(input: PtySpawnInput): PtyProcess { + this.spawnInputs.push(input); + const process = new FakePtyProcess(this.nextPid++); + this.processes.push(process); + return process; + } +} + +function waitFor(predicate: () => boolean, timeoutMs = 800): Promise { + const started = Date.now(); + return new Promise((resolve, reject) => { + const poll = () => { + if (predicate()) { + resolve(); + return; + } + if (Date.now() - started > timeoutMs) { + reject(new Error("Timed out waiting for condition")); + return; + } + setTimeout(poll, 15); + }; + poll(); + }); +} + +function openInput(overrides: Partial = {}): TerminalOpenInput { + return { + threadId: "thread-1", + cwd: process.cwd(), + cols: 100, + rows: 24, + ...overrides, + }; +} + +function historyLogName(threadId: string): string { + return `terminal_${Buffer.from(threadId, "utf8").toString("base64url")}.log`; +} + +function historyLogPath(logsDir: string, threadId = "thread-1"): string { + return path.join(logsDir, historyLogName(threadId)); +} + +describe("TerminalManager", () => { + const tempDirs: string[] = []; + + afterEach(() => { + for (const dir of tempDirs.splice(0, tempDirs.length)) { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + function makeManager(historyLineLimit = 5) { + const logsDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3code-terminal-")); + tempDirs.push(logsDir); + const ptyAdapter = new FakePtyAdapter(); + const manager = new TerminalManager({ + logsDir, + ptyAdapter, + historyLineLimit, + shellResolver: () => "/bin/bash", + }); + return { logsDir, ptyAdapter, manager }; + } + + it("spawns lazily and reuses running terminal per thread", async () => { + const { manager, ptyAdapter } = makeManager(); + const [first, second] = await Promise.all([manager.open(openInput()), manager.open(openInput())]); + const third = await manager.open(openInput()); + + expect(first.threadId).toBe("thread-1"); + expect(second.threadId).toBe("thread-1"); + expect(third.threadId).toBe("thread-1"); + expect(ptyAdapter.spawnInputs).toHaveLength(1); + + manager.dispose(); + }); + + it("forwards write and resize to active pty process", async () => { + const { manager, ptyAdapter } = makeManager(); + await manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + await manager.write({ threadId: "thread-1", data: "ls\n" }); + await manager.resize({ threadId: "thread-1", cols: 120, rows: 30 }); + + expect(process.writes).toEqual(["ls\n"]); + expect(process.resizeCalls).toEqual([{ cols: 120, rows: 30 }]); + + manager.dispose(); + }); + + it("clears transcript and emits cleared event", async () => { + const { manager, ptyAdapter, logsDir } = makeManager(); + const events: TerminalEvent[] = []; + manager.on("event", (event) => { + events.push(event); + }); + await manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("hello\n"); + await waitFor(() => fs.existsSync(historyLogPath(logsDir))); + await manager.clear({ threadId: "thread-1" }); + await waitFor(() => fs.readFileSync(historyLogPath(logsDir), "utf8") === ""); + + expect(events.some((event) => event.type === "cleared")).toBe(true); + + manager.dispose(); + }); + + it("restarts terminal with empty transcript and respawns pty", async () => { + const { manager, ptyAdapter, logsDir } = makeManager(); + await manager.open(openInput()); + const firstProcess = ptyAdapter.processes[0]; + expect(firstProcess).toBeDefined(); + if (!firstProcess) return; + firstProcess.emitData("before restart\n"); + await waitFor(() => fs.existsSync(historyLogPath(logsDir))); + + const snapshot = await manager.restart(openInput()); + expect(snapshot.history).toBe(""); + expect(snapshot.status).toBe("running"); + expect(ptyAdapter.spawnInputs).toHaveLength(2); + await waitFor(() => fs.readFileSync(historyLogPath(logsDir), "utf8") === ""); + + manager.dispose(); + }); + + it("emits exited event and reopens with clean transcript after exit", async () => { + const { manager, ptyAdapter, logsDir } = makeManager(); + const events: TerminalEvent[] = []; + manager.on("event", (event) => { + events.push(event); + }); + await manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + process.emitData("old data\n"); + await waitFor(() => fs.existsSync(historyLogPath(logsDir))); + process.emitExit({ exitCode: 0, signal: 0 }); + + await waitFor(() => events.some((event) => event.type === "exited")); + const reopened = await manager.open(openInput()); + + expect(reopened.history).toBe(""); + expect(ptyAdapter.spawnInputs).toHaveLength(2); + expect(fs.readFileSync(historyLogPath(logsDir), "utf8")).toBe(""); + + manager.dispose(); + }); + + it("caps persisted history to configured line limit", async () => { + const { manager, ptyAdapter } = makeManager(3); + await manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("line1\nline2\nline3\nline4\n"); + await manager.close({ threadId: "thread-1" }); + + const reopened = await manager.open(openInput()); + const nonEmptyLines = reopened.history.split("\n").filter((line) => line.length > 0); + expect(nonEmptyLines).toEqual(["line2", "line3", "line4"]); + + manager.dispose(); + }); + + it("deletes history file when close(deleteHistory=true)", async () => { + const { manager, ptyAdapter, logsDir } = makeManager(); + await manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + process.emitData("bye\n"); + await waitFor(() => fs.existsSync(historyLogPath(logsDir))); + + await manager.close({ threadId: "thread-1", deleteHistory: true }); + expect(fs.existsSync(historyLogPath(logsDir))).toBe(false); + + manager.dispose(); + }); + + it("loads existing legacy transcript filenames and keeps new naming for writes", async () => { + const { manager, logsDir } = makeManager(); + const legacyPath = path.join(logsDir, "thread-1.log"); + fs.writeFileSync(legacyPath, "legacy-line\n", "utf8"); + + const snapshot = await manager.open(openInput()); + + expect(snapshot.history).toBe("legacy-line\n"); + expect(fs.existsSync(legacyPath)).toBe(true); + + manager.dispose(); + }); +}); diff --git a/apps/server/src/terminalManager.ts b/apps/server/src/terminalManager.ts new file mode 100644 index 0000000000..f13b02280a --- /dev/null +++ b/apps/server/src/terminalManager.ts @@ -0,0 +1,566 @@ +import { EventEmitter } from "node:events"; +import fs from "node:fs"; +import path from "node:path"; + +import { + type TerminalCloseInput, + type TerminalEvent, + type TerminalOpenInput, + type TerminalSessionSnapshot, + type TerminalSessionStatus, + type TerminalThreadInput, + type TerminalWriteInput, + terminalCloseInputSchema, + terminalOpenInputSchema, + terminalResizeInputSchema, + terminalThreadInputSchema, + terminalWriteInputSchema, +} from "@t3tools/contracts"; + +import { createLogger } from "./logger"; +import { NodePtyAdapter, type PtyAdapter, type PtyExitEvent, type PtyProcess } from "./ptyAdapter"; + +const DEFAULT_HISTORY_LINE_LIMIT = 5_000; +const DEFAULT_PERSIST_DEBOUNCE_MS = 40; + +export interface TerminalManagerEvents { + event: [event: TerminalEvent]; +} + +export interface TerminalManagerOptions { + logsDir?: string; + historyLineLimit?: number; + ptyAdapter?: PtyAdapter; + shellResolver?: () => string; +} + +interface TerminalSessionState { + threadId: string; + cwd: string; + status: TerminalSessionStatus; + pid: number | null; + history: string; + exitCode: number | null; + exitSignal: number | null; + updatedAt: string; + cols: number; + rows: number; + process: PtyProcess | null; + unsubscribeData: (() => void) | null; + unsubscribeExit: (() => void) | null; +} + +function defaultShellResolver(): string { + if (process.platform === "win32") { + return process.env.ComSpec ?? "cmd.exe"; + } + return process.env.SHELL ?? "bash"; +} + +function capHistory(history: string, maxLines: number): string { + if (history.length === 0) return history; + const hasTrailingNewline = history.endsWith("\n"); + const lines = history.split("\n"); + if (hasTrailingNewline) { + lines.pop(); + } + if (lines.length <= maxLines) return history; + const capped = lines.slice(lines.length - maxLines).join("\n"); + return hasTrailingNewline ? `${capped}\n` : capped; +} + +function legacySafeThreadId(threadId: string): string { + return threadId.replace(/[^a-zA-Z0-9._-]/g, "_"); +} + +function toSafeThreadId(threadId: string): string { + return `terminal_${Buffer.from(threadId, "utf8").toString("base64url")}`; +} + +export class TerminalManager extends EventEmitter { + private readonly sessions = new Map(); + private readonly logsDir: string; + private readonly historyLineLimit: number; + private readonly ptyAdapter: PtyAdapter; + private readonly shellResolver: () => string; + private readonly persistQueues = new Map>(); + private readonly persistTimers = new Map>(); + private readonly pendingPersistHistory = new Map(); + private readonly threadLocks = new Map>(); + private readonly persistDebounceMs: number; + private readonly logger = createLogger("terminal"); + + constructor(options: TerminalManagerOptions = {}) { + super(); + this.logsDir = options.logsDir ?? path.resolve(process.cwd(), ".logs", "terminals"); + this.historyLineLimit = options.historyLineLimit ?? DEFAULT_HISTORY_LINE_LIMIT; + this.ptyAdapter = options.ptyAdapter ?? new NodePtyAdapter(); + this.shellResolver = options.shellResolver ?? defaultShellResolver; + this.persistDebounceMs = DEFAULT_PERSIST_DEBOUNCE_MS; + fs.mkdirSync(this.logsDir, { recursive: true }); + } + + async open(raw: TerminalOpenInput): Promise { + const input = terminalOpenInputSchema.parse(raw); + return this.runWithThreadLock(input.threadId, async () => { + await this.assertValidCwd(input.cwd); + + const existing = this.sessions.get(input.threadId); + if (!existing) { + await this.flushPersistQueue(input.threadId); + const history = await this.readHistory(input.threadId); + const session: TerminalSessionState = { + threadId: input.threadId, + cwd: input.cwd, + status: "starting", + pid: null, + history, + exitCode: null, + exitSignal: null, + updatedAt: new Date().toISOString(), + cols: input.cols, + rows: input.rows, + process: null, + unsubscribeData: null, + unsubscribeExit: null, + }; + this.sessions.set(input.threadId, session); + this.startSession(session, input, "started"); + return this.snapshot(session); + } + + if (existing.cwd !== input.cwd) { + this.stopProcess(existing); + existing.cwd = input.cwd; + existing.history = ""; + await this.persistHistory(existing.threadId, existing.history); + } else if (existing.status === "exited" || existing.status === "error") { + existing.history = ""; + await this.persistHistory(existing.threadId, existing.history); + } + + if (!existing.process) { + this.startSession(existing, input, "started"); + return this.snapshot(existing); + } + + if (existing.cols !== input.cols || existing.rows !== input.rows) { + existing.cols = input.cols; + existing.rows = input.rows; + existing.process.resize(input.cols, input.rows); + existing.updatedAt = new Date().toISOString(); + } + + return this.snapshot(existing); + }); + } + + async write(raw: TerminalWriteInput): Promise { + const input = terminalWriteInputSchema.parse(raw); + const session = this.requireSession(input.threadId); + if (!session.process || session.status !== "running") { + throw new Error(`Terminal is not running for thread: ${input.threadId}`); + } + session.process.write(input.data); + } + + async resize(raw: TerminalThreadInput & { cols: number; rows: number }): Promise { + const input = terminalResizeInputSchema.parse(raw); + const session = this.requireSession(input.threadId); + if (!session.process || session.status !== "running") { + throw new Error(`Terminal is not running for thread: ${input.threadId}`); + } + session.cols = input.cols; + session.rows = input.rows; + session.updatedAt = new Date().toISOString(); + session.process.resize(input.cols, input.rows); + } + + async clear(raw: TerminalThreadInput): Promise { + const input = terminalThreadInputSchema.parse(raw); + await this.runWithThreadLock(input.threadId, async () => { + const session = this.requireSession(input.threadId); + session.history = ""; + session.updatedAt = new Date().toISOString(); + await this.persistHistory(input.threadId, session.history); + this.emitEvent({ + type: "cleared", + threadId: input.threadId, + createdAt: new Date().toISOString(), + }); + }); + } + + async restart(raw: TerminalOpenInput): Promise { + const input = terminalOpenInputSchema.parse(raw); + return this.runWithThreadLock(input.threadId, async () => { + await this.assertValidCwd(input.cwd); + + let session = this.sessions.get(input.threadId); + if (!session) { + session = { + threadId: input.threadId, + cwd: input.cwd, + status: "starting", + pid: null, + history: "", + exitCode: null, + exitSignal: null, + updatedAt: new Date().toISOString(), + cols: input.cols, + rows: input.rows, + process: null, + unsubscribeData: null, + unsubscribeExit: null, + }; + this.sessions.set(input.threadId, session); + } else { + this.stopProcess(session); + session.cwd = input.cwd; + } + + session.history = ""; + await this.persistHistory(input.threadId, session.history); + this.startSession(session, input, "restarted"); + return this.snapshot(session); + }); + } + + async close(raw: TerminalCloseInput): Promise { + const input = terminalCloseInputSchema.parse(raw); + await this.runWithThreadLock(input.threadId, async () => { + const session = this.sessions.get(input.threadId); + if (session) { + this.stopProcess(session); + this.sessions.delete(input.threadId); + } + await this.flushPersistQueue(input.threadId); + if (input.deleteHistory) { + await this.deleteHistory(input.threadId); + } + }); + } + + dispose(): void { + for (const session of this.sessions.values()) { + this.stopProcess(session); + } + this.sessions.clear(); + for (const timer of this.persistTimers.values()) { + clearTimeout(timer); + } + this.persistTimers.clear(); + this.pendingPersistHistory.clear(); + this.threadLocks.clear(); + this.persistQueues.clear(); + } + + private startSession( + session: TerminalSessionState, + input: TerminalOpenInput, + eventType: "started" | "restarted", + ): void { + this.stopProcess(session); + + session.status = "starting"; + session.cwd = input.cwd; + session.cols = input.cols; + session.rows = input.rows; + session.exitCode = null; + session.exitSignal = null; + session.updatedAt = new Date().toISOString(); + + const shell = this.shellResolver(); + let ptyProcess: PtyProcess | null = null; + try { + ptyProcess = this.ptyAdapter.spawn({ + shell, + cwd: session.cwd, + cols: session.cols, + rows: session.rows, + env: process.env, + }); + session.process = ptyProcess; + session.pid = ptyProcess.pid; + session.status = "running"; + session.updatedAt = new Date().toISOString(); + session.unsubscribeData = ptyProcess.onData((data) => { + this.onProcessData(session, data); + }); + session.unsubscribeExit = ptyProcess.onExit((event) => { + this.onProcessExit(session, event); + }); + this.emitEvent({ + type: eventType, + threadId: session.threadId, + createdAt: new Date().toISOString(), + snapshot: this.snapshot(session), + }); + } catch (error) { + if (ptyProcess) { + try { + ptyProcess.kill(); + } catch { + // Ignore kill errors during failed startup cleanup. + } + } + session.status = "error"; + session.pid = null; + session.process = null; + session.updatedAt = new Date().toISOString(); + const message = error instanceof Error ? error.message : "Terminal start failed"; + this.emitEvent({ + type: "error", + threadId: session.threadId, + createdAt: new Date().toISOString(), + message, + }); + this.logger.error("failed to start terminal", { + threadId: session.threadId, + error: message, + }); + } + } + + private onProcessData(session: TerminalSessionState, data: string): void { + session.history = capHistory(`${session.history}${data}`, this.historyLineLimit); + session.updatedAt = new Date().toISOString(); + this.queuePersist(session.threadId, session.history); + this.emitEvent({ + type: "output", + threadId: session.threadId, + createdAt: new Date().toISOString(), + data, + }); + } + + private onProcessExit(session: TerminalSessionState, event: PtyExitEvent): void { + this.cleanupProcessHandles(session); + session.process = null; + session.pid = null; + session.status = "exited"; + session.exitCode = Number.isInteger(event.exitCode) ? event.exitCode : null; + session.exitSignal = Number.isInteger(event.signal) ? event.signal : null; + session.updatedAt = new Date().toISOString(); + this.emitEvent({ + type: "exited", + threadId: session.threadId, + createdAt: new Date().toISOString(), + exitCode: session.exitCode, + exitSignal: session.exitSignal, + }); + } + + private stopProcess(session: TerminalSessionState): void { + const process = session.process; + if (!process) return; + this.cleanupProcessHandles(session); + session.process = null; + session.pid = null; + session.status = "exited"; + session.updatedAt = new Date().toISOString(); + try { + process.kill(); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.warn("failed to kill terminal process", { + threadId: session.threadId, + error: message, + }); + } + } + + private cleanupProcessHandles(session: TerminalSessionState): void { + session.unsubscribeData?.(); + session.unsubscribeData = null; + session.unsubscribeExit?.(); + session.unsubscribeExit = null; + } + + private queuePersist(threadId: string, history: string): void { + this.pendingPersistHistory.set(threadId, history); + this.schedulePersist(threadId); + } + + private async persistHistory(threadId: string, history: string): Promise { + this.clearPersistTimer(threadId); + this.pendingPersistHistory.delete(threadId); + await this.enqueuePersistWrite(threadId, history); + } + + private enqueuePersistWrite(threadId: string, history: string): Promise { + const task = async () => { + await fs.promises.writeFile(this.historyPath(threadId), history, "utf8"); + }; + const previous = this.persistQueues.get(threadId) ?? Promise.resolve(); + const next = previous + .catch(() => undefined) + .then(task) + .catch((error) => { + this.logger.warn("failed to persist terminal history", { + threadId, + error: error instanceof Error ? error.message : String(error), + }); + }); + this.persistQueues.set(threadId, next); + const finalized = next.finally(() => { + if (this.persistQueues.get(threadId) === next) { + this.persistQueues.delete(threadId); + } + if (this.pendingPersistHistory.has(threadId) && !this.persistTimers.has(threadId)) { + this.schedulePersist(threadId); + } + }); + void finalized.catch(() => undefined); + return finalized; + } + + private schedulePersist(threadId: string): void { + if (this.persistTimers.has(threadId)) return; + const timer = setTimeout(() => { + this.persistTimers.delete(threadId); + const pendingHistory = this.pendingPersistHistory.get(threadId); + if (pendingHistory === undefined) return; + this.pendingPersistHistory.delete(threadId); + void this.enqueuePersistWrite(threadId, pendingHistory); + }, this.persistDebounceMs); + this.persistTimers.set(threadId, timer); + } + + private clearPersistTimer(threadId: string): void { + const timer = this.persistTimers.get(threadId); + if (!timer) return; + clearTimeout(timer); + this.persistTimers.delete(threadId); + } + + private async readHistory(threadId: string): Promise { + try { + const raw = await fs.promises.readFile(this.historyPath(threadId), "utf8"); + const capped = capHistory(raw, this.historyLineLimit); + if (capped !== raw) { + await fs.promises.writeFile(this.historyPath(threadId), capped, "utf8"); + } + return capped; + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + throw error; + } + } + + try { + const raw = await fs.promises.readFile(this.legacyHistoryPath(threadId), "utf8"); + const capped = capHistory(raw, this.historyLineLimit); + if (capped !== raw) { + await fs.promises.writeFile(this.legacyHistoryPath(threadId), capped, "utf8"); + } + return capped; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return ""; + } + throw error; + } + } + + private async deleteHistory(threadId: string): Promise { + try { + await Promise.all([ + fs.promises.rm(this.historyPath(threadId), { force: true }), + fs.promises.rm(this.legacyHistoryPath(threadId), { force: true }), + ]); + } catch (error) { + this.logger.warn("failed to delete terminal history", { + threadId, + error: error instanceof Error ? error.message : String(error), + }); + } + } + + private async flushPersistQueue(threadId: string): Promise { + this.clearPersistTimer(threadId); + + while (true) { + const pendingHistory = this.pendingPersistHistory.get(threadId); + if (pendingHistory !== undefined) { + this.pendingPersistHistory.delete(threadId); + await this.enqueuePersistWrite(threadId, pendingHistory); + } + + const pending = this.persistQueues.get(threadId); + if (!pending) { + return; + } + await pending.catch(() => undefined); + } + } + + private async assertValidCwd(cwd: string): Promise { + let stats: fs.Stats; + try { + stats = await fs.promises.stat(cwd); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + throw new Error(`Terminal cwd does not exist: ${cwd}`, { cause: error }); + } + throw error; + } + if (!stats.isDirectory()) { + throw new Error(`Terminal cwd is not a directory: ${cwd}`); + } + } + + private requireSession(threadId: string): TerminalSessionState { + const session = this.sessions.get(threadId); + if (!session) { + throw new Error(`Unknown terminal thread: ${threadId}`); + } + return session; + } + + private snapshot(session: TerminalSessionState): TerminalSessionSnapshot { + return { + threadId: session.threadId, + cwd: session.cwd, + status: session.status, + pid: session.pid, + history: session.history, + exitCode: session.exitCode, + exitSignal: session.exitSignal, + updatedAt: session.updatedAt, + }; + } + + private emitEvent(event: TerminalEvent): void { + this.emit("event", event); + } + + private historyPath(threadId: string): string { + return path.join(this.logsDir, `${toSafeThreadId(threadId)}.log`); + } + + private legacyHistoryPath(threadId: string): string { + return path.join(this.logsDir, `${legacySafeThreadId(threadId)}.log`); + } + + private async runWithThreadLock( + threadId: string, + task: () => Promise, + ): Promise { + const previous = this.threadLocks.get(threadId) ?? Promise.resolve(); + let release: () => void = () => {}; + const current = new Promise((resolve) => { + release = resolve; + }); + this.threadLocks.set(threadId, current); + await previous.catch(() => undefined); + try { + return await task(); + } finally { + release(); + if (this.threadLocks.get(threadId) === current) { + this.threadLocks.delete(threadId); + } + } + } +} diff --git a/apps/server/src/wsServer.test.ts b/apps/server/src/wsServer.test.ts index 770d1ccdb9..8a86a87869 100644 --- a/apps/server/src/wsServer.test.ts +++ b/apps/server/src/wsServer.test.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { EventEmitter } from "node:events"; import { describe, expect, it, afterEach, vi } from "vitest"; import { createServer } from "./wsServer"; @@ -8,6 +9,15 @@ import WebSocket from "ws"; import { WS_CHANNELS, WS_METHODS, type WsPush, type WsResponse } from "@t3tools/contracts"; import { ProjectRegistry } from "./projectRegistry"; +import type { + TerminalCloseInput, + TerminalEvent, + TerminalOpenInput, + TerminalSessionSnapshot, + TerminalThreadInput, + TerminalWriteInput, +} from "@t3tools/contracts"; +import type { TerminalManager } from "./terminalManager"; interface PendingMessages { queue: unknown[]; @@ -16,6 +26,91 @@ interface PendingMessages { const pendingBySocket = new WeakMap(); +class MockTerminalManager extends EventEmitter<{ event: [event: TerminalEvent] }> { + private readonly sessions = new Map(); + + async open(input: TerminalOpenInput): Promise { + const now = new Date().toISOString(); + const snapshot: TerminalSessionSnapshot = { + threadId: input.threadId, + cwd: input.cwd, + status: "running", + pid: 4242, + history: "", + exitCode: null, + exitSignal: null, + updatedAt: now, + }; + this.sessions.set(input.threadId, snapshot); + queueMicrotask(() => { + this.emit("event", { + type: "started", + threadId: input.threadId, + createdAt: now, + snapshot, + }); + }); + return snapshot; + } + + async write(input: TerminalWriteInput): Promise { + const existing = this.sessions.get(input.threadId); + if (!existing) { + throw new Error(`Unknown terminal thread: ${input.threadId}`); + } + queueMicrotask(() => { + this.emit("event", { + type: "output", + threadId: input.threadId, + createdAt: new Date().toISOString(), + data: input.data, + }); + }); + } + + async resize(_input: TerminalThreadInput & { cols: number; rows: number }): Promise {} + + async clear(input: TerminalThreadInput): Promise { + queueMicrotask(() => { + this.emit("event", { + type: "cleared", + threadId: input.threadId, + createdAt: new Date().toISOString(), + }); + }); + } + + async restart(input: TerminalOpenInput): Promise { + const now = new Date().toISOString(); + const snapshot: TerminalSessionSnapshot = { + threadId: input.threadId, + cwd: input.cwd, + status: "running", + pid: 5252, + history: "", + exitCode: null, + exitSignal: null, + updatedAt: now, + }; + this.sessions.set(input.threadId, snapshot); + queueMicrotask(() => { + this.emit("event", { + type: "restarted", + threadId: input.threadId, + createdAt: now, + snapshot, + }); + }); + return snapshot; + } + + async close(input: TerminalCloseInput): Promise { + this.sessions.delete(input.threadId); + } + + dispose(): void {} +} + function connectWs(port: number, token?: string): Promise { return new Promise((resolve, reject) => { const query = token ? `?token=${encodeURIComponent(token)}` : ""; @@ -85,6 +180,7 @@ describe("WebSocket Server", () => { devUrl?: string; authToken?: string; stateDir?: string; + terminalManager?: TerminalManager; } = {}, ): ReturnType { const stateDir = options.stateDir ?? makeTempDir("t3code-ws-state-"); @@ -94,6 +190,9 @@ describe("WebSocket Server", () => { ...(options.devUrl ? { devUrl: options.devUrl } : {}), ...(options.authToken ? { authToken: options.authToken } : {}), projectRegistry: new ProjectRegistry(stateDir), + ...(options.terminalManager + ? { terminalManager: options.terminalManager } + : {}), }); } @@ -213,6 +312,111 @@ describe("WebSocket Server", () => { expect(response.result).toEqual([]); }); + it("routes terminal RPC methods and broadcasts terminal events", async () => { + const cwd = makeTempDir("t3code-ws-terminal-cwd-"); + const terminalManager = new MockTerminalManager(); + server = createTestServer({ + cwd: "/test", + terminalManager: terminalManager as unknown as TerminalManager, + }); + await server.start(); + const addr = server.httpServer.address(); + const port = typeof addr === "object" && addr !== null ? addr.port : 0; + + const ws = await connectWs(port); + connections.push(ws); + await waitForMessage(ws); + + const open = await sendRequest(ws, WS_METHODS.terminalOpen, { + threadId: "thread-1", + cwd, + cols: 100, + rows: 24, + }); + expect(open.error).toBeUndefined(); + expect((open.result as TerminalSessionSnapshot).threadId).toBe("thread-1"); + + const write = await sendRequest(ws, WS_METHODS.terminalWrite, { + threadId: "thread-1", + data: "echo hello\n", + }); + expect(write.error).toBeUndefined(); + + const resize = await sendRequest(ws, WS_METHODS.terminalResize, { + threadId: "thread-1", + cols: 120, + rows: 30, + }); + expect(resize.error).toBeUndefined(); + + const clear = await sendRequest(ws, WS_METHODS.terminalClear, { + threadId: "thread-1", + }); + expect(clear.error).toBeUndefined(); + + const restart = await sendRequest(ws, WS_METHODS.terminalRestart, { + threadId: "thread-1", + cwd, + cols: 120, + rows: 30, + }); + expect(restart.error).toBeUndefined(); + + const close = await sendRequest(ws, WS_METHODS.terminalClose, { + threadId: "thread-1", + deleteHistory: true, + }); + expect(close.error).toBeUndefined(); + + const manualEvent: TerminalEvent = { + type: "output", + threadId: "thread-1", + createdAt: new Date().toISOString(), + data: "manual test output\n", + }; + terminalManager.emit("event", manualEvent); + + const push = (await waitForMessage(ws)) as WsPush; + expect(push.type).toBe("push"); + expect(push.channel).toBe(WS_CHANNELS.terminalEvent); + expect((push.data as TerminalEvent).type).toBe("output"); + }); + + it("detaches terminal event listener on stop for injected manager", async () => { + const terminalManager = new MockTerminalManager(); + server = createTestServer({ + cwd: "/test", + terminalManager: terminalManager as unknown as TerminalManager, + }); + await server.start(); + + expect(terminalManager.listenerCount("event")).toBe(1); + + await server.stop(); + server = null; + + expect(terminalManager.listenerCount("event")).toBe(0); + }); + + it("returns validation errors for invalid terminal open params", async () => { + server = createTestServer({ cwd: "/test" }); + await server.start(); + const addr = server.httpServer.address(); + const port = typeof addr === "object" && addr !== null ? addr.port : 0; + + const ws = await connectWs(port); + connections.push(ws); + await waitForMessage(ws); + + const response = await sendRequest(ws, WS_METHODS.terminalOpen, { + threadId: "", + cwd: "", + cols: 1, + rows: 1, + }); + expect(response.error).toBeDefined(); + }); + it("handles invalid JSON gracefully", async () => { server = createTestServer({ cwd: "/test" }); await server.start(); diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index 353535ae32..d010c4a7c4 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -9,6 +9,7 @@ import { EDITORS, WS_CHANNELS, WS_METHODS, + type TerminalEvent, type WsPush, type WsRequest, type WsResponse, @@ -27,6 +28,7 @@ import { listGitBranches, removeGitWorktree, } from "./git"; +import { TerminalManager } from "./terminalManager"; const MIME_TYPES: Record = { ".html": "text/html; charset=utf-8", @@ -51,6 +53,7 @@ export interface ServerOptions { devUrl?: string | undefined; logWebSocketEvents?: boolean | undefined; projectRegistry?: ProjectRegistry | undefined; + terminalManager?: TerminalManager | undefined; authToken?: string | undefined; } @@ -71,9 +74,11 @@ export function createServer(options: ServerOptions) { devUrl, logWebSocketEvents: explicitLogWsEvents, projectRegistry: providedRegistry, + terminalManager: providedTerminalManager, authToken, } = options; const providerManager = new ProviderManager(); + const terminalManager = providedTerminalManager ?? new TerminalManager(); const projectRegistry = providedRegistry ?? new ProjectRegistry(path.join(os.homedir(), ".t3", "userdata")); const clients = new Set(); @@ -108,6 +113,24 @@ export function createServer(options: ServerOptions) { logOutgoingPush(push, recipients); }); + const onTerminalEvent = (event: TerminalEvent) => { + const push: WsPush = { + type: "push", + channel: WS_CHANNELS.terminalEvent, + data: event, + }; + const message = JSON.stringify(push); + let recipients = 0; + for (const client of clients) { + if (client.readyState === client.OPEN) { + client.send(message); + recipients += 1; + } + } + logOutgoingPush(push, recipients); + }; + terminalManager.on("event", onTerminalEvent); + // HTTP server — serves static files or redirects to Vite dev server const httpServer = http.createServer((req, res) => { // In dev mode, redirect to Vite dev server @@ -330,6 +353,28 @@ export function createServer(options: ServerOptions) { case WS_METHODS.gitInit: return initGitRepo(request.params as never); + case WS_METHODS.terminalOpen: + return terminalManager.open(request.params as never); + + case WS_METHODS.terminalWrite: + await terminalManager.write(request.params as never); + return undefined; + + case WS_METHODS.terminalResize: + await terminalManager.resize(request.params as never); + return undefined; + + case WS_METHODS.terminalClear: + await terminalManager.clear(request.params as never); + return undefined; + + case WS_METHODS.terminalRestart: + return terminalManager.restart(request.params as never); + + case WS_METHODS.terminalClose: + await terminalManager.close(request.params as never); + return undefined; + case WS_METHODS.serverGetConfig: return { cwd }; @@ -358,8 +403,10 @@ export function createServer(options: ServerOptions) { } async function stop(): Promise { + terminalManager.off("event", onTerminalEvent); providerManager.stopAll(); providerManager.dispose(); + terminalManager.dispose(); for (const client of clients) { client.close(); diff --git a/apps/web/package.json b/apps/web/package.json index d995f4819b..d2185184eb 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -11,6 +11,8 @@ "test": "vitest run --passWithNoTests" }, "dependencies": { + "@xterm/addon-fit": "^0.11.0", + "@xterm/xterm": "^6.0.0", "@t3tools/contracts": "workspace:*", "@tanstack/react-query": "^5.90.0", "highlight.js": "^11.11.1", diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index 4619f6c933..cec89d05b7 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -8,6 +8,7 @@ import { isElectron } from "./env"; import { DEFAULT_MODEL } from "./model-logic"; import { readNativeApi } from "./session-logic"; import { StoreProvider, useStore } from "./store"; +import { DEFAULT_THREAD_TERMINAL_HEIGHT } from "./types"; import { onServerWelcome } from "./wsNativeApi"; function EventRouter() { @@ -26,6 +27,18 @@ function EventRouter() { }); }, [api, dispatch]); + useEffect(() => { + if (!api) return; + return api.terminal.onEvent((event) => { + if (event.type !== "exited") return; + dispatch({ + type: "SET_THREAD_TERMINAL_OPEN", + threadId: event.threadId, + open: false, + }); + }); + }, [api, dispatch]); + return null; } @@ -78,6 +91,8 @@ function AutoProjectBootstrap() { projectId, title: "New thread", model: DEFAULT_MODEL, + terminalOpen: false, + terminalHeight: DEFAULT_THREAD_TERMINAL_HEIGHT, session: null, messages: [], events: [], diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 9792d07fff..a052a3db2a 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -7,6 +7,7 @@ import { type FormEvent, Fragment, type KeyboardEvent, + useCallback, useEffect, useMemo, useRef, @@ -34,7 +35,9 @@ import { } from "../session-logic"; import { useStore } from "../store"; import BranchToolbar from "./BranchToolbar"; +import { isTerminalToggleShortcut } from "../terminal-shortcuts"; import ChatMarkdown from "./ChatMarkdown"; +import ThreadTerminalDrawer from "./ThreadTerminalDrawer"; function formatMessageMeta(createdAt: string, duration: string | null): string { if (!duration) return formatTimestamp(createdAt); @@ -145,12 +148,15 @@ export default function ChatView() { const [respondingRequestIds, setRespondingRequestIds] = useState([]); const [expandedWorkGroups, setExpandedWorkGroups] = useState>({}); const [nowTick, setNowTick] = useState(() => Date.now()); + const [terminalFocusRequestId, setTerminalFocusRequestId] = useState(0); const messagesEndRef = useRef(null); const textareaRef = useRef(null); const modelMenuRef = useRef(null); const editorMenuRef = useRef(null); + const terminalOpenByThreadRef = useRef>({}); const activeThread = state.threads.find((t) => t.id === state.activeThreadId); + const activeThreadId = activeThread?.id ?? null; const activeProject = state.projects.find((p) => p.id === activeThread?.projectId); const selectedModel = resolveModelSlug( activeThread?.model ?? activeProject?.model ?? DEFAULT_MODEL, @@ -255,6 +261,25 @@ export default function ChatView() { (activeThread.messages.length > 0 || (activeThread.session !== null && activeThread.session.status !== "closed")), ); + const terminalShortcutHint = navigator.platform.includes("Mac") + ? "\u2318J" + : "Ctrl+J"; + const focusComposer = useCallback(() => { + const textarea = textareaRef.current; + if (!textarea) return; + textarea.focus(); + const cursor = textarea.value.length; + textarea.setSelectionRange(cursor, cursor); + }, []); + const toggleTerminalVisibility = useCallback(() => { + if (!activeThreadId) return; + const isOpen = Boolean(activeThread?.terminalOpen); + dispatch({ + type: "SET_THREAD_TERMINAL_OPEN", + threadId: activeThreadId, + open: !isOpen, + }); + }, [activeThread?.terminalOpen, activeThreadId, dispatch]); const handleRuntimeModeChange = async (mode: "approval-required" | "full-access") => { if (mode === state.runtimeMode) return; @@ -364,6 +389,37 @@ export default function ChatView() { return () => window.removeEventListener("keydown", handler); }, [api, activeProject, activeThread, lastEditor]); + useEffect(() => { + if (!activeThreadId) return; + const previous = terminalOpenByThreadRef.current[activeThreadId] ?? false; + const current = Boolean(activeThread?.terminalOpen); + + if (!previous && current) { + setTerminalFocusRequestId((value) => value + 1); + } else if (previous && !current) { + terminalOpenByThreadRef.current[activeThreadId] = current; + const frame = window.requestAnimationFrame(() => { + focusComposer(); + }); + return () => { + window.cancelAnimationFrame(frame); + }; + } + + terminalOpenByThreadRef.current[activeThreadId] = current; + }, [activeThread?.terminalOpen, activeThreadId, focusComposer]); + + useEffect(() => { + const handler = (event: globalThis.KeyboardEvent) => { + if (!activeThreadId) return; + if (!isTerminalToggleShortcut(event)) return; + event.preventDefault(); + toggleTerminalVisibility(); + }; + window.addEventListener("keydown", handler); + return () => window.removeEventListener("keydown", handler); + }, [activeThreadId, toggleTerminalVisibility]); + const openInEditor = (editorId: EditorId) => { if (!api || !activeProject) return; const cwd = activeThread?.worktreePath ?? activeProject.cwd; @@ -572,7 +628,7 @@ export default function ChatView() { // Empty state: no active thread if (!activeThread) { return ( -
+
{isElectron &&
}
@@ -584,7 +640,7 @@ export default function ChatView() { } return ( -
+
{/* Top bar */}
)} {/* Diff toggle */} +