Merged
32 commits
948f130
feat(alerting): add configurable PM results list for alerting agent +…
Apr 27, 2026
738cb81
fix(alerting): map PM provider to correct discovery capability in con…
Apr 27, 2026
6b5417f
fix(triggers): persist workItemId on respond-to-review and respond-to…
zbigniewsobiecki Apr 29, 2026
0354857
Merge pull request #1220 from mongrel-intelligence/fix/respond-to-rev…
zbigniewsobiecki Apr 29, 2026
e91a834
fix(alerting): fix PM container picker to fetch correct sub-container…
Apr 29, 2026
6abf45e
docs(spec): 017 router-silent-failure-hardening + plans
zbigniewsobiecki Apr 29, 2026
26717c3
chore(plan): 017/3 lock
zbigniewsobiecki Apr 29, 2026
ddb90ea
test: add coverage for buildExecutionPlan in secretOrchestrator
Apr 29, 2026
8a72616
fix(triggers): suppress redundant progress-comment DELETE after gadge…
zbigniewsobiecki Apr 29, 2026
c3f29d9
Merge pull request #1222 from mongrel-intelligence/fix/progress-comme…
zbigniewsobiecki Apr 29, 2026
b2ba6a3
chore(plan): 017/1 lock
zbigniewsobiecki Apr 29, 2026
cfb0afa
fix(router): consolidate PM-ack dispatch via manifest registry, resto…
zbigniewsobiecki Apr 29, 2026
71ac7c2
Merge pull request #1223 from mongrel-intelligence/fix/pm-ack-coverage
zbigniewsobiecki Apr 29, 2026
2638a37
chore(plan): 017/2 lock
zbigniewsobiecki Apr 29, 2026
532dda5
fix(router): wrap PM-source dispatch in PM-provider scope so capacity…
zbigniewsobiecki Apr 29, 2026
b03fb8e
chore(spec): 017 done — all three plans complete (router-side silent-…
zbigniewsobiecki Apr 29, 2026
515b30f
Merge pull request #1219 from mongrel-intelligence/feature/alerting-r…
zbigniewsobiecki Apr 29, 2026
a080cd1
fix(integration-tests): wrap implementation-trigger handle() in withP…
zbigniewsobiecki Apr 29, 2026
80421d4
Merge pull request #1224 from mongrel-intelligence/fix/capacity-gate-…
zbigniewsobiecki Apr 29, 2026
8e68ff9
fix(router): replace in-memory PM coalesce window with BullMQ delayed…
Apr 29, 2026
73e7e91
ci: add Redis service to integration-tests job
Apr 29, 2026
8f9e187
fix(router): address review feedback on PM coalesce deferred ack
Apr 29, 2026
6704274
chore(worker): remove now-unused biome complexity suppression
Apr 29, 2026
22cde8f
fix(router): address lock-leak and active-job-status issues in coales…
Apr 29, 2026
182b472
test(coverage): cover deferred-ack worker path + coalesce-config
Apr 29, 2026
a08b537
fix(router): capture coalesce-schedule failures to Sentry + rename ac…
zbigniewsobiecki Apr 29, 2026
8f9da02
Merge pull request #1226 from mongrel-intelligence/fix/pm-coalesce-bu…
zbigniewsobiecki Apr 29, 2026
dfd5271
fix(router): sanitize Docker-invalid chars in jobId when building wor…
zbigniewsobiecki Apr 29, 2026
47478b2
Merge pull request #1228 from mongrel-intelligence/fix/coalesce-conta…
zbigniewsobiecki Apr 29, 2026
096fd40
fix(router): coalesce uses unique jobIds + name-as-coalesceKey to sto…
zbigniewsobiecki Apr 29, 2026
1c339b6
fix(router): rmi snapshot image on PR merge so it isn't leaked to dis…
zbigniewsobiecki Apr 29, 2026
004f29f
Merge remote-tracking branch 'origin/main' into dev
zbigniewsobiecki Apr 29, 2026
128 changes: 71 additions & 57 deletions src/router/queue.ts
@@ -138,83 +138,97 @@ export async function addJob(job: CascadeJob): Promise<string> {
}

export interface ScheduleCoalescedJobResult {
/** The unique BullMQ job id for the newly-scheduled delayed job. */
jobId: string;
/** True when a prior pending (delayed/waiting) job for the same coalesceKey was removed. */
superseded: boolean;
/**
* Data from the superseded delayed/waiting job. Present when
* `superseded === true`. Used by the caller to release the orphaned
* in-memory locks that were marked for the previous dispatch — those locks
* are never released via `worker.on('failed')` because BullMQ's `remove()`
* does not fire that event.
* Data from the first superseded pending job (when `superseded === true`).
* Used by the caller to release the orphaned in-memory locks that were
* marked for the previous dispatch — those locks are never released via
* `worker.on('failed')` because BullMQ's `remove()` does not fire that event.
*/
supersededJobData?: CascadeJob;
/**
* True when a job with the same coalesce ID is already active (running).
* BullMQ silently ignores `add()` for a duplicate active jobId, so we skip
* the `add()` call entirely and return this flag instead. The caller must
* NOT mark new in-memory locks — no new job was created.
*/
activeExists?: boolean;
}

/**
* Schedule a PM job as a BullMQ delayed job keyed by `coalesceKey`.
* Schedule a PM job as a BullMQ delayed job, coalescing within `delayMs` of
* other events with the same `coalesceKey`.
*
* **Identifier strategy.** Each call produces a UNIQUE jobId
* (`coalesce:${coalesceKey}:${timestamp}-${rand}`) and stores `coalesceKey`
* as the BullMQ "job name" — that name is what we filter by when locating
* prior pending jobs to supersede. Reusing a deterministic
* `coalesce:${coalesceKey}` jobId (the prior design) was a live bug:
* BullMQ's `add(name, data, { jobId })` is a silent no-op when a job with
* that id already exists in the completed/failed/active set, and BullMQ
* keeps completed jobs for 24h via `removeOnComplete: { age: 86400 }` —
* so any new event for a coalesceKey whose previous job had already
* completed within 24h was silently dropped. (Live incident 2026-04-29:
* splitting agent for `MNG-422` was lost because the same-id planning job
* was still running when the splitting webhook arrived.)
*
* If a delayed/waiting job with the same key already exists it is removed
* before the new job is added, superseding the previous dispatch. Active
* (already running) jobs are left untouched and `activeExists` is returned
* as `true` so the caller can skip lock marking.
* **Supersede semantics.** Only `'delayed'` and `'waiting'` jobs are superseded:
* those are the dedup targets — multiple webhooks within the 10s window
* for the same `(projectId, workItemId)`. Active jobs are NOT considered
* (they're busy doing the previous unit of work; the new event becomes its
* own delayed dispatch behind it). Completed/failed jobs are NOT considered
* (they're done — the new event is real new intent and must run).
*
* This replaces the in-memory `create-coalesce-window.ts` mechanism with a
* durable, per-key deduplication that coalesces across any agent types for
* the same `${projectId}:${workItemId}` within the settle window.
* **Concurrency.** The getDelayed → getWaiting → filter → remove → add
* sequence is not atomic. Two concurrent schedules for the same coalesceKey
* may both observe the same prior pending job, both attempt to remove it
* (one wins, the other no-ops), then both add() new jobs with distinct
* unique jobIds. The result is up to two delayed jobs firing — equivalent
* to two unrelated webhooks landing back-to-back, which the downstream
* pipeline already handles via the in-flight work-item lock. The prior
* deterministic-id design had a worse failure mode (silent drop); this
* accepts a rare extra-firing in exchange for never losing events.
*/
export async function scheduleCoalescedJob(
job: CascadeJob,
coalesceKey: string,
delayMs: number,
): Promise<ScheduleCoalescedJobResult> {
const jobId = `coalesce:${coalesceKey}`;
// Build a colon-free unique jobId. BullMQ rejects custom ids that contain
// `:` unless the id has exactly 3 colon-separated parts (legacy repeatable-
// job compatibility); the prior deterministic `coalesce:${coalesceKey}`
// happened to have 3 parts (`coalesce`, projectId, workItemId) so it
// passed, but a 4th `:${timestamp}` segment would not. Using `_` as the
// internal separator also keeps the id compatible with Docker container
// names (which reject colons — verified by the spec-017 follow-up
// hotfix at src/router/container-manager.ts:485).
const safeKey = coalesceKey.replace(/:/g, '_');
const newJobId = `coalesce_${safeKey}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;

// Find any pending (delayed/waiting) jobs for the same coalesceKey by
// matching the BullMQ "job name". Note: getDelayed/getWaiting do NOT
// include active/completed/failed jobs — the supersede behavior is by
// design scoped to "events that haven't fired yet".
const [delayed, waiting] = await Promise.all([jobQueue.getDelayed(), jobQueue.getWaiting()]);
const pending = [...delayed, ...waiting].filter((j) => j.name === coalesceKey);

let superseded = false;
let supersededJobData: CascadeJob | undefined;

// Remove any existing delayed/waiting job with the same key so the new
// job supersedes it. Active jobs are left alone — they are already running.
//
// TOCTOU NOTE: The getJob → getState → remove → add sequence is not atomic.
// Two concurrent webhook handlers for the same coalesceKey can both read the
// existing delayed job, both attempt remove() (the second no-ops silently),
// and then both call add() — but BullMQ silently ignores a duplicate jobId
// for a non-completed job, so the second event's data is lost. In practice
// this race is rare: the coalesce window exists for events tens-to-hundreds
// of milliseconds apart, not truly simultaneous arrivals. A Lua-script
// atomic compare-and-replace would close this, but the operational impact is
// low enough that a documented best-effort approach is acceptable here.
const existing = await jobQueue.getJob(jobId);
if (existing) {
const state = await existing.getState();
if (state === 'delayed' || state === 'waiting') {
// Capture job data before removal so the caller can release orphaned locks.
supersededJobData = existing.data;
await existing.remove();
superseded = true;
} else if (state === 'active') {
// An active (running) job already holds this ID. BullMQ would
// silently ignore add() for a duplicate active jobId — no new job
// would be created, but the caller wouldn't know and would mark
// locks incorrectly. Return activeExists=true so the caller can
// log accurately and skip marking new in-memory locks.
logger.info('Coalesced job skipped — active job with same ID already running', {
jobId,
coalesceKey,
});
return { jobId, superseded: false, activeExists: true };
}
if (pending.length > 0) {
// Capture the first pending job's data for lock cleanup. Multiple concurrent
// schedules for the same key are uncommon (the window is 10s), but we remove
// ALL matching pending jobs to keep the queue tidy.
supersededJobData = pending[0].data as CascadeJob;
await Promise.all(pending.map((j) => j.remove()));
superseded = true;
}

await jobQueue.add(job.type, job, { jobId, delay: delayMs });
logger.info('Coalesced job scheduled', { jobId, coalesceKey, delayMs, superseded });
return { jobId, superseded, supersededJobData };
await jobQueue.add(coalesceKey, job, { jobId: newJobId, delay: delayMs });
logger.info('Coalesced job scheduled', {
jobId: newJobId,
coalesceKey,
delayMs,
superseded,
supersededCount: pending.length,
});

return { jobId: newJobId, superseded, supersededJobData };
}

// Get queue stats
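The identifier-strategy JSDoc above is easier to follow with a standalone contrast of the two designs. The following is a minimal sketch only: the queue name, connection options, and payload shape are illustrative, not the project's real `jobQueue` or `CascadeJob`.

```ts
import { Queue } from 'bullmq';

// Illustrative queue; the real jobQueue and CascadeJob live elsewhere in the codebase.
const queue = new Queue('cascade-jobs', { connection: { host: 'localhost', port: 6379 } });

// Prior design: deterministic jobId. If a job with this id still exists in any set
// (delayed, active, or completed within the 24h removeOnComplete retention), the
// second add() is a silent no-op and the new event's payload is dropped.
const fixedId = 'coalesce:proj1:MNG-422';
await queue.add('pm-dispatch', { rev: 1 }, { jobId: fixedId, delay: 10_000 });
await queue.add('pm-dispatch', { rev: 2 }, { jobId: fixedId, delay: 10_000 }); // silently ignored

// New design: a unique jobId per schedule, with the coalesceKey stored as the job
// *name* so prior pending duplicates can be found by name and removed explicitly.
const coalesceKey = 'proj1:MNG-422';
const safeKey = coalesceKey.replace(/:/g, '_');
const uniqueId = `coalesce_${safeKey}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
const pending = [...(await queue.getDelayed()), ...(await queue.getWaiting())].filter(
  (j) => j.name === coalesceKey,
);
await Promise.all(pending.map((j) => j.remove()));
await queue.add(coalesceKey, { rev: 2 }, { jobId: uniqueId, delay: 10_000 });
```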
24 changes: 22 additions & 2 deletions src/router/snapshot-cleanup.ts
@@ -15,7 +15,7 @@ import Docker from 'dockerode';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { routerConfig } from './config.js';
import { evictSnapshots, type SnapshotMetadata } from './snapshot-manager.js';
import { evictSnapshots, invalidateSnapshot, type SnapshotMetadata } from './snapshot-manager.js';

const SNAPSHOT_CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes

@@ -77,7 +77,7 @@ function dockerStatusCode(err: unknown): number | undefined {
* a running container is preserved (Docker returns 409). 404 means the image
* has already been removed by some other path. Both are harmless and silent.
*/
async function removeSnapshotImage(metadata: SnapshotMetadata): Promise<void> {
export async function removeSnapshotImage(metadata: SnapshotMetadata): Promise<void> {
try {
await docker.getImage(metadata.imageName).remove({ force: false });
logger.info('[SnapshotCleanup] Removed snapshot image:', {
@@ -130,3 +130,23 @@ export async function runSnapshotCleanup(): Promise<void> {

logger.info('[SnapshotCleanup] Cleanup pass complete:', { count: evicted.length });
}

/**
* Eagerly invalidate a snapshot AND remove its Docker image. Called on PR
* merge: the snapshot was built for a specific work-item state and is no
* longer valid. Without this, `invalidateSnapshot` alone clears the registry
* entry but leaves the image in Docker storage — and the periodic 5-min
* cleanup loop iterates registry entries only, so the orphaned image is
* leaked permanently. That leak is what filled prod disk to 100% on
* 2026-04-29.
*
* Safe to call when no snapshot exists for the pair (no-op).
*/
export async function invalidateAndRemoveSnapshot(
projectId: string,
workItemId: string,
): Promise<void> {
const removed = invalidateSnapshot(projectId, workItemId);
if (!removed) return;
await removeSnapshotImage(removed);
}
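The body of `removeSnapshotImage` is collapsed after the initial `remove()` call. Based on the JSDoc above (409 when a running container still uses the image, 404 when it is already gone, both silent), the collapsed catch path would look roughly like this sketch; the fallback branch that reports unexpected errors to Sentry is an assumption, not the PR's verbatim code.

```ts
// Sketch of the collapsed error handling in removeSnapshotImage, per the JSDoc above.
// 409: the image backs a running container and must be preserved. 404: already removed.
// Anything else is assumed to be reported, mirroring the file's existing Sentry/logging imports.
async function removeSnapshotImageSketch(metadata: SnapshotMetadata): Promise<void> {
  try {
    await docker.getImage(metadata.imageName).remove({ force: false });
    logger.info('[SnapshotCleanup] Removed snapshot image:', { imageName: metadata.imageName });
  } catch (err) {
    const status = dockerStatusCode(err);
    if (status === 409 || status === 404) return; // harmless, intentionally silent
    captureException(err);
    logger.warn('[SnapshotCleanup] Failed to remove snapshot image:', {
      imageName: metadata.imageName,
      error: String(err),
    });
  }
}
```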
18 changes: 14 additions & 4 deletions src/router/snapshot-manager.ts
@@ -155,17 +155,27 @@ export function getSnapshot(

/**
* Invalidate (remove) snapshot metadata for a project+workItem pair.
* Safe to call even if no snapshot exists.
* Returns the removed metadata so the caller can `docker rmi` the underlying
* image. Returns undefined when no entry was registered. Removing the in-memory
* entry without removing the image would orphan the image (the periodic cleanup
* loop iterates registry entries only) — callers MUST act on the returned
* metadata. See snapshot-cleanup.invalidateAndRemoveSnapshot for the canonical
* caller.
*/
export function invalidateSnapshot(projectId: string, workItemId: string): void {
export function invalidateSnapshot(
projectId: string,
workItemId: string,
): SnapshotMetadata | undefined {
const key = snapshotKey(projectId, workItemId);
const hadEntry = snapshots.delete(key);
if (hadEntry) {
const removed = snapshots.get(key);
snapshots.delete(key);
if (removed) {
logger.info('[SnapshotManager] Snapshot invalidated:', {
projectId,
workItemId,
});
}
return removed;
}

/**
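A minimal test sketch for the new `invalidateSnapshot` contract (returning the removed metadata instead of `void`). It assumes vitest and a hypothetical `registerSnapshot` helper for populating the registry; the PR's real test file and helper names are not visible in this diff.

```ts
import { describe, expect, it } from 'vitest';
// `registerSnapshot` is a hypothetical name; use whatever actually populates the registry.
import {
  invalidateSnapshot,
  registerSnapshot,
  type SnapshotMetadata,
} from '../src/router/snapshot-manager.js';

describe('invalidateSnapshot', () => {
  it('returns the removed metadata so the caller can rmi the image', () => {
    registerSnapshot('proj1', 'MNG-101', {
      imageName: 'cascade-snapshot-proj1-mng-101',
    } as SnapshotMetadata);

    const removed = invalidateSnapshot('proj1', 'MNG-101');
    expect(removed?.imageName).toBe('cascade-snapshot-proj1-mng-101');

    // Nothing left for the same pair: second call returns undefined.
    expect(invalidateSnapshot('proj1', 'MNG-101')).toBeUndefined();
  });
});
```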
25 changes: 6 additions & 19 deletions src/router/webhook-processor.ts
@@ -164,31 +164,18 @@ export async function processRouterWebhook(
}

// Schedule as a delayed BullMQ job; supersedes any prior pending job
// with the same key so only the latest event fires.
// with the same key so only the latest event fires within the window.
// Each schedule produces a UNIQUE jobId — active/completed/failed jobs
// for the same coalesceKey do NOT block a new schedule (the prior
// deterministic-id design silently dropped events; see the
// `scheduleCoalescedJob` JSDoc for the live MNG-422 incident).
try {
const { superseded, supersededJobData, activeExists } = await scheduleCoalescedJob(
const { superseded, supersededJobData } = await scheduleCoalescedJob(
job,
result.coalesceKey,
windowMs,
);

// When an active job is already running for this coalesceKey, BullMQ
// would silently ignore any new add(). No new job was created, so skip
// lock marking and return an accurate decision reason.
if (activeExists) {
logger.info(`${adapter.type} coalesced dispatch skipped — active job already running`, {
agentType: result.agentType,
workItemId: result.workItemId,
projectId: project.id,
coalesceKey: result.coalesceKey,
});
return {
shouldProcess: true,
projectId: project.id,
decisionReason: `Coalesced dispatch skipped: active job already running for work item ${result.workItemId ?? '(unknown)'}`,
};
}

if (superseded) {
logger.info(`${adapter.type} coalesced dispatch superseded prior pending job`, {
agentType: result.agentType,
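The `superseded` branch is truncated just after the log call above. Per the `ScheduleCoalescedJobResult` contract, this is where the caller releases the in-memory locks that were marked for the superseded dispatch (BullMQ's `remove()` never fires `worker.on('failed')` for them). The continuation below is purely illustrative: `releaseLocksForJob` is a hypothetical name and the real helper is not shown in this diff.

```ts
// Hypothetical continuation of the superseded branch; the actual lock-release
// helper used by the PR is not visible in this diff.
if (superseded && supersededJobData) {
  releaseLocksForJob(supersededJobData);
}
```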
17 changes: 12 additions & 5 deletions src/triggers/github/pr-merged.ts
@@ -1,7 +1,7 @@
import { githubClient } from '../../github/client.js';
import { getPMProvider } from '../../pm/context.js';
import { resolveProjectPMConfig } from '../../pm/lifecycle.js';
import { invalidateSnapshot } from '../../router/snapshot-manager.js';
import { invalidateAndRemoveSnapshot } from '../../router/snapshot-cleanup.js';
import type { TriggerContext, TriggerHandler, TriggerResult } from '../../types/index.js';
import { logger } from '../../utils/logging.js';
import { parseRepoFullName } from '../../utils/repo.js';
@@ -47,10 +47,17 @@ export class PRMergedTrigger implements TriggerHandler {
return null;
}

// Fire-and-forget: invalidate any stale snapshot for this work item now that
// the PR is merged. The snapshot was built for a specific state of the work
// item and is no longer valid after the work is done.
invalidateSnapshot(ctx.project.id, workItemId);
// Fire-and-forget: invalidate the registry entry AND `docker rmi` the
// underlying image. Registry-only invalidation orphans the image (the
// periodic cleanup loop iterates registry entries) — see 2026-04-29
// disk-fill incident.
void invalidateAndRemoveSnapshot(ctx.project.id, workItemId).catch((err: unknown) => {
logger.warn('Failed to invalidate+remove snapshot on PR merge', {
projectId: ctx.project.id,
workItemId,
error: String(err),
});
});

const pmConfig = resolveProjectPMConfig(ctx.project);
const mergedStatus = pmConfig.statuses.merged;