---
src/agents/prompts/index.ts | 15 +++++++++++++++
src/api/routers/agentConfigs.ts | 10 +++++++++-
src/triggers/shared/manual-runner.ts | 3 +++
tests/unit/api/routers/agentConfigs.test.ts | 5 +++++
tests/unit/triggers/manual-runner.test.ts | 4 ++++
.../projects/agent-prompt-overrides.tsx | 11 ++++++-----
6 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/src/agents/prompts/index.ts b/src/agents/prompts/index.ts
index 1236a244..ae0e650d 100644
--- a/src/agents/prompts/index.ts
+++ b/src/agents/prompts/index.ts
@@ -4,6 +4,7 @@ import { fileURLToPath } from 'node:url';
import { Eta } from 'eta';
import { resolveKnownAgentTypes } from '../definitions/index.js';
+import { loadAgentDefinition } from '../definitions/loader.js';
const __dirname = dirname(fileURLToPath(import.meta.url));
const templatesDir = join(__dirname, 'templates');
@@ -231,6 +232,20 @@ export function renderInlineTaskPrompt(
return taskEta.renderString(expanded, context);
}
+/**
+ * Returns the YAML-defined taskPrompt for an agent type (the factory default).
+ * Does not require initPrompts() — reads directly from YAML.
+ * Returns null if the agent type is unknown or has no taskPrompt defined.
+ */
+export function getDefaultTaskPrompt(agentType: string): string | null {
+ try {
+ const definition = loadAgentDefinition(agentType);
+ return definition.prompts.taskPrompt ?? null;
+ } catch {
+ return null;
+ }
+}
+
/** Returns the raw .eta template source from disk (before rendering). */
export function getRawTemplate(agentType: string): string {
requireInitialized('getRawTemplate');
diff --git a/src/api/routers/agentConfigs.ts b/src/api/routers/agentConfigs.ts
index 466a1436..8d46bd01 100644
--- a/src/api/routers/agentConfigs.ts
+++ b/src/api/routers/agentConfigs.ts
@@ -2,7 +2,11 @@ import { TRPCError } from '@trpc/server';
import { eq } from 'drizzle-orm';
import { z } from 'zod';
import { resolveAgentDefinition } from '../../agents/definitions/index.js';
-import { getRawTemplate, validateTemplate } from '../../agents/prompts/index.js';
+import {
+ getDefaultTaskPrompt,
+ getRawTemplate,
+ validateTemplate,
+} from '../../agents/prompts/index.js';
import { getEngineCatalog, registerBuiltInEngines } from '../../backends/index.js';
import { EngineSettingsSchema } from '../../config/engineSettings.js';
import { getDb } from '../../db/client.js';
@@ -178,12 +182,16 @@ export const agentConfigsRouter = router({
// No .eta template on disk — skip gracefully
}
+ // 4. YAML-defined task prompt (factory default)
+ const defaultTaskPrompt = getDefaultTaskPrompt(input.agentType);
+
return {
projectSystemPrompt,
projectTaskPrompt,
globalSystemPrompt,
globalTaskPrompt,
defaultSystemPrompt,
+ defaultTaskPrompt,
};
}),
});
diff --git a/src/triggers/shared/manual-runner.ts b/src/triggers/shared/manual-runner.ts
index 715507a0..ba63bcca 100644
--- a/src/triggers/shared/manual-runner.ts
+++ b/src/triggers/shared/manual-runner.ts
@@ -4,6 +4,7 @@ import { getRunById } from '../../db/repositories/runsRepository.js';
import { withPMCredentials } from '../../pm/context.js';
import { createPMProvider, pmRegistry, withPMProvider } from '../../pm/index.js';
import type { AgentInput, CascadeConfig, ProjectConfig } from '../../types/index.js';
+import { startWatchdog } from '../../utils/lifecycle.js';
import { logger } from '../../utils/logging.js';
import { formatValidationErrors, validateIntegrations } from './integration-validation.js';
@@ -104,6 +105,8 @@ export async function triggerManualRun(
markTriggerRunning(triggerKey);
+ startWatchdog(project.watchdogTimeoutMs);
+
const agentInput: AgentInput & { project: ProjectConfig; config: CascadeConfig } = {
workItemId: input.workItemId,
prNumber: input.prNumber,
diff --git a/tests/unit/api/routers/agentConfigs.test.ts b/tests/unit/api/routers/agentConfigs.test.ts
index 1fadeaa5..61e20b27 100644
--- a/tests/unit/api/routers/agentConfigs.test.ts
+++ b/tests/unit/api/routers/agentConfigs.test.ts
@@ -15,6 +15,7 @@ const {
mockLoadPartials,
mockResolveAgentDefinition,
mockGetRawTemplate,
+ mockGetDefaultTaskPrompt,
} = vi.hoisted(() => ({
mockListAgentConfigs: vi.fn(),
mockCreateAgentConfig: vi.fn(),
@@ -27,6 +28,7 @@ const {
mockLoadPartials: vi.fn(),
mockResolveAgentDefinition: vi.fn(),
mockGetRawTemplate: vi.fn(),
+ mockGetDefaultTaskPrompt: vi.fn().mockReturnValue(null),
}));
vi.mock('../../../../src/db/repositories/settingsRepository.js', () => ({
@@ -45,6 +47,7 @@ vi.mock('../../../../src/backends/index.js', () => ({
vi.mock('../../../../src/agents/prompts/index.js', () => ({
validateTemplate: (...args: unknown[]) => mockValidateTemplate(...args),
getRawTemplate: (...args: unknown[]) => mockGetRawTemplate(...args),
+ getDefaultTaskPrompt: (...args: unknown[]) => mockGetDefaultTaskPrompt(...args),
}));
vi.mock('../../../../src/db/repositories/partialsRepository.js', () => ({
@@ -566,6 +569,7 @@ describe('agentConfigsRouter', () => {
},
});
mockGetRawTemplate.mockReturnValue('raw disk template content');
+ mockGetDefaultTaskPrompt.mockReturnValue('yaml default task prompt');
const caller = createCaller({ user: mockUser, effectiveOrgId: mockUser.orgId });
const result = await caller.getPrompts({ projectId: 'proj-1', agentType: 'implementation' });
@@ -576,6 +580,7 @@ describe('agentConfigsRouter', () => {
globalSystemPrompt: 'global system prompt',
globalTaskPrompt: 'global task prompt',
defaultSystemPrompt: 'raw disk template content',
+ defaultTaskPrompt: 'yaml default task prompt',
});
});
diff --git a/tests/unit/triggers/manual-runner.test.ts b/tests/unit/triggers/manual-runner.test.ts
index 2fd37e21..f86808b1 100644
--- a/tests/unit/triggers/manual-runner.test.ts
+++ b/tests/unit/triggers/manual-runner.test.ts
@@ -44,6 +44,10 @@ vi.mock('../../../src/triggers/shared/integration-validation.js', () => ({
formatValidationErrors: vi.fn().mockReturnValue(''),
}));
+vi.mock('../../../src/utils/lifecycle.js', () => ({
+ startWatchdog: vi.fn(),
+}));
+
import { runAgent } from '../../../src/agents/registry.js';
import { isAgentEnabledForProject } from '../../../src/db/repositories/agentConfigsRepository.js';
import { getRunById } from '../../../src/db/repositories/runsRepository.js';
diff --git a/web/src/components/projects/agent-prompt-overrides.tsx b/web/src/components/projects/agent-prompt-overrides.tsx
index c01dd003..94770753 100644
--- a/web/src/components/projects/agent-prompt-overrides.tsx
+++ b/web/src/components/projects/agent-prompt-overrides.tsx
@@ -66,7 +66,8 @@ export function AgentPromptOverrides({
// Initialize with project override, then fall back to global, then default
const initialSystem =
data.projectSystemPrompt ?? data.globalSystemPrompt ?? data.defaultSystemPrompt ?? '';
- const initialTask = data.projectTaskPrompt ?? data.globalTaskPrompt ?? '';
+ const initialTask =
+ data.projectTaskPrompt ?? data.globalTaskPrompt ?? data.defaultTaskPrompt ?? '';
onSystemPromptChange(initialSystem);
onTaskPromptChange(initialTask);
// eslint-disable-next-line react-hooks/exhaustive-deps
@@ -123,7 +124,7 @@ export function AgentPromptOverrides({
const taskBadge = getInheritanceBadge({
projectOverride: data?.projectTaskPrompt ?? null,
globalPrompt: data?.globalTaskPrompt ?? null,
- defaultPrompt: null,
+ defaultPrompt: data?.defaultTaskPrompt ?? null,
});
const currentBadge = isSystemSection ? systemBadge : taskBadge;
@@ -138,8 +139,8 @@ export function AgentPromptOverrides({
if (isSystemSection && data?.defaultSystemPrompt) {
onSystemPromptChange(data.defaultSystemPrompt);
setValidationStatus(null);
- } else if (!isSystemSection && data?.globalTaskPrompt) {
- onTaskPromptChange(data.globalTaskPrompt);
+ } else if (!isSystemSection && data?.defaultTaskPrompt) {
+ onTaskPromptChange(data.defaultTaskPrompt);
setValidationStatus(null);
}
};
@@ -163,7 +164,7 @@ export function AgentPromptOverrides({
const hasProjectSystemOverride = !!data?.projectSystemPrompt;
const hasProjectTaskOverride = !!data?.projectTaskPrompt;
- const canLoadDefault = isSystemSection ? !!data?.defaultSystemPrompt : !!data?.globalTaskPrompt;
+ const canLoadDefault = isSystemSection ? !!data?.defaultSystemPrompt : !!data?.defaultTaskPrompt;
return (
From 2c4b078641ec40900b1a620a3679e58a4b5c5ca4 Mon Sep 17 00:00:00 2001
From: Zbigniew Sobiecki
Date: Mon, 16 Mar 2026 16:24:08 +0000
Subject: [PATCH 4/5] feat(router): unified agent run timeout system
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Replaces two independent, uncoordinated timeout mechanisms with a single
coherent flow where `watchdogTimeoutMs` is the source of truth.
## Problem
Two timeouts existed with no knowledge of each other:
1. In-container watchdog (`startWatchdog(project.watchdogTimeoutMs)`) —
   per-project; updated the DB run to `timed_out`, then exited.
2. Router-level kill (`setTimeout → killWorker`) — driven by a global env
   var; killed the Docker container with no DB update.
This caused three bugs:
- If `WORKER_TIMEOUT_MS` < `watchdogTimeoutMs`, the router killed the
  container before the watchdog could set the correct DB status.
- GitHub-triggered runs (no `workItemId`) were never marked in the DB
after a router kill — they stayed `running` forever.
- Orphaned containers (after router restart) were stopped but their DB
runs were never updated.
## Solution
**Per-project timeout in `spawnWorker`**: the router now reads
`watchdogTimeoutMs` from the project config and uses it plus a 2-minute
buffer (`ROUTER_KILL_BUFFER_MS`) for the container kill timer, so the
watchdog always fires first and the router kill is purely a backstop.
**DB update on router kill (`killWorker`)**: after stopping the
container, marks the run `timed_out` via `failOrphanedRun` (workItemId
path) or `failOrphanedRunFallback` (GitHub PR runs without workItemId).
The call to `cleanupWorker` no longer passes an exit code so it skips
its own DB write, eliminating the race that could set the wrong status
(`failed` instead of `timed_out`).
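The ordering on the kill path, condensed from the `killWorker` diff below
(runs inside the `worker.projectId` guard):

```ts
// 1. The router writes the authoritative status itself (fire-and-forget;
//    a no-op if the in-container watchdog already marked the run).
const dbUpdate = worker.workItemId
  ? failOrphanedRun(worker.projectId, worker.workItemId, 'Router timeout', 'timed_out', durationMs)
  : failOrphanedRunFallback(worker.projectId, worker.agentType, worker.startedAt, 'timed_out', 'Router timeout', durationMs);

// 2. cleanupWorker is called WITHOUT an exitCode, so its crash-path
//    'failed' write is skipped and cannot race the 'timed_out' write.
cleanupWorker(jobId);
```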
**Fallback for GitHub PR runs (`failOrphanedRunFallback`)**: new
repository function that finds the most recent running run by
`projectId + agentType + startedAt ≥ containerStart` and marks it,
guarded by an optimistic `WHERE status='running'` check so it is
always safe to call even if the watchdog already acted.
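How the fallback identifies the run, condensed from the `runsRepository`
diff below (`agentRuns`, `eq`, `gte`, `desc`, `and`, and `SQL` are the
repository's existing drizzle imports):

```ts
const conditions: SQL[] = [
  eq(agentRuns.projectId, projectId),
  eq(agentRuns.status, 'running'),
  gte(agentRuns.startedAt, startedAfter),
];
if (agentType) conditions.push(eq(agentRuns.agentType, agentType));

// Newest matching run wins; the subsequent UPDATE re-checks
// status='running' so a run the watchdog already finalized is untouched.
const [row] = await db
  .select({ id: agentRuns.id })
  .from(agentRuns)
  .where(and(...conditions))
  .orderBy(desc(agentRuns.startedAt))
  .limit(1);
```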
**DB update in `cleanupWorker`**: extended to also handle the
workItemId-absent case via `failOrphanedRunFallback`, covering crashes
of GitHub PR runs that the watchdog didn't catch.
**`cascade.agent.type` container label**: added at spawn time so orphan
cleanup can pass `agentType` to `failOrphanedRunFallback`, avoiding
matching the wrong run when multiple agent types run concurrently.
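The read side in orphan cleanup, condensed from the diff below:

```ts
// Pre-label containers have no 'cascade.project.id', so the whole DB
// update is skipped for them.
const projectId = containerInfo.Labels?.['cascade.project.id'];
if (projectId) {
  // An empty or missing agent-type label coerces to undefined: the
  // fallback query then matches on projectId + startedAfter alone.
  const orphanAgentType = containerInfo.Labels?.['cascade.agent.type'] || undefined;
  // → failOrphanedRunFallback(projectId, orphanAgentType, containerCreatedAt, ...)
}
```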
**`durationMs` on orphaned runs**: all three fail paths now compute and
persist the elapsed duration so dashboard users see actual run time
instead of null.
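For reference, the elapsed-time computation on each path (condensed from
the diffs below; `now` is the scan timestamp in milliseconds from the
surrounding cleanup loop):

```ts
// Tracked workers (crash path in cleanupWorker, kill path in killWorker):
const durationMs = Date.now() - worker.startedAt.getTime();

// Orphan cleanup has no tracked worker, so it falls back to Docker's
// container metadata; Created is in seconds, hence the * 1000.
const orphanDurationMs = now - containerInfo.Created * 1000;
```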
**Fixed BullMQ `lockDuration`**: replaced `workerTimeoutMs + 60s` with
a fixed 8-hour constant (`BULLMQ_LOCK_DURATION_MS`) — `guardedSpawn`
resolves immediately after container start so the lock is held for
seconds, and tying it to `workerTimeoutMs` risked lock expiry for
long-running project configs.
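Condensed from the `worker-manager` diff below; the lock only needs to
cover the spawn, not the run:

```ts
// guardedSpawn resolves right after container.start(), so BullMQ holds
// the job lock for seconds. A fixed 8-hour ceiling removes any coupling
// between lock lifetime and per-project run timeouts.
const BULLMQ_LOCK_DURATION_MS = 8 * 60 * 60 * 1000;

// was: lockDuration: routerConfig.workerTimeoutMs + 60000
// now: lockDuration: BULLMQ_LOCK_DURATION_MS
```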
Co-Authored-By: Claude Sonnet 4.6
---
src/db/repositories/runsRepository.ts | 48 +++++-
src/router/active-workers.ts | 57 ++++---
src/router/container-manager.ts | 71 ++++++++-
src/router/orphan-cleanup.ts | 32 ++++
src/router/worker-manager.ts | 9 +-
tests/unit/router/active-workers.test.ts | 63 ++++++++
tests/unit/router/container-manager.test.ts | 137 +++++++++++++++-
tests/unit/router/orphan-cleanup.test.ts | 163 +++++++++++++++++++-
8 files changed, 537 insertions(+), 43 deletions(-)
diff --git a/src/db/repositories/runsRepository.ts b/src/db/repositories/runsRepository.ts
index 78a0d531..1fbac5ff 100644
--- a/src/db/repositories/runsRepository.ts
+++ b/src/db/repositories/runsRepository.ts
@@ -358,6 +358,8 @@ export async function failOrphanedRun(
projectId: string,
workItemId: string,
reason: string,
+ status: 'failed' | 'timed_out' = 'failed',
+ durationMs?: number,
): Promise<string | null> {
const db = getDb();
const [row] = await db
@@ -377,9 +379,53 @@ export async function failOrphanedRun(
const [updated] = await db
.update(agentRuns)
.set({
- status: 'failed',
+ status,
+ completedAt: new Date(),
+ error: reason,
+ durationMs,
+ })
+ .where(and(eq(agentRuns.id, row.id), eq(agentRuns.status, 'running')))
+ .returning({ id: agentRuns.id });
+ return updated?.id ?? null;
+}
+
+/**
+ * Fail the most recent running run for a project without a workItemId (e.g. GitHub PR runs).
+ * Uses projectId + optional agentType + startedAfter to identify the run.
+ * Guards on status='running' so it's safe to call even if the run already completed.
+ */
+export async function failOrphanedRunFallback(
+ projectId: string,
+ agentType: string | undefined,
+ startedAfter: Date,
+ status: 'failed' | 'timed_out',
+ reason: string,
+ durationMs?: number,
+): Promise<string | null> {
+ const db = getDb();
+ const conditions: SQL[] = [
+ eq(agentRuns.projectId, projectId),
+ eq(agentRuns.status, 'running'),
+ gte(agentRuns.startedAt, startedAfter),
+ ];
+ if (agentType) {
+ conditions.push(eq(agentRuns.agentType, agentType));
+ }
+ const [row] = await db
+ .select({ id: agentRuns.id })
+ .from(agentRuns)
+ .where(and(...conditions))
+ .orderBy(desc(agentRuns.startedAt))
+ .limit(1);
+ if (!row) return null;
+
+ const [updated] = await db
+ .update(agentRuns)
+ .set({
+ status,
completedAt: new Date(),
error: reason,
+ durationMs,
})
.where(and(eq(agentRuns.id, row.id), eq(agentRuns.status, 'running')))
.returning({ id: agentRuns.id });
diff --git a/src/router/active-workers.ts b/src/router/active-workers.ts
index f55b74c6..8e3b74eb 100644
--- a/src/router/active-workers.ts
+++ b/src/router/active-workers.ts
@@ -5,7 +5,7 @@
* Tracks running worker containers and handles cleanup of their associated locks.
*/
-import { failOrphanedRun } from '../db/repositories/runsRepository.js';
+import { failOrphanedRun, failOrphanedRunFallback } from '../db/repositories/runsRepository.js';
import { logger } from '../utils/logging.js';
import { clearAgentTypeEnqueued } from './agent-type-lock.js';
import type { CascadeJob } from './queue.js';
@@ -46,7 +46,9 @@ export function getActiveWorkers(): Array<{ jobId: string; startedAt: Date }> {
/**
* Clean up worker tracking state (timeout handle + map entry).
- * When exitCode is non-zero, marks the corresponding DB run as failed (fire-and-forget).
+ * When exitCode is non-zero, marks the DB run as 'failed' — crash path only.
+ * The timeout path (killWorker) handles its own 'timed_out' DB update and calls
+ * cleanupWorker without an exitCode so this block is skipped.
*/
export function cleanupWorker(jobId: string, exitCode?: number): void {
const worker = activeWorkers.get(jobId);
@@ -58,29 +60,40 @@ export function cleanupWorker(jobId: string, exitCode?: number): void {
if (worker.projectId && worker.workItemId && worker.agentType) {
clearWorkItemEnqueued(worker.projectId, worker.workItemId, worker.agentType);
}
- if (worker.projectId && worker.workItemId) {
- if (exitCode !== undefined && exitCode !== 0) {
- failOrphanedRun(
- worker.projectId,
- worker.workItemId,
- `Worker crashed with exit code ${exitCode}`,
- )
- .then((runId) => {
- if (runId) {
- logger.info('[WorkerManager] Marked orphaned run as failed:', {
- jobId,
- runId,
- exitCode,
- });
- }
- })
- .catch((err) => {
- logger.error('[WorkerManager] Failed to mark orphaned run:', {
+ if (exitCode !== undefined && exitCode !== 0 && worker.projectId) {
+ const durationMs = Date.now() - worker.startedAt.getTime();
+ const updatePromise = worker.workItemId
+ ? failOrphanedRun(
+ worker.projectId,
+ worker.workItemId,
+ `Worker crashed with exit code ${exitCode}`,
+ 'failed',
+ durationMs,
+ )
+ : failOrphanedRunFallback(
+ worker.projectId,
+ worker.agentType,
+ worker.startedAt,
+ 'failed',
+ `Worker crashed with exit code ${exitCode}`,
+ durationMs,
+ );
+ updatePromise
+ .then((runId) => {
+ if (runId) {
+ logger.info('[WorkerManager] Marked orphaned run as failed:', {
jobId,
- error: String(err),
+ runId,
+ exitCode,
});
+ }
+ })
+ .catch((err) => {
+ logger.error('[WorkerManager] Failed to mark orphaned run:', {
+ jobId,
+ error: String(err),
});
- }
+ });
}
activeWorkers.delete(jobId);
logger.info('[WorkerManager] Worker cleaned up:', {
diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts
index ac9d96e1..4eb92ac7 100644
--- a/src/router/container-manager.ts
+++ b/src/router/container-manager.ts
@@ -12,11 +12,12 @@
import type { Job } from 'bullmq';
import Docker from 'dockerode';
+import { failOrphanedRun, failOrphanedRunFallback } from '../db/repositories/runsRepository.js';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { activeWorkers, cleanupWorker } from './active-workers.js';
import { clearAllAgentTypeLocks } from './agent-type-lock.js';
-import { routerConfig } from './config.js';
+import { loadProjectConfig, routerConfig } from './config.js';
import { notifyTimeout } from './notifications.js';
import { stopOrphanCleanup } from './orphan-cleanup.js';
import type { CascadeJob } from './queue.js';
@@ -48,6 +49,9 @@ export {
const docker = new Docker();
+/** Buffer added on top of the in-container watchdog so the router kill is always a backstop. */
+const ROUTER_KILL_BUFFER_MS = 2 * 60 * 1000;
+
/**
* Spawn a worker container for a job.
* Sets up timeout tracking and monitors container exit asynchronously.
@@ -61,6 +65,22 @@ export async function spawnWorker(job: Job): Promise<void> {
const workerEnv = await buildWorkerEnvWithProjectId(job, projectId);
const hasCredentials = workerEnv.some((e) => e.startsWith('CASCADE_CREDENTIAL_KEYS='));
+ // Extract agentType early so it can be included in container labels
+ // (needed by orphan cleanup to narrow DB fallback queries to the right agent type)
+ const agentType = extractAgentType(job.data);
+
+ // Determine container timeout: use project's watchdogTimeoutMs + buffer if available,
+ // falling back to the global workerTimeoutMs. This makes watchdogTimeoutMs the single source
+ // of truth — the in-container watchdog fires first, router kill is a backup.
+ let containerTimeoutMs = routerConfig.workerTimeoutMs;
+ if (projectId) {
+ const { fullProjects } = await loadProjectConfig();
+ const projectCfg = fullProjects.find((p) => p.id === projectId);
+ if (projectCfg?.watchdogTimeoutMs) {
+ containerTimeoutMs = projectCfg.watchdogTimeoutMs + ROUTER_KILL_BUFFER_MS;
+ }
+ }
+
logger.info('[WorkerManager] Spawning worker:', {
jobId,
type: job.data.type,
@@ -83,12 +103,14 @@ export async function spawnWorker(job: Job): Promise<void> {
'cascade.job.id': jobId,
'cascade.job.type': job.data.type,
'cascade.managed': 'true',
+ 'cascade.project.id': projectId ?? '',
+ 'cascade.agent.type': agentType ?? '',
},
});
await container.start();
- // Set up timeout
+ // Set up the router backstop timeout — fires at containerTimeoutMs (watchdog + buffer when configured)
const startedAt = new Date();
const timeoutHandle = setTimeout(() => {
const durationMs = Date.now() - startedAt.getTime();
@@ -104,11 +126,10 @@ export async function spawnWorker(job: Job): Promise<void> {
killWorker(jobId).catch((err) => {
logger.error('[WorkerManager] Failed to kill timed-out worker:', err);
});
- }, routerConfig.workerTimeoutMs);
+ }, containerTimeoutMs);
// Track the worker
const workItemId = extractWorkItemId(job.data);
- const agentType = extractAgentType(job.data);
activeWorkers.set(jobId, {
containerId: container.id,
jobId,
@@ -203,8 +224,45 @@ export async function killWorker(jobId: string): Promise<void> {
});
}
- // Send timeout notification (fire-and-forget)
const durationMs = Date.now() - worker.startedAt.getTime();
+
+ // Update DB run status to timed_out (fire-and-forget, no-op if watchdog already did it).
+ // cleanupWorker is called below without an exitCode so it skips its own DB update,
+ // avoiding a race where the wrong status ('failed') could win.
+ if (worker.projectId) {
+ const dbUpdate = worker.workItemId
+ ? failOrphanedRun(
+ worker.projectId,
+ worker.workItemId,
+ 'Router timeout',
+ 'timed_out',
+ durationMs,
+ )
+ : failOrphanedRunFallback(
+ worker.projectId,
+ worker.agentType,
+ worker.startedAt,
+ 'timed_out',
+ 'Router timeout',
+ durationMs,
+ );
+ dbUpdate
+ .then((runId) => {
+ if (runId)
+ logger.info('[WorkerManager] Marked run timed_out after router kill', {
+ jobId,
+ runId,
+ });
+ })
+ .catch((err) =>
+ logger.error('[WorkerManager] DB update failed after router kill', {
+ jobId,
+ error: String(err),
+ }),
+ );
+ }
+
+ // Send timeout notification (fire-and-forget)
notifyTimeout(worker.job, {
jobId: worker.jobId,
startedAt: worker.startedAt,
@@ -213,7 +271,8 @@ export async function killWorker(jobId: string): Promise<void> {
logger.error('[WorkerManager] Timeout notification error:', String(err));
});
- cleanupWorker(jobId, 137);
+ // No exitCode — DB update is handled above with the correct 'timed_out' status
+ cleanupWorker(jobId);
}
/**
diff --git a/src/router/orphan-cleanup.ts b/src/router/orphan-cleanup.ts
index cc690b36..f8747a02 100644
--- a/src/router/orphan-cleanup.ts
+++ b/src/router/orphan-cleanup.ts
@@ -6,6 +6,7 @@
*/
import Docker from 'dockerode';
+import { failOrphanedRunFallback } from '../db/repositories/runsRepository.js';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { getTrackedContainerIds } from './active-workers.js';
@@ -108,6 +109,37 @@ export async function scanAndCleanupOrphans(): Promise<void> {
containerId: containerId.slice(0, 12),
ageMinutes,
});
+
+ // Update DB run status (fire-and-forget). Pre-change containers have no
+ // label (undefined) and projectId-less jobs write ''; both falsy → skip, harmless.
+ const projectId = containerInfo.Labels?.['cascade.project.id'];
+ if (projectId) {
+ const containerCreatedAt = new Date(containerInfo.Created * 1000);
+ const orphanDurationMs = now - containerInfo.Created * 1000;
+ // agentType narrows the fallback query when multiple agent types run concurrently
+ const orphanAgentType = containerInfo.Labels?.['cascade.agent.type'] || undefined;
+ failOrphanedRunFallback(
+ projectId,
+ orphanAgentType,
+ containerCreatedAt,
+ 'failed',
+ 'Orphan cleanup: container stopped',
+ orphanDurationMs,
+ )
+ .then((runId) => {
+ if (runId)
+ logger.info('[WorkerManager] Marked orphaned run as failed after cleanup', {
+ containerId: containerId.slice(0, 12),
+ runId,
+ });
+ })
+ .catch((err) =>
+ logger.error('[WorkerManager] DB update failed after orphan cleanup', {
+ containerId: containerId.slice(0, 12),
+ error: String(err),
+ }),
+ );
+ }
} catch (err) {
// Container might already be stopped — log but continue
logger.warn('[WorkerManager] Error stopping orphaned container:', {
diff --git a/src/router/worker-manager.ts b/src/router/worker-manager.ts
index 58798157..c6612f9b 100644
--- a/src/router/worker-manager.ts
+++ b/src/router/worker-manager.ts
@@ -28,6 +28,11 @@ export { getActiveWorkerCount, getActiveWorkers, startOrphanCleanup, stopOrphanC
let bullWorker: Worker | null = null;
let dashboardWorker: Worker | null = null;
+// Fixed lock duration that outlasts any realistic run. guardedSpawn resolves
+// immediately after container start, so BullMQ holds the lock for mere seconds.
+// Using a fixed 8-hour value prevents lock expiry for long-running containers.
+const BULLMQ_LOCK_DURATION_MS = 8 * 60 * 60 * 1000;
+
/** Guard that enforces the per-router concurrency cap before spawning. */
async function guardedSpawn(job: Job): Promise<void> {
// Check if we have capacity.
@@ -55,7 +60,7 @@ export function startWorkerProcessor(): void {
label: 'Job',
connection,
concurrency: routerConfig.maxWorkers,
- lockDuration: routerConfig.workerTimeoutMs + 60000,
+ lockDuration: BULLMQ_LOCK_DURATION_MS,
processFn: guardedSpawn,
});
@@ -66,7 +71,7 @@ export function startWorkerProcessor(): void {
label: 'Dashboard job',
connection,
concurrency: routerConfig.maxWorkers,
- lockDuration: routerConfig.workerTimeoutMs + 60000,
+ lockDuration: BULLMQ_LOCK_DURATION_MS,
processFn: (job) => guardedSpawn(job as Job<CascadeJob>),
});
diff --git a/tests/unit/router/active-workers.test.ts b/tests/unit/router/active-workers.test.ts
index aae8e6b8..0da4dc5e 100644
--- a/tests/unit/router/active-workers.test.ts
+++ b/tests/unit/router/active-workers.test.ts
@@ -6,12 +6,14 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
const {
mockFailOrphanedRun,
+ mockFailOrphanedRunFallback,
mockClearWorkItemEnqueued,
mockClearAllWorkItemLocks,
mockClearAgentTypeEnqueued,
mockClearAllAgentTypeLocks,
} = vi.hoisted(() => ({
mockFailOrphanedRun: vi.fn().mockResolvedValue(null),
+ mockFailOrphanedRunFallback: vi.fn().mockResolvedValue(null),
mockClearWorkItemEnqueued: vi.fn(),
mockClearAllWorkItemLocks: vi.fn(),
mockClearAgentTypeEnqueued: vi.fn(),
@@ -24,6 +26,7 @@ const {
vi.mock('../../../src/db/repositories/runsRepository.js', () => ({
failOrphanedRun: (...args: unknown[]) => mockFailOrphanedRun(...args),
+ failOrphanedRunFallback: (...args: unknown[]) => mockFailOrphanedRunFallback(...args),
}));
vi.mock('../../../src/router/work-item-lock.js', () => ({
@@ -79,6 +82,8 @@ describe('active-workers', () => {
activeWorkers.clear();
mockFailOrphanedRun.mockReset();
mockFailOrphanedRun.mockResolvedValue(null);
+ mockFailOrphanedRunFallback.mockReset();
+ mockFailOrphanedRunFallback.mockResolvedValue(null);
mockClearWorkItemEnqueued.mockClear();
mockClearAgentTypeEnqueued.mockClear();
});
@@ -185,6 +190,8 @@ describe('active-workers', () => {
'proj-1',
'card-1',
'Worker crashed with exit code 1',
+ 'failed',
+ expect.any(Number),
);
});
@@ -231,5 +238,61 @@ describe('active-workers', () => {
cleanupWorker('job-no-agent', 1);
expect(mockClearWorkItemEnqueued).not.toHaveBeenCalled();
});
+
+ it('calls failOrphanedRunFallback when no workItemId but projectId exists', () => {
+ mockFailOrphanedRunFallback.mockResolvedValue('run-fallback');
+ const startedAt = new Date();
+ activeWorkers.set(
+ 'job-no-wi',
+ makeActiveWorker({
+ jobId: 'job-no-wi',
+ projectId: 'proj-1',
+ startedAt,
+ agentType: 'review',
+ // no workItemId
+ }),
+ );
+
+ cleanupWorker('job-no-wi', 1);
+ expect(mockFailOrphanedRunFallback).toHaveBeenCalledWith(
+ 'proj-1',
+ 'review',
+ startedAt,
+ 'failed',
+ 'Worker crashed with exit code 1',
+ expect.any(Number),
+ );
+ expect(mockFailOrphanedRun).not.toHaveBeenCalled();
+ });
+
+ it('calls failOrphanedRunFallback with undefined agentType when both absent', () => {
+ mockFailOrphanedRunFallback.mockResolvedValue('run-fallback2');
+ activeWorkers.set(
+ 'job-no-wi-no-agent',
+ makeActiveWorker({
+ jobId: 'job-no-wi-no-agent',
+ projectId: 'proj-1',
+ // no workItemId, no agentType
+ }),
+ );
+
+ cleanupWorker('job-no-wi-no-agent', 1);
+ expect(mockFailOrphanedRunFallback).toHaveBeenCalled();
+ expect(mockFailOrphanedRun).not.toHaveBeenCalled();
+ });
+
+ it('does NOT call either fail function when projectId is missing', () => {
+ activeWorkers.set(
+ 'job-no-proj',
+ makeActiveWorker({
+ jobId: 'job-no-proj',
+ // no projectId, no workItemId
+ }),
+ );
+
+ cleanupWorker('job-no-proj', 1);
+ expect(mockFailOrphanedRun).not.toHaveBeenCalled();
+ expect(mockFailOrphanedRunFallback).not.toHaveBeenCalled();
+ });
});
});
diff --git a/tests/unit/router/container-manager.test.ts b/tests/unit/router/container-manager.test.ts
index fb29adac..bfae019d 100644
--- a/tests/unit/router/container-manager.test.ts
+++ b/tests/unit/router/container-manager.test.ts
@@ -4,13 +4,17 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
// Hoisted mock state — vi.hoisted creates variables before vi.mock factories run
// ---------------------------------------------------------------------------
-const { mockDockerCreateContainer, mockDockerGetContainer, mockDockerListContainers } = vi.hoisted(
- () => ({
- mockDockerCreateContainer: vi.fn(),
- mockDockerGetContainer: vi.fn(),
- mockDockerListContainers: vi.fn(),
- }),
-);
+const {
+ mockDockerCreateContainer,
+ mockDockerGetContainer,
+ mockDockerListContainers,
+ mockLoadProjectConfig,
+} = vi.hoisted(() => ({
+ mockDockerCreateContainer: vi.fn(),
+ mockDockerGetContainer: vi.fn(),
+ mockDockerListContainers: vi.fn(),
+ mockLoadProjectConfig: vi.fn().mockResolvedValue({ projects: [], fullProjects: [] }),
+}));
// ---------------------------------------------------------------------------
// Module-level mocks
@@ -34,8 +38,10 @@ vi.mock('../../../src/config/provider.js', () => ({
}));
const mockFailOrphanedRun = vi.fn().mockResolvedValue(null);
+const mockFailOrphanedRunFallback = vi.fn().mockResolvedValue(null);
vi.mock('../../../src/db/repositories/runsRepository.js', () => ({
failOrphanedRun: (...args: unknown[]) => mockFailOrphanedRun(...args),
+ failOrphanedRunFallback: (...args: unknown[]) => mockFailOrphanedRunFallback(...args),
}));
vi.mock('../../../src/config/configCache.js', () => ({
@@ -70,6 +76,7 @@ vi.mock('../../../src/router/config.js', () => ({
workerTimeoutMs: 5000,
dockerNetwork: 'test-network',
},
+ loadProjectConfig: (...args: unknown[]) => mockLoadProjectConfig(...args),
}));
// ---------------------------------------------------------------------------
@@ -238,6 +245,7 @@ describe('spawnWorker', () => {
vi.spyOn(console, 'warn').mockImplementation(() => {});
vi.spyOn(console, 'error').mockImplementation(() => {});
mockGetAllProjectCredentials.mockResolvedValue({});
+ mockLoadProjectConfig.mockResolvedValue({ projects: [], fullProjects: [] });
detachAll();
});
@@ -296,6 +304,52 @@ describe('spawnWorker', () => {
expect(getActiveWorkerCount()).toBe(countBefore);
});
+
+ it('includes cascade.project.id label in container config', async () => {
+ const { resolveWait } = setupMockContainer();
+
+ await spawnWorker(
+ makeJob({
+ id: 'job-label',
+ data: { type: 'trello', projectId: 'proj-42' } as CascadeJob,
+ }) as never,
+ );
+
+ expect(mockDockerCreateContainer).toHaveBeenCalledWith(
+ expect.objectContaining({
+ Labels: expect.objectContaining({
+ 'cascade.project.id': 'proj-42',
+ 'cascade.managed': 'true',
+ 'cascade.agent.type': '',
+ }),
+ }),
+ );
+
+ resolveWait();
+ });
+
+ it('uses project watchdogTimeoutMs + 2min buffer when available', async () => {
+ mockLoadProjectConfig.mockResolvedValue({
+ projects: [],
+ fullProjects: [{ id: 'proj-1', watchdogTimeoutMs: 10000 }],
+ });
+ vi.useFakeTimers();
+ const { container, resolveWait } = setupMockContainer();
+
+ await spawnWorker(makeJob() as never);
+
+ // At watchdogTimeoutMs + 2min - 1ms: should NOT yet have triggered kill
+ vi.advanceTimersByTime(10000 + 2 * 60 * 1000 - 1);
+ expect(container.stop).not.toHaveBeenCalled();
+
+ // One more ms: should trigger killWorker → container.stop
+ await vi.advanceTimersByTimeAsync(1);
+ expect(container.stop).toHaveBeenCalled();
+
+ resolveWait();
+ vi.useRealTimers();
+ mockLoadProjectConfig.mockResolvedValue({ projects: [], fullProjects: [] });
+ });
});
// ---------------------------------------------------------------------------
@@ -307,6 +361,9 @@ describe('killWorker', () => {
vi.spyOn(console, 'log').mockImplementation(() => {});
vi.spyOn(console, 'warn').mockImplementation(() => {});
mockGetAllProjectCredentials.mockResolvedValue({});
+ mockLoadProjectConfig.mockResolvedValue({ projects: [], fullProjects: [] });
+ mockFailOrphanedRun.mockResolvedValue(null);
+ mockFailOrphanedRunFallback.mockResolvedValue(null);
mockNotifyTimeout.mockResolvedValue(undefined);
detachAll();
});
@@ -356,6 +413,61 @@ describe('killWorker', () => {
resolveWait();
});
+
+ it('calls failOrphanedRunFallback on kill when worker has no workItemId', async () => {
+ mockFailOrphanedRunFallback.mockResolvedValue('run-kill-fallback');
+ const { resolveWait } = setupMockContainer();
+
+ // Default job: projectId='proj-1', no workItemId
+ await spawnWorker(makeJob({ id: 'job-kill-fallback' }) as never);
+ await killWorker('job-kill-fallback');
+
+ // Fire-and-forget — flush microtasks
+ await new Promise((r) => setTimeout(r, 10));
+ expect(mockFailOrphanedRunFallback).toHaveBeenCalledWith(
+ 'proj-1',
+ undefined, // no agentType on default job
+ expect.any(Date),
+ 'timed_out',
+ 'Router timeout',
+ expect.any(Number),
+ );
+ // Verify no double-call (cleanupWorker must NOT also trigger a DB update)
+ expect(mockFailOrphanedRunFallback).toHaveBeenCalledTimes(1);
+
+ resolveWait();
+ });
+
+ it('calls failOrphanedRun with timed_out on kill when worker has workItemId', async () => {
+ mockFailOrphanedRun.mockResolvedValue('run-kill-wi');
+ const { resolveWait } = setupMockContainer();
+
+ await spawnWorker(
+ makeJob({
+ id: 'job-kill-wi',
+ data: {
+ type: 'trello',
+ projectId: 'proj-1',
+ workItemId: 'card-1',
+ } as CascadeJob,
+ }) as never,
+ );
+ await killWorker('job-kill-wi');
+
+ // Fire-and-forget — flush microtasks
+ await new Promise((r) => setTimeout(r, 10));
+ expect(mockFailOrphanedRun).toHaveBeenCalledWith(
+ 'proj-1',
+ 'card-1',
+ 'Router timeout',
+ 'timed_out',
+ expect.any(Number),
+ );
+ // Verify no double-call (cleanupWorker must NOT also trigger a DB update)
+ expect(mockFailOrphanedRun).toHaveBeenCalledTimes(1);
+
+ resolveWait();
+ });
});
// ---------------------------------------------------------------------------
@@ -365,7 +477,10 @@ describe('killWorker', () => {
describe('cleanupWorker', () => {
beforeEach(() => {
vi.spyOn(console, 'log').mockImplementation(() => {});
- mockFailOrphanedRun.mockClear();
+ mockGetAllProjectCredentials.mockResolvedValue({});
+ mockLoadProjectConfig.mockResolvedValue({ projects: [], fullProjects: [] });
+ mockFailOrphanedRun.mockResolvedValue(null);
+ mockFailOrphanedRunFallback.mockResolvedValue(null);
detachAll();
});
@@ -420,6 +535,8 @@ describe('cleanupWorker', () => {
'proj-1',
'card-1',
'Worker crashed with exit code 1',
+ 'failed',
+ expect.any(Number),
);
resolveWait();
@@ -467,6 +584,8 @@ describe('cleanupWorker', () => {
'proj-1',
'card-1',
'Worker crashed with exit code 1',
+ 'failed',
+ expect.any(Number),
);
resolveWait();
@@ -497,6 +616,7 @@ describe('detachAll', () => {
beforeEach(() => {
vi.spyOn(console, 'log').mockImplementation(() => {});
mockGetAllProjectCredentials.mockResolvedValue({});
+ mockLoadProjectConfig.mockResolvedValue({ projects: [], fullProjects: [] });
detachAll();
});
@@ -545,6 +665,7 @@ describe('orphan cleanup', () => {
vi.spyOn(console, 'info').mockImplementation(() => {});
vi.spyOn(console, 'error').mockImplementation(() => {});
mockGetAllProjectCredentials.mockResolvedValue({});
+ mockLoadProjectConfig.mockResolvedValue({ projects: [], fullProjects: [] });
mockDockerListContainers.mockResolvedValue([]);
detachAll();
});
diff --git a/tests/unit/router/orphan-cleanup.test.ts b/tests/unit/router/orphan-cleanup.test.ts
index 986c6810..020ecb42 100644
--- a/tests/unit/router/orphan-cleanup.test.ts
+++ b/tests/unit/router/orphan-cleanup.test.ts
@@ -4,10 +4,12 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
// Hoisted mock state — vi.hoisted creates variables before vi.mock factories run
// ---------------------------------------------------------------------------
-const { mockDockerGetContainer, mockDockerListContainers } = vi.hoisted(() => ({
- mockDockerGetContainer: vi.fn(),
- mockDockerListContainers: vi.fn(),
-}));
+const { mockDockerGetContainer, mockDockerListContainers, mockFailOrphanedRunFallback } =
+ vi.hoisted(() => ({
+ mockDockerGetContainer: vi.fn(),
+ mockDockerListContainers: vi.fn(),
+ mockFailOrphanedRunFallback: vi.fn().mockResolvedValue(null),
+ }));
// ---------------------------------------------------------------------------
// Module-level mocks
@@ -20,6 +22,10 @@ vi.mock('dockerode', () => ({
})),
}));
+vi.mock('../../../src/db/repositories/runsRepository.js', () => ({
+ failOrphanedRunFallback: (...args: unknown[]) => mockFailOrphanedRunFallback(...args),
+}));
+
vi.mock('../../../src/sentry.js', () => ({
captureException: vi.fn(),
}));
@@ -63,6 +69,8 @@ describe('orphan-cleanup', () => {
vi.spyOn(console, 'error').mockImplementation(() => {});
mockDockerListContainers.mockResolvedValue([]);
mockTrackedIds.clear();
+ mockFailOrphanedRunFallback.mockClear();
+ mockFailOrphanedRunFallback.mockResolvedValue(null);
});
afterEach(() => {
@@ -243,6 +251,153 @@ describe('orphan-cleanup', () => {
expect(mockContainer2.stop).toHaveBeenCalledWith({ t: 15 });
});
+ it('calls failOrphanedRunFallback when container has cascade.project.id label', async () => {
+ const orphanContainerId = 'orphan-with-project';
+ const now = Math.floor(Date.now() / 1000);
+ const createdAt = now - 6; // old enough
+
+ const mockOrphanContainer = {
+ stop: vi.fn().mockResolvedValue(undefined),
+ };
+ mockDockerListContainers.mockResolvedValue([
+ {
+ Id: orphanContainerId,
+ Created: createdAt,
+ Labels: { 'cascade.project.id': 'proj-1' },
+ State: 'running',
+ } as never,
+ ]);
+ mockDockerGetContainer.mockReturnValue(mockOrphanContainer as never);
+ mockFailOrphanedRunFallback.mockResolvedValue('run-orphan-1');
+
+ await scanAndCleanupOrphans();
+ // Fire-and-forget — flush microtasks
+ await new Promise((r) => setTimeout(r, 10));
+
+ expect(mockFailOrphanedRunFallback).toHaveBeenCalledWith(
+ 'proj-1',
+ undefined,
+ expect.any(Date),
+ 'failed',
+ 'Orphan cleanup: container stopped',
+ expect.any(Number),
+ );
+ });
+
+ it('does NOT call failOrphanedRunFallback when container has no cascade.project.id label', async () => {
+ const orphanContainerId = 'orphan-no-label';
+ const now = Math.floor(Date.now() / 1000);
+ const createdAt = now - 6;
+
+ const mockOrphanContainer = {
+ stop: vi.fn().mockResolvedValue(undefined),
+ };
+ mockDockerListContainers.mockResolvedValue([
+ {
+ Id: orphanContainerId,
+ Created: createdAt,
+ State: 'running',
+ } as never,
+ ]);
+ mockDockerGetContainer.mockReturnValue(mockOrphanContainer as never);
+
+ await scanAndCleanupOrphans();
+ await new Promise((r) => setTimeout(r, 10));
+
+ expect(mockFailOrphanedRunFallback).not.toHaveBeenCalled();
+ });
+
+ it('does NOT call failOrphanedRunFallback when cascade.project.id label is empty string', async () => {
+ const orphanContainerId = 'orphan-empty-label';
+ const now = Math.floor(Date.now() / 1000);
+ const createdAt = now - 6;
+
+ const mockOrphanContainer = {
+ stop: vi.fn().mockResolvedValue(undefined),
+ };
+ mockDockerListContainers.mockResolvedValue([
+ {
+ Id: orphanContainerId,
+ Created: createdAt,
+ Labels: { 'cascade.project.id': '' }, // empty → falsy
+ State: 'running',
+ } as never,
+ ]);
+ mockDockerGetContainer.mockReturnValue(mockOrphanContainer as never);
+
+ await scanAndCleanupOrphans();
+ await new Promise((r) => setTimeout(r, 10));
+
+ expect(mockFailOrphanedRunFallback).not.toHaveBeenCalled();
+ });
+
+ it('passes cascade.agent.type label as agentType to failOrphanedRunFallback', async () => {
+ const orphanContainerId = 'orphan-with-agent-type';
+ const now = Math.floor(Date.now() / 1000);
+ const createdAt = now - 6;
+
+ const mockOrphanContainer = {
+ stop: vi.fn().mockResolvedValue(undefined),
+ };
+ mockDockerListContainers.mockResolvedValue([
+ {
+ Id: orphanContainerId,
+ Created: createdAt,
+ Labels: {
+ 'cascade.project.id': 'proj-2',
+ 'cascade.agent.type': 'review',
+ },
+ State: 'running',
+ } as never,
+ ]);
+ mockDockerGetContainer.mockReturnValue(mockOrphanContainer as never);
+ mockFailOrphanedRunFallback.mockResolvedValue('run-agent-type');
+
+ await scanAndCleanupOrphans();
+ await new Promise((r) => setTimeout(r, 10));
+
+ expect(mockFailOrphanedRunFallback).toHaveBeenCalledWith(
+ 'proj-2',
+ 'review',
+ expect.any(Date),
+ 'failed',
+ 'Orphan cleanup: container stopped',
+ expect.any(Number),
+ );
+ });
+
+ it('passes undefined agentType when cascade.agent.type label is empty or absent', async () => {
+ const orphanContainerId = 'orphan-no-agent-type';
+ const now = Math.floor(Date.now() / 1000);
+ const createdAt = now - 6;
+
+ const mockOrphanContainer = {
+ stop: vi.fn().mockResolvedValue(undefined),
+ };
+ mockDockerListContainers.mockResolvedValue([
+ {
+ Id: orphanContainerId,
+ Created: createdAt,
+ Labels: { 'cascade.project.id': 'proj-3', 'cascade.agent.type': '' },
+ State: 'running',
+ } as never,
+ ]);
+ mockDockerGetContainer.mockReturnValue(mockOrphanContainer as never);
+ mockFailOrphanedRunFallback.mockResolvedValue(null);
+
+ await scanAndCleanupOrphans();
+ await new Promise((r) => setTimeout(r, 10));
+
+ expect(mockFailOrphanedRunFallback).toHaveBeenCalledWith(
+ 'proj-3',
+ undefined, // empty string coerced to undefined
+ expect.any(Date),
+ 'failed',
+ 'Orphan cleanup: container stopped',
+ expect.any(Number),
+ );
+ });
+
it('stops orphans but leaves tracked and young containers', async () => {
const trackedId = 'container-tracked-123';
mockTrackedIds.add(trackedId);
From cea359c3d69a9893aae016f009ddc922c02c83ef Mon Sep 17 00:00:00 2001
From: Zbigniew Sobiecki
Date: Mon, 16 Mar 2026 16:31:44 +0000
Subject: [PATCH 5/5] ci: trigger CI run