diff --git a/src/pm/webhook-handler.ts b/src/pm/webhook-handler.ts index 540fb7cb..8548002c 100644 --- a/src/pm/webhook-handler.ts +++ b/src/pm/webhook-handler.ts @@ -113,11 +113,16 @@ async function handleMatchedTrigger( // Agent-type concurrency limit let agentTypeMaxConcurrency: number | null = null; if (result.agentType) { - const concurrencyCheck = await checkAgentTypeConcurrency(project.id, result.agentType); + const concurrencyCheck = await checkAgentTypeConcurrency( + project.id, + result.agentType, + undefined, + result.workItemId, + ); agentTypeMaxConcurrency = concurrencyCheck.maxConcurrency; if (concurrencyCheck.blocked) return; if (agentTypeMaxConcurrency !== null) { - markRecentlyDispatched(project.id, result.agentType); + markRecentlyDispatched(project.id, result.agentType, result.workItemId); markAgentTypeEnqueued(project.id, result.agentType); } } diff --git a/src/router/agent-type-lock.ts b/src/router/agent-type-lock.ts index e2dd01c8..2320fff5 100644 --- a/src/router/agent-type-lock.ts +++ b/src/router/agent-type-lock.ts @@ -32,6 +32,11 @@ function makeKey(projectId: string, agentType: string): string { return `${projectId}:${agentType}`; } +function makeDedupKey(projectId: string, agentType: string, dedupScope?: string): string { + const baseKey = makeKey(projectId, agentType); + return dedupScope ? `${baseKey}:${dedupScope}` : baseKey; +} + /** * Check whether an agent type is at its concurrency limit for a project. * Fast path: in-memory map. Fallback: DB count of running agent_runs. @@ -125,11 +130,15 @@ const DEDUP_TTL_MS = 60 * 1000; // 60 seconds const dedupMap = new Map(); /** - * Check whether an agent type was recently dispatched for a project. + * Check whether an agent type was recently dispatched for a project/scope. * Returns true if a dispatch happened within the dedup TTL window. */ -export function wasRecentlyDispatched(projectId: string, agentType: string): boolean { - const key = makeKey(projectId, agentType); +export function wasRecentlyDispatched( + projectId: string, + agentType: string, + dedupScope?: string, +): boolean { + const key = makeDedupKey(projectId, agentType, dedupScope); const timestamp = dedupMap.get(key); if (timestamp === undefined) return false; @@ -141,11 +150,15 @@ export function wasRecentlyDispatched(projectId: string, agentType: string): boo } /** - * Mark an agent type as recently dispatched for a project. + * Mark an agent type as recently dispatched for a project/scope. * The mark expires after DEDUP_TTL_MS and is NOT cleared on completion. */ -export function markRecentlyDispatched(projectId: string, agentType: string): void { - const key = makeKey(projectId, agentType); +export function markRecentlyDispatched( + projectId: string, + agentType: string, + dedupScope?: string, +): void { + const key = makeDedupKey(projectId, agentType, dedupScope); dedupMap.set(key, Date.now()); // Periodic cleanup: evict expired entries when map grows large @@ -173,6 +186,7 @@ export async function checkAgentTypeConcurrency( projectId: string, agentType: string, logLabel?: string, + dedupScope?: string, ): Promise<{ maxConcurrency: number | null; blocked: boolean }> { let maxConcurrency: number | null; try { @@ -187,10 +201,11 @@ export async function checkAgentTypeConcurrency( } if (maxConcurrency === null) return { maxConcurrency: null, blocked: false }; - if (wasRecentlyDispatched(projectId, agentType)) { + if (wasRecentlyDispatched(projectId, agentType, dedupScope)) { logger.info(`${logLabel ?? 'Agent'} recently dispatched, skipping (dedup)`, { projectId, agentType, + dedupScope, }); return { maxConcurrency, blocked: true }; } diff --git a/src/router/webhook-processor.ts b/src/router/webhook-processor.ts index 45860cd1..9caa69fb 100644 --- a/src/router/webhook-processor.ts +++ b/src/router/webhook-processor.ts @@ -173,6 +173,7 @@ export async function processRouterWebhook( project.id, result.agentType, adapter.type, + result.workItemId, ); agentTypeMaxConcurrency = concurrencyCheck.maxConcurrency; if (concurrencyCheck.blocked) { @@ -216,7 +217,7 @@ export async function processRouterWebhook( markWorkItemEnqueued(project.id, result.workItemId, result.agentType); } if (result.agentType && agentTypeMaxConcurrency !== null) { - markRecentlyDispatched(project.id, result.agentType); + markRecentlyDispatched(project.id, result.agentType, result.workItemId); markAgentTypeEnqueued(project.id, result.agentType); } logger.info(`${adapter.type} job queued`, { diff --git a/tests/unit/router/agent-type-lock.test.ts b/tests/unit/router/agent-type-lock.test.ts index fe5c4375..c296c091 100644 --- a/tests/unit/router/agent-type-lock.test.ts +++ b/tests/unit/router/agent-type-lock.test.ts @@ -187,6 +187,13 @@ describe('agent-type-lock', () => { expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(true); expect(wasRecentlyDispatched('proj1', 'implementation')).toBe(false); }); + + it('different dedup scopes are independent', () => { + markRecentlyDispatched('proj1', 'planning', 'TF-8'); + expect(wasRecentlyDispatched('proj1', 'planning', 'TF-8')).toBe(true); + expect(wasRecentlyDispatched('proj1', 'planning', 'TF-10')).toBe(false); + expect(wasRecentlyDispatched('proj1', 'planning')).toBe(false); + }); }); // ======================================================================== @@ -208,6 +215,24 @@ describe('agent-type-lock', () => { expect(result.maxConcurrency).toBe(1); }); + it('does not block a different work item in the dedup window', async () => { + vi.mocked(getMaxConcurrency).mockResolvedValueOnce(5); + markRecentlyDispatched('proj1', 'planning', 'TF-8'); + + const result = await checkAgentTypeConcurrency('proj1', 'planning', 'linear', 'TF-10'); + + expect(result).toEqual({ maxConcurrency: 5, blocked: false }); + }); + + it('blocks the same work item in the dedup window', async () => { + vi.mocked(getMaxConcurrency).mockResolvedValueOnce(5); + markRecentlyDispatched('proj1', 'planning', 'TF-8'); + + const result = await checkAgentTypeConcurrency('proj1', 'planning', 'linear', 'TF-8'); + + expect(result).toEqual({ maxConcurrency: 5, blocked: true }); + }); + it('returns blocked: true when agent type is locked', async () => { vi.mocked(getMaxConcurrency).mockResolvedValueOnce(1); markAgentTypeEnqueued('proj1', 'implementation');