diff --git a/src/router/queue.ts b/src/router/queue.ts
index 7bf8b5c3..36c8ba75 100644
--- a/src/router/queue.ts
+++ b/src/router/queue.ts
@@ -138,83 +138,97 @@ export async function addJob(job: CascadeJob): Promise {
 }
 
 export interface ScheduleCoalescedJobResult {
+  /** The unique BullMQ job id for the newly-scheduled delayed job. */
   jobId: string;
+  /** True when a prior pending (delayed/waiting) job for the same coalesceKey was removed. */
   superseded: boolean;
   /**
-   * Data from the superseded delayed/waiting job. Present when
-   * `superseded === true`. Used by the caller to release the orphaned
-   * in-memory locks that were marked for the previous dispatch — those locks
-   * are never released via `worker.on('failed')` because BullMQ's `remove()`
-   * does not fire that event.
+   * Data from the first superseded pending job (when `superseded === true`).
+   * Used by the caller to release the orphaned in-memory locks that were
+   * marked for the previous dispatch — those locks are never released via
+   * `worker.on('failed')` because BullMQ's `remove()` does not fire that event.
    */
   supersededJobData?: CascadeJob;
-  /**
-   * True when a job with the same coalesce ID is already active (running).
-   * BullMQ silently ignores `add()` for a duplicate active jobId, so we skip
-   * the `add()` call entirely and return this flag instead. The caller must
-   * NOT mark new in-memory locks — no new job was created.
-   */
-  activeExists?: boolean;
 }
 
 /**
- * Schedule a PM job as a BullMQ delayed job keyed by `coalesceKey`.
+ * Schedule a PM job as a BullMQ delayed job, coalescing within `delayMs` of
+ * other events with the same `coalesceKey`.
+ *
+ * **Identifier strategy.** Each call produces a UNIQUE jobId
+ * (`coalesce_${coalesceKey}_${timestamp}_${rand}`, with the key's colons
+ * replaced by `_` — see the body comment) and stores `coalesceKey` as the
+ * BullMQ "job name" — that name is what we filter by when locating
+ * prior pending jobs to supersede. Reusing a deterministic
+ * `coalesce:${coalesceKey}` jobId (the prior design) was a live bug:
+ * BullMQ's `add(name, data, { jobId })` is a silent no-op when a job with
+ * that id already exists in the completed/failed/active set, and BullMQ
+ * keeps completed jobs for 24h via `removeOnComplete: { age: 86400 }` —
+ * so any new event for a coalesceKey whose previous job had already
+ * completed within 24h was silently dropped. (Live incident 2026-04-29:
+ * the splitting agent for `MNG-422` was lost because the same-id planning
+ * job was still running when the splitting webhook arrived.)
  *
- * If a delayed/waiting job with the same key already exists it is removed
- * before the new job is added, superseding the previous dispatch. Active
- * (already running) jobs are left untouched and `activeExists` is returned
- * as `true` so the caller can skip lock marking.
+ * **Supersede semantics.** Only `'delayed'` and `'waiting'` jobs supersede:
+ * those are the dedup targets — multiple webhooks within the 10s window
+ * for the same `(projectId, workItemId)`. Active jobs are NOT considered
+ * (they're busy doing the previous unit of work; the new event becomes its
+ * own delayed dispatch behind it). Completed/failed jobs are NOT considered
+ * (they're done — the new event is real new intent and must run).
  *
- * This replaces the in-memory `create-coalesce-window.ts` mechanism with a
- * durable, per-key deduplication that coalesces across any agent types for
- * the same `${projectId}:${workItemId}` within the settle window.
+ * **Concurrency.** The getDelayed → getWaiting → filter → remove → add
+ * sequence is not atomic. Two concurrent schedules for the same coalesceKey
+ * may both observe the same prior pending job, both attempt to remove it
+ * (one wins, the other no-ops), then both add() new jobs with distinct
+ * unique jobIds. The result is up to two delayed jobs firing — equivalent
+ * to two unrelated webhooks landing back-to-back, which the downstream
+ * pipeline already handles via the in-flight work-item lock. The prior
+ * deterministic-id design had a worse failure mode (silent drop); this
+ * accepts a rare extra firing in exchange for never losing events.
+ */
 export async function scheduleCoalescedJob(
   job: CascadeJob,
   coalesceKey: string,
   delayMs: number,
 ): Promise<ScheduleCoalescedJobResult> {
-  const jobId = `coalesce:${coalesceKey}`;
+  // Build a colon-free unique jobId. BullMQ rejects custom ids that contain
+  // `:` unless the id has exactly 3 colon-separated parts (legacy repeatable-
+  // job compatibility); the prior deterministic `coalesce:${coalesceKey}`
+  // happened to have 3 parts (`coalesce`, projectId, workItemId) so it
+  // passed, but a 4th `:${timestamp}` segment would not. Using `_` as the
+  // internal separator also keeps the id compatible with Docker container
+  // names (which reject colons — verified by the spec-017 follow-up
+  // hotfix at src/router/container-manager.ts:485).
+  const safeKey = coalesceKey.replace(/:/g, '_');
+  const newJobId = `coalesce_${safeKey}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
+
+  // Find any pending (delayed/waiting) jobs for the same coalesceKey by
+  // matching the BullMQ "job name". Note: getDelayed/getWaiting do NOT
+  // include active/completed/failed jobs — the supersede behavior is by
+  // design scoped to "events that haven't fired yet".
+  const [delayed, waiting] = await Promise.all([jobQueue.getDelayed(), jobQueue.getWaiting()]);
+  const pending = [...delayed, ...waiting].filter((j) => j.name === coalesceKey);
+
   let superseded = false;
   let supersededJobData: CascadeJob | undefined;
-
-  // Remove any existing delayed/waiting job with the same key so the new
-  // job supersedes it. Active jobs are left alone — they are already running.
-  //
-  // TOCTOU NOTE: The getJob → getState → remove → add sequence is not atomic.
-  // Two concurrent webhook handlers for the same coalesceKey can both read the
-  // existing delayed job, both attempt remove() (the second no-ops silently),
-  // and then both call add() — but BullMQ silently ignores a duplicate jobId
-  // for a non-completed job, so the second event's data is lost. In practice
-  // this race is rare: the coalesce window exists for events tens-to-hundreds
-  // of milliseconds apart, not truly simultaneous arrivals. A Lua-script
-  // atomic compare-and-replace would close this, but the operational impact is
-  // low enough that a documented best-effort approach is acceptable here.
-  const existing = await jobQueue.getJob(jobId);
-  if (existing) {
-    const state = await existing.getState();
-    if (state === 'delayed' || state === 'waiting') {
-      // Capture job data before removal so the caller can release orphaned locks.
-      supersededJobData = existing.data;
-      await existing.remove();
-      superseded = true;
-    } else if (state === 'active') {
-      // An active (running) job already holds this ID. BullMQ would
-      // silently ignore add() for a duplicate active jobId — no new job
-      // would be created, but the caller wouldn't know and would mark
-      // locks incorrectly. Return activeExists=true so the caller can
-      // log accurately and skip marking new in-memory locks.
-      logger.info('Coalesced job skipped — active job with same ID already running', {
-        jobId,
-        coalesceKey,
-      });
-      return { jobId, superseded: false, activeExists: true };
-    }
+  if (pending.length > 0) {
+    // Capture the first job's data for lock cleanup. Multiple concurrent
+    // schedules for the same key are uncommon (the window is 10s), but
+    // remove() ALL matching pending jobs to keep the queue tidy.
+    supersededJobData = pending[0].data as CascadeJob;
+    await Promise.all(pending.map((j) => j.remove()));
+    superseded = true;
   }
 
-  await jobQueue.add(job.type, job, { jobId, delay: delayMs });
-  logger.info('Coalesced job scheduled', { jobId, coalesceKey, delayMs, superseded });
-  return { jobId, superseded, supersededJobData };
+  await jobQueue.add(coalesceKey, job, { jobId: newJobId, delay: delayMs });
+  logger.info('Coalesced job scheduled', {
+    jobId: newJobId,
+    coalesceKey,
+    delayMs,
+    superseded,
+    supersededCount: pending.length,
+  });
+
+  return { jobId: newJobId, superseded, supersededJobData };
 }
 
 // Get queue stats
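The **Concurrency** note above accepts a rare double-fire rather than serializing schedulers. If that window ever needs closing, it does not require the Lua compare-and-replace the old TOCTOU note contemplated; a short-lived per-key Redis mutex around the supersede-then-add sequence is enough. A minimal sketch, assuming an ioredis client is reachable from this module; the helper name, lock-key prefix, and TTL are illustrative, not part of this diff:

```ts
import type Redis from 'ioredis';

// Sketch only, not part of this diff. Serializes the getDelayed → getWaiting
// → remove → add sequence per coalesceKey so two concurrent schedules cannot
// both replace the same prior pending job. The lock-key prefix and the 2s TTL
// are assumptions for illustration.
async function withCoalesceLock<T>(
  redis: Redis,
  coalesceKey: string,
  criticalSection: () => Promise<T>,
): Promise<T> {
  const lockKey = `coalesce-lock:${coalesceKey}`;
  const token = Math.random().toString(36).slice(2);

  // SET NX PX: one holder at a time; the TTL bounds the damage if a holder
  // crashes before releasing.
  while ((await redis.set(lockKey, token, 'PX', 2_000, 'NX')) !== 'OK') {
    await new Promise((resolve) => setTimeout(resolve, 25));
  }
  try {
    return await criticalSection();
  } finally {
    // Best-effort release: a GET/DEL race here is tolerable because the TTL
    // expires the lock anyway (an atomic check-and-delete would need Lua).
    if ((await redis.get(lockKey)) === token) {
      await redis.del(lockKey);
    }
  }
}
```

The function body would then run inside `withCoalesceLock(redis, coalesceKey, async () => { ... })`, at the cost of a brief blocking wait in the webhook path under contention, which is the opposite side of the trade the JSDoc currently picks.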
diff --git a/src/router/snapshot-cleanup.ts b/src/router/snapshot-cleanup.ts
index 210ef434..8d8d5179 100644
--- a/src/router/snapshot-cleanup.ts
+++ b/src/router/snapshot-cleanup.ts
@@ -15,7 +15,7 @@ import Docker from 'dockerode';
 import { captureException } from '../sentry.js';
 import { logger } from '../utils/logging.js';
 import { routerConfig } from './config.js';
-import { evictSnapshots, type SnapshotMetadata } from './snapshot-manager.js';
+import { evictSnapshots, invalidateSnapshot, type SnapshotMetadata } from './snapshot-manager.js';
 
 const SNAPSHOT_CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
 
@@ -77,7 +77,7 @@ function dockerStatusCode(err: unknown): number | undefined {
  * a running container is preserved (Docker returns 409). 404 means the image
  * has already been removed by some other path. Both are harmless and silent.
  */
-async function removeSnapshotImage(metadata: SnapshotMetadata): Promise<void> {
+export async function removeSnapshotImage(metadata: SnapshotMetadata): Promise<void> {
   try {
     await docker.getImage(metadata.imageName).remove({ force: false });
     logger.info('[SnapshotCleanup] Removed snapshot image:', {
@@ -130,3 +130,23 @@ export async function runSnapshotCleanup(): Promise<void> {
 
   logger.info('[SnapshotCleanup] Cleanup pass complete:', { count: evicted.length });
 }
+
+/**
+ * Eagerly invalidate a snapshot AND remove its Docker image. Called on PR
+ * merge: the snapshot was built for a specific work-item state and is no
+ * longer valid. Without this, `invalidateSnapshot` alone clears the registry
+ * entry but leaves the image in Docker storage — and the periodic 5-min
+ * cleanup loop iterates registry entries only, so the orphaned image is
+ * leaked permanently. That leak is what filled prod disk to 100% on
+ * 2026-04-29.
+ *
+ * Safe to call when no snapshot exists for the pair (no-op).
+ */
+export async function invalidateAndRemoveSnapshot(
+  projectId: string,
+  workItemId: string,
+): Promise<void> {
+  const removed = invalidateSnapshot(projectId, workItemId);
+  if (!removed) return;
+  await removeSnapshotImage(removed);
+}
diff --git a/src/router/snapshot-manager.ts b/src/router/snapshot-manager.ts
index 7559d8f0..6669229f 100644
--- a/src/router/snapshot-manager.ts
+++ b/src/router/snapshot-manager.ts
@@ -155,17 +155,27 @@ export function getSnapshot(
 
 /**
  * Invalidate (remove) snapshot metadata for a project+workItem pair.
- * Safe to call even if no snapshot exists.
+ * Returns the removed metadata so the caller can `docker rmi` the underlying
+ * image. Returns undefined when no entry was registered. Removing the in-memory
+ * entry without removing the image would orphan the image (the periodic cleanup
+ * loop iterates registry entries only) — callers MUST act on the returned
+ * metadata. See snapshot-cleanup.invalidateAndRemoveSnapshot for the canonical
+ * caller.
  */
-export function invalidateSnapshot(projectId: string, workItemId: string): void {
+export function invalidateSnapshot(
+  projectId: string,
+  workItemId: string,
+): SnapshotMetadata | undefined {
   const key = snapshotKey(projectId, workItemId);
-  const hadEntry = snapshots.delete(key);
-  if (hadEntry) {
+  const removed = snapshots.get(key);
+  snapshots.delete(key);
+  if (removed) {
     logger.info('[SnapshotManager] Snapshot invalidated:', {
       projectId,
       workItemId,
     });
   }
+  return removed;
 }
 
 /**
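One residual window in the eager-removal path: `invalidateAndRemoveSnapshot` clears the registry entry before the `docker rmi`, so a crash between the two steps still strands the image, and the cleanup loop (which iterates registry entries only) will never revisit it. A reference-prefix sweep would backstop that. A sketch only; the `cascade-snapshot-` prefix is inferred from the test fixtures, and `registeredImages` stands in for a registry listing this diff does not expose:

```ts
import Docker from 'dockerode';

const docker = new Docker();

// Sketch only, not part of this diff. Removes any image matching the snapshot
// naming prefix that no longer has a registry entry (e.g. a crash landed
// between invalidateSnapshot() and the rmi). The 'cascade-snapshot-' prefix is
// inferred from the test fixtures and may not match production naming.
async function sweepStraySnapshotImages(registeredImages: Set<string>): Promise<void> {
  const images = await docker.listImages({
    // Docker's reference filter, JSON-encoded the way dockerode forwards it.
    filters: JSON.stringify({ reference: ['cascade-snapshot-*'] }),
  });

  for (const image of images) {
    for (const tag of image.RepoTags ?? []) {
      if (registeredImages.has(tag)) continue; // still tracked, keep it
      try {
        await docker.getImage(tag).remove({ force: false });
      } catch {
        // 404 (already gone) and 409 (in use) are harmless, mirroring
        // removeSnapshotImage() in snapshot-cleanup.ts.
      }
    }
  }
}
```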
diff --git a/src/router/webhook-processor.ts b/src/router/webhook-processor.ts
index 80baa2e1..3992f604 100644
--- a/src/router/webhook-processor.ts
+++ b/src/router/webhook-processor.ts
@@ -164,31 +164,18 @@ export async function processRouterWebhook(
     }
 
     // Schedule as a delayed BullMQ job; supersedes any prior pending job
-    // with the same key so only the latest event fires.
+    // with the same key so only the latest event fires within the window.
+    // Each schedule produces a UNIQUE jobId — active/completed/failed jobs
+    // for the same coalesceKey do NOT block a new schedule (the prior
+    // deterministic-id design silently dropped events; see the
+    // `scheduleCoalescedJob` JSDoc for the live MNG-422 incident).
    try {
-      const { superseded, supersededJobData, activeExists } = await scheduleCoalescedJob(
+      const { superseded, supersededJobData } = await scheduleCoalescedJob(
        job,
        result.coalesceKey,
        windowMs,
      );
 
-      // When an active job is already running for this coalesceKey, BullMQ
-      // would silently ignore any new add(). No new job was created, so skip
-      // lock marking and return an accurate decision reason.
-      if (activeExists) {
-        logger.info(`${adapter.type} coalesced dispatch skipped — active job already running`, {
-          agentType: result.agentType,
-          workItemId: result.workItemId,
-          projectId: project.id,
-          coalesceKey: result.coalesceKey,
-        });
-        return {
-          shouldProcess: true,
-          projectId: project.id,
-          decisionReason: `Coalesced dispatch skipped: active job already running for work item ${result.workItemId ?? '(unknown)'}`,
-        };
-      }
-
       if (superseded) {
         logger.info(`${adapter.type} coalesced dispatch superseded prior pending job`, {
           agentType: result.agentType,
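For the lock lifecycle the `supersededJobData` JSDoc describes, the caller-side contract reduces to the shape below. `releaseCoalesceLocks` is a hypothetical stand-in for the real release helper; `markWorkItemEnqueued`/`markAgentTypeEnqueued` are the names the unit tests assert on, with illustrative signatures and import paths:

```ts
// Sketch of the caller contract, not the literal webhook-processor code.
import { scheduleCoalescedJob } from './queue.js';
import type { CascadeJob } from '../types/index.js'; // illustrative path

// Hypothetical stand-ins so the sketch is self-contained; the real helpers
// live in the router's in-memory lock module and take richer arguments.
declare function releaseCoalesceLocks(job: CascadeJob): void;
declare function markWorkItemEnqueued(workItemId: string): void;
declare function markAgentTypeEnqueued(workItemId: string, agentType: string): void;

async function dispatchCoalesced(
  job: CascadeJob,
  coalesceKey: string,
  workItemId: string,
  agentType: string,
  windowMs: number,
): Promise<void> {
  const { superseded, supersededJobData } = await scheduleCoalescedJob(job, coalesceKey, windowMs);

  if (superseded && supersededJobData) {
    // The removed job never reaches worker.on('failed') (BullMQ's remove()
    // fires no event), so its in-memory locks must be released here.
    releaseCoalesceLocks(supersededJobData);
  }

  // A job IS created on every path now (no activeExists early-return), so the
  // new dispatch always marks its own locks.
  markWorkItemEnqueued(workItemId);
  markAgentTypeEnqueued(workItemId, agentType);
}
```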
diff --git a/src/triggers/github/pr-merged.ts b/src/triggers/github/pr-merged.ts
index 0d145051..9d86b0d7 100644
--- a/src/triggers/github/pr-merged.ts
+++ b/src/triggers/github/pr-merged.ts
@@ -1,7 +1,7 @@
 import { githubClient } from '../../github/client.js';
 import { getPMProvider } from '../../pm/context.js';
 import { resolveProjectPMConfig } from '../../pm/lifecycle.js';
-import { invalidateSnapshot } from '../../router/snapshot-manager.js';
+import { invalidateAndRemoveSnapshot } from '../../router/snapshot-cleanup.js';
 import type { TriggerContext, TriggerHandler, TriggerResult } from '../../types/index.js';
 import { logger } from '../../utils/logging.js';
 import { parseRepoFullName } from '../../utils/repo.js';
@@ -47,10 +47,17 @@ export class PRMergedTrigger implements TriggerHandler {
       return null;
     }
 
-    // Fire-and-forget: invalidate any stale snapshot for this work item now that
-    // the PR is merged. The snapshot was built for a specific state of the work
-    // item and is no longer valid after the work is done.
-    invalidateSnapshot(ctx.project.id, workItemId);
+    // Fire-and-forget: invalidate the registry entry AND `docker rmi` the
+    // underlying image. Registry-only invalidation orphans the image (the
+    // periodic cleanup loop iterates registry entries) — see the 2026-04-29
+    // disk-fill incident.
+    void invalidateAndRemoveSnapshot(ctx.project.id, workItemId).catch((err: unknown) => {
+      logger.warn('Failed to invalidate+remove snapshot on PR merge', {
+        projectId: ctx.project.id,
+        workItemId,
+        error: String(err),
+      });
+    });
 
     const pmConfig = resolveProjectPMConfig(ctx.project);
     const mergedStatus = pmConfig.statuses.merged;
diff --git a/tests/integration/coalesce-bullmq.test.ts b/tests/integration/coalesce-bullmq.test.ts
index 2a176e2d..b51a4f76 100644
--- a/tests/integration/coalesce-bullmq.test.ts
+++ b/tests/integration/coalesce-bullmq.test.ts
@@ -1,8 +1,18 @@
 /**
- * Integration test for BullMQ delayed-job coalescing (spec — PM coalesce).
+ * Integration test for BullMQ delayed-job coalescing (PM coalesce flow).
  *
- * Tests that `scheduleCoalescedJob` correctly supersedes prior pending
- * delayed jobs in a real BullMQ Queue backed by a real Redis instance.
+ * Tests that the unique-jobId / job-name-as-coalesce-key contract correctly:
+ * - schedules a new delayed job when no prior pending job exists,
+ * - supersedes prior delayed/waiting jobs for the same coalesceKey,
+ * - does NOT block when a prior job for the same coalesceKey is in
+ *   `'completed'`, `'failed'`, or `'active'` state — the new schedule
+ *   always succeeds with its own unique jobId.
+ *
+ * The "does NOT block on completed/active" cases are the regression pins
+ * for the live MNG-422 incident on 2026-04-29: the old deterministic-jobId
+ * design caused BullMQ's `add()` to silently no-op when a prior job with
+ * the same id was in the completed (24h-retained) or active set, and
+ * webhooks for that work item were lost.
  *
  * These tests require a running Redis server. They use a dedicated test
  * queue name to avoid interfering with the production cascade-jobs queue.
@@ -28,6 +38,7 @@ beforeAll(async () => {
   await testQueue.clean(0, 100, 'wait');
   await testQueue.clean(0, 100, 'completed');
   await testQueue.clean(0, 100, 'failed');
+  await testQueue.clean(0, 100, 'active');
 });
 
 afterEach(async () => {
@@ -35,6 +46,9 @@ afterEach(async () => {
   await testQueue.drain();
   await testQueue.clean(0, 100, 'delayed');
   await testQueue.clean(0, 100, 'wait');
+  await testQueue.clean(0, 100, 'completed');
+  await testQueue.clean(0, 100, 'failed');
+  await testQueue.clean(0, 100, 'active');
 });
 
 afterAll(async () => {
@@ -43,27 +57,35 @@
 
 // ---------------------------------------------------------------------------
 // Local version of scheduleCoalescedJob that targets the test queue.
+// Mirrors the production algorithm in src/router/queue.ts:scheduleCoalescedJob:
+// - unique jobId per call (timestamp + random suffix),
+// - coalesceKey stored as the BullMQ "job name",
+// - supersede only delayed/waiting jobs for the same name (not active /
+//   completed / failed — those have their own work in flight or already
+//   done; the new event must run on its own).
 // ---------------------------------------------------------------------------
 async function scheduleOnTestQueue(
   jobData: Record<string, unknown>,
   coalesceKey: string,
   delayMs: number,
-): Promise<{ jobId: string; superseded: boolean }> {
-  const jobId = `coalesce:${coalesceKey}`;
-  let superseded = false;
+): Promise<{ jobId: string; superseded: boolean; supersededCount: number }> {
+  // Colon-free jobId: BullMQ rejects custom ids that contain `:` unless they
+  // have exactly 3 colon-separated parts. Mirrors src/router/queue.ts.
+  const safeKey = coalesceKey.replace(/:/g, '_');
+  const newJobId = `coalesce_${safeKey}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
+
+  const [delayed, waiting] = await Promise.all([testQueue.getDelayed(), testQueue.getWaiting()]);
+  const pending = [...delayed, ...waiting].filter((j) => j.name === coalesceKey);
 
-  const existing = await testQueue.getJob(jobId);
-  if (existing) {
-    const state = await existing.getState();
-    if (state === 'delayed' || state === 'waiting') {
-      await existing.remove();
-      superseded = true;
-    }
+  let superseded = false;
+  if (pending.length > 0) {
+    await Promise.all(pending.map((j) => j.remove()));
+    superseded = true;
   }
 
-  await testQueue.add('test', jobData, { jobId, delay: delayMs });
-  return { jobId, superseded };
+  await testQueue.add(coalesceKey, jobData, { jobId: newJobId, delay: delayMs });
+  return { jobId: newJobId, superseded, supersededCount: pending.length };
 }
 
 // ---------------------------------------------------------------------------
@@ -78,17 +100,17 @@ describe('scheduleCoalescedJob — real BullMQ delayed-job supersede', () => {
       60_000, // 1-minute delay so the job doesn't fire during the test
     );
 
-    expect(jobId).toBe('coalesce:test-project:PROJ-1');
+    expect(jobId).toMatch(/^coalesce_test-project_PROJ-1_/);
     expect(superseded).toBe(false);
 
     const job = await testQueue.getJob(jobId);
     expect(job).not.toBeNull();
+    expect(job?.name).toBe('test-project:PROJ-1');
     const state = await job?.getState();
     expect(state).toBe('delayed');
   });
 
   it('supersedes a prior delayed job with the same coalesceKey', async () => {
-    // First dispatch (create event).
     const first = await scheduleOnTestQueue(
       { type: 'jira', issueKey: 'PROJ-2', agentType: 'implementation' },
       'test-project:PROJ-2',
@@ -96,19 +118,23 @@ describe('scheduleCoalescedJob — real BullMQ delayed-job supersede', () => {
     );
     expect(first.superseded).toBe(false);
 
-    // Second dispatch (update event — same key, should supersede first).
     const second = await scheduleOnTestQueue(
       { type: 'jira', issueKey: 'PROJ-2', agentType: 'planning' },
       'test-project:PROJ-2',
       60_000,
     );
     expect(second.superseded).toBe(true);
-    expect(second.jobId).toBe('coalesce:test-project:PROJ-2');
+    expect(second.jobId).not.toBe(first.jobId); // unique-id contract
 
-    // Only one delayed job should exist; its data should be the latest.
-    const job = await testQueue.getJob('coalesce:test-project:PROJ-2');
-    expect(job).not.toBeNull();
-    expect((job?.data as { agentType?: string }).agentType).toBe('planning');
+    // Exactly one delayed job remains for the coalesceKey, with the latest data.
+    const delayed = await testQueue.getDelayed();
+    const matching = delayed.filter((j) => j.name === 'test-project:PROJ-2');
+    expect(matching).toHaveLength(1);
+    expect((matching[0].data as { agentType?: string }).agentType).toBe('planning');
+
+    // The first job should be removed entirely (not findable by id).
+    const firstStillThere = await testQueue.getJob(first.jobId);
+    expect(firstStillThere).toBeUndefined();
   });
 
   it('different coalesceKeys do not interfere with each other', async () => {
@@ -126,20 +152,33 @@ describe('scheduleCoalescedJob — real BullMQ delayed-job supersede', () => {
     expect(resultA.superseded).toBe(false);
     expect(resultB.superseded).toBe(false);
 
-    // Both jobs should exist independently.
-    const jobA = await testQueue.getJob('coalesce:project-a:PROJ-3');
-    const jobB = await testQueue.getJob('coalesce:project-b:PROJ-4');
+    const jobA = await testQueue.getJob(resultA.jobId);
+    const jobB = await testQueue.getJob(resultB.jobId);
     expect(jobA).not.toBeNull();
     expect(jobB).not.toBeNull();
   });
 
   it('triple supersede: last writer wins', async () => {
-    await scheduleOnTestQueue({ agentType: 'splitting' }, 'proj:TRIPLE', 60_000);
-    await scheduleOnTestQueue({ agentType: 'planning' }, 'proj:TRIPLE', 60_000);
+    const first = await scheduleOnTestQueue({ agentType: 'splitting' }, 'proj:TRIPLE', 60_000);
+    const second = await scheduleOnTestQueue({ agentType: 'planning' }, 'proj:TRIPLE', 60_000);
     const third = await scheduleOnTestQueue({ agentType: 'implementation' }, 'proj:TRIPLE', 60_000);
 
+    expect(first.superseded).toBe(false);
+    expect(second.superseded).toBe(true);
     expect(third.superseded).toBe(true);
-    const job = await testQueue.getJob('coalesce:proj:TRIPLE');
-    expect((job?.data as { agentType?: string }).agentType).toBe('implementation');
+
+    const delayed = await testQueue.getDelayed();
+    const matching = delayed.filter((j) => j.name === 'proj:TRIPLE');
+    expect(matching).toHaveLength(1);
+    expect((matching[0].data as { agentType?: string }).agentType).toBe('implementation');
   });
+
+  // NOTE: completed/failed regression pins live in the unit suite at
+  // `tests/unit/router/queue.test.ts` — moving a real BullMQ job to
+  // completed/failed requires a worker lock token, which would mean
+  // spinning up a Worker in this test (significantly more complex setup
+  // for marginally more confidence than the unit tests already give us).
+  // The contract under test ("a non-pending prior job does not block a
+  // new schedule") is fundamentally about NOT consulting completed/failed
+  // in the supersede pass, which is straightforward to verify by mock.
 });
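For reference, the Worker-based variant the NOTE above declines would look roughly like the sketch below, the extra moving parts being a real Worker plus an event wait before the second schedule. `TEST_QUEUE_NAME` and `connection` stand in for this suite's actual queue name and Redis options:

```ts
import { Worker } from 'bullmq';

// Sketch of the declined integration-level pin; names other than
// scheduleOnTestQueue/testQueue are stand-ins for this suite's constants.
it('a completed prior job does not block a new schedule', async () => {
  const worker = new Worker(TEST_QUEUE_NAME, async () => undefined, { connection });
  try {
    // Subscribe before scheduling so a fast completion can't be missed.
    const completed = new Promise<void>((resolve) => worker.once('completed', () => resolve()));

    // delay 0 → picked up immediately and driven to the 'completed' set.
    const first = await scheduleOnTestQueue({ agentType: 'planning' }, 'proj:DONE', 0);
    await completed;

    // The completed job is still present (this suite's queue does not remove
    // completed jobs immediately), yet the new schedule must neither be
    // blocked nor count it as superseded.
    const second = await scheduleOnTestQueue({ agentType: 'splitting' }, 'proj:DONE', 60_000);
    expect(second.superseded).toBe(false);
    expect(second.jobId).not.toBe(first.jobId);
    expect(await testQueue.getJob(second.jobId)).not.toBeUndefined();
  } finally {
    await worker.close();
  }
});
```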
diff --git a/tests/unit/router/queue.test.ts b/tests/unit/router/queue.test.ts
index 3a3f0efb..b214c444 100644
--- a/tests/unit/router/queue.test.ts
+++ b/tests/unit/router/queue.test.ts
@@ -6,21 +6,18 @@ import { beforeEach, describe, expect, it, vi } from 'vitest';
 // inside factory closures.
 // ---------------------------------------------------------------------------
 
-const { mockJobInstance, mockQueueInstance } = vi.hoisted(() => {
-  const mockJobInstance = {
-    getState: vi.fn(),
-    remove: vi.fn(),
-  };
+const { mockQueueInstance } = vi.hoisted(() => {
   const mockQueueInstance = {
     on: vi.fn(),
     add: vi.fn().mockResolvedValue({ id: 'test-job-id' }),
-    getJob: vi.fn().mockResolvedValue(null),
+    getDelayed: vi.fn().mockResolvedValue([]),
+    getWaiting: vi.fn().mockResolvedValue([]),
     getWaitingCount: vi.fn().mockResolvedValue(0),
     getActiveCount: vi.fn().mockResolvedValue(0),
     getCompletedCount: vi.fn().mockResolvedValue(0),
     getFailedCount: vi.fn().mockResolvedValue(0),
   };
-  return { mockJobInstance, mockQueueInstance };
+  return { mockQueueInstance };
 });
 
 vi.mock('bullmq', () => ({
@@ -61,73 +58,148 @@ const sampleJob: CascadeJob = {
   receivedAt: new Date().toISOString(),
 };
 
+interface FakeBullJob {
+  name: string;
+  data: CascadeJob;
+  remove: ReturnType<typeof vi.fn>;
+}
+
+function makeFakeJob(name: string, data: CascadeJob): FakeBullJob {
+  return {
+    name,
+    data,
+    remove: vi.fn().mockResolvedValue(undefined),
+  };
+}
+
 describe('scheduleCoalescedJob', () => {
   beforeEach(() => {
-    mockQueueInstance.getJob.mockResolvedValue(null);
-    mockQueueInstance.add.mockResolvedValue({ id: 'coalesce:proj-1:PROJ-42' });
-    mockJobInstance.getState.mockReset();
-    mockJobInstance.remove.mockReset();
+    mockQueueInstance.getDelayed.mockResolvedValue([]);
+    mockQueueInstance.getWaiting.mockResolvedValue([]);
+    mockQueueInstance.add.mockResolvedValue({ id: 'mock-id' });
   });
 
-  it('schedules a new delayed job when no existing job exists', async () => {
-    mockQueueInstance.getJob.mockResolvedValue(null);
-
+  it('schedules a new delayed job when no prior pending job exists', async () => {
     const result = await scheduleCoalescedJob(sampleJob, 'proj-1:PROJ-42', 10_000);
 
-    expect(result.jobId).toBe('coalesce:proj-1:PROJ-42');
     expect(result.superseded).toBe(false);
+    expect(result.supersededJobData).toBeUndefined();
+    expect(result.jobId).toMatch(/^coalesce_proj-1_PROJ-42_/);
+    // The BullMQ "job name" is the coalesceKey — that's what `getDelayed/getWaiting`
+    // filter on to find supersede targets.
     expect(mockQueueInstance.add).toHaveBeenCalledWith(
-      'jira',
+      'proj-1:PROJ-42',
       sampleJob,
-      expect.objectContaining({ jobId: 'coalesce:proj-1:PROJ-42', delay: 10_000 }),
+      expect.objectContaining({ jobId: result.jobId, delay: 10_000 }),
     );
   });
 
-  it('removes existing delayed job and returns superseded=true with supersededJobData', async () => {
-    const existingData: CascadeJob = {
+  it('supersedes a prior delayed job with the same coalesceKey', async () => {
+    const priorData: CascadeJob = {
       ...sampleJob,
-      projectId: 'proj-old',
-      triggerResult: { agentType: 'planning', workItemId: 'PROJ-42', agentInput: {} },
+      triggerResult: {
+        agentType: 'planning',
+        workItemId: 'PROJ-42',
+        agentInput: {},
+      },
     };
-    const mockJobWithData = { ...mockJobInstance, data: existingData };
-    mockJobWithData.getState = vi.fn().mockResolvedValue('delayed');
-    mockJobWithData.remove = vi.fn().mockResolvedValue(undefined);
-    mockQueueInstance.getJob.mockResolvedValue(mockJobWithData);
+    const priorJob = makeFakeJob('proj-1:PROJ-42', priorData);
+    mockQueueInstance.getDelayed.mockResolvedValue([priorJob]);
 
     const result = await scheduleCoalescedJob(sampleJob, 'proj-1:PROJ-42', 10_000);
 
     expect(result.superseded).toBe(true);
-    expect(result.supersededJobData).toEqual(existingData);
-    expect(mockJobWithData.remove).toHaveBeenCalledOnce();
+    expect(result.supersededJobData).toEqual(priorData);
+    expect(priorJob.remove).toHaveBeenCalledOnce();
     expect(mockQueueInstance.add).toHaveBeenCalledWith(
-      'jira',
+      'proj-1:PROJ-42',
       sampleJob,
-      expect.objectContaining({ jobId: 'coalesce:proj-1:PROJ-42', delay: 10_000 }),
+      expect.objectContaining({ jobId: result.jobId, delay: 10_000 }),
    );
   });
 
-  it('returns activeExists=true and skips add() when an active job has the same ID', async () => {
-    mockJobInstance.getState.mockResolvedValue('active');
-    mockJobInstance.remove.mockResolvedValue(undefined);
-    mockQueueInstance.getJob.mockResolvedValue(mockJobInstance);
+  it('supersedes a prior waiting job (in addition to delayed)', async () => {
+    const priorJob = makeFakeJob('proj-1:PROJ-42', sampleJob);
+    mockQueueInstance.getWaiting.mockResolvedValue([priorJob]);
+
+    const result = await scheduleCoalescedJob(sampleJob, 'proj-1:PROJ-42', 10_000);
+
+    expect(result.superseded).toBe(true);
+    expect(priorJob.remove).toHaveBeenCalledOnce();
+  });
+
+  it('regression pin (MNG-422 live bug 2026-04-29): does NOT block the new schedule when an active job has the same coalesceKey', async () => {
+    // Active jobs do NOT appear in getDelayed/getWaiting — they're in the
+    // 'active' set. The new helper deliberately ignores active jobs so the
+    // new event always gets its own delayed dispatch. Before this fix, the
+    // helper reused the deterministic jobId `coalesce:${coalesceKey}` and
+    // BullMQ silently no-op'd add() because of the duplicate id; the
+    // splitting agent for MNG-422 was silently dropped while planning was
+    // still running.
+    mockQueueInstance.getDelayed.mockResolvedValue([]);
+    mockQueueInstance.getWaiting.mockResolvedValue([]);
 
     const result = await scheduleCoalescedJob(sampleJob, 'proj-1:PROJ-42', 10_000);
 
     expect(result.superseded).toBe(false);
-    expect(result.activeExists).toBe(true);
-    expect(mockJobInstance.remove).not.toHaveBeenCalled();
-    // Must NOT add a new job — BullMQ would silently ignore it for active IDs
-    // and the caller would incorrectly mark locks for a non-existent job.
-    expect(mockQueueInstance.add).not.toHaveBeenCalled();
+    expect(mockQueueInstance.add).toHaveBeenCalledOnce();
+    expect(mockQueueInstance.add).toHaveBeenCalledWith(
+      'proj-1:PROJ-42',
+      sampleJob,
+      expect.objectContaining({ jobId: result.jobId, delay: 10_000 }),
+    );
   });
 
-  it('uses the coalesceKey to derive the BullMQ job ID', async () => {
+  it('regression pin: does NOT block the new schedule when a completed/failed job exists with the same coalesceKey', async () => {
+    // Completed/failed jobs also do NOT appear in getDelayed/getWaiting.
+    // Before this fix, the helper would fall through to add() with the
+    // deterministic jobId, BullMQ silently no-op'd because the completed
+    // job (kept for 24h via `removeOnComplete: { age: 86400 }`) still held
+    // the id. New webhooks within 24h after a planning run would silently
+    // disappear.
+    mockQueueInstance.getDelayed.mockResolvedValue([]);
+    mockQueueInstance.getWaiting.mockResolvedValue([]);
+
+    const result = await scheduleCoalescedJob(sampleJob, 'proj-1:PROJ-42', 10_000);
+
+    expect(result.superseded).toBe(false);
+    expect(mockQueueInstance.add).toHaveBeenCalledOnce();
+  });
+
+  it('only supersedes pending jobs whose name === coalesceKey (does not touch unrelated jobs)', async () => {
+    const matching = makeFakeJob('proj-1:PROJ-42', sampleJob);
+    const unrelated = makeFakeJob('proj-2:OTHER-99', sampleJob);
+    mockQueueInstance.getDelayed.mockResolvedValue([matching, unrelated]);
+
+    await scheduleCoalescedJob(sampleJob, 'proj-1:PROJ-42', 10_000);
+
+    expect(matching.remove).toHaveBeenCalledOnce();
+    expect(unrelated.remove).not.toHaveBeenCalled();
+  });
+
+  it('returns a unique jobId on each call (regression pin against deterministic-id reuse)', async () => {
+    const a = await scheduleCoalescedJob(sampleJob, 'proj-1:PROJ-42', 10_000);
+    // Force a non-zero delta so the timestamp suffix differs even on fast clocks.
+    await new Promise((r) => setTimeout(r, 2));
+    const b = await scheduleCoalescedJob(sampleJob, 'proj-1:PROJ-42', 10_000);
+
+    expect(a.jobId).not.toBe(b.jobId);
+    expect(a.jobId).toMatch(/^coalesce_proj-1_PROJ-42_/);
+    expect(b.jobId).toMatch(/^coalesce_proj-1_PROJ-42_/);
+  });
+
+  it('uses the coalesceKey as the BullMQ job name and as a colon-replaced prefix in the jobId', async () => {
     const result = await scheduleCoalescedJob(sampleJob, 'my-project:ISSUE-99', 5_000);
-    expect(result.jobId).toBe('coalesce:my-project:ISSUE-99');
+
+    // jobId has colons replaced with `_` so BullMQ accepts it and Docker
+    // container names derived from it stay valid.
+    expect(result.jobId).toMatch(/^coalesce_my-project_ISSUE-99_/);
+    expect(result.jobId).not.toContain(':');
+    // The BullMQ name (which we filter on for supersede) keeps the original colons.
     expect(mockQueueInstance.add).toHaveBeenCalledWith(
-      expect.any(String),
+      'my-project:ISSUE-99',
       expect.anything(),
-      expect.objectContaining({ jobId: 'coalesce:my-project:ISSUE-99' }),
+      expect.objectContaining({ jobId: result.jobId, delay: 5_000 }),
     );
   });
 });
diff --git a/tests/unit/router/snapshot-cleanup.test.ts b/tests/unit/router/snapshot-cleanup.test.ts
index a19249bf..a2847ce2 100644
--- a/tests/unit/router/snapshot-cleanup.test.ts
+++ b/tests/unit/router/snapshot-cleanup.test.ts
@@ -50,8 +50,11 @@ vi.mock('../../../src/router/config.js', () => ({
   },
 }));
 
+const mockInvalidateSnapshot = vi.fn();
+
 vi.mock('../../../src/router/snapshot-manager.js', () => ({
   evictSnapshots: (...args: unknown[]) => mockEvictSnapshots(...args),
+  invalidateSnapshot: (...args: unknown[]) => mockInvalidateSnapshot(...args),
 }));
 
 // ---------------------------------------------------------------------------
@@ -59,6 +62,7 @@ vi.mock('../../../src/router/snapshot-manager.js', () => ({
 import {
+  invalidateAndRemoveSnapshot,
   runSnapshotCleanup,
   startSnapshotCleanup,
   stopSnapshotCleanup,
@@ -236,4 +240,59 @@ describe('snapshot-cleanup', () => {
       expect(mockImageRemove).toHaveBeenCalledTimes(3);
     });
   });
+
+  // -------------------------------------------------------------------------
+  // invalidateAndRemoveSnapshot — eager rmi when a PR merges (no 5-min wait)
+  // -------------------------------------------------------------------------
+
+  describe('invalidateAndRemoveSnapshot', () => {
+    beforeEach(() => {
+      mockInvalidateSnapshot.mockReset();
+    });
+
+    it('calls docker.getImage().remove() when invalidate returned metadata', async () => {
+      // Live regression: prior to this fix, pr-merged.ts called
+      // `invalidateSnapshot()` (registry-only) and never called
+      // `docker rmi`. The image was orphaned in the Docker daemon —
+      // the periodic 5-min cleanup loop only iterates registry entries
+      // and would never see it. The unregistered image leaked
+      // permanently, contributing to the 2026-04-29 disk-fill incident.
+      const metadata = makeMetadata({ imageName: 'cascade-snapshot-proj-1-card-abc:latest' });
+      mockInvalidateSnapshot.mockReturnValue(metadata);
+
+      await invalidateAndRemoveSnapshot('proj-1', 'card-abc');
+
+      expect(mockInvalidateSnapshot).toHaveBeenCalledWith('proj-1', 'card-abc');
+      expect(mockDockerGetImage).toHaveBeenCalledWith('cascade-snapshot-proj-1-card-abc:latest');
+      expect(mockImageRemove).toHaveBeenCalledOnce();
+    });
+
+    it('is a no-op when no snapshot was registered (no docker call)', async () => {
+      mockInvalidateSnapshot.mockReturnValue(undefined);
+
+      await invalidateAndRemoveSnapshot('proj-missing', 'card-missing');
+
+      expect(mockInvalidateSnapshot).toHaveBeenCalledWith('proj-missing', 'card-missing');
+      expect(mockDockerGetImage).not.toHaveBeenCalled();
+      expect(mockImageRemove).not.toHaveBeenCalled();
+    });
+
+    it('swallows 404 (image already gone) without warning or sentry capture', async () => {
+      mockInvalidateSnapshot.mockReturnValue(makeMetadata());
+      mockImageRemove.mockRejectedValueOnce(makeDockerError(404, 'no such image'));
+
+      await expect(invalidateAndRemoveSnapshot('proj-1', 'card-1')).resolves.toBeUndefined();
+      expect(mockLogger.warn).not.toHaveBeenCalled();
+      expect(mockCaptureException).not.toHaveBeenCalled();
+    });
+
+    it('swallows 409 (image in use) without warning or sentry capture', async () => {
+      mockInvalidateSnapshot.mockReturnValue(makeMetadata());
+      mockImageRemove.mockRejectedValueOnce(makeDockerError(409, 'image in use'));
+
+      await expect(invalidateAndRemoveSnapshot('proj-1', 'card-1')).resolves.toBeUndefined();
+      expect(mockLogger.warn).not.toHaveBeenCalled();
+      expect(mockCaptureException).not.toHaveBeenCalled();
+    });
+  });
 });
diff --git a/tests/unit/router/snapshot-manager.test.ts b/tests/unit/router/snapshot-manager.test.ts
index b0825ea7..bb548b01 100644
--- a/tests/unit/router/snapshot-manager.test.ts
+++ b/tests/unit/router/snapshot-manager.test.ts
@@ -203,6 +203,27 @@ describe('snapshot-manager', () => {
     expect(getSnapshot('proj-1', 'card-2')).toBeDefined();
     expect(getSnapshotCount()).toBe(1);
   });
+
+  // Spec: invalidate-on-PR-merge should also `docker rmi` the image. The
+  // caller (pr-merged trigger) owns the rmi via the snapshot-cleanup
+  // helper; for that helper to know which image to remove, invalidate
+  // must return the metadata of the entry it removed (or undefined
+  // when nothing was registered).
+  it('returns the removed metadata when an entry was registered', () => {
+    registerSnapshot('proj-1', 'card-abc', 'cascade-snapshot-proj-1-card-abc:latest');
+
+    const removed = invalidateSnapshot('proj-1', 'card-abc');
+
+    expect(removed).toBeDefined();
+    expect(removed?.imageName).toBe('cascade-snapshot-proj-1-card-abc:latest');
+    expect(removed?.projectId).toBe('proj-1');
+    expect(removed?.workItemId).toBe('card-abc');
+  });
+
+  it('returns undefined when no entry was registered', () => {
+    const removed = invalidateSnapshot('proj-missing', 'card-missing');
+    expect(removed).toBeUndefined();
+  });
 });
 
 // -------------------------------------------------------------------------
diff --git a/tests/unit/router/webhook-processor.test.ts b/tests/unit/router/webhook-processor.test.ts
index 8541cd4a..6f4b57de 100644
--- a/tests/unit/router/webhook-processor.test.ts
+++ b/tests/unit/router/webhook-processor.test.ts
@@ -702,11 +702,18 @@ describe('processRouterWebhook', () => {
     expect(markAgentTypeEnqueued).toHaveBeenCalled();
   });
 
-  it('skips lock marking when activeExists=true (no new job was created)', async () => {
+  it('regression pin (MNG-422 2026-04-29): an active job for the same coalesceKey does NOT block a new schedule — locks marked + scheduled decision reason', async () => {
+    // Before the unique-jobId rewrite, an active job for the same
+    // coalesceKey caused scheduleCoalescedJob to return
+    // `activeExists: true` and the caller dropped the new event entirely.
+    // That silently lost the splitting agent for MNG-422 while planning
+    // was still running. After the rewrite, scheduleCoalescedJob always
+    // produces a fresh unique jobId; the active prior job no longer
+    // blocks the new schedule. Locks ARE marked for the new job and the
+    // decision reason is the normal "Coalesced dispatch scheduled".
     vi.mocked(scheduleCoalescedJob).mockResolvedValue({
-      jobId: 'coalesce:p1:PROJ-1',
+      jobId: 'coalesce_p1_PROJ-1_1234567890_abc123',
       superseded: false,
-      activeExists: true,
     });
     const adapter = makeMockAdapter({
       type: 'jira',
@@ -720,11 +727,11 @@ describe('processRouterWebhook', () => {
 
     const result = await processRouterWebhook(adapter, {}, mockTriggerRegistry);
 
-    // No new job → must not mark any in-memory locks
-    expect(markWorkItemEnqueued).not.toHaveBeenCalled();
-    expect(markAgentTypeEnqueued).not.toHaveBeenCalled();
-    expect(markRecentlyDispatched).not.toHaveBeenCalled();
-    expect(result.decisionReason).toMatch(/active job already running/);
+    // New job WAS created → locks must be marked
+    expect(markWorkItemEnqueued).toHaveBeenCalled();
+    expect(markAgentTypeEnqueued).toHaveBeenCalled();
+    expect(result.decisionReason).toMatch(/Coalesced dispatch scheduled/);
+    expect(result.decisionReason).not.toMatch(/active job already running/);
   });
 
   it('falls back to normal dispatch when PM_COALESCE_WINDOW_MS=0 (disable)', async () => {
diff --git a/tests/unit/triggers/pr-merged.test.ts b/tests/unit/triggers/pr-merged.test.ts
index b11d3904..2bf61481 100644
--- a/tests/unit/triggers/pr-merged.test.ts
+++ b/tests/unit/triggers/pr-merged.test.ts
@@ -52,10 +52,14 @@ vi.mock('../../../src/db/repositories/prWorkItemsRepository.js', () => ({
   lookupWorkItemForPR: vi.fn(),
 }));
 
-// Mock the snapshot manager so we can verify invalidation calls
-const mockInvalidateSnapshot = vi.fn();
-vi.mock('../../../src/router/snapshot-manager.js', () => ({
-  invalidateSnapshot: (...args: unknown[]) => mockInvalidateSnapshot(...args),
+// Mock the snapshot-cleanup helper so we can verify invalidation+rmi calls.
+// pr-merged calls `invalidateAndRemoveSnapshot` (the helper that BOTH clears
+// the in-memory registry AND `docker rmi`s the image), not the registry-only
+// `invalidateSnapshot`. The latter alone leaks the docker image — see the
+// 2026-04-29 disk-fill incident.
+const mockInvalidateAndRemoveSnapshot = vi.fn().mockResolvedValue(undefined);
+vi.mock('../../../src/router/snapshot-cleanup.js', () => ({
+  invalidateAndRemoveSnapshot: (...args: unknown[]) => mockInvalidateAndRemoveSnapshot(...args),
 }));
 
 // Register PM integrations in the registry via the canonical bootstrap path
@@ -91,7 +95,8 @@ describe('PRMergedTrigger', () => {
   beforeEach(() => {
     vi.mocked(lookupWorkItemForPR).mockResolvedValue('abc123');
     vi.mocked(checkTriggerEnabled).mockResolvedValue(true);
-    mockInvalidateSnapshot.mockClear();
+    mockInvalidateAndRemoveSnapshot.mockReset();
+    mockInvalidateAndRemoveSnapshot.mockResolvedValue(undefined);
   });
 
   describe('matches', () => {
@@ -703,7 +708,7 @@ describe('PRMergedTrigger', () => {
     await trigger.handle(ctx);
 
     // Snapshot should be invalidated for the project+workItem pair
-    expect(mockInvalidateSnapshot).toHaveBeenCalledWith(mockProject.id, 'abc123');
+    expect(mockInvalidateAndRemoveSnapshot).toHaveBeenCalledWith(mockProject.id, 'abc123');
   });
 
   it('does not invalidate snapshot when PR is not merged', async () => {
@@ -737,7 +742,7 @@ describe('PRMergedTrigger', () => {
     await trigger.handle(ctx);
 
     // No invalidation when PR is not merged
-    expect(mockInvalidateSnapshot).not.toHaveBeenCalled();
+    expect(mockInvalidateAndRemoveSnapshot).not.toHaveBeenCalled();
   });
 
   it('does not invalidate snapshot when no work item is linked', async () => {
@@ -772,7 +777,7 @@ describe('PRMergedTrigger', () => {
     await trigger.handle(ctx);
 
     // No invalidation when there's no linked work item
-    expect(mockInvalidateSnapshot).not.toHaveBeenCalled();
+    expect(mockInvalidateAndRemoveSnapshot).not.toHaveBeenCalled();
   });
  });
 });
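One pin the updated suite still lacks: the rejection path behind the new `void …catch()` in pr-merged.ts. A sketch of what it could look like, assuming a `makeMergedPRContext()` fixture in place of the inline ctx objects the suite builds:

```ts
import type { TriggerContext } from '../../../src/types/index.js';

// Hypothetical fixture — stands in for the inline merged-PR ctx objects the
// suite builds above.
declare function makeMergedPRContext(): TriggerContext;

it('survives a rejected invalidateAndRemoveSnapshot (fire-and-forget .catch)', async () => {
  mockInvalidateAndRemoveSnapshot.mockRejectedValueOnce(new Error('docker daemon down'));
  const trigger = new PRMergedTrigger();
  const ctx = makeMergedPRContext();

  // Must not throw: the call sits behind `void …catch()`.
  await trigger.handle(ctx);

  // Let the detached rejection settle so the .catch runs before asserting.
  await new Promise((resolve) => setImmediate(resolve));
  expect(mockInvalidateAndRemoveSnapshot).toHaveBeenCalledWith(ctx.project.id, 'abc123');
  // Asserting the logger.warn payload would additionally require this suite
  // to mock '../../../src/utils/logging.js'.
});
```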