From ee93601e1d18db4d5c8429c28c55ca337578bc68 Mon Sep 17 00:00:00 2001 From: Cascade Bot Date: Fri, 13 Mar 2026 17:43:31 +0000 Subject: [PATCH] feat(router): add periodic orphaned container cleanup --- src/router/container-manager.ts | 120 +++++++++ src/router/worker-manager.ts | 10 +- tests/unit/router/container-manager.test.ts | 265 +++++++++++++++++++- tests/unit/router/worker-manager.test.ts | 17 ++ 4 files changed, 407 insertions(+), 5 deletions(-) diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts index 09e39e52..91371762 100644 --- a/src/router/container-manager.ts +++ b/src/router/container-manager.ts @@ -35,6 +35,125 @@ export interface ActiveWorker { const activeWorkers = new Map(); +/** + * Periodic orphan cleanup timer — scans for containers with cascade.managed=true + * that are not tracked in activeWorkers map and are older than workerTimeoutMs. + */ +let orphanCleanupTimer: NodeJS.Timeout | null = null; + +/** + * Start periodic orphaned container cleanup. + * Scans every 5 minutes for containers with cascade.managed=true label + * that are not in the activeWorkers map and are older than workerTimeoutMs. + * Stopped containers are logged at warn level with container ID and age. + */ +export function startOrphanCleanup(): void { + if (orphanCleanupTimer) { + logger.warn('[WorkerManager] Orphan cleanup already started'); + return; + } + + const ORPHAN_SCAN_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + + orphanCleanupTimer = setInterval(() => { + scanAndCleanupOrphans().catch((err) => { + logger.error('[WorkerManager] Error during orphan cleanup scan:', err); + captureException(err, { + tags: { source: 'orphan_cleanup_scan' }, + level: 'error', + }); + }); + }, ORPHAN_SCAN_INTERVAL_MS); + + logger.info('[WorkerManager] Started orphan cleanup scan (every 5 minutes)'); +} + +/** + * Stop periodic orphaned container cleanup. + * Clears the scan timer. + */ +export function stopOrphanCleanup(): void { + if (orphanCleanupTimer) { + clearInterval(orphanCleanupTimer); + orphanCleanupTimer = null; + logger.info('[WorkerManager] Stopped orphan cleanup scan'); + } +} + +/** + * Scan for orphaned containers and stop them. + * Containers are considered orphaned if: + * 1. They have cascade.managed=true label + * 2. They are NOT in the activeWorkers map (tracked) + * 3. They are older than workerTimeoutMs (avoid killing recently-spawned workers) + * @internal Exported for testing + */ +export async function scanAndCleanupOrphans(): Promise { + try { + const containers = await docker.listContainers({ + all: false, // Only running containers + filters: { + label: ['cascade.managed=true'], + }, + }); + + const now = Date.now(); + let stoppedCount = 0; + + for (const containerInfo of containers) { + const containerId = containerInfo.Id; + + // Check if this container is tracked in activeWorkers + const isTracked = Array.from(activeWorkers.values()).some( + (w) => w.containerId === containerId, + ); + + if (isTracked) { + // Don't touch tracked containers + continue; + } + + // Check container age — only stop if older than workerTimeoutMs + const containerCreatedMs = containerInfo.Created * 1000; + const ageMs = now - containerCreatedMs; + + if (ageMs < routerConfig.workerTimeoutMs) { + // Too young — might be a newly-spawned worker not yet registered + continue; + } + + // This is an orphan — stop it + try { + const container = docker.getContainer(containerId); + await container.stop({ t: 15 }); // 15 second graceful shutdown + + stoppedCount++; + const ageMinutes = Math.round(ageMs / 60000); + logger.warn('[WorkerManager] Stopped orphaned container:', { + containerId: containerId.slice(0, 12), + ageMinutes, + }); + } catch (err) { + // Container might already be stopped — log but continue + logger.warn('[WorkerManager] Error stopping orphaned container:', { + containerId: containerId.slice(0, 12), + error: String(err), + }); + } + } + + if (stoppedCount > 0) { + logger.info('[WorkerManager] Orphan cleanup scan completed:', { + stoppedCount, + totalContainers: containers.length, + }); + } + } catch (err) { + logger.error('[WorkerManager] Failed to list containers for orphan cleanup:', err); + throw err; + } +} + /** * Extract projectId from job data for credential resolution. * Different job types have the projectId in different locations. @@ -409,4 +528,5 @@ export function detachAll(): void { activeWorkers.clear(); clearAllWorkItemLocks(); clearAllAgentTypeLocks(); + stopOrphanCleanup(); } diff --git a/src/router/worker-manager.ts b/src/router/worker-manager.ts index 6ffcb657..58798157 100644 --- a/src/router/worker-manager.ts +++ b/src/router/worker-manager.ts @@ -16,11 +16,13 @@ import { getActiveWorkerCount, getActiveWorkers, spawnWorker, + startOrphanCleanup, + stopOrphanCleanup, } from './container-manager.js'; import type { CascadeJob } from './queue.js'; // Re-export container-manager public API so existing callers are unaffected. -export { getActiveWorkerCount, getActiveWorkers }; +export { getActiveWorkerCount, getActiveWorkers, startOrphanCleanup, stopOrphanCleanup }; // BullMQ Workers that process jobs by spawning containers let bullWorker: Worker | null = null; @@ -68,11 +70,17 @@ export function startWorkerProcessor(): void { processFn: (job) => guardedSpawn(job as Job), }); + // Start periodic orphan cleanup scan + startOrphanCleanup(); + logger.info('[WorkerManager] Started with max', routerConfig.maxWorkers, 'concurrent workers'); } // Graceful shutdown — detach from workers, let them finish independently export async function stopWorkerProcessor(): Promise { + // Stop orphan cleanup first + stopOrphanCleanup(); + if (dashboardWorker) { await dashboardWorker.close(); dashboardWorker = null; diff --git a/tests/unit/router/container-manager.test.ts b/tests/unit/router/container-manager.test.ts index 0e134622..e80b6e34 100644 --- a/tests/unit/router/container-manager.test.ts +++ b/tests/unit/router/container-manager.test.ts @@ -4,10 +4,13 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; // Hoisted mock state — vi.hoisted creates variables before vi.mock factories run // --------------------------------------------------------------------------- -const { mockDockerCreateContainer, mockDockerGetContainer } = vi.hoisted(() => ({ - mockDockerCreateContainer: vi.fn(), - mockDockerGetContainer: vi.fn(), -})); +const { mockDockerCreateContainer, mockDockerGetContainer, mockDockerListContainers } = vi.hoisted( + () => ({ + mockDockerCreateContainer: vi.fn(), + mockDockerGetContainer: vi.fn(), + mockDockerListContainers: vi.fn(), + }), +); // --------------------------------------------------------------------------- // Module-level mocks @@ -17,6 +20,7 @@ vi.mock('dockerode', () => ({ default: vi.fn().mockImplementation(() => ({ createContainer: mockDockerCreateContainer, getContainer: mockDockerGetContainer, + listContainers: mockDockerListContainers, })), })); @@ -81,7 +85,10 @@ import { getActiveWorkerCount, getActiveWorkers, killWorker, + scanAndCleanupOrphans, spawnWorker, + startOrphanCleanup, + stopOrphanCleanup, } from '../../../src/router/container-manager.js'; import { notifyTimeout } from '../../../src/router/notifications.js'; import type { CascadeJob } from '../../../src/router/queue.js'; @@ -499,4 +506,254 @@ describe('detachAll', () => { detachAll(); expect(mockClearAllWorkItemLocks).toHaveBeenCalled(); }); + + it('calls stopOrphanCleanup on detach', async () => { + vi.spyOn(console, 'log').mockImplementation(() => {}); + setupMockContainer(); + await spawnWorker(makeJob({ id: 'job-d3' }) as never); + + startOrphanCleanup(); + expect(() => detachAll()).not.toThrow(); + // orphan cleanup timer should be cleared + }); +}); + +// --------------------------------------------------------------------------- +// startOrphanCleanup / stopOrphanCleanup +// --------------------------------------------------------------------------- + +describe('orphan cleanup', () => { + beforeEach(() => { + vi.spyOn(console, 'log').mockImplementation(() => {}); + vi.spyOn(console, 'warn').mockImplementation(() => {}); + vi.spyOn(console, 'info').mockImplementation(() => {}); + vi.spyOn(console, 'error').mockImplementation(() => {}); + mockGetAllProjectCredentials.mockResolvedValue({}); + mockDockerListContainers.mockResolvedValue([]); + detachAll(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + stopOrphanCleanup(); + detachAll(); + }); + + describe('startOrphanCleanup / stopOrphanCleanup', () => { + it('starts a periodic orphan cleanup scan', () => { + expect(() => startOrphanCleanup()).not.toThrow(); + stopOrphanCleanup(); + }); + + it('stops the orphan cleanup scan', () => { + startOrphanCleanup(); + expect(() => stopOrphanCleanup()).not.toThrow(); + }); + + it('is a no-op to stop if not started', () => { + expect(() => stopOrphanCleanup()).not.toThrow(); + }); + + it('is idempotent on multiple calls', () => { + startOrphanCleanup(); + expect(() => startOrphanCleanup()).not.toThrow(); + stopOrphanCleanup(); + }); + + it('allows multiple start/stop cycles', () => { + expect(() => { + startOrphanCleanup(); + stopOrphanCleanup(); + startOrphanCleanup(); + stopOrphanCleanup(); + }).not.toThrow(); + }); + }); + + describe('scanAndCleanupOrphans', () => { + it('lists containers with cascade.managed=true label', async () => { + mockDockerListContainers.mockResolvedValue([]); + + await scanAndCleanupOrphans(); + + expect(mockDockerListContainers).toHaveBeenCalledWith( + expect.objectContaining({ + all: false, + filters: expect.objectContaining({ + label: expect.arrayContaining(['cascade.managed=true']), + }), + }), + ); + }); + + it('skips tracked containers', async () => { + setupMockContainer(); + await spawnWorker(makeJob({ id: 'job-tracked' }) as never); + + const trackedContainerId = 'container-abc123def456'; + mockDockerListContainers.mockResolvedValue([ + { + Id: trackedContainerId, + Created: Math.floor(Date.now() / 1000) - 1000, // Very old + State: 'running', + } as never, + ]); + + await scanAndCleanupOrphans(); + + // Container should NOT be stopped since it's tracked + expect(mockDockerGetContainer).not.toHaveBeenCalled(); + }); + + it('stops orphaned containers older than workerTimeoutMs', async () => { + const orphanContainerId = 'orphan-container-old'; + const now = Math.floor(Date.now() / 1000); + const createdAt = now - 6; // 6 seconds old, workerTimeoutMs is 5000ms + + const mockOrphanContainer = { + stop: vi.fn().mockResolvedValue(undefined), + }; + mockDockerListContainers.mockResolvedValue([ + { + Id: orphanContainerId, + Created: createdAt, + State: 'running', + } as never, + ]); + mockDockerGetContainer.mockReturnValue(mockOrphanContainer as never); + + await scanAndCleanupOrphans(); + + expect(mockOrphanContainer.stop).toHaveBeenCalledWith({ t: 15 }); + }); + + it('leaves young orphaned containers alone', async () => { + const youngContainerId = 'orphan-container-young'; + const now = Math.floor(Date.now() / 1000); + const createdAt = now - 1; // 1 second old, workerTimeoutMs is 5000ms + + const mockYoungContainer = { + stop: vi.fn(), + }; + mockDockerListContainers.mockResolvedValue([ + { + Id: youngContainerId, + Created: createdAt, + State: 'running', + } as never, + ]); + mockDockerGetContainer.mockReturnValue(mockYoungContainer as never); + + await scanAndCleanupOrphans(); + + // Young container should NOT be stopped + expect(mockYoungContainer.stop).not.toHaveBeenCalled(); + }); + + it('handles Docker list errors', async () => { + mockDockerListContainers.mockRejectedValue(new Error('Docker unavailable')); + + await expect(scanAndCleanupOrphans()).rejects.toThrow('Docker unavailable'); + }); + + it('handles container stop errors gracefully', async () => { + const orphanContainerId = 'orphan-stop-fails'; + const now = Math.floor(Date.now() / 1000); + const createdAt = now - 6; // Old enough + + const mockFailContainer = { + stop: vi.fn().mockRejectedValue(new Error('already stopped')), + }; + mockDockerListContainers.mockResolvedValue([ + { + Id: orphanContainerId, + Created: createdAt, + State: 'running', + } as never, + ]); + mockDockerGetContainer.mockReturnValue(mockFailContainer as never); + + // Should not throw, just log error + await expect(scanAndCleanupOrphans()).resolves.toBeUndefined(); + expect(mockFailContainer.stop).toHaveBeenCalled(); + }); + + it('stops multiple orphaned containers', async () => { + const now = Math.floor(Date.now() / 1000); + + const mockContainer1 = { + stop: vi.fn().mockResolvedValue(undefined), + }; + const mockContainer2 = { + stop: vi.fn().mockResolvedValue(undefined), + }; + + mockDockerListContainers.mockResolvedValue([ + { + Id: 'orphan-1', + Created: now - 6, + State: 'running', + } as never, + { + Id: 'orphan-2', + Created: now - 10, + State: 'running', + } as never, + ]); + + mockDockerGetContainer.mockImplementation((id: string) => { + if (id === 'orphan-1') return mockContainer1 as never; + if (id === 'orphan-2') return mockContainer2 as never; + return null; + }); + + await scanAndCleanupOrphans(); + + expect(mockContainer1.stop).toHaveBeenCalledWith({ t: 15 }); + expect(mockContainer2.stop).toHaveBeenCalledWith({ t: 15 }); + }); + + it('stops orphans but leaves tracked and young containers', async () => { + setupMockContainer(); + await spawnWorker(makeJob({ id: 'job-tracked' }) as never); + + const now = Math.floor(Date.now() / 1000); + const mockedOrphanContainer = { + stop: vi.fn().mockResolvedValue(undefined), + }; + const mockedYoungContainer = { + stop: vi.fn().mockResolvedValue(undefined), + }; + + mockDockerListContainers.mockResolvedValue([ + { + Id: 'container-abc123def456', // tracked + Created: now - 10, + State: 'running', + } as never, + { + Id: 'orphan-old', + Created: now - 6, + State: 'running', + } as never, + { + Id: 'orphan-young', + Created: now - 1, + State: 'running', + } as never, + ]); + + mockDockerGetContainer.mockImplementation((id: string) => { + if (id === 'orphan-old') return mockedOrphanContainer as never; + if (id === 'orphan-young') return mockedYoungContainer as never; + return { stop: vi.fn() } as never; + }); + + await scanAndCleanupOrphans(); + + // Only the old orphan should be stopped + expect(mockedOrphanContainer.stop).toHaveBeenCalledWith({ t: 15 }); + expect(mockedYoungContainer.stop).not.toHaveBeenCalled(); + }); + }); }); diff --git a/tests/unit/router/worker-manager.test.ts b/tests/unit/router/worker-manager.test.ts index 57a5f8da..bc0caa3d 100644 --- a/tests/unit/router/worker-manager.test.ts +++ b/tests/unit/router/worker-manager.test.ts @@ -14,6 +14,8 @@ vi.mock('../../../src/router/container-manager.js', () => ({ getActiveWorkerCount: vi.fn().mockReturnValue(0), getActiveWorkers: vi.fn().mockReturnValue([]), detachAll: vi.fn(), + startOrphanCleanup: vi.fn(), + stopOrphanCleanup: vi.fn(), })); vi.mock('../../../src/router/config.js', () => ({ @@ -47,6 +49,8 @@ import { getActiveWorkerCount, getActiveWorkers, spawnWorker, + startOrphanCleanup, + stopOrphanCleanup, } from '../../../src/router/container-manager.js'; import { startWorkerProcessor, @@ -62,6 +66,8 @@ const mockSpawnWorker = vi.mocked(spawnWorker); const mockGetActiveWorkerCount = vi.mocked(getActiveWorkerCount); const mockGetActiveWorkers = vi.mocked(getActiveWorkers); const mockDetachAll = vi.mocked(detachAll); +const mockStartOrphanCleanup = vi.mocked(startOrphanCleanup); +const mockStopOrphanCleanup = vi.mocked(stopOrphanCleanup); const mockLogger = vi.mocked(logger); // --------------------------------------------------------------------------- @@ -232,4 +238,15 @@ describe('stopWorkerProcessor', () => { expect(mockLogger.info).toHaveBeenCalledWith(expect.stringContaining('Stopped')); }); + + it('calls startOrphanCleanup during startup', () => { + startWorkerProcessor(); + expect(mockStartOrphanCleanup).toHaveBeenCalled(); + }); + + it('calls stopOrphanCleanup during shutdown', async () => { + startWorkerProcessor(); + await stopWorkerProcessor(); + expect(mockStopOrphanCleanup).toHaveBeenCalled(); + }); });