Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/router/container-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { logger } from '../utils/logging.js';
import { activeWorkers, cleanupWorker } from './active-workers.js';
import { clearAllAgentTypeLocks } from './agent-type-lock.js';
import { loadProjectConfig, routerConfig } from './config.js';
import { ROUTER_INSTANCE_ID } from './instance-id.js';
import { notifyTimeout } from './notifications.js';
import { stopOrphanCleanup } from './orphan-cleanup.js';
import type { CascadeJob } from './queue.js';
Expand Down Expand Up @@ -386,6 +387,11 @@ async function createAndMonitorContainer(
'cascade.job.id': jobId,
'cascade.job.type': job.data.type,
'cascade.managed': 'true',
// Pinning the spawning router's instance id stops sibling
// cascade-router instances on the same host from claiming
// each other's containers as orphans — see `instance-id.ts`
// and `orphan-cleanup.ts:scanAndCleanupOrphans`.
'cascade.router.instance': ROUTER_INSTANCE_ID,
'cascade.project.id': projectId ?? '',
'cascade.agent.type': agentType ?? '',
'cascade.snapshot.enabled': snapshotEnabled ? 'true' : 'false',
Expand Down
3 changes: 2 additions & 1 deletion src/router/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { LinearRouterAdapter } from './adapters/linear.js';
import { SentryRouterAdapter } from './adapters/sentry.js';
import { TrelloRouterAdapter } from './adapters/trello.js';
import { startCancelListener, stopCancelListener } from './cancel-listener.js';
import { ROUTER_INSTANCE_ID } from './instance-id.js';
import { getQueueStats } from './queue.js';
import { processRouterWebhook } from './webhook-processor.js';
import {
Expand Down Expand Up @@ -242,7 +243,7 @@ async function startRouter(): Promise<void> {
await startCancelListener();

startWorkerProcessor();
logger.info('Starting router', { port });
logger.info('Starting router', { port, instanceId: ROUTER_INSTANCE_ID });
serve({ fetch: app.fetch, port });
}

Expand Down
43 changes: 43 additions & 0 deletions src/router/instance-id.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Stable identifier for this cascade-router process. Used to:
*
* - **Tag spawned worker containers** with a `cascade.router.instance`
* Docker label (see `container-manager.ts`).
* - **Scope the periodic orphan-cleanup scan** to containers carrying
* THIS instance's id (see `orphan-cleanup.ts`).
*
* Without this, two cascade-router instances on the same host (prod +
* dev, two local-dev sandboxes, k8s replicas pre-this-change) would
* each treat the other's worker containers as orphans and silently
* `docker stop` them at the 30-min `workerTimeoutMs` mark — surfacing
* downstream as `exit 137 · OOMKilled=false` agent runs that everyone
* blamed on memory.
*
* Resolution order:
* 1. `process.env.CASCADE_ROUTER_INSTANCE` (trimmed) — explicit
* override for the rare case where two instances share a hostname
* (e.g. local docker with `--network host`).
* 2. `os.hostname()` — Docker injects the container's short id here
* by default, which is per-container unique and stable across the
* cascade-router process's lifetime. This is the normal path.
*
* Memoised at module load. The pure resolver is exported for direct
* unit testing.
*/
import os from 'node:os';

export function resolveRouterInstanceId(
env: NodeJS.ProcessEnv | Record<string, string | undefined>,
hostname: string,
): string {
const fromEnv = env.CASCADE_ROUTER_INSTANCE?.trim();
if (fromEnv) return fromEnv;
const fromHost = hostname.trim();
if (fromHost) return fromHost;
throw new Error(
'Cannot resolve router instance id: both CASCADE_ROUTER_INSTANCE and os.hostname() are empty. ' +
'Set CASCADE_ROUTER_INSTANCE explicitly to disambiguate this cascade-router instance.',
);
}

export const ROUTER_INSTANCE_ID = resolveRouterInstanceId(process.env, os.hostname());
25 changes: 21 additions & 4 deletions src/router/orphan-cleanup.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
/**
* Orphaned container cleanup for CASCADE worker processes.
*
* Self-contained periodic task that scans for containers with cascade.managed=true
* that are not tracked in the activeWorkers map and are older than workerTimeoutMs.
* Self-contained periodic task that scans for containers with
* `cascade.managed=true` AND `cascade.router.instance=<this instance>`
* that are not tracked in the activeWorkers map and are older than
* workerTimeoutMs.
*
* The `cascade.router.instance` clause is the safety belt that stops
* sibling cascade-router instances on the same host (prod ↔ dev,
* multi-replica deployments) from killing each other's healthy
* workers — see `instance-id.ts` for the resolver.
*/

import Docker from 'dockerode';
Expand All @@ -11,6 +18,7 @@ import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { getTrackedContainerIds } from './active-workers.js';
import { routerConfig } from './config.js';
import { ROUTER_INSTANCE_ID } from './instance-id.js';

const docker = new Docker();

Expand Down Expand Up @@ -44,7 +52,9 @@ export function startOrphanCleanup(): void {
});
}, ORPHAN_SCAN_INTERVAL_MS);

logger.info('[WorkerManager] Started orphan cleanup scan (every 5 minutes)');
logger.info('[WorkerManager] Started orphan cleanup scan (every 5 minutes)', {
instanceId: ROUTER_INSTANCE_ID,
});
}

/**
Expand Down Expand Up @@ -72,7 +82,13 @@ export async function scanAndCleanupOrphans(): Promise<void> {
const containers = await docker.listContainers({
all: false, // Only running containers
filters: {
label: ['cascade.managed=true'],
// Two-clause filter: `cascade.managed=true` keeps the cross-
// instance debugging affordance (`docker ps -a --filter
// label=cascade.managed=true` finds every cascade worker
// regardless of who spawned it), and the per-instance label
// is the safety belt that stops sibling routers from
// claiming each other's containers as orphans.
label: ['cascade.managed=true', `cascade.router.instance=${ROUTER_INSTANCE_ID}`],
},
});

Expand Down Expand Up @@ -115,6 +131,7 @@ export async function scanAndCleanupOrphans(): Promise<void> {
logger.warn('[WorkerManager] Stopped and removed orphaned container:', {
containerId: containerId.slice(0, 12),
ageMinutes,
instanceId: ROUTER_INSTANCE_ID,
});

// Update DB run status (fire-and-forget). Containers created before this
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/router/container-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,29 @@ describe('spawnWorker', () => {
resolveWait();
});

it('includes cascade.router.instance label so sibling routers do not orphan-kill this container', async () => {
const { resolveWait } = setupMockContainer();

await spawnWorker(
makeJob({
id: 'job-instance',
data: { type: 'trello', projectId: 'proj-1' } as CascadeJob,
}) as never,
);

const createCall = mockDockerCreateContainer.mock.calls[0]?.[0] as {
Labels: Record<string, string>;
};
const instanceLabel = createCall.Labels['cascade.router.instance'];
// The exact value depends on os.hostname() / the env override; pin
// "non-empty string" — it's the mere presence + uniqueness that
// makes orphan-cleanup's per-instance filter work.
expect(instanceLabel).toBeTruthy();
expect(typeof instanceLabel).toBe('string');

resolveWait();
});

it('uses project watchdogTimeoutMs + 2min buffer when available', async () => {
mockLoadProjectConfig.mockResolvedValue({
projects: [],
Expand Down
46 changes: 46 additions & 0 deletions tests/unit/router/instance-id.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { describe, expect, it } from 'vitest';
import { resolveRouterInstanceId } from '../../../src/router/instance-id.js';

describe('resolveRouterInstanceId', () => {
it('returns process.env.CASCADE_ROUTER_INSTANCE when set non-empty', () => {
expect(
resolveRouterInstanceId({ CASCADE_ROUTER_INSTANCE: 'cascade-router-prod' }, 'fallback-host'),
).toBe('cascade-router-prod');
});

it('falls back to hostname when env is undefined', () => {
expect(resolveRouterInstanceId({}, 'bauer-12345')).toBe('bauer-12345');
});

it('falls back to hostname when env is empty string', () => {
expect(resolveRouterInstanceId({ CASCADE_ROUTER_INSTANCE: '' }, 'bauer-12345')).toBe(
'bauer-12345',
);
});

it('falls back to hostname when env is whitespace-only', () => {
expect(resolveRouterInstanceId({ CASCADE_ROUTER_INSTANCE: ' ' }, 'bauer-12345')).toBe(
'bauer-12345',
);
});

it('trims whitespace from the env value when honoring it', () => {
expect(
resolveRouterInstanceId(
{ CASCADE_ROUTER_INSTANCE: ' cascade-router-staging ' },
'fallback-host',
),
).toBe('cascade-router-staging');
});

it('throws when both env and hostname are empty (defensive)', () => {
// Should never happen in practice — os.hostname() always returns
// something — but a fail-loud guard is cheap and prevents a silent
// mis-tagging if a future runtime ever returns '' from hostname().
expect(() => resolveRouterInstanceId({}, '')).toThrow(/Cannot resolve router instance id/i);
});

it('rejects whitespace-only hostname', () => {
expect(() => resolveRouterInstanceId({}, ' ')).toThrow(/Cannot resolve router instance id/i);
});
});
25 changes: 25 additions & 0 deletions tests/unit/router/orphan-cleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,31 @@ describe('orphan-cleanup', () => {
);
});

it('scopes the scan to THIS instance via cascade.router.instance label', async () => {
// Critical safety property: when two cascade-router instances run on
// the same host (prod + dev, k8s replicas, two local sandboxes),
// each must only see its OWN spawned containers. Without this the
// dev instance silently `docker stop`s the prod instance's workers
// at the 30-min `workerTimeoutMs` mark — surfacing downstream as
// `exit 137 · OOMKilled=false` runs that masquerade as OOM.
mockDockerListContainers.mockResolvedValue([]);

await scanAndCleanupOrphans();

const firstCallArg = mockDockerListContainers.mock.calls[0]?.[0] as {
filters: { label: string[] };
};
const labelFilters = firstCallArg.filters.label;
expect(labelFilters).toContain('cascade.managed=true');
// The instance label must be present and non-empty. Pinning the
// exact value would tie the test to whatever hostname the test
// environment happens to expose; pinning the prefix is what
// matters for the safety property.
const instanceLabel = labelFilters.find((l) => l.startsWith('cascade.router.instance='));
expect(instanceLabel).toBeDefined();
expect(instanceLabel?.split('=')[1]).toBeTruthy();
});

it('skips tracked containers', async () => {
const trackedContainerId = 'container-abc123def456';
mockTrackedIds.add(trackedContainerId);
Expand Down
Loading