From 1e2c924265ae22b0d095202373be179db7875e4b Mon Sep 17 00:00:00 2001 From: Cascade Bot Date: Mon, 23 Feb 2026 16:52:05 +0000 Subject: [PATCH] refactor: unify duplicated agent execution lifecycles into shared pipeline --- src/agents/shared/executionPipeline.ts | 274 +++++++++ src/agents/shared/lifecycle.ts | 274 +++------ src/backends/adapter.ts | 240 +++----- .../agents/shared/executionPipeline.test.ts | 524 ++++++++++++++++++ tests/unit/backends/adapter.test.ts | 3 +- 5 files changed, 978 insertions(+), 337 deletions(-) create mode 100644 src/agents/shared/executionPipeline.ts create mode 100644 tests/unit/agents/shared/executionPipeline.test.ts diff --git a/src/agents/shared/executionPipeline.ts b/src/agents/shared/executionPipeline.ts new file mode 100644 index 00000000..876a5366 --- /dev/null +++ b/src/agents/shared/executionPipeline.ts @@ -0,0 +1,274 @@ +import { captureException } from '../../sentry.js'; +import type { AgentResult } from '../../types/index.js'; +import { loadCascadeEnv, unloadCascadeEnv } from '../../utils/cascadeEnv.js'; +import { createFileLogger } from '../../utils/fileLogger.js'; +import { setWatchdogCleanup } from '../../utils/lifecycle.js'; +import { logger } from '../../utils/logging.js'; +import { setupRemoteSquintDb } from '../../utils/squintDb.js'; +import { createAgentLogger } from '../utils/logging.js'; +import { cleanupAgentResources } from './cleanup.js'; +import type { RunTrackingInput } from './runTracking.js'; +import { tryCreateRun } from './runTracking.js'; + +export type FileLogger = ReturnType; +export type AgentLogger = ReturnType; + +/** + * A LogWriter that writes to both the file logger and the structured logger. + */ +export type LogWriter = (level: string, message: string, context?: Record) => void; + +/** + * Creates a LogWriter that forwards to both the file logger and the structured logger. 
+ */ +export function createLogWriter(fileLogger: FileLogger): LogWriter { + return (level: string, message: string, context?: Record) => { + fileLogger.write(level, message, context); + const logFn = + level === 'ERROR' + ? logger.error + : level === 'WARN' + ? logger.warn + : level === 'DEBUG' + ? logger.debug + : logger.info; + logFn.call(logger, message, context); + }; +} + +/** + * Context passed to the execute callback. + */ +export interface PipelineContext { + repoDir: string; + fileLogger: FileLogger; + logWriter: LogWriter; + runId: string | undefined; + /** + * Update the pipeline's runId. Call this when the execute callback creates + * the run record itself (e.g., after resolving model/maxIterations). + * The updated runId is used for finalizeRun and in the error path. + */ + setRunId: (id: string) => void; +} + +/** + * Result returned by the execute callback. + */ +export interface ExecutionResult { + success: boolean; + output: string; + error?: string; + cost?: number; + prUrl?: string; + progressCommentId?: string; + /** Log buffer from the execution, if available from the execute callback */ + logBuffer?: Buffer; + /** + * Additional metadata to pass to finalizeRun. + * Useful for backend-specific finalization fields (e.g., llmIterations, gadgetCalls). + */ + finalizeMetadata?: Record; +} + +/** + * Options for the shared agent execution pipeline. + */ +export interface AgentPipelineOptions { + /** + * Identifier for log file naming (e.g., "review-42", "ci-42"). + */ + loggerIdentifier: string; + + /** + * Set up the working directory (clone repo, etc.). + */ + setupRepoDir: (log: AgentLogger) => Promise; + + /** + * Run tracking configuration. When set, creates a DB run record before execution. + */ + runTracking?: RunTrackingInput & { model?: string; maxIterations?: number }; + + /** + * Remote Squint DB URL for projects that don't commit .squint.db. 
+ */ + squintDbUrl?: string; + + /** + * Whether the repoDir was pre-existing (skip deletion on cleanup). + * When true, skips temp dir deletion in cleanup. + */ + skipRepoDeletion?: boolean; + + /** + * The backend-specific execution step. + * Receives the pipeline context and returns the execution result. + * The pipeline handles CWD change/restore around this callback. + */ + execute: (ctx: PipelineContext) => Promise; + + /** + * Called when the watchdog timer expires. + * FileLogger is already closed when this is invoked. + * Runs inside the watchdog cleanup — keep it fast and non-throwing. + */ + onWatchdogTimeout?: (fileLogger: FileLogger, runId?: string) => Promise; + + /** + * Finalize the run record (store logs, mark complete). + * Called with the outcome of execution (success or error). + */ + finalizeRun: ( + runId: string | undefined, + fileLogger: FileLogger, + outcome: FinalizeRunOutcome, + ) => Promise; +} + +/** + * Outcome passed to finalizeRun. + */ +export interface FinalizeRunOutcome { + status: 'completed' | 'failed' | 'timed_out'; + durationMs: number; + success: boolean; + error?: string; + costUsd?: number; + prUrl?: string; + outputSummary?: string; + /** Additional backend-specific metadata (e.g., llmIterations, gadgetCalls for llmist) */ + metadata?: Record; +} + +/** + * Shared agent execution scaffold used by both the llmist lifecycle and + * the Claude Code backend adapter. + * + * Handles: FileLogger → Watchdog → Repo setup → Env snapshot → Squint DB → + * Run tracking → CWD change → Execute → Restore CWD → Finalize run → Cleanup. + * + * The only divergent step is the `execute` callback. 
+ */ +export async function executeAgentPipeline(options: AgentPipelineOptions): Promise { + let repoDir: string | null = null; + let runId: string | undefined; + const startTime = Date.now(); + + const fileLogger = createFileLogger(`cascade-${options.loggerIdentifier}`); + const log = createAgentLogger(fileLogger); + const logWriter = createLogWriter(fileLogger); + + setWatchdogCleanup(async () => { + const durationMs = Date.now() - startTime; + captureException(new Error('Agent watchdog timeout'), { + tags: { source: 'watchdog_timeout', agent: options.loggerIdentifier }, + extra: { runId, durationMs }, + }); + fileLogger.close(); + await options.finalizeRun(runId, fileLogger, { + status: 'timed_out', + durationMs, + success: false, + error: 'Watchdog timeout', + }); + await options.onWatchdogTimeout?.(fileLogger, runId); + }); + + try { + repoDir = await options.setupRepoDir(log); + const envSnapshot = loadCascadeEnv(repoDir, log); + const squintCleanup = await setupRemoteSquintDb( + repoDir, + { squintDbUrl: options.squintDbUrl }, + log, + ); + + if (options.runTracking) { + runId = await tryCreateRun( + options.runTracking, + options.runTracking.model, + options.runTracking.maxIterations, + ); + } + + const originalCwd = process.cwd(); + process.chdir(repoDir); + + let result: ExecutionResult; + try { + result = await options.execute({ + repoDir, + fileLogger, + logWriter, + runId, + setRunId: (id) => { + runId = id; + }, + }); + } finally { + process.chdir(originalCwd); + squintCleanup?.(); + unloadCascadeEnv(envSnapshot); + } + + // runId may have been updated by setRunId() inside execute + const effectiveRunId = runId; + + fileLogger.close(); + const logBuffer = result.logBuffer ?? (await fileLogger.getZippedBuffer()); + + const durationMs = Date.now() - startTime; + await options.finalizeRun(effectiveRunId, fileLogger, { + status: result.success ? 
'completed' : 'failed', + durationMs, + success: result.success, + error: result.error, + costUsd: result.cost, + prUrl: result.prUrl, + outputSummary: result.output.slice(0, 500), + metadata: result.finalizeMetadata, + }); + + return { + success: result.success, + output: result.output, + prUrl: result.prUrl, + progressCommentId: result.progressCommentId, + error: result.error, + cost: result.cost, + logBuffer: logBuffer ?? undefined, + runId: effectiveRunId, + durationMs, + }; + } catch (err) { + logger.error('Agent execution failed', { + identifier: options.loggerIdentifier, + error: String(err), + }); + captureException(err, { + tags: { source: 'agent_execution', agent: options.loggerIdentifier }, + extra: { runId, durationMs: Date.now() - startTime }, + }); + + let logBuffer: Buffer | undefined; + try { + fileLogger.close(); + logBuffer = await fileLogger.getZippedBuffer(); + } catch { + // Ignore log buffer errors + } + + const durationMs = Date.now() - startTime; + await options.finalizeRun(runId, fileLogger, { + status: 'failed', + durationMs, + success: false, + error: String(err), + }); + + return { success: false, output: '', error: String(err), logBuffer, runId, durationMs }; + } finally { + cleanupAgentResources(repoDir, fileLogger, options.skipRepoDeletion ?? 
false); + } +} diff --git a/src/agents/shared/lifecycle.ts b/src/agents/shared/lifecycle.ts index 6ef06947..8c5a2c2a 100644 --- a/src/agents/shared/lifecycle.ts +++ b/src/agents/shared/lifecycle.ts @@ -1,4 +1,5 @@ import fs from 'node:fs'; + import { LLMist, type ModelSpec, createLogger } from 'llmist'; import type { ProgressMonitor } from '../../backends/progressMonitor.js'; @@ -8,24 +9,24 @@ import { storeLlmCallsBulk, storeRunLogs, } from '../../db/repositories/runsRepository.js'; -import { addBreadcrumb, captureException } from '../../sentry.js'; +import { addBreadcrumb } from '../../sentry.js'; import type { AgentResult } from '../../types/index.js'; -import { loadCascadeEnv, unloadCascadeEnv } from '../../utils/cascadeEnv.js'; -import { createFileLogger } from '../../utils/fileLogger.js'; -import { setWatchdogCleanup } from '../../utils/lifecycle.js'; import { logger } from '../../utils/logging.js'; -import { setupRemoteSquintDb } from '../../utils/squintDb.js'; import { runAgentLoop } from '../utils/agentLoop.js'; import type { AccumulatedLlmCall } from '../utils/hooks.js'; import { getLogLevel } from '../utils/index.js'; import { createAgentLogger } from '../utils/logging.js'; -import { type TrackingContext, createTrackingContext } from '../utils/tracking.js'; +import { createTrackingContext } from '../utils/tracking.js'; import type { BuilderType } from './builderFactory.js'; -import { cleanupAgentResources } from './cleanup.js'; -import { type RunTrackingInput, tryCompleteRun, tryCreateRun } from './runTracking.js'; - -type FileLogger = ReturnType; -type AgentLogger = ReturnType; +import { + type AgentLogger, + type FileLogger, + type FinalizeRunOutcome, + type PipelineContext, + executeAgentPipeline, +} from './executionPipeline.js'; +import type { RunTrackingInput } from './runTracking.js'; +import { tryCompleteRun, tryCreateRun } from './runTracking.js'; export type { FileLogger, AgentLogger }; @@ -56,7 +57,7 @@ export interface ExecuteAgentOptions { 
client: LLMist; ctx: TContext; llmistLogger: ReturnType; - trackingContext: TrackingContext; + trackingContext: ReturnType; fileLogger: FileLogger; repoDir: string; progressMonitor: ProgressMonitor | null; @@ -69,7 +70,7 @@ export interface ExecuteAgentOptions { injectSyntheticCalls: (params: { builder: BuilderType; ctx: TContext; - trackingContext: TrackingContext; + trackingContext: ReturnType; repoDir: string; }) => Promise; @@ -172,11 +173,7 @@ async function tryStoreLogsAndCalls( } } -// ============================================================================ -// Run Finalization Helper -// ============================================================================ - -async function finalizeRun( +async function finalizeRunWithLlmCalls( runId: string | undefined, fileLogger: FileLogger, llmCallAccumulator: AccumulatedLlmCall[], @@ -188,37 +185,6 @@ async function finalizeRun( await tryCompleteRun(runId, input); } -function buildAgentResult( - result: Awaited>, - logBuffer: Buffer, - runId: string | undefined, - durationMs: number, - postProcess?: (output: string) => Partial, -): AgentResult { - if (result.loopTerminated) { - return { - success: false, - output: result.output, - error: 'Agent terminated due to persistent loop', - logBuffer, - cost: result.cost, - runId, - durationMs, - }; - } - - const postProcessed = postProcess?.(result.output) ?? 
{}; - return { - success: true, - output: result.output, - logBuffer, - cost: result.cost, - runId, - durationMs, - ...postProcessed, - }; -} - // ============================================================================ // Main Lifecycle // ============================================================================ @@ -230,68 +196,69 @@ function buildAgentResult( export async function executeAgentLifecycle( options: ExecuteAgentOptions, ): Promise { - let repoDir: string | null = null; - let runId: string | undefined; - const startTime = Date.now(); const llmCallAccumulator: AccumulatedLlmCall[] = []; - const fileLogger = createFileLogger(`cascade-${options.loggerIdentifier}`); - const log = createAgentLogger(fileLogger); - - setWatchdogCleanup(async () => { - const durationMs = Date.now() - startTime; - captureException(new Error('Agent watchdog timeout'), { - tags: { source: 'watchdog_timeout', agent: options.loggerIdentifier }, - extra: { runId, durationMs }, - }); - fileLogger.close(); - await finalizeRun( - runId, - fileLogger, - llmCallAccumulator, - { - status: 'timed_out', - durationMs, - success: false, - error: 'Watchdog timeout', - }, - !!runId, - ); - await options.onWatchdogTimeout(fileLogger, runId); - }); + // Build the finalizeRun callback with access to llmCallAccumulator + const buildFinalizeRun = + (finalizeRunFn: typeof finalizeRunWithLlmCalls) => + async ( + runId: string | undefined, + fileLogger: FileLogger, + outcome: FinalizeRunOutcome, + ): Promise => { + const meta = outcome.metadata as { llmIterations?: number; gadgetCalls?: number } | undefined; + + const completeInput: CompleteRunInput = { + status: outcome.status, + durationMs: outcome.durationMs, + success: outcome.success, + error: outcome.error, + costUsd: outcome.costUsd, + prUrl: outcome.prUrl, + outputSummary: outcome.outputSummary, + llmIterations: meta?.llmIterations, + gadgetCalls: meta?.gadgetCalls, + }; + await finalizeRunFn(runId, fileLogger, llmCallAccumulator, 
completeInput, !!runId); + }; - try { - repoDir = await options.setupRepoDir(log); - const envSnapshot = loadCascadeEnv(repoDir, log); - const squintCleanup = await setupRemoteSquintDb( - repoDir, - { squintDbUrl: options.squintDbUrl }, - log, - ); - - const ctx = await options.buildContext(repoDir, log); - - if (options.runTracking) { - runId = await tryCreateRun(options.runTracking, ctx.model, ctx.maxIterations); - } + return executeAgentPipeline({ + loggerIdentifier: options.loggerIdentifier, + setupRepoDir: options.setupRepoDir, + squintDbUrl: options.squintDbUrl, - const originalCwd = process.cwd(); - process.chdir(repoDir); + onWatchdogTimeout: async (fileLogger, runId) => { + await options.onWatchdogTimeout(fileLogger, runId); + }, - log.info('Starting llmist agent', { - model: ctx.model, - maxIterations: ctx.maxIterations, - promptLength: ctx.prompt.length, - runId, - }); + finalizeRun: buildFinalizeRun(finalizeRunWithLlmCalls), - addBreadcrumb({ - category: 'agent', - message: `Starting ${options.loggerIdentifier}`, - data: { model: ctx.model, maxIterations: ctx.maxIterations, runId }, - }); + execute: async (ctx: PipelineContext) => { + const { repoDir, fileLogger, setRunId } = ctx; + + const log = createAgentLogger(fileLogger); + const ctx_ = await options.buildContext(repoDir, log); + + // Create run record now that we have model and maxIterations + let runId: string | undefined; + if (options.runTracking) { + runId = await tryCreateRun(options.runTracking, ctx_.model, ctx_.maxIterations); + if (runId) setRunId(runId); + } + + log.info('Starting llmist agent', { + model: ctx_.model, + maxIterations: ctx_.maxIterations, + promptLength: ctx_.prompt.length, + runId, + }); + + addBreadcrumb({ + category: 'agent', + message: `Starting ${options.loggerIdentifier}`, + data: { model: ctx_.model, maxIterations: ctx_.maxIterations, runId }, + }); - try { process.env.LLMIST_LOG_FILE = fileLogger.llmistLogPath; process.env.LLMIST_LOG_TEE = 'true'; @@ -304,7 +271,7 
@@ export async function executeAgentLifecycle( let builder = options.createBuilder({ client, - ctx, + ctx: ctx_, llmistLogger, trackingContext, fileLogger, @@ -313,9 +280,14 @@ export async function executeAgentLifecycle( llmCallAccumulator, runId, }); - builder = await options.injectSyntheticCalls({ builder, ctx, trackingContext, repoDir }); + builder = await options.injectSyntheticCalls({ + builder, + ctx: ctx_, + trackingContext, + repoDir, + }); - const agent = builder.ask(ctx.prompt); + const agent = builder.ask(ctx_.prompt); progressMonitor?.start(); let result: Awaited>; @@ -338,79 +310,17 @@ export async function executeAgentLifecycle( loopTerminated: result.loopTerminated ?? false, }); - fileLogger.close(); - const logBuffer = await fileLogger.getZippedBuffer(); - - const completionInput: CompleteRunInput = result.loopTerminated - ? { - status: 'failed', - durationMs: Date.now() - startTime, - llmIterations: result.iterations, - gadgetCalls: result.gadgetCalls, - costUsd: result.cost, - success: false, - error: 'Agent terminated due to persistent loop', - outputSummary: result.output.slice(0, 500), - } - : { - status: 'completed', - durationMs: Date.now() - startTime, - llmIterations: result.iterations, - gadgetCalls: result.gadgetCalls, - costUsd: result.cost, - success: true, - prUrl: options.postProcess?.(result.output)?.prUrl, - outputSummary: result.output.slice(0, 500), - }; - - await finalizeRun(runId, fileLogger, llmCallAccumulator, completionInput, !!runId); - - return buildAgentResult( - result, - logBuffer, - runId, - Date.now() - startTime, - options.postProcess, - ); - } finally { - process.chdir(originalCwd); - squintCleanup?.(); - unloadCascadeEnv(envSnapshot); - } - } catch (err) { - logger.error('Agent execution failed', { - identifier: options.loggerIdentifier, - error: String(err), - }); - captureException(err, { - tags: { source: 'agent_lifecycle', agent: options.loggerIdentifier }, - extra: { runId, durationMs: Date.now() - startTime 
}, - }); - - let logBuffer: Buffer | undefined; - try { - fileLogger.close(); - logBuffer = await fileLogger.getZippedBuffer(); - } catch { - // Ignore log buffer errors - } - - const durationMs = Date.now() - startTime; - await finalizeRun( - runId, - fileLogger, - llmCallAccumulator, - { - status: 'failed', - durationMs, - success: false, - error: String(err), - }, - !!runId, - ); - - return { success: false, output: '', error: String(err), logBuffer, runId, durationMs }; - } finally { - cleanupAgentResources(repoDir, fileLogger); - } + return { + success: !result.loopTerminated, + output: result.output, + error: result.loopTerminated ? 'Agent terminated due to persistent loop' : undefined, + cost: result.cost, + prUrl: options.postProcess?.(result.output)?.prUrl, + finalizeMetadata: { + llmIterations: result.iterations, + gadgetCalls: result.gadgetCalls, + }, + }; + }, + }); } diff --git a/src/backends/adapter.ts b/src/backends/adapter.ts index 0d9feec4..e1912a8f 100644 --- a/src/backends/adapter.ts +++ b/src/backends/adapter.ts @@ -1,32 +1,26 @@ import type { ModelSpec } from 'llmist'; import type { PromptContext } from '../agents/prompts/index.js'; -import { cleanupAgentResources } from '../agents/shared/cleanup.js'; +import { + type LogWriter, + type PipelineContext, + executeAgentPipeline, +} from '../agents/shared/executionPipeline.js'; import { resolveModelConfig } from '../agents/shared/modelResolution.js'; import { buildPromptContext } from '../agents/shared/promptContext.js'; import { setupRepository } from '../agents/shared/repository.js'; -import { - type RunTrackingInput, - finalizeBackendRun, - tryCreateRun, -} from '../agents/shared/runTracking.js'; +import { finalizeBackendRun, tryCreateRun } from '../agents/shared/runTracking.js'; import { createAgentLogger } from '../agents/utils/logging.js'; import { CUSTOM_MODELS } from '../config/customModels.js'; import { loadPartials } from '../db/repositories/partialsRepository.js'; import { 
withGitHubToken } from '../github/client.js'; -import { captureException } from '../sentry.js'; import type { AgentInput, AgentResult, CascadeConfig, ProjectConfig } from '../types/index.js'; -import { loadCascadeEnv, unloadCascadeEnv } from '../utils/cascadeEnv.js'; -import { createFileLogger } from '../utils/fileLogger.js'; -import { setWatchdogCleanup } from '../utils/lifecycle.js'; -import { logger } from '../utils/logging.js'; -import { setupRemoteSquintDb } from '../utils/squintDb.js'; import { getAgentProfile } from './agent-profiles.js'; import { postProcessResult } from './postProcess.js'; import { createProgressMonitor } from './progress.js'; import { augmentProjectSecrets, resolveGitHubToken } from './secretBuilder.js'; import { getToolManifests } from './toolManifests.js'; -import type { AgentBackend, AgentBackendInput, LogWriter } from './types.js'; +import type { AgentBackend, AgentBackendInput } from './types.js'; /** * Resolve the working directory — either a pre-existing log dir or a fresh repo clone. @@ -48,24 +42,6 @@ async function resolveRepoDir( }); } -/** - * Create a LogWriter that writes to both the file logger and the structured logger. - */ -function createLogWriter(fileLogger: ReturnType): LogWriter { - return (level: string, message: string, context?: Record) => { - fileLogger.write(level, message, context); - const logFn = - level === 'ERROR' - ? logger.error - : level === 'WARN' - ? logger.warn - : level === 'DEBUG' - ? logger.debug - : logger.info; - logFn.call(logger, message, context); - }; -} - /** * Build the BackendInput by resolving model config, fetching context, etc. * Uses agent profiles to customize tools, context, and prompts per agent type. 
@@ -168,95 +144,90 @@ export async function executeWithBackend( input: AgentInput & { project: ProjectConfig; config: CascadeConfig }, ): Promise { const { cardId } = input; - let repoDir: string | null = null; - let runId: string | undefined; - const startTime = Date.now(); - const identifier = `${agentType}-${cardId || 'unknown'}`; - const fileLogger = createFileLogger(`cascade-${identifier}`); - const log = createAgentLogger(fileLogger); - - setWatchdogCleanup(async () => { - fileLogger.close(); - await finalizeBackendRun(runId, fileLogger, { - status: 'timed_out', - durationMs: Date.now() - startTime, - success: false, - error: 'Watchdog timeout', - }); - }); - - try { - repoDir = await resolveRepoDir(input, log, agentType); - const envSnapshot = loadCascadeEnv(repoDir, log); - const squintCleanup = await setupRemoteSquintDb(repoDir, input.project, log); - const logWriter = createLogWriter(fileLogger); - - const profile = getAgentProfile(agentType); - const gitHubToken = await resolveGitHubToken(profile, input.project.id, agentType); - - // Build backend input wrapped in GitHub token scope if needed - const buildPartial = () => - buildBackendInput(agentType, input, repoDir as string, logWriter, log, gitHubToken); - - const partialInput = gitHubToken - ? await withGitHubToken(gitHubToken, buildPartial) - : await buildPartial(); - - const runTrackingInput: RunTrackingInput = { - projectId: input.project.id, - cardId: input.cardId, - prNumber: input.prNumber, - agentType, - backendName: backend.name, - triggerType: input.triggerType, - }; - runId = await tryCreateRun(runTrackingInput, partialInput.model, partialInput.maxIterations); - const monitor = createProgressMonitor({ - logWriter, - agentType, - taskDescription: cardId ? `Work item ${cardId}` : 'Unknown task', - progressModel: input.config.defaults.progressModel, - intervalMinutes: input.config.defaults.progressIntervalMinutes, - customModels: CUSTOM_MODELS as ModelSpec[], - repoDir: repoDir ?? 
undefined, - trello: cardId ? { cardId } : undefined, - preSeededCommentId: input.ackCommentId as string | undefined, - }); + return executeAgentPipeline({ + loggerIdentifier: identifier, + + setupRepoDir: (log) => resolveRepoDir(input, log, agentType), + + skipRepoDeletion: Boolean(input.logDir), + + squintDbUrl: input.project.squintDbUrl, + + finalizeRun: (runId, fileLogger, outcome) => + finalizeBackendRun(runId, fileLogger, { + status: outcome.status, + durationMs: outcome.durationMs, + success: outcome.success, + error: outcome.error, + costUsd: outcome.costUsd, + prUrl: outcome.prUrl, + outputSummary: outcome.outputSummary, + }), + + execute: async (ctx: PipelineContext) => { + const { repoDir, fileLogger, logWriter, setRunId } = ctx; + const log = createAgentLogger(fileLogger); + + const profile = getAgentProfile(agentType); + const gitHubToken = await resolveGitHubToken(profile, input.project.id, agentType); + + // Build backend input wrapped in GitHub token scope if needed + const buildPartial = () => + buildBackendInput(agentType, input, repoDir, logWriter, log, gitHubToken); + + const partialInput = gitHubToken + ? await withGitHubToken(gitHubToken, buildPartial) + : await buildPartial(); + + // Create run record now that we have model and maxIterations + const runId = await tryCreateRun( + { + projectId: input.project.id, + cardId: input.cardId, + prNumber: input.prNumber as number | undefined, + agentType, + backendName: backend.name, + triggerType: input.triggerType, + }, + partialInput.model, + partialInput.maxIterations, + ); + if (runId) setRunId(runId); + + const monitor = createProgressMonitor({ + logWriter, + agentType, + taskDescription: cardId ? `Work item ${cardId}` : 'Unknown task', + progressModel: input.config.defaults.progressModel, + intervalMinutes: input.config.defaults.progressIntervalMinutes, + customModels: CUSTOM_MODELS as ModelSpec[], + repoDir: repoDir ?? undefined, + trello: cardId ? 
{ cardId } : undefined, + preSeededCommentId: input.ackCommentId as string | undefined, + }); - const backendInput: AgentBackendInput = { - ...partialInput, - progressReporter: monitor ?? { - onIteration: async () => {}, - onToolCall: () => {}, - onText: () => {}, - }, - runId, - }; + const backendInput: AgentBackendInput = { + ...partialInput, + progressReporter: monitor ?? { + onIteration: async () => {}, + onToolCall: () => {}, + onText: () => {}, + }, + runId, + }; - const originalCwd = process.cwd(); - process.chdir(repoDir); - monitor?.start(); + monitor?.start(); + let result: Awaited>; + try { + result = await backend.execute(backendInput); + } finally { + monitor?.stop(); + } - try { - const result = await backend.execute(backendInput); postProcessResult(result, agentType, backend, input, identifier); - fileLogger.close(); - const logBuffer = await fileLogger.getZippedBuffer(); - - const durationMs = Date.now() - startTime; - await finalizeBackendRun(runId, fileLogger, { - status: result.success ? 'completed' : 'failed', - durationMs, - costUsd: result.cost, - success: result.success, - error: result.error, - prUrl: result.prUrl, - outputSummary: result.output.slice(0, 500), - }); - return { success: result.success, output: result.output, @@ -264,45 +235,8 @@ export async function executeWithBackend( progressCommentId: monitor?.getProgressCommentId() ?? undefined, error: result.error, cost: result.cost, - logBuffer: logBuffer ?? 
result.logBuffer, - runId, - durationMs, + logBuffer: result.logBuffer, }; - } finally { - monitor?.stop(); - process.chdir(originalCwd); - squintCleanup?.(); - unloadCascadeEnv(envSnapshot); - } - } catch (err) { - logger.error('Backend execution failed', { - identifier, - backend: backend.name, - error: String(err), - }); - captureException(err, { - tags: { source: 'backend_execution', backend: backend.name, agent: identifier }, - extra: { runId, durationMs: Date.now() - startTime }, - }); - - let logBuffer: Buffer | undefined; - try { - fileLogger.close(); - logBuffer = await fileLogger.getZippedBuffer(); - } catch { - // Ignore log buffer errors - } - - const durationMs = Date.now() - startTime; - await finalizeBackendRun(runId, fileLogger, { - status: 'failed', - durationMs, - success: false, - error: String(err), - }); - - return { success: false, output: '', error: String(err), logBuffer, runId, durationMs }; - } finally { - cleanupAgentResources(repoDir, fileLogger, Boolean(input.logDir)); - } + }, + }); } diff --git a/tests/unit/agents/shared/executionPipeline.test.ts b/tests/unit/agents/shared/executionPipeline.test.ts new file mode 100644 index 00000000..042d0978 --- /dev/null +++ b/tests/unit/agents/shared/executionPipeline.test.ts @@ -0,0 +1,524 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +// Mock all external dependencies +vi.mock('../../../../src/utils/fileLogger.js', () => ({ + createFileLogger: vi.fn(), + cleanupLogFile: vi.fn(), + cleanupLogDirectory: vi.fn(), +})); + +vi.mock('../../../../src/agents/utils/logging.js', () => ({ + createAgentLogger: vi.fn(), +})); + +vi.mock('../../../../src/utils/cascadeEnv.js', () => ({ + loadCascadeEnv: vi.fn(), + unloadCascadeEnv: vi.fn(), +})); + +vi.mock('../../../../src/utils/repo.js', () => ({ + cleanupTempDir: vi.fn(), +})); + +vi.mock('../../../../src/utils/lifecycle.js', () => ({ + setWatchdogCleanup: vi.fn(), + clearWatchdogCleanup: vi.fn(), +})); + 
+vi.mock('../../../../src/utils/squintDb.js', () => ({ + setupRemoteSquintDb: vi.fn().mockResolvedValue(null), +})); + +vi.mock('../../../../src/utils/logging.js', () => ({ + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})); + +const mockCaptureException = vi.fn(); +vi.mock('../../../../src/sentry.js', () => ({ + captureException: (...args: unknown[]) => mockCaptureException(...args), +})); + +vi.mock('../../../../src/db/repositories/runsRepository.js', () => ({ + createRun: vi.fn(), + completeRun: vi.fn(), + storeRunLogs: vi.fn(), +})); + +import { + createLogWriter, + executeAgentPipeline, +} from '../../../../src/agents/shared/executionPipeline.js'; +import { createAgentLogger } from '../../../../src/agents/utils/logging.js'; +import { loadCascadeEnv, unloadCascadeEnv } from '../../../../src/utils/cascadeEnv.js'; +import { + cleanupLogDirectory, + cleanupLogFile, + createFileLogger, +} from '../../../../src/utils/fileLogger.js'; +import { clearWatchdogCleanup, setWatchdogCleanup } from '../../../../src/utils/lifecycle.js'; +import { logger } from '../../../../src/utils/logging.js'; +import { cleanupTempDir } from '../../../../src/utils/repo.js'; +import { setupRemoteSquintDb } from '../../../../src/utils/squintDb.js'; + +const mockCreateFileLogger = vi.mocked(createFileLogger); +const mockCreateAgentLogger = vi.mocked(createAgentLogger); +const mockLoadCascadeEnv = vi.mocked(loadCascadeEnv); +const mockUnloadCascadeEnv = vi.mocked(unloadCascadeEnv); +const mockCleanupTempDir = vi.mocked(cleanupTempDir); +const mockCleanupLogFile = vi.mocked(cleanupLogFile); +const mockCleanupLogDirectory = vi.mocked(cleanupLogDirectory); +const mockClearWatchdogCleanup = vi.mocked(clearWatchdogCleanup); +const mockSetWatchdogCleanup = vi.mocked(setWatchdogCleanup); +const mockSetupRemoteSquintDb = vi.mocked(setupRemoteSquintDb); + +function setupMocks() { + const mockLoggerInstance = { + write: vi.fn(), + close: vi.fn(), + 
getZippedBuffer: vi.fn().mockResolvedValue(Buffer.from('logs')), + logPath: '/tmp/test.log', + llmistLogPath: '/tmp/test-llmist.log', + llmCallLogger: { logDir: '/tmp/llm-calls' }, + }; + mockCreateFileLogger.mockReturnValue(mockLoggerInstance as never); + mockCreateAgentLogger.mockReturnValue({ info: vi.fn(), warn: vi.fn(), error: vi.fn() } as never); + mockLoadCascadeEnv.mockReturnValue({}); + mockSetupRemoteSquintDb.mockResolvedValue(null); + return mockLoggerInstance; +} + +beforeEach(() => { + vi.clearAllMocks(); + process.env.CASCADE_LOCAL_MODE = ''; +}); + +describe('executeAgentPipeline', () => { + it('returns successful result from execute callback', async () => { + setupMocks(); + + const result = await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => ({ success: true, output: 'Done', cost: 0.5 }), + }); + + expect(result.success).toBe(true); + expect(result.output).toBe('Done'); + expect(result.cost).toBe(0.5); + }); + + it('returns error result when execute callback throws', async () => { + setupMocks(); + + const result = await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => { + throw new Error('Execute failed'); + }, + }); + + expect(result.success).toBe(false); + expect(result.error).toContain('Execute failed'); + }); + + it('returns error result when setupRepoDir throws', async () => { + setupMocks(); + + const result = await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockRejectedValue(new Error('Repo setup failed')), + finalizeRun: vi.fn(), + execute: async () => ({ success: true, output: 'Done' }), + }); + + expect(result.success).toBe(false); + expect(result.error).toContain('Repo setup failed'); + }); + + it('loads and unloads cascade env around execution', async () => { + setupMocks(); + + 
await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => ({ success: true, output: 'Done' }), + }); + + expect(mockLoadCascadeEnv).toHaveBeenCalled(); + expect(mockUnloadCascadeEnv).toHaveBeenCalled(); + }); + + it('restores CWD even when execute throws', async () => { + setupMocks(); + const originalCwd = process.cwd(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => { + throw new Error('Failed mid-execution'); + }, + }); + + expect(process.cwd()).toBe(originalCwd); + }); + + it('calls setWatchdogCleanup with a cleanup function', async () => { + setupMocks(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => ({ success: true, output: 'Done' }), + }); + + expect(mockSetWatchdogCleanup).toHaveBeenCalledWith(expect.any(Function)); + }); + + it('cleans up resources in finally block', async () => { + setupMocks(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => ({ success: true, output: 'Done' }), + }); + + expect(mockClearWatchdogCleanup).toHaveBeenCalled(); + expect(mockCleanupTempDir).toHaveBeenCalled(); + expect(mockCleanupLogFile).toHaveBeenCalled(); + expect(mockCleanupLogDirectory).toHaveBeenCalled(); + }); + + it('skips temp dir cleanup when skipRepoDeletion is true', async () => { + setupMocks(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + skipRepoDeletion: true, + execute: async () => ({ success: true, output: 'Done' }), + }); + + expect(mockCleanupTempDir).not.toHaveBeenCalled(); + 
expect(mockCleanupLogFile).toHaveBeenCalled(); + }); + + it('skips cleanup in CASCADE_LOCAL_MODE', async () => { + process.env.CASCADE_LOCAL_MODE = 'true'; + setupMocks(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => ({ success: true, output: 'Done' }), + }); + + expect(mockCleanupTempDir).not.toHaveBeenCalled(); + expect(mockCleanupLogFile).not.toHaveBeenCalled(); + expect(mockCleanupLogDirectory).not.toHaveBeenCalled(); + }); + + it('calls finalizeRun with completed status on success', async () => { + setupMocks(); + const mockFinalizeRun = vi.fn(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: mockFinalizeRun, + execute: async () => ({ success: true, output: 'Done', cost: 1.0 }), + }); + + expect(mockFinalizeRun).toHaveBeenCalledWith( + undefined, + expect.anything(), + expect.objectContaining({ + status: 'completed', + success: true, + durationMs: expect.any(Number), + costUsd: 1.0, + }), + ); + }); + + it('calls finalizeRun with failed status when execute returns failure', async () => { + setupMocks(); + const mockFinalizeRun = vi.fn(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: mockFinalizeRun, + execute: async () => ({ success: false, output: '', error: 'Agent failed' }), + }); + + expect(mockFinalizeRun).toHaveBeenCalledWith( + undefined, + expect.anything(), + expect.objectContaining({ + status: 'failed', + success: false, + error: 'Agent failed', + }), + ); + }); + + it('calls finalizeRun with failed status when execute throws', async () => { + setupMocks(); + const mockFinalizeRun = vi.fn(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: mockFinalizeRun, + 
execute: async () => { + throw new Error('Unexpected crash'); + }, + }); + + expect(mockFinalizeRun).toHaveBeenCalledWith( + undefined, + expect.anything(), + expect.objectContaining({ + status: 'failed', + success: false, + error: expect.stringContaining('Unexpected crash'), + }), + ); + }); + + it('reports errors to Sentry when execute throws', async () => { + setupMocks(); + const error = new Error('Test error'); + + await executeAgentPipeline({ + loggerIdentifier: 'test-agent', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => { + throw error; + }, + }); + + expect(mockCaptureException).toHaveBeenCalledWith(error, { + tags: { + source: 'agent_execution', + agent: 'test-agent', + }, + extra: { + runId: undefined, + durationMs: expect.any(Number), + }, + }); + }); + + it('returns durationMs in successful result', async () => { + setupMocks(); + + const result = await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => ({ success: true, output: 'Done' }), + }); + + expect(result.durationMs).toBeDefined(); + expect(result.durationMs).toBeGreaterThanOrEqual(0); + expect(typeof result.durationMs).toBe('number'); + }); + + it('returns durationMs in error result', async () => { + setupMocks(); + + const result = await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + execute: async () => { + throw new Error('Failed'); + }, + }); + + expect(result.durationMs).toBeDefined(); + expect(result.durationMs).toBeGreaterThanOrEqual(0); + expect(typeof result.durationMs).toBe('number'); + }); + + it('provides logBuffer from fileLogger.getZippedBuffer', async () => { + const mockLogger = setupMocks(); + mockLogger.getZippedBuffer.mockResolvedValue(Buffer.from('log-data')); + + const result = await executeAgentPipeline({ + 
loggerIdentifier: 'test-run',
+ setupRepoDir: vi.fn().mockResolvedValue(process.cwd()),
+ finalizeRun: vi.fn(),
+ execute: async () => ({ success: true, output: 'Done' }),
+ });
+
+ expect(result.logBuffer).toEqual(Buffer.from('log-data'));
+ });
+
+ it('uses logBuffer from execute result if provided', async () => {
+ setupMocks();
+ const customBuffer = Buffer.from('custom-log');
+
+ const result = await executeAgentPipeline({
+ loggerIdentifier: 'test-run',
+ setupRepoDir: vi.fn().mockResolvedValue(process.cwd()),
+ finalizeRun: vi.fn(),
+ execute: async () => ({ success: true, output: 'Done', logBuffer: customBuffer }),
+ });
+
+ expect(result.logBuffer).toEqual(customBuffer);
+ });
+
+ it('passes runId to finalizeRun when setRunId is called in execute', async () => {
+ setupMocks();
+ const mockFinalizeRun = vi.fn();
+
+ await executeAgentPipeline({
+ loggerIdentifier: 'test-run',
+ setupRepoDir: vi.fn().mockResolvedValue(process.cwd()),
+ finalizeRun: mockFinalizeRun,
+ execute: async (ctx) => {
+ ctx.setRunId('my-run-id');
+ return { success: true, output: 'Done' };
+ },
+ });
+
+ expect(mockFinalizeRun).toHaveBeenCalledWith(
+ 'my-run-id',
+ expect.anything(),
+ expect.any(Object),
+ );
+ });
+
+ it('returns runId in result when setRunId is called in execute', async () => {
+ setupMocks();
+
+ const result = await executeAgentPipeline({
+ loggerIdentifier: 'test-run',
+ setupRepoDir: vi.fn().mockResolvedValue(process.cwd()),
+ finalizeRun: vi.fn(),
+ execute: async (ctx) => {
+ ctx.setRunId('test-run-id');
+ return { success: true, output: 'Done' };
+ },
+ });
+
+ expect(result.runId).toBe('test-run-id');
+ });
+
+ it('calls onWatchdogTimeout when watchdog fires', async () => {
+ setupMocks();
+ const mockOnWatchdog = vi.fn();
+
+ // Capture the watchdog cleanup function
+ let watchdogCleanup: (() => Promise<void>) | undefined;
+ mockSetWatchdogCleanup.mockImplementation((fn) => {
+ watchdogCleanup = fn;
+ });
+
+ // Run the pipeline to completion first; afterwards 
we'll fire the watchdog manually + const pipelinePromise = executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: vi.fn(), + onWatchdogTimeout: mockOnWatchdog, + execute: vi.fn().mockResolvedValue({ success: true, output: 'Done' }), + }); + + await pipelinePromise; + + // Manually invoke the captured watchdog cleanup + expect(watchdogCleanup).toBeDefined(); + await watchdogCleanup?.(); + + expect(mockOnWatchdog).toHaveBeenCalled(); + }); + + it('passes finalizeMetadata to finalizeRun', async () => { + setupMocks(); + const mockFinalizeRun = vi.fn(); + + await executeAgentPipeline({ + loggerIdentifier: 'test-run', + setupRepoDir: vi.fn().mockResolvedValue(process.cwd()), + finalizeRun: mockFinalizeRun, + execute: async () => ({ + success: true, + output: 'Done', + finalizeMetadata: { llmIterations: 10, gadgetCalls: 25 }, + }), + }); + + expect(mockFinalizeRun).toHaveBeenCalledWith( + undefined, + expect.anything(), + expect.objectContaining({ + metadata: { llmIterations: 10, gadgetCalls: 25 }, + }), + ); + }); +}); + +describe('createLogWriter', () => { + it('writes to file logger and structured logger', () => { + const mockFileLogger = { + write: vi.fn(), + close: vi.fn(), + getZippedBuffer: vi.fn(), + logPath: '/tmp/test.log', + llmistLogPath: '/tmp/test-llmist.log', + llmCallLogger: { logDir: '/tmp/llm-calls' }, + }; + + const logWriter = createLogWriter(mockFileLogger as never); + logWriter('INFO', 'Test message', { key: 'value' }); + + expect(mockFileLogger.write).toHaveBeenCalledWith('INFO', 'Test message', { key: 'value' }); + expect(logger.info).toHaveBeenCalledWith('Test message', { key: 'value' }); + }); + + it('routes ERROR level to logger.error', () => { + const mockFileLogger = { write: vi.fn() }; + const logWriter = createLogWriter(mockFileLogger as never); + + logWriter('ERROR', 'Error message'); + + expect(logger.error).toHaveBeenCalledWith('Error message', undefined); + }); + + 
it('routes WARN level to logger.warn', () => { + const mockFileLogger = { write: vi.fn() }; + const logWriter = createLogWriter(mockFileLogger as never); + + logWriter('WARN', 'Warning message'); + + expect(logger.warn).toHaveBeenCalledWith('Warning message', undefined); + }); + + it('routes DEBUG level to logger.debug', () => { + const mockFileLogger = { write: vi.fn() }; + const logWriter = createLogWriter(mockFileLogger as never); + + logWriter('DEBUG', 'Debug message'); + + expect(logger.debug).toHaveBeenCalledWith('Debug message', undefined); + }); +}); diff --git a/tests/unit/backends/adapter.test.ts b/tests/unit/backends/adapter.test.ts index 3fabd5bf..5536cdfa 100644 --- a/tests/unit/backends/adapter.test.ts +++ b/tests/unit/backends/adapter.test.ts @@ -293,8 +293,7 @@ describe('executeWithBackend', () => { expect(mockCaptureException).toHaveBeenCalledWith(error, { tags: { - source: 'backend_execution', - backend: 'test-backend', + source: 'agent_execution', agent: expect.stringContaining('review'), }, extra: {