Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions app/models/dispatch_outbox.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from sqlalchemy import Boolean, CheckConstraint, Column, DateTime, Integer, String, Text, func
from sqlalchemy import BigInteger, Boolean, CheckConstraint, Column, DateTime, ForeignKey, Integer, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB, UUID

from app.database import Base
Expand All @@ -9,12 +9,23 @@
class DispatchOutbox(Base):
__tablename__ = "dispatch_outbox"

id = Column(Integer, primary_key=True, autoincrement=True)
id = Column(BigInteger, primary_key=True, autoincrement=True)
# Nullable since migration 021: cue-task rows still set execution_id +
# cue_id; message-task rows (deliver_message / retry_message) leave
# them NULL and reference message_id in the payload instead.
execution_id = Column(UUID(as_uuid=True), nullable=True)
cue_id = Column(String(20), nullable=True)
# FK declarations match migration 002 intent (table-level CASCADE on
# parent delete). Tests use Base.metadata.create_all and rely on the
# model declaring these so SQLAlchemy ORM can traverse relationships.
execution_id = Column(
UUID(as_uuid=True),
ForeignKey("executions.id", ondelete="CASCADE"),
nullable=True,
)
cue_id = Column(
String(20),
ForeignKey("cues.id", ondelete="CASCADE"),
nullable=True,
)
task_type = Column(String(20), nullable=False, default="deliver")
payload = Column(JSONB, nullable=False, default={})
dispatched = Column(Boolean, nullable=False, default=False)
Expand Down
2 changes: 1 addition & 1 deletion parity-manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
{"path": "app/models/alert.py", "private_counterpart": "app/models/alert.py", "last_synced": "2026-04-17"},
{"path": "app/models/cue.py", "private_counterpart": "app/models/cue.py", "last_synced": "2026-04-16"},
{"path": "app/models/device_code.py", "private_counterpart": "app/models/device_code.py", "last_synced": "2026-04-16"},
{"path": "app/models/dispatch_outbox.py", "private_counterpart": "app/models/dispatch_outbox.py", "last_synced": "2026-04-16"},
{"path": "app/models/dispatch_outbox.py", "private_counterpart": "app/models/dispatch_outbox.py", "last_synced": "2026-05-05", "ported_in": "port/594-dispatch-outbox-cue-id-fk", "notes": "FK declarations on execution_id + cue_id ported from cueapi/cueapi#594"},
{"path": "app/models/execution.py", "private_counterpart": "app/models/execution.py", "last_synced": "2026-04-16"},
{"path": "app/models/usage_monthly.py", "private_counterpart": "app/models/usage_monthly.py", "last_synced": "2026-04-16"},
{"path": "app/models/user.py", "private_counterpart": "app/models/user.py", "last_synced": "2026-04-16"},
Expand Down
82 changes: 82 additions & 0 deletions tests/test_dispatch_outbox_model_drift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Regression: ensure DispatchOutbox model agrees with its migration history.

Migration 002 created ``dispatch_outbox`` with:
- ``id BIGSERIAL PRIMARY KEY``
- ``execution_id UUID REFERENCES executions(id) ON DELETE CASCADE`` (NOT NULL)
- ``cue_id VARCHAR(20)`` (NOT NULL, no FK declared in migration)

Migration 021 (messaging primitive) relaxed both ``execution_id`` and
``cue_id`` to NULLABLE to support message-task rows, extended the
``valid_task_type`` check, and added a ``task_payload_shape`` check.

The model historically declared ``id`` as plain ``Integer`` (32-bit) and
omitted the ``ForeignKey`` declarations on both ``execution_id`` and
``cue_id``. Tests use ``Base.metadata.create_all`` (NOT alembic
migrations) to spin up the schema, so a model that drifts from the
intended schema silently builds a slightly different table for the test
suite than what production runs.

This test asserts the model carries both FK declarations and a 64-bit
id. If someone reintroduces ``Integer`` or drops a FK, this test fails
before the drift can ship.

Ported from cueapi/cueapi#594.
"""
from __future__ import annotations

from sqlalchemy import BigInteger, Integer

from app.models.dispatch_outbox import DispatchOutbox


def test_id_is_bigint() -> None:
id_col = DispatchOutbox.__table__.c.id
assert isinstance(id_col.type, BigInteger), (
f"DispatchOutbox.id must be BigInteger to match migration 002 "
f"(BIGSERIAL); got {type(id_col.type).__name__}."
)
assert not (type(id_col.type) is Integer), (
"DispatchOutbox.id is plain Integer (32-bit); migration 002 "
"uses BIGSERIAL. Use BigInteger."
)


def test_execution_id_has_cascade_fk() -> None:
exec_col = DispatchOutbox.__table__.c.execution_id
fks = list(exec_col.foreign_keys)
assert len(fks) == 1, (
f"DispatchOutbox.execution_id must have a FK to executions(id) "
f"per migration 002; got {len(fks)} foreign keys."
)
fk = fks[0]
assert fk.column.table.name == "executions"
assert fk.column.name == "id"
assert fk.ondelete == "CASCADE"


def test_cue_id_has_cascade_fk() -> None:
"""Model declares the FK to cues(id) ON DELETE CASCADE.

The model originally omitted the FK declaration. The DB-level
constraint is enforced via the cues→dispatch_outbox cascade chain
when a cue is deleted (executions cascade-delete first, which then
cascades dispatch_outbox via the execution FK). But the SQLAlchemy
ORM didn't know about the direct FK, blocking any future
``relationship()`` traversal across the link.
"""
cue_col = DispatchOutbox.__table__.c.cue_id
fks = list(cue_col.foreign_keys)
assert len(fks) == 1, (
f"DispatchOutbox.cue_id must have a FK to cues(id); "
f"got {len(fks)} foreign keys."
)
fk = fks[0]
assert fk.column.table.name == "cues"
assert fk.column.name == "id"
assert fk.ondelete == "CASCADE"


def test_execution_id_and_cue_id_nullable_post_021() -> None:
"""Migration 021 relaxed both to NULLABLE for message-task rows."""
assert DispatchOutbox.__table__.c.execution_id.nullable is True
assert DispatchOutbox.__table__.c.cue_id.nullable is True
74 changes: 68 additions & 6 deletions tests/test_qa_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,70 @@

from app.models.device_code import DeviceCode
from app.models.dispatch_outbox import DispatchOutbox
from app.models.execution import Execution
from worker.poller import cleanup_device_codes, cleanup_outbox

# Imported lazily inside _create_anchor_execution to avoid pulling extra
# model symbols at module import time.


async def _create_anchor_execution(db_session) -> uuid.UUID:
"""Create a real User + Cue + Execution and return the execution id.

Outbox rows have a FK on ``execution_id → executions.id``
(ON DELETE CASCADE) per migration 002. Tests that previously
inserted outbox rows with synthetic UUIDs relied on the model
omitting the FK; now that the model agrees with the migration
they must point at a real execution. Ported from cueapi/cueapi#594.
"""
from app.models.user import User
from app.models.cue import Cue
from app.utils.ids import (
generate_api_key,
generate_cue_id,
generate_webhook_secret,
get_api_key_prefix,
hash_api_key,
)

api_key = generate_api_key()
suffix = uuid.uuid4().hex[:8]
user = User(
email=f"obs-{suffix}@test.com",
api_key_hash=hash_api_key(api_key),
api_key_prefix=get_api_key_prefix(api_key),
webhook_secret=generate_webhook_secret(),
slug=f"obs-{suffix}",
)
db_session.add(user)
await db_session.flush()

cue = Cue(
id=generate_cue_id(),
user_id=user.id,
name=f"obs-cue-{suffix}",
status="active",
schedule_type="once",
schedule_timezone="UTC",
callback_url="http://localhost:19999/webhook",
callback_method="POST",
next_run=datetime.now(timezone.utc) - timedelta(minutes=1),
)
db_session.add(cue)
await db_session.flush()

exec_id = uuid.uuid4()
db_session.add(
Execution(
id=exec_id,
cue_id=cue.id,
scheduled_for=datetime.now(timezone.utc),
status="pending",
)
)
await db_session.commit()
return exec_id


# ── Health endpoint ──────────────────────────────────────────────────

Expand Down Expand Up @@ -39,12 +101,12 @@ async def test_health_includes_metrics(client):
async def test_outbox_cleanup_removes_old_rows(db_session, db_engine):
"""Dispatched outbox rows older than 7 days are deleted."""
old_time = datetime.now(timezone.utc) - timedelta(days=10)
exec_id = uuid.uuid4()
exec_id = await _create_anchor_execution(db_session)

await db_session.execute(
DispatchOutbox.__table__.insert().values(
execution_id=exec_id,
cue_id="cue_old000001",
cue_id=None,
task_type="deliver",
payload={},
dispatched=True,
Expand All @@ -67,12 +129,12 @@ async def test_outbox_cleanup_removes_old_rows(db_session, db_engine):
async def test_outbox_cleanup_keeps_recent(db_session, db_engine):
"""Dispatched outbox rows newer than 7 days are kept."""
recent_time = datetime.now(timezone.utc) - timedelta(days=1)
exec_id = uuid.uuid4()
exec_id = await _create_anchor_execution(db_session)

await db_session.execute(
DispatchOutbox.__table__.insert().values(
execution_id=exec_id,
cue_id="cue_recent0001",
cue_id=None,
task_type="deliver",
payload={},
dispatched=True,
Expand All @@ -94,12 +156,12 @@ async def test_outbox_cleanup_keeps_recent(db_session, db_engine):
async def test_outbox_cleanup_keeps_undispatched(db_session, db_engine):
"""Undispatched outbox rows are never cleaned up regardless of age."""
old_time = datetime.now(timezone.utc) - timedelta(days=30)
exec_id = uuid.uuid4()
exec_id = await _create_anchor_execution(db_session)

await db_session.execute(
DispatchOutbox.__table__.insert().values(
execution_id=exec_id,
cue_id="cue_undisp001",
cue_id=None,
task_type="deliver",
payload={},
dispatched=False,
Expand Down