Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions cueapi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2451,5 +2451,201 @@ def message_to(
click.echo(str(e))


# ───────────────────────────────────────────────────────────────────
# Event-emit primitive (PR-1b) — events + subscriptions top-level groups
# ───────────────────────────────────────────────────────────────────


@main.group()
def events() -> None:
"""Pull events from an agent's event stream (PR-1b)."""
pass


@events.command(name="list")
@click.argument("ref")
@click.option("--since", default=None, type=int,
help="Cursor — only return events with id > since (BIGSERIAL).")
@click.option("--limit", default=100, type=int,
help="Page size (default 100, server caps at 1000).")
@click.option("--event-type", "event_type", default=None,
help="Filter to a specific event type.")
@click.pass_context
def events_list(
ctx: click.Context,
ref: str,
since: Optional[int],
limit: int,
event_type: Optional[str],
) -> None:
"""List events for an agent.

Events are append-only with a monotonic id (BIGSERIAL). Use --since
as a cursor: pass the highest id from the previous page to continue
pagination. Default fetches from the beginning.
"""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
params: dict = {"limit": limit}
if since is not None:
params["since"] = since
if event_type:
params["event_type"] = event_type
resp = client.get(f"/agents/{ref}/events", params=params)
if resp.status_code == 404:
echo_error(f"Agent not found: {ref}")
return
if resp.status_code != 200:
error = resp.json().get("detail", {}).get("error", {})
echo_error(error.get("message", f"Failed (HTTP {resp.status_code})"))
return
data = resp.json()
evs = data.get("events", [])
if not evs:
click.echo("\nNo events.\n")
return
click.echo()
rows = []
for e in evs:
ts = (e.get("emitted_at") or "")[:19].replace("T", " ")
rows.append([
str(e.get("id", "?")),
e.get("event_type", "?"),
ts,
])
echo_table(["ID", "EVENT TYPE", "EMITTED AT"], rows, widths=[12, 32, 22])
cursor = data.get("next_cursor")
if cursor is not None:
echo_info("Next cursor:", str(cursor))
click.echo()
except click.ClickException as e:
click.echo(str(e))


@main.group()
def subscriptions() -> None:
"""Manage event subscriptions for an agent (PR-1b)."""
pass


@subscriptions.command(name="create")
@click.argument("ref")
@click.option("--event-type", "event_type", required=True,
help="Event type to subscribe to (e.g. message.received).")
@click.option("--delivery-target", "delivery_target", required=True,
type=click.Choice(["pull", "webhook"]),
help="Delivery mechanism: pull (poll via `cueapi events list`) or webhook (server POSTs).")
@click.option("--webhook-url", "webhook_url", default=None,
help="Required for delivery-target=webhook; HTTPS only.")
@click.pass_context
def subscriptions_create(
ctx: click.Context,
ref: str,
event_type: str,
delivery_target: str,
webhook_url: Optional[str],
) -> None:
"""Create a subscription for an agent.

Subscriptions are agent-scoped — an agent can only subscribe to
events FOR ITSELF. The calling user must own the agent.

For webhook subscriptions, the response includes ``webhook_secret``
ONE-TIME. Save it now — the server never re-exposes it.
"""
if delivery_target == "webhook" and not webhook_url:
raise click.UsageError("--webhook-url is required when --delivery-target=webhook")
body: dict = {"event_type": event_type, "delivery_target": delivery_target}
if webhook_url:
body["webhook_url"] = webhook_url
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
resp = client.post(f"/agents/{ref}/subscriptions", json=body)
if resp.status_code == 404:
echo_error(f"Agent not found: {ref}")
return
if resp.status_code != 201:
error = resp.json().get("detail", {}).get("error", {})
echo_error(error.get("message", f"Failed (HTTP {resp.status_code})"))
return
sub = resp.json()
click.echo()
echo_success(f"Subscription created: {sub.get('id', '?')}")
echo_info("Event type:", sub.get("event_type", "?"))
echo_info("Delivery target:", sub.get("delivery_target", "?"))
secret = sub.get("webhook_secret")
if secret:
echo_info("Webhook secret (save now — only shown once):", secret)
click.echo()
except click.ClickException as e:
click.echo(str(e))


@subscriptions.command(name="list")
@click.argument("ref")
@click.pass_context
def subscriptions_list(ctx: click.Context, ref: str) -> None:
"""List active subscriptions for an agent.

``webhook_url`` is redacted to host-only in the response;
``webhook_secret`` is never exposed here (only on create).
"""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
resp = client.get(f"/agents/{ref}/subscriptions")
if resp.status_code == 404:
echo_error(f"Agent not found: {ref}")
return
if resp.status_code != 200:
echo_error(f"Failed (HTTP {resp.status_code})")
return
data = resp.json()
subs = data.get("subscriptions", [])
if not subs:
click.echo("\nNo active subscriptions.\n")
return
click.echo()
rows = []
for s in subs:
rows.append([
s.get("id", "?")[:36],
s.get("event_type", "?"),
s.get("delivery_target", "?"),
s.get("webhook_url", "—") if s.get("delivery_target") == "webhook" else "—",
])
echo_table(["ID", "EVENT TYPE", "TARGET", "WEBHOOK HOST"], rows,
widths=[38, 28, 10, 24])
click.echo()
except click.ClickException as e:
click.echo(str(e))


@subscriptions.command(name="delete")
@click.argument("ref")
@click.argument("subscription_id")
@click.pass_context
def subscriptions_delete(ctx: click.Context, ref: str, subscription_id: str) -> None:
"""Soft-detach a subscription. Idempotent.

Re-DELETE on an already-detached subscription returns 200. The
server does NOT delete the row — it marks it detached so dispatch
stops + audit history is preserved.
"""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
resp = client.delete(f"/agents/{ref}/subscriptions/{subscription_id}")
if resp.status_code == 404:
echo_error(f"Agent or subscription not found: {ref}/{subscription_id}")
return
if resp.status_code != 200:
echo_error(f"Failed (HTTP {resp.status_code})")
return
click.echo()
echo_success(f"Subscription detached: {subscription_id}")
click.echo()
except click.ClickException as e:
click.echo(str(e))


if __name__ == "__main__":
main()
Loading