Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/agents/prompts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { fileURLToPath } from 'node:url';
import { Eta } from 'eta';

import { resolveKnownAgentTypes } from '../definitions/index.js';
import { loadAgentDefinition } from '../definitions/loader.js';

const __dirname = dirname(fileURLToPath(import.meta.url));
const templatesDir = join(__dirname, 'templates');
Expand Down Expand Up @@ -231,6 +232,20 @@ export function renderInlineTaskPrompt(
return taskEta.renderString(expanded, context);
}

/**
 * Look up the factory-default task prompt declared in an agent type's YAML
 * definition. Reads the YAML directly, so initPrompts() need not have run.
 *
 * @param agentType - The agent type whose YAML definition should be consulted.
 * @returns The YAML `prompts.taskPrompt` string, or null when the agent type
 *          cannot be resolved or its definition declares no taskPrompt.
 */
export function getDefaultTaskPrompt(agentType: string): string | null {
  try {
    // loadAgentDefinition throws for unknown agent types; treat that as "no default".
    return loadAgentDefinition(agentType).prompts.taskPrompt ?? null;
  } catch {
    return null;
  }
}

/** Returns the raw .eta template source from disk (before rendering). */
export function getRawTemplate(agentType: string): string {
requireInitialized('getRawTemplate');
Expand Down
10 changes: 9 additions & 1 deletion src/api/routers/agentConfigs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import { TRPCError } from '@trpc/server';
import { eq } from 'drizzle-orm';
import { z } from 'zod';
import { resolveAgentDefinition } from '../../agents/definitions/index.js';
import { getRawTemplate, validateTemplate } from '../../agents/prompts/index.js';
import {
getDefaultTaskPrompt,
getRawTemplate,
validateTemplate,
} from '../../agents/prompts/index.js';
import { getEngineCatalog, registerBuiltInEngines } from '../../backends/index.js';
import { EngineSettingsSchema } from '../../config/engineSettings.js';
import { getDb } from '../../db/client.js';
Expand Down Expand Up @@ -178,12 +182,16 @@ export const agentConfigsRouter = router({
// No .eta template on disk — skip gracefully
}

// 4. YAML-defined task prompt (factory default)
const defaultTaskPrompt = getDefaultTaskPrompt(input.agentType);

return {
projectSystemPrompt,
projectTaskPrompt,
globalSystemPrompt,
globalTaskPrompt,
defaultSystemPrompt,
defaultTaskPrompt,
};
}),
});
48 changes: 47 additions & 1 deletion src/db/repositories/runsRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ export async function failOrphanedRun(
projectId: string,
workItemId: string,
reason: string,
status: 'failed' | 'timed_out' = 'failed',
durationMs?: number,
): Promise<string | null> {
const db = getDb();
const [row] = await db
Expand All @@ -377,9 +379,53 @@ export async function failOrphanedRun(
const [updated] = await db
.update(agentRuns)
.set({
status: 'failed',
status,
completedAt: new Date(),
error: reason,
durationMs,
})
.where(and(eq(agentRuns.id, row.id), eq(agentRuns.status, 'running')))
.returning({ id: agentRuns.id });
return updated?.id ?? null;
}

/**
* Fail the most recent running run for a project without a workItemId (e.g. GitHub PR runs).
* Uses projectId + optional agentType + startedAfter to identify the run.
* Guards on status='running' so it's safe to call even if the run already completed.
*/
export async function failOrphanedRunFallback(
projectId: string,
agentType: string | undefined,
startedAfter: Date,
status: 'failed' | 'timed_out',
reason: string,
durationMs?: number,
): Promise<string | null> {
const db = getDb();
const conditions: SQL[] = [
eq(agentRuns.projectId, projectId),
eq(agentRuns.status, 'running'),
gte(agentRuns.startedAt, startedAfter),
];
if (agentType) {
conditions.push(eq(agentRuns.agentType, agentType));
}
const [row] = await db
.select({ id: agentRuns.id })
.from(agentRuns)
.where(and(...conditions))
.orderBy(desc(agentRuns.startedAt))
.limit(1);
if (!row) return null;

const [updated] = await db
.update(agentRuns)
.set({
status,
completedAt: new Date(),
error: reason,
durationMs,
})
.where(and(eq(agentRuns.id, row.id), eq(agentRuns.status, 'running')))
.returning({ id: agentRuns.id });
Expand Down
57 changes: 35 additions & 22 deletions src/router/active-workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Tracks running worker containers and handles cleanup of their associated locks.
*/

import { failOrphanedRun } from '../db/repositories/runsRepository.js';
import { failOrphanedRun, failOrphanedRunFallback } from '../db/repositories/runsRepository.js';
import { logger } from '../utils/logging.js';
import { clearAgentTypeEnqueued } from './agent-type-lock.js';
import type { CascadeJob } from './queue.js';
Expand Down Expand Up @@ -46,7 +46,9 @@ export function getActiveWorkers(): Array<{ jobId: string; startedAt: Date }> {

/**
* Clean up worker tracking state (timeout handle + map entry).
* When exitCode is non-zero, marks the corresponding DB run as failed (fire-and-forget).
* When exitCode is non-zero, marks the DB run as 'failed' — crash path only.
* The timeout path (killWorker) handles its own 'timed_out' DB update and calls
* cleanupWorker without an exitCode so this block is skipped.
*/
export function cleanupWorker(jobId: string, exitCode?: number): void {
const worker = activeWorkers.get(jobId);
Expand All @@ -58,29 +60,40 @@ export function cleanupWorker(jobId: string, exitCode?: number): void {
if (worker.projectId && worker.workItemId && worker.agentType) {
clearWorkItemEnqueued(worker.projectId, worker.workItemId, worker.agentType);
}
if (worker.projectId && worker.workItemId) {
if (exitCode !== undefined && exitCode !== 0) {
failOrphanedRun(
worker.projectId,
worker.workItemId,
`Worker crashed with exit code ${exitCode}`,
)
.then((runId) => {
if (runId) {
logger.info('[WorkerManager] Marked orphaned run as failed:', {
jobId,
runId,
exitCode,
});
}
})
.catch((err) => {
logger.error('[WorkerManager] Failed to mark orphaned run:', {
if (exitCode !== undefined && exitCode !== 0 && worker.projectId) {
const durationMs = Date.now() - worker.startedAt.getTime();
const updatePromise = worker.workItemId
? failOrphanedRun(
worker.projectId,
worker.workItemId,
`Worker crashed with exit code ${exitCode}`,
'failed',
durationMs,
)
: failOrphanedRunFallback(
worker.projectId,
worker.agentType,
worker.startedAt,
'failed',
`Worker crashed with exit code ${exitCode}`,
durationMs,
);
updatePromise
.then((runId) => {
if (runId) {
logger.info('[WorkerManager] Marked orphaned run as failed:', {
jobId,
error: String(err),
runId,
exitCode,
});
}
})
.catch((err) => {
logger.error('[WorkerManager] Failed to mark orphaned run:', {
jobId,
error: String(err),
});
}
});
}
activeWorkers.delete(jobId);
logger.info('[WorkerManager] Worker cleaned up:', {
Expand Down
71 changes: 65 additions & 6 deletions src/router/container-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

import type { Job } from 'bullmq';
import Docker from 'dockerode';
import { failOrphanedRun, failOrphanedRunFallback } from '../db/repositories/runsRepository.js';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { activeWorkers, cleanupWorker } from './active-workers.js';
import { clearAllAgentTypeLocks } from './agent-type-lock.js';
import { routerConfig } from './config.js';
import { loadProjectConfig, routerConfig } from './config.js';
import { notifyTimeout } from './notifications.js';
import { stopOrphanCleanup } from './orphan-cleanup.js';
import type { CascadeJob } from './queue.js';
Expand Down Expand Up @@ -48,6 +49,9 @@ export {

const docker = new Docker();

/**
 * Extra slack (2 minutes) layered on top of the in-container watchdog timeout,
 * ensuring the router-side kill only ever acts as a backstop after the
 * watchdog has had its chance to fire first.
 */
const ROUTER_KILL_BUFFER_MS = 120_000;

/**
* Spawn a worker container for a job.
* Sets up timeout tracking and monitors container exit asynchronously.
Expand All @@ -61,6 +65,22 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
const workerEnv = await buildWorkerEnvWithProjectId(job, projectId);
const hasCredentials = workerEnv.some((e) => e.startsWith('CASCADE_CREDENTIAL_KEYS='));

// Extract agentType early so it can be included in container labels
// (needed by orphan cleanup to narrow DB fallback queries to the right agent type)
const agentType = extractAgentType(job.data);

// Determine container timeout: use project's watchdogTimeoutMs + buffer if available,
// falling back to the global workerTimeoutMs. This makes watchdogTimeoutMs the single source
// of truth — the in-container watchdog fires first, router kill is a backup.
let containerTimeoutMs = routerConfig.workerTimeoutMs;
if (projectId) {
const { fullProjects } = await loadProjectConfig();
const projectCfg = fullProjects.find((p) => p.id === projectId);
if (projectCfg?.watchdogTimeoutMs) {
containerTimeoutMs = projectCfg.watchdogTimeoutMs + ROUTER_KILL_BUFFER_MS;
}
}

logger.info('[WorkerManager] Spawning worker:', {
jobId,
type: job.data.type,
Expand All @@ -83,12 +103,14 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
'cascade.job.id': jobId,
'cascade.job.type': job.data.type,
'cascade.managed': 'true',
'cascade.project.id': projectId ?? '',
'cascade.agent.type': agentType ?? '',
},
});

await container.start();

// Set up timeout
// Set up timeout — fires at watchdogTimeoutMs + 2min (router backup kill)
const startedAt = new Date();
const timeoutHandle = setTimeout(() => {
const durationMs = Date.now() - startedAt.getTime();
Expand All @@ -104,11 +126,10 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
killWorker(jobId).catch((err) => {
logger.error('[WorkerManager] Failed to kill timed-out worker:', err);
});
}, routerConfig.workerTimeoutMs);
}, containerTimeoutMs);

// Track the worker
const workItemId = extractWorkItemId(job.data);
const agentType = extractAgentType(job.data);
activeWorkers.set(jobId, {
containerId: container.id,
jobId,
Expand Down Expand Up @@ -203,8 +224,45 @@ export async function killWorker(jobId: string): Promise<void> {
});
}

// Send timeout notification (fire-and-forget)
const durationMs = Date.now() - worker.startedAt.getTime();

// Update DB run status to timed_out (fire-and-forget, no-op if watchdog already did it).
// cleanupWorker is called below without an exitCode so it skips its own DB update,
// avoiding a race where the wrong status ('failed') could win.
if (worker.projectId) {
const dbUpdate = worker.workItemId
? failOrphanedRun(
worker.projectId,
worker.workItemId,
'Router timeout',
'timed_out',
durationMs,
)
: failOrphanedRunFallback(
worker.projectId,
worker.agentType,
worker.startedAt,
'timed_out',
'Router timeout',
durationMs,
);
dbUpdate
.then((runId) => {
if (runId)
logger.info('[WorkerManager] Marked run timed_out after router kill', {
jobId,
runId,
});
})
.catch((err) =>
logger.error('[WorkerManager] DB update failed after router kill', {
jobId,
error: String(err),
}),
);
}

// Send timeout notification (fire-and-forget)
notifyTimeout(worker.job, {
jobId: worker.jobId,
startedAt: worker.startedAt,
Expand All @@ -213,7 +271,8 @@ export async function killWorker(jobId: string): Promise<void> {
logger.error('[WorkerManager] Timeout notification error:', String(err));
});

cleanupWorker(jobId, 137);
// No exitCode — DB update is handled above with the correct 'timed_out' status
cleanupWorker(jobId);
}

/**
Expand Down
32 changes: 32 additions & 0 deletions src/router/orphan-cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import Docker from 'dockerode';
import { failOrphanedRunFallback } from '../db/repositories/runsRepository.js';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { getTrackedContainerIds } from './active-workers.js';
Expand Down Expand Up @@ -108,6 +109,37 @@ export async function scanAndCleanupOrphans(): Promise<void> {
containerId: containerId.slice(0, 12),
ageMinutes,
});

// Update DB run status (fire-and-forget). Containers created before this
// change won't have labels (projectId = '' → falsy) → skip, harmless.
const projectId = containerInfo.Labels?.['cascade.project.id'];
if (projectId) {
const containerCreatedAt = new Date(containerInfo.Created * 1000);
const orphanDurationMs = now - containerInfo.Created * 1000;
// agentType narrows the fallback query when multiple agent types run concurrently
const orphanAgentType = containerInfo.Labels?.['cascade.agent.type'] || undefined;
failOrphanedRunFallback(
projectId,
orphanAgentType,
containerCreatedAt,
'failed',
'Orphan cleanup: container stopped',
orphanDurationMs,
)
.then((runId) => {
if (runId)
logger.info('[WorkerManager] Marked orphaned run as failed after cleanup', {
containerId: containerId.slice(0, 12),
runId,
});
})
.catch((err) =>
logger.error('[WorkerManager] DB update failed after orphan cleanup', {
containerId: containerId.slice(0, 12),
error: String(err),
}),
);
}
} catch (err) {
// Container might already be stopped — log but continue
logger.warn('[WorkerManager] Error stopping orphaned container:', {
Expand Down
Loading
Loading