diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 1eacd8c..ace60e5 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -2,9 +2,10 @@ ## Current Implemented Slice -AliceBot now implements the accepted repo slice through Sprint 5T. The shipped backend includes: +AliceBot now implements the accepted repo slice through Sprint 6H. The shipped backend includes: - foundation continuity storage over `users`, `threads`, `sessions`, and append-only `events` +- narrow continuity APIs for thread create/list/detail plus thread session/event review over those durable continuity records - deterministic tracing and context compilation over durable continuity, memory, entity, and entity-edge records - governed memory admission, explicit-preference extraction, memory review labels, review queue reads, evaluation summary reads, explicit embedding config and memory-embedding storage, direct semantic retrieval, and deterministic hybrid compile-path memory merge - deterministic prompt assembly and one no-tools response path that persists assistant replies as immutable continuity events @@ -21,7 +22,7 @@ The current multi-step boundary is narrow and explicit. Manual continuation is i - `docker-compose.yml` starts local Postgres with `pgvector`, Redis, and MinIO. - `scripts/dev_up.sh`, `scripts/migrate.sh`, and `scripts/api_dev.sh` provide the local startup path, with readiness gating before migrations. 
- `apps/api` exposes FastAPI endpoints for: - - health and compile: `/healthz`, `POST /v0/context/compile`, `POST /v0/responses` + - health, continuity, and compile: `/healthz`, `POST /v0/threads`, `GET /v0/threads`, `GET /v0/threads/{thread_id}`, `GET /v0/threads/{thread_id}/sessions`, `GET /v0/threads/{thread_id}/events`, `POST /v0/context/compile`, `POST /v0/responses` - memory and retrieval: `POST /v0/memories/admit`, `POST /v0/memories/extract-explicit-preferences`, `GET /v0/memories`, `GET /v0/memories/review-queue`, `GET /v0/memories/evaluation-summary`, `POST /v0/memories/semantic-retrieval`, `GET /v0/memories/{memory_id}`, `GET /v0/memories/{memory_id}/revisions`, `POST /v0/memories/{memory_id}/labels`, `GET /v0/memories/{memory_id}/labels` - embeddings and graph seams: `POST /v0/embedding-configs`, `GET /v0/embedding-configs`, `POST /v0/memory-embeddings`, `GET /v0/memories/{memory_id}/embeddings`, `GET /v0/memory-embeddings/{memory_embedding_id}`, `POST /v0/entities`, `GET /v0/entities`, `GET /v0/entities/{entity_id}`, `POST /v0/entity-edges`, `GET /v0/entities/{entity_id}/edges` - governance: `POST /v0/consents`, `GET /v0/consents`, `POST /v0/policies`, `GET /v0/policies`, `GET /v0/policies/{policy_id}`, `POST /v0/policies/evaluate`, `POST /v0/tools`, `GET /v0/tools`, `GET /v0/tools/{tool_id}`, `POST /v0/tools/allowlist/evaluate`, `POST /v0/tools/route`, `POST /v0/approvals/requests`, `GET /v0/approvals`, `GET /v0/approvals/{approval_id}`, `POST /v0/approvals/{approval_id}/approve`, `POST /v0/approvals/{approval_id}/reject`, `POST /v0/approvals/{approval_id}/execute` @@ -64,10 +65,21 @@ The current multi-step boundary is narrow and explicit. Manual continuation is i - `apps/web`: minimal shell only; no shipped workflow UI. - `workers`: scaffold only; no background jobs or runner logic are implemented. - `infra`: local development bootstrap assets only. 
-- `tests`: unit and Postgres-backed integration coverage for the shipped seams above, including Sprint 4O task-step lineage/manual continuation, Sprint 4S step-linked execution synchronization, Sprint 5A task-workspace provisioning, Sprint 5C task-artifact registration, Sprint 5D local artifact ingestion plus chunk reads, Sprint 5E lexical artifact-chunk retrieval, Sprint 5F compile-path artifact chunk integration, Sprint 5G artifact-chunk embedding persistence and reads, Sprint 5H direct semantic artifact-chunk retrieval, Sprint 5I compile-path semantic artifact retrieval, Sprint 5J deterministic hybrid lexical-plus-semantic artifact merge in compile, Sprint 5L narrow PDF artifact ingestion, Sprint 5M narrow DOCX artifact ingestion, Sprint 5N narrow RFC822 email artifact ingestion, Sprint 5O read-only Gmail account plus single-message ingestion coverage, Sprint 5P Gmail credential hardening coverage, Sprint 5Q Gmail refresh-token lifecycle coverage, Sprint 5R Gmail refresh-token rotation handling coverage, and Sprint 5T Gmail external secret-manager coverage. 
+- `tests`: unit and Postgres-backed integration coverage for the shipped seams above, including Sprint 4O task-step lineage/manual continuation, Sprint 4S step-linked execution synchronization, Sprint 5A task-workspace provisioning, Sprint 5C task-artifact registration, Sprint 5D local artifact ingestion plus chunk reads, Sprint 5E lexical artifact-chunk retrieval, Sprint 5F compile-path artifact chunk integration, Sprint 5G artifact-chunk embedding persistence and reads, Sprint 5H direct semantic artifact-chunk retrieval, Sprint 5I compile-path semantic artifact retrieval, Sprint 5J deterministic hybrid lexical-plus-semantic artifact merge in compile, Sprint 5L narrow PDF artifact ingestion, Sprint 5M narrow DOCX artifact ingestion, Sprint 5N narrow RFC822 email artifact ingestion, Sprint 5O read-only Gmail account plus single-message ingestion coverage, Sprint 5P Gmail credential hardening coverage, Sprint 5Q Gmail refresh-token lifecycle coverage, Sprint 5R Gmail refresh-token rotation handling coverage, Sprint 5T Gmail external secret-manager coverage, and Sprint 6H continuity API create/list/detail plus thread session/event review coverage. ## Core Flows Implemented Now +### Continuity Review + +1. Accept a user-scoped `POST /v0/threads` request with one caller-supplied thread title. +2. Persist exactly one visible `threads` row through the existing continuity store. +3. List visible threads through `GET /v0/threads` in deterministic `created_at DESC, id DESC` order. +4. Read one visible thread through `GET /v0/threads/{thread_id}`. +5. Read visible sessions for one visible thread through `GET /v0/threads/{thread_id}/sessions` in deterministic `started_at ASC, created_at ASC, id ASC` order. +6. Read immutable visible events for one visible thread through `GET /v0/threads/{thread_id}/events` in deterministic `sequence_no ASC` order. +7. Return stable summary metadata for thread, session, and event list reads. +8. 
Reuse only already-persisted continuity data; session mutation, event mutation, rename, archive, pagination, and search remain out of scope. + ### Deterministic Context Compilation 1. Accept a user-scoped `POST /v0/context/compile` request. diff --git a/BUILD_REPORT.md b/BUILD_REPORT.md index ea5f49d..7a3a1ab 100644 --- a/BUILD_REPORT.md +++ b/BUILD_REPORT.md @@ -2,131 +2,216 @@ ## sprint objective -Implement Sprint 6G by turning `/chat` into a dual-mode operator conversation surface with: +Implement Sprint 6H by exposing narrow user-scoped continuity APIs over the shipped continuity store: -- assistant response mode backed by `POST /v0/responses` -- governed request mode retained through `POST /v0/approvals/requests` +- `POST /v0/threads` +- `GET /v0/threads` +- `GET /v0/threads/{thread_id}` +- `GET /v0/threads/{thread_id}/sessions` +- `GET /v0/threads/{thread_id}/events` -The sprint stays inside the shipped backend seams and keeps the two behaviors visibly separate. +The work stays inside backend continuity scope and does not widen response generation, task orchestration, Gmail, or UI behavior. 
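The five endpoints above share one addressing convention in the shipped handlers: `user_id` travels in the JSON body for the create call and as a query parameter on every read. A minimal client-side sketch of those request shapes, assuming a local dev server address (`BASE` is an assumption, not part of the shipped config):

```python
from urllib.parse import urlencode
from uuid import UUID

BASE = "http://localhost:8000"  # assumed local dev address, not from the repo


def thread_create_request(user_id: UUID, title: str) -> tuple[str, dict[str, str]]:
    # POST /v0/threads: body mirrors CreateThreadRequest (user_id + 1..200 char title).
    return f"{BASE}/v0/threads", {"user_id": str(user_id), "title": title}


def thread_read_urls(user_id: UUID, thread_id: UUID) -> list[str]:
    # The four GET reads all scope visibility by the user_id query parameter.
    qs = urlencode({"user_id": str(user_id)})
    return [
        f"{BASE}/v0/threads?{qs}",
        f"{BASE}/v0/threads/{thread_id}?{qs}",
        f"{BASE}/v0/threads/{thread_id}/sessions?{qs}",
        f"{BASE}/v0/threads/{thread_id}/events?{qs}",
    ]
```

Any HTTP client can issue these requests; per the handlers in `main.py`, a 404 on the three `{thread_id}` reads covers both missing and invisible threads.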
## completed work -- updated `apps/web/app/chat/page.tsx` to: - - make assistant mode the default `/chat` state - - add an explicit mode toggle between assistant chat and governed request submission - - seed fixture history only when live API configuration is absent - - keep the side rail mode-specific so supporting guidance stays relevant instead of noisy -- added `apps/web/components/mode-toggle.tsx` as a stable two-state switch with clear labeling and active-state emphasis -- added `apps/web/components/response-composer.tsx` to: - - submit normal assistant questions through `POST /v0/responses` - - keep thread identity explicit - - provide explicit fixture preview fallback when live API configuration is absent -- added `apps/web/components/response-history.tsx` to show bounded assistant history with: - - operator prompt - - assistant reply - - model metadata - - compile and response trace summaries - - direct links into `/traces` -- refined `apps/web/components/request-composer.tsx` so governed mode reads as an intentional approval-gated workflow instead of a chat-like surface -- extended `apps/web/lib/api.ts` with typed assistant-response submission support for `POST /v0/responses` -- extended `apps/web/lib/fixtures.ts` with assistant response fixtures, fixture trace coverage, and deterministic preview entries -- refined `apps/web/app/globals.css` for the scoped `/chat` surface with: - - stronger hierarchy - - calmer spacing - - bounded history panels - - more deliberate prompt/reply grouping - - safer wrapping for long ids, trace references, and body text - - cleaner mobile stacking for the mode switch and chat workspace -- added narrow frontend coverage in: - - `apps/web/lib/api.test.ts` - - `apps/web/app/chat/page.test.tsx` - - `apps/web/components/response-composer.test.tsx` - - `apps/web/components/response-history.test.tsx` +- added continuity response contracts and ordering metadata in `apps/api/src/alicebot_api/contracts.py` +- introduced 
`ThreadCreateInput` +- introduced `ThreadRecord` +- introduced `ThreadCreateResponse` +- introduced `ThreadListSummary` +- introduced `ThreadListResponse` +- introduced `ThreadDetailResponse` +- introduced `ThreadSessionRecord` +- introduced `ThreadSessionListSummary` +- introduced `ThreadSessionListResponse` +- introduced `ThreadEventRecord` +- introduced `ThreadEventListSummary` +- introduced `ThreadEventListResponse` +- introduced continuity ordering constants: + - `THREAD_LIST_ORDER` + - `THREAD_SESSION_LIST_ORDER` + - `THREAD_EVENT_LIST_ORDER` +- added deterministic thread listing support in `apps/api/src/alicebot_api/store.py` with `list_threads()` +- implemented the five scoped continuity endpoints in `apps/api/src/alicebot_api/main.py` +- kept continuity reads user-scoped by reusing the existing RLS-backed `ContinuityStore` +- added unit coverage for create/list/detail/session/event response shape, ordering, and invisible-thread handling +- added Postgres-backed integration coverage for create/list/detail/session/event behavior and cross-user isolation +- added a migration guard asserting the shipped thread created-time index remains present for deterministic continuity review queries + +## exact ordering rules + +- thread list order: `created_at DESC`, then `id DESC` +- thread session list order: `started_at ASC`, then `created_at ASC`, then `id ASC` +- thread event list order: `sequence_no ASC` ## incomplete work -- no scoped sprint deliverables remain incomplete in code +- no in-scope code deliverables remain incomplete - intentionally not added: - - backend changes - - thread browsing or thread creation UI - - auth changes - - new routes outside `/chat` - - hidden tool routing or autonomous action behavior - -## exact /chat files and components updated - -- `apps/web/app/chat/page.tsx` -- `apps/web/app/chat/page.test.tsx` -- `apps/web/app/globals.css` -- `apps/web/components/request-composer.tsx` -- `apps/web/components/response-composer.tsx` -- 
`apps/web/components/response-history.tsx` -- `apps/web/components/mode-toggle.tsx` -- `apps/web/components/status-badge.tsx` -- `apps/web/lib/api.ts` -- `apps/web/lib/fixtures.ts` -- `apps/web/lib/api.test.ts` -- `apps/web/components/response-composer.test.tsx` -- `apps/web/components/response-history.test.tsx` + - thread rename + - thread archive + - session mutation APIs + - event mutation or deletion behavior + - thread search, pagination, or filtering + - `/chat` UI thread selection or thread creation UX + +## files changed + +- `ARCHITECTURE.md` +- `apps/api/src/alicebot_api/contracts.py` +- `apps/api/src/alicebot_api/store.py` +- `apps/api/src/alicebot_api/main.py` +- `tests/unit/test_20260310_0001_foundation_continuity.py` +- `tests/unit/test_events.py` +- `tests/integration/test_continuity_api.py` - `BUILD_REPORT.md` -## route backing mode - -- assistant mode in `/chat` is: - - live-API-backed when API configuration is present - - fixture-backed when API configuration is absent -- governed request mode in `/chat` is: - - live-API-backed when API configuration is present - - fixture-backed when API configuration is absent - -## backend endpoints consumed - -- `POST /v0/responses` -- `POST /v0/approvals/requests` - -## exact commands run - -- `cd /Users/samirusani/Desktop/Codex/AliceBot/apps/web && npm run lint` -- `cd /Users/samirusani/Desktop/Codex/AliceBot/apps/web && npm test` -- `cd /Users/samirusani/Desktop/Codex/AliceBot/apps/web && npm run build` - -## lint, test, and build results - -- lint result: PASS -- test result: PASS - - `7` test files passed - - `28` tests passed -- build result: PASS - -## desktop and mobile visual verification notes - -- no browser-driven visual QA pass was executed in this turn -- desktop note: - - assistant mode now presents the composer and bounded response history as two coordinated panels instead of one long undifferentiated form - - the mode switch is visible near the page header and reads as a stable route-level 
decision rather than an inline afterthought - - response prompt, reply, ids, and trace summaries all use explicit containment styles with overflow wrapping - - live-configured `/chat` now starts empty in both modes instead of showing synthetic fixture history -- mobile note: - - the mode switch collapses to one column below the existing breakpoint - - the assistant workspace collapses from a two-panel layout to one column so the composer remains primary and the history panel follows cleanly - - buttons continue to expand to full width on narrow screens to avoid cramped action rows +## tests run + +- `./.venv/bin/python -m pytest tests/unit/test_events.py tests/unit/test_20260310_0001_foundation_continuity.py tests/integration/test_continuity_api.py` + - result: partial pass, then blocked by sandbox-local Postgres access for integration setup +- `./.venv/bin/python -m pytest tests/unit/test_main.py -q` + - result: PASS (`41` passed) +- `./.venv/bin/python -m pytest tests/integration/test_continuity_api.py` + - result: PASS (`2` passed) +- `./.venv/bin/python -m pytest tests/unit` + - result: PASS (`454` passed) +- `./.venv/bin/python -m pytest tests/integration` + - result: PASS (`145` passed) + +## example thread create response + +```json +{ + "thread": { + "id": "30000000-0000-4000-8000-000000000003", + "title": "Gamma thread", + "created_at": "2026-03-17T10:00:00+00:00", + "updated_at": "2026-03-17T10:00:00+00:00" + } +} +``` + +## example thread list response + +```json +{ + "items": [ + { + "id": "30000000-0000-4000-8000-000000000003", + "title": "Gamma thread", + "created_at": "2026-03-17T10:00:00+00:00", + "updated_at": "2026-03-17T10:00:00+00:00" + }, + { + "id": "00000000-0000-4000-8000-000000000002", + "title": "Beta thread", + "created_at": "2026-03-17T09:00:00+00:00", + "updated_at": "2026-03-17T09:00:00+00:00" + }, + { + "id": "00000000-0000-4000-8000-000000000001", + "title": "Alpha thread", + "created_at": "2026-03-17T09:00:00+00:00", + "updated_at": 
"2026-03-17T09:00:00+00:00" + } + ], + "summary": { + "total_count": 3, + "order": ["created_at_desc", "id_desc"] + } +} +``` + +## example thread detail response + +```json +{ + "thread": { + "id": "00000000-0000-4000-8000-000000000002", + "title": "Beta thread", + "created_at": "2026-03-17T09:00:00+00:00", + "updated_at": "2026-03-17T09:00:00+00:00" + } +} +``` + +## example thread-session list response + +```json +{ + "items": [ + { + "id": "10000000-0000-4000-8000-000000000001", + "thread_id": "00000000-0000-4000-8000-000000000002", + "status": "completed", + "started_at": "2026-03-17T09:00:00+00:00", + "ended_at": "2026-03-17T09:05:00+00:00", + "created_at": "2026-03-17T09:00:00+00:00" + }, + { + "id": "10000000-0000-4000-8000-000000000002", + "thread_id": "00000000-0000-4000-8000-000000000002", + "status": "active", + "started_at": "2026-03-17T10:00:00+00:00", + "ended_at": null, + "created_at": "2026-03-17T10:00:00+00:00" + } + ], + "summary": { + "thread_id": "00000000-0000-4000-8000-000000000002", + "total_count": 2, + "order": ["started_at_asc", "created_at_asc", "id_asc"] + } +} +``` + +## example thread-event list response + +```json +{ + "items": [ + { + "id": "20000000-0000-4000-8000-000000000002", + "thread_id": "00000000-0000-4000-8000-000000000002", + "session_id": "10000000-0000-4000-8000-000000000002", + "sequence_no": 1, + "kind": "message.user", + "payload": {"text": "Hello"}, + "created_at": "2026-03-17T10:00:00+00:00" + }, + { + "id": "20000000-0000-4000-8000-000000000001", + "thread_id": "00000000-0000-4000-8000-000000000002", + "session_id": "10000000-0000-4000-8000-000000000002", + "sequence_no": 2, + "kind": "message.assistant", + "payload": {"text": "Hello back"}, + "created_at": "2026-03-17T10:01:00+00:00" + } + ], + "summary": { + "thread_id": "00000000-0000-4000-8000-000000000002", + "total_count": 2, + "order": ["sequence_no_asc"] + } +} +``` ## blockers/issues -- no blockers remain inside sprint scope -- no backend contract changes 
were required +- no code blockers remain inside sprint scope +- local sandbox execution initially blocked localhost Postgres access for integration setup; rerunning the Postgres-backed suite with unrestricted execution completed verification +- `ARCHITECTURE.md` was updated after review so the documented runtime/API inventory matches the shipped `/v0/threads*` continuity surface ## recommended next step -Run a browser-based QA pass against both assistant mode and governed mode to validate: - -- real long-form assistant replies in the bounded history panel -- mode-switch readability and perceived hierarchy on tablet widths -- trace-link destinations against a live configured backend +Use these continuity endpoints in a follow-up `/chat` sprint so the operator can create a thread, browse visible threads, and load thread history without manually typing raw thread ids. ## intentionally deferred after this sprint -- thread browsing, thread create flows, or any broader conversation management UI -- backend changes beyond the shipped `/v0/responses` and `/v0/approvals/requests` seams -- any Gmail, Calendar, auth, runner, or broader workflow expansion -- redesign of unrelated routes outside the scoped `/chat` surface +- all UI work for thread selection, thread creation, and session history presentation +- any new session write endpoint +- any event rewrite, delete, or archive behavior +- any broader chat orchestration changes +- any Gmail, Calendar, auth, approval, task, execution, or runner scope expansion diff --git a/REVIEW_REPORT.md b/REVIEW_REPORT.md index 0d67e0a..a37c64c 100644 --- a/REVIEW_REPORT.md +++ b/REVIEW_REPORT.md @@ -6,46 +6,63 @@ PASS ## criteria met -- The sprint stayed a UI sprint and did not widen backend scope. The implementation remains confined to the web shell and uses only the shipped seams `POST /v0/responses` and `POST /v0/approvals/requests`. 
-- `/chat` supports assistant response mode via `POST /v0/responses` through `apps/web/lib/api.ts` and `apps/web/components/response-composer.tsx`. -- `/chat` retains governed request mode via the existing approval-request seam through `apps/web/components/request-composer.tsx`. -- The mode switch is explicit and understandable. `apps/web/components/mode-toggle.tsx` keeps assistant and governed modes visibly separate. -- Assistant replies and trace summaries are visible in bounded history panels via `apps/web/components/response-history.tsx`. -- Fixture fallback is now explicit and correctly scoped to the no-config path. Live-configured `/chat` starts empty in both modes instead of showing seeded synthetic history. This is enforced in `apps/web/app/chat/page.tsx` and covered by `apps/web/app/chat/page.test.tsx`. -- The sprint stayed within the exact in-scope files and components listed in the sprint packet. -- The UI continues to follow `DESIGN_SYSTEM.md` materially. The `/chat` surface remains restrained, bounded, and readable on the inspected responsive layouts. -- `BUILD_REPORT.md` now matches the implemented route-backing behavior and current verification totals. 
-Verification passed in `apps/web`: - - `npm run lint` - - `npm test` - - `npm run build` - - current totals: `7` test files, `28` tests +- Implemented the exact in-scope continuity API surface: + - `POST /v0/threads` + - `GET /v0/threads` + - `GET /v0/threads/{thread_id}` + - `GET /v0/threads/{thread_id}/sessions` + - `GET /v0/threads/{thread_id}/events` +- Kept the change narrow to continuity scope in the reviewed code diff: + - `apps/api/src/alicebot_api/contracts.py` + - `apps/api/src/alicebot_api/store.py` + - `apps/api/src/alicebot_api/main.py` + - `tests/unit/test_20260310_0001_foundation_continuity.py` + - `tests/unit/test_events.py` + - `tests/integration/test_continuity_api.py` + - `BUILD_REPORT.md` +- Added typed contracts and stable summary metadata for thread create/list/detail, session list, and event list responses. +- Added deterministic thread list ordering in the store: `created_at DESC, id DESC`. +- Reused existing durable continuity data and added only narrow thread creation; no session or event mutation surface was introduced. +- Preserved user isolation through the existing user-scoped connection and continuity store path. +- Added unit and Postgres-backed integration coverage for create, detail, ordering, event/session reads, not-found behavior, and cross-user isolation. +- Acceptance verification passed in this review: + - `./.venv/bin/python -m pytest tests/unit/test_main.py -q` -> PASS (`41` passed) + - `./.venv/bin/python -m pytest tests/integration/test_continuity_api.py` -> PASS (`2` passed) + - `./.venv/bin/python -m pytest tests/unit` -> PASS (`454` passed) + - `./.venv/bin/python -m pytest tests/integration` -> PASS (`145` passed) ## criteria missed -- None. +- None; all acceptance criteria in the active Sprint 6H packet are functionally met. ## quality issues -- No blocking quality issues found in the current Sprint 6G implementation. +- No material implementation defects or unsafe behavior were found in the scoped API, store, or tests. 
+- `tests/unit/test_events.py` now carries thread continuity endpoint tests in addition to event-contract tests. This is not a blocker, but the file is becoming overloaded and may warrant a split in a later sprint. ## regression risks -- Residual risk is limited to browser-level presentation because no live browser QA pass was executed in this review cycle. That does not block sprint acceptance. +- Low regression risk in the shipped continuity slice. +- The new reads are deterministic and test-backed for ordering and isolation. +- User visibility still depends on the existing RLS-backed continuity path; a passing integration suite materially reduces that risk. +- No additional regression risk was introduced by the follow-up documentation-only edits. ## docs issues -- No blocking docs issues remain for Sprint 6G. +- None blocking. +- `BUILD_REPORT.md` meets the sprint packet requirements. +- `ARCHITECTURE.md` now reflects Sprint 6H and documents the `/v0/threads*` continuity surface accurately. ## should anything be added to RULES.md? - No. +- The existing rules already cover sprint-packet immutability, narrow scope, typed contracts, and test-backed delivery. ## should anything update ARCHITECTURE.md? -- No. +- No further architecture updates are required for this sprint. ## recommended next action -- Sprint 6G can be considered review-passed. -- Next follow-up should be a browser-based QA pass against a live configured backend to validate long-form assistant replies, mode-switch hierarchy on tablet widths, and trace-link destinations. +- Sprint 6H is review-passed and ready for the normal merge/approval path. +- Follow-up work should move to the next scoped sprint that consumes these continuity endpoints in `/chat`. 
diff --git a/apps/api/src/alicebot_api/contracts.py b/apps/api/src/alicebot_api/contracts.py index 83afd52..a59420e 100644 --- a/apps/api/src/alicebot_api/contracts.py +++ b/apps/api/src/alicebot_api/contracts.py @@ -100,6 +100,9 @@ TRACE_KIND_RESPONSE_GENERATE = "response.generate" TRACE_REVIEW_LIST_ORDER = ["created_at_desc", "id_desc"] TRACE_REVIEW_EVENT_LIST_ORDER = ["sequence_no_asc", "id_asc"] +THREAD_LIST_ORDER = ["created_at_desc", "id_desc"] +THREAD_SESSION_LIST_ORDER = ["started_at_asc", "created_at_asc", "id_asc"] +THREAD_EVENT_LIST_ORDER = ["sequence_no_asc"] MEMORY_REVIEW_ORDER = ["updated_at_desc", "created_at_desc", "id_desc"] MEMORY_REVIEW_QUEUE_ORDER = ["updated_at_desc", "created_at_desc", "id_desc"] MEMORY_REVISION_REVIEW_ORDER = ["sequence_no_asc"] @@ -316,6 +319,77 @@ class TraceEventRecord: payload: JsonObject +@dataclass(frozen=True, slots=True) +class ThreadCreateInput: + title: str + + +class ThreadRecord(TypedDict): + id: str + title: str + created_at: str + updated_at: str + + +class ThreadCreateResponse(TypedDict): + thread: ThreadRecord + + +class ThreadListSummary(TypedDict): + total_count: int + order: list[str] + + +class ThreadListResponse(TypedDict): + items: list[ThreadRecord] + summary: ThreadListSummary + + +class ThreadDetailResponse(TypedDict): + thread: ThreadRecord + + +class ThreadSessionRecord(TypedDict): + id: str + thread_id: str + status: str + started_at: str | None + ended_at: str | None + created_at: str + + +class ThreadSessionListSummary(TypedDict): + thread_id: str + total_count: int + order: list[str] + + +class ThreadSessionListResponse(TypedDict): + items: list[ThreadSessionRecord] + summary: ThreadSessionListSummary + + +class ThreadEventRecord(TypedDict): + id: str + thread_id: str + session_id: str | None + sequence_no: int + kind: str + payload: JsonObject + created_at: str + + +class ThreadEventListSummary(TypedDict): + thread_id: str + total_count: int + order: list[str] + + +class 
ThreadEventListResponse(TypedDict): + items: list[ThreadEventRecord] + summary: ThreadEventListSummary + + class TraceReviewSummaryRecord(TypedDict): id: str thread_id: str diff --git a/apps/api/src/alicebot_api/main.py b/apps/api/src/alicebot_api/main.py index 4ded160..2025cc6 100644 --- a/apps/api/src/alicebot_api/main.py +++ b/apps/api/src/alicebot_api/main.py @@ -50,6 +50,9 @@ GmailMessageIngestInput, MemoryCandidateInput, MemoryEmbeddingUpsertInput, + THREAD_EVENT_LIST_ORDER, + THREAD_LIST_ORDER, + THREAD_SESSION_LIST_ORDER, MemoryReviewLabelValue, MemoryReviewStatusFilter, PolicyCreateInput, @@ -76,6 +79,18 @@ ToolRoutingDecision, ToolRoutingRequestInput, ToolCreateInput, + ThreadCreateInput, + ThreadCreateResponse, + ThreadDetailResponse, + ThreadEventListResponse, + ThreadEventListSummary, + ThreadEventRecord, + ThreadListResponse, + ThreadListSummary, + ThreadRecord, + ThreadSessionListResponse, + ThreadSessionListSummary, + ThreadSessionRecord, ) from alicebot_api.artifacts import ( TaskArtifactAlreadyExistsError, @@ -237,7 +252,13 @@ ProxyExecutionHandlerNotFoundError, execute_approved_proxy_request, ) -from alicebot_api.store import ContinuityStore, ContinuityStoreInvariantError +from alicebot_api.store import ( + ContinuityStore, + ContinuityStoreInvariantError, + EventRow, + SessionRow, + ThreadRow, +) from alicebot_api.traces import ( TraceNotFoundError, get_trace_record, @@ -379,6 +400,11 @@ class GenerateResponseRequest(BaseModel): max_entity_edges: int = Field(default=DEFAULT_MAX_ENTITY_EDGES, ge=0, le=100) +class CreateThreadRequest(BaseModel): + user_id: UUID + title: str = Field(min_length=1, max_length=200) + + class AdmitMemoryRequest(BaseModel): user_id: UUID memory_key: str = Field(min_length=1, max_length=200) @@ -657,6 +683,38 @@ class SupersedeExecutionBudgetRequest(BaseModel): max_completed_executions: int = Field(ge=1, le=1000000) +def _serialize_thread(thread: ThreadRow) -> ThreadRecord: + return { + "id": str(thread["id"]), + 
"title": thread["title"], + "created_at": thread["created_at"].isoformat(), + "updated_at": thread["updated_at"].isoformat(), + } + + +def _serialize_thread_session(session: SessionRow) -> ThreadSessionRecord: + return { + "id": str(session["id"]), + "thread_id": str(session["thread_id"]), + "status": session["status"], + "started_at": None if session["started_at"] is None else session["started_at"].isoformat(), + "ended_at": None if session["ended_at"] is None else session["ended_at"].isoformat(), + "created_at": session["created_at"].isoformat(), + } + + +def _serialize_thread_event(event: EventRow) -> ThreadEventRecord: + return { + "id": str(event["id"]), + "thread_id": str(event["thread_id"]), + "session_id": None if event["session_id"] is None else str(event["session_id"]), + "sequence_no": event["sequence_no"], + "kind": event["kind"], + "payload": event["payload"], + "created_at": event["created_at"].isoformat(), + } + + def redact_url_credentials(raw_url: str) -> str: parsed = urlsplit(raw_url) @@ -843,6 +901,111 @@ def generate_assistant_response(request: GenerateResponseRequest) -> JSONRespons ) +@app.post("/v0/threads") +def create_thread(request: CreateThreadRequest) -> JSONResponse: + settings = get_settings() + thread_input = ThreadCreateInput(title=request.title) + + with user_connection(settings.database_url, request.user_id) as conn: + created = ContinuityStore(conn).create_thread(thread_input.title) + + payload: ThreadCreateResponse = {"thread": _serialize_thread(created)} + return JSONResponse( + status_code=201, + content=jsonable_encoder(payload), + ) + + +@app.get("/v0/threads") +def list_threads(user_id: UUID) -> JSONResponse: + settings = get_settings() + + with user_connection(settings.database_url, user_id) as conn: + items = [_serialize_thread(thread) for thread in ContinuityStore(conn).list_threads()] + + summary: ThreadListSummary = { + "total_count": len(items), + "order": list(THREAD_LIST_ORDER), + } + payload: ThreadListResponse = { 
+ "items": items, + "summary": summary, + } + return JSONResponse( + status_code=200, + content=jsonable_encoder(payload), + ) + + +@app.get("/v0/threads/{thread_id}") +def get_thread(thread_id: UUID, user_id: UUID) -> JSONResponse: + settings = get_settings() + + with user_connection(settings.database_url, user_id) as conn: + thread = ContinuityStore(conn).get_thread_optional(thread_id) + + if thread is None: + return JSONResponse(status_code=404, content={"detail": f"thread {thread_id} was not found"}) + + payload: ThreadDetailResponse = {"thread": _serialize_thread(thread)} + return JSONResponse( + status_code=200, + content=jsonable_encoder(payload), + ) + + +@app.get("/v0/threads/{thread_id}/sessions") +def list_thread_sessions(thread_id: UUID, user_id: UUID) -> JSONResponse: + settings = get_settings() + + with user_connection(settings.database_url, user_id) as conn: + store = ContinuityStore(conn) + thread = store.get_thread_optional(thread_id) + if thread is None: + return JSONResponse(status_code=404, content={"detail": f"thread {thread_id} was not found"}) + items = [_serialize_thread_session(session) for session in store.list_thread_sessions(thread_id)] + + summary: ThreadSessionListSummary = { + "thread_id": str(thread["id"]), + "total_count": len(items), + "order": list(THREAD_SESSION_LIST_ORDER), + } + payload: ThreadSessionListResponse = { + "items": items, + "summary": summary, + } + return JSONResponse( + status_code=200, + content=jsonable_encoder(payload), + ) + + +@app.get("/v0/threads/{thread_id}/events") +def list_thread_events(thread_id: UUID, user_id: UUID) -> JSONResponse: + settings = get_settings() + + with user_connection(settings.database_url, user_id) as conn: + store = ContinuityStore(conn) + thread = store.get_thread_optional(thread_id) + if thread is None: + return JSONResponse(status_code=404, content={"detail": f"thread {thread_id} was not found"}) + items = [_serialize_thread_event(event) for event in 
store.list_thread_events(thread_id)] + + summary: ThreadEventListSummary = { + "thread_id": str(thread["id"]), + "total_count": len(items), + "order": list(THREAD_EVENT_LIST_ORDER), + } + payload: ThreadEventListResponse = { + "items": items, + "summary": summary, + } + return JSONResponse( + status_code=200, + content=jsonable_encoder(payload), + ) + + @app.get("/v0/traces") def list_traces(user_id: UUID) -> JSONResponse: settings = get_settings() diff --git a/apps/api/src/alicebot_api/store.py b/apps/api/src/alicebot_api/store.py index adfaec7..e616ca9 100644 --- a/apps/api/src/alicebot_api/store.py +++ b/apps/api/src/alicebot_api/store.py @@ -418,6 +418,12 @@ class LabelCountRow(TypedDict): WHERE id = %s """ +LIST_THREADS_SQL = """ + SELECT id, user_id, title, created_at, updated_at + FROM threads + ORDER BY created_at DESC, id DESC + """ + INSERT_SESSION_SQL = """ INSERT INTO sessions (user_id, thread_id, status) VALUES (app.current_user_id(), %s, %s) @@ -2584,6 +2590,9 @@ def get_thread(self, thread_id: UUID) -> ThreadRow: def get_thread_optional(self, thread_id: UUID) -> ThreadRow | None: return self._fetch_optional_one(GET_THREAD_SQL, (thread_id,)) + def list_threads(self) -> list[ThreadRow]: + return self._fetch_all(LIST_THREADS_SQL) + def create_session(self, thread_id: UUID, status: str = "active") -> SessionRow: return self._fetch_one("create_session", INSERT_SESSION_SQL, (thread_id, status)) diff --git a/tests/integration/test_continuity_api.py b/tests/integration/test_continuity_api.py new file mode 100644 index 0000000..be9068e --- /dev/null +++ b/tests/integration/test_continuity_api.py @@ -0,0 +1,397 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +import json +from typing import Any +from urllib.parse import urlencode +from uuid import UUID, uuid4 + +import anyio +import psycopg + +import apps.api.src.alicebot_api.main as main_module +from apps.api.src.alicebot_api.config import Settings +from alicebot_api.db 
import user_connection +from alicebot_api.store import ContinuityStore + + +def invoke_request( + method: str, + path: str, + *, + query_params: dict[str, str] | None = None, + payload: dict[str, Any] | None = None, +) -> tuple[int, dict[str, Any]]: + messages: list[dict[str, object]] = [] + encoded_body = b"" if payload is None else json.dumps(payload).encode() + request_received = False + + async def receive() -> dict[str, object]: + nonlocal request_received + if request_received: + return {"type": "http.disconnect"} + + request_received = True + return {"type": "http.request", "body": encoded_body, "more_body": False} + + async def send(message: dict[str, object]) -> None: + messages.append(message) + + query_string = urlencode(query_params or {}).encode() + scope = { + "type": "http", + "asgi": {"version": "3.0"}, + "http_version": "1.1", + "method": method, + "scheme": "http", + "path": path, + "raw_path": path.encode(), + "query_string": query_string, + "headers": [(b"content-type", b"application/json")], + "client": ("testclient", 50000), + "server": ("testserver", 80), + "root_path": "", + } + + anyio.run(main_module.app, scope, receive, send) + + start_message = next(message for message in messages if message["type"] == "http.response.start") + body = b"".join( + message.get("body", b"") + for message in messages + if message["type"] == "http.response.body" + ) + return start_message["status"], json.loads(body) + + +def create_user(database_url: str, *, email: str) -> UUID: + user_id = uuid4() + with user_connection(database_url, user_id) as conn: + ContinuityStore(conn).create_user(user_id, email, email.split("@", 1)[0].title()) + return user_id + + +def seed_user_with_continuity(database_url: str, *, email: str) -> dict[str, object]: + user_id = create_user(database_url, email=email) + + with user_connection(database_url, user_id) as conn: + store = ContinuityStore(conn) + first_thread = store.create_thread("Alpha thread") + second_thread = 
store.create_thread("Beta thread") + first_session = store.create_session(second_thread["id"], status="completed") + second_session = store.create_session(second_thread["id"], status="active") + first_event = store.append_event( + second_thread["id"], + second_session["id"], + "message.user", + {"text": "Hello"}, + ) + second_event = store.append_event( + second_thread["id"], + second_session["id"], + "message.assistant", + {"text": "Hello back"}, + ) + + return { + "user_id": user_id, + "first_thread": first_thread, + "second_thread": second_thread, + "first_session": first_session, + "second_session": second_session, + "first_event": first_event, + "second_event": second_event, + } + + +def set_thread_timestamps( + admin_database_url: str, + *, + thread_id: UUID, + created_at: datetime, + updated_at: datetime, +) -> None: + with psycopg.connect(admin_database_url, autocommit=True) as conn: + with conn.cursor() as cur: + cur.execute( + "UPDATE threads SET created_at = %s, updated_at = %s WHERE id = %s", + (created_at, updated_at, thread_id), + ) + + +def set_session_timestamps( + admin_database_url: str, + *, + session_id: UUID, + started_at: datetime, + ended_at: datetime | None, + created_at: datetime, +) -> None: + with psycopg.connect(admin_database_url, autocommit=True) as conn: + with conn.cursor() as cur: + cur.execute( + "UPDATE sessions SET started_at = %s, ended_at = %s, created_at = %s WHERE id = %s", + (started_at, ended_at, created_at, session_id), + ) + + +def serialize_thread(*, thread_id: UUID, title: str, created_at: datetime, updated_at: datetime) -> dict[str, Any]: + return { + "id": str(thread_id), + "title": title, + "created_at": created_at.isoformat(), + "updated_at": updated_at.isoformat(), + } + + +def serialize_session( + *, + session_id: UUID, + thread_id: UUID, + status: str, + started_at: datetime | None, + ended_at: datetime | None, + created_at: datetime, +) -> dict[str, Any]: + return { + "id": str(session_id), + "thread_id": 
str(thread_id), + "status": status, + "started_at": None if started_at is None else started_at.isoformat(), + "ended_at": None if ended_at is None else ended_at.isoformat(), + "created_at": created_at.isoformat(), + } + + +def serialize_event(event: dict[str, Any]) -> dict[str, Any]: + return { + "id": str(event["id"]), + "thread_id": str(event["thread_id"]), + "session_id": None if event["session_id"] is None else str(event["session_id"]), + "sequence_no": event["sequence_no"], + "kind": event["kind"], + "payload": event["payload"], + "created_at": event["created_at"].isoformat(), + } + + +def test_thread_continuity_endpoints_create_list_detail_sessions_and_events( + migrated_database_urls, + monkeypatch, +) -> None: + seeded = seed_user_with_continuity(migrated_database_urls["app"], email="owner@example.com") + monkeypatch.setattr( + main_module, + "get_settings", + lambda: Settings(database_url=migrated_database_urls["app"]), + ) + + create_status, create_payload = invoke_request( + "POST", + "/v0/threads", + payload={ + "user_id": str(seeded["user_id"]), + "title": "Gamma thread", + }, + ) + + assert create_status == 201 + assert create_payload["thread"]["title"] == "Gamma thread" + + api_thread_id = UUID(create_payload["thread"]["id"]) + shared_created_at = datetime(2026, 3, 17, 9, 0, tzinfo=UTC) + newer_created_at = datetime(2026, 3, 17, 10, 0, tzinfo=UTC) + first_session_start = shared_created_at + first_session_end = shared_created_at + timedelta(minutes=5) + second_session_start = shared_created_at + timedelta(hours=1) + + set_thread_timestamps( + migrated_database_urls["admin"], + thread_id=seeded["first_thread"]["id"], + created_at=shared_created_at, + updated_at=shared_created_at, + ) + set_thread_timestamps( + migrated_database_urls["admin"], + thread_id=seeded["second_thread"]["id"], + created_at=shared_created_at, + updated_at=shared_created_at, + ) + set_thread_timestamps( + migrated_database_urls["admin"], + thread_id=api_thread_id, + 
created_at=newer_created_at, + updated_at=newer_created_at, + ) + set_session_timestamps( + migrated_database_urls["admin"], + session_id=seeded["first_session"]["id"], + started_at=first_session_start, + ended_at=first_session_end, + created_at=first_session_start, + ) + set_session_timestamps( + migrated_database_urls["admin"], + session_id=seeded["second_session"]["id"], + started_at=second_session_start, + ended_at=None, + created_at=second_session_start, + ) + + with user_connection(migrated_database_urls["app"], seeded["user_id"]) as conn: + stored_thread_ids = [thread["id"] for thread in ContinuityStore(conn).list_threads()] + + assert api_thread_id in stored_thread_ids + + list_status, list_payload = invoke_request( + "GET", + "/v0/threads", + query_params={"user_id": str(seeded["user_id"])}, + ) + detail_status, detail_payload = invoke_request( + "GET", + f"/v0/threads/{seeded['second_thread']['id']}", + query_params={"user_id": str(seeded["user_id"])}, + ) + sessions_status, sessions_payload = invoke_request( + "GET", + f"/v0/threads/{seeded['second_thread']['id']}/sessions", + query_params={"user_id": str(seeded["user_id"])}, + ) + events_status, events_payload = invoke_request( + "GET", + f"/v0/threads/{seeded['second_thread']['id']}/events", + query_params={"user_id": str(seeded["user_id"])}, + ) + tied_threads = sorted( + [seeded["first_thread"], seeded["second_thread"]], + key=lambda thread: (thread["created_at"], thread["id"]), + reverse=True, + ) + + assert list_status == 200 + assert list_payload == { + "items": [ + serialize_thread( + thread_id=api_thread_id, + title="Gamma thread", + created_at=newer_created_at, + updated_at=newer_created_at, + ), + serialize_thread( + thread_id=tied_threads[0]["id"], + title=tied_threads[0]["title"], + created_at=shared_created_at, + updated_at=shared_created_at, + ), + serialize_thread( + thread_id=tied_threads[1]["id"], + title=tied_threads[1]["title"], + created_at=shared_created_at, + 
updated_at=shared_created_at, + ), + ], + "summary": { + "total_count": 3, + "order": ["created_at_desc", "id_desc"], + }, + } + + assert detail_status == 200 + assert detail_payload == { + "thread": serialize_thread( + thread_id=seeded["second_thread"]["id"], + title="Beta thread", + created_at=shared_created_at, + updated_at=shared_created_at, + ) + } + + assert sessions_status == 200 + assert sessions_payload == { + "items": [ + serialize_session( + session_id=seeded["first_session"]["id"], + thread_id=seeded["second_thread"]["id"], + status="completed", + started_at=first_session_start, + ended_at=first_session_end, + created_at=first_session_start, + ), + serialize_session( + session_id=seeded["second_session"]["id"], + thread_id=seeded["second_thread"]["id"], + status="active", + started_at=second_session_start, + ended_at=None, + created_at=second_session_start, + ), + ], + "summary": { + "thread_id": str(seeded["second_thread"]["id"]), + "total_count": 2, + "order": ["started_at_asc", "created_at_asc", "id_asc"], + }, + } + + assert events_status == 200 + assert events_payload == { + "items": [ + serialize_event(seeded["first_event"]), + serialize_event(seeded["second_event"]), + ], + "summary": { + "thread_id": str(seeded["second_thread"]["id"]), + "total_count": 2, + "order": ["sequence_no_asc"], + }, + } + + +def test_thread_continuity_endpoints_enforce_user_isolation_and_not_found( + migrated_database_urls, + monkeypatch, +) -> None: + owner = seed_user_with_continuity(migrated_database_urls["app"], email="owner@example.com") + intruder_id = create_user(migrated_database_urls["app"], email="intruder@example.com") + monkeypatch.setattr( + main_module, + "get_settings", + lambda: Settings(database_url=migrated_database_urls["app"]), + ) + + list_status, list_payload = invoke_request( + "GET", + "/v0/threads", + query_params={"user_id": str(intruder_id)}, + ) + detail_status, detail_payload = invoke_request( + "GET", + 
f"/v0/threads/{owner['second_thread']['id']}", + query_params={"user_id": str(intruder_id)}, + ) + sessions_status, sessions_payload = invoke_request( + "GET", + f"/v0/threads/{owner['second_thread']['id']}/sessions", + query_params={"user_id": str(intruder_id)}, + ) + events_status, events_payload = invoke_request( + "GET", + f"/v0/threads/{owner['second_thread']['id']}/events", + query_params={"user_id": str(intruder_id)}, + ) + + assert list_status == 200 + assert list_payload == { + "items": [], + "summary": { + "total_count": 0, + "order": ["created_at_desc", "id_desc"], + }, + } + assert detail_status == 404 + assert detail_payload == {"detail": f"thread {owner['second_thread']['id']} was not found"} + assert sessions_status == 404 + assert sessions_payload == {"detail": f"thread {owner['second_thread']['id']} was not found"} + assert events_status == 404 + assert events_payload == {"detail": f"thread {owner['second_thread']['id']} was not found"} diff --git a/tests/unit/test_20260310_0001_foundation_continuity.py b/tests/unit/test_20260310_0001_foundation_continuity.py index 9ac3fc7..713bc44 100644 --- a/tests/unit/test_20260310_0001_foundation_continuity.py +++ b/tests/unit/test_20260310_0001_foundation_continuity.py @@ -57,3 +57,9 @@ def test_base_schema_does_not_create_redundant_events_sequence_index() -> None: module = load_migration_module() assert "CREATE INDEX events_thread_sequence_idx" not in module._UPGRADE_SCHEMA_STATEMENT + + +def test_base_schema_keeps_thread_created_index_for_deterministic_review_queries() -> None: + module = load_migration_module() + + assert "CREATE INDEX threads_user_created_idx" in module._UPGRADE_SCHEMA_STATEMENT diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py index 7e64d9d..0b94b7d 100644 --- a/tests/unit/test_events.py +++ b/tests/unit/test_events.py @@ -1,10 +1,157 @@ from __future__ import annotations +from contextlib import contextmanager +from datetime import UTC, datetime, timedelta +import json 
+from uuid import UUID, uuid4 + import pytest +import apps.api.src.alicebot_api.main as main_module +from apps.api.src.alicebot_api.config import Settings from alicebot_api.store import AppendOnlyViolation, ContinuityStore +class ContinuityApiStoreStub: + def __init__(self, *, current_user_id: UUID) -> None: + self.current_user_id = current_user_id + self.base_time = datetime(2026, 3, 17, 9, 0, tzinfo=UTC) + self.threads: list[dict[str, object]] = [] + self.sessions: list[dict[str, object]] = [] + self.events: list[dict[str, object]] = [] + + def add_thread( + self, + *, + thread_id: UUID, + user_id: UUID, + title: str, + created_at: datetime | None = None, + updated_at: datetime | None = None, + ) -> dict[str, object]: + thread = { + "id": thread_id, + "user_id": user_id, + "title": title, + "created_at": created_at or self.base_time, + "updated_at": updated_at or created_at or self.base_time, + } + self.threads.append(thread) + return thread + + def add_session( + self, + *, + session_id: UUID, + user_id: UUID, + thread_id: UUID, + status: str, + started_at: datetime | None = None, + ended_at: datetime | None = None, + created_at: datetime | None = None, + ) -> dict[str, object]: + session = { + "id": session_id, + "user_id": user_id, + "thread_id": thread_id, + "status": status, + "started_at": started_at or self.base_time, + "ended_at": ended_at, + "created_at": created_at or started_at or self.base_time, + } + self.sessions.append(session) + return session + + def add_event( + self, + *, + event_id: UUID, + user_id: UUID, + thread_id: UUID, + session_id: UUID | None, + sequence_no: int, + kind: str, + payload: dict[str, object], + created_at: datetime | None = None, + ) -> dict[str, object]: + event = { + "id": event_id, + "user_id": user_id, + "thread_id": thread_id, + "session_id": session_id, + "sequence_no": sequence_no, + "kind": kind, + "payload": payload, + "created_at": created_at or self.base_time, + } + self.events.append(event) + return event + + 
def create_thread(self, title: str) -> dict[str, object]: + created_at = self.base_time + timedelta(minutes=len(self.threads)) + return self.add_thread( + thread_id=uuid4(), + user_id=self.current_user_id, + title=title, + created_at=created_at, + updated_at=created_at, + ) + + def list_threads(self) -> list[dict[str, object]]: + visible_threads = [ + thread for thread in self.threads if thread["user_id"] == self.current_user_id + ] + return sorted(visible_threads, key=lambda thread: (thread["created_at"], thread["id"]), reverse=True) + + def get_thread_optional(self, thread_id: UUID) -> dict[str, object] | None: + return next((thread for thread in self.list_threads() if thread["id"] == thread_id), None) + + def list_thread_sessions(self, thread_id: UUID) -> list[dict[str, object]]: + visible_sessions = [ + session + for session in self.sessions + if session["user_id"] == self.current_user_id and session["thread_id"] == thread_id + ] + return sorted( + visible_sessions, + key=lambda session: (session["started_at"], session["created_at"], session["id"]), + ) + + def list_thread_events(self, thread_id: UUID) -> list[dict[str, object]]: + visible_events = [ + event + for event in self.events + if event["user_id"] == self.current_user_id and event["thread_id"] == thread_id + ] + return sorted(visible_events, key=lambda event: (event["sequence_no"], event["id"])) + + +def install_continuity_api_stubs( + monkeypatch: pytest.MonkeyPatch, + stores: dict[UUID, ContinuityApiStoreStub], +) -> None: + settings = Settings(database_url="postgresql://app") + + class FakeConnection: + def __init__(self, current_user_id: UUID) -> None: + self.current_user_id = current_user_id + + @contextmanager + def fake_user_connection(database_url: str, current_user_id: UUID): + assert database_url == settings.database_url + yield FakeConnection(current_user_id) + + def fake_store_factory(conn: FakeConnection) -> ContinuityApiStoreStub: + return stores.setdefault( + conn.current_user_id, + 
ContinuityApiStoreStub(current_user_id=conn.current_user_id),
+        )
+
+    monkeypatch.setattr(main_module, "get_settings", lambda: settings)
+    monkeypatch.setattr(main_module, "user_connection", fake_user_connection)
+    monkeypatch.setattr(main_module, "ContinuityStore", fake_store_factory)
+
+
 def test_event_updates_are_rejected_by_contract():
     store = ContinuityStore(conn=None)  # type: ignore[arg-type]
@@ -18,3 +165,202 @@ def test_event_deletes_are_rejected_by_contract():
 
     with pytest.raises(AppendOnlyViolation, match="append-only"):
         store.delete_event("event-id")
+
+
+def test_thread_create_endpoint_persists_one_visible_thread(monkeypatch: pytest.MonkeyPatch) -> None:
+    owner_id = uuid4()
+    stores: dict[UUID, ContinuityApiStoreStub] = {}
+    install_continuity_api_stubs(monkeypatch, stores)
+
+    response = main_module.create_thread(
+        main_module.CreateThreadRequest(user_id=owner_id, title="Operator Inbox")
+    )
+
+    assert response.status_code == 201
+    body = json.loads(response.body)
+    UUID(body["thread"]["id"])  # id is server-generated; assert it parses as a UUID
+    assert body == {
+        "thread": {
+            "id": body["thread"]["id"],
+            "title": "Operator Inbox",
+            "created_at": "2026-03-17T09:00:00+00:00",
+            "updated_at": "2026-03-17T09:00:00+00:00",
+        }
+    }
+    assert [thread["title"] for thread in stores[owner_id].threads] == ["Operator Inbox"]
+
+
+def test_thread_review_endpoints_preserve_shape_order_and_user_isolation(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    owner_id = uuid4()
+    intruder_id = uuid4()
+    owner_store = ContinuityApiStoreStub(current_user_id=owner_id)
+    intruder_store = ContinuityApiStoreStub(current_user_id=intruder_id)
+    stores = {
+        owner_id: owner_store,
+        intruder_id: intruder_store,
+    }
+    install_continuity_api_stubs(monkeypatch, stores)
+
+    shared_created_at = owner_store.base_time
+    first_thread = owner_store.add_thread(
+        thread_id=UUID("00000000-0000-4000-8000-000000000001"),
+        user_id=owner_id,
+        title="Alpha thread",
+        created_at=shared_created_at,
+        updated_at=shared_created_at,
+    )
+    second_thread =
owner_store.add_thread( + thread_id=UUID("00000000-0000-4000-8000-000000000002"), + user_id=owner_id, + title="Beta thread", + created_at=shared_created_at, + updated_at=shared_created_at, + ) + first_session = owner_store.add_session( + session_id=UUID("10000000-0000-4000-8000-000000000001"), + user_id=owner_id, + thread_id=second_thread["id"], + status="completed", + started_at=shared_created_at, + ended_at=shared_created_at + timedelta(minutes=5), + created_at=shared_created_at, + ) + second_session = owner_store.add_session( + session_id=UUID("10000000-0000-4000-8000-000000000002"), + user_id=owner_id, + thread_id=second_thread["id"], + status="active", + started_at=shared_created_at + timedelta(hours=1), + ended_at=None, + created_at=shared_created_at + timedelta(hours=1), + ) + first_event = owner_store.add_event( + event_id=UUID("20000000-0000-4000-8000-000000000001"), + user_id=owner_id, + thread_id=second_thread["id"], + session_id=second_session["id"], + sequence_no=2, + kind="message.assistant", + payload={"text": "Hello back"}, + created_at=shared_created_at + timedelta(hours=1, minutes=1), + ) + second_event = owner_store.add_event( + event_id=UUID("20000000-0000-4000-8000-000000000002"), + user_id=owner_id, + thread_id=second_thread["id"], + session_id=second_session["id"], + sequence_no=1, + kind="message.user", + payload={"text": "Hello"}, + created_at=shared_created_at + timedelta(hours=1), + ) + + list_response = main_module.list_threads(owner_id) + detail_response = main_module.get_thread(second_thread["id"], owner_id) + sessions_response = main_module.list_thread_sessions(second_thread["id"], owner_id) + events_response = main_module.list_thread_events(second_thread["id"], owner_id) + intruder_list_response = main_module.list_threads(intruder_id) + intruder_detail_response = main_module.get_thread(second_thread["id"], intruder_id) + intruder_sessions_response = main_module.list_thread_sessions(second_thread["id"], intruder_id) + 
intruder_events_response = main_module.list_thread_events(second_thread["id"], intruder_id) + + assert json.loads(list_response.body) == { + "items": [ + { + "id": str(second_thread["id"]), + "title": "Beta thread", + "created_at": shared_created_at.isoformat(), + "updated_at": shared_created_at.isoformat(), + }, + { + "id": str(first_thread["id"]), + "title": "Alpha thread", + "created_at": shared_created_at.isoformat(), + "updated_at": shared_created_at.isoformat(), + }, + ], + "summary": { + "total_count": 2, + "order": ["created_at_desc", "id_desc"], + }, + } + assert json.loads(detail_response.body) == { + "thread": { + "id": str(second_thread["id"]), + "title": "Beta thread", + "created_at": shared_created_at.isoformat(), + "updated_at": shared_created_at.isoformat(), + } + } + assert json.loads(sessions_response.body) == { + "items": [ + { + "id": str(first_session["id"]), + "thread_id": str(second_thread["id"]), + "status": "completed", + "started_at": shared_created_at.isoformat(), + "ended_at": (shared_created_at + timedelta(minutes=5)).isoformat(), + "created_at": shared_created_at.isoformat(), + }, + { + "id": str(second_session["id"]), + "thread_id": str(second_thread["id"]), + "status": "active", + "started_at": (shared_created_at + timedelta(hours=1)).isoformat(), + "ended_at": None, + "created_at": (shared_created_at + timedelta(hours=1)).isoformat(), + }, + ], + "summary": { + "thread_id": str(second_thread["id"]), + "total_count": 2, + "order": ["started_at_asc", "created_at_asc", "id_asc"], + }, + } + assert json.loads(events_response.body) == { + "items": [ + { + "id": str(second_event["id"]), + "thread_id": str(second_thread["id"]), + "session_id": str(second_session["id"]), + "sequence_no": 1, + "kind": "message.user", + "payload": {"text": "Hello"}, + "created_at": (shared_created_at + timedelta(hours=1)).isoformat(), + }, + { + "id": str(first_event["id"]), + "thread_id": str(second_thread["id"]), + "session_id": str(second_session["id"]), + 
"sequence_no": 2, + "kind": "message.assistant", + "payload": {"text": "Hello back"}, + "created_at": (shared_created_at + timedelta(hours=1, minutes=1)).isoformat(), + }, + ], + "summary": { + "thread_id": str(second_thread["id"]), + "total_count": 2, + "order": ["sequence_no_asc"], + }, + } + assert json.loads(intruder_list_response.body) == { + "items": [], + "summary": { + "total_count": 0, + "order": ["created_at_desc", "id_desc"], + }, + } + assert intruder_detail_response.status_code == 404 + assert intruder_sessions_response.status_code == 404 + assert intruder_events_response.status_code == 404 + assert json.loads(intruder_detail_response.body) == { + "detail": f"thread {second_thread['id']} was not found" + } + assert json.loads(intruder_sessions_response.body) == { + "detail": f"thread {second_thread['id']} was not found" + } + assert json.loads(intruder_events_response.body) == { + "detail": f"thread {second_thread['id']} was not found" + }