Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/router/adapters/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ export class GitHubRouterAdapter implements RouterPlatformAdapter {
payload: unknown,
_project: RouterProjectConfig,
result: TriggerResult,
ackResult?: AckResult,
): CascadeJob {
const job: GitHubJob = {
type: 'github',
Expand All @@ -224,6 +225,8 @@ export class GitHubRouterAdapter implements RouterPlatformAdapter {
repoFullName: (event as GitHubParsedEvent).repoFullName,
receivedAt: new Date().toISOString(),
triggerResult: result,
ackCommentId: ackResult?.commentId as number | undefined,
ackMessage: ackResult?.message,
};
return job;
}
Expand Down
2 changes: 2 additions & 0 deletions src/router/adapters/jira.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export class JiraRouterAdapter implements RouterPlatformAdapter {
payload: unknown,
project: RouterProjectConfig,
result: TriggerResult,
ackResult?: AckResult,
): CascadeJob {
const jiraEvent = event as JiraParsedEvent;
const job: JiraJob = {
Expand All @@ -169,6 +170,7 @@ export class JiraRouterAdapter implements RouterPlatformAdapter {
webhookEvent: jiraEvent.webhookEvent,
receivedAt: new Date().toISOString(),
triggerResult: result,
ackCommentId: ackResult?.commentId as string | undefined,
};
return job;
}
Expand Down
2 changes: 2 additions & 0 deletions src/router/adapters/trello.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ export class TrelloRouterAdapter implements RouterPlatformAdapter {
payload: unknown,
project: RouterProjectConfig,
result: TriggerResult,
ackResult?: AckResult,
): CascadeJob {
const job: TrelloJob = {
type: 'trello',
Expand All @@ -162,6 +163,7 @@ export class TrelloRouterAdapter implements RouterPlatformAdapter {
actionType: event.eventType,
receivedAt: new Date().toISOString(),
triggerResult: result,
ackCommentId: ackResult?.commentId as string | undefined,
};
return job;
}
Expand Down
5 changes: 3 additions & 2 deletions src/router/platform-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,15 @@ export interface RouterPlatformAdapter {

/**
* Build the `CascadeJob` to be enqueued.
* The job is built without ack info — `ackCommentId` and `ackMessage`
* are patched onto the job via `updateData()` after the ack is posted.
* The `ackResult` is available at build time (ack is posted before enqueue),
* so `ackCommentId` and `ackMessage` can be embedded directly in the job.
*/
buildJob(
event: ParsedWebhookEvent,
payload: unknown,
project: RouterProjectConfig,
result: TriggerResult,
ackResult?: AckResult,
): CascadeJob;

/**
Expand Down
63 changes: 26 additions & 37 deletions src/router/webhook-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
markRecentlyDispatched,
} from './agent-type-lock.js';
import type { RouterPlatformAdapter } from './platform-adapter.js';
import { type CascadeJob, addJob, jobQueue } from './queue.js';
import { addJob } from './queue.js';
import { isWorkItemLocked, markWorkItemEnqueued } from './work-item-lock.js';

export interface ProcessRouterWebhookResult {
Expand All @@ -40,10 +40,10 @@ export interface ProcessRouterWebhookResult {
* 5. Resolve project config
* 6. Dispatch triggers with platform credential scope
* 7. Work-item concurrency lock check
* 8. Build job (without ack info)
* 9. Fire optional pre-actions (e.g. GitHub 👀 reaction)
* 10. Enqueue job to Redis (durable)
* 11. Post acknowledgment comment and patch ack info onto enqueued job
* 8. Post acknowledgment comment (ack info available at build time)
* 9. Build job (with ack info embedded)
* 10. Fire optional pre-actions (e.g. GitHub 👀 reaction)
* 11. Enqueue job to Redis (durable)
*/
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: webhook pipeline with sequential guard checks
export async function processRouterWebhook(
Expand Down Expand Up @@ -167,16 +167,31 @@ export async function processRouterWebhook(
}
}

// Step 8: Build job (without ack info — patched after ack is posted)
const job = adapter.buildJob(event, payload, project, result);
// Step 8: Post acknowledgment comment — ack info is now available at job build time
const ackResult = await adapter.postAck(event, payload, project, result.agentType);
if (ackResult?.commentId != null) {
logger.info(`${adapter.type} ack comment posted`, {
ackCommentId: ackResult.commentId,
workItemId: event.workItemId,
});
} else {
logger.debug(
`${adapter.type} ack returned no comment ID (worker will run without pre-seeded comment)`,
{
workItemId: event.workItemId,
},
);
}

// Step 9: Build job with ack info embedded
const job = adapter.buildJob(event, payload, project, result, ackResult);

// Step 9: Fire optional pre-actions (fire-and-forget)
// Step 10: Fire optional pre-actions (fire-and-forget)
adapter.firePreActions?.(job, payload);

// Step 10: Enqueue — job is now durable in Redis
let jobId: string | undefined;
// Step 11: Enqueue — job is now durable in Redis
try {
jobId = await addJob(job);
const jobId = await addJob(job);
if (result.workItemId) {
markWorkItemEnqueued(project.id, result.workItemId);
}
Expand All @@ -201,32 +216,6 @@ export async function processRouterWebhook(
};
}

// Step 11: Post acknowledgment comment and patch ack info onto the enqueued job.
// If the router crashes between enqueue and ack, the worker runs without an ack
// comment (acceptable). If ack succeeds, we update the job data in Redis.
const ackResult = await adapter.postAck(event, payload, project, result.agentType);
if (ackResult?.commentId != null && jobId) {
try {
const enqueuedJob = await jobQueue.getJob(jobId);
if (enqueuedJob) {
const patched = {
...enqueuedJob.data,
ackCommentId: ackResult.commentId,
ackMessage: ackResult.message,
};
// BullMQ's updateData generic reduces union to never; safe cast
await (enqueuedJob.updateData as (data: CascadeJob) => Promise<void>)(
patched as CascadeJob,
);
}
} catch (err) {
logger.warn('Failed to update job with ack comment ID (non-fatal)', {
jobId,
error: String(err),
});
}
}

return {
shouldProcess: true,
projectId: project.id,
Expand Down
70 changes: 41 additions & 29 deletions tests/unit/router/webhook-processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ vi.mock('../../../src/utils/logging.js', () => ({
}));
vi.mock('../../../src/router/queue.js', () => ({
addJob: vi.fn(),
jobQueue: {
getJob: vi.fn().mockResolvedValue({
data: {},
updateData: vi.fn().mockResolvedValue(undefined),
}),
},
}));
vi.mock('../../../src/router/work-item-lock.js', () => ({
isWorkItemLocked: vi.fn().mockResolvedValue({ locked: false }),
Expand All @@ -30,7 +24,7 @@ vi.mock('../../../src/router/agent-type-lock.js', () => ({
import { checkAgentTypeConcurrency } from '../../../src/router/agent-type-lock.js';
import type { RouterProjectConfig } from '../../../src/router/config.js';
import type { RouterPlatformAdapter } from '../../../src/router/platform-adapter.js';
import { addJob, jobQueue } from '../../../src/router/queue.js';
import { addJob } from '../../../src/router/queue.js';
import type { CascadeJob } from '../../../src/router/queue.js';
import { processRouterWebhook } from '../../../src/router/webhook-processor.js';
import { isWorkItemLocked, markWorkItemEnqueued } from '../../../src/router/work-item-lock.js';
Expand Down Expand Up @@ -144,24 +138,24 @@ describe('processRouterWebhook', () => {
expect(result.shouldProcess).toBe(true);
expect(result.projectId).toBe('p1');
expect(result.decisionReason).toMatch(/Job queued: implementation agent for work item/);
// buildJob is called without ack params (ack is patched after enqueue)
expect(adapter.buildJob).toHaveBeenCalledWith(
// postAck is called before buildJob — ack info is embedded at build time
expect(adapter.postAck).toHaveBeenCalledWith(
expect.objectContaining({ eventType: 'commentCard' }),
expect.anything(),
mockProject,
triggerResult,
'implementation',
);
expect(addJob).toHaveBeenCalled();
// postAck is called after enqueue
expect(adapter.postAck).toHaveBeenCalledWith(
expect(adapter.buildJob).toHaveBeenCalledWith(
expect.objectContaining({ eventType: 'commentCard' }),
expect.anything(),
mockProject,
'implementation',
triggerResult,
{ commentId: 'comment-abc', message: 'Starting...' },
);
expect(addJob).toHaveBeenCalled();
});

it('enqueues job before posting ack comment', async () => {
it('posts ack comment before enqueuing job', async () => {
const callOrder: string[] = [];
const triggerResult = { agentType: 'implementation', agentInput: {} };
vi.mocked(addJob).mockImplementation(async () => {
Expand All @@ -177,32 +171,50 @@ describe('processRouterWebhook', () => {
});

await processRouterWebhook(adapter, {}, mockTriggerRegistry);
expect(callOrder).toEqual(['addJob', 'postAck']);
expect(callOrder).toEqual(['postAck', 'addJob']);
});

it('patches ack info onto enqueued job via updateData', async () => {
it('calls buildJob with ackResult when postAck returns a result', async () => {
const triggerResult = { agentType: 'implementation', agentInput: {} };
const ackResult = { commentId: 'comment-abc', message: 'Starting...' };
vi.mocked(addJob).mockResolvedValue('job-1');
const mockUpdateData = vi.fn().mockResolvedValue(undefined);
vi.mocked(jobQueue.getJob).mockResolvedValue({
data: { type: 'trello', source: 'trello', payload: {} },
updateData: mockUpdateData,
} as never);
const adapter = makeMockAdapter({
dispatchWithCredentials: vi.fn().mockResolvedValue(triggerResult),
postAck: vi.fn().mockResolvedValue({ commentId: 'comment-abc', message: 'Starting...' }),
postAck: vi.fn().mockResolvedValue(ackResult),
});

await processRouterWebhook(adapter, {}, mockTriggerRegistry);
expect(jobQueue.getJob).toHaveBeenCalledWith('job-1');
expect(mockUpdateData).toHaveBeenCalledWith(
expect.objectContaining({
ackCommentId: 'comment-abc',
ackMessage: 'Starting...',
}),
// buildJob must receive ackResult as the 5th argument
expect(adapter.buildJob).toHaveBeenCalledWith(
expect.objectContaining({ eventType: 'commentCard' }),
expect.anything(),
mockProject,
triggerResult,
ackResult,
);
});

it('still enqueues job when postAck returns undefined', async () => {
const triggerResult = { agentType: 'implementation', agentInput: {} };
vi.mocked(addJob).mockResolvedValue('job-1');
const adapter = makeMockAdapter({
dispatchWithCredentials: vi.fn().mockResolvedValue(triggerResult),
postAck: vi.fn().mockResolvedValue(undefined),
});

await processRouterWebhook(adapter, {}, mockTriggerRegistry);
// buildJob is called with undefined ackResult
expect(adapter.buildJob).toHaveBeenCalledWith(
expect.objectContaining({ eventType: 'commentCard' }),
expect.anything(),
mockProject,
triggerResult,
undefined,
);
// Job is still enqueued even without ack
expect(addJob).toHaveBeenCalled();
});

it('fires pre-actions before queuing', async () => {
const triggerResult = { agentType: 'implementation', agentInput: {} };
vi.mocked(addJob).mockResolvedValue('job-1');
Expand Down