Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions app/routers/cues.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from sqlalchemy.ext.asyncio import AsyncSession

from app.auth import AuthenticatedUser, get_current_user
from app.database import get_db
from app.schemas.cue import CueCreate, CueDetailResponse, CueListResponse, CueResponse, CueUpdate, FireRequest
from app.services.cue_service import create_cue, delete_cue, get_cue, list_cues, update_cue
from app.utils.verify_echo import apply_verify_echo

router = APIRouter(prefix="/v1/cues", tags=["cues"])

Expand Down Expand Up @@ -91,6 +92,7 @@ async def delete(
@router.post("/{cue_id}/fire", status_code=200)
async def fire_cue(
cue_id: str,
request: Request,
body: Optional[FireRequest] = None,
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
Expand Down Expand Up @@ -159,8 +161,13 @@ async def fire_cue(
db.add(outbox)

await db.commit()
return {
response_content: dict = {
"id": str(execution_id), "cue_id": cue.id,
"scheduled_for": effective_scheduled_for.isoformat(),
"status": "pending", "triggered_by": "manual_fire",
}
# BodyVerify Layer 1: opt-in echo-back when caller sets
# X-CueAPI-Verify-Echo: true. Helper returns {} when header absent
# so non-opted clients see zero shape change.
response_content.update(apply_verify_echo(request=request, parsed_body=body))
return response_content
10 changes: 9 additions & 1 deletion app/routers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
mark_read,
to_response_dict,
)
from app.utils.verify_echo import apply_verify_echo
from fastapi.responses import JSONResponse

router = APIRouter(prefix="/v1/messages", tags=["messages"])
Expand Down Expand Up @@ -101,9 +102,16 @@ async def send_message(
# to priority=3. Surface the signal so senders can detect and
# adapt without parsing message body.
headers["X-CueAPI-Priority-Downgraded"] = "true"
response_content = MessageResponse(
**to_response_dict(msg)
).model_dump(mode="json")
# BodyVerify Layer 1: opt-in echo-back when caller sets
# X-CueAPI-Verify-Echo: true. Helper returns {} when header absent
# so non-opted clients see zero shape change.
response_content.update(apply_verify_echo(request=request, parsed_body=body))
return JSONResponse(
status_code=status_code,
content=MessageResponse(**to_response_dict(msg)).model_dump(mode="json"),
content=response_content,
headers=headers,
)

Expand Down
82 changes: 82 additions & 0 deletions app/utils/verify_echo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Body-verify echo-back primitive (Layer 1 of silent-body-corruption defense).

When request header ``X-CueAPI-Verify-Echo: true`` is present on a supported
endpoint, the server adds ``body_received`` and ``body_received_sha256``
fields to the 200 response. The caller diffs sent body vs received to detect
caller-side shell expansion (backticks, $-paren, ${VAR}) that silently
corrupts body content before send-time.

Why this exists: 2026-05-11 ~22:00Z, CMA's outbound Cue Messages 0/6 to
cue-pm fell to garbage via inline bash ``-d "$BODY"`` where $BODY had been
mutated by shell expansion at variable-assignment time. Server received
valid JSON with wrong content; no layer fails loud. Echo-back is the
keystone for the 4-layer defense: substrate (this), SDK auto-verify, CLI
force-file mode, docs leading with file-payload pattern.

Design doc: https://trydock.ai/workspaces/cue-message-silent-corruption-substrate-design-2026-05-11
"""
from __future__ import annotations

import hashlib
import json
from typing import Any, Dict, Optional

from fastapi import Request


VERIFY_ECHO_HEADER = "X-CueAPI-Verify-Echo"


def verify_echo_requested(request: Request) -> bool:
"""True iff ``X-CueAPI-Verify-Echo: true`` header is present (case-insensitive)."""
return request.headers.get(VERIFY_ECHO_HEADER, "").strip().lower() == "true"


def _canonical_json_bytes(value: Any) -> bytes:
"""Stable JSON serialization for hashing: sorted keys + no whitespace."""
return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode(
"utf-8"
)


def apply_verify_echo(*, request: Request, parsed_body: Optional[Any]) -> Dict[str, Any]:
"""Return verify-echo fields to merge into the response.

Returns ``{}`` when the header is absent (zero-cost no-op for non-opted
clients). When present, returns::

{
"body_received": <parsed body dict / str / None>,
"body_received_sha256": <64-char hex digest>,
}

Hashing rule:

* ``None`` body → SHA256 of empty bytes (well-known constant).
* Pydantic model → ``model_dump(mode="json")`` then canonical JSON.
* dict → canonical JSON.
* Otherwise → ``str()`` then UTF-8 bytes.

The dict is intended to be ``.update()``-merged into the response dict
or returned alongside other fields. Caller is responsible for placement.
"""
if not verify_echo_requested(request):
return {}

if parsed_body is None:
body_view: Any = None
sha_input: bytes = b""
elif hasattr(parsed_body, "model_dump"):
body_view = parsed_body.model_dump(mode="json")
sha_input = _canonical_json_bytes(body_view)
elif isinstance(parsed_body, dict):
body_view = parsed_body
sha_input = _canonical_json_bytes(body_view)
else:
body_view = str(parsed_body)
sha_input = body_view.encode("utf-8")

return {
"body_received": body_view,
"body_received_sha256": hashlib.sha256(sha_input).hexdigest(),
}
7 changes: 4 additions & 3 deletions parity-manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
{"path": "app/routers/__init__.py", "private_counterpart": "app/routers/__init__.py", "last_synced": "2026-04-16"},
{"path": "app/routers/alerts.py", "private_counterpart": "app/routers/alerts.py", "last_synced": "2026-04-17"},
{"path": "app/routers/auth_routes.py", "private_counterpart": "app/routers/auth_routes.py", "last_synced": "2026-04-16"},
{"path": "app/routers/cues.py", "private_counterpart": "app/routers/cues.py", "last_synced": "2026-04-16"},
{"path": "app/routers/cues.py", "private_counterpart": "app/routers/cues.py", "last_synced": "2026-05-11", "ported_in": "bodyverify-layer-1-substrate-echo-back", "deviation": "BodyVerify port: fire_cue handler integrates apply_verify_echo on response. OSS FireRequest carries only send_at; private's payload_override (dict with user-supplied string content carrying corruption-vector) is hosted-only. Metachar-class parametrization tests on the fire path are intentionally private-only — substrate helper coverage is exercised via messages endpoint tests."},
{"path": "app/routers/device_code.py", "private_counterpart": "app/routers/device_code.py", "last_synced": "2026-04-16"},
{"path": "app/routers/echo.py", "private_counterpart": "app/routers/echo.py", "last_synced": "2026-04-16"},
{"path": "app/routers/executions.py", "private_counterpart": "app/routers/executions.py", "last_synced": "2026-04-16"},
Expand All @@ -125,7 +125,7 @@
{"path": "app/routers/webhook_secret.py", "private_counterpart": "app/routers/webhook_secret.py", "last_synced": "2026-04-16"},
{"path": "app/routers/workers.py", "private_counterpart": "app/routers/workers.py", "last_synced": "2026-04-16"},
{"path": "app/routers/agents.py", "private_counterpart": "app/routers/agents.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/routers/messages.py", "private_counterpart": "app/routers/messages.py", "last_synced": "2026-05-11", "ported_in": "messaging-primitive-port + messaging-emission-port (PR-2a)", "deviation": "PR-2a addition: correlation_id passthrough to create_message service."},
{"path": "app/routers/messages.py", "private_counterpart": "app/routers/messages.py", "last_synced": "2026-05-11", "ported_in": "messaging-primitive-port + messaging-emission-port (PR-2a) + bodyverify-layer-1-substrate-echo-back", "deviation": "PR-2a addition: correlation_id passthrough to create_message service. BodyVerify port: send_message handler integrates apply_verify_echo helper on response — identical shape to private; create_message tuple unpack is 3-element OSS vs 4-element private (private adds bcc_emitted)."},
{"path": "app/routers/events.py", "private_counterpart": "app/routers/events.py", "last_synced": "2026-05-11", "ported_in": "event-emit-primitive-port (PR-1b) + long-poll-port (PR-1b spec Q1) + item-1-inline-body-port + item-2b-cursor-advance-ack-port", "deviation": "Long-poll uses internal polling loop rather than PostgreSQL LISTEN/NOTIFY per impl trade-off; wire contract identical. Future LISTEN/NOTIFY swap tracked at cmp0m7n7r (private) — file OSS counterpart row when prioritized. Item 1 addition: SubscriptionCreate accepts inline_body field; SubscriptionResponse surfaces it. Item 2(b) addition: PATCH /subscriptions/{id}/ack endpoint + AckSubscriptionRequest schema + _advance_ack_after_pull helper + last_acked_event_id surfaced on SubscriptionResponse."}
],
"schemas": [
Expand Down Expand Up @@ -165,7 +165,8 @@
{"path": "app/utils/templates.py", "private_counterpart": "app/utils/templates.py", "last_synced": "2026-04-16"},
{"path": "app/utils/url_validation.py", "private_counterpart": "app/utils/url_validation.py", "last_synced": "2026-04-16"},
{"path": "app/utils/retry_after.py", "private_counterpart": "app/utils/retry_after.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/utils/slug.py", "private_counterpart": "app/utils/slug.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"}
{"path": "app/utils/slug.py", "private_counterpart": "app/utils/slug.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port"},
{"path": "app/utils/verify_echo.py", "private_counterpart": "app/utils/verify_echo.py", "last_synced": "2026-05-11", "ported_in": "bodyverify-layer-1-substrate-echo-back"}
],
"worker": [
{"path": "worker/tasks.py", "private_counterpart": "worker/tasks.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "ports messaging-specific functions only (deliver_message_task, retry_message_task, _check_concurrent_cap_or_recycle, _release_concurrent, _load_message_context, _claim_message, _route_attempt_outcome) — does not port other private deltas (catch_up_policy, heartbeat trigger, etc., scheduled for a follow-up phase)"},
Expand Down
Loading