Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions alembic/versions/024_agents_last_seen_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Agent Directory productization (Phase A) — implicit online-state derivation.

Ports cueapi/cueapi#630 (private migration 048) to OSS.

Adds ``agents.last_seen_at`` so the server can derive an agent's
``online`` status from recent activity instead of requiring callers
to ``PATCH /v1/agents/{ref}`` with explicit status updates.

Hot paths that write ``last_seen_at = now()`` (added in this PR's
service-layer changes):

* ``create_message`` — sender's agent
* ``list_inbox`` — recipient's agent (poll-based delivery)

Derivation rules (computed in the service layer, not stored):

* ``last_seen_at`` within 5 min → ``online``
* ``last_seen_at`` within 30 min → ``away``
* anything older / NULL → ``offline``

The existing ``status`` column stays as a caller-overrideable enum;
the new derivation is the default surface. Callers can still assert
``status=away`` (e.g., agent voluntarily marks itself away during a
long-running task) and the override wins over the derivation.

Migration sequence: OSS HEAD at branch creation was 023. Open PR #46
(message send_at) also targets 024; whichever of the two PRs lands
second will need to be renumbered to 025 — a sentinel-rebase or a
manual rebase resolves the collision.

Revision ID: 024
Revises: 023
"""
from alembic import op
import sqlalchemy as sa


revision = "024"
down_revision = "023"


def upgrade():
    """Add the nullable, timezone-aware ``agents.last_seen_at`` column."""
    last_seen_column = sa.Column(
        "last_seen_at",
        sa.DateTime(timezone=True),
        nullable=True,
    )
    op.add_column("agents", last_seen_column)


def downgrade():
    """Remove ``agents.last_seen_at``, undoing what ``upgrade`` added."""
    op.drop_column(table_name="agents", column_name="last_seen_at")
5 changes: 5 additions & 0 deletions app/models/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class Agent(Base):
server_default="{}",
)
status = Column(String(16), nullable=False, default="online", server_default="online")
# Updated by hot-path service hooks (create_message sender,
# list_inbox recipient). Used to derive an "active in last N
# minutes" online signal in GET /v1/agents/roster. NULL = no
# activity observed yet. Ports cueapi/cueapi#630.
last_seen_at = Column(DateTime(timezone=True), nullable=True)
deleted_at = Column(DateTime(timezone=True), nullable=True)
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
updated_at = Column(
Expand Down
49 changes: 49 additions & 0 deletions app/routers/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
AgentCreate,
AgentListResponse,
AgentResponse,
AgentRosterEntry,
AgentRosterResponse,
AgentUpdate,
WebhookSecretResponse,
)
Expand All @@ -40,6 +42,7 @@
get_agent_owned,
get_webhook_secret,
list_agents,
list_roster,
rotate_webhook_secret,
soft_delete_agent,
to_response_dict,
Expand Down Expand Up @@ -86,6 +89,52 @@ async def create_agent_endpoint(
return AgentResponse(**payload)


def _etag_matches(if_none_match_header, server_etag):
"""Pure helper: does the client's ``If-None-Match`` match the
server-computed weak ETag? Trims whitespace; None/empty → no match.
"""
if not if_none_match_header:
return False
return if_none_match_header.strip() == server_etag


@router.get("/roster", response_model=AgentRosterResponse)
async def get_roster_endpoint(
    request: Request,
    user: AuthenticatedUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Directory snapshot for session-boot prompt injection.

    Returns the caller's full agent roster in a display-optimized
    shape — no pagination, no opaque IDs, no secrets, no per-agent
    timestamps, plus derived ``online`` + ``last_seen_relative`` +
    ``preferred_contact`` fields. Soft-deleted agents are always excluded.

    Distinct from ``GET /v1/agents`` (the management surface with full
    ``AgentResponse`` shape). See PRD §Surface 5 for context.

    Conditional GET: an ``If-None-Match`` header that matches the
    server-computed weak ETag returns ``304 Not Modified`` with no body.
    The 304 carries the same ``ETag`` and ``Cache-Control`` headers as the
    200 so caches can refresh their stored entry.
    Ports cueapi/cueapi#630.
    """
    # Function-scope import so this block does not depend on the module's
    # top-level import list. A plain Response is required for the 304:
    # JSONResponse(content=None) would serialize a literal ``null`` payload,
    # and a 304 must not carry a message body.
    from fastapi import Response

    result = await list_roster(db, user)
    etag = result["etag"]
    cache_headers = {"ETag": etag, "Cache-Control": "private, max-age=300"}

    if _etag_matches(request.headers.get("if-none-match"), etag):
        return Response(status_code=304, headers=cache_headers)

    body = AgentRosterResponse(
        generated_at=result["generated_at"],
        agents=[AgentRosterEntry(**e) for e in result["agents"]],
    )
    return JSONResponse(
        status_code=200,
        content=body.model_dump(mode="json"),
        headers=cache_headers,
    )


@router.get("", response_model=AgentListResponse)
async def list_agents_endpoint(
status: Optional[str] = Query(default=None),
Expand Down
35 changes: 35 additions & 0 deletions app/schemas/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,38 @@ class WebhookSecretResponse(BaseModel):
"""Response for the webhook-secret retrieval and rotation endpoints."""

webhook_secret: str


class AgentRosterEntry(BaseModel):
    """One agent in the directory snapshot returned by GET /v1/agents/roster.

    Distinct from ``AgentResponse``: drops opaque IDs, secrets,
    timestamps, and tenancy metadata, and adds derived ``online`` /
    ``last_seen_relative`` fields. Optimized for prompt injection at
    session-boot — agents see "who else is here" natively without
    needing to call a tool. Ports cueapi/cueapi#630 (PRD §Surface 5).

    The field descriptions below are published in the OpenAPI schema, so
    they describe what the roster service actually emits: ``status`` is the
    *effective* status (caller override or activity-derived), and ``online``
    is true only when that effective status is ``online``.
    """

    name: str = Field(..., description="Stable per-tenant slug; addressable as `<name>@<user_slug>`.")
    display_name: str
    description: Optional[str] = Field(default=None, description="From metadata.description if set.")
    online: bool = Field(
        ...,
        description=(
            "True when last_seen_at is within 5 min and the caller has not "
            "asserted 'away' or 'offline'."
        ),
    )
    last_seen_relative: str = Field(
        ...,
        description="Human-readable freshness: 'active now', '5m ago', '2h ago', '3d ago', 'never'.",
    )
    preferred_contact: Literal["sync", "async"] = Field(
        ...,
        description="Derived: webhook_url IS NOT NULL → 'sync' (push-capable), else 'async' (poll-only).",
    )
    status: Literal["online", "offline", "away"] = Field(
        ...,
        description=(
            "Effective status: a caller-asserted 'away' or 'offline' "
            "(PATCH /v1/agents/{ref}) wins; otherwise derived from last_seen_at "
            "(within 5 min 'online', within 30 min 'away', else 'offline')."
        ),
    )


class AgentRosterResponse(BaseModel):
    """Envelope returned by GET /v1/agents/roster.

    Carries the complete directory snapshot in a single payload.
    """

    # When this snapshot was produced.
    generated_at: datetime
    # One display-oriented entry per agent in the roster.
    agents: List[AgentRosterEntry]
123 changes: 123 additions & 0 deletions app/services/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,129 @@ async def list_agents(
return {"agents": list(rows), "total": int(total), "limit": limit, "offset": offset}


# Online derivation thresholds (Phase A — Agent Directory PRD §Surface 5).
# Tuned conservatively: 5 min covers typical poll cadence of bundled
# workers + Live-attached sessions; 30 min covers slower agents and
# transient connectivity blips. Ports cueapi/cueapi#630.
_ONLINE_THRESHOLD_SECONDS = 300 # 5 min → online
_AWAY_THRESHOLD_SECONDS = 1800 # 30 min → away (else offline)
_ETAG_BUCKET_SECONDS = 300 # Align ETag freshness to derivation thresholds.


def _format_relative(now, last_seen_at) -> str:
if last_seen_at is None:
return "never"
delta = (now - last_seen_at).total_seconds()
if delta < 60:
return "active now"
if delta < 3600:
return f"{int(delta / 60)}m ago"
if delta < 86400:
return f"{int(delta / 3600)}h ago"
return f"{int(delta / 86400)}d ago"


def _derive_online_state(now, last_seen_at, asserted_status: str):
"""Returns (online_bool, derived_status_str).

Caller override wins: if asserted_status is 'away' or 'offline',
that sticks regardless of recent activity.
"""
if asserted_status in ("away", "offline"):
return (False, asserted_status)
if last_seen_at is None:
return (False, "offline")
delta = (now - last_seen_at).total_seconds()
if delta <= _ONLINE_THRESHOLD_SECONDS:
return (True, "online")
if delta <= _AWAY_THRESHOLD_SECONDS:
return (False, "away")
return (False, "offline")


def _bucketed_seen(last_seen_at):
"""Floor last_seen_at to 5-min bucket for ETag stability.

Quiet windows (no activity within the bucket) produce a stable
ETag — clients polling at the suggested cadence get 304.
"""
if last_seen_at is None:
return ""
epoch = int(last_seen_at.timestamp())
return str(epoch - (epoch % _ETAG_BUCKET_SECONDS))


def _build_roster_entry(agent, now):
    """Pure helper: ORM Agent row → ``(entry_dict, etag_part_string)``.

    Args:
        agent: Agent row; reads ``slug``, ``display_name``, ``metadata_``,
            ``webhook_url``, ``status`` and ``last_seen_at``.
        now: Reference time used for the presence derivation and the
            relative "last seen" label.

    Returns:
        A 2-tuple of:

        * the display dict for one roster entry (keys ``name``,
          ``display_name``, ``description``, ``online``,
          ``last_seen_relative``, ``preferred_contact``, ``status``);
        * a single-line string fingerprint of the fields that should
          invalidate the roster ETag when they change.
    """
    # Function-scope import so this block does not rely on the module's
    # top-level import list.
    import json

    online, derived_status = _derive_online_state(
        now, agent.last_seen_at, agent.status
    )

    # Only surface a description when metadata is a dict holding a string.
    description = None
    meta = agent.metadata_ or {}
    if isinstance(meta, dict) and isinstance(meta.get("description"), str):
        description = meta["description"]

    preferred_contact = "sync" if agent.webhook_url else "async"
    entry = {
        "name": agent.slug,
        "display_name": agent.display_name,
        "description": description,
        "online": online,
        "last_seen_relative": _format_relative(now, agent.last_seen_at),
        "preferred_contact": preferred_contact,
        "status": derived_status,
    }

    # JSON-encode the fingerprint instead of joining on a bare delimiter.
    # Free-text fields (display name, description) may contain "|" or a
    # newline, which would let two different rosters produce the same
    # fingerprint and so a stale 304. json.dumps escapes both, keeps each
    # part on one line, and tolerates a missing display name.
    etag_part = json.dumps(
        [
            agent.slug,
            agent.display_name,
            description or "",
            "1" if online else "0",
            preferred_contact,
            derived_status,
            _bucketed_seen(agent.last_seen_at),
        ],
        separators=(",", ":"),
    )
    return entry, etag_part


def _compute_roster_etag(etag_parts):
"""Pure helper: list of per-agent etag-part strings → weak ETag."""
import hashlib
digest = hashlib.sha256("\n".join(etag_parts).encode("utf-8")).hexdigest()
return f'W/"{digest[:16]}"'


async def list_roster(db: AsyncSession, user: AuthenticatedUser) -> Dict:
    """Assemble the roster snapshot served by GET /v1/agents/roster.

    Loads every non-deleted agent owned by ``user`` (ordered by slug, no
    pagination) and returns
    ``{"generated_at": now, "agents": [...], "etag": "..."}``.
    Display-optimized for prompt injection — see PRD §Surface 5.
    Ports cueapi/cueapi#630.
    """
    from datetime import datetime, timezone

    # One timestamp for the whole snapshot so every entry is derived
    # against the same instant.
    snapshot_time = datetime.now(timezone.utc)

    query = select(Agent).where(
        Agent.user_id == user.id,
        Agent.deleted_at.is_(None),
    ).order_by(Agent.slug)
    result = await db.execute(query)
    agents = result.scalars().all()

    # Each item is an (entry_dict, etag_part) pair.
    built = [_build_roster_entry(agent, snapshot_time) for agent in agents]

    return {
        "generated_at": snapshot_time,
        "agents": [entry for entry, _ in built],
        "etag": _compute_roster_etag([part for _, part in built]),
    }


async def get_agent_owned(
db: AsyncSession,
user: AuthenticatedUser,
Expand Down
18 changes: 18 additions & 0 deletions app/services/inbox_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ def _parse_state_filter(
return parsed


def _bump_last_seen_stmt(agent_id, now):
    """Pure helper: construct (without executing) the UPDATE that sets
    ``last_seen_at`` to ``now`` for the agent whose id is ``agent_id``.
    Ports cueapi/cueapi#630.
    """
    targets_agent = Agent.id == agent_id
    statement = update(Agent).where(targets_agent)
    return statement.values(last_seen_at=now)


async def list_inbox(
db: AsyncSession,
user: AuthenticatedUser,
Expand Down Expand Up @@ -203,6 +210,17 @@ async def list_inbox(
.returning(Message.id)
)
await db.execute(upd_q)
# Agent Directory (Phase A): bump recipient's last_seen_at on
# every poll. Even if no queued messages, the poll proves the
# agent is active. Ports cueapi/cueapi#630.
await db.execute(_bump_last_seen_stmt(agent.id, now))
await db.commit()
else:
# No queued→delivered transition this call (filter excluded
# ``queued``), but we still observed activity from the recipient.
await db.execute(
_bump_last_seen_stmt(agent.id, datetime.now(timezone.utc))
)
await db.commit()

# Total (after the transition).
Expand Down
10 changes: 10 additions & 0 deletions app/services/message_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,16 @@ async def create_message(
)
)

# Agent Directory (Phase A) — bump sender's last_seen_at so
# GET /v1/agents/roster derives 'online' correctly without
# requiring callers to PATCH status. Inline UPDATE keeps this in
# the same transaction as the message insert. Ports cueapi/cueapi#630.
from sqlalchemy import update as _update
from datetime import datetime as _dt, timezone as _tz
await db.execute(
_update(Agent).where(Agent.id == from_agent.id).values(last_seen_at=_dt.now(_tz.utc))
)

await db.commit()
await db.refresh(msg)

Expand Down
12 changes: 6 additions & 6 deletions parity-manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
{"path": "app/models/usage_monthly.py", "private_counterpart": "app/models/usage_monthly.py", "last_synced": "2026-04-16"},
{"path": "app/models/user.py", "private_counterpart": "app/models/user.py", "last_synced": "2026-04-16"},
{"path": "app/models/worker.py", "private_counterpart": "app/models/worker.py", "last_synced": "2026-04-16"},
{"path": "app/models/agent.py", "private_counterpart": "app/models/agent.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/models/agent.py", "private_counterpart": "app/models/agent.py", "last_synced": "2026-05-05", "ported_in": "agent-directory-port + messaging-primitive-port"},
{"path": "app/models/message.py", "private_counterpart": "app/models/message.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "from_api_key_id column omitted (multi-key scoping is hosted-only)"},
{"path": "app/models/usage_messages_monthly.py", "private_counterpart": "app/models/usage_messages_monthly.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}
],
Expand All @@ -102,7 +102,7 @@
{"path": "app/routers/usage.py", "private_counterpart": "app/routers/usage.py", "last_synced": "2026-04-16"},
{"path": "app/routers/webhook_secret.py", "private_counterpart": "app/routers/webhook_secret.py", "last_synced": "2026-04-16"},
{"path": "app/routers/workers.py", "private_counterpart": "app/routers/workers.py", "last_synced": "2026-04-16"},
{"path": "app/routers/agents.py", "private_counterpart": "app/routers/agents.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/routers/agents.py", "private_counterpart": "app/routers/agents.py", "last_synced": "2026-05-05", "ported_in": "agent-directory-port + messaging-primitive-port"},
{"path": "app/routers/messages.py", "private_counterpart": "app/routers/messages.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}
],
"schemas": [
Expand All @@ -112,7 +112,7 @@
{"path": "app/schemas/execution.py", "private_counterpart": "app/schemas/execution.py", "last_synced": "2026-04-16"},
{"path": "app/schemas/outcome.py", "private_counterpart": "app/schemas/outcome.py", "last_synced": "2026-04-16"},
{"path": "app/schemas/worker.py", "private_counterpart": "app/schemas/worker.py", "last_synced": "2026-04-16"},
{"path": "app/schemas/agent.py", "private_counterpart": "app/schemas/agent.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "api_key_id field omitted from AgentResponse (multi-key scoping is hosted-only)"},
{"path": "app/schemas/agent.py", "private_counterpart": "app/schemas/agent.py", "last_synced": "2026-05-05", "ported_in": "agent-directory-port + messaging-primitive-port", "deviation": "api_key_id field omitted from AgentResponse (multi-key scoping is hosted-only)"},
{"path": "app/schemas/message.py", "private_counterpart": "app/schemas/message.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "from_api_key_id field omitted from MessageResponse (multi-key scoping is hosted-only)"}
],
"services": [
Expand All @@ -124,11 +124,11 @@
{"path": "app/services/outcome_service.py", "private_counterpart": "app/services/outcome_service.py", "last_synced": "2026-04-16"},
{"path": "app/services/usage_service.py", "private_counterpart": "app/services/usage_service.py", "last_synced": "2026-04-16"},
{"path": "app/services/webhook.py", "private_counterpart": "app/services/webhook.py", "last_synced": "2026-04-16"},
{"path": "app/services/agent_service.py", "private_counterpart": "app/services/agent_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "user.api_key_id reference removed (multi-key scoping is hosted-only)"},
{"path": "app/services/inbox_service.py", "private_counterpart": "app/services/inbox_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/services/agent_service.py", "private_counterpart": "app/services/agent_service.py", "last_synced": "2026-05-05", "ported_in": "agent-directory-port + messaging-primitive-port", "deviation": "user.api_key_id reference removed (multi-key scoping is hosted-only)"},
{"path": "app/services/inbox_service.py", "private_counterpart": "app/services/inbox_service.py", "last_synced": "2026-05-05", "ported_in": "agent-directory-port + messaging-primitive-port"},
{"path": "app/services/message_classification.py", "private_counterpart": "app/services/message_classification.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/services/message_delivery.py", "private_counterpart": "app/services/message_delivery.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/services/message_service.py", "private_counterpart": "app/services/message_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "user.api_key_id and msg.from_api_key_id references removed"},
{"path": "app/services/message_service.py", "private_counterpart": "app/services/message_service.py", "last_synced": "2026-05-05", "ported_in": "agent-directory-port + messaging-primitive-port", "deviation": "user.api_key_id and msg.from_api_key_id references removed"},
{"path": "app/services/message_usage_service.py", "private_counterpart": "app/services/message_usage_service.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}
],
"utils": [
Expand Down
Loading
Loading