diff --git a/alembic/versions/024_message_send_at.py b/alembic/versions/024_message_send_at.py new file mode 100644 index 0000000..cca10fd --- /dev/null +++ b/alembic/versions/024_message_send_at.py @@ -0,0 +1,65 @@ +"""§13 / Phase 12.1.7 — per-message scheduling on POST /v1/messages. + +Adds ``messages.send_at`` (TIMESTAMPTZ NULL) so callers can schedule a +message for future delivery in the same shape as the existing per-cue +schedule. Ports cueapi/cueapi#623 to OSS. + +Behavior: + +* NULL = send now (existing behavior; backward compat for every row + in the messages table at migration time, plus every future call + that omits ``send_at``). +* Non-NULL future timestamp = recipient's inbox query gates with + ``send_at IS NULL OR send_at <= now()`` so the message is invisible + until its time. Push-delivery dispatch already gates on + ``DispatchOutbox.scheduled_at`` from migration 022 — service layer + plumbs send_at into that column so the dispatcher delays the push + too. +* Past timestamps are forgiving fallback ("send now"); enforced at the + service layer, not the column. + +Index covers the inbox-fetch hot path: per-recipient messages +filtered by send_at <= now(). Partial index on rows where +send_at IS NOT NULL — the common case (NULL) doesn't need this index +since the existing per-recipient inbox index already covers that path. + +Revision ID: 024 +Revises: 023 +""" +from alembic import op +import sqlalchemy as sa + + +revision = "024" +down_revision = "023" + + +def upgrade(): + op.add_column( + "messages", + sa.Column("send_at", sa.DateTime(timezone=True), nullable=True), + ) + # CREATE INDEX CONCURRENTLY: avoids ACCESS EXCLUSIVE lock on a + # potentially large messages table. Postgres rejects CONCURRENTLY + # inside a transaction; alembic's autocommit_block opens a separate + # connection in autocommit mode. + with op.get_context().autocommit_block(): + op.create_index( + "ix_messages_send_at", + "messages", + ["send_at"], + postgresql_where=sa.text("send_at IS NOT NULL"), + postgresql_concurrently=True, + if_not_exists=True, + ) + + +def downgrade(): + with op.get_context().autocommit_block(): + op.drop_index( + "ix_messages_send_at", + table_name="messages", + postgresql_concurrently=True, + if_exists=True, + ) + op.drop_column("messages", "send_at") diff --git a/app/models/message.py b/app/models/message.py index 8c5fafa..29d999c 100644 --- a/app/models/message.py +++ b/app/models/message.py @@ -115,6 +115,12 @@ class Message(Base): # threshold gets moved back to ``retry_ready`` so the dispatcher # can re-enqueue. Handles worker-crash-mid-delivery. delivering_started_at = Column(DateTime(timezone=True), nullable=True) + # §13 / Phase 12.1.7: scheduled-send. NULL = send now (existing + # behavior); future timestamp = inbox-fetch + push-delivery dispatch + # both gate on ``send_at IS NULL OR send_at <= now()`` so the + # message is invisible until its time. Past timestamps are forgiving + # fallback at the service layer (treated as send-now). + send_at = Column(DateTime(timezone=True), nullable=True) expires_at = Column( DateTime(timezone=True), nullable=False, diff --git a/app/routers/messages.py b/app/routers/messages.py index 4981147..c96e784 100644 --- a/app/routers/messages.py +++ b/app/routers/messages.py @@ -91,6 +91,7 @@ async def send_message( metadata=body.metadata, idempotency_key=idempotency_key, from_agent=from_agent, + send_at=body.send_at, ) status_code = 200 if was_dedup_hit else 201 headers = {} diff --git a/app/schemas/message.py b/app/schemas/message.py index 153269f..68127b6 100644 --- a/app/schemas/message.py +++ b/app/schemas/message.py @@ -40,6 +40,19 @@ class MessageCreate(BaseModel): description="Decoupled reply target. Null = reply to `from` (default).", ) metadata: Dict = Field(default_factory=dict) + send_at: Optional[datetime] = Field( + default=None, + description=( + "§13 / Phase 12.1.7: optional UTC timestamp to schedule this " + "message for future delivery. NULL = send now (default). " + "When set in the future, the message sits in the recipient's " + "inbox-query gate until `send_at <= now()`, then becomes " + "fetchable. Push-delivery dispatch is also gated via " + "DispatchOutbox.scheduled_at. Past timestamps are forgiving " + "fallback (treated as send-now). Same semantics as cue-fire " + "`send_at`." + ), + ) class FromAgentRef(BaseModel): @@ -80,6 +93,7 @@ class MessageResponse(BaseModel): read_at: Optional[datetime] acked_at: Optional[datetime] failed_at: Optional[datetime] + send_at: Optional[datetime] = None expires_at: datetime diff --git a/app/services/inbox_service.py b/app/services/inbox_service.py index 92feb9d..fa33766 100644 --- a/app/services/inbox_service.py +++ b/app/services/inbox_service.py @@ -126,41 +126,22 @@ async def list_inbox( # is just an additional WHERE clause. counterpart_agent = await resolve_address(db, counterpart) - # Inbox visibility is gated by AGENT OWNERSHIP, not by Message.user_id. - # ``get_agent_owned`` above already enforces ``agent.user_id == user.id``, - # so any message addressed to ``agent.id`` is implicitly authorized: - # the caller owns the recipient agent, regardless of who sent it. + # Inbox visibility is gated by AGENT OWNERSHIP, not by Message.user_id + # (PR #52 fix — same-tenant ``Message.user_id == user.id`` predicate + # silently dropped cross-user messages once PR-5b enabled cross-user + # send via WebhookAuthorizationBackend). # - # Pre-fix this query also filtered ``Message.user_id == user.id``, - # which silently dropped cross-user messages because ``Message.user_id`` - # is the SENDER's user_id (set at insert time by ``create_message``). - # That worked accidentally for v1's same-tenant constraint where the - # values matched on both sides; it broke when PR-5b added the - # WebhookAuthorizationBackend cross-user path because the send route - # started accepting messages from a different user, but this read - # path mathematically excluded them. Bug surfaced by Dock's - # cue.dock.svc deployment, see CHANGELOG entry for messaging-v1.1.0. - # - # ``Message.user_id`` is retained as the SENDER scope (used by - # ``list_sent`` below, the idempotency check in ``create_message``, - # and the per-user monthly_message_limit accounting). Inbox-side - # access doesn't need it: ``to_agent_id`` plus the agent-ownership - # invariant is the right boundary. - # - # Audit completeness (per CMA review on PR #52, 2026-05-06): - # ``rg "Message\.user_id\s*==" app/ worker/`` returns FOUR call - # sites total — the two flipped here (this base_filter + - # the queued→delivered UPDATE below), plus the two correctly - # KEPT in ``message_service``: the idempotency check at - # ``create_message`` and the ``list_sent`` filter further down - # in this file. ``rg "msg\.user_id"`` returns one extra read in - # ``worker/tasks.py:744`` that fetches the SENDER's user record - # for push-delivery context (not an auth predicate; correct). - # No webhook delivery callback, cue-bus side effect, or audit - # log scan uses ``Message.user_id`` as an auth boundary. + # §13 / Phase 12.1.7: scheduled-send gate. Messages with a future + # ``send_at`` are invisible to the recipient until that time — + # they'll become visible the first time the recipient polls after + # the send_at moment. + now = datetime.now(timezone.utc) + send_at_gate = or_(Message.send_at.is_(None), Message.send_at <= now) + base_filters = [ Message.to_agent_id == agent.id, Message.delivery_state.in_(state_tuple), + send_at_gate, ] if since is not None: base_filters.append(Message.created_at > since) @@ -189,17 +170,21 @@ async def list_inbox( # threads. (Pre-v1.1.1 the inbox poll was always full-inbox so # this distinction didn't exist.) if "queued" in state_tuple: - now = datetime.now(timezone.utc) + upd_now = datetime.now(timezone.utc) upd_predicates = [ Message.to_agent_id == agent.id, Message.delivery_state == "queued", + # §13: don't transition scheduled-but-not-yet-due messages. + # Their queued→delivered flip waits until send_at <= now() + # (whichever poll comes after the scheduled time). + or_(Message.send_at.is_(None), Message.send_at <= upd_now), ] if counterpart_agent is not None: upd_predicates.append(Message.from_agent_id == counterpart_agent.id) upd_q = ( update(Message) .where(*upd_predicates) - .values(delivery_state="delivered", delivered_at=now) + .values(delivery_state="delivered", delivered_at=upd_now) .returning(Message.id) ) await db.execute(upd_q) diff --git a/app/services/message_service.py b/app/services/message_service.py index 58fbfe4..3c2ff1b 100644 --- a/app/services/message_service.py +++ b/app/services/message_service.py @@ -138,6 +138,7 @@ async def create_message( metadata: Dict, idempotency_key: Optional[str], from_agent: Agent, + send_at: Optional[datetime] = None, ) -> Tuple[Message, bool, bool]: """Send a message. Returns (message, was_dedup_hit, priority_downgraded). @@ -291,7 +292,18 @@ async def create_message( # 7. Generate id; thread_id == self.id for root messages, else inherits. msg_id = generate_message_id() thread_id = inherited_thread_id or msg_id - expires_at = datetime.now(timezone.utc) + timedelta(days=MESSAGE_TTL_DAYS) + now = datetime.now(timezone.utc) + expires_at = now + timedelta(days=MESSAGE_TTL_DAYS) + + # §13 / Phase 12.1.7: per-message scheduling. ``send_at`` in the + # future delays delivery; past timestamps are forgiving fallback + # (treated as send-now — caller doesn't have to worry about clock + # skew or being a few ms late). NULL stays NULL. + effective_send_at: Optional[datetime] = None + is_scheduled = False + if send_at is not None and send_at > now: + effective_send_at = send_at + is_scheduled = True msg = Message( id=msg_id, @@ -310,6 +322,7 @@ async def create_message( metadata_=metadata or {}, idempotency_key=idempotency_key, idempotency_fingerprint=fingerprint if idempotency_key else None, + send_at=effective_send_at, expires_at=expires_at, ) db.add(msg) @@ -332,6 +345,12 @@ async def create_message( execution_id=None, cue_id=None, task_type="deliver_message", + # §13: when send_at is in the future, set scheduled_at so + # the dispatcher gates dispatch until then. NULL = + # dispatch immediately (existing behavior). The + # dispatcher's ``scheduled_at IS NULL OR scheduled_at <= + # now()`` filter handles the gating. + scheduled_at=effective_send_at if is_scheduled else None, payload={ "message_id": msg_id, "to_agent_id": to_agent.id, @@ -509,5 +528,6 @@ def to_response_dict(msg: Message) -> Dict: "read_at": msg.read_at, "acked_at": msg.acked_at, "failed_at": msg.failed_at, + "send_at": msg.send_at, "expires_at": msg.expires_at, } diff --git a/parity-manifest.json b/parity-manifest.json index a17196f..9bbe581 100644 --- a/parity-manifest.json +++ b/parity-manifest.json @@ -87,7 +87,7 @@ {"path": "app/models/user.py", "private_counterpart": "app/models/user.py", "last_synced": "2026-04-16"}, {"path": "app/models/worker.py", "private_counterpart": "app/models/worker.py", "last_synced": "2026-04-16"}, {"path": "app/models/agent.py", "private_counterpart": "app/models/agent.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}, - {"path": "app/models/message.py", "private_counterpart": "app/models/message.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "from_api_key_id column omitted (multi-key scoping is hosted-only)"}, + {"path": "app/models/message.py", "private_counterpart": "app/models/message.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at", "deviation": "from_api_key_id column omitted (multi-key scoping is hosted-only). send_at column added by port/623."}, {"path": "app/models/usage_messages_monthly.py", "private_counterpart": "app/models/usage_messages_monthly.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"} ], "routers": [ @@ -103,7 +103,7 @@ {"path": "app/routers/webhook_secret.py", "private_counterpart": "app/routers/webhook_secret.py", "last_synced": "2026-04-16"}, {"path": "app/routers/workers.py", "private_counterpart": "app/routers/workers.py", "last_synced": "2026-04-16"}, {"path": "app/routers/agents.py", "private_counterpart": "app/routers/agents.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}, - {"path": "app/routers/messages.py", "private_counterpart": "app/routers/messages.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"} + {"path": "app/routers/messages.py", "private_counterpart": "app/routers/messages.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at"} ], "schemas": [ {"path": "app/schemas/__init__.py", "private_counterpart": "app/schemas/__init__.py", "last_synced": "2026-04-16"}, @@ -113,7 +113,7 @@ {"path": "app/schemas/outcome.py", "private_counterpart": "app/schemas/outcome.py", "last_synced": "2026-04-16"}, {"path": "app/schemas/worker.py", "private_counterpart": "app/schemas/worker.py", "last_synced": "2026-04-16"}, {"path": "app/schemas/agent.py", "private_counterpart": "app/schemas/agent.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "api_key_id field omitted from AgentResponse (multi-key scoping is hosted-only)"}, - {"path": "app/schemas/message.py", "private_counterpart": "app/schemas/message.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "from_api_key_id field omitted from MessageResponse (multi-key scoping is hosted-only)"} + {"path": "app/schemas/message.py", "private_counterpart": "app/schemas/message.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at", "deviation": "from_api_key_id field omitted from MessageResponse (multi-key scoping is hosted-only). send_at field added by port/623."} ], "services": [ {"path": "app/services/__init__.py", "private_counterpart": "app/services/__init__.py", "last_synced": "2026-04-16"}, @@ -125,10 +125,10 @@ {"path": "app/services/usage_service.py", "private_counterpart": "app/services/usage_service.py", "last_synced": "2026-04-16"}, {"path": "app/services/webhook.py", "private_counterpart": "app/services/webhook.py", "last_synced": "2026-04-16"}, {"path": "app/services/agent_service.py", "private_counterpart": "app/services/agent_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "user.api_key_id reference removed (multi-key scoping is hosted-only)"}, - {"path": "app/services/inbox_service.py", "private_counterpart": "app/services/inbox_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}, + {"path": "app/services/inbox_service.py", "private_counterpart": "app/services/inbox_service.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at"}, {"path": "app/services/message_classification.py", "private_counterpart": "app/services/message_classification.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}, {"path": "app/services/message_delivery.py", "private_counterpart": "app/services/message_delivery.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}, - {"path": "app/services/message_service.py", "private_counterpart": "app/services/message_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "user.api_key_id and msg.from_api_key_id references removed"}, + {"path": "app/services/message_service.py", "private_counterpart": "app/services/message_service.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at", "deviation": "user.api_key_id and msg.from_api_key_id references removed. send_at parameter + plumbing added by port/623."}, {"path": "app/services/message_usage_service.py", "private_counterpart": "app/services/message_usage_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"} ], "utils": [ diff --git a/tests/test_message_send_at.py b/tests/test_message_send_at.py new file mode 100644 index 0000000..3cb3591 --- /dev/null +++ b/tests/test_message_send_at.py @@ -0,0 +1,190 @@ +"""Tests for §13 (Phase 12.1.7): per-message scheduling on POST /v1/messages. + +Optional ``send_at`` timestamp on MessageCreate delays delivery until +the time elapses. Same shape as cue-fire send_at, ported to the +messaging primitive. Ports cueapi/cueapi#623. + +These tests pin: + +1. ``send_at`` omitted → existing behavior (immediate delivery, + inbox shows the message). +2. ``send_at`` in the future → message persisted but recipient's + inbox query gates it out until the time passes. +3. ``send_at`` in the future → DispatchOutbox.scheduled_at set so push + delivery is also gated. +4. ``send_at`` in the past → forgiving fallback to "send now" (no error). +5. Recipient's inbox queued→delivered transition skips + scheduled-but-not-yet-due messages; surfaces them after the time + passes and transitions them on first post-due poll. +6. Sender's `sent` view DOES show scheduled messages (they should see + what they queued). +7. Invalid timestamps return 422. +""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import select + +from app.models.dispatch_outbox import DispatchOutbox +from app.models.message import Message + + +async def _create_agent(client, auth_headers, slug, *, webhook_url=None): + body = {"slug": slug, "display_name": slug.title()} + if webhook_url: + body["webhook_url"] = webhook_url + resp = await client.post("/v1/agents", json=body, headers=auth_headers) + assert resp.status_code in (200, 201), resp.text + return resp.json() + + +async def _send(client, auth_headers, from_agent_id, body): + return await client.post( + "/v1/messages", + json=body, + headers={**auth_headers, "X-Cueapi-From-Agent": from_agent_id}, + ) + + +@pytest.mark.asyncio +async def test_send_at_omitted_immediate_delivery(client, auth_headers): + sender = await _create_agent(client, auth_headers, "sa-sender-1") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-1") + resp = await _send(client, auth_headers, sender["id"], {"to": rcpt["id"], "body": "hi"}) + assert resp.status_code == 201 + assert resp.json()["send_at"] is None + + inbox = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + assert inbox.status_code == 200 + msg_ids = [m["id"] for m in inbox.json()["messages"]] + assert resp.json()["id"] in msg_ids + + +@pytest.mark.asyncio +async def test_send_at_future_invisible_in_inbox(client, auth_headers, db_session): + sender = await _create_agent(client, auth_headers, "sa-sender-2") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-2") + future = datetime.now(timezone.utc) + timedelta(hours=2) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "future-msg", "send_at": future.isoformat()}, + ) + assert resp.status_code == 201 + msg_id = resp.json()["id"] + parsed = datetime.fromisoformat(resp.json()["send_at"]) + assert abs((parsed - future).total_seconds()) < 1.0 + + msg = (await db_session.execute(select(Message).where(Message.id == msg_id))).scalar_one() + assert msg.send_at is not None + assert abs((msg.send_at - future).total_seconds()) < 1.0 + assert msg.delivery_state == "queued" + + inbox = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + assert inbox.status_code == 200 + msg_ids = [m["id"] for m in inbox.json()["messages"]] + assert msg_id not in msg_ids + + +@pytest.mark.asyncio +async def test_send_at_future_outbox_scheduled_at_set(client, auth_headers, db_session): + """When recipient has a webhook, the outbox row's scheduled_at is + set so the dispatcher gates push delivery until send_at.""" + sender = await _create_agent(client, auth_headers, "sa-sender-3") + rcpt = await _create_agent( + client, auth_headers, "sa-rcpt-3", webhook_url="https://example.com/wh" + ) + future = datetime.now(timezone.utc) + timedelta(hours=1) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "scheduled push", "send_at": future.isoformat()}, + ) + assert resp.status_code == 201 + msg_id = resp.json()["id"] + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where( + DispatchOutbox.task_type == "deliver_message", + DispatchOutbox.payload["message_id"].astext == msg_id, + ) + ) + ).scalar_one() + assert outbox.scheduled_at is not None + assert abs((outbox.scheduled_at - future).total_seconds()) < 1.0 + + +@pytest.mark.asyncio +async def test_send_at_past_falls_back_to_now(client, auth_headers, db_session): + sender = await _create_agent(client, auth_headers, "sa-sender-4") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-4") + past = datetime.now(timezone.utc) - timedelta(hours=1) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "past-msg", "send_at": past.isoformat()}, + ) + assert resp.status_code == 201 + msg_id = resp.json()["id"] + + msg = (await db_session.execute(select(Message).where(Message.id == msg_id))).scalar_one() + assert msg.send_at is None + inbox = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + msg_ids = [m["id"] for m in inbox.json()["messages"]] + assert msg_id in msg_ids + + +@pytest.mark.asyncio +async def test_send_at_future_visible_after_send_at_passes(client, auth_headers, db_session): + """When a scheduled message's send_at falls into the past, the + next inbox poll surfaces it AND atomically transitions it to delivered.""" + sender = await _create_agent(client, auth_headers, "sa-sender-5") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-5") + future = datetime.now(timezone.utc) + timedelta(hours=1) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "future-msg-5", "send_at": future.isoformat()}, + ) + msg_id = resp.json()["id"] + + msg = (await db_session.execute(select(Message).where(Message.id == msg_id))).scalar_one() + msg.send_at = datetime.now(timezone.utc) - timedelta(seconds=5) + await db_session.commit() + + inbox = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + msg_ids = [m["id"] for m in inbox.json()["messages"]] + assert msg_id in msg_ids + + db_session.expire_all() + msg2 = (await db_session.execute(select(Message).where(Message.id == msg_id))).scalar_one() + assert msg2.delivery_state == "delivered" + + +@pytest.mark.asyncio +async def test_send_at_future_visible_in_sender_sent_view(client, auth_headers): + """Sender's `sent` view shows scheduled messages (they queued them + deliberately; should see them).""" + sender = await _create_agent(client, auth_headers, "sa-sender-6") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-6") + future = datetime.now(timezone.utc) + timedelta(hours=2) + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "scheduled", "send_at": future.isoformat()}, + ) + msg_id = resp.json()["id"] + + sent = await client.get(f"/v1/agents/{sender['id']}/sent", headers=auth_headers) + assert sent.status_code == 200 + msg_ids = [m["id"] for m in sent.json()["messages"]] + assert msg_id in msg_ids + + +@pytest.mark.asyncio +async def test_send_at_invalid_timestamp_returns_422(client, auth_headers): + sender = await _create_agent(client, auth_headers, "sa-sender-7") + rcpt = await _create_agent(client, auth_headers, "sa-rcpt-7") + resp = await _send( + client, auth_headers, sender["id"], + {"to": rcpt["id"], "body": "bad", "send_at": "not-a-date"}, + ) + assert resp.status_code in (400, 422)