Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/functions/auto-forget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,27 @@ export function registerAutoForgetFunction(sdk: ISdk, kv: StateKV): void {
}

const sessions = await kv.list<Session>(KV.sessions);
for (const session of sessions) {
const observations = await kv
.list<CompressedObservation>(KV.observations(session.id))
.catch(() => []);
for (const obs of observations) {
const obsPerSession: CompressedObservation[][] = [];
for (let batch = 0; batch < sessions.length; batch += 10) {
const chunk = sessions.slice(batch, batch + 10);
const results = await Promise.all(
chunk.map((s) =>
kv
.list<CompressedObservation>(KV.observations(s.id))
.catch(() => [] as CompressedObservation[]),
),
);
obsPerSession.push(...results);
}
for (let i = 0; i < sessions.length; i++) {
for (const obs of obsPerSession[i]) {
if (!obs.timestamp) continue;
const age = now - new Date(obs.timestamp).getTime();
if (age > 180 * MS_PER_DAY && (obs.importance ?? 5) <= 2) {
result.lowValueObs.push(obs.id);
if (!dryRun) {
await kv
.delete(KV.observations(session.id), obs.id)
.delete(KV.observations(sessions[i].id), obs.id)
.catch(() => {});
}
}
Expand Down
19 changes: 14 additions & 5 deletions src/functions/consolidate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,22 @@ export function registerConsolidateFunction(
: sessions;

const allObs: Array<CompressedObservation & { sid: string }> = [];
for (const session of filtered) {
const observations = await kv.list<CompressedObservation>(
KV.observations(session.id),
const obsPerSession: CompressedObservation[][] = [];
for (let batch = 0; batch < filtered.length; batch += 10) {
const chunk = filtered.slice(batch, batch + 10);
const results = await Promise.all(
chunk.map((s) =>
kv
.list<CompressedObservation>(KV.observations(s.id))
.catch(() => [] as CompressedObservation[]),
),
);
for (const obs of observations) {
obsPerSession.push(...results);
}
Comment on lines +82 to +92
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid silently dropping failed session fetches during consolidation.

At Line 88, failures are converted to empty arrays without any warning, so consolidation can quietly produce incomplete output.

Proposed fix
 const allObs: Array<CompressedObservation & { sid: string }> = [];
 const obsPerSession: CompressedObservation[][] = [];
+const failedSessions: string[] = [];
 for (let batch = 0; batch < filtered.length; batch += 10) {
   const chunk = filtered.slice(batch, batch + 10);
   const results = await Promise.all(
-    chunk.map((s) =>
-      kv
-        .list<CompressedObservation>(KV.observations(s.id))
-        .catch(() => [] as CompressedObservation[]),
-    ),
+    chunk.map(async (s) => {
+      try {
+        return await kv.list<CompressedObservation>(KV.observations(s.id));
+      } catch {
+        failedSessions.push(s.id);
+        return [] as CompressedObservation[];
+      }
+    }),
   );
   obsPerSession.push(...results);
 }
+if (failedSessions.length > 0) {
+  ctx.logger.warn("consolidate: failed to load observations for sessions", {
+    failedSessionsCount: failedSessions.length,
+  });
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/functions/consolidate.ts` around lines 82 - 92, The consolidation loop
currently swallows errors by converting failed kv.list calls to empty arrays
(using .catch(() => [])), which hides failed session fetches; update the
Promise.all mapping so that failures from kv.list(KV.observations(s.id)) are not
silently dropped: capture the error (include s.id and the error details) and
either log it via the existing logger or push an explicit failure marker so
obsPerSession can reflect the failure, and at the end of consolidate (or
immediately) surface or throw an aggregated error if any session fetches failed;
ensure references to filtered, KV.observations, CompressedObservation, and
obsPerSession are used to locate and update the behavior.

for (let i = 0; i < filtered.length; i++) {
for (const obs of obsPerSession[i]) {
if (obs.title && obs.importance >= 5) {
allObs.push({ ...obs, sid: session.id });
allObs.push({ ...obs, sid: filtered[i].id });
}
}
}
Expand Down
32 changes: 24 additions & 8 deletions src/functions/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,15 @@ export function registerContextFunction(
)
.slice(0, 10);

for (const session of sessions) {
const summary = await kv.get<SessionSummary>(KV.summaries, session.id);
const summariesPerSession = await Promise.all(
sessions.map((s) =>
kv.get<SessionSummary>(KV.summaries, s.id).catch(() => null),
),
);

const sessionsNeedingObs: number[] = [];
for (let i = 0; i < sessions.length; i++) {
const summary = summariesPerSession[i];
if (summary) {
const content = `## ${summary.title}\n${summary.narrative}\nDecisions: ${summary.keyDecisions.join("; ")}\nFiles: ${summary.filesModified.join(", ")}`;
blocks.push({
Expand All @@ -97,12 +103,22 @@ export function registerContextFunction(
tokens: estimateTokens(content),
recency: new Date(summary.createdAt).getTime(),
});
continue;
} else {
sessionsNeedingObs.push(i);
}
}

const observations = await kv.list<CompressedObservation>(
KV.observations(session.id),
);
const obsResults = await Promise.all(
sessionsNeedingObs.map((i) =>
kv
.list<CompressedObservation>(KV.observations(sessions[i].id))
.catch(() => []),
),
);

for (let j = 0; j < sessionsNeedingObs.length; j++) {
const i = sessionsNeedingObs[j];
const observations = obsResults[j];
const important = observations.filter(
(o) => o.title && o.importance >= 5,
);
Expand All @@ -113,12 +129,12 @@ export function registerContextFunction(
.slice(0, 5)
.map((o) => `- [${o.type}] ${o.title}: ${o.narrative}`)
.join("\n");
const content = `## Session ${session.id.slice(0, 8)} (${session.startedAt})\n${items}`;
const content = `## Session ${sessions[i].id.slice(0, 8)} (${sessions[i].startedAt})\n${items}`;
blocks.push({
type: "observation",
content,
tokens: estimateTokens(content),
recency: new Date(session.startedAt).getTime(),
recency: new Date(sessions[i].startedAt).getTime(),
});
}
}
Expand Down
104 changes: 68 additions & 36 deletions src/functions/export-import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type {
Sketch,
Crystal,
Facet,
ExportPagination,
} from "../types.js";
import { KV } from "../state/schema.js";
import { StateKV } from "../state/kv.js";
Expand All @@ -28,59 +29,80 @@ import { VERSION } from "../version.js";
export function registerExportImportFunction(sdk: ISdk, kv: StateKV): void {
sdk.registerFunction(
{ id: "mem::export", description: "Export all memory data as JSON" },
async () => {
async (data?: { maxSessions?: number; offset?: number }) => {
const ctx = getContext();
const rawMax = Number(data?.maxSessions);
const maxSessions = Number.isFinite(rawMax) && rawMax > 0 ? Math.min(Math.floor(rawMax), 1000) : undefined;
const rawOffset = Number(data?.offset);
const offset = Number.isFinite(rawOffset) && rawOffset >= 0 ? Math.floor(rawOffset) : 0;

const sessions = await kv.list<Session>(KV.sessions);
const allSessions = await kv.list<Session>(KV.sessions);
const paginatedSessions = maxSessions !== undefined
? allSessions.slice(offset, offset + maxSessions)
: allSessions;
const memories = await kv.list<Memory>(KV.memories);
const summaries = await kv.list<SessionSummary>(KV.summaries);

const observations: Record<string, CompressedObservation[]> = {};
for (const session of sessions) {
const obs = await kv
.list<CompressedObservation>(KV.observations(session.id))
.catch(() => []);
const obsResults = await Promise.all(
paginatedSessions.map((session) =>
kv
.list<CompressedObservation>(KV.observations(session.id))
.catch(() => [] as CompressedObservation[])
.then((obs) => ({ sessionId: session.id, obs })),
),
);
for (const { sessionId, obs } of obsResults) {
if (obs.length > 0) {
observations[session.id] = obs;
observations[sessionId] = obs;
}
}

const profiles: ProjectProfile[] = [];
const uniqueProjects = [...new Set(sessions.map((s) => s.project))];
for (const project of uniqueProjects) {
const profile = await kv
.get<ProjectProfile>(KV.profiles, project)
.catch(() => null);
const uniqueProjects = [...new Set(paginatedSessions.map((s) => s.project))];
const profileResults = await Promise.all(
uniqueProjects.map((project) =>
kv.get<ProjectProfile>(KV.profiles, project).catch(() => null),
),
);
for (const profile of profileResults) {
if (profile) profiles.push(profile);
}

const graphNodes = await kv
.list<GraphNode>(KV.graphNodes)
.catch(() => []);
const graphEdges = await kv
.list<GraphEdge>(KV.graphEdges)
.catch(() => []);
const semanticMemories = await kv
.list<SemanticMemory>(KV.semantic)
.catch(() => []);
const proceduralMemories = await kv
.list<ProceduralMemory>(KV.procedural)
.catch(() => []);

const actions = await kv.list<Action>(KV.actions).catch(() => []);
const actionEdges = await kv.list<ActionEdge>(KV.actionEdges).catch(() => []);
const sentinels = await kv.list<Sentinel>(KV.sentinels).catch(() => []);
const sketches = await kv.list<Sketch>(KV.sketches).catch(() => []);
const crystals = await kv.list<Crystal>(KV.crystals).catch(() => []);
const facets = await kv.list<Facet>(KV.facets).catch(() => []);
const routines = await kv.list<Routine>(KV.routines).catch(() => []);
const signals = await kv.list<Signal>(KV.signals).catch(() => []);
const checkpoints = await kv.list<Checkpoint>(KV.checkpoints).catch(() => []);
const [
graphNodes,
graphEdges,
semanticMemories,
proceduralMemories,
actions,
actionEdges,
sentinels,
sketches,
crystals,
facets,
routines,
signals,
checkpoints,
] = await Promise.all([
kv.list<GraphNode>(KV.graphNodes).catch(() => []),
kv.list<GraphEdge>(KV.graphEdges).catch(() => []),
kv.list<SemanticMemory>(KV.semantic).catch(() => []),
kv.list<ProceduralMemory>(KV.procedural).catch(() => []),
kv.list<Action>(KV.actions).catch(() => []),
kv.list<ActionEdge>(KV.actionEdges).catch(() => []),
kv.list<Sentinel>(KV.sentinels).catch(() => []),
kv.list<Sketch>(KV.sketches).catch(() => []),
kv.list<Crystal>(KV.crystals).catch(() => []),
kv.list<Facet>(KV.facets).catch(() => []),
kv.list<Routine>(KV.routines).catch(() => []),
kv.list<Signal>(KV.signals).catch(() => []),
kv.list<Checkpoint>(KV.checkpoints).catch(() => []),
]);

const exportData: ExportData = {
version: VERSION,
exportedAt: new Date().toISOString(),
sessions,
sessions: paginatedSessions,
observations,
memories,
summaries,
Expand All @@ -102,12 +124,22 @@ export function registerExportImportFunction(sdk: ISdk, kv: StateKV): void {
checkpoints: checkpoints.length > 0 ? checkpoints : undefined,
};

if (maxSessions !== undefined) {
exportData.pagination = {
offset,
limit: maxSessions,
total: allSessions.length,
hasMore: offset + maxSessions < allSessions.length,
};
}

const totalObs = Object.values(observations).reduce(
(sum, arr) => sum + arr.length,
0,
);
ctx.logger.info("Export complete", {
sessions: sessions.length,
sessions: paginatedSessions.length,
totalSessions: allSessions.length,
observations: totalObs,
memories: memories.length,
summaries: summaries.length,
Expand Down
16 changes: 12 additions & 4 deletions src/functions/profile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,18 @@ export function registerProfileFunction(sdk: ISdk, kv: StateKV): void {
new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime(),
);

for (const session of sortedSessions.slice(0, 20)) {
const observations = await kv.list<CompressedObservation>(
KV.observations(session.id),
);
const top20Sessions = sortedSessions.slice(0, 20);
const obsPerSession = await Promise.all(
top20Sessions.map((s) =>
kv
.list<CompressedObservation>(KV.observations(s.id))
.catch(() => [] as CompressedObservation[]),
),
);

for (let i = 0; i < top20Sessions.length; i++) {
const session = top20Sessions[i];
const observations = obsPerSession[i];
totalObs += observations.length;

for (const obs of observations) {
Expand Down
23 changes: 21 additions & 2 deletions src/functions/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,27 @@ export async function rebuildIndex(kv: StateKV): Promise<number> {
if (!sessions.length) return 0

let count = 0
for (const session of sessions) {
const observations = await kv.list<CompressedObservation>(KV.observations(session.id))
const obsPerSession: CompressedObservation[][] = []
const failedSessions: string[] = []
for (let batch = 0; batch < sessions.length; batch += 10) {
const chunk = sessions.slice(batch, batch + 10)
const results = await Promise.all(
chunk.map(async (s) => {
try {
return await kv.list<CompressedObservation>(KV.observations(s.id))
} catch {
failedSessions.push(s.id)
return [] as CompressedObservation[]
}
})
)
obsPerSession.push(...results)
}
if (failedSessions.length > 0) {
const ctx = getContext()
ctx.logger.warn('rebuildIndex: failed to load observations for sessions', { failedSessions })
}
for (const observations of obsPerSession) {
for (const obs of observations) {
if (obs.title && obs.narrative) {
idx.add(obs)
Expand Down
Loading