From db7d7bb6b143f64b2acd9b2fd2bb111610f05d27 Mon Sep 17 00:00:00 2001 From: David Gornshtein Date: Thu, 9 Apr 2026 03:09:29 +0300 Subject: [PATCH 01/14] perf: async analytics/leaderboard + SQLite FTS5 + live partial results MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stops the GUI from hanging on users with large session histories (4800+ codex rollouts / 1 GB+ JSONL). Tested on a 4873-session corpus where cold Analytics previously took 100+ seconds to respond. ## Core changes - **Non-blocking loadSessions** — sync path reads metadata only, uses the parse/cost disk caches, and queues uncached files for a background warmer. Cold `/api/sessions` now returns in ~300ms instead of blocking for 100+ seconds. - **Async background jobs** for `/api/analytics/cost` and `/api/leaderboard`. HTTP returns a `{status, progress, partialResult}` snapshot immediately; the client polls at 500ms and the job publishes a live partial aggregate on each chunk so users see real numbers climb ($0 → $5562 → $11344 → ... → final). - **Incremental cost aggregator** (`createCostAggregator`) — extracted from `getCostAnalytics` so the job can merge sessions one chunk at a time and finalize a snapshot per yield. - **SQLite + FTS5 index** (`src/sqlite-index.js`) at `~/.codedash/cache/index.sqlite` — persistent sessions/messages/ messages_fts (FTS5 porter+unicode61)/daily_stats/files_seen/ aggregate_cache tables. Full-text search runs via `MATCH` in ~5 ms on 1.2M messages. Uses async `spawn` with `-cmd .timeout 30000` so concurrent writers don't deadlock. - **Aggregate result cache** persisted in `aggregate_cache` with a 5-min quantized fingerprint (`count|max_ts_bucket|filters|helpers`). Repeat visits within the bucket are instant; active codex writes don't invalidate the cache on every request. 
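
The quantized fingerprint described in the last bullet can be sketched roughly as follows. This is a minimal illustration, not the code in this patch: the function name `aggregateFingerprint`, the `last_ts` field used for the newest timestamp, and the exact serialization of the filters are assumptions made for the example.

```javascript
// Hypothetical sketch of a 5-minute quantized cache fingerprint.
// Names and field layout are assumptions, not the patch's actual code.
const BUCKET_MS = 5 * 60 * 1000; // width of one quantization bucket

function aggregateFingerprint(sessions, filters, includeHelpers) {
  // Newest timestamp across all sessions (0 when the list is empty).
  let maxTs = 0;
  for (const s of sessions) {
    if (s.last_ts > maxTs) maxTs = s.last_ts;
  }
  // Quantize so that writes inside the same 5-minute window map to one key.
  const bucket = Math.floor(maxTs / BUCKET_MS);
  const filterKey = JSON.stringify(filters || {});
  // Shape mirrors the commit message: count|max_ts_bucket|filters|helpers
  return [sessions.length, bucket, filterKey, includeHelpers ? 1 : 0].join('|');
}

// Two snapshots whose newest timestamps fall in the same bucket share a key.
const before = aggregateFingerprint([{ last_ts: 1700000000000 }], {}, false);
const after = aggregateFingerprint([{ last_ts: 1700000060000 }], {}, false);
console.log(before === after);
```

Under these assumptions, an active session that keeps appending to an existing rollout only changes the key when its newest timestamp crosses a bucket boundary, while a new session changes the count and therefore the key immediately.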
## Hot-path fixes found along the way - `getActiveSessions` ran `lsof` per matching process with a 2s timeout and called `loadSessions()` inside the loop → blocked the event loop for minutes when 30+ codex-up wrappers were running. Now: single `ps` call, tight regex (catches `codex`/`codex-up`/ `codex-up-exec` as binary names), batched `lsof -a -d cwd -Fpn -p` (the `-a` flag is critical; without it lsof ORs conditions and returns cwds for unrelated processes), pid→cwd cache, 3s result cache, no inner loadSessions. 80s → 350ms cold, 0ms cached. - `resolveGitRoot` called `git rev-parse` synchronously per unique project path — 56 projects × 2s = 112s of blocked event loop. Now queued to a background resolver with its own disk cache in `~/.codedash/cache/git-root-cache.json`. - `scanCodexSessions` was O(n²) via `.find()` on an array, and re-read every rollout file on each call. Now: `Map` + `cacheOnly` parse mode + background warmer drains uncached files. - `parseClaudeSessionFileAsync` — streaming read via `readline` for files >5 MB (user had a 199 MB session file). Yields to the event loop every 2000 lines via `await new Promise(r => setImmediate(r))` so HTTP requests aren't starved during parse. - Persistent cache paths moved from `os.tmpdir()` to `~/.codedash/cache/` so macOS tmpdir cleanup doesn't wipe a morning's worth of parse work. - Flush handlers on SIGINT/SIGTERM + periodic flush every 50 entries — killing the server mid-warm no longer loses hours of progress. ## Accuracy fixes - `parseClaudeSessionFile` counted every `type=user` entry as a user prompt, but Claude Code stores tool_results as `type=user` with `content: [{type:'tool_result', ...}]`. One measured session had 480 type=user entries but only 17 real user prompts — a 28x overcount. Now checks `content` for a real `text` block. 
- `isSystemMessage` extended to skip Codex runtime injections that were counted as user prompts: ``, ``, ``, ``, `# CLAUDE.md`, `Warning: The maximum number of unified exec`, `AUTOSTEERING:`, `[Sub-agent results]`. - **Helper session detection** — codex rollouts with `session_meta.payload.originator === 'codex_exec'` (or scripted first-message patterns like `You are in /...`, `Read-only task.`, `Work in /...`, `Pair-local ... lane`, `## X Y Agent`, etc) are flagged `is_helper: true`. `/api/sessions` filters them by default; `?include_helpers=1` opts in. On the test corpus this removes 2166/4873 scripted sub-agent runs. - Leaderboard now counts **unique conversations** (via `group_key`), not retries. Real cost is summed over all rollouts (actual money spent); session count uses deduped groups. On the test corpus: 2869 → 612 unique conversations, 867k → 8458 real prompts, cost stays at the real $53,637. ## Shared grouping helper - `computeSessionGroupKey(s)` in data.js: `tool::project::firstMsg[0..200]` (or `helper::project` for helpers) — computed once per session on load, exposed as `s.group_key`. - `groupSessionsByConversation(sessions)` in frontend/app.js — shared by **Timeline**, **All Sessions**, **Projects view**, **Cloud Sync**, and **Activity/Heatmap**. One helper, one representative per group, `+N more` badge on cards. ## Logging - `CODEDASH_LOG=0` (default) silences stdout spam (previous `[ACTIVE] pid=... codex/waiting cpu=0%` lines were emitted on every /api/active poll). `ERROR`/`WARN`/`JOB` still go to stdout. - All logs (including verbose tags) always go to `~/.codedash/logs/server.log` with timestamps. Set `CODEDASH_LOG=1` to also mirror to stdout. ## Exports / new endpoints - `loadSessionsAsync(progressCb)` — async variant with incremental mtime-based change detection. - `getWarmingStatus()` + `/api/warming` — background parse progress. - `getSqliteBackfillStatus()` + `/api/sqlite-status` — FTS5 ingest progress + index size. 
- `createCostAggregator()`, `computeSessionCostForAnalytics(session, opencodeCache)`, `buildOpencodeCostCache(sessions)` — so the async jobs can stream-aggregate without re-wiring `getCostAnalytics`. ## Benchmark (user's machine, 4873 sessions / 1.1 GB JSONL) ``` before (v6.15.10) after loadSessions 109 s (cold) 72 ms (cacheOnly path) /api/active 80 s (34 procs) 350 ms cold, 0 ms warm search 3–10 s (rebuild) 5 ms (FTS5 MATCH) analytics 30 s (blocking) first partial in ~500 ms, full in ~15 s, instant on cache hit leaderboard 35 s (blocking) first partial in ~500 ms, full in ~15 s, instant on cache hit ``` Browser cache note: after upgrading, users should hard-reload (Cmd+Shift+R) once so the split frontend modules re-load — the poll loops are new code and old cached JS will show the initial "Loading..." spinner without progressing. --- src/data.js | 1623 +++++++++++++++++++++++++++++------ src/frontend/analytics.js | 177 +++- src/frontend/app.js | 88 +- src/frontend/cloud.js | 13 +- src/frontend/heatmap.js | 10 +- src/frontend/leaderboard.js | 32 +- src/frontend/styles.css | 111 +++ src/server.js | 426 ++++++++- src/sqlite-index.js | 479 +++++++++++ 9 files changed, 2668 insertions(+), 291 deletions(-) create mode 100644 src/sqlite-index.js diff --git a/src/data.js b/src/data.js index 55b1eee..298b45f 100644 --- a/src/data.js +++ b/src/data.js @@ -98,10 +98,18 @@ function parseOpenCodeMcpServer(toolName) { return toolName.slice(0, idx); } +// Persistent cache dir (survives tmpdir cleanup and process kills) +const CODEDASH_CACHE_DIR = path.join(os.homedir(), '.codedash', 'cache'); +try { fs.mkdirSync(CODEDASH_CACHE_DIR, { recursive: true }); } catch {} + // Disk cache for parsed Claude session files (keyed by path + mtime + size) -const PARSED_CACHE_FILE = path.join(os.tmpdir(), 'codedash-parsed-cache.json'); +const PARSED_CACHE_FILE = path.join(CODEDASH_CACHE_DIR, 'parsed-cache.json'); +// Legacy tmpdir path (migration: read once, then ignore) +const 
LEGACY_PARSED_CACHE_FILE = path.join(os.tmpdir(), 'codedash-parsed-cache.json'); let _parsedDiskCache = null; let _parsedDiskCacheDirty = false; +let _parsedDiskCacheEntriesSinceFlush = 0; +const PARSED_CACHE_FLUSH_EVERY = 50; // flush after every N new entries // Reverse index: file path -> cache key (avoids repeated fs.statSync) const _fileCacheKeyIndex = {}; @@ -110,20 +118,490 @@ function _loadParsedDiskCache() { try { if (fs.existsSync(PARSED_CACHE_FILE)) { _parsedDiskCache = JSON.parse(fs.readFileSync(PARSED_CACHE_FILE, 'utf8')); + } else if (fs.existsSync(LEGACY_PARSED_CACHE_FILE)) { + // Migrate from tmpdir once + _parsedDiskCache = JSON.parse(fs.readFileSync(LEGACY_PARSED_CACHE_FILE, 'utf8')); + _parsedDiskCacheDirty = true; } } catch {} if (!_parsedDiskCache) _parsedDiskCache = {}; } -function _saveParsedDiskCache() { +function _saveParsedDiskCache(force) { if (!_parsedDiskCacheDirty || !_parsedDiskCache) return; + if (!force && _parsedDiskCacheEntriesSinceFlush < PARSED_CACHE_FLUSH_EVERY) return; try { - fs.writeFileSync(PARSED_CACHE_FILE, JSON.stringify(_parsedDiskCache)); + // Atomic write: write to .tmp then rename + const tmp = PARSED_CACHE_FILE + '.tmp'; + fs.writeFileSync(tmp, JSON.stringify(_parsedDiskCache)); + fs.renameSync(tmp, PARSED_CACHE_FILE); _parsedDiskCacheDirty = false; + _parsedDiskCacheEntriesSinceFlush = 0; + } catch {} +} + +// ── Disk cache for computed session cost (path+mtime+size → cost) ───────── +const COST_CACHE_FILE = path.join(CODEDASH_CACHE_DIR, 'cost-cache.json'); +let _costDiskCache = null; +let _costDiskCacheDirty = false; +let _costDiskCacheEntriesSinceFlush = 0; +const COST_CACHE_FLUSH_EVERY = 50; + +function _loadCostDiskCache() { + if (_costDiskCache) return; + try { + if (fs.existsSync(COST_CACHE_FILE)) { + _costDiskCache = JSON.parse(fs.readFileSync(COST_CACHE_FILE, 'utf8')); + } + } catch {} + if (!_costDiskCache) _costDiskCache = {}; +} + +function _saveCostDiskCache(force) { + if (!_costDiskCacheDirty || 
!_costDiskCache) return; + if (!force && _costDiskCacheEntriesSinceFlush < COST_CACHE_FLUSH_EVERY) return; + try { + const tmp = COST_CACHE_FILE + '.tmp'; + fs.writeFileSync(tmp, JSON.stringify(_costDiskCache)); + fs.renameSync(tmp, COST_CACHE_FILE); + _costDiskCacheDirty = false; + _costDiskCacheEntriesSinceFlush = 0; } catch {} } -function parseClaudeSessionFile(sessionFile) { +// ── Background parse warming state ──────────────────────── +// Populated by sync loadSessions() with file paths whose parse cache is stale. +// A singleton background task drains this set with setImmediate yielding so +// the HTTP event loop stays responsive. UI can read _warmingStatus for progress. +const _pendingParseFiles = new Set(); +let _warmingRunning = false; +const _warmingStatus = { + running: false, + done: 0, + total: 0, + phase: 'idle', + startedAt: 0, + finishedAt: 0, +}; + +// ── SQLite ingest helper ──────────────────────────────────── +// Parse a single Claude session file fully (messages + metadata) and push +// a batch row into the SQLite index. Used by the background warmer after +// the parsed-cache entry has been created. Cheap when already indexed. +let _sqliteIngestBatch = []; +const _SQLITE_INGEST_FLUSH_EVERY = 5; + +function _ingestClaudeFileToSqlite(filePath) { + let sqliteIndex; + try { sqliteIndex = require('./sqlite-index'); } catch { return; } + try { + if (sqliteIndex.isFileCurrent(filePath)) return; // already indexed + } catch { return; } + + let stat; + try { stat = fs.statSync(filePath); } catch { return; } + + // Full read — at ingest time we need messages for FTS. Streaming for big + // files would be nicer but the warmer yields between files already. 
+ let lines; + try { lines = readLines(filePath); } catch { return; } + + const sid = path.basename(filePath, '.jsonl'); + let projectPath = ''; + let tool = 'claude'; + let firstTs = stat.mtimeMs; + let lastTs = stat.mtimeMs; + let firstMsg = ''; + let userMsgCount = 0; + let totalMsgCount = 0; + const msgs = []; + const dayMsgs = {}; // day → {messages, first_ts, last_ts} + let seq = 0; + + for (const line of lines) { + try { + const entry = JSON.parse(line); + if (entry.type !== 'user' && entry.type !== 'assistant') continue; + const role = entry.type; + const content = extractContent((entry.message || {}).content); + if (!content) continue; + const ts = entry.timestamp ? (typeof entry.timestamp === 'number' ? entry.timestamp : new Date(entry.timestamp).getTime()) : 0; + if (ts) { + if (ts < firstTs) firstTs = ts; + if (ts > lastTs) lastTs = ts; + } + if (!projectPath && entry.cwd) projectPath = entry.cwd; + totalMsgCount++; + if (role === 'user') { + userMsgCount++; + if (!firstMsg && content) firstMsg = content.slice(0, 200); + } + msgs.push({ seq: seq++, role, ts, content: content.slice(0, 8000) }); + // Day breakdown for user messages only + if (role === 'user' && ts > 1000000000000) { + const d = new Date(ts); + const day = d.getFullYear() + '-' + String(d.getMonth()+1).padStart(2,'0') + '-' + String(d.getDate()).padStart(2,'0'); + if (!dayMsgs[day]) dayMsgs[day] = { messages: 0, first_ts: ts, last_ts: ts }; + dayMsgs[day].messages++; + if (ts < dayMsgs[day].first_ts) dayMsgs[day].first_ts = ts; + if (ts > dayMsgs[day].last_ts) dayMsgs[day].last_ts = ts; + } + } catch {} + } + + // Convert dayMsgs to daily_stats rows + const daily = {}; + for (const day in dayMsgs) { + const dm = dayMsgs[day]; + daily[day] = { + messages: dm.messages, + hours: Math.min((dm.last_ts - dm.first_ts) / 3600000, 16), + }; + } + + _sqliteIngestBatch.push({ + filePath, + session: { + id: sid, + tool, + project: projectPath, + project_short: projectPath.replace(os.homedir(), '~'), + 
first_ts: firstTs, + last_ts: lastTs, + messages: totalMsgCount, + user_messages: userMsgCount, + file_size: stat.size, + first_message: firstMsg, + source_mtime: stat.mtimeMs, + source_size: stat.size, + }, + messages: msgs, + daily, + }); + + // Don't flush here — backfill loop awaits the flush between files. +} + +// Ingest a single Codex rollout JSONL into SQLite. +function _ingestCodexFileToSqlite(filePath) { + let sqliteIndex; + try { sqliteIndex = require('./sqlite-index'); } catch { return; } + try { if (sqliteIndex.isFileCurrent(filePath)) return; } catch { return; } + + let stat; + try { stat = fs.statSync(filePath); } catch { return; } + + // Extract session id from filename + const basename = path.basename(filePath, '.jsonl'); + const uuidMatch = basename.match(/([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})/); + if (!uuidMatch) return; + const sid = uuidMatch[1]; + + let lines; + try { lines = readLines(filePath); } catch { return; } + + let projectPath = ''; + let firstTs = stat.mtimeMs; + let lastTs = stat.mtimeMs; + let firstMsg = ''; + let userMsgCount = 0; + let totalMsgCount = 0; + const msgs = []; + const dayMsgs = {}; + let seq = 0; + + for (const line of lines) { + try { + const entry = JSON.parse(line); + if (entry.type === 'session_meta' && entry.payload && entry.payload.cwd && !projectPath) { + projectPath = entry.payload.cwd; + continue; + } + if (entry.type !== 'response_item' || !entry.payload) continue; + const role = entry.payload.role; + if (role !== 'user' && role !== 'assistant') continue; + const content = extractContent(entry.payload.content); + if (!content || isSystemMessage(content)) continue; + let ts = 0; + const tsVal = entry.timestamp || entry.ts; + if (typeof tsVal === 'number') ts = tsVal; + else if (typeof tsVal === 'string') ts = Date.parse(tsVal) || 0; + if (ts && ts < firstTs) firstTs = ts; + if (ts && ts > lastTs) lastTs = ts; + + totalMsgCount++; + if (role === 'user') { + userMsgCount++; + if 
(!firstMsg) firstMsg = content.slice(0, 200); + } + msgs.push({ seq: seq++, role, ts, content: content.slice(0, 8000) }); + if (role === 'user' && ts > 1000000000000) { + const d = new Date(ts); + const day = d.getFullYear() + '-' + String(d.getMonth()+1).padStart(2,'0') + '-' + String(d.getDate()).padStart(2,'0'); + if (!dayMsgs[day]) dayMsgs[day] = { messages: 0, first_ts: ts, last_ts: ts }; + dayMsgs[day].messages++; + if (ts < dayMsgs[day].first_ts) dayMsgs[day].first_ts = ts; + if (ts > dayMsgs[day].last_ts) dayMsgs[day].last_ts = ts; + } + } catch {} + } + + const daily = {}; + for (const day in dayMsgs) { + const dm = dayMsgs[day]; + daily[day] = { + messages: dm.messages, + hours: Math.min((dm.last_ts - dm.first_ts) / 3600000, 16), + }; + } + + _sqliteIngestBatch.push({ + filePath, + session: { + id: sid, + tool: 'codex', + project: projectPath, + project_short: projectPath.replace(os.homedir(), '~'), + first_ts: firstTs, + last_ts: lastTs, + messages: totalMsgCount, + user_messages: userMsgCount, + file_size: stat.size, + first_message: firstMsg, + source_mtime: stat.mtimeMs, + source_size: stat.size, + }, + messages: msgs, + daily, + }); + + // Don't flush here — backfill loop awaits the flush between files. +} + +async function _flushSqliteIngestBatch() { + if (_sqliteIngestBatch.length === 0) return; + const batch = _sqliteIngestBatch; + _sqliteIngestBatch = []; // take ownership before the async call + try { + const sqliteIndex = require('./sqlite-index'); + await sqliteIndex.indexBatchAsync(batch); + } catch (e) { + try { console.error('sqlite ingest flush failed:', e.message); } catch {} + } +} + +// ── SQLite backfill ──────────────────────────────────────── +// One-shot background task that iterates all existing Claude session files +// and ingests them into SQLite (if not already there, matched by mtime+size +// via files_seen). Yields between files. 
+let _sqliteBackfillRunning = false; +const _sqliteBackfillStatus = { + running: false, + done: 0, + total: 0, + phase: 'idle', + startedAt: 0, + finishedAt: 0, +}; + +function getSqliteBackfillStatus() { + return Object.assign({}, _sqliteBackfillStatus); +} + +function _ensureSqliteBackfillRunning() { + if (_sqliteBackfillRunning) return; + let sqliteIndex; + try { sqliteIndex = require('./sqlite-index'); } catch { return; } + + _sqliteBackfillRunning = true; + _sqliteBackfillStatus.running = true; + _sqliteBackfillStatus.startedAt = Date.now(); + _sqliteBackfillStatus.phase = 'scanning'; + _sqliteBackfillStatus.done = 0; + + setImmediate(async () => { + try { + sqliteIndex.ensureSchema(); + + // Enumerate all Claude JSONL files + Codex session files + const allFiles = []; // array of {file, kind} + const walkClaude = (dir) => { + try { + for (const proj of fs.readdirSync(dir)) { + const pDir = path.join(dir, proj); + try { + if (!fs.statSync(pDir).isDirectory()) continue; + for (const f of fs.readdirSync(pDir)) { + if (f.endsWith('.jsonl')) allFiles.push({ file: path.join(pDir, f), kind: 'claude' }); + } + } catch {} + } + } catch {} + }; + if (fs.existsSync(PROJECTS_DIR)) walkClaude(PROJECTS_DIR); + for (const extra of EXTRA_CLAUDE_DIRS) { + const ep = path.join(extra, 'projects'); + if (fs.existsSync(ep)) walkClaude(ep); + } + // Codex session files (~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl) + const codexSessDir = path.join(CODEX_DIR, 'sessions'); + if (fs.existsSync(codexSessDir)) { + const walkCodex = (dir) => { + try { + for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { + const full = path.join(dir, entry.name); + if (entry.isDirectory()) walkCodex(full); + else if (entry.name.endsWith('.jsonl')) allFiles.push({ file: full, kind: 'codex' }); + } + } catch {} + }; + walkCodex(codexSessDir); + } + + _sqliteBackfillStatus.total = allFiles.length; + _sqliteBackfillStatus.phase = 'ingesting'; + + // Sort newest first so fresh sessions hit 
the index sooner + allFiles.sort((a, b) => { + try { return fs.statSync(b.file).mtimeMs - fs.statSync(a.file).mtimeMs; } catch { return 0; } + }); + + // Single bulk read of files_seen — avoids N sync SQL calls + const filesSeen = sqliteIndex.loadAllFilesSeen(); + + for (const item of allFiles) { + try { + let stat; + try { stat = fs.statSync(item.file); } catch { _sqliteBackfillStatus.done++; continue; } + const seen = filesSeen.get(item.file); + const isCurrent = seen && seen.mtime === stat.mtimeMs && seen.size === stat.size; + if (!isCurrent) { + if (item.kind === 'codex') { + _ingestCodexFileToSqlite(item.file); + } else { + _ingestClaudeFileToSqlite(item.file); + } + if (_sqliteIngestBatch.length >= _SQLITE_INGEST_FLUSH_EVERY) { + await _flushSqliteIngestBatch(); + } + } + } catch {} + _sqliteBackfillStatus.done++; + // Yield every 10 files so HTTP requests can slot in + if (_sqliteBackfillStatus.done % 10 === 0) { + await new Promise(r => setImmediate(r)); + } + } + await _flushSqliteIngestBatch(); + + _sqliteBackfillStatus.phase = 'done'; + _sqliteBackfillStatus.finishedAt = Date.now(); + } catch (e) { + _sqliteBackfillStatus.phase = 'error: ' + (e && e.message || 'unknown'); + _sqliteBackfillStatus.finishedAt = Date.now(); + } finally { + _sqliteBackfillStatus.running = false; + _sqliteBackfillRunning = false; + } + }); +} + +function _ensureWarmingRunning() { + if (_warmingRunning) return; + if (_pendingParseFiles.size === 0 && _pendingCodexFiles.size === 0) return; + _warmingRunning = true; + _warmingStatus.running = true; + _warmingStatus.startedAt = Date.now(); + _warmingStatus.done = 0; + _warmingStatus.total = _pendingParseFiles.size + _pendingCodexFiles.size; + _warmingStatus.phase = 'parsing session files'; + setImmediate(async () => { + try { + while (_pendingParseFiles.size > 0 || _pendingCodexFiles.size > 0) { + // Claude batch (newest-first) + if (_pendingParseFiles.size > 0) { + const batch = Array.from(_pendingParseFiles); + batch.sort((a, b) 
=> { + try { return fs.statSync(b).mtimeMs - fs.statSync(a).mtimeMs; } catch { return 0; } + }); + _warmingStatus.total = _warmingStatus.done + batch.length + _pendingCodexFiles.size; + for (const file of batch) { + _pendingParseFiles.delete(file); + try { + let stat; + try { stat = fs.statSync(file); } catch { _warmingStatus.done++; continue; } + if (stat.size >= 5 * 1024 * 1024) { + await parseClaudeSessionFileAsync(file); + } else { + parseClaudeSessionFile(file); + await new Promise(r => setImmediate(r)); + } + // Also push into persistent SQLite index (batched) + try { _ingestClaudeFileToSqlite(file); } catch {} + } catch {} + _warmingStatus.done++; + } + _flushSqliteIngestBatch(); + } + // Codex batch + if (_pendingCodexFiles.size > 0) { + const batch = Array.from(_pendingCodexFiles); + batch.sort((a, b) => { + try { return fs.statSync(b).mtimeMs - fs.statSync(a).mtimeMs; } catch { return 0; } + }); + _warmingStatus.total = _warmingStatus.done + batch.length + _pendingParseFiles.size; + for (const file of batch) { + _pendingCodexFiles.delete(file); + try { + parseCodexSessionFile(file); + await new Promise(r => setImmediate(r)); + } catch {} + _warmingStatus.done++; + } + } + // Invalidate session cache so next /api/sessions picks up new detail + _sessionsCache = null; + _sessionsCacheTs = 0; + } + _saveParsedDiskCache(true); + _flushSqliteIngestBatch(); + _warmingStatus.phase = 'done'; + _warmingStatus.finishedAt = Date.now(); + } catch (e) { + _warmingStatus.phase = 'error: ' + (e && e.message || 'unknown'); + _warmingStatus.finishedAt = Date.now(); + } finally { + _warmingStatus.running = false; + _warmingRunning = false; + } + }); +} + +function getWarmingStatus() { + return Object.assign({}, _warmingStatus, { + pending: _pendingParseFiles.size, + }); +} + +// Flush all caches on process exit / signals so partial progress isn't lost +let _flushHandlersInstalled = false; +function _installFlushHandlers() { + if (_flushHandlersInstalled) return; + 
_flushHandlersInstalled = true; + const flushAll = () => { + try { _saveParsedDiskCache(true); } catch {} + try { _saveCostDiskCache(true); } catch {} + try { if (typeof _saveDailyStatsDiskCache === 'function') _saveDailyStatsDiskCache(); } catch {} + try { if (typeof _saveGitRootDiskCache === 'function') _saveGitRootDiskCache(); } catch {} + try { if (typeof _flushSqliteIngestBatch === 'function') _flushSqliteIngestBatch(); } catch {} + }; + process.on('exit', flushAll); + process.on('SIGINT', () => { flushAll(); process.exit(0); }); + process.on('SIGTERM', () => { flushAll(); process.exit(0); }); +} +_installFlushHandlers(); + +function parseClaudeSessionFile(sessionFile, opts) { if (!fs.existsSync(sessionFile)) return null; let stat; @@ -139,6 +617,14 @@ function parseClaudeSessionFile(sessionFile) { _fileCacheKeyIndex[sessionFile] = cacheKey; if (_parsedDiskCache[cacheKey]) return _parsedDiskCache[cacheKey]; + // Cache-only mode: skip actual file read when not cached. Caller gets + // null-ish placeholder and the real parse happens in a background job. + // This is what keeps the sync loadSessions() fast on cold caches. + if (opts && opts.cacheOnly) { + _pendingParseFiles.add(sessionFile); + return null; + } + let lines; try { lines = readLines(sessionFile); @@ -158,11 +644,27 @@ function parseClaudeSessionFile(sessionFile) { const mcpSet = new Set(); const skillSet = new Set(); + // Helper: is this `type=user` entry a REAL user prompt, not a tool_result? + // In Claude Code JSONL, tool_result messages are stored as type='user' with + // content=[{type:'tool_result', ...}]. We want to count only messages whose + // content contains real text from the human. 
+ const isRealUserPrompt = (entry) => { + const c = (entry.message || {}).content; + if (typeof c === 'string') return c.trim().length > 0; + if (Array.isArray(c)) { + for (const p of c) { + if (p && p.type === 'text' && p.text && p.text.trim()) return true; + } + return false; + } + return false; + }; + for (const line of lines) { try { const entry = JSON.parse(line); if (entry.type === 'user' || entry.type === 'assistant') msgCount++; - if (entry.type === 'user') userMsgCount++; + if (entry.type === 'user' && isRealUserPrompt(entry)) userMsgCount++; if (entry.timestamp) { if (entry.timestamp < firstTs) firstTs = entry.timestamp; if (entry.timestamp > lastTs) lastTs = entry.timestamp; @@ -225,6 +727,123 @@ function parseClaudeSessionFile(sessionFile) { // Cache to disk _parsedDiskCache[cacheKey] = result; _parsedDiskCacheDirty = true; + _parsedDiskCacheEntriesSinceFlush++; + // Periodic flush so long operations don't lose progress on kill + if (_parsedDiskCacheEntriesSinceFlush >= PARSED_CACHE_FLUSH_EVERY) { + _saveParsedDiskCache(true); + } + return result; +} + +// Async variant: for large files (>5 MB) reads in chunks with setImmediate +// between chunks so the Node event loop can handle other requests. Same +// result shape and cache as sync version. +async function parseClaudeSessionFileAsync(sessionFile) { + if (!fs.existsSync(sessionFile)) return null; + let stat; + try { stat = fs.statSync(sessionFile); } catch { return null; } + + _loadParsedDiskCache(); + const cacheKey = sessionFile + '|' + stat.mtimeMs + '|' + stat.size; + _fileCacheKeyIndex[sessionFile] = cacheKey; + if (_parsedDiskCache[cacheKey]) return _parsedDiskCache[cacheKey]; + + // Small files: use sync path (faster, avoids Promise overhead) + const BIG_THRESHOLD = 5 * 1024 * 1024; // 5 MB + if (stat.size < BIG_THRESHOLD) return parseClaudeSessionFile(sessionFile); + + // Big file: stream read + parse line by line with periodic yielding. 
+ let projectPath = ''; + let tool = 'claude'; + let msgCount = 0; + let firstMsg = ''; + let customTitle = ''; + let firstTs = stat.mtimeMs; + let lastTs = stat.mtimeMs; + let userMsgCount = 0; + let entrypointFound = false; + let worktreeOriginalCwd = ''; + const mcpSet = new Set(); + const skillSet = new Set(); + + const readline = require('readline'); + const stream = fs.createReadStream(sessionFile, { encoding: 'utf8', highWaterMark: 1 << 20 }); + const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); + + const isRealUserPromptAsync = (entry) => { + const c = (entry.message || {}).content; + if (typeof c === 'string') return c.trim().length > 0; + if (Array.isArray(c)) { + for (const p of c) { + if (p && p.type === 'text' && p.text && p.text.trim()) return true; + } + return false; + } + return false; + }; + + let linesSinceYield = 0; + for await (const line of rl) { + if (!line) continue; + try { + const entry = JSON.parse(line); + if (entry.type === 'user' || entry.type === 'assistant') msgCount++; + if (entry.type === 'user' && isRealUserPromptAsync(entry)) userMsgCount++; + if (entry.timestamp) { + if (entry.timestamp < firstTs) firstTs = entry.timestamp; + if (entry.timestamp > lastTs) lastTs = entry.timestamp; + } + if (!projectPath && entry.type === 'user' && entry.cwd) projectPath = entry.cwd; + if (!worktreeOriginalCwd && entry.type === 'worktree-state' && entry.worktreeSession && entry.worktreeSession.originalCwd) { + worktreeOriginalCwd = entry.worktreeSession.originalCwd; + } + if (!entrypointFound && entry.type === 'user' && entry.entrypoint) { + entrypointFound = true; + if (entry.entrypoint !== 'cli') tool = 'claude-ext'; + } + if (entry.type === 'custom-title' && typeof entry.customTitle === 'string') { + const title = entry.customTitle.trim(); + if (title) customTitle = title.slice(0, 200); + } + if (!firstMsg && entry.type === 'user' && entry.message && entry.message.content) { + const content = 
extractContent(entry.message.content).trim(); + if (content) firstMsg = content.slice(0, 200); + } + if (entry.type === 'assistant') { + const aContent = (entry.message || {}).content; + if (Array.isArray(aContent)) { + for (const block of aContent) { + if (!block || block.type !== 'tool_use') continue; + const name = block.name || ''; + if (name.startsWith('mcp__')) { + const parts = name.split('__'); + if (parts.length >= 3) mcpSet.add(parts[1]); + } else if (name === 'Skill') { + const sk = (block.input || {}).skill; + if (sk) skillSet.add(sk.includes(':') ? sk.split(':')[0] : sk); + } + } + } + } + } catch {} + if (++linesSinceYield >= 2000) { + linesSinceYield = 0; + await new Promise(r => setImmediate(r)); + } + } + + const result = { + projectPath, tool, msgCount, userMsgCount, + firstMsg, customTitle, firstTs, lastTs, + fileSize: stat.size, worktreeOriginalCwd, + mcpServers: Array.from(mcpSet), skills: Array.from(skillSet), + }; + _parsedDiskCache[cacheKey] = result; + _parsedDiskCacheDirty = true; + _parsedDiskCacheEntriesSinceFlush++; + if (_parsedDiskCacheEntriesSinceFlush >= PARSED_CACHE_FLUSH_EVERY) { + _saveParsedDiskCache(true); + } return result; } @@ -926,13 +1545,25 @@ function loadCursorVscdbDetail(sessionId) { return { messages: messages.slice(0, 200) }; } -function parseCodexSessionFile(sessionFile) { +function parseCodexSessionFile(sessionFile, opts) { if (!fs.existsSync(sessionFile)) return null; let stat; + try { stat = fs.statSync(sessionFile); } catch { return null; } + + // Reuse the same parsed-cache keyed by path+mtime+size + _loadParsedDiskCache(); + const cacheKey = 'codex:' + sessionFile + '|' + stat.mtimeMs + '|' + stat.size; + if (_parsedDiskCache[cacheKey]) return _parsedDiskCache[cacheKey]; + + // Cache-only mode: queue for background parsing, don't block + if (opts && opts.cacheOnly) { + _pendingCodexFiles.add(sessionFile); + return null; + } + let lines; try { - stat = fs.statSync(sessionFile); lines = readLines(sessionFile); } 
catch { return null; @@ -955,6 +1586,25 @@ function parseCodexSessionFile(sessionFile) { let firstMsg = ''; let firstTs = stat.mtimeMs; let lastTs = stat.mtimeMs; + // Detect sub-agent/scripted codex sessions. Multiple signals: + // 1. originator='codex_exec' or source='exec' (standard `codex exec`) + // 2. first user prompt matches known auto-script patterns (team scripts + // that spawn codex without the exec flag). Covers: "You are in /path. + // Task: X", "Read-only task. Inspect ...", "Work in /path", "Pair-local + // lane", "## Memory Writing Agent", etc. + let isHelper = false; + // Regexes checked against the first user prompt (see after the loop) + const AUTO_SCRIPT_PATTERNS = [ + /^You are in \//, + /^Read-only task\./, + /^Work (only )?in \//, + /^Pair-local [^\n]{1,60} lane/, + /^## [A-Z][a-z]+ [A-Z][a-z]+ Agent/, // "## Memory Writing Agent:..." + /^Read \/[^\s]+\/AGENTS\.md/, + /^Read \$[A-Z_]+\//, + /^\[Sub-agent results\]/, + /^You are OMX /, + ]; const mcpSet = new Set(); for (const line of lines) { @@ -966,8 +1616,11 @@ function parseCodexSessionFile(sessionFile) { if (ts > lastTs) lastTs = ts; } - if (entry.type === 'session_meta' && entry.payload && entry.payload.cwd && !projectPath) { - projectPath = entry.payload.cwd; + if (entry.type === 'session_meta' && entry.payload) { + if (entry.payload.cwd && !projectPath) projectPath = entry.payload.cwd; + const originator = entry.payload.originator || ''; + const source = entry.payload.source || ''; + if (originator === 'codex_exec' || source === 'exec') isHelper = true; continue; } @@ -995,7 +1648,14 @@ function parseCodexSessionFile(sessionFile) { } catch {} } - return { + // Secondary helper detection by first user prompt pattern + if (!isHelper && firstMsg) { + for (const re of AUTO_SCRIPT_PATTERNS) { + if (re.test(firstMsg)) { isHelper = true; break; } + } + } + + const result = { projectPath, msgCount, userMsgCount, @@ -1004,11 +1664,23 @@ function parseCodexSessionFile(sessionFile) { lastTs, 
fileSize: stat.size, mcpServers: Array.from(mcpSet), + isHelper, }; + _parsedDiskCache[cacheKey] = result; + _parsedDiskCacheDirty = true; + _parsedDiskCacheEntriesSinceFlush++; + if (_parsedDiskCacheEntriesSinceFlush >= PARSED_CACHE_FLUSH_EVERY) { + _saveParsedDiskCache(true); + } + return result; } +// Queue for background codex file parsing (drained by ensureWarmingRunning) +const _pendingCodexFiles = new Set(); + function scanCodexSessions() { - const sessions = []; + // Map for O(1) session lookups (was O(n²) with .find) + const sessionsById = new Map(); const codexTitles = parseCodexSessionIndex(CODEX_DIR); const codexHistory = path.join(CODEX_DIR, 'history.jsonl'); if (fs.existsSync(codexHistory)) { @@ -1016,12 +1688,11 @@ function scanCodexSessions() { for (const line of lines) { try { const d = JSON.parse(line); - // Codex uses session_id, ts (seconds), text const sid = d.session_id || d.sessionId || d.id; if (!sid) continue; const ts = d.ts ? d.ts * 1000 : (d.timestamp || Date.now()); - if (!sessions.find(s => s.id === sid)) { - sessions.push({ + if (!sessionsById.has(sid)) { + sessionsById.set(sid, { id: sid, tool: 'codex', project: d.project || d.cwd || '', @@ -1040,10 +1711,10 @@ function scanCodexSessions() { } // Enrich with session files from ~/.codex/sessions/ + // Cache-only: uses the parse cache; uncached files go to background queue. 
const codexSessionsDir = path.join(CODEX_DIR, 'sessions'); if (fs.existsSync(codexSessionsDir)) { try { - // Walk year/month/day directories const files = []; const walkDir = (dir) => { for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { @@ -1055,37 +1726,61 @@ function scanCodexSessions() { walkDir(codexSessionsDir); for (const f of files) { - // Extract session ID from filename (rollout-DATE-UUID.jsonl) const basename = path.basename(f, '.jsonl'); const uuidMatch = basename.match(/([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})/); if (!uuidMatch) continue; const sid = uuidMatch[1]; - const summary = parseCodexSessionFile(f); - if (!summary) continue; - - const existing = sessions.find(s => s.id === sid); + const summary = parseCodexSessionFile(f, { cacheOnly: true }); + if (!summary) { + // No cached summary yet — create a minimal placeholder from stat + // so the session still shows in the list. Background job will enrich. + let size = 0, mtimeMs = 0; + try { const st = fs.statSync(f); size = st.size; mtimeMs = st.mtimeMs; } catch {} + const existing = sessionsById.get(sid); + if (existing) { + existing.has_detail = true; + existing.file_size = size; + existing._detail_pending = true; + } else { + sessionsById.set(sid, { + id: sid, + tool: 'codex', + project: '', + project_short: '', + first_ts: mtimeMs, + last_ts: mtimeMs, + messages: 0, + first_message: codexTitles[sid] || '', + has_detail: true, + file_size: size, + detail_messages: 0, + user_messages: 0, + mcp_servers: [], + skills: [], + _detail_pending: true, + }); + } + continue; + } + const existing = sessionsById.get(sid); if (existing) { existing.has_detail = true; existing.file_size = summary.fileSize; existing.messages = summary.msgCount; existing.detail_messages = summary.msgCount; existing.user_messages = summary.userMsgCount || 0; - if (codexTitles[sid]) { - existing.first_message = codexTitles[sid]; - } else if (summary.firstMsg && !existing.first_message) { - 
existing.first_message = summary.firstMsg; - } + if (summary.isHelper) existing.is_helper = true; + if (codexTitles[sid]) existing.first_message = codexTitles[sid]; + else if (summary.firstMsg && !existing.first_message) existing.first_message = summary.firstMsg; if (summary.projectPath && !existing.project) { existing.project = summary.projectPath; existing.project_short = summary.projectPath.replace(os.homedir(), '~'); } existing.first_ts = Math.min(existing.first_ts, summary.firstTs); existing.last_ts = Math.max(existing.last_ts, summary.lastTs); - if (summary.mcpServers && summary.mcpServers.length > 0) { - existing.mcp_servers = summary.mcpServers; - } + if (summary.mcpServers && summary.mcpServers.length > 0) existing.mcp_servers = summary.mcpServers; } else { - sessions.push({ + sessionsById.set(sid, { id: sid, tool: 'codex', project: summary.projectPath, @@ -1100,13 +1795,14 @@ function scanCodexSessions() { user_messages: summary.userMsgCount || 0, mcp_servers: summary.mcpServers || [], skills: [], + is_helper: summary.isHelper || false, }); } } } catch {} } - return sessions; + return Array.from(sessionsById.values()); } // ── Git root resolver ─────────────────────────────────────── @@ -1121,46 +1817,105 @@ function scanCodexSessions() { // from the session cwd string. Works without git for standard worktree layouts. const _gitRootCache = {}; -const GIT_ROOT_CACHE_FILE = path.join(os.tmpdir(), 'codedash-gitroot-cache.json'); +// Persistent dir (survives /tmp cleanup). Legacy file in tmpdir migrated once. +const GIT_ROOT_CACHE_FILE = path.join(CODEDASH_CACHE_DIR, 'git-root-cache.json'); +const LEGACY_GIT_ROOT_CACHE_FILE = path.join(os.tmpdir(), 'codedash-gitroot-cache.json'); let _gitRootDiskCache = null; +let _gitRootDirty = false; +// Queue of project paths that still need git resolution. Filled by sync +// loadSessions() when a path is uncached, drained in background by +// _ensureGitRootResolverRunning() so the sync path returns immediately. 
+const _pendingGitRoots = new Set(); +let _gitRootResolverRunning = false; function _loadGitRootDiskCache() { if (_gitRootDiskCache) return; try { if (fs.existsSync(GIT_ROOT_CACHE_FILE)) { _gitRootDiskCache = JSON.parse(fs.readFileSync(GIT_ROOT_CACHE_FILE, 'utf8')); - // Pre-fill memory cache from disk - Object.assign(_gitRootCache, _gitRootDiskCache); + } else if (fs.existsSync(LEGACY_GIT_ROOT_CACHE_FILE)) { + _gitRootDiskCache = JSON.parse(fs.readFileSync(LEGACY_GIT_ROOT_CACHE_FILE, 'utf8')); + _gitRootDirty = true; } + if (_gitRootDiskCache) Object.assign(_gitRootCache, _gitRootDiskCache); } catch {} if (!_gitRootDiskCache) _gitRootDiskCache = {}; } function _saveGitRootDiskCache() { + if (!_gitRootDirty) return; try { - fs.writeFileSync(GIT_ROOT_CACHE_FILE, JSON.stringify(_gitRootCache)); + const tmp = GIT_ROOT_CACHE_FILE + '.tmp'; + fs.writeFileSync(tmp, JSON.stringify(_gitRootCache)); + fs.renameSync(tmp, GIT_ROOT_CACHE_FILE); + _gitRootDirty = false; } catch {} } +// Sync resolver: never shells out from the hot path. Uses cache; queues +// unknown paths for background resolution. +// ── Conversation group key ───────────────────────────────── +// Two sessions belong to the same conversation group if they share the same +// project AND the same initial user prompt (first ~200 chars). This dedupes +// `codex exec` retries, `codex resume` chains, and sub-agent re-runs that +// would otherwise show as N separate cards in the UI. Helper sessions +// (is_helper=true) all collapse into their own single bucket per project. +function computeSessionGroupKey(s) { + const proj = (s.project || 'unknown').trim(); + // Normalize first message: collapse whitespace, trim, truncate to 200 chars (case is preserved) + let msg = (s.first_message || '').replace(/\s+/g, ' ').trim().slice(0, 200); + if (s.is_helper) { + // All helpers in the same project collapse into one group. 
+ return 'helper::' + proj; + } + if (!msg) return s.tool + '::' + proj + '::' + s.id; // no prompt — unique + return s.tool + '::' + proj + '::' + msg; +} + function resolveGitRoot(projectPath) { if (!projectPath) return ''; _loadGitRootDiskCache(); if (_gitRootCache[projectPath] !== undefined) return _gitRootCache[projectPath]; - // Skip remote/non-existent paths + // Fast skip: non-existent paths can never be git roots, record & done. if (!fs.existsSync(projectPath)) { _gitRootCache[projectPath] = ''; + _gitRootDirty = true; return ''; } - try { - const root = execFileSync('git', ['-C', projectPath, 'rev-parse', '--show-toplevel'], { - encoding: 'utf8', timeout: 2000, windowsHide: true, stdio: ['pipe', 'pipe', 'pipe'] - }).trim(); - _gitRootCache[projectPath] = root; - return root; - } catch { - _gitRootCache[projectPath] = ''; - return ''; - } + // Queue for background resolver — sync path returns '' for now + _pendingGitRoots.add(projectPath); + _ensureGitRootResolverRunning(); + return ''; +} + +function _ensureGitRootResolverRunning() { + if (_gitRootResolverRunning) return; + if (_pendingGitRoots.size === 0) return; + _gitRootResolverRunning = true; + setImmediate(async () => { + try { + while (_pendingGitRoots.size > 0) { + const snapshot = Array.from(_pendingGitRoots); + for (const p of snapshot) { + _pendingGitRoots.delete(p); + if (_gitRootCache[p] !== undefined) continue; + let root = ''; + try { + root = execFileSync('git', ['-C', p, 'rev-parse', '--show-toplevel'], { + encoding: 'utf8', timeout: 2000, windowsHide: true, stdio: ['pipe', 'pipe', 'pipe'], + }).trim(); + } catch {} + _gitRootCache[p] = root; + _gitRootDirty = true; + // Yield between shell-outs so HTTP stays responsive + await new Promise(r => setImmediate(r)); + } + _saveGitRootDiskCache(); + } + } finally { + _gitRootResolverRunning = false; + } + }); } const _gitInfoCache = {}; @@ -1205,15 +1960,15 @@ function getProjectGitInfo(projectPath) { let _sessionsCache = null; let 
_sessionsCacheTs = 0; -const SESSIONS_CACHE_TTL = 60000; // 60 seconds — hot cache, invalidated by file changes +const SESSIONS_CACHE_TTL = 60000; // 60 seconds — hot cache, extended if no file changes -// Track file mtimes for smart invalidation +// Track history/projects mtime so we only rescan when files actually changed. +// Lets loadSessions() extend the cache window past TTL when nothing happened. let _historyMtime = 0; let _historySize = 0; let _projectsDirMtime = 0; function _sessionsNeedRescan() { - // Check if history.jsonl or projects dir changed since last scan try { if (fs.existsSync(HISTORY_FILE)) { const st = fs.statSync(HISTORY_FILE); @@ -1374,11 +2129,11 @@ function _loadCursorVscdbInBackground() { function loadSessions() { const now = Date.now(); if (_sessionsCache) { - // Hot cache: return immediately if within TTL and no file changes + // Hot cache: return immediately within TTL window if ((now - _sessionsCacheTs) < SESSIONS_CACHE_TTL) return _sessionsCache; - // Extended cache: even after TTL, only rescan if files actually changed + // Extended cache: after TTL, only rescan if files actually changed if (!_sessionsNeedRescan()) { - _sessionsCacheTs = now; // extend TTL + _sessionsCacheTs = now; return _sessionsCache; } } @@ -1539,12 +2294,19 @@ function loadSessions() { } if (sessionFile) { - const summary = parseClaudeSessionFile(sessionFile); + // Cache-only: never read disk from the sync path. Uncached files are + // queued for background warming — subsequent calls will see full data. 
+ const summary = parseClaudeSessionFile(sessionFile, { cacheOnly: true }); if (summary) mergeClaudeSessionDetail(s, summary, sessionFile); else { + // Placeholder: session exists on disk, actual details pending s.has_detail = true; - try { s.file_size = fs.statSync(sessionFile).size; } catch { s.file_size = 0; } s._session_file = sessionFile; + s._detail_pending = true; + try { s.file_size = fs.statSync(sessionFile).size; } catch { s.file_size = 0; } + if (s.detail_messages === undefined) s.detail_messages = 0; + if (!s.mcp_servers) s.mcp_servers = []; + if (!s.skills) s.skills = []; } } else if (!s.has_detail) { s.has_detail = false; @@ -1556,6 +2318,7 @@ function loadSessions() { } // Scan project dirs for orphan sessions (e.g. Claude Extension sessions not in history.jsonl) + // Cache-only in sync path; uncached files get queued and background job parses them. if (fs.existsSync(PROJECTS_DIR)) { try { for (const proj of fs.readdirSync(PROJECTS_DIR)) { @@ -1566,12 +2329,37 @@ function loadSessions() { const sid = file.replace('.jsonl', ''); const filePath = path.join(projDir, file); if (sessions[sid]) { - const summary = parseClaudeSessionFile(filePath); + const summary = parseClaudeSessionFile(filePath, { cacheOnly: true }); if (summary) mergeClaudeSessionDetail(sessions[sid], summary, filePath); + else sessions[sid]._detail_pending = true; + continue; + } + const summary = parseClaudeSessionFile(filePath, { cacheOnly: true }); + if (!summary) { + // Create a placeholder from stat + history of same sid (none here) + let size = 0; + try { size = fs.statSync(filePath).size; } catch {} + sessions[sid] = { + id: sid, + tool: 'claude', + project: '', + project_short: '', + first_ts: 0, + last_ts: 0, + messages: 0, + first_message: '', + has_detail: true, + file_size: size, + detail_messages: 0, + mcp_servers: [], + skills: [], + _claude_dir: CLAUDE_DIR, + _session_file: filePath, + _detail_pending: true, + worktree_original_cwd: '', + }; continue; } - const summary = 
parseClaudeSessionFile(filePath); - if (!summary) continue; sessions[sid] = { id: sid, tool: summary.tool, @@ -1595,6 +2383,12 @@ function loadSessions() { } catch {} } + // If any Claude files were uncached, _pendingParseFiles has been populated + // by parseClaudeSessionFile(cacheOnly: true). Kick off the warmer. + _ensureWarmingRunning(); + // Kick off one-shot SQLite backfill (noop if already running/done) + _ensureSqliteBackfillRunning(); + // Ensure all sessions have mcp_servers/skills (defaults for non-Claude) for (const s of Object.values(sessions)) { if (!s.mcp_servers) s.mcp_servers = []; @@ -1625,12 +2419,27 @@ function loadSessions() { s.date = dt.getFullYear() + '-' + String(dt.getMonth()+1).padStart(2,'0') + '-' + String(dt.getDate()).padStart(2,'0'); // Priority: worktree-state.originalCwd (container-safe) > git rev-parse > path heuristic (frontend) s.git_root = s.worktree_original_cwd || (s.project ? (_gitRootCache[s.project] || '') : ''); + // Conversation group key — sessions with the same project + initial + // prompt are treated as retries/resumes of the same conversation. Used + // by the shared groupSessions() helper in all views (Timeline, All + // Sessions, Cloud Sync) so we don't render 50 duplicate cards. 
+ s.group_key = computeSessionGroupKey(s); } // Flag for frontend: true = cursor vscdb still loading, will have more data soon result._loading = !_cursorVscdbSessions && _cursorVscdbLoading; + // Warming state: true when background parse of session files is in progress + result._warming = _warmingStatus.running; + if (_warmingStatus.running || _warmingStatus.pending > 0) { + result._warmingProgress = { + done: _warmingStatus.done, + total: _warmingStatus.total, + phase: _warmingStatus.phase, + pending: _pendingParseFiles.size, + }; + } - // Flush disk caches + // Flush disk caches (non-force: only writes if dirty beyond threshold) _saveParsedDiskCache(); _saveGitRootDiskCache(); _updateScanMarkers(); @@ -1640,6 +2449,91 @@ function loadSessions() { return result; } +// ── Async pre-warm of parse cache + incremental change detection ───────── +// Lists all Claude session JSONL files and parses those whose cache entry +// is stale (mtime or size changed) or missing. Yields between files so the +// HTTP event loop stays responsive. Reports progress via callback. 
+async function loadSessionsAsync(progressCb) { + _loadParsedDiskCache(); + _loadCostDiskCache(); + + const report = (phase, done, total, extra) => { + if (typeof progressCb === 'function') { + try { progressCb({ phase, done, total, ...(extra || {}) }); } catch {} + } + }; + + // Phase 1: enumerate all Claude JSONL files (fast, just readdir) + report('scanning files', 0, 0); + const allClaudeFiles = []; + const walkClaude = (dir) => { + try { + for (const proj of fs.readdirSync(dir)) { + const projDir = path.join(dir, proj); + try { + if (!fs.statSync(projDir).isDirectory()) continue; + for (const file of fs.readdirSync(projDir)) { + if (!file.endsWith('.jsonl')) continue; + allClaudeFiles.push(path.join(projDir, file)); + } + } catch {} + } + } catch {} + }; + if (fs.existsSync(PROJECTS_DIR)) walkClaude(PROJECTS_DIR); + for (const extra of EXTRA_CLAUDE_DIRS) { + const ep = path.join(extra, 'projects'); + if (fs.existsSync(ep)) walkClaude(ep); + } + + // Phase 2: figure out which files need (re)parsing — incremental path + const toParse = []; + let cachedCount = 0; + for (const f of allClaudeFiles) { + let stat; + try { stat = fs.statSync(f); } catch { continue; } + const key = f + '|' + stat.mtimeMs + '|' + stat.size; + if (_parsedDiskCache[key]) { + _fileCacheKeyIndex[f] = key; + cachedCount++; + } else { + toParse.push({ file: f, size: stat.size }); + } + } + // Parse newest (largest mtime) first so dashboards reflect fresh data fastest + toParse.sort((a, b) => { + try { return fs.statSync(b.file).mtimeMs - fs.statSync(a.file).mtimeMs; } catch { return 0; } + }); + + const total = allClaudeFiles.length; + report('parsing session files', cachedCount, total, { cached: cachedCount, toParse: toParse.length }); + + // Phase 3: parse uncached files with yielding + let done = cachedCount; + for (const { file, size } of toParse) { + try { + if (size >= 5 * 1024 * 1024) { + await parseClaudeSessionFileAsync(file); + } else { + parseClaudeSessionFile(file); + // Tiny 
yield so poll requests can slot in between files + await new Promise(r => setImmediate(r)); + } + } catch {} + done++; + report('parsing session files', done, total, { cached: cachedCount, toParse: toParse.length }); + } + _saveParsedDiskCache(true); + + // Phase 4: invalidate any stale in-memory session cache, then build full + // sessions list from now-warm parse cache (sync call, fast) + _sessionsCache = null; + _sessionsCacheTs = 0; + report('aggregating sessions', total, total); + const sessions = loadSessions(); + return sessions; +} + function loadSessionDetail(sessionId, project) { const found = findSessionFile(sessionId, project); if (!found) return { error: 'Session file not found', messages: [] }; @@ -1852,7 +2746,7 @@ function exportSessionMarkdown(sessionId, project) { // Session file index: sessionId -> file path (built once, avoids O(sessions*projects) scans) let _sessionFileIndex = null; let _sessionFileIndexTs = 0; -const SESSION_FILE_INDEX_TTL = 120000; // 2 minutes — dirs rarely change +const SESSION_FILE_INDEX_TTL = 30000; // 30 seconds function _buildSessionFileIndex() { const now = Date.now(); @@ -1996,14 +2890,27 @@ function isSystemMessage(text) { if (!text) return true; var t = text.trim(); if (t === 'exit' || t === 'quit' || t === '/exit') return true; + // XML-wrapped injected context (Claude Code + Codex) if (t.startsWith('')) return true; - // Codex developer role system prompts + if (t.startsWith('')) return true; + if (t.startsWith('')) return true; + if (t.startsWith('')) return true; + if (t.startsWith('')) return true; + // Agent instruction docs and skill metadata + if (t.startsWith('# AGENTS.md')) return true; + if (t.startsWith('# CLAUDE.md')) return true; + // Codex developer/system prompts if (t.startsWith('You are Codex')) return true; if (t.startsWith('Filesystem sandboxing')) return true; + // Codex runtime nudges / auto-steering (not real user prompts) + if (t.startsWith('Warning: The maximum number of unified exec')) 
return true; + if (t.indexOf('AUTOSTEERING:') >= 0 && t.length < 400) return true; + // Sub-agent delegate result injection + if (t.startsWith('[Sub-agent results]')) return true; + if (t.startsWith('[sub-agent result]')) return true; return false; } @@ -2188,14 +3095,34 @@ function getSearchIndex(sessions) { function searchFullText(query, sessions) { if (!query || query.length < 2) return []; + + // Prefer the persistent SQLite FTS5 index — O(log n), no RAM bloat. + try { + const sqliteIndex = require('./sqlite-index'); + const rows = sqliteIndex.search(query, 200); + if (rows && rows.length > 0) { + // Group by session_id, keep up to 3 snippets per session + const bySession = {}; + for (const r of rows) { + if (!bySession[r.session_id]) bySession[r.session_id] = []; + if (bySession[r.session_id].length >= 3) continue; + bySession[r.session_id].push({ + role: r.role, + snippet: (r.snippet || '').replace(/<>/g, ''), + }); + } + return Object.keys(bySession).map(sid => ({ sessionId: sid, matches: bySession[sid] })); + } + } catch (e) { + // Fall through to in-memory fallback + } + + // Fallback: in-memory scan (pre-SQLite ingest completion) const q = query.toLowerCase(); const index = getSearchIndex(sessions); const results = []; - for (const entry of index) { if (entry.fullText.indexOf(q) === -1) continue; - - // Find matching messages with snippets const matches = []; for (const t of entry.texts) { if (matches.length >= 3) break; @@ -2209,12 +3136,8 @@ function searchFullText(query, sessions) { }); } } - - if (matches.length > 0) { - results.push({ sessionId: entry.sessionId, matches }); - } + if (matches.length > 0) results.push({ sessionId: entry.sessionId, matches }); } - return results; } @@ -2298,27 +3221,7 @@ function getModelPricing(model) { } // ── Compute real cost from session file token usage ──────── - -// Disk cache for computed session costs -const COST_CACHE_FILE = path.join(os.tmpdir(), 'codedash-cost-cache.json'); -let _costDiskCache = null; - 
-function _loadCostDiskCache() { - if (_costDiskCache) return; - try { - if (fs.existsSync(COST_CACHE_FILE)) { - _costDiskCache = JSON.parse(fs.readFileSync(COST_CACHE_FILE, 'utf8')); - } - } catch {} - if (!_costDiskCache) _costDiskCache = {}; -} - -function _saveCostDiskCache() { - if (!_costDiskCache) return; - try { - fs.writeFileSync(COST_CACHE_FILE, JSON.stringify(_costDiskCache)); - } catch {} -} +// (COST_CACHE_FILE/_costDiskCache/_loadCostDiskCache/_saveCostDiskCache defined at top) const EMPTY_COST = { cost: 0, inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheCreateTokens: 0, contextPctSum: 0, contextTurnCount: 0, model: '' }; @@ -2458,57 +3361,214 @@ function computeSessionCost(sessionId, project) { } const result = { cost: totalCost, inputTokens: totalInput, outputTokens: totalOutput, cacheReadTokens: totalCacheRead, cacheCreateTokens: totalCacheCreate, contextPctSum, contextTurnCount, model }; - if (cacheKey) _costDiskCache[cacheKey] = result; + if (cacheKey) { + _costDiskCache[cacheKey] = result; + _costDiskCacheDirty = true; + _costDiskCacheEntriesSinceFlush++; + if (_costDiskCacheEntriesSinceFlush >= COST_CACHE_FLUSH_EVERY) { + _saveCostDiskCache(true); + } + } _costMemCache[sessionId] = result; return result; } // ── Cost analytics ──────────────────────────────────────── -// Analytics result cache — avoids recomputing 31k sessions every request -const ANALYTICS_CACHE_FILE = path.join(os.tmpdir(), 'codedash-analytics-cache.json'); -let _analyticsCacheResult = null; -let _analyticsCacheKey = null; +// ── Incremental cost aggregator ─────────────────────────────── +// Streaming version of getCostAnalytics: create an aggregator, feed sessions +// one at a time via merge(), read a snapshot at any point via finalize(). +// Lets the background job expose live partial results to the UI. 
+function createCostAggregator() { + const state = { + byDay: {}, byProject: {}, byWeek: {}, byAgent: {}, + totalCost: 0, totalTokens: 0, + totalInputTokens: 0, totalOutputTokens: 0, + totalCacheReadTokens: 0, totalCacheCreateTokens: 0, + globalContextPctSum: 0, globalContextTurnCount: 0, + firstDate: null, lastDate: null, + sessionsWithData: 0, + sessionCosts: [], + agentNoCostData: {}, + seenAgents: new Set(), + processedCount: 0, + }; + return { + state, + merge(session, costData) { + state.processedCount++; + if (!state.seenAgents.has(session.tool)) { + state.seenAgents.add(session.tool); + if (!state.byAgent[session.tool]) state.byAgent[session.tool] = { cost: 0, sessions: 0, tokens: 0, estimated: false }; + } + const cost = costData.cost; + const tokens = costData.inputTokens + costData.outputTokens + costData.cacheReadTokens + costData.cacheCreateTokens; + if (cost === 0 && tokens === 0) { + if (!state.agentNoCostData[session.tool]) state.agentNoCostData[session.tool] = 0; + state.agentNoCostData[session.tool]++; + return; + } + state.sessionsWithData++; + state.totalCost += cost; + state.totalTokens += tokens; + state.totalInputTokens += costData.inputTokens; + state.totalOutputTokens += costData.outputTokens; + state.totalCacheReadTokens += costData.cacheReadTokens; + state.totalCacheCreateTokens += costData.cacheCreateTokens; + + const agent = session.tool || 'unknown'; + if (!state.byAgent[agent]) state.byAgent[agent] = { cost: 0, sessions: 0, tokens: 0, estimated: false }; + state.byAgent[agent].cost += cost; + state.byAgent[agent].sessions++; + state.byAgent[agent].tokens += tokens; + if (agent === 'codex') state.byAgent[agent].estimated = true; + if (agent === 'cursor' && costData.model && costData.model.includes('-estimated')) state.byAgent[agent].estimated = true; + if (agent === 'opencode' && !costData.model) state.byAgent[agent].estimated = true; + + state.globalContextPctSum += costData.contextPctSum; + state.globalContextTurnCount += 
costData.contextTurnCount; + + const day = session.date || 'unknown'; + if (session.date) { + if (!state.firstDate || session.date < state.firstDate) state.firstDate = session.date; + if (!state.lastDate || session.date > state.lastDate) state.lastDate = session.date; + } + if (!state.byDay[day]) state.byDay[day] = { cost: 0, sessions: 0, tokens: 0 }; + state.byDay[day].cost += cost; + state.byDay[day].sessions++; + state.byDay[day].tokens += tokens; + + if (session.date) { + const d = new Date(session.date); + const weekStart = new Date(d); + weekStart.setDate(d.getDate() - d.getDay()); + const weekKey = weekStart.toISOString().slice(0, 10); + if (!state.byWeek[weekKey]) state.byWeek[weekKey] = { cost: 0, sessions: 0 }; + state.byWeek[weekKey].cost += cost; + state.byWeek[weekKey].sessions++; + } + + const proj = session.project_short || session.project || 'unknown'; + if (!state.byProject[proj]) state.byProject[proj] = { cost: 0, sessions: 0, tokens: 0 }; + state.byProject[proj].cost += cost; + state.byProject[proj].sessions++; + state.byProject[proj].tokens += tokens; + + state.sessionCosts.push({ id: session.id, cost, project: proj, date: session.date, last_ts: session.last_ts || 0 }); + }, + finalize() { + // Sort top sessions by cost (snapshot a shallow copy so partials stay consistent) + const topCopy = state.sessionCosts.slice().sort((a, b) => b.cost - a.cost); + const days = state.firstDate && state.lastDate + ? 
Math.max(1, Math.round((new Date(state.lastDate) - new Date(state.firstDate)) / 86400000) + 1) + : 1; + const now = Date.now(); + const todayStr = new Date().toISOString().slice(0, 10); + const hoursElapsedToday = (now - new Date(todayStr).getTime()) / 3600000; + let last1hCost = 0; + let todayCost = 0; + for (const sc of topCopy) { + if (sc.last_ts >= now - 3600000) last1hCost += sc.cost; + if (sc.date === todayStr) todayCost += sc.cost; + } + return { + totalCost: state.totalCost, + totalTokens: state.totalTokens, + totalInputTokens: state.totalInputTokens, + totalOutputTokens: state.totalOutputTokens, + totalCacheReadTokens: state.totalCacheReadTokens, + totalCacheCreateTokens: state.totalCacheCreateTokens, + avgContextPct: state.globalContextTurnCount > 0 ? Math.round(state.globalContextPctSum / state.globalContextTurnCount) : 0, + dailyRate: state.totalCost / days, + firstDate: state.firstDate, + lastDate: state.lastDate, + days, + totalSessions: state.sessionsWithData, + byDay: state.byDay, + byWeek: state.byWeek, + byProject: state.byProject, + topSessions: topCopy.slice(0, 10), + byAgent: state.byAgent, + agentNoCostData: state.agentNoCostData, + last1hCost, + todayCost, + hoursElapsedToday: Math.max(1, hoursElapsedToday), + processedCount: state.processedCount, + }; + }, + }; +} -function _analyticsKey(sessions) { - // Key: session count + newest session mtime - let newest = 0; - for (const s of sessions) { - if (s.last_ts > newest) newest = s.last_ts; +// Compute per-session cost respecting special per-tool paths used by +// getCostAnalytics (opencode batch, cursor vscdb tokens, ...). 
+function computeSessionCostForAnalytics(session, opencodeCostCache) { + if (session.tool === 'opencode' && opencodeCostCache && opencodeCostCache[session.id]) { + return opencodeCostCache[session.id]; } - return sessions.length + ':' + newest; + if (session.tool === 'cursor') { + const inp = session._cursor_input_tokens || 0; + const out = session._cursor_output_tokens || 0; + if (inp > 0 || out > 0) { + const model = session._cursor_model || ''; + const pricing = getModelPricing(model); + return { cost: inp * pricing.input + out * pricing.output, inputTokens: inp, outputTokens: out, cacheReadTokens: 0, cacheCreateTokens: 0, contextPctSum: 0, contextTurnCount: 0, model: model }; + } + if (session.user_messages > 0 || session.messages > 0) { + const userMsgs = session.user_messages || Math.ceil((session.messages || 0) * 0.07); + const model = session._cursor_model || 'claude-sonnet'; + const pricing = getModelPricing(model); + const estInput = userMsgs * 2000; + const estOutput = userMsgs * 1000; + return { cost: estInput * pricing.input + estOutput * pricing.output, inputTokens: estInput, outputTokens: estOutput, cacheReadTokens: 0, cacheCreateTokens: 0, contextPctSum: 0, contextTurnCount: 0, model: model + '-estimated' }; + } + return EMPTY_COST; + } + return computeSessionCost(session.id, session.project); } -function getCostAnalytics(sessions) { - // Fast cache check — if sessions haven't changed, return cached result - const key = _analyticsKey(sessions); - if (_analyticsCacheResult && _analyticsCacheKey === key) return _analyticsCacheResult; - - // Try disk cache - if (!_analyticsCacheResult) { - try { - if (fs.existsSync(ANALYTICS_CACHE_FILE)) { - const cached = JSON.parse(fs.readFileSync(ANALYTICS_CACHE_FILE, 'utf8')); - if (cached._key === key) { - _analyticsCacheResult = cached.data; - _analyticsCacheKey = key; - return cached.data; - } +// Pre-compute OpenCode costs in one batch SQL — used by the streaming path +function buildOpencodeCostCache(sessions) 
{ + const cache = {}; + const opencodeSessions = sessions.filter(s => s.tool === 'opencode'); + if (opencodeSessions.length === 0 || !fs.existsSync(OPENCODE_DB)) return cache; + try { + const batchRows = execFileSync('sqlite3', [ + OPENCODE_DB, + `SELECT session_id, data FROM message WHERE json_extract(data, '$.role') = 'assistant' ORDER BY time_created` + ], { encoding: 'utf8', timeout: 30000, windowsHide: true }).trim(); + if (batchRows) { + for (const row of batchRows.split('\n')) { + const sepIdx = row.indexOf('|'); + if (sepIdx < 0) continue; + const sessId = row.slice(0, sepIdx); + const jsonStr = row.slice(sepIdx + 1); + try { + const msgData = JSON.parse(jsonStr); + const t = msgData.tokens || {}; + const inp = t.input || 0; + const out = (t.output || 0) + (t.reasoning || 0); + const cacheRead = (t.cache && t.cache.read) || 0; + const cacheCreate = (t.cache && t.cache.write) || 0; + if (inp === 0 && out === 0) continue; + if (!cache[sessId]) cache[sessId] = { cost: 0, inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheCreateTokens: 0, contextPctSum: 0, contextTurnCount: 0, model: '' }; + const c = cache[sessId]; + if (!c.model && msgData.modelID) c.model = msgData.modelID; + const pricing = getModelPricing(msgData.modelID || c.model); + c.inputTokens += inp; + c.outputTokens += out; + c.cacheReadTokens += cacheRead; + c.cacheCreateTokens += cacheCreate; + c.cost += inp * pricing.input + cacheCreate * pricing.cache_create + cacheRead * pricing.cache_read + out * pricing.output; + const ctx = inp + cacheCreate + cacheRead; + if (ctx > 0) { c.contextPctSum += (ctx / CONTEXT_WINDOW) * 100; c.contextTurnCount++; } + } catch {} } - } catch {} - } - - const result = _computeCostAnalytics(sessions); - - // Save to cache - _analyticsCacheResult = result; - _analyticsCacheKey = key; - try { fs.writeFileSync(ANALYTICS_CACHE_FILE, JSON.stringify({ _key: key, data: result })); } catch {} - - return result; + } + } catch {} + return cache; } -function 
_computeCostAnalytics(sessions) { +function getCostAnalytics(sessions) { const byDay = {}; const byProject = {}; const byWeek = {}; @@ -2675,7 +3735,8 @@ function _computeCostAnalytics(sessions) { if (sc.date === todayStr) todayCost += sc.cost; } - _saveCostDiskCache(); + _saveCostDiskCache(true); + _saveParsedDiskCache(true); return { totalCost, @@ -2704,13 +3765,63 @@ function _computeCostAnalytics(sessions) { // ── Active sessions detection ───────────────────────────── +// ── Active sessions (cached, non-blocking) ────────────────── +// getActiveSessions() is called from /api/active which the browser polls on +// a timer. The previous version shelled out to `ps` + `lsof` synchronously +// once per matching process, doing an O(N) sync scan that blocked the event +// loop for tens of seconds when many processes had "codex"/"claude" in their +// cmdline (e.g. codex-up-exec wrapper spawned by Claude Code tooling). +// +// New design: +// 1. Cached result served for up to 3 seconds. +// 2. Tighter cmd matching — must be a real agent CLI, not a substring hit. +// 3. pid→cwd is remembered across calls (pids are stable for the process +// lifetime). We only look up cwd for pids we've never seen. +// 4. lsof is run ONCE as a single batch call for all unknown pids, with +// a hard 2-second total timeout. +// 5. No inner loadSessions() calls; session matching uses the in-memory +// sessions cache if it already exists, otherwise cwd-match is skipped +// and a background refresh fires. +let _activeCache = null; +let _activeCacheTs = 0; +const ACTIVE_CACHE_TTL = 3000; // 3 seconds +const _pidCwdCache = new Map(); // pid → cwd (stable for process lifetime) + +// Real agent CLI invocations — tighter than a substring match. +// Matches the binary name at the START of cmd (or after a /path/). +// Includes codex-up variants (user wrapper that spawns codex exec). 
+const AGENT_CLI_MATCHERS = [ + { tool: 'claude', re: /(^|\/)claude(\s|$)/ }, + { tool: 'codex', re: /(^|\/)codex(-up(-exec)?)?(\s|$)/ }, + { tool: 'opencode',re: /(^|\/)opencode(\s|$)/ }, + { tool: 'kiro', re: /(^|\/)kiro-cli(\s|$)/ }, + { tool: 'cursor', re: /(^|\/)cursor-agent(\s|$)/ }, +]; + +function _matchAgentCli(cmd) { + for (const m of AGENT_CLI_MATCHERS) { + if (m.re.test(cmd)) return m.tool; + } + return ''; +} + function getActiveSessions() { + const now = Date.now(); + if (_activeCache && (now - _activeCacheTs) < ACTIVE_CACHE_TTL) { + return _activeCache; + } const active = []; - const seenPids = new Set(); - // 1. Claude Code — read PID files for session ID mapping + // Skip on Windows + if (process.platform === 'win32') { + _activeCache = active; + _activeCacheTs = now; + return active; + } + + // 1. Read Claude Code PID files (provides cwd + sessionId directly) const sessionsDir = path.join(CLAUDE_DIR, 'sessions'); - const claudePidMap = {}; // pid → {sessionId, cwd, startedAt} + const claudePidMap = {}; if (fs.existsSync(sessionsDir)) { for (const file of fs.readdirSync(sessionsDir)) { if (!file.endsWith('.json')) continue; @@ -2721,104 +3832,129 @@ function getActiveSessions() { } } - // 2. Scan ALL agent processes via ps - const agentPatterns = [ - { pattern: 'claude', tool: 'claude', match: /\bclaude\b/ }, - { pattern: 'codex', tool: 'codex', match: /\bcodex\b/ }, - { pattern: 'opencode', tool: 'opencode', match: /\bopencode\b/ }, - { pattern: 'kiro', tool: 'kiro', match: /kiro-cli/ }, - { pattern: 'cursor-agent', tool: 'cursor', match: /cursor-agent/ }, - ]; - - // Skip process scanning on Windows (no ps/grep) - if (process.platform === 'win32') return active; - + // 2. 
Single `ps` call + let psOut = ''; try { - const psOut = execSync( - 'ps aux 2>/dev/null | grep -E "claude|codex|opencode|kiro-cli|cursor-agent" | grep -v grep || true', - { encoding: 'utf8', timeout: 3000, stdio: ['pipe', 'pipe', 'pipe'] } + psOut = execSync( + 'ps -eo pid=,pcpu=,rss=,stat=,command= 2>/dev/null', + { encoding: 'utf8', timeout: 2000, stdio: ['pipe', 'pipe', 'pipe'], maxBuffer: 4 * 1024 * 1024 } ); + } catch { + _activeCache = active; + _activeCacheTs = now; + return active; + } - for (const line of psOut.split('\n').filter(Boolean)) { - const parts = line.trim().split(/\s+/); - if (parts.length < 11) continue; - - const pid = parseInt(parts[1]); - if (seenPids.has(pid)) continue; + // 3. Parse + filter with strict agent CLI matching + const matches = []; + const livePids = new Set(); + for (const line of psOut.split('\n')) { + if (!line) continue; + const m = line.match(/^\s*(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.*)$/); + if (!m) continue; + const pid = parseInt(m[1]); + if (!pid) continue; + livePids.add(pid); + const cpu = parseFloat(m[2]) || 0; + const rss = parseInt(m[3]) || 0; + const stat = m[4] || ''; + const cmd = m[5] || ''; + + // Skip wrappers + if (cmd.includes('node bin/cli') || cmd.includes('/codedash') || cmd.includes('npm ')) continue; + if (cmd.startsWith('grep ') || cmd.includes(' grep ')) continue; + + const tool = _matchAgentCli(cmd); + if (!tool) continue; + + matches.push({ pid, cpu, rss, stat, cmd, tool }); + } - const cpu = parseFloat(parts[2]) || 0; - const rss = parseInt(parts[5]) || 0; - const stat = parts[7] || ''; - const cmd = parts.slice(10).join(' '); + // Drop stale pids from cwd cache + for (const pid of Array.from(_pidCwdCache.keys())) { + if (!livePids.has(pid)) _pidCwdCache.delete(pid); + } - // Determine tool - let tool = ''; - for (const ap of agentPatterns) { - if (ap.match.test(cmd)) { tool = ap.tool; break; } + // 4. Collect unknown pids (not in PID files, not in cache). 
Look up cwd + // via a SINGLE batch lsof call with a hard 1.5-second timeout. + const unknownPids = []; + for (const m of matches) { + if (claudePidMap[m.pid] && claudePidMap[m.pid].cwd) continue; + if (_pidCwdCache.has(m.pid)) continue; + unknownPids.push(m.pid); + } + if (unknownPids.length > 0) { + try { + // -a ANDs the -d/-p conditions (without it, lsof ORs and returns cwd for + // all processes, not just the requested pids). + const out = execSync( + `lsof -a -d cwd -Fpn -p ${unknownPids.join(',')} 2>/dev/null`, + { encoding: 'utf8', timeout: 1500, stdio: ['pipe', 'pipe', 'pipe'], maxBuffer: 1 * 1024 * 1024 } + ); + // lsof -Fpn output: p{pid}\nn{cwd}\np{pid}\nn{cwd}... + let curPid = 0; + for (const line of out.split('\n')) { + if (line.startsWith('p')) { + curPid = parseInt(line.slice(1)) || 0; + } else if (line.startsWith('n') && curPid) { + _pidCwdCache.set(curPid, line.slice(1)); + } } - if (!tool) continue; - - // Skip node/npm/shell wrappers — only main processes - if (cmd.includes('node bin/cli') || cmd.includes('npm') || cmd.includes('grep')) continue; - - seenPids.add(pid); - - // Get session ID from Claude PID files - let sessionId = ''; - let cwd = ''; - let startedAt = 0; - let sessionSource = ''; - if (claudePidMap[pid]) { - sessionId = claudePidMap[pid].sessionId || ''; - cwd = claudePidMap[pid].cwd || ''; - startedAt = claudePidMap[pid].startedAt || 0; - if (sessionId) sessionSource = 'pid-file'; + // Mark pids we asked about but didn't get cwd for — empty string so we + // don't keep asking on every poll.
+ for (const pid of unknownPids) { + if (!_pidCwdCache.has(pid)) _pidCwdCache.set(pid, ''); } + } catch { + // Timeout / error: mark all as unknown so we don't retry each poll + for (const pid of unknownPids) _pidCwdCache.set(pid, ''); + } + } - // Try to get cwd from lsof if not from PID file - if (!cwd) { - try { - const lsofOut = execSync(`lsof -d cwd -p ${pid} -Fn 2>/dev/null`, { encoding: 'utf8', timeout: 2000, stdio: ['pipe', 'pipe', 'pipe'] }); - const match = lsofOut.match(/\nn(\/[^\n]+)/); - if (match) cwd = match[1]; - } catch {} - } + // 5. Assemble results using only already-loaded sessions cache for cwd-match. + // Never call loadSessions() here — it may invalidate and re-parse. + const cachedSessions = _sessionsCache || []; + + for (const m of matches) { + let sessionId = ''; + let cwd = ''; + let startedAt = 0; + let sessionSource = ''; + + if (claudePidMap[m.pid]) { + sessionId = claudePidMap[m.pid].sessionId || ''; + cwd = claudePidMap[m.pid].cwd || ''; + startedAt = claudePidMap[m.pid].startedAt || 0; + if (sessionId) sessionSource = 'pid-file'; + } + if (!cwd) cwd = _pidCwdCache.get(m.pid) || ''; - // Try to find session ID by matching cwd + tool to loaded sessions - if (!sessionId) { - const allS = loadSessions(); - const match = allS.find(s => s.tool === tool && s.project === cwd); - if (match) { - sessionId = match.id; - sessionSource = 'cwd-match'; - } - // If still no match, find latest session of this tool - if (!sessionId) { - const latest = allS.filter(s => s.tool === tool).sort((a,b) => b.last_ts - a.last_ts)[0]; - if (latest) { - sessionId = latest.id; - sessionSource = 'fallback-latest'; - } - } + if (!sessionId && cachedSessions.length > 0) { + const match = cachedSessions.find(s => s.tool === m.tool && s.project === cwd); + if (match) { + sessionId = match.id; + sessionSource = 'cwd-match'; } - - const status = cpu < 1 && (stat.includes('S') || stat.includes('T')) ? 
'waiting' : 'active'; - - active.push({ - pid: pid, - sessionId: sessionId, - cwd: cwd, - startedAt: startedAt, - kind: tool, - entrypoint: tool, - status: status, - cpu: cpu, - memoryMB: Math.round(rss / 1024), - _sessionSource: sessionSource, - }); } - } catch {} + const status = m.cpu < 1 && (m.stat.includes('S') || m.stat.includes('T')) ? 'waiting' : 'active'; + + active.push({ + pid: m.pid, + sessionId, + cwd, + startedAt, + kind: m.tool, + entrypoint: m.tool, + status, + cpu: m.cpu, + memoryMB: Math.round(m.rss / 1024), + _sessionSource: sessionSource, + }); + } + + _activeCache = active; + _activeCacheTs = Date.now(); return active; } @@ -2852,7 +3988,8 @@ const fmtLocalDay = (ts) => { }; // Disk cache for per-session daily message breakdown -const DAILY_STATS_CACHE_FILE = path.join(os.tmpdir(), 'codedash-daily-stats-cache.json'); +const DAILY_STATS_CACHE_FILE = path.join(CODEDASH_CACHE_DIR, 'daily-stats-cache.json'); +const LEGACY_DAILY_STATS_CACHE_FILE = path.join(os.tmpdir(), 'codedash-daily-stats-cache.json'); let _dailyStatsDiskCache = null; function _loadDailyStatsDiskCache() { @@ -2860,6 +3997,8 @@ function _loadDailyStatsDiskCache() { try { if (fs.existsSync(DAILY_STATS_CACHE_FILE)) { _dailyStatsDiskCache = JSON.parse(fs.readFileSync(DAILY_STATS_CACHE_FILE, 'utf8')); + } else if (fs.existsSync(LEGACY_DAILY_STATS_CACHE_FILE)) { + _dailyStatsDiskCache = JSON.parse(fs.readFileSync(LEGACY_DAILY_STATS_CACHE_FILE, 'utf8')); } } catch {} if (!_dailyStatsDiskCache) _dailyStatsDiskCache = {}; @@ -2868,7 +4007,9 @@ function _loadDailyStatsDiskCache() { function _saveDailyStatsDiskCache() { if (!_dailyStatsDiskCache) return; try { - fs.writeFileSync(DAILY_STATS_CACHE_FILE, JSON.stringify(_dailyStatsDiskCache)); + const tmp = DAILY_STATS_CACHE_FILE + '.tmp'; + fs.writeFileSync(tmp, JSON.stringify(_dailyStatsDiskCache)); + fs.renameSync(tmp, DAILY_STATS_CACHE_FILE); } catch {} } @@ -2920,37 +4061,7 @@ function _computeSessionDailyBreakdown(s, found) { return { 
msgsByDay, tsByDay }; } -// Daily stats result cache -const DAILY_RESULT_CACHE_FILE = path.join(os.tmpdir(), 'codedash-daily-result-cache.json'); -let _dailyResultCache = null; -let _dailyResultCacheKey = null; - function getDailyStats(sessions) { - const key = _analyticsKey(sessions); - if (_dailyResultCache && _dailyResultCacheKey === key) return _dailyResultCache; - - // Try disk cache - if (!_dailyResultCache) { - try { - if (fs.existsSync(DAILY_RESULT_CACHE_FILE)) { - const cached = JSON.parse(fs.readFileSync(DAILY_RESULT_CACHE_FILE, 'utf8')); - if (cached._key === key) { - _dailyResultCache = cached.data; - _dailyResultCacheKey = key; - return cached.data; - } - } - } catch {} - } - - const result = _computeDailyStats(sessions); - _dailyResultCache = result; - _dailyResultCacheKey = key; - try { fs.writeFileSync(DAILY_RESULT_CACHE_FILE, JSON.stringify({ _key: key, data: result })); } catch {} - return result; -} - -function _computeDailyStats(sessions) { const byDay = {}; const ensureDay = (date) => { if (!byDay[date]) byDay[date] = { date, sessions: 0, messages: 0, hours: 0, cost: 0, agents: {} }; @@ -3005,12 +4116,10 @@ function _computeDailyStats(sessions) { const day = s.date || fmtLocalDay(s.last_ts); const d = ensureDay(day); d.sessions++; - // Use exact user_messages count if available, otherwise estimate + // Only count EXACT user_messages. The 0.5 estimate was wildly wrong + // because Claude type=user entries include tool_results (up to 28x inflation). 
if (s.user_messages > 0) { d.messages += s.user_messages; - } else { - const totalMsgEst = s.detail_messages || s.messages || 0; - d.messages += Math.ceil(totalMsgEst * 0.5); } d.hours += Math.min((s.last_ts - s.first_ts) / 3600000, 16); d.cost += sessionCost; @@ -3023,6 +4132,8 @@ function _computeDailyStats(sessions) { d.cost = Math.round(d.cost * 100) / 100; } _saveDailyStatsDiskCache(); + _saveCostDiskCache(true); + _saveParsedDiskCache(true); return Object.values(byDay).sort((a, b) => b.date.localeCompare(a.date)); } @@ -3083,6 +4194,12 @@ function getLeaderboardStats() { module.exports = { loadSessions, + loadSessionsAsync, + getWarmingStatus, + getSqliteBackfillStatus, + createCostAggregator, + computeSessionCostForAnalytics, + buildOpencodeCostCache, loadSessionDetail, getProjectGitInfo, getLeaderboardStats, diff --git a/src/frontend/analytics.js b/src/frontend/analytics.js index d414be9..78f94f8 100644 --- a/src/frontend/analytics.js +++ b/src/frontend/analytics.js @@ -9,9 +9,43 @@ async function renderAnalytics(container) { if (dateFrom) params.push('from=' + dateFrom); if (dateTo) params.push('to=' + dateTo); if (params.length) url += '?' + params.join('&'); - var resp = await fetch(url); - var data = await resp.json(); + // Poll loop — server runs a background job, we render live partial snapshots + var pollStart = Date.now(); + var data = null; + while (true) { + var resp = await fetch(url); + var payload = await resp.json(); + if (payload.status === 'done') { data = payload; break; } + if (payload.status === 'error') { + container.innerHTML = '
Analytics failed: ' + escHtml(payload.error || 'unknown') + '
'; + return; + } + var p = (payload.progress || {}); + var done = p.done || 0, total = p.total || 0; + var pct = total > 0 ? Math.round(done / total * 100) : 0; + var phase = p.phase || 'working'; + var elapsed = Math.round((payload.elapsedMs || (Date.now() - pollStart)) / 1000); + if (payload.partialResult && payload.partialResult.totalSessions > 0) { + renderAnalyticsUI(container, payload.partialResult, { + live: true, done: done, total: total, pct: pct, phase: phase, elapsed: elapsed, + }); + } else { + container.innerHTML = + '
' + + '
Computing cost analytics…
' + + '
' + escHtml(phase) + ' — ' + done + ' / ' + total + ' sessions (' + pct + '%)
' + + '
' + + '
elapsed ' + elapsed + 's · cached for next time
' + + '
'; + } + await new Promise(function(r){ setTimeout(r, 500); }); + } + + renderAnalyticsUI(container, data, { live: false }); + return; + /* === below is the original inline render block; kept so the upstream + split structure is preserved and renderAnalyticsUI uses the same code path === */ var html = '
'; html += '

Cost Analytics

'; @@ -227,6 +261,145 @@ async function renderAnalytics(container) { } } +// Renders the full Cost Analytics UI from either partial (live) or final data. +// When opts.live is true, prepends a sticky progress banner. +function renderAnalyticsUI(container, data, opts) { + opts = opts || {}; + var html = '
'; + + if (opts.live) { + var pct = opts.pct || 0; + html += '
'; + html += '
Computing cost analytics — live' + (opts.elapsed || 0) + 's
'; + html += '
' + escHtml(opts.phase || '') + ' — ' + (opts.done || 0) + ' / ' + (opts.total || 0) + ' sessions (' + pct + '%) · numbers update as sessions are processed
'; + html += '
'; + html += '
'; + } + + html += '

Cost Analytics' + (opts.live ? ' (live)' : '') + '

'; + + // Summary cards + html += '
'; + html += '
$' + (data.totalCost || 0).toFixed(2) + 'Total cost (API-equivalent)
'; + html += '
' + formatTokens(data.totalTokens || 0) + 'Total tokens
'; + html += '
$' + (data.dailyRate || 0).toFixed(2) + 'Avg per day (' + (data.days || 1) + ' days)
'; + html += '
' + (data.totalSessions || 0) + 'Sessions
'; + html += '
'; + + // Burn rate + var todayCost = data.todayCost || 0; + var last1hCost = data.last1hCost || 0; + var dailyRate = data.dailyRate || 0; + var hoursElapsed = data.hoursElapsedToday || 1; + var projectedDaily = todayCost / (hoursElapsed / 24); + var paceRatio = dailyRate > 0 ? projectedDaily / dailyRate : 0; + var burnClass = paceRatio >= 2 ? 'burn-high' : paceRatio >= 1.3 ? 'burn-medium' : 'burn-low'; + var paceLabel = paceRatio >= 2 ? '🔥 ' + Math.round(paceRatio) + 'x avg' : paceRatio >= 1.3 ? '↑ ' + paceRatio.toFixed(1) + 'x avg' : dailyRate > 0 ? '✓ normal' : ''; + html += '
'; + html += '
Burn Rate
'; + html += '
'; + html += '
$' + todayCost.toFixed(3) + 'today' + (paceLabel ? '' + paceLabel + '' : '') + '
'; + html += '
$' + last1hCost.toFixed(3) + 'last hour
'; + if (dailyRate > 0) { + html += '
$' + projectedDaily.toFixed(2) + 'projected today
'; + } + html += '
'; + + // Token breakdown + if (data.totalInputTokens !== undefined) { + var totalTok = (data.totalInputTokens || 0) + (data.totalOutputTokens || 0) + (data.totalCacheReadTokens || 0) + (data.totalCacheCreateTokens || 0); + var pctOf = function(n) { return totalTok > 0 ? Math.round(n / totalTok * 100) : 0; }; + html += '
'; + html += '

Token Breakdown

'; + html += '
'; + html += '
' + formatTokens(data.totalInputTokens || 0) + 'Input' + pctOf(data.totalInputTokens || 0) + '%
'; + html += '
' + formatTokens(data.totalOutputTokens || 0) + 'Output' + pctOf(data.totalOutputTokens || 0) + '%
'; + html += '
' + formatTokens(data.totalCacheReadTokens || 0) + 'Cache read' + pctOf(data.totalCacheReadTokens || 0) + '%
'; + html += '
' + formatTokens(data.totalCacheCreateTokens || 0) + 'Cache write' + pctOf(data.totalCacheCreateTokens || 0) + '%
'; + if (data.avgContextPct > 0) { + html += '
' + data.avgContextPct + '%Avg context usedof 200K
'; + } + html += '
'; + } + + // Cost by agent + var agentEntries = Object.entries(data.byAgent || {}).filter(function(e) { return e[1].sessions > 0; }); + if (agentEntries.length >= 1) { + agentEntries.sort(function(a, b) { return b[1].cost - a[1].cost; }); + html += '

Cost by Agent

'; + html += '
'; + var maxAgentCost = agentEntries[0][1].cost || 1; + agentEntries.forEach(function(entry) { + var name = entry[0]; var info = entry[1]; + var p = maxAgentCost > 0 ? (info.cost / maxAgentCost * 100) : 0; + var label = { 'claude': 'Claude Code', 'claude-ext': 'Claude Ext', 'codex': 'Codex', 'opencode': 'OpenCode', 'cursor': 'Cursor', 'kiro': 'Kiro' }[name] || name; + var estMark = info.estimated ? ' ~est.' : ''; + html += '
'; + html += '' + label + estMark + ''; + html += '
'; + html += '$' + info.cost.toFixed(2) + ' (' + info.sessions + ' sess.)'; + html += '
'; + }); + html += '
'; + } + + // Daily cost chart (last 30 days) + var dayKeys = Object.keys(data.byDay || {}).sort(); + var last30 = dayKeys.slice(-30); + if (last30.length > 0) { + var maxCost = Math.max.apply(null, last30.map(function(d) { return data.byDay[d].cost; })); + html += '

Daily Cost (last 30 days)

'; + html += '
'; + last30.forEach(function(d) { + var c = data.byDay[d]; + var p = maxCost > 0 ? (c.cost / maxCost * 100) : 0; + var label = d.slice(5); + html += '
'; + html += '
'; + html += '
' + label + '
'; + html += '
'; + }); + html += '
'; + } + + // Cost by project + var projects = Object.entries(data.byProject || {}).sort(function(a, b) { return b[1].cost - a[1].cost; }); + var topProjects = projects.slice(0, 10); + if (topProjects.length > 0) { + var maxProjCost = topProjects[0][1].cost || 1; + html += '

Cost by Project

'; + html += '
'; + topProjects.forEach(function(entry) { + var name = entry[0]; var info = entry[1]; + var p = maxProjCost > 0 ? (info.cost / maxProjCost * 100) : 0; + html += '
'; + html += '' + escHtml(name) + ''; + html += '
'; + html += '$' + info.cost.toFixed(2) + ''; + html += '
'; + }); + html += '
'; + } + + // Top expensive sessions + if (data.topSessions && data.topSessions.length > 0) { + html += '

Most Expensive Sessions

'; + html += '
'; + data.topSessions.forEach(function(s) { + html += '
'; + html += '$' + s.cost.toFixed(2) + ''; + html += '' + escHtml(s.project || '') + ''; + html += '' + (s.date || '') + ''; + html += '' + (s.id || '').slice(0, 8) + ''; + html += '
'; + }); + html += '
'; + } + + html += '
'; + container.innerHTML = html; +} + function formatTokens(n) { if (n >= 1000000) return (n / 1000000).toFixed(1) + 'M'; if (n >= 1000) return (n / 1000).toFixed(0) + 'K'; diff --git a/src/frontend/app.js b/src/frontend/app.js index 6fd757a..87eaf97 100644 --- a/src/frontend/app.js +++ b/src/frontend/app.js @@ -691,6 +691,9 @@ function renderCard(s, idx) { } html += '