feat: add ingest domain with OOM retry and checkpointing (#112)
Conversation
Greptile Summary
This PR introduces a batch-native ingest domain (PDB, GEO, UniProt), OOM retry with memory doubling and checkpointing, and file-size sorting.
Confidence Score: 4/5
Safe to merge with the understanding that limit-based ingest may source fewer records than requested when API page sizes are smaller than batch_size. All prior P0/P1 findings from previous review rounds are resolved. One new P1 remains in run_ingester.py, where the limit quota uses batch_size as a per-batch estimate rather than actual record counts, causing a premature stop for sub-full batches. The remaining finding is P2.
server/osa/domain/ingest/handler/run_ingester.py — total_sourced and ingested_so_far calculation logic.
| Filename | Overview |
|---|---|
| server/osa/domain/ingest/handler/run_ingester.py | Contains a P1 logic bug where batch quota uses batch_size instead of actual records returned, causing under-ingestion when batches are smaller than batch_size. The remaining <= 0 early-return (flagged previously) also remains unaddressed. |
| server/osa/domain/ingest/handler/publish_batch.py | Previous review issues resolved: _get_passed_records uses feature_storage abstraction (S3/K8s safe), intersects all hooks' passed IDs, and uses len(published) for accurate published_count. |
| server/osa/domain/validation/service/hook.py | OOM retry with checkpointing cleanly implemented. Checkpoint load/write, memory doubling, and outcome merging across retries look correct. |
| server/osa/domain/feature/handler/insert_batch_features.py | Correctly uses upstream_to_record_srn mapping and read_batch_outcomes. Minor: string literal "passed" instead of OutcomeStatus.PASSED on line 44. |
| server/osa/domain/ingest/handler/run_hooks.py | Delegates correctly to HookService for OOM retry. Passes full batch to each hook independently; _get_passed_records in PublishBatch correctly intersects outcomes across all hooks. |
| server/migrations/versions/add_ingest_runs.py | Adds ingest_runs table with correct column types and indexes. record_limit column name avoids SQL reserved word and maps correctly to IngestRun.limit in the repository. |
| server/osa/infrastructure/persistence/repository/ingest.py | Atomic counter updates via SQL UPDATE … RETURNING correctly avoid lost-update races under concurrent PublishBatch workers. |
| server/osa/infrastructure/storage/layout.py | Consolidates all ingest path resolution with _safe_srn sanitisation. Paths used by RunHooks, PublishBatch, and InsertBatchFeatures are consistent. |
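The atomic counter pattern noted for ingest.py above can be illustrated in miniature. This is a minimal sketch using sqlite3 (the PR presumably targets its real database via a repository; the table shape and function name here are illustrative, not taken from the PR):

```python
import sqlite3

# Illustrative schema; the real code tracks counters on the ingest_runs table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ingest_runs (id TEXT PRIMARY KEY, batches_completed INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO ingest_runs (id) VALUES ('run-1')")


def increment_batches_completed(conn: sqlite3.Connection, run_id: str) -> int:
    """Atomically increment the counter and return the new value.

    A read-modify-write in Python (SELECT, add 1, UPDATE) can lose updates
    under concurrent PublishBatch workers; pushing the arithmetic into the
    UPDATE and reading the result back via RETURNING avoids that race.
    (RETURNING needs SQLite 3.35+; PostgreSQL has supported it for years.)
    """
    row = conn.execute(
        "UPDATE ingest_runs SET batches_completed = batches_completed + 1 "
        "WHERE id = ? RETURNING batches_completed",
        (run_id,),
    ).fetchone()
    conn.commit()
    return row[0]
```

Each concurrent worker sees a strictly increasing value, so the "all batches done" predicate can be evaluated safely from the returned count.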
Sequence Diagram
sequenceDiagram
participant API
participant IngestService
participant RunIngester
participant RunHooks
participant HookService
participant PublishBatch
participant InsertBatchFeatures
API->>IngestService: start_ingest(convention_srn, limit)
IngestService->>RunIngester: IngestStarted
loop Each batch (while has_more)
RunIngester->>RunIngester: run ingester container
RunIngester->>RunHooks: IngesterBatchReady(batch_index)
loop Each hook (sequential)
RunHooks->>HookService: run_hook(hook, inputs, work_dir)
loop OOM retry (max 3x, 2x memory)
HookService->>HookService: run hook container
alt OOM killed
HookService->>HookService: write_checkpoint, double memory
else success
HookService->>HookService: write_batch_outcomes
end
end
end
RunHooks->>PublishBatch: HookBatchCompleted(batch_index)
PublishBatch->>PublishBatch: _get_passed_records (intersect all hooks)
PublishBatch->>PublishBatch: bulk_publish(drafts)
PublishBatch->>InsertBatchFeatures: IngestBatchPublished(upstream_to_record_srn)
InsertBatchFeatures->>InsertBatchFeatures: insert_features per published SRN
alt All batches done
PublishBatch->>PublishBatch: IngestCompleted
else has_more
RunIngester->>RunIngester: IngestStarted (next batch)
end
end
Reviews (5): Last reviewed commit: "feat(image): add PR-specific Docker imag..."
Introduces the ingest domain that replaces the source domain with a batch-oriented pipeline: tap containers pull records in batches, hook containers validate/enrich them via a unified JSONL contract, and passing records are bulk-published with features.

Key changes:
- Unified hook contract: hooks process records.jsonl (batch of 1 for depositions, N for ingests) with JSONL outputs (features/rejections/errors). HookInputs uses typed HookRecord instead of raw dicts.
- Ingest pipeline: IngestRun aggregate with event-driven handlers (RunTap → RunHooks → PublishBatch) orchestrated via outbox workers. Atomic SQL counter increments for concurrent batch completion.
- RecordService.bulk_publish() with ON CONFLICT DO NOTHING for duplicate detection across ingest runs.
- Batch output parsing via FeatureStoragePort (not standalone functions) with both filesystem and S3 implementations.
- Renamed source runner → tap runner (SourceRunner → TapRunner, SourceDefinition → TapDefinition) to avoid collision with RecordSource provenance types.
- Deleted source domain entirely — ingest supersedes it.
- API: POST /api/v1/ingestions to start an ingest run.

Closes #104
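The ON CONFLICT DO NOTHING duplicate detection mentioned in the key changes can be sketched with sqlite3 (the real bulk_publish lives in RecordService against the server's database; the table shape and counting mechanism here are illustrative assumptions):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (source_id TEXT PRIMARY KEY, payload TEXT)")


def bulk_publish(conn: sqlite3.Connection, rows: list[tuple[str, str]]) -> int:
    """Insert a batch, silently skipping source_ids already published by an
    earlier ingest run, and return how many rows were actually inserted.

    ON CONFLICT DO NOTHING makes re-ingesting overlapping batches idempotent:
    duplicates neither fail the batch nor overwrite existing records.
    """
    before = conn.total_changes
    conn.executemany(
        "INSERT INTO records (source_id, payload) VALUES (?, ?) "
        "ON CONFLICT DO NOTHING",
        rows,
    )
    conn.commit()
    # Skipped conflicts do not count as changes, so the delta is the
    # number of genuinely new records.
    return conn.total_changes - before
```

A second run that overlaps the first only inserts the unseen records, which is exactly the cross-run duplicate behaviour the PR description claims.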
Rename variables, functions, comments, and documentation from "source" terminology to "ingester" to better reflect the actual purpose of these components in the data ingestion pipeline.
Replace hardcoded path construction with centralized StorageLayout to eliminate duplication and provide single source of truth for directory structure across ingest handlers
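A minimal sketch of what such a centralized layout class might look like, including the _safe_srn sanitisation mentioned in the file overview (method names and the directory scheme are illustrative assumptions, not the PR's actual API):

```python
from pathlib import Path


def _safe_srn(srn: str) -> str:
    """Sanitise an SRN for use as a directory name (':' and '@' are unsafe)."""
    return srn.replace(":", "_").replace("@", "_")


class StorageLayout:
    """Single source of truth for the ingest directory structure.

    Handlers (RunIngester, RunHooks, PublishBatch, InsertBatchFeatures)
    resolve paths through one object instead of re-deriving them locally,
    so the layout cannot silently drift between producers and consumers.
    """

    def __init__(self, root: Path) -> None:
        self.root = root

    def ingest_run_dir(self, run_srn: str) -> Path:
        return self.root / "ingests" / _safe_srn(run_srn)

    def batch_dir(self, run_srn: str, batch_index: int) -> Path:
        # Zero-padded index keeps lexicographic ordering equal to batch order.
        return self.ingest_run_dir(run_srn) / f"batch_{batch_index:05d}"

    def hook_output_dir(self, run_srn: str, batch_index: int, hook_name: str) -> Path:
        return self.batch_dir(run_srn, batch_index) / "hooks" / hook_name
```

Because every handler asks the layout for paths, a single change (say, sharding run directories) touches one class rather than every handler.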
@greptile
for record_id, fdir in files_dirs.items():
    if fdir and fdir.exists():
        binds.append(f"{self._host_path(fdir)}:/osa/files/{record_id}:ro")
    elif files_base.exists():
Docker bind mount paths break when record IDs contain colons
Docker's Binds format uses : as the delimiter between host-path:container-path:mode. When record_id contains colons — as all deposition SRNs do (e.g., urn:osa:localhost:dep:abc123) and many ingest source IDs do (e.g., arxiv:1234) — the resulting bind string is malformed:
/host/path:/osa/files/urn:osa:localhost:dep:abc123:ro
Docker splits this on : and sees 7 segments instead of 3, causing container creation to fail with a parse error. The existing unit test avoids this because it uses files_dirs={"test": files_dir} where "test" has no colons.
A straightforward fix is to sanitize the record ID before embedding it in the container path (and ensure the hook receives the sanitized ID so it can locate files):

for record_id, fdir in files_dirs.items():
    if fdir and fdir.exists():
        safe_id = record_id.replace(":", "_").replace("@", "_")
        binds.append(f"{self._host_path(fdir)}:/osa/files/{safe_id}:ro")

The records.jsonl written to the container would then also need to carry the sanitized ID (or a lookup mapping), so the hook can resolve $OSA_FILES/{safe_id}/.
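The failure mode is easy to reproduce with plain string splitting, which is essentially the parse Docker applies to a Binds entry (the paths below are the examples from the comment, not real mounts):

```python
def bind_segments(bind: str) -> int:
    """Count the ':'-separated segments Docker would see in a Binds entry."""
    return len(bind.split(":"))


# Record ID without colons: well-formed host:container:mode triple.
ok = "/host/path:/osa/files/test:ro"
# Deposition SRN with colons: the container path fragments into extra segments.
bad = "/host/path:/osa/files/urn:osa:localhost:dep:abc123:ro"

assert bind_segments(ok) == 3   # host-path, container-path, mode
assert bind_segments(bad) == 7  # malformed; Docker rejects this bind
```

This matches the "7 segments instead of 3" observation above, and shows why any character set accepted in record IDs must be sanitised before entering the bind string.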
count = await self.feature_service.insert_features(
    hook_name=hook_name,
    record_srn=record_id,
    rows=outcome.features,
)
total_inserted += count
Features stored against source IDs, not published OSA record SRNs
The hook processes records whose HookRecord.id is set in RunHooks to the ingester's source ID (e.g., arxiv:1234). The hook therefore writes that same ID into features.jsonl, so outcomes.items() yields record_id = "arxiv:1234".
feature_service.insert_features(record_srn="arxiv:1234", ...) stores the feature rows under the source ID, not under the published OSA SRN (e.g., urn:osa:localhost:rec:...). Any downstream lookup by record SRN will find nothing — the features are effectively orphaned.
The IngestBatchPublished event carries published_srns (the real OSA SRNs) but does not carry a source_id → srn mapping, so there is no way to correlate them inside this handler today.
The cleanest fix is to add the mapping to the event at publish time in PublishBatch:
# In PublishBatch.handle, after bulk_publish:
source_id_to_srn = {
    record.get("source_id", record.get("id", "")): str(pub.srn)
    for record, pub in zip(passed_records, published)
}
await self.outbox.append(
    IngestBatchPublished(
        ...,
        source_id_to_srn=source_id_to_srn,  # new field
    )
)

Then in InsertBatchFeatures:

for record_id, outcome in outcomes.items():
    srn = event.source_id_to_srn.get(record_id)
    if not srn or outcome.status != "passed":
        continue
    await self.feature_service.insert_features(hook_name=hook_name, record_srn=srn, rows=outcome.features)

- Add record_limit column to ingest_runs table for limiting total records
- Add OSA_BASE_URL environment variable support in Docker configs
- Implement concurrency control for event handlers via __concurrency__
- Fix feature insertion to use record SRN instead of upstream ID
- Increase ingester memory limit from 512m to 1g
- Update config validation to require OSA_BASE_URL for localhost
Add hook_concurrency config option to WorkerConfig and remove hardcoded concurrency from RunHooks handler. Update WorkerPool to use config-based concurrency overrides, allowing runtime configuration of hook worker parallelism instead of compile-time constants.
@greptile
def _read_ingester_records(records_file: Path) -> list[dict]:
    """Read ingester records from JSONL file."""
    records: list[dict] = []
    if not records_file.exists():
        return records
    for line in records_file.open():
        line = line.strip()
        if not line:
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            logger.warning("Skipping malformed ingester record line")
    return records


def _get_passed_records(
    ingester_records: list[dict],
    batch_dir: Path,
    hooks: list[str],
    feature_storage: FeatureStoragePort,
) -> list[dict]:
    """Determine which records passed ALL hooks by intersecting features.jsonl across hooks.

    Each hook processes the full batch independently. A record must appear in
    every hook's features.jsonl to be considered passed. Records rejected or
    errored by any hook are excluded.
    """
    if not hooks:
        return ingester_records

    passed_ids: set[str] | None = None

    for hook_name in hooks:
_get_passed_records ignores the feature_storage abstraction — breaks on S3/K8s deployments
_get_passed_records accepts a feature_storage: FeatureStoragePort argument but never calls it. Instead it opens files directly via features_file.open() on a local Path derived from StorageLayout.
On Kubernetes + S3 deployments the K8s hook runner uploads all outputs to S3 (via put_object). When PublishBatch runs later, those files don't exist on the local filesystem — features_file.exists() returns False for every hook, so _get_passed_records always returns [], and nothing is ever published.
The feature_storage port already has read_batch_outcomes() (newly added in this PR), which handles both filesystem and S3. The function should use it:
async def _get_passed_records(
    ingester_records: list[dict],
    batch_dir: Path,
    hooks: list[str],
    feature_storage: FeatureStoragePort,
) -> list[dict]:
    if not hooks:
        return ingester_records
    passed_ids: set[str] | None = None
    batch_dir_str = str(batch_dir)
    for hook_name in hooks:
        outcomes = await feature_storage.read_batch_outcomes(batch_dir_str, hook_name)
        if not outcomes:
            return []
        hook_passed = {rid for rid, o in outcomes.items() if o.status == "passed"}
        if passed_ids is None:
            passed_ids = hook_passed
        else:
            passed_ids &= hook_passed
        if not passed_ids:
            return []
    return [r for r in ingester_records if r.get("source_id", r.get("id", "")) in passed_ids]

Note that making _get_passed_records async requires calling it with await inside handle().
feat: replace Python logging with logfire structured logging system
refactor: replace dict-based ingester records with typed IngesterRecord model
feat: add comprehensive logging for ingest pipeline with batch progress tracking
feat: increase default hook memory limit from 512m to 1g
fix: sanitize record IDs in Docker bind mounts to avoid colon conflicts
feat: add OOM detection and logging for hook container failures
Rename source_finished to ingestion_finished and batches_sourced to batches_ingested across the codebase to better reflect the actual ingestion process rather than just sourcing data
Replace hardcoded status strings ("passed", "rejected", "errored") with
OutcomeStatus enum throughout validation domain. Add HookRecordId type
alias for better type safety in batch outcome handling.
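A sketch of the shape this refactor implies. The member values follow the prior string literals, and the str-backed enum choice is an assumption that keeps old stored outcomes comparable:

```python
from enum import Enum
from typing import NewType

# Distinct type for record IDs in batch outcomes; a plain str at runtime,
# but type checkers flag accidental mixing with other strings.
HookRecordId = NewType("HookRecordId", str)


class OutcomeStatus(str, Enum):
    """Replaces the raw "passed" / "rejected" / "errored" literals."""

    PASSED = "passed"
    REJECTED = "rejected"
    ERRORED = "errored"


# A str-backed enum still compares equal to the old literals, so outcomes
# persisted before the refactor keep parsing and comparing correctly.
outcomes: dict[HookRecordId, OutcomeStatus] = {
    HookRecordId("arxiv:1234"): OutcomeStatus.PASSED,
}
```

This is also why the overview table can call the remaining string literal "passed" in insert_batch_features.py a minor issue rather than a bug: it still compares equal, it just bypasses the type safety.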
feat: add OOM status to HookStatus enum with oom_killed property
Add HookStatus.OOM variant to distinguish OOM failures from general
failures. Include oom_killed property on HookResult for convenient
OOM detection.
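A minimal sketch of the enum variant and convenience property (the other member names and the HookResult fields are assumptions for illustration):

```python
from dataclasses import dataclass
from enum import Enum


class HookStatus(str, Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    OOM = "oom"  # distinguishes an OOM kill from a generic failure


@dataclass
class HookResult:
    status: HookStatus
    exit_code: int = 0

    @property
    def oom_killed(self) -> bool:
        """Convenience check used by retry logic instead of comparing enums inline."""
        return self.status is HookStatus.OOM
```

Keeping OOM as its own status (rather than inferring it from exit codes at every call site) lets both the OCI and K8s runners report it uniformly.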
feat: implement HookService with OOM retry and checkpointing
Add comprehensive hook execution service that handles OOM conditions
by retrying with doubled memory limits up to MAX_OOM_RETRIES times.
Implements checkpointing for crash recovery and sorts records by size
to maximize progress before potential OOM on large files.
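The retry policy described above can be sketched as follows. The runner callable, statuses, and the constant's value are assumptions; the PR's HookService drives real containers and writes checkpoints between attempts:

```python
from typing import Callable

MAX_OOM_RETRIES = 3  # assumed value


def run_with_oom_retry(run_container: Callable[[str], str], memory: str = "512m") -> str:
    """Run a hook, doubling its memory limit after each OOM, up to MAX_OOM_RETRIES retries."""

    def double(mem: str) -> str:
        # "512m" -> "1024m"; real code parses units properly.
        return f"{int(mem[:-1]) * 2}{mem[-1]}"

    for _attempt in range(MAX_OOM_RETRIES + 1):
        status = run_container(memory)
        if status != "oom":
            return status
        # A checkpoint would be written here so the retry resumes partial
        # progress instead of reprocessing records already completed.
        memory = double(memory)
    return "oom"  # exhausted retries


# A runner that OOMs until it is granted at least 2048 MB:
def fake_runner(mem: str) -> str:
    return "completed" if int(mem[:-1]) >= 2048 else "oom"


result = run_with_oom_retry(fake_runner)  # escalates 512m -> 1024m -> 2048m
```

Sorting records largest-last (as the commit describes, by size) pairs naturally with this: the checkpoint preserves the small records already processed before a large one triggers the OOM.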
refactor: consolidate ingester record parsing into model class
Move duplicate JSONL parsing logic from handlers into
IngesterRecord.from_jsonl class method. Add IngesterFileRef model
and total_file_mb property for better file size tracking.
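A sketch of the consolidated parsing, assuming a minimal field set (the real IngesterRecord and IngesterFileRef are presumably richer, and the real from_jsonl logs a warning where this sketch silently skips):

```python
import json
from dataclasses import dataclass, field


@dataclass
class IngesterFileRef:
    path: str
    size_mb: float


@dataclass
class IngesterRecord:
    id: str
    files: list[IngesterFileRef] = field(default_factory=list)

    @property
    def total_file_mb(self) -> float:
        """Total size of all attached files, used for size-based sorting."""
        return sum(f.size_mb for f in self.files)

    @classmethod
    def from_jsonl(cls, text: str) -> list["IngesterRecord"]:
        """Parse one record per non-empty JSONL line, skipping malformed lines."""
        records: list[IngesterRecord] = []
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                raw = json.loads(line)
            except json.JSONDecodeError:
                continue  # real code logs a warning here
            files = [IngesterFileRef(**f) for f in raw.get("files", [])]
            records.append(cls(id=raw["id"], files=files))
        return records
```

Centralising this in a classmethod means RunIngester, RunHooks, and PublishBatch all parse records identically instead of each re-implementing the skip-malformed-lines loop.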
feat: add memory parsing and doubling utilities to hook model
Add parse_memory function and with_doubled_memory method to
HookDefinition for dynamic memory limit adjustment during OOM retry.
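A sketch of what these utilities likely do, written here as free functions for brevity (in the PR, with_doubled_memory is a method on HookDefinition; supported units and output format are assumptions):

```python
import re

_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3}


def parse_memory(limit: str) -> int:
    """Parse a Docker-style memory limit like "512m" or "1g" into bytes."""
    match = re.fullmatch(r"(\d+)([kmg])", limit.lower())
    if not match:
        raise ValueError(f"invalid memory limit: {limit!r}")
    value, unit = match.groups()
    return int(value) * _UNITS[unit]


def with_doubled_memory(limit: str) -> str:
    """Return a new limit with double the bytes, normalised to megabytes.

    Normalising to one unit avoids "1024m" vs "1g" ambiguity when the
    doubled value is compared or logged during OOM retry.
    """
    return f"{parse_memory(limit) * 2 // 1024**2}m"
```

Parsing to bytes before doubling matters: naive string manipulation ("512m" → "512512m" or doubling only the digits of "1g") breaks as soon as limits cross a unit boundary.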
refactor: replace StringConstraints with BeforeValidator in config
Update logging level validation to use BeforeValidator instead of
deprecated StringConstraints for Pydantic v2 compatibility.
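A sketch of the validator swap. The normaliser function and the accepted level set are illustrative; the pydantic wiring is guarded so the sketch also runs where pydantic is not installed:

```python
from typing import Annotated

try:
    # Pydantic v2 wiring: BeforeValidator runs the function before field validation.
    from pydantic import BeforeValidator
    HAVE_PYDANTIC = True
except ImportError:
    HAVE_PYDANTIC = False

VALID_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


def normalize_level(value: str) -> str:
    """Uppercase and validate a logging level before Pydantic's own validation."""
    level = str(value).strip().upper()
    if level not in VALID_LEVELS:
        raise ValueError(f"invalid logging level: {value!r}")
    return level


if HAVE_PYDANTIC:
    # Usable as a field type on any Pydantic v2 model: level: LogLevel
    LogLevel = Annotated[str, BeforeValidator(normalize_level)]
```

Compared with StringConstraints (pattern matching only), a BeforeValidator can both normalise the input (lowercase "info" becomes "INFO") and raise a descriptive error, which is presumably why the config was migrated.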
@greptile
    limit=effective_batch_limit,
    session=session,
)
files_dir = batch_dir / "files"
files_dir.mkdir(parents=True, exist_ok=True)
remaining ≤ 0 early return skips IngesterBatchReady, leaving the run permanently stuck
When remaining <= 0, the code increments batches_ingested (and sets ingestion_finished=True) but returns without emitting IngesterBatchReady. No RunHooks/PublishBatch is triggered, so batches_completed is never incremented for this index. The completion predicate ingestion_finished AND batches_ingested == batches_completed never becomes true, and the run stalls.
This path is reachable whenever an IngestStarted event is redelivered after the previous batch already set ingestion_finished=True (e.g. after a worker crash mid-claim). The fix is to omit the increment_batches_ingested call (the previous batch already accounted for it):
if remaining <= 0:
    # Limit already met by a previous batch; nothing to do.
    return

fix(ingest): add warning log for redelivered IngestStarted events when limit already met
@greptile
Summary
- IngesterFileRef.size_mb (required) computed automatically during download.
- max_file_mb on the @ingester decorator silently skips oversized records at the entrypoint.
- OutcomeStatus enum replaces stringly-typed status fields; HookRecordId NewType for record IDs in batch outcomes.

Key changes
- ingest domain with IngestRun aggregate, RunIngester/RunHooks/PublishBatch event handlers
- HookService in validation/service/hook.py — shared by both deposition and ingestion pathways
- HookStatus.OOM enum value distinguishes OOM from generic failure in both OCI and K8s runners
- HookDefinition.with_doubled_memory() for memory escalation
- HookStoragePort extended with write_checkpoint() and write_batch_outcomes()
- ValidationService.run_hooks() delegates to HookService for per-hook OOM retry
- StorageLayout consolidates all path resolution for ingests, hooks, and depositions
- ingest_runs table for tracking bulk ingestion state

Test plan
- Lint passes (ruff check)
- Ingest with max_file_mb=40; verify oversized records skipped, hooks process remaining records, OOM retry triggers on memory-intensive structures

Closes #104, closes #105