From d96ea993e33df7dcdbd4ee7c18609a2bbd73d43a Mon Sep 17 00:00:00 2001 From: Cascade Bot Date: Fri, 13 Mar 2026 16:05:55 +0000 Subject: [PATCH] feat(queue): add Redis pub/sub cancel channel and job_id column on agent_runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add src/queue/cancel.ts module with publishCancelCommand() and subscribeToCancelCommands() for Dashboard→Router kill command communication via Redis pub/sub - Add job_id TEXT column to agent_runs table via migration 0035 - Update Drizzle schema to include jobId field - Add repository functions updateRunJobId() and getRunJobId() - Include jobId in enrichedRunSelect for API visibility - Add comprehensive unit tests for pub/sub module (9 tests) and repository functions (7 tests) - All tests passing, lint and typecheck clean Fixes: #302 --- src/db/migrations/0035_add_job_id_to_runs.sql | 1 + src/db/migrations/meta/_journal.json | 7 + src/db/repositories/runsRepository.ts | 15 + src/db/schema/runs.ts | 1 + src/queue/cancel.ts | 93 ++++++ tests/unit/db/runsRepository-jobId.test.ts | 156 +++++++++ tests/unit/db/runsRepository.test.ts | 1 + tests/unit/queue/cancel.test.ts | 314 ++++++++++++++++++ 8 files changed, 588 insertions(+) create mode 100644 src/db/migrations/0035_add_job_id_to_runs.sql create mode 100644 src/queue/cancel.ts create mode 100644 tests/unit/db/runsRepository-jobId.test.ts create mode 100644 tests/unit/queue/cancel.test.ts diff --git a/src/db/migrations/0035_add_job_id_to_runs.sql b/src/db/migrations/0035_add_job_id_to_runs.sql new file mode 100644 index 00000000..e7ff2f2e --- /dev/null +++ b/src/db/migrations/0035_add_job_id_to_runs.sql @@ -0,0 +1 @@ +ALTER TABLE "agent_runs" ADD COLUMN "job_id" TEXT; diff --git a/src/db/migrations/meta/_journal.json b/src/db/migrations/meta/_journal.json index ba7b8d44..d36cbaf9 100644 --- a/src/db/migrations/meta/_journal.json +++ b/src/db/migrations/meta/_journal.json @@ -246,6 +246,13 @@ "when": 1769000000000, "tag": "0034_remove_subscription_cost_zero", "breakpoints": false + }, + { + "idx": 35, + "version": "7", + "when": 1770000000000, + "tag": "0035_add_job_id_to_runs", + "breakpoints": false } ] } diff --git a/src/db/repositories/runsRepository.ts b/src/db/repositories/runsRepository.ts index 52af68a7..9faba267 100644 --- a/src/db/repositories/runsRepository.ts +++ b/src/db/repositories/runsRepository.ts @@ -95,6 +95,7 @@ const enrichedRunSelect = { error: agentRuns.error, prUrl: agentRuns.prUrl, outputSummary: agentRuns.outputSummary, + jobId: agentRuns.jobId, workItemUrl: prWorkItems.workItemUrl, workItemTitle: prWorkItems.workItemTitle, prTitle: prWorkItems.prTitle, @@ -127,6 +128,20 @@ export async function updateRunPRNumber(runId: string, prNumber: number): Promis .where(and(eq(agentRuns.id, runId), isNull(agentRuns.prNumber))); } +export async function updateRunJobId(runId: string, jobId: string): Promise { + const db = getDb(); + await db.update(agentRuns).set({ jobId }).where(eq(agentRuns.id, runId)); +} + +export async function getRunJobId(runId: string): Promise { + const db = getDb(); + const [row] = await db + .select({ jobId: agentRuns.jobId }) + .from(agentRuns) + .where(eq(agentRuns.id, runId)); + return row?.jobId ?? null; +} + export async function completeRun(runId: string, input: CompleteRunInput): Promise { const db = getDb(); await db diff --git a/src/db/schema/runs.ts b/src/db/schema/runs.ts index 56a4e0c5..93d66c87 100644 --- a/src/db/schema/runs.ts +++ b/src/db/schema/runs.ts @@ -33,6 +33,7 @@ export const agentRuns = pgTable( error: text('error'), prUrl: text('pr_url'), outputSummary: text('output_summary'), + jobId: text('job_id'), }, (table) => [ index('idx_agent_runs_project_id').on(table.projectId), diff --git a/src/queue/cancel.ts b/src/queue/cancel.ts new file mode 100644 index 00000000..559611cb --- /dev/null +++ b/src/queue/cancel.ts @@ -0,0 +1,93 @@ +/** + * Redis pub/sub module for cancel command distribution. + * + * Provides a mechanism for the Dashboard to publish cancel commands that the Router + * receives and uses to terminate running agent jobs. + */ + +import { Redis } from 'ioredis'; + +// ── Types ──────────────────────────────────────────────────────────────── + +export interface CancelCommandPayload { + runId: string; + reason: string; +} + +type CancelCommandHandler = (payload: CancelCommandPayload) => Promise; + +// ── Channel ────────────────────────────────────────────────────────────── + +const CANCEL_CHANNEL = 'cascade:cancel'; + +// ── Instance caching ──────────────────────────────────────────────────── + +let publisherInstance: Redis | null = null; +let subscriberInstance: Redis | null = null; + +function getPublisher(): Redis { + if (!publisherInstance) { + const redisUrl = process.env.REDIS_URL; + if (!redisUrl) { + throw new Error('REDIS_URL is required for cancel pub/sub'); + } + publisherInstance = new Redis(redisUrl); + } + return publisherInstance; +} + +function getSubscriber(): Redis { + if (!subscriberInstance) { + const redisUrl = process.env.REDIS_URL; + if (!redisUrl) { + throw new Error('REDIS_URL is required for cancel pub/sub'); + } + subscriberInstance = new Redis(redisUrl); + } + return subscriberInstance; +} + +// ── Publish ────────────────────────────────────────────────────────────── + +/** + * Publish a cancel command to the cascade:cancel channel. + * + * The Router process subscribes to this channel and uses the runId to + * identify and terminate the corresponding job. + * + * @param runId - The agent run ID to cancel + * @param reason - Human-readable reason for cancellation (e.g., "user requested", "timeout") + */ +export async function publishCancelCommand(runId: string, reason: string): Promise { + const publisher = getPublisher(); + const payload: CancelCommandPayload = { runId, reason }; + await publisher.publish(CANCEL_CHANNEL, JSON.stringify(payload)); +} + +// ── Subscribe ──────────────────────────────────────────────────────────── + +/** + * Subscribe to cancel commands from the cascade:cancel channel. + * + * Invokes the handler callback for each cancel command received. + * The handler should look up the run's jobId from the database and + * use it to kill the job in BullMQ. + * + * @param handler - Callback function invoked with each cancel payload + */ +export async function subscribeToCancelCommands(handler: CancelCommandHandler): Promise { + const subscriber = getSubscriber(); + + subscriber.on('message', async (channel, message) => { + if (channel === CANCEL_CHANNEL) { + try { + const payload = JSON.parse(message) as CancelCommandPayload; + await handler(payload); + } catch (error) { + console.error('[cancel] Failed to handle cancel command:', error); + } + } + }); + + await subscriber.subscribe(CANCEL_CHANNEL); +} diff --git a/tests/unit/db/runsRepository-jobId.test.ts b/tests/unit/db/runsRepository-jobId.test.ts new file mode 100644 index 00000000..a2d62dba --- /dev/null +++ b/tests/unit/db/runsRepository-jobId.test.ts @@ -0,0 +1,156 @@ +/** + * Unit tests for jobId-related functions in src/db/repositories/runsRepository.ts + * + * Tests updateRunJobId and getRunJobId functions. + */ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +// Mock the database client +const mockUpdate = vi.fn(); +const mockSelect = vi.fn(); +const mockSet = vi.fn(); +const mockWhere = vi.fn(); +const mockFrom = vi.fn(); + +vi.mock('../../../src/db/client.js', () => ({ + getDb: () => ({ + update: mockUpdate, + select: mockSelect, + }), +})); + +vi.mock('../../../src/db/schema/index.js', () => ({ + agentRuns: { + id: 'id', + jobId: 'job_id', + projectId: 'project_id', + workItemId: 'work_item_id', + agentType: 'agent_type', + status: 'status', + startedAt: 'started_at', + prNumber: 'pr_number', + durationMs: 'duration_ms', + costUsd: 'cost_usd', + engine: 'engine', + triggerType: 'trigger_type', + model: 'model', + maxIterations: 'max_iterations', + completedAt: 'completed_at', + llmIterations: 'llm_iterations', + gadgetCalls: 'gadget_calls', + success: 'success', + error: 'error', + prUrl: 'pr_url', + outputSummary: 'output_summary', + }, + prWorkItems: { + projectId: 'project_id', + prNumber: 'pr_number', + workItemUrl: 'work_item_url', + workItemTitle: 'work_item_title', + prTitle: 'pr_title', + }, + agentRunLogs: { runId: 'run_id' }, + agentRunLlmCalls: { + runId: 'run_id', + callNumber: 'call_number', + id: 'id', + }, + debugAnalyses: { id: 'id' }, + projects: { id: 'id', orgId: 'org_id', name: 'name' }, + organizations: { id: 'id', name: 'name' }, +})); + +vi.mock('../../../src/db/repositories/joinHelpers.js', () => ({ + buildAgentRunWorkItemJoin: () => 'mock-join-condition', +})); + +import { getRunJobId, updateRunJobId } from '../../../src/db/repositories/runsRepository.js'; + +describe('updateRunJobId', () => { + beforeEach(() => { + vi.resetAllMocks(); + + // Set up chained mock returns for update + mockUpdate.mockReturnValue({ set: mockSet }); + mockSet.mockReturnValue({ where: mockWhere }); + mockWhere.mockResolvedValue(undefined); + }); + + it('updates the job_id column for a given run', async () => { + const runId = 'run-123'; + const jobId = 'job-456'; + + await updateRunJobId(runId, jobId); + + expect(mockUpdate).toHaveBeenCalled(); + expect(mockSet).toHaveBeenCalledWith({ jobId }); + expect(mockWhere).toHaveBeenCalled(); + }); + + it('handles multiple jobId updates independently', async () => { + await updateRunJobId('run-1', 'job-1'); + await updateRunJobId('run-2', 'job-2'); + + expect(mockUpdate).toHaveBeenCalledTimes(2); + expect(mockSet).toHaveBeenNthCalledWith(1, { jobId: 'job-1' }); + expect(mockSet).toHaveBeenNthCalledWith(2, { jobId: 'job-2' }); + }); +}); + +describe('getRunJobId', () => { + beforeEach(() => { + vi.resetAllMocks(); + + // Set up chained mock returns for select + mockSelect.mockReturnValue({ from: mockFrom }); + mockFrom.mockReturnValue({ where: mockWhere }); + }); + + it('returns the job_id for a given run', async () => { + const jobId = 'job-789'; + mockWhere.mockResolvedValue([{ jobId }]); + + const result = await getRunJobId('run-123'); + + expect(result).toBe(jobId); + expect(mockSelect).toHaveBeenCalled(); + expect(mockWhere).toHaveBeenCalled(); + }); + + it('returns null when no job_id is found', async () => { + mockWhere.mockResolvedValue([]); + + const result = await getRunJobId('run-nonexistent'); + + expect(result).toBeNull(); + }); + + it('returns null when the jobId field is null in the database', async () => { + mockWhere.mockResolvedValue([{ jobId: null }]); + + const result = await getRunJobId('run-123'); + + expect(result).toBeNull(); + }); + + it('returns null when the row has no jobId property', async () => { + mockWhere.mockResolvedValue([{}]); + + const result = await getRunJobId('run-123'); + + expect(result).toBeNull(); + }); + + it('handles multiple getRunJobId calls independently', async () => { + mockWhere.mockResolvedValueOnce([{ jobId: 'job-1' }]); + mockWhere.mockResolvedValueOnce([{ jobId: 'job-2' }]); + + const result1 = await getRunJobId('run-1'); + const result2 = await getRunJobId('run-2'); + + expect(result1).toBe('job-1'); + expect(result2).toBe('job-2'); + expect(mockSelect).toHaveBeenCalledTimes(2); + }); +}); diff --git a/tests/unit/db/runsRepository.test.ts b/tests/unit/db/runsRepository.test.ts index 28a60dce..c3b3bf1d 100644 --- a/tests/unit/db/runsRepository.test.ts +++ b/tests/unit/db/runsRepository.test.ts @@ -46,6 +46,7 @@ vi.mock('../../../src/db/schema/index.js', () => ({ error: 'error', prUrl: 'pr_url', outputSummary: 'output_summary', + jobId: 'job_id', }, agentRunLogs: { runId: 'run_id' }, agentRunLlmCalls: { diff --git a/tests/unit/queue/cancel.test.ts b/tests/unit/queue/cancel.test.ts new file mode 100644 index 00000000..be1f3706 --- /dev/null +++ b/tests/unit/queue/cancel.test.ts @@ -0,0 +1,314 @@ +/** + * Unit tests for src/queue/cancel.ts + * + * Tests Redis pub/sub publish and subscribe for cancel commands. + */ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +// ── Mocks (must be set up before dynamic import) ────────────────────────────── + +const mockPublish = vi.fn(); +const mockSubscribe = vi.fn(); + +vi.mock('ioredis', () => { + return { + Redis: vi.fn().mockImplementation(() => ({ + publish: (...args: unknown[]) => mockPublish(...args), + subscribe: (...args: unknown[]) => mockSubscribe(...args), + on: vi.fn().mockReturnThis(), + })), + }; +}); + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +/** + * Re-import the module freshly so each test suite starts with clean + * publisher and subscriber singletons. + */ +async function freshImport() { + vi.resetModules(); + + vi.mock('ioredis', () => { + return { + Redis: vi.fn().mockImplementation(() => ({ + publish: (...args: unknown[]) => mockPublish(...args), + subscribe: (...args: unknown[]) => mockSubscribe(...args), + on: vi.fn().mockReturnThis(), + })), + }; + }); + + return import('../../../src/queue/cancel.js'); +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +describe('publishCancelCommand', () => { + beforeEach(() => { + vi.stubEnv('REDIS_URL', 'redis://localhost:6379'); + mockPublish.mockResolvedValue(1); // 1 subscriber received the message + }); + + afterEach(() => { + vi.unstubAllEnvs(); + vi.resetModules(); + }); + + it('publishes a cancel command as JSON to the cascade:cancel channel', async () => { + const { publishCancelCommand } = await freshImport(); + + await publishCancelCommand('run-123', 'user requested'); + + expect(mockPublish).toHaveBeenCalledWith( + 'cascade:cancel', + JSON.stringify({ + runId: 'run-123', + reason: 'user requested', + }), + ); + }); + + it('handles multiple cancel commands independently', async () => { + const { publishCancelCommand } = await freshImport(); + + await publishCancelCommand('run-1', 'reason-1'); + await publishCancelCommand('run-2', 'reason-2'); + + expect(mockPublish).toHaveBeenCalledTimes(2); + expect(mockPublish).toHaveBeenNthCalledWith( + 1, + 'cascade:cancel', + JSON.stringify({ runId: 'run-1', reason: 'reason-1' }), + ); + expect(mockPublish).toHaveBeenNthCalledWith( + 2, + 'cascade:cancel', + JSON.stringify({ runId: 'run-2', reason: 'reason-2' }), + ); + }); + + it('throws an error when REDIS_URL is not set', async () => { + const saved = process.env.REDIS_URL; + // biome-ignore lint/performance/noDelete: need to fully remove the key + delete process.env.REDIS_URL; + + try { + const { publishCancelCommand } = await freshImport(); + await expect(publishCancelCommand('run-1', 'reason')).rejects.toThrow( + 'REDIS_URL is required', + ); + } finally { + if (saved !== undefined) { + process.env.REDIS_URL = saved; + } + } + }); +}); + +describe('subscribeToCancelCommands', () => { + beforeEach(() => { + vi.stubEnv('REDIS_URL', 'redis://localhost:6379'); + mockSubscribe.mockResolvedValue(1); + }); + + afterEach(() => { + vi.unstubAllEnvs(); + vi.resetModules(); + }); + + it('subscribes to the cascade:cancel channel', async () => { + const { subscribeToCancelCommands } = await freshImport(); + const handler = vi.fn(); + + await subscribeToCancelCommands(handler); + + expect(mockSubscribe).toHaveBeenCalledWith('cascade:cancel'); + }); + + it('invokes handler callback when a cancel message is received', async () => { + const { subscribeToCancelCommands } = await freshImport(); + const handler = vi.fn().mockResolvedValue(undefined); + + const { Redis } = await import('ioredis'); + const RedisMock = Redis as ReturnType; + let onCallback: ((channel: string, message: string) => void) | null = null; + + RedisMock.mockImplementation(() => ({ + publish: vi.fn(), + subscribe: vi.fn().mockResolvedValue(1), + on: vi + .fn() + .mockImplementation((event: string, cb: (channel: string, message: string) => void) => { + if (event === 'message') { + onCallback = cb as (channel: string, message: string) => void; + } + return { + publish: vi.fn(), + subscribe: vi.fn(), + on: vi.fn(), + }; + }), + })); + + await subscribeToCancelCommands(handler); + + // Simulate receiving a message + if (onCallback) { + onCallback('cascade:cancel', JSON.stringify({ runId: 'run-456', reason: 'timeout' })); + + // Wait for async handler to be called + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(handler).toHaveBeenCalledWith({ + runId: 'run-456', + reason: 'timeout', + }); + } + }); + + it('ignores messages from other channels', async () => { + const { subscribeToCancelCommands } = await freshImport(); + const handler = vi.fn().mockResolvedValue(undefined); + + const { Redis } = await import('ioredis'); + const RedisMock = Redis as ReturnType; + let onCallback: ((channel: string, message: string) => void) | null = null; + + RedisMock.mockImplementation(() => ({ + publish: vi.fn(), + subscribe: vi.fn().mockResolvedValue(1), + on: vi + .fn() + .mockImplementation((event: string, cb: (channel: string, message: string) => void) => { + if (event === 'message') { + onCallback = cb as (channel: string, message: string) => void; + } + return { + publish: vi.fn(), + subscribe: vi.fn(), + on: vi.fn(), + }; + }), + })); + + await subscribeToCancelCommands(handler); + + // Simulate receiving a message from a different channel + if (onCallback) { + onCallback('other:channel', JSON.stringify({ data: 'something' })); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(handler).not.toHaveBeenCalled(); + } + }); + + it('handles JSON parse errors gracefully', async () => { + const { subscribeToCancelCommands } = await freshImport(); + const handler = vi.fn().mockResolvedValue(undefined); + + const { Redis } = await import('ioredis'); + const RedisMock = Redis as ReturnType; + let onCallback: ((channel: string, message: string) => void) | null = null; + + RedisMock.mockImplementation(() => ({ + publish: vi.fn(), + subscribe: vi.fn().mockResolvedValue(1), + on: vi + .fn() + .mockImplementation((event: string, cb: (channel: string, message: string) => void) => { + if (event === 'message') { + onCallback = cb as (channel: string, message: string) => void; + } + return { + publish: vi.fn(), + subscribe: vi.fn(), + on: vi.fn(), + }; + }), + })); + + // Spy on console.error to verify error logging + const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + await subscribeToCancelCommands(handler); + + // Simulate receiving an invalid JSON message + if (onCallback) { + onCallback('cascade:cancel', 'invalid json {'); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(handler).not.toHaveBeenCalled(); + expect(consoleSpy).toHaveBeenCalledWith( + '[cancel] Failed to handle cancel command:', + expect.any(Error), + ); + } + + consoleSpy.mockRestore(); + }); + + it('handles handler errors gracefully', async () => { + const { subscribeToCancelCommands } = await freshImport(); + const handlerError = new Error('Handler failed'); + const handler = vi.fn().mockRejectedValue(handlerError); + + const { Redis } = await import('ioredis'); + const RedisMock = Redis as ReturnType; + let onCallback: ((channel: string, message: string) => void) | null = null; + + RedisMock.mockImplementation(() => ({ + publish: vi.fn(), + subscribe: vi.fn().mockResolvedValue(1), + on: vi + .fn() + .mockImplementation((event: string, cb: (channel: string, message: string) => void) => { + if (event === 'message') { + onCallback = cb as (channel: string, message: string) => void; + } + return { + publish: vi.fn(), + subscribe: vi.fn(), + on: vi.fn(), + }; + }), + })); + + const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + await subscribeToCancelCommands(handler); + + // Simulate receiving a message + if (onCallback) { + onCallback('cascade:cancel', JSON.stringify({ runId: 'run-789', reason: 'test' })); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(handler).toHaveBeenCalled(); + expect(consoleSpy).toHaveBeenCalledWith( + '[cancel] Failed to handle cancel command:', + handlerError, + ); + } + + consoleSpy.mockRestore(); + }); + + it('throws an error when REDIS_URL is not set', async () => { + const saved = process.env.REDIS_URL; + // biome-ignore lint/performance/noDelete: need to fully remove the key + delete process.env.REDIS_URL; + + try { + const { subscribeToCancelCommands } = await freshImport(); + const handler = vi.fn(); + await expect(subscribeToCancelCommands(handler)).rejects.toThrow('REDIS_URL is required'); + } finally { + if (saved !== undefined) { + process.env.REDIS_URL = saved; + } + } + }); +});