From 411b13891cdfb7cf08c682782bb397941b07d55c Mon Sep 17 00:00:00 2001 From: Emanuele Santonastaso Date: Sat, 28 Mar 2026 01:11:50 +0100 Subject: [PATCH] =?UTF-8?q?fix:=20unbounded=20maps=20and=20memory=20leaks?= =?UTF-8?q?=20across=20modules=20=E2=80=94=20Issue=20#357?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/__tests__/swarm-monitor.test.ts | 2 +- src/auth.ts | 10 +++ src/events.ts | 2 + src/metrics.ts | 6 ++ src/server.ts | 25 +++++- src/session.ts | 87 +++++++++++++++----- src/tmux.ts | 123 +++++++++++++++++++--------- 7 files changed, 194 insertions(+), 61 deletions(-) diff --git a/src/__tests__/swarm-monitor.test.ts b/src/__tests__/swarm-monitor.test.ts index cc40a444..57779a44 100644 --- a/src/__tests__/swarm-monitor.test.ts +++ b/src/__tests__/swarm-monitor.test.ts @@ -145,7 +145,7 @@ describe('SwarmMonitor with mocked parent sessions', () => { it('should not match session without ccPid', async () => { const session = makeSession({ id: 'no-pid-session', - activeSubagents: ['explore-agent'], + activeSubagents: new Set(['explore-agent']), status: 'working', }); diff --git a/src/auth.ts b/src/auth.ts index 0e2c6336..883b475f 100644 --- a/src/auth.ts +++ b/src/auth.ts @@ -222,7 +222,17 @@ export class AuthManager { // Valid — consume it entry.used = true; + const keyId = entry.keyId; this.sseTokens.delete(token); + // #357: Decrement outstanding count so generateSSEToken doesn't over-limit + const count = this.sseTokenCounts.get(keyId); + if (count !== undefined) { + if (count <= 1) { + this.sseTokenCounts.delete(keyId); + } else { + this.sseTokenCounts.set(keyId, count - 1); + } + } return true; } diff --git a/src/events.ts b/src/events.ts index 25634949..41e6ff48 100644 --- a/src/events.ts +++ b/src/events.ts @@ -194,10 +194,12 @@ export class SessionEventBus { } // Clean up after a short delay (let clients receive the event) // Capture reference — only delete if it's still the same emitter + // #357: Also delete the per-session event buffer to prevent unbounded map growth setTimeout(() => { if (this.emitters.get(sessionId) === emitter) { this.emitters.delete(sessionId); } + this.eventBuffers.delete(sessionId); }, 1000); } diff --git a/src/metrics.ts b/src/metrics.ts index c1afcdfb..8c0981d7 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -217,6 +217,12 @@ export class MetricsCollector { this.latency.delete(sessionId); } + /** #357: Clean up all per-session data (call on session destroy). */ + cleanupSession(sessionId: string): void { + this.perSession.delete(sessionId); + this.latency.delete(sessionId); + } + getGlobalMetrics(activeSessionCount: number): Record { const avgMessages = this.global.sessionsCreated > 0 ? Math.round(this.global.totalMessages / this.global.sessionsCreated) : 0; diff --git a/src/server.ts b/src/server.ts index 082485ae..7f04ca50 100644 --- a/src/server.ts +++ b/src/server.ts @@ -90,6 +90,7 @@ async function handleInbound(cmd: InboundCommand): Promise { await channels.sessionEnded(makePayload('session.ended', cmd.sessionId, 'killed')); await sessions.killSession(cmd.sessionId); monitor.removeSession(cmd.sessionId); + metrics.cleanupSession(cmd.sessionId); break; case 'message': case 'command': @@ -155,6 +156,17 @@ function checkIpRateLimit(ip: string, isMaster: boolean): boolean { return timestamps.length > limit; } +/** #357: Prune IPs whose timestamp arrays are entirely outside the rate-limit window. */ +function pruneIpRateLimits(): void { + const cutoff = Date.now() - IP_WINDOW_MS; + for (const [ip, timestamps] of ipRateLimits) { + // All timestamps are old — remove the entry entirely + if (timestamps.length === 0 || timestamps[timestamps.length - 1]! < cutoff) { + ipRateLimits.delete(ip); + } + } +} + function setupAuth(authManager: AuthManager): void { app.addHook('onRequest', async (req, reply) => { // Skip auth for health endpoint and dashboard (Issue #349: exact path matching) @@ -772,6 +784,7 @@ app.delete<{ Params: { id: string } }>('/v1/sessions/:id', async (req, reply) => await channels.sessionEnded(makePayload('session.ended', req.params.id, 'killed')); await sessions.killSession(req.params.id); monitor.removeSession(req.params.id); + metrics.cleanupSession(req.params.id); return { ok: true }; } catch (e: unknown) { return reply.status(404).send({ error: e instanceof Error ? e.message : String(e) }); @@ -783,6 +796,7 @@ app.delete<{ Params: { id: string } }>('/sessions/:id', async (req, reply) => { await channels.sessionEnded(makePayload('session.ended', req.params.id, 'killed')); await sessions.killSession(req.params.id); monitor.removeSession(req.params.id); + metrics.cleanupSession(req.params.id); return { ok: true }; } catch (e: unknown) { return reply.status(404).send({ error: e instanceof Error ? e.message : String(e) }); @@ -1171,6 +1185,7 @@ async function reapStaleSessions(maxAgeMs: number): Promise { }); await sessions.killSession(session.id); monitor.removeSession(session.id); + metrics.cleanupSession(session.id); } catch (e) { console.error(`Reaper: failed to kill session ${session.id}:`, e); } @@ -1198,6 +1213,7 @@ async function reapZombieSessions(): Promise { try { monitor.removeSession(session.id); await sessions.killSession(session.id); + metrics.cleanupSession(session.id); await channels.sessionEnded({ event: 'session.ended', timestamp: new Date().toISOString(), @@ -1214,7 +1230,11 @@ async function reapZombieSessions(): Promise { /** Issue #20: Add actionHints to session response for interactive states. */ function addActionHints(session: import('./session.js').SessionInfo): Record { - const result: Record = { ...session }; + // #357: Convert Set to array for JSON serialization + const result: Record = { + ...session, + activeSubagents: session.activeSubagents ? [...session.activeSubagents] : undefined, + }; if (session.status === 'permission_prompt' || session.status === 'bash_approval') { result.actionHints = { approve: { method: 'POST', url: `/v1/sessions/${session.id}/approve`, description: 'Approve the pending permission' }, @@ -1508,6 +1528,8 @@ async function main(): Promise { const reaperInterval = setInterval(() => reapStaleSessions(config.maxSessionAgeMs), config.reaperIntervalMs); const zombieReaperInterval = setInterval(() => reapZombieSessions(), ZOMBIE_REAP_INTERVAL_MS); const metricsSaveInterval = setInterval(() => { void metrics.save(); }, 5 * 60 * 1000); + // #357: Prune stale IP rate-limit entries every minute + const ipPruneInterval = setInterval(pruneIpRateLimits, 60_000); // Issue #361: Graceful shutdown handler let shuttingDown = false; @@ -1525,6 +1547,7 @@ async function main(): Promise { clearInterval(reaperInterval); clearInterval(zombieReaperInterval); clearInterval(metricsSaveInterval); + clearInterval(ipPruneInterval); // 3. Destroy channels (awaits Telegram poll loop) try { await channels.destroy(); } catch (e) { console.error('Error destroying channels:', e); } diff --git a/src/session.ts b/src/session.ts index 0690295a..c724f4fb 100644 --- a/src/session.ts +++ b/src/session.ts @@ -34,7 +34,7 @@ export interface SessionInfo { settingsPatched?: boolean; // Permission guard: settings.local.json was patched hookSettingsFile?: string; // Temp file with HTTP hook settings (Issue #169) lastHookAt?: number; // Unix timestamp of last received hook event (Issue #169 Phase 3) - activeSubagents?: string[]; // Active subagent names (Issue #88) + activeSubagents?: Set; // Active subagent names (Issue #88, #357: Set for O(1)) // Issue #87: Latency metrics permissionPromptAt?: number; // Unix timestamp when permission prompt was detected permissionRespondedAt?: number; // Unix timestamp when user approved/rejected @@ -92,8 +92,12 @@ export class SessionManager { private sessionMapFile: string; private pollTimers: Map = new Map(); private saveQueue: Promise = Promise.resolve(); // #218: serialize concurrent saves + private saveDebounceTimer: NodeJS.Timeout | null = null; + private static readonly SAVE_DEBOUNCE_MS = 5_000; // #357: debounce offset-only saves private pendingPermissions: Map = new Map(); private pendingQuestions: Map = new Map(); + // #357: Cache of all parsed JSONL entries per session to avoid re-reading from offset 0 + private parsedEntriesCache = new Map(); constructor( private tmux: TmuxManager, @@ -170,6 +174,13 @@ export class SessionManager { } } + // #357: Convert deserialized activeSubagents arrays to Sets + for (const session of Object.values(this.state.sessions)) { + if (Array.isArray(session.activeSubagents)) { + session.activeSubagents = new Set(session.activeSubagents); + } + } + // Create backup of successfully loaded state try { await writeFile(`${this.stateFile}.bak`, JSON.stringify(this.state, null, 2)); @@ -252,13 +263,27 @@ export class SessionManager { await this.saveQueue; } + /** #357: Debounced save — skips immediate save for offset-only changes. + * Coalesces rapid successive reads into a single disk write. */ + debouncedSave(): void { + if (this.saveDebounceTimer !== null) clearTimeout(this.saveDebounceTimer); + this.saveDebounceTimer = setTimeout(() => { + this.saveDebounceTimer = null; + void this.save(); + }, SessionManager.SAVE_DEBOUNCE_MS); + } + private async doSave(): Promise { const dir = dirname(this.stateFile); if (!existsSync(dir)) { await mkdir(dir, { recursive: true }); } const tmpFile = `${this.stateFile}.tmp`; - await writeFile(tmpFile, JSON.stringify(this.state, null, 2)); + // #357: Use replacer to serialize Set as arrays + await writeFile(tmpFile, JSON.stringify(this.state, (_, value) => { + if (value instanceof Set) return [...value]; + return value; + }, 2)); await rename(tmpFile, this.stateFile); } @@ -538,17 +563,15 @@ export class SessionManager { addSubagent(id: string, name: string): void { const session = this.state.sessions[id]; if (!session) return; - if (!session.activeSubagents) session.activeSubagents = []; - if (!session.activeSubagents.includes(name)) { - session.activeSubagents.push(name); - } + if (!session.activeSubagents) session.activeSubagents = new Set(); + session.activeSubagents.add(name); } /** Issue #88: Remove an active subagent from a session. */ removeSubagent(id: string, name: string): void { const session = this.state.sessions[id]; if (!session || !session.activeSubagents) return; - session.activeSubagents = session.activeSubagents.filter(n => n !== name); + session.activeSubagents.delete(name); } /** Issue #89 L25: Update the model field on a session from hook payload. */ @@ -965,7 +988,9 @@ export class SessionManager { } } - await this.save(); + // #357: Debounce saves on GET reads — offsets change frequently but disk + // writes are expensive. Full save still happens on create/kill/reconcile. + this.debouncedSave(); return { messages, @@ -1022,6 +1047,27 @@ export class SessionManager { }; } + /** #357: Get all parsed entries for a session, using a cache to avoid full reparse. + * Reads only the delta from the last cached offset. */ + private async getCachedEntries(session: SessionInfo): Promise { + if (!session.jsonlPath || !existsSync(session.jsonlPath)) return []; + const cached = this.parsedEntriesCache.get(session.id); + try { + const fromOffset = cached ? cached.offset : 0; + const result = await readNewEntries(session.jsonlPath, fromOffset); + if (cached) { + cached.entries.push(...result.entries); + cached.offset = result.newOffset; + return cached.entries; + } + // First read — cache it + this.parsedEntriesCache.set(session.id, { entries: [...result.entries], offset: result.newOffset }); + return result.entries; + } catch { + return cached ? [...cached.entries] : []; + } + } + /** Issue #35: Get a condensed summary of a session's transcript. */ async getSummary(id: string, maxMessages = 20): Promise<{ sessionId: string; @@ -1036,14 +1082,8 @@ export class SessionManager { const session = this.state.sessions[id]; if (!session) throw new Error(`Session ${id} not found`); - // Read ALL messages from the beginning for summary - let allMessages: ParsedEntry[] = []; - if (session.jsonlPath && existsSync(session.jsonlPath)) { - try { - const result = await readNewEntries(session.jsonlPath, 0); - allMessages = result.entries; - } catch { /* file may be corrupted */ } - } + // #357: Use cached entries instead of re-reading from offset 0 + const allMessages = await this.getCachedEntries(session); // Take last N messages const recent = allMessages.slice(-maxMessages).map(m => ({ @@ -1085,12 +1125,8 @@ export class SessionManager { } let allEntries: ParsedEntry[] = []; - if (session.jsonlPath && existsSync(session.jsonlPath)) { - try { - const result = await readNewEntries(session.jsonlPath, 0); - allEntries = result.entries; - } catch { /* file may be corrupted */ } - } + // #357: Use cached entries instead of re-reading from offset 0 + allEntries = await this.getCachedEntries(session); if (roleFilter) { allEntries = allEntries.filter(e => e.role === roleFilter); @@ -1141,6 +1177,13 @@ export class SessionManager { this.cleanupPendingQuestion(id); delete this.state.sessions[id]; + // #357: Clean up parsed entries cache + this.parsedEntriesCache.delete(id); + // #357: Cancel any pending debounced save before doing an immediate save + if (this.saveDebounceTimer !== null) { + clearTimeout(this.saveDebounceTimer); + this.saveDebounceTimer = null; + } await this.save(); } diff --git a/src/tmux.ts b/src/tmux.ts index 45ace52f..1f382887 100644 --- a/src/tmux.ts +++ b/src/tmux.ts @@ -7,7 +7,7 @@ import { execFile } from 'node:child_process'; import { promisify } from 'node:util'; -import { readdir, rename as fsRename, mkdir } from 'node:fs/promises'; +import { readdir, rename as fsRename, mkdir, stat } from 'node:fs/promises'; import { existsSync } from 'node:fs'; import { join } from 'node:path'; import { homedir, tmpdir } from 'node:os'; @@ -45,6 +45,10 @@ export class TmuxManager { /** tmux socket name (-L flag). Isolates sessions from other tmux instances. */ readonly socketName: string; + /** #357: Cache for window existence checks — avoids repeated tmux CLI calls. */ + private windowExistsCache = new Map(); + private static readonly WINDOW_CACHE_TTL_MS = 2_000; + constructor(private sessionName: string = 'aegis', socketName?: string) { this.socketName = socketName ?? `aegis-${process.pid}`; } @@ -52,6 +56,9 @@ export class TmuxManager { /** Promise-chain queue that serializes all tmux CLI calls to prevent race conditions. */ private queue: Promise = Promise.resolve(undefined as unknown as void); + /** #357: Short-lived cache for window existence checks to reduce CLI calls. */ + private windowCache = new Map(); + /** Run `fn` sequentially after all previously-queued operations complete. */ private serialize(fn: () => Promise): Promise { let resolve!: () => void; @@ -329,24 +336,27 @@ export class TmuxManager { // Send the command to start Claude await this.sendKeys(windowId, cmd, true); - // Issue #7: Verify Claude process started by checking pane command after a delay. + // Issue #7: Verify Claude process started by checking pane command. + // #357: Poll for pane command change instead of fixed 2s sleep. // Zeus reported sessions where claude never started — byteOffset stayed 0 forever. - await sleep(2000); - try { - const windows = await this.listWindows(); - const win = windows.find(w => w.windowId === windowId); - if (win) { - const paneCmd = win.paneCommand.toLowerCase(); - // After sending 'claude', the pane command should be 'claude' or 'node' (CC runs as node) - // If it's still 'bash' or 'zsh', Claude didn't start - if (paneCmd === 'bash' || paneCmd === 'zsh' || paneCmd === 'sh') { - console.warn(`Tmux: Claude may not have started in ${finalName} — pane command is '${win.paneCommand}', retrying...`); - // Retry sending the command once - await this.sendKeys(windowId, cmd, true); - } - } - } catch { - // Non-fatal: verification failed but session may still work + const CLAUDE_START_POLL_MS = 200; + const CLAUDE_START_TIMEOUT_MS = 3000; + const started = await this.pollUntil( + async () => { + try { + const windows = await this.listWindows(); + const win = windows.find(w => w.windowId === windowId); + if (!win) return false; + const paneCmd = win.paneCommand.toLowerCase(); + return paneCmd !== 'bash' && paneCmd !== 'zsh' && paneCmd !== 'sh'; + } catch { return false; } + }, + CLAUDE_START_POLL_MS, + CLAUDE_START_TIMEOUT_MS, + ); + if (!started) { + console.warn(`Tmux: Claude may not have started in ${finalName} — retrying...`); + try { await this.sendKeys(windowId, cmd, true); } catch { /* best effort */ } } return { windowId, windowName: finalName, freshSessionId }; @@ -428,17 +438,29 @@ export class TmuxManager { // appear in the process environment but not in the terminal history. // The 'source' line is visible but only shows the temp file path, not the values. await this.sendKeys(windowId, `source ${shellEscape(tmpFile)} && rm -f ${shellEscape(tmpFile)}`, true); - await sleep(500); + // #357: Brief poll for shell to process the source command (was fixed 500ms sleep) + await this.pollUntil( + async () => { try { await stat(tmpFile); return false; } catch { return true; } }, + 50, 500, + ); // Belt and suspenders: delete the file from our side too try { await fs.unlink(tmpFile); } catch { /* already deleted by shell */ } } - /** P1 fix: Check if a window exists. Returns true if window is in the session. */ + /** P1 fix: Check if a window exists. Returns true if window is in the session. + * #357: Uses a short-lived cache to avoid repeated tmux CLI calls. */ async windowExists(windowId: string): Promise { + const now = Date.now(); + const cached = this.windowCache.get(windowId); + if (cached && now - cached.timestamp < TmuxManager.WINDOW_CACHE_TTL_MS) { + return cached.exists; + } try { const windows = await this.listWindows(); - return windows.some(w => w.windowId === windowId); + const exists = windows.some(w => w.windowId === windowId); + this.windowCache.set(windowId, { exists, timestamp: now }); + return exists; } catch (e: unknown) { console.warn(`Tmux: windowExists check failed for ${windowId}: ${(e as Error).message}`); return false; @@ -504,20 +526,30 @@ export class TmuxManager { if (enter) { // CC's ! command mode: send "!" first so the TUI switches to bash mode, - // wait 1s, then send the rest. + // then send the rest after TUI acknowledges the mode switch. if (text.startsWith('!')) { await this.tmux('send-keys', '-t', target, '-l', '!'); const rest = text.slice(1); if (rest) { - await sleep(1000); + // #357: Poll for `!` to be absorbed instead of fixed 1s sleep + await this.pollUntil( + async () => { + try { + const pane = await this.capturePaneDirect(windowId); + return pane.includes('!'); + } catch { return false; } + }, + 100, 1000, + ); await this.tmux('send-keys', '-t', target, '-l', rest); } } else { // Send text literally first (no Enter) await this.tmux('send-keys', '-t', target, '-l', text); } - // P2 fix: Adaptive delay based on message length - const delay = text.length > 500 ? 2000 : 1000; + // P2 fix: Short delay for tmux to register text before Enter + // #357: Reduced from 1000/2000ms to 200/500ms + const delay = text.length > 500 ? 500 : 200; await sleep(delay); // Send Enter await this.tmux('send-keys', '-t', target, 'Enter'); @@ -608,18 +640,20 @@ export class TmuxManager { console.log(`Tmux: delivery check ${attempt}/${maxAttempts} — pane is '${preState}', skipping re-send`); } - // Graduated verification: check multiple times with increasing delays. + // #357: Poll for delivery confirmation instead of graduated fixed sleeps. // CC needs time to process input and transition states. - const checkDelays = attempt === 1 ? [800, 1500, 2500] : [500, 1500]; - for (const delay of checkDelays) { - await sleep(delay); - const delivered = await this.verifyDelivery(windowId, text, preState); - if (delivered) { - if (attempt > 1) { - console.log(`Tmux: delivery confirmed on attempt ${attempt}`); - } - return { delivered: true, attempts: attempt }; + const pollInterval = 400; + const pollTimeout = attempt === 1 ? 5000 : 3000; + const delivered = await this.pollUntil( + () => this.verifyDelivery(windowId, text, preState), + pollInterval, + pollTimeout, + ); + if (delivered) { + if (attempt > 1) { + console.log(`Tmux: delivery confirmed on attempt ${attempt}`); } + return { delivered: true, attempts: attempt }; } if (attempt < maxAttempts) { @@ -679,8 +713,8 @@ export class TmuxManager { await execFileAsync('tmux', ['-L', this.socketName, 'send-keys', '-t', target, '-l', text], { timeout: TMUX_DEFAULT_TIMEOUT_MS, }); - // Adaptive delay based on message length - const delay = text.length > 500 ? 2000 : 1000; + // #357: Reduced adaptive delay (was 1000/2000ms) + const delay = text.length > 500 ? 500 : 200; await new Promise(r => setTimeout(r, delay)); await execFileAsync('tmux', ['-L', this.socketName, 'send-keys', '-t', target, 'Enter'], { timeout: TMUX_DEFAULT_TIMEOUT_MS, @@ -701,6 +735,7 @@ export class TmuxManager { /** Kill a window. */ async killWindow(windowId: string): Promise { const target = `${this.sessionName}:${windowId}`; + this.windowCache.delete(windowId); try { await this.tmux('kill-window', '-t', target); } catch (e: unknown) { @@ -718,6 +753,20 @@ export class TmuxManager { console.warn(`Tmux: killSession failed for '${target}': ${(e as Error).message}`); } } + + /** #357: Poll until condition returns true or timeout elapses. */ + private async pollUntil( + condition: () => Promise, + intervalMs: number, + timeoutMs: number, + ): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (await condition()) return true; + await sleep(intervalMs); + } + return false; + } } function sleep(ms: number): Promise {