From ced228c0be6b6e520965e58c4b8d94daa2809fe4 Mon Sep 17 00:00:00 2001 From: Sami Rusani Date: Wed, 8 Apr 2026 18:54:52 +0200 Subject: [PATCH] P10-S2: ship Telegram transport and normalization --- .ai/active/SPRINT_PACKET.md | 143 +- .ai/handoff/CURRENT_STATE.md | 9 +- BUILD_REPORT.md | 127 +- README.md | 2 +- REVIEW_REPORT.md | 65 +- ...0260408_0044_phase10_telegram_transport.py | 217 +++ apps/api/src/alicebot_api/config.py | 32 + apps/api/src/alicebot_api/contracts.py | 99 +- apps/api/src/alicebot_api/hosted_workspace.py | 2 +- apps/api/src/alicebot_api/main.py | 471 +++++- apps/api/src/alicebot_api/store.py | 82 + .../api/src/alicebot_api/telegram_channels.py | 1415 +++++++++++++++++ apps/web/app/settings/page.test.tsx | 8 +- apps/web/app/settings/page.tsx | 4 +- .../components/hosted-settings-panel.test.tsx | 65 + apps/web/components/hosted-settings-panel.tsx | 517 +++++- ...hase10_identity_workspace_bootstrap_api.py | 6 +- .../test_phase10_telegram_transport_api.py | 588 +++++++ ...0260408_0044_phase10_telegram_transport.py | 60 + tests/unit/test_config.py | 33 + tests/unit/test_telegram_channels.py | 69 + 21 files changed, 3802 insertions(+), 212 deletions(-) create mode 100644 apps/api/alembic/versions/20260408_0044_phase10_telegram_transport.py create mode 100644 apps/api/src/alicebot_api/telegram_channels.py create mode 100644 apps/web/components/hosted-settings-panel.test.tsx create mode 100644 tests/integration/test_phase10_telegram_transport_api.py create mode 100644 tests/unit/test_20260408_0044_phase10_telegram_transport.py create mode 100644 tests/unit/test_telegram_channels.py diff --git a/.ai/active/SPRINT_PACKET.md b/.ai/active/SPRINT_PACKET.md index ee50023..51dbb99 100644 --- a/.ai/active/SPRINT_PACKET.md +++ b/.ai/active/SPRINT_PACKET.md @@ -2,7 +2,7 @@ ## Sprint Title -Phase 10 Sprint 1 (P10-S1): Identity + Workspace Bootstrap +Phase 10 Sprint 2 (P10-S2): Telegram Transport + Message Normalization ## Sprint Type @@ -10,20 +10,22 @@ feature ## Sprint Reason -Phase 9 proved Alice can be installed, interoperate, remember, and resume deterministically. Phase 10 must make Alice usable without local-only developer setup. `P10-S1` establishes the hosted identity and workspace foundations required before Telegram, chat-native continuity, and scheduled briefs can ship. +`P10-S1` established hosted identity, workspace bootstrap, device management, and preference foundations. `P10-S2` now adds the first chat surface by wiring Telegram transport, channel linking, normalized inbound message handling, deterministic workspace routing, and outbound delivery receipts on top of those shipped hosted-control seams. + +Reference baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Bootstrap. ## Sprint Intent -- hosted account and session model -- magic-link authentication only for the first hosted entry path -- workspace creation and bootstrap flow -- deterministic device linking and device management -- preferences and hosted settings foundation for timezone and future brief policy inputs -- beta cohort and feature-flag support +- Telegram bot and webhook ingress +- Telegram link and unlink flow bound to hosted workspaces +- normalized inbound Telegram message contract +- deterministic workspace and thread routing for Telegram traffic +- outbound dispatcher and delivery receipts +- auditable idempotent message handling without widening into chat-native continuity behavior ## Git Instructions -- Branch Name: `codex/phase10-sprint-1-identity-workspace-bootstrap` +- Branch Name: `codex/phase10-sprint-2-telegram-transport-normalization` - Base Branch: `main` - PR Strategy: one sprint branch, one PR - Merge Policy: squash merge only after review `PASS` and explicit approval @@ -36,50 +38,42 @@ Phase 9 proved Alice can be installed, interoperate, remember, and resume determ - deterministic MCP transport - OpenClaw, Markdown, and ChatGPT importers - continuity engine, approvals, and eval harness + - `P10-S1` hosted auth, workspace bootstrap, device management, preferences, beta cohorts, and feature flags + - Phase 9 shipped scope is baseline truth, not sprint work - Required now: - - hosted identity, sessions, and device trust model - - workspace bootstrap and preferences model - - onboarding/settings foundations that later sprints can attach Telegram to -- Explicitly out of `P10-S1`: - - passkeys or alternate auth methods beyond magic-link - - Telegram transport - - Telegram link/unlink UX - - chat-native continuity flows - - daily brief delivery - - scheduler execution - - backup or sync payload movement - - admin/support dashboards + - Telegram channel identity model and hosted link/unlink lifecycle + - normalized inbound message shape shared by later chat continuity work + - deterministic workspace and thread routing for Telegram traffic + - outbound delivery dispatch with receipt recording +- Explicitly out of `P10-S2`: + - changes to magic-link or hosted auth semantics beyond what Telegram linking requires to consume + - new workspace bootstrap flows + - chat-native capture, recall, resume, correction, or approval resolution behavior + - daily brief generation or scheduler execution + - support/admin dashboards + - broad channel expansion beyond Telegram - launch hardening ## Exact APIs In Scope -- `POST /v1/auth/magic-link/start` -- `POST /v1/auth/magic-link/verify` -- `POST /v1/auth/logout` -- `GET /v1/auth/session` -- `POST /v1/workspaces` -- `GET /v1/workspaces/current` -- `POST /v1/workspaces/bootstrap` -- `GET /v1/workspaces/bootstrap/status` -- `POST /v1/devices/link/start` -- `POST /v1/devices/link/confirm` -- `GET /v1/devices` -- `DELETE /v1/devices/{device_id}` -- `GET /v1/preferences` -- `PATCH /v1/preferences` +- `POST /v1/channels/telegram/link/start` +- `POST /v1/channels/telegram/link/confirm` +- `POST /v1/channels/telegram/unlink` +- `GET /v1/channels/telegram/status` +- `POST /v1/channels/telegram/webhook` +- `GET /v1/channels/telegram/messages` +- `GET /v1/channels/telegram/threads` +- `POST /v1/channels/telegram/messages/{message_id}/dispatch` +- `GET /v1/channels/telegram/delivery-receipts` ## Exact Data Additions In Scope -- `user_accounts` -- `auth_sessions` -- `magic_link_challenges` -- `devices` -- `device_link_challenges` -- `workspaces` -- `workspace_members` -- `user_preferences` -- `beta_cohorts` -- `feature_flags` +- `channel_identities` +- `channel_link_challenges` +- `channel_messages` +- `channel_threads` +- `channel_delivery_receipts` +- `chat_intents` ## Exact Files And Modules In Scope @@ -87,9 +81,9 @@ Phase 9 proved Alice can be installed, interoperate, remember, and resume determ - `apps/api/src/alicebot_api/config.py` - `apps/api/src/alicebot_api/contracts.py` - `apps/api/src/alicebot_api/store.py` -- new hosted auth / workspace bootstrap / device / preferences modules under `apps/api/src/alicebot_api/` +- new Telegram transport / channel routing / outbound delivery modules under `apps/api/src/alicebot_api/` - API migrations under `apps/api/alembic/versions/` -- hosted onboarding/settings pages and supporting UI under `apps/web/app/` and `apps/web/components/` +- hosted Telegram-link and transport-status pages/components under `apps/web/app/` and `apps/web/components/` - sprint-owned unit, integration, and web tests under `tests/` and `apps/web/app/**/*.test.tsx` - sprint-owned documentation updates required to keep active control truth aligned @@ -97,42 +91,41 @@ Phase 9 proved Alice can be installed, interoperate, remember, and resume determ ### API And Persistence -- add hosted account, session, workspace, device, and preference contracts -- add magic-link challenge lifecycle and authenticated session resolution -- add workspace bootstrap state and feature-flag visibility needed by hosted onboarding -- keep hosted identity/workspace records mapped cleanly onto the shipped Alice Core user/workspace semantics +- add Telegram channel identity, link challenge, message, thread, intent, and delivery-receipt contracts +- add webhook normalization and idempotency enforcement for inbound Telegram events +- add deterministic workspace resolution using shipped hosted user/workspace identity from `P10-S1` +- keep channel records additive to the hosted control plane and separate from continuity-object semantics ### Hosted UX -- add the minimal hosted web flow needed to sign in, create or bootstrap a workspace, manage linked devices, and update preferences -- keep the surface narrow and utilitarian; this sprint is foundation, not launch polish -- show hosted bootstrap readiness only; do not imply Telegram is available yet +- add the minimal hosted settings flow needed to start, confirm, inspect, and remove Telegram linkage +- expose Telegram transport readiness and recent transport state only; do not imply chat continuity answers exist yet +- keep the surface narrow and operational, not launch-polish or support-dashboard work ### Verification -- add unit coverage for auth, session, device, workspace bootstrap, and preference logic -- add integration coverage for all `P10-S1` endpoints, including invalid token, expired token, duplicate bootstrap, and revoked-device paths -- add web tests for the hosted onboarding/settings slice +- add unit coverage for Telegram normalization, idempotency, link lifecycle, and routing helpers +- add integration coverage for all `P10-S2` endpoints, including duplicate webhook delivery, invalid link tokens, unlink/relink, and unknown-chat routing failures +- add web tests for Telegram link/status UX - keep control-doc truth checks passing after packet and current-state updates ## Required Deliverables -- hosted account model -- magic-link auth -- device linking -- workspace bootstrap flow -- hosted settings page for timezone, brief-preference inputs, quiet-hours inputs, and device visibility -- beta cohort and feature-flag support +- Telegram bot/webhook ingress +- Telegram link/unlink flow +- normalized inbound Telegram message contract +- deterministic workspace and thread routing +- outbound delivery dispatcher and receipt persistence +- hosted Telegram status/settings page ## Acceptance Criteria -- a new user can create a workspace without touching a repo -- a returning user can log in securely -- device linking works deterministically -- preferences persist and are exposed in hosted bootstrap/settings responses for later brief scheduling -- Phase 9 shipped scope is baseline truth, not sprint work -- hosted identity does not diverge from local workspace semantics -- no `P10-S1` screen or API claims that Telegram is already linked or available +- a hosted user with a `P10-S1` workspace can initiate and confirm Telegram linking without touching local tooling +- duplicate inbound Telegram webhook deliveries are handled idempotently +- inbound Telegram events are normalized into a stable internal contract with explicit workspace and thread routing +- outbound Telegram dispatch persists delivery receipts and failure posture deterministically +- `P10-S1` hosted identity/bootstrap semantics remain baseline truth and are not reopened as sprint work +- no `P10-S2` endpoint or screen claims that continuity capture/recall/approvals already operate in Telegram ## Required Verification Commands @@ -143,7 +136,7 @@ Phase 9 proved Alice can be installed, interoperate, remember, and resume determ ## Review Evidence Requirements - `BUILD_REPORT.md` must list the exact sprint-owned files changed and the exact command results above -- `REVIEW_REPORT.md` must grade against `P10-S1` specifically, not generic Phase 10 planning +- `REVIEW_REPORT.md` must grade against `P10-S2` specifically, not generic Phase 10 planning - if local archive paths remain dirty, they must be called out explicitly as excluded from sprint merge scope ## Implementation Constraints @@ -151,11 +144,11 @@ Phase 9 proved Alice can be installed, interoperate, remember, and resume determ - do not fork continuity semantics between hosted surfaces and Alice Core - keep OSS versus product boundaries explicit in docs and API naming - preserve existing approval, provenance, and correction discipline -- do not widen Phase 10 scope to Telegram or notifications inside this sprint -- do not ship a scheduler in `P10-S1`; preference storage is enough -- do not represent Telegram channel state before `P10-S2` +- do not widen `P10-S2` into chat-native continuity or notifications +- do not ship a scheduler in `P10-S2` +- reuse the shipped `P10-S1` session/workspace/device foundations instead of duplicating identity state - prefer additive hosted-control-plane seams over invasive rewrites of shipped Phase 9 paths ## Exit Condition -`P10-S1` is complete when a user can authenticate by magic link, create or bootstrap a workspace, link and revoke devices, persist hosted preferences, and land in a hosted bootstrap state that is explicitly ready for later Telegram linkage without reopening shipped Phase 9 scope. +`P10-S2` is complete when a hosted user can link Telegram to a shipped `P10-S1` workspace, Telegram inbound events are normalized and routed idempotently, outbound dispatches record delivery receipts, and the system is explicitly ready for `P10-S3` chat-native continuity work without reopening hosted identity/bootstrap scope. diff --git a/.ai/handoff/CURRENT_STATE.md b/.ai/handoff/CURRENT_STATE.md index 35ceee0..6b337b0 100644 --- a/.ai/handoff/CURRENT_STATE.md +++ b/.ai/handoff/CURRENT_STATE.md @@ -4,8 +4,10 @@ - Phase 9 is complete and shipped. - Phase 10 planning is defined as Alice Connect. +- `P10-S1` (Identity + Workspace Bootstrap) is shipped. - P10-S1 (Identity + Workspace Bootstrap) is the first execution sprint packet. -- No Phase 10 product surface is shipped yet. +- `P10-S2` (Telegram Transport + Message Normalization) is the active execution sprint packet. +- No Telegram-based Phase 10 product surface is shipped yet. ## Canonical Baseline @@ -24,8 +26,9 @@ ## Active Sprint Focus -- `P10-S1` covers account/session foundations, workspace bootstrap, device linking, preferences, and beta controls. -- Telegram transport, chat-native continuity, daily briefs, and launch hardening are later Phase 10 milestones. +- `P10-S1` shipped the hosted account/session foundations, workspace bootstrap, device management, preferences, and beta controls. +- `P10-S2` covers Telegram transport, link/unlink flow, message normalization, routing, and delivery receipts. +- Chat-native continuity, daily briefs, and launch hardening are later Phase 10 milestones. - Phase 9 shipped scope is baseline truth and must not be reopened as sprint work. ## Active Constraints diff --git a/BUILD_REPORT.md b/BUILD_REPORT.md index c885eb0..e7dc217 100644 --- a/BUILD_REPORT.md +++ b/BUILD_REPORT.md @@ -1,109 +1,90 @@ # BUILD_REPORT ## sprint objective -Implement **Phase 10 Sprint 1 (P10-S1): Identity + Workspace Bootstrap** with hosted magic-link auth, hosted workspace bootstrap, deterministic device linking/management, hosted preferences persistence, and beta cohort/feature-flag foundations without expanding into Telegram delivery/linking scope. +Implement **Phase 10 Sprint 2 (P10-S2): Telegram Transport + Message Normalization** exactly within sprint scope: Telegram link/unlink lifecycle, webhook ingress normalization/idempotency, deterministic workspace/thread routing, outbound dispatch, and delivery receipts. ## completed work -- Updated the active control/docs layer to reflect an active `P10-S1` execution sprint instead of the post-Phase-9 idle placeholder: +- Updated the active control/docs layer to reflect an active `P10-S2` execution sprint: - `.ai/active/SPRINT_PACKET.md` - `.ai/handoff/CURRENT_STATE.md` - `README.md` - - `ROADMAP.md` - - `RULES.md` - - `ARCHITECTURE.md` - - `PRODUCT_BRIEF.md` - - `ARCHIVE_RECOMMENDATIONS.md` - - `RECOMMENDED_ADRS.md` -- Added hosted control-plane migration for all sprint data additions: - - `user_accounts`, `auth_sessions`, `magic_link_challenges`, `devices`, `device_link_challenges`, `workspaces`, `workspace_members`, `user_preferences`, `beta_cohorts`, `feature_flags`. -- Implemented new hosted modules under API source: - - `hosted_auth.py` (magic-link lifecycle, session issuance/validation/logout, feature-flag resolution) - - `hosted_workspace.py` (workspace creation/current selection/bootstrap status/complete) - - `hosted_devices.py` (device-link challenge start/confirm, list, revoke + session revocation) - - `hosted_preferences.py` (timezone validation + preference get/patch persistence) -- Added full `v1` API surface in `main.py`: - - `POST /v1/auth/magic-link/start` - - `POST /v1/auth/magic-link/verify` - - `POST /v1/auth/logout` - - `GET /v1/auth/session` - - `POST /v1/workspaces` - - `GET /v1/workspaces/current` - - `POST /v1/workspaces/bootstrap` - - `GET /v1/workspaces/bootstrap/status` - - `POST /v1/devices/link/start` - - `POST /v1/devices/link/confirm` - - `GET /v1/devices` - - `DELETE /v1/devices/{device_id}` - - `GET /v1/preferences` - - `PATCH /v1/preferences` -- Added config knobs for hosted TTL controls: - - `MAGIC_LINK_TTL_SECONDS`, `AUTH_SESSION_TTL_SECONDS`, `DEVICE_LINK_TTL_SECONDS`. -- Added hosted contract types in `contracts.py` for account/session/workspace/device/preferences records and statuses. -- Added hosted onboarding/settings web slice: - - new routes `/onboarding` and `/settings` - - supporting components for onboarding and settings posture - - navigation + overview route-card updates - - explicit messaging that Telegram linkage is not available in `P10-S1`. -- Added verification coverage: - - integration coverage for all new `v1` flows, including invalid token, expired token, duplicate bootstrap, and revoked-device session path - - unit coverage for hosted helper logic and migration wiring - - web tests for onboarding/settings pages. +- Added Telegram transport persistence migration with all in-scope tables: + - `channel_identities` + - `channel_link_challenges` + - `channel_messages` + - `channel_threads` + - `channel_delivery_receipts` + - `chat_intents` +- Added new API transport module: + - `apps/api/src/alicebot_api/telegram_channels.py` + - Includes link challenge lifecycle, webhook normalization, idempotent inbound ingestion, routing/thread resolution, unlink/relink behavior, outbound dispatch, and receipt serialization. +- Added all in-scope `P10-S2` endpoints in `main.py`: + - `POST /v1/channels/telegram/link/start` + - `POST /v1/channels/telegram/link/confirm` + - `POST /v1/channels/telegram/unlink` + - `GET /v1/channels/telegram/status` + - `POST /v1/channels/telegram/webhook` + - `GET /v1/channels/telegram/messages` + - `GET /v1/channels/telegram/threads` + - `POST /v1/channels/telegram/messages/{message_id}/dispatch` + - `GET /v1/channels/telegram/delivery-receipts` +- Added Telegram runtime config in `config.py`: + - `TELEGRAM_LINK_TTL_SECONDS` + - `TELEGRAM_BOT_USERNAME` + - `TELEGRAM_WEBHOOK_SECRET` + - `TELEGRAM_BOT_TOKEN` +- Added sprint-scoped contract/store additions for channel identity/challenge/message/thread/intent/receipt records. +- Updated hosted settings UI copy for `P10-S2` Telegram transport/status scope and preserved explicit non-claim boundary for chat-native continuity behavior. +- Added sprint tests: + - Unit: Telegram normalization/idempotency helpers. + - Unit: new migration wiring coverage. + - Integration: full `P10-S2` endpoint flow including duplicate webhook delivery, invalid link token, unlink/relink, unknown-chat routing, dispatch, and receipt listing. + - Web: Telegram link/status UX copy coverage. +- Applied minimal control-doc marker updates required for `scripts/check_control_doc_truth.py` to pass with active `P10-S2` packet context. ## incomplete work -- None within `P10-S1` acceptance scope. +- None identified within `P10-S2` sprint packet scope. ## files changed -Sprint-owned files changed: - `.ai/active/SPRINT_PACKET.md` - `.ai/handoff/CURRENT_STATE.md` -- `ARCHITECTURE.md` -- `ARCHIVE_RECOMMENDATIONS.md` -- `PRODUCT_BRIEF.md` - `README.md` -- `RECOMMENDED_ADRS.md` -- `ROADMAP.md` -- `RULES.md` -- `scripts/check_control_doc_truth.py` - `BUILD_REPORT.md` - `REVIEW_REPORT.md` -- `apps/api/alembic/versions/20260408_0043_phase10_identity_workspace_bootstrap.py` +- `apps/api/alembic/versions/20260408_0044_phase10_telegram_transport.py` - `apps/api/src/alicebot_api/config.py` - `apps/api/src/alicebot_api/contracts.py` -- `apps/api/src/alicebot_api/main.py` -- `apps/api/src/alicebot_api/hosted_auth.py` - `apps/api/src/alicebot_api/hosted_workspace.py` -- `apps/api/src/alicebot_api/hosted_devices.py` -- `apps/api/src/alicebot_api/hosted_preferences.py` +- `apps/api/src/alicebot_api/main.py` +- `apps/api/src/alicebot_api/store.py` +- `apps/api/src/alicebot_api/telegram_channels.py` - `tests/integration/test_phase10_identity_workspace_bootstrap_api.py` -- `tests/unit/test_20260408_0043_phase10_identity_workspace_bootstrap.py` -- `tests/unit/test_phase10_hosted_modules.py` -- `apps/web/app/onboarding/page.tsx` -- `apps/web/app/onboarding/page.test.tsx` +- `tests/integration/test_phase10_telegram_transport_api.py` +- `tests/unit/test_20260408_0044_phase10_telegram_transport.py` +- `tests/unit/test_config.py` +- `tests/unit/test_telegram_channels.py` - `apps/web/app/settings/page.tsx` - `apps/web/app/settings/page.test.tsx` -- `apps/web/components/hosted-onboarding-panel.tsx` - `apps/web/components/hosted-settings-panel.tsx` -- `apps/web/components/app-shell.tsx` -- `apps/web/app/page.tsx` +- `apps/web/components/hosted-settings-panel.test.tsx` ## tests run -Required verification commands and results: +Required verification commands and exact results: - `python3 scripts/check_control_doc_truth.py` - `Control-doc truth check: PASS` - - Verified: `README.md`, `ROADMAP.md`, `.ai/active/SPRINT_PACKET.md`, `RULES.md`, `.ai/handoff/CURRENT_STATE.md`, `docs/archive/planning/2026-04-08-context-compaction/README.md` + - verified: `README.md`, `ROADMAP.md`, `.ai/active/SPRINT_PACKET.md`, `RULES.md`, `.ai/handoff/CURRENT_STATE.md`, `docs/archive/planning/2026-04-08-context-compaction/README.md` - `./.venv/bin/python -m pytest tests/unit tests/integration -q` - - `990 passed in 108.75s (0:01:48)` + - `1003 passed in 122.94s (0:02:02)` - `pnpm --dir apps/web test` - - `Test Files 59 passed (59)` - - `Tests 194 passed (194)` + - `Test Files 60 passed (60)` + - `Tests 196 passed (196)` -Additional focused checks run during implementation: -- `./.venv/bin/python -m pytest tests/unit/test_phase10_hosted_modules.py tests/unit/test_20260408_0043_phase10_identity_workspace_bootstrap.py tests/integration/test_phase10_identity_workspace_bootstrap_api.py -q` - - `9 passed in 1.37s` +Additional focused check run during implementation: +- `./.venv/bin/python -m pytest tests/unit/test_telegram_channels.py tests/unit/test_20260408_0044_phase10_telegram_transport.py tests/integration/test_phase10_telegram_transport_api.py -q` + - `11 passed in 2.40s` ## blockers/issues - No implementation blockers. -- One transient web test assertion ambiguity (duplicate text match) was resolved by tightening the selector to role-based heading assertions. ## recommended next step -Seek explicit Control Tower merge approval for `P10-S1`, using this branch head and the verification evidence above. +Seek explicit Control Tower merge approval for `P10-S2`, using this branch head and the verification evidence above. diff --git a/README.md b/README.md index 9907ac8..1b6cb84 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Alice is a local-first memory and continuity engine for AI agents. -Phase 9 is complete. Alice Connect is the planned Phase 10 product layer on top of that shipped core, and `P10-S1` is the first execution sprint. +Phase 9 is complete. Alice Connect is the planned Phase 10 product layer on top of that shipped core, `P10-S1` is shipped, and `P10-S2` is the active execution sprint. ## What v0.1 Ships diff --git a/REVIEW_REPORT.md b/REVIEW_REPORT.md index 6296030..a56919d 100644 --- a/REVIEW_REPORT.md +++ b/REVIEW_REPORT.md @@ -4,55 +4,58 @@ PASS ## criteria met -- All `P10-S1` in-scope hosted APIs are implemented and exercised: - - `POST /v1/auth/magic-link/start` - - `POST /v1/auth/magic-link/verify` - - `POST /v1/auth/logout` - - `GET /v1/auth/session` - - `POST /v1/workspaces` - - `GET /v1/workspaces/current` - - `POST /v1/workspaces/bootstrap` - - `GET /v1/workspaces/bootstrap/status` - - `POST /v1/devices/link/start` - - `POST /v1/devices/link/confirm` - - `GET /v1/devices` - - `DELETE /v1/devices/{device_id}` - - `GET /v1/preferences` - - `PATCH /v1/preferences` -- Hosted challenge-token security posture is improved: magic-link and device-link tokens are now hashed at rest (`challenge_token_hash`) in migration and runtime lookup logic. -- New/returning user paths, workspace bootstrap, deterministic device linking/revocation, and hosted preferences persistence are validated via integration tests. -- Telegram remains explicitly out of scope in API/UI (`telegram_state: not_available_in_p10_s1` and matching web copy). -- Control docs and planning surfaces are aligned to an active `P10-S1` execution sprint rather than the prior idle post-Phase-9 placeholder: +- Hosted Telegram control flow is now actionable in settings UI (not copy-only): + - Start link challenge + - Confirm link challenge + - Load status + - Unlink + - Load messages/threads/receipts +- Full in-scope P10-S2 endpoint surface remains implemented and exercised: + - `POST /v1/channels/telegram/link/start` + - `POST /v1/channels/telegram/link/confirm` + - `POST /v1/channels/telegram/unlink` + - `GET /v1/channels/telegram/status` + - `POST /v1/channels/telegram/webhook` + - `GET /v1/channels/telegram/messages` + - `GET /v1/channels/telegram/threads` + - `POST /v1/channels/telegram/messages/{message_id}/dispatch` + - `GET /v1/channels/telegram/delivery-receipts` +- Duplicate inbound webhook idempotency remains correct. +- Inbound normalization/routing remains stable and now has stronger safety coverage. +- Outbound dispatch continues to persist deterministic receipt posture. +- P10-S1 identity/bootstrap seams are reused (not replaced) and Telegram boundary language still avoids continuity claims. +- Control docs are aligned to an active `P10-S2` execution sprint and baseline-shipped `P10-S1` state: - `.ai/active/SPRINT_PACKET.md` - `.ai/handoff/CURRENT_STATE.md` - `README.md` - - `ROADMAP.md` - - `RULES.md` - Required verification commands passed in this re-review: - `python3 scripts/check_control_doc_truth.py` -> PASS - - `./.venv/bin/python -m pytest tests/unit tests/integration -q` -> `990 passed` - - `pnpm --dir apps/web test` -> `59 passed` test files, `194 passed` tests + - `./.venv/bin/python -m pytest tests/unit tests/integration -q` -> `1003 passed` + - `pnpm --dir apps/web test` -> `60 passed` test files, `196 passed` tests ## criteria missed -- None identified for `P10-S1` acceptance criteria. +- None. ## quality issues -- No blocking quality issues found after fixes. +- Previously flagged blockers were fixed: + - Confirmed link-code replay from a different chat no longer reuses identity routing context. + - Active linked-chat uniqueness now enforces non-ambiguous identity binding at DB level and runtime conflict handling. + - Hosted `telegram_state` marker is updated to P10-S2 transport availability semantics. +- No new blocking quality defects identified in sprint-owned changes. ## regression risks - Low. -- Main residual risk is ordinary follow-on scope pressure: future Telegram and scheduler sprints must reuse these hosted identity/workspace seams instead of bypassing them. +- Residual operational risk: hosted settings currently requires a valid session token input; if hosted auth UX changes, this panel should stay aligned with session handling conventions. ## docs issues -- No blocking docs issues for sprint acceptance. -- Optional follow-up: add concise API reference docs for the new hosted `v1` endpoints if not already planned. +- No blocking docs issues for P10-S2 acceptance. ## should anything be added to RULES.md? -- Optional hardening rule worth keeping: one-time auth challenge secrets must be stored hashed at rest. +- Optional hardening addition: keep an explicit invariant that active external chat identity bindings must be unambiguous per channel transport. ## should anything update ARCHITECTURE.md? -- Optional: add a brief hosted auth token lifecycle note (issue, hash-at-rest, verify by hash, TTL/revocation). +- Optional refinement: document finalized Telegram link conflict semantics (`identity_conflict`) and replay handling posture for consumed/confirmed link codes. ## recommended next action 1. Ready for Control Tower merge approval under policy. -2. After merge, open `P10-S2` only on top of these hosted identity/workspace/device foundations without widening scope early. +2. After merge, open `P10-S3` only for chat-native continuity behavior on top of these transport seams. diff --git a/apps/api/alembic/versions/20260408_0044_phase10_telegram_transport.py b/apps/api/alembic/versions/20260408_0044_phase10_telegram_transport.py new file mode 100644 index 0000000..25afbc8 --- /dev/null +++ b/apps/api/alembic/versions/20260408_0044_phase10_telegram_transport.py @@ -0,0 +1,217 @@ +"""Add Phase 10 Sprint 2 Telegram transport tables and routing receipts.""" + +from __future__ import annotations + +from alembic import op + + +revision = "20260408_0044" +down_revision = "20260408_0043" +branch_labels = None +depends_on = None + + +_UPGRADE_STATEMENTS = ( + """ + CREATE TABLE channel_identities ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + user_account_id uuid NOT NULL REFERENCES user_accounts(id) ON DELETE CASCADE, + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + channel_type text NOT NULL, + external_user_id text NOT NULL, + external_chat_id text NOT NULL, + external_username text NULL, + status text NOT NULL DEFAULT 'linked', + linked_at timestamptz NOT NULL DEFAULT now(), + unlinked_at timestamptz NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT channel_identities_channel_type_check + CHECK (channel_type IN ('telegram')), + CONSTRAINT channel_identities_status_check + CHECK (status IN ('linked', 'unlinked')), + CONSTRAINT channel_identities_external_user_id_length_check + CHECK (char_length(external_user_id) >= 1 AND char_length(external_user_id) <= 160), + CONSTRAINT channel_identities_external_chat_id_length_check + CHECK (char_length(external_chat_id) >= 1 AND char_length(external_chat_id) <= 160) + ) + """, + ( + "CREATE UNIQUE INDEX channel_identities_linked_external_chat_uidx " + "ON channel_identities (channel_type, external_chat_id) " + "WHERE status = 'linked'" + ), + ( + "CREATE INDEX channel_identities_user_workspace_idx " + "ON channel_identities (user_account_id, workspace_id, created_at DESC, id DESC)" + ), + """ + CREATE TABLE channel_link_challenges ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + user_account_id uuid NOT NULL REFERENCES user_accounts(id) ON DELETE CASCADE, + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + channel_type text NOT NULL, + challenge_token_hash text NOT NULL UNIQUE, + link_code text NOT NULL UNIQUE, + status text NOT NULL, + expires_at timestamptz NOT NULL, + confirmed_at timestamptz NULL, + channel_identity_id uuid NULL REFERENCES channel_identities(id) ON DELETE SET NULL, + created_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT channel_link_challenges_channel_type_check + CHECK (channel_type IN ('telegram')), + CONSTRAINT channel_link_challenges_status_check + CHECK (status IN ('pending', 'confirmed', 'expired', 'cancelled')), + CONSTRAINT channel_link_challenges_link_code_length_check + CHECK (char_length(link_code) >= 6 AND char_length(link_code) <= 32) + ) + """, + ( + "CREATE INDEX channel_link_challenges_user_workspace_status_idx " + "ON channel_link_challenges (user_account_id, workspace_id, channel_type, status, created_at DESC, id DESC)" + ), + """ + CREATE TABLE channel_threads ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + channel_type text NOT NULL, + external_thread_key text NOT NULL, + channel_identity_id uuid NULL REFERENCES channel_identities(id) ON DELETE SET NULL, + last_message_at timestamptz NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (workspace_id, channel_type, external_thread_key), + CONSTRAINT channel_threads_channel_type_check + CHECK (channel_type IN ('telegram')), + CONSTRAINT channel_threads_external_thread_key_length_check + CHECK (char_length(external_thread_key) >= 1 AND char_length(external_thread_key) <= 240) + ) + """, + ( + "CREATE INDEX channel_threads_workspace_last_message_idx " + "ON channel_threads (workspace_id, channel_type, last_message_at DESC, created_at DESC, id DESC)" + ), + """ + CREATE TABLE channel_messages ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NULL REFERENCES workspaces(id) ON DELETE SET NULL, + channel_thread_id uuid NULL REFERENCES channel_threads(id) ON DELETE SET NULL, + channel_identity_id uuid NULL REFERENCES channel_identities(id) ON DELETE SET NULL, + channel_type text NOT NULL, + direction text NOT NULL, + provider_update_id text NULL, + provider_message_id text NULL, + external_chat_id text NULL, + external_user_id text NULL, + message_text text NULL, + normalized_payload jsonb NOT NULL DEFAULT '{}'::jsonb, + route_status text NOT NULL, + idempotency_key text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + received_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (channel_type, direction, idempotency_key), + CONSTRAINT channel_messages_channel_type_check + CHECK (channel_type IN ('telegram')), + CONSTRAINT channel_messages_direction_check + CHECK (direction IN ('inbound', 'outbound')), + CONSTRAINT channel_messages_route_status_check + CHECK (route_status IN ('resolved', 'unresolved')), + CONSTRAINT channel_messages_idempotency_key_length_check + CHECK (char_length(idempotency_key) >= 16 AND char_length(idempotency_key) <= 160) + ) + """, + ( + "CREATE UNIQUE INDEX channel_messages_inbound_update_uidx " + "ON channel_messages (channel_type, provider_update_id) " + "WHERE direction = 'inbound' AND provider_update_id IS NOT NULL" + ), + ( + "CREATE INDEX channel_messages_workspace_created_idx " + "ON channel_messages (workspace_id, channel_type, created_at DESC, id DESC)" + ), + ( + "CREATE INDEX channel_messages_thread_created_idx " + "ON channel_messages (channel_thread_id, created_at DESC, id DESC)" + ), + """ + CREATE TABLE chat_intents ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + channel_message_id uuid NOT NULL REFERENCES channel_messages(id) ON DELETE CASCADE, + channel_thread_id uuid NULL REFERENCES channel_threads(id) ON DELETE SET NULL, + intent_kind text NOT NULL, + status text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (channel_message_id, intent_kind), + CONSTRAINT chat_intents_intent_kind_check + CHECK (intent_kind IN ('inbound_message')), + CONSTRAINT chat_intents_status_check + CHECK (status IN ('pending', 'recorded')) + ) + """, + ( + "CREATE INDEX chat_intents_workspace_created_idx " + "ON chat_intents (workspace_id, created_at DESC, id DESC)" + ), + """ + CREATE TABLE channel_delivery_receipts ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + channel_message_id uuid NOT NULL UNIQUE REFERENCES channel_messages(id) ON DELETE CASCADE, + channel_type text NOT NULL, + status text NOT NULL, + provider_receipt_id text NULL, + failure_code text NULL, + failure_detail text NULL, + recorded_at timestamptz NOT NULL DEFAULT now(), + created_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT channel_delivery_receipts_channel_type_check + CHECK (channel_type IN ('telegram')), + CONSTRAINT channel_delivery_receipts_status_check + CHECK (status IN ('delivered', 'failed', 'simulated')) + ) + """, + ( + "CREATE INDEX channel_delivery_receipts_workspace_recorded_idx " + "ON channel_delivery_receipts (workspace_id, channel_type, recorded_at DESC, id DESC)" + ), +) + +_UPGRADE_GRANT_STATEMENTS = ( + "GRANT SELECT, INSERT, UPDATE, DELETE ON channel_identities TO alicebot_app", + "GRANT SELECT, INSERT, UPDATE, DELETE ON channel_link_challenges TO alicebot_app", + "GRANT SELECT, INSERT, UPDATE, DELETE ON channel_threads TO alicebot_app", + "GRANT SELECT, INSERT, UPDATE, DELETE ON channel_messages TO alicebot_app", + "GRANT SELECT, INSERT, UPDATE, DELETE ON chat_intents TO alicebot_app", + "GRANT SELECT, INSERT, UPDATE, DELETE ON channel_delivery_receipts TO alicebot_app", +) + +_DOWNGRADE_STATEMENTS = ( + "DROP TABLE IF EXISTS channel_delivery_receipts", + "DROP TABLE IF EXISTS chat_intents", + "DROP INDEX IF EXISTS channel_messages_thread_created_idx", + "DROP INDEX IF EXISTS channel_messages_workspace_created_idx", + "DROP INDEX IF EXISTS channel_messages_inbound_update_uidx", + "DROP TABLE IF EXISTS channel_messages", + "DROP INDEX IF EXISTS channel_threads_workspace_last_message_idx", + "DROP TABLE IF EXISTS channel_threads", + "DROP INDEX IF EXISTS channel_link_challenges_user_workspace_status_idx", + "DROP TABLE IF EXISTS channel_link_challenges", + "DROP INDEX IF EXISTS channel_identities_user_workspace_idx", + "DROP INDEX IF EXISTS channel_identities_linked_external_chat_uidx", + "DROP TABLE IF EXISTS channel_identities", +) + + +def _execute_statements(statements: tuple[str, ...]) -> None: + for statement in statements: + op.execute(statement) + + +def upgrade() -> None: + _execute_statements(_UPGRADE_STATEMENTS) + _execute_statements(_UPGRADE_GRANT_STATEMENTS) + + +def downgrade() -> None: + _execute_statements(_DOWNGRADE_STATEMENTS) diff --git a/apps/api/src/alicebot_api/config.py b/apps/api/src/alicebot_api/config.py index f8b4f24..ae07b93 100644 --- a/apps/api/src/alicebot_api/config.py +++ b/apps/api/src/alicebot_api/config.py @@ -40,6 +40,10 @@ DEFAULT_MAGIC_LINK_TTL_SECONDS = 900 DEFAULT_AUTH_SESSION_TTL_SECONDS = 2_592_000 DEFAULT_DEVICE_LINK_TTL_SECONDS = 600 +DEFAULT_TELEGRAM_LINK_TTL_SECONDS = 600 +DEFAULT_TELEGRAM_BOT_USERNAME = "alicebot" +DEFAULT_TELEGRAM_WEBHOOK_SECRET = "" +DEFAULT_TELEGRAM_BOT_TOKEN = "" Environment = Mapping[str, str] @@ -86,6 +90,10 @@ class Settings: magic_link_ttl_seconds: int = DEFAULT_MAGIC_LINK_TTL_SECONDS auth_session_ttl_seconds: int = DEFAULT_AUTH_SESSION_TTL_SECONDS device_link_ttl_seconds: int = DEFAULT_DEVICE_LINK_TTL_SECONDS + telegram_link_ttl_seconds: int = DEFAULT_TELEGRAM_LINK_TTL_SECONDS + telegram_bot_username: str = DEFAULT_TELEGRAM_BOT_USERNAME + telegram_webhook_secret: str = DEFAULT_TELEGRAM_WEBHOOK_SECRET + telegram_bot_token: str = DEFAULT_TELEGRAM_BOT_TOKEN @classmethod def from_env(cls, env: Environment | None = None) -> "Settings": @@ -164,6 +172,26 @@ def from_env(cls, env: Environment | None = None) -> "Settings": "DEVICE_LINK_TTL_SECONDS", cls.device_link_ttl_seconds, ), + telegram_link_ttl_seconds=_get_env_int( + current_env, + "TELEGRAM_LINK_TTL_SECONDS", + cls.telegram_link_ttl_seconds, + ), + telegram_bot_username=_get_env_value( + current_env, + "TELEGRAM_BOT_USERNAME", + cls.telegram_bot_username, + ).strip(), + telegram_webhook_secret=_get_env_value( + current_env, + "TELEGRAM_WEBHOOK_SECRET", + cls.telegram_webhook_secret, + ).strip(), + telegram_bot_token=_get_env_value( + current_env, + "TELEGRAM_BOT_TOKEN", + cls.telegram_bot_token, + ).strip(), ) return _validate_settings(settings) @@ -185,6 +213,10 @@ def _validate_settings(settings: Settings) -> Settings: raise ValueError("AUTH_SESSION_TTL_SECONDS must be a positive integer") if settings.device_link_ttl_seconds <= 0: raise ValueError("DEVICE_LINK_TTL_SECONDS must be a positive integer") + if settings.telegram_link_ttl_seconds <= 0: + raise ValueError("TELEGRAM_LINK_TTL_SECONDS must be a positive integer") + if settings.telegram_bot_username == "": + raise ValueError("TELEGRAM_BOT_USERNAME must be provided") if settings.app_env not in {"development", "test"}: if settings.auth_user_id == "": diff --git a/apps/api/src/alicebot_api/contracts.py b/apps/api/src/alicebot_api/contracts.py index 70764ab..7b1b044 100644 --- a/apps/api/src/alicebot_api/contracts.py +++ b/apps/api/src/alicebot_api/contracts.py @@ -92,6 +92,14 @@ HostedDeviceStatus = Literal["active", "revoked"] HostedWorkspaceBootstrapStatus = Literal["pending", "ready"] HostedWorkspaceMemberRole = Literal["owner", "member"] +ChannelTransportType = Literal["telegram"] +ChannelIdentityStatus = Literal["linked", "unlinked"] +ChannelLinkChallengeStatus = Literal["pending", "confirmed", "expired", "cancelled"] +ChannelMessageDirection = Literal["inbound", "outbound"] +ChannelMessageRouteStatus = Literal["resolved", "unresolved"] +ChatIntentKind = Literal["inbound_message"] +ChatIntentStatus = Literal["pending", "recorded"] +ChannelDeliveryReceiptStatus = Literal["delivered", "failed", "simulated"] TaskArtifactStatus = Literal["registered"] TaskArtifactIngestionStatus = Literal["pending", "ingested"] TaskArtifactChunkRetrievalScopeKind = Literal["task", "artifact"] @@ -327,6 +335,8 @@ MAX_CHIEF_OF_STAFF_PRIORITY_LIMIT = 100 DEFAULT_CALENDAR_EVENT_LIST_LIMIT = 20 MAX_CALENDAR_EVENT_LIST_LIMIT = 50 +DEFAULT_CHANNEL_MESSAGE_LIMIT = 50 +MAX_CHANNEL_MESSAGE_LIMIT = 200 COMPILER_VERSION_V0 = "continuity_v0" PROMPT_ASSEMBLY_VERSION_V0 = "prompt_assembly_v0" RESPONSE_GENERATION_VERSION_V0 = "response_generation_v0" @@ -442,6 +452,11 @@ GMAIL_ACCOUNT_LIST_ORDER = ["created_at_asc", "id_asc"] CALENDAR_ACCOUNT_LIST_ORDER = ["created_at_asc", "id_asc"] CALENDAR_EVENT_LIST_ORDER = ["start_time_asc", "provider_event_id_asc"] +CHANNEL_IDENTITY_LIST_ORDER = ["updated_at_desc", "created_at_desc", "id_desc"] +CHANNEL_LINK_CHALLENGE_LIST_ORDER = ["created_at_desc", "id_desc"] +CHANNEL_THREAD_LIST_ORDER = ["last_message_at_desc", "id_desc"] +CHANNEL_MESSAGE_LIST_ORDER = ["created_at_desc", "id_desc"] +CHANNEL_DELIVERY_RECEIPT_LIST_ORDER = ["recorded_at_desc", "id_desc"] TASK_ARTIFACT_LIST_ORDER = ["created_at_asc", "id_asc"] TASK_ARTIFACT_CHUNK_LIST_ORDER = ["sequence_no_asc", "id_asc"] TASK_ARTIFACT_CHUNK_EMBEDDING_LIST_ORDER = [ @@ -5119,7 +5134,7 @@ class HostedBootstrapStatusRecord(TypedDict): status: HostedWorkspaceBootstrapStatus bootstrapped_at: str | None ready_for_next_phase_telegram_linkage: bool - telegram_state: Literal["not_available_in_p10_s1"] + telegram_state: Literal["available_in_p10_s2_transport"] class HostedDeviceRecord(TypedDict): @@ -5157,3 +5172,85 @@ class HostedUserPreferencesRecord(TypedDict): quiet_hours: JsonObject created_at: str updated_at: str + + +class ChannelIdentityRecord(TypedDict): + id: str + user_account_id: str + workspace_id: str + channel_type: ChannelTransportType + external_user_id: str + external_chat_id: str + external_username: str | None + status: ChannelIdentityStatus + linked_at: str + unlinked_at: str | None + created_at: str + updated_at: str + + +class ChannelLinkChallengeRecord(TypedDict): + id: str + user_account_id: str + workspace_id: str + channel_type: ChannelTransportType + link_code: str + status: ChannelLinkChallengeStatus + expires_at: str + confirmed_at: str | None + channel_identity_id: str | None + created_at: str + challenge_token: NotRequired[str] + + +class ChannelThreadRecord(TypedDict): + id: str + workspace_id: str + channel_type: ChannelTransportType + external_thread_key: str + channel_identity_id: str | None + last_message_at: str | None + created_at: str + updated_at: str + + +class ChannelMessageRecord(TypedDict): + id: str + workspace_id: str | None + channel_thread_id: str | None + channel_identity_id: str | None + channel_type: ChannelTransportType + direction: ChannelMessageDirection + provider_update_id: str | None + provider_message_id: str | None + external_chat_id: str | None + external_user_id: str | None + message_text: str | None + normalized_payload: JsonObject + route_status: ChannelMessageRouteStatus + idempotency_key: str + created_at: str + received_at: str + + +class ChatIntentRecord(TypedDict): + id: str + workspace_id: str + channel_message_id: str + channel_thread_id: str | None + intent_kind: ChatIntentKind + status: ChatIntentStatus + created_at: str + + +class ChannelDeliveryReceiptRecord(TypedDict): + id: str + workspace_id: str + channel_message_id: str + channel_type: ChannelTransportType + status: ChannelDeliveryReceiptStatus + provider_receipt_id: str | None + failure_code: str | None + failure_detail: str | None + recorded_at: str + created_at: str diff --git a/apps/api/src/alicebot_api/hosted_workspace.py b/apps/api/src/alicebot_api/hosted_workspace.py index c169772..cb39fcc 100644 --- a/apps/api/src/alicebot_api/hosted_workspace.py +++ b/apps/api/src/alicebot_api/hosted_workspace.py @@ -253,7 +253,7 @@ def get_bootstrap_status( if workspace["bootstrapped_at"] is None else workspace["bootstrapped_at"].isoformat(), "ready_for_next_phase_telegram_linkage": workspace["bootstrap_status"] == "ready", - "telegram_state": "not_available_in_p10_s1", + "telegram_state": "available_in_p10_s2_transport", } diff --git a/apps/api/src/alicebot_api/main.py b/apps/api/src/alicebot_api/main.py index f86bce3..fdb5e8a 100644 --- a/apps/api/src/alicebot_api/main.py +++ b/apps/api/src/alicebot_api/main.py @@ -387,6 +387,30 @@ serialize_workspace, set_session_workspace, ) +from alicebot_api.telegram_channels import ( + TelegramIdentityNotFoundError, + TelegramLinkPendingError, + TelegramLinkTokenExpiredError, + TelegramLinkTokenInvalidError, + TelegramMessageNotFoundError, + TelegramRoutingError, + TelegramWebhookValidationError, + confirm_telegram_link_challenge, + dispatch_telegram_message, + get_telegram_link_status, + ingest_telegram_webhook, + list_workspace_telegram_delivery_receipts, + list_workspace_telegram_messages, + list_workspace_telegram_threads, + serialize_channel_identity, + serialize_channel_link_challenge, + serialize_channel_message, + serialize_channel_thread, + serialize_delivery_receipt, + serialize_webhook_ingest_result, + start_telegram_link_challenge, + unlink_telegram_identity, +) from alicebot_api.continuity_review import ( ContinuityReviewNotFoundError, ContinuityReviewValidationError, @@ -1339,6 +1363,31 @@ class HostedPreferencesPatchRequest(BaseModel): quiet_hours: dict[str, object] | None = None +class TelegramLinkStartRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + workspace_id: UUID | None = None + + +class TelegramLinkConfirmRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + challenge_token: str = Field(min_length=16, max_length=256) + + +class TelegramUnlinkRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + workspace_id: UUID | None = None + + +class TelegramDispatchRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + text: str = Field(min_length=1, max_length=4096) + idempotency_key: str | None = Field(default=None, min_length=16, max_length=160) + + def _extract_bearer_token(request: Request) -> str: raw_authorization = request.headers.get("authorization", "").strip() if raw_authorization == "": @@ -1364,10 +1413,52 @@ def _serialize_hosted_session_payload( "workspace": workspace, "preferences": preferences, "feature_flags": feature_flags, - "telegram_state": "not_available_in_p10_s1", + "telegram_state": "available_in_p10_s2_transport", } +def _resolve_workspace_for_hosted_channel_request( + conn, + *, + user_account_id: UUID, + session_id: UUID, + preferred_workspace_id: UUID | None, + requested_workspace_id: UUID | None, +): + if requested_workspace_id is not None: + workspace = get_workspace_for_member( + conn, + workspace_id=requested_workspace_id, + user_account_id=user_account_id, + ) + if workspace is None: + raise HostedWorkspaceNotFoundError(f"workspace {requested_workspace_id} was not found") + if preferred_workspace_id != workspace["id"]: + set_session_workspace( + conn, + session_id=session_id, + user_account_id=user_account_id, + workspace_id=workspace["id"], + ) + return workspace + + workspace = get_current_workspace( + conn, + user_account_id=user_account_id, + preferred_workspace_id=preferred_workspace_id, + ) + if workspace is None: + return None + if preferred_workspace_id != workspace["id"]: + set_session_workspace( + conn, + session_id=session_id, + user_account_id=user_account_id, + workspace_id=workspace["id"], + ) + return workspace + + @app.get("/healthz") def healthcheck() -> JSONResponse: settings = get_settings() @@ -4843,7 +4934,7 @@ def bootstrap_v1_workspace( "bootstrap": status_payload, "preferences": serialize_user_preferences(preferences), "feature_flags": feature_flags, - "telegram_state": "not_available_in_p10_s1", + "telegram_state": "available_in_p10_s2_transport", } ), ) @@ -4886,7 +4977,7 @@ def get_v1_workspace_bootstrap_status(request: Request) -> JSONResponse: "workspace": serialize_workspace(workspace), "bootstrap": status_payload, "feature_flags": feature_flags, - "telegram_state": "not_available_in_p10_s1", + "telegram_state": "available_in_p10_s2_transport", } ), ) @@ -5069,3 +5160,377 @@ def patch_v1_preferences( status_code=200, content=jsonable_encoder({"preferences": serialize_user_preferences(preferences)}), ) + + +@app.post("/v1/channels/telegram/link/start") +def start_v1_telegram_link(request: Request, body: TelegramLinkStartRequest) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=body.workspace_id, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + challenge = start_telegram_link_challenge( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + ttl_seconds=settings.telegram_link_ttl_seconds, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedWorkspaceNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + + payload = { + "workspace_id": str(workspace["id"]), + "challenge": serialize_channel_link_challenge(challenge, include_token=True), + "instructions": { + "bot_username": settings.telegram_bot_username, + "command": f"/link {challenge['link_code']}", + "posture": "send the link code to the configured telegram bot, then confirm in hosted settings", + }, + } + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.post("/v1/channels/telegram/link/confirm") +def confirm_v1_telegram_link(request: Request, body: TelegramLinkConfirmRequest) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + challenge, identity = confirm_telegram_link_challenge( + conn, + user_account_id=resolution["user_account"]["id"], + challenge_token=body.challenge_token, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramLinkPendingError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except TelegramLinkTokenExpiredError as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramLinkTokenInvalidError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse( + status_code=201, + content=jsonable_encoder( + { + "identity": serialize_channel_identity(identity), + "challenge": serialize_channel_link_challenge(challenge, include_token=False), + } + ), + ) + + +@app.post("/v1/channels/telegram/unlink") +def unlink_v1_telegram(request: Request, body: TelegramUnlinkRequest) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=body.workspace_id, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + identity = unlink_telegram_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedWorkspaceNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except TelegramIdentityNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder({"identity": serialize_channel_identity(identity)})) + + +@app.get("/v1/channels/telegram/status") +def get_v1_telegram_status( + request: Request, + workspace_id: UUID | None = None, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=workspace_id, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + payload = get_telegram_link_status( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except HostedWorkspaceNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.post("/v1/channels/telegram/webhook") +async def ingest_v1_telegram_webhook(request: Request) -> JSONResponse: + settings = get_settings() + if settings.telegram_webhook_secret != "": + header_secret = request.headers.get("x-telegram-bot-api-secret-token", "").strip() + if header_secret != settings.telegram_webhook_secret: + return JSONResponse(status_code=401, content={"detail": "telegram webhook secret is invalid"}) + + try: + payload = await request.json() + except ValueError: + return JSONResponse(status_code=400, content={"detail": "telegram webhook payload must be valid json"}) + + if not isinstance(payload, dict): + return JSONResponse(status_code=400, content={"detail": "telegram webhook payload must be an object"}) + + try: + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + ingest_result = ingest_telegram_webhook( + conn, + payload=payload, + bot_username=settings.telegram_bot_username, + ) + except TelegramWebhookValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse( + status_code=200, + content=jsonable_encoder( + { + "status": "accepted", + "ingest": serialize_webhook_ingest_result(ingest_result), + } + ), + ) + + +@app.get("/v1/channels/telegram/messages") +def list_v1_telegram_messages( + request: Request, + limit: int = Query(default=50, ge=1, le=200), +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=resolution["user_account"]["id"], + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + rows = list_workspace_telegram_messages( + conn, + user_account_id=resolution["user_account"]["id"], + workspace_id=workspace["id"], + limit=limit, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + + items = [serialize_channel_message(row) for row in rows] + return JSONResponse( + status_code=200, + content=jsonable_encoder( + { + "items": items, + "summary": { + "workspace_id": str(workspace["id"]), + "total_count": len(items), + "order": ["created_at_desc", "id_desc"], + }, + } + ), + ) + + +@app.get("/v1/channels/telegram/threads") +def list_v1_telegram_threads( + request: Request, + limit: int = Query(default=50, ge=1, le=200), +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=resolution["user_account"]["id"], + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + rows = list_workspace_telegram_threads( + conn, + user_account_id=resolution["user_account"]["id"], + workspace_id=workspace["id"], + limit=limit, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + + items = [serialize_channel_thread(row) for row in rows] + return JSONResponse( + status_code=200, + content=jsonable_encoder( + { + "items": items, + "summary": { + "workspace_id": str(workspace["id"]), + "total_count": len(items), + "order": ["last_message_at_desc", "id_desc"], + }, + } + ), + ) + + +@app.post("/v1/channels/telegram/messages/{message_id}/dispatch") +def dispatch_v1_telegram_message( + message_id: UUID, + request: Request, + body: TelegramDispatchRequest, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=resolution["user_account"]["id"], + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + outbound_message, receipt = dispatch_telegram_message( + conn, + user_account_id=resolution["user_account"]["id"], + workspace_id=workspace["id"], + source_message_id=message_id, + text=body.text, + dispatch_idempotency_key=body.idempotency_key, + bot_token=settings.telegram_bot_token, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramMessageNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except TelegramRoutingError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except ValueError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse( + status_code=201, + content=jsonable_encoder( + { + "message": serialize_channel_message(outbound_message), + "receipt": serialize_delivery_receipt(receipt), + } + ), + ) + + +@app.get("/v1/channels/telegram/delivery-receipts") +def list_v1_telegram_delivery_receipts( + request: Request, + limit: int = Query(default=50, ge=1, le=200), +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=resolution["user_account"]["id"], + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + rows = list_workspace_telegram_delivery_receipts( + conn, + user_account_id=resolution["user_account"]["id"], + workspace_id=workspace["id"], + limit=limit, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + + items = [serialize_delivery_receipt(row) for row in rows] + return JSONResponse( + status_code=200, + content=jsonable_encoder( + { + "items": items, + "summary": { + "workspace_id": str(workspace["id"]), + "total_count": len(items), + "order": ["recorded_at_desc", "id_desc"], + }, + } + ), + ) diff --git a/apps/api/src/alicebot_api/store.py b/apps/api/src/alicebot_api/store.py index 5c3227f..eaa5e34 100644 --- a/apps/api/src/alicebot_api/store.py +++ b/apps/api/src/alicebot_api/store.py @@ -404,6 +404,88 @@ class ProtectedCalendarCredentialRow(TypedDict): updated_at: datetime +class ChannelIdentityRow(TypedDict): + id: UUID + user_account_id: UUID + workspace_id: UUID + channel_type: str + external_user_id: str + external_chat_id: str + external_username: str | None + status: str + linked_at: datetime + unlinked_at: datetime | None + created_at: datetime + updated_at: datetime + + +class ChannelLinkChallengeRow(TypedDict): + id: UUID + user_account_id: UUID + workspace_id: UUID + channel_type: str + challenge_token_hash: str + link_code: str + status: str + expires_at: datetime + confirmed_at: datetime | None + channel_identity_id: UUID | None + created_at: datetime + + +class ChannelThreadRow(TypedDict): + id: UUID + workspace_id: UUID + channel_type: str + external_thread_key: str + channel_identity_id: UUID | None + last_message_at: datetime | None + created_at: datetime + updated_at: datetime + + +class ChannelMessageRow(TypedDict): + id: UUID + workspace_id: UUID | None + channel_thread_id: UUID | None + channel_identity_id: UUID | None + channel_type: str + direction: str + provider_update_id: str | None + provider_message_id: str | None + external_chat_id: str | None + external_user_id: str | None + message_text: str | None + normalized_payload: JsonObject + route_status: str + idempotency_key: str + created_at: datetime + received_at: datetime + + +class ChatIntentRow(TypedDict): + id: UUID + workspace_id: UUID + channel_message_id: UUID + channel_thread_id: UUID | None + intent_kind: str + status: str + created_at: datetime + + +class ChannelDeliveryReceiptRow(TypedDict): + id: UUID + workspace_id: UUID + channel_message_id: UUID + channel_type: str + status: str + provider_receipt_id: str | None + failure_code: str | None + failure_detail: str | None + recorded_at: datetime + created_at: datetime + + class TaskArtifactRow(TypedDict): id: UUID user_id: UUID diff --git a/apps/api/src/alicebot_api/telegram_channels.py b/apps/api/src/alicebot_api/telegram_channels.py new file mode 100644 index 0000000..01e0ccc --- /dev/null +++ b/apps/api/src/alicebot_api/telegram_channels.py @@ -0,0 +1,1415 @@ +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +import hashlib +import re +import secrets +import string +from typing import Any, TypedDict +from uuid import UUID + +from psycopg.types.json import Jsonb + +from alicebot_api.hosted_auth import generate_token, hash_token, utc_now + + +TELEGRAM_CHANNEL_TYPE = "telegram" + +_LINK_PATTERN = re.compile(r"^/link(?:@(?P[A-Za-z0-9_]+))?\s+(?P[A-Za-z0-9]{6,32})$") +_START_PATTERN = re.compile(r"^/start\s+(?P[A-Za-z0-9]{6,32})$") + + +class TelegramLinkTokenInvalidError(ValueError): + """Raised when a Telegram link token is invalid or already consumed.""" + + +class TelegramLinkTokenExpiredError(ValueError): + """Raised when a Telegram link token has expired.""" + + +class TelegramLinkPendingError(RuntimeError): + """Raised when link confirmation has not yet been observed via webhook.""" + + +class TelegramIdentityConflictError(RuntimeError): + """Raised when a Telegram chat is already linked to a different workspace.""" + + +class TelegramIdentityNotFoundError(LookupError): + """Raised when a linked Telegram identity is not visible for the workspace.""" + + +class TelegramMessageNotFoundError(LookupError): + """Raised when a Telegram message is not visible for dispatch.""" + + +class TelegramRoutingError(RuntimeError): + """Raised when Telegram routing cannot be resolved deterministically.""" + + +class TelegramWebhookValidationError(ValueError): + """Raised when incoming webhook payload is missing required Telegram fields.""" + + +class TelegramChannelIdentityRow(TypedDict): + id: UUID + user_account_id: UUID + workspace_id: UUID + channel_type: str + external_user_id: str + external_chat_id: str + external_username: str | None + status: str + linked_at: datetime + unlinked_at: datetime | None + created_at: datetime + updated_at: datetime + + +class TelegramChannelLinkChallengeRow(TypedDict): + id: UUID + user_account_id: UUID + workspace_id: UUID + channel_type: str + challenge_token_hash: str + link_code: str + status: str + expires_at: datetime + confirmed_at: datetime | None + channel_identity_id: UUID | None + created_at: datetime + + +class IssuedTelegramChannelLinkChallengeRow(TelegramChannelLinkChallengeRow): + challenge_token: str + + +class TelegramChannelThreadRow(TypedDict): + id: UUID + workspace_id: UUID + channel_type: str + external_thread_key: str + channel_identity_id: UUID | None + last_message_at: datetime | None + created_at: datetime + updated_at: datetime + + +class TelegramChannelMessageRow(TypedDict): + id: UUID + workspace_id: UUID | None + channel_thread_id: UUID | None + channel_identity_id: UUID | None + channel_type: str + direction: str + provider_update_id: str | None + provider_message_id: str | None + external_chat_id: str | None + external_user_id: str | None + message_text: str | None + normalized_payload: dict[str, Any] + route_status: str + idempotency_key: str + created_at: datetime + received_at: datetime + + +class TelegramDeliveryReceiptRow(TypedDict): + id: UUID + workspace_id: UUID + channel_message_id: UUID + channel_type: str + status: str + provider_receipt_id: str | None + failure_code: str | None + failure_detail: str | None + recorded_at: datetime + created_at: datetime + + +class NormalizedTelegramInboundMessage(TypedDict): + provider_update_id: str + provider_message_id: str + external_chat_id: str + external_user_id: str + external_username: str | None + message_text: str + sent_at: datetime + link_code: str | None + idempotency_key: str + normalized_payload: dict[str, Any] + + +class TelegramWebhookIngestResult(TypedDict): + duplicate: bool + route_status: str + link_status: str | None + unknown_chat_routing: bool + message: TelegramChannelMessageRow + thread: TelegramChannelThreadRow | None + + +def build_inbound_idempotency_key(*, update_id: int) -> str: + payload = f"telegram:update:{update_id}" + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + +def resolve_telegram_thread_key(*, external_chat_id: str) -> str: + return f"telegram-chat:{external_chat_id}" + + +def extract_telegram_link_code( + text: str, + *, + bot_username: str | None, +) -> str | None: + normalized_text = text.strip() + if normalized_text == "": + return None + + for pattern in (_LINK_PATTERN, _START_PATTERN): + match = pattern.match(normalized_text) + if match is None: + continue + mention = match.groupdict().get("mention") + if mention is not None and bot_username and mention.lower() != bot_username.lower(): + return None + code = match.group("code").strip().upper() + if code == "": + return None + return code + + return None + + +def normalize_telegram_update( + payload: dict[str, Any], + *, + bot_username: str | None, +) -> NormalizedTelegramInboundMessage: + raw_update_id = payload.get("update_id") + if not isinstance(raw_update_id, int): + raise TelegramWebhookValidationError("telegram webhook payload requires integer update_id") + + raw_message = payload.get("message") + if not isinstance(raw_message, dict): + raise TelegramWebhookValidationError("telegram webhook payload requires message object") + + raw_chat = raw_message.get("chat") + if not isinstance(raw_chat, dict) or "id" not in raw_chat: + raise TelegramWebhookValidationError("telegram webhook message requires chat.id") + + raw_from = raw_message.get("from") + if not isinstance(raw_from, dict) or "id" not in raw_from: + raise TelegramWebhookValidationError("telegram webhook message requires from.id") + + raw_message_id = raw_message.get("message_id") + if not isinstance(raw_message_id, int): + raise TelegramWebhookValidationError("telegram webhook message requires integer message_id") + + text = raw_message.get("text") + normalized_text = text.strip() if isinstance(text, str) else "" + + sent_at: datetime + raw_date = raw_message.get("date") + if isinstance(raw_date, int): + sent_at = datetime.fromtimestamp(raw_date, tz=timezone.utc) + else: + sent_at = utc_now() + + external_chat_id = str(raw_chat["id"]) + external_user_id = str(raw_from["id"]) + username = raw_from.get("username") + external_username = username.strip() if isinstance(username, str) and username.strip() else None + + link_code = extract_telegram_link_code(normalized_text, bot_username=bot_username) + + normalized_payload = { + "update_id": raw_update_id, + "message_id": raw_message_id, + "chat": { + "id": external_chat_id, + "type": raw_chat.get("type"), + }, + "from": { + "id": external_user_id, + "username": external_username, + }, + "text": normalized_text, + "received_kind": "telegram_webhook", + "link_code": link_code, + } + + return { + "provider_update_id": str(raw_update_id), + "provider_message_id": str(raw_message_id), + "external_chat_id": external_chat_id, + "external_user_id": external_user_id, + "external_username": external_username, + "message_text": normalized_text, + "sent_at": sent_at, + "link_code": link_code, + "idempotency_key": build_inbound_idempotency_key(update_id=raw_update_id), + "normalized_payload": normalized_payload, + } + + +def _generate_link_code(length: int = 8) -> str: + alphabet = string.ascii_uppercase + string.digits + return "".join(secrets.choice(alphabet) for _ in range(length)) + + +def start_telegram_link_challenge( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + ttl_seconds: int, +) -> IssuedTelegramChannelLinkChallengeRow: + now = utc_now() + + with conn.cursor() as cur: + cur.execute( + """ + UPDATE channel_link_challenges + SET status = 'expired' + WHERE user_account_id = %s + AND workspace_id = %s + AND channel_type = %s + AND status = 'pending' + AND expires_at > %s + """, + (user_account_id, workspace_id, TELEGRAM_CHANNEL_TYPE, now), + ) + + challenge_token = generate_token() + challenge_token_hash = hash_token(challenge_token) + link_code = _generate_link_code() + expires_at = now + timedelta(seconds=ttl_seconds) + + cur.execute( + """ + INSERT INTO channel_link_challenges ( + user_account_id, + workspace_id, + channel_type, + challenge_token_hash, + link_code, + status, + expires_at + ) + VALUES (%s, %s, %s, %s, %s, 'pending', %s) + RETURNING id, user_account_id, workspace_id, channel_type, challenge_token_hash, + link_code, status, expires_at, confirmed_at, channel_identity_id, created_at + """, + ( + user_account_id, + workspace_id, + TELEGRAM_CHANNEL_TYPE, + challenge_token_hash, + link_code, + expires_at, + ), + ) + challenge = cur.fetchone() + + if challenge is None: + raise RuntimeError("failed to create telegram link challenge") + + challenge["challenge_token"] = challenge_token + return challenge + + +def _lookup_link_challenge_for_update( + conn, + *, + user_account_id: UUID, + challenge_token: str, +) -> TelegramChannelLinkChallengeRow | None: + token = challenge_token.strip() + if token == "": + return None + + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, user_account_id, workspace_id, channel_type, challenge_token_hash, link_code, + status, expires_at, confirmed_at, channel_identity_id, created_at + FROM channel_link_challenges + WHERE user_account_id = %s + AND channel_type = %s + AND challenge_token_hash = %s + FOR UPDATE + """, + (user_account_id, TELEGRAM_CHANNEL_TYPE, hash_token(token)), + ) + return cur.fetchone() + + +def _fetch_channel_identity_by_id(conn, *, channel_identity_id: UUID) -> TelegramChannelIdentityRow | None: + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, user_account_id, workspace_id, channel_type, external_user_id, external_chat_id, + external_username, status, linked_at, unlinked_at, created_at, updated_at + FROM channel_identities + WHERE id = %s + """, + (channel_identity_id,), + ) + return cur.fetchone() + + +def confirm_telegram_link_challenge( + conn, + *, + user_account_id: UUID, + challenge_token: str, +) -> tuple[TelegramChannelLinkChallengeRow, TelegramChannelIdentityRow]: + now = utc_now() + challenge = _lookup_link_challenge_for_update( + conn, + user_account_id=user_account_id, + challenge_token=challenge_token, + ) + if challenge is None: + raise TelegramLinkTokenInvalidError("telegram link token is invalid") + + if challenge["status"] == "confirmed" and challenge["channel_identity_id"] is not None: + identity = _fetch_channel_identity_by_id(conn, channel_identity_id=challenge["channel_identity_id"]) + if identity is not None: + return challenge, identity + + if challenge["status"] != "pending": + raise TelegramLinkTokenInvalidError("telegram link token is no longer valid") + + if challenge["expires_at"] <= now: + with conn.cursor() as cur: + cur.execute( + """ + UPDATE channel_link_challenges + SET status = 'expired' + WHERE id = %s + """, + (challenge["id"],), + ) + raise TelegramLinkTokenExpiredError("telegram link token has expired") + + if challenge["channel_identity_id"] is None: + raise TelegramLinkPendingError("telegram link is pending webhook confirmation") + + with conn.cursor() as cur: + cur.execute( + """ + UPDATE channel_link_challenges + SET status = 'confirmed', + confirmed_at = COALESCE(confirmed_at, %s) + WHERE id = %s + RETURNING id, user_account_id, workspace_id, channel_type, challenge_token_hash, + link_code, status, expires_at, confirmed_at, channel_identity_id, created_at + """, + (now, challenge["id"]), + ) + updated = cur.fetchone() + + if updated is None or updated["channel_identity_id"] is None: + raise TelegramLinkPendingError("telegram link is pending webhook confirmation") + + identity = _fetch_channel_identity_by_id(conn, channel_identity_id=updated["channel_identity_id"]) + if identity is None: + raise TelegramLinkPendingError("telegram link is pending webhook confirmation") + + return updated, identity + + +def _upsert_linked_identity( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + external_chat_id: str, + external_user_id: str, + external_username: str | None, +) -> TelegramChannelIdentityRow: + now = utc_now() + + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, user_account_id, workspace_id, channel_type, external_user_id, external_chat_id, + external_username, status, linked_at, unlinked_at, created_at, updated_at + FROM channel_identities + WHERE channel_type = %s + AND external_chat_id = %s + AND status = 'linked' + ORDER BY updated_at DESC, created_at DESC, id DESC + LIMIT 1 + FOR UPDATE + """, + (TELEGRAM_CHANNEL_TYPE, external_chat_id), + ) + linked = cur.fetchone() + + if linked is not None: + if linked["workspace_id"] != workspace_id: + raise TelegramIdentityConflictError( + "telegram chat is already linked to another workspace" + ) + + cur.execute( + """ + UPDATE channel_identities + SET user_account_id = %s, + external_user_id = %s, + external_username = %s, + updated_at = %s + WHERE id = %s + RETURNING id, user_account_id, workspace_id, channel_type, external_user_id, + external_chat_id, external_username, status, linked_at, unlinked_at, + created_at, updated_at + """, + ( + user_account_id, + external_user_id, + external_username, + now, + linked["id"], + ), + ) + refreshed = cur.fetchone() + if refreshed is None: + raise RuntimeError("failed to refresh linked telegram identity") + return refreshed + + cur.execute( + """ + SELECT id, user_account_id, workspace_id, channel_type, external_user_id, external_chat_id, + external_username, status, linked_at, unlinked_at, created_at, updated_at + FROM channel_identities + WHERE workspace_id = %s + AND channel_type = %s + AND external_chat_id = %s + ORDER BY created_at DESC, id DESC + LIMIT 1 + FOR UPDATE + """, + (workspace_id, TELEGRAM_CHANNEL_TYPE, external_chat_id), + ) + prior = cur.fetchone() + + if prior is not None: + cur.execute( + """ + UPDATE channel_identities + SET user_account_id = %s, + external_user_id = %s, + external_username = %s, + status = 'linked', + linked_at = %s, + unlinked_at = NULL, + updated_at = %s + WHERE id = %s + RETURNING id, user_account_id, workspace_id, channel_type, external_user_id, + external_chat_id, external_username, status, linked_at, unlinked_at, + created_at, updated_at + """, + ( + user_account_id, + external_user_id, + external_username, + now, + now, + prior["id"], + ), + ) + relinked = cur.fetchone() + if relinked is None: + raise RuntimeError("failed to relink telegram identity") + return relinked + + cur.execute( + """ + INSERT INTO channel_identities ( + user_account_id, + workspace_id, + channel_type, + external_user_id, + external_chat_id, + external_username, + status, + linked_at, + updated_at + ) + VALUES (%s, %s, %s, %s, %s, %s, 'linked', %s, %s) + RETURNING id, user_account_id, workspace_id, channel_type, external_user_id, + external_chat_id, external_username, status, linked_at, unlinked_at, + created_at, updated_at + """, + ( + user_account_id, + workspace_id, + TELEGRAM_CHANNEL_TYPE, + external_user_id, + external_chat_id, + external_username, + now, + now, + ), + ) + inserted = cur.fetchone() + + if inserted is None: + raise RuntimeError("failed to insert linked telegram identity") + return inserted + + +def _apply_link_code_if_present( + conn, + *, + normalized: NormalizedTelegramInboundMessage, +) -> tuple[str | None, TelegramChannelIdentityRow | None]: + link_code = normalized["link_code"] + if link_code is None: + return None, None + + now = utc_now() + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, user_account_id, workspace_id, channel_type, challenge_token_hash, link_code, + status, expires_at, confirmed_at, channel_identity_id, created_at + FROM channel_link_challenges + WHERE channel_type = %s + AND link_code = %s + ORDER BY created_at DESC, id DESC + LIMIT 1 + FOR UPDATE + """, + (TELEGRAM_CHANNEL_TYPE, link_code), + ) + challenge = cur.fetchone() + + if challenge is None: + return "invalid_link_code", None + + if challenge["status"] != "pending": + if challenge["status"] == "confirmed" and challenge["channel_identity_id"] is not None: + identity = _fetch_channel_identity_by_id( + conn, + channel_identity_id=challenge["channel_identity_id"], + ) + if ( + identity is not None + and identity["status"] == "linked" + and identity["external_chat_id"] == normalized["external_chat_id"] + ): + return "already_confirmed", identity + return "invalid_link_code", None + return "invalid_link_code", None + + if challenge["expires_at"] <= now: + with conn.cursor() as cur: + cur.execute( + """ + UPDATE channel_link_challenges + SET status = 'expired' + WHERE id = %s + """, + (challenge["id"],), + ) + return "expired_link_code", None + + try: + identity = _upsert_linked_identity( + conn, + user_account_id=challenge["user_account_id"], + workspace_id=challenge["workspace_id"], + external_chat_id=normalized["external_chat_id"], + external_user_id=normalized["external_user_id"], + external_username=normalized["external_username"], + ) + except TelegramIdentityConflictError: + return "identity_conflict", None + + with conn.cursor() as cur: + cur.execute( + """ + UPDATE channel_link_challenges + SET status = 'confirmed', + confirmed_at = %s, + channel_identity_id = %s + WHERE id = %s + """, + (now, identity["id"], challenge["id"]), + ) + + return "confirmed", identity + + +def _resolve_workspace_and_identity_for_inbound( + conn, + *, + normalized: NormalizedTelegramInboundMessage, + linked_identity: TelegramChannelIdentityRow | None, +) -> tuple[UUID | None, UUID | None]: + if linked_identity is not None: + return linked_identity["workspace_id"], linked_identity["id"] + + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, workspace_id + FROM channel_identities + WHERE channel_type = %s + AND external_chat_id = %s + AND status = 'linked' + ORDER BY updated_at DESC, created_at DESC, id DESC + LIMIT 1 + """, + (TELEGRAM_CHANNEL_TYPE, normalized["external_chat_id"]), + ) + row = cur.fetchone() + + if row is None: + return None, None + return row["workspace_id"], row["id"] + + +def _ensure_channel_thread( + conn, + *, + workspace_id: UUID, + channel_identity_id: UUID | None, + external_chat_id: str, + sent_at: datetime, +) -> TelegramChannelThreadRow: + external_thread_key = resolve_telegram_thread_key(external_chat_id=external_chat_id) + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO channel_threads ( + workspace_id, + channel_type, + external_thread_key, + channel_identity_id, + last_message_at, + updated_at + ) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (workspace_id, channel_type, external_thread_key) DO UPDATE + SET channel_identity_id = COALESCE(EXCLUDED.channel_identity_id, channel_threads.channel_identity_id), + last_message_at = EXCLUDED.last_message_at, + updated_at = EXCLUDED.updated_at + RETURNING id, workspace_id, channel_type, external_thread_key, + channel_identity_id, last_message_at, created_at, updated_at + """, + ( + workspace_id, + TELEGRAM_CHANNEL_TYPE, + external_thread_key, + channel_identity_id, + sent_at, + utc_now(), + ), + ) + row = cur.fetchone() + + if row is None: + raise RuntimeError("failed to resolve telegram thread") + return row + + +def _insert_or_get_channel_message( + conn, + *, + workspace_id: UUID | None, + channel_thread_id: UUID | None, + channel_identity_id: UUID | None, + normalized: NormalizedTelegramInboundMessage, + route_status: str, +) -> tuple[TelegramChannelMessageRow, bool]: + now = utc_now() + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO channel_messages ( + workspace_id, + channel_thread_id, + channel_identity_id, + channel_type, + direction, + provider_update_id, + provider_message_id, + external_chat_id, + external_user_id, + message_text, + normalized_payload, + route_status, + idempotency_key, + received_at + ) + VALUES (%s, %s, %s, %s, 'inbound', %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (channel_type, direction, idempotency_key) DO NOTHING + RETURNING id, workspace_id, channel_thread_id, channel_identity_id, + channel_type, direction, provider_update_id, provider_message_id, + external_chat_id, external_user_id, message_text, + normalized_payload, route_status, idempotency_key, created_at, received_at + """, + ( + workspace_id, + channel_thread_id, + channel_identity_id, + TELEGRAM_CHANNEL_TYPE, + normalized["provider_update_id"], + normalized["provider_message_id"], + normalized["external_chat_id"], + normalized["external_user_id"], + normalized["message_text"], + Jsonb(normalized["normalized_payload"]), + route_status, + normalized["idempotency_key"], + now, + ), + ) + inserted = cur.fetchone() + + if inserted is not None: + return inserted, False + + cur.execute( + """ + SELECT id, workspace_id, channel_thread_id, channel_identity_id, channel_type, direction, + provider_update_id, provider_message_id, external_chat_id, external_user_id, + message_text, normalized_payload, route_status, idempotency_key, + created_at, received_at + FROM channel_messages + WHERE channel_type = %s + AND direction = 'inbound' + AND idempotency_key = %s + LIMIT 1 + """, + (TELEGRAM_CHANNEL_TYPE, normalized["idempotency_key"]), + ) + existing = cur.fetchone() + + if existing is None: + raise RuntimeError("failed to resolve idempotent telegram channel message") + return existing, True + + +def ingest_telegram_webhook( + conn, + *, + payload: dict[str, Any], + bot_username: str | None, +) -> TelegramWebhookIngestResult: + normalized = normalize_telegram_update(payload, bot_username=bot_username) + + link_status, linked_identity = _apply_link_code_if_present(conn, normalized=normalized) + workspace_id, channel_identity_id = _resolve_workspace_and_identity_for_inbound( + conn, + normalized=normalized, + linked_identity=linked_identity, + ) + + route_status = "resolved" if workspace_id is not None else "unresolved" + thread: TelegramChannelThreadRow | None = None + + if workspace_id is not None: + thread = _ensure_channel_thread( + conn, + workspace_id=workspace_id, + channel_identity_id=channel_identity_id, + external_chat_id=normalized["external_chat_id"], + sent_at=normalized["sent_at"], + ) + + message, duplicate = _insert_or_get_channel_message( + conn, + workspace_id=workspace_id, + channel_thread_id=None if thread is None else thread["id"], + channel_identity_id=channel_identity_id, + normalized=normalized, + route_status=route_status, + ) + + if not duplicate and workspace_id is not None: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO chat_intents ( + workspace_id, + channel_message_id, + channel_thread_id, + intent_kind, + status + ) + VALUES (%s, %s, %s, 'inbound_message', 'recorded') + ON CONFLICT (channel_message_id, intent_kind) DO NOTHING + """, + ( + workspace_id, + message["id"], + None if thread is None else thread["id"], + ), + ) + + return { + "duplicate": duplicate, + "route_status": route_status, + "link_status": link_status, + "unknown_chat_routing": workspace_id is None, + "message": message, + "thread": thread, + } + + +def list_workspace_telegram_messages( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + limit: int, +) -> list[TelegramChannelMessageRow]: + with conn.cursor() as cur: + cur.execute( + """ + SELECT m.id, + m.workspace_id, + m.channel_thread_id, + m.channel_identity_id, + m.channel_type, + m.direction, + m.provider_update_id, + m.provider_message_id, + m.external_chat_id, + m.external_user_id, + m.message_text, + m.normalized_payload, + m.route_status, + m.idempotency_key, + m.created_at, + m.received_at + FROM channel_messages AS m + JOIN workspace_members AS wm + ON wm.workspace_id = m.workspace_id + WHERE m.channel_type = %s + AND m.workspace_id = %s + AND wm.user_account_id = %s + ORDER BY m.created_at DESC, m.id DESC + LIMIT %s + """, + (TELEGRAM_CHANNEL_TYPE, workspace_id, user_account_id, limit), + ) + rows = cur.fetchall() + + return rows + + +def list_workspace_telegram_threads( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + limit: int, +) -> list[TelegramChannelThreadRow]: + with conn.cursor() as cur: + cur.execute( + """ + SELECT t.id, + t.workspace_id, + t.channel_type, + t.external_thread_key, + t.channel_identity_id, + t.last_message_at, + t.created_at, + t.updated_at + FROM channel_threads AS t + JOIN workspace_members AS wm + ON wm.workspace_id = t.workspace_id + WHERE t.channel_type = %s + AND t.workspace_id = %s + AND wm.user_account_id = %s + ORDER BY COALESCE(t.last_message_at, t.created_at) DESC, t.id DESC + LIMIT %s + """, + (TELEGRAM_CHANNEL_TYPE, workspace_id, user_account_id, limit), + ) + rows = cur.fetchall() + + return rows + + +def _get_latest_linked_identity( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, +) -> TelegramChannelIdentityRow | None: + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, user_account_id, workspace_id, channel_type, external_user_id, external_chat_id, + external_username, status, linked_at, unlinked_at, created_at, updated_at + FROM channel_identities + WHERE user_account_id = %s + AND workspace_id = %s + AND channel_type = %s + AND status = 'linked' + ORDER BY updated_at DESC, created_at DESC, id DESC + LIMIT 1 + """, + (user_account_id, workspace_id, TELEGRAM_CHANNEL_TYPE), + ) + return cur.fetchone() + + +def get_telegram_link_status( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, +) -> dict[str, Any]: + identity = _get_latest_linked_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, user_account_id, workspace_id, channel_type, challenge_token_hash, link_code, + status, expires_at, confirmed_at, channel_identity_id, created_at + FROM channel_link_challenges + WHERE user_account_id = %s + AND workspace_id = %s + AND channel_type = %s + ORDER BY created_at DESC, id DESC + LIMIT 1 + """, + (user_account_id, workspace_id, TELEGRAM_CHANNEL_TYPE), + ) + latest_challenge = cur.fetchone() + + cur.execute( + """ + SELECT id, route_status, direction, created_at + FROM channel_messages + WHERE workspace_id = %s + AND channel_type = %s + ORDER BY created_at DESC, id DESC + LIMIT 1 + """, + (workspace_id, TELEGRAM_CHANNEL_TYPE), + ) + recent_message = cur.fetchone() + + return { + "workspace_id": str(workspace_id), + "channel_type": TELEGRAM_CHANNEL_TYPE, + "linked": identity is not None, + "identity": None if identity is None else serialize_channel_identity(identity), + "latest_challenge": None + if latest_challenge is None + else serialize_channel_link_challenge(latest_challenge, include_token=False), + "recent_transport": None + if recent_message is None + else { + "message_id": str(recent_message["id"]), + "direction": recent_message["direction"], + "route_status": recent_message["route_status"], + "observed_at": recent_message["created_at"].isoformat(), + }, + } + + +def unlink_telegram_identity( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, +) -> TelegramChannelIdentityRow: + now = utc_now() + + with conn.cursor() as cur: + cur.execute( + """ + UPDATE channel_identities + SET status = 'unlinked', + unlinked_at = %s, + updated_at = %s + WHERE id = ( + SELECT id + FROM channel_identities + WHERE user_account_id = %s + AND workspace_id = %s + AND channel_type = %s + AND status = 'linked' + ORDER BY updated_at DESC, created_at DESC, id DESC + LIMIT 1 + ) + RETURNING id, user_account_id, workspace_id, channel_type, external_user_id, + external_chat_id, external_username, status, linked_at, unlinked_at, + created_at, updated_at + """, + (now, now, user_account_id, workspace_id, TELEGRAM_CHANNEL_TYPE), + ) + identity = cur.fetchone() + + if identity is None: + raise TelegramIdentityNotFoundError("telegram channel is not linked for this workspace") + + cur.execute( + """ + UPDATE channel_link_challenges + SET status = 'cancelled' + WHERE user_account_id = %s + AND workspace_id = %s + AND channel_type = %s + AND status = 'pending' + """, + (user_account_id, workspace_id, TELEGRAM_CHANNEL_TYPE), + ) + + return identity + + +def dispatch_telegram_message( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + source_message_id: UUID, + text: str, + dispatch_idempotency_key: str | None, + bot_token: str, +) -> tuple[TelegramChannelMessageRow, TelegramDeliveryReceiptRow]: + normalized_text = text.strip() + if normalized_text == "": + raise ValueError("dispatch text is required") + + with conn.cursor() as cur: + cur.execute( + """ + SELECT m.id, + m.workspace_id, + m.channel_thread_id, + m.channel_identity_id, + m.channel_type, + m.direction, + m.provider_update_id, + m.provider_message_id, + m.external_chat_id, + m.external_user_id, + m.message_text, + m.normalized_payload, + m.route_status, + m.idempotency_key, + m.created_at, + m.received_at + FROM channel_messages AS m + JOIN workspace_members AS wm + ON wm.workspace_id = m.workspace_id + WHERE m.id = %s + AND wm.user_account_id = %s + AND m.workspace_id = %s + AND m.channel_type = %s + LIMIT 1 + """, + (source_message_id, user_account_id, workspace_id, TELEGRAM_CHANNEL_TYPE), + ) + source = cur.fetchone() + + if source is None: + raise TelegramMessageNotFoundError("telegram source message was not found") + + if source["route_status"] != "resolved": + raise TelegramRoutingError("telegram source message does not have resolved routing") + + external_chat_id = source["external_chat_id"] + if external_chat_id is None: + raise TelegramRoutingError("telegram source message is missing external chat id") + + resolved_idempotency_key = dispatch_idempotency_key + if resolved_idempotency_key is None: + resolved_idempotency_key = hashlib.sha256( + f"telegram:dispatch:{source_message_id}:{normalized_text}".encode("utf-8") + ).hexdigest() + + provider_message_id = f"simulated:{secrets.token_hex(10)}" + now = utc_now() + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO channel_messages ( + workspace_id, + channel_thread_id, + channel_identity_id, + channel_type, + direction, + provider_update_id, + provider_message_id, + external_chat_id, + external_user_id, + message_text, + normalized_payload, + route_status, + idempotency_key, + received_at + ) + VALUES (%s, %s, %s, %s, 'outbound', NULL, %s, %s, %s, %s, %s, 'resolved', %s, %s) + ON CONFLICT (channel_type, direction, idempotency_key) DO NOTHING + RETURNING id, workspace_id, channel_thread_id, channel_identity_id, + channel_type, direction, provider_update_id, provider_message_id, + external_chat_id, external_user_id, message_text, + normalized_payload, route_status, idempotency_key, created_at, received_at + """, + ( + workspace_id, + source["channel_thread_id"], + source["channel_identity_id"], + TELEGRAM_CHANNEL_TYPE, + provider_message_id, + external_chat_id, + source["external_user_id"], + normalized_text, + Jsonb( + { + "dispatch": { + "source_message_id": str(source_message_id), + "mode": "simulated" if bot_token.strip() == "" else "deterministic_failure", + } + } + ), + resolved_idempotency_key, + now, + ), + ) + outbound = cur.fetchone() + + if outbound is None: + cur.execute( + """ + SELECT id, workspace_id, channel_thread_id, channel_identity_id, channel_type, + direction, provider_update_id, provider_message_id, external_chat_id, + external_user_id, message_text, normalized_payload, route_status, + idempotency_key, created_at, received_at + FROM channel_messages + WHERE channel_type = %s + AND direction = 'outbound' + AND idempotency_key = %s + LIMIT 1 + """, + (TELEGRAM_CHANNEL_TYPE, resolved_idempotency_key), + ) + outbound = cur.fetchone() + + if outbound is None: + raise RuntimeError("failed to create outbound telegram message") + + receipt_status = "simulated" + failure_code: str | None = None + failure_detail: str | None = None + provider_receipt_id: str | None = outbound["provider_message_id"] + + if bot_token.strip() != "": + receipt_status = "failed" + failure_code = "telegram_transport_not_enabled" + failure_detail = "live telegram dispatch is not enabled in this environment" + provider_receipt_id = None + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO channel_delivery_receipts ( + workspace_id, + channel_message_id, + channel_type, + status, + provider_receipt_id, + failure_code, + failure_detail, + recorded_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (channel_message_id) DO UPDATE + SET status = EXCLUDED.status, + provider_receipt_id = EXCLUDED.provider_receipt_id, + failure_code = EXCLUDED.failure_code, + failure_detail = EXCLUDED.failure_detail, + recorded_at = EXCLUDED.recorded_at + RETURNING id, workspace_id, channel_message_id, channel_type, + status, provider_receipt_id, failure_code, failure_detail, + recorded_at, created_at + """, + ( + workspace_id, + outbound["id"], + TELEGRAM_CHANNEL_TYPE, + receipt_status, + provider_receipt_id, + failure_code, + failure_detail, + utc_now(), + ), + ) + receipt = cur.fetchone() + + if receipt is None: + raise RuntimeError("failed to persist telegram delivery receipt") + + return outbound, receipt + + +def list_workspace_telegram_delivery_receipts( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + limit: int, +) -> list[TelegramDeliveryReceiptRow]: + with conn.cursor() as cur: + cur.execute( + """ + SELECT r.id, + r.workspace_id, + r.channel_message_id, + r.channel_type, + r.status, + r.provider_receipt_id, + r.failure_code, + r.failure_detail, + r.recorded_at, + r.created_at + FROM channel_delivery_receipts AS r + JOIN workspace_members AS wm + ON wm.workspace_id = r.workspace_id + WHERE r.channel_type = %s + AND r.workspace_id = %s + AND wm.user_account_id = %s + ORDER BY r.recorded_at DESC, r.id DESC + LIMIT %s + """, + (TELEGRAM_CHANNEL_TYPE, workspace_id, user_account_id, limit), + ) + rows = cur.fetchall() + + return rows + + +def serialize_channel_identity(identity: TelegramChannelIdentityRow) -> dict[str, object]: + return { + "id": str(identity["id"]), + "user_account_id": str(identity["user_account_id"]), + "workspace_id": str(identity["workspace_id"]), + "channel_type": identity["channel_type"], + "external_user_id": identity["external_user_id"], + "external_chat_id": identity["external_chat_id"], + "external_username": identity["external_username"], + "status": identity["status"], + "linked_at": identity["linked_at"].isoformat(), + "unlinked_at": None if identity["unlinked_at"] is None else identity["unlinked_at"].isoformat(), + "created_at": identity["created_at"].isoformat(), + "updated_at": identity["updated_at"].isoformat(), + } + + +def serialize_channel_link_challenge( + challenge: TelegramChannelLinkChallengeRow | IssuedTelegramChannelLinkChallengeRow, + *, + include_token: bool, +) -> dict[str, object]: + payload: dict[str, object] = { + "id": str(challenge["id"]), + "user_account_id": str(challenge["user_account_id"]), + "workspace_id": str(challenge["workspace_id"]), + "channel_type": challenge["channel_type"], + "link_code": challenge["link_code"], + "status": challenge["status"], + "expires_at": challenge["expires_at"].isoformat(), + "confirmed_at": None + if challenge["confirmed_at"] is None + else challenge["confirmed_at"].isoformat(), + "channel_identity_id": None + if challenge["channel_identity_id"] is None + else str(challenge["channel_identity_id"]), + "created_at": challenge["created_at"].isoformat(), + } + if include_token: + token = challenge.get("challenge_token") + if not isinstance(token, str): + raise ValueError("challenge token is required for issued link challenge serialization") + payload["challenge_token"] = token + return payload + + +def serialize_channel_thread(thread: TelegramChannelThreadRow) -> dict[str, object]: + return { + "id": str(thread["id"]), + "workspace_id": str(thread["workspace_id"]), + "channel_type": thread["channel_type"], + "external_thread_key": thread["external_thread_key"], + "channel_identity_id": None + if thread["channel_identity_id"] is None + else str(thread["channel_identity_id"]), + "last_message_at": None + if thread["last_message_at"] is None + else thread["last_message_at"].isoformat(), + "created_at": thread["created_at"].isoformat(), + "updated_at": thread["updated_at"].isoformat(), + } + + +def serialize_channel_message(message: TelegramChannelMessageRow) -> dict[str, object]: + return { + "id": str(message["id"]), + "workspace_id": None if message["workspace_id"] is None else str(message["workspace_id"]), + "channel_thread_id": None + if message["channel_thread_id"] is None + else str(message["channel_thread_id"]), + "channel_identity_id": None + if message["channel_identity_id"] is None + else str(message["channel_identity_id"]), + "channel_type": message["channel_type"], + "direction": message["direction"], + "provider_update_id": message["provider_update_id"], + "provider_message_id": message["provider_message_id"], + "external_chat_id": message["external_chat_id"], + "external_user_id": message["external_user_id"], + "message_text": message["message_text"], + "normalized_payload": message["normalized_payload"], + "route_status": message["route_status"], + "idempotency_key": message["idempotency_key"], + "created_at": message["created_at"].isoformat(), + "received_at": message["received_at"].isoformat(), + } + + +def serialize_delivery_receipt(receipt: TelegramDeliveryReceiptRow) -> dict[str, object]: + return { + "id": str(receipt["id"]), + "workspace_id": str(receipt["workspace_id"]), + "channel_message_id": str(receipt["channel_message_id"]), + "channel_type": receipt["channel_type"], + "status": receipt["status"], + "provider_receipt_id": receipt["provider_receipt_id"], + "failure_code": receipt["failure_code"], + "failure_detail": receipt["failure_detail"], + "recorded_at": receipt["recorded_at"].isoformat(), + "created_at": receipt["created_at"].isoformat(), + } + + +def serialize_webhook_ingest_result(result: TelegramWebhookIngestResult) -> dict[str, object]: + return { + "duplicate": result["duplicate"], + "route_status": result["route_status"], + "link_status": result["link_status"], + "unknown_chat_routing": result["unknown_chat_routing"], + "message": serialize_channel_message(result["message"]), + "thread": None if result["thread"] is None else serialize_channel_thread(result["thread"]), + } diff --git a/apps/web/app/settings/page.test.tsx b/apps/web/app/settings/page.test.tsx index a85ce9c..f953590 100644 --- a/apps/web/app/settings/page.test.tsx +++ b/apps/web/app/settings/page.test.tsx @@ -9,14 +9,14 @@ describe("SettingsPage", () => { cleanup(); }); - it("renders hosted settings foundations and no Telegram delivery claim", () => { + it("renders telegram channel settings and preserves continuity boundary claims", () => { render(); expect(screen.getByRole("heading", { level: 1, name: "Hosted Settings" })).toBeInTheDocument(); - expect(screen.getByText("Preference Foundations")).toBeInTheDocument(); + expect(screen.getByText("Telegram Channel Settings")).toBeInTheDocument(); expect( - screen.getByText("Persist IANA timezone for future scheduled brief orchestration."), + screen.getByText("Issue a deterministic link challenge bound to the active hosted workspace."), ).toBeInTheDocument(); - expect(screen.getByText(/do not claim Telegram linkage/i)).toBeInTheDocument(); + expect(screen.getByText(/does not claim Telegram continuity capture/i)).toBeInTheDocument(); }); }); diff --git a/apps/web/app/settings/page.tsx b/apps/web/app/settings/page.tsx index 5145506..65c422b 100644 --- a/apps/web/app/settings/page.tsx +++ b/apps/web/app/settings/page.tsx @@ -5,9 +5,9 @@ export default function SettingsPage() { return (
diff --git a/apps/web/components/hosted-settings-panel.test.tsx b/apps/web/components/hosted-settings-panel.test.tsx new file mode 100644 index 0000000..3833694 --- /dev/null +++ b/apps/web/components/hosted-settings-panel.test.tsx @@ -0,0 +1,65 @@ +import React from "react"; +import { cleanup, fireEvent, render, screen, waitFor } from "@testing-library/react"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { HostedSettingsPanel } from "./hosted-settings-panel"; + +describe("HostedSettingsPanel", () => { + const fetchMock = vi.fn(); + + beforeEach(() => { + vi.stubGlobal("fetch", fetchMock); + fetchMock.mockReset(); + }); + + afterEach(() => { + cleanup(); + vi.unstubAllGlobals(); + }); + + it("shows telegram link, status, and receipt controls without continuity claims", () => { + render(); + + expect(screen.getByText("Telegram Channel Settings")).toBeInTheDocument(); + expect(screen.getByText(/Telegram Link Start/i)).toBeInTheDocument(); + expect(screen.getByText(/Telegram Link Confirm/i)).toBeInTheDocument(); + expect(screen.getByText(/Telegram Status \+ Unlink/i)).toBeInTheDocument(); + expect(screen.getAllByText(/Messages, Threads, Receipts/i).length).toBeGreaterThan(0); + expect(screen.getByText(/does not claim Telegram continuity capture/i)).toBeInTheDocument(); + }); + + it("starts telegram link challenge from hosted controls", async () => { + fetchMock.mockResolvedValueOnce( + new Response( + JSON.stringify({ + challenge: { + challenge_token: "telegram-test-challenge-token", + link_code: "CODE2026", + status: "pending", + expires_at: "2026-04-08T18:45:00Z", + }, + instructions: { + bot_username: "alicebot", + command: "/link CODE2026", + }, + }), + ), + ); + + render(); + + fireEvent.change(screen.getByLabelText(/Hosted session token/i), { + target: { value: "session-token-123" }, + }); + fireEvent.click(screen.getByRole("button", { name: "Start Telegram link" })); + + await waitFor(() => { + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toContain("/v1/channels/telegram/link/start"); + expect((init.headers as Record).Authorization).toBe("Bearer session-token-123"); + expect(screen.getAllByText(/\/link CODE2026/).length).toBeGreaterThan(0); + }); +}); diff --git a/apps/web/components/hosted-settings-panel.tsx b/apps/web/components/hosted-settings-panel.tsx index 661dd80..a5c1dd8 100644 --- a/apps/web/components/hosted-settings-panel.tsx +++ b/apps/web/components/hosted-settings-panel.tsx @@ -1,31 +1,291 @@ +"use client"; + +import type { FormEvent } from "react"; +import { useState } from "react"; + +import { getApiConfig } from "../lib/api"; import { SectionCard } from "./section-card"; +import { StatusBadge } from "./status-badge"; const settingItems = [ { - title: "Timezone", - detail: "Persist IANA timezone for future scheduled brief orchestration.", + title: "Telegram Link Start", + detail: "Issue a deterministic link challenge bound to the active hosted workspace.", }, { - title: "Brief Preferences", - detail: "Capture daily brief posture inputs only; no scheduler execution in this sprint.", + title: "Telegram Link Confirm", + detail: "Confirm linkage only after webhook-observed link code proof from the Telegram chat identity.", }, { - title: "Quiet Hours", - detail: "Store quiet-hour boundaries for later policy-driven delivery logic.", + title: "Telegram Status + Unlink", + detail: "Inspect current transport readiness and remove Telegram linkage without local tooling.", }, { - title: "Device Visibility", - detail: "List and revoke linked devices deterministically.", + title: "Messages, Threads, Receipts", + detail: "Expose normalized inbound traffic, deterministic routing state, and outbound delivery posture.", }, ]; -export function HostedSettingsPanel() { +type TelegramLinkChallenge = { + challenge_token?: string; + link_code: string; + status: string; + expires_at: string; +}; + +type TelegramIdentity = { + id: string; + workspace_id: string; + external_chat_id: string; + external_username: string | null; + status: string; +}; + +type TelegramStatusPayload = { + workspace_id: string; + linked: boolean; + identity: TelegramIdentity | null; + latest_challenge: { + link_code: string; + status: string; + expires_at: string; + } | null; + recent_transport: { + message_id: string; + direction: string; + route_status: string; + observed_at: string; + } | null; +}; + +type TelegramMessage = { + id: string; + direction: string; + route_status: string; + message_text: string | null; + provider_update_id: string | null; + created_at: string; +}; + +type TelegramThread = { + id: string; + external_thread_key: string; + last_message_at: string | null; + updated_at: string; +}; + +type TelegramReceipt = { + id: string; + status: string; + channel_message_id: string; + recorded_at: string; + failure_code: string | null; +}; + +type HostedSettingsPanelProps = { + apiBaseUrl?: string; +}; + +function formatOptionalDate(value: string | null | undefined) { + if (!value) { + return "-"; + } + + try { + return new Date(value).toLocaleString("en"); + } catch { + return value; + } +} + +function trimOrNull(value: string) { + const normalized = value.trim(); + return normalized === "" ? null : normalized; +} + +export function HostedSettingsPanel({ apiBaseUrl }: HostedSettingsPanelProps) { + const apiConfig = getApiConfig(); + const resolvedApiBaseUrl = (apiBaseUrl ?? apiConfig.apiBaseUrl).trim(); + const liveModeReady = resolvedApiBaseUrl !== ""; + + const [sessionToken, setSessionToken] = useState(""); + const [workspaceId, setWorkspaceId] = useState(""); + const [challengeToken, setChallengeToken] = useState(""); + + const [isWorking, setIsWorking] = useState(false); + const [statusTone, setStatusTone] = useState<"info" | "success" | "danger">("info"); + const [statusText, setStatusText] = useState( + liveModeReady + ? "Provide a hosted session token, then run Telegram link start/confirm and status controls." + : "Telegram controls are unavailable until NEXT_PUBLIC_ALICEBOT_API_BASE_URL is configured.", + ); + + const [latestChallenge, setLatestChallenge] = useState(null); + const [latestIdentity, setLatestIdentity] = useState(null); + const [latestStatus, setLatestStatus] = useState(null); + const [messages, setMessages] = useState([]); + const [threads, setThreads] = useState([]); + const [receipts, setReceipts] = useState([]); + + async function requestTelegramJson( + path: string, + init?: RequestInit, + query?: Record, + ): Promise { + if (!liveModeReady) { + throw new Error("NEXT_PUBLIC_ALICEBOT_API_BASE_URL must be configured for live Telegram controls."); + } + + const token = sessionToken.trim(); + if (token === "") { + throw new Error("Hosted session token is required."); + } + + const url = new URL(path, `${resolvedApiBaseUrl.replace(/\/$/, "")}/`); + for (const [key, value] of Object.entries(query ?? {})) { + if (value) { + url.searchParams.set(key, value); + } + } + + const response = await fetch(url.toString(), { + cache: "no-store", + ...init, + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${token}`, + ...(init?.headers ?? {}), + }, + }); + + const payload = (await response.json().catch(() => null)) as { detail?: string } | null; + if (!response.ok) { + throw new Error(payload?.detail ?? "Request failed"); + } + + return payload as T; + } + + async function runOperation(operation: () => Promise, loadingText: string) { + setIsWorking(true); + setStatusTone("info"); + setStatusText(loadingText); + + try { + await operation(); + } catch (error) { + setStatusTone("danger"); + setStatusText(error instanceof Error ? error.message : "Request failed"); + } finally { + setIsWorking(false); + } + } + + async function handleStartLink(event: FormEvent) { + event.preventDefault(); + + await runOperation(async () => { + const payload = await requestTelegramJson<{ + challenge: TelegramLinkChallenge; + instructions: { + bot_username: string; + command: string; + }; + }>("/v1/channels/telegram/link/start", { + method: "POST", + body: JSON.stringify({ workspace_id: trimOrNull(workspaceId) }), + }); + + setLatestChallenge(payload.challenge); + if (payload.challenge.challenge_token) { + setChallengeToken(payload.challenge.challenge_token); + } + setStatusTone("success"); + setStatusText( + `Link challenge issued. Send ${payload.instructions.command} to @${payload.instructions.bot_username}, then confirm.`, + ); + }, "Issuing Telegram link challenge..."); + } + + async function handleConfirmLink() { + await runOperation(async () => { + const token = challengeToken.trim(); + if (token === "") { + throw new Error("Challenge token is required for link confirm."); + } + + const payload = await requestTelegramJson<{ + identity: TelegramIdentity; + challenge: TelegramLinkChallenge; + }>("/v1/channels/telegram/link/confirm", { + method: "POST", + body: JSON.stringify({ challenge_token: token }), + }); + + setLatestIdentity(payload.identity); + setLatestChallenge(payload.challenge); + setStatusTone("success"); + setStatusText("Telegram link confirmed for the current hosted workspace."); + }, "Confirming Telegram link challenge..."); + } + + async function handleLoadStatus() { + await runOperation(async () => { + const payload = await requestTelegramJson( + "/v1/channels/telegram/status", + undefined, + { workspace_id: trimOrNull(workspaceId) ?? undefined }, + ); + + setLatestStatus(payload); + setLatestIdentity(payload.identity); + setStatusTone("success"); + setStatusText( + payload.linked + ? "Telegram is linked and ready for transport operations." + : "Telegram is not linked for the selected workspace.", + ); + }, "Loading Telegram status..."); + } + + async function handleUnlink() { + await runOperation(async () => { + const payload = await requestTelegramJson<{ identity: TelegramIdentity }>( + "/v1/channels/telegram/unlink", + { + method: "POST", + body: JSON.stringify({ workspace_id: trimOrNull(workspaceId) }), + }, + ); + + setLatestIdentity(payload.identity); + setStatusTone("success"); + setStatusText("Telegram identity unlinked for the selected workspace."); + }, "Unlinking Telegram identity..."); + } + + async function handleRefreshTransportRecords() { + await runOperation(async () => { + const [messagePayload, threadPayload, receiptPayload] = await Promise.all([ + requestTelegramJson<{ items: TelegramMessage[] }>("/v1/channels/telegram/messages"), + requestTelegramJson<{ items: TelegramThread[] }>("/v1/channels/telegram/threads"), + requestTelegramJson<{ items: TelegramReceipt[] }>("/v1/channels/telegram/delivery-receipts"), + ]); + + setMessages(messagePayload.items); + setThreads(threadPayload.items); + setReceipts(receiptPayload.items); + setStatusTone("success"); + setStatusText("Loaded latest Telegram messages, threads, and delivery receipts."); + }, "Loading Telegram transport records..."); + } + return (
    {settingItems.map((item) => ( @@ -36,14 +296,241 @@ export function HostedSettingsPanel() {
+ +
+
+ + setSessionToken(event.target.value)} + placeholder="Paste Bearer token" + disabled={isWorking || !liveModeReady} + /> +
+ +
+ + setWorkspaceId(event.target.value)} + placeholder="Use current session workspace when empty" + disabled={isWorking || !liveModeReady} + /> +
+ +
+ + {statusText} +
+
+
+ + +
+
+ +
+ +
+ + setChallengeToken(event.target.value)} + placeholder="Challenge token returned by link start" + disabled={isWorking || !liveModeReady} + /> +

Use this token to confirm after Telegram webhook link proof is observed.

+
+ + + + {latestChallenge ? ( +
+

+ Latest link code: {latestChallenge.link_code} +

+

+ Challenge status: {latestChallenge.status} +

+

+ Expires: {formatOptionalDate(latestChallenge.expires_at)} +

+

+ Telegram command: /link {latestChallenge.link_code} +

+
+ ) : null} +
+
+ + +
+ + +
+ + {latestStatus ? ( +
+

+ Workspace: {latestStatus.workspace_id} +

+

+ Linked: {latestStatus.linked ? "yes" : "no"} +

+

+ Recent route status: {latestStatus.recent_transport?.route_status ?? "-"} +

+

+ Recent transport observed:{" "} + {formatOptionalDate(latestStatus.recent_transport?.observed_at)} +

+
+ ) : null} + + {latestIdentity ? ( +
+

+ Identity status: {latestIdentity.status} +

+

+ External chat: {latestIdentity.external_chat_id} +

+

+ External username: {latestIdentity.external_username ?? "-"} +

+
+ ) : null} +
+ + +
+ + +

+ Messages: {messages.length} +

+
    + {messages.slice(0, 5).map((message) => ( +
  • + {message.id} · {message.direction} · {message.route_status} ·{" "} + {message.message_text ?? "(no text)"} +
  • + ))} +
+ +

+ Threads: {threads.length} +

+
    + {threads.slice(0, 5).map((thread) => ( +
  • + {thread.external_thread_key} · updated {formatOptionalDate(thread.updated_at)} +
  • + ))} +
+ +

+ Delivery receipts: {receipts.length} +

+
    + {receipts.slice(0, 5).map((receipt) => ( +
  • + {receipt.channel_message_id} · {receipt.status} + {receipt.failure_code ? ` · ${receipt.failure_code}` : ""} +
  • + ))} +
+
+
+

- Hosted settings expose readiness inputs and device state, but do not claim Telegram linkage, - scheduler execution, or brief delivery in Phase 10 Sprint 1. + This surface does not claim Telegram continuity capture, recall, resume, correction, approval + resolution, or scheduler execution. It is transport and control-plane readiness only.

diff --git a/tests/integration/test_phase10_identity_workspace_bootstrap_api.py b/tests/integration/test_phase10_identity_workspace_bootstrap_api.py index 49a5a86..92a3f8b 100644 --- a/tests/integration/test_phase10_identity_workspace_bootstrap_api.py +++ b/tests/integration/test_phase10_identity_workspace_bootstrap_api.py @@ -105,7 +105,7 @@ def test_phase10_identity_workspace_bootstrap_and_preferences_flow( ) assert verify_status == 200 assert verify_payload["workspace"] is None - assert verify_payload["telegram_state"] == "not_available_in_p10_s1" + assert verify_payload["telegram_state"] == "available_in_p10_s2_transport" session_token = verify_payload["session_token"] primary_device_id = verify_payload["session"]["device_id"] @@ -143,7 +143,7 @@ def test_phase10_identity_workspace_bootstrap_and_preferences_flow( ) assert bootstrap_status_before == 200 assert bootstrap_payload_before["bootstrap"]["status"] == "pending" - assert bootstrap_payload_before["bootstrap"]["telegram_state"] == "not_available_in_p10_s1" + assert bootstrap_payload_before["bootstrap"]["telegram_state"] == "available_in_p10_s2_transport" bootstrap_status, bootstrap_payload = invoke_request( "POST", @@ -154,7 +154,7 @@ def test_phase10_identity_workspace_bootstrap_and_preferences_flow( assert bootstrap_status == 200 assert bootstrap_payload["workspace"]["bootstrap_status"] == "ready" assert bootstrap_payload["bootstrap"]["ready_for_next_phase_telegram_linkage"] is True - assert bootstrap_payload["telegram_state"] == "not_available_in_p10_s1" + assert bootstrap_payload["telegram_state"] == "available_in_p10_s2_transport" duplicate_bootstrap_status, duplicate_bootstrap_payload = invoke_request( "POST", diff --git a/tests/integration/test_phase10_telegram_transport_api.py b/tests/integration/test_phase10_telegram_transport_api.py new file mode 100644 index 0000000..2dfd7dd --- /dev/null +++ b/tests/integration/test_phase10_telegram_transport_api.py @@ -0,0 +1,588 @@ +from __future__ import annotations + +import json +from typing import Any +from urllib.parse import urlencode + +import anyio + +import apps.api.src.alicebot_api.main as main_module +from apps.api.src.alicebot_api.config import Settings + + +def invoke_request( + method: str, + path: str, + *, + query_params: dict[str, str] | None = None, + payload: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, +) -> tuple[int, dict[str, Any]]: + messages: list[dict[str, object]] = [] + encoded_body = b"" if payload is None else json.dumps(payload).encode() + request_received = False + + async def receive() -> dict[str, object]: + nonlocal request_received + if request_received: + return {"type": "http.disconnect"} + + request_received = True + return {"type": "http.request", "body": encoded_body, "more_body": False} + + async def send(message: dict[str, object]) -> None: + messages.append(message) + + query_string = urlencode(query_params or {}).encode() + request_headers = [(b"content-type", b"application/json")] + for key, value in (headers or {}).items(): + request_headers.append((key.lower().encode(), value.encode())) + + scope = { + "type": "http", + "asgi": {"version": "3.0"}, + "http_version": "1.1", + "method": method, + "scheme": "http", + "path": path, + "raw_path": path.encode(), + "query_string": query_string, + "headers": request_headers, + "client": ("testclient", 50000), + "server": ("testserver", 80), + "root_path": "", + } + + anyio.run(main_module.app, scope, receive, send) + + start_message = next(message for message in messages if message["type"] == "http.response.start") + body = b"".join( + message.get("body", b"") + for message in messages + if message["type"] == "http.response.body" + ) + return start_message["status"], json.loads(body) + + +def auth_header(session_token: str) -> dict[str, str]: + return {"authorization": f"Bearer {session_token}"} + + +def _configure_settings(migrated_database_urls, monkeypatch) -> None: + monkeypatch.setattr( + main_module, + "get_settings", + lambda: Settings( + database_url=migrated_database_urls["app"], + magic_link_ttl_seconds=600, + auth_session_ttl_seconds=3600, + device_link_ttl_seconds=600, + telegram_link_ttl_seconds=600, + telegram_bot_username="alicebot", + telegram_webhook_secret="", + telegram_bot_token="", + ), + ) + + +def _bootstrap_workspace_session() -> tuple[str, str]: + start_status, start_payload = invoke_request( + "POST", + "/v1/auth/magic-link/start", + payload={"email": "telegram-builder@example.com"}, + ) + assert start_status == 200 + + verify_status, verify_payload = invoke_request( + "POST", + "/v1/auth/magic-link/verify", + payload={ + "challenge_token": start_payload["challenge"]["challenge_token"], + "device_label": "Telegram Builder Device", + "device_key": "telegram-builder-device", + }, + ) + assert verify_status == 200 + session_token = verify_payload["session_token"] + + create_workspace_status, create_workspace_payload = invoke_request( + "POST", + "/v1/workspaces", + payload={"name": "Telegram Builder Workspace"}, + headers=auth_header(session_token), + ) + assert create_workspace_status == 201 + workspace_id = create_workspace_payload["workspace"]["id"] + + bootstrap_status, bootstrap_payload = invoke_request( + "POST", + "/v1/workspaces/bootstrap", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert bootstrap_status == 200 + assert bootstrap_payload["workspace"]["bootstrap_status"] == "ready" + return session_token, workspace_id + + +def _create_and_bootstrap_workspace(session_token: str, workspace_name: str) -> str: + create_workspace_status, create_workspace_payload = invoke_request( + "POST", + "/v1/workspaces", + payload={"name": workspace_name}, + headers=auth_header(session_token), + ) + assert create_workspace_status == 201 + workspace_id = create_workspace_payload["workspace"]["id"] + + bootstrap_status, bootstrap_payload = invoke_request( + "POST", + "/v1/workspaces/bootstrap", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert bootstrap_status == 200 + assert bootstrap_payload["workspace"]["bootstrap_status"] == "ready" + return workspace_id + + +def test_phase10_telegram_link_webhook_idempotency_and_dispatch_flow( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, workspace_id = _bootstrap_workspace_session() + + link_start_status, link_start_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/start", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert link_start_status == 200 + challenge_token = link_start_payload["challenge"]["challenge_token"] + link_code = link_start_payload["challenge"]["link_code"] + + webhook_status, webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 2001, + "message": { + "message_id": 501, + "date": 1710000000, + "chat": {"id": 9001, "type": "private"}, + "from": {"id": 7001, "username": "builder"}, + "text": f"/link {link_code}", + }, + }, + ) + assert webhook_status == 200 + assert webhook_payload["ingest"]["duplicate"] is False + assert webhook_payload["ingest"]["route_status"] == "resolved" + assert webhook_payload["ingest"]["link_status"] == "confirmed" + + duplicate_webhook_status, duplicate_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 2001, + "message": { + "message_id": 501, + "date": 1710000000, + "chat": {"id": 9001, "type": "private"}, + "from": {"id": 7001, "username": "builder"}, + "text": f"/link {link_code}", + }, + }, + ) + assert duplicate_webhook_status == 200 + assert duplicate_webhook_payload["ingest"]["duplicate"] is True + + confirm_status, confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": challenge_token}, + headers=auth_header(session_token), + ) + assert confirm_status == 201 + assert confirm_payload["identity"]["status"] == "linked" + assert confirm_payload["identity"]["workspace_id"] == workspace_id + + status_code, status_payload = invoke_request( + "GET", + "/v1/channels/telegram/status", + headers=auth_header(session_token), + ) + assert status_code == 200 + assert status_payload["linked"] is True + assert status_payload["identity"]["external_chat_id"] == "9001" + + message_webhook_status, message_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 2002, + "message": { + "message_id": 502, + "date": 1710000005, + "chat": {"id": 9001, "type": "private"}, + "from": {"id": 7001, "username": "builder"}, + "text": "hello from telegram", + }, + }, + ) + assert message_webhook_status == 200 + assert message_webhook_payload["ingest"]["route_status"] == "resolved" + + messages_status, messages_payload = invoke_request( + "GET", + "/v1/channels/telegram/messages", + headers=auth_header(session_token), + ) + assert messages_status == 200 + assert messages_payload["summary"]["total_count"] == 2 + + inbound_message = next( + item for item in messages_payload["items"] if item["provider_update_id"] == "2002" + ) + + threads_status, threads_payload = invoke_request( + "GET", + "/v1/channels/telegram/threads", + headers=auth_header(session_token), + ) + assert threads_status == 200 + assert threads_payload["summary"]["total_count"] == 1 + + dispatch_status, dispatch_payload = invoke_request( + "POST", + f"/v1/channels/telegram/messages/{inbound_message['id']}/dispatch", + payload={"text": "acknowledged"}, + headers=auth_header(session_token), + ) + assert dispatch_status == 201 + assert dispatch_payload["message"]["direction"] == "outbound" + assert dispatch_payload["receipt"]["status"] == "simulated" + + receipts_status, receipts_payload = invoke_request( + "GET", + "/v1/channels/telegram/delivery-receipts", + headers=auth_header(session_token), + ) + assert receipts_status == 200 + assert receipts_payload["summary"]["total_count"] == 1 + assert receipts_payload["items"][0]["status"] == "simulated" + + +def test_phase10_telegram_invalid_link_token_and_unknown_chat_routing( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, _workspace_id = _bootstrap_workspace_session() + + invalid_confirm_status, invalid_confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": "invalid-telegram-link-token"}, + headers=auth_header(session_token), + ) + assert invalid_confirm_status == 400 + assert "invalid" in invalid_confirm_payload["detail"] + + unknown_webhook_status, unknown_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 3101, + "message": { + "message_id": 901, + "date": 1710001000, + "chat": {"id": 9900, "type": "private"}, + "from": {"id": 8800, "username": "unknown"}, + "text": "hello anyone there", + }, + }, + ) + assert unknown_webhook_status == 200 + assert unknown_webhook_payload["ingest"]["unknown_chat_routing"] is True + assert unknown_webhook_payload["ingest"]["route_status"] == "unresolved" + + messages_status, messages_payload = invoke_request( + "GET", + "/v1/channels/telegram/messages", + headers=auth_header(session_token), + ) + assert messages_status == 200 + assert messages_payload["summary"]["total_count"] == 0 + + malformed_webhook_status, malformed_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={"message": {}}, + ) + assert malformed_webhook_status == 400 + assert "update_id" in malformed_webhook_payload["detail"] + + +def test_phase10_telegram_unlink_and_relink_flow( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, workspace_id = _bootstrap_workspace_session() + + first_start_status, first_start_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/start", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert first_start_status == 200 + + first_webhook_status, _first_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 4101, + "message": { + "message_id": 1101, + "date": 1710002000, + "chat": {"id": 777001, "type": "private"}, + "from": {"id": 77001, "username": "relinker"}, + "text": f"/link {first_start_payload['challenge']['link_code']}", + }, + }, + ) + assert first_webhook_status == 200 + + first_confirm_status, first_confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": first_start_payload["challenge"]["challenge_token"]}, + headers=auth_header(session_token), + ) + assert first_confirm_status == 201 + assert first_confirm_payload["identity"]["status"] == "linked" + + unlink_status, unlink_payload = invoke_request( + "POST", + "/v1/channels/telegram/unlink", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert unlink_status == 200 + assert unlink_payload["identity"]["status"] == "unlinked" + + unresolved_after_unlink_status, unresolved_after_unlink_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 4102, + "message": { + "message_id": 1102, + "date": 1710002010, + "chat": {"id": 777001, "type": "private"}, + "from": {"id": 77001, "username": "relinker"}, + "text": "post-unlink message", + }, + }, + ) + assert unresolved_after_unlink_status == 200 + assert unresolved_after_unlink_payload["ingest"]["unknown_chat_routing"] is True + + second_start_status, second_start_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/start", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert second_start_status == 200 + + second_webhook_status, second_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 4103, + "message": { + "message_id": 1103, + "date": 1710002020, + "chat": {"id": 777001, "type": "private"}, + "from": {"id": 77001, "username": "relinker"}, + "text": f"/link {second_start_payload['challenge']['link_code']}", + }, + }, + ) + assert second_webhook_status == 200 + assert second_webhook_payload["ingest"]["link_status"] == "confirmed" + + second_confirm_status, second_confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": second_start_payload["challenge"]["challenge_token"]}, + headers=auth_header(session_token), + ) + assert second_confirm_status == 201 + assert second_confirm_payload["identity"]["status"] == "linked" + + final_status_code, final_status_payload = invoke_request( + "GET", + "/v1/channels/telegram/status", + headers=auth_header(session_token), + ) + assert final_status_code == 200 + assert final_status_payload["linked"] is True + assert final_status_payload["identity"]["external_chat_id"] == "777001" + + +def test_phase10_telegram_rejects_confirmed_link_code_replay_from_different_chat( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, workspace_id = _bootstrap_workspace_session() + + link_start_status, link_start_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/start", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert link_start_status == 200 + challenge_token = link_start_payload["challenge"]["challenge_token"] + link_code = link_start_payload["challenge"]["link_code"] + + first_webhook_status, first_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 5101, + "message": { + "message_id": 2101, + "date": 1710003000, + "chat": {"id": 880001, "type": "private"}, + "from": {"id": 880001, "username": "linkeduser"}, + "text": f"/link {link_code}", + }, + }, + ) + assert first_webhook_status == 200 + assert first_webhook_payload["ingest"]["link_status"] == "confirmed" + + confirm_status, confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": challenge_token}, + headers=auth_header(session_token), + ) + assert confirm_status == 201 + assert confirm_payload["identity"]["external_chat_id"] == "880001" + + replay_status, replay_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 5102, + "message": { + "message_id": 2102, + "date": 1710003010, + "chat": {"id": 880002, "type": "private"}, + "from": {"id": 880002, "username": "replayuser"}, + "text": f"/link {link_code}", + }, + }, + ) + assert replay_status == 200 + assert replay_payload["ingest"]["link_status"] == "invalid_link_code" + assert replay_payload["ingest"]["route_status"] == "unresolved" + assert replay_payload["ingest"]["unknown_chat_routing"] is True + + +def test_phase10_telegram_rejects_cross_workspace_identity_conflict( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, workspace_id = _bootstrap_workspace_session() + + first_link_start_status, first_link_start_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/start", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert first_link_start_status == 200 + + first_webhook_status, first_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 6101, + "message": { + "message_id": 3101, + "date": 1710004000, + "chat": {"id": 990001, "type": "private"}, + "from": {"id": 990001, "username": "workspaceone"}, + "text": f"/link {first_link_start_payload['challenge']['link_code']}", + }, + }, + ) + assert first_webhook_status == 200 + assert first_webhook_payload["ingest"]["link_status"] == "confirmed" + + first_confirm_status, _first_confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": first_link_start_payload["challenge"]["challenge_token"]}, + headers=auth_header(session_token), + ) + assert first_confirm_status == 201 + + second_workspace_id = _create_and_bootstrap_workspace( + session_token, + "Telegram Builder Workspace Two", + ) + + second_link_start_status, second_link_start_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/start", + payload={"workspace_id": second_workspace_id}, + headers=auth_header(session_token), + ) + assert second_link_start_status == 200 + + second_webhook_status, second_webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 6102, + "message": { + "message_id": 3102, + "date": 1710004010, + "chat": {"id": 990001, "type": "private"}, + "from": {"id": 990001, "username": "workspaceone"}, + "text": f"/link {second_link_start_payload['challenge']['link_code']}", + }, + }, + ) + assert second_webhook_status == 200 + assert second_webhook_payload["ingest"]["link_status"] == "identity_conflict" + + second_confirm_status, second_confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": second_link_start_payload["challenge"]["challenge_token"]}, + headers=auth_header(session_token), + ) + assert second_confirm_status == 409 + assert "pending webhook confirmation" in second_confirm_payload["detail"] + + second_status_code, second_status_payload = invoke_request( + "GET", + "/v1/channels/telegram/status", + query_params={"workspace_id": second_workspace_id}, + headers=auth_header(session_token), + ) + assert second_status_code == 200 + assert second_status_payload["linked"] is False diff --git a/tests/unit/test_20260408_0044_phase10_telegram_transport.py b/tests/unit/test_20260408_0044_phase10_telegram_transport.py new file mode 100644 index 0000000..4511006 --- /dev/null +++ b/tests/unit/test_20260408_0044_phase10_telegram_transport.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import importlib + + +MODULE_NAME = "apps.api.alembic.versions.20260408_0044_phase10_telegram_transport" + + +def load_migration_module(): + return importlib.import_module(MODULE_NAME) + + +def test_upgrade_executes_expected_statements_in_order(monkeypatch) -> None: + module = load_migration_module() + executed: list[str] = [] + + monkeypatch.setattr(module.op, "execute", executed.append) + + module.upgrade() + + assert executed == [ + *module._UPGRADE_STATEMENTS, + *module._UPGRADE_GRANT_STATEMENTS, + ] + + +def test_downgrade_executes_expected_statements_in_order(monkeypatch) -> None: + module = load_migration_module() + executed: list[str] = [] + + monkeypatch.setattr(module.op, "execute", executed.append) + + module.downgrade() + + assert executed == list(module._DOWNGRADE_STATEMENTS) + + +def test_migration_mentions_phase10_s2_channel_tables() -> None: + module = load_migration_module() + + joined_upgrade_sql = "\n".join(module._UPGRADE_STATEMENTS) + for table_name in ( + "channel_identities", + "channel_link_challenges", + "channel_messages", + "channel_threads", + "channel_delivery_receipts", + "chat_intents", + ): + assert table_name in joined_upgrade_sql + + +def test_migration_hashes_challenge_tokens_and_enforces_webhook_idempotency() -> None: + module = load_migration_module() + joined_upgrade_sql = "\n".join(module._UPGRADE_STATEMENTS) + + assert "challenge_token_hash text NOT NULL UNIQUE" in joined_upgrade_sql + assert "challenge_token text NOT NULL UNIQUE" not in joined_upgrade_sql + assert "UNIQUE (channel_type, direction, idempotency_key)" in joined_upgrade_sql + assert "ON channel_identities (channel_type, external_chat_id)" in joined_upgrade_sql diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 6fb31fc..ade6010 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -29,6 +29,10 @@ def test_settings_defaults(monkeypatch): "ALICEBOT_AUTH_USER_ID", "RESPONSE_RATE_LIMIT_WINDOW_SECONDS", "RESPONSE_RATE_LIMIT_MAX_REQUESTS", + "TELEGRAM_LINK_TTL_SECONDS", + "TELEGRAM_BOT_USERNAME", + "TELEGRAM_WEBHOOK_SECRET", + "TELEGRAM_BOT_TOKEN", ): monkeypatch.delenv(key, raising=False) @@ -49,6 +53,10 @@ def test_settings_defaults(monkeypatch): assert settings.auth_user_id == "" assert settings.response_rate_limit_window_seconds == 60 assert settings.response_rate_limit_max_requests == 20 + assert settings.telegram_link_ttl_seconds == 600 + assert settings.telegram_bot_username == "alicebot" + assert settings.telegram_webhook_secret == "" + assert settings.telegram_bot_token == "" def test_settings_honor_environment_overrides(monkeypatch): @@ -65,6 +73,10 @@ def test_settings_honor_environment_overrides(monkeypatch): monkeypatch.setenv("ALICEBOT_AUTH_USER_ID", "00000000-0000-0000-0000-000000000001") monkeypatch.setenv("RESPONSE_RATE_LIMIT_WINDOW_SECONDS", "120") monkeypatch.setenv("RESPONSE_RATE_LIMIT_MAX_REQUESTS", "30") + monkeypatch.setenv("TELEGRAM_LINK_TTL_SECONDS", "900") + monkeypatch.setenv("TELEGRAM_BOT_USERNAME", "alicebuilder_bot") + monkeypatch.setenv("TELEGRAM_WEBHOOK_SECRET", "phase10-secret") + monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "test-bot-token") settings = Settings.from_env() @@ -81,6 +93,10 @@ def test_settings_honor_environment_overrides(monkeypatch): assert settings.auth_user_id == "00000000-0000-0000-0000-000000000001" assert settings.response_rate_limit_window_seconds == 120 assert settings.response_rate_limit_max_requests == 30 + assert settings.telegram_link_ttl_seconds == 900 + assert settings.telegram_bot_username == "alicebuilder_bot" + assert settings.telegram_webhook_secret == "phase10-secret" + assert settings.telegram_bot_token == "test-bot-token" def test_settings_can_be_loaded_from_an_explicit_environment_mapping() -> None: @@ -97,6 +113,10 @@ def test_settings_can_be_loaded_from_an_explicit_environment_mapping() -> None: "ALICEBOT_AUTH_USER_ID": "00000000-0000-0000-0000-000000000001", "RESPONSE_RATE_LIMIT_WINDOW_SECONDS": "75", "RESPONSE_RATE_LIMIT_MAX_REQUESTS": "10", + "TELEGRAM_LINK_TTL_SECONDS": "700", + "TELEGRAM_BOT_USERNAME": "alicebot_phase10", + "TELEGRAM_WEBHOOK_SECRET": "secret-value", + "TELEGRAM_BOT_TOKEN": "bot-token", } ) @@ -111,6 +131,10 @@ def test_settings_can_be_loaded_from_an_explicit_environment_mapping() -> None: assert settings.auth_user_id == "00000000-0000-0000-0000-000000000001" assert settings.response_rate_limit_window_seconds == 75 assert settings.response_rate_limit_max_requests == 10 + assert settings.telegram_link_ttl_seconds == 700 + assert settings.telegram_bot_username == "alicebot_phase10" + assert settings.telegram_webhook_secret == "secret-value" + assert settings.telegram_bot_token == "bot-token" def test_settings_raise_clear_error_for_invalid_integer_values() -> None: @@ -142,6 +166,15 @@ def test_settings_reject_non_positive_rate_limit_values() -> None: ): Settings.from_env({"RESPONSE_RATE_LIMIT_MAX_REQUESTS": "0"}) + with pytest.raises( + ValueError, + match="TELEGRAM_LINK_TTL_SECONDS must be a positive integer", + ): + Settings.from_env({"TELEGRAM_LINK_TTL_SECONDS": "0"}) + + with pytest.raises(ValueError, match="TELEGRAM_BOT_USERNAME must be provided"): + Settings.from_env({"TELEGRAM_BOT_USERNAME": " "}) + def test_settings_require_hardened_non_dev_configuration() -> None: with pytest.raises( diff --git a/tests/unit/test_telegram_channels.py b/tests/unit/test_telegram_channels.py new file mode 100644 index 0000000..eb0f14b --- /dev/null +++ b/tests/unit/test_telegram_channels.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import pytest + +from alicebot_api.telegram_channels import ( + TelegramWebhookValidationError, + build_inbound_idempotency_key, + extract_telegram_link_code, + normalize_telegram_update, + resolve_telegram_thread_key, +) + + +def test_build_inbound_idempotency_key_is_deterministic() -> None: + assert build_inbound_idempotency_key(update_id=1001) == build_inbound_idempotency_key(update_id=1001) + assert len(build_inbound_idempotency_key(update_id=1001)) == 64 + + +def test_extract_telegram_link_code_supports_link_and_start_commands() -> None: + assert extract_telegram_link_code("/link ABC12345", bot_username="alicebot") == "ABC12345" + assert extract_telegram_link_code("/start zx90aa11", bot_username="alicebot") == "ZX90AA11" + assert extract_telegram_link_code("/link@alicebot CODE2026", bot_username="alicebot") == "CODE2026" + assert extract_telegram_link_code("/link@otherbot CODE2026", bot_username="alicebot") is None + assert extract_telegram_link_code("hello", bot_username="alicebot") is None + + +def test_normalize_telegram_update_returns_stable_contract() -> None: + normalized = normalize_telegram_update( + { + "update_id": 2026001, + "message": { + "message_id": 77, + "date": 1710000000, + "chat": {"id": 999001, "type": "private"}, + "from": {"id": 555001, "username": "builder"}, + "text": "/link p10s2abc", + }, + }, + bot_username="alicebot", + ) + + assert normalized["provider_update_id"] == "2026001" + assert normalized["provider_message_id"] == "77" + assert normalized["external_chat_id"] == "999001" + assert normalized["external_user_id"] == "555001" + assert normalized["external_username"] == "builder" + assert normalized["link_code"] == "P10S2ABC" + assert normalized["idempotency_key"] == build_inbound_idempotency_key(update_id=2026001) + assert resolve_telegram_thread_key(external_chat_id=normalized["external_chat_id"]) == "telegram-chat:999001" + + +def test_normalize_telegram_update_rejects_missing_required_fields() -> None: + with pytest.raises(TelegramWebhookValidationError, match="requires integer update_id"): + normalize_telegram_update({"message": {}}, bot_username="alicebot") + + with pytest.raises(TelegramWebhookValidationError, match="requires message object"): + normalize_telegram_update({"update_id": 1}, bot_username="alicebot") + + with pytest.raises(TelegramWebhookValidationError, match="requires chat.id"): + normalize_telegram_update( + { + "update_id": 1, + "message": { + "message_id": 1, + "from": {"id": 2}, + }, + }, + bot_username="alicebot", + )