diff --git a/src/api/routers/runs.ts b/src/api/routers/runs.ts index 5cd38592..0aced729 100644 --- a/src/api/routers/runs.ts +++ b/src/api/routers/runs.ts @@ -13,6 +13,7 @@ import { listLlmCallsMeta, listRuns, } from '../../db/repositories/runsRepository.js'; +import { publishCancelCommand } from '../../queue/cancel.js'; import { isAnalysisRunning } from '../../triggers/shared/debug-status.js'; import { logger } from '../../utils/logging.js'; import { protectedProcedure, router, superAdminProcedure } from '../trpc.js'; @@ -383,6 +384,15 @@ export const runsRouter = router({ }); } + // Publish cancel command to Router (fire-and-forget) + publishCancelCommand(input.runId, reason).catch((err) => { + logger.error('[runs.cancel] Failed to publish cancel command:', { + runId: input.runId, + reason, + error: String(err), + }); + }); + return { cancelled: true }; }), }); diff --git a/src/queue/cancel.ts b/src/queue/cancel.ts index 559611cb..e517e391 100644 --- a/src/queue/cancel.ts +++ b/src/queue/cancel.ts @@ -91,3 +91,20 @@ export async function subscribeToCancelCommands(handler: CancelCommandHandler): await subscriber.subscribe(CANCEL_CHANNEL); } + +/** + * Unsubscribe from the cancel channel and close the subscriber connection. + * + * Should be called during graceful shutdown to release the Redis connection. + */ +export async function unsubscribeFromCancelCommands(): Promise { + if (!subscriberInstance) return; + + try { + await subscriberInstance.unsubscribe(CANCEL_CHANNEL); + subscriberInstance.disconnect(); + subscriberInstance = null; + } catch (error) { + console.error('[cancel] Failed to unsubscribe from cancel commands:', error); + } +} diff --git a/src/router/cancel-listener.ts b/src/router/cancel-listener.ts new file mode 100644 index 00000000..c3eb09aa --- /dev/null +++ b/src/router/cancel-listener.ts @@ -0,0 +1,82 @@ +/** + * Cancel command listener for the router. + * + * Subscribes to Redis cancel commands published by the Dashboard API. + * When a cancel command is received, looks up the jobId from the database + * and kills the corresponding worker container. + */ + +import { getRunJobId } from '../db/repositories/runsRepository.js'; +import { subscribeToCancelCommands, unsubscribeFromCancelCommands } from '../queue/cancel.js'; +import { logger } from '../utils/logging.js'; +import { killWorker } from './container-manager.js'; + +let cancelSubscriberActive = false; + +/** + * Start listening for cancel commands on the Redis cancel channel. + * + * For each cancel command, attempts to look up the jobId from the database + * and kills the corresponding worker. If the jobId is not found in the + * database, logs a warning — no Docker fallback is attempted, as containers + * carry no run ID label and a fallback would risk killing the wrong container + * in multi-run environments. + */ +export async function startCancelListener(): Promise { + if (!process.env.REDIS_URL) { + logger.info('[CancelListener] Redis not configured, cancel listener disabled'); + return; + } + + try { + await subscribeToCancelCommands(async (payload) => { + const { runId, reason } = payload; + logger.info('[CancelListener] Cancel command received:', { runId, reason }); + + try { + // Try to get jobId from database + const jobId = await getRunJobId(runId); + + if (jobId) { + logger.info('[CancelListener] Found jobId for run, killing worker:', { runId, jobId }); + await killWorker(jobId); + } else { + // jobId not found — container labels carry no run ID so we cannot safely + // match a container to this run. Log a warning and skip to avoid killing + // the wrong worker in multi-run environments. + logger.warn( + '[CancelListener] JobId not found in DB for run — cannot cancel worker safely:', + { runId }, + ); + } + } catch (err) { + logger.error('[CancelListener] Error processing cancel command:', { + runId, + reason, + error: String(err), + }); + } + }); + + cancelSubscriberActive = true; + logger.info('[CancelListener] Cancel listener started'); + } catch (err) { + logger.error('[CancelListener] Failed to start cancel listener:', { error: String(err) }); + throw err; + } +} + +/** + * Stop listening for cancel commands and close the Redis subscriber connection. + */ +export async function stopCancelListener(): Promise { + if (!cancelSubscriberActive) return; + + try { + await unsubscribeFromCancelCommands(); + cancelSubscriberActive = false; + logger.info('[CancelListener] Cancel listener stopped'); + } catch (err) { + logger.error('[CancelListener] Error stopping cancel listener:', { error: String(err) }); + } +} diff --git a/src/router/index.ts b/src/router/index.ts index 4c6fcec8..efb1103a 100644 --- a/src/router/index.ts +++ b/src/router/index.ts @@ -18,6 +18,7 @@ import { import { GitHubRouterAdapter, injectEventType } from './adapters/github.js'; import { JiraRouterAdapter } from './adapters/jira.js'; import { TrelloRouterAdapter } from './adapters/trello.js'; +import { startCancelListener, stopCancelListener } from './cancel-listener.js'; import { getQueueStats } from './queue.js'; import { processRouterWebhook } from './webhook-processor.js'; import { @@ -129,6 +130,7 @@ app.post( // Graceful shutdown async function shutdown(signal: string): Promise { logger.info('Received shutdown signal', { signal }); + await stopCancelListener(); await stopWorkerProcessor(); await flush(3000); process.exit(0); @@ -159,6 +161,9 @@ async function startRouter(): Promise { await initAgentMessages(); await initPrompts(); + // Start cancel listener for handling run cancellations + await startCancelListener(); + startWorkerProcessor(); logger.info('Starting router', { port }); serve({ fetch: app.fetch, port }); diff --git a/tests/unit/api/routers/runs.test.ts b/tests/unit/api/routers/runs.test.ts index e8ac1db7..6b1c9d3f 100644 --- a/tests/unit/api/routers/runs.test.ts +++ b/tests/unit/api/routers/runs.test.ts @@ -73,6 +73,12 @@ vi.mock('../../../../src/utils/logging.js', () => ({ logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, })); +// Mock publishCancelCommand (fire-and-forget) +const mockPublishCancelCommand = vi.fn().mockResolvedValue(undefined); +vi.mock('../../../../src/queue/cancel.js', () => ({ + publishCancelCommand: (...args: unknown[]) => mockPublishCancelCommand(...args), +})); + import { runsRouter } from '../../../../src/api/routers/runs.js'; function createCaller(ctx: TRPCContext) { @@ -892,6 +898,42 @@ describe('runsRouter', () => { expect(mockCancelRunById).toHaveBeenCalledWith(RUN_UUID, 'Manually cancelled via API'); }); + it('publishes cancel command after successful cancel', async () => { + mockGetRunById.mockResolvedValue({ + id: RUN_UUID, + projectId: 'p1', + status: 'running', + }); + mockDbWhere.mockResolvedValue([{ orgId: 'org-1' }]); + mockCancelRunById.mockResolvedValue(true); + + const caller = createCaller({ user: mockUser, effectiveOrgId: mockUser.orgId }); + await caller.cancel({ runId: RUN_UUID }); + + // Wait a tick for the fire-and-forget to execute + await new Promise((resolve) => setImmediate(resolve)); + + expect(mockPublishCancelCommand).toHaveBeenCalledWith(RUN_UUID, 'Manually cancelled via API'); + }); + + it('publishes cancel command with custom reason', async () => { + mockGetRunById.mockResolvedValue({ + id: RUN_UUID, + projectId: 'p1', + status: 'running', + }); + mockDbWhere.mockResolvedValue([{ orgId: 'org-1' }]); + mockCancelRunById.mockResolvedValue(true); + + const caller = createCaller({ user: mockUser, effectiveOrgId: mockUser.orgId }); + await caller.cancel({ runId: RUN_UUID, reason: 'Orphaned worker' }); + + // Wait a tick for the fire-and-forget to execute + await new Promise((resolve) => setImmediate(resolve)); + + expect(mockPublishCancelCommand).toHaveBeenCalledWith(RUN_UUID, 'Orphaned worker'); + }); + it('uses custom reason when provided', async () => { mockGetRunById.mockResolvedValue({ id: RUN_UUID, diff --git a/tests/unit/cli/scm/create-pr-review-sidecar.test.ts b/tests/unit/cli/scm/create-pr-review-sidecar.test.ts index 1a379a29..bb811ba1 100644 --- a/tests/unit/cli/scm/create-pr-review-sidecar.test.ts +++ b/tests/unit/cli/scm/create-pr-review-sidecar.test.ts @@ -97,7 +97,7 @@ describe('CreatePRReviewCommand sidecar write', () => { expect(existsSync(sidecarPath)).toBe(true); const sidecar = JSON.parse(readFileSync(sidecarPath, 'utf-8')); - expect(sidecar).toEqual({ + expect(sidecar).toMatchObject({ source: 'cascade-tools scm create-pr-review', reviewUrl: 'https://github.com/owner/repo/pull/1#pullrequestreview-123', event: 'REQUEST_CHANGES', diff --git a/tests/unit/router/cancel-listener.test.ts b/tests/unit/router/cancel-listener.test.ts new file mode 100644 index 00000000..11618212 --- /dev/null +++ b/tests/unit/router/cancel-listener.test.ts @@ -0,0 +1,151 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +// --------------------------------------------------------------------------- +// Module mocks +// --------------------------------------------------------------------------- + +const mockGetRunJobId = vi.fn(); +vi.mock('../../../src/db/repositories/runsRepository.js', () => ({ + getRunJobId: (...args: unknown[]) => mockGetRunJobId(...args), +})); + +const mockSubscribeToCancelCommands = vi.fn(); +const mockUnsubscribeFromCancelCommands = vi.fn(); +vi.mock('../../../src/queue/cancel.js', () => ({ + subscribeToCancelCommands: (...args: unknown[]) => mockSubscribeToCancelCommands(...args), + unsubscribeFromCancelCommands: (...args: unknown[]) => mockUnsubscribeFromCancelCommands(...args), + publishCancelCommand: vi.fn().mockResolvedValue(undefined), +})); + +const mockKillWorker = vi.fn(); +vi.mock('../../../src/router/container-manager.js', () => ({ + killWorker: (...args: unknown[]) => mockKillWorker(...args), +})); + +const { mockLogger } = vi.hoisted(() => ({ + mockLogger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})); +vi.mock('../../../src/utils/logging.js', () => ({ + logger: mockLogger, +})); + +// --------------------------------------------------------------------------- +// Imports (after mocks) +// --------------------------------------------------------------------------- + +import { startCancelListener, stopCancelListener } from '../../../src/router/cancel-listener.js'; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('cancel-listener', () => { + beforeEach(async () => { + vi.clearAllMocks(); + // Reset module-level cancelSubscriberActive flag by stopping the listener + // (no-op if not active, safe to call always) + mockUnsubscribeFromCancelCommands.mockResolvedValue(undefined); + await stopCancelListener(); + vi.clearAllMocks(); + }); + + describe('startCancelListener', () => { + it('subscribes to cancel commands when REDIS_URL is set', async () => { + process.env.REDIS_URL = 'redis://localhost:6379'; + vi.clearAllMocks(); // Ensure clean state + await startCancelListener(); + + expect(mockSubscribeToCancelCommands).toHaveBeenCalled(); + }); + + it('kills worker when jobId is found in database', async () => { + process.env.REDIS_URL = 'redis://localhost:6379'; + const handler = vi.fn(); + mockSubscribeToCancelCommands.mockImplementation(async (cb: unknown) => { + handler.mockImplementation(cb); + }); + + await startCancelListener(); + + mockGetRunJobId.mockResolvedValue('job-123'); + + // Simulate receiving a cancel command + await handler({ runId: 'run-123', reason: 'user requested' }); + + expect(mockGetRunJobId).toHaveBeenCalledWith('run-123'); + expect(mockKillWorker).toHaveBeenCalledWith('job-123'); + }); + + it('logs a warning and does not kill any container when jobId is not found in DB', async () => { + process.env.REDIS_URL = 'redis://localhost:6379'; + const handler = vi.fn(); + mockSubscribeToCancelCommands.mockImplementation(async (cb: unknown) => { + handler.mockImplementation(cb); + }); + + await startCancelListener(); + + mockGetRunJobId.mockResolvedValue(null); + + // Simulate receiving a cancel command + await handler({ runId: 'run-123', reason: 'timeout' }); + + expect(mockKillWorker).not.toHaveBeenCalled(); + expect(mockLogger.warn).toHaveBeenCalledWith( + expect.stringContaining('JobId not found in DB for run'), + expect.objectContaining({ runId: 'run-123' }), + ); + }); + + it('handles errors in cancel handler gracefully', async () => { + process.env.REDIS_URL = 'redis://localhost:6379'; + const handler = vi.fn(); + mockSubscribeToCancelCommands.mockImplementation(async (cb: unknown) => { + handler.mockImplementation(cb); + }); + + await startCancelListener(); + + mockGetRunJobId.mockRejectedValue(new Error('DB error')); + + // Should not throw + await expect(handler({ runId: 'run-123', reason: 'user requested' })).resolves.not.toThrow(); + + expect(mockKillWorker).not.toHaveBeenCalled(); + }); + }); + + describe('stopCancelListener', () => { + it('can be called without error when not active', async () => { + await expect(stopCancelListener()).resolves.not.toThrow(); + expect(mockUnsubscribeFromCancelCommands).not.toHaveBeenCalled(); + }); + + it('calls unsubscribeFromCancelCommands when active', async () => { + process.env.REDIS_URL = 'redis://localhost:6379'; + await startCancelListener(); + + mockUnsubscribeFromCancelCommands.mockResolvedValue(undefined); + await stopCancelListener(); + + expect(mockUnsubscribeFromCancelCommands).toHaveBeenCalled(); + }); + + it('can be called multiple times without error', async () => { + process.env.REDIS_URL = 'redis://localhost:6379'; + await startCancelListener(); + + mockUnsubscribeFromCancelCommands.mockResolvedValue(undefined); + await stopCancelListener(); + await stopCancelListener(); // Second call is a no-op + + // unsubscribe should only have been called once (second call is no-op) + expect(mockUnsubscribeFromCancelCommands).toHaveBeenCalledTimes(1); + }); + }); +});