Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,15 @@ jobs:
OSA_AUTH__JWT__SECRET: test-secret-for-integration-tests-minimum-32-chars
TEST: "1"

# Build & push Docker image (only on main, gated on all server checks)
# Build & push Docker image (main + PRs onto main, gated on all server checks)
image:
name: Server - Image
needs: [changes, server-lint, server-typecheck, server-test, server-contract, server-integration]
if: github.ref == 'refs/heads/main' && needs.changes.outputs.server == 'true'
if: >-
needs.changes.outputs.server == 'true' && (
github.ref == 'refs/heads/main' ||
(github.event_name == 'pull_request' && github.base_ref == 'main')
)
uses: ./.github/workflows/image.yml
permissions:
contents: read
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ jobs:
if [[ "${{ github.event_name }}" == "release" ]]; then
TAGS="${TAGS},${{ env.IMAGE }}:${{ github.event.release.tag_name }}"
fi
if [[ -n "${{ github.event.pull_request.number }}" ]]; then
TAGS="${TAGS},${{ env.IMAGE }}:pr-${{ github.event.pull_request.number }}"
fi
echo "tags=${TAGS}" >> "$GITHUB_OUTPUT"

- uses: docker/build-push-action@v6
Expand Down
1 change: 1 addition & 0 deletions deploy/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ services:
OSA_DATABASE__URL: postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-osa}@db:5432/${POSTGRES_DB:-osa}
OSA_DATA_DIR: /data
OSA_CONFIG_FILE: /app/osa.yaml
OSA_BASE_URL: http://localhost:8000
OSA_LOGGING__LEVEL: ${LOG_LEVEL:-DEBUG}
WATCHFILES_FORCE_POLLING: "true"
entrypoint: []
Expand Down
1 change: 1 addition & 0 deletions deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
environment:
OSA_DATABASE__URL: postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-osa}@db:5432/${POSTGRES_DB:-osa}
OSA_DATA_DIR: /data
OSA_BASE_URL: ${OSA_BASE_URL:-http://localhost:8000}
OSA_LOGGING__LEVEL: ${LOG_LEVEL:-INFO}
OSA_AUTH__JWT__SECRET: ${JWT_SECRET:-change-me-in-production-must-be-32-chars-long}
depends_on:
Expand Down
1 change: 1 addition & 0 deletions server/Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ test-cov:

# Run linter and type checker
lint:
@just fix
uv run ruff check osa
uv run ty check osa

Expand Down
93 changes: 93 additions & 0 deletions server/migrations/versions/add_ingest_runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""add_ingest_runs

Add ingest_runs table for bulk ingestion tracking.

Revision ID: add_ingest_runs
Revises: source_agnostic_records
Create Date: 2026-03-25

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "add_ingest_runs"
down_revision: Union[str, Sequence[str], None] = "source_agnostic_records"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``ingest_runs`` table plus its convention/status lookup indexes."""

    def _counter(name: str, default: int) -> sa.Column:
        # All progress counters share the same shape: non-null integer with a
        # server-side default so existing rows are backfilled automatically.
        return sa.Column(
            name,
            sa.Integer(),
            nullable=False,
            server_default=sa.text(str(default)),
        )

    op.create_table(
        "ingest_runs",
        # SRN is the primary identifier for a run.
        sa.Column("srn", sa.String(), primary_key=True),
        # Every run belongs to exactly one convention.
        sa.Column(
            "convention_srn",
            sa.String(),
            sa.ForeignKey("conventions.srn"),
            nullable=False,
        ),
        # Lifecycle state; constrained below to the four known values.
        sa.Column(
            "status",
            sa.String(32),
            nullable=False,
            server_default=sa.text("'pending'"),
        ),
        # Whether the producer side has finished handing batches to the run.
        sa.Column(
            "ingestion_finished",
            sa.Boolean(),
            nullable=False,
            server_default=sa.text("false"),
        ),
        _counter("batches_ingested", 0),
        _counter("batches_completed", 0),
        _counter("published_count", 0),
        _counter("batch_size", 1000),
        # Optional cap on how many records the run may process.
        sa.Column("record_limit", sa.Integer(), nullable=True),
        sa.Column("started_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True),
        sa.CheckConstraint(
            "status IN ('pending', 'running', 'completed', 'failed')",
            name="ingest_runs_status_check",
        ),
    )

    # Secondary indexes for the two common lookup paths.
    for index_name, column in (
        ("idx_ingest_runs_convention", "convention_srn"),
        ("idx_ingest_runs_status", "status"),
    ):
        op.create_index(index_name, "ingest_runs", [column])


def downgrade() -> None:
    """Reverse the migration: drop both indexes, then the ``ingest_runs`` table."""
    # Indexes are dropped in reverse creation order before the table itself.
    for index_name in ("idx_ingest_runs_status", "idx_ingest_runs_convention"):
        op.drop_index(index_name, table_name="ingest_runs")
    op.drop_table("ingest_runs")
49 changes: 43 additions & 6 deletions server/osa/application/api/rest/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import sys
from contextlib import asynccontextmanager
from typing import Any

Expand All @@ -16,6 +17,7 @@
depositions,
discovery,
events,
ingestions,
health,
ontologies,
records,
Expand All @@ -25,7 +27,7 @@
validation,
)
from osa.application.di import create_container
from osa.config import Config, configure_logging
from osa.config import Config
from osa.domain.shared.authorization.startup import validate_all_handlers
from osa.domain.shared.error import OSAError
from osa.domain.shared.event import EventHandler
Expand Down Expand Up @@ -80,9 +82,41 @@ def create_app(
# Pydantic Settings populates from env vars at runtime
config = Config() # type: ignore[call-arg]

# Configure logging early
configure_logging(config.logging)
logger.info("Starting OSA server: %s v%s", config.name, config.version)
# Configure logfire as the single logging system
import logging as _logging

from opentelemetry.sdk.trace.export import SimpleSpanProcessor

from osa.infrastructure.logging import OSAConsoleExporter

logfire.configure(
send_to_logfire="if-token-present",
service_name=config.name,
console=False, # Disable default console — we use OSAConsoleExporter
inspect_arguments=False,
additional_span_processors=[
SimpleSpanProcessor(
OSAConsoleExporter(
output=sys.stderr,
include_timestamp=True,
min_log_level=config.logging.level,
)
),
],
)

# Route Python logging through logfire so old-style logger.info() calls
# appear in the same output stream
root = _logging.getLogger()
root.setLevel(config.logging.level.upper())
for h in root.handlers[:]:
root.removeHandler(h)
root.addHandler(logfire.LogfireLoggingHandler())

# Suppress duplicate access logs — logfire FastAPI instrumentation handles HTTP logging
_logging.getLogger("uvicorn.access").setLevel(_logging.WARNING)

logfire.info("Starting OSA server: {name} v{version}", name=config.name, version=config.version)

# Validate all handlers have authorization declarations (fail fast)
validate_all_handlers()
Expand All @@ -94,9 +128,11 @@ def create_app(
lifespan=lifespan,
)

# Instrument FastAPI for automatic tracing of HTTP requests
logfire.instrument_httpx()
logfire.instrument_fastapi(app_instance)
logfire.instrument_fastapi(
app_instance,
excluded_urls="/api/v1/health",
)

# Setup dependency injection
container = create_container(
Expand All @@ -117,6 +153,7 @@ def create_app(
app_instance.include_router(schemas.router, prefix="/api/v1")
app_instance.include_router(conventions.router, prefix="/api/v1")
app_instance.include_router(depositions.router, prefix="/api/v1")
app_instance.include_router(ingestions.router, prefix="/api/v1")
app_instance.include_router(validation.router, prefix="/api/v1")
app_instance.include_router(discovery.router, prefix="/api/v1")

Expand Down
20 changes: 20 additions & 0 deletions server/osa/application/api/v1/routes/ingestions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Ingest REST routes."""

from dishka.integrations.fastapi import DishkaRoute, FromDishka
from fastapi import APIRouter

from osa.domain.ingest.command.start_ingest import (
IngestRunCreated,
StartIngest,
StartIngestHandler,
)

router = APIRouter(prefix="/ingestions", tags=["Ingestions"], route_class=DishkaRoute)


@router.post("", response_model=IngestRunCreated, status_code=201)
async def start_ingest(
    body: StartIngest,
    handler: FromDishka[StartIngestHandler],
) -> IngestRunCreated:
    """Kick off a bulk ingest run; the handler returns the created run's metadata."""
    created = await handler.run(body)
    return created
4 changes: 2 additions & 2 deletions server/osa/application/di.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from osa.infrastructure.index.di import IndexProvider
from osa.infrastructure.k8s.di import RunnerProvider
from osa.infrastructure.persistence import PersistenceProvider
from osa.infrastructure.source.di import SourceProvider
from osa.infrastructure.ingest.di import IngestProvider
from osa.util.di.scope import Scope
from osa.util.paths import OSAPaths

Expand All @@ -44,7 +44,7 @@ def create_container(
PersistenceProvider(),
RunnerProvider(),
IndexProvider(),
SourceProvider(),
IngestProvider(),
EventProvider(extra_handlers=extra_handlers),
HttpProvider(),
DepositionProvider(),
Expand Down
37 changes: 27 additions & 10 deletions server/osa/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from logfire import LevelName
import logging
import os
import re
import sys
from pathlib import Path
from typing import Any, Literal
from typing import Any, Literal, Annotated

import yaml
from pydantic import BaseModel, field_validator, model_validator
from pydantic import BaseModel, BeforeValidator, field_validator, model_validator
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource
from typing_extensions import Self

Expand Down Expand Up @@ -63,7 +64,9 @@ class DatabaseConfig(BaseModel):
class LoggingConfig(BaseModel):
"""Logging configuration (nested in Config, uses env_nested_delimiter)."""

level: str = "DEBUG" # Root log level (DEBUG for development)
level: Annotated[
LevelName, BeforeValidator(lambda v: v.lower() if isinstance(v, str) else v)
] = "debug" # Root log level (DEBUG for development)
format: str = "%(asctime)s %(levelname)-8s [%(name)s] %(message)s"
date_format: str = "%Y-%m-%d %H:%M:%S"

Expand All @@ -81,6 +84,7 @@ class WorkerConfig(BaseModel):

poll_interval: float = 0.5 # Seconds between outbox polls
batch_size: int = 100 # Maximum events to fetch per poll cycle
hook_concurrency: int = 8 # Number of concurrent hook workers


class K8sConfig(BaseModel):
Expand Down Expand Up @@ -225,7 +229,8 @@ class Config(BaseSettings):
name: str = "Open Science Archive"
version: str = "0.0.1"
description: str = "An open platform for depositing scientific data"
domain: str = "localhost" # Node domain for SRN construction
domain: str = "localhost" # Node identity for SRN construction (DNS name)
base_url: str = "" # Public URL where users reach the server (e.g. http://localhost:8000)

# These are BaseModel, so env_nested_delimiter handles their env vars
frontend: Frontend = Frontend()
Expand All @@ -243,15 +248,26 @@ class Config(BaseSettings):
"env_nested_delimiter": "__", # Allows OSA_DATABASE__URL override
}

@property
def base_url(self) -> str:
"""Public base URL derived from domain. HTTPS unless localhost."""
scheme = "http" if self.domain == "localhost" else "https"
return f"{scheme}://{self.domain}"
@model_validator(mode="after")
def derive_base_url(self) -> Self:
"""Derive base_url from domain if not explicitly set.

For non-localhost domains, HTTPS on port 443 is assumed.
For localhost, base_url must be set explicitly (port matters).
"""
if self.base_url:
return self
if self.domain == "localhost":
raise ValueError(
"OSA_BASE_URL is required when domain is localhost "
"(e.g. OSA_BASE_URL=http://localhost:8000)"
)
self.base_url = f"https://{self.domain}"
return self

@model_validator(mode="after")
def derive_frontend_url(self) -> Self:
"""Derive frontend URL from domain if still the default localhost value."""
"""Derive frontend URL from base_url if still the default localhost value."""
if self.frontend.url == "http://localhost:3000":
self.frontend = Frontend(url=self.base_url)
return self
Expand Down Expand Up @@ -366,5 +382,6 @@ def configure_logging(config: LoggingConfig) -> None:
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("aiosqlite").setLevel(logging.WARNING)
logging.getLogger("apscheduler").setLevel(logging.WARNING) # Suppress job completion spam
logging.getLogger("uvicorn.access").setLevel(logging.WARNING) # Logfire handles HTTP logging

logging.debug("Logging configured: level=%s, file=%s", config.level, config.file)
6 changes: 3 additions & 3 deletions server/osa/domain/deposition/command/create_convention.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from osa.domain.shared.authorization.gate import public
from osa.domain.shared.command import Command, CommandHandler, Result
from osa.domain.shared.model.hook import HookDefinition
from osa.domain.shared.model.source import SourceDefinition
from osa.domain.shared.model.source import IngesterDefinition
from osa.domain.shared.model.srn import ConventionSRN, SchemaSRN


Expand All @@ -21,7 +21,7 @@ class CreateConvention(Command):
file_requirements: FileRequirements
description: str | None = None
hooks: list[HookDefinition] = []
source: SourceDefinition | None = None
ingester: IngesterDefinition | None = None


class ConventionCreated(Result):
Expand All @@ -44,7 +44,7 @@ async def run(self, cmd: CreateConvention) -> ConventionCreated:
file_requirements=cmd.file_requirements,
description=cmd.description,
hooks=cmd.hooks,
source=cmd.source,
ingester=cmd.ingester,
)
return ConventionCreated(
srn=convention.srn,
Expand Down
5 changes: 1 addition & 4 deletions server/osa/domain/deposition/handler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
"""Deposition domain event handlers."""

from osa.domain.deposition.handler.create_deposition_from_source import (
CreateDepositionFromSource,
)
from osa.domain.deposition.handler.return_to_draft import ReturnToDraft

__all__ = ["CreateDepositionFromSource", "ReturnToDraft"]
__all__ = ["ReturnToDraft"]
Loading
Loading