Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions app/models/dispatch_outbox.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from sqlalchemy import Boolean, CheckConstraint, Column, DateTime, Integer, String, Text, func
from sqlalchemy import Boolean, CheckConstraint, Column, DateTime, ForeignKey, Integer, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB, UUID

from app.database import Base
Expand All @@ -13,8 +13,21 @@ class DispatchOutbox(Base):
# Nullable since migration 021: cue-task rows still set execution_id +
# cue_id; message-task rows (deliver_message / retry_message) leave
# them NULL and reference message_id in the payload instead.
execution_id = Column(UUID(as_uuid=True), nullable=True)
cue_id = Column(String(20), nullable=True)
# FK declarations match private cueapi: migration 002 declared the
# DB-level FKs to executions.id and cues.id with ondelete=CASCADE; the
# model previously omitted the FK declaration which was benign drift
# (DB constraint still enforced) but broke any future SQLAlchemy ORM
# relationship() traversal. Parity port of cueapi/cueapi#594.
execution_id = Column(
UUID(as_uuid=True),
ForeignKey("executions.id", ondelete="CASCADE"),
nullable=True,
)
cue_id = Column(
String(20),
ForeignKey("cues.id", ondelete="CASCADE"),
nullable=True,
)
task_type = Column(String(20), nullable=False, default="deliver")
payload = Column(JSONB, nullable=False, default={})
dispatched = Column(Boolean, nullable=False, default=False)
Expand Down
41 changes: 35 additions & 6 deletions tests/test_qa_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,35 @@

from app.models.device_code import DeviceCode
from app.models.dispatch_outbox import DispatchOutbox
from app.models.execution import Execution
from tests.test_poller import _create_due_cue, _create_test_user
from worker.poller import cleanup_device_codes, cleanup_outbox


async def _create_anchor_execution(db_session) -> uuid.UUID:
"""Create a real User + Cue + Execution and return the execution id.

Outbox rows have a FK on ``execution_id → executions.id``
(ON DELETE CASCADE) per migration 002. Tests that previously
inserted outbox rows with synthetic UUIDs relied on the model
omitting the FK; now that the model agrees with the migration
they must point at a real execution.
"""
user_id = await _create_test_user(db_session)
cue = await _create_due_cue(db_session, user_id)
exec_id = uuid.uuid4()
db_session.add(
Execution(
id=exec_id,
cue_id=cue.id,
scheduled_for=datetime.now(timezone.utc),
status="pending",
)
)
await db_session.commit()
return exec_id


# ── Health endpoint ──────────────────────────────────────────────────


Expand Down Expand Up @@ -39,12 +65,15 @@ async def test_health_includes_metrics(client):
async def test_outbox_cleanup_removes_old_rows(db_session, db_engine):
"""Dispatched outbox rows older than 7 days are deleted."""
old_time = datetime.now(timezone.utc) - timedelta(days=10)
exec_id = uuid.uuid4()
exec_id = await _create_anchor_execution(db_session)

await db_session.execute(
DispatchOutbox.__table__.insert().values(
execution_id=exec_id,
cue_id="cue_old000001",
# cue_id NULL — this fixture tests outbox cleanup, not cue
# association. Synthetic IDs broke the cues(id) FK once it
# was declared on the model (PR #594 second-half drift fix).
cue_id=None,
task_type="deliver",
payload={},
dispatched=True,
Expand All @@ -67,12 +96,12 @@ async def test_outbox_cleanup_removes_old_rows(db_session, db_engine):
async def test_outbox_cleanup_keeps_recent(db_session, db_engine):
"""Dispatched outbox rows newer than 7 days are kept."""
recent_time = datetime.now(timezone.utc) - timedelta(days=1)
exec_id = uuid.uuid4()
exec_id = await _create_anchor_execution(db_session)

await db_session.execute(
DispatchOutbox.__table__.insert().values(
execution_id=exec_id,
cue_id="cue_recent0001",
cue_id=None, # see test_outbox_cleanup_removes_old_rows
task_type="deliver",
payload={},
dispatched=True,
Expand All @@ -94,12 +123,12 @@ async def test_outbox_cleanup_keeps_recent(db_session, db_engine):
async def test_outbox_cleanup_keeps_undispatched(db_session, db_engine):
"""Undispatched outbox rows are never cleaned up regardless of age."""
old_time = datetime.now(timezone.utc) - timedelta(days=30)
exec_id = uuid.uuid4()
exec_id = await _create_anchor_execution(db_session)

await db_session.execute(
DispatchOutbox.__table__.insert().values(
execution_id=exec_id,
cue_id="cue_undisp001",
cue_id=None, # see test_outbox_cleanup_removes_old_rows
task_type="deliver",
payload={},
dispatched=False,
Expand Down