diff --git a/src/functions/auto-forget.ts b/src/functions/auto-forget.ts index ce90d83..be59af9 100644 --- a/src/functions/auto-forget.ts +++ b/src/functions/auto-forget.ts @@ -86,18 +86,27 @@ export function registerAutoForgetFunction(sdk: ISdk, kv: StateKV): void { } const sessions = await kv.list(KV.sessions); - for (const session of sessions) { - const observations = await kv - .list(KV.observations(session.id)) - .catch(() => []); - for (const obs of observations) { + const obsPerSession: CompressedObservation[][] = []; + for (let batch = 0; batch < sessions.length; batch += 10) { + const chunk = sessions.slice(batch, batch + 10); + const results = await Promise.all( + chunk.map((s) => + kv + .list(KV.observations(s.id)) + .catch(() => [] as CompressedObservation[]), + ), + ); + obsPerSession.push(...results); + } + for (let i = 0; i < sessions.length; i++) { + for (const obs of obsPerSession[i]) { if (!obs.timestamp) continue; const age = now - new Date(obs.timestamp).getTime(); if (age > 180 * MS_PER_DAY && (obs.importance ?? 5) <= 2) { result.lowValueObs.push(obs.id); if (!dryRun) { await kv - .delete(KV.observations(session.id), obs.id) + .delete(KV.observations(sessions[i].id), obs.id) .catch(() => {}); } } diff --git a/src/functions/consolidate.ts b/src/functions/consolidate.ts index 64ccc17..0e6e11b 100644 --- a/src/functions/consolidate.ts +++ b/src/functions/consolidate.ts @@ -78,13 +78,22 @@ export function registerConsolidateFunction( : sessions; const allObs: Array = []; - for (const session of filtered) { - const observations = await kv.list( - KV.observations(session.id), + const obsPerSession: CompressedObservation[][] = []; + for (let batch = 0; batch < filtered.length; batch += 10) { + const chunk = filtered.slice(batch, batch + 10); + const results = await Promise.all( + chunk.map((s) => + kv + .list(KV.observations(s.id)) + .catch(() => [] as CompressedObservation[]), + ), ); - for (const obs of observations) { + obsPerSession.push(...results); + } + for (let i = 0; i < filtered.length; i++) { + for (const obs of obsPerSession[i]) { if (obs.title && obs.importance >= 5) { - allObs.push({ ...obs, sid: session.id }); + allObs.push({ ...obs, sid: filtered[i].id }); } } } diff --git a/src/functions/context.ts b/src/functions/context.ts index 9117462..804f157 100644 --- a/src/functions/context.ts +++ b/src/functions/context.ts @@ -86,9 +86,15 @@ export function registerContextFunction( ) .slice(0, 10); - for (const session of sessions) { - const summary = await kv.get(KV.summaries, session.id); + const summariesPerSession = await Promise.all( + sessions.map((s) => + kv.get(KV.summaries, s.id).catch(() => null), + ), + ); + const sessionsNeedingObs: number[] = []; + for (let i = 0; i < sessions.length; i++) { + const summary = summariesPerSession[i]; if (summary) { const content = `## ${summary.title}\n${summary.narrative}\nDecisions: ${summary.keyDecisions.join("; ")}\nFiles: ${summary.filesModified.join(", ")}`; blocks.push({ @@ -97,12 +103,22 @@ export function registerContextFunction( tokens: estimateTokens(content), recency: new Date(summary.createdAt).getTime(), }); - continue; + } else { + sessionsNeedingObs.push(i); } + } - const observations = await kv.list( - KV.observations(session.id), - ); + const obsResults = await Promise.all( + sessionsNeedingObs.map((i) => + kv + .list(KV.observations(sessions[i].id)) + .catch(() => []), + ), + ); + + for (let j = 0; j < sessionsNeedingObs.length; j++) { + const i = sessionsNeedingObs[j]; + const observations = obsResults[j]; const important = observations.filter( (o) => o.title && o.importance >= 5, ); @@ -113,12 +129,12 @@ export function registerContextFunction( .slice(0, 5) .map((o) => `- [${o.type}] ${o.title}: ${o.narrative}`) .join("\n"); - const content = `## Session ${session.id.slice(0, 8)} (${session.startedAt})\n${items}`; + const content = `## Session ${sessions[i].id.slice(0, 8)} (${sessions[i].startedAt})\n${items}`; blocks.push({ type: "observation", content, tokens: estimateTokens(content), - recency: new Date(session.startedAt).getTime(), + recency: new Date(sessions[i].startedAt).getTime(), }); } } diff --git a/src/functions/export-import.ts b/src/functions/export-import.ts index b77b2f5..2c32763 100644 --- a/src/functions/export-import.ts +++ b/src/functions/export-import.ts @@ -20,6 +20,7 @@ import type { Sketch, Crystal, Facet, + ExportPagination, } from "../types.js"; import { KV } from "../state/schema.js"; import { StateKV } from "../state/kv.js"; @@ -28,59 +29,80 @@ import { VERSION } from "../version.js"; export function registerExportImportFunction(sdk: ISdk, kv: StateKV): void { sdk.registerFunction( { id: "mem::export", description: "Export all memory data as JSON" }, - async () => { + async (data?: { maxSessions?: number; offset?: number }) => { const ctx = getContext(); + const rawMax = Number(data?.maxSessions); + const maxSessions = Number.isFinite(rawMax) && rawMax > 0 ? Math.min(Math.floor(rawMax), 1000) : undefined; + const rawOffset = Number(data?.offset); + const offset = Number.isFinite(rawOffset) && rawOffset >= 0 ? Math.floor(rawOffset) : 0; - const sessions = await kv.list(KV.sessions); + const allSessions = await kv.list(KV.sessions); + const paginatedSessions = maxSessions !== undefined + ? allSessions.slice(offset, offset + maxSessions) + : allSessions; const memories = await kv.list(KV.memories); const summaries = await kv.list(KV.summaries); const observations: Record = {}; - for (const session of sessions) { - const obs = await kv - .list(KV.observations(session.id)) - .catch(() => []); + const obsResults = await Promise.all( + paginatedSessions.map((session) => + kv + .list(KV.observations(session.id)) + .catch(() => [] as CompressedObservation[]) + .then((obs) => ({ sessionId: session.id, obs })), + ), + ); + for (const { sessionId, obs } of obsResults) { if (obs.length > 0) { - observations[session.id] = obs; + observations[sessionId] = obs; } } const profiles: ProjectProfile[] = []; - const uniqueProjects = [...new Set(sessions.map((s) => s.project))]; - for (const project of uniqueProjects) { - const profile = await kv - .get(KV.profiles, project) - .catch(() => null); + const uniqueProjects = [...new Set(paginatedSessions.map((s) => s.project))]; + const profileResults = await Promise.all( + uniqueProjects.map((project) => + kv.get(KV.profiles, project).catch(() => null), + ), + ); + for (const profile of profileResults) { if (profile) profiles.push(profile); } - const graphNodes = await kv - .list(KV.graphNodes) - .catch(() => []); - const graphEdges = await kv - .list(KV.graphEdges) - .catch(() => []); - const semanticMemories = await kv - .list(KV.semantic) - .catch(() => []); - const proceduralMemories = await kv - .list(KV.procedural) - .catch(() => []); - - const actions = await kv.list(KV.actions).catch(() => []); - const actionEdges = await kv.list(KV.actionEdges).catch(() => []); - const sentinels = await kv.list(KV.sentinels).catch(() => []); - const sketches = await kv.list(KV.sketches).catch(() => []); - const crystals = await kv.list(KV.crystals).catch(() => []); - const facets = await kv.list(KV.facets).catch(() => []); - const routines = await kv.list(KV.routines).catch(() => []); - const signals = await kv.list(KV.signals).catch(() => []); - const checkpoints = await kv.list(KV.checkpoints).catch(() => []); + const [ + graphNodes, + graphEdges, + semanticMemories, + proceduralMemories, + actions, + actionEdges, + sentinels, + sketches, + crystals, + facets, + routines, + signals, + checkpoints, + ] = await Promise.all([ + kv.list(KV.graphNodes).catch(() => []), + kv.list(KV.graphEdges).catch(() => []), + kv.list(KV.semantic).catch(() => []), + kv.list(KV.procedural).catch(() => []), + kv.list(KV.actions).catch(() => []), + kv.list(KV.actionEdges).catch(() => []), + kv.list(KV.sentinels).catch(() => []), + kv.list(KV.sketches).catch(() => []), + kv.list(KV.crystals).catch(() => []), + kv.list(KV.facets).catch(() => []), + kv.list(KV.routines).catch(() => []), + kv.list(KV.signals).catch(() => []), + kv.list(KV.checkpoints).catch(() => []), + ]); const exportData: ExportData = { version: VERSION, exportedAt: new Date().toISOString(), - sessions, + sessions: paginatedSessions, observations, memories, summaries, @@ -102,12 +124,22 @@ export function registerExportImportFunction(sdk: ISdk, kv: StateKV): void { checkpoints: checkpoints.length > 0 ? checkpoints : undefined, }; + if (maxSessions !== undefined) { + exportData.pagination = { + offset, + limit: maxSessions, + total: allSessions.length, + hasMore: offset + maxSessions < allSessions.length, + }; + } + const totalObs = Object.values(observations).reduce( (sum, arr) => sum + arr.length, 0, ); ctx.logger.info("Export complete", { - sessions: sessions.length, + sessions: paginatedSessions.length, + totalSessions: allSessions.length, observations: totalObs, memories: memories.length, summaries: summaries.length, diff --git a/src/functions/profile.ts b/src/functions/profile.ts index 1ec6dd7..245d012 100644 --- a/src/functions/profile.ts +++ b/src/functions/profile.ts @@ -49,10 +49,18 @@ export function registerProfileFunction(sdk: ISdk, kv: StateKV): void { new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime(), ); - for (const session of sortedSessions.slice(0, 20)) { - const observations = await kv.list( - KV.observations(session.id), - ); + const top20Sessions = sortedSessions.slice(0, 20); + const obsPerSession = await Promise.all( + top20Sessions.map((s) => + kv + .list(KV.observations(s.id)) + .catch(() => [] as CompressedObservation[]), + ), + ); + + for (let i = 0; i < top20Sessions.length; i++) { + const session = top20Sessions[i]; + const observations = obsPerSession[i]; totalObs += observations.length; for (const obs of observations) { diff --git a/src/functions/search.ts b/src/functions/search.ts index 641b372..660795c 100644 --- a/src/functions/search.ts +++ b/src/functions/search.ts @@ -20,8 +20,27 @@ export async function rebuildIndex(kv: StateKV): Promise { if (!sessions.length) return 0 let count = 0 - for (const session of sessions) { - const observations = await kv.list(KV.observations(session.id)) + const obsPerSession: CompressedObservation[][] = [] + const failedSessions: string[] = [] + for (let batch = 0; batch < sessions.length; batch += 10) { + const chunk = sessions.slice(batch, batch + 10) + const results = await Promise.all( + chunk.map(async (s) => { + try { + return await kv.list(KV.observations(s.id)) + } catch { + failedSessions.push(s.id) + return [] as CompressedObservation[] + } + }) + ) + obsPerSession.push(...results) + } + if (failedSessions.length > 0) { + const ctx = getContext() + ctx.logger.warn('rebuildIndex: failed to load observations for sessions', { failedSessions }) + } + for (const observations of obsPerSession) { for (const obs of observations) { if (obs.title && obs.narrative) { idx.add(obs) diff --git a/src/functions/smart-search.ts b/src/functions/smart-search.ts index d525d6a..5460c08 100644 --- a/src/functions/smart-search.ts +++ b/src/functions/smart-search.ts @@ -19,32 +19,44 @@ export function registerSmartSearchFunction( description: "Search with progressive disclosure: compact results first, expand specific IDs for full details", }, - async (data: { query?: string; expandIds?: string[]; limit?: number }) => { + async (data: { + query?: string; + expandIds?: Array; + limit?: number; + }) => { const ctx = getContext(); if (data.expandIds && data.expandIds.length > 0) { - const ids = data.expandIds.slice(0, 20); + const raw = data.expandIds.slice(0, 20); + const items = raw.map((entry) => { + if (typeof entry === "string") return { obsId: entry, sessionId: undefined as string | undefined }; + if (entry && typeof entry === "object" && typeof (entry as any).obsId === "string") { + return { obsId: (entry as any).obsId, sessionId: (entry as any).sessionId as string | undefined }; + } + return null; + }).filter((item): item is NonNullable => item !== null); + const expanded: Array<{ obsId: string; sessionId: string; observation: CompressedObservation; }> = []; - for (const obsId of ids) { - const obs = await findObservation(kv, obsId); - if (obs) { - expanded.push({ - obsId, - sessionId: obs.sessionId, - observation: obs, - }); - } + const results = await Promise.all( + items.map(({ obsId, sessionId }) => + findObservation(kv, obsId, sessionId).then((obs) => + obs ? { obsId, sessionId: obs.sessionId, observation: obs } : null, + ), + ), + ); + for (const r of results) { + if (r) expanded.push(r); } - const truncated = data.expandIds.length > ids.length; + const truncated = data.expandIds.length > raw.length; ctx.logger.info("Smart search expanded", { requested: data.expandIds.length, - attempted: ids.length, + attempted: raw.length, returned: expanded.length, truncated, }); @@ -79,13 +91,25 @@ export function registerSmartSearchFunction( async function findObservation( kv: StateKV, obsId: string, + sessionIdHint?: string, ): Promise { - const sessions = await kv.list<{ id: string }>(KV.sessions); - for (const session of sessions) { + if (sessionIdHint) { const obs = await kv - .get(KV.observations(session.id), obsId) + .get(KV.observations(sessionIdHint), obsId) .catch(() => null); if (obs) return obs; } + + const sessions = await kv.list<{ id: string }>(KV.sessions); + for (let i = 0; i < sessions.length; i += 5) { + const batch = sessions.slice(i, i + 5); + const results = await Promise.all( + batch.map((s) => + kv.get(KV.observations(s.id), obsId).catch(() => null), + ), + ); + const found = results.find((r) => r !== null); + if (found) return found; + } return null; } diff --git a/src/health/monitor.ts b/src/health/monitor.ts index e49468f..be4b6c0 100644 --- a/src/health/monitor.ts +++ b/src/health/monitor.ts @@ -45,6 +45,24 @@ export function registerHealthMonitor( if (result?.workers) workers = result.workers; } catch {} + const KV_PROBE_TIMEOUT = 5000; + let kvConnectivity: { status: string; latencyMs?: number; error?: string }; + const kvStart = performance.now(); + try { + await Promise.race([ + (async () => { + await kv.set(KV.health, "_probe", { ts: Date.now() }); + await kv.get(KV.health, "_probe"); + })(), + new Promise((_, reject) => + setTimeout(() => reject(new Error("timeout")), KV_PROBE_TIMEOUT), + ), + ]); + kvConnectivity = { status: "ok", latencyMs: Math.round((performance.now() - kvStart) * 100) / 100 }; + } catch { + kvConnectivity = { status: "error", error: "kv_probe_failed", latencyMs: Math.round((performance.now() - kvStart) * 100) / 100 }; + } + const snapshot: HealthSnapshot = { connectionState, workers, @@ -61,6 +79,7 @@ export function registerHealthMonitor( }, eventLoopLagMs, uptimeSeconds: uptime, + kvConnectivity, status: "healthy", alerts: [], }; diff --git a/src/index.ts b/src/index.ts index 86faaf2..863377c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -278,8 +278,7 @@ async function main() { kv, sdk, secret, - metricsStore, - provider, + config.restPort, ); const shutdown = async () => { diff --git a/src/types.ts b/src/types.ts index 16be515..4bea190 100644 --- a/src/types.ts +++ b/src/types.ts @@ -174,6 +174,7 @@ export interface HealthSnapshot { cpu: { userMicros: number; systemMicros: number; percent: number }; eventLoopLagMs: number; uptimeSeconds: number; + kvConnectivity?: { status: string; latencyMs?: number; error?: string }; status: "healthy" | "degraded" | "critical"; alerts: string[]; } @@ -238,6 +239,13 @@ export interface ProjectProfile { summary?: string; } +export interface ExportPagination { + offset: number; + limit: number; + total: number; + hasMore: boolean; +} + export interface ExportData { version: "0.3.0" | "0.4.0" | "0.5.0" | "0.6.0"; exportedAt: string; @@ -259,6 +267,7 @@ export interface ExportData { sketches?: Sketch[]; crystals?: Crystal[]; facets?: Facet[]; + pagination?: ExportPagination; } export interface EmbeddingConfig { diff --git a/src/viewer/server.ts b/src/viewer/server.ts index 3f26725..ecd9667 100644 --- a/src/viewer/server.ts +++ b/src/viewer/server.ts @@ -7,24 +7,6 @@ import { import { readFileSync } from "node:fs"; import { join, dirname } from "node:path"; import { fileURLToPath } from "node:url"; -import type { ISdk } from "iii-sdk"; -import type { - Session, - Memory, - CompressedObservation, - SessionSummary, - MemoryRelation, - SemanticMemory, - ProceduralMemory, - GraphNode, - GraphEdge, - RawObservation, -} from "../types.js"; -import { KV } from "../state/schema.js"; -import type { StateKV } from "../state/kv.js"; -import { getLatestHealth } from "../health/monitor.js"; -import type { MetricsStore } from "../eval/metrics-store.js"; -import type { ResilientProvider } from "../providers/resilient.js"; import { timingSafeCompare, VIEWER_CSP } from "../auth.js"; const ALLOWED_ORIGINS = ( @@ -87,316 +69,20 @@ function checkAuth(req: IncomingMessage, secret: string | undefined): boolean { ); } -function gid(prefix: string): string { - const ts = Date.now().toString(36); - const rand = Math.random().toString(36).slice(2, 14); - return `${prefix}_${ts}_${rand}`; -} - -async function buildGraphFromData(kv: StateKV): Promise<{ - success: boolean; - nodes: number; - edges: number; -}> { - const sessions = await kv.list(KV.sessions); - const memories = await kv.list(KV.memories); - const now = new Date().toISOString(); - - const nodeMap = new Map(); - const edges: GraphEdge[] = []; - - function ensureNode( - name: string, - type: GraphNode["type"], - obsIds: string[] = [], - ): GraphNode { - const key = `${type}:${name}`; - if (!nodeMap.has(key)) { - nodeMap.set(key, { - id: gid("gn"), - type, - name, - properties: {}, - sourceObservationIds: obsIds, - createdAt: now, - }); - } - return nodeMap.get(key)!; - } - - for (const sess of sessions) { - const obs = await kv - .list(KV.observations(sess.id)) - .catch(() => []); - - const projectNode = ensureNode( - sess.project || sess.cwd || sess.id, - "concept", - ); - - for (const o of obs) { - const toolName = o.toolName; - const input = o.toolInput as Record | undefined; - - if (toolName) { - const toolNode = ensureNode(toolName, "function", [o.id]); - edges.push({ - id: gid("ge"), - type: "uses", - sourceNodeId: projectNode.id, - targetNodeId: toolNode.id, - weight: 1, - sourceObservationIds: [o.id], - createdAt: now, - }); - - if (input) { - const fp = - (input.file_path as string) || - (input.path as string) || - (input.pattern as string); - if (fp) { - const fileNode = ensureNode(fp, "file", [o.id]); - edges.push({ - id: gid("ge"), - type: "modifies", - sourceNodeId: toolNode.id, - targetNodeId: fileNode.id, - weight: 1, - sourceObservationIds: [o.id], - createdAt: now, - }); - } - const query = - (input.query as string) || (input.description as string); - if (query) { - const conceptNode = ensureNode( - query.length > 40 ? query.slice(0, 40) + "..." : query, - "concept", - [o.id], - ); - edges.push({ - id: gid("ge"), - type: "related_to", - sourceNodeId: toolNode.id, - targetNodeId: conceptNode.id, - weight: 0.8, - sourceObservationIds: [o.id], - createdAt: now, - }); - } - } - } - - if (o.concepts) { - for (const c of o.concepts) { - ensureNode(c, "concept", [o.id]); - } - } - if (o.files) { - for (const f of o.files) { - ensureNode(f, "file", [o.id]); - } - } - } - } - - for (const mem of memories) { - const memNode = ensureNode( - mem.content.length > 50 ? mem.content.slice(0, 50) + "..." : mem.content, - mem.type === "architecture" - ? "pattern" - : mem.type === "fact" - ? "concept" - : "decision", - [], - ); - const ids = (memNode.properties.memoryIds as string[]) || []; - ids.push(mem.id); - memNode.properties.memoryIds = ids; - const types = (memNode.properties.memoryTypes as string[]) || []; - if (!types.includes(mem.type)) types.push(mem.type); - memNode.properties.memoryTypes = types; - - if (mem.concepts) { - for (const c of mem.concepts) { - const cNode = ensureNode(c, "concept"); - edges.push({ - id: gid("ge"), - type: "related_to", - sourceNodeId: memNode.id, - targetNodeId: cNode.id, - weight: 0.7, - sourceObservationIds: [], - createdAt: now, - }); - } - } - } - - const fileNodes = [...nodeMap.values()].filter((n) => n.type === "file"); - const funcNodes = [...nodeMap.values()].filter((n) => n.type === "function"); - for (const fn of funcNodes) { - for (const file of fileNodes) { - const hasEdge = edges.some( - (e) => - (e.sourceNodeId === fn.id && e.targetNodeId === file.id) || - (e.sourceNodeId === file.id && e.targetNodeId === fn.id), - ); - if (!hasEdge) continue; - for (const fn2 of funcNodes) { - if (fn2.id === fn.id) continue; - const alsoTouches = edges.some( - (e) => - (e.sourceNodeId === fn2.id && e.targetNodeId === file.id) || - (e.sourceNodeId === file.id && e.targetNodeId === fn2.id), - ); - if (alsoTouches) { - const exists = edges.some( - (e) => - (e.sourceNodeId === fn.id && e.targetNodeId === fn2.id) || - (e.sourceNodeId === fn2.id && e.targetNodeId === fn.id), - ); - if (!exists) { - edges.push({ - id: gid("ge"), - type: "related_to", - sourceNodeId: fn.id, - targetNodeId: fn2.id, - weight: 0.5, - sourceObservationIds: [], - createdAt: now, - }); - } - } - } - } - } - - const oldNodes = await kv.list(KV.graphNodes).catch(() => []); - for (const old of oldNodes) { - await kv.delete(KV.graphNodes, old.id); - } - const oldEdges = await kv.list(KV.graphEdges).catch(() => []); - for (const old of oldEdges) { - await kv.delete(KV.graphEdges, old.id); - } - - const nodes = [...nodeMap.values()]; - for (const n of nodes) { - await kv.set(KV.graphNodes, n.id, n); - } - for (const e of edges) { - await kv.set(KV.graphEdges, e.id, e); - } - - return { success: true, nodes: nodes.length, edges: edges.length }; -} - -async function buildProfileFromRawObs( - kv: StateKV, - project: string, -): Promise> { - const sessions = await kv.list(KV.sessions); - const projSessions = sessions.filter( - (s) => s.project === project || s.cwd === project, - ); - - const fileCounts: Record = {}; - const conceptCounts: Record = {}; - const toolCounts: Record = {}; - const conventions: string[] = []; - let totalObs = 0; - - for (const sess of projSessions) { - const obs = await kv - .list(KV.observations(sess.id)) - .catch(() => []); - totalObs += obs.length; - - for (const o of obs) { - if (o.toolName) - toolCounts[o.toolName] = (toolCounts[o.toolName] || 0) + 1; - if (o.concepts) { - for (const c of o.concepts) - conceptCounts[c] = (conceptCounts[c] || 0) + 1; - } - if (o.files) { - for (const f of o.files) fileCounts[f] = (fileCounts[f] || 0) + 1; - } - const input = o.toolInput as Record | undefined; - if (input) { - const fp = - (input.file_path as string) || - (input.path as string) || - (input.pattern as string); - if (fp) fileCounts[fp] = (fileCounts[fp] || 0) + 1; - } - } - } - - const toolList = Object.entries(toolCounts).sort((a, b) => b[1] - a[1]); - if (toolList.length > 0) { - conventions.push( - `Most used tools: ${toolList - .slice(0, 5) - .map(([t, c]) => `${t} (${c}x)`) - .join(", ")}`, - ); - } - if (projSessions.length > 0) { - const active = projSessions.filter((s) => s.status === "active").length; - conventions.push( - `${projSessions.length} sessions (${active} active), ${totalObs} total observations`, - ); - } - - const topConcepts = Object.entries(conceptCounts) - .sort((a, b) => b[1] - a[1]) - .slice(0, 10) - .map(([concept, frequency]) => ({ concept, frequency })); - - const topFiles = Object.entries(fileCounts) - .sort((a, b) => b[1] - a[1]) - .slice(0, 10) - .map(([file, frequency]) => ({ file, frequency })); - - if (topConcepts.length === 0) { - for (const [tool, count] of toolList.slice(0, 8)) { - topConcepts.push({ concept: tool, frequency: count }); - } - } - - return { - project, - sessionCount: projSessions.length, - totalObservations: totalObs, - topConcepts, - topFiles, - conventions, - commonErrors: [], - recentActivity: toolList.slice(0, 5).map(([tool, count]) => ({ - type: tool, - count, - })), - updatedAt: new Date().toISOString(), - }; -} - export function startViewerServer( port: number, - kv: StateKV, - sdk: ISdk, + _kv: unknown, + _sdk: unknown, secret?: string, - metricsStore?: MetricsStore, - provider?: ResilientProvider | { circuitState?: unknown }, + restPort?: number, ): Server { + const resolvedRestPort = restPort ?? port - 2; + const server = createServer(async (req, res) => { const raw = req.url || "/"; const qIdx = raw.indexOf("?"); const pathname = qIdx >= 0 ? raw.slice(0, qIdx) : raw; const qs = qIdx >= 0 ? raw.slice(qIdx + 1) : ""; - const params = new URLSearchParams(qs); const method = req.method || "GET"; if (method === "OPTIONS") { @@ -443,20 +129,10 @@ export function startViewerServer( } try { - await handleApiRoute( - pathname, - method, - params, - req, - res, - kv, - sdk, - metricsStore, - provider, - ); + await proxyToRestApi(resolvedRestPort, pathname, qs, method, req, res, secret); } catch (err) { - console.error(`[viewer] API error on ${method} ${pathname}:`, err); - json(res, 500, { error: "internal error" }, req); + console.error(`[viewer] proxy error on ${method} ${pathname}:`, err); + json(res, 502, { error: "upstream error" }, req); } }); @@ -467,354 +143,65 @@ export function startViewerServer( return server; } -async function handleApiRoute( +async function proxyToRestApi( + restPort: number, pathname: string, + qs: string, method: string, - params: URLSearchParams, req: IncomingMessage, res: ServerResponse, - kv: StateKV, - sdk: ISdk, - metricsStore?: MetricsStore, - provider?: ResilientProvider | { circuitState?: unknown }, + secret?: string, ): Promise { - const path = pathname.replace(/^\/agentmemory\//, ""); + const upstreamPath = pathname.startsWith("/agentmemory/") + ? pathname + : `/agentmemory${pathname.startsWith("/") ? pathname : "/" + pathname}`; - if (method === "GET" && path === "livez") { - json(res, 200, { status: "ok", service: "agentmemory" }, req); - return; - } + const upstreamUrl = `http://127.0.0.1:${restPort}${upstreamPath}${qs ? "?" + qs : ""}`; - if (method === "GET" && path === "health") { - try { - const health = await getLatestHealth(kv); - const functionMetrics = metricsStore ? await metricsStore.getAll() : []; - const circuitBreaker = - provider && "circuitState" in provider ? provider.circuitState : null; - const status = health?.status || "healthy"; - json( - res, - status === "critical" ? 503 : 200, - { - status, - service: "agentmemory", - version: "0.5.0", - health: health || null, - functionMetrics, - circuitBreaker, - }, - req, - ); - } catch { - json( - res, - 200, - { - status: "healthy", - service: "agentmemory", - version: "0.5.0", - health: null, - functionMetrics: [], - circuitBreaker: null, - }, - req, - ); - } - return; - } - - if (method === "GET" && path === "sessions") { - try { - const sessions = await kv.list(KV.sessions); - json(res, 200, { sessions }, req); - } catch { - json(res, 200, { sessions: [] }, req); - } - return; + const headers: Record = {}; + if (secret) { + headers["Authorization"] = `Bearer ${secret}`; } - - if (method === "GET" && path === "memories") { - try { - const memories = await kv.list(KV.memories); - const latest = params.get("latest") === "true"; - json( - res, - 200, - { memories: latest ? memories.filter((m) => m.isLatest) : memories }, - req, - ); - } catch { - json(res, 200, { memories: [] }, req); - } - return; + const ct = req.headers["content-type"]; + if (ct) { + headers["Content-Type"] = ct; } - if (method === "GET" && path === "observations") { - const sessionId = params.get("sessionId"); - if (!sessionId) { - json(res, 400, { error: "sessionId required" }, req); - return; - } - try { - const observations = await kv.list( - KV.observations(sessionId), - ); - json(res, 200, { observations }, req); - } catch { - json(res, 200, { observations: [] }, req); - } - return; + let body: string | undefined; + if (method === "POST" || method === "PUT" || method === "DELETE" || method === "PATCH") { + body = await readBody(req); } - if (method === "GET" && path === "graph/stats") { - try { - const result = await sdk.trigger("mem::graph-stats", {}); - json(res, 200, result, req); - } catch { - try { - const nodes = await kv.list(KV.graphNodes); - const edges = await kv.list(KV.graphEdges); - const types: Record = {}; - for (const n of nodes) types[n.type] = (types[n.type] || 0) + 1; - json( - res, - 200, - { nodes: nodes.length, edges: edges.length, types }, - req, - ); - } catch { - json(res, 200, { nodes: 0, edges: 0, types: {} }, req); - } - } - return; - } - - if (method === "GET" && path === "audit") { - try { - const result = await sdk.trigger("mem::audit-query", { - operation: params.get("operation") || undefined, - limit: parseInt(params.get("limit") || "50"), - }); - const entries = Array.isArray(result) - ? result - : (result as Record).entries || []; - json(res, 200, { entries }, req); - } catch { - json(res, 200, { entries: [] }, req); - } - return; - } - - if (method === "GET" && path === "profile") { - const project = params.get("project"); - if (!project) { - json(res, 400, { error: "project required" }, req); - return; - } - try { - const result = (await sdk.trigger("mem::profile", { project })) as { - profile?: Record; - }; - const prof = result?.profile as Record | undefined; - const hasData = - prof && - ((Array.isArray(prof.topConcepts) && prof.topConcepts.length > 0) || - (Array.isArray(prof.topFiles) && prof.topFiles.length > 0)); - if (hasData) { - json(res, 200, result, req); - return; - } - const enriched = await buildProfileFromRawObs(kv, project); - json(res, 200, { profile: { ...prof, ...enriched }, cached: false }, req); - } catch { - try { - const enriched = await buildProfileFromRawObs(kv, project); - json(res, 200, { profile: enriched, cached: false }, req); - } catch { - json(res, 200, {}, req); - } - } - return; - } - - if (method === "GET" && path === "summaries") { - try { - const summaries = await kv.list(KV.summaries); - json(res, 200, { summaries }, req); - } catch { - json(res, 200, { summaries: [] }, req); - } - return; - } - - if (method === "GET" && path === "relations") { - try { - const relations = await kv.list(KV.relations); - json(res, 200, { relations }, req); - } catch { - json(res, 200, { relations: [] }, req); - } - return; - } - - if (method === "GET" && path === "semantic") { - try { - const memories = await kv.list(KV.semantic); - json(res, 200, { memories }, req); - } catch { - json(res, 200, { memories: [] }, req); - } - return; - } - - if (method === "GET" && path === "procedural") { - try { - const memories = await kv.list(KV.procedural); - json(res, 200, { memories }, req); - } catch { - json(res, 200, { memories: [] }, req); - } - return; - } - - if (method === "GET" && path === "function-metrics") { - try { - const metrics = metricsStore ? await metricsStore.getAll() : []; - json(res, 200, { metrics }, req); - } catch { - json(res, 200, { metrics: [] }, req); - } - return; - } - - if (method === "POST") { - const ct = req.headers["content-type"] || ""; - if (ct && !ct.includes("application/json")) { - json(res, 415, { error: "Content-Type must be application/json" }, req); - return; - } - let body: Record = {}; - try { - const raw = await readBody(req); - if (raw.trim()) body = JSON.parse(raw); - } catch { - json(res, 400, { error: "invalid JSON" }, req); - return; - } - - if (path === "search") { - try { - const result = await sdk.trigger("mem::search", body); - json(res, 200, result, req); - } catch { - json(res, 200, { results: [] }, req); - } - return; - } - - if (path === "graph/query") { - try { - const result = await sdk.trigger("mem::graph-query", body); - json(res, 200, result, req); - } catch { - try { - const allNodes = await kv.list(KV.graphNodes); - const allEdges = await kv.list(KV.graphEdges); - const startId = body.startNodeId as string | undefined; - if (startId) { - const connected = new Set([startId]); - for (const e of allEdges) { - if (e.sourceNodeId === startId) connected.add(e.targetNodeId); - if (e.targetNodeId === startId) connected.add(e.sourceNodeId); - } - json( - res, - 200, - { - nodes: allNodes.filter((n) => connected.has(n.id)), - edges: allEdges.filter( - (e) => - connected.has(e.sourceNodeId) && - connected.has(e.targetNodeId), - ), - depth: 1, - }, - req, - ); - } else { - json(res, 200, { nodes: allNodes, edges: allEdges, depth: 0 }, req); - } - } catch { - json(res, 200, { nodes: [], edges: [], depth: 0 }, req); - } - } - return; - } - - if (path === "graph/build") { - try { - const result = await buildGraphFromData(kv); - json(res, 200, result, req); - } catch { - json(res, 200, { success: false, nodes: 0, edges: 0 }, req); - } - return; - } - - if (path === "session/end") { - if (typeof body.sessionId !== "string" || !body.sessionId) { - json(res, 400, { success: false, error: "invalid sessionId" }, req); - return; - } - try { - const session = await kv.get(KV.sessions, body.sessionId); - if (session) { - await kv.set(KV.sessions, body.sessionId, { - ...session, - endedAt: new Date().toISOString(), - status: "completed", - }); - } - json(res, 200, { success: true }, req); - } catch { - json(res, 200, { success: false }, req); - } - return; - } - - if (path === "summarize") { - try { - const result = await sdk.trigger("mem::summarize", body); - json(res, 200, result, req); - } catch { - json(res, 200, { error: "summarize unavailable" }, req); - } + const controller = new AbortController(); + const fetchTimeout = setTimeout(() => controller.abort(), 10000); + let upstream: Response; + try { + upstream = await fetch(upstreamUrl, { + method, + headers, + body: body || undefined, + signal: controller.signal, + }); + clearTimeout(fetchTimeout); + } catch (err) { + clearTimeout(fetchTimeout); + if (err instanceof Error && err.name === "AbortError") { + json(res, 504, { error: "upstream timeout" }, req); return; } + throw err; } - if (method === "DELETE" && path === "governance/memories") { - const ct = req.headers["content-type"] || ""; - if (ct && !ct.includes("application/json")) { - json(res, 415, { error: "Content-Type must be application/json" }, req); - return; - } - let body: Record = {}; - try { - const raw = await readBody(req); - if (raw.trim()) body = JSON.parse(raw); - } catch { - json(res, 400, { error: "invalid JSON" }, req); - return; - } - try { - const result = await sdk.trigger("mem::governance-delete", body); - json(res, 200, result, req); - } catch { - json(res, 200, { success: false }, req); - } - return; + const cors = corsHeaders(req); + const responseBody = await upstream.text(); + const responseHeaders: Record = { + ...cors, + }; + const upstreamCt = upstream.headers.get("content-type"); + if (upstreamCt) { + responseHeaders["Content-Type"] = upstreamCt; } - json(res, 404, { error: "not found" }, req); + res.writeHead(upstream.status, responseHeaders); + res.end(responseBody); }