8 changes: 4 additions & 4 deletions server/Justfile
@@ -37,17 +37,17 @@ test-cov:
 # Run linter and type checker
 lint:
     @just fix
-    uv run ruff check osa
+    uv run ruff check osa tests
     uv run ty check osa
 
 # Fix formatting and lint issues
 fix:
-    uv run ruff format osa
-    uv run ruff check --fix osa
+    uv run ruff format osa tests
+    uv run ruff check --fix osa tests
 
 # Format code
 format:
-    uv run ruff format osa
+    uv run ruff format osa tests
 
 # === Database ===
45 changes: 45 additions & 0 deletions server/migrations/versions/add_deliver_after.py
@@ -0,0 +1,45 @@
"""add_deliver_after_and_batches_failed

Add deliver_after column to deliveries table for explicit backoff scheduling.
Add batches_failed column to ingest_runs table for batch failure accounting.

Revision ID: add_deliver_after
Revises: add_ingest_runs
Create Date: 2026-04-04

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "add_deliver_after"
down_revision: Union[str, Sequence[str], None] = "add_ingest_runs"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.add_column(
        "deliveries",
        sa.Column("deliver_after", sa.DateTime(timezone=True), nullable=True),
    )
    op.create_index(
        "idx_deliveries_deliver_after",
        "deliveries",
        ["deliver_after"],
        postgresql_where=sa.text("status = 'pending'"),
    )

    op.add_column(
        "ingest_runs",
        sa.Column("batches_failed", sa.Integer, nullable=False, server_default=sa.text("0")),
    )


def downgrade() -> None:
    op.drop_column("ingest_runs", "batches_failed")
    op.drop_index("idx_deliveries_deliver_after", table_name="deliveries")
    op.drop_column("deliveries", "deliver_after")
Comment on lines +26 to +44
Contributor
P1 srn→id column rename not captured in the migration chain

add_ingest_runs.py was edited in place to rename the primary key from srn to id and to drop the ForeignKey("conventions.srn") constraint on convention_srn. The new add_deliver_after migration only adds batches_failed and deliver_after — it contains no ALTER TABLE ingest_runs RENAME COLUMN srn TO id or DROP CONSTRAINT step.

Any environment (CI, staging, a teammate's dev DB) that already has add_ingest_runs applied with the old schema will:

  1. Run add_deliver_after successfully (adds new columns)
  2. Still have a srn column instead of id, causing every query with .where(ingest_runs_table.c.id == ...) to fail at runtime

If the old migration has never been applied anywhere this is fine, but if it has, the upgrade needs an explicit rename step in add_deliver_after:

op.alter_column("ingest_runs", "srn", new_column_name="id")
op.drop_constraint("<fk_constraint_name>", "ingest_runs", type_="foreignkey")
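
A fuller, guarded version of that step could use runtime inspection so the migration is a no-op on databases created after the in-place edit. A minimal sketch, assuming the FK name has to be discovered at runtime (inspector usage and comments are illustrative, not part of this PR):

import sqlalchemy as sa
from alembic import op

def upgrade() -> None:
    conn = op.get_bind()
    insp = sa.inspect(conn)
    # Only rename when the old column still exists, so fresh databases
    # (created from the edited add_ingest_runs) pass through untouched.
    if "srn" in {c["name"] for c in insp.get_columns("ingest_runs")}:
        op.alter_column("ingest_runs", "srn", new_column_name="id")
    # Drop the stale FK on convention_srn by looking up its actual name.
    for fk in insp.get_foreign_keys("ingest_runs"):
        if fk["constrained_columns"] == ["convention_srn"]:
            op.drop_constraint(fk["name"], "ingest_runs", type_="foreignkey")
    # ... existing deliver_after / batches_failed steps follow ...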

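For context on how the new deliver_after column and its partial index are meant to be used: a dispatcher polling for due work would filter on exactly the indexed predicate. A hypothetical poll query (the deliveries_table object and the "pending" status value are assumptions, not code from this PR):

from datetime import UTC, datetime

import sqlalchemy as sa

def due_deliveries_stmt(deliveries_table: sa.Table) -> sa.Select:
    # Pending rows whose backoff window has elapsed; NULL means "due now".
    # The partial index idx_deliveries_deliver_after covers this predicate.
    now = datetime.now(UTC)
    return (
        sa.select(deliveries_table)
        .where(deliveries_table.c.status == "pending")
        .where(
            sa.or_(
                deliveries_table.c.deliver_after.is_(None),
                deliveries_table.c.deliver_after <= now,
            )
        )
    )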
3 changes: 1 addition & 2 deletions server/migrations/versions/add_ingest_runs.py
@@ -23,11 +23,10 @@
 def upgrade() -> None:
     op.create_table(
         "ingest_runs",
-        sa.Column("srn", sa.String(), primary_key=True),
+        sa.Column("id", sa.String(), primary_key=True),
         sa.Column(
             "convention_srn",
             sa.String(),
-            sa.ForeignKey("conventions.srn"),
             nullable=False,
         ),
         sa.Column(
8 changes: 3 additions & 5 deletions server/osa/domain/feature/handler/insert_batch_features.py
@@ -26,9 +26,7 @@ async def handle(self, event: IngestBatchPublished) -> None:
         if not event.expected_features or not event.published_srns:
             return
 
-        batch_output_dir = str(
-            self.layout.ingest_batch_dir(event.ingest_run_srn, event.batch_index)
-        )
+        batch_output_dir = str(self.layout.ingest_batch_dir(event.ingest_run_id, event.batch_index))
 
         total_inserted = 0
         skipped_dupes = 0
@@ -59,7 +57,7 @@ async def handle(self, event: IngestBatchPublished) -> None:
             )
             total_inserted += count
 
-        short_id = event.ingest_run_srn.rsplit(":", 1)[-1][:8]
+        short_id = event.ingest_run_id[:8]
         dupe_msg = f", {skipped_dupes} duplicates skipped" if skipped_dupes else ""
         log.info(
             "[{short_id}] batch {batch_index}: inserted {total_inserted} feature rows ({hook_count} hooks{dupe_msg})",
@@ -68,5 +66,5 @@ async def handle(self, event: IngestBatchPublished) -> None:
             total_inserted=total_inserted,
             hook_count=len(event.expected_features),
             dupe_msg=dupe_msg,
-            ingest_run_srn=event.ingest_run_srn,
+            ingest_run_id=event.ingest_run_id,
         )
8 changes: 7 additions & 1 deletion server/osa/domain/ingest/command/start_ingest.py
@@ -34,13 +34,19 @@ class StartIngestHandler(CommandHandler[StartIngest, IngestRunCreated]):
     service: IngestService
 
     async def run(self, cmd: StartIngest) -> IngestRunCreated:
+        from osa.domain.shared.model.srn import Domain
+
         ingest_run = await self.service.start_ingest(
             convention_srn=cmd.convention_srn,
             batch_size=cmd.batch_size,
             limit=cmd.limit,
         )
 
+        node_domain: Domain = self.service.node_domain
+        srn = f"urn:osa:{node_domain.root}:ing:{ingest_run.id}"
+
         return IngestRunCreated(
-            srn=ingest_run.srn,
+            srn=srn,
             convention_srn=ingest_run.convention_srn,
             status=ingest_run.status,
             started_at=ingest_run.started_at.isoformat(),
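
The SRN composed above is plain string assembly; for illustration only, with made-up root and id values:

# Hypothetical values: shows the shape of the composed SRN, nothing more.
node_root = "node.example"
run_id = "1f2a3b4c"
assert f"urn:osa:{node_root}:ing:{run_id}" == "urn:osa:node.example:ing:1f2a3b4c"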
6 changes: 4 additions & 2 deletions server/osa/domain/ingest/event/__init__.py
@@ -4,12 +4,14 @@
     HookBatchCompleted,
     IngestBatchPublished,
     IngestCompleted,
-    IngestStarted,
+    IngestRunStarted,
     IngesterBatchReady,
+    NextBatchRequested,
 )
 
 __all__ = [
-    "IngestStarted",
+    "IngestRunStarted",
+    "NextBatchRequested",
     "IngesterBatchReady",
     "HookBatchCompleted",
     "IngestBatchPublished",
30 changes: 22 additions & 8 deletions server/osa/domain/ingest/event/events.py
@@ -1,25 +1,39 @@
 """Ingest domain events — payloads carry path references, not inline data (AD-1)."""
 
+from osa.domain.ingest.model.ingest_run import IngestRunId
 from osa.domain.shared.event import Event, EventId
 
 
-class IngestStarted(Event):
-    """Emitted when an ingest run is created. Triggers first ingester pull."""
+class IngestRunStarted(Event):
+    """Emitted once when an ingest run is created. Observability/audit only."""
 
     id: EventId
-    ingest_run_srn: str
+    ingest_run_id: IngestRunId
     convention_srn: str
     batch_size: int
 
 
+class NextBatchRequested(Event):
+    """Emitted to trigger the next ingester batch pull.
+
+    Emitted by StartIngest (first batch) and by RunIngester (continuation).
+    RunIngester is the only handler that listens to this event.
+    """
+
+    id: EventId
+    ingest_run_id: IngestRunId
+    convention_srn: str
+    batch_size: int
+
+
 class IngesterBatchReady(Event):
     """Emitted when an ingester container produces a batch of records.
 
-    Batch data is on disk at the path derived from {ingest_run_srn, batch_index}.
+    Batch data is on disk at the path derived from {ingest_run_id, batch_index}.
     """
 
     id: EventId
-    ingest_run_srn: str
+    ingest_run_id: IngestRunId
     batch_index: int
     has_more: bool
 
@@ -31,7 +45,7 @@ class HookBatchCompleted(Event):
     """
 
     id: EventId
-    ingest_run_srn: str
+    ingest_run_id: IngestRunId
     batch_index: int
 
 
@@ -43,7 +57,7 @@ class IngestBatchPublished(Event):
     """
 
     id: EventId
-    ingest_run_srn: str
+    ingest_run_id: IngestRunId
     convention_srn: str
     batch_index: int
     published_srns: list[str]
@@ -56,5 +70,5 @@ class IngestCompleted(Event):
     """Emitted when all batches are processed and the ingest run is complete."""
 
     id: EventId
-    ingest_run_srn: str
+    ingest_run_id: IngestRunId
     total_published: int
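
The NextBatchRequested docstring above describes an emit/listen loop; here is a sketch of the continuation emit, assuming RunIngester appends to the same outbox used by PublishBatch (the helper name and wiring are hypothetical, not this PR's code):

from uuid import uuid4

async def request_next_batch(outbox, event: NextBatchRequested) -> None:
    # Re-request the next ingester pull with the same run parameters
    # (hypothetical helper; RunIngester's real continuation is not shown here).
    await outbox.append(
        NextBatchRequested(
            id=EventId(uuid4()),
            ingest_run_id=event.ingest_run_id,
            convention_srn=event.convention_srn,
            batch_size=event.batch_size,
        )
    )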
59 changes: 21 additions & 38 deletions server/osa/domain/ingest/handler/publish_batch.py
@@ -1,19 +1,17 @@
 """PublishBatch — reads hook outputs, bulk-publishes passing records."""
 
-from datetime import UTC, datetime
 from uuid import uuid4
 
 from osa.domain.deposition.service.convention import ConventionService
 from osa.domain.feature.port.storage import FeatureStoragePort
 from osa.domain.ingest.event.events import (
     HookBatchCompleted,
     IngestBatchPublished,
-    IngestCompleted,
 )
-from osa.domain.ingest.model.ingest_run import IngestStatus
 from osa.domain.ingest.model.ingester_record import IngesterRecord
 from osa.domain.ingest.port.repository import IngestRunRepository
 from osa.domain.ingest.port.storage import IngestStoragePort
+from osa.domain.ingest.service.ingest import IngestService
 from osa.domain.record.model.draft import RecordDraft
 from osa.domain.record.service import RecordService
 from osa.domain.shared.error import NotFoundError
@@ -35,24 +33,23 @@ class PublishBatch(EventHandler[HookBatchCompleted]):
     feature_storage: FeatureStoragePort
     outbox: Outbox
     ingest_storage: IngestStoragePort
+    ingest_service: IngestService
 
     async def handle(self, event: HookBatchCompleted) -> None:
-        ingest_run = await self.ingest_repo.get(event.ingest_run_srn)
+        ingest_run = await self.ingest_repo.get(event.ingest_run_id)
         if ingest_run is None:
-            raise NotFoundError(f"Ingest run not found: {event.ingest_run_srn}")
+            raise NotFoundError(f"Ingest run not found: {event.ingest_run_id}")
 
         convention = await self.convention_service.get_convention(
             ConventionSRN.parse(ingest_run.convention_srn)
         )
 
         # Read ingester records via storage port (filesystem or S3)
-        raw_records = await self.ingest_storage.read_records(
-            event.ingest_run_srn, event.batch_index
-        )
+        raw_records = await self.ingest_storage.read_records(event.ingest_run_id, event.batch_index)
         ingester_records = IngesterRecord.from_dicts(raw_records)
 
         # batch_dir used as locator for hook outcome reads
-        batch_dir = str(self.ingest_storage.batch_dir(event.ingest_run_srn, event.batch_index))
+        batch_dir = str(self.ingest_storage.batch_dir(event.ingest_run_id, event.batch_index))
 
         # Read hook outcomes for all hooks
         expected_features = [h.name for h in convention.hooks]
@@ -67,7 +64,7 @@ async def handle(self, event: HookBatchCompleted) -> None:
         )
 
         # Log outcome breakdown per hook
-        short_id = event.ingest_run_srn.rsplit(":", 1)[-1][:8]
+        short_id = event.ingest_run_id[:8]
         total = len(ingester_records)
         for hook_name in expected_features:
             outcomes = await self.feature_storage.read_batch_outcomes(str(batch_dir), hook_name)
@@ -88,7 +85,7 @@ async def handle(self, event: HookBatchCompleted) -> None:
                 rejected=rejected,
                 errored=errored,
                 missing=missing,
-                ingest_run_srn=event.ingest_run_srn,
+                ingest_run_id=event.ingest_run_id,
             )
 
         published_count = 0
@@ -100,7 +97,7 @@ async def handle(self, event: HookBatchCompleted) -> None:
                 RecordDraft(
                     source=IngestSource(
                         id=f"{ingest_run.convention_srn}:{record.source_id}",
-                        ingest_run_srn=ingest_run.srn,
+                        ingest_run_id=ingest_run.id,
                         upstream_source=record.source_id,
                     ),
                     metadata=record.metadata,
@@ -133,15 +130,15 @@ async def handle(self, event: HookBatchCompleted) -> None:
             published=published_count,
             passed=len(passed_records),
             duplicates=len(drafts) - published_count,
-            ingest_run_srn=event.ingest_run_srn,
+            ingest_run_id=event.ingest_run_id,
         )
 
         # Emit IngestBatchPublished for feature insertion
         if published_count > 0:
             await self.outbox.append(
                 IngestBatchPublished(
                     id=EventId(uuid4()),
-                    ingest_run_srn=event.ingest_run_srn,
+                    ingest_run_id=event.ingest_run_id,
                     convention_srn=ingest_run.convention_srn,
                     batch_index=event.batch_index,
                     published_srns=published_srns,
@@ -151,31 +148,17 @@ async def handle(self, event: HookBatchCompleted) -> None:
                 )
             )
 
-        # Update counters atomically
-        updated = await self.ingest_repo.increment_completed(
-            event.ingest_run_srn,
-            published_count=published_count,
-        )
-
-        # Check completion condition
-        if updated.is_complete and updated.status == IngestStatus.RUNNING:
-            updated.check_completion(datetime.now(UTC))
-            await self.ingest_repo.save(updated)
-
-            await self.outbox.append(
-                IngestCompleted(
-                    id=EventId(uuid4()),
-                    ingest_run_srn=event.ingest_run_srn,
-                    total_published=updated.published_count,
-                )
-            )
-            short_id = event.ingest_run_srn.rsplit(":", 1)[-1][:8]
-            log.info(
-                "[{short_id}] COMPLETE: {total_published} records published",
-                short_id=short_id,
-                total_published=updated.published_count,
-                ingest_run_srn=event.ingest_run_srn,
-            )
+        # Update counters and check completion via service
+        await self.ingest_service.complete_batch(event.ingest_run_id, published_count)
+
+    async def on_exhausted(self, event: HookBatchCompleted) -> None:
+        """Called when publish retries are exhausted — account for the failed batch."""
+        log.error(
+            "batch {batch_index} publish retries exhausted",
+            batch_index=event.batch_index,
+            ingest_run_id=event.ingest_run_id,
+        )
+        await self.ingest_service.fail_batch(event.ingest_run_id)


async def _get_passed_records(
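
on_exhausted delegates failure accounting to IngestService.fail_batch, which this diff does not show. One plausible shape, assuming a repository counter that mirrors the increment_completed call removed above (method names here are guesses, not the PR's code):

async def fail_batch(self, ingest_run_id: IngestRunId) -> None:
    # Bump batches_failed atomically, then re-check run completion so a run
    # whose last outstanding batch failed still reaches a terminal state.
    updated = await self.repo.increment_failed(ingest_run_id)
    if updated.is_complete and updated.status == IngestStatus.RUNNING:
        updated.check_completion(datetime.now(UTC))
        await self.repo.save(updated)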