diff --git a/src/router/adapters/github.ts b/src/router/adapters/github.ts index 3aba9e96..d699f8d8 100644 --- a/src/router/adapters/github.ts +++ b/src/router/adapters/github.ts @@ -215,8 +215,6 @@ export class GitHubRouterAdapter implements RouterPlatformAdapter { payload: unknown, _project: RouterProjectConfig, result: TriggerResult, - ackCommentId: string | number | undefined, - ackMessage?: string, ): CascadeJob { const job: GitHubJob = { type: 'github', @@ -225,8 +223,6 @@ export class GitHubRouterAdapter implements RouterPlatformAdapter { eventType: event.eventType, repoFullName: (event as GitHubParsedEvent).repoFullName, receivedAt: new Date().toISOString(), - ackCommentId: ackCommentId as number | undefined, - ackMessage, triggerResult: result, }; return job; diff --git a/src/router/adapters/jira.ts b/src/router/adapters/jira.ts index cff4f0e1..d4b8dd74 100644 --- a/src/router/adapters/jira.ts +++ b/src/router/adapters/jira.ts @@ -158,7 +158,6 @@ export class JiraRouterAdapter implements RouterPlatformAdapter { payload: unknown, project: RouterProjectConfig, result: TriggerResult, - ackCommentId: string | number | undefined, ): CascadeJob { const jiraEvent = event as JiraParsedEvent; const job: JiraJob = { @@ -169,7 +168,6 @@ export class JiraRouterAdapter implements RouterPlatformAdapter { issueKey: jiraEvent.issueKey, webhookEvent: jiraEvent.webhookEvent, receivedAt: new Date().toISOString(), - ackCommentId: ackCommentId as string | undefined, triggerResult: result, }; return job; diff --git a/src/router/adapters/trello.ts b/src/router/adapters/trello.ts index cb20ea66..45c9a366 100644 --- a/src/router/adapters/trello.ts +++ b/src/router/adapters/trello.ts @@ -152,7 +152,6 @@ export class TrelloRouterAdapter implements RouterPlatformAdapter { payload: unknown, project: RouterProjectConfig, result: TriggerResult, - ackCommentId: string | number | undefined, ): CascadeJob { const job: TrelloJob = { type: 'trello', @@ -162,7 +161,6 @@ export class TrelloRouterAdapter implements RouterPlatformAdapter { cardId: event.workItemId ?? '', actionType: event.eventType, receivedAt: new Date().toISOString(), - ackCommentId: ackCommentId as string | undefined, triggerResult: result, }; return job; diff --git a/src/router/platform-adapter.ts b/src/router/platform-adapter.ts index beac1053..e9ffc3b7 100644 --- a/src/router/platform-adapter.ts +++ b/src/router/platform-adapter.ts @@ -112,14 +112,14 @@ export interface RouterPlatformAdapter { /** * Build the `CascadeJob` to be enqueued. + * The job is built without ack info — `ackCommentId` and `ackMessage` + * are patched onto the job via `updateData()` after the ack is posted. */ buildJob( event: ParsedWebhookEvent, payload: unknown, project: RouterProjectConfig, result: TriggerResult, - ackCommentId: string | number | undefined, - ackMessage?: string, ): CascadeJob; /** diff --git a/src/router/webhook-processor.ts b/src/router/webhook-processor.ts index a86a9527..20914d18 100644 --- a/src/router/webhook-processor.ts +++ b/src/router/webhook-processor.ts @@ -18,7 +18,7 @@ import { markRecentlyDispatched, } from './agent-type-lock.js'; import type { RouterPlatformAdapter } from './platform-adapter.js'; -import { addJob } from './queue.js'; +import { type CascadeJob, addJob, jobQueue } from './queue.js'; import { isWorkItemLocked, markWorkItemEnqueued } from './work-item-lock.js'; export interface ProcessRouterWebhookResult { @@ -38,9 +38,10 @@ export interface ProcessRouterWebhookResult { * 5. Resolve project config * 6. Dispatch triggers with platform credential scope * 7. Work-item concurrency lock check - * 8. Post acknowledgment comment - * 9. Build and enqueue job - * 10. Fire optional pre-actions (e.g. GitHub 👀 reaction) + * 8. Build job (without ack info) + * 9. Fire optional pre-actions (e.g. GitHub 👀 reaction) + * 10. Enqueue job to Redis (durable) + * 11. Post acknowledgment comment and patch ack info onto enqueued job */ // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: webhook pipeline with sequential guard checks export async function processRouterWebhook( @@ -142,20 +143,16 @@ export async function processRouterWebhook( } } - // Step 8: Post acknowledgment comment - const ackResult = await adapter.postAck(event, payload, project, result.agentType); - const ackCommentId = ackResult?.commentId; - const ackMessage = ackResult?.message; - - // Step 9: Build job - const job = adapter.buildJob(event, payload, project, result, ackCommentId, ackMessage); + // Step 8: Build job (without ack info — patched after ack is posted) + const job = adapter.buildJob(event, payload, project, result); - // Step 10: Fire optional pre-actions (fire-and-forget) + // Step 9: Fire optional pre-actions (fire-and-forget) adapter.firePreActions?.(job, payload); - // Enqueue + // Step 10: Enqueue — job is now durable in Redis + let jobId: string | undefined; try { - const jobId = await addJob(job); + jobId = await addJob(job); if (result.workItemId) { markWorkItemEnqueued(project.id, result.workItemId); } @@ -166,7 +163,6 @@ export async function processRouterWebhook( logger.info(`${adapter.type} job queued`, { jobId, eventType: event.eventType, - ackCommentId, }); } catch (err) { logger.error(`Failed to queue ${adapter.type} job`, { @@ -174,6 +170,33 @@ export async function processRouterWebhook( eventType: event.eventType, workItemId: event.workItemId, }); + return { shouldProcess: true, projectId: project.id }; + } + + // Step 11: Post acknowledgment comment and patch ack info onto the enqueued job. + // If the router crashes between enqueue and ack, the worker runs without an ack + // comment (acceptable). If ack succeeds, we update the job data in Redis. + const ackResult = await adapter.postAck(event, payload, project, result.agentType); + if (ackResult?.commentId != null && jobId) { + try { + const enqueuedJob = await jobQueue.getJob(jobId); + if (enqueuedJob) { + const patched = { + ...enqueuedJob.data, + ackCommentId: ackResult.commentId, + ackMessage: ackResult.message, + }; + // BullMQ's updateData generic reduces union to never; safe cast + await (enqueuedJob.updateData as (data: CascadeJob) => Promise)( + patched as CascadeJob, + ); + } + } catch (err) { + logger.warn('Failed to update job with ack comment ID (non-fatal)', { + jobId, + error: String(err), + }); + } } return { shouldProcess: true, projectId: project.id }; diff --git a/src/triggers/github/check-suite-success.ts b/src/triggers/github/check-suite-success.ts index b6fd20dc..d8d727d2 100644 --- a/src/triggers/github/check-suite-success.ts +++ b/src/triggers/github/check-suite-success.ts @@ -6,7 +6,7 @@ import { checkTriggerEnabledWithParams } from '../shared/trigger-check.js'; import { type GitHubCheckSuitePayload, isGitHubCheckSuitePayload } from './types.js'; import { evaluateAuthorMode, resolveWorkItemId } from './utils.js'; -const MAX_RETRIES = 5; +const MAX_RETRIES = 12; const RETRY_DELAY_MS = 10_000; /** In-memory dedup for review triggers on the same PR+SHA (prevents duplicate reviews from multiple check_suite webhooks) */ diff --git a/src/triggers/github/webhook-handler.ts b/src/triggers/github/webhook-handler.ts index c69f6989..f7dba689 100644 --- a/src/triggers/github/webhook-handler.ts +++ b/src/triggers/github/webhook-handler.ts @@ -8,7 +8,7 @@ * - GitHub-specific AgentExecutionConfig → ./integration.ts */ -import { withGitHubToken } from '../../github/client.js'; +import { githubClient, withGitHubToken } from '../../github/client.js'; import { getPersonaToken, resolvePersonaIdentities } from '../../github/personas.js'; import { withPMCredentials, withPMProvider } from '../../pm/context.js'; import { createPMProvider, pmRegistry } from '../../pm/index.js'; @@ -30,6 +30,8 @@ import { setProcessing, startWatchdog, } from '../../utils/index.js'; +import { parseRepoFullName } from '../../utils/repo.js'; +import { safeOperation } from '../../utils/safeOperation.js'; import type { TriggerRegistry } from '../registry.js'; import { runAgentWithCredentials } from '../shared/webhook-execution.js'; import { processNextQueuedWebhook } from '../shared/webhook-queue.js'; @@ -183,6 +185,7 @@ async function runGitHubAgent( } } +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: webhook orchestration with ack cleanup export async function processGitHubWebhook( payload: unknown, eventType: string, @@ -237,7 +240,21 @@ export async function processGitHubWebhook( if (result.waitForChecks) { const githubToken = await getPersonaToken(project.id, 'implementation'); const checksOk = await pollWaitForChecks(result, event.projectIdentifier, githubToken); - if (!checksOk) return; + if (!checksOk) { + // Clean up orphaned ack comment so the PR doesn't show a misleading "Reviewing" message + const ackId = result.agentInput.ackCommentId as number | undefined; + if (ackId && event.projectIdentifier) { + const { owner, repo } = parseRepoFullName(event.projectIdentifier); + const deleteToken = await getPersonaToken(project.id, result.agentType ?? 'implementation'); + await withGitHubToken(deleteToken, () => + safeOperation(() => githubClient.deletePRComment(owner, repo, ackId), { + action: 'delete ack comment after check polling timeout', + prNumber: result.prNumber, + }), + ); + } + return; + } } logger.info('GitHub trigger matched', { diff --git a/tests/unit/router/adapters/github.test.ts b/tests/unit/router/adapters/github.test.ts index 576c24d8..9ceba5d8 100644 --- a/tests/unit/router/adapters/github.test.ts +++ b/tests/unit/router/adapters/github.test.ts @@ -354,11 +354,10 @@ describe('GitHubRouterAdapter', () => { {}, mockProject, result as never, - 42, ); expect(job.type).toBe('github'); expect((job as GitHubJob).repoFullName).toBe('owner/repo'); - expect((job as GitHubJob).ackCommentId).toBe(42); + expect((job as GitHubJob).ackCommentId).toBeUndefined(); }); }); diff --git a/tests/unit/router/adapters/jira.test.ts b/tests/unit/router/adapters/jira.test.ts index 2a847339..7e20b962 100644 --- a/tests/unit/router/adapters/jira.test.ts +++ b/tests/unit/router/adapters/jira.test.ts @@ -259,11 +259,10 @@ describe('JiraRouterAdapter', () => { {}, mockProject, result as never, - 'jira-comment-789', ); expect(job.type).toBe('jira'); expect((job as { issueKey: string }).issueKey).toBe('PROJ-1'); - expect((job as { ackCommentId: string }).ackCommentId).toBe('jira-comment-789'); + expect((job as { ackCommentId?: string }).ackCommentId).toBeUndefined(); }); }); }); diff --git a/tests/unit/router/adapters/trello.test.ts b/tests/unit/router/adapters/trello.test.ts index 6eb9f7b0..f29d9df3 100644 --- a/tests/unit/router/adapters/trello.test.ts +++ b/tests/unit/router/adapters/trello.test.ts @@ -268,12 +268,11 @@ describe('TrelloRouterAdapter', () => { { action: { type: 'commentCard' } }, mockProject, result as never, - 'comment-abc', ); expect(job.type).toBe('trello'); expect(job.source).toBe('trello'); expect((job as { cardId: string }).cardId).toBe('card1'); - expect((job as { ackCommentId: string }).ackCommentId).toBe('comment-abc'); + expect((job as { ackCommentId?: string }).ackCommentId).toBeUndefined(); }); }); }); diff --git a/tests/unit/router/webhook-processor.test.ts b/tests/unit/router/webhook-processor.test.ts index ea6006a5..17f63d3f 100644 --- a/tests/unit/router/webhook-processor.test.ts +++ b/tests/unit/router/webhook-processor.test.ts @@ -10,6 +10,12 @@ vi.mock('../../../src/utils/logging.js', () => ({ })); vi.mock('../../../src/router/queue.js', () => ({ addJob: vi.fn(), + jobQueue: { + getJob: vi.fn().mockResolvedValue({ + data: {}, + updateData: vi.fn().mockResolvedValue(undefined), + }), + }, })); vi.mock('../../../src/router/work-item-lock.js', () => ({ isWorkItemLocked: vi.fn().mockResolvedValue({ locked: false }), @@ -24,7 +30,7 @@ vi.mock('../../../src/router/agent-type-lock.js', () => ({ import { checkAgentTypeConcurrency } from '../../../src/router/agent-type-lock.js'; import type { RouterProjectConfig } from '../../../src/router/config.js'; import type { RouterPlatformAdapter } from '../../../src/router/platform-adapter.js'; -import { addJob } from '../../../src/router/queue.js'; +import { addJob, jobQueue } from '../../../src/router/queue.js'; import type { CascadeJob } from '../../../src/router/queue.js'; import { processRouterWebhook } from '../../../src/router/webhook-processor.js'; import { isWorkItemLocked, markWorkItemEnqueued } from '../../../src/router/work-item-lock.js'; @@ -132,21 +138,63 @@ describe('processRouterWebhook', () => { const result = await processRouterWebhook(adapter, {}, mockTriggerRegistry); expect(result.shouldProcess).toBe(true); expect(result.projectId).toBe('p1'); - expect(adapter.postAck).toHaveBeenCalledWith( + // buildJob is called without ack params (ack is patched after enqueue) + expect(adapter.buildJob).toHaveBeenCalledWith( expect.objectContaining({ eventType: 'commentCard' }), expect.anything(), mockProject, - 'implementation', + triggerResult, ); - expect(adapter.buildJob).toHaveBeenCalledWith( + expect(addJob).toHaveBeenCalled(); + // postAck is called after enqueue + expect(adapter.postAck).toHaveBeenCalledWith( expect.objectContaining({ eventType: 'commentCard' }), expect.anything(), mockProject, - triggerResult, - 'comment-abc', - 'Starting...', + 'implementation', + ); + }); + + it('enqueues job before posting ack comment', async () => { + const callOrder: string[] = []; + const triggerResult = { agentType: 'implementation', agentInput: {} }; + vi.mocked(addJob).mockImplementation(async () => { + callOrder.push('addJob'); + return 'job-1'; + }); + const adapter = makeMockAdapter({ + dispatchWithCredentials: vi.fn().mockResolvedValue(triggerResult), + postAck: vi.fn().mockImplementation(async () => { + callOrder.push('postAck'); + return { commentId: 'c1', message: 'ack' }; + }), + }); + + await processRouterWebhook(adapter, {}, mockTriggerRegistry); + expect(callOrder).toEqual(['addJob', 'postAck']); + }); + + it('patches ack info onto enqueued job via updateData', async () => { + const triggerResult = { agentType: 'implementation', agentInput: {} }; + vi.mocked(addJob).mockResolvedValue('job-1'); + const mockUpdateData = vi.fn().mockResolvedValue(undefined); + vi.mocked(jobQueue.getJob).mockResolvedValue({ + data: { type: 'trello', source: 'trello', payload: {} }, + updateData: mockUpdateData, + } as never); + const adapter = makeMockAdapter({ + dispatchWithCredentials: vi.fn().mockResolvedValue(triggerResult), + postAck: vi.fn().mockResolvedValue({ commentId: 'comment-abc', message: 'Starting...' }), + }); + + await processRouterWebhook(adapter, {}, mockTriggerRegistry); + expect(jobQueue.getJob).toHaveBeenCalledWith('job-1'); + expect(mockUpdateData).toHaveBeenCalledWith( + expect.objectContaining({ + ackCommentId: 'comment-abc', + ackMessage: 'Starting...', + }), ); - expect(addJob).toHaveBeenCalled(); }); it('fires pre-actions before queuing', async () => { diff --git a/tests/unit/triggers/github-webhook-handler.test.ts b/tests/unit/triggers/github-webhook-handler.test.ts index 925775b3..b68a09c7 100644 --- a/tests/unit/triggers/github-webhook-handler.test.ts +++ b/tests/unit/triggers/github-webhook-handler.test.ts @@ -33,9 +33,20 @@ vi.mock('../../../src/github/personas.js', () => ({ })); vi.mock('../../../src/github/client.js', () => ({ + githubClient: { + deletePRComment: vi.fn().mockResolvedValue(undefined), + }, withGitHubToken: vi.fn().mockImplementation((_token, fn) => fn()), })); +vi.mock('../../../src/utils/repo.js', () => ({ + parseRepoFullName: vi.fn().mockReturnValue({ owner: 'owner', repo: 'repo' }), +})); + +vi.mock('../../../src/utils/safeOperation.js', () => ({ + safeOperation: vi.fn().mockImplementation((fn) => fn()), +})); + vi.mock('../../../src/pm/context.js', () => ({ withPMCredentials: vi.fn().mockImplementation((_id, _type, _get, fn) => fn()), withPMProvider: vi.fn().mockImplementation((_provider, fn) => fn()), @@ -87,8 +98,10 @@ vi.mock('../../../src/utils/index.js', () => ({ startWatchdog: vi.fn(), })); +import { githubClient } from '../../../src/github/client.js'; import { checkAgentTypeConcurrency } from '../../../src/router/agent-type-lock.js'; import { postAcknowledgmentComment } from '../../../src/triggers/github/ack-comments.js'; +import { pollWaitForChecks } from '../../../src/triggers/github/check-polling.js'; import { processGitHubWebhook } from '../../../src/triggers/github/webhook-handler.js'; import { runAgentWithCredentials } from '../../../src/triggers/shared/webhook-execution.js'; import { @@ -290,4 +303,44 @@ describe('processGitHubWebhook', () => { expect(mockRunAgentWithCredentials).not.toHaveBeenCalled(); expect(mockSetProcessing).not.toHaveBeenCalled(); }); + + it('deletes ack comment when pollWaitForChecks returns false', async () => { + vi.mocked(pollWaitForChecks).mockResolvedValueOnce(false); + const registry = { + dispatch: vi.fn().mockResolvedValue({ + agentType: 'review', + agentInput: { repoFullName: 'owner/repo', headSha: 'abc123' }, + prNumber: 42, + waitForChecks: true, + }), + }; + + await processGitHubWebhook( + validPayload, + 'check_suite', + registry as never, + 999, // ackCommentId from router + '👀 Reviewing', + ); + + expect(vi.mocked(githubClient.deletePRComment)).toHaveBeenCalledWith('owner', 'repo', 999); + expect(mockRunAgentWithCredentials).not.toHaveBeenCalled(); + }); + + it('does not attempt ack deletion when no ackCommentId on check timeout', async () => { + vi.mocked(pollWaitForChecks).mockResolvedValueOnce(false); + const registry = { + dispatch: vi.fn().mockResolvedValue({ + agentType: 'review', + agentInput: { repoFullName: 'owner/repo', headSha: 'abc123' }, + prNumber: 42, + waitForChecks: true, + }), + }; + + await processGitHubWebhook(validPayload, 'check_suite', registry as never); + + expect(vi.mocked(githubClient.deletePRComment)).not.toHaveBeenCalled(); + expect(mockRunAgentWithCredentials).not.toHaveBeenCalled(); + }); });