From 620408fceaa942de7e472d4a5a5618b2bf90e938 Mon Sep 17 00:00:00 2001 From: Zbigniew Sobiecki Date: Fri, 24 Apr 2026 12:19:24 +0200 Subject: [PATCH] =?UTF-8?q?fix(router):=20coalesce=20PM=20create=E2=86=92u?= =?UTF-8?q?pdate=20webhook=20sequences=20to=20stop=20JIRA=20double-firing?= =?UTF-8?q?=20agents=20(#1179)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user creates a JIRA issue directly in a non-default workflow column, JIRA emits two webhooks ~hundreds of ms apart: `issue_created` at the workflow's initial status, then `issue_updated` transitioning to the target column. Each webhook resolves a different agent via `STATUS_TO_AGENT`, so both fire on the same work item (observed as UA-11: implementation + planning running concurrently). Fix: router-level coalesce window keyed by `${projectId}:${workItemId}`. A `pm:status-changed` create trigger waits `PM_CREATE_COALESCE_WINDOW_MS` (default 2000, 0 disables) before enqueue; an update for the same key within the window supersedes the create — no ack posted, no job queued, `onBlocked` called to clear any dedup markers. Trivially also protects Linear against the same code-shape risk. Co-authored-by: Claude Opus 4.7 (1M context) --- CLAUDE.md | 1 + src/pm/create-coalesce-window.ts | 95 +++++++++++++++++++ src/router/webhook-processor.ts | 32 +++++++ src/triggers/jira/status-changed.ts | 2 + src/triggers/linear/status-changed.ts | 2 + src/types/index.ts | 12 +++ tests/unit/pm/create-coalesce-window.test.ts | 77 +++++++++++++++ tests/unit/router/webhook-processor.test.ts | 76 ++++++++++++++- .../unit/triggers/jira-status-changed.test.ts | 24 +++++ .../triggers/linear-status-changed.test.ts | 17 ++++ 10 files changed, 337 insertions(+), 1 deletion(-) create mode 100644 src/pm/create-coalesce-window.ts create mode 100644 tests/unit/pm/create-coalesce-window.test.ts diff --git a/CLAUDE.md b/CLAUDE.md index 357b70d2..39ceea29 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -158,6 +158,7 @@ Optional: - `CREDENTIAL_MASTER_KEY` — 64-char hex (AES-256 key) to encrypt project credentials at rest. Without it, credentials are stored as plaintext; both modes coexist. - `GITHUB_WEBHOOK_SECRET` — opt-in HMAC verification; store as the `webhook_secret` role on the GitHub SCM integration. - `SENTRY_DSN`, `SENTRY_ENVIRONMENT`, `SENTRY_RELEASE`, `SENTRY_TRACES_SAMPLE_RATE` — observability. +- `PM_CREATE_COALESCE_WINDOW_MS` — window (ms) the router waits after a PM `pm:status-changed` create trigger before enqueuing, so a follow-up `update` (same `${projectId}:${workItemId}`) can supersede it. Defaults to `2000`; `0` disables. Fixes JIRA's double-fire when an issue is created in a non-default workflow column (JIRA emits `issue_created` at the initial status, then `issue_updated` transitioning to the target). **Project credentials (GitHub tokens, Trello/JIRA/Linear keys, LLM API keys) live in the `project_credentials` table.** The DB is the **sole source of truth** — there is no env var fallback for project-scoped secrets. diff --git a/src/pm/create-coalesce-window.ts b/src/pm/create-coalesce-window.ts new file mode 100644 index 00000000..c5ab08b0 --- /dev/null +++ b/src/pm/create-coalesce-window.ts @@ -0,0 +1,95 @@ +/** + * Short-window coalescing for PM create→update webhook sequences. + * + * Problem: JIRA emits two webhooks when a user creates an issue in a non-default + * workflow column — `jira:issue_created` (with the workflow's initial status) + * followed ~hundreds of ms later by `jira:issue_updated` (transitioning to the + * target column). Without coalescing, both webhooks fire different agents on + * the same work item. + * + * This module lets a create trigger register a pending entry keyed by + * `${projectId}:${workItemId}`. An incoming update trigger for the same key + * clears the entry, superseding the create. If the window elapses with no + * update, the create proceeds normally. + * + * In-memory state is sufficient — a router restart during the ~2s window + * means the pending create is lost, but the update webhook (which arrives + * independently) will still fire. + */ + +type PendingEntry = { + timer: ReturnType; + resolve: (outcome: 'proceed' | 'superseded') => void; +}; + +const pending = new Map(); + +/** + * Register a pending create for the given key. Returns a promise that resolves + * after `ttlMs` with `'proceed'` if still pending, or earlier with + * `'superseded'` if `clearPendingCreate(key)` is called or another + * `registerPendingCreate(key, …)` supersedes it. + * + * `ttlMs === 0` resolves immediately to `'proceed'` (coalescing disabled). + */ +export function registerPendingCreate( + key: string, + ttlMs: number, +): Promise<'proceed' | 'superseded'> { + if (ttlMs <= 0) { + return Promise.resolve('proceed'); + } + + // Supersede any existing entry for the same key. + const existing = pending.get(key); + if (existing) { + clearTimeout(existing.timer); + existing.resolve('superseded'); + pending.delete(key); + } + + return new Promise((resolve) => { + const timer = setTimeout(() => { + const entry = pending.get(key); + if (entry && entry.resolve === resolve) { + pending.delete(key); + } + resolve('proceed'); + }, ttlMs); + pending.set(key, { timer, resolve }); + }); +} + +/** + * Clear a pending create, causing its registration promise to resolve with + * `'superseded'`. No-op if no entry exists for the key. + */ +export function clearPendingCreate(key: string): void { + const entry = pending.get(key); + if (!entry) return; + clearTimeout(entry.timer); + pending.delete(key); + entry.resolve('superseded'); +} + +/** + * Test-only: drop all pending entries without resolving their promises. + * Used by unit tests between cases to ensure isolation. + */ +export function __resetCoalesceWindowForTests(): void { + for (const entry of pending.values()) { + clearTimeout(entry.timer); + } + pending.clear(); +} + +/** + * Read the configured window duration in milliseconds. `0` disables coalescing. + */ +export function getCoalesceWindowMs(): number { + const raw = process.env.PM_CREATE_COALESCE_WINDOW_MS; + if (raw === undefined) return 2000; + const n = Number.parseInt(raw, 10); + if (!Number.isFinite(n) || n < 0) return 2000; + return n; +} diff --git a/src/router/webhook-processor.ts b/src/router/webhook-processor.ts index 9caa69fb..aff014ff 100644 --- a/src/router/webhook-processor.ts +++ b/src/router/webhook-processor.ts @@ -10,6 +10,11 @@ * from `pm/webhook-handler.ts` but for the router (enqueue-only) path. */ +import { + clearPendingCreate, + getCoalesceWindowMs, + registerPendingCreate, +} from '../pm/create-coalesce-window.js'; import type { TriggerRegistry } from '../triggers/registry.js'; import { logger } from '../utils/logging.js'; import { isDuplicateAction, markActionProcessed } from './action-dedup.js'; @@ -135,6 +140,33 @@ export async function processRouterWebhook( projectId: project.id, }); + // Step 7b: Coalesce PM create→update sequences. JIRA emits two webhooks when + // a user creates an issue in a non-default workflow column (initial status, + // then transition); without this, both fire different agents on the same + // work item. A 'create' trigger waits the coalesce window; an 'update' + // trigger arriving for the same key within the window supersedes it. + if (result.coalesceKey) { + if (result.coalesceRole === 'update') { + clearPendingCreate(result.coalesceKey); + } else if (result.coalesceRole === 'create') { + const windowMs = getCoalesceWindowMs(); + const outcome = await registerPendingCreate(result.coalesceKey, windowMs); + if (outcome === 'superseded') { + logger.info(`${adapter.type} create trigger superseded by follow-up update`, { + agentType: result.agentType, + workItemId: result.workItemId, + projectId: project.id, + }); + result.onBlocked?.(); + return { + shouldProcess: true, + projectId: project.id, + decisionReason: 'Create trigger superseded by follow-up update (coalesce window)', + }; + } + } + } + // GitHub special case: no-agent triggers (pr-merged, pr-ready-to-merge) // dispatch already performed PM operations — no job queuing needed if (!result.agentType) { diff --git a/src/triggers/jira/status-changed.ts b/src/triggers/jira/status-changed.ts index a9813602..77c92858 100644 --- a/src/triggers/jira/status-changed.ts +++ b/src/triggers/jira/status-changed.ts @@ -147,6 +147,8 @@ export class JiraStatusChangedTrigger implements TriggerHandler { workItemId: issueKey, workItemUrl, workItemTitle, + coalesceKey: `${ctx.project.id}:${issueKey}`, + coalesceRole: isCreate ? 'create' : 'update', }; } } diff --git a/src/triggers/linear/status-changed.ts b/src/triggers/linear/status-changed.ts index 495d01ae..74732c20 100644 --- a/src/triggers/linear/status-changed.ts +++ b/src/triggers/linear/status-changed.ts @@ -139,6 +139,8 @@ export class LinearStatusChangedTrigger implements TriggerHandler { workItemId, workItemUrl, workItemTitle, + coalesceKey: `${ctx.project.id}:${workItemId}`, + coalesceRole: isCreate ? 'create' : 'update', }; } } diff --git a/src/types/index.ts b/src/types/index.ts index 3a07266a..aa255774 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -107,6 +107,18 @@ export interface TriggerResult { /** Called when the router cannot enqueue the job (work-item lock, concurrency limit). * Allows the trigger handler to undo side-effects like dedup marking. */ onBlocked?: () => void; + /** + * Coalesce key for handling PM provider create→update webhook sequences. + * + * Set on `pm:status-changed` triggers where the event kind matters. When + * `coalesceRole === 'create'`, the router defers dispatch by the + * `PM_CREATE_COALESCE_WINDOW_MS` window; an incoming `'update'` event + * sharing the same key within the window supersedes the create. + * + * Typical key: `${projectId}:${workItemId}`. + */ + coalesceKey?: string; + coalesceRole?: 'create' | 'update'; } export interface TriggerHandler { diff --git a/tests/unit/pm/create-coalesce-window.test.ts b/tests/unit/pm/create-coalesce-window.test.ts new file mode 100644 index 00000000..ce5c098e --- /dev/null +++ b/tests/unit/pm/create-coalesce-window.test.ts @@ -0,0 +1,77 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { + __resetCoalesceWindowForTests, + clearPendingCreate, + registerPendingCreate, +} from '../../../src/pm/create-coalesce-window.js'; + +describe('create-coalesce-window', () => { + beforeEach(() => { + vi.useFakeTimers(); + __resetCoalesceWindowForTests(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('resolves to "proceed" after the window elapses with no follow-up', async () => { + const promise = registerPendingCreate('proj-1:ITEM-1', 2000); + await vi.advanceTimersByTimeAsync(2000); + await expect(promise).resolves.toBe('proceed'); + }); + + it('resolves to "superseded" when clearPendingCreate is called within the window', async () => { + const promise = registerPendingCreate('proj-1:ITEM-1', 2000); + await vi.advanceTimersByTimeAsync(500); + clearPendingCreate('proj-1:ITEM-1'); + await expect(promise).resolves.toBe('superseded'); + }); + + it('honors the window duration — does not resolve early', async () => { + const promise = registerPendingCreate('proj-1:ITEM-1', 2000); + const outcome: Array<'proceed' | 'superseded'> = []; + promise.then((v) => outcome.push(v)); + + await vi.advanceTimersByTimeAsync(1999); + expect(outcome).toEqual([]); + + await vi.advanceTimersByTimeAsync(1); + await promise; + expect(outcome).toEqual(['proceed']); + }); + + it('keys are isolated — clearing one key does not affect another', async () => { + const p1 = registerPendingCreate('proj-1:ITEM-1', 2000); + const p2 = registerPendingCreate('proj-1:ITEM-2', 2000); + + clearPendingCreate('proj-1:ITEM-1'); + expect(await p1).toBe('superseded'); + + await vi.advanceTimersByTimeAsync(2000); + expect(await p2).toBe('proceed'); + }); + + it('registering a second create for the same key supersedes the first', async () => { + const first = registerPendingCreate('proj-1:ITEM-1', 2000); + const second = registerPendingCreate('proj-1:ITEM-1', 2000); + + expect(await first).toBe('superseded'); + + await vi.advanceTimersByTimeAsync(2000); + expect(await second).toBe('proceed'); + }); + + it('ttlMs of 0 resolves immediately to "proceed" without registering state', async () => { + const promise = registerPendingCreate('proj-1:ITEM-1', 0); + // No timer advance needed; microtask flush. + await expect(promise).resolves.toBe('proceed'); + // A subsequent clear is a no-op (no pending entry). + expect(() => clearPendingCreate('proj-1:ITEM-1')).not.toThrow(); + }); + + it('clearPendingCreate on unknown key is a no-op', () => { + expect(() => clearPendingCreate('never-registered')).not.toThrow(); + }); +}); diff --git a/tests/unit/router/webhook-processor.test.ts b/tests/unit/router/webhook-processor.test.ts index 55109284..49373f70 100644 --- a/tests/unit/router/webhook-processor.test.ts +++ b/tests/unit/router/webhook-processor.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it, vi } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; vi.mock('../../../src/utils/logging.js', () => ({ logger: { @@ -463,4 +463,78 @@ describe('processRouterWebhook', () => { expect(addJob).toHaveBeenCalled(); expect(markWorkItemEnqueued).not.toHaveBeenCalled(); }); + + describe('create→update coalesce', () => { + // Use real timers + short window so the create resolves quickly when not superseded. + const origWindow = process.env.PM_CREATE_COALESCE_WINDOW_MS; + + beforeEach(() => { + process.env.PM_CREATE_COALESCE_WINDOW_MS = '50'; + }); + afterEach(() => { + if (origWindow === undefined) delete process.env.PM_CREATE_COALESCE_WINDOW_MS; + else process.env.PM_CREATE_COALESCE_WINDOW_MS = origWindow; + }); + + it('supersedes a create when an update with the same coalesceKey arrives within the window', async () => { + vi.mocked(addJob).mockResolvedValue('job-x'); + const createAdapter = makeMockAdapter({ + type: 'jira', + dispatchWithCredentials: vi.fn().mockResolvedValue({ + agentType: 'implementation', + agentInput: { workItemId: 'PROJ-1' }, + workItemId: 'PROJ-1', + coalesceKey: 'p1:PROJ-1', + coalesceRole: 'create', + }), + }); + const updateAdapter = makeMockAdapter({ + type: 'jira', + dispatchWithCredentials: vi.fn().mockResolvedValue({ + agentType: 'planning', + agentInput: { workItemId: 'PROJ-1' }, + workItemId: 'PROJ-1', + coalesceKey: 'p1:PROJ-1', + coalesceRole: 'update', + }), + }); + + // Fire create (will wait 50ms) and update (resolves immediately, supersedes create) + const createPromise = processRouterWebhook(createAdapter, {}, mockTriggerRegistry); + // Let microtasks settle so the create registers before we dispatch the update. + await Promise.resolve(); + const updateResult = await processRouterWebhook(updateAdapter, {}, mockTriggerRegistry); + const createResult = await createPromise; + + expect(createResult.decisionReason).toBe( + 'Create trigger superseded by follow-up update (coalesce window)', + ); + expect(updateResult.shouldProcess).toBe(true); + + // Only one job should have been queued (for the update), not two. + expect(addJob).toHaveBeenCalledTimes(1); + expect(createAdapter.postAck).not.toHaveBeenCalled(); + expect(updateAdapter.postAck).toHaveBeenCalled(); + }); + + it('proceeds with a create when no update arrives within the window', async () => { + vi.mocked(addJob).mockResolvedValue('job-solo'); + const adapter = makeMockAdapter({ + type: 'jira', + dispatchWithCredentials: vi.fn().mockResolvedValue({ + agentType: 'planning', + agentInput: { workItemId: 'PROJ-2' }, + workItemId: 'PROJ-2', + coalesceKey: 'p1:PROJ-2', + coalesceRole: 'create', + }), + }); + + const result = await processRouterWebhook(adapter, {}, mockTriggerRegistry); + + expect(result.shouldProcess).toBe(true); + expect(addJob).toHaveBeenCalledTimes(1); + expect(adapter.postAck).toHaveBeenCalled(); + }); + }); }); diff --git a/tests/unit/triggers/jira-status-changed.test.ts b/tests/unit/triggers/jira-status-changed.test.ts index a3cfff0e..7fca6c4a 100644 --- a/tests/unit/triggers/jira-status-changed.test.ts +++ b/tests/unit/triggers/jira-status-changed.test.ts @@ -336,4 +336,28 @@ describe('JiraStatusChangedTrigger', () => { ); }); }); + + describe('coalesce metadata', () => { + it('tags move results with coalesceRole: "update" and a project-scoped key', async () => { + const ctx = buildCtx({ + statusChangeItems: [{ field: 'status', fromString: 'Backlog', toString: 'Splitting' }], + }); + const result = await trigger.handle(ctx); + + expect(result?.coalesceKey).toBe('test-project:PROJ-42'); + expect(result?.coalesceRole).toBe('update'); + }); + + it('tags create results with coalesceRole: "create" and a project-scoped key', async () => { + mockTriggerConfig(true, { onCreate: true, onMove: true }); + const ctx = buildCtx({ + webhookEvent: 'jira:issue_created', + issueStatusName: 'To Do', + }); + const result = await trigger.handle(ctx); + + expect(result?.coalesceKey).toBe('test-project:PROJ-42'); + expect(result?.coalesceRole).toBe('create'); + }); + }); }); diff --git a/tests/unit/triggers/linear-status-changed.test.ts b/tests/unit/triggers/linear-status-changed.test.ts index 01f375f1..6393b5ed 100644 --- a/tests/unit/triggers/linear-status-changed.test.ts +++ b/tests/unit/triggers/linear-status-changed.test.ts @@ -350,4 +350,21 @@ describe('LinearStatusChangedTrigger', () => { expect(moveResult).toBeNull(); }); }); + + describe('coalesce metadata', () => { + it('tags move results with coalesceRole: "update" and a project-scoped key', async () => { + const result = await trigger.handle(buildCtx({ newStateId: 'state-todo' })); + expect(result?.coalesceKey).toBe('proj-linear:TEAM-123'); + expect(result?.coalesceRole).toBe('update'); + }); + + it('tags create results with coalesceRole: "create" and a project-scoped key', async () => { + mockTriggerConfig(true, { onCreate: true, onMove: true }); + const result = await trigger.handle( + buildCtx({ action: 'create', newStateId: 'state-todo', noUpdatedFrom: true }), + ); + expect(result?.coalesceKey).toBe('proj-linear:TEAM-123'); + expect(result?.coalesceRole).toBe('create'); + }); + }); });