Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/api/routers/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
listLlmCallsMeta,
listRuns,
} from '../../db/repositories/runsRepository.js';
import { publishCancelCommand } from '../../queue/cancel.js';
import { isAnalysisRunning } from '../../triggers/shared/debug-status.js';
import { logger } from '../../utils/logging.js';
import { protectedProcedure, router, superAdminProcedure } from '../trpc.js';
Expand Down Expand Up @@ -383,6 +384,15 @@ export const runsRouter = router({
});
}

// Publish cancel command to Router (fire-and-forget)
publishCancelCommand(input.runId, reason).catch((err) => {
logger.error('[runs.cancel] Failed to publish cancel command:', {
runId: input.runId,
reason,
error: String(err),
});
});

return { cancelled: true };
}),
});
17 changes: 17 additions & 0 deletions src/queue/cancel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,20 @@ export async function subscribeToCancelCommands(handler: CancelCommandHandler):

await subscriber.subscribe(CANCEL_CHANNEL);
}

/**
 * Unsubscribe from the cancel channel and close the subscriber connection.
 *
 * Should be called during graceful shutdown to release the Redis connection.
 * Calling it when no subscriber exists is a no-op.
 *
 * The connection is closed and the module-level subscriber reference is
 * cleared even when the unsubscribe call fails, so a failed unsubscribe can
 * no longer leave an open connection behind. Errors are logged, never thrown.
 */
export async function unsubscribeFromCancelCommands(): Promise<void> {
  const subscriber = subscriberInstance;
  if (!subscriber) return;

  // Clear the shared reference first so a repeated call is a no-op rather
  // than operating on a connection that is already being torn down.
  subscriberInstance = null;

  try {
    try {
      await subscriber.unsubscribe(CANCEL_CHANNEL);
    } finally {
      // Runs whether or not unsubscribe succeeded, so the connection is
      // always released.
      subscriber.disconnect();
    }
  } catch (error) {
    console.error('[cancel] Failed to unsubscribe from cancel commands:', error);
  }
}
82 changes: 82 additions & 0 deletions src/router/cancel-listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Cancel command listener for the router.
*
* Subscribes to Redis cancel commands published by the Dashboard API.
* When a cancel command is received, looks up the jobId from the database
* and kills the corresponding worker container.
*/

import { getRunJobId } from '../db/repositories/runsRepository.js';
import { subscribeToCancelCommands, unsubscribeFromCancelCommands } from '../queue/cancel.js';
import { logger } from '../utils/logging.js';
import { killWorker } from './container-manager.js';

// Whether this module currently holds an active cancel subscription: set to
// true once startCancelListener has subscribed successfully, and back to false
// after stopCancelListener has unsubscribed.
let cancelSubscriberActive = false;

/**
 * Begin consuming cancel commands from the Redis cancel channel.
 *
 * When `REDIS_URL` is not set the listener is disabled: an info message is
 * logged and nothing is subscribed.
 *
 * For every command received, the run's jobId is looked up via `getRunJobId`
 * and handed to `killWorker`. If no jobId is stored for the run, only a
 * warning is logged — no Docker fallback is attempted, as containers carry no
 * run ID label and a fallback would risk killing the wrong container in
 * multi-run environments.
 *
 * Errors raised while handling a single command are logged and swallowed
 * inside the handler. A failure to subscribe is logged and re-thrown to the
 * caller.
 */
export async function startCancelListener(): Promise<void> {
  const redisConfigured = Boolean(process.env.REDIS_URL);
  if (!redisConfigured) {
    logger.info('[CancelListener] Redis not configured, cancel listener disabled');
    return;
  }

  try {
    await subscribeToCancelCommands(async (payload) => {
      const runId = payload.runId;
      const reason = payload.reason;
      logger.info('[CancelListener] Cancel command received:', { runId, reason });

      try {
        // The database is the only source used to map a run to its job.
        const jobId = await getRunJobId(runId);

        if (!jobId) {
          // Without a jobId there is no safe way to pick the right container,
          // so the command is skipped rather than guessed at.
          logger.warn(
            '[CancelListener] JobId not found in DB for run — cannot cancel worker safely:',
            { runId },
          );
          return;
        }

        logger.info('[CancelListener] Found jobId for run, killing worker:', { runId, jobId });
        await killWorker(jobId);
      } catch (err) {
        const error = String(err);
        logger.error('[CancelListener] Error processing cancel command:', {
          runId,
          reason,
          error,
        });
      }
    });

    // Only mark the listener active once the subscription has been set up.
    cancelSubscriberActive = true;
    logger.info('[CancelListener] Cancel listener started');
  } catch (err) {
    logger.error('[CancelListener] Failed to start cancel listener:', { error: String(err) });
    throw err;
  }
}

/**
 * Tear down the cancel listener: unsubscribe from cancel commands and close
 * the Redis subscriber connection.
 *
 * Does nothing unless the listener is currently marked active. Errors raised
 * while unsubscribing are logged and not re-thrown; in that case the listener
 * stays marked active.
 */
export async function stopCancelListener(): Promise<void> {
  if (cancelSubscriberActive) {
    try {
      await unsubscribeFromCancelCommands();
      cancelSubscriberActive = false;
      logger.info('[CancelListener] Cancel listener stopped');
    } catch (err) {
      const error = String(err);
      logger.error('[CancelListener] Error stopping cancel listener:', { error });
    }
  }
}
5 changes: 5 additions & 0 deletions src/router/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
import { GitHubRouterAdapter, injectEventType } from './adapters/github.js';
import { JiraRouterAdapter } from './adapters/jira.js';
import { TrelloRouterAdapter } from './adapters/trello.js';
import { startCancelListener, stopCancelListener } from './cancel-listener.js';
import { getQueueStats } from './queue.js';
import { processRouterWebhook } from './webhook-processor.js';
import {
Expand Down Expand Up @@ -129,6 +130,7 @@ app.post(
// Graceful shutdown
async function shutdown(signal: string): Promise<void> {
logger.info('Received shutdown signal', { signal });
await stopCancelListener();
await stopWorkerProcessor();
await flush(3000);
process.exit(0);
Expand Down Expand Up @@ -159,6 +161,9 @@ async function startRouter(): Promise<void> {
await initAgentMessages();
await initPrompts();

// Start cancel listener for handling run cancellations
await startCancelListener();

startWorkerProcessor();
logger.info('Starting router', { port });
serve({ fetch: app.fetch, port });
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/api/routers/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ vi.mock('../../../../src/utils/logging.js', () => ({
logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
}));

// Mock publishCancelCommand (fire-and-forget).
// The mock resolves to undefined by default. The module factory exposes it
// through a forwarding arrow function, so `mockPublishCancelCommand` is only
// read when the code under test actually calls publishCancelCommand.
// NOTE(review): presumably this indirection exists because vi.mock is hoisted
// above this const declaration — confirm against the Vitest docs.
const mockPublishCancelCommand = vi.fn().mockResolvedValue(undefined);
vi.mock('../../../../src/queue/cancel.js', () => ({
publishCancelCommand: (...args: unknown[]) => mockPublishCancelCommand(...args),
}));

import { runsRouter } from '../../../../src/api/routers/runs.js';

function createCaller(ctx: TRPCContext) {
Expand Down Expand Up @@ -892,6 +898,42 @@ describe('runsRouter', () => {
expect(mockCancelRunById).toHaveBeenCalledWith(RUN_UUID, 'Manually cancelled via API');
});

it('publishes cancel command after successful cancel', async () => {
  // Arrange: a run with status 'running'; the org lookup and the DB cancel both succeed.
  const runningRun = { id: RUN_UUID, projectId: 'p1', status: 'running' };
  mockGetRunById.mockResolvedValue(runningRun);
  mockDbWhere.mockResolvedValue([{ orgId: 'org-1' }]);
  mockCancelRunById.mockResolvedValue(true);

  // Act: cancel without supplying a reason.
  const api = createCaller({ user: mockUser, effectiveOrgId: mockUser.orgId });
  await api.cancel({ runId: RUN_UUID });

  // The publish is fire-and-forget, so yield one tick before asserting.
  await new Promise((resolve) => setImmediate(resolve));

  // Assert: the default reason is forwarded to the cancel channel.
  expect(mockPublishCancelCommand).toHaveBeenCalledWith(RUN_UUID, 'Manually cancelled via API');
});

it('publishes cancel command with custom reason', async () => {
  // Arrange: a run with status 'running'; the org lookup and the DB cancel both succeed.
  const runningRun = { id: RUN_UUID, projectId: 'p1', status: 'running' };
  mockGetRunById.mockResolvedValue(runningRun);
  mockDbWhere.mockResolvedValue([{ orgId: 'org-1' }]);
  mockCancelRunById.mockResolvedValue(true);

  // Act: cancel with an explicit reason.
  const api = createCaller({ user: mockUser, effectiveOrgId: mockUser.orgId });
  await api.cancel({ runId: RUN_UUID, reason: 'Orphaned worker' });

  // The publish is fire-and-forget, so yield one tick before asserting.
  await new Promise((resolve) => setImmediate(resolve));

  // Assert: the caller-supplied reason is forwarded unchanged.
  expect(mockPublishCancelCommand).toHaveBeenCalledWith(RUN_UUID, 'Orphaned worker');
});

it('uses custom reason when provided', async () => {
mockGetRunById.mockResolvedValue({
id: RUN_UUID,
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/cli/scm/create-pr-review-sidecar.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ describe('CreatePRReviewCommand sidecar write', () => {
expect(existsSync(sidecarPath)).toBe(true);

const sidecar = JSON.parse(readFileSync(sidecarPath, 'utf-8'));
expect(sidecar).toEqual({
expect(sidecar).toMatchObject({
source: 'cascade-tools scm create-pr-review',
reviewUrl: 'https://github.com/owner/repo/pull/1#pullrequestreview-123',
event: 'REQUEST_CHANGES',
Expand Down
151 changes: 151 additions & 0 deletions tests/unit/router/cancel-listener.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';

// ---------------------------------------------------------------------------
// Module mocks
// ---------------------------------------------------------------------------

// Each collaborator of cancel-listener is replaced with a vi.fn(). The module
// factories expose thin forwarding wrappers, so the vi.fn() is only looked up
// when the wrapper is invoked by the code under test.

// Database lookup: lets each test decide whether a jobId exists for the run.
const mockGetRunJobId = vi.fn();
vi.mock('../../../src/db/repositories/runsRepository.js', () => ({
getRunJobId: (...args: unknown[]) => mockGetRunJobId(...args),
}));

// Redis pub/sub layer: subscribe/unsubscribe are observable mocks;
// publishCancelCommand is stubbed to resolve and is not asserted on here.
const mockSubscribeToCancelCommands = vi.fn();
const mockUnsubscribeFromCancelCommands = vi.fn();
vi.mock('../../../src/queue/cancel.js', () => ({
subscribeToCancelCommands: (...args: unknown[]) => mockSubscribeToCancelCommands(...args),
unsubscribeFromCancelCommands: (...args: unknown[]) => mockUnsubscribeFromCancelCommands(...args),
publishCancelCommand: vi.fn().mockResolvedValue(undefined),
}));

// Container manager: killWorker is the side effect the tests assert on.
const mockKillWorker = vi.fn();
vi.mock('../../../src/router/container-manager.js', () => ({
killWorker: (...args: unknown[]) => mockKillWorker(...args),
}));

// Logger mock. Unlike the mocks above, the factory hands out `mockLogger`
// itself rather than a forwarding wrapper.
// NOTE(review): presumably that is why it is created via vi.hoisted, so it
// exists when the hoisted vi.mock factory runs — confirm against Vitest docs.
const { mockLogger } = vi.hoisted(() => ({
mockLogger: {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
},
}));
vi.mock('../../../src/utils/logging.js', () => ({
logger: mockLogger,
}));

// ---------------------------------------------------------------------------
// Imports (after mocks)
// ---------------------------------------------------------------------------

import { startCancelListener, stopCancelListener } from '../../../src/router/cancel-listener.js';

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

/**
 * Unit tests for the router's cancel listener.
 *
 * REDIS_URL is set through vi.stubEnv and reset in beforeEach, so no test
 * leaves a modified process.env behind for the next one.
 */
describe('cancel-listener', () => {
  const REDIS_URL = 'redis://localhost:6379';

  /** Shape of the handler startCancelListener hands to subscribeToCancelCommands. */
  type CancelHandler = (payload: { runId: string; reason: string }) => Promise<void>;

  /**
   * Starts the listener with REDIS_URL configured and returns the handler it
   * registered, so a test can simulate an incoming cancel command.
   */
  async function startAndCaptureHandler(): Promise<CancelHandler> {
    vi.stubEnv('REDIS_URL', REDIS_URL);
    await startCancelListener();

    // beforeEach cleared the call history, so the first recorded call is ours.
    const registered: unknown = mockSubscribeToCancelCommands.mock.calls[0]?.[0];
    if (typeof registered !== 'function') {
      throw new Error('startCancelListener did not register a cancel handler');
    }
    return registered as CancelHandler;
  }

  beforeEach(async () => {
    // Drop any REDIS_URL stub left by the previous test.
    vi.unstubAllEnvs();
    vi.clearAllMocks();
    // Reset the module-level cancelSubscriberActive flag by stopping the
    // listener (no-op if not active, safe to call always).
    mockUnsubscribeFromCancelCommands.mockResolvedValue(undefined);
    await stopCancelListener();
    vi.clearAllMocks();
  });

  describe('startCancelListener', () => {
    it('subscribes to cancel commands when REDIS_URL is set', async () => {
      vi.stubEnv('REDIS_URL', REDIS_URL);

      await startCancelListener();

      expect(mockSubscribeToCancelCommands).toHaveBeenCalledTimes(1);
      expect(mockSubscribeToCancelCommands).toHaveBeenCalledWith(expect.any(Function));
    });

    it('does not subscribe when REDIS_URL is not set', async () => {
      // An empty value is falsy, which is what the listener checks for.
      vi.stubEnv('REDIS_URL', '');

      await startCancelListener();

      expect(mockSubscribeToCancelCommands).not.toHaveBeenCalled();
      expect(mockLogger.info).toHaveBeenCalledWith(
        '[CancelListener] Redis not configured, cancel listener disabled',
      );

      // The listener must not be marked active, so stopping is a no-op.
      await stopCancelListener();
      expect(mockUnsubscribeFromCancelCommands).not.toHaveBeenCalled();
    });

    it('logs and rethrows when subscribing fails', async () => {
      vi.stubEnv('REDIS_URL', REDIS_URL);
      mockSubscribeToCancelCommands.mockRejectedValueOnce(new Error('connection refused'));

      await expect(startCancelListener()).rejects.toThrow('connection refused');

      expect(mockLogger.error).toHaveBeenCalledWith(
        '[CancelListener] Failed to start cancel listener:',
        expect.objectContaining({ error: expect.stringContaining('connection refused') }),
      );

      // A failed start must not leave the listener marked active.
      await stopCancelListener();
      expect(mockUnsubscribeFromCancelCommands).not.toHaveBeenCalled();
    });

    it('kills worker when jobId is found in database', async () => {
      const handler = await startAndCaptureHandler();
      mockGetRunJobId.mockResolvedValueOnce('job-123');

      // Simulate receiving a cancel command
      await handler({ runId: 'run-123', reason: 'user requested' });

      expect(mockGetRunJobId).toHaveBeenCalledWith('run-123');
      expect(mockKillWorker).toHaveBeenCalledWith('job-123');
    });

    it('logs a warning and does not kill any container when jobId is not found in DB', async () => {
      const handler = await startAndCaptureHandler();
      mockGetRunJobId.mockResolvedValueOnce(null);

      // Simulate receiving a cancel command
      await handler({ runId: 'run-123', reason: 'timeout' });

      expect(mockKillWorker).not.toHaveBeenCalled();
      expect(mockLogger.warn).toHaveBeenCalledWith(
        expect.stringContaining('JobId not found in DB for run'),
        expect.objectContaining({ runId: 'run-123' }),
      );
    });

    it('handles errors in cancel handler gracefully', async () => {
      const handler = await startAndCaptureHandler();
      mockGetRunJobId.mockRejectedValueOnce(new Error('DB error'));

      // The handler must swallow the failure instead of rejecting.
      await expect(
        handler({ runId: 'run-123', reason: 'user requested' }),
      ).resolves.toBeUndefined();

      expect(mockKillWorker).not.toHaveBeenCalled();
      expect(mockLogger.error).toHaveBeenCalledWith(
        '[CancelListener] Error processing cancel command:',
        expect.objectContaining({
          runId: 'run-123',
          error: expect.stringContaining('DB error'),
        }),
      );
    });
  });

  describe('stopCancelListener', () => {
    it('can be called without error when not active', async () => {
      await expect(stopCancelListener()).resolves.toBeUndefined();
      expect(mockUnsubscribeFromCancelCommands).not.toHaveBeenCalled();
    });

    it('calls unsubscribeFromCancelCommands when active', async () => {
      vi.stubEnv('REDIS_URL', REDIS_URL);
      await startCancelListener();

      mockUnsubscribeFromCancelCommands.mockResolvedValue(undefined);
      await stopCancelListener();

      expect(mockUnsubscribeFromCancelCommands).toHaveBeenCalled();
    });

    it('can be called multiple times without error', async () => {
      vi.stubEnv('REDIS_URL', REDIS_URL);
      await startCancelListener();

      mockUnsubscribeFromCancelCommands.mockResolvedValue(undefined);
      await stopCancelListener();
      await stopCancelListener(); // Second call is a no-op

      // unsubscribe should only have been called once (second call is no-op)
      expect(mockUnsubscribeFromCancelCommands).toHaveBeenCalledTimes(1);
    });
  });
});
Loading