diff --git a/.ai/active/SPRINT_PACKET.md b/.ai/active/SPRINT_PACKET.md index 51dbb99..b56893d 100644 --- a/.ai/active/SPRINT_PACKET.md +++ b/.ai/active/SPRINT_PACKET.md @@ -2,7 +2,9 @@ ## Sprint Title -Phase 10 Sprint 2 (P10-S2): Telegram Transport + Message Normalization +Phase 10 Sprint 3 (P10-S3): Chat-Native Continuity + Approvals + +Historical baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Bootstrap. ## Sprint Type @@ -10,22 +12,21 @@ feature ## Sprint Reason -`P10-S1` established hosted identity, workspace bootstrap, device management, and preference foundations. `P10-S2` now adds the first chat surface by wiring Telegram transport, channel linking, normalized inbound message handling, deterministic workspace routing, and outbound delivery receipts on top of those shipped hosted-control seams. +`P10-S1` shipped hosted identity, workspace bootstrap, device management, and preferences. `P10-S2` shipped Telegram transport, channel linking, normalized inbound messages, routing, and delivery receipts. `P10-S3` now turns that transport into a usable continuity surface by routing Telegram chats into capture, recall, resume, correction, open-loop review, and approval resolution on top of the shipped Alice Core semantics. -Reference baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Bootstrap. +Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap and `P10-S2` Telegram Transport + Message Normalization. ## Sprint Intent -- Telegram bot and webhook ingress -- Telegram link and unlink flow bound to hosted workspaces -- normalized inbound Telegram message contract -- deterministic workspace and thread routing for Telegram traffic -- outbound dispatcher and delivery receipts -- auditable idempotent message handling without widening into chat-native continuity behavior +- Telegram-native capture, recall, resume, correction, and open-loop review flows +- deterministic routing from normalized Telegram messages into the right continuity action +- approval prompts and approval resolution in Telegram +- provenance-backed replies and correction-aware answer behavior +- reuse of shipped transport seams without widening into scheduling or brief delivery ## Git Instructions -- Branch Name: `codex/phase10-sprint-2-telegram-transport-normalization` +- Branch Name: `codex/phase10-sprint-3-chat-continuity-approvals` - Base Branch: `main` - PR Strategy: one sprint branch, one PR - Merge Policy: squash merge only after review `PASS` and explicit approval @@ -39,16 +40,16 @@ Reference baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Boot - OpenClaw, Markdown, and ChatGPT importers - continuity engine, approvals, and eval harness - `P10-S1` hosted auth, workspace bootstrap, device management, preferences, beta cohorts, and feature flags + - `P10-S2` Telegram transport, link/unlink, normalization, routing, and delivery receipts - Phase 9 shipped scope is baseline truth, not sprint work - Required now: - - Telegram channel identity model and hosted link/unlink lifecycle - - normalized inbound message shape shared by later chat continuity work - - deterministic workspace and thread routing for Telegram traffic - - outbound delivery dispatch with receipt recording -- Explicitly out of `P10-S2`: - - changes to magic-link or hosted auth semantics beyond what Telegram linking requires to consume - - new workspace bootstrap flows - - chat-native capture, recall, resume, correction, or approval resolution behavior + - intent routing from normalized Telegram messages into continuity and approval actions + - Telegram-native continuity answers and correction-aware reply posture + - open-loop review actions and approval resolution in chat + - provenance-backed outbound responses built from durable stored truth +- Explicitly out of `P10-S3`: + - new hosted auth, session, or workspace bootstrap flows + - Telegram transport or link/unlink contract redesign - daily brief generation or scheduler execution - support/admin dashboards - broad channel expansion beyond Telegram @@ -56,34 +57,37 @@ Reference baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Boot ## Exact APIs In Scope -- `POST /v1/channels/telegram/link/start` -- `POST /v1/channels/telegram/link/confirm` -- `POST /v1/channels/telegram/unlink` -- `GET /v1/channels/telegram/status` -- `POST /v1/channels/telegram/webhook` -- `GET /v1/channels/telegram/messages` -- `GET /v1/channels/telegram/threads` -- `POST /v1/channels/telegram/messages/{message_id}/dispatch` -- `GET /v1/channels/telegram/delivery-receipts` +- `POST /v1/channels/telegram/messages/{message_id}/handle` +- `GET /v1/channels/telegram/messages/{message_id}/result` +- `GET /v1/channels/telegram/recall` +- `GET /v1/channels/telegram/resume` +- `GET /v1/channels/telegram/open-loops` +- `POST /v1/channels/telegram/open-loops/{open_loop_id}/review-action` +- `GET /v1/channels/telegram/approvals` +- `POST /v1/channels/telegram/approvals/{approval_id}/approve` +- `POST /v1/channels/telegram/approvals/{approval_id}/reject` ## Exact Data Additions In Scope -- `channel_identities` -- `channel_link_challenges` -- `channel_messages` -- `channel_threads` -- `channel_delivery_receipts` -- `chat_intents` +- `approval_challenges` +- `open_loop_reviews` +- additive Telegram message intent/result fields required to persist routed continuity and approval outcomes ## Exact Files And Modules In Scope - `apps/api/src/alicebot_api/main.py` -- `apps/api/src/alicebot_api/config.py` - `apps/api/src/alicebot_api/contracts.py` - `apps/api/src/alicebot_api/store.py` -- new Telegram transport / channel routing / outbound delivery modules under `apps/api/src/alicebot_api/` +- `apps/api/src/alicebot_api/telegram_channels.py` +- `apps/api/src/alicebot_api/continuity_capture.py` +- `apps/api/src/alicebot_api/continuity_recall.py` +- `apps/api/src/alicebot_api/continuity_resumption.py` +- `apps/api/src/alicebot_api/continuity_review.py` +- `apps/api/src/alicebot_api/continuity_open_loops.py` +- `apps/api/src/alicebot_api/approvals.py` +- new Telegram continuity orchestration helpers under `apps/api/src/alicebot_api/` if needed - API migrations under `apps/api/alembic/versions/` -- hosted Telegram-link and transport-status pages/components under `apps/web/app/` and `apps/web/components/` +- hosted Telegram chat-status / approval-status pages or components under `apps/web/app/` and `apps/web/components/` - sprint-owned unit, integration, and web tests under `tests/` and `apps/web/app/**/*.test.tsx` - sprint-owned documentation updates required to keep active control truth aligned @@ -91,41 +95,39 @@ Reference baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Boot ### API And Persistence -- add Telegram channel identity, link challenge, message, thread, intent, and delivery-receipt contracts -- add webhook normalization and idempotency enforcement for inbound Telegram events -- add deterministic workspace resolution using shipped hosted user/workspace identity from `P10-S1` -- keep channel records additive to the hosted control plane and separate from continuity-object semantics +- add intent classification and action routing from shipped normalized Telegram messages into continuity and approval handlers +- persist Telegram message handling results, approval challenge state, and open-loop review actions without forking the underlying Alice Core objects +- reuse shipped `P10-S2` channel/thread routing and delivery-receipt posture rather than creating a parallel chat pipeline -### Hosted UX +### Chat Behavior -- add the minimal hosted settings flow needed to start, confirm, inspect, and remove Telegram linkage -- expose Telegram transport readiness and recent transport state only; do not imply chat continuity answers exist yet -- keep the surface narrow and operational, not launch-polish or support-dashboard work +- support capture, recall, resume, correction, and open-loop review from Telegram messages +- support approval prompts and approval resolution in Telegram using the existing approval discipline +- ensure replies remain provenance-backed and correction-aware rather than transcript-summarized ### Verification -- add unit coverage for Telegram normalization, idempotency, link lifecycle, and routing helpers -- add integration coverage for all `P10-S2` endpoints, including duplicate webhook delivery, invalid link tokens, unlink/relink, and unknown-chat routing failures -- add web tests for Telegram link/status UX +- add unit coverage for Telegram intent routing, continuity result formatting, and approval action helpers +- add integration coverage for all `P10-S3` endpoints, including wrong-intent routing, correction uptake, open-loop review actions, and approval approve/reject flows +- add web tests for Telegram continuity / approval status UX if sprint-owned UI changes are introduced - keep control-doc truth checks passing after packet and current-state updates ## Required Deliverables -- Telegram bot/webhook ingress -- Telegram link/unlink flow -- normalized inbound Telegram message contract -- deterministic workspace and thread routing -- outbound delivery dispatcher and receipt persistence -- hosted Telegram status/settings page +- Telegram chat handling for capture, recall, resume, correction, and open-loop review +- deterministic intent routing from shipped normalized Telegram messages +- approval prompts and approve/reject handling in Telegram +- provenance-backed Telegram replies +- persisted handling results that later daily-brief work can build on ## Acceptance Criteria -- a hosted user with a `P10-S1` workspace can initiate and confirm Telegram linking without touching local tooling -- duplicate inbound Telegram webhook deliveries are handled idempotently -- inbound Telegram events are normalized into a stable internal contract with explicit workspace and thread routing -- outbound Telegram dispatch persists delivery receipts and failure posture deterministically -- `P10-S1` hosted identity/bootstrap semantics remain baseline truth and are not reopened as sprint work -- no `P10-S2` endpoint or screen claims that continuity capture/recall/approvals already operate in Telegram +- a linked Telegram user can capture a new continuity item and receive a deterministic acknowledgment in chat +- a linked Telegram user can ask recall and resume questions and receive provenance-backed answers from durable stored truth +- correction messages update subsequent Telegram answers in a correction-aware way +- a linked Telegram user can review open loops and resolve approval prompts in chat +- `P10-S1` and `P10-S2` hosted/transport semantics remain baseline truth and are not reopened as sprint work +- no `P10-S3` endpoint or screen claims that scheduled daily briefs or notification loops are already active ## Required Verification Commands @@ -136,7 +138,7 @@ Reference baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Boot ## Review Evidence Requirements - `BUILD_REPORT.md` must list the exact sprint-owned files changed and the exact command results above -- `REVIEW_REPORT.md` must grade against `P10-S2` specifically, not generic Phase 10 planning +- `REVIEW_REPORT.md` must grade against `P10-S3` specifically, not generic Phase 10 planning - if local archive paths remain dirty, they must be called out explicitly as excluded from sprint merge scope ## Implementation Constraints @@ -144,11 +146,11 @@ Reference baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Boot - do not fork continuity semantics between hosted surfaces and Alice Core - keep OSS versus product boundaries explicit in docs and API naming - preserve existing approval, provenance, and correction discipline -- do not widen `P10-S2` into chat-native continuity or notifications -- do not ship a scheduler in `P10-S2` -- reuse the shipped `P10-S1` session/workspace/device foundations instead of duplicating identity state +- do not widen `P10-S3` into daily briefs, notification scheduling, or launch tooling +- do not ship a scheduler in `P10-S3` +- reuse the shipped `P10-S1` and `P10-S2` identity/workspace/channel foundations instead of duplicating control-plane state - prefer additive hosted-control-plane seams over invasive rewrites of shipped Phase 9 paths ## Exit Condition -`P10-S2` is complete when a hosted user can link Telegram to a shipped `P10-S1` workspace, Telegram inbound events are normalized and routed idempotently, outbound dispatches record delivery receipts, and the system is explicitly ready for `P10-S3` chat-native continuity work without reopening hosted identity/bootstrap scope. +`P10-S3` is complete when a linked Telegram user can use capture, recall, resume, correction, open-loop review, and approval resolution against the shipped Alice Core semantics through the shipped Telegram transport, with provenance-backed replies and no reopening of hosted identity or transport scope. diff --git a/.ai/handoff/CURRENT_STATE.md b/.ai/handoff/CURRENT_STATE.md index 6b337b0..10c6c7f 100644 --- a/.ai/handoff/CURRENT_STATE.md +++ b/.ai/handoff/CURRENT_STATE.md @@ -4,10 +4,11 @@ - Phase 9 is complete and shipped. - Phase 10 planning is defined as Alice Connect. -- `P10-S1` (Identity + Workspace Bootstrap) is shipped. - P10-S1 (Identity + Workspace Bootstrap) is the first execution sprint packet. -- `P10-S2` (Telegram Transport + Message Normalization) is the active execution sprint packet. -- No Telegram-based Phase 10 product surface is shipped yet. +- `P10-S1` (Identity + Workspace Bootstrap) is shipped. +- `P10-S2` (Telegram Transport + Message Normalization) is shipped. +- `P10-S3` (Chat-Native Continuity + Approvals) is the active execution sprint packet. +- No chat-native Phase 10 continuity surface is shipped yet. ## Canonical Baseline @@ -27,8 +28,9 @@ ## Active Sprint Focus - `P10-S1` shipped the hosted account/session foundations, workspace bootstrap, device management, preferences, and beta controls. -- `P10-S2` covers Telegram transport, link/unlink flow, message normalization, routing, and delivery receipts. -- Chat-native continuity, daily briefs, and launch hardening are later Phase 10 milestones. +- `P10-S2` shipped Telegram transport, link/unlink flow, message normalization, routing, and delivery receipts. +- `P10-S3` covers chat-native continuity behavior and approval handling on top of the shipped Telegram transport. +- Daily briefs and launch hardening are later Phase 10 milestones. - Phase 9 shipped scope is baseline truth and must not be reopened as sprint work. ## Active Constraints diff --git a/BUILD_REPORT.md b/BUILD_REPORT.md index e7dc217..3352765 100644 --- a/BUILD_REPORT.md +++ b/BUILD_REPORT.md @@ -1,90 +1,73 @@ # BUILD_REPORT ## sprint objective -Implement **Phase 10 Sprint 2 (P10-S2): Telegram Transport + Message Normalization** exactly within sprint scope: Telegram link/unlink lifecycle, webhook ingress normalization/idempotency, deterministic workspace/thread routing, outbound dispatch, and delivery receipts. +Implement Phase 10 Sprint 3 (P10-S3): Telegram chat-native continuity + approvals, including deterministic intent routing from normalized Telegram messages into continuity/approval actions, persisted handling outcomes, and required `v1/channels/telegram/*` APIs. ## completed work -- Updated the active control/docs layer to reflect an active `P10-S2` execution sprint: +- Updated the active control/docs layer to reflect an active `P10-S3` execution sprint: - `.ai/active/SPRINT_PACKET.md` - `.ai/handoff/CURRENT_STATE.md` - `README.md` -- Added Telegram transport persistence migration with all in-scope tables: - - `channel_identities` - - `channel_link_challenges` - - `channel_messages` - - `channel_threads` - - `channel_delivery_receipts` - - `chat_intents` -- Added new API transport module: - - `apps/api/src/alicebot_api/telegram_channels.py` - - Includes link challenge lifecycle, webhook normalization, idempotent inbound ingestion, routing/thread resolution, unlink/relink behavior, outbound dispatch, and receipt serialization. -- Added all in-scope `P10-S2` endpoints in `main.py`: - - `POST /v1/channels/telegram/link/start` - - `POST /v1/channels/telegram/link/confirm` - - `POST /v1/channels/telegram/unlink` - - `GET /v1/channels/telegram/status` - - `POST /v1/channels/telegram/webhook` - - `GET /v1/channels/telegram/messages` - - `GET /v1/channels/telegram/threads` - - `POST /v1/channels/telegram/messages/{message_id}/dispatch` - - `GET /v1/channels/telegram/delivery-receipts` -- Added Telegram runtime config in `config.py`: - - `TELEGRAM_LINK_TTL_SECONDS` - - `TELEGRAM_BOT_USERNAME` - - `TELEGRAM_WEBHOOK_SECRET` - - `TELEGRAM_BOT_TOKEN` -- Added sprint-scoped contract/store additions for channel identity/challenge/message/thread/intent/receipt records. -- Updated hosted settings UI copy for `P10-S2` Telegram transport/status scope and preserved explicit non-claim boundary for chat-native continuity behavior. -- Added sprint tests: - - Unit: Telegram normalization/idempotency helpers. - - Unit: new migration wiring coverage. - - Integration: full `P10-S2` endpoint flow including duplicate webhook delivery, invalid link token, unlink/relink, unknown-chat routing, dispatch, and receipt listing. - - Web: Telegram link/status UX copy coverage. -- Applied minimal control-doc marker updates required for `scripts/check_control_doc_truth.py` to pass with active `P10-S2` packet context. +- Added migration `20260408_0045_phase10_chat_continuity_approvals.py` with: + - `approval_challenges` table + - `open_loop_reviews` table + - additive `chat_intents` fields: `intent_payload`, `result_payload`, `handled_at` + - expanded `chat_intents` intent/status constraints for P10-S3 routing lifecycle +- Added new Telegram continuity orchestration module `apps/api/src/alicebot_api/telegram_continuity.py`: + - hosted-user continuity context preparation + - deterministic Telegram intent classification + - handle flow for capture, recall, resume, correction, open-loop review, approvals, approval approve/reject + - provenance-aware recall responses and correction-aware follow-up behavior + - persisted chat intent/result records + - approval challenge persistence and resolution updates + - open-loop review action logging +- Added new P10-S3 endpoints in `apps/api/src/alicebot_api/main.py`: + - `POST /v1/channels/telegram/messages/{message_id}/handle` + - `GET /v1/channels/telegram/messages/{message_id}/result` + - `GET /v1/channels/telegram/recall` + - `GET /v1/channels/telegram/resume` + - `GET /v1/channels/telegram/open-loops` + - `POST /v1/channels/telegram/open-loops/{open_loop_id}/review-action` + - `GET /v1/channels/telegram/approvals` + - `POST /v1/channels/telegram/approvals/{approval_id}/approve` + - `POST /v1/channels/telegram/approvals/{approval_id}/reject` +- Updated type contracts/store rows for new chat intent payload/result fields and challenge/review records. +- Added sprint-owned tests: + - migration unit tests for `0045` + - unit tests for Telegram intent classification + - integration tests covering all P10-S3 endpoints, wrong-intent routing, correction uptake, open-loop review actions, and approval approve/reject (direct + chat) +- Updated active control docs with required historical markers so the required truth check passes. ## incomplete work -- None identified within `P10-S2` sprint packet scope. +- None within the sprint packet acceptance criteria. +- No new web UI/components were added because this implementation completed the sprint through API/chat behavior and endpoint coverage. ## files changed -- `.ai/active/SPRINT_PACKET.md` -- `.ai/handoff/CURRENT_STATE.md` -- `README.md` -- `BUILD_REPORT.md` -- `REVIEW_REPORT.md` -- `apps/api/alembic/versions/20260408_0044_phase10_telegram_transport.py` -- `apps/api/src/alicebot_api/config.py` -- `apps/api/src/alicebot_api/contracts.py` -- `apps/api/src/alicebot_api/hosted_workspace.py` -- `apps/api/src/alicebot_api/main.py` -- `apps/api/src/alicebot_api/store.py` -- `apps/api/src/alicebot_api/telegram_channels.py` -- `tests/integration/test_phase10_identity_workspace_bootstrap_api.py` -- `tests/integration/test_phase10_telegram_transport_api.py` -- `tests/unit/test_20260408_0044_phase10_telegram_transport.py` -- `tests/unit/test_config.py` -- `tests/unit/test_telegram_channels.py` -- `apps/web/app/settings/page.tsx` -- `apps/web/app/settings/page.test.tsx` -- `apps/web/components/hosted-settings-panel.tsx` -- `apps/web/components/hosted-settings-panel.test.tsx` +- `/Users/samirusani/Desktop/Codex/AliceBot/.ai/active/SPRINT_PACKET.md` +- `/Users/samirusani/Desktop/Codex/AliceBot/.ai/handoff/CURRENT_STATE.md` +- `/Users/samirusani/Desktop/Codex/AliceBot/README.md` +- `/Users/samirusani/Desktop/Codex/AliceBot/BUILD_REPORT.md` +- `/Users/samirusani/Desktop/Codex/AliceBot/REVIEW_REPORT.md` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/alembic/versions/20260408_0045_phase10_chat_continuity_approvals.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/telegram_continuity.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/main.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/contracts.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/store.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_20260408_0045_phase10_chat_continuity_approvals.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_telegram_continuity.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/tests/integration/test_phase10_chat_continuity_approvals_api.py` ## tests run -Required verification commands and exact results: - `python3 scripts/check_control_doc_truth.py` - - `Control-doc truth check: PASS` - - verified: `README.md`, `ROADMAP.md`, `.ai/active/SPRINT_PACKET.md`, `RULES.md`, `.ai/handoff/CURRENT_STATE.md`, `docs/archive/planning/2026-04-08-context-compaction/README.md` + - PASS - `./.venv/bin/python -m pytest tests/unit tests/integration -q` - - `1003 passed in 122.94s (0:02:02)` + - PASS (`1014 passed`) - `pnpm --dir apps/web test` - - `Test Files 60 passed (60)` - - `Tests 196 passed (196)` - -Additional focused check run during implementation: -- `./.venv/bin/python -m pytest tests/unit/test_telegram_channels.py tests/unit/test_20260408_0044_phase10_telegram_transport.py tests/integration/test_phase10_telegram_transport_api.py -q` - - `11 passed in 2.40s` + - PASS (`60 passed`, `196 tests`) ## blockers/issues -- No implementation blockers. +- Initial blocker: control-doc truth check failed due missing required historical markers in active control docs. +- Resolution: added minimal historical marker lines to `.ai/active/SPRINT_PACKET.md` and `.ai/handoff/CURRENT_STATE.md` without changing sprint scope. ## recommended next step -Seek explicit Control Tower merge approval for `P10-S2`, using this branch head and the verification evidence above. +Seek explicit Control Tower merge approval for `P10-S3`, using this branch head and the verification evidence above. diff --git a/README.md b/README.md index 1b6cb84..3d28721 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Alice is a local-first memory and continuity engine for AI agents. -Phase 9 is complete. Alice Connect is the planned Phase 10 product layer on top of that shipped core, `P10-S1` is shipped, and `P10-S2` is the active execution sprint. +Phase 9 is complete. Alice Connect is the planned Phase 10 product layer on top of that shipped core, `P10-S1` and `P10-S2` are shipped, and `P10-S3` is the active execution sprint. ## What v0.1 Ships diff --git a/REVIEW_REPORT.md b/REVIEW_REPORT.md index a56919d..e3c8271 100644 --- a/REVIEW_REPORT.md +++ b/REVIEW_REPORT.md @@ -4,58 +4,55 @@ PASS ## criteria met -- Hosted Telegram control flow is now actionable in settings UI (not copy-only): - - Start link challenge - - Confirm link challenge - - Load status - - Unlink - - Load messages/threads/receipts -- Full in-scope P10-S2 endpoint surface remains implemented and exercised: - - `POST /v1/channels/telegram/link/start` - - `POST /v1/channels/telegram/link/confirm` - - `POST /v1/channels/telegram/unlink` - - `GET /v1/channels/telegram/status` - - `POST /v1/channels/telegram/webhook` - - `GET /v1/channels/telegram/messages` - - `GET /v1/channels/telegram/threads` - - `POST /v1/channels/telegram/messages/{message_id}/dispatch` - - `GET /v1/channels/telegram/delivery-receipts` -- Duplicate inbound webhook idempotency remains correct. -- Inbound normalization/routing remains stable and now has stronger safety coverage. -- Outbound dispatch continues to persist deterministic receipt posture. -- P10-S1 identity/bootstrap seams are reused (not replaced) and Telegram boundary language still avoids continuity claims. -- Control docs are aligned to an active `P10-S2` execution sprint and baseline-shipped `P10-S1` state: +- All `P10-S3` in-scope Telegram continuity/approval endpoints are implemented and exercised: + - `POST /v1/channels/telegram/messages/{message_id}/handle` + - `GET /v1/channels/telegram/messages/{message_id}/result` + - `GET /v1/channels/telegram/recall` + - `GET /v1/channels/telegram/resume` + - `GET /v1/channels/telegram/open-loops` + - `POST /v1/channels/telegram/open-loops/{open_loop_id}/review-action` + - `GET /v1/channels/telegram/approvals` + - `POST /v1/channels/telegram/approvals/{approval_id}/approve` + - `POST /v1/channels/telegram/approvals/{approval_id}/reject` +- Sprint data additions are present and wired: + - `approval_challenges` + - `open_loop_reviews` + - additive `chat_intents` result fields (`intent_payload`, `result_payload`, `handled_at`). +- Deterministic routing and chat-native behavior for capture, recall, resume, correction, open-loop review, approvals, approve, and reject are implemented on top of shipped P10-S2 transport seams. +- Provenance/correction discipline is preserved through existing continuity/approval modules (no parallel semantics stack). +- Previously identified optional-field coercion defect is fixed in `telegram_continuity.py` (no `None` -> `'None'` conversion on intent payload fields). +- Regression coverage added for: + - queryless `/resume` behavior returning handled brief context, + - `/recall` without query failing with explicit validation detail, + - `/approve` and `/reject` without IDs failing with explicit "requires approval id" details. +- Control docs are aligned to an active `P10-S3` execution sprint and baseline-shipped `P10-S1` / `P10-S2` state: - `.ai/active/SPRINT_PACKET.md` - `.ai/handoff/CURRENT_STATE.md` - `README.md` -- Required verification commands passed in this re-review: +- Required verification commands pass in this re-review: - `python3 scripts/check_control_doc_truth.py` -> PASS - - `./.venv/bin/python -m pytest tests/unit tests/integration -q` -> `1003 passed` - - `pnpm --dir apps/web test` -> `60 passed` test files, `196 passed` tests + - `./.venv/bin/python -m pytest tests/unit tests/integration -q` -> PASS (`1014 passed`) + - `pnpm --dir apps/web test` -> PASS (`60 files`, `196 tests`) ## criteria missed - None. ## quality issues -- Previously flagged blockers were fixed: - - Confirmed link-code replay from a different chat no longer reuses identity routing context. - - Active linked-chat uniqueness now enforces non-ambiguous identity binding at DB level and runtime conflict handling. - - Hosted `telegram_state` marker is updated to P10-S2 transport availability semantics. -- No new blocking quality defects identified in sprint-owned changes. +- No blocking quality issues found in sprint-owned changes after fixes. ## regression risks - Low. -- Residual operational risk: hosted settings currently requires a valid session token input; if hosted auth UX changes, this panel should stay aligned with session handling conventions. +- Residual product risk is standard heuristic-classification ambiguity in free-form chat intent detection, but implemented fail-safe behavior is deterministic and auditable. ## docs issues -- No blocking docs issues for P10-S2 acceptance. +- No blocking documentation issues for `P10-S3` acceptance. ## should anything be added to RULES.md? -- Optional hardening addition: keep an explicit invariant that active external chat identity bindings must be unambiguous per channel transport. +- Not required for this sprint pass. ## should anything update ARCHITECTURE.md? -- Optional refinement: document finalized Telegram link conflict semantics (`identity_conflict`) and replay handling posture for consumed/confirmed link codes. +- Optional only: document that Telegram continuity handling now persists intent outcomes plus approval/open-loop review audit artifacts for hosted control-plane traceability. ## recommended next action 1. Ready for Control Tower merge approval under policy. -2. After merge, open `P10-S3` only for chat-native continuity behavior on top of these transport seams. +2. After merge, open `P10-S4` only for daily brief and notification work on top of these continuity and approval seams. diff --git a/apps/api/alembic/versions/20260408_0045_phase10_chat_continuity_approvals.py b/apps/api/alembic/versions/20260408_0045_phase10_chat_continuity_approvals.py new file mode 100644 index 0000000..2493b5f --- /dev/null +++ b/apps/api/alembic/versions/20260408_0045_phase10_chat_continuity_approvals.py @@ -0,0 +1,149 @@ +"""Add Phase 10 Sprint 3 Telegram continuity routing, approvals, and review persistence.""" + +from __future__ import annotations + +from alembic import op + + +revision = "20260408_0045" +down_revision = "20260408_0044" +branch_labels = None +depends_on = None + + +_UPGRADE_STATEMENTS = ( + """ + ALTER TABLE chat_intents + ADD COLUMN intent_payload jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN result_payload jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN handled_at timestamptz NULL + """, + "ALTER TABLE chat_intents DROP CONSTRAINT IF EXISTS chat_intents_intent_kind_check", + """ + ALTER TABLE chat_intents + ADD CONSTRAINT chat_intents_intent_kind_check + CHECK ( + intent_kind IN ( + 'inbound_message', + 'capture', + 'recall', + 'resume', + 'correction', + 'open_loops', + 'open_loop_review', + 'approvals', + 'approval_approve', + 'approval_reject', + 'unknown' + ) + ) + """, + "ALTER TABLE chat_intents DROP CONSTRAINT IF EXISTS chat_intents_status_check", + """ + ALTER TABLE chat_intents + ADD CONSTRAINT chat_intents_status_check + CHECK (status IN ('pending', 'recorded', 'handled', 'failed')) + """, + """ + CREATE TABLE approval_challenges ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + approval_id uuid NOT NULL REFERENCES approvals(id) ON DELETE CASCADE, + channel_message_id uuid NULL REFERENCES channel_messages(id) ON DELETE SET NULL, + status text NOT NULL, + challenge_prompt text NOT NULL, + challenge_payload jsonb NOT NULL DEFAULT '{}'::jsonb, + resolved_at timestamptz NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT approval_challenges_status_check + CHECK (status IN ('pending', 'approved', 'rejected', 'dismissed')) + ) + """, + ( + "CREATE UNIQUE INDEX approval_challenges_workspace_approval_pending_uidx " + "ON approval_challenges (workspace_id, approval_id) " + "WHERE status = 'pending'" + ), + ( + "CREATE INDEX approval_challenges_workspace_created_idx " + "ON approval_challenges (workspace_id, created_at DESC, id DESC)" + ), + """ + CREATE TABLE open_loop_reviews ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + continuity_object_id uuid NOT NULL REFERENCES continuity_objects(id) ON DELETE CASCADE, + channel_message_id uuid NULL REFERENCES channel_messages(id) ON DELETE SET NULL, + correction_event_id uuid NULL REFERENCES continuity_correction_events(id) ON DELETE SET NULL, + review_action text NOT NULL, + note text NULL, + created_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT open_loop_reviews_action_check + CHECK (review_action IN ('done', 'deferred', 'still_blocked')) + ) + """, + ( + "CREATE INDEX open_loop_reviews_workspace_created_idx " + "ON open_loop_reviews (workspace_id, created_at DESC, id DESC)" + ), + ( + "CREATE INDEX open_loop_reviews_object_created_idx " + "ON open_loop_reviews (continuity_object_id, created_at DESC, id DESC)" + ), +) + +_UPGRADE_GRANT_STATEMENTS = ( + "GRANT SELECT, INSERT, UPDATE, DELETE ON approval_challenges TO alicebot_app", + "GRANT SELECT, INSERT, UPDATE, DELETE ON open_loop_reviews TO alicebot_app", +) + +_DOWNGRADE_STATEMENTS = ( + "DROP INDEX IF EXISTS open_loop_reviews_object_created_idx", + "DROP INDEX IF EXISTS open_loop_reviews_workspace_created_idx", + "DROP TABLE IF EXISTS open_loop_reviews", + "DROP INDEX IF EXISTS approval_challenges_workspace_created_idx", + "DROP INDEX IF EXISTS approval_challenges_workspace_approval_pending_uidx", + "DROP TABLE IF EXISTS approval_challenges", + "ALTER TABLE chat_intents DROP CONSTRAINT IF EXISTS chat_intents_status_check", + """ + UPDATE chat_intents + SET status = 'recorded' + WHERE status IN ('handled', 'failed') + """, + """ + ALTER TABLE chat_intents + ADD CONSTRAINT chat_intents_status_check + CHECK (status IN ('pending', 'recorded')) + """, + "ALTER TABLE chat_intents DROP CONSTRAINT IF EXISTS chat_intents_intent_kind_check", + """ + DELETE FROM chat_intents + WHERE intent_kind <> 'inbound_message' + """, + """ + ALTER TABLE chat_intents + ADD CONSTRAINT chat_intents_intent_kind_check + CHECK (intent_kind IN ('inbound_message')) + """, + """ + ALTER TABLE chat_intents + DROP COLUMN IF EXISTS handled_at, + DROP COLUMN IF EXISTS result_payload, + DROP COLUMN IF EXISTS intent_payload + """, +) + + +def _execute_statements(statements: tuple[str, ...]) -> None: + for statement in statements: + op.execute(statement) + + +def upgrade() -> None: + _execute_statements(_UPGRADE_STATEMENTS) + _execute_statements(_UPGRADE_GRANT_STATEMENTS) + + +def downgrade() -> None: + _execute_statements(_DOWNGRADE_STATEMENTS) diff --git a/apps/api/src/alicebot_api/contracts.py b/apps/api/src/alicebot_api/contracts.py index 7b1b044..01c130e 100644 --- a/apps/api/src/alicebot_api/contracts.py +++ b/apps/api/src/alicebot_api/contracts.py @@ -97,8 +97,20 @@ ChannelLinkChallengeStatus = Literal["pending", "confirmed", "expired", "cancelled"] ChannelMessageDirection = Literal["inbound", "outbound"] ChannelMessageRouteStatus = Literal["resolved", "unresolved"] -ChatIntentKind = Literal["inbound_message"] -ChatIntentStatus = Literal["pending", "recorded"] +ChatIntentKind = Literal[ + "inbound_message", + "capture", + "recall", + "resume", + "correction", + "open_loops", + "open_loop_review", + "approvals", + "approval_approve", + "approval_reject", + "unknown", +] +ChatIntentStatus = Literal["pending", "recorded", "handled", "failed"] ChannelDeliveryReceiptStatus = Literal["delivered", "failed", "simulated"] TaskArtifactStatus = Literal["registered"] TaskArtifactIngestionStatus = Literal["pending", "ingested"] @@ -5240,6 +5252,9 @@ class ChatIntentRecord(TypedDict): channel_thread_id: str | None intent_kind: ChatIntentKind status: ChatIntentStatus + intent_payload: JsonObject + result_payload: JsonObject + handled_at: str | None created_at: str @@ -5254,3 +5269,27 @@ class ChannelDeliveryReceiptRecord(TypedDict): failure_detail: str | None recorded_at: str created_at: str + + +class ApprovalChallengeRecord(TypedDict): + id: str + workspace_id: str + approval_id: str + channel_message_id: str | None + status: Literal["pending", "approved", "rejected", "dismissed"] + challenge_prompt: str + challenge_payload: JsonObject + resolved_at: str | None + created_at: str + updated_at: str + + +class OpenLoopReviewRecord(TypedDict): + id: str + workspace_id: str + continuity_object_id: str + channel_message_id: str | None + correction_event_id: str | None + review_action: ContinuityOpenLoopReviewAction + note: str | None + created_at: str diff --git a/apps/api/src/alicebot_api/main.py b/apps/api/src/alicebot_api/main.py index fdb5e8a..07a351d 100644 --- a/apps/api/src/alicebot_api/main.py +++ b/apps/api/src/alicebot_api/main.py @@ -411,6 +411,17 @@ start_telegram_link_challenge, unlink_telegram_identity, ) +from alicebot_api.telegram_continuity import ( + HostedUserAccountNotFoundError, + TelegramMessageResultNotFoundError, + apply_telegram_open_loop_review_with_log, + approve_telegram_approval, + get_telegram_message_result, + handle_telegram_message, + list_telegram_approvals, + prepare_telegram_continuity_context, + reject_telegram_approval, +) from alicebot_api.continuity_review import ( ContinuityReviewNotFoundError, ContinuityReviewValidationError, @@ -1388,6 +1399,25 @@ class TelegramDispatchRequest(BaseModel): idempotency_key: str | None = Field(default=None, min_length=16, max_length=160) +class TelegramMessageHandleRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + intent_hint: str | None = Field(default=None, min_length=1, max_length=40) + + +class TelegramOpenLoopReviewActionBody(BaseModel): + model_config = ConfigDict(extra="forbid") + + action: str = Field(min_length=1, max_length=40) + note: str | None = Field(default=None, min_length=1, max_length=500) + + +class TelegramApprovalResolveBody(BaseModel): + model_config = ConfigDict(extra="forbid") + + note: str | None = Field(default=None, min_length=1, max_length=500) + + def _extract_bearer_token(request: Request) -> str: raw_authorization = request.headers.get("authorization", "").strip() if raw_authorization == "": @@ -5534,3 +5564,497 @@ def list_v1_telegram_delivery_receipts( } ), ) + + +@app.post("/v1/channels/telegram/messages/{message_id}/handle") +def handle_v1_telegram_message( + message_id: UUID, + request: Request, + body: TelegramMessageHandleRequest, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + payload = handle_telegram_message( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + message_id=message_id, + bot_token=settings.telegram_bot_token, + intent_hint=body.intent_hint, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except TelegramMessageNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except TelegramRoutingError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except ValueError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.get("/v1/channels/telegram/messages/{message_id}/result") +def get_v1_telegram_message_result( + message_id: UUID, + request: Request, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + payload = get_telegram_message_result( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + message_id=message_id, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except TelegramMessageResultNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.get("/v1/channels/telegram/recall") +def list_v1_telegram_recall( + request: Request, + query_text: str | None = Query(default=None, alias="query", min_length=1, max_length=4000), + thread_id: UUID | None = None, + task_id: UUID | None = None, + project: str | None = Query(default=None, min_length=1, max_length=200), + person: str | None = Query(default=None, min_length=1, max_length=200), + since: datetime | None = None, + until: datetime | None = None, + limit: int = Query( + default=DEFAULT_CONTINUITY_RECALL_LIMIT, + ge=1, + le=MAX_CONTINUITY_RECALL_LIMIT, + ), +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + payload = query_continuity_recall( + ContinuityStore(conn), + user_id=user_account_id, + request=ContinuityRecallQueryInput( + query=query_text, + thread_id=thread_id, + task_id=task_id, + project=project, + person=person, + since=since, + until=until, + limit=limit, + ), + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except ContinuityRecallValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse( + status_code=200, + content=jsonable_encoder( + { + "workspace_id": str(workspace["id"]), + "recall": payload, + } + ), + ) + + +@app.get("/v1/channels/telegram/resume") +def get_v1_telegram_resumption_brief( + request: Request, + query_text: str | None = Query(default=None, alias="query", min_length=1, max_length=4000), + thread_id: UUID | None = None, + task_id: UUID | None = None, + project: str | None = Query(default=None, min_length=1, max_length=200), + person: str | None = Query(default=None, min_length=1, max_length=200), + since: datetime | None = None, + until: datetime | None = None, + max_recent_changes: int = Query( + default=DEFAULT_CONTINUITY_RESUMPTION_RECENT_CHANGES_LIMIT, + ge=0, + le=MAX_CONTINUITY_RESUMPTION_RECENT_CHANGES_LIMIT, + ), + max_open_loops: int = Query( + default=DEFAULT_CONTINUITY_RESUMPTION_OPEN_LOOP_LIMIT, + ge=0, + le=MAX_CONTINUITY_RESUMPTION_OPEN_LOOP_LIMIT, + ), +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + payload = compile_continuity_resumption_brief( + ContinuityStore(conn), + user_id=user_account_id, + request=ContinuityResumptionBriefRequestInput( + query=query_text, + thread_id=thread_id, + task_id=task_id, + project=project, + person=person, + since=since, + until=until, + max_recent_changes=max_recent_changes, + max_open_loops=max_open_loops, + ), + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except ContinuityResumptionValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + except ContinuityRecallValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse( + status_code=200, + content=jsonable_encoder( + { + "workspace_id": str(workspace["id"]), + "resume": payload, + } + ), + ) + + +@app.get("/v1/channels/telegram/open-loops") +def get_v1_telegram_open_loops( + request: Request, + query_text: str | None = Query(default=None, alias="query", min_length=1, max_length=4000), + thread_id: UUID | None = None, + task_id: UUID | None = None, + project: str | None = Query(default=None, min_length=1, max_length=200), + person: str | None = Query(default=None, min_length=1, max_length=200), + since: datetime | None = None, + until: datetime | None = None, + limit: int = Query( + default=DEFAULT_CONTINUITY_OPEN_LOOP_LIMIT, + ge=0, + le=MAX_CONTINUITY_OPEN_LOOP_LIMIT, + ), +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + payload = compile_continuity_open_loop_dashboard( + ContinuityStore(conn), + user_id=user_account_id, + request=ContinuityOpenLoopDashboardQueryInput( + query=query_text, + thread_id=thread_id, + task_id=task_id, + project=project, + person=person, + since=since, + until=until, + limit=limit, + ), + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except ContinuityOpenLoopValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + except ContinuityRecallValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse( + status_code=200, + content=jsonable_encoder( + { + "workspace_id": str(workspace["id"]), + "open_loops": payload, + } + ), + ) + + +@app.post("/v1/channels/telegram/open-loops/{open_loop_id}/review-action") +def review_action_v1_telegram_open_loop( + open_loop_id: UUID, + request: Request, + body: TelegramOpenLoopReviewActionBody, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + payload = apply_telegram_open_loop_review_with_log( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + continuity_object_id=open_loop_id, + action=body.action, + note=body.note, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except ContinuityOpenLoopNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except ContinuityOpenLoopValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.get("/v1/channels/telegram/approvals") +def list_v1_telegram_approvals( + request: Request, + status: str = Query(default="pending", min_length=1, max_length=20), +) -> JSONResponse: + settings = get_settings() + status_filter = status.casefold().strip() + if status_filter not in {"pending", "all"}: + return JSONResponse(status_code=400, content={"detail": "status must be one of: pending, all"}) + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + payload = list_telegram_approvals( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + status_filter=status_filter, # type: ignore[arg-type] + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.post("/v1/channels/telegram/approvals/{approval_id}/approve") +def approve_v1_telegram_approval( + approval_id: UUID, + request: Request, + body: TelegramApprovalResolveBody | None = None, +) -> JSONResponse: + del body + settings = get_settings() + resolution_error: ( + ApprovalResolutionConflictError | TaskStepApprovalLinkageError | TaskStepLifecycleBoundaryError | None + ) = None + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + try: + payload = approve_telegram_approval( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + approval_id=approval_id, + ) + except ( + ApprovalResolutionConflictError, + TaskStepApprovalLinkageError, + TaskStepLifecycleBoundaryError, + ) as exc: + resolution_error = exc + payload = None + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except ApprovalNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + + if resolution_error is not None: + return JSONResponse(status_code=409, content={"detail": str(resolution_error)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.post("/v1/channels/telegram/approvals/{approval_id}/reject") +def reject_v1_telegram_approval( + approval_id: UUID, + request: Request, + body: TelegramApprovalResolveBody | None = None, +) -> JSONResponse: + del body + settings = get_settings() + resolution_error: ( + ApprovalResolutionConflictError | TaskStepApprovalLinkageError | TaskStepLifecycleBoundaryError | None + ) = None + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + prepare_telegram_continuity_context(conn, user_account_id=user_account_id) + try: + payload = reject_telegram_approval( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + approval_id=approval_id, + ) + except ( + ApprovalResolutionConflictError, + TaskStepApprovalLinkageError, + TaskStepLifecycleBoundaryError, + ) as exc: + resolution_error = exc + payload = None + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedUserAccountNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except ApprovalNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + + if resolution_error is not None: + return JSONResponse(status_code=409, content={"detail": str(resolution_error)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) diff --git a/apps/api/src/alicebot_api/store.py b/apps/api/src/alicebot_api/store.py index eaa5e34..6df9cbe 100644 --- a/apps/api/src/alicebot_api/store.py +++ b/apps/api/src/alicebot_api/store.py @@ -470,6 +470,9 @@ class ChatIntentRow(TypedDict): channel_thread_id: UUID | None intent_kind: str status: str + intent_payload: JsonObject + result_payload: JsonObject + handled_at: datetime | None created_at: datetime @@ -486,6 +489,30 @@ class ChannelDeliveryReceiptRow(TypedDict): created_at: datetime +class ApprovalChallengeRow(TypedDict): + id: UUID + workspace_id: UUID + approval_id: UUID + channel_message_id: UUID | None + status: str + challenge_prompt: str + challenge_payload: JsonObject + resolved_at: datetime | None + created_at: datetime + updated_at: datetime + + +class OpenLoopReviewRow(TypedDict): + id: UUID + workspace_id: UUID + continuity_object_id: UUID + channel_message_id: UUID | None + correction_event_id: UUID | None + review_action: str + note: str | None + created_at: datetime + + class TaskArtifactRow(TypedDict): id: UUID user_id: UUID diff --git a/apps/api/src/alicebot_api/telegram_continuity.py b/apps/api/src/alicebot_api/telegram_continuity.py new file mode 100644 index 0000000..22f732b --- /dev/null +++ b/apps/api/src/alicebot_api/telegram_continuity.py @@ -0,0 +1,1217 @@ +from __future__ import annotations + +from datetime import UTC, datetime +import hashlib +import re +from typing import Any, Literal, TypedDict +from uuid import UUID + +from psycopg.types.json import Jsonb + +from alicebot_api.approvals import ( + ApprovalNotFoundError, + ApprovalResolutionConflictError, + approve_approval_record, + list_approval_records, + reject_approval_record, +) +from alicebot_api.continuity_capture import capture_continuity_input +from alicebot_api.continuity_objects import ContinuityObjectValidationError +from alicebot_api.continuity_open_loops import ( + ContinuityOpenLoopNotFoundError, + ContinuityOpenLoopValidationError, + apply_continuity_open_loop_review_action, + compile_continuity_open_loop_dashboard, +) +from alicebot_api.continuity_recall import ContinuityRecallValidationError, query_continuity_recall +from alicebot_api.continuity_resumption import ( + ContinuityResumptionValidationError, + compile_continuity_resumption_brief, +) +from alicebot_api.continuity_review import ( + ContinuityReviewNotFoundError, + ContinuityReviewValidationError, + apply_continuity_correction, +) +from alicebot_api.contracts import ( + ApprovalApproveInput, + ApprovalRejectInput, + ContinuityCaptureCreateInput, + ContinuityCorrectionInput, + ContinuityOpenLoopDashboardQueryInput, + ContinuityOpenLoopReviewActionInput, + ContinuityRecallQueryInput, + ContinuityResumptionBriefRequestInput, +) +from alicebot_api.db import set_current_user +from alicebot_api.store import ContinuityStore, JsonObject +from alicebot_api.tasks import TaskStepApprovalLinkageError, TaskStepLifecycleBoundaryError +from alicebot_api.telegram_channels import ( + TELEGRAM_CHANNEL_TYPE, + TelegramMessageNotFoundError, + TelegramRoutingError, + dispatch_telegram_message, + serialize_channel_message, + serialize_delivery_receipt, +) + + +TelegramChatIntentKind = Literal[ + "capture", + "recall", + "resume", + "correction", + "open_loops", + "open_loop_review", + "approvals", + "approval_approve", + "approval_reject", + "unknown", +] +TelegramChatIntentStatus = Literal["pending", "recorded", "handled", "failed"] + +_SUPPORTED_INTENT_HINTS: set[str] = { + "capture", + "recall", + "resume", + "correction", + "open_loops", + "open_loop_review", + "approvals", + "approval_approve", + "approval_reject", + "unknown", +} +_RECALL_PATTERN = re.compile(r"^\s*/?recall\b(?:\s+(?P.+))?$", flags=re.IGNORECASE) +_RESUME_PATTERN = re.compile(r"^\s*/?resume\b(?:\s+(?P.+))?$", flags=re.IGNORECASE) +_OPEN_LOOPS_PATTERN = re.compile(r"^\s*(?:/?open-loops\b|open loops\b)(?:\s+.*)?$", flags=re.IGNORECASE) +_APPROVALS_PATTERN = re.compile(r"^\s*(?:/?approvals\b|pending approvals\b)(?:\s+.*)?$", flags=re.IGNORECASE) +_CORRECTION_PATTERN = re.compile( + ( + r"^\s*/?(?:correct|correction)\b\s+" + r"(?P[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})" + r"\s+(?P.+)$" + ), + flags=re.IGNORECASE, +) +_OPEN_LOOP_REVIEW_PATTERN = re.compile( + ( + r"^\s*/?open-loop\b\s+" + r"(?P<object_id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})" + r"\s+(?P<action>done|deferred|still_blocked)" + r"(?:\s+(?P<note>.+))?$" + ), + flags=re.IGNORECASE, +) +_APPROVE_PATTERN = re.compile(r"^\s*/?approve\b(?:\s+(?P<approval_id>\S+))?.*$", flags=re.IGNORECASE) +_REJECT_PATTERN = re.compile(r"^\s*/?reject\b(?:\s+(?P<approval_id>\S+))?(?:\s+(?P<note>.+))?$", flags=re.IGNORECASE) + + +class HostedUserAccountNotFoundError(LookupError): + """Raised when a hosted account is not available for continuity projection.""" + + +class TelegramMessageResultNotFoundError(LookupError): + """Raised when no Telegram handle result is available for the message.""" + + +class _TelegramInboundMessageRow(TypedDict): + id: UUID + workspace_id: UUID + channel_thread_id: UUID | None + channel_identity_id: UUID | None + route_status: str + message_text: str | None + external_chat_id: str | None + + +class TelegramIntentClassification(TypedDict): + intent_kind: TelegramChatIntentKind + confidence: float + intent_payload: JsonObject + + +class _ChatIntentRow(TypedDict): + id: UUID + workspace_id: UUID + channel_message_id: UUID + channel_thread_id: UUID | None + intent_kind: str + status: str + intent_payload: JsonObject + result_payload: JsonObject + handled_at: datetime | None + created_at: datetime + + +class _ApprovalChallengeRow(TypedDict): + id: UUID + workspace_id: UUID + approval_id: UUID + channel_message_id: UUID | None + status: str + challenge_prompt: str + challenge_payload: JsonObject + resolved_at: datetime | None + created_at: datetime + updated_at: datetime + + +def _utcnow() -> datetime: + return datetime.now(UTC) + + +def _normalize_optional_text(value: str | None) -> str | None: + if value is None: + return None + normalized = " ".join(value.split()).strip() + if normalized == "": + return None + return normalized + + +def _normalize_optional_payload_text(payload: JsonObject, *, field_name: str) -> str | None: + raw_value = payload.get(field_name) + if raw_value is None: + return None + if not isinstance(raw_value, str): + raise ValueError(f"{field_name} must be a string") + return _normalize_optional_text(raw_value) + + +def _parse_uuid(value: str, *, field_name: str) -> UUID: + try: + return UUID(value) + except (TypeError, ValueError) as exc: + raise ValueError(f"{field_name} must be a valid uuid") from exc + + +def _resolve_intent_hint(intent_hint: str | None) -> TelegramChatIntentKind | None: + normalized = _normalize_optional_text(intent_hint) + if normalized is None: + return None + lowered = normalized.casefold() + if lowered not in _SUPPORTED_INTENT_HINTS: + allowed = ", ".join(sorted(_SUPPORTED_INTENT_HINTS)) + raise ValueError(f"intent_hint must be one of: {allowed}") + return lowered # type: ignore[return-value] + + +def _serialize_chat_intent(row: _ChatIntentRow) -> dict[str, object]: + return { + "id": str(row["id"]), + "workspace_id": str(row["workspace_id"]), + "channel_message_id": str(row["channel_message_id"]), + "channel_thread_id": None if row["channel_thread_id"] is None else str(row["channel_thread_id"]), + "intent_kind": row["intent_kind"], + "status": row["status"], + "intent_payload": row["intent_payload"], + "result_payload": row["result_payload"], + "handled_at": None if row["handled_at"] is None else row["handled_at"].isoformat(), + "created_at": row["created_at"].isoformat(), + } + + +def _serialize_approval_challenge(row: _ApprovalChallengeRow) -> dict[str, object]: + return { + "id": str(row["id"]), + "workspace_id": str(row["workspace_id"]), + "approval_id": str(row["approval_id"]), + "channel_message_id": None if row["channel_message_id"] is None else str(row["channel_message_id"]), + "status": row["status"], + "challenge_prompt": row["challenge_prompt"], + "challenge_payload": row["challenge_payload"], + "resolved_at": None if row["resolved_at"] is None else row["resolved_at"].isoformat(), + "created_at": row["created_at"].isoformat(), + "updated_at": row["updated_at"].isoformat(), + } + + +def prepare_telegram_continuity_context( + conn, + *, + user_account_id: UUID, +) -> None: + set_current_user(conn, user_account_id) + + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, email, display_name + FROM user_accounts + WHERE id = %s + LIMIT 1 + """, + (user_account_id,), + ) + account = cur.fetchone() + + if account is None: + raise HostedUserAccountNotFoundError(f"hosted user account {user_account_id} was not found") + + cur.execute( + """ + INSERT INTO users (id, email, display_name) + VALUES (%s, %s, %s) + ON CONFLICT (id) DO NOTHING + """, + (account["id"], account["email"], account["display_name"]), + ) + + +def _load_workspace_inbound_message( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + message_id: UUID, +) -> _TelegramInboundMessageRow: + with conn.cursor() as cur: + cur.execute( + """ + SELECT m.id, + m.workspace_id, + m.channel_thread_id, + m.channel_identity_id, + m.route_status, + m.message_text, + m.external_chat_id + FROM channel_messages AS m + JOIN workspace_members AS wm + ON wm.workspace_id = m.workspace_id + WHERE m.id = %s + AND m.workspace_id = %s + AND wm.user_account_id = %s + AND m.channel_type = %s + AND m.direction = 'inbound' + LIMIT 1 + """, + (message_id, workspace_id, user_account_id, TELEGRAM_CHANNEL_TYPE), + ) + row = cur.fetchone() + + if row is None: + raise TelegramMessageNotFoundError(f"telegram source message {message_id} was not found") + if row["route_status"] != "resolved": + raise TelegramRoutingError("telegram source message does not have resolved routing") + return row + + +def classify_telegram_message_intent(message_text: str) -> TelegramIntentClassification: + normalized_text = _normalize_optional_text(message_text) + if normalized_text is None: + return { + "intent_kind": "unknown", + "confidence": 1.0, + "intent_payload": {"reason": "empty_message"}, + } + + review_match = _OPEN_LOOP_REVIEW_PATTERN.match(normalized_text) + if review_match is not None: + note = _normalize_optional_text(review_match.group("note")) + payload: JsonObject = { + "continuity_object_id": review_match.group("object_id"), + "action": review_match.group("action").casefold(), + } + payload["note"] = note + return { + "intent_kind": "open_loop_review", + "confidence": 0.99, + "intent_payload": payload, + } + + correction_match = _CORRECTION_PATTERN.match(normalized_text) + if correction_match is not None: + return { + "intent_kind": "correction", + "confidence": 0.99, + "intent_payload": { + "continuity_object_id": correction_match.group("object_id"), + "replacement_title": correction_match.group("title").strip(), + }, + } + + approve_match = _APPROVE_PATTERN.match(normalized_text) + if approve_match is not None: + return { + "intent_kind": "approval_approve", + "confidence": 0.98, + "intent_payload": { + "approval_id": approve_match.group("approval_id"), + }, + } + + reject_match = _REJECT_PATTERN.match(normalized_text) + if reject_match is not None: + return { + "intent_kind": "approval_reject", + "confidence": 0.98, + "intent_payload": { + "approval_id": reject_match.group("approval_id"), + "note": _normalize_optional_text(reject_match.group("note")), + }, + } + + recall_match = _RECALL_PATTERN.match(normalized_text) + if recall_match is not None: + return { + "intent_kind": "recall", + "confidence": 0.97, + "intent_payload": {"query": _normalize_optional_text(recall_match.group("query"))}, + } + + if normalized_text.casefold().startswith("what do you remember"): + suffix = _normalize_optional_text(normalized_text[len("what do you remember") :]) + return { + "intent_kind": "recall", + "confidence": 0.85, + "intent_payload": {"query": suffix}, + } + + resume_match = _RESUME_PATTERN.match(normalized_text) + if resume_match is not None: + return { + "intent_kind": "resume", + "confidence": 0.97, + "intent_payload": {"query": _normalize_optional_text(resume_match.group("query"))}, + } + + if normalized_text.casefold().startswith("where was i"): + return { + "intent_kind": "resume", + "confidence": 0.85, + "intent_payload": {"query": None}, + } + + if _OPEN_LOOPS_PATTERN.match(normalized_text) is not None: + return { + "intent_kind": "open_loops", + "confidence": 0.98, + "intent_payload": {"query": None}, + } + + if _APPROVALS_PATTERN.match(normalized_text) is not None: + return { + "intent_kind": "approvals", + "confidence": 0.98, + "intent_payload": {"status": "pending"}, + } + + return { + "intent_kind": "capture", + "confidence": 0.65, + "intent_payload": {"raw_content": normalized_text}, + } + + +def _upsert_chat_intent_result( + conn, + *, + workspace_id: UUID, + channel_message_id: UUID, + channel_thread_id: UUID | None, + intent_kind: TelegramChatIntentKind, + status: TelegramChatIntentStatus, + intent_payload: JsonObject, + result_payload: JsonObject, + handled_at: datetime | None, +) -> _ChatIntentRow: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO chat_intents ( + workspace_id, + channel_message_id, + channel_thread_id, + intent_kind, + status, + intent_payload, + result_payload, + handled_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (channel_message_id, intent_kind) DO UPDATE + SET status = EXCLUDED.status, + intent_payload = EXCLUDED.intent_payload, + result_payload = EXCLUDED.result_payload, + handled_at = EXCLUDED.handled_at + RETURNING id, + workspace_id, + channel_message_id, + channel_thread_id, + intent_kind, + status, + intent_payload, + result_payload, + handled_at, + created_at + """, + ( + workspace_id, + channel_message_id, + channel_thread_id, + intent_kind, + status, + Jsonb(intent_payload), + Jsonb(result_payload), + handled_at, + ), + ) + row = cur.fetchone() + + if row is None: + raise RuntimeError("failed to persist telegram chat intent result") + return row + + +def _fetch_latest_chat_intent_result( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + message_id: UUID, +) -> _ChatIntentRow: + with conn.cursor() as cur: + cur.execute( + """ + SELECT ci.id, + ci.workspace_id, + ci.channel_message_id, + ci.channel_thread_id, + ci.intent_kind, + ci.status, + ci.intent_payload, + ci.result_payload, + ci.handled_at, + ci.created_at + FROM chat_intents AS ci + JOIN workspace_members AS wm + ON wm.workspace_id = ci.workspace_id + WHERE ci.workspace_id = %s + AND ci.channel_message_id = %s + AND wm.user_account_id = %s + AND ci.intent_kind <> 'inbound_message' + ORDER BY COALESCE(ci.handled_at, ci.created_at) DESC, ci.created_at DESC, ci.id DESC + LIMIT 1 + """, + (workspace_id, message_id, user_account_id), + ) + row = cur.fetchone() + + if row is None: + raise TelegramMessageResultNotFoundError( + f"telegram message {message_id} does not have a continuity handle result yet" + ) + return row + + +def _format_provenance_reference_list(references: list[dict[str, object]], *, limit: int = 3) -> str: + compact: list[str] = [] + for item in references[:limit]: + source_kind = str(item.get("source_kind", "source")) + source_id = str(item.get("source_id", "unknown")) + compact.append(f"{source_kind}:{source_id}") + return ", ".join(compact) + + +def _record_pending_approval_challenges( + conn, + *, + workspace_id: UUID, + approvals: list[dict[str, object]], + channel_message_id: UUID | None, +) -> list[dict[str, object]]: + recorded: list[dict[str, object]] = [] + if not approvals: + return recorded + + with conn.cursor() as cur: + for approval in approvals: + approval_id = _parse_uuid(str(approval["id"]), field_name="approval_id") + request_payload = approval.get("request") + if isinstance(request_payload, dict): + action_hint = _normalize_optional_text(str(request_payload.get("action"))) + else: + action_hint = None + + challenge_prompt = ( + f"Approval {approval_id} is pending." + if action_hint is None + else f"Approval {approval_id} is pending for action '{action_hint}'." + ) + challenge_payload: JsonObject = { + "approval": approval, + "source": "telegram", + } + cur.execute( + """ + INSERT INTO approval_challenges ( + workspace_id, + approval_id, + channel_message_id, + status, + challenge_prompt, + challenge_payload, + updated_at + ) + VALUES (%s, %s, %s, 'pending', %s, %s, %s) + ON CONFLICT (workspace_id, approval_id) WHERE status = 'pending' + DO UPDATE + SET channel_message_id = COALESCE(EXCLUDED.channel_message_id, approval_challenges.channel_message_id), + challenge_prompt = EXCLUDED.challenge_prompt, + challenge_payload = EXCLUDED.challenge_payload, + updated_at = EXCLUDED.updated_at + RETURNING id, + workspace_id, + approval_id, + channel_message_id, + status, + challenge_prompt, + challenge_payload, + resolved_at, + created_at, + updated_at + """, + ( + workspace_id, + approval_id, + channel_message_id, + challenge_prompt, + Jsonb(challenge_payload), + _utcnow(), + ), + ) + row = cur.fetchone() + if row is not None: + recorded.append(_serialize_approval_challenge(row)) + + return recorded + + +def _resolve_pending_approval_challenges( + conn, + *, + workspace_id: UUID, + approval_id: UUID, + resolution_status: Literal["approved", "rejected"], +) -> list[dict[str, object]]: + with conn.cursor() as cur: + cur.execute( + """ + UPDATE approval_challenges + SET status = %s, + resolved_at = %s, + updated_at = %s + WHERE workspace_id = %s + AND approval_id = %s + AND status = 'pending' + RETURNING id, + workspace_id, + approval_id, + channel_message_id, + status, + challenge_prompt, + challenge_payload, + resolved_at, + created_at, + updated_at + """, + ( + resolution_status, + _utcnow(), + _utcnow(), + workspace_id, + approval_id, + ), + ) + rows = cur.fetchall() + + return [_serialize_approval_challenge(row) for row in rows] + + +def list_telegram_approvals( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + status_filter: Literal["pending", "all"] = "pending", + channel_message_id: UUID | None = None, +) -> dict[str, object]: + payload = list_approval_records( + ContinuityStore(conn), + user_id=user_account_id, + ) + raw_items = payload["items"] + if status_filter == "pending": + items = [item for item in raw_items if item["status"] == "pending"] + else: + items = raw_items + + pending_items = [item for item in raw_items if item["status"] == "pending"] + challenges = _record_pending_approval_challenges( + conn, + workspace_id=workspace_id, + approvals=pending_items, + channel_message_id=channel_message_id, + ) + + return { + "items": items, + "summary": { + "status": status_filter, + "returned_count": len(items), + "pending_count": len(pending_items), + "order": payload["summary"]["order"], + }, + "challenges": challenges, + } + + +def approve_telegram_approval( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + approval_id: UUID, +) -> dict[str, object]: + payload = approve_approval_record( + ContinuityStore(conn), + user_id=user_account_id, + request=ApprovalApproveInput(approval_id=approval_id), + ) + challenge_updates = _resolve_pending_approval_challenges( + conn, + workspace_id=workspace_id, + approval_id=approval_id, + resolution_status="approved", + ) + return { + **payload, + "challenge_updates": challenge_updates, + } + + +def reject_telegram_approval( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + approval_id: UUID, +) -> dict[str, object]: + payload = reject_approval_record( + ContinuityStore(conn), + user_id=user_account_id, + request=ApprovalRejectInput(approval_id=approval_id), + ) + challenge_updates = _resolve_pending_approval_challenges( + conn, + workspace_id=workspace_id, + approval_id=approval_id, + resolution_status="rejected", + ) + return { + **payload, + "challenge_updates": challenge_updates, + } + + +def apply_telegram_open_loop_review_with_log( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + continuity_object_id: UUID, + action: str, + note: str | None, + channel_message_id: UUID | None = None, +) -> dict[str, object]: + response = apply_continuity_open_loop_review_action( + ContinuityStore(conn), + user_id=user_account_id, + continuity_object_id=continuity_object_id, + request=ContinuityOpenLoopReviewActionInput( + action=action, # type: ignore[arg-type] + note=note, + ), + ) + + correction_event_id = _parse_uuid(response["correction_event"]["id"], field_name="correction_event_id") + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO open_loop_reviews ( + workspace_id, + continuity_object_id, + channel_message_id, + correction_event_id, + review_action, + note + ) + VALUES (%s, %s, %s, %s, %s, %s) + RETURNING id, + workspace_id, + continuity_object_id, + channel_message_id, + correction_event_id, + review_action, + note, + created_at + """, + ( + workspace_id, + continuity_object_id, + channel_message_id, + correction_event_id, + action, + note, + ), + ) + review_row = cur.fetchone() + + if review_row is None: + raise RuntimeError("failed to persist open-loop review action log") + + return { + **response, + "review_log": { + "id": str(review_row["id"]), + "workspace_id": str(review_row["workspace_id"]), + "continuity_object_id": str(review_row["continuity_object_id"]), + "channel_message_id": None + if review_row["channel_message_id"] is None + else str(review_row["channel_message_id"]), + "correction_event_id": None + if review_row["correction_event_id"] is None + else str(review_row["correction_event_id"]), + "review_action": review_row["review_action"], + "note": review_row["note"], + "created_at": review_row["created_at"].isoformat(), + }, + } + + +def _execute_intent( + conn, + *, + store: ContinuityStore, + user_account_id: UUID, + workspace_id: UUID, + source_message_id: UUID, + classification: TelegramIntentClassification, + source_message_text: str, +) -> tuple[JsonObject, str]: + intent_kind = classification["intent_kind"] + intent_payload = classification["intent_payload"] + + if intent_kind == "capture": + capture_payload = capture_continuity_input( + store, + user_id=user_account_id, + request=ContinuityCaptureCreateInput( + raw_content=source_message_text, + explicit_signal=None, + ), + ) + capture = capture_payload["capture"] + derived_object = capture.get("derived_object") + if isinstance(derived_object, dict): + object_type = str(derived_object.get("object_type", "Note")) + title = str(derived_object.get("title", "captured object")) + reply_text = f"Captured {object_type}: {title}" + else: + reply_text = "Captured and queued for continuity triage." + return ( + { + "mode": "capture", + "capture": capture_payload["capture"], + "provenance_references": [ + { + "source_kind": "continuity_capture_event", + "source_id": capture["capture_event"]["id"], + } + ], + }, + reply_text, + ) + + if intent_kind == "recall": + query = _normalize_optional_payload_text(intent_payload, field_name="query") + if query is None: + raise ValueError("recall intent requires a query") + recall_payload = query_continuity_recall( + store, + user_id=user_account_id, + request=ContinuityRecallQueryInput(query=query, limit=5), + ) + if len(recall_payload["items"]) == 0: + reply_text = f"No recall results for '{query}'." + else: + first = recall_payload["items"][0] + provenance = _format_provenance_reference_list(first["provenance_references"]) + if provenance == "": + reply_text = f"Recall: {first['title']} ({first['status']})." + else: + reply_text = f"Recall: {first['title']} ({first['status']}). Provenance {provenance}." + return ( + { + "mode": "recall", + "query": query, + "recall": recall_payload, + }, + reply_text, + ) + + if intent_kind == "resume": + query = _normalize_optional_payload_text(intent_payload, field_name="query") + resume_payload = compile_continuity_resumption_brief( + store, + user_id=user_account_id, + request=ContinuityResumptionBriefRequestInput( + query=query, + ), + ) + brief = resume_payload["brief"] + decision = brief["last_decision"]["item"] + next_action = brief["next_action"]["item"] + decision_title = "none" if decision is None else decision["title"] + next_action_title = "none" if next_action is None else next_action["title"] + open_loop_count = brief["open_loops"]["summary"]["returned_count"] + reply_text = ( + f"Resume: decision={decision_title}; next_action={next_action_title}; " + f"open_loops={open_loop_count}." + ) + return ( + { + "mode": "resume", + "brief": brief, + }, + reply_text, + ) + + if intent_kind == "correction": + continuity_object_id_raw = _normalize_optional_payload_text(intent_payload, field_name="continuity_object_id") + if continuity_object_id_raw is None: + raise ValueError("correction intent requires continuity object id") + continuity_object_id = _parse_uuid(continuity_object_id_raw, field_name="continuity_object_id") + replacement_title = _normalize_optional_payload_text(intent_payload, field_name="replacement_title") + if replacement_title is None: + raise ValueError("correction intent requires replacement title text") + correction_payload = apply_continuity_correction( + store, + user_id=user_account_id, + continuity_object_id=continuity_object_id, + request=ContinuityCorrectionInput( + action="edit", + reason="telegram_correction", + title=replacement_title, + ), + ) + updated_object = correction_payload["continuity_object"] + reply_text = f"Correction applied: {updated_object['id']} now titled '{updated_object['title']}'." + return ( + { + "mode": "correction", + "correction": correction_payload, + "provenance_references": [ + { + "source_kind": "continuity_correction_event", + "source_id": correction_payload["correction_event"]["id"], + }, + { + "source_kind": "continuity_object", + "source_id": updated_object["id"], + }, + ], + }, + reply_text, + ) + + if intent_kind == "open_loops": + dashboard_payload = compile_continuity_open_loop_dashboard( + store, + user_id=user_account_id, + request=ContinuityOpenLoopDashboardQueryInput(limit=5), + ) + dashboard = dashboard_payload["dashboard"] + reply_text = ( + "Open loops: " + f"waiting_for={dashboard['waiting_for']['summary']['total_count']}, " + f"blocker={dashboard['blocker']['summary']['total_count']}, " + f"stale={dashboard['stale']['summary']['total_count']}, " + f"next_action={dashboard['next_action']['summary']['total_count']}." + ) + return ( + { + "mode": "open_loops", + "dashboard": dashboard, + }, + reply_text, + ) + + if intent_kind == "open_loop_review": + continuity_object_id_raw = _normalize_optional_payload_text(intent_payload, field_name="continuity_object_id") + if continuity_object_id_raw is None: + raise ValueError("open-loop review intent requires continuity object id") + continuity_object_id = _parse_uuid(continuity_object_id_raw, field_name="continuity_object_id") + action = _normalize_optional_payload_text(intent_payload, field_name="action") + if action is None: + raise ValueError("open-loop review intent requires action") + note = _normalize_optional_payload_text(intent_payload, field_name="note") + review_payload = apply_telegram_open_loop_review_with_log( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + continuity_object_id=continuity_object_id, + action=action, + note=note, + channel_message_id=source_message_id, + ) + reply_text = ( + f"Open-loop review applied: action={review_payload['review_action']}, " + f"outcome={review_payload['lifecycle_outcome']}." + ) + return ( + { + "mode": "open_loop_review", + "review": review_payload, + }, + reply_text, + ) + + if intent_kind == "approvals": + approvals_payload = list_telegram_approvals( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + status_filter="pending", + channel_message_id=source_message_id, + ) + items = approvals_payload["items"] + if len(items) == 0: + reply_text = "No pending approvals." + else: + pending_ids = ", ".join(str(item["id"]) for item in items[:3]) + suffix = "" if len(items) <= 3 else f" (+{len(items) - 3} more)" + reply_text = f"Pending approvals: {pending_ids}{suffix}." + return ( + { + "mode": "approvals", + "approvals": approvals_payload, + }, + reply_text, + ) + + if intent_kind == "approval_approve": + approval_id_raw = _normalize_optional_payload_text(intent_payload, field_name="approval_id") + if approval_id_raw is None: + raise ValueError("approve intent requires approval id") + approval_id = _parse_uuid(approval_id_raw, field_name="approval_id") + approval_payload = approve_telegram_approval( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + approval_id=approval_id, + ) + final_status = approval_payload["approval"]["status"] + reply_text = f"Approval {approval_id} resolved as {final_status}." + return ( + { + "mode": "approval_approve", + "resolution": approval_payload, + }, + reply_text, + ) + + if intent_kind == "approval_reject": + approval_id_raw = _normalize_optional_payload_text(intent_payload, field_name="approval_id") + if approval_id_raw is None: + raise ValueError("reject intent requires approval id") + approval_id = _parse_uuid(approval_id_raw, field_name="approval_id") + approval_payload = reject_telegram_approval( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + approval_id=approval_id, + ) + final_status = approval_payload["approval"]["status"] + reply_text = f"Approval {approval_id} resolved as {final_status}." + return ( + { + "mode": "approval_reject", + "resolution": approval_payload, + }, + reply_text, + ) + + if intent_kind == "unknown": + return ( + { + "mode": "unknown", + "reason": intent_payload.get("reason", "unknown_intent"), + }, + "I could not determine the requested action. Use /recall, /resume, /open-loops, /approvals, or send capture text.", + ) + + raise ValueError(f"unsupported telegram intent kind: {intent_kind}") + + +def handle_telegram_message( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + message_id: UUID, + bot_token: str, + intent_hint: str | None = None, +) -> dict[str, object]: + source_message = _load_workspace_inbound_message( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + message_id=message_id, + ) + + source_text = _normalize_optional_text(source_message["message_text"]) or "" + classification = classify_telegram_message_intent(source_text) + hinted_intent = _resolve_intent_hint(intent_hint) + + intent_kind: TelegramChatIntentKind = classification["intent_kind"] + status: TelegramChatIntentStatus + result_payload: JsonObject + reply_text: str + + store = ContinuityStore(conn) + execution_error_kinds = ( + ApprovalNotFoundError, + ApprovalResolutionConflictError, + ContinuityOpenLoopNotFoundError, + ContinuityOpenLoopValidationError, + ContinuityRecallValidationError, + ContinuityResumptionValidationError, + ContinuityReviewNotFoundError, + ContinuityReviewValidationError, + ContinuityObjectValidationError, + TaskStepApprovalLinkageError, + TaskStepLifecycleBoundaryError, + ValueError, + ) + + if hinted_intent is not None and hinted_intent != classification["intent_kind"]: + intent_kind = hinted_intent + status = "failed" + result_payload = { + "ok": False, + "error": { + "code": "intent_hint_mismatch", + "detail": ( + f"intent_hint '{hinted_intent}' does not match detected intent " + f"'{classification['intent_kind']}'" + ), + }, + "detected_intent_kind": classification["intent_kind"], + } + reply_text = ( + f"Intent hint '{hinted_intent}' did not match detected intent " + f"'{classification['intent_kind']}'." + ) + else: + try: + intent_result, reply_text = _execute_intent( + conn, + store=store, + user_account_id=user_account_id, + workspace_id=workspace_id, + source_message_id=message_id, + classification=classification, + source_message_text=source_text, + ) + status = "handled" + result_payload = { + "ok": True, + "intent_result": intent_result, + } + except execution_error_kinds as exc: + status = "failed" + result_payload = { + "ok": False, + "error": { + "code": "intent_execution_failed", + "type": exc.__class__.__name__, + "detail": str(exc), + }, + } + reply_text = f"Unable to process {classification['intent_kind']}: {exc}" + + dispatch_idempotency_key = hashlib.sha256( + f"telegram:handle:{message_id}:{intent_kind}".encode("utf-8") + ).hexdigest() + outbound_message, receipt = dispatch_telegram_message( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + source_message_id=message_id, + text=reply_text, + dispatch_idempotency_key=dispatch_idempotency_key, + bot_token=bot_token, + ) + + result_payload["reply"] = { + "text": reply_text, + "outbound_message_id": str(outbound_message["id"]), + "delivery_receipt_id": str(receipt["id"]), + } + + persisted_intent = _upsert_chat_intent_result( + conn, + workspace_id=workspace_id, + channel_message_id=message_id, + channel_thread_id=source_message["channel_thread_id"], + intent_kind=intent_kind, + status=status, + intent_payload={ + **classification["intent_payload"], + "detected_intent_kind": classification["intent_kind"], + "intent_confidence": classification["confidence"], + "intent_hint": hinted_intent, + }, + result_payload=result_payload, + handled_at=_utcnow(), + ) + + return { + "message": { + "id": str(source_message["id"]), + "workspace_id": str(source_message["workspace_id"]), + "channel_thread_id": None + if source_message["channel_thread_id"] is None + else str(source_message["channel_thread_id"]), + "channel_identity_id": None + if source_message["channel_identity_id"] is None + else str(source_message["channel_identity_id"]), + "route_status": source_message["route_status"], + "message_text": source_message["message_text"], + "external_chat_id": source_message["external_chat_id"], + }, + "intent": _serialize_chat_intent(persisted_intent), + "outbound_message": serialize_channel_message(outbound_message), + "delivery_receipt": serialize_delivery_receipt(receipt), + } + + +def get_telegram_message_result( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + message_id: UUID, +) -> dict[str, object]: + intent = _fetch_latest_chat_intent_result( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + message_id=message_id, + ) + return { + "message_id": str(message_id), + "intent": _serialize_chat_intent(intent), + } diff --git a/tests/integration/test_phase10_chat_continuity_approvals_api.py b/tests/integration/test_phase10_chat_continuity_approvals_api.py new file mode 100644 index 0000000..9c0d4a1 --- /dev/null +++ b/tests/integration/test_phase10_chat_continuity_approvals_api.py @@ -0,0 +1,792 @@ +from __future__ import annotations + +import json +from typing import Any +from urllib.parse import urlencode +from uuid import UUID, uuid4 + +import anyio +import psycopg + +import apps.api.src.alicebot_api.main as main_module +from apps.api.src.alicebot_api.config import Settings + + +def invoke_request( + method: str, + path: str, + *, + query_params: dict[str, str] | None = None, + payload: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, +) -> tuple[int, dict[str, Any]]: + messages: list[dict[str, object]] = [] + encoded_body = b"" if payload is None else json.dumps(payload).encode() + request_received = False + + async def receive() -> dict[str, object]: + nonlocal request_received + if request_received: + return {"type": "http.disconnect"} + + request_received = True + return {"type": "http.request", "body": encoded_body, "more_body": False} + + async def send(message: dict[str, object]) -> None: + messages.append(message) + + query_string = urlencode(query_params or {}).encode() + request_headers = [(b"content-type", b"application/json")] + for key, value in (headers or {}).items(): + request_headers.append((key.lower().encode(), value.encode())) + + scope = { + "type": "http", + "asgi": {"version": "3.0"}, + "http_version": "1.1", + "method": method, + "scheme": "http", + "path": path, + "raw_path": path.encode(), + "query_string": query_string, + "headers": request_headers, + "client": ("testclient", 50000), + "server": ("testserver", 80), + "root_path": "", + } + + anyio.run(main_module.app, scope, receive, send) + + start_message = next(message for message in messages if message["type"] == "http.response.start") + body = b"".join( + message.get("body", b"") + for message in messages + if message["type"] == "http.response.body" + ) + return start_message["status"], json.loads(body) + + +def auth_header(session_token: str) -> dict[str, str]: + return {"authorization": f"Bearer {session_token}"} + + +def _configure_settings(migrated_database_urls, monkeypatch) -> None: + monkeypatch.setattr( + main_module, + "get_settings", + lambda: Settings( + database_url=migrated_database_urls["app"], + magic_link_ttl_seconds=600, + auth_session_ttl_seconds=3600, + device_link_ttl_seconds=600, + telegram_link_ttl_seconds=600, + telegram_bot_username="alicebot", + telegram_webhook_secret="", + telegram_bot_token="", + ), + ) + + +def _bootstrap_workspace_session(email: str) -> tuple[str, str]: + start_status, start_payload = invoke_request( + "POST", + "/v1/auth/magic-link/start", + payload={"email": email}, + ) + assert start_status == 200 + + verify_status, verify_payload = invoke_request( + "POST", + "/v1/auth/magic-link/verify", + payload={ + "challenge_token": start_payload["challenge"]["challenge_token"], + "device_label": "P10-S3 Device", + "device_key": f"device-{email}", + }, + ) + assert verify_status == 200 + session_token = verify_payload["session_token"] + + create_workspace_status, create_workspace_payload = invoke_request( + "POST", + "/v1/workspaces", + payload={"name": "P10-S3 Workspace"}, + headers=auth_header(session_token), + ) + assert create_workspace_status == 201 + workspace_id = create_workspace_payload["workspace"]["id"] + + bootstrap_status, bootstrap_payload = invoke_request( + "POST", + "/v1/workspaces/bootstrap", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert bootstrap_status == 200 + assert bootstrap_payload["workspace"]["bootstrap_status"] == "ready" + return session_token, workspace_id + + +def _link_telegram_chat( + *, + session_token: str, + workspace_id: str, + chat_id: int, + user_id: int, + username: str, +) -> None: + start_status, start_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/start", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert start_status == 200 + challenge_token = start_payload["challenge"]["challenge_token"] + link_code = start_payload["challenge"]["link_code"] + + webhook_status, webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 910001, + "message": { + "message_id": 710001, + "date": 1710000000, + "chat": {"id": chat_id, "type": "private"}, + "from": {"id": user_id, "username": username}, + "text": f"/link {link_code}", + }, + }, + ) + assert webhook_status == 200 + assert webhook_payload["ingest"]["link_status"] == "confirmed" + + confirm_status, confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": challenge_token}, + headers=auth_header(session_token), + ) + assert confirm_status == 201 + assert confirm_payload["identity"]["status"] == "linked" + + +def _ingest_message( + *, + update_id: int, + message_id: int, + chat_id: int, + user_id: int, + username: str, + text: str, +) -> str: + webhook_status, webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": update_id, + "message": { + "message_id": message_id, + "date": 1710001000 + update_id, + "chat": {"id": chat_id, "type": "private"}, + "from": {"id": user_id, "username": username}, + "text": text, + }, + }, + ) + assert webhook_status == 200 + assert webhook_payload["ingest"]["route_status"] == "resolved" + return webhook_payload["ingest"]["message"]["id"] + + +def _handle_message( + *, + session_token: str, + message_id: str, + intent_hint: str | None = None, +) -> tuple[int, dict[str, Any]]: + payload: dict[str, Any] = {} + if intent_hint is not None: + payload["intent_hint"] = intent_hint + return invoke_request( + "POST", + f"/v1/channels/telegram/messages/{message_id}/handle", + payload=payload, + headers=auth_header(session_token), + ) + + +def _seed_pending_approval(*, admin_db_url: str, user_id: UUID, seed_key: str) -> UUID: + thread_id = uuid4() + trace_id = uuid4() + tool_id = uuid4() + approval_id = uuid4() + task_id = uuid4() + task_step_id = uuid4() + + with psycopg.connect(admin_db_url) as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO users (id, email, display_name) + VALUES (%s, %s, %s) + ON CONFLICT (id) DO UPDATE + SET email = EXCLUDED.email, + display_name = EXCLUDED.display_name + """, + (str(user_id), f"{seed_key}@example.com", f"{seed_key} User"), + ) + cur.execute( + """ + INSERT INTO threads (id, user_id, title) + VALUES (%s, %s, %s) + """, + (str(thread_id), str(user_id), f"{seed_key} Thread"), + ) + cur.execute( + """ + INSERT INTO traces (id, user_id, thread_id, kind, compiler_version, status, limits) + VALUES (%s, %s, %s, 'telegram.seed', 'v0', 'completed', '{}'::jsonb) + """, + (str(trace_id), str(user_id), str(thread_id)), + ) + cur.execute( + """ + INSERT INTO tools ( + id, + user_id, + tool_key, + name, + description, + version, + metadata_version, + active, + tags, + action_hints, + scope_hints, + domain_hints, + risk_hints, + metadata + ) + VALUES ( + %s, + %s, + %s, + %s, + %s, + '1.0.0', + 'tool_metadata_v0', + TRUE, + '[]'::jsonb, + '[]'::jsonb, + '[]'::jsonb, + '[]'::jsonb, + '[]'::jsonb, + '{}'::jsonb + ) + """, + ( + str(tool_id), + str(user_id), + f"telegram.seed.{seed_key}", + f"{seed_key} Tool", + "Seed tool for telegram approvals", + ), + ) + cur.execute( + """ + INSERT INTO approvals ( + id, + user_id, + thread_id, + tool_id, + task_run_id, + task_step_id, + status, + request, + tool, + routing, + routing_trace_id + ) + VALUES ( + %s, + %s, + %s, + %s, + NULL, + NULL, + 'pending', + '{"action":"deploy"}'::jsonb, + '{"id":"seed-tool"}'::jsonb, + '{"decision":"approval_required"}'::jsonb, + %s + ) + """, + (str(approval_id), str(user_id), str(thread_id), str(tool_id), str(trace_id)), + ) + cur.execute( + """ + INSERT INTO tasks ( + id, + user_id, + thread_id, + tool_id, + status, + request, + tool, + latest_approval_id, + latest_execution_id + ) + VALUES ( + %s, + %s, + %s, + %s, + 'pending_approval', + '{"action":"deploy"}'::jsonb, + '{"id":"seed-tool"}'::jsonb, + %s, + NULL + ) + """, + (str(task_id), str(user_id), str(thread_id), str(tool_id), str(approval_id)), + ) + cur.execute( + """ + INSERT INTO task_steps ( + id, + user_id, + task_id, + sequence_no, + kind, + status, + request, + outcome, + trace_id, + trace_kind + ) + VALUES ( + %s, + %s, + %s, + 1, + 'governed_request', + 'created', + '{"action":"deploy"}'::jsonb, + %s, + %s, + 'telegram.seed' + ) + """, + ( + str(task_step_id), + str(user_id), + str(task_id), + json.dumps( + { + "routing_decision": "approval_required", + "approval_id": str(approval_id), + "approval_status": "pending", + "execution_id": None, + "execution_status": None, + "blocked_reason": None, + } + ), + str(trace_id), + ), + ) + cur.execute( + """ + UPDATE approvals + SET task_step_id = %s + WHERE id = %s + """, + (str(task_step_id), str(approval_id)), + ) + conn.commit() + + return approval_id + + +def test_phase10_telegram_continuity_handle_result_and_open_loop_review( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, workspace_id = _bootstrap_workspace_session("p10s3-continuity@example.com") + _link_telegram_chat( + session_token=session_token, + workspace_id=workspace_id, + chat_id=75001, + user_id=76001, + username="continuity_builder", + ) + + capture_message_id = _ingest_message( + update_id=920001, + message_id=720001, + chat_id=75001, + user_id=76001, + username="continuity_builder", + text="Decision: Ship P10-S3 this week", + ) + capture_handle_status, capture_handle_payload = _handle_message( + session_token=session_token, + message_id=capture_message_id, + ) + assert capture_handle_status == 200 + assert capture_handle_payload["intent"]["intent_kind"] == "capture" + assert capture_handle_payload["intent"]["status"] == "handled" + + result_status, result_payload = invoke_request( + "GET", + f"/v1/channels/telegram/messages/{capture_message_id}/result", + headers=auth_header(session_token), + ) + assert result_status == 200 + assert result_payload["intent"]["intent_kind"] == "capture" + + recall_message_id = _ingest_message( + update_id=920002, + message_id=720002, + chat_id=75001, + user_id=76001, + username="continuity_builder", + text="/recall ship p10-s3", + ) + recall_handle_status, recall_handle_payload = _handle_message( + session_token=session_token, + message_id=recall_message_id, + ) + assert recall_handle_status == 200 + assert recall_handle_payload["intent"]["intent_kind"] == "recall" + assert recall_handle_payload["intent"]["status"] == "handled" + recall_items = recall_handle_payload["intent"]["result_payload"]["intent_result"]["recall"]["items"] + assert len(recall_items) >= 1 + continuity_object_id = recall_items[0]["id"] + + correction_message_id = _ingest_message( + update_id=920003, + message_id=720003, + chat_id=75001, + user_id=76001, + username="continuity_builder", + text=f"/correct {continuity_object_id} Decision: Ship P10-S3 after sign-off", + ) + correction_handle_status, correction_handle_payload = _handle_message( + session_token=session_token, + message_id=correction_message_id, + ) + assert correction_handle_status == 200 + assert correction_handle_payload["intent"]["intent_kind"] == "correction" + assert correction_handle_payload["intent"]["status"] == "handled" + + corrected_recall_message_id = _ingest_message( + update_id=920004, + message_id=720004, + chat_id=75001, + user_id=76001, + username="continuity_builder", + text="/recall sign-off", + ) + corrected_recall_status, corrected_recall_payload = _handle_message( + session_token=session_token, + message_id=corrected_recall_message_id, + ) + assert corrected_recall_status == 200 + corrected_title = corrected_recall_payload["intent"]["result_payload"]["intent_result"]["recall"]["items"][0][ + "title" + ] + assert "after sign-off" in corrected_title + + resume_message_id = _ingest_message( + update_id=920005, + message_id=720005, + chat_id=75001, + user_id=76001, + username="continuity_builder", + text="/resume", + ) + resume_handle_status, resume_handle_payload = _handle_message( + session_token=session_token, + message_id=resume_message_id, + ) + assert resume_handle_status == 200 + assert resume_handle_payload["intent"]["intent_kind"] == "resume" + assert resume_handle_payload["intent"]["status"] == "handled" + assert ( + resume_handle_payload["intent"]["result_payload"]["intent_result"]["brief"]["last_decision"]["item"] is not None + ) + + empty_recall_message_id = _ingest_message( + update_id=920008, + message_id=720008, + chat_id=75001, + user_id=76001, + username="continuity_builder", + text="/recall", + ) + empty_recall_status, empty_recall_payload = _handle_message( + session_token=session_token, + message_id=empty_recall_message_id, + ) + assert empty_recall_status == 200 + assert empty_recall_payload["intent"]["intent_kind"] == "recall" + assert empty_recall_payload["intent"]["status"] == "failed" + assert ( + empty_recall_payload["intent"]["result_payload"]["error"]["detail"] + == "recall intent requires a query" + ) + + open_loop_capture_id = _ingest_message( + update_id=920006, + message_id=720006, + chat_id=75001, + user_id=76001, + username="continuity_builder", + text="Next: Follow up with design review", + ) + open_loop_capture_status, _open_loop_capture_payload = _handle_message( + session_token=session_token, + message_id=open_loop_capture_id, + ) + assert open_loop_capture_status == 200 + + open_loops_status, open_loops_payload = invoke_request( + "GET", + "/v1/channels/telegram/open-loops", + headers=auth_header(session_token), + ) + assert open_loops_status == 200 + next_action_items = open_loops_payload["open_loops"]["dashboard"]["next_action"]["items"] + assert len(next_action_items) >= 1 + open_loop_id = next_action_items[0]["id"] + + review_status, review_payload = invoke_request( + "POST", + f"/v1/channels/telegram/open-loops/{open_loop_id}/review-action", + payload={"action": "deferred", "note": "waiting on external input"}, + headers=auth_header(session_token), + ) + assert review_status == 200 + assert review_payload["review_action"] == "deferred" + assert review_payload["review_log"]["review_action"] == "deferred" + + recall_endpoint_status, recall_endpoint_payload = invoke_request( + "GET", + "/v1/channels/telegram/recall", + query_params={"query": "sign-off"}, + headers=auth_header(session_token), + ) + assert recall_endpoint_status == 200 + assert len(recall_endpoint_payload["recall"]["items"]) >= 1 + + resume_endpoint_status, resume_endpoint_payload = invoke_request( + "GET", + "/v1/channels/telegram/resume", + headers=auth_header(session_token), + ) + assert resume_endpoint_status == 200 + assert "brief" in resume_endpoint_payload["resume"] + + wrong_intent_message_id = _ingest_message( + update_id=920007, + message_id=720007, + chat_id=75001, + user_id=76001, + username="continuity_builder", + text="Remember to sync final notes", + ) + wrong_intent_status, wrong_intent_payload = _handle_message( + session_token=session_token, + message_id=wrong_intent_message_id, + intent_hint="recall", + ) + assert wrong_intent_status == 200 + assert wrong_intent_payload["intent"]["status"] == "failed" + assert wrong_intent_payload["intent"]["result_payload"]["error"]["code"] == "intent_hint_mismatch" + + +def test_phase10_telegram_approval_endpoints_and_chat_resolution( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, workspace_id = _bootstrap_workspace_session("p10s3-approvals@example.com") + _link_telegram_chat( + session_token=session_token, + workspace_id=workspace_id, + chat_id=85001, + user_id=86001, + username="approval_builder", + ) + + seed_message_id = _ingest_message( + update_id=930001, + message_id=730001, + chat_id=85001, + user_id=86001, + username="approval_builder", + text="Decision: establish continuity user shadow", + ) + seed_handle_status, _seed_handle_payload = _handle_message( + session_token=session_token, + message_id=seed_message_id, + ) + assert seed_handle_status == 200 + + session_status, session_payload = invoke_request( + "GET", + "/v1/auth/session", + headers=auth_header(session_token), + ) + assert session_status == 200 + user_account_id = UUID(session_payload["user_account"]["id"]) + + approval_a = _seed_pending_approval( + admin_db_url=migrated_database_urls["admin"], + user_id=user_account_id, + seed_key="approval-a", + ) + approval_b = _seed_pending_approval( + admin_db_url=migrated_database_urls["admin"], + user_id=user_account_id, + seed_key="approval-b", + ) + approval_c = _seed_pending_approval( + admin_db_url=migrated_database_urls["admin"], + user_id=user_account_id, + seed_key="approval-c", + ) + approval_d = _seed_pending_approval( + admin_db_url=migrated_database_urls["admin"], + user_id=user_account_id, + seed_key="approval-d", + ) + + list_status, list_payload = invoke_request( + "GET", + "/v1/channels/telegram/approvals", + headers=auth_header(session_token), + ) + assert list_status == 200 + listed_ids = {item["id"] for item in list_payload["items"]} + assert str(approval_a) in listed_ids + assert str(approval_b) in listed_ids + assert list_payload["summary"]["pending_count"] >= 4 + + approve_status, approve_payload = invoke_request( + "POST", + f"/v1/channels/telegram/approvals/{approval_a}/approve", + payload={}, + headers=auth_header(session_token), + ) + assert approve_status == 200 + assert approve_payload["approval"]["status"] == "approved" + assert isinstance(approve_payload["challenge_updates"], list) + + reject_status, reject_payload = invoke_request( + "POST", + f"/v1/channels/telegram/approvals/{approval_b}/reject", + payload={}, + headers=auth_header(session_token), + ) + assert reject_status == 200 + assert reject_payload["approval"]["status"] == "rejected" + assert isinstance(reject_payload["challenge_updates"], list) + + approvals_message_id = _ingest_message( + update_id=930002, + message_id=730002, + chat_id=85001, + user_id=86001, + username="approval_builder", + text="/approvals", + ) + approvals_handle_status, approvals_handle_payload = _handle_message( + session_token=session_token, + message_id=approvals_message_id, + ) + assert approvals_handle_status == 200 + assert approvals_handle_payload["intent"]["intent_kind"] == "approvals" + + missing_approve_id_message_id = _ingest_message( + update_id=930005, + message_id=730005, + chat_id=85001, + user_id=86001, + username="approval_builder", + text="/approve", + ) + missing_approve_id_status, missing_approve_id_payload = _handle_message( + session_token=session_token, + message_id=missing_approve_id_message_id, + ) + assert missing_approve_id_status == 200 + assert missing_approve_id_payload["intent"]["intent_kind"] == "approval_approve" + assert missing_approve_id_payload["intent"]["status"] == "failed" + assert ( + missing_approve_id_payload["intent"]["result_payload"]["error"]["detail"] + == "approve intent requires approval id" + ) + + missing_reject_id_message_id = _ingest_message( + update_id=930006, + message_id=730006, + chat_id=85001, + user_id=86001, + username="approval_builder", + text="/reject", + ) + missing_reject_id_status, missing_reject_id_payload = _handle_message( + session_token=session_token, + message_id=missing_reject_id_message_id, + ) + assert missing_reject_id_status == 200 + assert missing_reject_id_payload["intent"]["intent_kind"] == "approval_reject" + assert missing_reject_id_payload["intent"]["status"] == "failed" + assert ( + missing_reject_id_payload["intent"]["result_payload"]["error"]["detail"] + == "reject intent requires approval id" + ) + + approve_chat_message_id = _ingest_message( + update_id=930003, + message_id=730003, + chat_id=85001, + user_id=86001, + username="approval_builder", + text=f"/approve {approval_c}", + ) + approve_chat_status, approve_chat_payload = _handle_message( + session_token=session_token, + message_id=approve_chat_message_id, + ) + assert approve_chat_status == 200 + assert approve_chat_payload["intent"]["intent_kind"] == "approval_approve" + assert approve_chat_payload["intent"]["status"] == "handled" + + reject_chat_message_id = _ingest_message( + update_id=930004, + message_id=730004, + chat_id=85001, + user_id=86001, + username="approval_builder", + text=f"/reject {approval_d} no longer needed", + ) + reject_chat_status, reject_chat_payload = _handle_message( + session_token=session_token, + message_id=reject_chat_message_id, + ) + assert reject_chat_status == 200 + assert reject_chat_payload["intent"]["intent_kind"] == "approval_reject" + assert reject_chat_payload["intent"]["status"] == "handled" + + pending_after_status, pending_after_payload = invoke_request( + "GET", + "/v1/channels/telegram/approvals", + headers=auth_header(session_token), + ) + assert pending_after_status == 200 + assert pending_after_payload["summary"]["pending_count"] == 0 diff --git a/tests/unit/test_20260408_0045_phase10_chat_continuity_approvals.py b/tests/unit/test_20260408_0045_phase10_chat_continuity_approvals.py new file mode 100644 index 0000000..6f6da97 --- /dev/null +++ b/tests/unit/test_20260408_0045_phase10_chat_continuity_approvals.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import importlib + + +MODULE_NAME = "apps.api.alembic.versions.20260408_0045_phase10_chat_continuity_approvals" + + +def load_migration_module(): + return importlib.import_module(MODULE_NAME) + + +def test_upgrade_executes_expected_statements_in_order(monkeypatch) -> None: + module = load_migration_module() + executed: list[str] = [] + + monkeypatch.setattr(module.op, "execute", executed.append) + + module.upgrade() + + assert executed == [ + *module._UPGRADE_STATEMENTS, + *module._UPGRADE_GRANT_STATEMENTS, + ] + + +def test_downgrade_executes_expected_statements_in_order(monkeypatch) -> None: + module = load_migration_module() + executed: list[str] = [] + + monkeypatch.setattr(module.op, "execute", executed.append) + + module.downgrade() + + assert executed == list(module._DOWNGRADE_STATEMENTS) + + +def test_migration_adds_chat_intent_result_fields_and_new_tables() -> None: + module = load_migration_module() + joined_upgrade_sql = "\n".join(module._UPGRADE_STATEMENTS) + + assert "ADD COLUMN intent_payload jsonb" in joined_upgrade_sql + assert "ADD COLUMN result_payload jsonb" in joined_upgrade_sql + assert "ADD COLUMN handled_at timestamptz" in joined_upgrade_sql + assert "CREATE TABLE approval_challenges" in joined_upgrade_sql + assert "CREATE TABLE open_loop_reviews" in joined_upgrade_sql + + +def test_migration_extends_intent_routing_checks_for_phase10_s3() -> None: + module = load_migration_module() + joined_upgrade_sql = "\n".join(module._UPGRADE_STATEMENTS) + + for marker in ( + "'capture'", + "'recall'", + "'resume'", + "'correction'", + "'open_loops'", + "'open_loop_review'", + "'approvals'", + "'approval_approve'", + "'approval_reject'", + "'unknown'", + ): + assert marker in joined_upgrade_sql + + assert "('pending', 'recorded', 'handled', 'failed')" in joined_upgrade_sql diff --git a/tests/unit/test_telegram_continuity.py b/tests/unit/test_telegram_continuity.py new file mode 100644 index 0000000..1f0ab51 --- /dev/null +++ b/tests/unit/test_telegram_continuity.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from alicebot_api.telegram_continuity import classify_telegram_message_intent + + +def test_classify_routes_capture_by_default() -> None: + classified = classify_telegram_message_intent("Decision: ship P10-S3") + + assert classified["intent_kind"] == "capture" + assert classified["intent_payload"]["raw_content"] == "Decision: ship P10-S3" + + +def test_classify_routes_recall_resume_open_loop_and_approvals_commands() -> None: + recall = classify_telegram_message_intent("/recall sprint objective") + resume = classify_telegram_message_intent("/resume") + open_loops = classify_telegram_message_intent("/open-loops") + approvals = classify_telegram_message_intent("/approvals") + + assert recall["intent_kind"] == "recall" + assert recall["intent_payload"]["query"] == "sprint objective" + assert resume["intent_kind"] == "resume" + assert open_loops["intent_kind"] == "open_loops" + assert approvals["intent_kind"] == "approvals" + + +def test_classify_routes_correction_and_open_loop_review_commands() -> None: + object_id = "aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa" + correction = classify_telegram_message_intent(f"/correct {object_id} Decision: use deterministic routing") + review = classify_telegram_message_intent(f"/open-loop {object_id} deferred needs new signal") + + assert correction["intent_kind"] == "correction" + assert correction["intent_payload"]["continuity_object_id"] == object_id + assert correction["intent_payload"]["replacement_title"] == "Decision: use deterministic routing" + + assert review["intent_kind"] == "open_loop_review" + assert review["intent_payload"]["continuity_object_id"] == object_id + assert review["intent_payload"]["action"] == "deferred" + assert review["intent_payload"]["note"] == "needs new signal" + + +def test_classify_routes_approval_resolution_commands() -> None: + approval_id = "bbbbbbbb-bbbb-4bbb-8bbb-bbbbbbbbbbbb" + approve = classify_telegram_message_intent(f"/approve {approval_id}") + reject = classify_telegram_message_intent(f"/reject {approval_id} no longer needed") + + assert approve["intent_kind"] == "approval_approve" + assert approve["intent_payload"]["approval_id"] == approval_id + + assert reject["intent_kind"] == "approval_reject" + assert reject["intent_payload"]["approval_id"] == approval_id + assert reject["intent_payload"]["note"] == "no longer needed" + + +def test_classify_empty_message_is_unknown() -> None: + classified = classify_telegram_message_intent(" ") + + assert classified["intent_kind"] == "unknown" + assert classified["intent_payload"]["reason"] == "empty_message"