diff --git a/.ai/active/SPRINT_PACKET.md b/.ai/active/SPRINT_PACKET.md index b56893d..af3a5eb 100644 --- a/.ai/active/SPRINT_PACKET.md +++ b/.ai/active/SPRINT_PACKET.md @@ -2,7 +2,7 @@ ## Sprint Title -Phase 10 Sprint 3 (P10-S3): Chat-Native Continuity + Approvals +Phase 10 Sprint 4 (P10-S4): Daily Brief + Notifications + Scheduled Open-Loop Review Historical baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Bootstrap. @@ -12,21 +12,21 @@ feature ## Sprint Reason -`P10-S1` shipped hosted identity, workspace bootstrap, device management, and preferences. `P10-S2` shipped Telegram transport, channel linking, normalized inbound messages, routing, and delivery receipts. `P10-S3` now turns that transport into a usable continuity surface by routing Telegram chats into capture, recall, resume, correction, open-loop review, and approval resolution on top of the shipped Alice Core semantics. +`P10-S1` shipped hosted identity, workspace bootstrap, device management, and preferences. `P10-S2` shipped Telegram transport, channel linking, normalized inbound messages, routing, and delivery receipts. `P10-S3` shipped chat-native continuity and approval handling. `P10-S4` now adds the scheduled habit loop: daily brief generation, notification policy enforcement, quiet-hours-respecting delivery, and scheduled prompts for waiting-for and stale open-loop review. -Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap and `P10-S2` Telegram Transport + Message Normalization. +Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap, `P10-S2` Telegram Transport + Message Normalization, and `P10-S3` Chat-Native Continuity + Approvals. 
## Sprint Intent -- Telegram-native capture, recall, resume, correction, and open-loop review flows -- deterministic routing from normalized Telegram messages into the right continuity action -- approval prompts and approval resolution in Telegram -- provenance-backed replies and correction-aware answer behavior -- reuse of shipped transport seams without widening into scheduling or brief delivery +- daily brief generation and Telegram delivery +- notification policy enforcement including quiet hours and user preferences +- scheduled waiting-for and stale-item prompts +- scheduled open-loop review nudges that reuse shipped `P10-S3` review actions +- deterministic job and delivery evidence without widening into launch tooling ## Git Instructions -- Branch Name: `codex/phase10-sprint-3-chat-continuity-approvals` +- Branch Name: `codex/phase10-sprint-4-daily-brief-notifications` - Base Branch: `main` - PR Strategy: one sprint branch, one PR - Merge Policy: squash merge only after review `PASS` and explicit approval @@ -41,37 +41,38 @@ Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap and `P10-S2` - continuity engine, approvals, and eval harness - `P10-S1` hosted auth, workspace bootstrap, device management, preferences, beta cohorts, and feature flags - `P10-S2` Telegram transport, link/unlink, normalization, routing, and delivery receipts + - `P10-S3` Telegram chat-native continuity, open-loop review, and approval handling - Phase 9 shipped scope is baseline truth, not sprint work - Required now: - - intent routing from normalized Telegram messages into continuity and approval actions - - Telegram-native continuity answers and correction-aware reply posture - - open-loop review actions and approval resolution in chat - - provenance-backed outbound responses built from durable stored truth -- Explicitly out of `P10-S3`: + - daily brief compilation from durable continuity state + - scheduler execution and due-job selection + - notification policy 
enforcement using timezone, quiet hours, and brief preferences + - scheduled waiting-for and stale open-loop prompts delivered through shipped Telegram transport +- Explicitly out of `P10-S4`: - new hosted auth, session, or workspace bootstrap flows - Telegram transport or link/unlink contract redesign - - daily brief generation or scheduler execution + - generic chat-native capture/recall/resume/correction behavior already shipped in `P10-S3` - support/admin dashboards - broad channel expansion beyond Telegram - launch hardening ## Exact APIs In Scope -- `POST /v1/channels/telegram/messages/{message_id}/handle` -- `GET /v1/channels/telegram/messages/{message_id}/result` -- `GET /v1/channels/telegram/recall` -- `GET /v1/channels/telegram/resume` -- `GET /v1/channels/telegram/open-loops` -- `POST /v1/channels/telegram/open-loops/{open_loop_id}/review-action` -- `GET /v1/channels/telegram/approvals` -- `POST /v1/channels/telegram/approvals/{approval_id}/approve` -- `POST /v1/channels/telegram/approvals/{approval_id}/reject` +- `GET /v1/channels/telegram/daily-brief` +- `POST /v1/channels/telegram/daily-brief/deliver` +- `GET /v1/channels/telegram/notification-preferences` +- `PATCH /v1/channels/telegram/notification-preferences` +- `GET /v1/channels/telegram/open-loop-prompts` +- `POST /v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver` +- `GET /v1/channels/telegram/delivery-receipts` +- `GET /v1/channels/telegram/scheduler/jobs` ## Exact Data Additions In Scope -- `approval_challenges` -- `open_loop_reviews` -- additive Telegram message intent/result fields required to persist routed continuity and approval outcomes +- `continuity_briefs` +- `daily_brief_jobs` +- `notification_subscriptions` +- additive scheduled-delivery fields required on `channel_delivery_receipts` and related scheduler/job records ## Exact Files And Modules In Scope @@ -79,15 +80,11 @@ Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap and `P10-S2` - 
`apps/api/src/alicebot_api/contracts.py` - `apps/api/src/alicebot_api/store.py` - `apps/api/src/alicebot_api/telegram_channels.py` -- `apps/api/src/alicebot_api/continuity_capture.py` -- `apps/api/src/alicebot_api/continuity_recall.py` -- `apps/api/src/alicebot_api/continuity_resumption.py` -- `apps/api/src/alicebot_api/continuity_review.py` +- `apps/api/src/alicebot_api/chief_of_staff.py` - `apps/api/src/alicebot_api/continuity_open_loops.py` -- `apps/api/src/alicebot_api/approvals.py` -- new Telegram continuity orchestration helpers under `apps/api/src/alicebot_api/` if needed +- new daily-brief / notification scheduling helpers under `apps/api/src/alicebot_api/` - API migrations under `apps/api/alembic/versions/` -- hosted Telegram chat-status / approval-status pages or components under `apps/web/app/` and `apps/web/components/` +- hosted brief-preference / notification-status pages or components under `apps/web/app/` and `apps/web/components/` - sprint-owned unit, integration, and web tests under `tests/` and `apps/web/app/**/*.test.tsx` - sprint-owned documentation updates required to keep active control truth aligned @@ -95,39 +92,39 @@ Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap and `P10-S2` ### API And Persistence -- add intent classification and action routing from shipped normalized Telegram messages into continuity and approval handlers -- persist Telegram message handling results, approval challenge state, and open-loop review actions without forking the underlying Alice Core objects -- reuse shipped `P10-S2` channel/thread routing and delivery-receipt posture rather than creating a parallel chat pipeline +- add brief/job/subscription contracts and persistence for scheduled delivery +- add due-job selection and delivery bookkeeping that reuses shipped Telegram delivery seams +- persist scheduled prompt and brief outcomes without creating a parallel chat history model -### Chat Behavior +### Delivery Behavior -- support capture, 
recall, resume, correction, and open-loop review from Telegram messages -- support approval prompts and approval resolution in Telegram using the existing approval discipline -- ensure replies remain provenance-backed and correction-aware rather than transcript-summarized +- compile useful daily brief payloads from durable continuity and chief-of-staff state +- enforce timezone, quiet hours, and notification preferences before delivery +- deliver scheduled waiting-for and stale-item prompts that point back into shipped `P10-S3` open-loop review handling ### Verification -- add unit coverage for Telegram intent routing, continuity result formatting, and approval action helpers -- add integration coverage for all `P10-S3` endpoints, including wrong-intent routing, correction uptake, open-loop review actions, and approval approve/reject flows -- add web tests for Telegram continuity / approval status UX if sprint-owned UI changes are introduced +- add unit coverage for brief compilation, quiet-hours gating, and due-job selection +- add integration coverage for all `P10-S4` endpoints, including quiet-hours suppression, disabled notifications, repeated-job idempotency, and stale-item prompt delivery +- add web tests for brief-preference / notification-status UX if sprint-owned UI changes are introduced - keep control-doc truth checks passing after packet and current-state updates ## Required Deliverables -- Telegram chat handling for capture, recall, resume, correction, and open-loop review -- deterministic intent routing from shipped normalized Telegram messages -- approval prompts and approve/reject handling in Telegram -- provenance-backed Telegram replies -- persisted handling results that later daily-brief work can build on +- daily brief compiler and Telegram delivery path +- notification preference and quiet-hours enforcement +- scheduled waiting-for and stale-item prompts +- persisted brief/job/subscription evidence +- status surface for brief and notification 
posture ## Acceptance Criteria -- a linked Telegram user can capture a new continuity item and receive a deterministic acknowledgment in chat -- a linked Telegram user can ask recall and resume questions and receive provenance-backed answers from durable stored truth -- correction messages update subsequent Telegram answers in a correction-aware way -- a linked Telegram user can review open loops and resolve approval prompts in chat -- `P10-S1` and `P10-S2` hosted/transport semantics remain baseline truth and are not reopened as sprint work -- no `P10-S3` endpoint or screen claims that scheduled daily briefs or notification loops are already active +- a linked Telegram user with notifications enabled can receive a useful daily brief generated from durable stored state +- quiet hours and notification preference settings suppress or defer delivery deterministically +- waiting-for and stale open-loop prompts are generated and delivered without reopening generic open-loop review semantics already shipped in `P10-S3` +- delivery jobs and receipts are persisted with deterministic status evidence +- `P10-S1`, `P10-S2`, and `P10-S3` semantics remain baseline truth and are not reopened as sprint work +- no `P10-S4` endpoint or screen claims that beta admin/support tooling or launch hardening is already active ## Required Verification Commands @@ -138,7 +135,7 @@ Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap and `P10-S2` ## Review Evidence Requirements - `BUILD_REPORT.md` must list the exact sprint-owned files changed and the exact command results above -- `REVIEW_REPORT.md` must grade against `P10-S3` specifically, not generic Phase 10 planning +- `REVIEW_REPORT.md` must grade against `P10-S4` specifically, not generic Phase 10 planning - if local archive paths remain dirty, they must be called out explicitly as excluded from sprint merge scope ## Implementation Constraints @@ -146,11 +143,11 @@ Reference baseline markers: `P10-S1` Identity + Workspace 
Bootstrap and `P10-S2` - do not fork continuity semantics between hosted surfaces and Alice Core - keep OSS versus product boundaries explicit in docs and API naming - preserve existing approval, provenance, and correction discipline -- do not widen `P10-S3` into daily briefs, notification scheduling, or launch tooling -- do not ship a scheduler in `P10-S3` -- reuse the shipped `P10-S1` and `P10-S2` identity/workspace/channel foundations instead of duplicating control-plane state +- do not widen `P10-S4` into beta admin tooling or launch work +- reuse the shipped `P10-S1`, `P10-S2`, and `P10-S3` identity/workspace/channel/chat foundations instead of duplicating control-plane state +- do not re-implement generic open-loop review actions that already shipped in `P10-S3`; this sprint only adds scheduled prompting and brief delivery - prefer additive hosted-control-plane seams over invasive rewrites of shipped Phase 9 paths ## Exit Condition -`P10-S3` is complete when a linked Telegram user can use capture, recall, resume, correction, open-loop review, and approval resolution against the shipped Alice Core semantics through the shipped Telegram transport, with provenance-backed replies and no reopening of hosted identity or transport scope. +`P10-S4` is complete when a linked Telegram user can receive a daily brief and scheduled waiting-for/stale prompts under deterministic quiet-hours and notification-policy enforcement, with persisted job and delivery evidence and no reopening of hosted identity, transport, or generic chat-continuity scope. diff --git a/.ai/handoff/CURRENT_STATE.md b/.ai/handoff/CURRENT_STATE.md index 10c6c7f..0aa8563 100644 --- a/.ai/handoff/CURRENT_STATE.md +++ b/.ai/handoff/CURRENT_STATE.md @@ -7,8 +7,9 @@ - P10-S1 (Identity + Workspace Bootstrap) is the first execution sprint packet. - `P10-S1` (Identity + Workspace Bootstrap) is shipped. - `P10-S2` (Telegram Transport + Message Normalization) is shipped. 
-- `P10-S3` (Chat-Native Continuity + Approvals) is the active execution sprint packet. -- No chat-native Phase 10 continuity surface is shipped yet. +- `P10-S3` (Chat-Native Continuity + Approvals) is shipped. +- `P10-S4` (Daily Brief + Notifications + Scheduled Open-Loop Review) is the active execution sprint packet. +- No scheduled daily-brief and notification loop is shipped yet. ## Canonical Baseline @@ -29,8 +30,9 @@ - `P10-S1` shipped the hosted account/session foundations, workspace bootstrap, device management, preferences, and beta controls. - `P10-S2` shipped Telegram transport, link/unlink flow, message normalization, routing, and delivery receipts. -- `P10-S3` covers chat-native continuity behavior and approval handling on top of the shipped Telegram transport. -- Daily briefs and launch hardening are later Phase 10 milestones. +- `P10-S3` shipped chat-native continuity behavior and approval handling on top of the shipped Telegram transport. +- `P10-S4` covers daily brief delivery, notification policy, quiet hours, and scheduled waiting-for / stale-item prompts. +- Launch hardening is the later Phase 10 milestone. - Phase 9 shipped scope is baseline truth and must not be reopened as sprint work. ## Active Constraints diff --git a/BUILD_REPORT.md b/BUILD_REPORT.md index 3352765..73a0a8f 100644 --- a/BUILD_REPORT.md +++ b/BUILD_REPORT.md @@ -1,73 +1,101 @@ -# BUILD_REPORT +# BUILD_REPORT.md -## sprint objective -Implement Phase 10 Sprint 3 (P10-S3): Telegram chat-native continuity + approvals, including deterministic intent routing from normalized Telegram messages into continuity/approval actions, persisted handling outcomes, and required `v1/channels/telegram/*` APIs. 
+## Sprint Objective +Implement `P10-S4` Daily Brief + Notifications + Scheduled Open-Loop Review for hosted Telegram by adding: +- daily brief compile + delivery path +- notification preferences + quiet-hours policy enforcement +- scheduled waiting-for/stale prompt generation + delivery +- persisted scheduler/job/receipt evidence +- hosted settings status surface for brief/notification posture -## completed work -- Updated the active control/docs layer to reflect an active `P10-S3` execution sprint: +## Completed Work +- Updated the active control/docs layer to reflect an active `P10-S4` execution sprint: - `.ai/active/SPRINT_PACKET.md` - `.ai/handoff/CURRENT_STATE.md` - `README.md` -- Added migration `20260408_0045_phase10_chat_continuity_approvals.py` with: - - `approval_challenges` table - - `open_loop_reviews` table - - additive `chat_intents` fields: `intent_payload`, `result_payload`, `handled_at` - - expanded `chat_intents` intent/status constraints for P10-S3 routing lifecycle -- Added new Telegram continuity orchestration module `apps/api/src/alicebot_api/telegram_continuity.py`: - - hosted-user continuity context preparation - - deterministic Telegram intent classification - - handle flow for capture, recall, resume, correction, open-loop review, approvals, approval approve/reject - - provenance-aware recall responses and correction-aware follow-up behavior - - persisted chat intent/result records - - approval challenge persistence and resolution updates - - open-loop review action logging -- Added new P10-S3 endpoints in `apps/api/src/alicebot_api/main.py`: - - `POST /v1/channels/telegram/messages/{message_id}/handle` - - `GET /v1/channels/telegram/messages/{message_id}/result` - - `GET /v1/channels/telegram/recall` - - `GET /v1/channels/telegram/resume` - - `GET /v1/channels/telegram/open-loops` - - `POST /v1/channels/telegram/open-loops/{open_loop_id}/review-action` - - `GET /v1/channels/telegram/approvals` - - `POST 
/v1/channels/telegram/approvals/{approval_id}/approve` - - `POST /v1/channels/telegram/approvals/{approval_id}/reject` -- Updated type contracts/store rows for new chat intent payload/result fields and challenge/review records. -- Added sprint-owned tests: - - migration unit tests for `0045` - - unit tests for Telegram intent classification - - integration tests covering all P10-S3 endpoints, wrong-intent routing, correction uptake, open-loop review actions, and approval approve/reject (direct + chat) -- Updated active control docs with required historical markers so the required truth check passes. +- Added additive migration `20260408_0046_phase10_daily_brief_notifications.py` with: + - `notification_subscriptions` + - `continuity_briefs` + - `daily_brief_jobs` + - scheduled-delivery metadata columns on `channel_delivery_receipts` + - receipt status extension to include `suppressed` +- Added new API helper module `apps/api/src/alicebot_api/telegram_notifications.py` implementing: + - preference ensure/patch/read + - quiet-hours + window + enablement policy gating + - daily brief preview composition (continuity + chief-of-staff summary) + - daily brief delivery with idempotent job handling + - open-loop prompt listing/delivery (waiting-for + stale) + - scheduler due-job materialization + listing + - workspace-scoped internal idempotency derivation and lookup for custom delivery keys +- Extended Telegram delivery seam in `telegram_channels.py` with workspace-level scheduled dispatch: + - `dispatch_telegram_workspace_message(...)` + - scheduled receipt metadata persistence + - receipt serialization/query updates +- Added `P10-S4` endpoints in `main.py`: + - `GET /v1/channels/telegram/daily-brief` + - `POST /v1/channels/telegram/daily-brief/deliver` + - `GET /v1/channels/telegram/notification-preferences` + - `PATCH /v1/channels/telegram/notification-preferences` + - `GET /v1/channels/telegram/open-loop-prompts` + - `POST 
/v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver` + - `GET /v1/channels/telegram/scheduler/jobs` +- Updated contract/store typing for new scheduler/subscription/receipt fields. +- Updated hosted settings UI to surface `P10-S4` notification posture, daily brief preview/delivery, open-loop prompts, and scheduler jobs. +- Updated control-truth wording in hosted web surfaces to avoid claims about admin/support/launch hardening. +- Added/updated tests for migration, unit policy logic, new integration API flows, and web settings UI. +- Updated `.ai/handoff/CURRENT_STATE.md` marker required by control-doc truth check. -## incomplete work -- None within the sprint packet acceptance criteria. -- No new web UI/components were added because this implementation completed the sprint through API/chat behavior and endpoint coverage. +## Incomplete Work +- None identified within `P10-S4` sprint packet scope. -## files changed +## Files Changed - `/Users/samirusani/Desktop/Codex/AliceBot/.ai/active/SPRINT_PACKET.md` - `/Users/samirusani/Desktop/Codex/AliceBot/.ai/handoff/CURRENT_STATE.md` - `/Users/samirusani/Desktop/Codex/AliceBot/README.md` - `/Users/samirusani/Desktop/Codex/AliceBot/BUILD_REPORT.md` - `/Users/samirusani/Desktop/Codex/AliceBot/REVIEW_REPORT.md` -- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/alembic/versions/20260408_0045_phase10_chat_continuity_approvals.py` -- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/telegram_continuity.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/alembic/versions/20260408_0046_phase10_daily_brief_notifications.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/telegram_notifications.py` - `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/main.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/telegram_channels.py` - `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/contracts.py` - 
`/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/store.py` -- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_20260408_0045_phase10_chat_continuity_approvals.py` -- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_telegram_continuity.py` -- `/Users/samirusani/Desktop/Codex/AliceBot/tests/integration/test_phase10_chat_continuity_approvals_api.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/tests/integration/test_phase10_daily_brief_notifications_api.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_telegram_notifications.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_20260408_0046_phase10_daily_brief_notifications.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_main.py` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/components/hosted-settings-panel.tsx` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/components/hosted-settings-panel.test.tsx` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/app/settings/page.tsx` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/app/settings/page.test.tsx` +- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/app/page.tsx` -## tests run -- `python3 scripts/check_control_doc_truth.py` - - PASS -- `./.venv/bin/python -m pytest tests/unit tests/integration -q` - - PASS (`1014 passed`) -- `pnpm --dir apps/web test` - - PASS (`60 passed`, `196 tests`) +## Tests Run +1. `python3 scripts/check_control_doc_truth.py` +- Result: PASS +- Output: + - `Control-doc truth check: PASS` + - verified: `README.md` + - verified: `ROADMAP.md` + - verified: `.ai/active/SPRINT_PACKET.md` + - verified: `RULES.md` + - verified: `.ai/handoff/CURRENT_STATE.md` + - verified: `docs/archive/planning/2026-04-08-context-compaction/README.md` -## blockers/issues -- Initial blocker: control-doc truth check failed due missing required historical markers in active control docs. 
-- Resolution: added minimal historical marker lines to `.ai/active/SPRINT_PACKET.md` and `.ai/handoff/CURRENT_STATE.md` without changing sprint scope. +2. `./.venv/bin/python -m pytest tests/unit tests/integration -q` +- Result: PASS +- Output summary: `1025 passed in 139.39s (0:02:19)` -## recommended next step -Seek explicit Control Tower merge approval for `P10-S3`, using this branch head and the verification evidence above. +3. `pnpm --dir apps/web test` +- Result: PASS +- Output summary: + - `Test Files 60 passed (60)` + - `Tests 196 passed (196)` + +## Blockers/Issues +- Initial control-doc truth check failed due to a required marker missing in `.ai/handoff/CURRENT_STATE.md`; resolved by adding the required marker. +- Fixed during review: custom idempotency keys are now tenant/workspace scoped to prevent cross-workspace collision/replay. +- No remaining blockers. + +## Recommended Next Step +Seek explicit Control Tower merge approval for `P10-S4`, using this branch head and the verification evidence above. diff --git a/README.md b/README.md index 3d28721..d188b07 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Alice is a local-first memory and continuity engine for AI agents. -Phase 9 is complete. Alice Connect is the planned Phase 10 product layer on top of that shipped core, `P10-S1` and `P10-S2` are shipped, and `P10-S3` is the active execution sprint. +Phase 9 is complete. Alice Connect is the planned Phase 10 product layer on top of that shipped core, `P10-S1` through `P10-S3` are shipped, and `P10-S4` is the active execution sprint. 
## What v0.1 Ships diff --git a/REVIEW_REPORT.md b/REVIEW_REPORT.md index e3c8271..e296310 100644 --- a/REVIEW_REPORT.md +++ b/REVIEW_REPORT.md @@ -1,58 +1,61 @@ -# REVIEW_REPORT +# REVIEW_REPORT.md ## verdict PASS ## criteria met -- All `P10-S3` in-scope Telegram continuity/approval endpoints are implemented and exercised: - - `POST /v1/channels/telegram/messages/{message_id}/handle` - - `GET /v1/channels/telegram/messages/{message_id}/result` - - `GET /v1/channels/telegram/recall` - - `GET /v1/channels/telegram/resume` - - `GET /v1/channels/telegram/open-loops` - - `POST /v1/channels/telegram/open-loops/{open_loop_id}/review-action` - - `GET /v1/channels/telegram/approvals` - - `POST /v1/channels/telegram/approvals/{approval_id}/approve` - - `POST /v1/channels/telegram/approvals/{approval_id}/reject` -- Sprint data additions are present and wired: - - `approval_challenges` - - `open_loop_reviews` - - additive `chat_intents` result fields (`intent_payload`, `result_payload`, `handled_at`). -- Deterministic routing and chat-native behavior for capture, recall, resume, correction, open-loop review, approvals, approve, and reject are implemented on top of shipped P10-S2 transport seams. -- Provenance/correction discipline is preserved through existing continuity/approval modules (no parallel semantics stack). -- Previously identified optional-field coercion defect is fixed in `telegram_continuity.py` (no `None` -> `'None'` conversion on intent payload fields). -- Regression coverage added for: - - queryless `/resume` behavior returning handled brief context, - - `/recall` without query failing with explicit validation detail, - - `/approve` and `/reject` without IDs failing with explicit "requires approval id" details. 
-- Control docs are aligned to an active `P10-S3` execution sprint and baseline-shipped `P10-S1` / `P10-S2` state: +- `P10-S4` API scope is implemented and routed in `apps/api/src/alicebot_api/main.py`: + - `GET /v1/channels/telegram/daily-brief` + - `POST /v1/channels/telegram/daily-brief/deliver` + - `GET /v1/channels/telegram/notification-preferences` + - `PATCH /v1/channels/telegram/notification-preferences` + - `GET /v1/channels/telegram/open-loop-prompts` + - `POST /v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver` + - `GET /v1/channels/telegram/scheduler/jobs` +- In-scope persistence additions are present: + - `notification_subscriptions` + - `continuity_briefs` + - `daily_brief_jobs` + - additive scheduler metadata on `channel_delivery_receipts` +- Daily brief generation is built from durable continuity/chief-of-staff state and delivered through Telegram delivery seams. +- Quiet-hours and notification preference gates deterministically suppress or allow delivery (`suppressed_disabled`, `suppressed_quiet_hours`, `suppressed_outside_window`, etc.). +- Waiting-for and stale open-loop prompts are generated and delivered as scheduled nudges without reimplementing `P10-S3` generic review semantics. +- Job + receipt evidence is persisted with deterministic status and metadata. +- Blocking idempotency isolation defect identified in prior review is fixed: + - internal idempotency keys are workspace-scoped for client-supplied values + - job lookup/upsert is workspace/channel scoped + - fallback outbound-message idempotency reads are workspace scoped + - migration enforces workspace/channel/idempotency uniqueness for `daily_brief_jobs` +- Regression coverage added for cross-workspace reuse of the same custom idempotency key (`tests/integration/test_phase10_daily_brief_notifications_api.py`). 
+- Control docs are aligned to an active `P10-S4` execution sprint and baseline-shipped `P10-S1` through `P10-S3` state: - `.ai/active/SPRINT_PACKET.md` - `.ai/handoff/CURRENT_STATE.md` - `README.md` -- Required verification commands pass in this re-review: +- Required verification commands were rerun and pass: - `python3 scripts/check_control_doc_truth.py` -> PASS - - `./.venv/bin/python -m pytest tests/unit tests/integration -q` -> PASS (`1014 passed`) - - `pnpm --dir apps/web test` -> PASS (`60 files`, `196 tests`) + - `./.venv/bin/python -m pytest tests/unit tests/integration -q` -> `1025 passed` + - `pnpm --dir apps/web test` -> `60 passed files`, `196 passed tests` ## criteria missed - None. ## quality issues -- No blocking quality issues found in sprint-owned changes after fixes. +- No blocking quality issues found in sprint-owned implementation after the idempotency scoping fix. ## regression risks - Low. -- Residual product risk is standard heuristic-classification ambiguity in free-form chat intent detection, but implemented fail-safe behavior is deterministic and auditable. +- Residual risk is mainly operational (scheduler volume/throughput behavior under larger datasets), not correctness of the `P10-S4` contracts. ## docs issues -- No blocking documentation issues for `P10-S3` acceptance. +- No blocking docs issues. +- `BUILD_REPORT.md` now reflects the idempotency fix and latest verification totals. ## should anything be added to RULES.md? -- Not required for this sprint pass. +- Optional: add an explicit durable rule that hosted/channel idempotency must be tenant/workspace scoped for lookup and uniqueness. ## should anything update ARCHITECTURE.md? -- Optional only: document that Telegram continuity handling now persists intent outcomes plus approval/open-loop review audit artifacts for hosted control-plane traceability. +- Optional: add a short security invariant under hosted control-plane boundaries that dedupe/idempotency keys are tenant-scoped. 
## recommended next action 1. Ready for Control Tower merge approval under policy. -2. After merge, open `P10-S4` only for daily brief and notification work on top of these continuity and approval seams. +2. After merge, open `P10-S5` only for beta hardening and launch-readiness work on top of these scheduled-delivery seams. diff --git a/apps/api/alembic/versions/20260408_0046_phase10_daily_brief_notifications.py b/apps/api/alembic/versions/20260408_0046_phase10_daily_brief_notifications.py new file mode 100644 index 0000000..8ba6ca9 --- /dev/null +++ b/apps/api/alembic/versions/20260408_0046_phase10_daily_brief_notifications.py @@ -0,0 +1,215 @@ +"""Add Phase 10 Sprint 4 daily brief jobs, notification subscriptions, and scheduled receipt metadata.""" + +from __future__ import annotations + +from alembic import op + + +revision = "20260408_0046" +down_revision = "20260408_0045" +branch_labels = None +depends_on = None + + +_UPGRADE_STATEMENTS = ( + """ + CREATE TABLE notification_subscriptions ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + channel_type text NOT NULL, + channel_identity_id uuid NOT NULL REFERENCES channel_identities(id) ON DELETE CASCADE, + notifications_enabled boolean NOT NULL DEFAULT TRUE, + daily_brief_enabled boolean NOT NULL DEFAULT TRUE, + daily_brief_window_start text NOT NULL DEFAULT '07:00', + open_loop_prompts_enabled boolean NOT NULL DEFAULT TRUE, + waiting_for_prompts_enabled boolean NOT NULL DEFAULT TRUE, + stale_prompts_enabled boolean NOT NULL DEFAULT TRUE, + timezone text NOT NULL DEFAULT 'UTC', + quiet_hours_enabled boolean NOT NULL DEFAULT FALSE, + quiet_hours_start text NOT NULL DEFAULT '22:00', + quiet_hours_end text NOT NULL DEFAULT '07:00', + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (workspace_id, channel_type), + UNIQUE (channel_identity_id, channel_type), + CONSTRAINT 
notification_subscriptions_channel_type_check + CHECK (channel_type IN ('telegram')), + CONSTRAINT notification_subscriptions_window_start_format_check + CHECK (daily_brief_window_start ~ '^(?:[01][0-9]|2[0-3]):[0-5][0-9]$'), + CONSTRAINT notification_subscriptions_quiet_start_format_check + CHECK (quiet_hours_start ~ '^(?:[01][0-9]|2[0-3]):[0-5][0-9]$'), + CONSTRAINT notification_subscriptions_quiet_end_format_check + CHECK (quiet_hours_end ~ '^(?:[01][0-9]|2[0-3]):[0-5][0-9]$') + ) + """, + ( + "CREATE INDEX notification_subscriptions_workspace_updated_idx " + "ON notification_subscriptions (workspace_id, channel_type, updated_at DESC, id DESC)" + ), + """ + CREATE TABLE continuity_briefs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + channel_type text NOT NULL, + channel_identity_id uuid NOT NULL REFERENCES channel_identities(id) ON DELETE CASCADE, + brief_kind text NOT NULL, + assembly_version text NOT NULL, + summary jsonb NOT NULL DEFAULT '{}'::jsonb, + brief_payload jsonb NOT NULL DEFAULT '{}'::jsonb, + message_text text NOT NULL, + compiled_at timestamptz NOT NULL DEFAULT now(), + created_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT continuity_briefs_channel_type_check + CHECK (channel_type IN ('telegram')), + CONSTRAINT continuity_briefs_kind_check + CHECK (brief_kind IN ('daily_brief')) + ) + """, + ( + "CREATE INDEX continuity_briefs_workspace_compiled_idx " + "ON continuity_briefs (workspace_id, channel_type, compiled_at DESC, id DESC)" + ), + """ + CREATE TABLE daily_brief_jobs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + channel_type text NOT NULL, + channel_identity_id uuid NOT NULL REFERENCES channel_identities(id) ON DELETE CASCADE, + job_kind text NOT NULL, + prompt_kind text NULL, + prompt_id text NULL, + continuity_object_id uuid NULL REFERENCES continuity_objects(id) ON DELETE 
SET NULL, + continuity_brief_id uuid NULL REFERENCES continuity_briefs(id) ON DELETE SET NULL, + schedule_slot text NOT NULL, + idempotency_key text NOT NULL, + due_at timestamptz NOT NULL, + status text NOT NULL, + suppression_reason text NULL, + attempt_count integer NOT NULL DEFAULT 0, + delivery_receipt_id uuid NULL, + payload jsonb NOT NULL DEFAULT '{}'::jsonb, + result_payload jsonb NOT NULL DEFAULT '{}'::jsonb, + attempted_at timestamptz NULL, + completed_at timestamptz NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT daily_brief_jobs_channel_type_check + CHECK (channel_type IN ('telegram')), + CONSTRAINT daily_brief_jobs_kind_check + CHECK (job_kind IN ('daily_brief', 'open_loop_prompt')), + CONSTRAINT daily_brief_jobs_prompt_kind_check + CHECK (prompt_kind IS NULL OR prompt_kind IN ('waiting_for', 'stale')), + CONSTRAINT daily_brief_jobs_status_check + CHECK ( + status IN ( + 'scheduled', + 'delivered', + 'simulated', + 'suppressed_quiet_hours', + 'suppressed_disabled', + 'suppressed_outside_window', + 'failed' + ) + ), + CONSTRAINT daily_brief_jobs_attempt_count_check + CHECK (attempt_count >= 0), + CONSTRAINT daily_brief_jobs_prompt_required_for_open_loop_check + CHECK ( + (job_kind = 'daily_brief' AND prompt_id IS NULL) + OR + (job_kind = 'open_loop_prompt' AND prompt_id IS NOT NULL) + ), + CONSTRAINT daily_brief_jobs_workspace_idempotency_unique + UNIQUE (workspace_id, channel_type, idempotency_key) + ) + """, + ( + "CREATE INDEX daily_brief_jobs_workspace_due_idx " + "ON daily_brief_jobs (workspace_id, channel_type, due_at DESC, id DESC)" + ), + ( + "CREATE INDEX daily_brief_jobs_workspace_status_due_idx " + "ON daily_brief_jobs (workspace_id, channel_type, status, due_at DESC, id DESC)" + ), + ( + "CREATE INDEX daily_brief_jobs_workspace_prompt_slot_idx " + "ON daily_brief_jobs (workspace_id, prompt_id, schedule_slot, created_at DESC, id DESC)" + ), + "ALTER TABLE 
channel_delivery_receipts DROP CONSTRAINT IF EXISTS channel_delivery_receipts_status_check", + """ + ALTER TABLE channel_delivery_receipts + ADD CONSTRAINT channel_delivery_receipts_status_check + CHECK (status IN ('delivered', 'failed', 'simulated', 'suppressed')) + """, + """ + ALTER TABLE channel_delivery_receipts + ADD COLUMN scheduled_job_id uuid NULL REFERENCES daily_brief_jobs(id) ON DELETE SET NULL, + ADD COLUMN scheduler_job_kind text NULL, + ADD COLUMN scheduled_for timestamptz NULL, + ADD COLUMN schedule_slot text NULL, + ADD COLUMN notification_policy jsonb NOT NULL DEFAULT '{}'::jsonb + """, + """ + ALTER TABLE channel_delivery_receipts + ADD CONSTRAINT channel_delivery_receipts_scheduler_job_kind_check + CHECK (scheduler_job_kind IS NULL OR scheduler_job_kind IN ('daily_brief', 'open_loop_prompt')) + """, + ( + "CREATE INDEX channel_delivery_receipts_workspace_scheduler_idx " + "ON channel_delivery_receipts (workspace_id, scheduled_for DESC, id DESC)" + ), +) + +_UPGRADE_GRANT_STATEMENTS = ( + "GRANT SELECT, INSERT, UPDATE, DELETE ON notification_subscriptions TO alicebot_app", + "GRANT SELECT, INSERT, UPDATE, DELETE ON continuity_briefs TO alicebot_app", + "GRANT SELECT, INSERT, UPDATE, DELETE ON daily_brief_jobs TO alicebot_app", +) + +_DOWNGRADE_STATEMENTS = ( + "DROP INDEX IF EXISTS channel_delivery_receipts_workspace_scheduler_idx", + "ALTER TABLE channel_delivery_receipts DROP CONSTRAINT IF EXISTS channel_delivery_receipts_scheduler_job_kind_check", + """ + ALTER TABLE channel_delivery_receipts + DROP COLUMN IF EXISTS notification_policy, + DROP COLUMN IF EXISTS schedule_slot, + DROP COLUMN IF EXISTS scheduled_for, + DROP COLUMN IF EXISTS scheduler_job_kind, + DROP COLUMN IF EXISTS scheduled_job_id + """, + "ALTER TABLE channel_delivery_receipts DROP CONSTRAINT IF EXISTS channel_delivery_receipts_status_check", + """ + UPDATE channel_delivery_receipts + SET status = 'failed', + failure_code = COALESCE(failure_code, 
'suppressed_status_downgrade'), + failure_detail = COALESCE(failure_detail, 'suppressed receipt downgraded during migration rollback') + WHERE status = 'suppressed' + """, + """ + ALTER TABLE channel_delivery_receipts + ADD CONSTRAINT channel_delivery_receipts_status_check + CHECK (status IN ('delivered', 'failed', 'simulated')) + """, + "DROP INDEX IF EXISTS daily_brief_jobs_workspace_prompt_slot_idx", + "DROP INDEX IF EXISTS daily_brief_jobs_workspace_status_due_idx", + "DROP INDEX IF EXISTS daily_brief_jobs_workspace_due_idx", + "DROP TABLE IF EXISTS daily_brief_jobs", + "DROP INDEX IF EXISTS continuity_briefs_workspace_compiled_idx", + "DROP TABLE IF EXISTS continuity_briefs", + "DROP INDEX IF EXISTS notification_subscriptions_workspace_updated_idx", + "DROP TABLE IF EXISTS notification_subscriptions", +) + + +def _execute_statements(statements: tuple[str, ...]) -> None: + for statement in statements: + op.execute(statement) + + +def upgrade() -> None: + _execute_statements(_UPGRADE_STATEMENTS) + _execute_statements(_UPGRADE_GRANT_STATEMENTS) + + +def downgrade() -> None: + _execute_statements(_DOWNGRADE_STATEMENTS) diff --git a/apps/api/src/alicebot_api/contracts.py b/apps/api/src/alicebot_api/contracts.py index 01c130e..bb1f48a 100644 --- a/apps/api/src/alicebot_api/contracts.py +++ b/apps/api/src/alicebot_api/contracts.py @@ -111,7 +111,18 @@ "unknown", ] ChatIntentStatus = Literal["pending", "recorded", "handled", "failed"] -ChannelDeliveryReceiptStatus = Literal["delivered", "failed", "simulated"] +ChannelDeliveryReceiptStatus = Literal["delivered", "failed", "simulated", "suppressed"] +TelegramSchedulerJobKind = Literal["daily_brief", "open_loop_prompt"] +TelegramSchedulerPromptKind = Literal["waiting_for", "stale"] +TelegramSchedulerJobStatus = Literal[ + "scheduled", + "delivered", + "simulated", + "suppressed_quiet_hours", + "suppressed_disabled", + "suppressed_outside_window", + "failed", +] TaskArtifactStatus = Literal["registered"] 
TaskArtifactIngestionStatus = Literal["pending", "ingested"] TaskArtifactChunkRetrievalScopeKind = Literal["task", "artifact"] @@ -5186,6 +5197,25 @@ class HostedUserPreferencesRecord(TypedDict): updated_at: str +class NotificationSubscriptionRecord(TypedDict): + id: str + workspace_id: str + channel_type: ChannelTransportType + channel_identity_id: str + notifications_enabled: bool + daily_brief_enabled: bool + daily_brief_window_start: str + open_loop_prompts_enabled: bool + waiting_for_prompts_enabled: bool + stale_prompts_enabled: bool + timezone: str + quiet_hours_enabled: bool + quiet_hours_start: str + quiet_hours_end: str + created_at: str + updated_at: str + + class ChannelIdentityRecord(TypedDict): id: str user_account_id: str @@ -5267,10 +5297,54 @@ class ChannelDeliveryReceiptRecord(TypedDict): provider_receipt_id: str | None failure_code: str | None failure_detail: str | None + scheduled_job_id: str | None + scheduler_job_kind: TelegramSchedulerJobKind | None + scheduled_for: str | None + schedule_slot: str | None + notification_policy: JsonObject recorded_at: str created_at: str +class TelegramContinuityBriefRecord(TypedDict): + id: str + workspace_id: str + channel_type: ChannelTransportType + channel_identity_id: str + brief_kind: Literal["daily_brief"] + assembly_version: str + summary: JsonObject + brief_payload: JsonObject + message_text: str + compiled_at: str + created_at: str + + +class TelegramDailyBriefJobRecord(TypedDict): + id: str + workspace_id: str + channel_type: ChannelTransportType + channel_identity_id: str + job_kind: TelegramSchedulerJobKind + prompt_kind: TelegramSchedulerPromptKind | None + prompt_id: str | None + continuity_object_id: str | None + continuity_brief_id: str | None + schedule_slot: str + idempotency_key: str + due_at: str + status: TelegramSchedulerJobStatus + suppression_reason: str | None + attempt_count: int + delivery_receipt_id: str | None + payload: JsonObject + result_payload: JsonObject + attempted_at: str 
| None + completed_at: str | None + created_at: str + updated_at: str + + class ApprovalChallengeRecord(TypedDict): id: str workspace_id: str diff --git a/apps/api/src/alicebot_api/main.py b/apps/api/src/alicebot_api/main.py index 07a351d..f40d277 100644 --- a/apps/api/src/alicebot_api/main.py +++ b/apps/api/src/alicebot_api/main.py @@ -422,6 +422,17 @@ prepare_telegram_continuity_context, reject_telegram_approval, ) +from alicebot_api.telegram_notifications import ( + TelegramNotificationPreferenceValidationError, + TelegramOpenLoopPromptNotFoundError, + deliver_workspace_daily_brief, + deliver_workspace_open_loop_prompt, + get_workspace_daily_brief_preview, + get_workspace_notification_preferences, + list_workspace_open_loop_prompts, + list_workspace_scheduler_jobs, + patch_workspace_notification_subscription, +) from alicebot_api.continuity_review import ( ContinuityReviewNotFoundError, ContinuityReviewValidationError, @@ -1418,6 +1429,28 @@ class TelegramApprovalResolveBody(BaseModel): note: str | None = Field(default=None, min_length=1, max_length=500) +class TelegramNotificationPreferencesPatchRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + notifications_enabled: bool | None = None + daily_brief_enabled: bool | None = None + daily_brief_window_start: str | None = Field(default=None, min_length=5, max_length=5) + open_loop_prompts_enabled: bool | None = None + waiting_for_prompts_enabled: bool | None = None + stale_prompts_enabled: bool | None = None + timezone: str | None = Field(default=None, min_length=1, max_length=120) + quiet_hours_enabled: bool | None = None + quiet_hours_start: str | None = Field(default=None, min_length=5, max_length=5) + quiet_hours_end: str | None = Field(default=None, min_length=5, max_length=5) + + +class TelegramScheduledDeliveryRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + force: bool = False + idempotency_key: str | None = Field(default=None, min_length=8, max_length=200) + + def 
_extract_bearer_token(request: Request) -> str: raw_authorization = request.headers.get("authorization", "").strip() if raw_authorization == "": @@ -5566,6 +5599,291 @@ def list_v1_telegram_delivery_receipts( ) +@app.get("/v1/channels/telegram/notification-preferences") +def get_v1_telegram_notification_preferences( + request: Request, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + payload = get_workspace_notification_preferences( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramIdentityNotFoundError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except TelegramNotificationPreferenceValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.patch("/v1/channels/telegram/notification-preferences") +def patch_v1_telegram_notification_preferences( + request: Request, + body: TelegramNotificationPreferencesPatchRequest, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with 
psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + patch_payload = body.model_dump(exclude_none=True) + patch_workspace_notification_subscription( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + patch=patch_payload, + ) + payload = get_workspace_notification_preferences( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramIdentityNotFoundError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except TelegramNotificationPreferenceValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.get("/v1/channels/telegram/daily-brief") +def get_v1_telegram_daily_brief( + request: Request, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + 
preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + payload = get_workspace_daily_brief_preview( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramIdentityNotFoundError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except TelegramNotificationPreferenceValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + +@app.post("/v1/channels/telegram/daily-brief/deliver") +def post_v1_telegram_daily_brief_deliver( + request: Request, + body: TelegramScheduledDeliveryRequest, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + payload = deliver_workspace_daily_brief( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + bot_token=settings.telegram_bot_token, + force=body.force, + idempotency_key=body.idempotency_key, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, 
AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramIdentityNotFoundError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except TelegramNotificationPreferenceValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + status_code = 200 if bool(payload.get("idempotent_replay")) else 201 + return JSONResponse(status_code=status_code, content=jsonable_encoder(payload)) + + +@app.get("/v1/channels/telegram/open-loop-prompts") +def list_v1_telegram_open_loop_prompts( + request: Request, + limit: int = Query(default=20, ge=1, le=100), +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + payload = list_workspace_open_loop_prompts( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + limit=limit, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramIdentityNotFoundError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except TelegramNotificationPreferenceValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, 
content=jsonable_encoder(payload)) + + +@app.post("/v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver") +def post_v1_telegram_open_loop_prompt_deliver( + prompt_id: str, + request: Request, + body: TelegramScheduledDeliveryRequest, +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + payload = deliver_workspace_open_loop_prompt( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + prompt_id=prompt_id, + bot_token=settings.telegram_bot_token, + force=body.force, + idempotency_key=body.idempotency_key, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramIdentityNotFoundError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except TelegramOpenLoopPromptNotFoundError as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) + except TelegramNotificationPreferenceValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + status_code = 200 if bool(payload.get("idempotent_replay")) else 201 + return JSONResponse(status_code=status_code, content=jsonable_encoder(payload)) + + +@app.get("/v1/channels/telegram/scheduler/jobs") +def list_v1_telegram_scheduler_jobs( + request: Request, 
+ limit: int = Query(default=50, ge=1, le=200), +) -> JSONResponse: + settings = get_settings() + + try: + session_token = _extract_bearer_token(request) + with psycopg.connect(settings.database_url, row_factory=dict_row) as conn: + with conn.transaction(): + resolution = resolve_auth_session(conn, session_token=session_token) + user_account_id = resolution["user_account"]["id"] + workspace = _resolve_workspace_for_hosted_channel_request( + conn, + user_account_id=user_account_id, + session_id=resolution["session"]["id"], + preferred_workspace_id=resolution["session"]["workspace_id"], + requested_workspace_id=None, + ) + if workspace is None: + return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"}) + + payload = list_workspace_scheduler_jobs( + conn, + user_account_id=user_account_id, + workspace_id=workspace["id"], + limit=limit, + ) + except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc: + return JSONResponse(status_code=401, content={"detail": str(exc)}) + except TelegramIdentityNotFoundError as exc: + return JSONResponse(status_code=409, content={"detail": str(exc)}) + except TelegramNotificationPreferenceValidationError as exc: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + return JSONResponse(status_code=200, content=jsonable_encoder(payload)) + + @app.post("/v1/channels/telegram/messages/{message_id}/handle") def handle_v1_telegram_message( message_id: UUID, diff --git a/apps/api/src/alicebot_api/store.py b/apps/api/src/alicebot_api/store.py index 6df9cbe..bde596a 100644 --- a/apps/api/src/alicebot_api/store.py +++ b/apps/api/src/alicebot_api/store.py @@ -485,6 +485,11 @@ class ChannelDeliveryReceiptRow(TypedDict): provider_receipt_id: str | None failure_code: str | None failure_detail: str | None + scheduled_job_id: UUID | None + scheduler_job_kind: str | None + scheduled_for: datetime | None + schedule_slot: str | None + notification_policy: 
JsonObject recorded_at: datetime created_at: datetime @@ -513,6 +518,64 @@ class OpenLoopReviewRow(TypedDict): created_at: datetime +class NotificationSubscriptionRow(TypedDict): + id: UUID + workspace_id: UUID + channel_type: str + channel_identity_id: UUID + notifications_enabled: bool + daily_brief_enabled: bool + daily_brief_window_start: str + open_loop_prompts_enabled: bool + waiting_for_prompts_enabled: bool + stale_prompts_enabled: bool + timezone: str + quiet_hours_enabled: bool + quiet_hours_start: str + quiet_hours_end: str + created_at: datetime + updated_at: datetime + + +class ContinuityBriefRow(TypedDict): + id: UUID + workspace_id: UUID + channel_type: str + channel_identity_id: UUID + brief_kind: str + assembly_version: str + summary: JsonObject + brief_payload: JsonObject + message_text: str + compiled_at: datetime + created_at: datetime + + +class DailyBriefJobRow(TypedDict): + id: UUID + workspace_id: UUID + channel_type: str + channel_identity_id: UUID + job_kind: str + prompt_kind: str | None + prompt_id: str | None + continuity_object_id: UUID | None + continuity_brief_id: UUID | None + schedule_slot: str + idempotency_key: str + due_at: datetime + status: str + suppression_reason: str | None + attempt_count: int + delivery_receipt_id: UUID | None + payload: JsonObject + result_payload: JsonObject + attempted_at: datetime | None + completed_at: datetime | None + created_at: datetime + updated_at: datetime + + class TaskArtifactRow(TypedDict): id: UUID user_id: UUID diff --git a/apps/api/src/alicebot_api/telegram_channels.py b/apps/api/src/alicebot_api/telegram_channels.py index 01e0ccc..753c9da 100644 --- a/apps/api/src/alicebot_api/telegram_channels.py +++ b/apps/api/src/alicebot_api/telegram_channels.py @@ -123,6 +123,11 @@ class TelegramDeliveryReceiptRow(TypedDict): provider_receipt_id: str | None failure_code: str | None failure_detail: str | None + scheduled_job_id: UUID | None + scheduler_job_kind: str | None + scheduled_for: datetime 
| None + schedule_slot: str | None + notification_policy: dict[str, Any] recorded_at: datetime created_at: datetime @@ -966,6 +971,19 @@ def _get_latest_linked_identity( return cur.fetchone() +def get_latest_linked_telegram_identity( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, +) -> TelegramChannelIdentityRow | None: + return _get_latest_linked_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + + def get_telegram_link_status( conn, *, @@ -1199,12 +1217,13 @@ def dispatch_telegram_message( external_user_id, message_text, normalized_payload, route_status, idempotency_key, created_at, received_at FROM channel_messages - WHERE channel_type = %s + WHERE workspace_id = %s + AND channel_type = %s AND direction = 'outbound' AND idempotency_key = %s LIMIT 1 """, - (TELEGRAM_CHANNEL_TYPE, resolved_idempotency_key), + (workspace_id, TELEGRAM_CHANNEL_TYPE, resolved_idempotency_key), ) outbound = cur.fetchone() @@ -1244,7 +1263,8 @@ def dispatch_telegram_message( recorded_at = EXCLUDED.recorded_at RETURNING id, workspace_id, channel_message_id, channel_type, status, provider_receipt_id, failure_code, failure_detail, - recorded_at, created_at + scheduled_job_id, scheduler_job_kind, scheduled_for, schedule_slot, + notification_policy, recorded_at, created_at """, ( workspace_id, @@ -1265,6 +1285,227 @@ def dispatch_telegram_message( return outbound, receipt +def dispatch_telegram_workspace_message( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + text: str, + dispatch_idempotency_key: str, + bot_token: str, + dispatch_payload: dict[str, Any] | None = None, + receipt_status_override: str | None = None, + failure_code_override: str | None = None, + failure_detail_override: str | None = None, + scheduled_job_id: UUID | None = None, + scheduler_job_kind: str | None = None, + scheduled_for: datetime | None = None, + schedule_slot: str | None = None, + notification_policy: dict[str, Any] | None = None, +) -> 
tuple[TelegramChannelMessageRow, TelegramDeliveryReceiptRow]: + normalized_text = text.strip() + if normalized_text == "": + raise ValueError("dispatch text is required") + + resolved_idempotency_key = dispatch_idempotency_key.strip() + if resolved_idempotency_key == "": + raise ValueError("dispatch idempotency key is required") + + identity = _get_latest_linked_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + if identity is None: + raise TelegramIdentityNotFoundError("telegram channel is not linked for this workspace") + + now = utc_now() + external_thread_key = resolve_telegram_thread_key(external_chat_id=identity["external_chat_id"]) + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO channel_threads ( + workspace_id, + channel_type, + external_thread_key, + channel_identity_id, + last_message_at, + updated_at + ) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (workspace_id, channel_type, external_thread_key) DO UPDATE + SET channel_identity_id = EXCLUDED.channel_identity_id, + last_message_at = EXCLUDED.last_message_at, + updated_at = EXCLUDED.updated_at + RETURNING id + """, + ( + workspace_id, + TELEGRAM_CHANNEL_TYPE, + external_thread_key, + identity["id"], + now, + now, + ), + ) + thread = cur.fetchone() + + if thread is None: + raise RuntimeError("failed to resolve telegram channel thread for workspace dispatch") + + provider_message_id = f"simulated:{hashlib.sha256(resolved_idempotency_key.encode('utf-8')).hexdigest()[:20]}" + normalized_dispatch_payload = dispatch_payload or {} + dispatch_mode = "suppressed" if receipt_status_override == "suppressed" else ( + "simulated" if bot_token.strip() == "" else "deterministic_failure" + ) + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO channel_messages ( + workspace_id, + channel_thread_id, + channel_identity_id, + channel_type, + direction, + provider_update_id, + provider_message_id, + external_chat_id, + external_user_id, + message_text, + 
normalized_payload, + route_status, + idempotency_key, + received_at + ) + VALUES (%s, %s, %s, %s, 'outbound', NULL, %s, %s, %s, %s, %s, 'resolved', %s, %s) + ON CONFLICT (channel_type, direction, idempotency_key) DO NOTHING + RETURNING id, workspace_id, channel_thread_id, channel_identity_id, + channel_type, direction, provider_update_id, provider_message_id, + external_chat_id, external_user_id, message_text, + normalized_payload, route_status, idempotency_key, created_at, received_at + """, + ( + workspace_id, + thread["id"], + identity["id"], + TELEGRAM_CHANNEL_TYPE, + provider_message_id, + identity["external_chat_id"], + identity["external_user_id"], + normalized_text, + Jsonb( + { + "dispatch": { + "source": "workspace_notification", + "mode": dispatch_mode, + }, + "scheduler": normalized_dispatch_payload, + } + ), + resolved_idempotency_key, + now, + ), + ) + outbound = cur.fetchone() + + if outbound is None: + cur.execute( + """ + SELECT id, workspace_id, channel_thread_id, channel_identity_id, channel_type, + direction, provider_update_id, provider_message_id, external_chat_id, + external_user_id, message_text, normalized_payload, route_status, + idempotency_key, created_at, received_at + FROM channel_messages + WHERE workspace_id = %s + AND channel_type = %s + AND direction = 'outbound' + AND idempotency_key = %s + LIMIT 1 + """, + (workspace_id, TELEGRAM_CHANNEL_TYPE, resolved_idempotency_key), + ) + outbound = cur.fetchone() + + if outbound is None: + raise RuntimeError("failed to create outbound telegram workspace notification message") + + if receipt_status_override is not None: + receipt_status = receipt_status_override + provider_receipt_id: str | None = None + failure_code = failure_code_override + failure_detail = failure_detail_override + else: + receipt_status = "simulated" + failure_code = None + failure_detail = None + provider_receipt_id = outbound["provider_message_id"] + if bot_token.strip() != "": + receipt_status = "failed" + 
provider_receipt_id = None + failure_code = "telegram_transport_not_enabled" + failure_detail = "live telegram dispatch is not enabled in this environment" + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO channel_delivery_receipts ( + workspace_id, + channel_message_id, + channel_type, + status, + provider_receipt_id, + failure_code, + failure_detail, + scheduled_job_id, + scheduler_job_kind, + scheduled_for, + schedule_slot, + notification_policy, + recorded_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (channel_message_id) DO UPDATE + SET status = EXCLUDED.status, + provider_receipt_id = EXCLUDED.provider_receipt_id, + failure_code = EXCLUDED.failure_code, + failure_detail = EXCLUDED.failure_detail, + scheduled_job_id = EXCLUDED.scheduled_job_id, + scheduler_job_kind = EXCLUDED.scheduler_job_kind, + scheduled_for = EXCLUDED.scheduled_for, + schedule_slot = EXCLUDED.schedule_slot, + notification_policy = EXCLUDED.notification_policy, + recorded_at = EXCLUDED.recorded_at + RETURNING id, workspace_id, channel_message_id, channel_type, + status, provider_receipt_id, failure_code, failure_detail, + scheduled_job_id, scheduler_job_kind, scheduled_for, schedule_slot, + notification_policy, recorded_at, created_at + """, + ( + workspace_id, + outbound["id"], + TELEGRAM_CHANNEL_TYPE, + receipt_status, + provider_receipt_id, + failure_code, + failure_detail, + scheduled_job_id, + scheduler_job_kind, + scheduled_for, + schedule_slot, + Jsonb(notification_policy or {}), + utc_now(), + ), + ) + receipt = cur.fetchone() + + if receipt is None: + raise RuntimeError("failed to persist telegram workspace notification delivery receipt") + + return outbound, receipt + + def list_workspace_telegram_delivery_receipts( conn, *, @@ -1283,6 +1524,11 @@ def list_workspace_telegram_delivery_receipts( r.provider_receipt_id, r.failure_code, r.failure_detail, + r.scheduled_job_id, + r.scheduler_job_kind, + r.scheduled_for, + 
r.schedule_slot, + r.notification_policy, r.recorded_at, r.created_at FROM channel_delivery_receipts AS r @@ -1399,6 +1645,13 @@ def serialize_delivery_receipt(receipt: TelegramDeliveryReceiptRow) -> dict[str, "provider_receipt_id": receipt["provider_receipt_id"], "failure_code": receipt["failure_code"], "failure_detail": receipt["failure_detail"], + "scheduled_job_id": None + if receipt["scheduled_job_id"] is None + else str(receipt["scheduled_job_id"]), + "scheduler_job_kind": receipt["scheduler_job_kind"], + "scheduled_for": None if receipt["scheduled_for"] is None else receipt["scheduled_for"].isoformat(), + "schedule_slot": receipt["schedule_slot"], + "notification_policy": receipt["notification_policy"], "recorded_at": receipt["recorded_at"].isoformat(), "created_at": receipt["created_at"].isoformat(), } diff --git a/apps/api/src/alicebot_api/telegram_notifications.py b/apps/api/src/alicebot_api/telegram_notifications.py new file mode 100644 index 0000000..567d584 --- /dev/null +++ b/apps/api/src/alicebot_api/telegram_notifications.py @@ -0,0 +1,1773 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import UTC, date, datetime, time +import hashlib +import re +from typing import Any, Literal, TypedDict +from uuid import UUID +from zoneinfo import ZoneInfo + +from psycopg.types.json import Jsonb + +from alicebot_api.chief_of_staff import compile_chief_of_staff_priority_brief +from alicebot_api.continuity_open_loops import ( + compile_continuity_daily_brief, + compile_continuity_open_loop_dashboard, +) +from alicebot_api.contracts import ( + DEFAULT_CHIEF_OF_STAFF_PRIORITY_LIMIT, + DEFAULT_CONTINUITY_DAILY_BRIEF_LIMIT, + MAX_CHIEF_OF_STAFF_PRIORITY_LIMIT, + MAX_CONTINUITY_DAILY_BRIEF_LIMIT, + MAX_CONTINUITY_OPEN_LOOP_LIMIT, + ChiefOfStaffPriorityBriefRequestInput, + ContinuityDailyBriefRequestInput, + ContinuityOpenLoopDashboardQueryInput, +) +from alicebot_api.db import set_current_user +from 
alicebot_api.hosted_preferences import ( + DEFAULT_BRIEF_PREFERENCES, + DEFAULT_QUIET_HOURS, + DEFAULT_TIMEZONE, + ensure_user_preferences, + validate_timezone, +) +from alicebot_api.store import ContinuityStore +from alicebot_api.telegram_channels import ( + TELEGRAM_CHANNEL_TYPE, + TelegramDeliveryReceiptRow, + TelegramIdentityNotFoundError, + dispatch_telegram_workspace_message, + get_latest_linked_telegram_identity, + serialize_delivery_receipt, +) + + +_HHMM_PATTERN = re.compile(r"^(?:[01]\d|2[0-3]):[0-5]\d$") + +_TERMINAL_JOB_STATUSES = { + "delivered", + "simulated", + "suppressed_quiet_hours", + "suppressed_disabled", + "suppressed_outside_window", + "failed", +} + + +class TelegramNotificationPreferenceValidationError(ValueError): + """Raised when Telegram notification preferences are invalid.""" + + +class TelegramOpenLoopPromptNotFoundError(LookupError): + """Raised when a Telegram open-loop prompt id does not map to a scoped item.""" + + +class NotificationSubscriptionRow(TypedDict): + id: UUID + workspace_id: UUID + channel_type: str + channel_identity_id: UUID + notifications_enabled: bool + daily_brief_enabled: bool + daily_brief_window_start: str + open_loop_prompts_enabled: bool + waiting_for_prompts_enabled: bool + stale_prompts_enabled: bool + timezone: str + quiet_hours_enabled: bool + quiet_hours_start: str + quiet_hours_end: str + created_at: datetime + updated_at: datetime + + +class ContinuityBriefRow(TypedDict): + id: UUID + workspace_id: UUID + channel_type: str + channel_identity_id: UUID + brief_kind: str + assembly_version: str + summary: dict[str, Any] + brief_payload: dict[str, Any] + message_text: str + compiled_at: datetime + created_at: datetime + + +class DailyBriefJobRow(TypedDict): + id: UUID + workspace_id: UUID + channel_type: str + channel_identity_id: UUID + job_kind: str + prompt_kind: str | None + prompt_id: str | None + continuity_object_id: UUID | None + continuity_brief_id: UUID | None + schedule_slot: str + 
idempotency_key: str + due_at: datetime + status: str + suppression_reason: str | None + attempt_count: int + delivery_receipt_id: UUID | None + payload: dict[str, Any] + result_payload: dict[str, Any] + attempted_at: datetime | None + completed_at: datetime | None + created_at: datetime + updated_at: datetime + + +class OpenLoopPromptCandidate(TypedDict): + prompt_id: str + prompt_kind: Literal["waiting_for", "stale"] + continuity_object_id: str + title: str + continuity_status: str + review_action_hint: Literal["still_blocked", "deferred"] + due_at: str + message_text: str + + +@dataclass(frozen=True) +class DeliveryPolicyEvaluation: + allowed: bool + suppression_status: str | None + reason: str + window_open: bool + quiet_hours_active: bool + timezone: str + local_time: str + + def as_dict(self) -> dict[str, object]: + return { + "allowed": self.allowed, + "suppression_status": self.suppression_status, + "reason": self.reason, + "window_open": self.window_open, + "quiet_hours_active": self.quiet_hours_active, + "timezone": self.timezone, + "local_time": self.local_time, + } + + +def _utcnow() -> datetime: + return datetime.now(UTC) + + +def _normalize_hhmm(value: object, *, field_name: str) -> str: + if not isinstance(value, str): + raise TelegramNotificationPreferenceValidationError(f"{field_name} must be a string in HH:MM format") + normalized = value.strip() + if _HHMM_PATTERN.fullmatch(normalized) is None: + raise TelegramNotificationPreferenceValidationError(f"{field_name} must use HH:MM 24-hour format") + return normalized + + +def _hhmm_to_minutes(hhmm: str) -> int: + hours, minutes = hhmm.split(":", maxsplit=1) + return int(hours) * 60 + int(minutes) + + +def _local_now(now: datetime, timezone_name: str) -> datetime: + return now.astimezone(ZoneInfo(timezone_name)) + + +def _quiet_hours_active(*, local_now: datetime, start: str, end: str) -> bool: + start_minutes = _hhmm_to_minutes(start) + end_minutes = _hhmm_to_minutes(end) + current_minutes = 
local_now.hour * 60 + local_now.minute + + if start_minutes == end_minutes: + return False + if start_minutes < end_minutes: + return start_minutes <= current_minutes < end_minutes + return current_minutes >= start_minutes or current_minutes < end_minutes + + +def _window_open(*, local_now: datetime, start: str) -> bool: + return (local_now.hour * 60 + local_now.minute) >= _hhmm_to_minutes(start) + + +def _daily_slot_for_now(*, now: datetime, timezone_name: str) -> str: + return _local_now(now, timezone_name).date().isoformat() + + +def _daily_due_at(*, slot: str, timezone_name: str, window_start: str) -> datetime: + local_date = date.fromisoformat(slot) + hour, minute = window_start.split(":", maxsplit=1) + local_due = datetime.combine( + local_date, + time(int(hour), int(minute), tzinfo=ZoneInfo(timezone_name)), + ) + return local_due.astimezone(UTC) + + +def _resolve_internal_idempotency_key( + *, + workspace_id: UUID, + job_kind: Literal["daily_brief", "open_loop_prompt"], + schedule_slot: str, + prompt_id: str | None, + client_idempotency_key: str | None, +) -> str: + if client_idempotency_key is None: + if job_kind == "daily_brief": + return f"telegram:daily-brief:{workspace_id}:{schedule_slot}" + if prompt_id is None: + raise TelegramNotificationPreferenceValidationError("prompt_id is required for open-loop prompt delivery") + return f"telegram:open-loop-prompt:{workspace_id}:{prompt_id}:{schedule_slot}" + + normalized_key = client_idempotency_key.strip() + if normalized_key == "": + raise TelegramNotificationPreferenceValidationError("idempotency_key must not be empty") + + digest_payload = ( + f"workspace={workspace_id}|job_kind={job_kind}|prompt_id={prompt_id or ''}|client_key={normalized_key}" + ) + digest = hashlib.sha256(digest_payload.encode("utf-8")).hexdigest() + return f"telegram:{job_kind}:custom:{digest}" + + +def _job_columns_sql() -> str: + return ( + "id, workspace_id, channel_type, channel_identity_id, job_kind, prompt_kind, prompt_id, " + 
"continuity_object_id, continuity_brief_id, schedule_slot, idempotency_key, due_at, status, " + "suppression_reason, attempt_count, delivery_receipt_id, payload, result_payload, attempted_at, " + "completed_at, created_at, updated_at" + ) + + +def _receipt_columns_sql() -> str: + return ( + "id, workspace_id, channel_message_id, channel_type, status, provider_receipt_id, failure_code, " + "failure_detail, scheduled_job_id, scheduler_job_kind, scheduled_for, schedule_slot, " + "notification_policy, recorded_at, created_at" + ) + + +def _brief_columns_sql() -> str: + return ( + "id, workspace_id, channel_type, channel_identity_id, brief_kind, assembly_version, " + "summary, brief_payload, message_text, compiled_at, created_at" + ) + + +def _resolve_linked_identity( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, +): + identity = get_latest_linked_telegram_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + if identity is None: + raise TelegramIdentityNotFoundError("telegram channel is not linked for this workspace") + return identity + + +def _subscription_defaults( + *, + timezone: str, + brief_preferences: dict[str, object], + quiet_hours: dict[str, object], +) -> dict[str, object]: + daily_brief = brief_preferences.get("daily_brief") + if not isinstance(daily_brief, dict): + daily_brief = DEFAULT_BRIEF_PREFERENCES["daily_brief"] + + quiet = quiet_hours if isinstance(quiet_hours, dict) else DEFAULT_QUIET_HOURS + + daily_brief_enabled = bool(daily_brief.get("enabled", False)) + daily_brief_window_start = _normalize_hhmm( + daily_brief.get("window_start", "07:00"), + field_name="daily_brief.window_start", + ) + + quiet_hours_enabled = bool(quiet.get("enabled", False)) + quiet_hours_start = _normalize_hhmm(quiet.get("start", "22:00"), field_name="quiet_hours.start") + quiet_hours_end = _normalize_hhmm(quiet.get("end", "07:00"), field_name="quiet_hours.end") + + return { + "notifications_enabled": True, + 
"daily_brief_enabled": daily_brief_enabled, + "daily_brief_window_start": daily_brief_window_start, + "open_loop_prompts_enabled": True, + "waiting_for_prompts_enabled": True, + "stale_prompts_enabled": True, + "timezone": validate_timezone(timezone), + "quiet_hours_enabled": quiet_hours_enabled, + "quiet_hours_start": quiet_hours_start, + "quiet_hours_end": quiet_hours_end, + } + + +def ensure_workspace_notification_subscription( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, +) -> NotificationSubscriptionRow: + identity = _resolve_linked_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + preferences = ensure_user_preferences(conn, user_account_id=user_account_id) + defaults = _subscription_defaults( + timezone=preferences.get("timezone", DEFAULT_TIMEZONE), + brief_preferences=preferences.get("brief_preferences", DEFAULT_BRIEF_PREFERENCES), + quiet_hours=preferences.get("quiet_hours", DEFAULT_QUIET_HOURS), + ) + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO notification_subscriptions ( + workspace_id, + channel_type, + channel_identity_id, + notifications_enabled, + daily_brief_enabled, + daily_brief_window_start, + open_loop_prompts_enabled, + waiting_for_prompts_enabled, + stale_prompts_enabled, + timezone, + quiet_hours_enabled, + quiet_hours_start, + quiet_hours_end, + updated_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, clock_timestamp()) + ON CONFLICT (workspace_id, channel_type) DO UPDATE + SET channel_identity_id = EXCLUDED.channel_identity_id, + updated_at = clock_timestamp() + RETURNING id, + workspace_id, + channel_type, + channel_identity_id, + notifications_enabled, + daily_brief_enabled, + daily_brief_window_start, + open_loop_prompts_enabled, + waiting_for_prompts_enabled, + stale_prompts_enabled, + timezone, + quiet_hours_enabled, + quiet_hours_start, + quiet_hours_end, + created_at, + updated_at + """, + ( + workspace_id, + TELEGRAM_CHANNEL_TYPE, + 
identity["id"], + defaults["notifications_enabled"], + defaults["daily_brief_enabled"], + defaults["daily_brief_window_start"], + defaults["open_loop_prompts_enabled"], + defaults["waiting_for_prompts_enabled"], + defaults["stale_prompts_enabled"], + defaults["timezone"], + defaults["quiet_hours_enabled"], + defaults["quiet_hours_start"], + defaults["quiet_hours_end"], + ), + ) + row = cur.fetchone() + + if row is None: + raise RuntimeError("failed to ensure telegram notification subscription") + return row + + +def _validate_patch_fields(patch: dict[str, object]) -> dict[str, object]: + validated: dict[str, object] = {} + + boolean_fields = ( + "notifications_enabled", + "daily_brief_enabled", + "open_loop_prompts_enabled", + "waiting_for_prompts_enabled", + "stale_prompts_enabled", + "quiet_hours_enabled", + ) + for field_name in boolean_fields: + if field_name in patch: + value = patch[field_name] + if not isinstance(value, bool): + raise TelegramNotificationPreferenceValidationError(f"{field_name} must be a boolean") + validated[field_name] = value + + if "daily_brief_window_start" in patch: + validated["daily_brief_window_start"] = _normalize_hhmm( + patch["daily_brief_window_start"], + field_name="daily_brief_window_start", + ) + if "quiet_hours_start" in patch: + validated["quiet_hours_start"] = _normalize_hhmm( + patch["quiet_hours_start"], + field_name="quiet_hours_start", + ) + if "quiet_hours_end" in patch: + validated["quiet_hours_end"] = _normalize_hhmm( + patch["quiet_hours_end"], + field_name="quiet_hours_end", + ) + if "timezone" in patch: + value = patch["timezone"] + if not isinstance(value, str): + raise TelegramNotificationPreferenceValidationError("timezone must be a string") + validated["timezone"] = validate_timezone(value) + + return validated + + +def patch_workspace_notification_subscription( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + patch: dict[str, object], +) -> NotificationSubscriptionRow: + existing = 
ensure_workspace_notification_subscription( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + if not patch: + return existing + + validated = _validate_patch_fields(patch) + + merged = { + "notifications_enabled": existing["notifications_enabled"], + "daily_brief_enabled": existing["daily_brief_enabled"], + "daily_brief_window_start": existing["daily_brief_window_start"], + "open_loop_prompts_enabled": existing["open_loop_prompts_enabled"], + "waiting_for_prompts_enabled": existing["waiting_for_prompts_enabled"], + "stale_prompts_enabled": existing["stale_prompts_enabled"], + "timezone": existing["timezone"], + "quiet_hours_enabled": existing["quiet_hours_enabled"], + "quiet_hours_start": existing["quiet_hours_start"], + "quiet_hours_end": existing["quiet_hours_end"], + } + merged.update(validated) + + with conn.cursor() as cur: + cur.execute( + """ + UPDATE notification_subscriptions + SET notifications_enabled = %s, + daily_brief_enabled = %s, + daily_brief_window_start = %s, + open_loop_prompts_enabled = %s, + waiting_for_prompts_enabled = %s, + stale_prompts_enabled = %s, + timezone = %s, + quiet_hours_enabled = %s, + quiet_hours_start = %s, + quiet_hours_end = %s, + updated_at = clock_timestamp() + WHERE id = %s + RETURNING id, + workspace_id, + channel_type, + channel_identity_id, + notifications_enabled, + daily_brief_enabled, + daily_brief_window_start, + open_loop_prompts_enabled, + waiting_for_prompts_enabled, + stale_prompts_enabled, + timezone, + quiet_hours_enabled, + quiet_hours_start, + quiet_hours_end, + created_at, + updated_at + """, + ( + merged["notifications_enabled"], + merged["daily_brief_enabled"], + merged["daily_brief_window_start"], + merged["open_loop_prompts_enabled"], + merged["waiting_for_prompts_enabled"], + merged["stale_prompts_enabled"], + merged["timezone"], + merged["quiet_hours_enabled"], + merged["quiet_hours_start"], + merged["quiet_hours_end"], + existing["id"], + ), + ) + row = cur.fetchone() + + 
if row is None: + raise RuntimeError("failed to patch telegram notification subscription") + return row + + +def _evaluate_delivery_policy( + subscription: NotificationSubscriptionRow, + *, + mode: Literal["daily_brief", "open_loop_prompt"], + prompt_kind: Literal["waiting_for", "stale"] | None, + now: datetime, + force: bool, +) -> DeliveryPolicyEvaluation: + timezone_name = subscription["timezone"] + local_now = _local_now(now, timezone_name) + local_time = local_now.strftime("%Y-%m-%d %H:%M:%S %Z") + window_open = _window_open(local_now=local_now, start=subscription["daily_brief_window_start"]) + quiet_active = False + if subscription["quiet_hours_enabled"]: + quiet_active = _quiet_hours_active( + local_now=local_now, + start=subscription["quiet_hours_start"], + end=subscription["quiet_hours_end"], + ) + + if force: + return DeliveryPolicyEvaluation( + allowed=True, + suppression_status=None, + reason="forced delivery bypassed notification gating", + window_open=window_open, + quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + + if not subscription["notifications_enabled"]: + return DeliveryPolicyEvaluation( + allowed=False, + suppression_status="suppressed_disabled", + reason="telegram notifications are disabled", + window_open=window_open, + quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + + if mode == "daily_brief" and not subscription["daily_brief_enabled"]: + return DeliveryPolicyEvaluation( + allowed=False, + suppression_status="suppressed_disabled", + reason="daily brief notifications are disabled", + window_open=window_open, + quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + + if mode == "open_loop_prompt": + if not subscription["open_loop_prompts_enabled"]: + return DeliveryPolicyEvaluation( + allowed=False, + suppression_status="suppressed_disabled", + reason="open-loop prompts are disabled", + window_open=window_open, + 
quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + if prompt_kind == "waiting_for" and not subscription["waiting_for_prompts_enabled"]: + return DeliveryPolicyEvaluation( + allowed=False, + suppression_status="suppressed_disabled", + reason="waiting-for prompts are disabled", + window_open=window_open, + quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + if prompt_kind == "stale" and not subscription["stale_prompts_enabled"]: + return DeliveryPolicyEvaluation( + allowed=False, + suppression_status="suppressed_disabled", + reason="stale-item prompts are disabled", + window_open=window_open, + quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + + if not window_open: + return DeliveryPolicyEvaluation( + allowed=False, + suppression_status="suppressed_outside_window", + reason="current local time is before the configured daily brief window", + window_open=window_open, + quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + + if quiet_active: + return DeliveryPolicyEvaluation( + allowed=False, + suppression_status="suppressed_quiet_hours", + reason="delivery is deferred due to quiet hours", + window_open=window_open, + quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + + return DeliveryPolicyEvaluation( + allowed=True, + suppression_status=None, + reason="delivery allowed", + window_open=window_open, + quiet_hours_active=quiet_active, + timezone=timezone_name, + local_time=local_time, + ) + + +def serialize_notification_subscription( + row: NotificationSubscriptionRow, + *, + now: datetime | None = None, +) -> dict[str, object]: + effective_now = _utcnow() if now is None else now + local_now = _local_now(effective_now, row["timezone"]) + quiet_active = False + if row["quiet_hours_enabled"]: + quiet_active = _quiet_hours_active( + local_now=local_now, + 
start=row["quiet_hours_start"], + end=row["quiet_hours_end"], + ) + + return { + "id": str(row["id"]), + "workspace_id": str(row["workspace_id"]), + "channel_type": row["channel_type"], + "channel_identity_id": str(row["channel_identity_id"]), + "notifications_enabled": row["notifications_enabled"], + "daily_brief_enabled": row["daily_brief_enabled"], + "daily_brief_window_start": row["daily_brief_window_start"], + "open_loop_prompts_enabled": row["open_loop_prompts_enabled"], + "waiting_for_prompts_enabled": row["waiting_for_prompts_enabled"], + "stale_prompts_enabled": row["stale_prompts_enabled"], + "timezone": row["timezone"], + "quiet_hours": { + "enabled": row["quiet_hours_enabled"], + "start": row["quiet_hours_start"], + "end": row["quiet_hours_end"], + "active_now": quiet_active, + "local_time": local_now.isoformat(), + }, + "created_at": row["created_at"].isoformat(), + "updated_at": row["updated_at"].isoformat(), + } + + +def _serialize_brief_row(row: ContinuityBriefRow | None) -> dict[str, object] | None: + if row is None: + return None + return { + "id": str(row["id"]), + "workspace_id": str(row["workspace_id"]), + "channel_type": row["channel_type"], + "channel_identity_id": str(row["channel_identity_id"]), + "brief_kind": row["brief_kind"], + "assembly_version": row["assembly_version"], + "summary": row["summary"], + "brief_payload": row["brief_payload"], + "message_text": row["message_text"], + "compiled_at": row["compiled_at"].isoformat(), + "created_at": row["created_at"].isoformat(), + } + + +def _serialize_job( + row: DailyBriefJobRow, + *, + now: datetime, +) -> dict[str, object]: + return { + "id": str(row["id"]), + "workspace_id": str(row["workspace_id"]), + "channel_type": row["channel_type"], + "channel_identity_id": str(row["channel_identity_id"]), + "job_kind": row["job_kind"], + "prompt_kind": row["prompt_kind"], + "prompt_id": row["prompt_id"], + "continuity_object_id": None + if row["continuity_object_id"] is None + else 
str(row["continuity_object_id"]), + "continuity_brief_id": None + if row["continuity_brief_id"] is None + else str(row["continuity_brief_id"]), + "schedule_slot": row["schedule_slot"], + "idempotency_key": row["idempotency_key"], + "due_at": row["due_at"].isoformat(), + "status": row["status"], + "suppression_reason": row["suppression_reason"], + "attempt_count": row["attempt_count"], + "delivery_receipt_id": None + if row["delivery_receipt_id"] is None + else str(row["delivery_receipt_id"]), + "payload": row["payload"], + "result_payload": row["result_payload"], + "attempted_at": None if row["attempted_at"] is None else row["attempted_at"].isoformat(), + "completed_at": None if row["completed_at"] is None else row["completed_at"].isoformat(), + "created_at": row["created_at"].isoformat(), + "updated_at": row["updated_at"].isoformat(), + "is_due": row["status"] == "scheduled" and row["due_at"] <= now, + } + + +def _format_daily_brief_message( + *, + brief: dict[str, Any], + chief_brief: dict[str, Any], + timezone_name: str, + now: datetime, +) -> str: + local_day = _local_now(now, timezone_name).strftime("%Y-%m-%d") + waiting_count = int(brief["waiting_for_highlights"]["summary"]["total_count"]) + blocker_count = int(brief["blocker_highlights"]["summary"]["total_count"]) + stale_count = int(brief["stale_items"]["summary"]["total_count"]) + + next_item = brief["next_suggested_action"]["item"] + next_title = "None" + if isinstance(next_item, dict): + next_title = str(next_item.get("title", "None")) + + recommended = chief_brief.get("recommended_next_action") + recommended_title = "No chief-of-staff recommendation" + if isinstance(recommended, dict): + recommended_title = str(recommended.get("title", recommended_title)) + + return "\n".join( + [ + f"Daily Brief ({local_day})", + f"Waiting-for: {waiting_count}", + f"Blockers: {blocker_count}", + f"Stale: {stale_count}", + f"Next suggested action: {next_title}", + f"Chief-of-staff recommendation: {recommended_title}", + 
"Review open loops with /open-loops and /open-loop done|deferred|still_blocked.", + ] + ) + + +def _build_daily_brief_bundle( + conn, + *, + user_account_id: UUID, + timezone_name: str, + now: datetime, +) -> dict[str, object]: + set_current_user(conn, user_account_id) + store = ContinuityStore(conn) + daily_payload = compile_continuity_daily_brief( + store, + user_id=user_account_id, + request=ContinuityDailyBriefRequestInput( + limit=min(DEFAULT_CONTINUITY_DAILY_BRIEF_LIMIT, MAX_CONTINUITY_DAILY_BRIEF_LIMIT), + ), + ) + chief_payload = compile_chief_of_staff_priority_brief( + store, + user_id=user_account_id, + request=ChiefOfStaffPriorityBriefRequestInput( + limit=min(DEFAULT_CHIEF_OF_STAFF_PRIORITY_LIMIT, MAX_CHIEF_OF_STAFF_PRIORITY_LIMIT), + ), + ) + + daily_brief = daily_payload["brief"] + chief_brief = chief_payload["brief"] + message_text = _format_daily_brief_message( + brief=daily_brief, + chief_brief=chief_brief, + timezone_name=timezone_name, + now=now, + ) + + chief_summary = { + "trust_confidence_posture": chief_brief["summary"].get("trust_confidence_posture"), + "follow_through_total_count": chief_brief["summary"].get("follow_through_total_count"), + "recommended_next_action": chief_brief.get("recommended_next_action"), + } + + return { + "brief": daily_brief, + "chief_of_staff_summary": chief_summary, + "message_text": message_text, + } + + +def _create_continuity_brief_row( + conn, + *, + workspace_id: UUID, + channel_identity_id: UUID, + brief_payload: dict[str, Any], + chief_summary: dict[str, Any], + message_text: str, + now: datetime, +) -> ContinuityBriefRow: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO continuity_briefs ( + workspace_id, + channel_type, + channel_identity_id, + brief_kind, + assembly_version, + summary, + brief_payload, + message_text, + compiled_at + ) + VALUES (%s, %s, %s, 'daily_brief', %s, %s, %s, %s, %s) + RETURNING + id, + workspace_id, + channel_type, + channel_identity_id, + brief_kind, + 
assembly_version, + summary, + brief_payload, + message_text, + compiled_at, + created_at + """, + ( + workspace_id, + TELEGRAM_CHANNEL_TYPE, + channel_identity_id, + brief_payload.get("assembly_version", "continuity_daily_brief_v0"), + Jsonb(chief_summary), + Jsonb(brief_payload), + message_text, + now, + ), + ) + row = cur.fetchone() + + if row is None: + raise RuntimeError("failed to persist continuity brief") + return row + + +def _fetch_job_by_idempotency( + conn, + *, + workspace_id: UUID, + job_kind: Literal["daily_brief", "open_loop_prompt"], + idempotency_key: str, +) -> DailyBriefJobRow | None: + with conn.cursor() as cur: + cur.execute( + f""" + SELECT {_job_columns_sql()} + FROM daily_brief_jobs + WHERE workspace_id = %s + AND channel_type = %s + AND job_kind = %s + AND idempotency_key = %s + LIMIT 1 + """, + (workspace_id, TELEGRAM_CHANNEL_TYPE, job_kind, idempotency_key), + ) + return cur.fetchone() + + +def _fetch_jobs_by_workspace( + conn, + *, + workspace_id: UUID, + limit: int, +) -> list[DailyBriefJobRow]: + with conn.cursor() as cur: + cur.execute( + f""" + SELECT {_job_columns_sql()} + FROM daily_brief_jobs + WHERE workspace_id = %s + AND channel_type = %s + ORDER BY due_at DESC, id DESC + LIMIT %s + """, + (workspace_id, TELEGRAM_CHANNEL_TYPE, limit), + ) + return cur.fetchall() + + +def _fetch_receipt_by_id( + conn, + *, + receipt_id: UUID, +) -> TelegramDeliveryReceiptRow | None: + with conn.cursor() as cur: + cur.execute( + f""" + SELECT {_receipt_columns_sql()} + FROM channel_delivery_receipts + WHERE id = %s + LIMIT 1 + """, + (receipt_id,), + ) + return cur.fetchone() + + +def _fetch_brief_by_id( + conn, + *, + brief_id: UUID, +) -> ContinuityBriefRow | None: + with conn.cursor() as cur: + cur.execute( + f""" + SELECT {_brief_columns_sql()} + FROM continuity_briefs + WHERE id = %s + LIMIT 1 + """, + (brief_id,), + ) + return cur.fetchone() + + +def _upsert_scheduled_job( + conn, + *, + workspace_id: UUID, + channel_identity_id: UUID, + 
job_kind: Literal["daily_brief", "open_loop_prompt"], + prompt_kind: Literal["waiting_for", "stale"] | None, + prompt_id: str | None, + continuity_object_id: UUID | None, + continuity_brief_id: UUID | None, + schedule_slot: str, + idempotency_key: str, + due_at: datetime, + payload: dict[str, object], +) -> DailyBriefJobRow: + with conn.cursor() as cur: + cur.execute( + f""" + INSERT INTO daily_brief_jobs ( + workspace_id, + channel_type, + channel_identity_id, + job_kind, + prompt_kind, + prompt_id, + continuity_object_id, + continuity_brief_id, + schedule_slot, + idempotency_key, + due_at, + status, + payload, + result_payload, + updated_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'scheduled', %s, '{{}}'::jsonb, clock_timestamp()) + ON CONFLICT (workspace_id, channel_type, idempotency_key) DO UPDATE + SET updated_at = daily_brief_jobs.updated_at + RETURNING {_job_columns_sql()} + """, + ( + workspace_id, + TELEGRAM_CHANNEL_TYPE, + channel_identity_id, + job_kind, + prompt_kind, + prompt_id, + continuity_object_id, + continuity_brief_id, + schedule_slot, + idempotency_key, + due_at, + Jsonb(payload), + ), + ) + row = cur.fetchone() + + if row is None: + raise RuntimeError("failed to upsert scheduled daily brief job") + return row + + +def _update_job_result( + conn, + *, + job_id: UUID, + status: str, + suppression_reason: str | None, + delivery_receipt_id: UUID | None, + continuity_brief_id: UUID | None, + result_payload: dict[str, object], + now: datetime, +) -> DailyBriefJobRow: + with conn.cursor() as cur: + cur.execute( + f""" + UPDATE daily_brief_jobs + SET status = %s, + suppression_reason = %s, + delivery_receipt_id = %s, + continuity_brief_id = COALESCE(%s, continuity_brief_id), + result_payload = %s, + attempted_at = %s, + completed_at = %s, + attempt_count = attempt_count + 1, + updated_at = clock_timestamp() + WHERE id = %s + RETURNING {_job_columns_sql()} + """, + ( + status, + suppression_reason, + delivery_receipt_id, + 
continuity_brief_id, + Jsonb(result_payload), + now, + now, + job_id, + ), + ) + row = cur.fetchone() + + if row is None: + raise RuntimeError("failed to update daily brief job result") + return row + + +def _build_open_loop_prompt_candidates( + conn, + *, + user_account_id: UUID, + now: datetime, + limit: int, +) -> list[OpenLoopPromptCandidate]: + set_current_user(conn, user_account_id) + bounded_limit = min(max(limit, 1), MAX_CONTINUITY_OPEN_LOOP_LIMIT) + dashboard = compile_continuity_open_loop_dashboard( + ContinuityStore(conn), + user_id=user_account_id, + request=ContinuityOpenLoopDashboardQueryInput(limit=bounded_limit), + )["dashboard"] + + def _build( + kind: Literal["waiting_for", "stale"], + *, + review_action_hint: Literal["still_blocked", "deferred"], + section_items: list[dict[str, object]], + ) -> list[OpenLoopPromptCandidate]: + prompts: list[OpenLoopPromptCandidate] = [] + for item in section_items: + continuity_object_id = str(item["id"]) + title = str(item.get("title", continuity_object_id)) + prompt_id = f"{kind}:{continuity_object_id}" + message_text = ( + f"Open-loop prompt ({kind}): {title}\n" + f"Review via /open-loop {continuity_object_id} {review_action_hint}." 
+ ) + prompts.append( + { + "prompt_id": prompt_id, + "prompt_kind": kind, + "continuity_object_id": continuity_object_id, + "title": title, + "continuity_status": str(item.get("status", "active")), + "review_action_hint": review_action_hint, + "due_at": now.isoformat(), + "message_text": message_text, + } + ) + return prompts + + waiting_prompts = _build( + "waiting_for", + review_action_hint="still_blocked", + section_items=dashboard["waiting_for"]["items"], + ) + stale_prompts = _build( + "stale", + review_action_hint="deferred", + section_items=dashboard["stale"]["items"], + ) + return stale_prompts + waiting_prompts + + +def _prompt_key(prompt_kind: str, continuity_object_id: str) -> str: + payload = f"telegram:{prompt_kind}:{continuity_object_id}" + return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16] + + +def _fetch_latest_prompt_jobs( + conn, + *, + workspace_id: UUID, + prompt_ids: list[str], +) -> dict[str, DailyBriefJobRow]: + if not prompt_ids: + return {} + + with conn.cursor() as cur: + cur.execute( + f""" + SELECT {_job_columns_sql()} + FROM daily_brief_jobs + WHERE workspace_id = %s + AND channel_type = %s + AND job_kind = 'open_loop_prompt' + AND prompt_id = ANY(%s) + ORDER BY created_at DESC, id DESC + """, + (workspace_id, TELEGRAM_CHANNEL_TYPE, prompt_ids), + ) + rows = cur.fetchall() + + latest: dict[str, DailyBriefJobRow] = {} + for row in rows: + prompt_id = row["prompt_id"] + if prompt_id is None: + continue + if prompt_id not in latest: + latest[prompt_id] = row + return latest + + +def get_workspace_notification_preferences( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + now: datetime | None = None, +) -> dict[str, object]: + effective_now = _utcnow() if now is None else now + subscription = ensure_workspace_notification_subscription( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + return { + "workspace_id": str(workspace_id), + "notification_preferences": 
serialize_notification_subscription(subscription, now=effective_now), + } + + +def get_workspace_daily_brief_preview( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + now: datetime | None = None, +) -> dict[str, object]: + effective_now = _utcnow() if now is None else now + subscription = ensure_workspace_notification_subscription( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + bundle = _build_daily_brief_bundle( + conn, + user_account_id=user_account_id, + timezone_name=subscription["timezone"], + now=effective_now, + ) + policy = _evaluate_delivery_policy( + subscription, + mode="daily_brief", + prompt_kind=None, + now=effective_now, + force=False, + ) + return { + "workspace_id": str(workspace_id), + "brief": bundle["brief"], + "chief_of_staff_summary": bundle["chief_of_staff_summary"], + "preview_message_text": bundle["message_text"], + "delivery_policy": policy.as_dict(), + } + + +def _existing_delivery_artifacts( + conn, + *, + job: DailyBriefJobRow, +) -> tuple[dict[str, object] | None, dict[str, object] | None]: + receipt_payload: dict[str, object] | None = None + brief_payload: dict[str, object] | None = None + + if job["delivery_receipt_id"] is not None: + receipt = _fetch_receipt_by_id(conn, receipt_id=job["delivery_receipt_id"]) + if receipt is not None: + receipt_payload = serialize_delivery_receipt(receipt) + + if job["continuity_brief_id"] is not None: + brief = _fetch_brief_by_id(conn, brief_id=job["continuity_brief_id"]) + brief_payload = _serialize_brief_row(brief) + + return receipt_payload, brief_payload + + +def deliver_workspace_daily_brief( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + bot_token: str, + force: bool, + idempotency_key: str | None, + now: datetime | None = None, +) -> dict[str, object]: + effective_now = _utcnow() if now is None else now + subscription = ensure_workspace_notification_subscription( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, 
+ ) + identity = _resolve_linked_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + + schedule_slot = _daily_slot_for_now(now=effective_now, timezone_name=subscription["timezone"]) + due_at = _daily_due_at( + slot=schedule_slot, + timezone_name=subscription["timezone"], + window_start=subscription["daily_brief_window_start"], + ) + resolved_idempotency = _resolve_internal_idempotency_key( + workspace_id=workspace_id, + job_kind="daily_brief", + schedule_slot=schedule_slot, + prompt_id=None, + client_idempotency_key=idempotency_key, + ) + existing = _fetch_job_by_idempotency( + conn, + workspace_id=workspace_id, + job_kind="daily_brief", + idempotency_key=resolved_idempotency, + ) + if existing is not None and existing["status"] in _TERMINAL_JOB_STATUSES: + receipt_payload, brief_payload = _existing_delivery_artifacts(conn, job=existing) + return { + "workspace_id": str(workspace_id), + "job": _serialize_job(existing, now=effective_now), + "brief_record": brief_payload, + "delivery_receipt": receipt_payload, + "idempotent_replay": True, + } + + bundle = _build_daily_brief_bundle( + conn, + user_account_id=user_account_id, + timezone_name=subscription["timezone"], + now=effective_now, + ) + brief_record = _create_continuity_brief_row( + conn, + workspace_id=workspace_id, + channel_identity_id=identity["id"], + brief_payload=bundle["brief"], + chief_summary=bundle["chief_of_staff_summary"], + message_text=str(bundle["message_text"]), + now=effective_now, + ) + + policy = _evaluate_delivery_policy( + subscription, + mode="daily_brief", + prompt_kind=None, + now=effective_now, + force=force, + ) + + job = _upsert_scheduled_job( + conn, + workspace_id=workspace_id, + channel_identity_id=identity["id"], + job_kind="daily_brief", + prompt_kind=None, + prompt_id=None, + continuity_object_id=None, + continuity_brief_id=brief_record["id"], + schedule_slot=schedule_slot, + idempotency_key=resolved_idempotency, + due_at=due_at, + payload={ + 
"scope": "workspace", + "delivery_policy": policy.as_dict(), + "message_text_preview": bundle["message_text"], + }, + ) + + if job["status"] in _TERMINAL_JOB_STATUSES: + receipt_payload, brief_payload = _existing_delivery_artifacts(conn, job=job) + return { + "workspace_id": str(workspace_id), + "job": _serialize_job(job, now=effective_now), + "brief_record": brief_payload, + "delivery_receipt": receipt_payload, + "idempotent_replay": True, + } + + if policy.allowed: + outbound, receipt = dispatch_telegram_workspace_message( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + text=str(bundle["message_text"]), + dispatch_idempotency_key=resolved_idempotency, + bot_token=bot_token, + dispatch_payload={"job_kind": "daily_brief", "schedule_slot": schedule_slot}, + scheduled_job_id=job["id"], + scheduler_job_kind="daily_brief", + scheduled_for=due_at, + schedule_slot=schedule_slot, + notification_policy=policy.as_dict(), + ) + del outbound + next_status = receipt["status"] + suppression_reason = None + else: + _, receipt = dispatch_telegram_workspace_message( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + text=str(bundle["message_text"]), + dispatch_idempotency_key=resolved_idempotency, + bot_token=bot_token, + dispatch_payload={"job_kind": "daily_brief", "schedule_slot": schedule_slot}, + receipt_status_override="suppressed", + failure_code_override=policy.suppression_status, + failure_detail_override=policy.reason, + scheduled_job_id=job["id"], + scheduler_job_kind="daily_brief", + scheduled_for=due_at, + schedule_slot=schedule_slot, + notification_policy=policy.as_dict(), + ) + next_status = policy.suppression_status or "suppressed_disabled" + suppression_reason = policy.reason + + updated_job = _update_job_result( + conn, + job_id=job["id"], + status=next_status, + suppression_reason=suppression_reason, + delivery_receipt_id=receipt["id"], + continuity_brief_id=brief_record["id"], + result_payload={ + 
"delivery_policy": policy.as_dict(), + "delivery_receipt_id": str(receipt["id"]), + "status": next_status, + }, + now=effective_now, + ) + + return { + "workspace_id": str(workspace_id), + "job": _serialize_job(updated_job, now=effective_now), + "brief_record": _serialize_brief_row(brief_record), + "delivery_receipt": serialize_delivery_receipt(receipt), + "idempotent_replay": False, + } + + +def list_workspace_open_loop_prompts( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + now: datetime | None = None, + limit: int = 20, +) -> dict[str, object]: + effective_now = _utcnow() if now is None else now + subscription = ensure_workspace_notification_subscription( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + prompts = _build_open_loop_prompt_candidates( + conn, + user_account_id=user_account_id, + now=effective_now, + limit=limit, + ) + prompt_ids = [prompt["prompt_id"] for prompt in prompts] + latest_jobs = _fetch_latest_prompt_jobs(conn, workspace_id=workspace_id, prompt_ids=prompt_ids) + + today_slot = _daily_slot_for_now(now=effective_now, timezone_name=subscription["timezone"]) + + items: list[dict[str, object]] = [] + for prompt in prompts: + latest = latest_jobs.get(prompt["prompt_id"]) + items.append( + { + **prompt, + "prompt_key": _prompt_key(prompt["prompt_kind"], prompt["continuity_object_id"]), + "latest_job_status": None if latest is None else latest["status"], + "already_delivered_today": False + if latest is None + else latest["schedule_slot"] == today_slot and latest["status"] in _TERMINAL_JOB_STATUSES, + } + ) + + return { + "workspace_id": str(workspace_id), + "notification_preferences": serialize_notification_subscription(subscription, now=effective_now), + "items": items, + "summary": { + "total_count": len(items), + "returned_count": len(items), + "prompt_kind_order": ["stale", "waiting_for"], + "item_order": ["kind_order", "created_at_desc", "id_desc"], + }, + } + + +def _resolve_prompt_candidate( + 
conn, + *, + user_account_id: UUID, + prompt_id: str, + now: datetime, +) -> OpenLoopPromptCandidate: + normalized = prompt_id.strip() + if normalized == "": + raise TelegramOpenLoopPromptNotFoundError("prompt_id must not be empty") + + candidates = _build_open_loop_prompt_candidates( + conn, + user_account_id=user_account_id, + now=now, + limit=MAX_CONTINUITY_OPEN_LOOP_LIMIT, + ) + for candidate in candidates: + if candidate["prompt_id"] == normalized: + return candidate + + raise TelegramOpenLoopPromptNotFoundError(f"open-loop prompt {normalized!r} was not found") + + +def deliver_workspace_open_loop_prompt( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + prompt_id: str, + bot_token: str, + force: bool, + idempotency_key: str | None, + now: datetime | None = None, +) -> dict[str, object]: + effective_now = _utcnow() if now is None else now + subscription = ensure_workspace_notification_subscription( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + identity = _resolve_linked_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + prompt = _resolve_prompt_candidate( + conn, + user_account_id=user_account_id, + prompt_id=prompt_id, + now=effective_now, + ) + + continuity_object_id = UUID(prompt["continuity_object_id"]) + schedule_slot = _daily_slot_for_now(now=effective_now, timezone_name=subscription["timezone"]) + due_at = _daily_due_at( + slot=schedule_slot, + timezone_name=subscription["timezone"], + window_start=subscription["daily_brief_window_start"], + ) + resolved_idempotency = _resolve_internal_idempotency_key( + workspace_id=workspace_id, + job_kind="open_loop_prompt", + schedule_slot=schedule_slot, + prompt_id=prompt["prompt_id"], + client_idempotency_key=idempotency_key, + ) + existing = _fetch_job_by_idempotency( + conn, + workspace_id=workspace_id, + job_kind="open_loop_prompt", + idempotency_key=resolved_idempotency, + ) + if existing is not None and existing["status"] in 
_TERMINAL_JOB_STATUSES: + receipt_payload, _ = _existing_delivery_artifacts(conn, job=existing) + return { + "workspace_id": str(workspace_id), + "job": _serialize_job(existing, now=effective_now), + "delivery_receipt": receipt_payload, + "prompt": prompt, + "idempotent_replay": True, + } + + policy = _evaluate_delivery_policy( + subscription, + mode="open_loop_prompt", + prompt_kind=prompt["prompt_kind"], + now=effective_now, + force=force, + ) + + job = _upsert_scheduled_job( + conn, + workspace_id=workspace_id, + channel_identity_id=identity["id"], + job_kind="open_loop_prompt", + prompt_kind=prompt["prompt_kind"], + prompt_id=prompt["prompt_id"], + continuity_object_id=continuity_object_id, + continuity_brief_id=None, + schedule_slot=schedule_slot, + idempotency_key=resolved_idempotency, + due_at=due_at, + payload={ + "prompt": prompt, + "delivery_policy": policy.as_dict(), + }, + ) + + if job["status"] in _TERMINAL_JOB_STATUSES: + receipt_payload, _ = _existing_delivery_artifacts(conn, job=job) + return { + "workspace_id": str(workspace_id), + "job": _serialize_job(job, now=effective_now), + "delivery_receipt": receipt_payload, + "prompt": prompt, + "idempotent_replay": True, + } + + if policy.allowed: + _, receipt = dispatch_telegram_workspace_message( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + text=prompt["message_text"], + dispatch_idempotency_key=resolved_idempotency, + bot_token=bot_token, + dispatch_payload={ + "job_kind": "open_loop_prompt", + "prompt_id": prompt["prompt_id"], + "schedule_slot": schedule_slot, + }, + scheduled_job_id=job["id"], + scheduler_job_kind="open_loop_prompt", + scheduled_for=due_at, + schedule_slot=schedule_slot, + notification_policy=policy.as_dict(), + ) + next_status = receipt["status"] + suppression_reason = None + else: + _, receipt = dispatch_telegram_workspace_message( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + text=prompt["message_text"], + 
dispatch_idempotency_key=resolved_idempotency, + bot_token=bot_token, + dispatch_payload={ + "job_kind": "open_loop_prompt", + "prompt_id": prompt["prompt_id"], + "schedule_slot": schedule_slot, + }, + receipt_status_override="suppressed", + failure_code_override=policy.suppression_status, + failure_detail_override=policy.reason, + scheduled_job_id=job["id"], + scheduler_job_kind="open_loop_prompt", + scheduled_for=due_at, + schedule_slot=schedule_slot, + notification_policy=policy.as_dict(), + ) + next_status = policy.suppression_status or "suppressed_disabled" + suppression_reason = policy.reason + + updated_job = _update_job_result( + conn, + job_id=job["id"], + status=next_status, + suppression_reason=suppression_reason, + delivery_receipt_id=receipt["id"], + continuity_brief_id=None, + result_payload={ + "delivery_policy": policy.as_dict(), + "delivery_receipt_id": str(receipt["id"]), + "status": next_status, + }, + now=effective_now, + ) + + return { + "workspace_id": str(workspace_id), + "job": _serialize_job(updated_job, now=effective_now), + "delivery_receipt": serialize_delivery_receipt(receipt), + "prompt": prompt, + "idempotent_replay": False, + } + + +def _materialize_due_jobs( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + subscription: NotificationSubscriptionRow, + now: datetime, + prompt_limit: int, +) -> None: + identity = _resolve_linked_identity( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + schedule_slot = _daily_slot_for_now(now=now, timezone_name=subscription["timezone"]) + due_at = _daily_due_at( + slot=schedule_slot, + timezone_name=subscription["timezone"], + window_start=subscription["daily_brief_window_start"], + ) + + daily_policy = _evaluate_delivery_policy( + subscription, + mode="daily_brief", + prompt_kind=None, + now=now, + force=False, + ) + if daily_policy.window_open and daily_policy.suppression_status != "suppressed_disabled": + daily_key = 
f"telegram:daily-brief:{workspace_id}:{schedule_slot}" + _upsert_scheduled_job( + conn, + workspace_id=workspace_id, + channel_identity_id=identity["id"], + job_kind="daily_brief", + prompt_kind=None, + prompt_id=None, + continuity_object_id=None, + continuity_brief_id=None, + schedule_slot=schedule_slot, + idempotency_key=daily_key, + due_at=due_at, + payload={"materialized_by": "scheduler_jobs", "delivery_policy": daily_policy.as_dict()}, + ) + + prompts = _build_open_loop_prompt_candidates( + conn, + user_account_id=user_account_id, + now=now, + limit=prompt_limit, + ) + for prompt in prompts: + prompt_policy = _evaluate_delivery_policy( + subscription, + mode="open_loop_prompt", + prompt_kind=prompt["prompt_kind"], + now=now, + force=False, + ) + if not prompt_policy.window_open: + continue + if prompt_policy.suppression_status == "suppressed_disabled": + continue + + prompt_key = f"telegram:open-loop-prompt:{workspace_id}:{prompt['prompt_id']}:{schedule_slot}" + _upsert_scheduled_job( + conn, + workspace_id=workspace_id, + channel_identity_id=identity["id"], + job_kind="open_loop_prompt", + prompt_kind=prompt["prompt_kind"], + prompt_id=prompt["prompt_id"], + continuity_object_id=UUID(prompt["continuity_object_id"]), + continuity_brief_id=None, + schedule_slot=schedule_slot, + idempotency_key=prompt_key, + due_at=due_at, + payload={ + "materialized_by": "scheduler_jobs", + "prompt": prompt, + "delivery_policy": prompt_policy.as_dict(), + }, + ) + + +def list_workspace_scheduler_jobs( + conn, + *, + user_account_id: UUID, + workspace_id: UUID, + now: datetime | None = None, + limit: int = 50, + prompt_limit: int = 20, +) -> dict[str, object]: + effective_now = _utcnow() if now is None else now + bounded_limit = min(max(limit, 1), 200) + bounded_prompt_limit = min(max(prompt_limit, 1), MAX_CONTINUITY_OPEN_LOOP_LIMIT) + + subscription = ensure_workspace_notification_subscription( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + ) + 
_materialize_due_jobs( + conn, + user_account_id=user_account_id, + workspace_id=workspace_id, + subscription=subscription, + now=effective_now, + prompt_limit=bounded_prompt_limit, + ) + jobs = _fetch_jobs_by_workspace(conn, workspace_id=workspace_id, limit=bounded_limit) + serialized = [_serialize_job(row, now=effective_now) for row in jobs] + + return { + "workspace_id": str(workspace_id), + "notification_preferences": serialize_notification_subscription(subscription, now=effective_now), + "items": serialized, + "summary": { + "total_count": len(serialized), + "due_count": sum(1 for item in serialized if bool(item["is_due"])), + "order": ["due_at_desc", "id_desc"], + }, + } + + +__all__ = [ + "TelegramNotificationPreferenceValidationError", + "TelegramOpenLoopPromptNotFoundError", + "deliver_workspace_daily_brief", + "deliver_workspace_open_loop_prompt", + "ensure_workspace_notification_subscription", + "get_workspace_daily_brief_preview", + "get_workspace_notification_preferences", + "list_workspace_open_loop_prompts", + "list_workspace_scheduler_jobs", + "patch_workspace_notification_subscription", + "serialize_notification_subscription", +] diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 638ec42..6ba06d1 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -41,7 +41,7 @@ const routeCards = [ href: "/settings", title: "Hosted Settings", description: - "Persist timezone, brief-policy inputs, quiet hours, and linked-device visibility without scheduler execution.", + "Persist Telegram notification posture, quiet hours, daily brief delivery state, and scheduler job visibility.", status: "active", }, { diff --git a/apps/web/app/settings/page.test.tsx b/apps/web/app/settings/page.test.tsx index f953590..5f753f1 100644 --- a/apps/web/app/settings/page.test.tsx +++ b/apps/web/app/settings/page.test.tsx @@ -17,6 +17,6 @@ describe("SettingsPage", () => { expect( screen.getByText("Issue a deterministic link challenge bound to the active 
hosted workspace."), ).toBeInTheDocument(); - expect(screen.getByText(/does not claim Telegram continuity capture/i)).toBeInTheDocument(); + expect(screen.getByText(/does not claim beta admin dashboards/i)).toBeInTheDocument(); }); }); diff --git a/apps/web/app/settings/page.tsx b/apps/web/app/settings/page.tsx index 65c422b..d7f7ae7 100644 --- a/apps/web/app/settings/page.tsx +++ b/apps/web/app/settings/page.tsx @@ -5,9 +5,9 @@ export default function SettingsPage() { return (
diff --git a/apps/web/components/hosted-settings-panel.test.tsx b/apps/web/components/hosted-settings-panel.test.tsx index 3833694..649ac69 100644 --- a/apps/web/components/hosted-settings-panel.test.tsx +++ b/apps/web/components/hosted-settings-panel.test.tsx @@ -22,10 +22,10 @@ describe("HostedSettingsPanel", () => { expect(screen.getByText("Telegram Channel Settings")).toBeInTheDocument(); expect(screen.getByText(/Telegram Link Start/i)).toBeInTheDocument(); - expect(screen.getByText(/Telegram Link Confirm/i)).toBeInTheDocument(); - expect(screen.getByText(/Telegram Status \+ Unlink/i)).toBeInTheDocument(); + expect(screen.getByText(/Daily Brief \+ Notification Preferences/i)).toBeInTheDocument(); + expect(screen.getByText(/Open-Loop Prompts \+ Scheduler/i)).toBeInTheDocument(); expect(screen.getAllByText(/Messages, Threads, Receipts/i).length).toBeGreaterThan(0); - expect(screen.getByText(/does not claim Telegram continuity capture/i)).toBeInTheDocument(); + expect(screen.getByText(/does not claim beta admin dashboards/i)).toBeInTheDocument(); }); it("starts telegram link challenge from hosted controls", async () => { diff --git a/apps/web/components/hosted-settings-panel.tsx b/apps/web/components/hosted-settings-panel.tsx index a5c1dd8..035ee72 100644 --- a/apps/web/components/hosted-settings-panel.tsx +++ b/apps/web/components/hosted-settings-panel.tsx @@ -20,9 +20,17 @@ const settingItems = [ title: "Telegram Status + Unlink", detail: "Inspect current transport readiness and remove Telegram linkage without local tooling.", }, + { + title: "Daily Brief + Notification Preferences", + detail: "Inspect notification posture, quiet-hours policy, and daily brief preview/delivery controls.", + }, + { + title: "Open-Loop Prompts + Scheduler", + detail: "Surface waiting-for/stale prompt candidates and scheduled job posture without admin tooling claims.", + }, { title: "Messages, Threads, Receipts", - detail: "Expose normalized inbound traffic, deterministic routing 
state, and outbound delivery posture.", + detail: "Expose normalized inbound traffic, deterministic routing state, and outbound delivery evidence.", }, ]; @@ -80,6 +88,54 @@ type TelegramReceipt = { channel_message_id: string; recorded_at: string; failure_code: string | null; + scheduler_job_kind?: string | null; +}; + +type TelegramNotificationPreferences = { + notifications_enabled: boolean; + daily_brief_enabled: boolean; + daily_brief_window_start: string; + open_loop_prompts_enabled: boolean; + waiting_for_prompts_enabled: boolean; + stale_prompts_enabled: boolean; + timezone: string; + quiet_hours: { + enabled: boolean; + start: string; + end: string; + active_now?: boolean; + }; +}; + +type TelegramDailyBriefPreview = { + preview_message_text: string; + delivery_policy: { + allowed: boolean; + suppression_status: string | null; + reason: string; + }; + brief: { + assembly_version: string; + waiting_for_highlights: { summary: { total_count: number } }; + blocker_highlights: { summary: { total_count: number } }; + stale_items: { summary: { total_count: number } }; + }; +}; + +type TelegramOpenLoopPrompt = { + prompt_id: string; + prompt_kind: string; + title: string; + latest_job_status: string | null; + already_delivered_today: boolean; +}; + +type TelegramSchedulerJob = { + id: string; + job_kind: string; + status: string; + due_at: string; + is_due: boolean; }; type HostedSettingsPanelProps = { @@ -126,6 +182,10 @@ export function HostedSettingsPanel({ apiBaseUrl }: HostedSettingsPanelProps) { const [messages, setMessages] = useState([]); const [threads, setThreads] = useState([]); const [receipts, setReceipts] = useState([]); + const [notificationPreferences, setNotificationPreferences] = useState(null); + const [dailyBriefPreview, setDailyBriefPreview] = useState(null); + const [openLoopPrompts, setOpenLoopPrompts] = useState([]); + const [schedulerJobs, setSchedulerJobs] = useState([]); async function requestTelegramJson( path: string, @@ -280,6 +340,112 
@@ export function HostedSettingsPanel({ apiBaseUrl }: HostedSettingsPanelProps) { }, "Loading Telegram transport records..."); } + async function handleLoadNotificationPreferences() { + await runOperation(async () => { + const payload = await requestTelegramJson<{ notification_preferences: TelegramNotificationPreferences }>( + "/v1/channels/telegram/notification-preferences", + ); + setNotificationPreferences(payload.notification_preferences); + setStatusTone("success"); + setStatusText("Loaded Telegram notification preference posture."); + }, "Loading Telegram notification preferences..."); + } + + async function handleEnableDailyLoop() { + await runOperation(async () => { + const payload = await requestTelegramJson<{ notification_preferences: TelegramNotificationPreferences }>( + "/v1/channels/telegram/notification-preferences", + { + method: "PATCH", + body: JSON.stringify({ + notifications_enabled: true, + daily_brief_enabled: true, + open_loop_prompts_enabled: true, + waiting_for_prompts_enabled: true, + stale_prompts_enabled: true, + quiet_hours_enabled: false, + daily_brief_window_start: "07:00", + }), + }, + ); + setNotificationPreferences(payload.notification_preferences); + setStatusTone("success"); + setStatusText("Enabled daily brief + open-loop prompt delivery for Telegram."); + }, "Enabling Telegram daily loop..."); + } + + async function handlePreviewDailyBrief() { + await runOperation(async () => { + const payload = await requestTelegramJson("/v1/channels/telegram/daily-brief"); + setDailyBriefPreview(payload); + setStatusTone("success"); + setStatusText("Loaded current daily brief preview and delivery policy."); + }, "Loading Telegram daily brief preview..."); + } + + async function handleDeliverDailyBrief() { + await runOperation(async () => { + const payload = await requestTelegramJson<{ job: TelegramSchedulerJob; idempotent_replay: boolean }>( + "/v1/channels/telegram/daily-brief/deliver", + { method: "POST", body: JSON.stringify({}) }, + ); + 
setStatusTone("success"); + setStatusText( + payload.idempotent_replay + ? "Daily brief delivery reused existing idempotent job." + : "Daily brief delivery job recorded for Telegram.", + ); + await loadSchedulerJobs(); + }, "Delivering Telegram daily brief..."); + } + + async function loadOpenLoopPrompts() { + const payload = await requestTelegramJson<{ items: TelegramOpenLoopPrompt[] }>( + "/v1/channels/telegram/open-loop-prompts", + ); + setOpenLoopPrompts(payload.items); + } + + async function handleLoadOpenLoopPrompts() { + await runOperation(async () => { + await loadOpenLoopPrompts(); + setStatusTone("success"); + setStatusText("Loaded scheduled waiting-for and stale open-loop prompts."); + }, "Loading Telegram open-loop prompts..."); + } + + async function handleDeliverPrompt(promptId: string) { + await runOperation(async () => { + const payload = await requestTelegramJson<{ idempotent_replay: boolean }>( + `/v1/channels/telegram/open-loop-prompts/${encodeURIComponent(promptId)}/deliver`, + { method: "POST", body: JSON.stringify({}) }, + ); + setStatusTone("success"); + setStatusText( + payload.idempotent_replay + ? "Prompt delivery reused existing idempotent job." + : "Prompt delivery job recorded for Telegram.", + ); + await loadOpenLoopPrompts(); + await loadSchedulerJobs(); + }, "Delivering Telegram open-loop prompt..."); + } + + async function loadSchedulerJobs() { + const payload = await requestTelegramJson<{ items: TelegramSchedulerJob[] }>( + "/v1/channels/telegram/scheduler/jobs", + ); + setSchedulerJobs(payload.items); + } + + async function handleLoadSchedulerJobs() { + await runOperation(async () => { + await loadSchedulerJobs(); + setStatusTone("success"); + setStatusText("Loaded Telegram scheduler job posture."); + }, "Loading Telegram scheduler jobs..."); + } + return (
+ +
+
+ + +
+ + {notificationPreferences ? ( +
+

+ Notifications enabled: {notificationPreferences.notifications_enabled ? "yes" : "no"} +

+

+ Daily brief enabled: {notificationPreferences.daily_brief_enabled ? "yes" : "no"} +

+

+ Open-loop prompts enabled:{" "} + {notificationPreferences.open_loop_prompts_enabled ? "yes" : "no"} +

+

+ Timezone: {notificationPreferences.timezone} +

+

+ Quiet hours:{" "} + {notificationPreferences.quiet_hours.enabled + ? `${notificationPreferences.quiet_hours.start}–${notificationPreferences.quiet_hours.end}` + : "disabled"} +

+
+ ) : null} + +
+ + +
+ + {dailyBriefPreview ? ( +
+

+ Policy allows delivery now: {dailyBriefPreview.delivery_policy.allowed ? "yes" : "no"} +

+

+ Policy reason: {dailyBriefPreview.delivery_policy.reason} +

+

+ Brief counts: waiting-for {dailyBriefPreview.brief.waiting_for_highlights.summary.total_count} + , blockers {dailyBriefPreview.brief.blocker_highlights.summary.total_count}, stale{" "} + {dailyBriefPreview.brief.stale_items.summary.total_count} +

+
+ ) : null} + +
+ + +
+ +

+ Prompt candidates: {openLoopPrompts.length} +

+
    + {openLoopPrompts.slice(0, 5).map((prompt) => ( +
  • + {prompt.prompt_kind} · {prompt.title} · latest {prompt.latest_job_status ?? "none"} + +
  • + ))} +
+ +

+ Scheduler jobs: {schedulerJobs.length} +

+
    + {schedulerJobs.slice(0, 5).map((job) => ( +
  • + {job.job_kind} · {job.status} · due {formatOptionalDate(job.due_at)} +
  • + ))} +
+
+
+

- This surface does not claim Telegram continuity capture, recall, resume, correction, approval - resolution, or scheduler execution. It is transport and control-plane readiness only. + This surface does not claim beta admin dashboards, support consoles, channel expansion beyond Telegram, + or launch hardening as already active. It stays bounded to hosted Telegram brief + notification control.

diff --git a/tests/integration/test_phase10_daily_brief_notifications_api.py b/tests/integration/test_phase10_daily_brief_notifications_api.py new file mode 100644 index 0000000..fc5f04d --- /dev/null +++ b/tests/integration/test_phase10_daily_brief_notifications_api.py @@ -0,0 +1,602 @@ +from __future__ import annotations + +import hashlib +import json +from datetime import UTC, datetime, timedelta +from typing import Any +from urllib.parse import urlencode +from uuid import UUID, uuid4 + +import anyio +import psycopg + +import apps.api.src.alicebot_api.main as main_module +from apps.api.src.alicebot_api.config import Settings + + +def invoke_request( + method: str, + path: str, + *, + query_params: dict[str, str] | None = None, + payload: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, +) -> tuple[int, dict[str, Any]]: + messages: list[dict[str, object]] = [] + encoded_body = b"" if payload is None else json.dumps(payload).encode() + request_received = False + + async def receive() -> dict[str, object]: + nonlocal request_received + if request_received: + return {"type": "http.disconnect"} + + request_received = True + return {"type": "http.request", "body": encoded_body, "more_body": False} + + async def send(message: dict[str, object]) -> None: + messages.append(message) + + query_string = urlencode(query_params or {}).encode() + request_headers = [(b"content-type", b"application/json")] + for key, value in (headers or {}).items(): + request_headers.append((key.lower().encode(), value.encode())) + + scope = { + "type": "http", + "asgi": {"version": "3.0"}, + "http_version": "1.1", + "method": method, + "scheme": "http", + "path": path, + "raw_path": path.encode(), + "query_string": query_string, + "headers": request_headers, + "client": ("testclient", 50000), + "server": ("testserver", 80), + "root_path": "", + } + + anyio.run(main_module.app, scope, receive, send) + + start_message = next(message for message in messages if 
message["type"] == "http.response.start") + body = b"".join( + message.get("body", b"") + for message in messages + if message["type"] == "http.response.body" + ) + return start_message["status"], json.loads(body) + + +def auth_header(session_token: str) -> dict[str, str]: + return {"authorization": f"Bearer {session_token}"} + + +def _configure_settings(migrated_database_urls, monkeypatch) -> None: + monkeypatch.setattr( + main_module, + "get_settings", + lambda: Settings( + database_url=migrated_database_urls["app"], + magic_link_ttl_seconds=600, + auth_session_ttl_seconds=3600, + device_link_ttl_seconds=600, + telegram_link_ttl_seconds=600, + telegram_bot_username="alicebot", + telegram_webhook_secret="", + telegram_bot_token="", + ), + ) + + +def _bootstrap_workspace_session(email: str) -> tuple[str, str]: + start_status, start_payload = invoke_request( + "POST", + "/v1/auth/magic-link/start", + payload={"email": email}, + ) + assert start_status == 200 + + verify_status, verify_payload = invoke_request( + "POST", + "/v1/auth/magic-link/verify", + payload={ + "challenge_token": start_payload["challenge"]["challenge_token"], + "device_label": "P10-S4 Device", + "device_key": f"device-{email}", + }, + ) + assert verify_status == 200 + session_token = verify_payload["session_token"] + + create_workspace_status, create_workspace_payload = invoke_request( + "POST", + "/v1/workspaces", + payload={"name": "P10-S4 Workspace"}, + headers=auth_header(session_token), + ) + assert create_workspace_status == 201 + workspace_id = create_workspace_payload["workspace"]["id"] + + bootstrap_status, bootstrap_payload = invoke_request( + "POST", + "/v1/workspaces/bootstrap", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert bootstrap_status == 200 + assert bootstrap_payload["workspace"]["bootstrap_status"] == "ready" + return session_token, workspace_id + + +def _link_telegram_chat( + *, + session_token: str, + workspace_id: str, + 
chat_id: int, + user_id: int, + username: str, +) -> None: + start_status, start_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/start", + payload={"workspace_id": workspace_id}, + headers=auth_header(session_token), + ) + assert start_status == 200 + challenge_token = start_payload["challenge"]["challenge_token"] + link_code = start_payload["challenge"]["link_code"] + + webhook_status, webhook_payload = invoke_request( + "POST", + "/v1/channels/telegram/webhook", + payload={ + "update_id": 944001, + "message": { + "message_id": 744001, + "date": 1710000000, + "chat": {"id": chat_id, "type": "private"}, + "from": {"id": user_id, "username": username}, + "text": f"/link {link_code}", + }, + }, + ) + assert webhook_status == 200 + assert webhook_payload["ingest"]["link_status"] == "confirmed" + + confirm_status, confirm_payload = invoke_request( + "POST", + "/v1/channels/telegram/link/confirm", + payload={"challenge_token": challenge_token}, + headers=auth_header(session_token), + ) + assert confirm_status == 201 + assert confirm_payload["identity"]["status"] == "linked" + + +def _resolve_user_account_id(*, admin_db_url: str, session_token: str) -> UUID: + token_hash = hashlib.sha256(session_token.encode("utf-8")).hexdigest() + with psycopg.connect(admin_db_url) as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT user_account_id + FROM auth_sessions + WHERE session_token_hash = %s + LIMIT 1 + """, + (token_hash,), + ) + row = cur.fetchone() + if row is None: + raise AssertionError("failed to resolve user account id for session token") + return row[0] + + +def _seed_open_loop_objects(*, admin_db_url: str, user_id: UUID) -> None: + now = datetime(2026, 4, 8, 8, 0, tzinfo=UTC) + seeded = [ + ("WaitingFor", "active", "Waiting For: Vendor SLA"), + ("Blocker", "active", "Blocker: Missing release key"), + ("NextAction", "active", "Next Action: Publish release note"), + ("WaitingFor", "stale", "Waiting For: Stale security signoff"), + ] + + 
with psycopg.connect(admin_db_url) as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO users (id, email, display_name) + VALUES (%s, %s, %s) + ON CONFLICT (id) DO UPDATE + SET email = EXCLUDED.email, + display_name = EXCLUDED.display_name + """, + (str(user_id), f"p10s4-{user_id}@example.com", "P10-S4 User"), + ) + + for index, (object_type, status, title) in enumerate(seeded): + capture_event_id = uuid4() + continuity_object_id = uuid4() + created_at = now - timedelta(minutes=index + 1) + cur.execute( + """ + INSERT INTO continuity_capture_events ( + id, + user_id, + raw_content, + explicit_signal, + admission_posture, + admission_reason, + created_at + ) + VALUES (%s, %s, %s, 'note', 'TRIAGE', 'integration_seed', %s) + """, + ( + str(capture_event_id), + str(user_id), + title, + created_at, + ), + ) + cur.execute( + """ + INSERT INTO continuity_objects ( + id, + user_id, + capture_event_id, + object_type, + status, + title, + body, + provenance, + confidence, + last_confirmed_at, + supersedes_object_id, + superseded_by_object_id, + created_at, + updated_at + ) + VALUES ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + NULL, + NULL, + NULL, + %s, + %s + ) + """, + ( + str(continuity_object_id), + str(user_id), + str(capture_event_id), + object_type, + status, + title, + json.dumps({"text": title}), + json.dumps({"thread_id": "p10s4-thread"}), + 0.9, + created_at, + created_at, + ), + ) + + +def test_phase10_daily_brief_delivery_records_scheduler_and_idempotency( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, workspace_id = _bootstrap_workspace_session("p10s4-builder@example.com") + _link_telegram_chat( + session_token=session_token, + workspace_id=workspace_id, + chat_id=988001, + user_id=688001, + username="p10s4builder", + ) + + user_account_id = _resolve_user_account_id( + admin_db_url=migrated_database_urls["admin"], + session_token=session_token, + ) 
+ _seed_open_loop_objects(admin_db_url=migrated_database_urls["admin"], user_id=user_account_id) + + patch_status, patch_payload = invoke_request( + "PATCH", + "/v1/channels/telegram/notification-preferences", + payload={ + "notifications_enabled": True, + "daily_brief_enabled": True, + "open_loop_prompts_enabled": True, + "waiting_for_prompts_enabled": True, + "stale_prompts_enabled": True, + "timezone": "UTC", + "daily_brief_window_start": "00:00", + "quiet_hours_enabled": False, + }, + headers=auth_header(session_token), + ) + assert patch_status == 200 + assert patch_payload["notification_preferences"]["daily_brief_enabled"] is True + + preview_status, preview_payload = invoke_request( + "GET", + "/v1/channels/telegram/daily-brief", + headers=auth_header(session_token), + ) + assert preview_status == 200 + assert preview_payload["brief"]["assembly_version"] == "continuity_daily_brief_v0" + assert preview_payload["delivery_policy"]["allowed"] is True + + first_deliver_status, first_deliver_payload = invoke_request( + "POST", + "/v1/channels/telegram/daily-brief/deliver", + payload={}, + headers=auth_header(session_token), + ) + assert first_deliver_status == 201 + assert first_deliver_payload["idempotent_replay"] is False + assert first_deliver_payload["job"]["job_kind"] == "daily_brief" + assert first_deliver_payload["job"]["status"] == "simulated" + assert first_deliver_payload["delivery_receipt"]["status"] == "simulated" + assert first_deliver_payload["delivery_receipt"]["scheduler_job_kind"] == "daily_brief" + + second_deliver_status, second_deliver_payload = invoke_request( + "POST", + "/v1/channels/telegram/daily-brief/deliver", + payload={}, + headers=auth_header(session_token), + ) + assert second_deliver_status == 200 + assert second_deliver_payload["idempotent_replay"] is True + assert second_deliver_payload["job"]["id"] == first_deliver_payload["job"]["id"] + + receipts_status, receipts_payload = invoke_request( + "GET", + 
"/v1/channels/telegram/delivery-receipts", + headers=auth_header(session_token), + ) + assert receipts_status == 200 + assert receipts_payload["summary"]["total_count"] >= 1 + assert receipts_payload["items"][0]["scheduler_job_kind"] in {"daily_brief", "open_loop_prompt"} + + +def test_phase10_quiet_hours_disabled_notifications_and_stale_prompt_delivery( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + session_token, workspace_id = _bootstrap_workspace_session("p10s4-suppress@example.com") + _link_telegram_chat( + session_token=session_token, + workspace_id=workspace_id, + chat_id=988002, + user_id=688002, + username="p10s4suppress", + ) + + user_account_id = _resolve_user_account_id( + admin_db_url=migrated_database_urls["admin"], + session_token=session_token, + ) + _seed_open_loop_objects(admin_db_url=migrated_database_urls["admin"], user_id=user_account_id) + + disabled_status, disabled_payload = invoke_request( + "PATCH", + "/v1/channels/telegram/notification-preferences", + payload={ + "notifications_enabled": False, + "daily_brief_enabled": True, + "timezone": "UTC", + "daily_brief_window_start": "00:00", + "quiet_hours_enabled": False, + }, + headers=auth_header(session_token), + ) + assert disabled_status == 200 + assert disabled_payload["notification_preferences"]["notifications_enabled"] is False + + disabled_deliver_status, disabled_deliver_payload = invoke_request( + "POST", + "/v1/channels/telegram/daily-brief/deliver", + payload={}, + headers=auth_header(session_token), + ) + assert disabled_deliver_status == 201 + assert disabled_deliver_payload["job"]["status"] == "suppressed_disabled" + assert disabled_deliver_payload["delivery_receipt"]["status"] == "suppressed" + + quiet_status, quiet_payload = invoke_request( + "PATCH", + "/v1/channels/telegram/notification-preferences", + payload={ + "notifications_enabled": True, + "daily_brief_enabled": True, + "open_loop_prompts_enabled": 
True, + "waiting_for_prompts_enabled": True, + "stale_prompts_enabled": True, + "timezone": "UTC", + "daily_brief_window_start": "00:00", + "quiet_hours_enabled": True, + "quiet_hours_start": "00:00", + "quiet_hours_end": "23:59", + }, + headers=auth_header(session_token), + ) + assert quiet_status == 200 + assert quiet_payload["notification_preferences"]["quiet_hours"]["enabled"] is True + + quiet_deliver_status, quiet_deliver_payload = invoke_request( + "POST", + "/v1/channels/telegram/daily-brief/deliver", + payload={"idempotency_key": "quiet-hours-p10s4-delivery"}, + headers=auth_header(session_token), + ) + assert quiet_deliver_status == 201 + assert quiet_deliver_payload["job"]["status"] == "suppressed_quiet_hours" + assert quiet_deliver_payload["delivery_receipt"]["status"] == "suppressed" + + enable_prompts_status, _enable_prompts_payload = invoke_request( + "PATCH", + "/v1/channels/telegram/notification-preferences", + payload={ + "notifications_enabled": True, + "daily_brief_enabled": True, + "open_loop_prompts_enabled": True, + "waiting_for_prompts_enabled": True, + "stale_prompts_enabled": True, + "timezone": "UTC", + "daily_brief_window_start": "00:00", + "quiet_hours_enabled": False, + }, + headers=auth_header(session_token), + ) + assert enable_prompts_status == 200 + + prompts_status, prompts_payload = invoke_request( + "GET", + "/v1/channels/telegram/open-loop-prompts", + headers=auth_header(session_token), + ) + assert prompts_status == 200 + assert prompts_payload["summary"]["returned_count"] >= 1 + + stale_prompt = next(item for item in prompts_payload["items"] if item["prompt_kind"] == "stale") + + first_prompt_status, first_prompt_payload = invoke_request( + "POST", + f"/v1/channels/telegram/open-loop-prompts/{stale_prompt['prompt_id']}/deliver", + payload={}, + headers=auth_header(session_token), + ) + assert first_prompt_status == 201 + assert first_prompt_payload["job"]["job_kind"] == "open_loop_prompt" + assert 
first_prompt_payload["job"]["status"] == "simulated" + + second_prompt_status, second_prompt_payload = invoke_request( + "POST", + f"/v1/channels/telegram/open-loop-prompts/{stale_prompt['prompt_id']}/deliver", + payload={}, + headers=auth_header(session_token), + ) + assert second_prompt_status == 200 + assert second_prompt_payload["idempotent_replay"] is True + + scheduler_status, scheduler_payload = invoke_request( + "GET", + "/v1/channels/telegram/scheduler/jobs", + headers=auth_header(session_token), + ) + assert scheduler_status == 200 + assert scheduler_payload["summary"]["total_count"] >= 1 + + +def test_phase10_custom_idempotency_key_is_scoped_per_workspace( + migrated_database_urls, + monkeypatch, +) -> None: + _configure_settings(migrated_database_urls, monkeypatch) + + session_token_a, workspace_id_a = _bootstrap_workspace_session("p10s4-scope-a@example.com") + _link_telegram_chat( + session_token=session_token_a, + workspace_id=workspace_id_a, + chat_id=988010, + user_id=688010, + username="p10s4scopea", + ) + user_account_id_a = _resolve_user_account_id( + admin_db_url=migrated_database_urls["admin"], + session_token=session_token_a, + ) + _seed_open_loop_objects(admin_db_url=migrated_database_urls["admin"], user_id=user_account_id_a) + + session_token_b, workspace_id_b = _bootstrap_workspace_session("p10s4-scope-b@example.com") + _link_telegram_chat( + session_token=session_token_b, + workspace_id=workspace_id_b, + chat_id=988011, + user_id=688011, + username="p10s4scopeb", + ) + user_account_id_b = _resolve_user_account_id( + admin_db_url=migrated_database_urls["admin"], + session_token=session_token_b, + ) + _seed_open_loop_objects(admin_db_url=migrated_database_urls["admin"], user_id=user_account_id_b) + + patch_payload = { + "notifications_enabled": True, + "daily_brief_enabled": True, + "open_loop_prompts_enabled": True, + "waiting_for_prompts_enabled": True, + "stale_prompts_enabled": True, + "timezone": "UTC", + "daily_brief_window_start": 
"00:00", + "quiet_hours_enabled": False, + } + patch_a_status, _ = invoke_request( + "PATCH", + "/v1/channels/telegram/notification-preferences", + payload=patch_payload, + headers=auth_header(session_token_a), + ) + patch_b_status, _ = invoke_request( + "PATCH", + "/v1/channels/telegram/notification-preferences", + payload=patch_payload, + headers=auth_header(session_token_b), + ) + assert patch_a_status == 200 + assert patch_b_status == 200 + + shared_key = "shared-p10s4-idempotency-key" + + first_a_status, first_a_payload = invoke_request( + "POST", + "/v1/channels/telegram/daily-brief/deliver", + payload={"idempotency_key": shared_key}, + headers=auth_header(session_token_a), + ) + first_b_status, first_b_payload = invoke_request( + "POST", + "/v1/channels/telegram/daily-brief/deliver", + payload={"idempotency_key": shared_key}, + headers=auth_header(session_token_b), + ) + assert first_a_status == 201 + assert first_b_status == 201 + assert first_a_payload["idempotent_replay"] is False + assert first_b_payload["idempotent_replay"] is False + assert first_a_payload["workspace_id"] == workspace_id_a + assert first_b_payload["workspace_id"] == workspace_id_b + assert first_a_payload["job"]["id"] != first_b_payload["job"]["id"] + assert first_a_payload["delivery_receipt"]["id"] != first_b_payload["delivery_receipt"]["id"] + + replay_a_status, replay_a_payload = invoke_request( + "POST", + "/v1/channels/telegram/daily-brief/deliver", + payload={"idempotency_key": shared_key}, + headers=auth_header(session_token_a), + ) + replay_b_status, replay_b_payload = invoke_request( + "POST", + "/v1/channels/telegram/daily-brief/deliver", + payload={"idempotency_key": shared_key}, + headers=auth_header(session_token_b), + ) + assert replay_a_status == 200 + assert replay_b_status == 200 + assert replay_a_payload["idempotent_replay"] is True + assert replay_b_payload["idempotent_replay"] is True + assert replay_a_payload["job"]["id"] == first_a_payload["job"]["id"] + assert 
replay_b_payload["job"]["id"] == first_b_payload["job"]["id"] diff --git a/tests/unit/test_20260408_0046_phase10_daily_brief_notifications.py b/tests/unit/test_20260408_0046_phase10_daily_brief_notifications.py new file mode 100644 index 0000000..fa8f8ad --- /dev/null +++ b/tests/unit/test_20260408_0046_phase10_daily_brief_notifications.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import importlib + + +MODULE_NAME = "apps.api.alembic.versions.20260408_0046_phase10_daily_brief_notifications" + + +def load_migration_module(): + return importlib.import_module(MODULE_NAME) + + +def test_upgrade_executes_expected_statements_in_order(monkeypatch) -> None: + module = load_migration_module() + executed: list[str] = [] + + monkeypatch.setattr(module.op, "execute", executed.append) + + module.upgrade() + + assert executed == [ + *module._UPGRADE_STATEMENTS, + *module._UPGRADE_GRANT_STATEMENTS, + ] + + +def test_downgrade_executes_expected_statements_in_order(monkeypatch) -> None: + module = load_migration_module() + executed: list[str] = [] + + monkeypatch.setattr(module.op, "execute", executed.append) + + module.downgrade() + + assert executed == list(module._DOWNGRADE_STATEMENTS) + + +def test_migration_adds_phase10_s4_tables_and_scheduler_receipt_fields() -> None: + module = load_migration_module() + joined_upgrade_sql = "\n".join(module._UPGRADE_STATEMENTS) + + assert "CREATE TABLE notification_subscriptions" in joined_upgrade_sql + assert "CREATE TABLE continuity_briefs" in joined_upgrade_sql + assert "CREATE TABLE daily_brief_jobs" in joined_upgrade_sql + assert "ADD COLUMN scheduled_job_id uuid" in joined_upgrade_sql + assert "ADD COLUMN scheduler_job_kind text" in joined_upgrade_sql + assert "ADD COLUMN scheduled_for timestamptz" in joined_upgrade_sql + assert "ADD COLUMN schedule_slot text" in joined_upgrade_sql + assert "ADD COLUMN notification_policy jsonb" in joined_upgrade_sql + assert "UNIQUE (workspace_id, channel_type, idempotency_key)" in 
joined_upgrade_sql + + +def test_migration_extends_delivery_receipt_status_for_policy_suppression() -> None: + module = load_migration_module() + joined_upgrade_sql = "\n".join(module._UPGRADE_STATEMENTS) + + assert "status IN ('delivered', 'failed', 'simulated', 'suppressed')" in joined_upgrade_sql + assert "scheduler_job_kind IN ('daily_brief', 'open_loop_prompt')" in joined_upgrade_sql diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index b2219fc..7a71342 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -163,6 +163,12 @@ def test_healthcheck_route_is_registered() -> None: assert "/v0/task-steps/{task_step_id}/transition" in route_paths assert "/v0/entities/{entity_id}" in route_paths assert "/v0/entities/{entity_id}/edges" in route_paths + assert "/v1/channels/telegram/daily-brief" in route_paths + assert "/v1/channels/telegram/daily-brief/deliver" in route_paths + assert "/v1/channels/telegram/notification-preferences" in route_paths + assert "/v1/channels/telegram/open-loop-prompts" in route_paths + assert "/v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver" in route_paths + assert "/v1/channels/telegram/scheduler/jobs" in route_paths def test_redact_url_credentials_strips_embedded_secrets() -> None: diff --git a/tests/unit/test_telegram_notifications.py b/tests/unit/test_telegram_notifications.py new file mode 100644 index 0000000..e1473ac --- /dev/null +++ b/tests/unit/test_telegram_notifications.py @@ -0,0 +1,201 @@ +from __future__ import annotations + +from datetime import UTC, datetime +from uuid import uuid4 + +import alicebot_api.telegram_notifications as notifications + + +def _subscription_row(**overrides): + row = { + "id": uuid4(), + "workspace_id": uuid4(), + "channel_type": "telegram", + "channel_identity_id": uuid4(), + "notifications_enabled": True, + "daily_brief_enabled": True, + "daily_brief_window_start": "07:00", + "open_loop_prompts_enabled": True, + "waiting_for_prompts_enabled": True, + 
"stale_prompts_enabled": True, + "timezone": "Europe/Stockholm", + "quiet_hours_enabled": True, + "quiet_hours_start": "22:00", + "quiet_hours_end": "07:00", + "created_at": datetime(2026, 4, 8, 9, 0, tzinfo=UTC), + "updated_at": datetime(2026, 4, 8, 9, 0, tzinfo=UTC), + } + row.update(overrides) + return row + + +def test_delivery_policy_enforces_quiet_hours_deterministically() -> None: + subscription = _subscription_row() + + policy = notifications._evaluate_delivery_policy( + subscription, + mode="daily_brief", + prompt_kind=None, + now=datetime(2026, 4, 8, 21, 30, tzinfo=UTC), + force=False, + ) + + assert policy.allowed is False + assert policy.suppression_status == "suppressed_quiet_hours" + assert policy.quiet_hours_active is True + + +def test_materialize_due_jobs_selects_daily_and_prompt_jobs(monkeypatch) -> None: + captured: list[dict[str, object]] = [] + + monkeypatch.setattr( + notifications, + "_resolve_linked_identity", + lambda *_args, **_kwargs: {"id": uuid4()}, + ) + monkeypatch.setattr( + notifications, + "_build_open_loop_prompt_candidates", + lambda *_args, **_kwargs: [ + { + "prompt_id": "stale:11111111-1111-4111-8111-111111111111", + "prompt_kind": "stale", + "continuity_object_id": "11111111-1111-4111-8111-111111111111", + "title": "Stale item", + "continuity_status": "stale", + "review_action_hint": "deferred", + "due_at": datetime(2026, 4, 8, 8, 0, tzinfo=UTC).isoformat(), + "message_text": "prompt", + }, + { + "prompt_id": "waiting_for:22222222-2222-4222-8222-222222222222", + "prompt_kind": "waiting_for", + "continuity_object_id": "22222222-2222-4222-8222-222222222222", + "title": "Waiting for item", + "continuity_status": "active", + "review_action_hint": "still_blocked", + "due_at": datetime(2026, 4, 8, 8, 0, tzinfo=UTC).isoformat(), + "message_text": "prompt", + }, + ], + ) + + def _capture_upsert(_conn, **kwargs): + captured.append(kwargs) + return { + "id": uuid4(), + "workspace_id": kwargs["workspace_id"], + "channel_type": 
"telegram", + "channel_identity_id": kwargs["channel_identity_id"], + "job_kind": kwargs["job_kind"], + "prompt_kind": kwargs["prompt_kind"], + "prompt_id": kwargs["prompt_id"], + "continuity_object_id": kwargs["continuity_object_id"], + "continuity_brief_id": kwargs["continuity_brief_id"], + "schedule_slot": kwargs["schedule_slot"], + "idempotency_key": kwargs["idempotency_key"], + "due_at": kwargs["due_at"], + "status": "scheduled", + "suppression_reason": None, + "attempt_count": 0, + "delivery_receipt_id": None, + "payload": kwargs["payload"], + "result_payload": {}, + "attempted_at": None, + "completed_at": None, + "created_at": datetime(2026, 4, 8, 8, 0, tzinfo=UTC), + "updated_at": datetime(2026, 4, 8, 8, 0, tzinfo=UTC), + } + + monkeypatch.setattr(notifications, "_upsert_scheduled_job", _capture_upsert) + + workspace_id = uuid4() + subscription = _subscription_row( + workspace_id=workspace_id, + quiet_hours_enabled=False, + timezone="UTC", + daily_brief_window_start="07:00", + ) + + notifications._materialize_due_jobs( + conn=object(), + user_account_id=uuid4(), + workspace_id=workspace_id, + subscription=subscription, + now=datetime(2026, 4, 8, 8, 0, tzinfo=UTC), + prompt_limit=5, + ) + + assert len(captured) == 3 + assert [item["job_kind"] for item in captured] == [ + "daily_brief", + "open_loop_prompt", + "open_loop_prompt", + ] + + +def test_daily_brief_bundle_uses_continuity_and_chief_of_staff_sources(monkeypatch) -> None: + monkeypatch.setattr(notifications, "set_current_user", lambda *_args, **_kwargs: None) + monkeypatch.setattr( + notifications, + "compile_continuity_daily_brief", + lambda *_args, **_kwargs: { + "brief": { + "assembly_version": "continuity_daily_brief_v0", + "waiting_for_highlights": {"summary": {"total_count": 2}}, + "blocker_highlights": {"summary": {"total_count": 1}}, + "stale_items": {"summary": {"total_count": 1}}, + "next_suggested_action": {"item": {"title": "Next Action: Ship P10-S4"}}, + } + }, + ) + monkeypatch.setattr( 
+ notifications, + "compile_chief_of_staff_priority_brief", + lambda *_args, **_kwargs: { + "brief": { + "summary": { + "trust_confidence_posture": "medium", + "follow_through_total_count": 3, + }, + "recommended_next_action": {"title": "Follow up waiting-for dependency"}, + } + }, + ) + + bundle = notifications._build_daily_brief_bundle( + conn=object(), + user_account_id=uuid4(), + timezone_name="UTC", + now=datetime(2026, 4, 8, 8, 0, tzinfo=UTC), + ) + + assert bundle["brief"]["assembly_version"] == "continuity_daily_brief_v0" + assert bundle["chief_of_staff_summary"]["trust_confidence_posture"] == "medium" + assert "Waiting-for: 2" in bundle["message_text"] + assert "Chief-of-staff recommendation: Follow up waiting-for dependency" in bundle["message_text"] + + +def test_internal_idempotency_key_scopes_custom_values_by_workspace() -> None: + shared_client_key = "same-client-key" + workspace_a = uuid4() + workspace_b = uuid4() + + key_a = notifications._resolve_internal_idempotency_key( + workspace_id=workspace_a, + job_kind="daily_brief", + schedule_slot="2026-04-08", + prompt_id=None, + client_idempotency_key=shared_client_key, + ) + key_b = notifications._resolve_internal_idempotency_key( + workspace_id=workspace_b, + job_kind="daily_brief", + schedule_slot="2026-04-08", + prompt_id=None, + client_idempotency_key=shared_client_key, + ) + + assert key_a != key_b + assert key_a.startswith("telegram:daily_brief:custom:") + assert key_b.startswith("telegram:daily_brief:custom:")