From 6b9ac530eae67e2caa23a405e8de43e348656254 Mon Sep 17 00:00:00 2001 From: Zbigniew Sobiecki Date: Fri, 24 Apr 2026 13:24:13 +0200 Subject: [PATCH] fix(triggers): enforce maxInFlightItems on PM status-changed (not just backlog-manager) (#1181) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `maxInFlightItems` was only consulted at the two backlog-manager chain sites (pr-merged, splitting auto-chain). PM `status-changed` triggers fired `implementation` for every card moved into TODO with no capacity check, so a human pushing N cards burst N parallel implementations straight past the cap. Observed on prod ua-store with limit=1 → 3 implementations running concurrently. This adds a shared gate (`shouldBlockForPipelineCapacity`) called from the JIRA / Linear / Trello status-changed handlers. Only `implementation` is gated — it's the only PM-status-reachable agent that consumes a TODO/ IN_PROGRESS/IN_REVIEW slot per `STATUS_TO_AGENT`. When the active pipeline (excluding the just-moved card) is at the cap, the trigger returns null and the card sits in its column until a slot frees, at which point the existing `pr-merged → backlog-manager` chain picks it up. Mirrors the behavior the backlog-manager gate already has. Adds `isActivePipelineOverCapacity` next to the existing `isPipelineAtCapacity` — same misconfigured/error fallbacks, but no backlog-empty short-circuit (irrelevant here) and an `excludeWorkItemId` arg so the card whose move just fired the webhook isn't double-counted. Co-authored-by: Claude Opus 4.7 (1M context) --- docs/architecture/08-config-credentials.md | 7 +- src/triggers/jira/status-changed.ts | 12 ++ src/triggers/linear/status-changed.ts | 12 ++ src/triggers/shared/backlog-check.ts | 62 +++++++ src/triggers/shared/pipeline-capacity-gate.ts | 63 ++++++++ src/triggers/trello/status-changed.ts | 13 ++ .../triggers/shared/backlog-check.test.ts | 151 +++++++++++++++++- .../shared/pipeline-capacity-gate.test.ts | 111 +++++++++++++ 8 files changed, 429 insertions(+), 2 deletions(-) create mode 100644 src/triggers/shared/pipeline-capacity-gate.ts create mode 100644 tests/unit/triggers/shared/pipeline-capacity-gate.test.ts diff --git a/docs/architecture/08-config-credentials.md b/docs/architecture/08-config-credentials.md index 0acffb24..ad082fb2 100644 --- a/docs/architecture/08-config-credentials.md +++ b/docs/architecture/08-config-credentials.md @@ -50,11 +50,16 @@ interface ProjectConfig { engineSettings?: EngineSettings; agentEngineSettings?: Record; runLinksEnabled: boolean; - maxInFlightItems?: number; + maxInFlightItems?: number; // hard cap on TODO+IN_PROGRESS+IN_REVIEW; default 1 // ... PM config (trello/jira), agent models, snapshot settings } ``` +`maxInFlightItems` is enforced at two points: (a) the `backlog-manager` chain +gates (won't auto-pull from BACKLOG when at capacity) and (b) the PM +`status-changed` triggers (won't fire `implementation` when a card is moved +into TODO past the cap). See `src/triggers/shared/pipeline-capacity-gate.ts`. + ## Credential Resolution CASCADE uses a two-tier credential resolution system, selecting the appropriate resolver based on execution context. diff --git a/src/triggers/jira/status-changed.ts b/src/triggers/jira/status-changed.ts index 77c92858..efaafc79 100644 --- a/src/triggers/jira/status-changed.ts +++ b/src/triggers/jira/status-changed.ts @@ -12,6 +12,7 @@ import { getJiraConfig } from '../../pm/config.js'; import type { TriggerContext, TriggerHandler, TriggerResult } from '../../types/index.js'; import { logger } from '../../utils/logging.js'; +import { shouldBlockForPipelineCapacity } from '../shared/pipeline-capacity-gate.js'; import { checkTriggerEnabledWithParams } from '../shared/trigger-check.js'; import { type JiraWebhookPayload, STATUS_TO_AGENT } from './types.js'; @@ -124,6 +125,17 @@ export class JiraStatusChangedTrigger implements TriggerHandler { return null; } + if ( + await shouldBlockForPipelineCapacity({ + project: ctx.project, + agentType, + workItemId: issueKey, + source: 'jira', + }) + ) { + return null; + } + const statusChange = findStatusChange(payload); logger.info('JIRA issue entered agent-triggering status', { issueKey, diff --git a/src/triggers/linear/status-changed.ts b/src/triggers/linear/status-changed.ts index 74732c20..47e7d8d5 100644 --- a/src/triggers/linear/status-changed.ts +++ b/src/triggers/linear/status-changed.ts @@ -16,6 +16,7 @@ import { getLinearConfig } from '../../pm/config.js'; import type { TriggerContext, TriggerHandler, TriggerResult } from '../../types/index.js'; import { logger } from '../../utils/logging.js'; +import { shouldBlockForPipelineCapacity } from '../shared/pipeline-capacity-gate.js'; import { checkTriggerEnabledWithParams } from '../shared/trigger-check.js'; import { type LinearWebhookTriggerPayload, STATUS_TO_AGENT } from './types.js'; @@ -114,6 +115,17 @@ export class LinearStatusChangedTrigger implements TriggerHandler { return null; } + if ( + await shouldBlockForPipelineCapacity({ + project: ctx.project, + agentType, + workItemId: issueIdentifier, + source: 'linear', + }) + ) { + return null; + } + logger.info('Linear issue entered agent-triggering state', { issueIdentifier, eventKind: isCreate ? 'create' : 'move', diff --git a/src/triggers/shared/backlog-check.ts b/src/triggers/shared/backlog-check.ts index cb20459f..b16120fa 100644 --- a/src/triggers/shared/backlog-check.ts +++ b/src/triggers/shared/backlog-check.ts @@ -100,6 +100,68 @@ function isProviderMisconfigured(project: ProjectConfig, provider: PMProvider): } } +/** + * Lighter sibling of `isPipelineAtCapacity` for the PM `status-changed` gate. + * + * Two semantic differences: + * 1. Does NOT short-circuit on an empty backlog. Backlog state is irrelevant + * when deciding "is the active pipeline already too full?". + * 2. Accepts `excludeWorkItemId` so the card whose status change just fired + * the trigger isn't double-counted (it has typically *already* moved into + * TODO by the time the webhook lands). + * + * "Active pipeline" = TODO + IN_PROGRESS + IN_REVIEW. + */ +export interface ActivePipelineCapacityResult { + overCapacity: boolean; + reason: 'over-capacity' | 'below-capacity' | 'error' | 'misconfigured'; + inFlightCount?: number; + limit?: number; +} + +export async function isActivePipelineOverCapacity( + project: ProjectConfig, + provider: PMProvider, + opts: { excludeWorkItemId?: string } = {}, +): Promise { + const limit = project.maxInFlightItems ?? 1; + + if (isProviderMisconfigured(project, provider)) { + logger.warn('isActivePipelineOverCapacity: provider config incomplete', { + providerType: provider.type, + projectId: project.id, + }); + return { overCapacity: false, reason: 'misconfigured' }; + } + + try { + const lists = await Promise.all( + (['todo', 'inProgress', 'inReview'] as const).map((status) => + provider.listWorkItems(undefined, { status }), + ), + ); + const all = lists.flat() as Array<{ id?: unknown }>; + const counted = opts.excludeWorkItemId + ? all.filter((item) => String(item?.id ?? '') !== opts.excludeWorkItemId) + : all; + const inFlightCount = counted.length; + + if (inFlightCount >= limit) { + return { overCapacity: true, reason: 'over-capacity', inFlightCount, limit }; + } + return { overCapacity: false, reason: 'below-capacity', inFlightCount, limit }; + } catch (err) { + logger.warn( + 'isActivePipelineOverCapacity: failed to check capacity, assuming not over capacity', + { + projectId: project.id, + error: String(err), + }, + ); + return { overCapacity: false, reason: 'error' }; + } +} + export async function isPipelineAtCapacity( project: ProjectConfig, provider: PMProvider, diff --git a/src/triggers/shared/pipeline-capacity-gate.ts b/src/triggers/shared/pipeline-capacity-gate.ts new file mode 100644 index 00000000..a8138128 --- /dev/null +++ b/src/triggers/shared/pipeline-capacity-gate.ts @@ -0,0 +1,63 @@ +/** + * Shared pipeline-capacity gate for PM `status-changed` triggers. + * + * `maxInFlightItems` is meant as a hard cap on the *active pipeline* + * (TODO + IN_PROGRESS + IN_REVIEW). Without this gate, a human moving N + * cards into the TODO column fires N implementation runs in parallel and + * blows past the limit — see the regression on `ua-store` (2026-04-24) + * where 3 implementations ran concurrently despite `maxInFlightItems: 1`. + * + * Currently only `implementation` is gated: of the agents reachable via PM + * `status-changed` (see `STATUS_TO_AGENT`), it is the only one that consumes + * a TODO/IN_PROGRESS/IN_REVIEW slot. `splitting` and `planning` use their own + * dedicated columns; `backlog-manager` already has dedicated capacity gates + * at its two chain sites (pr-merged, splitting auto-chain). + */ + +import { getPMProvider } from '../../pm/context.js'; +import type { PMProvider } from '../../pm/types.js'; +import type { ProjectConfig } from '../../types/index.js'; +import { logger } from '../../utils/logging.js'; +import { isActivePipelineOverCapacity } from './backlog-check.js'; + +const SLOT_CONSUMING_AGENTS: ReadonlySet = new Set(['implementation']); + +export async function shouldBlockForPipelineCapacity(args: { + project: ProjectConfig; + agentType: string; + workItemId: string; + source: string; +}): Promise { + if (!SLOT_CONSUMING_AGENTS.has(args.agentType)) return false; + + let provider: PMProvider; + try { + provider = getPMProvider(); + } catch (err) { + // No credential scope — conservative: allow. + logger.warn('pipeline-capacity-gate: PM provider unavailable, allowing run', { + source: args.source, + projectId: args.project.id, + workItemId: args.workItemId, + error: String(err), + }); + return false; + } + + const result = await isActivePipelineOverCapacity(args.project, provider, { + excludeWorkItemId: args.workItemId, + }); + + if (result.overCapacity) { + logger.info('pipeline-at-capacity: skipping status-changed trigger', { + source: args.source, + projectId: args.project.id, + workItemId: args.workItemId, + agentType: args.agentType, + inFlightCount: result.inFlightCount, + limit: result.limit, + }); + return true; + } + return false; +} diff --git a/src/triggers/trello/status-changed.ts b/src/triggers/trello/status-changed.ts index bacddf87..7172c202 100644 --- a/src/triggers/trello/status-changed.ts +++ b/src/triggers/trello/status-changed.ts @@ -1,6 +1,7 @@ import { getTrelloConfig } from '../../pm/config.js'; import { invalidateSnapshot } from '../../router/snapshot-manager.js'; import { logger } from '../../utils/logging.js'; +import { shouldBlockForPipelineCapacity } from '../shared/pipeline-capacity-gate.js'; import { checkTriggerEnabledWithParams } from '../shared/trigger-check.js'; import type { TriggerContext, TriggerHandler, TriggerResult } from '../types.js'; import { isTrelloWebhookPayload, type TrelloWebhookPayload } from './types.js'; @@ -54,6 +55,7 @@ function createStatusChangedTrigger(config: StatusChangedConfig): TriggerHandler return isMove || isCreate; }, + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: sequential guard checks (enabled → fire mode → cardId → capacity) async handle(ctx: TriggerContext): Promise { const { enabled, parameters } = await checkTriggerEnabledWithParams( ctx.project.id, @@ -83,6 +85,17 @@ function createStatusChangedTrigger(config: StatusChangedConfig): TriggerHandler return null; } + if ( + await shouldBlockForPipelineCapacity({ + project: ctx.project, + agentType: config.agentType, + workItemId: cardId, + source: 'trello', + }) + ) { + return null; + } + const cardShortLink = payload.action.data.card?.shortLink; const cardName = payload.action.data.card?.name; const workItemUrl = cardShortLink ? `https://trello.com/c/${cardShortLink}` : undefined; diff --git a/tests/unit/triggers/shared/backlog-check.test.ts b/tests/unit/triggers/shared/backlog-check.test.ts index cd119647..db77e45a 100644 --- a/tests/unit/triggers/shared/backlog-check.test.ts +++ b/tests/unit/triggers/shared/backlog-check.test.ts @@ -28,7 +28,10 @@ vi.mock('../../../../src/utils/logging.js', () => ({ logger: mockLogger, })); -import { isPipelineAtCapacity } from '../../../../src/triggers/shared/backlog-check.js'; +import { + isActivePipelineOverCapacity, + isPipelineAtCapacity, +} from '../../../../src/triggers/shared/backlog-check.js'; import { createMockJiraProject, createMockLinearProject, @@ -726,3 +729,149 @@ describe('isPipelineAtCapacity', () => { }); }); }); + +// --------------------------------------------------------------------------- +// isActivePipelineOverCapacity tests +// --------------------------------------------------------------------------- + +describe('isActivePipelineOverCapacity', () => { + const project = createMockJiraProject({ + jira: { + projectKey: 'UA', + statuses: { + backlog: 'BACKLOG', + todo: 'To Do', + inProgress: 'In Progress', + inReview: 'In Review', + }, + }, + maxInFlightItems: 1, + }); + + function arm() { + mockGetJiraConfig.mockReturnValue({ + projectKey: 'UA', + statuses: { + backlog: 'BACKLOG', + todo: 'To Do', + inProgress: 'In Progress', + inReview: 'In Review', + }, + }); + } + + it('returns over-capacity when TODO+IN_PROGRESS+IN_REVIEW >= limit', async () => { + arm(); + const provider = makeProvider('jira', { + todo: [{ id: 'UA-1' }], + inProgress: [{ id: 'UA-2' }], + }); + const result = await isActivePipelineOverCapacity(project, provider); + expect(result.overCapacity).toBe(true); + expect(result.inFlightCount).toBe(2); + expect(result.limit).toBe(1); + }); + + it('returns below-capacity when in-flight < limit', async () => { + arm(); + const provider = makeProvider('jira', { todo: [] }); + const result = await isActivePipelineOverCapacity(project, provider); + expect(result.overCapacity).toBe(false); + expect(result.inFlightCount).toBe(0); + expect(result.limit).toBe(1); + }); + + it('does NOT short-circuit on empty backlog (unlike isPipelineAtCapacity)', async () => { + // Active pipeline is what matters; an empty backlog should not hide an + // already-over-capacity active pipeline. + arm(); + const provider = makeProvider('jira', { + backlog: [], + todo: [{ id: 'UA-1' }], + inProgress: [{ id: 'UA-2' }], + }); + const result = await isActivePipelineOverCapacity(project, provider); + expect(result.overCapacity).toBe(true); + expect(result.inFlightCount).toBe(2); + }); + + it('excludes the given workItemId from the in-flight count', async () => { + // The canonical status-changed case: the card that just moved to TODO is + // already counted by listWorkItems; excluding it lets the first card + // through when limit=1. + arm(); + const provider = makeProvider('jira', { + todo: [{ id: 'UA-42' }], + }); + const result = await isActivePipelineOverCapacity(project, provider, { + excludeWorkItemId: 'UA-42', + }); + expect(result.overCapacity).toBe(false); + expect(result.inFlightCount).toBe(0); + expect(result.limit).toBe(1); + }); + + it('excludes by id even when the card appears in a different active column', async () => { + const projectLimit2 = createMockJiraProject({ + jira: { + projectKey: 'UA', + statuses: { + backlog: 'BACKLOG', + todo: 'To Do', + inProgress: 'In Progress', + inReview: 'In Review', + }, + }, + maxInFlightItems: 2, + }); + arm(); + const provider = makeProvider('jira', { + inProgress: [{ id: 'UA-7' }], + inReview: [{ id: 'UA-9' }], + }); + const result = await isActivePipelineOverCapacity(projectLimit2, provider, { + excludeWorkItemId: 'UA-9', + }); + expect(result.overCapacity).toBe(false); + expect(result.inFlightCount).toBe(1); + expect(result.limit).toBe(2); + }); + + it('returns not-over-capacity on provider error (conservative fallback)', async () => { + arm(); + const provider = makeErrorProvider('jira'); + const result = await isActivePipelineOverCapacity(project, provider); + expect(result.overCapacity).toBe(false); + expect(result.reason).toBe('error'); + }); + + it('returns not-over-capacity when provider is misconfigured', async () => { + mockGetJiraConfig.mockReturnValue({ projectKey: 'UA' }); // no statuses + const provider = makeProvider('jira', { + todo: [{ id: 'UA-1' }, { id: 'UA-2' }], + }); + const result = await isActivePipelineOverCapacity(project, provider); + expect(result.overCapacity).toBe(false); + expect(result.reason).toBe('misconfigured'); + }); + + it('uses default limit=1 when maxInFlightItems is not set', async () => { + const projectNoLimit = createMockJiraProject({ + jira: { + projectKey: 'UA', + statuses: { + backlog: 'BACKLOG', + todo: 'To Do', + inProgress: 'In Progress', + inReview: 'In Review', + }, + }, + }); + arm(); + const provider = makeProvider('jira', { todo: [{ id: 'UA-1' }, { id: 'UA-2' }] }); + const result = await isActivePipelineOverCapacity(projectNoLimit, provider); + expect(result.overCapacity).toBe(true); + expect(result.limit).toBe(1); + expect(result.inFlightCount).toBe(2); + }); +}); diff --git a/tests/unit/triggers/shared/pipeline-capacity-gate.test.ts b/tests/unit/triggers/shared/pipeline-capacity-gate.test.ts new file mode 100644 index 00000000..59502328 --- /dev/null +++ b/tests/unit/triggers/shared/pipeline-capacity-gate.test.ts @@ -0,0 +1,111 @@ +import { describe, expect, it, vi } from 'vitest'; + +const { mockGetPMProvider, mockIsActivePipelineOverCapacity, mockLogger } = vi.hoisted(() => ({ + mockGetPMProvider: vi.fn(), + mockIsActivePipelineOverCapacity: vi.fn(), + mockLogger: { info: vi.fn(), warn: vi.fn(), debug: vi.fn(), error: vi.fn() }, +})); + +vi.mock('../../../../src/pm/context.js', () => ({ + getPMProvider: mockGetPMProvider, +})); + +vi.mock('../../../../src/triggers/shared/backlog-check.js', () => ({ + isActivePipelineOverCapacity: mockIsActivePipelineOverCapacity, +})); + +vi.mock('../../../../src/utils/logging.js', () => ({ + logger: mockLogger, +})); + +import { shouldBlockForPipelineCapacity } from '../../../../src/triggers/shared/pipeline-capacity-gate.js'; +import { createMockProject } from '../../../helpers/factories.js'; + +const project = createMockProject({ maxInFlightItems: 1 }); + +describe('shouldBlockForPipelineCapacity', () => { + it('does not gate non-slot-consuming agent types (review, planning, splitting, backlog-manager)', async () => { + for (const agentType of ['review', 'planning', 'splitting', 'backlog-manager', 'debug']) { + expect( + await shouldBlockForPipelineCapacity({ + project, + agentType, + workItemId: 'UA-1', + source: 'jira', + }), + ).toBe(false); + } + expect(mockGetPMProvider).not.toHaveBeenCalled(); + expect(mockIsActivePipelineOverCapacity).not.toHaveBeenCalled(); + }); + + it('blocks implementation when active pipeline is over capacity', async () => { + const fakeProvider = { type: 'jira' }; + mockGetPMProvider.mockReturnValue(fakeProvider); + mockIsActivePipelineOverCapacity.mockResolvedValue({ + overCapacity: true, + reason: 'over-capacity', + inFlightCount: 2, + limit: 1, + }); + + const blocked = await shouldBlockForPipelineCapacity({ + project, + agentType: 'implementation', + workItemId: 'UA-3', + source: 'jira', + }); + + expect(blocked).toBe(true); + expect(mockIsActivePipelineOverCapacity).toHaveBeenCalledWith(project, fakeProvider, { + excludeWorkItemId: 'UA-3', + }); + expect(mockLogger.info).toHaveBeenCalledWith( + 'pipeline-at-capacity: skipping status-changed trigger', + expect.objectContaining({ + agentType: 'implementation', + workItemId: 'UA-3', + inFlightCount: 2, + limit: 1, + }), + ); + }); + + it('allows implementation when below capacity', async () => { + mockGetPMProvider.mockReturnValue({ type: 'jira' }); + mockIsActivePipelineOverCapacity.mockResolvedValue({ + overCapacity: false, + reason: 'below-capacity', + inFlightCount: 0, + limit: 1, + }); + + const blocked = await shouldBlockForPipelineCapacity({ + project, + agentType: 'implementation', + workItemId: 'UA-3', + source: 'jira', + }); + + expect(blocked).toBe(false); + }); + + it('allows (conservatively) when no PM provider scope is available', async () => { + mockGetPMProvider.mockImplementation(() => { + throw new Error('no scope'); + }); + + const blocked = await shouldBlockForPipelineCapacity({ + project, + agentType: 'implementation', + workItemId: 'UA-3', + source: 'jira', + }); + + expect(blocked).toBe(false); + expect(mockLogger.warn).toHaveBeenCalledWith( + 'pipeline-capacity-gate: PM provider unavailable, allowing run', + expect.objectContaining({ workItemId: 'UA-3' }), + ); + }); +});