feat: cancel running ingest runs #123

@rorybyrne

Summary

Add the ability to cancel a running ingest run. Currently, once an ingest is started there is no way to stop it: event handlers keep chaining until every batch completes or fails. The backpressure loop (NextBatchRequested deferred every 60s when the cluster is full) also has no termination condition and relies on the cluster eventually recovering capacity.

Design

New status: CANCELLED

Add CANCELLED to IngestStatus enum. Transition: RUNNING → CANCELLED only.
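
A minimal sketch of the enum change and the transition rule. RUNNING, COMPLETED and FAILED are inferred from the rest of this issue; the string values, the `ALLOWED_TRANSITIONS` table and `can_transition` are hypothetical names used only for illustration, and the real enum may have more members.

```python
from enum import Enum


class IngestStatus(str, Enum):
    # Members other than CANCELLED are assumed from the issue text.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"  # new


# Hypothetical transition table: only the RUNNING -> CANCELLED edge
# comes from this issue; the other edges are assumptions.
ALLOWED_TRANSITIONS: dict[IngestStatus, frozenset[IngestStatus]] = {
    IngestStatus.RUNNING: frozenset(
        {IngestStatus.COMPLETED, IngestStatus.FAILED, IngestStatus.CANCELLED}
    ),
    IngestStatus.COMPLETED: frozenset(),
    IngestStatus.FAILED: frozenset(),
    IngestStatus.CANCELLED: frozenset(),
}


def can_transition(current: IngestStatus, target: IngestStatus) -> bool:
    """Return True if `current -> target` is an allowed status change."""
    return target in ALLOWED_TRANSITIONS[current]
```

Treating CANCELLED as terminal (no outgoing edges) keeps a cancelled run from being resurrected by a late handler.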

Handler short-circuit

Every ingest handler already loads the ingest run as its first step. Add one check after the load:

if ingest_run.status == IngestStatus.CANCELLED:
    log.info("ingest run cancelled, skipping", ingest_run_id=event.ingest_run_id)
    return

Handlers affected:

  • RunIngester — stops fetching new batches, breaks the backpressure loop
  • RunHooks — skips hook execution for queued batches
  • PublishBatch — skips publishing for completed batches
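
To make the short-circuit concrete, here is a self-contained sketch of one handler with the check in place. `InMemoryIngestRepo`, `BatchEvent` and the `published` list are stand-ins invented for the example, not the real types, and the sketch uses stdlib `logging` rather than the keyword-style logger shown above.

```python
import asyncio
import logging
from dataclasses import dataclass
from enum import Enum

log = logging.getLogger("ingest")


class IngestStatus(Enum):
    RUNNING = "running"
    CANCELLED = "cancelled"


@dataclass
class IngestRun:
    id: str
    status: IngestStatus


@dataclass
class BatchEvent:  # hypothetical event shape
    ingest_run_id: str


class InMemoryIngestRepo:  # stand-in for the real repository
    def __init__(self, runs: list[IngestRun]) -> None:
        self._runs = {run.id: run for run in runs}

    async def get(self, ingest_run_id: str) -> IngestRun:
        return self._runs[ingest_run_id]


class PublishBatch:
    def __init__(self, ingest_repo: InMemoryIngestRepo) -> None:
        self.ingest_repo = ingest_repo
        self.published: list[str] = []  # records work actually done

    async def handle(self, event: BatchEvent) -> None:
        # Existing first step: load the ingest run.
        ingest_run = await self.ingest_repo.get(event.ingest_run_id)
        # New check: bail out before doing any work for a cancelled run.
        if ingest_run.status == IngestStatus.CANCELLED:
            log.info("ingest run cancelled, skipping: %s", event.ingest_run_id)
            return
        self.published.append(event.ingest_run_id)  # placeholder for publishing
```

The same three lines would go into RunIngester and RunHooks; only the work after the check differs.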

In-flight work

Soft cancel only. K8s Jobs already running finish on their own (or hit activeDeadlineSeconds). Their downstream handlers see CANCELLED and skip processing. No active pod deletion needed.

Completion

When the last in-flight handler returns (skipping due to CANCELLED), the normal completion check may not fire, because the short-circuit returns before the counters are incremented. Two options:

  • Option A: Set completion at cancel time. cancel() sets status=CANCELLED and completed_at=now(). The run is terminal. In-flight handlers skip silently.
  • Option B: Cancelled handlers still increment counters. Completion fires normally when all batches are accounted for, but with status=CANCELLED instead of COMPLETED.

Option A is simpler and avoids waiting for in-flight work to drain.

API

POST /ingest-runs/{id}/cancel

Returns 200 with the updated IngestRun. Returns 409 if already completed/failed/cancelled.
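
A framework-agnostic sketch of the status mapping, since the issue does not say which web framework the route lives in. `cancel_route`, `FakeIngestService` and the `{"detail": ...}` error body are assumptions made for the example; the real route would call the actual IngestService and serialize the real IngestRun.

```python
import asyncio
from dataclasses import asdict, dataclass


class ConflictError(Exception):
    """Raised by the service when the run is not RUNNING."""


@dataclass
class IngestRun:
    id: str
    status: str


class FakeIngestService:  # stand-in for IngestService
    def __init__(self, runs: dict[str, IngestRun]) -> None:
        self._runs = runs

    async def cancel_ingest(self, ingest_run_id: str) -> IngestRun:
        run = self._runs[ingest_run_id]
        if run.status != "running":
            raise ConflictError(f"Cannot cancel from {run.status}")
        run.status = "cancelled"
        return run


async def cancel_route(
    service: FakeIngestService, ingest_run_id: str
) -> tuple[int, dict]:
    """Handle POST /ingest-runs/{id}/cancel; returns (HTTP status, JSON body)."""
    try:
        run = await service.cancel_ingest(ingest_run_id)
    except ConflictError as exc:
        # Already completed/failed/cancelled.
        return 409, {"detail": str(exc)}
    return 200, asdict(run)
```

Note that a second cancel of the same run yields 409 rather than 200, so a client that retries after a timeout should probably treat "409, already cancelled" as success.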

Service method

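from datetime import UTC, datetime  # datetime.UTC requires Python 3.11+

async def cancel_ingest(self, ingest_run_id: IngestRunId) -> IngestRun:
    run = await self.ingest_repo.get(ingest_run_id)
    # Only RUNNING → CANCELLED is allowed; anything else surfaces as a 409.
    if run.status != IngestStatus.RUNNING:
        raise ConflictError(f"Cannot cancel from {run.status}")
    run.status = IngestStatus.CANCELLED
    # Option A: the run becomes terminal at cancel time.
    run.completed_at = datetime.now(UTC)
    await self.ingest_repo.save(run)
    return run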

Implementation

  • Add CANCELLED to IngestStatus enum
  • Add status check right after the ingest run load in RunIngester.handle(), RunHooks.handle(), PublishBatch.handle()
  • Add IngestService.cancel_ingest() method
  • Add POST /ingest-runs/{id}/cancel route
  • Tests: cancel a running ingest, verify handlers skip, verify backpressure loop terminates
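
One way the "backpressure loop terminates" test could be shaped, as a toy simulation. Everything here is hypothetical scaffolding: the in-memory queue replaces the real 60s deferral, the cluster is modelled as permanently full, and `drain` plays the role of the event dispatcher.

```python
import asyncio
from collections import deque
from dataclasses import dataclass
from enum import Enum


class IngestStatus(Enum):
    RUNNING = "running"
    CANCELLED = "cancelled"


@dataclass
class IngestRun:
    status: IngestStatus


@dataclass
class NextBatchRequested:
    ingest_run_id: str


class RunIngester:
    """Toy handler: the cluster is always full, so every event is deferred."""

    def __init__(self, runs: dict[str, IngestRun], queue: deque) -> None:
        self.runs = runs
        self.queue = queue
        self.deferrals = 0

    async def handle(self, event: NextBatchRequested) -> None:
        run = self.runs[event.ingest_run_id]
        if run.status == IngestStatus.CANCELLED:
            return  # short-circuit: nothing is re-enqueued, so the loop ends
        self.deferrals += 1
        self.queue.append(event)  # real code would defer by 60s instead


async def drain(handler: RunIngester, cancel_after: int) -> int:
    """Process the queue, cancelling the run after `cancel_after` deferrals."""
    handled = 0
    while handler.queue:
        event = handler.queue.popleft()
        await handler.handle(event)
        handled += 1
        if handler.deferrals == cancel_after:
            handler.runs[event.ingest_run_id].status = IngestStatus.CANCELLED
    return handled
```

Without the cancel, `drain` would never return in this model, which mirrors the missing termination condition described in the summary.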

Labels: feature (New functionality)
