The ingest pipeline has two problems:
- Overloaded event semantics: IngestStarted means both "a new ingest run was kicked off" and "please fetch the next batch." These are different things.
- No transient failure handling: When K8s Jobs fail due to cluster pressure (scheduling timeouts, pod evictions, resource exhaustion), the worker retries immediately — making the pressure worse. On a multi-tenant cloud deployment, this cascading retry pattern can starve other tenants.
Observed in production: scheduling timeouts, pod evictions, and "K8s resource not found" errors across concurrent hook workers, with immediate retries amplifying the problem.
Design
1. Event vocabulary cleanup
Split IngestStarted into two events with distinct purposes:
- IngestRunStarted — emitted once by the StartIngest command. Domain fact / notifier for observability, audit, and future consumers.
- NextBatchRequested — emitted by StartIngest (first batch) and by RunIngester (continuation when there's more data). The only event RunIngester listens to.
RunIngester listens to NextBatchRequested, runs the ingester, and if there's more data emits another NextBatchRequested. Same sequential flow as today, but with honest event names. Session management stays simple — one ingester runs at a time, each batch reads the previous batch's session.
StartIngest ──► IngestRunStarted (notifier, observability)
    │
    └──► NextBatchRequested ──► RunIngester ──► IngesterBatchReady ──► RunHooks ──► HookBatchCompleted ──► PublishBatch
                  ▲                   │
                  │   (if has_more)   │
                  └───────────────────┘
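A minimal sketch of the split, assuming Python dataclass events and handler-style functions (the emit callback, ingester port, and field names are illustrative, not the actual codebase API):

```python
from dataclasses import dataclass

# Hypothetical event shapes: field names are assumptions, not the real schema.
@dataclass(frozen=True)
class IngestRunStarted:
    run_id: str            # emitted once per run; observability/audit only

@dataclass(frozen=True)
class NextBatchRequested:
    run_id: str
    batch_index: int       # 0 for the first batch, emitted by StartIngest

@dataclass(frozen=True)
class IngesterBatchReady:
    run_id: str
    records: list

def start_ingest(run_id: str, emit) -> None:
    # StartIngest records the domain fact and requests the first batch.
    emit(IngestRunStarted(run_id))
    emit(NextBatchRequested(run_id, batch_index=0))

def run_ingester(event: NextBatchRequested, ingester, emit) -> None:
    # RunIngester only ever reacts to NextBatchRequested.
    batch = ingester.fetch_batch(event.run_id, event.batch_index)  # illustrative port
    emit(IngesterBatchReady(event.run_id, batch.records))
    if batch.has_more:
        # Continue the run by asking for the next batch, not by "starting" anything.
        emit(NextBatchRequested(event.run_id, event.batch_index + 1))
```

The only structural change from today is that RunIngester no longer reacts to the run-start fact; it reacts to an explicit batch request.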
2. Transient failure classification and retry with backoff
When a K8s Job fails due to cluster pressure, the system should back off instead of retrying immediately.
New error types in domain/shared/error.py:
- TransientResourceError(InfrastructureError) — retry with backoff (scheduling timeout, pod eviction, resource pressure)
- PermanentRunnerError(InfrastructureError) — do not retry (image pull failure, config error)
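A hedged sketch of the two types, assuming InfrastructureError already lives in domain/shared/error.py (redeclared here only to keep the snippet self-contained):

```python
# domain/shared/error.py (sketch)

class InfrastructureError(Exception):
    """Base class for infrastructure-level failures (assumed to already exist)."""

class TransientResourceError(InfrastructureError):
    """Cluster pressure: scheduling timeout, pod eviction, resource exhaustion.
    The worker pool should retry the delivery with backoff."""

class PermanentRunnerError(InfrastructureError):
    """Non-retryable runner failure: image pull failure, bad config.
    The delivery should be marked failed with no retry."""
```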
K8s adapter classifies failures and raises the appropriate error. Worker pool handles them:
- TransientResourceError → set deliver_after = now + backoff (60s × 2^attempt, capped at 5 min)
- PermanentRunnerError → mark delivery failed, no retry
The deliver_after field already exists on the deliveries table. The worker pool just needs to respect it when catching these errors.
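A minimal sketch of the worker-pool side, assuming a delivery row exposes attempt and deliver_after and a hypothetical mark_failed() helper (none of these names are confirmed API):

```python
from datetime import datetime, timedelta, timezone

from domain.shared.error import TransientResourceError, PermanentRunnerError

BASE_DELAY_S = 60        # 60s × 2^attempt ...
MAX_DELAY_S = 5 * 60     # ... capped at 5 minutes
MAX_ATTEMPTS = 10        # illustrative cap; past this the delivery is marked failed

def backoff_delay(attempt: int) -> timedelta:
    return timedelta(seconds=min(BASE_DELAY_S * (2 ** attempt), MAX_DELAY_S))

def handle_delivery(delivery, run_handler) -> None:
    try:
        run_handler(delivery)                      # e.g. RunHooks creating a K8s Job
    except TransientResourceError:
        if delivery.attempt >= MAX_ATTEMPTS:
            delivery.mark_failed()                 # hypothetical helper
            return
        # Defer instead of retrying immediately: the existing deliver_after
        # column is already respected by the poller, so setting it is enough.
        delivery.deliver_after = datetime.now(timezone.utc) + backoff_delay(delivery.attempt)
        delivery.attempt += 1
    except PermanentRunnerError:
        delivery.mark_failed()                     # no retry
```

With these placeholder numbers the delays are 60s, 120s, 240s, then 300s for every later attempt.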
How they interact
The ingester runs freely (it's fast and cheap — just fetches records). IngesterBatchReady events queue up in the outbox. RunHooks workers pick them up and create K8s Jobs. When the cluster is full, Jobs fail to schedule, TransientResourceError fires, and the event is deferred with backoff. The pipeline naturally settles at whatever throughput the cluster can sustain.
No scheduler handler, no max_inflight config, no dual emission complexity. The backoff is reactive — the cluster itself is the signal for capacity, not a domain-level estimate of it.
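The classification that produces TransientResourceError in that flow would live in the K8s adapter. A sketch, assuming the adapter can read a failure reason string off the Job or Pod status (the reason sets below are assumptions to be tuned against what the cluster actually reports):

```python
from domain.shared.error import TransientResourceError, PermanentRunnerError

# Assumed reason strings from Pod/Job conditions; the exact set is not confirmed.
TRANSIENT_REASONS = {"Unschedulable", "Evicted", "DeadlineExceeded"}
PERMANENT_REASONS = {"ImagePullBackOff", "ErrImagePull", "CreateContainerConfigError"}

def classify_job_failure(reason: str, message: str) -> Exception:
    """Map a K8s failure reason onto the retry semantics the worker pool understands."""
    if reason in TRANSIENT_REASONS:
        return TransientResourceError(f"{reason}: {message}")
    if reason in PERMANENT_REASONS:
        return PermanentRunnerError(f"{reason}: {message}")
    # Unknown reasons default to transient so a one-off cluster hiccup
    # does not permanently fail the delivery.
    return TransientResourceError(f"unclassified failure {reason}: {message}")
```

The adapter raises the returned error, and the worker-pool handler sketched earlier turns it into either a deferred or a failed delivery. Defaulting unknown reasons to transient is a design choice here, not a given.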
Edge cases
- All hook batches failing: backoff delays grow exponentially, system slows to a crawl but doesn't thrash. If a max attempt count is exceeded, the delivery is marked failed. A separate mechanism (max failure count on the aggregate) should mark the ingest run as failed.
- Multi-tenant pressure: backoff applies globally across all runs and tenants. One tenant's failed hooks back off, freeing capacity for others.
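One possible shape for that run-level mechanism, purely illustrative (the aggregate fields and the threshold are assumptions, not decided values):

```python
MAX_FAILED_BATCHES = 3  # placeholder threshold

class IngestRun:
    """Sketch of the aggregate-side failure counter mentioned above."""
    def __init__(self, run_id: str):
        self.run_id = run_id
        self.failed_batches = 0
        self.status = "running"

    def record_batch_failure(self) -> None:
        self.failed_batches += 1
        if self.failed_batches >= MAX_FAILED_BATCHES:
            self.status = "failed"  # stops further NextBatchRequested emission
```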
Implementation order
- Event vocabulary rename (IngestStarted → IngestRunStarted + NextBatchRequested)
- Error types (TransientResourceError, PermanentRunnerError)
- K8s adapter error classification (map scheduling timeout, pod eviction, etc. to the new types)
- Worker pool backoff (catch TransientResourceError, set deliver_after)