Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions alembic/versions/024_message_send_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""§13 / Phase 12.1.7 — per-message scheduling on POST /v1/messages.

Adds ``messages.send_at`` (TIMESTAMPTZ NULL) so callers can schedule a
message for future delivery in the same shape as the existing per-cue
schedule. Ports cueapi/cueapi#623 to OSS.

Behavior:

* NULL = send now (existing behavior; backward compat for every row
in the messages table at migration time, plus every future call
that omits ``send_at``).
* Non-NULL future timestamp = recipient's inbox query gates with
``send_at IS NULL OR send_at <= now()`` so the message is invisible
until its time. Push-delivery dispatch already gates on
``DispatchOutbox.scheduled_at`` from migration 022 — service layer
plumbs send_at into that column so the dispatcher delays the push
too.
* Past timestamps are forgiving fallback ("send now"); enforced at the
service layer, not the column.

Index covers the inbox-fetch hot path: per-recipient messages
filtered by send_at <= now(). Partial index on rows where
send_at IS NOT NULL — the common case (NULL) doesn't need this index
since the existing per-recipient inbox index already covers that path.

Revision ID: 024
Revises: 023
"""
from alembic import op
import sqlalchemy as sa


revision = "024"
down_revision = "023"


def upgrade():
op.add_column(
"messages",
sa.Column("send_at", sa.DateTime(timezone=True), nullable=True),
)
# CREATE INDEX CONCURRENTLY: avoids ACCESS EXCLUSIVE lock on a
# potentially large messages table. Postgres rejects CONCURRENTLY
# inside a transaction; alembic's autocommit_block opens a separate
# connection in autocommit mode.
with op.get_context().autocommit_block():
op.create_index(
"ix_messages_send_at",
"messages",
["send_at"],
postgresql_where=sa.text("send_at IS NOT NULL"),
postgresql_concurrently=True,
if_not_exists=True,
)


def downgrade():
with op.get_context().autocommit_block():
op.drop_index(
"ix_messages_send_at",
table_name="messages",
postgresql_concurrently=True,
if_exists=True,
)
op.drop_column("messages", "send_at")
6 changes: 6 additions & 0 deletions app/models/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ class Message(Base):
# threshold gets moved back to ``retry_ready`` so the dispatcher
# can re-enqueue. Handles worker-crash-mid-delivery.
delivering_started_at = Column(DateTime(timezone=True), nullable=True)
# §13 / Phase 12.1.7: scheduled-send. NULL = send now (existing
# behavior); future timestamp = inbox-fetch + push-delivery dispatch
# both gate on ``send_at IS NULL OR send_at <= now()`` so the
# message is invisible until its time. Past timestamps are forgiving
# fallback at the service layer (treated as send-now).
send_at = Column(DateTime(timezone=True), nullable=True)
expires_at = Column(
DateTime(timezone=True),
nullable=False,
Expand Down
1 change: 1 addition & 0 deletions app/routers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async def send_message(
metadata=body.metadata,
idempotency_key=idempotency_key,
from_agent=from_agent,
send_at=body.send_at,
)
status_code = 200 if was_dedup_hit else 201
headers = {}
Expand Down
14 changes: 14 additions & 0 deletions app/schemas/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ class MessageCreate(BaseModel):
description="Decoupled reply target. Null = reply to `from` (default).",
)
metadata: Dict = Field(default_factory=dict)
send_at: Optional[datetime] = Field(
default=None,
description=(
"§13 / Phase 12.1.7: optional UTC timestamp to schedule this "
"message for future delivery. NULL = send now (default). "
"When set in the future, the message sits in the recipient's "
"inbox-query gate until `send_at <= now()`, then becomes "
"fetchable. Push-delivery dispatch is also gated via "
"DispatchOutbox.scheduled_at. Past timestamps are forgiving "
"fallback (treated as send-now). Same semantics as cue-fire "
"`send_at`."
),
)


class FromAgentRef(BaseModel):
Expand Down Expand Up @@ -80,6 +93,7 @@ class MessageResponse(BaseModel):
read_at: Optional[datetime]
acked_at: Optional[datetime]
failed_at: Optional[datetime]
send_at: Optional[datetime] = None
expires_at: datetime


Expand Down
51 changes: 18 additions & 33 deletions app/services/inbox_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,41 +126,22 @@ async def list_inbox(
# is just an additional WHERE clause.
counterpart_agent = await resolve_address(db, counterpart)

# Inbox visibility is gated by AGENT OWNERSHIP, not by Message.user_id.
# ``get_agent_owned`` above already enforces ``agent.user_id == user.id``,
# so any message addressed to ``agent.id`` is implicitly authorized:
# the caller owns the recipient agent, regardless of who sent it.
# Inbox visibility is gated by AGENT OWNERSHIP, not by Message.user_id
# (PR #52 fix — same-tenant ``Message.user_id == user.id`` predicate
# silently dropped cross-user messages once PR-5b enabled cross-user
# send via WebhookAuthorizationBackend).
#
# Pre-fix this query also filtered ``Message.user_id == user.id``,
# which silently dropped cross-user messages because ``Message.user_id``
# is the SENDER's user_id (set at insert time by ``create_message``).
# That worked accidentally for v1's same-tenant constraint where the
# values matched on both sides; it broke when PR-5b added the
# WebhookAuthorizationBackend cross-user path because the send route
# started accepting messages from a different user, but this read
# path mathematically excluded them. Bug surfaced by Dock's
# cue.dock.svc deployment, see CHANGELOG entry for messaging-v1.1.0.
#
# ``Message.user_id`` is retained as the SENDER scope (used by
# ``list_sent`` below, the idempotency check in ``create_message``,
# and the per-user monthly_message_limit accounting). Inbox-side
# access doesn't need it: ``to_agent_id`` plus the agent-ownership
# invariant is the right boundary.
#
# Audit completeness (per CMA review on PR #52, 2026-05-06):
# ``rg "Message\.user_id\s*==" app/ worker/`` returns FOUR call
# sites total — the two flipped here (this base_filter +
# the queued→delivered UPDATE below), plus the two correctly
# KEPT in ``message_service``: the idempotency check at
# ``create_message`` and the ``list_sent`` filter further down
# in this file. ``rg "msg\.user_id"`` returns one extra read in
# ``worker/tasks.py:744`` that fetches the SENDER's user record
# for push-delivery context (not an auth predicate; correct).
# No webhook delivery callback, cue-bus side effect, or audit
# log scan uses ``Message.user_id`` as an auth boundary.
# §13 / Phase 12.1.7: scheduled-send gate. Messages with a future
# ``send_at`` are invisible to the recipient until that time —
# they'll become visible the first time the recipient polls after
# the send_at moment.
now = datetime.now(timezone.utc)
send_at_gate = or_(Message.send_at.is_(None), Message.send_at <= now)

base_filters = [
Message.to_agent_id == agent.id,
Message.delivery_state.in_(state_tuple),
send_at_gate,
]
if since is not None:
base_filters.append(Message.created_at > since)
Expand Down Expand Up @@ -189,17 +170,21 @@ async def list_inbox(
# threads. (Pre-v1.1.1 the inbox poll was always full-inbox so
# this distinction didn't exist.)
if "queued" in state_tuple:
now = datetime.now(timezone.utc)
upd_now = datetime.now(timezone.utc)
upd_predicates = [
Message.to_agent_id == agent.id,
Message.delivery_state == "queued",
# §13: don't transition scheduled-but-not-yet-due messages.
# Their queued→delivered flip waits until send_at <= now()
# (whichever poll comes after the scheduled time).
or_(Message.send_at.is_(None), Message.send_at <= upd_now),
]
if counterpart_agent is not None:
upd_predicates.append(Message.from_agent_id == counterpart_agent.id)
upd_q = (
update(Message)
.where(*upd_predicates)
.values(delivery_state="delivered", delivered_at=now)
.values(delivery_state="delivered", delivered_at=upd_now)
.returning(Message.id)
)
await db.execute(upd_q)
Expand Down
22 changes: 21 additions & 1 deletion app/services/message_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ async def create_message(
metadata: Dict,
idempotency_key: Optional[str],
from_agent: Agent,
send_at: Optional[datetime] = None,
) -> Tuple[Message, bool, bool]:
"""Send a message. Returns (message, was_dedup_hit, priority_downgraded).

Expand Down Expand Up @@ -291,7 +292,18 @@ async def create_message(
# 7. Generate id; thread_id == self.id for root messages, else inherits.
msg_id = generate_message_id()
thread_id = inherited_thread_id or msg_id
expires_at = datetime.now(timezone.utc) + timedelta(days=MESSAGE_TTL_DAYS)
now = datetime.now(timezone.utc)
expires_at = now + timedelta(days=MESSAGE_TTL_DAYS)

# §13 / Phase 12.1.7: per-message scheduling. ``send_at`` in the
# future delays delivery; past timestamps are forgiving fallback
# (treated as send-now — caller doesn't have to worry about clock
# skew or being a few ms late). NULL stays NULL.
effective_send_at: Optional[datetime] = None
is_scheduled = False
if send_at is not None and send_at > now:
effective_send_at = send_at
is_scheduled = True

msg = Message(
id=msg_id,
Expand All @@ -310,6 +322,7 @@ async def create_message(
metadata_=metadata or {},
idempotency_key=idempotency_key,
idempotency_fingerprint=fingerprint if idempotency_key else None,
send_at=effective_send_at,
expires_at=expires_at,
)
db.add(msg)
Expand All @@ -332,6 +345,12 @@ async def create_message(
execution_id=None,
cue_id=None,
task_type="deliver_message",
# §13: when send_at is in the future, set scheduled_at so
# the dispatcher gates dispatch until then. NULL =
# dispatch immediately (existing behavior). The
# dispatcher's ``scheduled_at IS NULL OR scheduled_at <=
# now()`` filter handles the gating.
scheduled_at=effective_send_at if is_scheduled else None,
payload={
"message_id": msg_id,
"to_agent_id": to_agent.id,
Expand Down Expand Up @@ -509,5 +528,6 @@ def to_response_dict(msg: Message) -> Dict:
"read_at": msg.read_at,
"acked_at": msg.acked_at,
"failed_at": msg.failed_at,
"send_at": msg.send_at,
"expires_at": msg.expires_at,
}
10 changes: 5 additions & 5 deletions parity-manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
{"path": "app/models/user.py", "private_counterpart": "app/models/user.py", "last_synced": "2026-04-16"},
{"path": "app/models/worker.py", "private_counterpart": "app/models/worker.py", "last_synced": "2026-04-16"},
{"path": "app/models/agent.py", "private_counterpart": "app/models/agent.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/models/message.py", "private_counterpart": "app/models/message.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "from_api_key_id column omitted (multi-key scoping is hosted-only)"},
{"path": "app/models/message.py", "private_counterpart": "app/models/message.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at", "deviation": "from_api_key_id column omitted (multi-key scoping is hosted-only). send_at column added by port/623."},
{"path": "app/models/usage_messages_monthly.py", "private_counterpart": "app/models/usage_messages_monthly.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}
],
"routers": [
Expand All @@ -103,7 +103,7 @@
{"path": "app/routers/webhook_secret.py", "private_counterpart": "app/routers/webhook_secret.py", "last_synced": "2026-04-16"},
{"path": "app/routers/workers.py", "private_counterpart": "app/routers/workers.py", "last_synced": "2026-04-16"},
{"path": "app/routers/agents.py", "private_counterpart": "app/routers/agents.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/routers/messages.py", "private_counterpart": "app/routers/messages.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}
{"path": "app/routers/messages.py", "private_counterpart": "app/routers/messages.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at"}
],
"schemas": [
{"path": "app/schemas/__init__.py", "private_counterpart": "app/schemas/__init__.py", "last_synced": "2026-04-16"},
Expand All @@ -113,7 +113,7 @@
{"path": "app/schemas/outcome.py", "private_counterpart": "app/schemas/outcome.py", "last_synced": "2026-04-16"},
{"path": "app/schemas/worker.py", "private_counterpart": "app/schemas/worker.py", "last_synced": "2026-04-16"},
{"path": "app/schemas/agent.py", "private_counterpart": "app/schemas/agent.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "api_key_id field omitted from AgentResponse (multi-key scoping is hosted-only)"},
{"path": "app/schemas/message.py", "private_counterpart": "app/schemas/message.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "from_api_key_id field omitted from MessageResponse (multi-key scoping is hosted-only)"}
{"path": "app/schemas/message.py", "private_counterpart": "app/schemas/message.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at", "deviation": "from_api_key_id field omitted from MessageResponse (multi-key scoping is hosted-only). send_at field added by port/623."}
],
"services": [
{"path": "app/services/__init__.py", "private_counterpart": "app/services/__init__.py", "last_synced": "2026-04-16"},
Expand All @@ -125,10 +125,10 @@
{"path": "app/services/usage_service.py", "private_counterpart": "app/services/usage_service.py", "last_synced": "2026-04-16"},
{"path": "app/services/webhook.py", "private_counterpart": "app/services/webhook.py", "last_synced": "2026-04-16"},
{"path": "app/services/agent_service.py", "private_counterpart": "app/services/agent_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "user.api_key_id reference removed (multi-key scoping is hosted-only)"},
{"path": "app/services/inbox_service.py", "private_counterpart": "app/services/inbox_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/services/inbox_service.py", "private_counterpart": "app/services/inbox_service.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at"},
{"path": "app/services/message_classification.py", "private_counterpart": "app/services/message_classification.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/services/message_delivery.py", "private_counterpart": "app/services/message_delivery.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/services/message_service.py", "private_counterpart": "app/services/message_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "user.api_key_id and msg.from_api_key_id references removed"},
{"path": "app/services/message_service.py", "private_counterpart": "app/services/message_service.py", "last_synced": "2026-05-05", "ported_in": "port/623-message-send-at", "deviation": "user.api_key_id and msg.from_api_key_id references removed. send_at parameter + plumbing added by port/623."},
{"path": "app/services/message_usage_service.py", "private_counterpart": "app/services/message_usage_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}
],
"utils": [
Expand Down
Loading
Loading