Merged
128 changes: 71 additions & 57 deletions src/router/queue.ts
@@ -138,83 +138,97 @@ export async function addJob(job: CascadeJob): Promise<string> {
}

export interface ScheduleCoalescedJobResult {
/** The unique BullMQ job id for the newly-scheduled delayed job. */
jobId: string;
/** True when a prior pending (delayed/waiting) job for the same coalesceKey was removed. */
superseded: boolean;
/**
* Data from the superseded delayed/waiting job. Present when
* `superseded === true`. Used by the caller to release the orphaned
* in-memory locks that were marked for the previous dispatch β€” those locks
* are never released via `worker.on('failed')` because BullMQ's `remove()`
* does not fire that event.
* Data from the first superseded pending job (when `superseded === true`).
* Used by the caller to release the orphaned in-memory locks that were
* marked for the previous dispatch β€” those locks are never released via
* `worker.on('failed')` because BullMQ's `remove()` does not fire that event.
*/
supersededJobData?: CascadeJob;
/**
* True when a job with the same coalesce ID is already active (running).
* BullMQ silently ignores `add()` for a duplicate active jobId, so we skip
* the `add()` call entirely and return this flag instead. The caller must
* NOT mark new in-memory locks β€” no new job was created.
*/
activeExists?: boolean;
}

/**
* Schedule a PM job as a BullMQ delayed job keyed by `coalesceKey`.
* Schedule a PM job as a BullMQ delayed job, coalescing within `delayMs` of
* other events with the same `coalesceKey`.
*
* **Identifier strategy.** Each call produces a UNIQUE jobId
* (`coalesce:${coalesceKey}:${timestamp}-${rand}`) and stores `coalesceKey`
* as the BullMQ "job name" β€” that name is what we filter by when locating
* prior pending jobs to supersede. Reusing a deterministic
* `coalesce:${coalesceKey}` jobId (the prior design) was a live bug:
* BullMQ's `add(name, data, { jobId })` is a silent no-op when a job with
* that id already exists in the completed/failed/active set, and BullMQ
* keeps completed jobs for 24h via `removeOnComplete: { age: 86400 }` β€”
* so any new event for a coalesceKey whose previous job had already
* completed within 24h was silently dropped. (Live incident 2026-04-29:
* splitting agent for `MNG-422` was lost because the same-id planning job
* was still running when the splitting webhook arrived.)
*
* If a delayed/waiting job with the same key already exists it is removed
* before the new job is added, superseding the previous dispatch. Active
* (already running) jobs are left untouched and `activeExists` is returned
* as `true` so the caller can skip lock marking.
 * **Supersede semantics.** Only `'delayed'` and `'waiting'` jobs are superseded:
* those are the dedup targets β€” multiple webhooks within the 10s window
* for the same `(projectId, workItemId)`. Active jobs are NOT considered
* (they're busy doing the previous unit of work; the new event becomes its
* own delayed dispatch behind it). Completed/failed jobs are NOT considered
* (they're done β€” the new event is real new intent and must run).
*
* This replaces the in-memory `create-coalesce-window.ts` mechanism with a
* durable, per-key deduplication that coalesces across any agent types for
* the same `${projectId}:${workItemId}` within the settle window.
* **Concurrency.** The getDelayed β†’ getWaiting β†’ filter β†’ remove β†’ add
* sequence is not atomic. Two concurrent schedules for the same coalesceKey
* may both observe the same prior pending job, both attempt to remove it
* (one wins, the other no-ops), then both add() new jobs with distinct
* unique jobIds. The result is up to two delayed jobs firing β€” equivalent
* to two unrelated webhooks landing back-to-back, which the downstream
* pipeline already handles via the in-flight work-item lock. The prior
* deterministic-id design had a worse failure mode (silent drop); this
* accepts a rare extra-firing in exchange for never losing events.
*/
export async function scheduleCoalescedJob(
job: CascadeJob,
coalesceKey: string,
delayMs: number,
): Promise<ScheduleCoalescedJobResult> {
const jobId = `coalesce:${coalesceKey}`;
// Build a colon-free unique jobId. BullMQ rejects custom ids that contain
// `:` unless the id has exactly 3 colon-separated parts (legacy repeatable-
// job compatibility); the prior deterministic `coalesce:${coalesceKey}`
// happened to have 3 parts (`coalesce`, projectId, workItemId) so it
// passed, but a 4th `:${timestamp}` segment would not. Using `_` as the
// internal separator also keeps the id compatible with Docker container
// names (which reject colons β€” verified by the spec-017 follow-up
// hotfix at src/router/container-manager.ts:485).
const safeKey = coalesceKey.replace(/:/g, '_');
const newJobId = `coalesce_${safeKey}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;

// Find any pending (delayed/waiting) jobs for the same coalesceKey by
// matching the BullMQ "job name". Note: getDelayed/getWaiting do NOT
// include active/completed/failed jobs β€” the supersede behavior is by
// design scoped to "events that haven't fired yet".
const [delayed, waiting] = await Promise.all([jobQueue.getDelayed(), jobQueue.getWaiting()]);
const pending = [...delayed, ...waiting].filter((j) => j.name === coalesceKey);

let superseded = false;
let supersededJobData: CascadeJob | undefined;

// Remove any existing delayed/waiting job with the same key so the new
// job supersedes it. Active jobs are left alone β€” they are already running.
//
// TOCTOU NOTE: The getJob β†’ getState β†’ remove β†’ add sequence is not atomic.
// Two concurrent webhook handlers for the same coalesceKey can both read the
// existing delayed job, both attempt remove() (the second no-ops silently),
// and then both call add() β€” but BullMQ silently ignores a duplicate jobId
// for a non-completed job, so the second event's data is lost. In practice
// this race is rare: the coalesce window exists for events tens-to-hundreds
// of milliseconds apart, not truly simultaneous arrivals. A Lua-script
// atomic compare-and-replace would close this, but the operational impact is
// low enough that a documented best-effort approach is acceptable here.
const existing = await jobQueue.getJob(jobId);
if (existing) {
const state = await existing.getState();
if (state === 'delayed' || state === 'waiting') {
// Capture job data before removal so the caller can release orphaned locks.
supersededJobData = existing.data;
await existing.remove();
superseded = true;
} else if (state === 'active') {
// An active (running) job already holds this ID. BullMQ would
// silently ignore add() for a duplicate active jobId β€” no new job
// would be created, but the caller wouldn't know and would mark
// locks incorrectly. Return activeExists=true so the caller can
// log accurately and skip marking new in-memory locks.
logger.info('Coalesced job skipped β€” active job with same ID already running', {
jobId,
coalesceKey,
});
return { jobId, superseded: false, activeExists: true };
}
if (pending.length > 0) {
// Capture the first job's data for lock cleanup. Multiple pending jobs
// for the same key are uncommon (the window is 10s), but remove ALL
// matching pending jobs to keep the queue tidy.
supersededJobData = pending[0].data as CascadeJob;
await Promise.all(pending.map((j) => j.remove()));
superseded = true;
}

await jobQueue.add(job.type, job, { jobId, delay: delayMs });
logger.info('Coalesced job scheduled', { jobId, coalesceKey, delayMs, superseded });
return { jobId, superseded, supersededJobData };
await jobQueue.add(coalesceKey, job, { jobId: newJobId, delay: delayMs });
logger.info('Coalesced job scheduled', {
jobId: newJobId,
coalesceKey,
delayMs,
superseded,
supersededCount: pending.length,
});

return { jobId: newJobId, superseded, supersededJobData };
}

// Get queue stats
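The queue.ts change above boils down to three moves: a colon-free unique jobId per schedule, the coalesceKey stored as the BullMQ job name, and a supersede pass that consults only pending (delayed/waiting) jobs. A minimal in-memory sketch of that algorithm, assuming a `MemQueue` stand-in for BullMQ (not its real API; only the id/name/supersede logic mirrors `src/router/queue.ts`):

```typescript
interface SketchJob {
  id: string;
  name: string;
  data: unknown;
  state: 'delayed' | 'active' | 'completed';
}

// Stand-in for the BullMQ Queue — hypothetical, for illustration only.
class MemQueue {
  jobs: SketchJob[] = [];
  // Stand-in for getDelayed()/getWaiting(): pending jobs only.
  getPending(): SketchJob[] {
    return this.jobs.filter((j) => j.state === 'delayed');
  }
  remove(id: string): void {
    this.jobs = this.jobs.filter((j) => j.id !== id);
  }
  add(name: string, data: unknown, id: string): void {
    this.jobs.push({ id, name, data, state: 'delayed' });
  }
}

function scheduleCoalesced(q: MemQueue, coalesceKey: string, data: unknown) {
  // Colon-free unique id, as in the PR: coalesce_<key>_<ts>_<rand>.
  const safeKey = coalesceKey.replace(/:/g, '_');
  const jobId = `coalesce_${safeKey}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
  // Supersede only pending jobs carrying the same coalesceKey as their name;
  // active/completed jobs never enter this scan, so they never block add().
  const pending = q.getPending().filter((j) => j.name === coalesceKey);
  const supersededJobData = pending.length > 0 ? pending[0].data : undefined;
  for (const j of pending) q.remove(j.id);
  q.add(coalesceKey, data, jobId);
  return { jobId, superseded: pending.length > 0, supersededJobData };
}
```

Because each call mints a fresh id, two concurrent schedules can both pass the pending scan and both add; that is the up-to-two-firings outcome the Concurrency note accepts in exchange for never silently dropping an event.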
25 changes: 6 additions & 19 deletions src/router/webhook-processor.ts
@@ -164,31 +164,18 @@ export async function processRouterWebhook(
}

// Schedule as a delayed BullMQ job; supersedes any prior pending job
// with the same key so only the latest event fires.
// with the same key so only the latest event fires within the window.
// Each schedule produces a UNIQUE jobId β€” active/completed/failed jobs
// for the same coalesceKey do NOT block a new schedule (the prior
// deterministic-id design silently dropped events; see the
// `scheduleCoalescedJob` JSDoc for the live MNG-422 incident).
try {
const { superseded, supersededJobData, activeExists } = await scheduleCoalescedJob(
const { superseded, supersededJobData } = await scheduleCoalescedJob(
job,
result.coalesceKey,
windowMs,
);

// When an active job is already running for this coalesceKey, BullMQ
// would silently ignore any new add(). No new job was created, so skip
// lock marking and return an accurate decision reason.
if (activeExists) {
logger.info(`${adapter.type} coalesced dispatch skipped β€” active job already running`, {
agentType: result.agentType,
workItemId: result.workItemId,
projectId: project.id,
coalesceKey: result.coalesceKey,
});
return {
shouldProcess: true,
projectId: project.id,
decisionReason: `Coalesced dispatch skipped: active job already running for work item ${result.workItemId ?? '(unknown)'}`,
};
}

if (superseded) {
logger.info(`${adapter.type} coalesced dispatch superseded prior pending job`, {
agentType: result.agentType,
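The caller-side lock hygiene that `supersededJobData` enables can be sketched as follows. All names here (`inFlightLocks`, `lockKey`, `afterSchedule`, the `Dispatch` shape) are hypothetical illustrations of the pattern the JSDoc describes, not the repo's real API:

```typescript
// Hypothetical in-memory lock registry keyed by project/work-item/agent.
const inFlightLocks = new Set<string>();
const lockKey = (p: string, w: string, agent: string) => `${p}:${w}:${agent}`;

interface Dispatch {
  projectId: string;
  workItemId: string;
  agentType: string;
}

interface HypotheticalScheduleResult {
  superseded: boolean;
  supersededJobData?: Dispatch;
}

function afterSchedule(result: HypotheticalScheduleResult, next: Dispatch): void {
  if (result.superseded && result.supersededJobData) {
    // The removed pending job will never reach worker.on('failed'), so its
    // locks must be released here — otherwise they leak.
    const prev = result.supersededJobData;
    inFlightLocks.delete(lockKey(prev.projectId, prev.workItemId, prev.agentType));
  }
  // Mark locks for the dispatch that was actually scheduled.
  inFlightLocks.add(lockKey(next.projectId, next.workItemId, next.agentType));
}
```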
99 changes: 69 additions & 30 deletions tests/integration/coalesce-bullmq.test.ts
@@ -1,8 +1,18 @@
/**
* Integration test for BullMQ delayed-job coalescing (spec β€” PM coalesce).
* Integration test for BullMQ delayed-job coalescing (PM coalesce flow).
*
* Tests that `scheduleCoalescedJob` correctly supersedes prior pending
* delayed jobs in a real BullMQ Queue backed by a real Redis instance.
* Tests that the unique-jobId / job-name-as-coalesce-key contract correctly:
* - schedules a new delayed job when no prior pending job exists,
* - supersedes prior delayed/waiting jobs for the same coalesceKey,
* - does NOT block when a prior job for the same coalesceKey is in
* `'completed'`, `'failed'`, or `'active'` state β€” the new schedule
* always succeeds with its own unique jobId.
*
* The "does NOT block on completed/active" cases are the regression pins
* for the live MNG-422 incident on 2026-04-29: the old deterministic-jobId
* design caused BullMQ's `add()` to silently no-op when a prior job with
* the same id was in the completed (24h-retained) or active set, and
* webhooks for that work item were lost.
*
* These tests require a running Redis server. They use a dedicated test
* queue name to avoid interfering with the production cascade-jobs queue.
@@ -28,13 +38,17 @@ beforeAll(async () => {
await testQueue.clean(0, 100, 'wait');
await testQueue.clean(0, 100, 'completed');
await testQueue.clean(0, 100, 'failed');
await testQueue.clean(0, 100, 'active');
});

afterEach(async () => {
// Clean up between test cases.
await testQueue.drain();
await testQueue.clean(0, 100, 'delayed');
await testQueue.clean(0, 100, 'wait');
await testQueue.clean(0, 100, 'completed');
await testQueue.clean(0, 100, 'failed');
await testQueue.clean(0, 100, 'active');
});

afterAll(async () => {
@@ -43,27 +57,35 @@ afterAll(async () => {

// ---------------------------------------------------------------------------
// Local version of scheduleCoalescedJob that targets the test queue.
// Mirrors the production algorithm in src/router/queue.ts:scheduleCoalescedJob:
// - unique jobId per call (timestamp + random suffix),
// - coalesceKey stored as the BullMQ "job name",
// - supersede only delayed/waiting jobs for the same name (not active /
// completed / failed β€” those have their own work in flight or already
// done; the new event must run on its own).
// ---------------------------------------------------------------------------

async function scheduleOnTestQueue(
jobData: Record<string, unknown>,
coalesceKey: string,
delayMs: number,
): Promise<{ jobId: string; superseded: boolean }> {
const jobId = `coalesce:${coalesceKey}`;
let superseded = false;
): Promise<{ jobId: string; superseded: boolean; supersededCount: number }> {
// Colon-free jobId: BullMQ rejects custom ids that contain `:` unless they
// have exactly 3 colon-separated parts. Mirrors src/router/queue.ts.
const safeKey = coalesceKey.replace(/:/g, '_');
const newJobId = `coalesce_${safeKey}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;

const [delayed, waiting] = await Promise.all([testQueue.getDelayed(), testQueue.getWaiting()]);
const pending = [...delayed, ...waiting].filter((j) => j.name === coalesceKey);

const existing = await testQueue.getJob(jobId);
if (existing) {
const state = await existing.getState();
if (state === 'delayed' || state === 'waiting') {
await existing.remove();
superseded = true;
}
let superseded = false;
if (pending.length > 0) {
await Promise.all(pending.map((j) => j.remove()));
superseded = true;
}

await testQueue.add('test', jobData, { jobId, delay: delayMs });
return { jobId, superseded };
await testQueue.add(coalesceKey, jobData, { jobId: newJobId, delay: delayMs });
return { jobId: newJobId, superseded, supersededCount: pending.length };
}

// ---------------------------------------------------------------------------
@@ -78,37 +100,41 @@ describe('scheduleCoalescedJob — real BullMQ delayed-job supersede', () => {
60_000, // 1-minute delay so the job doesn't fire during the test
);

expect(jobId).toBe('coalesce:test-project:PROJ-1');
expect(jobId).toMatch(/^coalesce_test-project_PROJ-1_/);
expect(superseded).toBe(false);

const job = await testQueue.getJob(jobId);
expect(job).not.toBeNull();
expect(job?.name).toBe('test-project:PROJ-1');
const state = await job?.getState();
expect(state).toBe('delayed');
});

it('supersedes a prior delayed job with the same coalesceKey', async () => {
// First dispatch (create event).
const first = await scheduleOnTestQueue(
{ type: 'jira', issueKey: 'PROJ-2', agentType: 'implementation' },
'test-project:PROJ-2',
60_000,
);
expect(first.superseded).toBe(false);

// Second dispatch (update event β€” same key, should supersede first).
const second = await scheduleOnTestQueue(
{ type: 'jira', issueKey: 'PROJ-2', agentType: 'planning' },
'test-project:PROJ-2',
60_000,
);
expect(second.superseded).toBe(true);
expect(second.jobId).toBe('coalesce:test-project:PROJ-2');
expect(second.jobId).not.toBe(first.jobId); // unique-id contract

// Only one delayed job should exist; its data should be the latest.
const job = await testQueue.getJob('coalesce:test-project:PROJ-2');
expect(job).not.toBeNull();
expect((job?.data as { agentType?: string }).agentType).toBe('planning');
// Exactly one delayed job remains for the coalesceKey, with the latest data.
const delayed = await testQueue.getDelayed();
const matching = delayed.filter((j) => j.name === 'test-project:PROJ-2');
expect(matching).toHaveLength(1);
expect((matching[0].data as { agentType?: string }).agentType).toBe('planning');

// The first job should be removed entirely (not findable by id).
const firstStillThere = await testQueue.getJob(first.jobId);
expect(firstStillThere).toBeUndefined();
});

it('different coalesceKeys do not interfere with each other', async () => {
@@ -126,20 +152,33 @@
expect(resultA.superseded).toBe(false);
expect(resultB.superseded).toBe(false);

// Both jobs should exist independently.
const jobA = await testQueue.getJob('coalesce:project-a:PROJ-3');
const jobB = await testQueue.getJob('coalesce:project-b:PROJ-4');
const jobA = await testQueue.getJob(resultA.jobId);
const jobB = await testQueue.getJob(resultB.jobId);
expect(jobA).not.toBeNull();
expect(jobB).not.toBeNull();
});

it('triple supersede: last writer wins', async () => {
await scheduleOnTestQueue({ agentType: 'splitting' }, 'proj:TRIPLE', 60_000);
await scheduleOnTestQueue({ agentType: 'planning' }, 'proj:TRIPLE', 60_000);
const first = await scheduleOnTestQueue({ agentType: 'splitting' }, 'proj:TRIPLE', 60_000);
const second = await scheduleOnTestQueue({ agentType: 'planning' }, 'proj:TRIPLE', 60_000);
const third = await scheduleOnTestQueue({ agentType: 'implementation' }, 'proj:TRIPLE', 60_000);

expect(first.superseded).toBe(false);
expect(second.superseded).toBe(true);
expect(third.superseded).toBe(true);
const job = await testQueue.getJob('coalesce:proj:TRIPLE');
expect((job?.data as { agentType?: string }).agentType).toBe('implementation');

const delayed = await testQueue.getDelayed();
const matching = delayed.filter((j) => j.name === 'proj:TRIPLE');
expect(matching).toHaveLength(1);
expect((matching[0].data as { agentType?: string }).agentType).toBe('implementation');
});

// NOTE: completed/failed regression pins live in the unit suite at
// `tests/unit/router/queue.test.ts` β€” moving a real BullMQ job to
// completed/failed requires a worker lock token, which would mean
// spinning up a Worker in this test (significantly more complex setup
// for marginally more confidence than the unit tests already give us).
// The contract under test ("a non-pending prior job does not block a
// new schedule") is fundamentally about NOT consulting completed/failed
// in the supersede pass, which is straightforward to verify by mock.
});
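A mocked pin along the lines the NOTE describes might look like this (hypothetical mock shapes; the real pins live in `tests/unit/router/queue.test.ts`). The contract under test: the supersede pass consults only delayed/waiting jobs, so a completed (or failed, or active) prior job for the same coalesceKey can never block a new schedule:

```typescript
interface MockJob {
  id: string;
  name: string;
  state: 'delayed' | 'waiting' | 'active' | 'completed' | 'failed';
}

// Mirrors the production filter: only getDelayed()/getWaiting() results are
// passed in, so completed/failed/active jobs never enter the scan at all.
function pendingForKey(delayed: MockJob[], waiting: MockJob[], coalesceKey: string): MockJob[] {
  return [...delayed, ...waiting].filter((j) => j.name === coalesceKey);
}

// A prior job for p:W1 sits in the completed set (24h retention)...
const completed: MockJob = {
  id: 'coalesce_p_W1_1714000000000_abc123', // hypothetical id for illustration
  name: 'p:W1',
  state: 'completed',
};
// ...but the mocked getDelayed()/getWaiting() never return it, so the scan
// sees nothing and a fresh add() with a new unique jobId proceeds.
const pending = pendingForKey([], [], 'p:W1');
const superseded = pending.length > 0;
```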