Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/pm/webhook-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ async function handleMatchedTrigger(
// Agent-type concurrency limit
let agentTypeMaxConcurrency: number | null = null;
if (result.agentType) {
const concurrencyCheck = await checkAgentTypeConcurrency(project.id, result.agentType);
const concurrencyCheck = await checkAgentTypeConcurrency(
project.id,
result.agentType,
undefined,
result.workItemId,
);
agentTypeMaxConcurrency = concurrencyCheck.maxConcurrency;
if (concurrencyCheck.blocked) return;
if (agentTypeMaxConcurrency !== null) {
markRecentlyDispatched(project.id, result.agentType);
markRecentlyDispatched(project.id, result.agentType, result.workItemId);
markAgentTypeEnqueued(project.id, result.agentType);
}
}
Expand Down
29 changes: 22 additions & 7 deletions src/router/agent-type-lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ function makeKey(projectId: string, agentType: string): string {
return `${projectId}:${agentType}`;
}

function makeDedupKey(projectId: string, agentType: string, dedupScope?: string): string {
const baseKey = makeKey(projectId, agentType);
return dedupScope ? `${baseKey}:${dedupScope}` : baseKey;
}

/**
* Check whether an agent type is at its concurrency limit for a project.
* Fast path: in-memory map. Fallback: DB count of running agent_runs.
Expand Down Expand Up @@ -125,11 +130,15 @@ const DEDUP_TTL_MS = 60 * 1000; // 60 seconds
const dedupMap = new Map<string, number>();

/**
* Check whether an agent type was recently dispatched for a project.
* Check whether an agent type was recently dispatched for a project/scope.
* Returns true if a dispatch happened within the dedup TTL window.
*/
export function wasRecentlyDispatched(projectId: string, agentType: string): boolean {
const key = makeKey(projectId, agentType);
export function wasRecentlyDispatched(
projectId: string,
agentType: string,
dedupScope?: string,
): boolean {
const key = makeDedupKey(projectId, agentType, dedupScope);
const timestamp = dedupMap.get(key);
if (timestamp === undefined) return false;

Expand All @@ -141,11 +150,15 @@ export function wasRecentlyDispatched(projectId: string, agentType: string): boo
}

/**
* Mark an agent type as recently dispatched for a project.
* Mark an agent type as recently dispatched for a project/scope.
* The mark expires after DEDUP_TTL_MS and is NOT cleared on completion.
*/
export function markRecentlyDispatched(projectId: string, agentType: string): void {
const key = makeKey(projectId, agentType);
export function markRecentlyDispatched(
projectId: string,
agentType: string,
dedupScope?: string,
): void {
const key = makeDedupKey(projectId, agentType, dedupScope);
dedupMap.set(key, Date.now());

// Periodic cleanup: evict expired entries when map grows large
Expand Down Expand Up @@ -173,6 +186,7 @@ export async function checkAgentTypeConcurrency(
projectId: string,
agentType: string,
logLabel?: string,
dedupScope?: string,
): Promise<{ maxConcurrency: number | null; blocked: boolean }> {
let maxConcurrency: number | null;
try {
Expand All @@ -187,10 +201,11 @@ export async function checkAgentTypeConcurrency(
}
if (maxConcurrency === null) return { maxConcurrency: null, blocked: false };

if (wasRecentlyDispatched(projectId, agentType)) {
if (wasRecentlyDispatched(projectId, agentType, dedupScope)) {
logger.info(`${logLabel ?? 'Agent'} recently dispatched, skipping (dedup)`, {
projectId,
agentType,
dedupScope,
});
return { maxConcurrency, blocked: true };
}
Expand Down
3 changes: 2 additions & 1 deletion src/router/webhook-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ export async function processRouterWebhook(
project.id,
result.agentType,
adapter.type,
result.workItemId,
);
agentTypeMaxConcurrency = concurrencyCheck.maxConcurrency;
if (concurrencyCheck.blocked) {
Expand Down Expand Up @@ -216,7 +217,7 @@ export async function processRouterWebhook(
markWorkItemEnqueued(project.id, result.workItemId, result.agentType);
}
if (result.agentType && agentTypeMaxConcurrency !== null) {
markRecentlyDispatched(project.id, result.agentType);
markRecentlyDispatched(project.id, result.agentType, result.workItemId);
markAgentTypeEnqueued(project.id, result.agentType);
}
logger.info(`${adapter.type} job queued`, {
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/router/agent-type-lock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ describe('agent-type-lock', () => {
expect(wasRecentlyDispatched('proj1', 'backlog-manager')).toBe(true);
expect(wasRecentlyDispatched('proj1', 'implementation')).toBe(false);
});

it('different dedup scopes are independent', () => {
markRecentlyDispatched('proj1', 'planning', 'TF-8');
expect(wasRecentlyDispatched('proj1', 'planning', 'TF-8')).toBe(true);
expect(wasRecentlyDispatched('proj1', 'planning', 'TF-10')).toBe(false);
expect(wasRecentlyDispatched('proj1', 'planning')).toBe(false);
});
});

// ========================================================================
Expand All @@ -208,6 +215,24 @@ describe('agent-type-lock', () => {
expect(result.maxConcurrency).toBe(1);
});

it('does not block a different work item in the dedup window', async () => {
vi.mocked(getMaxConcurrency).mockResolvedValueOnce(5);
markRecentlyDispatched('proj1', 'planning', 'TF-8');

const result = await checkAgentTypeConcurrency('proj1', 'planning', 'linear', 'TF-10');

expect(result).toEqual({ maxConcurrency: 5, blocked: false });
});

it('blocks the same work item in the dedup window', async () => {
vi.mocked(getMaxConcurrency).mockResolvedValueOnce(5);
markRecentlyDispatched('proj1', 'planning', 'TF-8');

const result = await checkAgentTypeConcurrency('proj1', 'planning', 'linear', 'TF-8');

expect(result).toEqual({ maxConcurrency: 5, blocked: true });
});

it('returns blocked: true when agent type is locked', async () => {
vi.mocked(getMaxConcurrency).mockResolvedValueOnce(1);
markAgentTypeEnqueued('proj1', 'implementation');
Expand Down
Loading