diff --git a/app/routers/cues.py b/app/routers/cues.py index cefa183..0638180 100644 --- a/app/routers/cues.py +++ b/app/routers/cues.py @@ -7,7 +7,7 @@ from app.auth import AuthenticatedUser, get_current_user from app.database import get_db -from app.schemas.cue import CueCreate, CueDetailResponse, CueListResponse, CueResponse, CueUpdate +from app.schemas.cue import CueCreate, CueDetailResponse, CueListResponse, CueResponse, CueUpdate, FireRequest from app.services.cue_service import create_cue, delete_cue, get_cue, list_cues, update_cue router = APIRouter(prefix="/v1/cues", tags=["cues"]) @@ -91,10 +91,22 @@ async def delete( @router.post("/{cue_id}/fire", status_code=200) async def fire_cue( cue_id: str, + body: Optional[FireRequest] = None, user: AuthenticatedUser = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): - """Manually fire a cue — creates an execution immediately regardless of schedule.""" + """Manually fire a cue — creates an execution immediately regardless of schedule. + + Optional body (``FireRequest``): + + * ``send_at`` (datetime): schedule this fire for a future time. When + set, ``execution.scheduled_for`` and ``dispatch_outbox.scheduled_at`` + are set so the dispatcher gates dispatch until then. Past + timestamps are treated as "fire now" (no error). Phase 12.1.7 / + roadmap §13. + + No body fires immediately (existing behavior). + """ import uuid as uuid_mod from datetime import datetime, timezone from sqlalchemy import select @@ -109,7 +121,21 @@ async def fire_cue( now = datetime.now(timezone.utc) execution_id = uuid_mod.uuid4() - execution = Execution(id=execution_id, cue_id=cue.id, scheduled_for=now, status="pending", triggered_by="manual_fire") + + # §13: per-fire scheduling. Future send_at → schedule; past/absent → + # fire now. Forgiving: past timestamps are not rejected, just + # treated as immediate. + requested_at = body.send_at if body and body.send_at else None + effective_scheduled_for = requested_at if requested_at and requested_at > now else now + is_scheduled = effective_scheduled_for > now + + execution = Execution( + id=execution_id, + cue_id=cue.id, + scheduled_for=effective_scheduled_for, + status="pending", + triggered_by="manual_fire", + ) db.add(execution) if cue.callback_transport == "webhook" and cue.callback_url: @@ -118,11 +144,17 @@ async def fire_cue( ws = user_row.scalar_one_or_none() or "" outbox = DispatchOutbox( execution_id=execution_id, cue_id=cue.id, task_type="deliver", + # §13: when send_at is in the future, set scheduled_at so the + # dispatcher gates dispatch until then. NULL = dispatch + # immediately (existing behavior). The dispatcher's existing + # ``scheduled_at IS NULL OR scheduled_at <= now()`` filter + # already does the gating; we just plumb the timestamp. + scheduled_at=effective_scheduled_for if is_scheduled else None, payload={ "execution_id": str(execution_id), "cue_id": cue.id, "cue_name": cue.name, "user_id": str(user.id), "callback_url": cue.callback_url, "callback_method": cue.callback_method, "callback_headers": cue.callback_headers or {}, - "payload": cue.payload or {}, "scheduled_for": now.isoformat(), + "payload": cue.payload or {}, "scheduled_for": effective_scheduled_for.isoformat(), "retry_max_attempts": cue.retry_max_attempts, "retry_backoff_minutes": cue.retry_backoff_minutes or [1, 5, 15], "webhook_secret": ws, @@ -131,4 +163,10 @@ async def fire_cue( db.add(outbox) await db.commit() - return {"id": str(execution_id), "cue_id": cue.id, "scheduled_for": now.isoformat(), "status": "pending", "triggered_by": "manual_fire"} + return { + "id": str(execution_id), + "cue_id": cue.id, + "scheduled_for": effective_scheduled_for.isoformat(), + "status": "pending", + "triggered_by": "manual_fire", + } diff --git a/app/schemas/cue.py b/app/schemas/cue.py index f613031..1974ff1 100644 --- a/app/schemas/cue.py +++ b/app/schemas/cue.py @@ -63,6 +63,28 @@ class OnFailureConfig(BaseModel): pause: bool = False +class FireRequest(BaseModel): + """Body for ``POST /v1/cues/{id}/fire`` (all fields optional). + + No body / empty body fires immediately with ``cue.payload`` as-is. + """ + + model_config = ConfigDict(extra="forbid") + + send_at: Optional[datetime] = Field( + default=None, + description=( + "Optional UTC timestamp to schedule this fire for the future. " + "When set, the resulting execution sits in `pending` until " + "`send_at <= now()`, then enters the normal dispatch path. " + "Same shape as the existing per-cue schedule, but per-fire. " + "Past timestamps are treated as 'fire now' (idempotent + " + "forgiving — no error, just dispatches immediately). Phase " + "12.1.7 / roadmap §13." + ), + ) + + class CueCreate(BaseModel): name: str = Field(..., max_length=255) description: Optional[str] = None diff --git a/parity-manifest.json b/parity-manifest.json index a17196f..7d1af2f 100644 --- a/parity-manifest.json +++ b/parity-manifest.json @@ -94,7 +94,7 @@ {"path": "app/routers/__init__.py", "private_counterpart": "app/routers/__init__.py", "last_synced": "2026-04-16"}, {"path": "app/routers/alerts.py", "private_counterpart": "app/routers/alerts.py", "last_synced": "2026-04-17"}, {"path": "app/routers/auth_routes.py", "private_counterpart": "app/routers/auth_routes.py", "last_synced": "2026-04-16"}, - {"path": "app/routers/cues.py", "private_counterpart": "app/routers/cues.py", "last_synced": "2026-04-16"}, + {"path": "app/routers/cues.py", "private_counterpart": "app/routers/cues.py", "last_synced": "2026-05-05", "ported_in": "port/618-cue-fire-send-at", "notes": "Adds FireRequest body + send_at plumbing through Execution.scheduled_for and DispatchOutbox.scheduled_at. Ports cueapi/cueapi#618."}, {"path": "app/routers/device_code.py", "private_counterpart": "app/routers/device_code.py", "last_synced": "2026-04-16"}, {"path": "app/routers/echo.py", "private_counterpart": "app/routers/echo.py", "last_synced": "2026-04-16"}, {"path": "app/routers/executions.py", "private_counterpart": "app/routers/executions.py", "last_synced": "2026-04-16"}, @@ -108,7 +108,7 @@ "schemas": [ {"path": "app/schemas/__init__.py", "private_counterpart": "app/schemas/__init__.py", "last_synced": "2026-04-16"}, {"path": "app/schemas/alert.py", "private_counterpart": "app/schemas/alert.py", "last_synced": "2026-04-17"}, - {"path": "app/schemas/cue.py", "private_counterpart": "app/schemas/cue.py", "last_synced": "2026-04-16"}, + {"path": "app/schemas/cue.py", "private_counterpart": "app/schemas/cue.py", "last_synced": "2026-05-05", "ported_in": "port/618-cue-fire-send-at", "notes": "Adds FireRequest schema with send_at field. Ports cueapi/cueapi#618."}, {"path": "app/schemas/execution.py", "private_counterpart": "app/schemas/execution.py", "last_synced": "2026-04-16"}, {"path": "app/schemas/outcome.py", "private_counterpart": "app/schemas/outcome.py", "last_synced": "2026-04-16"}, {"path": "app/schemas/worker.py", "private_counterpart": "app/schemas/worker.py", "last_synced": "2026-04-16"}, diff --git a/tests/test_fire_send_at.py b/tests/test_fire_send_at.py new file mode 100644 index 0000000..73aad7f --- /dev/null +++ b/tests/test_fire_send_at.py @@ -0,0 +1,179 @@ +"""Tests for §13 (Phase 12.1.7): per-fire scheduling on POST /v1/cues/{id}/fire. + +Roadmap doc §13: optional `send_at` timestamp on fire that delays dispatch +until the time elapsed. Same shape as cue's per-cue schedule, but per-fire. + +Ported from cueapi/cueapi#618. The private repo's test_fire_send_at.py +also covers a `payload_override` compose case; that field belongs to +private PR #575 (require_payload_override) which is a separate parity +port and not in this OSS port. + +These tests pin: + +1. No `send_at` (or omitted) → dispatch immediately (existing behavior). +2. `send_at` in the future → execution's ``scheduled_for`` is set to + send_at; outbox row has ``scheduled_at`` set so the dispatcher's + existing ``scheduled_at IS NULL OR scheduled_at <= now()`` filter + gates dispatch until then. +3. `send_at` in the past → forgiving fallback to "fire now" (no error). + ``scheduled_for`` set to now; outbox ``scheduled_at`` left NULL. +4. Invalid timestamps return 422. +5. Worker-transport cues don't create an outbox row but + ``scheduled_for`` on the Execution row still reflects send_at. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import select + +from app.models.dispatch_outbox import DispatchOutbox +from app.models.execution import Execution + + +async def _create_cue(client, auth_headers, name="send-at-test"): + resp = await client.post( + "/v1/cues", + json={ + "name": name, + "schedule": {"type": "recurring", "cron": "0 * * * *"}, + "callback": {"url": "https://example.com/webhook"}, + "payload": {"task": "send_at_default"}, + }, + headers=auth_headers, + ) + assert resp.status_code == 201, resp.text + return resp.json()["id"] + + +@pytest.mark.asyncio +async def test_fire_no_send_at_dispatches_immediately(client, auth_headers, db_session): + """Existing behavior preserved: no body or no send_at → outbox.scheduled_at NULL.""" + cue_id = await _create_cue(client, auth_headers, "send-at-immediate") + + resp = await client.post(f"/v1/cues/{cue_id}/fire", headers=auth_headers) + assert resp.status_code == 200 + exec_id = resp.json()["id"] + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id)) + ) + ).scalar_one() + assert outbox.scheduled_at is None, ( + "no send_at → outbox.scheduled_at must be NULL so dispatcher fires immediately" + ) + + +@pytest.mark.asyncio +async def test_fire_send_at_future_delays_dispatch(client, auth_headers, db_session): + cue_id = await _create_cue(client, auth_headers, "send-at-future") + future = datetime.now(timezone.utc) + timedelta(hours=2) + + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={"send_at": future.isoformat()}, + headers=auth_headers, + ) + assert resp.status_code == 200 + body = resp.json() + exec_id = body["id"] + + parsed = datetime.fromisoformat(body["scheduled_for"]) + assert abs((parsed - future).total_seconds()) < 1.0 + + execution = ( + await db_session.execute(select(Execution).where(Execution.id == uuid.UUID(exec_id))) + ).scalar_one() + assert abs((execution.scheduled_for - future).total_seconds()) < 1.0 + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id)) + ) + ).scalar_one() + assert outbox.scheduled_at is not None, "send_at in future → outbox.scheduled_at must be set" + assert abs((outbox.scheduled_at - future).total_seconds()) < 1.0 + + +@pytest.mark.asyncio +async def test_fire_send_at_past_falls_back_to_now(client, auth_headers, db_session): + """Past timestamps are forgiving — no error, treated as 'fire now'. + + Idempotent: callers don't need to worry about clock skew or being + a few ms late after computing a send_at locally. + """ + cue_id = await _create_cue(client, auth_headers, "send-at-past") + past = datetime.now(timezone.utc) - timedelta(hours=1) + + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={"send_at": past.isoformat()}, + headers=auth_headers, + ) + assert resp.status_code == 200 + exec_id = resp.json()["id"] + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id)) + ) + ).scalar_one() + assert outbox.scheduled_at is None, ( + "send_at in past → forgiving fallback; outbox.scheduled_at must be NULL" + ) + + +@pytest.mark.asyncio +async def test_fire_send_at_invalid_timestamp_returns_422(client, auth_headers): + """Pydantic catches malformed datetime strings.""" + cue_id = await _create_cue(client, auth_headers, "send-at-invalid") + + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={"send_at": "not-a-date"}, + headers=auth_headers, + ) + assert resp.status_code in (400, 422) + + +@pytest.mark.asyncio +async def test_fire_send_at_worker_transport_no_outbox(client, auth_headers, db_session): + """Worker-transport cues don't create an outbox row, but ``scheduled_for`` + on the Execution row still reflects send_at so worker pull endpoints + can filter by it (not done here — just pin the Execution shape).""" + create = await client.post( + "/v1/cues", + json={ + "name": "send-at-worker", + "schedule": {"type": "recurring", "cron": "0 * * * *"}, + "transport": "worker", + "payload": {"task": "scheduled_worker_task"}, + }, + headers=auth_headers, + ) + assert create.status_code == 201, create.text + cue_id = create.json()["id"] + + future = datetime.now(timezone.utc) + timedelta(minutes=30) + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={"send_at": future.isoformat()}, + headers=auth_headers, + ) + assert resp.status_code == 200 + exec_id = resp.json()["id"] + + execution = ( + await db_session.execute(select(Execution).where(Execution.id == uuid.UUID(exec_id))) + ).scalar_one() + assert abs((execution.scheduled_for - future).total_seconds()) < 1.0 + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id)) + ) + ).scalar_one_or_none() + assert outbox is None