diff --git a/alembic/versions/030_message_send_at.py b/alembic/versions/030_message_send_at.py new file mode 100644 index 0000000..b40c970 --- /dev/null +++ b/alembic/versions/030_message_send_at.py @@ -0,0 +1,73 @@ +"""§13 / Phase 12.1.7 — per-message scheduling on POST /v1/messages. + +Adds ``messages.send_at`` (TIMESTAMPTZ NULL) so callers can schedule a +message for future delivery in the same shape as the existing per-cue +schedule. Mirrors PR #618's cue-fire ``send_at`` semantics on the +messaging primitive. + +Behavior: + +* NULL = send now (existing behavior; backward compat for every row + in the messages table at migration time, plus every future call + that omits ``send_at``). +* Non-NULL future timestamp = recipient's inbox query gates with + ``send_at IS NULL OR send_at <= now()`` so the message is invisible + until its time. Push-delivery dispatch already gates on + ``DispatchOutbox.scheduled_at`` from Slice 3b — service layer plumbs + send_at into that column so the dispatcher delays the push too. +* Past timestamps are forgiving fallback ("send now"); enforced at the + service layer, not the column. + +Index covers the inbox-fetch hot path: per-recipient messages +filtered by send_at <= now(). Combined with the existing +``ix_messages_inbox(to_agent_id, delivery_state, created_at)``, this +adds a partial index on rows where send_at IS NOT NULL — the +common case (NULL) doesn't need this index since the existing inbox +index already excludes via the ``send_at IS NULL OR send_at <= now()`` +predicate when send_at is NULL. + +Revision ID: 030 +Revises: 029 +""" +from alembic import op +import sqlalchemy as sa + + +revision = "030" +down_revision = "029" + + +def upgrade(): + op.add_column( + "messages", + sa.Column("send_at", sa.DateTime(timezone=True), nullable=True), + ) + # Partial index: only rows where send_at IS NOT NULL. The hot path is + # "is the future-scheduled message past its send time yet?" — for + # rows with NULL send_at (the dominant case), the existing + # ix_messages_inbox already covers per-recipient lookups. + # + # CREATE INDEX CONCURRENTLY required because messages is large on + # prod (multi-million rows once shared messaging gets traffic). + # Postgres rejects CONCURRENTLY inside a transaction; alembic's + # autocommit_block opens a separate connection in autocommit mode. + with op.get_context().autocommit_block(): + op.create_index( + "ix_messages_send_at", + "messages", + ["send_at"], + postgresql_where=sa.text("send_at IS NOT NULL"), + postgresql_concurrently=True, + if_not_exists=True, + ) + + +def downgrade(): + with op.get_context().autocommit_block(): + op.drop_index( + "ix_messages_send_at", + table_name="messages", + postgresql_concurrently=True, + if_exists=True, + ) + op.drop_column("messages", "send_at") diff --git a/app/models/message.py b/app/models/message.py index d8dad67..6f453d7 100644 --- a/app/models/message.py +++ b/app/models/message.py @@ -116,6 +116,12 @@ class Message(Base): # threshold gets moved back to ``retry_ready`` so the dispatcher # can re-enqueue. Handles worker-crash-mid-delivery. delivering_started_at = Column(DateTime(timezone=True), nullable=True) + # §13 / Phase 12.1.7: scheduled-send. NULL = send now (existing + # behavior); future timestamp = inbox-fetch + push-delivery dispatch + # both gate on ``send_at IS NULL OR send_at <= now()`` so the + # message is invisible until its time. Past timestamps are forgiving + # fallback at the service layer (treated as send-now). + send_at = Column(DateTime(timezone=True), nullable=True) expires_at = Column( DateTime(timezone=True), nullable=False, diff --git a/app/routers/messages.py b/app/routers/messages.py index d8409cb..2726d2f 100644 --- a/app/routers/messages.py +++ b/app/routers/messages.py @@ -92,6 +92,7 @@ async def send_message( idempotency_key=idempotency_key, from_agent=from_agent, correlation_id=body.correlation_id, + send_at=body.send_at, ) status_code = 200 if was_dedup_hit else 201 headers = {} diff --git a/app/schemas/message.py b/app/schemas/message.py index ca60d62..7df74cc 100644 --- a/app/schemas/message.py +++ b/app/schemas/message.py @@ -52,6 +52,19 @@ class MessageCreate(BaseModel): "lookup. Null by default." ), ) + send_at: Optional[datetime] = Field( + default=None, + description=( + "§13 / Phase 12.1.7: optional UTC timestamp to schedule this " + "message for future delivery. NULL = send now (default). " + "When set in the future, the message sits in the recipient's " + "inbox-query gate until `send_at <= now()`, then becomes " + "fetchable. Push-delivery dispatch is also gated via " + "DispatchOutbox.scheduled_at. Past timestamps are forgiving " + "fallback (treated as send-now). Same semantics as cue-fire " + "`send_at` (PR #618)." + ), + ) class FromAgentRef(BaseModel): @@ -92,6 +105,7 @@ class MessageResponse(BaseModel): read_at: Optional[datetime] acked_at: Optional[datetime] failed_at: Optional[datetime] + send_at: Optional[datetime] = None expires_at: datetime # PR-2a-OSS additions — D3 RPC framing + dispatcher state surface. correlation_id: Optional[str] = None diff --git a/app/services/inbox_service.py b/app/services/inbox_service.py index 92feb9d..71fd21b 100644 --- a/app/services/inbox_service.py +++ b/app/services/inbox_service.py @@ -158,9 +158,18 @@ async def list_inbox( # for push-delivery context (not an auth predicate; correct). # No webhook delivery callback, cue-bus side effect, or audit # log scan uses ``Message.user_id`` as an auth boundary. + # §13 / Phase 12.1.7: gate scheduled-send messages out of the + # recipient's inbox until their time. NULL send_at means "send + # now" (existing behavior — covered by IS NULL); future timestamps + # become visible the first time the recipient polls after the + # send_at moment. + now_gate = datetime.now(timezone.utc) + send_at_gate = or_(Message.send_at.is_(None), Message.send_at <= now_gate) + base_filters = [ Message.to_agent_id == agent.id, Message.delivery_state.in_(state_tuple), + send_at_gate, ] if since is not None: base_filters.append(Message.created_at > since) @@ -189,17 +198,21 @@ async def list_inbox( # threads. (Pre-v1.1.1 the inbox poll was always full-inbox so # this distinction didn't exist.) if "queued" in state_tuple: - now = datetime.now(timezone.utc) + upd_now = datetime.now(timezone.utc) upd_predicates = [ Message.to_agent_id == agent.id, Message.delivery_state == "queued", + # §13: don't transition scheduled-but-not-yet-due messages. + # Their queued→delivered flip waits until send_at <= now() + # (whichever poll comes after the scheduled time). + or_(Message.send_at.is_(None), Message.send_at <= upd_now), ] if counterpart_agent is not None: upd_predicates.append(Message.from_agent_id == counterpart_agent.id) upd_q = ( update(Message) .where(*upd_predicates) - .values(delivery_state="delivered", delivered_at=now) + .values(delivery_state="delivered", delivered_at=upd_now) .returning(Message.id) ) await db.execute(upd_q) diff --git a/app/services/message_service.py b/app/services/message_service.py index 38fb8c0..cc2de20 100644 --- a/app/services/message_service.py +++ b/app/services/message_service.py @@ -142,6 +142,7 @@ async def create_message( idempotency_key: Optional[str], from_agent: Agent, correlation_id: Optional[str] = None, + send_at: Optional[datetime] = None, ) -> Tuple[Message, bool, bool]: """Send a message. Returns (message, was_dedup_hit, priority_downgraded). @@ -295,7 +296,19 @@ async def create_message( # 7. Generate id; thread_id == self.id for root messages, else inherits. msg_id = generate_message_id() thread_id = inherited_thread_id or msg_id - expires_at = datetime.now(timezone.utc) + timedelta(days=settings.MESSAGE_TTL_DAYS) + now = datetime.now(timezone.utc) + expires_at = now + timedelta(days=settings.MESSAGE_TTL_DAYS) + + # §13 / Phase 12.1.7: per-message scheduling. ``send_at`` in the + # future delays delivery; past timestamps are forgiving fallback + # (treated as send-now — caller doesn't have to worry about clock + # skew or being a few ms late). NULL stays NULL. Same shape as + # cue-fire send_at (PR #618). + effective_send_at: Optional[datetime] = None + is_scheduled = False + if send_at is not None and send_at > now: + effective_send_at = send_at + is_scheduled = True msg = Message( id=msg_id, @@ -314,6 +327,7 @@ async def create_message( metadata_=metadata or {}, idempotency_key=idempotency_key, idempotency_fingerprint=fingerprint if idempotency_key else None, + send_at=effective_send_at, expires_at=expires_at, # PR-2a-OSS port — D2 (priority two-axis) + D3 (RPC framing). # Bucket = priority verbatim at v1; future tier policy reads @@ -377,6 +391,13 @@ async def create_message( execution_id=None, cue_id=None, task_type="deliver_message", + # §13: when send_at is in the future, set scheduled_at so + # the dispatcher gates dispatch until then. NULL = + # dispatch immediately (existing behavior). The + # dispatcher's ``scheduled_at IS NULL OR scheduled_at <= + # now()`` filter handles the gating. Same plumbing as + # cue-fire send_at (PR #618). + scheduled_at=effective_send_at if is_scheduled else None, payload={ "message_id": msg_id, "to_agent_id": to_agent.id, @@ -554,6 +575,7 @@ def to_response_dict(msg: Message) -> Dict: "read_at": msg.read_at, "acked_at": msg.acked_at, "failed_at": msg.failed_at, + "send_at": msg.send_at, "expires_at": msg.expires_at, "correlation_id": msg.correlation_id, "dispatch_priority_bucket": msg.dispatch_priority_bucket, diff --git a/parity-manifest.json b/parity-manifest.json index dec5a5b..914bd4a 100644 --- a/parity-manifest.json +++ b/parity-manifest.json @@ -75,7 +75,8 @@ {"path": "alembic/versions/015_widen_device_code_column.py", "private_counterpart": "alembic/versions/015_widen_device_code_column.py", "last_synced": "2026-04-16"}, {"path": "alembic/versions/016_add_on_failure_column.py", "private_counterpart": "alembic/versions/016_add_on_failure_column.py", "last_synced": "2026-04-16"}, {"path": "alembic/versions/028_event_emit_primitive.py", "private_counterpart": "alembic/versions/058_event_emit_primitive.py", "last_synced": "2026-05-11", "ported_in": "event-emit-primitive-port (PR-1b)", "deviation": "Renumbered 058 → 028 (cueapi-core HEAD was 27 revisions behind private). Schema verbatim."}, - {"path": "alembic/versions/029_messaging_emission_columns.py", "private_counterpart": "alembic/versions/059_messaging_emission_columns.py", "last_synced": "2026-05-11", "ported_in": "messaging-emission-port (PR-2a)", "deviation": "Renumbered 059 → 029. Schema verbatim: dispatch_priority_bucket + message_dispatch_error + correlation_id columns + partial-where index on correlation_id."} + {"path": "alembic/versions/029_messaging_emission_columns.py", "private_counterpart": "alembic/versions/059_messaging_emission_columns.py", "last_synced": "2026-05-11", "ported_in": "messaging-emission-port (PR-2a)", "deviation": "Renumbered 059 → 029. Schema verbatim: dispatch_priority_bucket + message_dispatch_error + correlation_id columns + partial-where index on correlation_id."}, + {"path": "alembic/versions/030_message_send_at.py", "private_counterpart": "alembic/versions/047_message_send_at.py", "last_synced": "2026-05-11", "ported_in": "message-send-at-port (private #623)", "deviation": "Renumbered 047 → 030. Schema verbatim: messages.send_at TIMESTAMPTZ NULL + partial index ix_messages_send_at (WHERE send_at IS NOT NULL) built CONCURRENTLY."} ], "app_core": [ {"path": "app/__init__.py", "private_counterpart": "app/__init__.py", "last_synced": "2026-04-16"}, diff --git a/tests/test_message_send_at.py b/tests/test_message_send_at.py new file mode 100644 index 0000000..e1c0458 --- /dev/null +++ b/tests/test_message_send_at.py @@ -0,0 +1,199 @@ +"""Tests for §13 (Phase 12.1.7): per-message scheduling on POST /v1/messages. + +Optional ``send_at`` timestamp on MessageCreate delays delivery until +the time elapses. Same shape as cue-fire send_at (PR #618), ported to +the messaging primitive. + +These tests pin: + +1. ``send_at`` omitted → existing behavior (immediate delivery, + inbox shows the message). +2. ``send_at`` in the future → message persisted but recipient's + inbox query gates it out until the time passes. +3. ``send_at`` in the future → DispatchOutbox.scheduled_at set so push + delivery is also gated. +4. ``send_at`` in the past → forgiving fallback to "send now" (no error). +5. Recipient's inbox queued→delivered transition skips + scheduled-but-not-yet-due messages. +6. Sender's `sent` view DOES show scheduled messages (they should see + what they queued). +7. Response body exposes the persisted send_at value. +""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import select + +from app.models.dispatch_outbox import DispatchOutbox +from app.models.message import Message + + +async def _create_agent(client, auth_headers, slug, *, webhook_url=None): + body = {"slug": slug, "display_name": slug.title()} + if webhook_url: + body["webhook_url"] = webhook_url + resp = await client.post("/v1/agents", json=body, headers=auth_headers) + assert resp.status_code in (200, 201), resp.text + return resp.json() + + +async def _send(client, auth_headers, from_agent_id, body): + return await client.post( + "/v1/messages", + json=body, + headers={**auth_headers, "X-Cueapi-From-Agent": from_agent_id}, + ) + + +@pytest.mark.asyncio +async def test_send_at_omitted_immediate_delivery(client, auth_headers): + sender = await _create_agent(client, auth_headers, "sa-sender-1") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-1") + resp = await _send(client, auth_headers, sender["id"], {"to": rcpt["id"], "body": "hi"}) + assert resp.status_code == 201 + assert resp.json()["send_at"] is None + + # Recipient's inbox includes the message immediately. + inbox = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + assert inbox.status_code == 200 + msg_ids = [m["id"] for m in inbox.json()["messages"]] + assert resp.json()["id"] in msg_ids + + +@pytest.mark.asyncio +async def test_send_at_future_invisible_in_inbox(client, auth_headers, db_session): + sender = await _create_agent(client, auth_headers, "sa-sender-2") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-2") + future = datetime.now(timezone.utc) + timedelta(hours=2) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "future-msg", "send_at": future.isoformat()}, + ) + assert resp.status_code == 201 + msg_id = resp.json()["id"] + # Response reflects the future send_at. + parsed = datetime.fromisoformat(resp.json()["send_at"]) + assert abs((parsed - future).total_seconds()) < 1.0 + + # Persisted on the row. + msg = (await db_session.execute(select(Message).where(Message.id == msg_id))).scalar_one() + assert msg.send_at is not None + assert abs((msg.send_at - future).total_seconds()) < 1.0 + # Still in queued state — no premature delivery transition. + assert msg.delivery_state == "queued" + + # Recipient's inbox does NOT show the message yet. + inbox = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + assert inbox.status_code == 200 + msg_ids = [m["id"] for m in inbox.json()["messages"]] + assert msg_id not in msg_ids + + +@pytest.mark.asyncio +async def test_send_at_future_outbox_scheduled_at_set(client, auth_headers, db_session): + """When recipient has a webhook, the outbox row's scheduled_at is + set so the dispatcher gates push delivery until send_at.""" + sender = await _create_agent(client, auth_headers, "sa-sender-3") + rcpt = await _create_agent( + client, auth_headers, "sa-rcpt-3", webhook_url="https://example.com/wh" + ) + future = datetime.now(timezone.utc) + timedelta(hours=1) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "scheduled push", "send_at": future.isoformat()}, + ) + assert resp.status_code == 201 + msg_id = resp.json()["id"] + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where( + DispatchOutbox.task_type == "deliver_message", + DispatchOutbox.payload["message_id"].astext == msg_id, + ) + ) + ).scalar_one() + assert outbox.scheduled_at is not None + assert abs((outbox.scheduled_at - future).total_seconds()) < 1.0 + + +@pytest.mark.asyncio +async def test_send_at_past_falls_back_to_now(client, auth_headers, db_session): + sender = await _create_agent(client, auth_headers, "sa-sender-4") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-4") + past = datetime.now(timezone.utc) - timedelta(hours=1) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "past-msg", "send_at": past.isoformat()}, + ) + assert resp.status_code == 201 + msg_id = resp.json()["id"] + + msg = (await db_session.execute(select(Message).where(Message.id == msg_id))).scalar_one() + # Past timestamp → forgiving fallback: send_at stays NULL on the row. + assert msg.send_at is None + # Inbox shows it immediately (no gate). + inbox = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + msg_ids = [m["id"] for m in inbox.json()["messages"]] + assert msg_id in msg_ids + + +@pytest.mark.asyncio +async def test_send_at_future_visible_after_send_at_passes(client, auth_headers, db_session): + """When a scheduled message's send_at falls into the past, the + next inbox poll surfaces it AND atomically transitions it to delivered.""" + sender = await _create_agent(client, auth_headers, "sa-sender-5") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-5") + future = datetime.now(timezone.utc) + timedelta(hours=1) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "future-msg-5", "send_at": future.isoformat()}, + ) + msg_id = resp.json()["id"] + + # Cheat: shift send_at into the past via direct DB update (simulates + # time passing). + msg = (await db_session.execute(select(Message).where(Message.id == msg_id))).scalar_one() + msg.send_at = datetime.now(timezone.utc) - timedelta(seconds=5) + await db_session.commit() + + inbox = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + msg_ids = [m["id"] for m in inbox.json()["messages"]] + assert msg_id in msg_ids + + # And the queued→delivered transition fired. + db_session.expire_all() + msg2 = (await db_session.execute(select(Message).where(Message.id == msg_id))).scalar_one() + assert msg2.delivery_state == "delivered" + + +@pytest.mark.asyncio +async def test_send_at_future_visible_in_sender_sent_view(client, auth_headers): + """Sender's `sent` view shows scheduled messages (they queued them + deliberately; should see them).""" + sender = await _create_agent(client, auth_headers, "sa-sender-6") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-6") + future = datetime.now(timezone.utc) + timedelta(hours=2) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "scheduled", "send_at": future.isoformat()}, + ) + msg_id = resp.json()["id"] + + sent = await client.get(f"/v1/agents/{sender['id']}/sent", headers=auth_headers) + assert sent.status_code == 200 + msg_ids = [m["id"] for m in sent.json()["messages"]] + assert msg_id in msg_ids + + +@pytest.mark.asyncio +async def test_send_at_invalid_timestamp_returns_422(client, auth_headers): + sender = await _create_agent(client, auth_headers, "sa-sender-7") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-7") + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "bad", "send_at": "not-a-date"}, + ) + assert resp.status_code in (400, 422)