Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 177 additions & 18 deletions src/router/container-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
* Each BullMQ job gets its own isolated Docker container.
*
* State management, env building, and orphan cleanup are in dedicated modules:
* - active-workers.ts — ActiveWorker state tracking
* - worker-env.ts — Job data parsing + env building
* - orphan-cleanup.ts — Periodic orphan container cleanup
* - active-workers.ts — ActiveWorker state tracking
* - worker-env.ts — Job data parsing + env building
* - orphan-cleanup.ts — Periodic orphan container cleanup
* - snapshot-manager.ts — Snapshot metadata registry
*/

import type { Job } from 'bullmq';
Expand All @@ -21,6 +22,7 @@ import { loadProjectConfig, routerConfig } from './config.js';
import { notifyTimeout } from './notifications.js';
import { stopOrphanCleanup } from './orphan-cleanup.js';
import type { CascadeJob } from './queue.js';
import { getSnapshot, registerSnapshot } from './snapshot-manager.js';
import { clearAllWorkItemLocks } from './work-item-lock.js';
import {
buildWorkerEnvWithProjectId,
Expand All @@ -46,15 +48,150 @@ export {
buildWorkerEnv,
extractProjectIdFromJob,
} from './worker-env.js';
export {
getSnapshot,
invalidateSnapshot,
registerSnapshot,
} from './snapshot-manager.js';

const docker = new Docker();

/** Buffer added on top of the in-container watchdog so the router kill is always a backstop. */
const ROUTER_KILL_BUFFER_MS = 2 * 60 * 1000;

/**
 * Build a stable Docker image name for a snapshot.
 *
 * The project and work-item ids are sanitised (lowercased, non-alphanumeric
 * runs collapsed to a single '-') so the result is a valid Docker repo:tag.
 * Segments that sanitise to the empty string (e.g. ids made entirely of
 * punctuation) are dropped, so the name never contains consecutive or
 * trailing hyphens — both of which Docker rejects in repository names.
 */
function buildSnapshotImageName(projectId: string, workItemId: string): string {
  // Sanitise: lowercase, replace non-alphanumeric with '-', collapse runs,
  // strip leading/trailing separators.
  const sanitise = (s: string): string =>
    s
      .toLowerCase()
      .replace(/[^a-z0-9]/g, '-')
      .replace(/-+/g, '-')
      .replace(/^-|-$/g, '');
  // Keep only non-empty segments so degenerate ids can't produce '--' or a
  // trailing '-' in the repository name.
  const segments = ['cascade-snapshot', sanitise(projectId), sanitise(workItemId)].filter(
    (segment) => segment.length > 0,
  );
  return `${segments.join('-')}:latest`;
}

/**
 * Commit a container to a snapshot image and register the metadata.
 * On failure the error is logged and swallowed — snapshot failure must not
 * break the normal post-run flow.
 */
async function commitContainerToSnapshot(
  containerId: string,
  projectId: string,
  workItemId: string,
): Promise<void> {
  const imageName = buildSnapshotImageName(projectId, workItemId);
  const shortId = containerId.slice(0, 12);
  try {
    // Equivalent of `docker commit <container> <repo>:latest`.
    const repo = imageName.split(':')[0];
    await docker.getContainer(containerId).commit({ repo, tag: 'latest' });
    registerSnapshot(projectId, workItemId, imageName);
    logger.info('[WorkerManager] Committed container to snapshot image:', {
      containerId: shortId,
      imageName,
      projectId,
      workItemId,
    });
  } catch (err) {
    // Non-fatal by design: record and report, never propagate.
    logger.warn('[WorkerManager] Failed to commit container to snapshot (non-fatal):', {
      containerId: shortId,
      imageName,
      error: String(err),
    });
    captureException(err, {
      tags: { source: 'snapshot_commit' },
      extra: { containerId, imageName, projectId, workItemId },
      level: 'warning',
    });
  }
}

/**
 * Remove a container (used after manual snapshot commit to clean up).
 * Swallows errors — the container may already be removed.
 */
async function removeContainer(containerId: string): Promise<void> {
  try {
    await docker.getContainer(containerId).remove({ force: true });
  } catch {
    // Already gone (or Docker raced us) — nothing to do.
  }
}

/** Per-project settings resolved once before spawning a worker container. */
interface SpawnSettings {
  // Whether snapshot commit/reuse is enabled for this run.
  snapshotEnabled: boolean;
  // Docker image to boot: a snapshot image on cache hit, else the base worker image.
  workerImage: string;
  // Router-side hard-kill timeout for the container, in milliseconds.
  containerTimeoutMs: number;
  // How long a registered snapshot stays valid, in milliseconds.
  snapshotTtlMs: number;
}

/**
 * Resolve per-project spawn settings (snapshot flag, image, timeout).
 * Centralises all loadProjectConfig() calls so spawnWorker stays simple.
 */
async function resolveSpawnSettings(
  projectId: string | null,
  workItemId: string | undefined,
  jobId: string,
): Promise<SpawnSettings> {
  // Global defaults apply whenever no project context is available.
  const defaults: SpawnSettings = {
    snapshotEnabled: false,
    workerImage: routerConfig.workerImage,
    containerTimeoutMs: routerConfig.workerTimeoutMs,
    snapshotTtlMs: routerConfig.snapshotDefaultTtlMs,
  };
  if (!projectId) {
    return defaults;
  }

  const { fullProjects } = await loadProjectConfig();
  const projectCfg = fullProjects.find((p) => p.id === projectId);

  // Project-level snapshotEnabled / TTL override the global defaults.
  const snapshotEnabled = projectCfg?.snapshotEnabled ?? routerConfig.snapshotEnabled;
  const snapshotTtlMs = projectCfg?.snapshotTtlMs ?? routerConfig.snapshotDefaultTtlMs;

  let workerImage = defaults.workerImage;
  if (snapshotEnabled && workItemId) {
    const snapshot = getSnapshot(projectId, workItemId, snapshotTtlMs);
    if (snapshot) {
      logger.info('[WorkerManager] Snapshot hit — using snapshot image:', {
        jobId,
        imageName: snapshot.imageName,
        projectId,
        workItemId,
      });
      workerImage = snapshot.imageName;
    } else {
      logger.info('[WorkerManager] Snapshot miss — using base worker image:', {
        jobId,
        projectId,
        workItemId,
      });
    }
  }

  // Determine container timeout: use project's watchdogTimeoutMs + buffer if available,
  // falling back to the global workerTimeoutMs. This makes watchdogTimeoutMs the single source
  // of truth — the in-container watchdog fires first, router kill is a backup.
  const containerTimeoutMs = projectCfg?.watchdogTimeoutMs
    ? projectCfg.watchdogTimeoutMs + ROUTER_KILL_BUFFER_MS
    : defaults.containerTimeoutMs;

  return { snapshotEnabled, workerImage, containerTimeoutMs, snapshotTtlMs };
}

/**
* Spawn a worker container for a job.
* Sets up timeout tracking and monitors container exit asynchronously.
*
* Snapshot behaviour (when the project has snapshotEnabled):
* - Prefers a valid snapshot image over the base worker image when available.
* - Disables AutoRemove so the container can be committed on clean exit.
* - On successful exit, commits the container to a snapshot image.
* - On failed/timed-out exit, does NOT create a snapshot.
*/
export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
const jobId = job.id ?? `unknown-${Date.now()}`;
Expand All @@ -69,42 +206,43 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
// (needed by orphan cleanup to narrow DB fallback queries to the right agent type)
const agentType = extractAgentType(job.data);

// Determine container timeout: use project's watchdogTimeoutMs + buffer if available,
// falling back to the global workerTimeoutMs. This makes watchdogTimeoutMs the single source
// of truth — the in-container watchdog fires first, router kill is a backup.
let containerTimeoutMs = routerConfig.workerTimeoutMs;
if (projectId) {
const { fullProjects } = await loadProjectConfig();
const projectCfg = fullProjects.find((p) => p.id === projectId);
if (projectCfg?.watchdogTimeoutMs) {
containerTimeoutMs = projectCfg.watchdogTimeoutMs + ROUTER_KILL_BUFFER_MS;
}
}
const workItemId = extractWorkItemId(job.data);

const { snapshotEnabled, workerImage, containerTimeoutMs } = await resolveSpawnSettings(
projectId,
workItemId,
jobId,
);

logger.info('[WorkerManager] Spawning worker:', {
jobId,
type: job.data.type,
containerName,
hasCredentials,
snapshotEnabled,
workerImage,
});

try {
const container = await docker.createContainer({
Image: routerConfig.workerImage,
Image: workerImage,
name: containerName,
Env: workerEnv,
HostConfig: {
Memory: routerConfig.workerMemoryMb * 1024 * 1024,
MemorySwap: routerConfig.workerMemoryMb * 1024 * 1024, // No swap
NetworkMode: routerConfig.dockerNetwork,
AutoRemove: true, // Clean up container on exit
// Disable AutoRemove for snapshot-enabled runs so the container remains
// available for docker commit after a successful exit.
AutoRemove: !snapshotEnabled,
},
Labels: {
'cascade.job.id': jobId,
'cascade.job.type': job.data.type,
'cascade.managed': 'true',
'cascade.project.id': projectId ?? '',
'cascade.agent.type': agentType ?? '',
'cascade.snapshot.enabled': snapshotEnabled ? 'true' : 'false',
},
});

Expand All @@ -129,7 +267,6 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
}, containerTimeoutMs);

// Track the worker
const workItemId = extractWorkItemId(job.data);
activeWorkers.set(jobId, {
containerId: container.id,
jobId,
Expand All @@ -150,7 +287,7 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
container
.wait()
.then(async (result) => {
// Collect worker logs before auto-removal
// Collect worker logs before removal
try {
const logs = await container.logs({
stdout: true,
Expand Down Expand Up @@ -179,6 +316,24 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
jobId,
statusCode: result.StatusCode,
});

// For snapshot-enabled runs, commit on clean exit and then remove the container.
// Failed or timed-out runs must NOT create a snapshot.
// Always remove — even when projectId/workItemId are absent — to avoid
// orphaning containers that ran with AutoRemove=false.
if (snapshotEnabled) {
if (result.StatusCode === 0 && projectId && workItemId) {
await commitContainerToSnapshot(container.id, projectId, workItemId);
} else if (result.StatusCode !== 0) {
logger.info('[WorkerManager] Skipping snapshot commit after non-zero exit:', {
jobId,
statusCode: result.StatusCode,
});
}
// Always remove the container manually since AutoRemove is disabled
await removeContainer(container.id);
}

cleanupWorker(jobId, result.StatusCode);
})
.catch((err) => {
Expand All @@ -187,6 +342,10 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
tags: { source: 'worker_wait', jobType: job.data.type },
extra: { jobId },
});
// Ensure container is cleaned up even on wait error (snapshot runs only)
if (snapshotEnabled) {
removeContainer(container.id).catch(() => {});
}
cleanupWorker(jobId);
});
} catch (err) {
Expand Down
11 changes: 9 additions & 2 deletions src/router/orphan-cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,21 @@ export async function scanAndCleanupOrphans(): Promise<void> {
continue;
}

// This is an orphan — stop it
// This is an orphan — stop it and remove it.
// Remove is called unconditionally after stop: for non-snapshot containers
// (AutoRemove=true) Docker may already have removed them, in which case
// remove() is a harmless no-op; for snapshot containers (AutoRemove=false)
// it ensures stopped containers don't accumulate on disk.
try {
const container = docker.getContainer(containerId);
await container.stop({ t: 15 }); // 15 second graceful shutdown
await container.remove({ force: false }).catch(() => {
// Container may have been removed by Docker's AutoRemove — not an error
});

stoppedCount++;
const ageMinutes = Math.round(ageMs / 60000);
logger.warn('[WorkerManager] Stopped orphaned container:', {
logger.warn('[WorkerManager] Stopped and removed orphaned container:', {
containerId: containerId.slice(0, 12),
ageMinutes,
});
Expand Down
Loading
Loading