Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions src/router/container-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,125 @@ export interface ActiveWorker {

const activeWorkers = new Map<string, ActiveWorker>();

/**
* Periodic orphan cleanup timer — scans for containers with cascade.managed=true
* that are not tracked in activeWorkers map and are older than workerTimeoutMs.
*/
let orphanCleanupTimer: NodeJS.Timeout | null = null;

/**
* Start periodic orphaned container cleanup.
* Scans every 5 minutes for containers with cascade.managed=true label
* that are not in the activeWorkers map and are older than workerTimeoutMs.
* Stopped containers are logged at warn level with container ID and age.
*/
export function startOrphanCleanup(): void {
if (orphanCleanupTimer) {
logger.warn('[WorkerManager] Orphan cleanup already started');
return;
}

const ORPHAN_SCAN_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes

orphanCleanupTimer = setInterval(() => {
scanAndCleanupOrphans().catch((err) => {
logger.error('[WorkerManager] Error during orphan cleanup scan:', err);
captureException(err, {
tags: { source: 'orphan_cleanup_scan' },
level: 'error',
});
});
}, ORPHAN_SCAN_INTERVAL_MS);

logger.info('[WorkerManager] Started orphan cleanup scan (every 5 minutes)');
}

/**
* Stop periodic orphaned container cleanup.
* Clears the scan timer.
*/
export function stopOrphanCleanup(): void {
if (orphanCleanupTimer) {
clearInterval(orphanCleanupTimer);
orphanCleanupTimer = null;
logger.info('[WorkerManager] Stopped orphan cleanup scan');
}
}

/**
* Scan for orphaned containers and stop them.
* Containers are considered orphaned if:
* 1. They have cascade.managed=true label
* 2. They are NOT in the activeWorkers map (tracked)
* 3. They are older than workerTimeoutMs (avoid killing recently-spawned workers)
* @internal Exported for testing
*/
export async function scanAndCleanupOrphans(): Promise<void> {
try {
const containers = await docker.listContainers({
all: false, // Only running containers
filters: {
label: ['cascade.managed=true'],
},
});

const now = Date.now();
let stoppedCount = 0;

for (const containerInfo of containers) {
const containerId = containerInfo.Id;

// Check if this container is tracked in activeWorkers
const isTracked = Array.from(activeWorkers.values()).some(
(w) => w.containerId === containerId,
);

if (isTracked) {
// Don't touch tracked containers
continue;
}

// Check container age — only stop if older than workerTimeoutMs
const containerCreatedMs = containerInfo.Created * 1000;
const ageMs = now - containerCreatedMs;

if (ageMs < routerConfig.workerTimeoutMs) {
// Too young — might be a newly-spawned worker not yet registered
continue;
}

// This is an orphan — stop it
try {
const container = docker.getContainer(containerId);
await container.stop({ t: 15 }); // 15 second graceful shutdown

stoppedCount++;
const ageMinutes = Math.round(ageMs / 60000);
logger.warn('[WorkerManager] Stopped orphaned container:', {
containerId: containerId.slice(0, 12),
ageMinutes,
});
} catch (err) {
// Container might already be stopped — log but continue
logger.warn('[WorkerManager] Error stopping orphaned container:', {
containerId: containerId.slice(0, 12),
error: String(err),
});
}
}

if (stoppedCount > 0) {
logger.info('[WorkerManager] Orphan cleanup scan completed:', {
stoppedCount,
totalContainers: containers.length,
});
}
} catch (err) {
logger.error('[WorkerManager] Failed to list containers for orphan cleanup:', err);
throw err;
}
}

/**
* Extract projectId from job data for credential resolution.
* Different job types have the projectId in different locations.
Expand Down Expand Up @@ -409,4 +528,5 @@ export function detachAll(): void {
activeWorkers.clear();
clearAllWorkItemLocks();
clearAllAgentTypeLocks();
stopOrphanCleanup();
}
10 changes: 9 additions & 1 deletion src/router/worker-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import {
getActiveWorkerCount,
getActiveWorkers,
spawnWorker,
startOrphanCleanup,
stopOrphanCleanup,
} from './container-manager.js';
import type { CascadeJob } from './queue.js';

// Re-export container-manager public API so existing callers are unaffected.
export { getActiveWorkerCount, getActiveWorkers };
export { getActiveWorkerCount, getActiveWorkers, startOrphanCleanup, stopOrphanCleanup };

// BullMQ Workers that process jobs by spawning containers
let bullWorker: Worker<CascadeJob> | null = null;
Expand Down Expand Up @@ -68,11 +70,17 @@ export function startWorkerProcessor(): void {
processFn: (job) => guardedSpawn(job as Job<CascadeJob>),
});

// Start periodic orphan cleanup scan
startOrphanCleanup();

logger.info('[WorkerManager] Started with max', routerConfig.maxWorkers, 'concurrent workers');
}

// Graceful shutdown — detach from workers, let them finish independently
export async function stopWorkerProcessor(): Promise<void> {
// Stop orphan cleanup first
stopOrphanCleanup();

if (dashboardWorker) {
await dashboardWorker.close();
dashboardWorker = null;
Expand Down
Loading
Loading