
feat(queue): add Redis pub/sub cancel channel and job_id column on agent_runs #773

Merged
aaight merged 1 commit into dev from feature/redis-cancel-channel-and-job-id
Mar 13, 2026

aaight commented Mar 13, 2026

Summary

This PR establishes the foundational cancel command infrastructure for the agent run cancellation feature epic:

  • Redis pub/sub cancel channel (src/queue/cancel.ts): Provides two core functions:

    • publishCancelCommand(runId, reason) - Dashboard publishes cancel requests to cascade:cancel channel
    • subscribeToCancelCommands(handler) - Router subscribes and handles cancel commands with error resilience
  • Database column for job tracking (0035_add_job_id_to_runs.sql):

    • Adds nullable job_id TEXT column to agent_runs table for zero-downtime deploy compatibility
    • Allows worker processes to store BullMQ job IDs for later cancellation lookups
  • Repository helpers (runsRepository.ts):

    • updateRunJobId(runId, jobId) - Store job ID after job enqueue
    • getRunJobId(runId) - Retrieve job ID for cancel operations
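
The pieces listed above can be sketched end to end as follows. This is a minimal illustration, not the code in src/queue/cancel.ts or runsRepository.ts: an in-memory bus stands in for the Redis connection and a Map stands in for the agent_runs.job_id column so the snippet runs without external services. The InMemoryBus class, the synchronous signatures, and the sample IDs are assumptions; the real functions presumably return promises.

```typescript
// Sketch only. The payload fields (runId, reason) and the channel name come
// from the PR text; everything else here is a hypothetical stand-in.
type CancelCommandPayload = { runId: string; reason: string };
type Listener = (message: string) => void;

const CANCEL_CHANNEL = "cascade:cancel";

// In-memory replacement for a Redis pub/sub connection (assumption).
class InMemoryBus {
  private listeners = new Map<string, Listener[]>();

  publish(channel: string, message: string): number {
    const subs = this.listeners.get(channel) ?? [];
    for (const listener of subs) listener(message);
    return subs.length; // Redis PUBLISH likewise returns the receiver count
  }

  subscribe(channel: string, listener: Listener): void {
    const subs = this.listeners.get(channel) ?? [];
    subs.push(listener);
    this.listeners.set(channel, subs);
  }
}

const bus = new InMemoryBus();

// Hypothetical stand-in for the nullable job_id column on agent_runs.
const jobIdsByRun = new Map<string, string>();

function updateRunJobId(runId: string, jobId: string): void {
  jobIdsByRun.set(runId, jobId);
}

function getRunJobId(runId: string): string | null {
  return jobIdsByRun.get(runId) ?? null;
}

// Dashboard side: serialize the command and publish it on the channel.
function publishCancelCommand(runId: string, reason: string): number {
  const payload: CancelCommandPayload = { runId, reason };
  return bus.publish(CANCEL_CHANNEL, JSON.stringify(payload));
}

// Router side: decode each message and hand it to the caller's handler.
function subscribeToCancelCommands(
  handler: (payload: CancelCommandPayload) => void,
): void {
  bus.subscribe(CANCEL_CHANNEL, (message) => {
    handler(JSON.parse(message) as CancelCommandPayload);
  });
}

// Usage: store the job ID at enqueue time, then resolve it on cancel.
const cancelledJobs: string[] = [];
updateRunJobId("run-1", "job-42");
subscribeToCancelCommands(({ runId }) => {
  const jobId = getRunJobId(runId);
  if (jobId !== null) cancelledJobs.push(jobId);
});
const receivers = publishCancelCommand("run-1", "user requested");
```

The job ID lookup inside the handler is only one plausible way a consumer might use the two repository helpers; the PR itself does not wire them together.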

Technical Details

Connection pattern: Uses lazy singleton initialization matching src/queue/client.ts
Error handling: JSON parse failures and handler exceptions are logged but don't crash the subscription
Testing:

  • 9 unit tests for pub/sub module (publish, subscribe, error cases)
  • 7 unit tests for repository functions
  • All 4533 existing tests continue to pass
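
The connection pattern and error handling described in this section might look roughly like the sketch below. It is an assumption-laden illustration rather than the merged code: the Connection type, the errorLog array, and the helper names are hypothetical, and a plain object replaces the ioredis client so the example is self-contained.

```typescript
// Hypothetical connection type; the real module creates ioredis clients.
interface Connection {
  id: number;
}

let connectionsCreated = 0;
let subscriber: Connection | null = null;

// Lazy singleton: nothing is created at import time. The first caller
// creates the connection and every later caller reuses it.
function getSubscriber(): Connection {
  if (subscriber === null) {
    connectionsCreated += 1;
    subscriber = { id: connectionsCreated };
  }
  return subscriber;
}

type CancelCommandPayload = { runId: string; reason: string };

// Stand-in for the project's logger (assumption).
const errorLog: string[] = [];

// Returns null instead of throwing when the message is not valid JSON.
function parsePayload(message: string): CancelCommandPayload | null {
  try {
    return JSON.parse(message) as CancelCommandPayload;
  } catch (err) {
    errorLog.push(`parse failure: ${String(err)}`);
    return null;
  }
}

// Wraps a handler so that a bad message or a throwing handler is logged
// and the listener keeps receiving later messages.
function makeMessageListener(
  handler: (payload: CancelCommandPayload) => void,
): (message: string) => void {
  return (message: string): void => {
    const payload = parsePayload(message);
    if (payload === null) return;
    try {
      handler(payload);
    } catch (err) {
      errorLog.push(`handler failure: ${String(err)}`);
    }
  };
}

// Usage: two failing messages are logged, the third is still delivered.
const seen: string[] = [];
const listener = makeMessageListener((payload) => {
  if (payload.runId === "boom") throw new Error("handler exploded");
  seen.push(payload.runId);
});
listener("not json");
listener(JSON.stringify({ runId: "boom", reason: "test" }));
listener(JSON.stringify({ runId: "run-1", reason: "user requested" }));
const sameInstance = getSubscriber() === getSubscriber();
```

Keeping the parse step and the handler call in separate try blocks lets the log distinguish a malformed message from a handler bug.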

PR Contents

src/db/migrations/0035_add_job_id_to_runs.sql         (+1 lines)
src/db/migrations/meta/_journal.json                   (+7 lines, idx 35)
src/queue/cancel.ts                                    (+93 lines, new file)
src/db/schema/runs.ts                                  (+1 lines, jobId field)
src/db/repositories/runsRepository.ts                  (+15 lines, 2 new functions)
tests/unit/queue/cancel.test.ts                        (+314 lines, 9 tests)
tests/unit/db/runsRepository-jobId.test.ts             (+156 lines, 7 tests)
tests/unit/db/runsRepository.test.ts                   (+1 lines, schema mock update)

Acceptance Criteria

✅ New file src/queue/cancel.ts exports publishCancelCommand(runId, reason)
✅ New file src/queue/cancel.ts exports subscribeToCancelCommands(handler)
✅ Redis connections use lazy initialization matching src/queue/client.ts pattern
✅ New migration 0035_add_job_id_to_runs.sql adds nullable job_id TEXT column
✅ Migration journal entry added with idx 35 and unique when timestamp
✅ Drizzle schema includes jobId field on agentRuns table
✅ Repository functions updateRunJobId() and getRunJobId() implemented
✅ enrichedRunSelect includes jobId field for API visibility
✅ Unit tests cover pub/sub module with mocked Redis
✅ Unit tests cover repository functions with mocked DB
✅ All tests pass, typecheck and lint are clean

Testing

All 4533 tests pass. No linting or type errors.

Card: https://trello.com/c/MTfNVgY3/302-as-a-developer-i-want-a-redis-pub-sub-cancel-channel-and-jobid-column-on-agentruns-so-that-cancel-commands-can-reach-the-router

…ent_runs

- Add src/queue/cancel.ts module with publishCancelCommand() and subscribeToCancelCommands()
  for Dashboard→Router kill command communication via Redis pub/sub
- Add job_id TEXT column to agent_runs table via migration 0035
- Update Drizzle schema to include jobId field
- Add repository functions updateRunJobId() and getRunJobId()
- Include jobId in enrichedRunSelect for API visibility
- Add comprehensive unit tests for pub/sub module (9 tests) and repository functions (7 tests)
- All tests passing, lint and typecheck clean

Fixes: #302

nhopeatall left a comment

Summary

Clean foundational infrastructure for the cancel command feature. The code is correct, follows existing patterns, and is well-tested. Ready to merge.

Notes

  • No consumers yet: publishCancelCommand, subscribeToCancelCommands, updateRunJobId, and getRunJobId are all defined but not wired into production code paths. This is expected per the PR description (foundational work for the cancellation epic).

  • Payload validation on subscribe (minor, non-blocking): subscribeToCancelCommands uses JSON.parse(message) as CancelCommandPayload — a type assertion without runtime validation. If a malformed-but-valid-JSON message lands on the channel (e.g., {"foo": 1}), the handler receives an object with undefined for runId/reason. In a controlled internal system this is low risk, but a lightweight Zod parse (or even a runId && reason guard) in a follow-up PR would make the contract more robust. (See src/queue/cancel.ts:84)

  • enrichedRunSelect now includes jobId: This means getRunById, getRunsByWorkItem, and getRunsForPR return jobId to API consumers. The listRuns dashboard query intentionally omits it. For an internal platform tool this is fine — just noting it's now part of the API surface.

  • Redis connection pattern: Uses new Redis(redisUrl) directly (correct for ioredis), separate from the parseRedisUrl helper used by BullMQ consumers. Both patterns are appropriate for their respective use cases.

Everything else checks out: migration is correct for zero-downtime deploys, journal entry is properly sequenced, Drizzle schema matches, repository functions follow existing conventions, and test coverage is thorough.
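
The payload guard suggested in the notes above could be as small as the following sketch. It uses a hand-written type guard rather than Zod so that it has no dependencies. The function names are hypothetical, and requiring both fields to be non-empty strings is an assumption based on the "runId && reason" wording, not something the PR specifies.

```typescript
type CancelCommandPayload = { runId: string; reason: string };

// Runtime check that a parsed value has the shape the handler expects.
// Assumption: both fields must be non-empty strings.
function isCancelCommandPayload(value: unknown): value is CancelCommandPayload {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  const runId = candidate["runId"];
  const reason = candidate["reason"];
  return (
    typeof runId === "string" &&
    runId.length > 0 &&
    typeof reason === "string" &&
    reason.length > 0
  );
}

// Returns the payload, or null for invalid JSON or a wrong shape.
function parseCancelCommand(message: string): CancelCommandPayload | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(message);
  } catch {
    return null;
  }
  return isCancelCommandPayload(parsed) ? parsed : null;
}

// Usage: the malformed-but-valid-JSON case from the review is rejected.
const wrongShape = parseCancelCommand('{"foo": 1}');
const notJson = parseCancelCommand("not json");
const valid = parseCancelCommand('{"runId": "run-1", "reason": "user requested"}');
```

With a guard like this in place, the subscriber could log and skip a null result instead of passing an object with undefined fields to the handler.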

aaight merged commit 9aeba70 into dev Mar 13, 2026
6 checks passed