Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions alembic/versions/030_message_send_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""§13 / Phase 12.1.7 — per-message scheduling on POST /v1/messages.

Adds ``messages.send_at`` (TIMESTAMPTZ NULL) so callers can schedule a
message for future delivery in the same shape as the existing per-cue
schedule. Mirrors PR #618's cue-fire ``send_at`` semantics on the
messaging primitive.

Behavior:

* NULL = send now (existing behavior; backward compat for every row
in the messages table at migration time, plus every future call
that omits ``send_at``).
* Non-NULL future timestamp = recipient's inbox query gates with
``send_at IS NULL OR send_at <= now()`` so the message is invisible
until its time. Push-delivery dispatch already gates on
``DispatchOutbox.scheduled_at`` from Slice 3b — service layer plumbs
send_at into that column so the dispatcher delays the push too.
* Past timestamps are forgiving fallback ("send now"); enforced at the
service layer, not the column.

Index covers the inbox-fetch hot path: per-recipient messages
filtered by send_at <= now(). Combined with the existing
``ix_messages_inbox(to_agent_id, delivery_state, created_at)``, this
adds a partial index on rows where send_at IS NOT NULL — the
common case (NULL) doesn't need this index since the existing inbox
index already excludes via the ``send_at IS NULL OR send_at <= now()``
predicate when send_at is NULL.

Revision ID: 030
Revises: 029
"""
from alembic import op
import sqlalchemy as sa


revision = "030"
down_revision = "029"


def upgrade():
op.add_column(
"messages",
sa.Column("send_at", sa.DateTime(timezone=True), nullable=True),
)
# Partial index: only rows where send_at IS NOT NULL. The hot path is
# "is the future-scheduled message past its send time yet?" — for
# rows with NULL send_at (the dominant case), the existing
# ix_messages_inbox already covers per-recipient lookups.
#
# CREATE INDEX CONCURRENTLY required because messages is large on
# prod (multi-million rows once shared messaging gets traffic).
# Postgres rejects CONCURRENTLY inside a transaction; alembic's
# autocommit_block opens a separate connection in autocommit mode.
with op.get_context().autocommit_block():
op.create_index(
"ix_messages_send_at",
"messages",
["send_at"],
postgresql_where=sa.text("send_at IS NOT NULL"),
postgresql_concurrently=True,
if_not_exists=True,
)


def downgrade():
with op.get_context().autocommit_block():
op.drop_index(
"ix_messages_send_at",
table_name="messages",
postgresql_concurrently=True,
if_exists=True,
)
op.drop_column("messages", "send_at")
6 changes: 6 additions & 0 deletions app/models/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ class Message(Base):
# threshold gets moved back to ``retry_ready`` so the dispatcher
# can re-enqueue. Handles worker-crash-mid-delivery.
delivering_started_at = Column(DateTime(timezone=True), nullable=True)
# §13 / Phase 12.1.7: scheduled-send. NULL = send now (existing
# behavior); future timestamp = inbox-fetch + push-delivery dispatch
# both gate on ``send_at IS NULL OR send_at <= now()`` so the
# message is invisible until its time. Past timestamps are forgiving
# fallback at the service layer (treated as send-now).
send_at = Column(DateTime(timezone=True), nullable=True)
expires_at = Column(
DateTime(timezone=True),
nullable=False,
Expand Down
1 change: 1 addition & 0 deletions app/routers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ async def send_message(
idempotency_key=idempotency_key,
from_agent=from_agent,
correlation_id=body.correlation_id,
send_at=body.send_at,
)
status_code = 200 if was_dedup_hit else 201
headers = {}
Expand Down
14 changes: 14 additions & 0 deletions app/schemas/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ class MessageCreate(BaseModel):
"lookup. Null by default."
),
)
send_at: Optional[datetime] = Field(
default=None,
description=(
"§13 / Phase 12.1.7: optional UTC timestamp to schedule this "
"message for future delivery. NULL = send now (default). "
"When set in the future, the message sits in the recipient's "
"inbox-query gate until `send_at <= now()`, then becomes "
"fetchable. Push-delivery dispatch is also gated via "
"DispatchOutbox.scheduled_at. Past timestamps are forgiving "
"fallback (treated as send-now). Same semantics as cue-fire "
"`send_at` (PR #618)."
),
)


class FromAgentRef(BaseModel):
Expand Down Expand Up @@ -92,6 +105,7 @@ class MessageResponse(BaseModel):
read_at: Optional[datetime]
acked_at: Optional[datetime]
failed_at: Optional[datetime]
send_at: Optional[datetime] = None
expires_at: datetime
# PR-2a-OSS additions — D3 RPC framing + dispatcher state surface.
correlation_id: Optional[str] = None
Expand Down
17 changes: 15 additions & 2 deletions app/services/inbox_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,18 @@ async def list_inbox(
# for push-delivery context (not an auth predicate; correct).
# No webhook delivery callback, cue-bus side effect, or audit
# log scan uses ``Message.user_id`` as an auth boundary.
# §13 / Phase 12.1.7: gate scheduled-send messages out of the
# recipient's inbox until their time. NULL send_at means "send
# now" (existing behavior — covered by IS NULL); future timestamps
# become visible the first time the recipient polls after the
# send_at moment.
now_gate = datetime.now(timezone.utc)
send_at_gate = or_(Message.send_at.is_(None), Message.send_at <= now_gate)

base_filters = [
Message.to_agent_id == agent.id,
Message.delivery_state.in_(state_tuple),
send_at_gate,
]
if since is not None:
base_filters.append(Message.created_at > since)
Expand Down Expand Up @@ -189,17 +198,21 @@ async def list_inbox(
# threads. (Pre-v1.1.1 the inbox poll was always full-inbox so
# this distinction didn't exist.)
if "queued" in state_tuple:
now = datetime.now(timezone.utc)
upd_now = datetime.now(timezone.utc)
upd_predicates = [
Message.to_agent_id == agent.id,
Message.delivery_state == "queued",
# §13: don't transition scheduled-but-not-yet-due messages.
# Their queued→delivered flip waits until send_at <= now()
# (whichever poll comes after the scheduled time).
or_(Message.send_at.is_(None), Message.send_at <= upd_now),
]
if counterpart_agent is not None:
upd_predicates.append(Message.from_agent_id == counterpart_agent.id)
upd_q = (
update(Message)
.where(*upd_predicates)
.values(delivery_state="delivered", delivered_at=now)
.values(delivery_state="delivered", delivered_at=upd_now)
.returning(Message.id)
)
await db.execute(upd_q)
Expand Down
24 changes: 23 additions & 1 deletion app/services/message_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ async def create_message(
idempotency_key: Optional[str],
from_agent: Agent,
correlation_id: Optional[str] = None,
send_at: Optional[datetime] = None,
) -> Tuple[Message, bool, bool]:
"""Send a message. Returns (message, was_dedup_hit, priority_downgraded).

Expand Down Expand Up @@ -295,7 +296,19 @@ async def create_message(
# 7. Generate id; thread_id == self.id for root messages, else inherits.
msg_id = generate_message_id()
thread_id = inherited_thread_id or msg_id
expires_at = datetime.now(timezone.utc) + timedelta(days=settings.MESSAGE_TTL_DAYS)
now = datetime.now(timezone.utc)
expires_at = now + timedelta(days=settings.MESSAGE_TTL_DAYS)

# §13 / Phase 12.1.7: per-message scheduling. ``send_at`` in the
# future delays delivery; past timestamps are forgiving fallback
# (treated as send-now — caller doesn't have to worry about clock
# skew or being a few ms late). NULL stays NULL. Same shape as
# cue-fire send_at (PR #618).
effective_send_at: Optional[datetime] = None
is_scheduled = False
if send_at is not None and send_at > now:
effective_send_at = send_at
is_scheduled = True

msg = Message(
id=msg_id,
Expand All @@ -314,6 +327,7 @@ async def create_message(
metadata_=metadata or {},
idempotency_key=idempotency_key,
idempotency_fingerprint=fingerprint if idempotency_key else None,
send_at=effective_send_at,
expires_at=expires_at,
# PR-2a-OSS port — D2 (priority two-axis) + D3 (RPC framing).
# Bucket = priority verbatim at v1; future tier policy reads
Expand Down Expand Up @@ -377,6 +391,13 @@ async def create_message(
execution_id=None,
cue_id=None,
task_type="deliver_message",
# §13: when send_at is in the future, set scheduled_at so
# the dispatcher gates dispatch until then. NULL =
# dispatch immediately (existing behavior). The
# dispatcher's ``scheduled_at IS NULL OR scheduled_at <=
# now()`` filter handles the gating. Same plumbing as
# cue-fire send_at (PR #618).
scheduled_at=effective_send_at if is_scheduled else None,
payload={
"message_id": msg_id,
"to_agent_id": to_agent.id,
Expand Down Expand Up @@ -554,6 +575,7 @@ def to_response_dict(msg: Message) -> Dict:
"read_at": msg.read_at,
"acked_at": msg.acked_at,
"failed_at": msg.failed_at,
"send_at": msg.send_at,
"expires_at": msg.expires_at,
"correlation_id": msg.correlation_id,
"dispatch_priority_bucket": msg.dispatch_priority_bucket,
Expand Down
3 changes: 2 additions & 1 deletion parity-manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
{"path": "alembic/versions/015_widen_device_code_column.py", "private_counterpart": "alembic/versions/015_widen_device_code_column.py", "last_synced": "2026-04-16"},
{"path": "alembic/versions/016_add_on_failure_column.py", "private_counterpart": "alembic/versions/016_add_on_failure_column.py", "last_synced": "2026-04-16"},
{"path": "alembic/versions/028_event_emit_primitive.py", "private_counterpart": "alembic/versions/058_event_emit_primitive.py", "last_synced": "2026-05-11", "ported_in": "event-emit-primitive-port (PR-1b)", "deviation": "Renumbered 058 → 028 (cueapi-core HEAD was 27 revisions behind private). Schema verbatim."},
{"path": "alembic/versions/029_messaging_emission_columns.py", "private_counterpart": "alembic/versions/059_messaging_emission_columns.py", "last_synced": "2026-05-11", "ported_in": "messaging-emission-port (PR-2a)", "deviation": "Renumbered 059 → 029. Schema verbatim: dispatch_priority_bucket + message_dispatch_error + correlation_id columns + partial-where index on correlation_id."}
{"path": "alembic/versions/029_messaging_emission_columns.py", "private_counterpart": "alembic/versions/059_messaging_emission_columns.py", "last_synced": "2026-05-11", "ported_in": "messaging-emission-port (PR-2a)", "deviation": "Renumbered 059 → 029. Schema verbatim: dispatch_priority_bucket + message_dispatch_error + correlation_id columns + partial-where index on correlation_id."},
{"path": "alembic/versions/030_message_send_at.py", "private_counterpart": "alembic/versions/047_message_send_at.py", "last_synced": "2026-05-11", "ported_in": "message-send-at-port (private #623)", "deviation": "Renumbered 047 → 030. Schema verbatim: messages.send_at TIMESTAMPTZ NULL + partial index ix_messages_send_at (WHERE send_at IS NOT NULL) built CONCURRENTLY."}
],
"app_core": [
{"path": "app/__init__.py", "private_counterpart": "app/__init__.py", "last_synced": "2026-04-16"},
Expand Down
Loading
Loading