feat: ingest pipeline backpressure and transient failure handling#121
Conversation
…led tracking

This migration adds support for explicit delivery scheduling and batch failure accounting:
- Add deliver_after column to deliveries table for backoff scheduling
- Add batches_failed column to ingest_runs table for failure tracking
- Create index on deliver_after for efficient pending delivery queries

feat: replace IngestStarted with IngestRunStarted and NextBatchRequested events

Split the single IngestStarted event into two distinct events:
- IngestRunStarted for observability/audit purposes
- NextBatchRequested for triggering ingester batch pulls

This separation clarifies event semantics and handler responsibilities.

feat: add comprehensive error handling with TransientError and PermanentError

Introduce structured error classification for runner failures:
- TransientError for retryable failures (timeouts, resource pressure)
- PermanentError for unrecoverable failures (config errors, image pull)
- OOMError as a specialized PermanentError for memory exhaustion

feat: implement batch failure accounting in ingest service

Add methods to handle permanent batch and ingestion failures:
- fail_batch() for hook processing failures
- fail_ingestion() for ingester pull failures
- Update completion logic to account for failed batches

feat: enhance worker retry logic with exponential backoff

Implement more sophisticated retry handling (a sketch of the backoff computation appears after this commit list):
- Exponential backoff for transient failures
- Immediate failure for permanent errors
- deliver_after timestamp for deferred retry scheduling
- Call handler.on_exhausted() when retries are exhausted

refactor: remove HookStatus.FAILED and HookStatus.OOM

Replace status-based failure reporting with exception-based error handling:
- Hook failures now raise appropriate error types
- Simplify validation run status calculation
- Remove oom_killed property from HookResult

refactor: update K8s and OCI runners to use new error types

Convert all runner implementations to raise structured exceptions:
- Map container exit codes and OOM kills to appropriate error types
- Remove result-based error reporting
- Simplify job completion logic

feat: add on_exhausted callback to EventHandler

Allow handlers to perform cleanup when events cannot be processed:
- Called when the retry limit is reached or a permanent failure occurs
- Default implementation is a no-op for backward compatibility
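The deferred-retry behaviour described in the exponential-backoff commit can be pictured with a minimal sketch; the base delay, retry budget, and helper name below are assumptions for illustration, not the worker's actual constants:

    # Hypothetical sketch of deferred-retry scheduling: compute the next
    # deliver_after from the retry count. BASE_DELAY and MAX_RETRIES are
    # illustrative values, not the worker's real configuration.
    from datetime import UTC, datetime, timedelta

    BASE_DELAY = timedelta(seconds=5)   # assumed base backoff
    MAX_RETRIES = 20                    # assumed retry budget


    def next_deliver_after(retry_count: int, now: datetime | None = None) -> datetime | None:
        """Return when the delivery should be retried, or None when retries are exhausted."""
        if retry_count + 1 >= MAX_RETRIES:
            # Caller would invoke handler.on_exhausted() and mark the delivery failed.
            return None
        delay = BASE_DELAY * (2 ** retry_count)  # 5s, 10s, 20s, ...
        return (now or datetime.now(UTC)) + delay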
Greptile Summary

This PR introduces backpressure and transient failure handling for the ingest pipeline:
Confidence Score: 3/5

Not safe to merge without addressing the open P1s from prior review threads and the new on_exhausted / stale-output issues found here. Several P1 findings remain unresolved from prior review rounds (duplicate IngestCompleted, unbounded backpressure loop, non-zero ingester exit as TransientError, migration srn→id rename gap, DI resolution outside try/finally). This round adds two new P1s: on_exhausted exceptions inside error handlers skip mark_failed, leaving deliveries stuck; and the K8s label scheme change breaks job idempotency during rolling deploys. The core accounting logic (batches_failed, check_completion guards, has_capacity Unschedulable fix) is solid, but the delivery lifecycle and the deploy story both have gaps that warrant fixes before merging.

Key files: server/osa/infrastructure/event/worker.py (on_exhausted error handling), server/osa/infrastructure/k8s/runner.py and ingester_runner.py (label migration + stale-output deletion), server/migrations/versions/add_deliver_after.py (missing srn→id rename step)
| Filename | Overview |
|---|---|
| server/osa/infrastructure/event/worker.py | Adds TransientError/PermanentError/Exception handlers with on_exhausted + mark_failed sequencing; on_exhausted raising inside an except block will skip mark_failed and propagate the error, leaving deliveries stuck in claimed state. |
| server/osa/domain/ingest/handler/run_ingester.py | Handles NextBatchRequested with backpressure check and PermanentError catch; has_capacity→deliver_after loop has no counter-based termination (prior thread); fail_ingestion accounting logic is correct for the normal path. |
| server/osa/domain/ingest/service/ingest.py | fail_batch/fail_ingestion/check_completion logic is sound; PublishBatch and IngestService both emit IngestCompleted with status saved — concurrent run still has a narrow duplicate-emission window (prior thread). |
| server/osa/infrastructure/k8s/runner.py | API objects moved to init; diagnose_failure now returns exceptions instead of HookResult; label scheme changed from osa.io/run-id to osa.io/ingest-run-id + osa.io/ingest-run-batch — in-flight jobs from the old scheme won't be found after deployment. |
| server/osa/infrastructure/k8s/ingester_runner.py | has_capacity correctly checks PodScheduled=Unschedulable (fixes prior thread); output/files cleared before retry (good for failed runs, but also clears successful output on stale-claim redelivery); non-zero exit classified as TransientError (prior thread). |
| server/migrations/versions/add_deliver_after.py | Adds deliver_after (nullable) to deliveries and batches_failed (not null, default 0) to ingest_runs; does not contain the srn→id rename (prior thread). |
| server/osa/domain/ingest/handler/run_hooks.py | OOMError caught to emit HookBatchCompleted with partial results; PermanentError caught to call fail_batch; TransientError propagates to worker for retry — all three paths account correctly for the batch. |
| server/osa/domain/ingest/handler/publish_batch.py | Now saves status=COMPLETED before emitting IngestCompleted (fixing silent state loss); concurrent emit from IngestService._check_completion still possible within the save-then-append window (prior thread). |
| server/osa/domain/ingest/model/ingest_run.py | batches_failed added; is_complete updated to (completed + failed) >= ingested; check_completion correctly guards status == RUNNING before transitioning. |
| server/osa/infrastructure/persistence/repository/ingest.py | srn→id rename applied throughout; increment_failed added atomically via UPDATE...RETURNING; _row_to_ingest_run uses .get(batches_failed, 0) for backward compatibility. |
Sequence Diagram
sequenceDiagram
participant S as IngestService
participant O as Outbox
participant RI as RunIngester
participant K as K8sIngesterRunner
participant RH as RunHooks
participant PB as PublishBatch
S->>O: NextBatchRequested (+ IngestRunStarted)
O->>RI: deliver NextBatchRequested
alt Cluster at capacity
RI->>O: NextBatchRequested (deliver_after=+60s)
RI-->>O: mark original delivered
else Capacity available
RI->>K: run(ingester)
alt TransientError
K-->>RI: raise TransientError
RI-->>O: mark_failed_with_retry (exponential backoff)
else PermanentError
K-->>RI: raise PermanentError
RI->>S: fail_ingestion()
S->>O: IngestCompleted (if last batch)
else Success
K-->>RI: IngesterOutput
RI->>O: IngesterBatchReady
RI->>O: NextBatchRequested (if has_more)
end
end
O->>RH: deliver IngesterBatchReady
alt OOMError exhausted
RH->>O: HookBatchCompleted (partial)
else PermanentError
RH->>S: fail_batch()
S->>O: IngestCompleted (if last batch)
else Success
RH->>O: HookBatchCompleted
end
O->>PB: deliver HookBatchCompleted
PB->>O: IngestCompleted (if all batches accounted for)
Reviews (6): Last reviewed commit: "fix: only trigger backpressure on genuin..."
    async def _check_completion(self, ingest_run: IngestRun) -> None:
        """Transition to COMPLETED and emit IngestCompleted if all batches are accounted for."""
        if not ingest_run.is_complete:
            return
        now = datetime.now(UTC)
        ingest_run.check_completion(now)
        await self.ingest_repo.save(ingest_run)
        await self.outbox.append(
            IngestCompleted(
                id=EventId(uuid4()),
                ingest_run_srn=ingest_run.srn,
                total_published=ingest_run.published_count,
            )
        )
IngestCompleted may be emitted on already-completed runs
The return value of ingest_run.check_completion(now) is ignored. check_completion (in ingest_run.py) only performs the state transition and returns True when status == RUNNING; it returns False if the run is already COMPLETED or FAILED. However, _check_completion always proceeds to save and append(IngestCompleted) regardless.
This can happen in practice when a stale-claim reset re-queues an already-processed IngesterBatchReady or NextBatchRequested delivery: the worker retries it, on_exhausted calls _fail_batch/_fail_ingestion again, increment_failed returns an IngestRun with is_complete=True and status=COMPLETED, and a second IngestCompleted event is appended to the outbox — potentially triggering duplicate downstream processing.
Suggested change:

    async def _check_completion(self, ingest_run: IngestRun) -> None:
        """Transition to COMPLETED and emit IngestCompleted if all batches are accounted for."""
        if not ingest_run.is_complete:
            return
        now = datetime.now(UTC)
        if not ingest_run.check_completion(now):
            return  # Already in a terminal state — do not emit a duplicate event
        await self.ingest_repo.save(ingest_run)
        await self.outbox.append(
            IngestCompleted(
                id=EventId(uuid4()),
                ingest_run_srn=ingest_run.srn,
                total_published=ingest_run.published_count,
            )
        )
    self._state.current_batch = result.events
    self._state.last_claim_at = result.claimed_at

    handler = await scope.get(self._handler_type)
Handler resolved outside the try/finally block
handler = await scope.get(self._handler_type) was moved before the try block so the error-handling branches (TransientError, PermanentError, Exception) can call handler.on_exhausted(...). However, if DI resolution raises (e.g. a misconfigured dependency), the finally block at line 267 never executes. This leaves self._state.current_batch populated and self._state.status stuck at PROCESSING, and the claimed deliveries remain in claimed status until the stale-claim cleanup fires.
One approach is to initialise handler to None before the try, and guard on_exhausted calls with if handler:
    handler = None
    try:
        handler = await scope.get(self._handler_type)
        events = result.events
        ...
    except TransientError as e:
        ...
        for delivery in result.deliveries:
            ...
            if exhausted:
                if handler:
                    await handler.on_exhausted(delivery.event)
                await outbox.mark_failed(delivery.id, str(e))

This keeps the finally covering handler initialisation and preserves the on_exhausted behaviour for the common case.
     if terminated:
         if getattr(terminated, "reason", None) == "OOMKilled":
    -        raise ExternalServiceError("Source killed by OOM")
    +        return OOMError("Source killed by OOM")
         exit_code = getattr(terminated, "exit_code", -1)
         if exit_code != 0:
    -        raise ExternalServiceError(f"Source exited with code {exit_code}")
    -except ExternalServiceError:
    -    raise
    +        return TransientError(f"Source exited with code {exit_code}")
     except Exception:
Non-zero exit code classified as TransientError for ingester, but PermanentError for hooks
In K8sHookRunner._diagnose_failure, a container exit code ≠ 0 raises PermanentError("Hook exited with code …"). Here in K8sIngesterRunner._diagnose_failure, the same situation raises TransientError("Source exited with code …"). If an ingester consistently crashes with a non-zero exit code (e.g. due to a configuration or code bug), the worker will retry it up to __max_retries__ times with full exponential backoff before eventually giving up — amplifying cluster load rather than failing fast.
If this asymmetry is intentional (e.g. transient cluster instability can cause spurious non-zero exits for ingesters but not hooks), a brief comment explaining the rationale would help future readers.
Suggested change:

    if exit_code != 0:
        # Ingester non-zero exits are treated as transient: cluster
        # instability (OOM eviction, preemption) can cause spurious
        # failures that succeed on retry. Hooks use PermanentError
        # for the same case because hook logic is deterministic.
        return TransientError(f"Source exited with code {exit_code}")
Change the ingest run identifier from SRN format to a simple string ID to simplify the data model and reduce coupling to SRN conventions. Remove the foreign key constraint from the ingest_runs table to the conventions table to allow more flexible data management.
Remove redundant is_complete property check and inline datetime creation to streamline the completion validation flow.

feat: add clarifying comment for ingester exit code handling

Explain that non-zero exit codes are treated as transient errors for ingesters (due to upstream API failures), versus permanent errors for hooks.
Add backpressure mechanism to prevent ingester from overwhelming hook processing by limiting pending batches and deferring next batch requests when threshold is exceeded. Add container log capture functionality to both ingester and hook runners for improved failure diagnostics, with implementation for Kubernetes environments and stub for OCI containers. Update outbox to support delayed event delivery for backpressure implementation.
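A minimal sketch of what delayed delivery selection could look like, assuming a SQLAlchemy `deliveries` table with the new `deliver_after` column; the actual outbox query and column set live in the repository layer and may differ:

    # Hypothetical claim query: a delivery is eligible when it is pending and its
    # deliver_after is unset or in the past. Table and column names are assumptions
    # based on the migration quoted later in this PR.
    from datetime import UTC, datetime

    import sqlalchemy as sa

    metadata = sa.MetaData()
    deliveries_table = sa.Table(
        "deliveries",
        metadata,
        sa.Column("id", sa.String, primary_key=True),
        sa.Column("status", sa.String, nullable=False),
        sa.Column("deliver_after", sa.DateTime(timezone=True), nullable=True),
    )


    def pending_deliveries_query(now: datetime | None = None):
        now = now or datetime.now(UTC)
        return (
            sa.select(deliveries_table)
            .where(deliveries_table.c.status == "pending")
            .where(
                sa.or_(
                    deliveries_table.c.deliver_after.is_(None),
                    deliveries_table.c.deliver_after <= now,
                )
            )
        )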
Add __max_retries__ = 100 to RunHooks handler to configure retry behavior and enhance backpressure log message to include delay duration for better observability
Replace MAX_PENDING_BATCHES constant with dynamic cluster capacity checking to prevent job submission when cluster cannot schedule more work, improving resource utilization and avoiding timeouts
refactor: remove unused HookRunner dependency from RunHooks handler
feat: add max_retries configuration to RunIngester event handler
refactor: capture container logs in PermanentError for better debugging
style: improve log message formatting for backpressure and retry delays
feat: add container_logs field to InfrastructureError base class
refactor: capture pod logs before cleanup in K8s runners for debugging (see the log-capture sketch below)
style: improve logging messages and use structured logging consistently
refactor: simplify module name abbreviation to SLF4J-style format
- Add deletion of stale files from previous failed runs in addition to output cleanup to prevent data corruption
- Improve error logging by explicitly logging container logs when jobs fail to aid in debugging and troubleshooting
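For illustration, a hedged sketch of capturing pod logs before cleanup, assuming kubernetes_asyncio; the helper name, tail length, and pod-selection details are assumptions and the real runners may differ:

    # Illustrative only: read a Job's pod logs before the Job is deleted so a
    # PermanentError can carry them for debugging. The "job-name" label is set by
    # the Kubernetes Job controller; everything else here is an assumption.
    from kubernetes_asyncio import client


    async def capture_job_logs(
        core_api: client.CoreV1Api, namespace: str, job_name: str
    ) -> str:
        pods = await core_api.list_namespaced_pod(
            namespace, label_selector=f"job-name={job_name}"
        )
        if not pods.items:
            return ""
        try:
            # tail_lines keeps the captured payload small
            return await core_api.read_namespaced_pod_log(
                name=pods.items[0].metadata.name, namespace=namespace, tail_lines=200
            )
        except Exception:
            return ""  # log capture is best-effort; never mask the original failure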
        "deliveries",
        sa.Column("deliver_after", sa.DateTime(timezone=True), nullable=True),
    )
    op.create_index(
        "idx_deliveries_deliver_after",
        "deliveries",
        ["deliver_after"],
        postgresql_where=sa.text("status = 'pending'"),
    )

    op.add_column(
        "ingest_runs",
        sa.Column("batches_failed", sa.Integer, nullable=False, server_default=sa.text("0")),
    )


    def downgrade() -> None:
        op.drop_column("ingest_runs", "batches_failed")
        op.drop_index("idx_deliveries_deliver_after", table_name="deliveries")
srn→id column rename not captured in the migration chain
add_ingest_runs.py was edited in-place to rename the primary key from srn to id and to drop the ForeignKey("conventions.srn") constraint on convention_srn. The new add_deliver_after migration only adds batches_failed and deliver_after — it contains no ALTER TABLE ingest_runs RENAME COLUMN srn TO id or DROP CONSTRAINT step.
Any environment (CI, staging, a teammate's dev DB) that already has add_ingest_runs applied with the old schema will:
- Run `add_deliver_after` successfully (adds the new columns)
- Still have a `srn` column instead of `id`, causing every query with `.where(ingest_runs_table.c.id == ...)` to fail at runtime
If the old migration has never been applied anywhere this is fine, but if it has, the upgrade needs an explicit rename step in add_deliver_after:
    op.alter_column("ingest_runs", "srn", new_column_name="id")
    op.drop_constraint("<fk_constraint_name>", "ingest_runs", type_="foreignkey")

Add on_exhausted method to PublishBatch handler to account for failed batches when publish retries are exhausted, ensuring proper ingest run state tracking and preventing stuck ingestion processes.
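A minimal sketch of that on_exhausted addition, assuming the handler holds an ingest service exposing the fail_batch method introduced in this PR; the Protocol, class shape, and event attribute are stand-ins for the project's real types:

    # Illustrative shape only: account the batch as failed so the ingest run
    # still reaches a terminal state when publish retries are spent.
    from typing import Any, Protocol


    class IngestServiceLike(Protocol):
        async def fail_batch(self, ingest_run_id: str) -> None: ...


    class PublishBatchSketch:
        def __init__(self, ingest_service: IngestServiceLike) -> None:
            self._ingest_service = ingest_service

        async def on_exhausted(self, event: Any) -> None:
            # fail_batch() increments batches_failed and, when this was the last
            # outstanding batch, triggers the completion path.
            await self._ingest_service.fail_batch(event.ingest_run_id)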
Add OOMError handling to RunHooks handler to emit HookBatchCompleted even when memory is exhausted, allowing PublishBatch to process records that passed validation before OOM occurred.
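A rough sketch of the described OOM handling, with stand-in types and helper names; the real RunHooks handler differs in structure:

    # Rough sketch: results collected before an OOM kill are still returned so
    # downstream publishing can proceed with partial output. OOMError here is a
    # stand-in class; the real one lives in the project's error module.
    class OOMError(Exception):
        """Stand-in for the project's OOMError (a PermanentError subtype)."""


    async def run_batch_with_partial_results(run_one, records):
        """run_one is an async callable that may raise OOMError mid-batch."""
        results = []
        try:
            for record in records:
                results.append(await run_one(record))
        except OOMError:
            # Memory exhausted part-way through: keep what we have instead of
            # retrying the whole batch into the same limit.
            pass
        return results  # caller emits HookBatchCompleted with these partial results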
    )
    await self.outbox.append(
        NextBatchRequested(
            id=EventId(uuid4()),
            ingest_run_id=event.ingest_run_id,
            convention_srn=event.convention_srn,
            batch_size=event.batch_size,
        ),
        deliver_after=datetime.now(UTC) + BACKPRESSURE_DELAY,
    )
    return

    if ingest_run.status == IngestStatus.PENDING:
        ingest_run.mark_running()
Backpressure loop has no termination condition
When has_capacity() returns False, the handler appends a brand-new NextBatchRequested with deliver_after and marks the current delivery as succeeded — so the new event always enters the outbox with retry_count=0. Because exhaustion is tested as retry_count + 1 >= __max_retries__ (i.e. 1 >= 20), the __max_retries__ = 20 guard never fires and on_exhausted is never called. A cluster that stays saturated will generate an unbounded chain of these events; the ingest run neither completes nor is accounted for as failed.
Consider adding a backpressure attempt counter to the event payload (or the IngestRun row) and failing the ingestion after a configurable maximum:
    # e.g., keep a backpressure_attempts field on NextBatchRequested
    if event.backpressure_attempts >= MAX_BACKPRESSURE_ATTEMPTS:
        await self._fail_ingestion(event)
        return
    await self.outbox.append(
        NextBatchRequested(
            ...,
            backpressure_attempts=event.backpressure_attempts + 1,
        ),
        deliver_after=datetime.now(UTC) + BACKPRESSURE_DELAY,
    )
    async def has_capacity(self) -> bool:
        """Check for pending (unschedulable) pods in the namespace."""
        namespace = self._config.namespace
        try:
            pod_list = await self._core_api.list_namespaced_pod(
                namespace, field_selector="status.phase=Pending"
            )
            return len(pod_list.items) == 0
        except Exception as e:
            logger.warn(
                "Failed to check cluster capacity: {error} — assuming capacity", error=str(e)
            )
            return True
has_capacity() triggers on normally-scheduling pods, not just unschedulable ones
field_selector="status.phase=Pending" matches every pod in the Pending phase, including pods that are actively being scheduled and will be Running in seconds. A pod only becomes truly "unschedulable" when the scheduler assigns it a PodScheduled=False condition with reason Unschedulable.
In an active cluster running concurrent hook/ingester jobs, there will almost always be at least one pod briefly in Pending state, so has_capacity() fires false negatives continuously. Each false negative appends a new NextBatchRequested with retry_count=0 (as noted elsewhere), so the ingest pipeline can be permanently stalled at 1 batch per 60 seconds regardless of actual cluster pressure.
A targeted check that only counts genuinely unschedulable pods avoids these spurious backpressure triggers:
    pod_list = await self._core_api.list_namespaced_pod(
        namespace,
        field_selector="status.phase=Pending",
        label_selector="osa.io/role in (ingester,hook)",
    )
    for pod in pod_list.items:
        conditions = getattr(pod.status, "conditions", []) or []
        for cond in conditions:
            if getattr(cond, "reason", None) == "Unschedulable":
                return False
    return True

has_capacity() was checking status.phase=Pending, which matches all pods during normal startup (image pull, node assignment), causing false backpressure that throttled ingestion to ~1 batch/60s. It now checks for PodScheduled=False with reason=Unschedulable, which only fires when the K8s scheduler genuinely can't place a pod.
    )

    except PermanentError as e:
        self._state.failed_count += len(result.deliveries)
        self._state.error = e
        logger.error(
            "Worker '{name}' permanent failure: {error}",
            name=self.name,
            error=str(e),
        )
        for delivery in result.deliveries:
            await handler.on_exhausted(delivery.event)
on_exhausted raising skips mark_failed, leaving delivery stuck in claimed
handler.on_exhausted(delivery.event) is called immediately before outbox.mark_failed(...). If on_exhausted raises (e.g., a DB outage while fail_batch/fail_ingestion executes), the exception propagates out of the except PermanentError block, mark_failed is never called, and the delivery stays in claimed status until the stale-claim reset fires. The same gap exists in the TransientError exhaustion branch and the generic Exception exhaustion branch.
Guarding each call individually ensures mark_failed is always reached:
    for delivery in result.deliveries:
        try:
            await handler.on_exhausted(delivery.event)
        except Exception as exhausted_err:
            logger.error(
                "Worker '{name}' on_exhausted failed: {error}",
                name=self.name,
                error=str(exhausted_err),
            )
        await outbox.mark_failed(delivery.id, str(e))

         "active:{job_name}" if a running Job exists
         None if no Job or only failed Jobs exist
         """
    -    label_selector = f"osa.io/hook={hook_name},osa.io/run-id={label_value(run_id)}"
    +    ingest_run_id = run_id.split("_b", 1)[0]
    +    batch_index = run_id.split("_b", 1)[1] if "_b" in run_id else "0"
    +    label_selector = f"osa.io/hook={hook_name},osa.io/ingest-run-id={ingest_run_id},osa.io/ingest-run-batch={batch_index}"
         try:
    -        job_list = await batch_api.list_namespaced_job(namespace, label_selector=label_selector)
    +        job_list = await self._batch_api.list_namespaced_job(
    +            namespace, label_selector=label_selector
K8s label scheme change breaks idempotency during rolling deploy
_check_existing_job now looks for osa.io/ingest-run-id + osa.io/ingest-run-batch labels, while the old code wrote osa.io/run-id labels. During a rolling deploy, new-code workers processing an IngesterBatchReady or NextBatchRequested for a job that an old-code worker already created won't find it (label mismatch returns None), causing the new worker to delete stale output and create a duplicate K8s Job for the same batch.
If any in-flight jobs are expected during the deploy window, draining the worker queue before deploying (or keeping the old label alongside the new one temporarily) prevents duplicate job creation.
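If the temporary dual-label fallback is chosen, it could look roughly like this; the label keys come from this PR, while the helper signature, run_id parsing, and lack of label sanitisation are assumptions:

    # Illustrative fallback: query the new label scheme first, then the legacy
    # osa.io/run-id scheme so jobs created by old-code workers are still found
    # during a rolling deploy.
    async def find_existing_job(batch_api, namespace: str, hook_name: str, run_id: str):
        ingest_run_id, _, batch_index = run_id.partition("_b")
        selectors = [
            f"osa.io/hook={hook_name},"
            f"osa.io/ingest-run-id={ingest_run_id},"
            f"osa.io/ingest-run-batch={batch_index or '0'}",
            # Legacy scheme (remove once no pre-deploy jobs remain).
            f"osa.io/hook={hook_name},osa.io/run-id={run_id}",
        ]
        for selector in selectors:
            job_list = await batch_api.list_namespaced_job(namespace, label_selector=selector)
            if job_list.items:
                return job_list.items[0]
        return None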
Two fixes:

1. Worker: wrap on_exhausted() in try/except so mark_failed always runs. Previously, if on_exhausted raised (e.g., DB outage during fail_batch), mark_failed was skipped and the delivery stayed claimed indefinitely.
2. PublishBatch: delegate completion to IngestService.complete_batch() instead of duplicating the completion check and IngestCompleted emission. IngestCompleted is now emitted from exactly one place (a sketch of the delegation follows this list).
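A minimal sketch of that delegation, assuming IngestService.complete_batch() encapsulates the guard-and-emit logic named above; everything else here is illustrative:

    # Illustrative shape of the single-emission-point delegation: PublishBatch no
    # longer checks completion itself; complete_batch() owns the guard and the
    # IngestCompleted append.
    class PublishBatchDelegating:
        def __init__(self, ingest_service) -> None:
            self._ingest_service = ingest_service

        async def handle(self, event) -> None:
            # ... publish the batch's records (omitted) ...
            # complete_batch() applies the is_complete/check_completion guard and
            # is the only code path that appends IngestCompleted to the outbox.
            await self._ingest_service.complete_batch(event.ingest_run_id)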
Summary
- Backpressure and deferred retries: deliveries are rescheduled via `deliver_after` on the deliveries table, instead of retrying immediately and amplifying cluster pressure
- `IngestStarted` is split into `IngestRunStarted` (observability) + `NextBatchRequested` (continuation trigger)
- Structured errors: `TransientError` / `PermanentError` / `OOMError`; `HookResult` only carries business outcomes (PASSED/REJECTED), all failures are exceptions caught at the appropriate layer
- Failure accounting: `batches_failed` counter on IngestRun; the completion condition is `(completed + failed) >= ingested`, so runs complete with failures noted rather than hanging forever (see the sketch after this list)
- `on_exhausted` hook: the `EventHandler` base class declares cleanup behavior for when delivery retries are spent, used by RunHooks and RunIngester to account for failed batches
- Runner refactor: API objects are constructed in `__init__` instead of threaded through every method; `_diagnose_failure` returns exceptions instead of raising (callers raise)

Closes #120
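A minimal sketch of the completion accounting; field names mirror the PR's description, but the real model lives in ingest_run.py:

    # Failed batches count toward completion, so a run with failures still
    # terminates instead of hanging forever; check_completion returns False
    # for already-terminal runs so IngestCompleted is not emitted twice.
    from dataclasses import dataclass
    from datetime import datetime
    from enum import Enum


    class IngestStatus(str, Enum):
        RUNNING = "running"
        COMPLETED = "completed"
        FAILED = "failed"


    @dataclass
    class IngestRunSketch:
        status: IngestStatus
        batches_ingested: int
        batches_completed: int
        batches_failed: int
        completed_at: datetime | None = None

        @property
        def is_complete(self) -> bool:
            return (self.batches_completed + self.batches_failed) >= self.batches_ingested

        def check_completion(self, now: datetime) -> bool:
            """Transition to COMPLETED; return False if already in a terminal state."""
            if self.status != IngestStatus.RUNNING or not self.is_complete:
                return False
            self.status = IngestStatus.COMPLETED
            self.completed_at = now
            return True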
Test plan
- `just server lint` passes (ruff + ty)
- `uv run pytest tests/unit/ -v`: 1006 tests pass