14 changes: 8 additions & 6 deletions server/Justfile
@@ -82,21 +82,23 @@ cli *ARGS:
PG_USER := env("PG_USER", "postgres")
PG_PASS := env("PG_PASS", "osa")
PG_HOST := env("PG_HOST", "localhost")
PG_PORT := env("PG_PORT", "5432")
# Dedicated host port for the integration-test DB, distinct from dev (5432)
# so both can coexist. Override with TEST_PG_PORT if 55432 is also taken.
TEST_PG_PORT := env("TEST_PG_PORT", "55432")
TEST_DB := "osa_test"
-TEST_DB_URL := "postgresql+asyncpg://" + PG_USER + ":" + PG_PASS + "@" + PG_HOST + ":" + PG_PORT + "/" + TEST_DB
+TEST_DB_URL := "postgresql+asyncpg://" + PG_USER + ":" + PG_PASS + "@" + PG_HOST + ":" + TEST_PG_PORT + "/" + TEST_DB

# Create test database (idempotent)
test-db-create:
-PGPASSWORD={{PG_PASS}} psql -h {{PG_HOST}} -p {{PG_PORT}} -U {{PG_USER}} \
+PGPASSWORD={{PG_PASS}} psql -h {{PG_HOST}} -p {{TEST_PG_PORT}} -U {{PG_USER}} \
-tc "SELECT 1 FROM pg_database WHERE datname='{{TEST_DB}}'" \
| grep -q 1 || \
-PGPASSWORD={{PG_PASS}} psql -h {{PG_HOST}} -p {{PG_PORT}} -U {{PG_USER}} \
+PGPASSWORD={{PG_PASS}} psql -h {{PG_HOST}} -p {{TEST_PG_PORT}} -U {{PG_USER}} \
-c "CREATE DATABASE {{TEST_DB}}"

# Drop test database
test-db-drop:
-PGPASSWORD={{PG_PASS}} psql -h {{PG_HOST}} -p {{PG_PORT}} -U {{PG_USER}} \
+PGPASSWORD={{PG_PASS}} psql -h {{PG_HOST}} -p {{TEST_PG_PORT}} -U {{PG_USER}} \
-c "DROP DATABASE IF EXISTS {{TEST_DB}} WITH (FORCE)"

# Run integration tests (persistence tests skip if PG is not available)
@@ -105,7 +107,7 @@ test-integration:

# Run integration tests with PG: ensure DB running → wipe → create → migrate → test → wipe
test-integration-pg:
-just --justfile ../Justfile db-up
+POSTGRES_PORT={{TEST_PG_PORT}} just --justfile ../Justfile db-up
@just test-db-drop
@just test-db-create
OSA_DATABASE__URL="{{TEST_DB_URL}}" \
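With this change the PG-backed integration suite can share a host with a dev Postgres on 5432; as an illustrative invocation (port value arbitrary), `TEST_PG_PORT=56000 just test-integration-pg` carries the override through db-up, both psql recipes, and the asyncpg test URL.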
95 changes: 95 additions & 0 deletions server/migrations/versions/076_add_feature_tables_record_srn_fks.py
@@ -0,0 +1,95 @@
"""076_add_feature_tables_record_srn_fks

For each row currently registered in the ``public.feature_tables`` catalog,
add a foreign-key constraint on ``features.<hook>.record_srn`` referencing
``records.srn`` with ``ON DELETE CASCADE``. Bundles GitHub #75.

Idempotent: skips any hook whose FK is already present (detected by naming
convention). No-op on greenfield deployments where the catalog is empty.

Revision ID: 076_feature_fks
Revises: 076_records_schema_srn
Create Date: 2026-04-19

"""

import re
from typing import Sequence, Union

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "076_feature_fks"
down_revision: Union[str, Sequence[str], None] = "076_records_schema_srn"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


FK_NAME_TEMPLATE = "fk_features_{hook}_record_srn"

# Defense-in-depth: hook names read from ``feature_tables`` are interpolated
# into raw DDL below. Application code constrains hooks to this shape at write
# time, but the migration should not trust that invariant — a stray ``"`` in a
# stored name would break out of quoting. Mirrors the ``_safe_ident`` check in
# ``osa.infrastructure.persistence.metadata_store``.
_PG_IDENT_RE = re.compile(r"^[a-z][a-z0-9_]{0,62}$")


def _safe_ident(name: str) -> str:
if not _PG_IDENT_RE.match(name):
raise ValueError(f"Refusing to interpolate unsafe PG identifier {name!r} into DDL")
return name


def upgrade() -> None:
conn = op.get_bind()
rows = conn.execute(_select_hooks()).fetchall()

for row in rows:
hook = _safe_ident(row[0])
fk_name = _safe_ident(FK_NAME_TEMPLATE.format(hook=hook))
exists = conn.execute(_check_constraint(fk_name)).scalar()
if exists:
Comment on lines +51 to +52 (Contributor):
P1 FK constraint name exceeds 63-char PG identifier limit for long hook names

FK_NAME_TEMPLATE = "fk_features_{hook}_record_srn" has 23 chars of fixed overhead ("fk_features_" = 12, "_record_srn" = 11). The _PG_IDENT_RE regex allows hook names up to 63 chars, so the rendered FK name can reach 86 chars. _safe_ident(fk_name) will reject any name over 63 chars with a ValueError, aborting the migration for any hook whose name exceeds 40 characters.

fk_name = _safe_ident(FK_NAME_TEMPLATE.format(hook=hook))
# e.g. hook = "computational_protein_structure_analysis_v2"  (43 chars)
# → "fk_features_computational_protein_structure_analysis_v2_record_srn"  (66 chars) → ValueError

Consider truncating or hashing the hook name when the full FK name would overflow:

import hashlib

MAX_HOOK_IN_FK = 63 - len("fk_features__record_srn")  # = 40
# sha1 keeps the suffix stable across processes; Python's hash() is salted per run
digest = hashlib.sha1(hook.encode()).hexdigest()[:4]
safe_hook = hook if len(hook) <= MAX_HOOK_IN_FK else hook[:35] + "_" + digest  # 35 + 1 + 4 = 40
fk_name = f"fk_features_{safe_hook}_record_srn"

continue

conn.execute(_add_fk_sql(hook, fk_name))


def downgrade() -> None:
conn = op.get_bind()
rows = conn.execute(_select_hooks()).fetchall()
for row in rows:
hook = _safe_ident(row[0])
fk_name = _safe_ident(FK_NAME_TEMPLATE.format(hook=hook))
exists = conn.execute(_check_constraint(fk_name)).scalar()
if not exists:
continue
conn.execute(_drop_fk_sql(hook, fk_name))


def _select_hooks():
from sqlalchemy import text

return text("SELECT hook_name FROM feature_tables")


def _check_constraint(fk_name: str):
from sqlalchemy import text

return text("SELECT 1 FROM pg_constraint WHERE conname = :fk_name").bindparams(fk_name=fk_name)


def _add_fk_sql(hook: str, fk_name: str):
from sqlalchemy import text

return text(
f'ALTER TABLE features."{hook}" '
f'ADD CONSTRAINT "{fk_name}" '
f"FOREIGN KEY (record_srn) REFERENCES records(srn) ON DELETE CASCADE"
)


def _drop_fk_sql(hook: str, fk_name: str):
from sqlalchemy import text

return text(f'ALTER TABLE features."{hook}" DROP CONSTRAINT "{fk_name}"')
47 changes: 47 additions & 0 deletions server/migrations/versions/076_add_metadata_schema_and_catalog.py
@@ -0,0 +1,47 @@
"""076_add_metadata_schema_and_catalog

Create the ``metadata`` PostgreSQL schema and the ``public.metadata_tables``
catalog table. Dynamic per-schema metadata tables will live inside the
``metadata`` schema; the catalog indexes them by short schema id + major.

Revision ID: 076_metadata_catalog
Revises: add_deliver_after
Create Date: 2026-04-19

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import JSONB

# revision identifiers, used by Alembic.
revision: str = "076_metadata_catalog"
down_revision: Union[str, Sequence[str], None] = "add_deliver_after"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.execute('CREATE SCHEMA IF NOT EXISTS "metadata"')

op.create_table(
"metadata_tables",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
sa.Column("schema_id", sa.Text(), nullable=False),
sa.Column("schema_slug", sa.Text(), nullable=False),
sa.Column("schema_major", sa.Integer(), nullable=False),
sa.Column("schema_versions", JSONB(), nullable=False),
sa.Column("pg_table", sa.Text(), nullable=False),
sa.Column("metadata_schema", JSONB(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
sa.UniqueConstraint("schema_id", "schema_major", name="uq_metadata_tables_id_major"),
sa.UniqueConstraint("pg_table", name="uq_metadata_tables_pg_table"),
)


def downgrade() -> None:
op.drop_table("metadata_tables")
op.execute('DROP SCHEMA IF EXISTS "metadata" CASCADE')
40 changes: 40 additions & 0 deletions server/migrations/versions/076_add_records_schema_srn.py
@@ -0,0 +1,40 @@
"""076_add_records_schema_id

Add ``records.schema_id`` + ``records.schema_version`` so a Record's typed
linkage is first-class (FR-008).

Greenfield only: no backfill from the linked convention. If this runs
against a populated ``records`` table it fails at ``SET NOT NULL`` with a
clear constraint error, which is the correct signal that the data predates
this schema.

Revision ID: 076_records_schema_srn
Revises: 076_schemas_to_id
Create Date: 2026-04-19

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "076_records_schema_srn"
down_revision: Union[str, Sequence[str], None] = "076_schemas_to_id"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.add_column("records", sa.Column("schema_id", sa.Text(), nullable=True))
op.add_column("records", sa.Column("schema_version", sa.Text(), nullable=True))
op.alter_column("records", "schema_id", nullable=False)
op.alter_column("records", "schema_version", nullable=False)
op.create_index("idx_records_schema_id", "records", ["schema_id"])


def downgrade() -> None:
op.drop_index("idx_records_schema_id", table_name="records")
op.drop_column("records", "schema_version")
op.drop_column("records", "schema_id")
66 changes: 66 additions & 0 deletions server/migrations/versions/076_schemas_to_id.py
@@ -0,0 +1,66 @@
"""076_schemas_to_id

Replace URN-keyed ``schemas`` and ``conventions`` columns with short-form
``(id, version)`` pairs. After this migration, internal code works entirely
in ``SchemaId``; full URNs are reserved for federation edges.

Changes:
- ``schemas.srn`` → ``schemas.id`` + ``schemas.version``. Composite PK.
- ``conventions.schema_srn`` → ``conventions.schema_id`` + ``conventions.schema_version``.

Greenfield only: no backfill from the old URN columns. If this runs against
a populated DB it fails at ``SET NOT NULL`` with a clear constraint error,
which is the correct signal that the data predates this schema.

Revision ID: 076_schemas_to_id
Revises: 076_metadata_catalog
Create Date: 2026-04-20

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "076_schemas_to_id"
down_revision: Union[str, Sequence[str], None] = "076_metadata_catalog"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# schemas: drop old SRN PK, add id + version, recompose PK.
op.add_column("schemas", sa.Column("id", sa.String(), nullable=True))
op.add_column("schemas", sa.Column("version", sa.String(), nullable=True))
op.alter_column("schemas", "id", nullable=False)
op.alter_column("schemas", "version", nullable=False)
op.drop_constraint("schemas_pkey", "schemas", type_="primary")
op.drop_column("schemas", "srn")
op.create_primary_key("schemas_pkey", "schemas", ["id", "version"])
op.create_index("idx_schemas_id", "schemas", ["id"])

# conventions: split schema_srn into schema_id + schema_version.
op.add_column("conventions", sa.Column("schema_id", sa.String(), nullable=True))
op.add_column("conventions", sa.Column("schema_version", sa.String(), nullable=True))
op.alter_column("conventions", "schema_id", nullable=False)
op.alter_column("conventions", "schema_version", nullable=False)
op.drop_column("conventions", "schema_srn")


def downgrade() -> None:
# conventions back to schema_srn
op.add_column("conventions", sa.Column("schema_srn", sa.String(), nullable=True))
op.alter_column("conventions", "schema_srn", nullable=False)
op.drop_column("conventions", "schema_version")
op.drop_column("conventions", "schema_id")

# schemas back to srn
op.drop_index("idx_schemas_id", table_name="schemas")
op.drop_constraint("schemas_pkey", "schemas", type_="primary")
op.add_column("schemas", sa.Column("srn", sa.String(), nullable=True))
op.alter_column("schemas", "srn", nullable=False)
op.create_primary_key("schemas_pkey", "schemas", ["srn"])
op.drop_column("schemas", "version")
op.drop_column("schemas", "id")
43 changes: 35 additions & 8 deletions server/osa/application/api/v1/routes/discovery.py
@@ -6,10 +6,7 @@
from fastapi import APIRouter
from pydantic import BaseModel, Field

-from osa.domain.discovery.model.value import (
-    Filter,
-    SortOrder,
-)
+from osa.domain.discovery.model.value import FilterExpr, SortOrder
from osa.domain.discovery.query.get_feature_catalog import (
GetFeatureCatalog,
GetFeatureCatalogHandler,
@@ -25,6 +22,8 @@
SearchRecordsHandler,
SearchRecordsResult,
)
from osa.domain.shared.error import ValidationError
from osa.domain.shared.model.srn import ConventionSRN, SchemaId

router = APIRouter(
prefix="/discovery",
@@ -37,7 +36,11 @@


class RecordSearchRequest(BaseModel):
-filters: list[Filter] = []
+schema: str | None = None
+"""Short-form schema identity: ``"<id>@<semver>"`` (e.g. ``"pdb-structure@1.0.0"``)."""
+
+convention_srn: ConventionSRN | None = None
+filter: FilterExpr | None = None
q: str | None = None
sort: str = "published_at"
order: SortOrder = SortOrder.DESC
@@ -56,7 +59,10 @@ class FeatureCatalogResponse(BaseModel):


class FeatureSearchRequest(BaseModel):
-filters: list[Filter] = []
+schema: str | None = None
+"""Short-form schema identity, optional. See RecordSearchRequest.schema."""
+
+filter: FilterExpr | None = None
record_srn: str | None = None
sort: str = "id"
order: SortOrder = SortOrder.DESC
@@ -70,6 +76,24 @@ class FeatureSearchResponse(BaseModel):
has_more: bool


def _parse_schema(value: str | None) -> SchemaId | None:
if value is None:
return None
if "@" not in value:
raise ValidationError(
f"Schema {value!r} must be fully qualified as '<id>@<semver>' "
"(e.g. 'pdb-structure@1.0.0'). Family-level scoping "
"(id alone, resolving to the latest version across a schema family) "
"is planned but not yet supported.",
field="schema",
code="cross_scope_not_yet_supported",
)
try:
return SchemaId.parse(value)
except ValueError as exc:
raise ValidationError(str(exc), field="schema") from exc
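# Illustration of the branches above: _parse_schema(None) → None;
# _parse_schema("pdb-structure@1.0.0") → SchemaId via SchemaId.parse;
# _parse_schema("pdb-structure") → ValidationError with
# code="cross_scope_not_yet_supported".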


# ── Routes ──


@@ -81,7 +105,9 @@ async def search_records(
"""Search and filter published records."""
result: SearchRecordsResult = await handler.run(
SearchRecords(
-filters=body.filters,
+filter_expr=body.filter,
+schema_id=_parse_schema(body.schema),
+convention_srn=body.convention_srn,
q=body.q,
sort=body.sort,
order=body.order,
@@ -115,7 +141,8 @@ async def search_features(
result: SearchFeaturesResult = await handler.run(
SearchFeatures(
hook_name=hook_name,
-filters=body.filters,
+filter_expr=body.filter,
+schema_id=_parse_schema(body.schema),
record_srn=body.record_srn,
sort=body.sort,
order=body.order,
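For illustration, a record search against the new request shape might look like the sketch below; the mount path, host, and field values are assumptions, not taken from this diff:

import httpx

# Hypothetical client call; only the body shape follows RecordSearchRequest.
resp = httpx.post(
    "http://localhost:8000/api/v1/discovery/records/search",  # path assumed
    json={
        "schema": "pdb-structure@1.0.0",  # must be fully qualified '<id>@<semver>'
        "q": "hemoglobin",
        "sort": "published_at",
        "order": "desc",  # wire value of SortOrder.DESC assumed
    },
)
resp.raise_for_status()

Omitting the '@<semver>' suffix would surface the 'cross_scope_not_yet_supported' validation error from _parse_schema.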