Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/__tests__/swarm-monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ describe('SwarmMonitor with mocked parent sessions', () => {
it('should not match session without ccPid', async () => {
const session = makeSession({
id: 'no-pid-session',
activeSubagents: ['explore-agent'],
activeSubagents: new Set(['explore-agent']),
status: 'working',
});

Expand Down
10 changes: 10 additions & 0 deletions src/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,17 @@ export class AuthManager {

// Valid — consume it
entry.used = true;
const keyId = entry.keyId;
this.sseTokens.delete(token);
// #357: Decrement outstanding count so generateSSEToken doesn't over-limit
const count = this.sseTokenCounts.get(keyId);
if (count !== undefined) {
if (count <= 1) {
this.sseTokenCounts.delete(keyId);
} else {
this.sseTokenCounts.set(keyId, count - 1);
}
}
return true;
}

Expand Down
2 changes: 2 additions & 0 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,12 @@ export class SessionEventBus {
}
// Clean up after a short delay (let clients receive the event)
// Capture reference — only delete if it's still the same emitter
// #357: Also delete the per-session event buffer to prevent unbounded map growth
setTimeout(() => {
if (this.emitters.get(sessionId) === emitter) {
this.emitters.delete(sessionId);
}
this.eventBuffers.delete(sessionId);
}, 1000);
}

Expand Down
6 changes: 6 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ export class MetricsCollector {
this.latency.delete(sessionId);
}

/**
 * #357: Forget everything recorded for one session.
 *
 * Removes the session's entry from both the per-session store and the
 * latency store. Intended to be called when a session is destroyed.
 * Calling it for an unknown `sessionId` is a no-op.
 *
 * @param sessionId - Identifier of the session whose data should be dropped.
 */
cleanupSession(sessionId: string): void {
  // The two stores are independent; order of removal does not matter.
  this.latency.delete(sessionId);
  this.perSession.delete(sessionId);
}

getGlobalMetrics(activeSessionCount: number): Record<string, unknown> {
const avgMessages = this.global.sessionsCreated > 0
? Math.round(this.global.totalMessages / this.global.sessionsCreated) : 0;
Expand Down
25 changes: 24 additions & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ async function handleInbound(cmd: InboundCommand): Promise<void> {
await channels.sessionEnded(makePayload('session.ended', cmd.sessionId, 'killed'));
await sessions.killSession(cmd.sessionId);
monitor.removeSession(cmd.sessionId);
metrics.cleanupSession(cmd.sessionId);
break;
case 'message':
case 'command':
Expand Down Expand Up @@ -155,6 +156,17 @@ function checkIpRateLimit(ip: string, isMaster: boolean): boolean {
return timestamps.length > limit;
}

/**
 * #357: Evict rate-limit entries that no longer hold any timestamp inside
 * the current window.
 *
 * An entry is considered stale when its timestamp list is empty or when its
 * LAST timestamp is older than `Date.now() - IP_WINDOW_MS`.
 * NOTE(review): this relies on each list being in ascending order so that the
 * last element is the newest — confirm against the code that appends to it.
 */
function pruneIpRateLimits(): void {
  const threshold = Date.now() - IP_WINDOW_MS;
  for (const [addr, stamps] of ipRateLimits) {
    const stale = stamps.length === 0 || stamps[stamps.length - 1]! < threshold;
    if (!stale) continue;
    // Deleting the entry currently being visited is safe during iteration.
    ipRateLimits.delete(addr);
  }
}

function setupAuth(authManager: AuthManager): void {
app.addHook('onRequest', async (req, reply) => {
// Skip auth for health endpoint and dashboard (Issue #349: exact path matching)
Expand Down Expand Up @@ -772,6 +784,7 @@ app.delete<{ Params: { id: string } }>('/v1/sessions/:id', async (req, reply) =>
await channels.sessionEnded(makePayload('session.ended', req.params.id, 'killed'));
await sessions.killSession(req.params.id);
monitor.removeSession(req.params.id);
metrics.cleanupSession(req.params.id);
return { ok: true };
} catch (e: unknown) {
return reply.status(404).send({ error: e instanceof Error ? e.message : String(e) });
Expand All @@ -783,6 +796,7 @@ app.delete<{ Params: { id: string } }>('/sessions/:id', async (req, reply) => {
await channels.sessionEnded(makePayload('session.ended', req.params.id, 'killed'));
await sessions.killSession(req.params.id);
monitor.removeSession(req.params.id);
metrics.cleanupSession(req.params.id);
return { ok: true };
} catch (e: unknown) {
return reply.status(404).send({ error: e instanceof Error ? e.message : String(e) });
Expand Down Expand Up @@ -1171,6 +1185,7 @@ async function reapStaleSessions(maxAgeMs: number): Promise<void> {
});
await sessions.killSession(session.id);
monitor.removeSession(session.id);
metrics.cleanupSession(session.id);
} catch (e) {
console.error(`Reaper: failed to kill session ${session.id}:`, e);
}
Expand Down Expand Up @@ -1198,6 +1213,7 @@ async function reapZombieSessions(): Promise<void> {
try {
monitor.removeSession(session.id);
await sessions.killSession(session.id);
metrics.cleanupSession(session.id);
await channels.sessionEnded({
event: 'session.ended',
timestamp: new Date().toISOString(),
Expand All @@ -1214,7 +1230,11 @@ async function reapZombieSessions(): Promise<void> {

/** Issue #20: Add actionHints to session response for interactive states. */
function addActionHints(session: import('./session.js').SessionInfo): Record<string, unknown> {
const result: Record<string, unknown> = { ...session };
// #357: Convert Set to array for JSON serialization
const result: Record<string, unknown> = {
...session,
activeSubagents: session.activeSubagents ? [...session.activeSubagents] : undefined,
};
if (session.status === 'permission_prompt' || session.status === 'bash_approval') {
result.actionHints = {
approve: { method: 'POST', url: `/v1/sessions/${session.id}/approve`, description: 'Approve the pending permission' },
Expand Down Expand Up @@ -1508,6 +1528,8 @@ async function main(): Promise<void> {
const reaperInterval = setInterval(() => reapStaleSessions(config.maxSessionAgeMs), config.reaperIntervalMs);
const zombieReaperInterval = setInterval(() => reapZombieSessions(), ZOMBIE_REAP_INTERVAL_MS);
const metricsSaveInterval = setInterval(() => { void metrics.save(); }, 5 * 60 * 1000);
// #357: Prune stale IP rate-limit entries every minute
const ipPruneInterval = setInterval(pruneIpRateLimits, 60_000);

// Issue #361: Graceful shutdown handler
let shuttingDown = false;
Expand All @@ -1525,6 +1547,7 @@ async function main(): Promise<void> {
clearInterval(reaperInterval);
clearInterval(zombieReaperInterval);
clearInterval(metricsSaveInterval);
clearInterval(ipPruneInterval);

// 3. Destroy channels (awaits Telegram poll loop)
try { await channels.destroy(); } catch (e) { console.error('Error destroying channels:', e); }
Expand Down
87 changes: 65 additions & 22 deletions src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export interface SessionInfo {
settingsPatched?: boolean; // Permission guard: settings.local.json was patched
hookSettingsFile?: string; // Temp file with HTTP hook settings (Issue #169)
lastHookAt?: number; // Unix timestamp of last received hook event (Issue #169 Phase 3)
activeSubagents?: string[]; // Active subagent names (Issue #88)
activeSubagents?: Set<string>; // Active subagent names (Issue #88, #357: Set for O(1))
// Issue #87: Latency metrics
permissionPromptAt?: number; // Unix timestamp when permission prompt was detected
permissionRespondedAt?: number; // Unix timestamp when user approved/rejected
Expand Down Expand Up @@ -92,8 +92,12 @@ export class SessionManager {
private sessionMapFile: string;
private pollTimers: Map<string, NodeJS.Timeout> = new Map();
private saveQueue: Promise<void> = Promise.resolve(); // #218: serialize concurrent saves
private saveDebounceTimer: NodeJS.Timeout | null = null;
private static readonly SAVE_DEBOUNCE_MS = 5_000; // #357: debounce offset-only saves
private pendingPermissions: Map<string, PendingPermission> = new Map();
private pendingQuestions: Map<string, PendingQuestion> = new Map();
// #357: Cache of all parsed JSONL entries per session to avoid re-reading from offset 0
private parsedEntriesCache = new Map<string, { entries: ParsedEntry[]; offset: number }>();

constructor(
private tmux: TmuxManager,
Expand Down Expand Up @@ -170,6 +174,13 @@ export class SessionManager {
}
}

// #357: Convert deserialized activeSubagents arrays to Sets
for (const session of Object.values(this.state.sessions)) {
if (Array.isArray(session.activeSubagents)) {
session.activeSubagents = new Set(session.activeSubagents);
}
}

// Create backup of successfully loaded state
try {
await writeFile(`${this.stateFile}.bak`, JSON.stringify(this.state, null, 2));
Expand Down Expand Up @@ -252,13 +263,27 @@ export class SessionManager {
await this.saveQueue;
}

/**
 * #357: Debounced save — schedules a state save instead of writing immediately.
 *
 * Every call cancels the previously scheduled save (if any) and re-arms the
 * timer for `SessionManager.SAVE_DEBOUNCE_MS`, so a burst of calls results in
 * a single `save()` once the burst has been quiet for that long.
 *
 * The save runs from a timer callback with no caller to receive an error, so
 * a rejection is caught and logged here; otherwise a failed write would
 * surface as an unhandled promise rejection.
 */
debouncedSave(): void {
  if (this.saveDebounceTimer !== null) clearTimeout(this.saveDebounceTimer);
  this.saveDebounceTimer = setTimeout(() => {
    // Clear the handle first so a later call does not clearTimeout a fired timer.
    this.saveDebounceTimer = null;
    this.save().catch((e: unknown) => {
      console.error('Debounced state save failed:', e);
    });
  }, SessionManager.SAVE_DEBOUNCE_MS);
}

private async doSave(): Promise<void> {
const dir = dirname(this.stateFile);
if (!existsSync(dir)) {
await mkdir(dir, { recursive: true });
}
const tmpFile = `${this.stateFile}.tmp`;
await writeFile(tmpFile, JSON.stringify(this.state, null, 2));
// #357: Use replacer to serialize Set<string> as arrays
await writeFile(tmpFile, JSON.stringify(this.state, (_, value) => {
if (value instanceof Set) return [...value];
return value;
}, 2));
await rename(tmpFile, this.stateFile);
}

Expand Down Expand Up @@ -538,17 +563,15 @@ export class SessionManager {
addSubagent(id: string, name: string): void {
const session = this.state.sessions[id];
if (!session) return;
if (!session.activeSubagents) session.activeSubagents = [];
if (!session.activeSubagents.includes(name)) {
session.activeSubagents.push(name);
}
if (!session.activeSubagents) session.activeSubagents = new Set<string>();
session.activeSubagents.add(name);
}

/** Issue #88: Remove an active subagent from a session. */
removeSubagent(id: string, name: string): void {
const session = this.state.sessions[id];
if (!session || !session.activeSubagents) return;
session.activeSubagents = session.activeSubagents.filter(n => n !== name);
session.activeSubagents.delete(name);
}

/** Issue #89 L25: Update the model field on a session from hook payload. */
Expand Down Expand Up @@ -965,7 +988,9 @@ export class SessionManager {
}
}

await this.save();
// #357: Debounce saves on GET reads — offsets change frequently but disk
// writes are expensive. Full save still happens on create/kill/reconcile.
this.debouncedSave();

return {
messages,
Expand Down Expand Up @@ -1022,6 +1047,27 @@ export class SessionManager {
};
}

/**
 * #357: Get all parsed entries for a session, using a cache to avoid a full
 * reparse. Only the delta after the last cached offset is read.
 *
 * Behaviour:
 * - Returns `[]` when the session has no `jsonlPath` or the file does not exist.
 * - First successful read: stores a copy of the entries plus the new offset in
 *   the cache and returns the freshly read entries.
 * - Later reads: appends the newly read entries to the cached list, advances
 *   the cached offset, and returns the cached list itself. Callers must treat
 *   the returned array as read-only.
 * - If reading throws, returns a copy of whatever is cached (or `[]`).
 *
 * Overlapping calls for the same session all start from the same offset. Only
 * the first one to finish may append its delta; later finishers detect that
 * the cached offset moved while they were awaiting and return the cache
 * as-is, so entries are never appended twice.
 *
 * @param session - Session whose JSONL transcript should be read.
 * @returns All entries known for the session, oldest first as read.
 */
private async getCachedEntries(session: SessionInfo): Promise<ParsedEntry[]> {
  if (!session.jsonlPath || !existsSync(session.jsonlPath)) return [];
  const cached = this.parsedEntriesCache.get(session.id);
  const fromOffset = cached ? cached.offset : 0;
  try {
    const result = await readNewEntries(session.jsonlPath, fromOffset);

    // Another call may have advanced the cache while we were awaiting the
    // read. Its delta overlaps ours, so appending again would duplicate.
    const current = this.parsedEntriesCache.get(session.id);
    if (current && current.offset !== fromOffset) {
      return current.entries;
    }

    if (cached) {
      // Element-wise push: spreading a very large delta into push() can
      // exceed the engine's argument limit.
      for (const entry of result.entries) cached.entries.push(entry);
      cached.offset = result.newOffset;
      return cached.entries;
    }

    // First read — cache a copy so the returned array is not aliased.
    this.parsedEntriesCache.set(session.id, { entries: [...result.entries], offset: result.newOffset });
    return result.entries;
  } catch {
    // Best effort: fall back to what was cached before the failed read.
    return cached ? [...cached.entries] : [];
  }
}

/** Issue #35: Get a condensed summary of a session's transcript. */
async getSummary(id: string, maxMessages = 20): Promise<{
sessionId: string;
Expand All @@ -1036,14 +1082,8 @@ export class SessionManager {
const session = this.state.sessions[id];
if (!session) throw new Error(`Session ${id} not found`);

// Read ALL messages from the beginning for summary
let allMessages: ParsedEntry[] = [];
if (session.jsonlPath && existsSync(session.jsonlPath)) {
try {
const result = await readNewEntries(session.jsonlPath, 0);
allMessages = result.entries;
} catch { /* file may be corrupted */ }
}
// #357: Use cached entries instead of re-reading from offset 0
const allMessages = await this.getCachedEntries(session);

// Take last N messages
const recent = allMessages.slice(-maxMessages).map(m => ({
Expand Down Expand Up @@ -1085,12 +1125,8 @@ export class SessionManager {
}

let allEntries: ParsedEntry[] = [];
if (session.jsonlPath && existsSync(session.jsonlPath)) {
try {
const result = await readNewEntries(session.jsonlPath, 0);
allEntries = result.entries;
} catch { /* file may be corrupted */ }
}
// #357: Use cached entries instead of re-reading from offset 0
allEntries = await this.getCachedEntries(session);

if (roleFilter) {
allEntries = allEntries.filter(e => e.role === roleFilter);
Expand Down Expand Up @@ -1141,6 +1177,13 @@ export class SessionManager {
this.cleanupPendingQuestion(id);

delete this.state.sessions[id];
// #357: Clean up parsed entries cache
this.parsedEntriesCache.delete(id);
// #357: Cancel any pending debounced save before doing an immediate save
if (this.saveDebounceTimer !== null) {
clearTimeout(this.saveDebounceTimer);
this.saveDebounceTimer = null;
}
await this.save();
}

Expand Down
Loading
Loading