From d153774e4fe6b6e0e0231be54dbadef6f5ef5c3d Mon Sep 17 00:00:00 2001 From: Cascade Bot Date: Sat, 7 Mar 2026 12:15:30 +0000 Subject: [PATCH] fix(router): post ack comment before enqueuing job to eliminate race condition --- src/router/adapters/github.ts | 3 + src/router/adapters/jira.ts | 2 + src/router/adapters/trello.ts | 2 + src/router/platform-adapter.ts | 5 +- src/router/webhook-processor.ts | 63 ++++++++----------- tests/unit/router/webhook-processor.test.ts | 70 ++++++++++++--------- 6 files changed, 77 insertions(+), 68 deletions(-) diff --git a/src/router/adapters/github.ts b/src/router/adapters/github.ts index d699f8d8..5447a4a0 100644 --- a/src/router/adapters/github.ts +++ b/src/router/adapters/github.ts @@ -215,6 +215,7 @@ export class GitHubRouterAdapter implements RouterPlatformAdapter { payload: unknown, _project: RouterProjectConfig, result: TriggerResult, + ackResult?: AckResult, ): CascadeJob { const job: GitHubJob = { type: 'github', @@ -224,6 +225,8 @@ export class GitHubRouterAdapter implements RouterPlatformAdapter { repoFullName: (event as GitHubParsedEvent).repoFullName, receivedAt: new Date().toISOString(), triggerResult: result, + ackCommentId: ackResult?.commentId as number | undefined, + ackMessage: ackResult?.message, }; return job; } diff --git a/src/router/adapters/jira.ts b/src/router/adapters/jira.ts index d4b8dd74..aa83145a 100644 --- a/src/router/adapters/jira.ts +++ b/src/router/adapters/jira.ts @@ -158,6 +158,7 @@ export class JiraRouterAdapter implements RouterPlatformAdapter { payload: unknown, project: RouterProjectConfig, result: TriggerResult, + ackResult?: AckResult, ): CascadeJob { const jiraEvent = event as JiraParsedEvent; const job: JiraJob = { @@ -169,6 +170,7 @@ export class JiraRouterAdapter implements RouterPlatformAdapter { webhookEvent: jiraEvent.webhookEvent, receivedAt: new Date().toISOString(), triggerResult: result, + ackCommentId: ackResult?.commentId as string | undefined, }; return job; } diff --git a/src/router/adapters/trello.ts b/src/router/adapters/trello.ts index 45c9a366..033c74c1 100644 --- a/src/router/adapters/trello.ts +++ b/src/router/adapters/trello.ts @@ -152,6 +152,7 @@ export class TrelloRouterAdapter implements RouterPlatformAdapter { payload: unknown, project: RouterProjectConfig, result: TriggerResult, + ackResult?: AckResult, ): CascadeJob { const job: TrelloJob = { type: 'trello', @@ -162,6 +163,7 @@ export class TrelloRouterAdapter implements RouterPlatformAdapter { actionType: event.eventType, receivedAt: new Date().toISOString(), triggerResult: result, + ackCommentId: ackResult?.commentId as string | undefined, }; return job; } diff --git a/src/router/platform-adapter.ts b/src/router/platform-adapter.ts index e9ffc3b7..d0693c66 100644 --- a/src/router/platform-adapter.ts +++ b/src/router/platform-adapter.ts @@ -112,14 +112,15 @@ export interface RouterPlatformAdapter { /** * Build the `CascadeJob` to be enqueued. - * The job is built without ack info — `ackCommentId` and `ackMessage` - * are patched onto the job via `updateData()` after the ack is posted. + * The `ackResult` is available at build time (ack is posted before enqueue), + * so `ackCommentId` and `ackMessage` can be embedded directly in the job. */ buildJob( event: ParsedWebhookEvent, payload: unknown, project: RouterProjectConfig, result: TriggerResult, + ackResult?: AckResult, ): CascadeJob; /** diff --git a/src/router/webhook-processor.ts b/src/router/webhook-processor.ts index 885ed984..9b84005e 100644 --- a/src/router/webhook-processor.ts +++ b/src/router/webhook-processor.ts @@ -18,7 +18,7 @@ import { markRecentlyDispatched, } from './agent-type-lock.js'; import type { RouterPlatformAdapter } from './platform-adapter.js'; -import { type CascadeJob, addJob, jobQueue } from './queue.js'; +import { addJob } from './queue.js'; import { isWorkItemLocked, markWorkItemEnqueued } from './work-item-lock.js'; export interface ProcessRouterWebhookResult { @@ -40,10 +40,10 @@ export interface ProcessRouterWebhookResult { * 5. Resolve project config * 6. Dispatch triggers with platform credential scope * 7. Work-item concurrency lock check - * 8. Build job (without ack info) - * 9. Fire optional pre-actions (e.g. GitHub 👀 reaction) - * 10. Enqueue job to Redis (durable) - * 11. Post acknowledgment comment and patch ack info onto enqueued job + * 8. Post acknowledgment comment (ack info available at build time) + * 9. Build job (with ack info embedded) + * 10. Fire optional pre-actions (e.g. GitHub 👀 reaction) + * 11. Enqueue job to Redis (durable) */ // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: webhook pipeline with sequential guard checks export async function processRouterWebhook( @@ -167,16 +167,31 @@ export async function processRouterWebhook( } } - // Step 8: Build job (without ack info — patched after ack is posted) - const job = adapter.buildJob(event, payload, project, result); + // Step 8: Post acknowledgment comment — ack info is now available at job build time + const ackResult = await adapter.postAck(event, payload, project, result.agentType); + if (ackResult?.commentId != null) { + logger.info(`${adapter.type} ack comment posted`, { + ackCommentId: ackResult.commentId, + workItemId: event.workItemId, + }); + } else { + logger.debug( + `${adapter.type} ack returned no comment ID (worker will run without pre-seeded comment)`, + { + workItemId: event.workItemId, + }, + ); + } + + // Step 9: Build job with ack info embedded + const job = adapter.buildJob(event, payload, project, result, ackResult); - // Step 9: Fire optional pre-actions (fire-and-forget) + // Step 10: Fire optional pre-actions (fire-and-forget) adapter.firePreActions?.(job, payload); - // Step 10: Enqueue — job is now durable in Redis - let jobId: string | undefined; + // Step 11: Enqueue — job is now durable in Redis try { - jobId = await addJob(job); + const jobId = await addJob(job); if (result.workItemId) { markWorkItemEnqueued(project.id, result.workItemId); } @@ -201,32 +216,6 @@ export async function processRouterWebhook( }; } - // Step 11: Post acknowledgment comment and patch ack info onto the enqueued job. - // If the router crashes between enqueue and ack, the worker runs without an ack - // comment (acceptable). If ack succeeds, we update the job data in Redis. - const ackResult = await adapter.postAck(event, payload, project, result.agentType); - if (ackResult?.commentId != null && jobId) { - try { - const enqueuedJob = await jobQueue.getJob(jobId); - if (enqueuedJob) { - const patched = { - ...enqueuedJob.data, - ackCommentId: ackResult.commentId, - ackMessage: ackResult.message, - }; - // BullMQ's updateData generic reduces union to never; safe cast - await (enqueuedJob.updateData as (data: CascadeJob) => Promise)( - patched as CascadeJob, - ); - } - } catch (err) { - logger.warn('Failed to update job with ack comment ID (non-fatal)', { - jobId, - error: String(err), - }); - } - } - return { shouldProcess: true, projectId: project.id, diff --git a/tests/unit/router/webhook-processor.test.ts b/tests/unit/router/webhook-processor.test.ts index 71e8d800..f9b32b4c 100644 --- a/tests/unit/router/webhook-processor.test.ts +++ b/tests/unit/router/webhook-processor.test.ts @@ -10,12 +10,6 @@ vi.mock('../../../src/utils/logging.js', () => ({ })); vi.mock('../../../src/router/queue.js', () => ({ addJob: vi.fn(), - jobQueue: { - getJob: vi.fn().mockResolvedValue({ - data: {}, - updateData: vi.fn().mockResolvedValue(undefined), - }), - }, })); vi.mock('../../../src/router/work-item-lock.js', () => ({ isWorkItemLocked: vi.fn().mockResolvedValue({ locked: false }), @@ -30,7 +24,7 @@ vi.mock('../../../src/router/agent-type-lock.js', () => ({ import { checkAgentTypeConcurrency } from '../../../src/router/agent-type-lock.js'; import type { RouterProjectConfig } from '../../../src/router/config.js'; import type { RouterPlatformAdapter } from '../../../src/router/platform-adapter.js'; -import { addJob, jobQueue } from '../../../src/router/queue.js'; +import { addJob } from '../../../src/router/queue.js'; import type { CascadeJob } from '../../../src/router/queue.js'; import { processRouterWebhook } from '../../../src/router/webhook-processor.js'; import { isWorkItemLocked, markWorkItemEnqueued } from '../../../src/router/work-item-lock.js'; @@ -144,24 +138,24 @@ describe('processRouterWebhook', () => { expect(result.shouldProcess).toBe(true); expect(result.projectId).toBe('p1'); expect(result.decisionReason).toMatch(/Job queued: implementation agent for work item/); - // buildJob is called without ack params (ack is patched after enqueue) - expect(adapter.buildJob).toHaveBeenCalledWith( + // postAck is called before buildJob — ack info is embedded at build time + expect(adapter.postAck).toHaveBeenCalledWith( expect.objectContaining({ eventType: 'commentCard' }), expect.anything(), mockProject, - triggerResult, + 'implementation', ); - expect(addJob).toHaveBeenCalled(); - // postAck is called after enqueue - expect(adapter.postAck).toHaveBeenCalledWith( + expect(adapter.buildJob).toHaveBeenCalledWith( expect.objectContaining({ eventType: 'commentCard' }), expect.anything(), mockProject, - 'implementation', + triggerResult, + { commentId: 'comment-abc', message: 'Starting...' }, ); + expect(addJob).toHaveBeenCalled(); }); - it('enqueues job before posting ack comment', async () => { + it('posts ack comment before enqueuing job', async () => { const callOrder: string[] = []; const triggerResult = { agentType: 'implementation', agentInput: {} }; vi.mocked(addJob).mockImplementation(async () => { @@ -177,32 +171,50 @@ describe('processRouterWebhook', () => { }); await processRouterWebhook(adapter, {}, mockTriggerRegistry); - expect(callOrder).toEqual(['addJob', 'postAck']); + expect(callOrder).toEqual(['postAck', 'addJob']); }); - it('patches ack info onto enqueued job via updateData', async () => { + it('calls buildJob with ackResult when postAck returns a result', async () => { const triggerResult = { agentType: 'implementation', agentInput: {} }; + const ackResult = { commentId: 'comment-abc', message: 'Starting...' }; vi.mocked(addJob).mockResolvedValue('job-1'); - const mockUpdateData = vi.fn().mockResolvedValue(undefined); - vi.mocked(jobQueue.getJob).mockResolvedValue({ - data: { type: 'trello', source: 'trello', payload: {} }, - updateData: mockUpdateData, - } as never); const adapter = makeMockAdapter({ dispatchWithCredentials: vi.fn().mockResolvedValue(triggerResult), - postAck: vi.fn().mockResolvedValue({ commentId: 'comment-abc', message: 'Starting...' }), + postAck: vi.fn().mockResolvedValue(ackResult), }); await processRouterWebhook(adapter, {}, mockTriggerRegistry); - expect(jobQueue.getJob).toHaveBeenCalledWith('job-1'); - expect(mockUpdateData).toHaveBeenCalledWith( - expect.objectContaining({ - ackCommentId: 'comment-abc', - ackMessage: 'Starting...', - }), + // buildJob must receive ackResult as the 5th argument + expect(adapter.buildJob).toHaveBeenCalledWith( + expect.objectContaining({ eventType: 'commentCard' }), + expect.anything(), + mockProject, + triggerResult, + ackResult, ); }); + it('still enqueues job when postAck returns undefined', async () => { + const triggerResult = { agentType: 'implementation', agentInput: {} }; + vi.mocked(addJob).mockResolvedValue('job-1'); + const adapter = makeMockAdapter({ + dispatchWithCredentials: vi.fn().mockResolvedValue(triggerResult), + postAck: vi.fn().mockResolvedValue(undefined), + }); + + await processRouterWebhook(adapter, {}, mockTriggerRegistry); + // buildJob is called with undefined ackResult + expect(adapter.buildJob).toHaveBeenCalledWith( + expect.objectContaining({ eventType: 'commentCard' }), + expect.anything(), + mockProject, + triggerResult, + undefined, + ); + // Job is still enqueued even without ack + expect(addJob).toHaveBeenCalled(); + }); + it('fires pre-actions before queuing', async () => { const triggerResult = { agentType: 'implementation', agentInput: {} }; vi.mocked(addJob).mockResolvedValue('job-1');