From eafb764c5dea2675a5f052d4836774afff58bb64 Mon Sep 17 00:00:00 2001 From: Zbigniew Sobiecki Date: Sat, 7 Mar 2026 05:47:14 +0000 Subject: [PATCH] feat(router): add agent-type concurrency limiting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add per-agent-type concurrency limits to prevent multiple instances of the same agent type running simultaneously for a project. Core implementation: - New `agent_configs.max_concurrency` column with CHECK constraint - Two-layer lock in `src/router/agent-type-lock.ts`: 1. In-memory concurrency map (fast path, TTL safety net) 2. DB count of running `agent_runs` (authoritative, survives restarts) - Trigger-level dedup (60s TTL) suppresses batch webhook re-triggers - Shared `checkAgentTypeConcurrency()` consolidates logic from 3 handlers Key design decisions: - Use `Math.max(dbCount, inMemoryCount)` instead of sum to avoid double-counting during the enqueued→running transition - 5-second TTL cache on `getMaxConcurrency()` DB queries - Graceful fallback (no limit) when DB is unreachable - Periodic cleanup of dedup map when it exceeds 100 entries Dashboard & CLI: - Agent config forms expose Max Concurrency field - `cascade agents create/update --max-concurrency N` - Agent configs table shows concurrency column Container manager: - `extractAgentType()` handles both trigger-based and dashboard jobs - Worker cleanup releases agent-type locks on exit - `detachAll()` clears all agent-type locks on shutdown Tests: 24 unit tests for agent-type-lock, concurrency-blocked tests added to all 3 webhook handler test suites (3868 total tests passing). Co-Authored-By: Claude Opus 4.6 --- src/api/routers/agentConfigs.ts | 3 + src/cli/dashboard/agents/create.ts | 2 + src/cli/dashboard/agents/update.ts | 2 + .../0026_agent_type_concurrency.sql | 1 + src/db/migrations/meta/_journal.json | 7 + src/db/repositories/runsRepository.ts | 22 ++ src/db/repositories/settingsRepository.ts | 74 ++++++ src/db/schema/agentConfigs.ts | 1 + src/pm/webhook-handler.ts | 22 ++ src/router/agent-type-lock.ts | 205 ++++++++++++++++ src/router/container-manager.ts | 21 ++ src/router/webhook-processor.ts | 24 ++ src/triggers/github/webhook-handler.ts | 22 ++ tests/unit/pm/webhook-handler.test.ts | 21 ++ tests/unit/router/agent-type-lock.test.ts | 231 ++++++++++++++++++ tests/unit/router/webhook-processor.test.ts | 24 ++ .../triggers/github-webhook-handler.test.ts | 18 ++ .../projects/project-agent-configs.tsx | 18 ++ .../settings/agent-config-form-dialog.tsx | 15 ++ .../settings/agent-configs-table.tsx | 5 +- 20 files changed, 737 insertions(+), 1 deletion(-) create mode 100644 src/db/migrations/0026_agent_type_concurrency.sql create mode 100644 src/router/agent-type-lock.ts create mode 100644 tests/unit/router/agent-type-lock.test.ts diff --git a/src/api/routers/agentConfigs.ts b/src/api/routers/agentConfigs.ts index 215656f4..18bcd9f0 100644 --- a/src/api/routers/agentConfigs.ts +++ b/src/api/routers/agentConfigs.ts @@ -52,6 +52,7 @@ export const agentConfigsRouter = router({ model: z.string().nullish(), maxIterations: z.number().int().positive().nullish(), agentBackend: z.string().nullish(), + maxConcurrency: z.number().int().positive().nullish(), }), ) .mutation(async ({ ctx, input }) => { @@ -66,6 +67,7 @@ export const agentConfigsRouter = router({ model: input.model, maxIterations: input.maxIterations, agentBackend: input.agentBackend, + maxConcurrency: input.maxConcurrency, }); }), @@ -77,6 +79,7 @@ export const agentConfigsRouter = router({ model: z.string().nullish(), maxIterations: z.number().int().positive().nullish(), agentBackend: z.string().nullish(), + maxConcurrency: z.number().int().positive().nullish(), }), ) .mutation(async ({ ctx, input }) => { diff --git a/src/cli/dashboard/agents/create.ts b/src/cli/dashboard/agents/create.ts index 0e91ff5a..2433f959 100644 --- a/src/cli/dashboard/agents/create.ts +++ b/src/cli/dashboard/agents/create.ts @@ -14,6 +14,7 @@ export default class AgentsCreate extends DashboardCommand { model: Flags.string({ description: 'Model override' }), 'max-iterations': Flags.integer({ description: 'Max iterations override' }), backend: Flags.string({ description: 'Agent backend override' }), + 'max-concurrency': Flags.integer({ description: 'Max concurrent runs per project' }), }; async run(): Promise { @@ -26,6 +27,7 @@ export default class AgentsCreate extends DashboardCommand { model: flags.model, maxIterations: flags['max-iterations'], agentBackend: flags.backend, + maxConcurrency: flags['max-concurrency'], }); if (flags.json) { diff --git a/src/cli/dashboard/agents/update.ts b/src/cli/dashboard/agents/update.ts index ede83075..9eaeff3d 100644 --- a/src/cli/dashboard/agents/update.ts +++ b/src/cli/dashboard/agents/update.ts @@ -14,6 +14,7 @@ export default class AgentsUpdate extends DashboardCommand { model: Flags.string({ description: 'Model override' }), 'max-iterations': Flags.integer({ description: 'Max iterations override' }), backend: Flags.string({ description: 'Agent backend override' }), + 'max-concurrency': Flags.integer({ description: 'Max concurrent runs per project' }), }; async run(): Promise { @@ -26,6 +27,7 @@ export default class AgentsUpdate extends DashboardCommand { model: flags.model, maxIterations: flags['max-iterations'], agentBackend: flags.backend, + maxConcurrency: flags['max-concurrency'], }); if (flags.json) { diff --git a/src/db/migrations/0026_agent_type_concurrency.sql b/src/db/migrations/0026_agent_type_concurrency.sql new file mode 100644 index 00000000..09754a06 --- /dev/null +++ b/src/db/migrations/0026_agent_type_concurrency.sql @@ -0,0 +1 @@ +ALTER TABLE agent_configs ADD COLUMN max_concurrency INTEGER CHECK (max_concurrency IS NULL OR max_concurrency > 0); diff --git a/src/db/migrations/meta/_journal.json b/src/db/migrations/meta/_journal.json index 4d83a796..111cbe76 100644 --- a/src/db/migrations/meta/_journal.json +++ b/src/db/migrations/meta/_journal.json @@ -183,6 +183,13 @@ "when": 1760000000000, "tag": "0025_integration_hooks", "breakpoints": false + }, + { + "idx": 26, + "version": "7", + "when": 1761000000000, + "tag": "0026_agent_type_concurrency", + "breakpoints": false } ] } diff --git a/src/db/repositories/runsRepository.ts b/src/db/repositories/runsRepository.ts index 87d3753f..ebcc3aaa 100644 --- a/src/db/repositories/runsRepository.ts +++ b/src/db/repositories/runsRepository.ts @@ -271,6 +271,28 @@ export async function hasActiveRunForWorkItem( return !!row; } +export async function countActiveRunsForAgentType( + projectId: string, + agentType: string, + maxAgeMs?: number, +): Promise { + const db = getDb(); + const conditions: SQL[] = [ + eq(agentRuns.projectId, projectId), + eq(agentRuns.agentType, agentType), + eq(agentRuns.status, 'running'), + ]; + if (maxAgeMs !== undefined) { + const cutoff = new Date(Date.now() - maxAgeMs); + conditions.push(gte(agentRuns.startedAt, cutoff)); + } + const [row] = await db + .select({ count: count() }) + .from(agentRuns) + .where(and(...conditions)); + return row?.count ?? 0; +} + export async function failOrphanedRun( projectId: string, workItemId: string, diff --git a/src/db/repositories/settingsRepository.ts b/src/db/repositories/settingsRepository.ts index d5f080bb..c79261e5 100644 --- a/src/db/repositories/settingsRepository.ts +++ b/src/db/repositories/settingsRepository.ts @@ -329,6 +329,7 @@ export async function createAgentConfig(data: { model?: string | null; maxIterations?: number | null; agentBackend?: string | null; + maxConcurrency?: number | null; }) { const db = getDb(); const [row] = await db @@ -340,6 +341,7 @@ export async function createAgentConfig(data: { model: data.model, maxIterations: data.maxIterations, agentBackend: data.agentBackend, + maxConcurrency: data.maxConcurrency, }) .returning({ id: agentConfigs.id }); return row; @@ -352,6 +354,7 @@ export async function updateAgentConfig( model?: string | null; maxIterations?: number | null; agentBackend?: string | null; + maxConcurrency?: number | null; }, ) { const db = getDb(); @@ -365,3 +368,74 @@ export async function deleteAgentConfig(id: number) { const db = getDb(); await db.delete(agentConfigs).where(eq(agentConfigs.id, id)); } + +/** + * Resolve max_concurrency for a (projectId, agentType) pair. + * Checks project-scoped config first, then org-scoped config. + * Returns null if no config with max_concurrency is found (= no limit). + * + * Results are cached for 5 seconds to avoid repeated DB queries on + * sequential webhook batches. + */ +const MAX_CONCURRENCY_TTL_MS = 5_000; +const maxConcurrencyCache = new Map(); + +export async function getMaxConcurrency( + projectId: string, + agentType: string, +): Promise { + const cacheKey = `${projectId}:${agentType}`; + const cached = maxConcurrencyCache.get(cacheKey); + if (cached && Date.now() < cached.expiresAt) { + return cached.value; + } + + const db = getDb(); + + // 1. Project-scoped config + const [projectConfig] = await db + .select({ maxConcurrency: agentConfigs.maxConcurrency }) + .from(agentConfigs) + .where(and(eq(agentConfigs.projectId, projectId), eq(agentConfigs.agentType, agentType))) + .limit(1); + if (projectConfig?.maxConcurrency != null) { + maxConcurrencyCache.set(cacheKey, { + value: projectConfig.maxConcurrency, + expiresAt: Date.now() + MAX_CONCURRENCY_TTL_MS, + }); + return projectConfig.maxConcurrency; + } + + // 2. Org-scoped config — need orgId from project + const [project] = await db + .select({ orgId: projects.orgId }) + .from(projects) + .where(eq(projects.id, projectId)) + .limit(1); + if (!project) { + maxConcurrencyCache.set(cacheKey, { + value: null, + expiresAt: Date.now() + MAX_CONCURRENCY_TTL_MS, + }); + return null; + } + + const [orgConfig] = await db + .select({ maxConcurrency: agentConfigs.maxConcurrency }) + .from(agentConfigs) + .where( + and( + eq(agentConfigs.orgId, project.orgId), + isNull(agentConfigs.projectId), + eq(agentConfigs.agentType, agentType), + ), + ) + .limit(1); + + const result = orgConfig?.maxConcurrency ?? null; + maxConcurrencyCache.set(cacheKey, { + value: result, + expiresAt: Date.now() + MAX_CONCURRENCY_TTL_MS, + }); + return result; +} diff --git a/src/db/schema/agentConfigs.ts b/src/db/schema/agentConfigs.ts index 15505b2b..3079e297 100644 --- a/src/db/schema/agentConfigs.ts +++ b/src/db/schema/agentConfigs.ts @@ -12,6 +12,7 @@ export const agentConfigs = pgTable( model: text('model'), maxIterations: integer('max_iterations'), agentBackend: text('agent_backend'), + maxConcurrency: integer('max_concurrency'), createdAt: timestamp('created_at').defaultNow(), updatedAt: timestamp('updated_at') .defaultNow() diff --git a/src/pm/webhook-handler.ts b/src/pm/webhook-handler.ts index 0d13b273..374cd73c 100644 --- a/src/pm/webhook-handler.ts +++ b/src/pm/webhook-handler.ts @@ -7,6 +7,12 @@ * ack comment management) is delegated to the PMIntegration interface. */ +import { + checkAgentTypeConcurrency, + clearAgentTypeEnqueued, + markAgentTypeEnqueued, + markRecentlyDispatched, +} from '../router/agent-type-lock.js'; import type { TriggerRegistry } from '../triggers/registry.js'; import { runAgentWithCredentials } from '../triggers/shared/webhook-execution.js'; import { processNextQueuedWebhook } from '../triggers/shared/webhook-queue.js'; @@ -99,6 +105,7 @@ async function resolveTriggerResult( return result; } +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: webhook orchestration with multiple guard checks async function handleMatchedTrigger( integration: PMIntegration, registry: TriggerRegistry, @@ -129,6 +136,18 @@ async function handleMatchedTrigger( return; } + // Agent-type concurrency limit + let agentTypeMaxConcurrency: number | null = null; + if (result.agentType) { + const concurrencyCheck = await checkAgentTypeConcurrency(project.id, result.agentType); + agentTypeMaxConcurrency = concurrencyCheck.maxConcurrency; + if (concurrencyCheck.blocked) return; + if (agentTypeMaxConcurrency !== null) { + markRecentlyDispatched(project.id, result.agentType); + markAgentTypeEnqueued(project.id, result.agentType); + } + } + logger.info(`${integration.type} trigger matched`, { agentType: result.agentType, workItemId, @@ -154,6 +173,9 @@ async function handleMatchedTrigger( if (workItemId) { clearCardActive(workItemId); } + if (result.agentType && agentTypeMaxConcurrency !== null) { + clearAgentTypeEnqueued(project.id, result.agentType); + } setProcessing(false); processNextQueued(integration, registry); } diff --git a/src/router/agent-type-lock.ts b/src/router/agent-type-lock.ts new file mode 100644 index 00000000..f2dec3c3 --- /dev/null +++ b/src/router/agent-type-lock.ts @@ -0,0 +1,205 @@ +/** + * Agent-type concurrency lock for the router. + * + * Two layers: + * 1. Concurrency lock (in-memory + DB) — prevents multiple instances of the + * same agent type running for the same project. Cleared on completion. + * 2. Trigger-level dedup (in-memory, short TTL) — suppresses re-triggers + * within a 60-second window after the first dispatch. NOT cleared on + * completion (TTL-only). Handles sequential batch webhooks in server mode. + */ + +import { countActiveRunsForAgentType } from '../db/repositories/runsRepository.js'; +import { getMaxConcurrency } from '../db/repositories/settingsRepository.js'; +import { logger } from '../utils/logging.js'; +import { routerConfig } from './config.js'; + +// ============================================================================ +// Layer 1: Agent-Type Concurrency Lock (cleared on completion) +// ============================================================================ + +const CONCURRENCY_TTL_MS = 30 * 60 * 1000; // 30 minutes safety net + +interface ConcurrencyEntry { + timestamp: number; + count: number; +} + +const concurrencyMap = new Map(); + +function makeKey(projectId: string, agentType: string): string { + return `${projectId}:${agentType}`; +} + +/** + * Check whether an agent type is at its concurrency limit for a project. + * Fast path: in-memory map. Fallback: DB count of running agent_runs. + */ +export async function isAgentTypeLocked( + projectId: string, + agentType: string, + maxConcurrency: number, +): Promise<{ locked: boolean; reason?: string }> { + const key = makeKey(projectId, agentType); + + // Lazy TTL cleanup + const entry = concurrencyMap.get(key); + if (entry) { + if (Date.now() - entry.timestamp > CONCURRENCY_TTL_MS) { + concurrencyMap.delete(key); + logger.info('[AgentTypeLock] TTL expired, releasing in-memory lock', { + projectId, + agentType, + }); + } else if (entry.count >= maxConcurrency) { + return { + locked: true, + reason: `in-memory: ${entry.count} enqueued (max ${maxConcurrency})`, + }; + } + } + + // DB check — ignore runs older than 2× worker timeout (stale/orphaned) + const maxAgeMs = 2 * routerConfig.workerTimeoutMs; + const activeCount = await countActiveRunsForAgentType(projectId, agentType, maxAgeMs); + const inMemoryCount = + entry && Date.now() - entry.timestamp <= CONCURRENCY_TTL_MS ? entry.count : 0; + const effectiveCount = Math.max(activeCount, inMemoryCount); + + if (effectiveCount >= maxConcurrency) { + return { + locked: true, + reason: `${activeCount} running, ${inMemoryCount} enqueued (max ${maxConcurrency})`, + }; + } + + return { locked: false }; +} + +/** + * Mark an agent type as enqueued (in-memory, fast path). + * Called after dispatch succeeds. + */ +export function markAgentTypeEnqueued(projectId: string, agentType: string): void { + const key = makeKey(projectId, agentType); + const existing = concurrencyMap.get(key); + if (existing && Date.now() - existing.timestamp <= CONCURRENCY_TTL_MS) { + existing.count += 1; + existing.timestamp = Date.now(); + } else { + concurrencyMap.set(key, { timestamp: Date.now(), count: 1 }); + } +} + +/** + * Clear one enqueued slot for an agent type. + * Called when a worker exits (router mode) or agent completes (server mode). + */ +export function clearAgentTypeEnqueued(projectId: string, agentType: string): void { + const key = makeKey(projectId, agentType); + const entry = concurrencyMap.get(key); + if (entry) { + entry.count -= 1; + if (entry.count <= 0) { + concurrencyMap.delete(key); + } + } +} + +/** + * Clear all in-memory concurrency locks (used on shutdown / detachAll). + */ +export function clearAllAgentTypeLocks(): void { + concurrencyMap.clear(); + dedupMap.clear(); +} + +// ============================================================================ +// Layer 2: Trigger-Level Dedup (short TTL, NOT cleared on completion) +// ============================================================================ + +const DEDUP_TTL_MS = 60 * 1000; // 60 seconds + +const dedupMap = new Map(); + +/** + * Check whether an agent type was recently dispatched for a project. + * Returns true if a dispatch happened within the dedup TTL window. + */ +export function wasRecentlyDispatched(projectId: string, agentType: string): boolean { + const key = makeKey(projectId, agentType); + const timestamp = dedupMap.get(key); + if (timestamp === undefined) return false; + + if (Date.now() - timestamp > DEDUP_TTL_MS) { + dedupMap.delete(key); + return false; + } + return true; +} + +/** + * Mark an agent type as recently dispatched for a project. + * The mark expires after DEDUP_TTL_MS and is NOT cleared on completion. + */ +export function markRecentlyDispatched(projectId: string, agentType: string): void { + const key = makeKey(projectId, agentType); + dedupMap.set(key, Date.now()); + + // Periodic cleanup: evict expired entries when map grows large + if (dedupMap.size > 100) { + const now = Date.now(); + for (const [k, ts] of dedupMap) { + if (now - ts > DEDUP_TTL_MS) dedupMap.delete(k); + } + } +} + +// ============================================================================ +// Combined Concurrency Check (shared by all webhook handlers) +// ============================================================================ + +/** + * Check agent-type concurrency limit for a (projectId, agentType) pair. + * Combines DB config lookup, dedup check, and concurrency lock. + * + * Returns `{ maxConcurrency, blocked }`: + * - `maxConcurrency === null` means no limit is configured + * - `blocked === true` means the agent should be skipped + */ +export async function checkAgentTypeConcurrency( + projectId: string, + agentType: string, + logLabel?: string, +): Promise<{ maxConcurrency: number | null; blocked: boolean }> { + let maxConcurrency: number | null; + try { + maxConcurrency = await getMaxConcurrency(projectId, agentType); + } catch (err) { + logger.warn('[AgentTypeLock] Failed to check max concurrency, proceeding without limit', { + projectId, + agentType, + error: String(err), + }); + return { maxConcurrency: null, blocked: false }; + } + if (maxConcurrency === null) return { maxConcurrency: null, blocked: false }; + + if (wasRecentlyDispatched(projectId, agentType)) { + logger.info(`${logLabel ?? 'Agent'} recently dispatched, skipping (dedup)`, { + projectId, + agentType, + }); + return { maxConcurrency, blocked: true }; + } + const lockStatus = await isAgentTypeLocked(projectId, agentType, maxConcurrency); + if (lockStatus.locked) { + logger.info(`${logLabel ?? 'Agent'} type concurrency limit reached, skipping`, { + projectId, + agentType, + reason: lockStatus.reason, + }); + return { maxConcurrency, blocked: true }; + } + return { maxConcurrency, blocked: false }; +} diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts index 5523ec34..2cbd0186 100644 --- a/src/router/container-manager.ts +++ b/src/router/container-manager.ts @@ -11,6 +11,7 @@ import { findProjectByRepo, getAllProjectCredentials } from '../config/provider. import { failOrphanedRun } from '../db/repositories/runsRepository.js'; import { captureException } from '../sentry.js'; import { logger } from '../utils/logging.js'; +import { clearAgentTypeEnqueued, clearAllAgentTypeLocks } from './agent-type-lock.js'; import { routerConfig } from './config.js'; import { notifyTimeout } from './notifications.js'; import type { CascadeJob } from './queue.js'; @@ -28,6 +29,8 @@ export interface ActiveWorker { projectId?: string; /** Resolved at spawn time for work-item lock cleanup. */ workItemId?: string; + /** Resolved at spawn time for agent-type lock cleanup. */ + agentType?: string; } const activeWorkers = new Map(); @@ -144,6 +147,18 @@ function extractWorkItemId(data: CascadeJob): string | undefined { return undefined; } +/** + * Extract agent type from job data for concurrency lock tracking. + * Checks triggerResult.agentType first, then top-level agentType (dashboard jobs). + */ +function extractAgentType(data: CascadeJob): string | undefined { + const jobData = data as unknown as { + triggerResult?: { agentType?: string }; + agentType?: string; + }; + return jobData.triggerResult?.agentType ?? jobData.agentType ?? undefined; +} + /** * Spawn a worker container for a job. * Sets up timeout tracking and monitors container exit asynchronously. @@ -204,6 +219,7 @@ export async function spawnWorker(job: Job): Promise { // Track the worker const workItemId = extractWorkItemId(job.data); + const agentType = extractAgentType(job.data); activeWorkers.set(jobId, { containerId: container.id, jobId, @@ -212,6 +228,7 @@ export async function spawnWorker(job: Job): Promise { job: job.data, projectId: projectId ?? undefined, workItemId, + agentType, }); logger.info('[WorkerManager] Worker started:', { @@ -318,6 +335,9 @@ export function cleanupWorker(jobId: string, exitCode?: number): void { const worker = activeWorkers.get(jobId); if (worker) { clearTimeout(worker.timeoutHandle); + if (worker.projectId && worker.agentType) { + clearAgentTypeEnqueued(worker.projectId, worker.agentType); + } if (worker.projectId && worker.workItemId) { clearWorkItemEnqueued(worker.projectId, worker.workItemId); if (exitCode !== undefined && exitCode !== 0) { @@ -386,4 +406,5 @@ export function detachAll(): void { } activeWorkers.clear(); clearAllWorkItemLocks(); + clearAllAgentTypeLocks(); } diff --git a/src/router/webhook-processor.ts b/src/router/webhook-processor.ts index 95b7ff4d..a86a9527 100644 --- a/src/router/webhook-processor.ts +++ b/src/router/webhook-processor.ts @@ -12,6 +12,11 @@ import type { TriggerRegistry } from '../triggers/registry.js'; import { logger } from '../utils/logging.js'; +import { + checkAgentTypeConcurrency, + markAgentTypeEnqueued, + markRecentlyDispatched, +} from './agent-type-lock.js'; import type { RouterPlatformAdapter } from './platform-adapter.js'; import { addJob } from './queue.js'; import { isWorkItemLocked, markWorkItemEnqueued } from './work-item-lock.js'; @@ -37,6 +42,7 @@ export interface ProcessRouterWebhookResult { * 9. Build and enqueue job * 10. Fire optional pre-actions (e.g. GitHub 👀 reaction) */ +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: webhook pipeline with sequential guard checks export async function processRouterWebhook( adapter: RouterPlatformAdapter, payload: unknown, @@ -122,6 +128,20 @@ export async function processRouterWebhook( } } + // Step 7b: Agent-type concurrency limit + let agentTypeMaxConcurrency: number | null = null; + if (result.agentType) { + const concurrencyCheck = await checkAgentTypeConcurrency( + project.id, + result.agentType, + adapter.type, + ); + agentTypeMaxConcurrency = concurrencyCheck.maxConcurrency; + if (concurrencyCheck.blocked) { + return { shouldProcess: true, projectId: project.id }; + } + } + // Step 8: Post acknowledgment comment const ackResult = await adapter.postAck(event, payload, project, result.agentType); const ackCommentId = ackResult?.commentId; @@ -139,6 +159,10 @@ export async function processRouterWebhook( if (result.workItemId) { markWorkItemEnqueued(project.id, result.workItemId); } + if (result.agentType && agentTypeMaxConcurrency !== null) { + markRecentlyDispatched(project.id, result.agentType); + markAgentTypeEnqueued(project.id, result.agentType); + } logger.info(`${adapter.type} job queued`, { jobId, eventType: event.eventType, diff --git a/src/triggers/github/webhook-handler.ts b/src/triggers/github/webhook-handler.ts index f7d53a75..c69f6989 100644 --- a/src/triggers/github/webhook-handler.ts +++ b/src/triggers/github/webhook-handler.ts @@ -12,6 +12,12 @@ import { withGitHubToken } from '../../github/client.js'; import { getPersonaToken, resolvePersonaIdentities } from '../../github/personas.js'; import { withPMCredentials, withPMProvider } from '../../pm/context.js'; import { createPMProvider, pmRegistry } from '../../pm/index.js'; +import { + checkAgentTypeConcurrency, + clearAgentTypeEnqueued, + markAgentTypeEnqueued, + markRecentlyDispatched, +} from '../../router/agent-type-lock.js'; import type { CascadeConfig, ProjectConfig, TriggerContext } from '../../types/index.js'; import { clearCardActive, @@ -107,6 +113,7 @@ async function maybePostAckComment( } /** Run the agent with GitHub-specific execution config, managing processing flags. */ +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: webhook orchestration with multiple guard checks async function runGitHubAgent( result: TriggerResult, project: ProjectConfig, @@ -119,6 +126,18 @@ async function runGitHubAgent( return; } + // Agent-type concurrency limit + let agentTypeMaxConcurrency: number | null = null; + if (result.agentType) { + const concurrencyCheck = await checkAgentTypeConcurrency(project.id, result.agentType); + agentTypeMaxConcurrency = concurrencyCheck.maxConcurrency; + if (concurrencyCheck.blocked) return; + if (agentTypeMaxConcurrency !== null) { + markRecentlyDispatched(project.id, result.agentType); + markAgentTypeEnqueued(project.id, result.agentType); + } + } + setProcessing(true); startWatchdog(config.defaults.watchdogTimeoutMs); @@ -156,6 +175,9 @@ async function runGitHubAgent( ); } finally { if (workItemId) clearCardActive(workItemId); + if (result.agentType && agentTypeMaxConcurrency !== null) { + clearAgentTypeEnqueued(project.id, result.agentType); + } setProcessing(false); processNextQueuedGitHubWebhook(registry); } diff --git a/tests/unit/pm/webhook-handler.test.ts b/tests/unit/pm/webhook-handler.test.ts index 13aa3386..5a9bd322 100644 --- a/tests/unit/pm/webhook-handler.test.ts +++ b/tests/unit/pm/webhook-handler.test.ts @@ -55,7 +55,15 @@ vi.mock('../../../src/pm/registry.js', () => ({ }, })); +vi.mock('../../../src/router/agent-type-lock.js', () => ({ + checkAgentTypeConcurrency: vi.fn().mockResolvedValue({ maxConcurrency: null, blocked: false }), + markAgentTypeEnqueued: vi.fn(), + clearAgentTypeEnqueued: vi.fn(), + markRecentlyDispatched: vi.fn(), +})); + import { processPMWebhook } from '../../../src/pm/webhook-handler.js'; +import { checkAgentTypeConcurrency } from '../../../src/router/agent-type-lock.js'; import { runAgentExecutionPipeline } from '../../../src/triggers/shared/agent-execution.js'; import { clearCardActive, @@ -294,6 +302,19 @@ describe('processPMWebhook', () => { expect(mockSetCardActive).not.toHaveBeenCalled(); }); + it('skips agent execution when agent-type concurrency is blocked', async () => { + vi.mocked(checkAgentTypeConcurrency).mockResolvedValueOnce({ + maxConcurrency: 1, + blocked: true, + }); + const integration = createMockIntegration(); + const registry = createMockRegistry(); + + await processPMWebhook(integration as never, { type: 'card_moved' }, registry as never); + + expect(mockRunAgentExecutionPipeline).not.toHaveBeenCalled(); + }); + it('calls withCredentials on integration during execution', async () => { const integration = createMockIntegration(); const registry = createMockRegistry(); diff --git a/tests/unit/router/agent-type-lock.test.ts b/tests/unit/router/agent-type-lock.test.ts new file mode 100644 index 00000000..cb1ef47c --- /dev/null +++ b/tests/unit/router/agent-type-lock.test.ts @@ -0,0 +1,231 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +vi.mock('../../../src/db/repositories/runsRepository.js', () => ({ + countActiveRunsForAgentType: vi.fn().mockResolvedValue(0), +})); +vi.mock('../../../src/db/repositories/settingsRepository.js', () => ({ + getMaxConcurrency: vi.fn().mockResolvedValue(null), +})); +vi.mock('../../../src/utils/logging.js', () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, +})); +vi.mock('../../../src/router/config.js', () => ({ + routerConfig: { workerTimeoutMs: 30 * 60 * 1000 }, +})); + +import { countActiveRunsForAgentType } from '../../../src/db/repositories/runsRepository.js'; +import { getMaxConcurrency } from '../../../src/db/repositories/settingsRepository.js'; +import { + checkAgentTypeConcurrency, + clearAgentTypeEnqueued, + clearAllAgentTypeLocks, + isAgentTypeLocked, + markAgentTypeEnqueued, + markRecentlyDispatched, + wasRecentlyDispatched, +} from '../../../src/router/agent-type-lock.js'; + +describe('agent-type-lock', () => { + beforeEach(() => { + clearAllAgentTypeLocks(); + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + // ======================================================================== + // Layer 1: Concurrency Lock + // ======================================================================== + + describe('Layer 1: concurrency lock', () => { + it('returns locked: false when no active run and no in-memory mark', async () => { + const result = await isAgentTypeLocked('proj1', 'backlog-manager', 1); + expect(result).toEqual({ locked: false }); + expect(countActiveRunsForAgentType).toHaveBeenCalledWith( + 'proj1', + 'backlog-manager', + 2 * 30 * 60 * 1000, + ); + }); + + it('returns locked: true after markAgentTypeEnqueued (maxConcurrency=1)', async () => { + markAgentTypeEnqueued('proj1', 'backlog-manager'); + const result = await isAgentTypeLocked('proj1', 'backlog-manager', 1); + expect(result.locked).toBe(true); + expect(result.reason).toContain('in-memory'); + }); + + it('allows second enqueue when maxConcurrency=2', async () => { + markAgentTypeEnqueued('proj1', 'implementation'); + const result = await isAgentTypeLocked('proj1', 'implementation', 2); + expect(result.locked).toBe(false); + }); + + it('blocks at maxConcurrency=2 with 2 enqueued', async () => { + markAgentTypeEnqueued('proj1', 'implementation'); + markAgentTypeEnqueued('proj1', 'implementation'); + const result = await isAgentTypeLocked('proj1', 'implementation', 2); + expect(result.locked).toBe(true); + }); + + it('clearAgentTypeEnqueued releases one slot', async () => { + markAgentTypeEnqueued('proj1', 'backlog-manager'); + clearAgentTypeEnqueued('proj1', 'backlog-manager'); + const result = await isAgentTypeLocked('proj1', 'backlog-manager', 1); + expect(result.locked).toBe(false); + }); + + it('returns locked: true when DB has active runs at limit', async () => { + vi.mocked(countActiveRunsForAgentType).mockResolvedValueOnce(1); + const result = await isAgentTypeLocked('proj1', 'backlog-manager', 1); + expect(result.locked).toBe(true); + expect(result.reason).toContain('running'); + }); + + it('uses Math.max of in-memory and DB counts (not sum)', async () => { + markAgentTypeEnqueued('proj1', 'implementation'); + vi.mocked(countActiveRunsForAgentType).mockResolvedValueOnce(1); + // Math.max(1 DB, 1 in-memory) = 1 < 2 → NOT locked + const result = await isAgentTypeLocked('proj1', 'implementation', 2); + expect(result.locked).toBe(false); + }); + + it('locks when DB count alone meets max', async () => { + vi.mocked(countActiveRunsForAgentType).mockResolvedValueOnce(2); + const result = await isAgentTypeLocked('proj1', 'implementation', 2); + expect(result.locked).toBe(true); + }); + + it('different projects are independent', async () => { + markAgentTypeEnqueued('proj1', 'backlog-manager'); + + const result1 = await isAgentTypeLocked('proj1', 'backlog-manager', 1); + expect(result1.locked).toBe(true); + + const result2 = await isAgentTypeLocked('proj2', 'backlog-manager', 1); + expect(result2.locked).toBe(false); + }); + + it('different agent types are independent', async () => { + markAgentTypeEnqueued('proj1', 'backlog-manager'); + + const result1 = await isAgentTypeLocked('proj1', 'backlog-manager', 1); + expect(result1.locked).toBe(true); + + const result2 = await isAgentTypeLocked('proj1', 'implementation', 1); + expect(result2.locked).toBe(false); + }); + + it('TTL expiry releases the in-memory lock', async () => { + vi.useFakeTimers(); + markAgentTypeEnqueued('proj1', 'backlog-manager'); + + // Advance past 30 minutes + vi.advanceTimersByTime(30 * 60 * 1000 + 1); + + const result = await isAgentTypeLocked('proj1', 'backlog-manager', 1); + expect(result.locked).toBe(false); + }); + + it('clearAllAgentTypeLocks clears all entries', async () => { + markAgentTypeEnqueued('proj1', 'backlog-manager'); + markAgentTypeEnqueued('proj2', 'implementation'); + + clearAllAgentTypeLocks(); + + const result1 = await isAgentTypeLocked('proj1', 'backlog-manager', 1); + expect(result1.locked).toBe(false); + const result2 = await isAgentTypeLocked('proj2', 'implementation', 1); + expect(result2.locked).toBe(false); + }); + }); + + // ======================================================================== + // Layer 2: Dedup Window + // ======================================================================== + + describe('Layer 2: dedup window', () => { + it('returns false when not recently dispatched', () => { + expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(false); + }); + + it('returns true after markRecentlyDispatched', () => { + markRecentlyDispatched('proj1', 'backlog-manager'); + expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(true); + }); + + it('expires after 60 seconds', () => { + vi.useFakeTimers(); + markRecentlyDispatched('proj1', 'backlog-manager'); + + vi.advanceTimersByTime(60 * 1000 + 1); + + expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(false); + }); + + it('is NOT cleared by clearAgentTypeEnqueued', () => { + markRecentlyDispatched('proj1', 'backlog-manager'); + clearAgentTypeEnqueued('proj1', 'backlog-manager'); + expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(true); + }); + + it('is cleared by clearAllAgentTypeLocks', () => { + markRecentlyDispatched('proj1', 'backlog-manager'); + clearAllAgentTypeLocks(); + expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(false); + }); + + it('different projects are independent', () => { + markRecentlyDispatched('proj1', 'backlog-manager'); + expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(true); + expect(wasRecentlyDispatched('proj2', 'backlog-manager')).toBe(false); + }); + + it('different agent types are independent', () => { + markRecentlyDispatched('proj1', 'backlog-manager'); + expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(true); + expect(wasRecentlyDispatched('proj1', 'implementation')).toBe(false); + }); + }); + + // ======================================================================== + // Combined: checkAgentTypeConcurrency + // ======================================================================== + + describe('checkAgentTypeConcurrency', () => { + it('returns blocked: false when no maxConcurrency configured', async () => { + vi.mocked(getMaxConcurrency).mockResolvedValueOnce(null); + const result = await checkAgentTypeConcurrency('proj1', 'implementation'); + expect(result).toEqual({ maxConcurrency: null, blocked: false }); + }); + + it('returns blocked: true when recently dispatched', async () => { + vi.mocked(getMaxConcurrency).mockResolvedValueOnce(1); + markRecentlyDispatched('proj1', 'implementation'); + const result = await checkAgentTypeConcurrency('proj1', 'implementation'); + expect(result.blocked).toBe(true); + expect(result.maxConcurrency).toBe(1); + }); + + it('returns blocked: true when agent type is locked', async () => { + vi.mocked(getMaxConcurrency).mockResolvedValueOnce(1); + markAgentTypeEnqueued('proj1', 'implementation'); + const result = await checkAgentTypeConcurrency('proj1', 'implementation'); + expect(result.blocked).toBe(true); + }); + + it('returns blocked: false when under limit', async () => { + vi.mocked(getMaxConcurrency).mockResolvedValueOnce(2); + const result = await checkAgentTypeConcurrency('proj1', 'implementation'); + expect(result).toEqual({ maxConcurrency: 2, blocked: false }); + }); + + it('proceeds without limit when getMaxConcurrency throws', async () => { + vi.mocked(getMaxConcurrency).mockRejectedValueOnce(new Error('DB down')); + const result = await checkAgentTypeConcurrency('proj1', 'implementation'); + expect(result).toEqual({ maxConcurrency: null, blocked: false }); + }); + }); +}); diff --git a/tests/unit/router/webhook-processor.test.ts b/tests/unit/router/webhook-processor.test.ts index 8fc061a9..ea6006a5 100644 --- a/tests/unit/router/webhook-processor.test.ts +++ b/tests/unit/router/webhook-processor.test.ts @@ -15,7 +15,13 @@ vi.mock('../../../src/router/work-item-lock.js', () => ({ isWorkItemLocked: vi.fn().mockResolvedValue({ locked: false }), markWorkItemEnqueued: vi.fn(), })); +vi.mock('../../../src/router/agent-type-lock.js', () => ({ + checkAgentTypeConcurrency: vi.fn().mockResolvedValue({ maxConcurrency: null, blocked: false }), + markAgentTypeEnqueued: vi.fn(), + markRecentlyDispatched: vi.fn(), +})); +import { checkAgentTypeConcurrency } from '../../../src/router/agent-type-lock.js'; import type { RouterProjectConfig } from '../../../src/router/config.js'; import type { RouterPlatformAdapter } from '../../../src/router/platform-adapter.js'; import { addJob } from '../../../src/router/queue.js'; @@ -240,6 +246,24 @@ describe('processRouterWebhook', () => { expect(markWorkItemEnqueued).toHaveBeenCalledWith('p1', 'card1'); }); + it('skips job when agent-type concurrency is blocked', async () => { + vi.mocked(checkAgentTypeConcurrency).mockResolvedValueOnce({ + maxConcurrency: 1, + blocked: true, + }); + const triggerResult = { + agentType: 'implementation', + agentInput: { cardId: 'card1' }, + }; + const adapter = makeMockAdapter({ + dispatchWithCredentials: vi.fn().mockResolvedValue(triggerResult), + }); + + const result = await processRouterWebhook(adapter, {}, mockTriggerRegistry); + expect(result.shouldProcess).toBe(true); + expect(addJob).not.toHaveBeenCalled(); + }); + it('always enqueues job when trigger has no workItemId', async () => { const triggerResult = { agentType: 'debug', diff --git a/tests/unit/triggers/github-webhook-handler.test.ts b/tests/unit/triggers/github-webhook-handler.test.ts index f9b8230c..925775b3 100644 --- a/tests/unit/triggers/github-webhook-handler.test.ts +++ b/tests/unit/triggers/github-webhook-handler.test.ts @@ -63,6 +63,13 @@ vi.mock('../../../src/triggers/github/check-polling.js', () => ({ pollWaitForChecks: vi.fn().mockResolvedValue(true), })); +vi.mock('../../../src/router/agent-type-lock.js', () => ({ + checkAgentTypeConcurrency: vi.fn().mockResolvedValue({ maxConcurrency: null, blocked: false }), + markAgentTypeEnqueued: vi.fn(), + clearAgentTypeEnqueued: vi.fn(), + markRecentlyDispatched: vi.fn(), +})); + vi.mock('../../../src/utils/index.js', () => ({ clearCardActive: vi.fn(), enqueueWebhook: vi.fn().mockReturnValue(true), @@ -80,6 +87,7 @@ vi.mock('../../../src/utils/index.js', () => ({ startWatchdog: vi.fn(), })); +import { checkAgentTypeConcurrency } from '../../../src/router/agent-type-lock.js'; import { postAcknowledgmentComment } from '../../../src/triggers/github/ack-comments.js'; import { processGitHubWebhook } from '../../../src/triggers/github/webhook-handler.js'; import { runAgentWithCredentials } from '../../../src/triggers/shared/webhook-execution.js'; @@ -260,6 +268,16 @@ describe('processGitHubWebhook', () => { expect(mockSetProcessing).toHaveBeenCalledWith(false); }); + it('skips agent execution when agent-type concurrency is blocked', async () => { + vi.mocked(checkAgentTypeConcurrency).mockResolvedValueOnce({ + maxConcurrency: 1, + blocked: true, + }); + const registry = createMockRegistry(); + await processGitHubWebhook(validPayload, 'pull_request', registry as never); + expect(mockRunAgentWithCredentials).not.toHaveBeenCalled(); + }); + it('skips execution when no agentType in result', async () => { const registry = { dispatch: vi.fn().mockResolvedValue({ diff --git a/web/src/components/projects/project-agent-configs.tsx b/web/src/components/projects/project-agent-configs.tsx index 1c1046c5..387519cc 100644 --- a/web/src/components/projects/project-agent-configs.tsx +++ b/web/src/components/projects/project-agent-configs.tsx @@ -36,6 +36,7 @@ interface AgentConfig { model: string | null; maxIterations: number | null; agentBackend: string | null; + maxConcurrency: number | null; } function AgentConfigBadge({ config }: { config: AgentConfig | null }) { @@ -45,6 +46,7 @@ function AgentConfigBadge({ config }: { config: AgentConfig | null }) { const parts: string[] = []; if (config.model) parts.push(config.model); if (config.maxIterations) parts.push(`${config.maxIterations} iterations`); + if (config.maxConcurrency) parts.push(`max ${config.maxConcurrency} concurrent`); if (config.agentBackend) parts.push(config.agentBackend); if (parts.length === 0) return Configured; return {parts.join(' · ')}; @@ -237,6 +239,7 @@ export function ProjectAgentConfigs({ projectId }: { projectId: string }) { const [model, setModel] = useState(''); const [maxIterations, setMaxIterations] = useState(''); const [agentBackend, setAgentBackend] = useState(''); + const [maxConcurrency, setMaxConcurrency] = useState(''); const [localLifecycleTriggers, setLocalLifecycleTriggers] = useState>({}); const [lifecycleSaving, setLifecycleSaving] = useState(false); const [lifecycleSaved, setLifecycleSaved] = useState(false); @@ -253,6 +256,7 @@ export function ProjectAgentConfigs({ projectId }: { projectId: string }) { setModel(''); setMaxIterations(''); setAgentBackend(''); + setMaxConcurrency(''); setDialogOpen(true); } @@ -262,6 +266,7 @@ export function ProjectAgentConfigs({ projectId }: { projectId: string }) { setModel(config.model ?? ''); setMaxIterations(config.maxIterations?.toString() ?? ''); setAgentBackend(config.agentBackend ?? ''); + setMaxConcurrency(config.maxConcurrency?.toString() ?? ''); setDialogOpen(true); } @@ -274,6 +279,7 @@ export function ProjectAgentConfigs({ projectId }: { projectId: string }) { model: model || null, maxIterations: maxIterations ? Number(maxIterations) : null, agentBackend: agentBackend || null, + maxConcurrency: maxConcurrency ? Number(maxConcurrency) : null, }), onSuccess: () => { queryClient.invalidateQueries({ queryKey: configsQueryKey }); @@ -289,6 +295,7 @@ export function ProjectAgentConfigs({ projectId }: { projectId: string }) { model: model || null, maxIterations: maxIterations ? Number(maxIterations) : null, agentBackend: agentBackend || null, + maxConcurrency: maxConcurrency ? Number(maxConcurrency) : null, }), onSuccess: () => { queryClient.invalidateQueries({ queryKey: configsQueryKey }); @@ -536,6 +543,17 @@ export function ProjectAgentConfigs({ projectId }: { projectId: string }) { /> +
+ + setMaxConcurrency(e.target.value)} + placeholder="Optional — limits concurrent runs per project" + /> +
setMaxConcurrency(e.target.value)} + placeholder="Optional — limits concurrent runs per project" + /> +