Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/api/routers/agentConfigs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export const agentConfigsRouter = router({
model: z.string().nullish(),
maxIterations: z.number().int().positive().nullish(),
agentBackend: z.string().nullish(),
maxConcurrency: z.number().int().positive().nullish(),
}),
)
.mutation(async ({ ctx, input }) => {
Expand All @@ -66,6 +67,7 @@ export const agentConfigsRouter = router({
model: input.model,
maxIterations: input.maxIterations,
agentBackend: input.agentBackend,
maxConcurrency: input.maxConcurrency,
});
}),

Expand All @@ -77,6 +79,7 @@ export const agentConfigsRouter = router({
model: z.string().nullish(),
maxIterations: z.number().int().positive().nullish(),
agentBackend: z.string().nullish(),
maxConcurrency: z.number().int().positive().nullish(),
}),
)
.mutation(async ({ ctx, input }) => {
Expand Down
2 changes: 2 additions & 0 deletions src/cli/dashboard/agents/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default class AgentsCreate extends DashboardCommand {
model: Flags.string({ description: 'Model override' }),
'max-iterations': Flags.integer({ description: 'Max iterations override' }),
backend: Flags.string({ description: 'Agent backend override' }),
'max-concurrency': Flags.integer({ description: 'Max concurrent runs per project' }),
};

async run(): Promise<void> {
Expand All @@ -26,6 +27,7 @@ export default class AgentsCreate extends DashboardCommand {
model: flags.model,
maxIterations: flags['max-iterations'],
agentBackend: flags.backend,
maxConcurrency: flags['max-concurrency'],
});

if (flags.json) {
Expand Down
2 changes: 2 additions & 0 deletions src/cli/dashboard/agents/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default class AgentsUpdate extends DashboardCommand {
model: Flags.string({ description: 'Model override' }),
'max-iterations': Flags.integer({ description: 'Max iterations override' }),
backend: Flags.string({ description: 'Agent backend override' }),
'max-concurrency': Flags.integer({ description: 'Max concurrent runs per project' }),
};

async run(): Promise<void> {
Expand All @@ -26,6 +27,7 @@ export default class AgentsUpdate extends DashboardCommand {
model: flags.model,
maxIterations: flags['max-iterations'],
agentBackend: flags.backend,
maxConcurrency: flags['max-concurrency'],
});

if (flags.json) {
Expand Down
1 change: 1 addition & 0 deletions src/db/migrations/0026_agent_type_concurrency.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE agent_configs ADD COLUMN max_concurrency INTEGER CHECK (max_concurrency IS NULL OR max_concurrency > 0);
7 changes: 7 additions & 0 deletions src/db/migrations/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@
"when": 1760000000000,
"tag": "0025_integration_hooks",
"breakpoints": false
},
{
"idx": 26,
"version": "7",
"when": 1761000000000,
"tag": "0026_agent_type_concurrency",
"breakpoints": false
}
]
}
22 changes: 22 additions & 0 deletions src/db/repositories/runsRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,28 @@ export async function hasActiveRunForWorkItem(
return !!row;
}

export async function countActiveRunsForAgentType(
projectId: string,
agentType: string,
maxAgeMs?: number,
): Promise<number> {
const db = getDb();
const conditions: SQL[] = [
eq(agentRuns.projectId, projectId),
eq(agentRuns.agentType, agentType),
eq(agentRuns.status, 'running'),
];
if (maxAgeMs !== undefined) {
const cutoff = new Date(Date.now() - maxAgeMs);
conditions.push(gte(agentRuns.startedAt, cutoff));
}
const [row] = await db
.select({ count: count() })
.from(agentRuns)
.where(and(...conditions));
return row?.count ?? 0;
}

export async function failOrphanedRun(
projectId: string,
workItemId: string,
Expand Down
74 changes: 74 additions & 0 deletions src/db/repositories/settingsRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ export async function createAgentConfig(data: {
model?: string | null;
maxIterations?: number | null;
agentBackend?: string | null;
maxConcurrency?: number | null;
}) {
const db = getDb();
const [row] = await db
Expand All @@ -340,6 +341,7 @@ export async function createAgentConfig(data: {
model: data.model,
maxIterations: data.maxIterations,
agentBackend: data.agentBackend,
maxConcurrency: data.maxConcurrency,
})
.returning({ id: agentConfigs.id });
return row;
Expand All @@ -352,6 +354,7 @@ export async function updateAgentConfig(
model?: string | null;
maxIterations?: number | null;
agentBackend?: string | null;
maxConcurrency?: number | null;
},
) {
const db = getDb();
Expand All @@ -365,3 +368,74 @@ export async function deleteAgentConfig(id: number) {
const db = getDb();
await db.delete(agentConfigs).where(eq(agentConfigs.id, id));
}

/**
* Resolve max_concurrency for a (projectId, agentType) pair.
* Checks project-scoped config first, then org-scoped config.
* Returns null if no config with max_concurrency is found (= no limit).
*
* Results are cached for 5 seconds to avoid repeated DB queries on
* sequential webhook batches.
*/
const MAX_CONCURRENCY_TTL_MS = 5_000;
const maxConcurrencyCache = new Map<string, { value: number | null; expiresAt: number }>();

export async function getMaxConcurrency(
projectId: string,
agentType: string,
): Promise<number | null> {
const cacheKey = `${projectId}:${agentType}`;
const cached = maxConcurrencyCache.get(cacheKey);
if (cached && Date.now() < cached.expiresAt) {
return cached.value;
}

const db = getDb();

// 1. Project-scoped config
const [projectConfig] = await db
.select({ maxConcurrency: agentConfigs.maxConcurrency })
.from(agentConfigs)
.where(and(eq(agentConfigs.projectId, projectId), eq(agentConfigs.agentType, agentType)))
.limit(1);
if (projectConfig?.maxConcurrency != null) {
maxConcurrencyCache.set(cacheKey, {
value: projectConfig.maxConcurrency,
expiresAt: Date.now() + MAX_CONCURRENCY_TTL_MS,
});
return projectConfig.maxConcurrency;
}

// 2. Org-scoped config — need orgId from project
const [project] = await db
.select({ orgId: projects.orgId })
.from(projects)
.where(eq(projects.id, projectId))
.limit(1);
if (!project) {
maxConcurrencyCache.set(cacheKey, {
value: null,
expiresAt: Date.now() + MAX_CONCURRENCY_TTL_MS,
});
return null;
}

const [orgConfig] = await db
.select({ maxConcurrency: agentConfigs.maxConcurrency })
.from(agentConfigs)
.where(
and(
eq(agentConfigs.orgId, project.orgId),
isNull(agentConfigs.projectId),
eq(agentConfigs.agentType, agentType),
),
)
.limit(1);

const result = orgConfig?.maxConcurrency ?? null;
maxConcurrencyCache.set(cacheKey, {
value: result,
expiresAt: Date.now() + MAX_CONCURRENCY_TTL_MS,
});
return result;
}
1 change: 1 addition & 0 deletions src/db/schema/agentConfigs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const agentConfigs = pgTable(
model: text('model'),
maxIterations: integer('max_iterations'),
agentBackend: text('agent_backend'),
maxConcurrency: integer('max_concurrency'),
createdAt: timestamp('created_at').defaultNow(),
updatedAt: timestamp('updated_at')
.defaultNow()
Expand Down
22 changes: 22 additions & 0 deletions src/pm/webhook-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
* ack comment management) is delegated to the PMIntegration interface.
*/

import {
checkAgentTypeConcurrency,
clearAgentTypeEnqueued,
markAgentTypeEnqueued,
markRecentlyDispatched,
} from '../router/agent-type-lock.js';
import type { TriggerRegistry } from '../triggers/registry.js';
import { runAgentWithCredentials } from '../triggers/shared/webhook-execution.js';
import { processNextQueuedWebhook } from '../triggers/shared/webhook-queue.js';
Expand Down Expand Up @@ -99,6 +105,7 @@ async function resolveTriggerResult(
return result;
}

// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: webhook orchestration with multiple guard checks
async function handleMatchedTrigger(
integration: PMIntegration,
registry: TriggerRegistry,
Expand Down Expand Up @@ -129,6 +136,18 @@ async function handleMatchedTrigger(
return;
}

// Agent-type concurrency limit
let agentTypeMaxConcurrency: number | null = null;
if (result.agentType) {
const concurrencyCheck = await checkAgentTypeConcurrency(project.id, result.agentType);
agentTypeMaxConcurrency = concurrencyCheck.maxConcurrency;
if (concurrencyCheck.blocked) return;
if (agentTypeMaxConcurrency !== null) {
markRecentlyDispatched(project.id, result.agentType);
markAgentTypeEnqueued(project.id, result.agentType);
}
}

logger.info(`${integration.type} trigger matched`, {
agentType: result.agentType,
workItemId,
Expand All @@ -154,6 +173,9 @@ async function handleMatchedTrigger(
if (workItemId) {
clearCardActive(workItemId);
}
if (result.agentType && agentTypeMaxConcurrency !== null) {
clearAgentTypeEnqueued(project.id, result.agentType);
}
setProcessing(false);
processNextQueued(integration, registry);
}
Expand Down
Loading