feat: add harvest domain for bulk ingestion pipeline #104

@rorybyrne

Description

Summary

Add a new harvest bounded context that enables bulk ingestion of scientific databases (PDB, GEO, UniProt) through a batch-native pipeline. Records flow directly through Source → Hook → Record, bypassing the deposition lifecycle.

Motivation

The Hub product depends on bulk-ingesting large scientific databases. PDB alone has 200k+ structures. The current per-record pipeline takes ~37 days for PDB, which works out to roughly 16 seconds of pipeline overhead per record. The harvest domain batches 1000 records per K8s Job (200 jobs for PDB), reducing PDB ingestion to ~4-7 hours.

Design doc: bulk ingestion harvest domain (from /office-hours 2026-03-24)
CEO plan: scope expansion review completed, all expansions deferred, core design approved.

Architecture

POST /harvests → StartHarvest Command → HarvestService
  → Creates HarvestRun(PENDING)
  → Emits SourceRequested

Worker: PullFromSource (reuse) → Source container produces records.jsonl
  → Emits HarvestSourceCompleted

Worker: RunHookBatch → Hook container processes batch (batch-by-default contract)
  → Emits HarvestHooksCompleted (pass/reject/error per record)

Worker: BulkPublishRecords → RecordService.bulk_publish() + feature insertion
  → Emits HarvestBatchPublished + RecordPublished per record

Repeat until source reports no more pages.
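
A minimal sketch of the kickoff step, assuming Python and an outbox-style event bus. HarvestRun.create, the repository/outbox wiring, and the SRN format shown are illustrative, not the actual implementation:

import uuid
from dataclasses import dataclass

@dataclass
class HarvestRun:  # stub of the aggregate defined later in this issue
    srn: str
    convention_srn: str
    status: str = "PENDING"

    @classmethod
    def create(cls, convention_srn: str) -> "HarvestRun":
        # SRN format here is hypothetical
        return cls(srn=f"srn:harvest:{uuid.uuid4()}", convention_srn=convention_srn)

@dataclass(frozen=True)
class SourceRequested:
    harvest_srn: str
    convention_srn: str
    page: int  # sources are paged; workers repeat until no pages remain

class HarvestService:
    def __init__(self, repo, outbox):
        self.repo = repo      # HarvestRepository port
        self.outbox = outbox  # transactional outbox consumed by the workers

    def start_harvest(self, convention_srn: str) -> str:
        run = HarvestRun.create(convention_srn)  # status=PENDING
        self.repo.save(run)
        # Persisting the run and emitting the kickoff event belong in one
        # transaction so neither can exist without the other.
        self.outbox.emit(SourceRequested(run.srn, convention_srn, page=0))
        return run.srn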

Key decisions (from CEO review)

  • Batch-by-default hooks: ONE hook contract. Deposition sends a batch of 1. No auto-detection.
  • Three outcomes per record: pass (features), reject (data quality), error (infra, retryable)
  • Prefetch is inherent: outbox workers naturally parallelize source and hook handlers
  • Harvested records are NOT depositions: the deposition lifecycle is for human workflow; harvest creates records directly via a RecordSource discriminated union (sketched below)
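
A sketch of what that RecordSource union could look like, assuming Pydantic; the variant and field names below are assumptions, since the issue doesn't spell them out:

from typing import Annotated, Literal, Union
from pydantic import BaseModel, Field

class DepositionSource(BaseModel):
    kind: Literal["deposition"] = "deposition"
    deposition_srn: str  # record arrived through the human deposition lifecycle

class HarvestSource(BaseModel):
    kind: Literal["harvest"] = "harvest"
    harvest_srn: str     # the HarvestRun that bulk-ingested this record
    source_id: str       # upstream identifier, e.g. a PDB ID

# The discriminator lets persistence and APIs round-trip either variant
# without ambiguity.
RecordSource = Annotated[
    Union[DepositionSource, HarvestSource],
    Field(discriminator="kind"),
]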

Unified batch hook contract

$OSA_IN/records.jsonl           # always (1 line for deposition, N for harvest)
$OSA_FILES/{id}/                # subdirectories per record
$OSA_OUT/features/{id}.json     # per-record feature output
$OSA_OUT/rejections/{id}.json   # per-record rejection
$OSA_OUT/errors/{id}.json       # per-record error
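
Under this contract a hook body is just a loop over records.jsonl; a deposition run is the same loop with one line in the file. A sketch (the record's id field name and the exception-to-outcome mapping are assumptions):

import json
import os
from pathlib import Path

IN = Path(os.environ["OSA_IN"])
FILES = Path(os.environ["OSA_FILES"])
OUT = Path(os.environ["OSA_OUT"])

def extract_features(record: dict, files_dir: Path) -> dict:
    ...  # hook-specific logic; placeholder

for sub in ("features", "rejections", "errors"):
    (OUT / sub).mkdir(parents=True, exist_ok=True)

with (IN / "records.jsonl").open() as fh:
    for line in fh:
        record = json.loads(line)
        rid = record["id"]  # assumed field name
        try:
            features = extract_features(record, FILES / rid)
            (OUT / "features" / f"{rid}.json").write_text(json.dumps(features))
        except ValueError as exc:   # data-quality problem -> rejection
            (OUT / "rejections" / f"{rid}.json").write_text(
                json.dumps({"reason": str(exc)}))
        except Exception as exc:    # infra problem -> retryable error
            (OUT / "errors" / f"{rid}.json").write_text(
                json.dumps({"error": str(exc)}))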

HarvestRun aggregate

class HarvestRun(Aggregate):
    srn: HarvestRunSRN
    convention_srn: ConventionSRN
    status: HarvestStatus  # PENDING → SOURCING → ENRICHING → PUBLISHING → COMPLETED | FAILED
    batches_sourced: int
    batches_processed: int
    published_count: int
    rejected_count: int
    error_count: int
    rejected_records: list[dict]  # each: {source_id, reason}
    failed_records: list[dict]    # each: {source_id, error}
    started_at: datetime
    completed_at: datetime | None
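
One way the status machine could be guarded (statuses shown as plain strings for brevity; the PUBLISHING → SOURCING loop-back edge is an inference from "repeat until source reports no more pages"):

_TRANSITIONS = {
    "PENDING": {"SOURCING"},
    "SOURCING": {"ENRICHING", "FAILED"},
    "ENRICHING": {"PUBLISHING", "FAILED"},
    "PUBLISHING": {"SOURCING", "COMPLETED", "FAILED"},  # loops while pages remain
}

def transition(run: "HarvestRun", new_status: str) -> None:
    allowed = _TRANSITIONS.get(run.status, set())
    if new_status not in allowed:
        raise ValueError(f"illegal transition {run.status} -> {new_status}")
    run.status = new_status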

API surface

POST   /api/v1/harvests              → Start harvest for a convention
GET    /api/v1/harvests              → List harvest runs
GET    /api/v1/harvests/{srn}        → Get harvest details + progress
POST   /api/v1/harvests/{srn}/retry  → Retry failed records
DELETE /api/v1/harvests/{srn}        → Cancel running harvest
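
For illustration, the start endpoint could look like this, assuming FastAPI (the framework isn't named in this issue) and the service from the Architecture section:

from fastapi import APIRouter, Depends
from pydantic import BaseModel

router = APIRouter(prefix="/api/v1/harvests")

def get_harvest_service():
    ...  # wired up by the application container (illustrative)

class StartHarvestRequest(BaseModel):
    convention_srn: str

@router.post("", status_code=202)
def start_harvest(body: StartHarvestRequest, svc=Depends(get_harvest_service)):
    # 202 because the run proceeds asynchronously via the outbox workers.
    srn = svc.start_harvest(body.convention_srn)
    return {"srn": srn, "status": "PENDING"}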

New files

domain/harvest/
  model/harvest_run.py
  command/start_harvest.py
  event/source_completed.py, hooks_completed.py, batch_published.py
  handler/run_source_batch.py, run_hook_batch.py, bulk_publish_records.py
  port/harvest_repository.py
  service/harvest.py
application/api/v1/routes/harvests.py
infrastructure/persistence/harvest_mapper.py

DB: harvest_runs table

CREATE TABLE harvest_runs (
    srn TEXT PRIMARY KEY,
    convention_srn TEXT NOT NULL,
    status TEXT NOT NULL,
    batches_sourced INTEGER DEFAULT 0,
    batches_processed INTEGER DEFAULT 0,
    published_count INTEGER DEFAULT 0,
    rejected_count INTEGER DEFAULT 0,
    error_count INTEGER DEFAULT 0,
    rejected_records JSONB DEFAULT '[]',
    failed_records JSONB DEFAULT '[]',
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ
);
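
The mapper in infrastructure/persistence/harvest_mapper.py then translates rows into the aggregate. A sketch assuming a dict-like row, a string-valued HarvestStatus enum, and plain strings for the SRN fields:

from domain.harvest.model.harvest_run import HarvestRun, HarvestStatus

def row_to_aggregate(row) -> HarvestRun:
    return HarvestRun(
        srn=row["srn"],
        convention_srn=row["convention_srn"],
        status=HarvestStatus(row["status"]),
        batches_sourced=row["batches_sourced"],
        batches_processed=row["batches_processed"],
        published_count=row["published_count"],
        rejected_count=row["rejected_count"],
        error_count=row["error_count"],
        rejected_records=row["rejected_records"],  # JSONB -> list[dict]
        failed_records=row["failed_records"],
        started_at=row["started_at"],
        completed_at=row["completed_at"],
    )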

Error handling

  • Malformed records.jsonl lines: skip + log, add to failed_records (see the sketch after this list)
  • Hook OOM mid-batch: salvage partial results, retry remainder
  • Source API 429: retry via outbox
  • Duplicate records (idempotent): skip via source dedup index on records table
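
The malformed-line policy from the first bullet, as a sketch (the "<line N>" placeholder for a record whose source_id is unrecoverable is an assumption):

import json
import logging

log = logging.getLogger("harvest")

def parse_batch(lines, run) -> list[dict]:
    """Parse records.jsonl leniently: bad lines are logged and recorded
    on the run instead of failing the whole batch."""
    records = []
    for lineno, line in enumerate(lines, start=1):
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            log.warning("records.jsonl line %d unparseable: %s", lineno, exc)
            run.failed_records.append(
                {"source_id": f"<line {lineno}>", "error": f"malformed JSON: {exc}"})
    return records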

Success criteria

  1. PDB ingestion (200k records) completes in under 8 hours
  2. Hook authors don't change their code for batch mode
  3. Failed records don't block successful ones
  4. Per-record deposition path is completely unaffected

Blocked by

Open questions (to resolve during implementation)

  1. Batch size: configurable per convention (default 1000)
  2. Retry: new HarvestRun targeting failed source_ids
  3. source_id safety: sanitize to [a-zA-Z0-9._-] at parse time (sketched below)
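
The sanitizer from item 3 is small; a sketch (the "_" replacement character and the empty-result guard are assumptions):

import re

_UNSAFE = re.compile(r"[^a-zA-Z0-9._-]")

def sanitize_source_id(raw: str) -> str:
    # Keep ids safe for $OSA_FILES/{id}/ paths and per-record output filenames.
    safe = _UNSAFE.sub("_", raw)
    if not safe.strip("._"):  # reject ids that collapse to "." / ".." / "_"
        raise ValueError(f"unusable source_id: {raw!r}")
    return safe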
