Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cueapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from cueapi.payload import CuePayload
from cueapi.resources.agents import AgentsResource
from cueapi.resources.executions import ExecutionsResource
from cueapi.resources.messages import MessagesResource
from cueapi.resources.usage import UsageResource
from cueapi.resources.workers import WorkersResource
from cueapi.webhook import verify_webhook
Expand All @@ -24,6 +25,7 @@
"CueAPI",
"CuePayload",
"ExecutionsResource",
"MessagesResource",
"UsageResource",
"WorkersResource",
"verify_webhook",
Expand Down
2 changes: 2 additions & 0 deletions cueapi/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from cueapi.resources.agents import AgentsResource
from cueapi.resources.cues import CuesResource
from cueapi.resources.executions import ExecutionsResource
from cueapi.resources.messages import MessagesResource
from cueapi.resources.usage import UsageResource
from cueapi.resources.workers import WorkersResource

Expand Down Expand Up @@ -75,6 +76,7 @@ def __init__(
self.workers = WorkersResource(self)
self.usage = UsageResource(self)
self.agents = AgentsResource(self)
self.messages = MessagesResource(self)

def close(self) -> None:
"""Close the underlying HTTP client."""
Expand Down
120 changes: 120 additions & 0 deletions cueapi/resources/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Messages resource — messaging primitive lifecycle (Phase 12.1.5)."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Optional

if TYPE_CHECKING:
from cueapi.client import CueAPI


class MessagesResource:
"""Messages API resource.

Wraps the ``/v1/messages`` surface from the messaging primitive
(Phase 12.1.5). Covers send + per-message lifecycle (get / read /
ack). The agents identity surface lives on the sibling
``client.agents`` resource — this class only handles messages.
"""

def __init__(self, client: "CueAPI") -> None:
self._client = client

def send(
self,
*,
from_agent: str,
to: str,
body: str,
subject: Optional[str] = None,
reply_to: Optional[str] = None,
priority: Optional[int] = None,
expects_reply: bool = False,
reply_to_agent: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
idempotency_key: Optional[str] = None,
) -> dict:
"""Send a message.

``from_agent`` is sent as the ``X-Cueapi-From-Agent`` header,
NOT in the body. The server reads it from the header to
authenticate the sender against the calling key. Don't try to
pass it in the body — the server's ``MessageCreate`` schema is
``extra="forbid"`` and will 400.

``idempotency_key`` is sent as the ``Idempotency-Key`` header.
Same key + same body within 24h returns the existing message
with HTTP 200 instead of 201. Same key + different body
returns HTTP 409 ``idempotency_key_conflict``.

Args:
from_agent: Sender agent — opaque agent_id or slug-form
(``agent@user``). MUST be owned by the calling key.
to: Recipient — opaque agent_id or slug-form.
body: Message body (1-32768 chars).
subject: Optional subject line (max 255 chars).
reply_to: Previous message ID this is replying to
(``msg_<12 alphanumeric>``). thread_id inherits.
priority: 1-5 (server default 3). Receiver-pair limits may
downgrade priority>3 to 3; the server signals this via
the ``X-CueAPI-Priority-Downgraded: true`` response
header. Callers wanting to detect downgrade need to
inspect the response shape via the underlying
httpx.Response — not exposed in the SDK return value.
expects_reply: Mark this message as expecting a reply.
Default False; only sent when True.
reply_to_agent: Decoupled reply target. Defaults to
``from`` (sender). Use when reply should route to a
different agent.
metadata: Optional JSON metadata blob.
idempotency_key: Optional ``Idempotency-Key`` header
(≤255 chars).

Returns:
Dict matching the server's ``MessageResponse`` shape.

Raises:
ValueError: If ``idempotency_key`` exceeds 255 chars
(matches the server's hard limit).
"""
if idempotency_key is not None and len(idempotency_key) > 255:
raise ValueError("idempotency_key must be ≤255 characters")

payload: Dict[str, Any] = {"to": to, "body": body}
if subject is not None:
payload["subject"] = subject
if reply_to is not None:
payload["reply_to"] = reply_to
if priority is not None:
payload["priority"] = priority
# Boolean flag — only send when True. Server default is False;
# sending `false` is no-op + adds payload noise. Pinned in tests.
if expects_reply:
payload["expects_reply"] = True
if reply_to_agent is not None:
payload["reply_to_agent"] = reply_to_agent
if metadata is not None:
payload["metadata"] = metadata

headers: Dict[str, str] = {"X-Cueapi-From-Agent": from_agent}
if idempotency_key is not None:
headers["Idempotency-Key"] = idempotency_key

return self._client._post("/v1/messages", json=payload, headers=headers)

def get(self, msg_id: str) -> dict:
"""Get a single message by ID."""
return self._client._get(f"/v1/messages/{msg_id}")

def mark_read(self, msg_id: str) -> dict:
"""Mark a message as read.

Idempotent — calling on already-``read`` returns 200 unchanged.
Returns 409 (raised as ``CueAPIError``) if the message is in a
terminal state (``acked`` / ``expired``).
"""
return self._client._post(f"/v1/messages/{msg_id}/read", json={})

def ack(self, msg_id: str) -> dict:
"""Acknowledge a message — terminal state, no further transitions."""
return self._client._post(f"/v1/messages/{msg_id}/ack", json={})
139 changes: 139 additions & 0 deletions tests/test_messages_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Tests for MessagesResource."""

import pytest
from unittest.mock import MagicMock

from cueapi.resources.messages import MessagesResource


class TestSend:
def test_minimal_body_and_from_header(self):
# Pin: --from goes in X-Cueapi-From-Agent HEADER, NOT in body.
# The server's MessageCreate is extra="forbid" and would 400 on
# `{"from": "..."}` in the body, but we want this caught at unit
# test time, not silently at integration.
mock_client = MagicMock()
mock_client._post.return_value = {
"id": "msg_x", "delivery_state": "queued", "thread_id": "thr_x",
}
r = MessagesResource(mock_client)

r.send(from_agent="sender@x", to="recipient@y", body="hi")

mock_client._post.assert_called_once_with(
"/v1/messages",
json={"to": "recipient@y", "body": "hi"},
headers={"X-Cueapi-From-Agent": "sender@x"},
)

def test_with_all_optionals(self):
mock_client = MagicMock()
mock_client._post.return_value = {"id": "msg_x", "delivery_state": "queued"}
r = MessagesResource(mock_client)

r.send(
from_agent="sender@x",
to="recipient@y",
body="hi",
subject="re: hello",
reply_to="msg_abcdef123456",
priority=5,
expects_reply=True,
reply_to_agent="alt@x",
metadata={"k": "v"},
idempotency_key="idemp-key-1",
)

call = mock_client._post.call_args
assert call.args == ("/v1/messages",)
assert call.kwargs["json"] == {
"to": "recipient@y",
"body": "hi",
"subject": "re: hello",
"reply_to": "msg_abcdef123456",
"priority": 5,
"expects_reply": True,
"reply_to_agent": "alt@x",
"metadata": {"k": "v"},
}
assert call.kwargs["headers"] == {
"X-Cueapi-From-Agent": "sender@x",
"Idempotency-Key": "idemp-key-1",
}

def test_omits_expects_reply_when_default(self):
# Pin: default False MUST NOT appear in body. Server's Pydantic
# default is False; sending `expects_reply: false` is no-op + adds
# noise. Refactor that always-sends would slip past the typed
# server schema but be caught here.
mock_client = MagicMock()
mock_client._post.return_value = {"id": "msg_x"}
r = MessagesResource(mock_client)

r.send(from_agent="x", to="y", body="hi")

body = mock_client._post.call_args.kwargs["json"]
assert "expects_reply" not in body

def test_idempotency_key_too_long_raises_client_side(self):
mock_client = MagicMock()
r = MessagesResource(mock_client)

with pytest.raises(ValueError, match="255"):
r.send(
from_agent="x", to="y", body="hi",
idempotency_key="x" * 256,
)
# Crucially: must NOT have hit the wire.
mock_client._post.assert_not_called()

def test_omits_idempotency_key_header_when_unset(self):
# Headers should ONLY contain X-Cueapi-From-Agent when no
# idempotency_key is passed. Pin so a refactor can't silently
# start adding `Idempotency-Key: None` (httpx would coerce).
mock_client = MagicMock()
mock_client._post.return_value = {"id": "msg_x"}
r = MessagesResource(mock_client)

r.send(from_agent="x", to="y", body="hi")

headers = mock_client._post.call_args.kwargs["headers"]
assert headers == {"X-Cueapi-From-Agent": "x"}
assert "Idempotency-Key" not in headers


class TestGet:
def test_get(self):
mock_client = MagicMock()
mock_client._get.return_value = {"id": "msg_x"}
r = MessagesResource(mock_client)

r.get("msg_x")

mock_client._get.assert_called_once_with("/v1/messages/msg_x")


class TestMarkRead:
def test_mark_read(self):
mock_client = MagicMock()
mock_client._post.return_value = {"delivery_state": "read"}
r = MessagesResource(mock_client)

r.mark_read("msg_x")

mock_client._post.assert_called_once_with(
"/v1/messages/msg_x/read", json={},
)


class TestAck:
def test_ack(self):
mock_client = MagicMock()
mock_client._post.return_value = {"delivery_state": "acked"}
r = MessagesResource(mock_client)

r.ack("msg_x")

mock_client._post.assert_called_once_with(
"/v1/messages/msg_x/ack", json={},
)
Loading