Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ Optional:
- `CREDENTIAL_MASTER_KEY` — 64-char hex (AES-256 key) to encrypt project credentials at rest. Without it, credentials are stored as plaintext; both modes coexist.
- `GITHUB_WEBHOOK_SECRET` — opt-in HMAC verification; store as the `webhook_secret` role on the GitHub SCM integration.
- `SENTRY_DSN`, `SENTRY_ENVIRONMENT`, `SENTRY_RELEASE`, `SENTRY_TRACES_SAMPLE_RATE` — observability.
- `PM_CREATE_COALESCE_WINDOW_MS` — window (ms) the router waits after a PM `pm:status-changed` create trigger before enqueuing, so a follow-up `update` (same `${projectId}:${workItemId}`) can supersede it. Defaults to `2000`; `0` disables. Fixes JIRA's double-fire when an issue is created in a non-default workflow column (JIRA emits `issue_created` at the initial status, then `issue_updated` transitioning to the target).

**Project credentials (GitHub tokens, Trello/JIRA/Linear keys, LLM API keys) live in the `project_credentials` table.** The DB is the **sole source of truth** — there is no env var fallback for project-scoped secrets.

Expand Down
95 changes: 95 additions & 0 deletions src/pm/create-coalesce-window.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Short-window coalescing for PM create→update webhook sequences.
*
* Problem: JIRA emits two webhooks when a user creates an issue in a non-default
* workflow column — `jira:issue_created` (with the workflow's initial status)
* followed ~hundreds of ms later by `jira:issue_updated` (transitioning to the
* target column). Without coalescing, both webhooks fire different agents on
* the same work item.
*
* This module lets a create trigger register a pending entry keyed by
* `${projectId}:${workItemId}`. An incoming update trigger for the same key
* clears the entry, superseding the create. If the window elapses with no
* update, the create proceeds normally.
*
* In-memory state is sufficient — a router restart during the ~2s window
* means the pending create is lost, but the update webhook (which arrives
* independently) will still fire.
*/

type PendingEntry = {
timer: ReturnType<typeof setTimeout>;
resolve: (outcome: 'proceed' | 'superseded') => void;
};

const pending = new Map<string, PendingEntry>();

/**
* Register a pending create for the given key. Returns a promise that resolves
* after `ttlMs` with `'proceed'` if still pending, or earlier with
* `'superseded'` if `clearPendingCreate(key)` is called or another
* `registerPendingCreate(key, …)` supersedes it.
*
* `ttlMs === 0` resolves immediately to `'proceed'` (coalescing disabled).
*/
export function registerPendingCreate(
key: string,
ttlMs: number,
): Promise<'proceed' | 'superseded'> {
if (ttlMs <= 0) {
return Promise.resolve('proceed');
}

// Supersede any existing entry for the same key.
const existing = pending.get(key);
if (existing) {
clearTimeout(existing.timer);
existing.resolve('superseded');
pending.delete(key);
}

return new Promise((resolve) => {
const timer = setTimeout(() => {
const entry = pending.get(key);
if (entry && entry.resolve === resolve) {
pending.delete(key);
}
resolve('proceed');
}, ttlMs);
pending.set(key, { timer, resolve });
});
}

/**
* Clear a pending create, causing its registration promise to resolve with
* `'superseded'`. No-op if no entry exists for the key.
*/
export function clearPendingCreate(key: string): void {
const entry = pending.get(key);
if (!entry) return;
clearTimeout(entry.timer);
pending.delete(key);
entry.resolve('superseded');
}

/**
* Test-only: drop all pending entries without resolving their promises.
* Used by unit tests between cases to ensure isolation.
*/
export function __resetCoalesceWindowForTests(): void {
for (const entry of pending.values()) {
clearTimeout(entry.timer);
}
pending.clear();
}

/**
* Read the configured window duration in milliseconds. `0` disables coalescing.
*/
export function getCoalesceWindowMs(): number {
const raw = process.env.PM_CREATE_COALESCE_WINDOW_MS;
if (raw === undefined) return 2000;
const n = Number.parseInt(raw, 10);
if (!Number.isFinite(n) || n < 0) return 2000;
return n;
}
32 changes: 32 additions & 0 deletions src/router/webhook-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
* from `pm/webhook-handler.ts` but for the router (enqueue-only) path.
*/

import {
clearPendingCreate,
getCoalesceWindowMs,
registerPendingCreate,
} from '../pm/create-coalesce-window.js';
import type { TriggerRegistry } from '../triggers/registry.js';
import { logger } from '../utils/logging.js';
import { isDuplicateAction, markActionProcessed } from './action-dedup.js';
Expand Down Expand Up @@ -135,6 +140,33 @@ export async function processRouterWebhook(
projectId: project.id,
});

// Step 7b: Coalesce PM create→update sequences. JIRA emits two webhooks when
// a user creates an issue in a non-default workflow column (initial status,
// then transition); without this, both fire different agents on the same
// work item. A 'create' trigger waits the coalesce window; an 'update'
// trigger arriving for the same key within the window supersedes it.
if (result.coalesceKey) {
if (result.coalesceRole === 'update') {
clearPendingCreate(result.coalesceKey);
} else if (result.coalesceRole === 'create') {
const windowMs = getCoalesceWindowMs();
const outcome = await registerPendingCreate(result.coalesceKey, windowMs);
if (outcome === 'superseded') {
logger.info(`${adapter.type} create trigger superseded by follow-up update`, {
agentType: result.agentType,
workItemId: result.workItemId,
projectId: project.id,
});
result.onBlocked?.();
return {
shouldProcess: true,
projectId: project.id,
decisionReason: 'Create trigger superseded by follow-up update (coalesce window)',
};
}
}
}

// GitHub special case: no-agent triggers (pr-merged, pr-ready-to-merge)
// dispatch already performed PM operations — no job queuing needed
if (!result.agentType) {
Expand Down
2 changes: 2 additions & 0 deletions src/triggers/jira/status-changed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ export class JiraStatusChangedTrigger implements TriggerHandler {
workItemId: issueKey,
workItemUrl,
workItemTitle,
coalesceKey: `${ctx.project.id}:${issueKey}`,
coalesceRole: isCreate ? 'create' : 'update',
};
}
}
2 changes: 2 additions & 0 deletions src/triggers/linear/status-changed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ export class LinearStatusChangedTrigger implements TriggerHandler {
workItemId,
workItemUrl,
workItemTitle,
coalesceKey: `${ctx.project.id}:${workItemId}`,
coalesceRole: isCreate ? 'create' : 'update',
};
}
}
12 changes: 12 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ export interface TriggerResult {
/** Called when the router cannot enqueue the job (work-item lock, concurrency limit).
* Allows the trigger handler to undo side-effects like dedup marking. */
onBlocked?: () => void;
/**
* Coalesce key for handling PM provider create→update webhook sequences.
*
* Set on `pm:status-changed` triggers where the event kind matters. When
* `coalesceRole === 'create'`, the router defers dispatch by the
* `PM_CREATE_COALESCE_WINDOW_MS` window; an incoming `'update'` event
* sharing the same key within the window supersedes the create.
*
* Typical key: `${projectId}:${workItemId}`.
*/
coalesceKey?: string;
coalesceRole?: 'create' | 'update';
}

export interface TriggerHandler {
Expand Down
77 changes: 77 additions & 0 deletions tests/unit/pm/create-coalesce-window.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

import {
__resetCoalesceWindowForTests,
clearPendingCreate,
registerPendingCreate,
} from '../../../src/pm/create-coalesce-window.js';

describe('create-coalesce-window', () => {
beforeEach(() => {
vi.useFakeTimers();
__resetCoalesceWindowForTests();
});

afterEach(() => {
vi.useRealTimers();
});

it('resolves to "proceed" after the window elapses with no follow-up', async () => {
const promise = registerPendingCreate('proj-1:ITEM-1', 2000);
await vi.advanceTimersByTimeAsync(2000);
await expect(promise).resolves.toBe('proceed');
});

it('resolves to "superseded" when clearPendingCreate is called within the window', async () => {
const promise = registerPendingCreate('proj-1:ITEM-1', 2000);
await vi.advanceTimersByTimeAsync(500);
clearPendingCreate('proj-1:ITEM-1');
await expect(promise).resolves.toBe('superseded');
});

it('honors the window duration — does not resolve early', async () => {
const promise = registerPendingCreate('proj-1:ITEM-1', 2000);
const outcome: Array<'proceed' | 'superseded'> = [];
promise.then((v) => outcome.push(v));

await vi.advanceTimersByTimeAsync(1999);
expect(outcome).toEqual([]);

await vi.advanceTimersByTimeAsync(1);
await promise;
expect(outcome).toEqual(['proceed']);
});

it('keys are isolated — clearing one key does not affect another', async () => {
const p1 = registerPendingCreate('proj-1:ITEM-1', 2000);
const p2 = registerPendingCreate('proj-1:ITEM-2', 2000);

clearPendingCreate('proj-1:ITEM-1');
expect(await p1).toBe('superseded');

await vi.advanceTimersByTimeAsync(2000);
expect(await p2).toBe('proceed');
});

it('registering a second create for the same key supersedes the first', async () => {
const first = registerPendingCreate('proj-1:ITEM-1', 2000);
const second = registerPendingCreate('proj-1:ITEM-1', 2000);

expect(await first).toBe('superseded');

await vi.advanceTimersByTimeAsync(2000);
expect(await second).toBe('proceed');
});

it('ttlMs of 0 resolves immediately to "proceed" without registering state', async () => {
const promise = registerPendingCreate('proj-1:ITEM-1', 0);
// No timer advance needed; microtask flush.
await expect(promise).resolves.toBe('proceed');
// A subsequent clear is a no-op (no pending entry).
expect(() => clearPendingCreate('proj-1:ITEM-1')).not.toThrow();
});

it('clearPendingCreate on unknown key is a no-op', () => {
expect(() => clearPendingCreate('never-registered')).not.toThrow();
});
});
76 changes: 75 additions & 1 deletion tests/unit/router/webhook-processor.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect, it, vi } from 'vitest';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

vi.mock('../../../src/utils/logging.js', () => ({
logger: {
Expand Down Expand Up @@ -463,4 +463,78 @@ describe('processRouterWebhook', () => {
expect(addJob).toHaveBeenCalled();
expect(markWorkItemEnqueued).not.toHaveBeenCalled();
});

describe('create→update coalesce', () => {
// Use real timers + short window so the create resolves quickly when not superseded.
const origWindow = process.env.PM_CREATE_COALESCE_WINDOW_MS;

beforeEach(() => {
process.env.PM_CREATE_COALESCE_WINDOW_MS = '50';
});
afterEach(() => {
if (origWindow === undefined) delete process.env.PM_CREATE_COALESCE_WINDOW_MS;
else process.env.PM_CREATE_COALESCE_WINDOW_MS = origWindow;
});

it('supersedes a create when an update with the same coalesceKey arrives within the window', async () => {
vi.mocked(addJob).mockResolvedValue('job-x');
const createAdapter = makeMockAdapter({
type: 'jira',
dispatchWithCredentials: vi.fn().mockResolvedValue({
agentType: 'implementation',
agentInput: { workItemId: 'PROJ-1' },
workItemId: 'PROJ-1',
coalesceKey: 'p1:PROJ-1',
coalesceRole: 'create',
}),
});
const updateAdapter = makeMockAdapter({
type: 'jira',
dispatchWithCredentials: vi.fn().mockResolvedValue({
agentType: 'planning',
agentInput: { workItemId: 'PROJ-1' },
workItemId: 'PROJ-1',
coalesceKey: 'p1:PROJ-1',
coalesceRole: 'update',
}),
});

// Fire create (will wait 50ms) and update (resolves immediately, supersedes create)
const createPromise = processRouterWebhook(createAdapter, {}, mockTriggerRegistry);
// Let microtasks settle so the create registers before we dispatch the update.
await Promise.resolve();
const updateResult = await processRouterWebhook(updateAdapter, {}, mockTriggerRegistry);
const createResult = await createPromise;

expect(createResult.decisionReason).toBe(
'Create trigger superseded by follow-up update (coalesce window)',
);
expect(updateResult.shouldProcess).toBe(true);

// Only one job should have been queued (for the update), not two.
expect(addJob).toHaveBeenCalledTimes(1);
expect(createAdapter.postAck).not.toHaveBeenCalled();
expect(updateAdapter.postAck).toHaveBeenCalled();
});

it('proceeds with a create when no update arrives within the window', async () => {
vi.mocked(addJob).mockResolvedValue('job-solo');
const adapter = makeMockAdapter({
type: 'jira',
dispatchWithCredentials: vi.fn().mockResolvedValue({
agentType: 'planning',
agentInput: { workItemId: 'PROJ-2' },
workItemId: 'PROJ-2',
coalesceKey: 'p1:PROJ-2',
coalesceRole: 'create',
}),
});

const result = await processRouterWebhook(adapter, {}, mockTriggerRegistry);

expect(result.shouldProcess).toBe(true);
expect(addJob).toHaveBeenCalledTimes(1);
expect(adapter.postAck).toHaveBeenCalled();
});
});
});
24 changes: 24 additions & 0 deletions tests/unit/triggers/jira-status-changed.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,28 @@ describe('JiraStatusChangedTrigger', () => {
);
});
});

describe('coalesce metadata', () => {
it('tags move results with coalesceRole: "update" and a project-scoped key', async () => {
const ctx = buildCtx({
statusChangeItems: [{ field: 'status', fromString: 'Backlog', toString: 'Splitting' }],
});
const result = await trigger.handle(ctx);

expect(result?.coalesceKey).toBe('test-project:PROJ-42');
expect(result?.coalesceRole).toBe('update');
});

it('tags create results with coalesceRole: "create" and a project-scoped key', async () => {
mockTriggerConfig(true, { onCreate: true, onMove: true });
const ctx = buildCtx({
webhookEvent: 'jira:issue_created',
issueStatusName: 'To Do',
});
const result = await trigger.handle(ctx);

expect(result?.coalesceKey).toBe('test-project:PROJ-42');
expect(result?.coalesceRole).toBe('create');
});
});
});
Loading
Loading