18 changes: 17 additions & 1 deletion src/router/container-manager.ts
@@ -79,6 +79,17 @@ function buildSnapshotImageName(projectId: string, workItemId: string): string {
* On failure the error is logged and swallowed — snapshot failure must not
* break the normal post-run flow.
*/
async function inspectImageSizeBestEffort(imageName: string): Promise<number | undefined> {
try {
const image = docker.getImage(imageName);
if (!image) return undefined;
const info = (await image.inspect()) as { Size?: number } | undefined;
return info?.Size;
} catch {
return undefined;
}
}

async function commitContainerToSnapshot(
containerId: string,
projectId: string,
@@ -88,12 +99,17 @@ async function commitContainerToSnapshot(
try {
const container = docker.getContainer(containerId);
await container.commit({ repo: imageName.split(':')[0], tag: 'latest' });
registerSnapshot(projectId, workItemId, imageName);
// Populate the image size on the registered metadata so max-size
// eviction actually fires. Inspecting is best-effort — without size,
// the entry still gets TTL/max-count eviction.
const imageSize = await inspectImageSizeBestEffort(imageName);
registerSnapshot(projectId, workItemId, imageName, imageSize);
logger.info('[WorkerManager] Committed container to snapshot image:', {
containerId: containerId.slice(0, 12),
imageName,
projectId,
workItemId,
imageSizeBytes: imageSize,
});
} catch (err) {
logger.warn('[WorkerManager] Failed to commit container to snapshot (non-fatal):', {
76 changes: 67 additions & 9 deletions src/router/snapshot-cleanup.ts
@@ -4,19 +4,23 @@
* Runs alongside the existing orphan cleanup loop (orphan-cleanup.ts) and
* uses the same start/stop lifecycle pattern. On each tick it calls
* evictSnapshots() to enforce the per-project TTL and global max-count /
* max-size budget limits.
* max-size budget limits, then `docker rmi`s every evicted entry's image.
*
* This module owns only the timer — no Docker API usage. The actual eviction
* logic lives in snapshot-manager.ts.
* The Docker rmi step is critical: prior to PR #1132 the eviction loop only
* cleared the in-memory metadata Map and never freed the underlying images,
* which leaked ~3 GB per work item until the host disk filled.
*/

import Docker from 'dockerode';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { routerConfig } from './config.js';
import { evictSnapshots } from './snapshot-manager.js';
import { evictSnapshots, type SnapshotMetadata } from './snapshot-manager.js';

const SNAPSHOT_CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes

const docker = new Docker();

/** Periodic snapshot cleanup timer */
let snapshotCleanupTimer: NodeJS.Timeout | null = null;

@@ -56,9 +60,61 @@ export function stopSnapshotCleanup(): void {
}
}

interface DockerErrorShape {
statusCode?: number;
}

function dockerStatusCode(err: unknown): number | undefined {
if (err && typeof err === 'object' && 'statusCode' in err) {
const code = (err as DockerErrorShape).statusCode;
return typeof code === 'number' ? code : undefined;
}
return undefined;
}

/**
* Remove a snapshot image from Docker. `force: false` so an image still backing
* a running container is preserved (Docker returns 409). 404 means the image
* has already been removed by some other path. Both are harmless and silent.
*/
async function removeSnapshotImage(metadata: SnapshotMetadata): Promise<void> {
try {
await docker.getImage(metadata.imageName).remove({ force: false });
logger.info('[SnapshotCleanup] Removed snapshot image:', {
imageName: metadata.imageName,
});
} catch (err: unknown) {
const status = dockerStatusCode(err);
if (status === 409) {
logger.debug('[SnapshotCleanup] Snapshot image in use, deferring:', {
imageName: metadata.imageName,
});
return;
}
if (status === 404) {
logger.debug('[SnapshotCleanup] Snapshot image already gone:', {
imageName: metadata.imageName,
});
return;
}
logger.warn('[SnapshotCleanup] Failed to remove snapshot image:', {
imageName: metadata.imageName,
error: String(err),
});
captureException(err, {
tags: { source: 'snapshot_image_remove' },
extra: { imageName: metadata.imageName },
level: 'warning',
});
}
}

/**
* Run a single snapshot eviction sweep using the global config limits.
* Exposed for testing and for manual invocation.
* Run a single snapshot eviction sweep using the global config limits, then
* `docker rmi` each evicted image.
*
* Exposed for testing and for manual invocation (e.g. immediately after
* startup-sync registers orphan images).
* @internal Exported for testing
*/
export async function runSnapshotCleanup(): Promise<void> {
@@ -68,7 +124,9 @@ export async function runSnapshotCleanup(): Promise<void> {
routerConfig.snapshotMaxSizeBytes,
);

if (evicted > 0) {
logger.info('[SnapshotCleanup] Snapshot cleanup scan removed entries:', { evicted });
}
if (evicted.length === 0) return;

await Promise.all(evicted.map(removeSnapshotImage));

logger.info('[SnapshotCleanup] Cleanup pass complete:', { count: evicted.length });
}
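Dockerode reports the daemon's HTTP status as a `statusCode` property on the error it throws, which is what `dockerStatusCode()` above inspects. A minimal sketch of the status-to-outcome mapping in `removeSnapshotImage()`; the error messages are illustrative, not Docker's exact wording:

// Hypothetical error shapes, mirroring the statusCode dockerode attaches:
const inUse = Object.assign(new Error('conflict: image is being used'), { statusCode: 409 });
const alreadyGone = Object.assign(new Error('no such image'), { statusCode: 404 });
const daemonDown = new Error('connect ECONNREFUSED /var/run/docker.sock'); // no statusCode

// Outcomes per removeSnapshotImage() above:
//   409           -> debug "in use, deferring"; the image is left on disk.
//   404           -> debug "already gone"; nothing to remove.
//   anything else -> warn + Sentry capture; the error is swallowed, so
//                    Promise.all still settles and the remaining evicted
//                    images are processed.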
73 changes: 62 additions & 11 deletions src/router/snapshot-manager.ts
@@ -37,13 +37,24 @@ export interface SnapshotMetadata {
/** In-memory snapshot registry keyed by `${projectId}:${workItemId}` */
const snapshots = new Map<string, SnapshotMetadata>();

/** Synthetic projectId used for entries discovered on disk at startup. */
const DISCOVERED_PROJECT_ID = '__discovered__';

function snapshotKey(projectId: string, workItemId: string): string {
return `${projectId}:${workItemId}`;
}

function discoveredKey(imageName: string): string {
return snapshotKey(DISCOVERED_PROJECT_ID, imageName);
}

/**
* Register or refresh snapshot metadata for a project+workItem pair.
* Overwrites any existing entry for the same key.
*
* Also drops any "discovered" entry that points to the same image, so the
* startup-sync orphan tracking doesn't double-count an image that's now
* being actively managed.
*/
export function registerSnapshot(
projectId: string,
@@ -60,6 +71,9 @@
imageSizeBytes,
};
snapshots.set(key, metadata);
// Drop any orphan-tracking entry for the same image — the real registration
// supersedes it.
snapshots.delete(discoveredKey(imageName));
logger.info('[SnapshotManager] Snapshot registered:', {
projectId,
workItemId,
@@ -68,6 +82,43 @@
return metadata;
}

/**
* Track a snapshot image discovered on disk at startup so the cleanup loop
* can apply TTL/max-count/max-size limits to it.
*
* The (projectId, workItemId) pair cannot be reliably parsed from the
* sanitised composite image name — sanitisation collapses runs of '-' so
* `cascade-snapshot-llmist-mng-93:latest` is genuinely ambiguous. We sidestep
* that by keying discovered entries on the image name itself under a synthetic
* project (`__discovered__`). Eviction works the same way regardless of key.
*
* No-op when an entry already exists for this image, either as a discovered
* orphan or as a real registration.
*/
export function registerDiscoveredSnapshot(
imageName: string,
createdAt: Date,
imageSizeBytes: number,
): void {
const key = discoveredKey(imageName);
if (snapshots.has(key)) return;
for (const m of snapshots.values()) {
if (m.imageName === imageName) return;
}
snapshots.set(key, {
imageName,
projectId: DISCOVERED_PROJECT_ID,
workItemId: imageName,
createdAt,
imageSizeBytes,
});
logger.debug('[SnapshotManager] Discovered orphan snapshot tracked:', {
imageName,
createdAt: createdAt.toISOString(),
imageSizeBytes,
});
}

/**
* Look up snapshot metadata for a project+workItem pair.
* Returns undefined if no snapshot exists or if the snapshot has exceeded the
@@ -135,25 +186,25 @@ export function getSnapshotCount(): number {
* 3. Max-size: if still over-budget, remove oldest entries until estimated
* total size is at or below snapshotMaxSizeBytes.
*
* Returns the number of entries removed.
*
* This function operates only on the in-memory metadata registry. It does NOT
* remove Docker images — callers are responsible for any Docker cleanup.
* Returns the metadata of every evicted entry so the caller can do the
* matching `docker rmi`. Removing the in-memory entry without removing the
* underlying image (the prior behaviour of this function) is the leak that
* filled the dev disk to 100% — callers MUST act on the returned list.
*/
export function evictSnapshots(
ttlMs: number = routerConfig.snapshotDefaultTtlMs,
maxCount: number = routerConfig.snapshotMaxCount,
maxSizeBytes: number = routerConfig.snapshotMaxSizeBytes,
): number {
let evicted = 0;
): SnapshotMetadata[] {
const evicted: SnapshotMetadata[] = [];
const now = Date.now();

// Phase 1: TTL eviction — remove all expired entries
for (const [key, metadata] of snapshots) {
const ageMs = now - metadata.createdAt.getTime();
if (ageMs > ttlMs) {
snapshots.delete(key);
evicted++;
evicted.push(metadata);
logger.info('[SnapshotManager] Evicted expired snapshot:', {
projectId: metadata.projectId,
workItemId: metadata.workItemId,
@@ -172,7 +223,7 @@
for (let i = 0; i < toRemove; i++) {
const [key, metadata] = sorted[i];
snapshots.delete(key);
evicted++;
evicted.push(metadata);
logger.info('[SnapshotManager] Evicted snapshot (over max-count):', {
projectId: metadata.projectId,
workItemId: metadata.workItemId,
@@ -195,7 +246,7 @@
for (const [key, metadata] of sorted) {
if (runningSize <= maxSizeBytes) break;
snapshots.delete(key);
evicted++;
evicted.push(metadata);
runningSize -= metadata.imageSizeBytes ?? 0;
logger.info('[SnapshotManager] Evicted snapshot (over max-size):', {
projectId: metadata.projectId,
@@ -207,9 +258,9 @@
}
}

if (evicted > 0) {
if (evicted.length > 0) {
logger.info('[SnapshotManager] Eviction sweep complete:', {
evicted,
evicted: evicted.length,
remaining: snapshots.size,
});
}
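A minimal usage sketch of the new registry contract, assuming the exports above; the project, work item, image name, and limits below are hypothetical, and `evictSnapshots` is normally called with the `routerConfig` defaults:

import { evictSnapshots, registerDiscoveredSnapshot, registerSnapshot } from './snapshot-manager.js';

// Startup sync finds an untracked ~3 GB image and tracks it under the
// synthetic __discovered__ project, keyed by the image name itself.
registerDiscoveredSnapshot('cascade-snapshot-proj-a-item-1:latest', new Date(), 3_000_000_000);

// A later run commits the same image for real; the discovered entry is dropped
// so the image is only tracked once.
registerSnapshot('proj-a', 'item-1', 'cascade-snapshot-proj-a-item-1:latest', 3_000_000_000);

// With a hypothetical 1 GB size budget the single 3 GB entry is over budget:
// it is removed from the registry and returned, and the caller is responsible
// for the matching `docker rmi` (runSnapshotCleanup does exactly that).
const evicted = evictSnapshots(
  24 * 60 * 60 * 1000, // ttlMs
  10, // maxCount
  1_000_000_000, // maxSizeBytes
);
console.log(evicted.map((m) => m.imageName)); // ['cascade-snapshot-proj-a-item-1:latest']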
67 changes: 67 additions & 0 deletions src/router/snapshot-startup-sync.ts
@@ -0,0 +1,67 @@
/**
* Snapshot startup reconciliation.
*
* Called once at router boot. Lists all `cascade-snapshot-*` images currently
* on disk and registers each one as a "discovered" snapshot in the in-memory
* registry, so the regular cleanup loop can apply TTL/max-count/max-size
* policies to them.
*
* Without this, every snapshot for a work item that never re-runs is orphaned
* forever (the in-memory registry is process-local; restarts wipe it). Exactly
* the leak that filled the dev disk to 100% with 40 GB of three-week-old
* llmist Trello snapshots.
*
* Best-effort: a Docker outage at boot must not block router startup.
*/

import Docker from 'dockerode';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { runSnapshotCleanup } from './snapshot-cleanup.js';
import { registerDiscoveredSnapshot } from './snapshot-manager.js';

const SNAPSHOT_IMAGE_PREFIX = 'cascade-snapshot-';

const docker = new Docker();

interface DockerImageSummary {
RepoTags?: string[] | null;
Created: number;
Size: number;
}

function isCascadeSnapshotTag(tag: string): boolean {
return tag.startsWith(SNAPSHOT_IMAGE_PREFIX);
}

/**
* Discover existing snapshot images on disk and register them. Always runs the
* cleanup sweep at the end so TTL/max-count/max-size policies apply
* immediately to whatever was just registered (and to anything left over from
* a previous run that the registry already knew about).
*/
export async function syncSnapshotsFromDocker(): Promise<void> {
let registered = 0;
try {
const images = (await docker.listImages()) as DockerImageSummary[];
for (const img of images) {
const tags = img.RepoTags ?? [];
for (const tag of tags) {
if (!isCascadeSnapshotTag(tag)) continue;
registerDiscoveredSnapshot(tag, new Date(img.Created * 1000), img.Size);
registered++;
}
}
logger.info('[SnapshotStartupSync] Reconciled snapshot images from Docker:', { registered });
} catch (err) {
logger.warn('[SnapshotStartupSync] Failed to sync snapshots from Docker:', {
error: String(err),
});
captureException(err, {
tags: { source: 'snapshot_startup_sync' },
level: 'warning',
});
}

await runSnapshotCleanup();
}
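For reference, a sketch of how one `listImages()` entry flows through the loop above; the tag reuses the example from snapshot-manager.ts, while the timestamp and size are made up:

// Hypothetical listImages() entry for an on-disk snapshot image:
const img = {
  RepoTags: ['cascade-snapshot-llmist-mng-93:latest'],
  Created: 1_706_000_000, // Docker reports Unix seconds, hence the * 1000 above
  Size: 3_221_225_472, // ~3 GiB
};

// The loop registers each matching tag:
//   registerDiscoveredSnapshot('cascade-snapshot-llmist-mng-93:latest',
//     new Date(1_706_000_000 * 1000), 3_221_225_472);
// Entries with no RepoTags (dangling images) contribute nothing, and tags
// without the cascade-snapshot- prefix are ignored.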
10 changes: 10 additions & 0 deletions src/router/worker-manager.ts
@@ -21,6 +21,7 @@ import {
} from './container-manager.js';
import type { CascadeJob } from './queue.js';
import { startSnapshotCleanup, stopSnapshotCleanup } from './snapshot-cleanup.js';
import { syncSnapshotsFromDocker } from './snapshot-startup-sync.js';

// Re-export container-manager public API so existing callers are unaffected.
export { getActiveWorkerCount, getActiveWorkers, startOrphanCleanup, stopOrphanCleanup };
@@ -82,6 +83,15 @@ export function startWorkerProcessor(): void {
// Start periodic snapshot eviction alongside orphan cleanup
startSnapshotCleanup();

// Reconcile pre-existing snapshot images on disk so the eviction loop can
// apply TTL/max-count/max-size policies to them. Best-effort — Docker
// outage at boot must not block the worker manager from starting.
void syncSnapshotsFromDocker().catch((err) => {
logger.warn('[WorkerManager] Snapshot startup sync failed (continuing):', {
error: String(err),
});
});

logger.info('[WorkerManager] Started with max', routerConfig.maxWorkers, 'concurrent workers');
}
